Merge "Add leader election for periodic sync with ironic"
This commit is contained in:
commit
63e1d749fb
@ -14,6 +14,7 @@
|
|||||||
from oslo_concurrency import lockutils
|
from oslo_concurrency import lockutils
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
|
import tooz
|
||||||
from tooz import coordination
|
from tooz import coordination
|
||||||
|
|
||||||
from ironic_inspector import utils
|
from ironic_inspector import utils
|
||||||
@ -42,6 +43,8 @@ class Coordinator(object):
|
|||||||
self.coordinator = None
|
self.coordinator = None
|
||||||
self.started = False
|
self.started = False
|
||||||
self.prefix = prefix if prefix else 'default'
|
self.prefix = prefix if prefix else 'default'
|
||||||
|
self.is_leader = False
|
||||||
|
self.supports_election = True
|
||||||
|
|
||||||
def start(self, heartbeat=True):
|
def start(self, heartbeat=True):
|
||||||
"""Start coordinator.
|
"""Start coordinator.
|
||||||
@ -85,6 +88,24 @@ class Coordinator(object):
|
|||||||
except coordination.GroupAlreadyExist:
|
except coordination.GroupAlreadyExist:
|
||||||
LOG.debug('Group %s already exists.', self.group_name)
|
LOG.debug('Group %s already exists.', self.group_name)
|
||||||
|
|
||||||
|
def _join_election(self):
    """Enter the leader election for this coordinator's group.

    ``is_leader`` is reset to False, a callback that sets it to True is
    registered with the backend through ``watch_elected_as_leader`` for
    ``self.group_name``, and one election round is run immediately.

    If the backend raises ``tooz.NotImplemented``, a warning is logged,
    this instance assumes it is the leader, and ``supports_election`` is
    set to False so that later code can tell elections are unavailable.
    """
    # Start from a known state; only the callback (or the fallback
    # below) may promote this instance to leader.
    self.is_leader = False

    def _when_elected(event):
        # The event argument supplied by the backend is not used.
        LOG.info('This conductor instance is a group leader now.')
        self.is_leader = True

    try:
        self.coordinator.watch_elected_as_leader(
            self.group_name, _when_elected)
        self.coordinator.run_elect_coordinator()
    except tooz.NotImplemented:
        LOG.warning('The coordination backend does not support leader '
                    'elections, assuming we are a leader. This is '
                    'deprecated, please use a supported backend.')
        self.is_leader = True
        self.supports_election = False
|
||||||
|
|
||||||
def join_group(self):
|
def join_group(self):
|
||||||
"""Join service group."""
|
"""Join service group."""
|
||||||
self._validate_state()
|
self._validate_state()
|
||||||
@ -97,6 +118,8 @@ class Coordinator(object):
|
|||||||
request.get()
|
request.get()
|
||||||
except coordination.MemberAlreadyExist:
|
except coordination.MemberAlreadyExist:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
self._join_election()
|
||||||
LOG.debug('Joined group %s', self.group_name)
|
LOG.debug('Joined group %s', self.group_name)
|
||||||
|
|
||||||
def leave_group(self):
|
def leave_group(self):
|
||||||
@ -125,6 +148,18 @@ class Coordinator(object):
|
|||||||
lock_name = (self.lock_prefix + uuid).encode('ascii')
|
lock_name = (self.lock_prefix + uuid).encode('ascii')
|
||||||
return self.coordinator.get_lock(lock_name)
|
return self.coordinator.get_lock(lock_name)
|
||||||
|
|
||||||
|
def run_elect_coordinator(self):
    """Trigger a new leader election.

    When ``supports_election`` is true, the call is delegated to the
    backend's ``run_elect_coordinator`` and bracketed by debug log
    messages. Otherwise a warning is logged and ``is_leader`` is set to
    True, i.e. this instance assumes leadership.
    """
    if self.supports_election:
        LOG.debug('Starting leader election')
        self.coordinator.run_elect_coordinator()
        LOG.debug('Finished leader election')
    else:
        LOG.warning('The coordination backend does not support leader '
                    'elections, assuming we are a leader. This is '
                    'deprecated, please use a supported backend.')
        self.is_leader = True
