From 5566129d64b7fdacc5690c81c919e9e95c25b464 Mon Sep 17 00:00:00 2001 From: Kirill Izotov Date: Wed, 9 Jul 2014 15:33:27 +0700 Subject: [PATCH] Make executor able to work in isolated environment The idea is to simplify executor as much as possible (first of all, remove all db calls). As of now, all the work related to creating, resolving and updating tasks, converting action params and results and so on should be done by engine. Executer here is barely an action runner that receives an action name, params and some kind of session identifier (task id in our case) and reports back the status and results along with the session identifier to reference the task it have done. In order to keep the model simple, we had to make several assumptions: - Any task that was queued for execution is expected to be executed and eventually report back by putting its results into `convey_task_result` queue. - All executors are the same. Each of them should be able to execute the action and return the same result. - All executor should be also able to access any external resource without any additional params evaluation on an executor side Conflicts: mistral/engine/drivers/default/executor.py Change-Id: I4f1f8fb08cd977ba90f69462108e15f9cfb26250 --- mistral/actions/action_factory.py | 39 +++++++ mistral/engine/__init__.py | 98 ++++++++++------ mistral/engine/data_flow.py | 77 +++++++++++-- mistral/engine/drivers/default/engine.py | 24 ++-- mistral/engine/drivers/default/executor.py | 107 ++++-------------- mistral/tests/base.py | 8 +- .../resources/control_flow/one_sync_task.yaml | 23 ++-- .../resources/control_flow/require_flow.yaml | 71 +++--------- .../data_flow/task_with_two_dependencies.yaml | 1 + .../tests/unit/actions/test_action_factory.py | 37 ++++++ .../tests/unit/engine/default/test_engine.py | 62 +++++----- .../unit/engine/default/test_executor.py | 70 +++++++++--- mistral/tests/unit/engine/test_data_flow.py | 4 +- .../unit/engine/test_data_flow_module.py | 69 ++++++++--- mistral/tests/unit/engine/test_task_retry.py | 10 +- 15 files changed, 429 insertions(+), 271 deletions(-) diff --git a/mistral/actions/action_factory.py b/mistral/actions/action_factory.py index a7153fb33..36fb285c7 100644 --- a/mistral/actions/action_factory.py +++ b/mistral/actions/action_factory.py @@ -20,6 +20,7 @@ from stevedore import extension from mistral.actions import base from mistral.actions import std_actions from mistral import exceptions as exc +from mistral import expressions as expr from mistral.openstack.common import log as logging from mistral.workbook import actions from mistral.workbook import tasks @@ -169,5 +170,43 @@ def create_action(db_task): (db_task, e)) +def resolve_adhoc_action_name(workbook, action_name): + action_spec = workbook.get_action(action_name) + + if not action_spec: + msg = 'Ad-hoc action class is not registered ' \ + '[workbook=%s, action=%s, action_spec=%s]' % \ + (workbook, action_name, action_spec) + raise exc.ActionException(msg) + + base_cls = get_action_class(action_spec.clazz) + + if not base_cls: + msg = 'Ad-hoc action base class is not registered ' \ + '[workbook=%s, action=%s, base_class=%s]' % \ + (workbook, action_name, base_cls) + raise exc.ActionException(msg) + + return action_spec.clazz + + +def convert_adhoc_action_params(workbook, action_name, params): + base_params = workbook.get_action(action_name).base_parameters + + if not base_params: + return {} + + return expr.evaluate_recursively(base_params, params) + + +def convert_adhoc_action_result(workbook, action_name, result): + transformer = workbook.get_action(action_name).output + + if not transformer: + return result + + # Use base action result as a context for evaluating expressions. + return expr.evaluate_recursively(transformer, result) + # Registering actions on module load. _register_action_classes() diff --git a/mistral/engine/__init__.py b/mistral/engine/__init__.py index 7283a26be..942971d51 100644 --- a/mistral/engine/__init__.py +++ b/mistral/engine/__init__.py @@ -21,11 +21,12 @@ from oslo import messaging import six from stevedore import driver -# Submoules of mistral.engine will throw NoSuchOptError if configuration +# Submodules of mistral.engine will throw NoSuchOptError if configuration # options required at top level of this __init__.py are not imported before # the submodules are referenced. cfg.CONF.import_opt('workflow_trace_log_name', 'mistral.config') +from mistral.actions import action_factory as a_f from mistral import context as auth_context from mistral.db import api as db_api from mistral import dsl_parser as parser @@ -35,6 +36,7 @@ from mistral.engine import states from mistral.engine import workflow from mistral import exceptions as exc from mistral.openstack.common import log as logging +from mistral.workbook import tasks as wb_task LOG = logging.getLogger(__name__) @@ -42,7 +44,7 @@ WORKFLOW_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name) def get_transport(transport=None): - return (transport if transport else messaging.get_transport(cfg.CONF)) + return transport if transport else messaging.get_transport(cfg.CONF) def get_engine(name, transport): @@ -64,7 +66,7 @@ class Engine(object): self.transport = get_transport(transport) @abc.abstractmethod - def _run_tasks(cls, tasks): + def _run_task(cls, task_id, action_name, action_params): raise NotImplementedError() def start_workflow_execution(self, cntx, **kwargs): @@ -91,21 +93,31 @@ class Engine(object): # Persist execution and tasks in DB. try: workbook = self._get_workbook(workbook_name) - execution = self._create_execution(workbook_name, - task_name, + execution = self._create_execution(workbook_name, task_name, context) + # Create the whole tree of tasks required by target task, including + # target task itself. tasks = self._create_tasks( workflow.find_workflow_tasks(workbook, task_name), workbook, workbook_name, execution['id'] ) + # Create a list of tasks that can be executed immediately (have + # their requirements satisfied, or, at that point, rather don't + # have them at all) along with the list of tasks that require some + # delay before they'll be executed. tasks_to_start, delayed_tasks = workflow.find_resolved_tasks(tasks) + # Populate context with special variables such as `openstack` and + # `__execution`. self._add_variables_to_data_flow_context(context, execution) - data_flow.prepare_tasks(tasks_to_start, context) + # Update task with new context and params. + executables = data_flow.prepare_tasks(tasks_to_start, + context, + workbook) db_api.commit_tx() except Exception as e: @@ -118,7 +130,8 @@ class Engine(object): for task in delayed_tasks: self._schedule_run(workbook, task, context) - self._run_tasks(tasks_to_start) + for task_id, action_name, action_params in executables: + self._run_task(task_id, action_name, action_params) return execution @@ -172,19 +185,33 @@ class Engine(object): else ", result = %s]" % result WORKFLOW_TRACE.info(wf_trace_msg) + action_name = wb_task.TaskSpec(task['task_spec'])\ + .get_full_action_name() + + if not a_f.get_action_class(action_name): + action = a_f.resolve_adhoc_action_name(workbook, action_name) + + if not action: + msg = 'Unknown action [workbook=%s, action=%s]' % \ + (workbook, action_name) + raise exc.ActionException(msg) + + result = a_f.convert_adhoc_action_result(workbook, + action_name, + result) + task_output = data_flow.get_task_output(task, result) # Update task state. - task, outbound_context = self._update_task(workbook, task, state, - task_output) + task, context = self._update_task(workbook, task, state, + task_output) execution = db_api.execution_get(task['execution_id']) self._create_next_tasks(task, workbook) # Determine what tasks need to be started. - tasks = db_api.tasks_get(workbook_name=task['workbook_name'], - execution_id=task['execution_id']) + tasks = db_api.tasks_get(execution_id=task['execution_id']) new_exec_state = self._determine_execution_state(execution, tasks) @@ -194,19 +221,25 @@ class Engine(object): (execution['id'], execution['state'], new_exec_state) WORKFLOW_TRACE.info(wf_trace_msg) - execution = \ - db_api.execution_update(execution['id'], { - "state": new_exec_state - }) + execution = db_api.execution_update(execution['id'], { + "state": new_exec_state + }) LOG.info("Changed execution state: %s" % execution) + # Create a list of tasks that can be executed immediately (have + # their requirements satisfied) along with the list of tasks that + # require some delay before they'll be executed. tasks_to_start, delayed_tasks = workflow.find_resolved_tasks(tasks) - self._add_variables_to_data_flow_context(outbound_context, - execution) + # Populate context with special variables such as `openstack` and + # `__execution`. + self._add_variables_to_data_flow_context(context, execution) - data_flow.prepare_tasks(tasks_to_start, outbound_context) + # Update task with new context and params. + executables = data_flow.prepare_tasks(tasks_to_start, + context, + workbook) db_api.commit_tx() except Exception as e: @@ -216,14 +249,14 @@ class Engine(object): finally: db_api.end_tx() - if states.is_stopped_or_finished(execution["state"]): + if states.is_stopped_or_finished(execution['state']): return task for task in delayed_tasks: - self._schedule_run(workbook, task, outbound_context) + self._schedule_run(workbook, task, context) - if tasks_to_start: - self._run_tasks(tasks_to_start) + for task_id, action_name, action_params in executables: + self._run_task(task_id, action_name, action_params) return task @@ -352,7 +385,7 @@ class Engine(object): return task, outbound_context - def _schedule_run(cls, workbook, task, outbound_context): + def _schedule_run(self, workbook, task, outbound_context): """Schedules task to run after the delay defined in the task specification. If no delay is specified this method is a no-op. """ @@ -376,22 +409,23 @@ class Engine(object): execution_id = task['execution_id'] execution = db_api.execution_get(execution_id) - # Change state from DELAYED to IDLE to unblock processing. + # Change state from DELAYED to RUNNING. WORKFLOW_TRACE.info("Task '%s' [%s -> %s]" % (task['name'], - task['state'], states.IDLE)) - - db_task = db_api.task_update(task['id'], - {"state": states.IDLE}) - task_to_start = [db_task] - data_flow.prepare_tasks(task_to_start, outbound_context) + task['state'], states.RUNNING)) + executables = data_flow.prepare_tasks([task], + outbound_context, + workbook) db_api.commit_tx() finally: db_api.end_tx() - if not states.is_stopped_or_finished(execution["state"]): - cls._run_tasks(task_to_start) + if states.is_stopped_or_finished(execution['state']): + return + + for task_id, action_name, action_params in executables: + self._run_task(task_id, action_name, action_params) task_spec = workbook.tasks.get(task['name']) retries, break_on, delay_sec = task_spec.get_retry_parameters() diff --git a/mistral/engine/data_flow.py b/mistral/engine/data_flow.py index 3c6d8905b..90828f402 100644 --- a/mistral/engine/data_flow.py +++ b/mistral/engine/data_flow.py @@ -14,17 +14,45 @@ # See the License for the specific language governing permissions and # limitations under the License. +import inspect from oslo.config import cfg +from mistral.actions import action_factory as a_f from mistral.db import api as db_api +from mistral.engine import states +from mistral import exceptions as exc from mistral import expressions as expr from mistral.openstack.common import log as logging from mistral.services import trusts +from mistral.workbook import tasks as wb_task LOG = logging.getLogger(__name__) CONF = cfg.CONF +_ACTION_CTX_PARAM = 'action_context' + + +def _has_action_context_param(action_cls): + arg_spec = inspect.getargspec(action_cls.__init__) + + return _ACTION_CTX_PARAM in arg_spec.args + + +def _get_action_context(db_task, openstack_context): + result = { + 'workbook_name': db_task['workbook_name'], + 'execution_id': db_task['execution_id'], + 'task_id': db_task['id'], + 'task_name': db_task['name'], + 'task_tags': db_task['tags'], + } + + if openstack_context: + result.update({'openstack': openstack_context}) + + return result + def evaluate_task_parameters(task, context): params = task['task_spec'].get('parameters', {}) @@ -32,16 +60,51 @@ def evaluate_task_parameters(task, context): return expr.evaluate_recursively(params, context) -def prepare_tasks(tasks, context): +def prepare_tasks(tasks, context, workbook): + results = [] + for task in tasks: - # TODO(rakhmerov): Inbound context should be a merge of outbound - # contexts of task dependencies, if any. - task['in_context'] = context - task['parameters'] = evaluate_task_parameters(task, context) + # TODO(rakhmerov): Inbound context should be a merge of + # outbound contexts of task dependencies, if any. + action_params = evaluate_task_parameters(task, context) db_api.task_update(task['id'], - {'in_context': task['in_context'], - 'parameters': task['parameters']}) + {'state': states.RUNNING, + 'in_context': context, + 'parameters': action_params}) + + # Get action name. Unwrap ad-hoc and reevaluate params if + # necessary. + action_name = wb_task.TaskSpec(task['task_spec'])\ + .get_full_action_name() + + openstack_ctx = context.get('openstack') + + if not a_f.get_action_class(action_name): + # If action is not found in registered actions try to find + # ad-hoc action definition. + if openstack_ctx is not None: + action_params.update({'openstack': openstack_ctx}) + + action = a_f.resolve_adhoc_action_name(workbook, action_name) + + if not action: + msg = 'Unknown action [workbook=%s, action=%s]' % \ + (workbook, action_name) + raise exc.ActionException(msg) + + action_params = a_f.convert_adhoc_action_params(workbook, + action_name, + action_params) + action_name = action + + if _has_action_context_param(a_f.get_action_class(action_name)): + action_params[_ACTION_CTX_PARAM] = \ + _get_action_context(task, openstack_ctx) + + results.append((task['id'], action_name, action_params)) + + return results def get_task_output(task, result): diff --git a/mistral/engine/drivers/default/engine.py b/mistral/engine/drivers/default/engine.py index 60585d4d8..4b8af4331 100644 --- a/mistral/engine/drivers/default/engine.py +++ b/mistral/engine/drivers/default/engine.py @@ -12,9 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from oslo.config import cfg -from oslo import messaging - from mistral import context as auth_context from mistral import engine from mistral.engine import executor @@ -25,16 +22,7 @@ LOG = logging.getLogger(__name__) class DefaultEngine(engine.Engine): - def _notify_task_executors(self, tasks): - # TODO(m4dcoder): Use a pool for transport and client - if not self.transport: - self.transport = messaging.get_transport(cfg.CONF) - exctr = executor.ExecutorClient(self.transport) - for task in tasks: - LOG.info("Submitted task for execution: '%s'" % task) - exctr.handle_task(auth_context.ctx(), task=task) - - def _run_tasks(self, tasks): + def _run_task(self, task_id, action_name, params): # TODO(rakhmerov): # This call outside of DB transaction creates a window # when the engine may crash and DB will not be consistent with @@ -43,4 +31,12 @@ class DefaultEngine(engine.Engine): # However, making this call in DB transaction is really bad # since it makes transaction much longer in time and under load # may overload DB with open transactions. - self._notify_task_executors(tasks) + # TODO(m4dcoder): Use a pool for transport and client + + exctr = executor.ExecutorClient(self.transport) + + LOG.info("Submitted task for execution: '%s'" % task_id) + exctr.handle_task(auth_context.ctx(), + task_id=task_id, + action_name=action_name, + params=params) diff --git a/mistral/engine/drivers/default/executor.py b/mistral/engine/drivers/default/executor.py index 17966e4b6..07d1648d7 100644 --- a/mistral/engine/drivers/default/executor.py +++ b/mistral/engine/drivers/default/executor.py @@ -17,7 +17,6 @@ from oslo.config import cfg from mistral.actions import action_factory as a_f -from mistral.db import api as db_api from mistral.engine import executor from mistral.engine import states from mistral import exceptions as exc @@ -30,103 +29,43 @@ WORKFLOW_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name) class DefaultExecutor(executor.Executor): - def _log_action_exception(self, message, task, exc): - LOG.exception("%s [task_id=%s, action='%s', action_spec='%s']\n %s" % - (message, task['id'], task['task_spec']['action'], - task['action_spec'], exc)) + def _log_action_exception(self, message, task_id, action, params, ex): + LOG.exception("%s [task_id=%s, action='%s', params='%s']\n %s" % + (message, task_id, action, params, ex)) - def _do_task_action(self, task): - """Executes the action defined by the task and return result. + def handle_task(self, cntx, task_id, action_name, params={}): + """Handle the execution of the workbook task. - :param task: a task definition - :type task: dict + :param task_id: task identifier + :type task_id: str + :param action_name: a name of the action to run + :type action_name: str + :param params: a dict of action parameters """ - LOG.info("Starting task action [task_id=%s, " - "action='%s', action_spec='%s']" % - (task['id'], task['task_spec']['action'], - task['action_spec'])) - action = a_f.create_action(task) + action_cls = a_f.get_action_class(action_name) # TODO(dzimine): on failure, convey failure details back + try: + action = action_cls(**params) + except Exception as e: + raise exc.ActionException("Failed to create action" + "[action_name=%s, params=%s]: %s" % + (action_name, params, e)) if action.is_sync(): try: state, result = states.SUCCESS, action.run() except exc.ActionException as ex: - self._log_action_exception("Action failed", task, ex) + self._log_action_exception("Action failed", task_id, + action_name, params, ex) state, result = states.ERROR, None - self.engine.convey_task_result(task['id'], state, result) + self.engine.convey_task_result(task_id, state, result) else: try: action.run() except exc.ActionException as ex: - self._log_action_exception("Action failed", task, ex) - self.engine.convey_task_result(task['id'], states.ERROR, None) - - def _handle_task_error(self, task, exception): - """Handle unexpected exception from the task execution. - - :param task: the task corresponding to the exception - :type task: dict - :param exception: an exception thrown during the execution of the task - :type exception: Exception - """ - # TODO(dzimine): why exception is a parameter here? - # TODO(dzimine): convey exception details to end user - # (why task failed?) - try: - db_api.start_tx() - try: - db_api.execution_update(task['execution_id'], - {'state': states.ERROR}) - db_api.task_update(task['id'], - {'state': states.ERROR}) - db_api.commit_tx() - finally: - db_api.end_tx() - except Exception as ex: - LOG.exception(ex) - - def handle_task(self, cntx, **kwargs): - """Handle the execution of the workbook task. - - :param cntx: a request context dict - :type cntx: MistralContext - :param kwargs: a dict of method arguments - :type kwargs: dict - """ - try: - task = kwargs.get('task', None) - if not task: - raise Exception('No task is provided to the executor.') - - LOG.info("Received a task: %s" % task) - - db_task = db_api.task_get(task['id']) - db_exec = db_api.execution_get(task['execution_id']) - - if not db_exec or not db_task: - return - - if db_exec['state'] != states.RUNNING or \ - db_task['state'] != states.IDLE: - return - - # Update the state to running before performing action. The - # do_task_action assigns state to the task which is the appropriate - # value to preserve. - - WORKFLOW_TRACE.info("Task '%s' [%s -> %s]" % (db_task['name'], - db_task['state'], - states.RUNNING)) - - db_api.task_update(task['id'], - {'state': states.RUNNING}) - - self._do_task_action(db_task) - except Exception as ex: - self._log_action_exception("Unexpected exception while trying " - "to execute action", task, ex) - self._handle_task_error(task, ex) + self._log_action_exception("Action failed", task_id, + action_name, params, ex) + self.engine.convey_task_result(task_id, states.ERROR, None) diff --git a/mistral/tests/base.py b/mistral/tests/base.py index 26c003bef..2efd8788e 100644 --- a/mistral/tests/base.py +++ b/mistral/tests/base.py @@ -165,13 +165,15 @@ class EngineTestCase(DbTestCase): return cls.backend.get_workflow_execution_state(cntx, **kwargs) @classmethod - def mock_run_tasks(cls, tasks): + def mock_run_task(cls, task_id, action_name, params): """Mock the engine _run_tasks to send requests directly to the task executor instead of going through the oslo.messaging transport. """ exctr = executor.get_executor(cfg.CONF.engine.engine, cls.transport) - for task in tasks: - exctr.handle_task({}, task=task) + exctr.handle_task(auth_context.ctx(), + task_id=task_id, + action_name=action_name, + params=params) @classmethod def mock_handle_task(cls, cntx, **kwargs): diff --git a/mistral/tests/resources/control_flow/one_sync_task.yaml b/mistral/tests/resources/control_flow/one_sync_task.yaml index 08acbd07b..d93c3ded6 100644 --- a/mistral/tests/resources/control_flow/one_sync_task.yaml +++ b/mistral/tests/resources/control_flow/one_sync_task.yaml @@ -1,19 +1,20 @@ Namespaces: - Nova: + MyActions: actions: - create-vm: - class: std.http + concat: + class: std.echo base-parameters: - url: http://path_to_nova/url_for_create + output: '{$.left} {$.right}' parameters: - - image_id - - flavor_id + - left + - right output: - vm_id: $.base_output.server_id + string: $.output + Workflow: tasks: - create-vm-nova: - action: Nova.create-vm + build_name: + action: MyActions.concat parameters: - image_id: 1234 - flavor_id: 2 \ No newline at end of file + left: Stormin + right: Stanley \ No newline at end of file diff --git a/mistral/tests/resources/control_flow/require_flow.yaml b/mistral/tests/resources/control_flow/require_flow.yaml index 4538d89b2..5bdf7a27e 100644 --- a/mistral/tests/resources/control_flow/require_flow.yaml +++ b/mistral/tests/resources/control_flow/require_flow.yaml @@ -1,65 +1,26 @@ Namespaces: - MyRest: - class: std.mistral_http - base-parameters: - method: GET - headers: - X-Auth-Token: $.auth_token - + MyActions: actions: - create-vm: + concat: + class: std.echo base-parameters: - url: http://some_host/service/action/execute - headers: - Content-Type: 'application/json' + output: '{$.left} {$.right}' parameters: - - image_id - - flavor_id + - left + - right output: - - backup-vm: - base-parameters: - url: http://some_host/url_for_backup - parameters: - - server_id - - attach-volume: - base-parameters: - url: /url_for_attach - parameters: - - size - - mnt_path - - format-volume: - base-parameters: - url: /url_for_format - parameters: - - server_id + string: $.output Workflow: tasks: - create-vms: - action: MyRest.create-vm + build_name: + action: MyActions.concat parameters: - image_id: 1234 - flavor_id: 42 - - attach-volumes: - requires: create-vms - action: MyRest.attach-volume + left: Stormin + right: Stanley + greet: + requires: [build_name] + action: MyActions.concat parameters: - size: 1234 - mnt_path: /path/to/volume - - format-volumes: - requires: [attach-volumes] - action: MyRest.format-volume - parameters: - server_id: 123 - - backup-vms: - requires: - - create-vms - action: MyRest.backup-vm - parameters: - server_id: 123 + left: Greetings + right: {$.string} \ No newline at end of file diff --git a/mistral/tests/resources/data_flow/task_with_two_dependencies.yaml b/mistral/tests/resources/data_flow/task_with_two_dependencies.yaml index 7e524af48..337c8ad1a 100644 --- a/mistral/tests/resources/data_flow/task_with_two_dependencies.yaml +++ b/mistral/tests/resources/data_flow/task_with_two_dependencies.yaml @@ -54,6 +54,7 @@ Workflow: f_name: $.full_name build_greeting: + requires: [build_full_name] action: MyService.build_greeting publish: greet_msg: $.greeting diff --git a/mistral/tests/unit/actions/test_action_factory.py b/mistral/tests/unit/actions/test_action_factory.py index 3140c58d5..53761bfc9 100644 --- a/mistral/tests/unit/actions/test_action_factory.py +++ b/mistral/tests/unit/actions/test_action_factory.py @@ -19,6 +19,7 @@ import json from mistral.actions import action_factory as a_f from mistral.actions import std_actions as std +from mistral import dsl_parser as parser from mistral.engine import data_flow from mistral import exceptions from mistral.openstack.common import log as logging @@ -283,3 +284,39 @@ class ActionFactoryTest(base.BaseTest): action = a_f.create_action(db_task) self.assertEqual("'Tango and Cash' is a cool movie!", action.run()) + + def test_resolve_adhoc_action_name(self): + workbook = parser.get_workbook( + base.get_resource('control_flow/one_sync_task.yaml')) + action_name = 'MyActions.concat' + + action = a_f.resolve_adhoc_action_name(workbook, action_name) + + self.assertEqual('std.echo', action) + + def test_convert_adhoc_action_params(self): + workbook = parser.get_workbook( + base.get_resource('control_flow/one_sync_task.yaml')) + action_name = 'MyActions.concat' + params = { + 'left': 'Stormin', + 'right': 'Stanley' + } + + parameters = a_f.convert_adhoc_action_params(workbook, + action_name, + params) + + self.assertEqual({'output': 'Stormin Stanley'}, parameters) + + def test_convert_adhoc_action_result(self): + workbook = parser.get_workbook( + base.get_resource('control_flow/one_sync_task.yaml')) + action_name = 'MyActions.concat' + result = {'output': 'Stormin Stanley'} + + parameters = a_f.convert_adhoc_action_result(workbook, + action_name, + result) + + self.assertEqual({'string': 'Stormin Stanley'}, parameters) diff --git a/mistral/tests/unit/engine/default/test_engine.py b/mistral/tests/unit/engine/default/test_engine.py index 1086f5125..e6c4b9780 100644 --- a/mistral/tests/unit/engine/default/test_engine.py +++ b/mistral/tests/unit/engine/default/test_engine.py @@ -17,9 +17,11 @@ import mock from oslo.config import cfg from mistral.actions import std_actions +from mistral import context as auth_context from mistral.db import api as db_api from mistral import engine from mistral.engine.drivers.default import engine as concrete_engine +from mistral.engine import executor from mistral.engine import states from mistral import expressions from mistral.openstack.common import log as logging @@ -38,6 +40,7 @@ cfg.CONF.set_default('auth_enable', False, group='pecan') # TODO(rakhmerov): add more tests for errors, execution stop etc. +@mock.patch.object(auth_context, 'ctx', mock.MagicMock()) @mock.patch.object( engine.EngineClient, 'start_workflow_execution', mock.MagicMock(side_effect=base.EngineTestCase.mock_start_workflow)) @@ -48,21 +51,28 @@ cfg.CONF.set_default('auth_enable', False, group='pecan') std_actions.HTTPAction, 'run', mock.MagicMock(return_value={'state': states.SUCCESS})) class TestEngine(base.EngineTestCase): - @mock.patch.object( - concrete_engine.DefaultEngine, "_notify_task_executors", - mock.MagicMock(return_value="")) + @mock.patch.object(executor.ExecutorClient, "handle_task", + mock.MagicMock()) @mock.patch.object( db_api, 'workbook_get', mock.MagicMock(return_value={'definition': base.get_resource( - 'control_flow/one_async_task.yaml')})) + 'control_flow/one_sync_task.yaml')})) def test_with_one_task(self): - execution = self.engine.start_workflow_execution(WB_NAME, "create-vms", + execution = self.engine.start_workflow_execution(WB_NAME, "build_name", CONTEXT) task = db_api.tasks_get(workbook_name=WB_NAME, execution_id=execution['id'])[0] - self.engine.convey_task_result(task['id'], states.SUCCESS, None) + executor.ExecutorClient.handle_task\ + .assert_called_once_with(auth_context.ctx(), + params={'output': 'Stormin Stanley'}, + task_id=task['id'], + action_name='std.echo') + + self.engine.convey_task_result(task['id'], + states.SUCCESS, + {'output': 'Stormin Stanley'}) task = db_api.tasks_get(workbook_name=WB_NAME, execution_id=execution['id'])[0] @@ -70,20 +80,22 @@ class TestEngine(base.EngineTestCase): self.assertEqual(execution['state'], states.SUCCESS) self.assertEqual(task['state'], states.SUCCESS) + self.assertEqual( + task['output'], + {'task': {'build_name': {'string': 'Stormin Stanley'}}}) @mock.patch.object( engine.EngineClient, 'get_workflow_execution_state', mock.MagicMock( side_effect=base.EngineTestCase.mock_get_workflow_state)) - @mock.patch.object( - concrete_engine.DefaultEngine, "_notify_task_executors", - mock.MagicMock(return_value="")) + @mock.patch.object(executor.ExecutorClient, "handle_task", + mock.MagicMock()) @mock.patch.object( db_api, 'workbook_get', mock.MagicMock(return_value={'definition': base.get_resource( 'control_flow/require_flow.yaml')})) def test_require_flow(self): - execution = self.engine.start_workflow_execution(WB_NAME, "backup-vms", + execution = self.engine.start_workflow_execution(WB_NAME, "greet", CONTEXT) tasks = db_api.tasks_get(workbook_name=WB_NAME, @@ -99,9 +111,7 @@ class TestEngine(base.EngineTestCase): self.assertEqual(2, len(tasks)) self.assertEqual(tasks[0]['state'], states.SUCCESS) - # Since we mocked out executor notification we expect IDLE - # for the second task. - self.assertEqual(tasks[1]['state'], states.IDLE) + self.assertEqual(tasks[1]['state'], states.RUNNING) self.assertEqual(states.RUNNING, self.engine.get_workflow_execution_state( WB_NAME, execution['id'])) @@ -121,8 +131,8 @@ class TestEngine(base.EngineTestCase): WB_NAME, execution['id'])) @mock.patch.object( - concrete_engine.DefaultEngine, '_run_tasks', - mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks)) + concrete_engine.DefaultEngine, '_run_task', + mock.MagicMock(side_effect=base.EngineTestCase.mock_run_task)) @mock.patch.object( expressions, "evaluate", mock.MagicMock(side_effect=lambda x, y: x)) @mock.patch.object( @@ -130,8 +140,7 @@ class TestEngine(base.EngineTestCase): mock.MagicMock(return_value={'definition': base.get_resource( 'control_flow/one_sync_task.yaml')})) def test_with_one_sync_task(self): - execution = self.engine.start_workflow_execution(WB_NAME, - "create-vm-nova", + execution = self.engine.start_workflow_execution(WB_NAME, "build_name", CONTEXT) task = db_api.tasks_get(workbook_name=WB_NAME, @@ -142,8 +151,8 @@ class TestEngine(base.EngineTestCase): self.assertEqual(task['state'], states.SUCCESS) @mock.patch.object( - concrete_engine.DefaultEngine, '_run_tasks', - mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks)) + concrete_engine.DefaultEngine, '_run_task', + mock.MagicMock(side_effect=base.EngineTestCase.mock_run_task)) @mock.patch.object( expressions, "evaluate", mock.MagicMock(side_effect=lambda x, y: x)) @mock.patch.object( @@ -205,8 +214,8 @@ class TestEngine(base.EngineTestCase): self.assertEqual(execution['state'], states.SUCCESS) @mock.patch.object( - concrete_engine.DefaultEngine, '_run_tasks', - mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks)) + concrete_engine.DefaultEngine, '_run_task', + mock.MagicMock(side_effect=base.EngineTestCase.mock_run_task)) @mock.patch.object( expressions, "evaluate", mock.MagicMock(side_effect=lambda x, y: x)) @mock.patch.object( @@ -259,16 +268,13 @@ class TestEngine(base.EngineTestCase): self._assert_single_item(tasks, state=states.ERROR) self.assertEqual(execution['state'], states.SUCCESS) - @mock.patch.object( - concrete_engine.DefaultEngine, "_notify_task_executors", - mock.MagicMock(return_value="")) @mock.patch.object( db_api, 'workbook_get', mock.MagicMock(return_value={'definition': base.get_resource( 'control_flow/no_namespaces.yaml')})) @mock.patch.object( - concrete_engine.DefaultEngine, '_run_tasks', - mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks)) + concrete_engine.DefaultEngine, '_run_task', + mock.MagicMock(side_effect=base.EngineTestCase.mock_run_task)) def test_engine_with_no_namespaces(self): execution = self.engine.start_workflow_execution(WB_NAME, "task1", {}) @@ -286,8 +292,8 @@ class TestEngine(base.EngineTestCase): mock.MagicMock(return_value={'definition': base.get_resource( 'control_flow/one_std_task.yaml')})) @mock.patch.object( - concrete_engine.DefaultEngine, '_run_tasks', - mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks)) + concrete_engine.DefaultEngine, '_run_task', + mock.MagicMock(side_effect=base.EngineTestCase.mock_run_task)) def test_engine_task_std_action_with_namespaces(self): execution = self.engine.start_workflow_execution(WB_NAME, "std_http_task", {}) diff --git a/mistral/tests/unit/engine/default/test_executor.py b/mistral/tests/unit/engine/default/test_executor.py index bbe66a134..9946b4a9a 100644 --- a/mistral/tests/unit/engine/default/test_executor.py +++ b/mistral/tests/unit/engine/default/test_executor.py @@ -95,9 +95,6 @@ SAMPLE_CONTEXT = { @mock.patch.object( executor.ExecutorClient, 'handle_task', mock.MagicMock(side_effect=base.EngineTestCase.mock_handle_task)) -@mock.patch.object( - engine.EngineClient, 'convey_task_result', - mock.MagicMock(side_effect=base.EngineTestCase.mock_task_result)) class TestExecutor(base.DbTestCase): def __init__(self, *args, **kwargs): super(TestExecutor, self).__init__(*args, **kwargs) @@ -122,30 +119,69 @@ class TestExecutor(base.DbTestCase): self.assertIsInstance(self.task, dict) self.assertIn('id', self.task) - @mock.patch.object( - std_actions.HTTPAction, 'run', mock.MagicMock(return_value={})) - def test_handle_task(self): + @mock.patch.object(std_actions.EchoAction, 'run') + @mock.patch.object(engine.EngineClient, 'convey_task_result', + mock.MagicMock()) + def test_handle_task(self, action): + task_id = '12345' + action_name = 'std.echo' + params = { + 'output': 'some' + } # Send the task request to the Executor. ex_client = executor.ExecutorClient(self.transport) - ex_client.handle_task(SAMPLE_CONTEXT, task=self.task) + ex_client.handle_task(SAMPLE_CONTEXT, + task_id=task_id, + action_name=action_name, + params=params) - # Check task execution state. - db_task = db_api.task_get(self.task['id']) - self.assertEqual(db_task['state'], states.SUCCESS) + engine.EngineClient.convey_task_result\ + .assert_called_once_with(task_id, + states.SUCCESS, + action.return_value) - @mock.patch.object( - std_actions.HTTPAction, 'run', - mock.MagicMock(side_effect=exceptions.ActionException)) + @mock.patch.object(engine.EngineClient, 'convey_task_result', + mock.MagicMock()) + def test_handle_task_not_registered(self): + task_id = '12345' + action_name = 'not.registered' + params = { + 'output': 'some' + } + + # Send the task request to the Executor. + ex_client = executor.ExecutorClient(self.transport) + self.assertRaises(exceptions.ActionException, ex_client.handle_task, + SAMPLE_CONTEXT, + task_id=task_id, + action_name=action_name, + params=params) + + self.assertFalse(engine.EngineClient.convey_task_result.called) + + @mock.patch.object(std_actions.EchoAction, 'run', + mock.MagicMock(side_effect=exceptions.ActionException)) + @mock.patch.object(engine.EngineClient, 'convey_task_result', + mock.MagicMock()) def test_handle_task_action_exception(self): + task_id = '12345' + action_name = 'std.echo' + params = { + 'output': 'some' + } # Send the task request to the Executor. ex_client = executor.ExecutorClient(self.transport) with mock.patch('mistral.engine.drivers.default.executor.' 'DefaultExecutor._log_action_exception') as log: - ex_client.handle_task(SAMPLE_CONTEXT, task=self.task) + ex_client.handle_task(SAMPLE_CONTEXT, + task_id=task_id, + action_name=action_name, + params=params) self.assertTrue(log.called, "Exception must be logged") - # Check task execution state. - db_task = db_api.task_get(self.task['id']) - self.assertEqual(db_task['state'], states.ERROR) + engine.EngineClient.convey_task_result\ + .assert_called_once_with(task_id, + states.ERROR, + None) diff --git a/mistral/tests/unit/engine/test_data_flow.py b/mistral/tests/unit/engine/test_data_flow.py index 8f2a943ff..2045ebc0d 100644 --- a/mistral/tests/unit/engine/test_data_flow.py +++ b/mistral/tests/unit/engine/test_data_flow.py @@ -68,8 +68,8 @@ def create_workbook(definition_path): engine.EngineClient, 'convey_task_result', mock.MagicMock(side_effect=base.EngineTestCase.mock_task_result)) @mock.patch.object( - concrete_engine.DefaultEngine, '_run_tasks', - mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks)) + concrete_engine.DefaultEngine, '_run_task', + mock.MagicMock(side_effect=base.EngineTestCase.mock_run_task)) class DataFlowTest(base.EngineTestCase): def _check_in_context_execution(self, task): self.assertIn('__execution', task['in_context']) diff --git a/mistral/tests/unit/engine/test_data_flow_module.py b/mistral/tests/unit/engine/test_data_flow_module.py index 8a791f693..4c73e8b98 100644 --- a/mistral/tests/unit/engine/test_data_flow_module.py +++ b/mistral/tests/unit/engine/test_data_flow_module.py @@ -14,10 +14,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import copy + from mistral.db import api as db_api from mistral.engine import data_flow +from mistral.engine import states from mistral.openstack.common import log as logging from mistral.tests import base +from mistral.workbook import workbook LOG = logging.getLogger(__name__) @@ -38,6 +42,7 @@ TASK = { 'execution_id': EXEC_ID, 'name': 'my_task', 'task_spec': { + 'action': 'std.echo', 'parameters': { 'p1': 'My string', 'p2': '$.param3.param32', @@ -47,17 +52,31 @@ TASK = { 'new_key11': '$.new_key1' } }, - 'service_spec': { - 'actions': { - 'action': { - 'output': { - # This one should not be evaluated. - 'server_id': '$.server.id' + 'in_context': CONTEXT +} + +TASK2 = copy.deepcopy(TASK) +TASK2['task_spec']['action'] = 'some.thing' + +WORKBOOK = { + 'Namespaces': { + 'some': { + 'actions': { + 'thing': { + 'class': 'std.echo', + 'base-parameters': { + 'output': '{$.p1} {$.p2}' + } } } } }, - 'in_context': CONTEXT + 'Workflow': { + 'tasks': { + 'first_task': TASK['task_spec'], + 'second_task': TASK2['task_spec'] + } + } } @@ -70,18 +89,36 @@ class DataFlowModuleTest(base.DbTestCase): self.assertEqual('val32', parameters['p2']) def test_prepare_tasks(self): - task = db_api.task_create(EXEC_ID, TASK.copy()) - tasks = [task] + wb = workbook.WorkbookSpec(WORKBOOK) - data_flow.prepare_tasks(tasks, CONTEXT) + tasks = [ + db_api.task_create(EXEC_ID, TASK.copy()), + db_api.task_create(EXEC_ID, TASK2.copy()) + ] - db_task = db_api.task_get(tasks[0]['id']) + executables = data_flow.prepare_tasks(tasks, CONTEXT, wb) - self.assertDictEqual(CONTEXT, db_task['in_context']) - self.assertDictEqual({'p1': 'My string', - 'p2': 'val32', - 'p3': ''}, - db_task['parameters']) + self.assertEqual(2, len(executables)) + + self.assertEqual(tasks[0]['id'], executables[0][0]) + self.assertEqual('std.echo', executables[0][1]) + self.assertDictEqual({'p2': 'val32', 'p3': '', 'p1': 'My string'}, + executables[0][2]) + + self.assertEqual(tasks[1]['id'], executables[1][0]) + self.assertEqual('std.echo', executables[1][1]) + self.assertDictEqual({'output': 'My string val32'}, + executables[1][2]) + + for task in tasks: + db_task = db_api.task_get(task['id']) + + self.assertDictEqual(CONTEXT, db_task['in_context']) + self.assertDictEqual({'p1': 'My string', + 'p2': 'val32', + 'p3': ''}, + db_task['parameters']) + self.assertEqual(states.RUNNING, db_task['state']) def test_get_outbound_context(self): output = data_flow.get_task_output(TASK, {'new_key1': 'new_val1'}) diff --git a/mistral/tests/unit/engine/test_task_retry.py b/mistral/tests/unit/engine/test_task_retry.py index 45145efd6..c46bc9b5a 100644 --- a/mistral/tests/unit/engine/test_task_retry.py +++ b/mistral/tests/unit/engine/test_task_retry.py @@ -63,8 +63,8 @@ class FailBeforeSuccessMocker(object): engine.EngineClient, 'convey_task_result', mock.MagicMock(side_effect=base.EngineTestCase.mock_task_result)) @mock.patch.object( - concrete_engine.DefaultEngine, '_run_tasks', - mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks)) + concrete_engine.DefaultEngine, '_run_task', + mock.MagicMock(side_effect=base.EngineTestCase.mock_run_task)) @mock.patch.object( std_actions.HTTPAction, 'run', mock.MagicMock(return_value='result')) @@ -160,6 +160,12 @@ class TaskRetryTest(base.EngineTestCase): retry_count, _, delay = task_spec.get_retry_parameters() for x in xrange(0, retry_count): + tasks = db_api.tasks_get(workbook_name=WB_NAME, + execution_id=execution['id']) + + self._assert_single_item(tasks, name=task_name) + self._assert_single_item(tasks, state=states.RUNNING) + self.engine.convey_task_result(tasks[0]['id'], states.ERROR, {'output': 'result'})