From 3f254e32a945b9504717a1f91c5eaa24b154df52 Mon Sep 17 00:00:00 2001 From: dineshbhor Date: Mon, 21 Nov 2016 17:11:52 +0530 Subject: [PATCH] Add host_failure workflow for 'reserved_host' recovery_method Added workflow for evacuating instances from failed_host using 'reserved_host' recovery method. Partially-Implements: blueprint implement-recovery-methods Change-Id: I6dc2b35cf40c4506beec866c0a660179b19cd3ca --- masakari/conf/engine.py | 5 +- masakari/engine/driver.py | 2 +- masakari/engine/drivers/taskflow/base.py | 3 +- masakari/engine/drivers/taskflow/driver.py | 18 ++- .../engine/drivers/taskflow/host_failure.py | 108 +++++++++++++---- .../drivers/taskflow/process_failure.py | 5 +- masakari/engine/manager.py | 20 +++- masakari/exception.py | 16 ++- .../taskflow/test_host_failure_flow.py | 112 +++++++++++++----- .../taskflow/test_process_failure_flow.py | 6 +- masakari/tests/unit/engine/test_engine_mgr.py | 64 +++++++++- masakari/utils.py | 26 +++- ...host_recovery_method-d2de1f205136c8d5.yaml | 8 ++ 13 files changed, 318 insertions(+), 75 deletions(-) create mode 100644 releasenotes/notes/reserved_host_recovery_method-d2de1f205136c8d5.yaml diff --git a/masakari/conf/engine.py b/masakari/conf/engine.py index 93c9faf7..4e4c65d9 100644 --- a/masakari/conf/engine.py +++ b/masakari/conf/engine.py @@ -62,9 +62,10 @@ notification_opts = [ "the notification will be considered as duplicate and " "it will be ignored." ), - cfg.IntOpt('wait_period_after_service_disabled', + cfg.IntOpt('wait_period_after_service_update', default=180, - help='Wait until service is disabled'), + help='Number of seconds to wait after a service is enabled ' + 'or disabled.'), cfg.IntOpt('wait_period_after_evacuation', default=90, help='Wait until instance is evacuated'), diff --git a/masakari/engine/driver.py b/masakari/engine/driver.py index 8b9b7a29..2806b9d7 100644 --- a/masakari/engine/driver.py +++ b/masakari/engine/driver.py @@ -40,7 +40,7 @@ LOG = logging.getLogger(__name__) class NotificationDriver(object): @abc.abstractmethod def execute_host_failure(self, context, host_name, recovery_method, - notification_uuid): + notification_uuid, reserved_host_list=None): pass @abc.abstractmethod diff --git a/masakari/engine/drivers/taskflow/base.py b/masakari/engine/drivers/taskflow/base.py index f6ab1640..f5583ad4 100644 --- a/masakari/engine/drivers/taskflow/base.py +++ b/masakari/engine/drivers/taskflow/base.py @@ -53,7 +53,8 @@ class MasakariTask(task.Task): class SpecialFormatter(formatters.FailureFormatter): # Exception is an excepted case, don't include traceback in log if fails. - _NO_TRACE_EXCEPTIONS = (exception.SkipInstanceRecoveryException,) + _NO_TRACE_EXCEPTIONS = (exception.SkipInstanceRecoveryException, + exception.SkipHostRecoveryException) def __init__(self, engine): super(SpecialFormatter, self).__init__(engine) diff --git a/masakari/engine/drivers/taskflow/driver.py b/masakari/engine/drivers/taskflow/driver.py index b7c20c3b..d8aad36b 100644 --- a/masakari/engine/drivers/taskflow/driver.py +++ b/masakari/engine/drivers/taskflow/driver.py @@ -40,7 +40,7 @@ class TaskFlowDriver(driver.NotificationDriver): super(TaskFlowDriver, self).__init__() def execute_host_failure(self, context, host_name, recovery_method, - notification_uuid): + notification_uuid, reserved_host_list=None): novaclient = nova.API() # get flow for host failure process_what = { @@ -54,9 +54,14 @@ class TaskFlowDriver(driver.NotificationDriver): process_what) elif recovery_method == ( fields.FailoverSegmentRecoveryMethod.RESERVED_HOST): - raise NotImplementedError(_("Flow not implemented for " - "recovery_method"), - recovery_method) + if not reserved_host_list: + msg = _('No reserved_hosts available for evacuation.') + LOG.info(msg) + raise exception.ReservedHostsUnavailable(message=msg) + + process_what['reserved_host_list'] = reserved_host_list + flow_engine = host_failure.get_rh_flow( + novaclient, process_what) elif recovery_method == ( fields.FailoverSegmentRecoveryMethod.AUTO_PRIORITY): raise NotImplementedError(_("Flow not implemented for " @@ -77,7 +82,10 @@ class TaskFlowDriver(driver.NotificationDriver): # taskflow sends out and redirect them to a more useful log for # masakari's debugging (or error reporting) usage. with base.DynamicLogListener(flow_engine, logger=LOG): - flow_engine.run() + try: + flow_engine.run() + except exception.LockAlreadyAcquired as ex: + raise exception.HostRecoveryFailureException(ex.message) def execute_instance_failure(self, context, instance_uuid, notification_uuid): diff --git a/masakari/engine/drivers/taskflow/host_failure.py b/masakari/engine/drivers/taskflow/host_failure.py index a0095dbc..c02867fb 100644 --- a/masakari/engine/drivers/taskflow/host_failure.py +++ b/masakari/engine/drivers/taskflow/host_failure.py @@ -21,11 +21,13 @@ from oslo_service import loopingcall from oslo_utils import strutils import taskflow.engines from taskflow.patterns import linear_flow +from taskflow import retry import masakari.conf from masakari.engine.drivers.taskflow import base from masakari import exception from masakari.i18n import _, _LI +from masakari import utils CONF = masakari.conf.CONF @@ -48,8 +50,8 @@ class DisableComputeServiceTask(base.MasakariTask): # Sleep until nova-compute service is marked as disabled. msg = _LI("Sleeping %(wait)s sec before starting recovery " "thread until nova recognizes the node down.") - LOG.info(msg, {'wait': CONF.wait_period_after_service_disabled}) - eventlet.sleep(CONF.wait_period_after_service_disabled) + LOG.info(msg, {'wait': CONF.wait_period_after_service_update}) + eventlet.sleep(CONF.wait_period_after_service_update) class PrepareHAEnabledInstancesTask(base.MasakariTask): @@ -74,34 +76,72 @@ class PrepareHAEnabledInstancesTask(base.MasakariTask): [instance for instance in instance_list if strutils.bool_from_string(instance.metadata.get('HA_Enabled', False))]) + if not instance_list: + msg = _('No instances to evacuate on host: %s.') % host_name + LOG.info(msg) + raise exception.SkipHostRecoveryException(message=msg) return { "instance_list": instance_list, } -class AutoEvacuationInstancesTask(base.MasakariTask): +class EvacuateInstancesTask(base.MasakariTask): default_provides = set(["instance_list"]) def __init__(self, novaclient): requires = ["instance_list"] - super(AutoEvacuationInstancesTask, self).__init__(addons=[ACTION], - requires=requires) + super(EvacuateInstancesTask, self).__init__(addons=[ACTION], + requires=requires) self.novaclient = novaclient - def execute(self, context, instance_list): - for instance in instance_list: - vm_state = getattr(instance, "OS-EXT-STS:vm_state") - if vm_state in ['active', 'error', 'resized', 'stopped']: - # Evacuate API only evacuates an instance in - # active, stop or error state. If an instance is in - # resized status, masakari resets the instance - # state to *error* to evacuate it. - if vm_state == 'resized': - self.novaclient.reset_instance_state( - context, instance.id) - # evacuate the instances to new host - self.novaclient.evacuate_instance(context, instance.id) + def execute(self, context, instance_list, reserved_host=None): + def _do_evacuate(context, instance_list, reserved_host=None): + if reserved_host: + self.novaclient.enable_disable_service( + context, reserved_host.name, enable=True) + + # Sleep until nova-compute service is marked as enabled. + msg = _LI("Sleeping %(wait)s sec before starting recovery " + "thread until nova recognizes the node up.") + LOG.info(msg, { + 'wait': CONF.wait_period_after_service_update}) + eventlet.sleep(CONF.wait_period_after_service_update) + + # Set reserved property of reserved_host to False + reserved_host.reserved = False + reserved_host.save() + + for instance in instance_list: + vm_state = getattr(instance, "OS-EXT-STS:vm_state") + if vm_state in ['active', 'error', 'resized', 'stopped']: + # Evacuate API only evacuates an instance in + # active, stop or error state. If an instance is in + # resized status, masakari resets the instance + # state to *error* to evacuate it. + if vm_state == 'resized': + self.novaclient.reset_instance_state( + context, instance.id) + + # evacuate the instance + self.novaclient.evacuate_instance( + context, instance.id, + target=reserved_host.name if reserved_host else None) + + lock_name = reserved_host.name if reserved_host else None + + @utils.synchronized(lock_name) + def do_evacuate_with_reserved_host(context, instance_list, + reserved_host): + _do_evacuate(context, instance_list, reserved_host=reserved_host) + + if lock_name: + do_evacuate_with_reserved_host(context, instance_list, + reserved_host) + else: + # No need to acquire lock on reserved_host when recovery_method is + # 'auto' as the selection of compute host will be decided by nova. + _do_evacuate(context, instance_list) return { "instance_list": instance_list, @@ -112,7 +152,7 @@ class ConfirmEvacuationTask(base.MasakariTask): def __init__(self, novaclient): requires = ["instance_list", "host_name"] super(ConfirmEvacuationTask, self).__init__(addons=[ACTION], - requires=requires) + requires=requires) self.novaclient = novaclient def execute(self, context, instance_list, host_name): @@ -152,7 +192,7 @@ class ConfirmEvacuationTask(base.MasakariTask): 'instances': failed_evacuation_instances, 'host_name': host_name } - raise exception.AutoRecoveryFailureException(message=msg) + raise exception.HostRecoveryFailureException(message=msg) def get_auto_flow(novaclient, process_what): @@ -171,7 +211,33 @@ def get_auto_flow(novaclient, process_what): auto_evacuate_flow.add(DisableComputeServiceTask(novaclient), PrepareHAEnabledInstancesTask(novaclient), - AutoEvacuationInstancesTask(novaclient), + EvacuateInstancesTask(novaclient), ConfirmEvacuationTask(novaclient)) return taskflow.engines.load(auto_evacuate_flow, store=process_what) + + +def get_rh_flow(novaclient, process_what): + """Constructs and returns the engine entrypoint flow. + + This flow will do the following: + + 1. Disable compute service on source host + 2. Get all HA_Enabled instances. + 3. Evacuate all the HA_Enabled instances using reserved_host. + 4. Confirm evacuation of instances. + """ + flow_name = ACTION.replace(":", "_") + "_engine" + nested_flow = linear_flow.Flow(flow_name) + + rh_flow = linear_flow.Flow( + "retry_%s" % flow_name, retry=retry.ParameterizedForEach( + rebind=['reserved_host_list'], provides='reserved_host')) + + rh_flow.add(PrepareHAEnabledInstancesTask(novaclient), + EvacuateInstancesTask(novaclient), + ConfirmEvacuationTask(novaclient)) + + nested_flow.add(DisableComputeServiceTask(novaclient), rh_flow) + + return taskflow.engines.load(nested_flow, store=process_what) diff --git a/masakari/engine/drivers/taskflow/process_failure.py b/masakari/engine/drivers/taskflow/process_failure.py index ab73e3f7..f4cb3439 100644 --- a/masakari/engine/drivers/taskflow/process_failure.py +++ b/masakari/engine/drivers/taskflow/process_failure.py @@ -70,8 +70,9 @@ class ConfirmComputeNodeDisabledTask(base.MasakariTask): try: # add a timeout to the periodic call. periodic_call.start(interval=CONF.verify_interval) - etimeout.with_timeout(CONF.wait_period_after_service_disabled, - periodic_call.wait) + etimeout.with_timeout( + CONF.wait_period_after_service_update, + periodic_call.wait) except etimeout.Timeout: msg = _("Failed to disable service %(process_name)s") % { 'process_name': process_name diff --git a/masakari/engine/manager.py b/masakari/engine/manager.py index b44cd694..b69ae629 100644 --- a/masakari/engine/manager.py +++ b/masakari/engine/manager.py @@ -145,12 +145,24 @@ class MasakariManager(manager.Manager): host_obj.update(update_data) host_obj.save() + reserved_host_list = None + if not recovery_method == ( + fields.FailoverSegmentRecoveryMethod.AUTO): + reserved_host_list = objects.HostList.get_all( + context, filters={ + 'failover_segment_id': host_obj.failover_segment_id, + 'reserved': True}) + try: self.driver.execute_host_failure( context, host_name, recovery_method, - notification.notification_uuid) - except (exception.MasakariException, - exception.AutoRecoveryFailureException): + notification.notification_uuid, + reserved_host_list=reserved_host_list) + except exception.SkipHostRecoveryException: + notification_status = fields.NotificationStatus.FINISHED + except (exception.HostRecoveryFailureException, + exception.ReservedHostsUnavailable, + exception.MasakariException): notification_status = fields.NotificationStatus.ERROR else: LOG.warning(_LW("Invalid event: %(event)s received for " @@ -163,7 +175,7 @@ class MasakariManager(manager.Manager): return notification_status def _process_notification(self, context, notification): - @utils.synchronized(notification.source_host_uuid) + @utils.synchronized(notification.source_host_uuid, blocking=True) def do_process_notification(notification): LOG.info(_LI('Processing notification %(notification_uuid)s of ' 'type: %(type)s'), { diff --git a/masakari/exception.py b/masakari/exception.py index 1bc8be9c..649bd96e 100644 --- a/masakari/exception.py +++ b/masakari/exception.py @@ -307,8 +307,8 @@ class HostOnMaintenanceError(Invalid): code = http.CONFLICT -class AutoRecoveryFailureException(MasakariException): - msg_fmt = _('Failed to execute auto recovery method.') +class HostRecoveryFailureException(MasakariException): + msg_fmt = _('Failed to execute host recovery.') class InstanceRecoveryFailureException(MasakariException): @@ -323,6 +323,10 @@ class SkipProcessRecoveryException(MasakariException): msg_fmt = _('Skipping execution of process recovery workflow.') +class SkipHostRecoveryException(MasakariException): + msg_fmt = _('Skipping execution of host recovery workflow.') + + class ProcessRecoveryFailureException(MasakariException): msg_fmt = _('Failed to execute process recovery workflow.') @@ -340,3 +344,11 @@ class FailoverSegmentInUse(Conflict): class HostInUse(Conflict): msg_fmt = _("Host %(uuid)s can't be updated as it is in-use to process " "notifications.") + + +class ReservedHostsUnavailable(MasakariException): + msg_fmt = _('No reserved_hosts available for evacuation.') + + +class LockAlreadyAcquired(MasakariException): + msg_fmt = _('Lock is already acquired on %(resource)s.') diff --git a/masakari/tests/unit/engine/drivers/taskflow/test_host_failure_flow.py b/masakari/tests/unit/engine/drivers/taskflow/test_host_failure_flow.py index 4d8aa8e9..2fa96065 100644 --- a/masakari/tests/unit/engine/drivers/taskflow/test_host_failure_flow.py +++ b/masakari/tests/unit/engine/drivers/taskflow/test_host_failure_flow.py @@ -25,6 +25,7 @@ from masakari import conf from masakari import context from masakari.engine.drivers.taskflow import host_failure from masakari import exception +from masakari.objects import host as host_obj from masakari import test from masakari.tests.unit import fakes @@ -37,10 +38,10 @@ class HostFailureTestCase(test.TestCase): super(HostFailureTestCase, self).setUp() self.ctxt = context.get_admin_context() # overriding 'wait_period_after_evacuation' and - # 'wait_period_after_service_disabled' to 2 seconds to + # 'wait_period_after_service_update' to 2 seconds to # reduce the wait period. self.override_config("wait_period_after_evacuation", 2) - self.override_config("wait_period_after_service_disabled", 2) + self.override_config("wait_period_after_service_update", 2) self.override_config("evacuate_all_instances", False, "host_failure") self.instance_host = "fake-host" @@ -59,11 +60,13 @@ class HostFailureTestCase(test.TestCase): def _test_disable_compute_service(self): task = host_failure.DisableComputeServiceTask(self.novaclient) - with mock.patch.object(fakes.FakeNovaClient.Services, - "disable") as mock_disable: + with mock.patch.object( + self.novaclient, + "enable_disable_service") as mock_enable_disable_service: task.execute(self.ctxt, self.instance_host) - mock_disable.assert_called_once_with(self.instance_host, - "nova-compute") + + mock_enable_disable_service.assert_called_once_with( + self.ctxt, self.instance_host) def _test_instance_list(self): task = host_failure.PrepareHAEnabledInstancesTask(self.novaclient) @@ -80,10 +83,21 @@ class HostFailureTestCase(test.TestCase): return instance_list - def _auto_evacuate_instances(self, instance_list): - task = host_failure.AutoEvacuationInstancesTask(self.novaclient) - instance_list = task.execute( - self.ctxt, instance_list['instance_list']) + def _evacuate_instances(self, instance_list, reserved_host=None): + task = host_failure.EvacuateInstancesTask(self.novaclient) + if reserved_host: + with mock.patch.object( + self.novaclient, + "enable_disable_service") as mock_enable_disable_service: + instance_list = task.execute(self.ctxt, + instance_list['instance_list'], + reserved_host=reserved_host) + + mock_enable_disable_service.assert_called_once_with( + self.ctxt, reserved_host.name, enable=True) + else: + instance_list = task.execute( + self.ctxt, instance_list['instance_list']) return instance_list @@ -95,7 +109,7 @@ class HostFailureTestCase(test.TestCase): self._verify_instance_evacuated() @mock.patch('masakari.compute.nova.novaclient') - def test_host_failure_flow(self, _mock_novaclient): + def test_host_failure_flow_for_auto_recovery(self, _mock_novaclient): _mock_novaclient.return_value = self.fake_client self.override_config("evacuate_all_instances", True, "host_failure") @@ -111,15 +125,43 @@ class HostFailureTestCase(test.TestCase): # execute PrepareHAEnabledInstancesTask instance_list = self._test_instance_list() - # execute AutoEvacuationInstancesTask - instance_list = self._auto_evacuate_instances( - instance_list) + # execute EvacuateInstancesTask + instance_list = self._evacuate_instances(instance_list) # execute ConfirmEvacuationTask self._test_confirm_evacuate_task(instance_list) @mock.patch('masakari.compute.nova.novaclient') - def test_auto_evacuate_instances_task(self, _mock_novaclient): + def test_host_failure_flow_for_reserved_host_recovery( + self, _mock_novaclient): + _mock_novaclient.return_value = self.fake_client + self.override_config("evacuate_all_instances", + True, "host_failure") + + # create test data + self.fake_client.servers.create(id="1", host=self.instance_host, + ha_enabled=True) + self.fake_client.servers.create(id="2", host=self.instance_host) + reserved_host = fakes.create_fake_host(name="fake-reserved-host", + reserved=True) + + # execute DisableComputeServiceTask + self._test_disable_compute_service() + + # execute PrepareHAEnabledInstancesTask + instance_list = self._test_instance_list() + + # execute EvacuateInstancesTask + with mock.patch.object(host_obj.Host, "save") as mock_save: + instance_list = self._evacuate_instances( + instance_list, reserved_host=reserved_host) + self.assertEqual(1, mock_save.call_count) + + # execute ConfirmEvacuationTask + self._test_confirm_evacuate_task(instance_list) + + @mock.patch('masakari.compute.nova.novaclient') + def test_evacuate_instances_task(self, _mock_novaclient): _mock_novaclient.return_value = self.fake_client # create test data @@ -134,8 +176,8 @@ class HostFailureTestCase(test.TestCase): # execute PrepareHAEnabledInstancesTask instance_list = self._test_instance_list() - # execute AutoEvacuationInstancesTask - task = host_failure.AutoEvacuationInstancesTask(self.novaclient) + # execute EvacuateInstancesTask + task = host_failure.EvacuateInstancesTask(self.novaclient) # mock evacuate method of FakeNovaClient to confirm that evacuate # method is called. with mock.patch.object(fakes.FakeNovaClient.ServerManager, @@ -157,8 +199,8 @@ class HostFailureTestCase(test.TestCase): # execute PrepareHAEnabledInstancesTask task = host_failure.PrepareHAEnabledInstancesTask(self.novaclient) - instance_list = task.execute(self.ctxt, self.instance_host) - self.assertEqual(0, len(instance_list['instance_list'])) + self.assertRaises(exception.SkipHostRecoveryException, task.execute, + self.ctxt, self.instance_host) @mock.patch('masakari.compute.nova.novaclient') def test_host_failure_flow_evacuation_failed(self, _mock_novaclient): @@ -172,8 +214,8 @@ class HostFailureTestCase(test.TestCase): "instance_list": self.fake_client.servers.list() } - # execute AutoEvacuationInstancesTask - instance_list = self._auto_evacuate_instances( + # execute EvacuateInstancesTask + instance_list = self._evacuate_instances( instance_list) def fake_get_server(context, host): @@ -186,7 +228,7 @@ class HostFailureTestCase(test.TestCase): # execute ConfirmEvacuationTask task = host_failure.ConfirmEvacuationTask(self.novaclient) self.assertRaises( - exception.AutoRecoveryFailureException, task.execute, + exception.HostRecoveryFailureException, task.execute, self.ctxt, instance_list['instance_list'], self.instance_host) @@ -205,8 +247,8 @@ class HostFailureTestCase(test.TestCase): "instance_list": self.fake_client.servers.list() } - # execute AutoEvacuationInstancesTask - instance_list = self._auto_evacuate_instances( + # execute EvacuateInstancesTask + instance_list = self._evacuate_instances( instance_list) # execute ConfirmEvacuationTask @@ -227,8 +269,8 @@ class HostFailureTestCase(test.TestCase): "instance_list": self.fake_client.servers.list() } - # execute AutoEvacuationInstancesTask - instance_list = self._auto_evacuate_instances( + # execute EvacuateInstancesTask + instance_list = self._evacuate_instances( instance_list) # execute ConfirmEvacuationTask @@ -249,9 +291,23 @@ class HostFailureTestCase(test.TestCase): "instance_list": self.fake_client.servers.list() } - # execute AutoEvacuationInstancesTask - instance_list = self._auto_evacuate_instances( + # execute EvacuateInstancesTask + instance_list = self._evacuate_instances( instance_list) # execute ConfirmEvacuationTask self._test_confirm_evacuate_task(instance_list) + + @mock.patch('masakari.compute.nova.novaclient') + def test_host_failure_flow_no_instances_on_host(self, _mock_novaclient): + _mock_novaclient.return_value = self.fake_client + self.override_config("evacuate_all_instances", + True, "host_failure") + + # execute DisableComputeServiceTask + self._test_disable_compute_service() + + # execute PrepareHAEnabledInstancesTask + task = host_failure.PrepareHAEnabledInstancesTask(self.novaclient) + self.assertRaises(exception.SkipHostRecoveryException, task.execute, + self.ctxt, self.instance_host) diff --git a/masakari/tests/unit/engine/drivers/taskflow/test_process_failure_flow.py b/masakari/tests/unit/engine/drivers/taskflow/test_process_failure_flow.py index 9e35eac3..6432f772 100644 --- a/masakari/tests/unit/engine/drivers/taskflow/test_process_failure_flow.py +++ b/masakari/tests/unit/engine/drivers/taskflow/test_process_failure_flow.py @@ -36,9 +36,9 @@ class ProcessFailureTestCase(test.TestCase): self.service_host = "fake-host" self.novaclient = nova.API() self.fake_client = fakes.FakeNovaClient() - # overriding 'wait_period_after_service_disabled' to 2 seconds to - # reduce the wait period. - self.override_config('wait_period_after_service_disabled', 2) + # overriding 'wait_period_after_service_update' to 2 seconds + # to reduce the wait period. + self.override_config('wait_period_after_service_update', 2) @mock.patch('masakari.compute.nova.novaclient') def test_compute_process_failure_flow(self, _mock_novaclient): diff --git a/masakari/tests/unit/engine/test_engine_mgr.py b/masakari/tests/unit/engine/test_engine_mgr.py index ac2ca023..f8e9374f 100644 --- a/masakari/tests/unit/engine/test_engine_mgr.py +++ b/masakari/tests/unit/engine/test_engine_mgr.py @@ -216,15 +216,17 @@ class EngineManagerUnitTestCase(test.NoDBTestCase): @mock.patch.object(host_obj.Host, "get_by_uuid") @mock.patch.object(host_obj.Host, "save") + @mock.patch.object(host_obj.HostList, "get_all") @mock.patch("masakari.engine.drivers.taskflow." "TaskFlowDriver.execute_host_failure") @mock.patch.object(notification_obj.Notification, "save") def test_process_notification_type_compute_host_event_stopped( - self, mock_notification_save, mock_host_failure, + self, mock_notification_save, mock_host_failure, mock_get_all, mock_host_save, mock_host_obj): notification = self._get_compute_host_type_notification() mock_host_failure.side_effect = self._fake_notification_workflow() fake_host = fakes.create_fake_host() + mock_get_all.return_value = None fake_host.failover_segment = fakes.create_fake_failover_segment() mock_host_obj.return_value = fake_host self.engine.process_notification(self.context, @@ -233,22 +235,76 @@ class EngineManagerUnitTestCase(test.NoDBTestCase): mock_host_failure.assert_called_once_with( self.context, fake_host.name, fake_host.failover_segment.recovery_method, - notification.notification_uuid) + notification.notification_uuid, reserved_host_list=None) @mock.patch.object(host_obj.Host, "get_by_uuid") @mock.patch.object(host_obj.Host, "save") + @mock.patch.object(host_obj.HostList, "get_all") + @mock.patch.object(notification_obj.Notification, "save") + def test_process_notification_host_failure_without_reserved_hosts( + self, mock_notification_save, mock_get_all, + mock_host_save, mock_host_obj): + reserved_host_list = [] + mock_get_all.return_value = reserved_host_list + + fake_host = fakes.create_fake_host() + fake_host.failover_segment = fakes.create_fake_failover_segment( + recovery_method='reserved_host') + mock_host_obj.return_value = fake_host + + notification = self._get_compute_host_type_notification() + + self.engine.process_notification(self.context, + notification=notification) + + self.assertEqual("error", notification.status) + + @mock.patch.object(host_obj.Host, "get_by_uuid") + @mock.patch.object(host_obj.Host, "save") + @mock.patch.object(host_obj.HostList, "get_all") + @mock.patch("masakari.engine.drivers.taskflow." + "TaskFlowDriver.execute_host_failure") + @mock.patch.object(notification_obj.Notification, "save") + def test_process_notification_host_failure_with_reserved_hosts( + self, mock_notification_save, mock_host_failure, mock_get_all, + mock_host_save, mock_host_obj): + reserved_host_list = [fakes.create_fake_host(reserved=True)] + mock_get_all.return_value = reserved_host_list + + fake_host = fakes.create_fake_host() + fake_host.failover_segment = fakes.create_fake_failover_segment( + recovery_method='reserved_host') + mock_host_obj.return_value = fake_host + + notification = self._get_compute_host_type_notification() + mock_host_failure.side_effect = self._fake_notification_workflow() + + self.engine.process_notification(self.context, + notification=notification) + + self.assertEqual("finished", notification.status) + mock_host_failure.assert_called_once_with( + self.context, + fake_host.name, fake_host.failover_segment.recovery_method, + notification.notification_uuid, + reserved_host_list=reserved_host_list) + + @mock.patch.object(host_obj.Host, "get_by_uuid") + @mock.patch.object(host_obj.Host, "save") + @mock.patch.object(host_obj.HostList, "get_all") @mock.patch("masakari.engine.drivers.taskflow." "TaskFlowDriver.execute_host_failure") @mock.patch.object(notification_obj.Notification, "save") def test_process_notification_type_compute_host_recovery_exception( - self, mock_notification_save, mock_host_failure, + self, mock_notification_save, mock_host_failure, mock_get_all, mock_host_save, mock_host_obj): notification = self._get_compute_host_type_notification() fake_host = fakes.create_fake_host() + mock_get_all.return_value = None fake_host.failover_segment = fakes.create_fake_failover_segment() mock_host_obj.return_value = fake_host mock_host_failure.side_effect = self._fake_notification_workflow( - exc=exception.AutoRecoveryFailureException) + exc=exception.HostRecoveryFailureException) self.engine.process_notification(self.context, notification=notification) self.assertEqual("error", notification.status) diff --git a/masakari/utils.py b/masakari/utils.py index ba5850f6..6f3655d2 100644 --- a/masakari/utils.py +++ b/masakari/utils.py @@ -41,8 +41,6 @@ CONF = masakari.conf.CONF LOG = logging.getLogger(__name__) -synchronized = lockutils.synchronized_with_prefix('masakari-') - def utf8(value): """Try to turn a string into utf-8 if possible. @@ -267,3 +265,27 @@ def validate_integer(value, name, min_value=None, max_value=None): 'max_value': max_value}) ) return value + + +def synchronized(name, semaphores=None, blocking=False): + def wrap(f): + @six.wraps(f) + def inner(*args, **kwargs): + lock_name = 'masakari-%s' % name + int_lock = lockutils.internal_lock(lock_name, + semaphores=semaphores) + LOG.debug("Acquiring lock: %(lock_name)s on resource: " + "%(resource)s", {'lock_name': lock_name, + 'resource': f.__name__}) + + if not int_lock.acquire(blocking=blocking): + raise exception.LockAlreadyAcquired(resource=name) + try: + return f(*args, **kwargs) + finally: + LOG.debug("Releasing lock: %(lock_name)s on resource: " + "%(resource)s", {'lock_name': lock_name, + 'resource': f.__name__}) + int_lock.release() + return inner + return wrap diff --git a/releasenotes/notes/reserved_host_recovery_method-d2de1f205136c8d5.yaml b/releasenotes/notes/reserved_host_recovery_method-d2de1f205136c8d5.yaml new file mode 100644 index 00000000..1ab67103 --- /dev/null +++ b/releasenotes/notes/reserved_host_recovery_method-d2de1f205136c8d5.yaml @@ -0,0 +1,8 @@ +--- +features: + - | + Implemented workflow for 'reserved_host' recovery method in case of host + failure. Now operator can create or update failover segment with + 'reserved_host' recovery method along with the existing 'auto' method. + When 'reserved_host' recovery_method is set to a failover segment, + operators should also add one or more hosts with reserved flag set as True.