|
||||||
|
|
||||||
|
|
||||||
_COORDINATOR = None
|
_COORDINATOR = None
|
||||||
|
|
||||||
|
@ -51,6 +51,7 @@ class ConductorManager(object):
|
|||||||
self._periodics_worker = None
|
self._periodics_worker = None
|
||||||
self._zeroconf = None
|
self._zeroconf = None
|
||||||
self._shutting_down = semaphore.Semaphore()
|
self._shutting_down = semaphore.Semaphore()
|
||||||
|
self.coordinator = None
|
||||||
|
|
||||||
def init_host(self):
|
def init_host(self):
|
||||||
"""Initialize Worker host
|
"""Initialize Worker host
|
||||||
@ -70,6 +71,24 @@ class ConductorManager(object):
|
|||||||
|
|
||||||
db.init()
|
db.init()
|
||||||
|
|
||||||
|
self.coordinator = None
|
||||||
|
try:
|
||||||
|
self.coordinator = coordination.get_coordinator(prefix='conductor')
|
||||||
|
self.coordinator.start(heartbeat=True)
|
||||||
|
self.coordinator.join_group()
|
||||||
|
except Exception as exc:
|
||||||
|
if CONF.standalone:
|
||||||
|
LOG.info('Coordination backend cannot be started, assuming '
|
||||||
|
'no other instances are running. Error: %s', exc)
|
||||||
|
self.coordinator = None
|
||||||
|
else:
|
||||||
|
with excutils.save_and_reraise_exception():
|
||||||
|
LOG.critical('Failure when connecting to coordination '
|
||||||
|
'backend', exc_info=True)
|
||||||
|
self.del_host()
|
||||||
|
else:
|
||||||
|
LOG.info('Successfully connected to coordination backend.')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
hooks = plugins_base.validate_processing_hooks()
|
hooks = plugins_base.validate_processing_hooks()
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
@ -91,11 +110,20 @@ class ConductorManager(object):
|
|||||||
)(sync_with_ironic)
|
)(sync_with_ironic)
|
||||||
|
|
||||||
callables = [(periodic_clean_up_, None, None),
|
callables = [(periodic_clean_up_, None, None),
|
||||||
(sync_with_ironic_, None, None)]
|
(sync_with_ironic_, (self,), None)]
|
||||||
|
|
||||||
driver_task = driver.get_periodic_sync_task()
|
driver_task = driver.get_periodic_sync_task()
|
||||||
if driver_task is not None:
|
if driver_task is not None:
|
||||||
callables.append((driver_task, None, None))
|
callables.append((driver_task, None, None))
|
||||||
|
|
||||||
|
# run elections periodically if we have a coordinator
|
||||||
|
# that we were able to start
|
||||||
|
if (self.coordinator and self.coordinator.started):
|
||||||
|
periodic_leader_election_ = periodics.periodic(
|
||||||
|
spacing=CONF.leader_election_interval
|
||||||
|
)(periodic_leader_election)
|
||||||
|
callables.append((periodic_leader_election_, (self,), None))
|
||||||
|
|
||||||
self._periodics_worker = periodics.PeriodicWorker(
|
self._periodics_worker = periodics.PeriodicWorker(
|
||||||
callables=callables,
|
callables=callables,
|
||||||
executor_factory=periodics.ExistingExecutor(utils.executor()),
|
executor_factory=periodics.ExistingExecutor(utils.executor()),
|
||||||
@ -109,28 +137,14 @@ class ConductorManager(object):
|
|||||||
self._zeroconf.register_service('baremetal-introspection',
|
self._zeroconf.register_service('baremetal-introspection',
|
||||||
endpoint)
|
endpoint)
|
||||||
|
|
||||||
if not CONF.standalone:
|
|
||||||
try:
|
|
||||||
coordinator = coordination.get_coordinator(prefix='conductor')
|
|
||||||
coordinator.start(heartbeat=True)
|
|
||||||
coordinator.join_group()
|
|
||||||
except tooz.ToozError:
|
|
||||||
with excutils.save_and_reraise_exception():
|
|
||||||
LOG.critical('Failed when connecting to coordination '
|
|
||||||
'backend.')
|
|
||||||
self.del_host()
|
|
||||||
else:
|
|
||||||
LOG.info('Successfully connected to coordination backend.')
|
|
||||||
|
|
||||||
def del_host(self):
|
def del_host(self):
|
||||||
"""Shutdown the ironic inspector conductor service."""
|
"""Shutdown the ironic inspector conductor service."""
|
||||||
|
|
||||||
if not CONF.standalone:
|
if self.coordinator is not None:
|
||||||
try:
|
try:
|
||||||
coordinator = coordination.get_coordinator(prefix='conductor')
|
if self.coordinator.started:
|
||||||
if coordinator.started:
|
self.coordinator.leave_group()
|
||||||
coordinator.leave_group()
|
self.coordinator.stop()
|
||||||
coordinator.stop()
|
|
||||||
except tooz.ToozError:
|
except tooz.ToozError:
|
||||||
LOG.exception('Failed to stop coordinator')
|
LOG.exception('Failed to stop coordinator')
|
||||||
|
|
||||||
@ -201,9 +215,22 @@ def periodic_clean_up(): # pragma: no cover
|
|||||||
pxe_filter.driver().sync(ir_utils.get_client())
|
pxe_filter.driver().sync(ir_utils.get_client())
|
||||||
|
|
||||||
|
|
||||||
def sync_with_ironic():
|
def sync_with_ironic(conductor):
|
||||||
|
if (conductor.coordinator is not None
|
||||||
|
and not conductor.coordinator.is_leader):
|
||||||
|
LOG.debug('The conductor is not a leader, skipping syncing '
|
||||||
|
'with ironic')
|
||||||
|
return
|
||||||
|
|
||||||
|
LOG.debug('Syncing with ironic')
|
||||||
ironic = ir_utils.get_client()
|
ironic = ir_utils.get_client()
|
||||||
# TODO(yuikotakada): pagination
|
# TODO(yuikotakada): pagination
|
||||||
ironic_nodes = ironic.nodes(fields=["uuid"], limit=None)
|
ironic_nodes = ironic.nodes(fields=["uuid"], limit=None)
|
||||||
ironic_node_uuids = {node.id for node in ironic_nodes}
|
ironic_node_uuids = {node.id for node in ironic_nodes}
|
||||||
node_cache.delete_nodes_not_in_list(ironic_node_uuids)
|
node_cache.delete_nodes_not_in_list(ironic_node_uuids)
|
||||||
|
|
||||||
|
|
||||||
|
def periodic_leader_election(conductor):
    """Run one leader-election round for the given conductor.

    :param conductor: object exposing a ``coordinator`` attribute; when
        that attribute is None the call is a no-op.
    :returns: None
    """
    if conductor.coordinator is not None:
        conductor.coordinator.run_elect_coordinator()
|
||||||
|
@ -64,6 +64,9 @@ _OPTS = [
|
|||||||
'Not advisable if the deployment uses a PXE filter, '
|
'Not advisable if the deployment uses a PXE filter, '
|
||||||
'and will result in the ironic-inspector ceasing '
|
'and will result in the ironic-inspector ceasing '
|
||||||
'periodic cleanup activities.')),
|
'periodic cleanup activities.')),
|
||||||
|
cfg.IntOpt('leader_election_interval',
|
||||||
|
default=10,
|
||||||
|
help=_('Interval (in seconds) between leader elections.')),
|
||||||
cfg.BoolOpt('use_ssl',
|
cfg.BoolOpt('use_ssl',
|
||||||
default=False,
|
default=False,
|
||||||
help=_('SSL Enabled/Disabled')),
|
help=_('SSL Enabled/Disabled')),
|
||||||
|
@ -79,15 +79,21 @@ class TestManagerInitHost(BaseManagerTest):
|
|||||||
self.mock_executor.return_value.submit.assert_called_once_with(
|
self.mock_executor.return_value.submit.assert_called_once_with(
|
||||||
self.manager._periodics_worker.start)
|
self.manager._periodics_worker.start)
|
||||||
|
|
||||||
def test_no_introspection_data_store(self):
|
@mock.patch.object(coordination, 'get_coordinator', autospec=True)
|
||||||
|
def test_no_introspection_data_store(self, mock_get_coord):
|
||||||
CONF.set_override('store_data', 'none', 'processing')
|
CONF.set_override('store_data', 'none', 'processing')
|
||||||
|
mock_coordinator = mock.MagicMock()
|
||||||
|
mock_get_coord.return_value = mock_coordinator
|
||||||
self.manager.init_host()
|
self.manager.init_host()
|
||||||
self.mock_log.warning.assert_called_once_with(
|
self.mock_log.warning.assert_called_once_with(
|
||||||
'Introspection data will not be stored. Change "[processing] '
|
'Introspection data will not be stored. Change "[processing] '
|
||||||
'store_data" option if this is not the desired behavior')
|
'store_data" option if this is not the desired behavior')
|
||||||
|
|
||||||
|
@mock.patch.object(coordination, 'get_coordinator', autospec=True)
|
||||||
@mock.patch.object(mdns, 'Zeroconf', autospec=True)
|
@mock.patch.object(mdns, 'Zeroconf', autospec=True)
|
||||||
def test_init_host(self, mock_zc):
|
def test_init_host(self, mock_zc, mock_get_coord):
|
||||||
|
mock_coordinator = mock.MagicMock()
|
||||||
|
mock_get_coord.return_value = mock_coordinator
|
||||||
self.manager.init_host()
|
self.manager.init_host()
|
||||||
self.mock_db_init.assert_called_once_with()
|
self.mock_db_init.assert_called_once_with()
|
||||||
self.mock_validate_processing_hooks.assert_called_once_with()
|
self.mock_validate_processing_hooks.assert_called_once_with()
|
||||||
@ -112,10 +118,13 @@ class TestManagerInitHost(BaseManagerTest):
|
|||||||
self.mock_exit.assert_called_once_with(1)
|
self.mock_exit.assert_called_once_with(1)
|
||||||
self.mock_filter.init_filter.assert_not_called()
|
self.mock_filter.init_filter.assert_not_called()
|
||||||
|
|
||||||
|
@mock.patch.object(coordination, 'get_coordinator', autospec=True)
|
||||||
@mock.patch.object(mdns, 'Zeroconf', autospec=True)
|
@mock.patch.object(mdns, 'Zeroconf', autospec=True)
|
||||||
@mock.patch.object(keystone, 'get_endpoint', autospec=True)
|
@mock.patch.object(keystone, 'get_endpoint', autospec=True)
|
||||||
def test_init_host_with_mdns(self, mock_endpoint, mock_zc):
|
def test_init_host_with_mdns(self, mock_endpoint, mock_zc, mock_get_coord):
|
||||||
CONF.set_override('enable_mdns', True)
|
CONF.set_override('enable_mdns', True)
|
||||||
|
mock_coordinator = mock.MagicMock()
|
||||||
|
mock_get_coord.return_value = mock_coordinator
|
||||||
self.manager.init_host()
|
self.manager.init_host()
|
||||||
self.mock_db_init.assert_called_once_with()
|
self.mock_db_init.assert_called_once_with()
|
||||||
self.mock_validate_processing_hooks.assert_called_once_with()
|
self.mock_validate_processing_hooks.assert_called_once_with()
|
||||||
@ -149,9 +158,9 @@ class TestManagerInitHost(BaseManagerTest):
|
|||||||
None)
|
None)
|
||||||
self.assertRaises(tooz.ToozError, self.manager.init_host)
|
self.assertRaises(tooz.ToozError, self.manager.init_host)
|
||||||
self.mock_db_init.assert_called_once_with()
|
self.mock_db_init.assert_called_once_with()
|
||||||
self.mock_validate_processing_hooks.assert_called_once_with()
|
self.mock_validate_processing_hooks.assert_not_called()
|
||||||
self.mock_filter.init_filter.assert_called_once_with()
|
self.mock_filter.init_filter.assert_not_called()
|
||||||
self.assert_periodics()
|
self.assertIsNone(self.manager._periodics_worker)
|
||||||
mock_get_coord.assert_called_once_with(prefix='conductor')
|
mock_get_coord.assert_called_once_with(prefix='conductor')
|
||||||
mock_del_host.assert_called_once_with(self.manager)
|
mock_del_host.assert_called_once_with(self.manager)
|
||||||
|
|
||||||
|
6
releasenotes/notes/leader-election-c6692d9962f30ad1.yaml
Normal file
6
releasenotes/notes/leader-election-c6692d9962f30ad1.yaml
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
---
|
||||||
|
features:
|
||||||
|
- |
|
||||||
|
Adds periodic leader election for the cleanup sync with Ironic.
|
||||||
|
The election interval is configured by the new
|
||||||
|
``leader_election_interval`` config option.
|
Loading…
Reference in New Issue
Block a user