Enhance Watcher Applier Engine

Add a new config option 'action_execution_rule', which is a dict type.
Each key is a strategy name and each value is either 'ALWAYS' or 'ANY'.
'ALWAYS' means the callback function always returns True, as before.
'ANY' means the return value depends on the result of the previous
action execution: the callback returns True if the previous action
failed, so the engine continues with the next action; if the previous
action succeeded, the callback returns False and the next action is
ignored.
For strategies that are not listed in 'action_execution_rule', the
callback always returns True.

If an exception is raised during action execution, taskflow triggers a
revert. To allow the engine to continue with the next action, the action
container now returns False instead of raising an exception.

Change-Id: Ib5afa214d8d097d739aad35d18b3fe5c8e4de8fc
Implements: blueprint enhance-watcher-applier-engine
This commit is contained in:
licanwei 2018-03-20 19:52:00 -07:00
parent e8c08e2abb
commit 69cf0d3ee5
6 changed files with 120 additions and 24 deletions

View File

@ -76,6 +76,7 @@ Watcher Applier
This component is in charge of executing the This component is in charge of executing the
:ref:`Action Plan <action_plan_definition>` built by the :ref:`Action Plan <action_plan_definition>` built by the
:ref:`Watcher Decision Engine <watcher_decision_engine_definition>`. :ref:`Watcher Decision Engine <watcher_decision_engine_definition>`.
Taskflow is the default workflow engine for Watcher.
It connects to the :ref:`message bus <amqp_bus_definition>` and launches the It connects to the :ref:`message bus <amqp_bus_definition>` and launches the
:ref:`Action Plan <action_plan_definition>` whenever a triggering message is :ref:`Action Plan <action_plan_definition>` whenever a triggering message is
@ -110,6 +111,23 @@ If the :ref:`Action <action_definition>` fails, the
previous state of the :ref:`Managed resource <managed_resource_definition>` previous state of the :ref:`Managed resource <managed_resource_definition>`
(i.e. before the command was sent to the underlying OpenStack service). (i.e. before the command was sent to the underlying OpenStack service).
In Stein, a new config option 'action_execution_rule' was added. It is a
dict type: each key is a strategy name and each value is 'ALWAYS' or 'ANY'.
'ALWAYS' means the callback function always returns True, as before.
'ANY' means the return value depends on the result of the previous action
execution: the callback returns True if the previous action failed, so the
engine continues with the next action; if the previous action succeeded,
the callback returns False and the next action is ignored.
For strategies that are not listed in 'action_execution_rule', the callback
always returns True.
Add the following section to the watcher.conf file
if your strategy needs this feature.
::
[watcher_workflow_engines.taskflow]
action_execution_rule = your_strategy_name:ANY
.. _archi_watcher_cli_definition: .. _archi_watcher_cli_definition:
Watcher CLI Watcher CLI

View File

@ -0,0 +1,16 @@
---
features:
- |
Added a new config option 'action_execution_rule', which is a dict type.
Each key is a strategy name and each value is either 'ALWAYS' or 'ANY'.
'ALWAYS' means the callback function always returns True, as before.
'ANY' means the return value depends on the result of the previous action
execution: the callback returns True if the previous action failed, so the
engine continues with the next action; if the previous action succeeded,
the callback returns False and the next action is ignored.
For strategies that are not listed in 'action_execution_rule', the callback
always returns True.
Add the following section to the watcher.conf file
if your strategy needs this feature.
[watcher_workflow_engines.taskflow]
action_execution_rule = your_strategy_name:ANY

View File

