diff --git a/doc/source/architecture.rst b/doc/source/architecture.rst index 432359b9f..6df6f1f90 100644 --- a/doc/source/architecture.rst +++ b/doc/source/architecture.rst @@ -76,6 +76,7 @@ Watcher Applier This component is in charge of executing the :ref:`Action Plan ` built by the :ref:`Watcher Decision Engine `. +Taskflow is the default workflow engine for Watcher. It connects to the :ref:`message bus ` and launches the :ref:`Action Plan ` whenever a triggering message is @@ -110,6 +111,23 @@ If the :ref:`Action ` fails, the previous state of the :ref:`Managed resource ` (i.e. before the command was sent to the underlying OpenStack service). +In Stein, added a new config option 'action_execution_rule' which is a +dict type. Its key field is strategy name and the value is 'ALWAYS' or 'ANY'. +'ALWAYS' means the callback function returns True as usual. +'ANY' means the return depends on the result of previous action execution. +The callback returns True if previous action gets failed, and the engine +continues to run the next action. If previous action executes success, +the callback returns False then the next action will be ignored. +For strategies that aren't in 'action_execution_rule', the callback always +returns True. +Please add the next section in the watcher.conf file +if your strategy needs this feature. + +:: + + [watcher_workflow_engines.taskflow] + action_execution_rule = {'your strategy name': 'ANY'} + .. _archi_watcher_cli_definition: Watcher CLI diff --git a/releasenotes/notes/enhance-watcher-applier-engine-86c676ce8f179e68.yaml b/releasenotes/notes/enhance-watcher-applier-engine-86c676ce8f179e68.yaml new file mode 100644 index 000000000..f3508ecf9 --- /dev/null +++ b/releasenotes/notes/enhance-watcher-applier-engine-86c676ce8f179e68.yaml @@ -0,0 +1,16 @@ +--- +features: + - | + Added a new config option 'action_execution_rule' which is a dict type. + Its key field is strategy name and the value is 'ALWAYS' or 'ANY'. + 'ALWAYS' means the callback function returns True as usual. + 'ANY' means the return depends on the result of previous action execution. + The callback returns True if previous action gets failed, and the engine + continues to run the next action. If previous action executes success, + the callback returns False then the next action will be ignored. + For strategies that aren't in 'action_execution_rule', the callback always + returns True. + Please add the next section in the watcher.conf file + if your strategy needs this feature. + [watcher_workflow_engines.taskflow] + action_execution_rule = {'your strategy name': 'ANY'} diff --git a/watcher/applier/workflow_engine/base.py b/watcher/applier/workflow_engine/base.py index 677394490..bf2b968f2 100644 --- a/watcher/applier/workflow_engine/base.py +++ b/watcher/applier/workflow_engine/base.py @@ -58,6 +58,7 @@ class BaseWorkFlowEngine(loadable.Loadable): self._action_factory = factory.ActionFactory() self._osc = None self._is_notified = False + self.execution_rule = None @classmethod def get_config_opts(cls): @@ -206,11 +207,14 @@ class BaseTaskFlowActionContainer(flow_task.Task): et = eventlet.spawn(_do_execute_action, *args, **kwargs) # NOTE: check for the state of action plan periodically,so that if # action is finished or action plan is cancelled we can exit from here. + result = False while True: action_object = objects.Action.get_by_uuid( self.engine.context, self._db_action.uuid, eager=True) action_plan_object = objects.ActionPlan.get_by_id( self.engine.context, action_object.action_plan_id) + if action_object.state == objects.action.State.SUCCEEDED: + result = True if (action_object.state in [objects.action.State.SUCCEEDED, objects.action.State.FAILED] or action_plan_object.state in CANCEL_STATE): @@ -226,6 +230,7 @@ class BaseTaskFlowActionContainer(flow_task.Task): if (action_plan_object.state in CANCEL_STATE and abort): et.kill() et.wait() + return result # NOTE: catch the greenlet exit exception due to thread kill, # taskflow will call revert for the action, @@ -236,7 +241,8 @@ class BaseTaskFlowActionContainer(flow_task.Task): except Exception as e: LOG.exception(e) - raise + # return False instead of raising an exception + return False def post_execute(self): try: diff --git a/watcher/applier/workflow_engine/default.py b/watcher/applier/workflow_engine/default.py index 157cb9848..764addb3b 100644 --- a/watcher/applier/workflow_engine/default.py +++ b/watcher/applier/workflow_engine/default.py @@ -38,8 +38,6 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine): """ def decider(self, history): - # FIXME(jed) not possible with the current Watcher Planner - # # decider – A callback function that will be expected to # decide at runtime whether v should be allowed to execute # (or whether the execution of v should be ignored, @@ -48,7 +46,11 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine): # all u decidable links that have v as a target. It is expected # to return a single boolean # (True to allow v execution or False to not). - return True + LOG.info("decider history: %s", history) + if history and self.execution_rule == 'ANY': + return not list(history.values())[0] + else: + return True @classmethod def get_config_opts(cls): @@ -59,9 +61,27 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine): min=1, required=True, help='Number of workers for taskflow engine ' - 'to execute actions.') + 'to execute actions.'), + cfg.DictOpt( + 'action_execution_rule', + default={}, + help='The execution rule for linked actions,' + 'the key is strategy name and ' + 'value ALWAYS means all actions will be executed,' + 'value ANY means if previous action executes ' + 'success, the next action will be ignored.' + 'None means ALWAYS.') ] + def get_execution_rule(self, actions): + if actions: + actionplan_object = objects.ActionPlan.get_by_id( + self.context, actions[0].action_plan_id) + strategy_object = objects.Strategy.get_by_id( + self.context, actionplan_object.strategy_id) + return self.config.action_execution_rule.get( + strategy_object.name) + def execute(self, actions): try: # NOTE(jed) We want to have a strong separation of concern @@ -72,6 +92,7 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine): # the users to change it. # The current implementation uses graph with linked actions. # todo(jed) add olso conf for retry and name + self.execution_rule = self.get_execution_rule(actions) flow = gf.Flow("watcher_flow") actions_uuid = {} for a in actions: diff --git a/watcher/tests/applier/workflow_engine/test_default_workflow_engine.py b/watcher/tests/applier/workflow_engine/test_default_workflow_engine.py index b4f10041b..08053904a 100644 --- a/watcher/tests/applier/workflow_engine/test_default_workflow_engine.py +++ b/watcher/tests/applier/workflow_engine/test_default_workflow_engine.py @@ -66,9 +66,11 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): applier_manager=mock.MagicMock()) self.engine.config.max_workers = 2 + @mock.patch.object(objects.Strategy, "get_by_id") + @mock.patch.object(objects.ActionPlan, "get_by_id") @mock.patch('taskflow.engines.load') @mock.patch('taskflow.patterns.graph_flow.Flow.link') - def test_execute(self, graph_flow, engines): + def test_execute(self, graph_flow, engines, m_actionplan, m_strategy): actions = mock.MagicMock() try: self.engine.execute(actions) @@ -111,14 +113,17 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): except Exception as exc: self.fail(exc) + @mock.patch.object(objects.Strategy, "get_by_id") @mock.patch.object(objects.ActionPlan, "get_by_id") @mock.patch.object(notifications.action, 'send_execution_notification') @mock.patch.object(notifications.action, 'send_update') def test_execute_with_one_action(self, mock_send_update, mock_execution_notification, - m_get_actionplan): + m_get_actionplan, m_get_strategy): m_get_actionplan.return_value = obj_utils.get_test_action_plan( self.context, id=0) + m_get_strategy.return_value = obj_utils.get_test_strategy( + self.context, id=1) actions = [self.create_action("nop", {'message': 'test'})] try: self.engine.execute(actions) @@ -127,14 +132,17 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): except Exception as exc: self.fail(exc) + @mock.patch.object(objects.Strategy, "get_by_id") @mock.patch.object(objects.ActionPlan, "get_by_id") @mock.patch.object(notifications.action, 'send_execution_notification') @mock.patch.object(notifications.action, 'send_update') def test_execute_nop_sleep(self, mock_send_update, mock_execution_notification, - m_get_actionplan): + m_get_actionplan, m_get_strategy): m_get_actionplan.return_value = obj_utils.get_test_action_plan( self.context, id=0) + m_get_strategy.return_value = obj_utils.get_test_strategy( + self.context, id=1) actions = [] first_nop = self.create_action("nop", {'message': 'test'}) second_nop = self.create_action("nop", {'message': 'second test'}) @@ -149,14 +157,17 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): except Exception as exc: self.fail(exc) + @mock.patch.object(objects.Strategy, "get_by_id") @mock.patch.object(objects.ActionPlan, "get_by_id") @mock.patch.object(notifications.action, 'send_execution_notification') @mock.patch.object(notifications.action, 'send_update') def test_execute_with_parents(self, mock_send_update, mock_execution_notification, - m_get_actionplan): + m_get_actionplan, m_get_strategy): m_get_actionplan.return_value = obj_utils.get_test_action_plan( self.context, id=0) + m_get_strategy.return_value = obj_utils.get_test_strategy( + self.context, id=1) actions = [] first_nop = self.create_action( "nop", {'message': 'test'}, @@ -221,13 +232,16 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): except Exception as exc: self.fail(exc) + @mock.patch.object(objects.Strategy, "get_by_id") @mock.patch.object(objects.ActionPlan, "get_by_id") @mock.patch.object(notifications.action, 'send_execution_notification') @mock.patch.object(notifications.action, 'send_update') def test_execute_with_two_actions(self, m_send_update, m_execution, - m_get_actionplan): + m_get_actionplan, m_get_strategy): m_get_actionplan.return_value = obj_utils.get_test_action_plan( self.context, id=0) + m_get_strategy.return_value = obj_utils.get_test_strategy( + self.context, id=1) actions = [] second = self.create_action("sleep", {'duration': 0.0}) first = self.create_action("nop", {'message': 'test'}) @@ -242,13 +256,16 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): except Exception as exc: self.fail(exc) + @mock.patch.object(objects.Strategy, "get_by_id") @mock.patch.object(objects.ActionPlan, "get_by_id") @mock.patch.object(notifications.action, 'send_execution_notification') @mock.patch.object(notifications.action, 'send_update') def test_execute_with_three_actions(self, m_send_update, m_execution, - m_get_actionplan): + m_get_actionplan, m_get_strategy): m_get_actionplan.return_value = obj_utils.get_test_action_plan( self.context, id=0) + m_get_strategy.return_value = obj_utils.get_test_strategy( + self.context, id=1) actions = [] third = self.create_action("nop", {'message': 'next'}) second = self.create_action("sleep", {'duration': 0.0}) @@ -269,13 +286,16 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): except Exception as exc: self.fail(exc) + @mock.patch.object(objects.Strategy, "get_by_id") @mock.patch.object(objects.ActionPlan, "get_by_id") @mock.patch.object(notifications.action, 'send_execution_notification') @mock.patch.object(notifications.action, 'send_update') def test_execute_with_exception(self, m_send_update, m_execution, - m_get_actionplan): + m_get_actionplan, m_get_strategy): m_get_actionplan.return_value = obj_utils.get_test_action_plan( self.context, id=0) + m_get_strategy.return_value = obj_utils.get_test_strategy( + self.context, id=1) actions = [] third = self.create_action("no_exist", {'message': 'next'}) @@ -290,29 +310,28 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): actions.append(second) actions.append(third) - self.assertRaises(exception.WorkflowExecutionException, - self.engine.execute, actions) + self.engine.execute(actions) self.check_action_state(first, objects.action.State.SUCCEEDED) self.check_action_state(second, objects.action.State.SUCCEEDED) self.check_action_state(third, objects.action.State.FAILED) + @mock.patch.object(objects.Strategy, "get_by_id") @mock.patch.object(objects.ActionPlan, "get_by_id") @mock.patch.object(notifications.action, 'send_execution_notification') @mock.patch.object(notifications.action, 'send_update') @mock.patch.object(factory.ActionFactory, "make_action") - def test_execute_with_action_exception(self, m_make_action, m_send_update, - m_send_execution, m_get_actionplan): + def test_execute_with_action_failed(self, m_make_action, m_send_update, + m_send_execution, m_get_actionplan, + m_get_strategy): m_get_actionplan.return_value = obj_utils.get_test_action_plan( self.context, id=0) + m_get_strategy.return_value = obj_utils.get_test_strategy( + self.context, id=1) actions = [self.create_action("fake_action", {})] m_make_action.return_value = FakeAction(mock.Mock()) - exc = self.assertRaises(exception.WorkflowExecutionException, - self.engine.execute, actions) - - self.assertIsInstance(exc.kwargs['error'], - exception.ActionExecutionFailure) + self.engine.execute(actions) self.check_action_state(actions[0], objects.action.State.FAILED) @mock.patch.object(objects.ActionPlan, "get_by_uuid") @@ -353,3 +372,20 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): except Exception as exc: self.fail(exc) + + def test_decider(self): + # execution_rule is ALWAYS + self.engine.execution_rule = 'ALWAYS' + history = {'action1': True} + self.assertTrue(self.engine.decider(history)) + + history = {'action1': False} + self.assertTrue(self.engine.decider(history)) + + # execution_rule is ANY + self.engine.execution_rule = 'ANY' + history = {'action1': True} + self.assertFalse(self.engine.decider(history)) + + history = {'action1': False} + self.assertTrue(self.engine.decider(history)) diff --git a/watcher/tests/applier/workflow_engine/test_taskflow_action_container.py b/watcher/tests/applier/workflow_engine/test_taskflow_action_container.py index e591a6b06..8ebaa08be 100644 --- a/watcher/tests/applier/workflow_engine/test_taskflow_action_container.py +++ b/watcher/tests/applier/workflow_engine/test_taskflow_action_container.py @@ -21,7 +21,6 @@ import mock from watcher.applier.workflow_engine import default as tflow from watcher.common import clients -from watcher.common import exception from watcher.common import nova_helper from watcher import objects from watcher.tests.db import base @@ -81,8 +80,8 @@ class TestTaskFlowActionContainer(base.DbTestCase): db_action=action, engine=self.engine) - self.assertRaises(exception.ActionExecutionFailure, - action_container.execute, action_id=action.uuid) + result = action_container.execute() + self.assertFalse(result) self.assertTrue(action.state, objects.action.State.FAILED)