@ -58,6 +58,7 @@ class BaseWorkFlowEngine(loadable.Loadable):
self._action_factory = factory.ActionFactory() self._action_factory = factory.ActionFactory()
self._osc = None self._osc = None
self._is_notified = False self._is_notified = False
self.execution_rule = None
@classmethod @classmethod
def get_config_opts(cls): def get_config_opts(cls):
@ -206,11 +207,14 @@ class BaseTaskFlowActionContainer(flow_task.Task):
et = eventlet.spawn(_do_execute_action, *args, **kwargs) et = eventlet.spawn(_do_execute_action, *args, **kwargs)
# NOTE: check for the state of action plan periodically,so that if # NOTE: check for the state of action plan periodically,so that if
# action is finished or action plan is cancelled we can exit from here. # action is finished or action plan is cancelled we can exit from here.
result = False
while True: while True:
action_object = objects.Action.get_by_uuid( action_object = objects.Action.get_by_uuid(
self.engine.context, self._db_action.uuid, eager=True) self.engine.context, self._db_action.uuid, eager=True)
action_plan_object = objects.ActionPlan.get_by_id( action_plan_object = objects.ActionPlan.get_by_id(
self.engine.context, action_object.action_plan_id) self.engine.context, action_object.action_plan_id)
if action_object.state == objects.action.State.SUCCEEDED:
result = True
if (action_object.state in [objects.action.State.SUCCEEDED, if (action_object.state in [objects.action.State.SUCCEEDED,
objects.action.State.FAILED] or objects.action.State.FAILED] or
action_plan_object.state in CANCEL_STATE): action_plan_object.state in CANCEL_STATE):
@ -226,6 +230,7 @@ class BaseTaskFlowActionContainer(flow_task.Task):
if (action_plan_object.state in CANCEL_STATE and abort): if (action_plan_object.state in CANCEL_STATE and abort):
et.kill() et.kill()
et.wait() et.wait()
return result
# NOTE: catch the greenlet exit exception due to thread kill, # NOTE: catch the greenlet exit exception due to thread kill,
# taskflow will call revert for the action, # taskflow will call revert for the action,
@ -236,7 +241,8 @@ class BaseTaskFlowActionContainer(flow_task.Task):
except Exception as e: except Exception as e:
LOG.exception(e) LOG.exception(e)
raise # return False instead of raising an exception
return False
def post_execute(self): def post_execute(self):
try: try:

View File

@ -38,8 +38,6 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine):
""" """
def decider(self, history): def decider(self, history):
# FIXME(jed) not possible with the current Watcher Planner
#
# decider A callback function that will be expected to # decider A callback function that will be expected to
# decide at runtime whether v should be allowed to execute # decide at runtime whether v should be allowed to execute
# (or whether the execution of v should be ignored, # (or whether the execution of v should be ignored,
@ -48,7 +46,11 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine):
# all u decidable links that have v as a target. It is expected # all u decidable links that have v as a target. It is expected
# to return a single boolean # to return a single boolean
# (True to allow v execution or False to not). # (True to allow v execution or False to not).
return True LOG.info("decider history: %s", history)
if history and self.execution_rule == 'ANY':
return not list(history.values())[0]
else:
return True
@classmethod @classmethod
def get_config_opts(cls): def get_config_opts(cls):
@ -59,9 +61,27 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine):
min=1, min=1,
required=True, required=True,
help='Number of workers for taskflow engine ' help='Number of workers for taskflow engine '
'to execute actions.') 'to execute actions.'),
cfg.DictOpt(
'action_execution_rule',
default={},
help='The execution rule for linked actions,'
'the key is strategy name and '
'value ALWAYS means all actions will be executed,'
'value ANY means if previous action executes '
'success, the next action will be ignored.'
'None means ALWAYS.')
] ]
def get_execution_rule(self, actions):
    """Return the configured execution rule for the actions' strategy.

    The strategy is resolved through the action plan of the first action,
    and its name is used as the key into the ``action_execution_rule``
    config option.

    :param actions: indexable collection of actions about to be executed;
                    only the first element is inspected
                    (NOTE(review): presumably all actions share one
                    action plan -- confirm against the caller)
    :returns: the rule stored for the strategy name, or None when
              ``actions`` is empty or the strategy has no entry in
              ``action_execution_rule``
    """
    if not actions:
        # Nothing to execute, so there is no strategy to look up.
        return None
    action_plan = objects.ActionPlan.get_by_id(
        self.context, actions[0].action_plan_id)
    strategy = objects.Strategy.get_by_id(
        self.context, action_plan.strategy_id)
    return self.config.action_execution_rule.get(strategy.name)
def execute(self, actions): def execute(self, actions):
try: try:
# NOTE(jed) We want to have a strong separation of concern # NOTE(jed) We want to have a strong separation of concern
@ -72,6 +92,7 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine):
# the users to change it. # the users to change it.
# The current implementation uses graph with linked actions. # The current implementation uses graph with linked actions.
# todo(jed) add olso conf for retry and name # todo(jed) add olso conf for retry and name
self.execution_rule = self.get_execution_rule(actions)
flow = gf.Flow("watcher_flow") flow = gf.Flow("watcher_flow")
actions_uuid = {} actions_uuid = {}
for a in actions: for a in actions:

View File

@ -66,9 +66,11 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
applier_manager=mock.MagicMock()) applier_manager=mock.MagicMock())
self.engine.config.max_workers = 2 self.engine.config.max_workers = 2
@mock.patch.object(objects.Strategy, "get_by_id")
@mock.patch.object(objects.ActionPlan, "get_by_id")
@mock.patch('taskflow.engines.load') @mock.patch('taskflow.engines.load')
@mock.patch('taskflow.patterns.graph_flow.Flow.link') @mock.patch('taskflow.patterns.graph_flow.Flow.link')
def test_execute(self, graph_flow, engines): def test_execute(self, graph_flow, engines, m_actionplan, m_strategy):
actions = mock.MagicMock() actions = mock.MagicMock()
try: try:
self.engine.execute(actions) self.engine.execute(actions)
@ -111,14 +113,17 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
except Exception as exc: except Exception as exc:
self.fail(exc) self.fail(exc)
@mock.patch.object(objects.Strategy, "get_by_id")
@mock.patch.object(objects.ActionPlan, "get_by_id") @mock.patch.object(objects.ActionPlan, "get_by_id")
@mock.patch.object(notifications.action, 'send_execution_notification') @mock.patch.object(notifications.action, 'send_execution_notification')
@mock.patch.object(notifications.action, 'send_update') @mock.patch.object(notifications.action, 'send_update')
def test_execute_with_one_action(self, mock_send_update, def test_execute_with_one_action(self, mock_send_update,
mock_execution_notification, mock_execution_notification,
m_get_actionplan): m_get_actionplan, m_get_strategy):
m_get_actionplan.return_value = obj_utils.get_test_action_plan( m_get_actionplan.return_value = obj_utils.get_test_action_plan(
self.context, id=0) self.context, id=0)
m_get_strategy.return_value = obj_utils.get_test_strategy(
self.context, id=1)
actions = [self.create_action("nop", {'message': 'test'})] actions = [self.create_action("nop", {'message': 'test'})]
try: try:
self.engine.execute(actions) self.engine.execute(actions)
@ -127,14 +132,17 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
except Exception as exc: except Exception as exc:
self.fail(exc) self.fail(exc)
@mock.patch.object(objects.Strategy, "get_by_id")
@mock.patch.object(objects.ActionPlan, "get_by_id") @mock.patch.object(objects.ActionPlan, "get_by_id")
@mock.patch.object(notifications.action, 'send_execution_notification') @mock.patch.object(notifications.action, 'send_execution_notification')
@mock.patch.object(notifications.action, 'send_update') @mock.patch.object(notifications.action, 'send_update')
def test_execute_nop_sleep(self, mock_send_update, def test_execute_nop_sleep(self, mock_send_update,
mock_execution_notification, mock_execution_notification,
m_get_actionplan): m_get_actionplan, m_get_strategy):
m_get_actionplan.return_value = obj_utils.get_test_action_plan( m_get_actionplan.return_value = obj_utils.get_test_action_plan(
self.context, id=0) self.context, id=0)
m_get_strategy.return_value = obj_utils.get_test_strategy(
self.context, id=1)
actions = [] actions = []
first_nop = self.create_action("nop", {'message': 'test'}) first_nop = self.create_action("nop", {'message': 'test'})
second_nop = self.create_action("nop", {'message': 'second test'}) second_nop = self.create_action("nop", {'message': 'second test'})
@ -149,14 +157,17 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
except Exception as exc: except Exception as exc:
self.fail(exc) self.fail(exc)
@mock.patch.object(objects.Strategy, "get_by_id")
@mock.patch.object(objects.ActionPlan, "get_by_id") @mock.patch.object(objects.ActionPlan, "get_by_id")
@mock.patch.object(notifications.action, 'send_execution_notification') @mock.patch.object(notifications.action, 'send_execution_notification')
@mock.patch.object(notifications.action, 'send_update') @mock.patch.object(notifications.action, 'send_update')
def test_execute_with_parents(self, mock_send_update, def test_execute_with_parents(self, mock_send_update,
mock_execution_notification, mock_execution_notification,
m_get_actionplan): m_get_actionplan, m_get_strategy):
m_get_actionplan.return_value = obj_utils.get_test_action_plan( m_get_actionplan.return_value = obj_utils.get_test_action_plan(
self.context, id=0) self.context, id=0)
m_get_strategy.return_value = obj_utils.get_test_strategy(
self.context, id=1)
actions = [] actions = []
first_nop = self.create_action( first_nop = self.create_action(
"nop", {'message': 'test'}, "nop", {'message': 'test'},
@ -221,13 +232,16 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
except Exception as exc: except Exception as exc:
self.fail(exc) self.fail(exc)
@mock.patch.object(objects.Strategy, "get_by_id")
@mock.patch.object(objects.ActionPlan, "get_by_id") @mock.patch.object(objects.ActionPlan, "get_by_id")
@mock.patch.object(notifications.action, 'send_execution_notification') @mock.patch.object(notifications.action, 'send_execution_notification')
@mock.patch.object(notifications.action, 'send_update') @mock.patch.object(notifications.action, 'send_update')
def test_execute_with_two_actions(self, m_send_update, m_execution, def test_execute_with_two_actions(self, m_send_update, m_execution,
m_get_actionplan): m_get_actionplan, m_get_strategy):
m_get_actionplan.return_value = obj_utils.get_test_action_plan( m_get_actionplan.return_value = obj_utils.get_test_action_plan(
self.context, id=0) self.context, id=0)
m_get_strategy.return_value = obj_utils.get_test_strategy(
self.context, id=1)
actions = [] actions = []
second = self.create_action("sleep", {'duration': 0.0}) second = self.create_action("sleep", {'duration': 0.0})
first = self.create_action("nop", {'message': 'test'}) first = self.create_action("nop", {'message': 'test'})
@ -242,13 +256,16 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
except Exception as exc: except Exception as exc:
self.fail(exc) self.fail(exc)
@mock.patch.object(objects.Strategy, "get_by_id")
@mock.patch.object(objects.ActionPlan, "get_by_id") @mock.patch.object(objects.ActionPlan, "get_by_id")
@mock.patch.object(notifications.action, 'send_execution_notification') @mock.patch.object(notifications.action, 'send_execution_notification')
@mock.patch.object(notifications.action, 'send_update') @mock.patch.object(notifications.action, 'send_update')
def test_execute_with_three_actions(self, m_send_update, m_execution, def test_execute_with_three_actions(self, m_send_update, m_execution,
m_get_actionplan): m_get_actionplan, m_get_strategy):
m_get_actionplan.return_value = obj_utils.get_test_action_plan( m_get_actionplan.return_value = obj_utils.get_test_action_plan(
self.context, id=0) self.context, id=0)
m_get_strategy.return_value = obj_utils.get_test_strategy(
self.context, id=1)
actions = [] actions = []
third = self.create_action("nop", {'message': 'next'}) third = self.create_action("nop", {'message': 'next'})
second = self.create_action("sleep", {'duration': 0.0}) second = self.create_action("sleep", {'duration': 0.0})
@ -269,13 +286,16 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
except Exception as exc: except Exception as exc:
self.fail(exc) self.fail(exc)
@mock.patch.object(objects.Strategy, "get_by_id")
@mock.patch.object(objects.ActionPlan, "get_by_id") @mock.patch.object(objects.ActionPlan, "get_by_id")
@mock.patch.object(notifications.action, 'send_execution_notification') @mock.patch.object(notifications.action, 'send_execution_notification')
@mock.patch.object(notifications.action, 'send_update') @mock.patch.object(notifications.action, 'send_update')
def test_execute_with_exception(self, m_send_update, m_execution, def test_execute_with_exception(self, m_send_update, m_execution,
m_get_actionplan): m_get_actionplan, m_get_strategy):
m_get_actionplan.return_value = obj_utils.get_test_action_plan( m_get_actionplan.return_value = obj_utils.get_test_action_plan(
self.context, id=0) self.context, id=0)
m_get_strategy.return_value = obj_utils.get_test_strategy(
self.context, id=1)
actions = [] actions = []
third = self.create_action("no_exist", {'message': 'next'}) third = self.create_action("no_exist", {'message': 'next'})
@ -290,29 +310,28 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
actions.append(second) actions.append(second)
actions.append(third) actions.append(third)
self.assertRaises(exception.WorkflowExecutionException, self.engine.execute(actions)
self.engine.execute, actions)
self.check_action_state(first, objects.action.State.SUCCEEDED) self.check_action_state(first, objects.action.State.SUCCEEDED)
self.check_action_state(second, objects.action.State.SUCCEEDED) self.check_action_state(second, objects.action.State.SUCCEEDED)
self.check_action_state(third, objects.action.State.FAILED) self.check_action_state(third, objects.action.State.FAILED)
@mock.patch.object(objects.Strategy, "get_by_id")
@mock.patch.object(objects.ActionPlan, "get_by_id") @mock.patch.object(objects.ActionPlan, "get_by_id")
@mock.patch.object(notifications.action, 'send_execution_notification') @mock.patch.object(notifications.action, 'send_execution_notification')
@mock.patch.object(notifications.action, 'send_update') @mock.patch.object(notifications.action, 'send_update')
@mock.patch.object(factory.ActionFactory, "make_action") @mock.patch.object(factory.ActionFactory, "make_action")
def test_execute_with_action_exception(self, m_make_action, m_send_update, def test_execute_with_action_failed(self, m_make_action, m_send_update,
m_send_execution, m_get_actionplan): m_send_execution, m_get_actionplan,
m_get_strategy):
m_get_actionplan.return_value = obj_utils.get_test_action_plan( m_get_actionplan.return_value = obj_utils.get_test_action_plan(
self.context, id=0) self.context, id=0)
m_get_strategy.return_value = obj_utils.get_test_strategy(
self.context, id=1)
actions = [self.create_action("fake_action", {})] actions = [self.create_action("fake_action", {})]
m_make_action.return_value = FakeAction(mock.Mock()) m_make_action.return_value = FakeAction(mock.Mock())
exc = self.assertRaises(exception.WorkflowExecutionException, self.engine.execute(actions)
self.engine.execute, actions)
self.assertIsInstance(exc.kwargs['error'],
exception.ActionExecutionFailure)
self.check_action_state(actions[0], objects.action.State.FAILED) self.check_action_state(actions[0], objects.action.State.FAILED)
@mock.patch.object(objects.ActionPlan, "get_by_uuid") @mock.patch.object(objects.ActionPlan, "get_by_uuid")
@ -353,3 +372,20 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
except Exception as exc: except Exception as exc:
self.fail(exc) self.fail(exc)
def test_decider(self):
# Exercise the engine's decider callback under both execution rules.
# Each history dict below holds a single entry whose boolean value is
# what the decider inspects.
# execution_rule is ALWAYS
# With ALWAYS the decider is expected to return True whatever the
# history value is.
self.engine.execution_rule = 'ALWAYS'
history = {'action1': True}
self.assertTrue(self.engine.decider(history))
history = {'action1': False}
self.assertTrue(self.engine.decider(history))
# execution_rule is ANY
# With ANY the decider is expected to return the negation of the
# history value: False after a True entry, True after a False entry.
self.engine.execution_rule = 'ANY'
history = {'action1': True}
self.assertFalse(self.engine.decider(history))
history = {'action1': False}
self.assertTrue(self.engine.decider(history))

View File

@ -21,7 +21,6 @@ import mock
from watcher.applier.workflow_engine import default as tflow from watcher.applier.workflow_engine import default as tflow
from watcher.common import clients from watcher.common import clients
from watcher.common import exception
from watcher.common import nova_helper from watcher.common import nova_helper
from watcher import objects from watcher import objects
from watcher.tests.db import base from watcher.tests.db import base
@ -81,8 +80,8 @@ class TestTaskFlowActionContainer(base.DbTestCase):
db_action=action, db_action=action,
engine=self.engine) engine=self.engine)
self.assertRaises(exception.ActionExecutionFailure, result = action_container.execute()
action_container.execute, action_id=action.uuid) self.assertFalse(result)
self.assertTrue(action.state, objects.action.State.FAILED) self.assertTrue(action.state, objects.action.State.FAILED)