From 9726189c4309d8c5f07a63ac64eeaa2c15af2f76 Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Mon, 19 Feb 2018 17:35:11 +0200 Subject: [PATCH] Fix 'pause' engine command * Commands going after 'pause' in 'on-XXX' clauses were never processed after workflow resume. The solution is to introduce a notion of a workflow execution backlog where we can save these commands in a serialized form so that the engine dispatcher could see and process them after resume. * Other minor changes Change-Id: I963b5660daf528d1caf6a785311de4fb272cafd0 Closes-Bug: #1714054 --- mistral/engine/dispatcher.py | 61 ++++++++++++--- mistral/engine/policies.py | 7 +- mistral/engine/tasks.py | 2 +- mistral/engine/workflows.py | 6 +- mistral/tests/unit/engine/test_commands.py | 22 ++++++ mistral/workflow/commands.py | 91 +++++++++++++++++++++- mistral/workflow/direct_workflow.py | 2 +- mistral/workflow/states.py | 1 + 8 files changed, 176 insertions(+), 16 deletions(-) diff --git a/mistral/engine/dispatcher.py b/mistral/engine/dispatcher.py index 29e92d2df..4099f9db0 100644 --- a/mistral/engine/dispatcher.py +++ b/mistral/engine/dispatcher.py @@ -20,6 +20,8 @@ from mistral import exceptions as exc from mistral.workflow import commands from mistral.workflow import states +BACKLOG_KEY = 'backlog_commands' + def _compare_task_commands(a, b): if not isinstance(a, commands.RunTask) or not a.is_waiting(): @@ -67,7 +69,8 @@ def _rearrange_commands(cmds): cmds.sort(key=functools.cmp_to_key(_compare_task_commands)) return cmds - elif state_cmd_idx == 0: + elif (state_cmd_idx == 0 and + not isinstance(state_cmd, commands.PauseWorkflow)): return cmds[0:1] res = cmds[0:state_cmd_idx] @@ -76,26 +79,66 @@ def _rearrange_commands(cmds): res.append(state_cmd) + # If the previously found state changing command is 'pause' then we need + # to also add a tail of the initial command list to the result so that + # we can save them to the command backlog. + if isinstance(state_cmd, commands.PauseWorkflow): + res.extend(cmds[state_cmd_idx + 1:]) + return res +def _save_command_to_backlog(wf_ex, cmd): + backlog_cmds = wf_ex.runtime_context.get(BACKLOG_KEY, []) + + if not backlog_cmds: + wf_ex.runtime_context[BACKLOG_KEY] = backlog_cmds + + backlog_cmds.append(cmd.to_dict()) + + +def _poll_commands_from_backlog(wf_ex): + backlog_cmds = wf_ex.runtime_context.pop(BACKLOG_KEY, []) + + if not backlog_cmds: + return [] + + return [ + commands.restore_command_from_dict(wf_ex, cmd_dict) + for cmd_dict in backlog_cmds + ] + + @profiler.trace('dispatcher-dispatch-commands', hide_args=True) def dispatch_workflow_commands(wf_ex, wf_cmds): - # TODO(rakhmerov): I don't like these imports but otherwise we have - # import cycles. + # Run commands from the backlog. + _process_commands(wf_ex, _poll_commands_from_backlog(wf_ex)) + + # Run new commands. + _process_commands(wf_ex, wf_cmds) + + +def _process_commands(wf_ex, cmds): + if not cmds: + return + from mistral.engine import task_handler from mistral.engine import workflow_handler as wf_handler - if not wf_cmds: - return + for cmd in _rearrange_commands(cmds): + if states.is_completed(wf_ex.state): + break + + if wf_ex.state == states.PAUSED: + # Save all commands after 'pause' to the backlog so that + # they can be processed after the workflow is resumed. 
+ _save_command_to_backlog(wf_ex, cmd) + + continue - for cmd in _rearrange_commands(wf_cmds): if isinstance(cmd, (commands.RunTask, commands.RunExistingTask)): task_handler.run_task(cmd) elif isinstance(cmd, commands.SetWorkflowState): wf_handler.set_workflow_state(wf_ex, cmd.new_state, cmd.msg) else: raise exc.MistralError('Unsupported workflow command: %s' % cmd) - - if wf_ex.state != states.RUNNING: - break diff --git a/mistral/engine/policies.py b/mistral/engine/policies.py index 08e378063..c570de7c9 100644 --- a/mistral/engine/policies.py +++ b/mistral/engine/policies.py @@ -260,9 +260,9 @@ class WaitAfterPolicy(base.TaskPolicy): end_state = task_ex.state end_state_info = task_ex.state_info - # TODO(rakhmerov): Policies probably needs to have tasks.Task - # interface in order to change manage task state safely. - # Set task state to 'DELAYED'. + # TODO(rakhmerov): Policies probably need to have tasks.Task + # interface in order to manage task state safely. + # Set task state to 'RUNNING_DELAYED'. task_ex.state = states.RUNNING_DELAYED task_ex.state_info = ( 'Suspended by wait-after policy for %s seconds' % self.delay @@ -391,6 +391,7 @@ class RetryPolicy(base.TaskPolicy): _log_task_delay(task_ex, self.delay) data_flow.invalidate_task_execution_result(task_ex) + task_ex.state = states.RUNNING_DELAYED policy_context['retry_no'] = retry_no + 1 diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py index faad0fb63..8d6436b34 100644 --- a/mistral/engine/tasks.py +++ b/mistral/engine/tasks.py @@ -356,7 +356,7 @@ class RegularTask(Task): @profiler.trace('regular-task-on-action-complete', hide_args=True) def on_action_complete(self, action_ex): state = action_ex.state - # TODO(rakhmerov): Here we can define more informative messages + # TODO(rakhmerov): Here we can define more informative messages for # cases when action is successful and when it's not. For example, # in state_info we can specify the cause action. state_info = (None if state == states.SUCCESS diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index 1585e69c0..5bd275c54 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -195,6 +195,7 @@ class Workflow(object): if self.wf_ex.task_execution_id: # Import the task_handler module here to avoid circular reference. from mistral.engine import task_handler + task_handler.schedule_on_action_update(self.wf_ex) def prepare_input(self, input_dict): @@ -239,6 +240,9 @@ class Workflow(object): self._continue_workflow(cmds) + def _get_backlog(self): + return self.wf_ex.runtime_context.get(dispatcher.BACKLOG_KEY) + def _continue_workflow(self, cmds): # When resuming a workflow we need to ignore all 'pause' # commands because workflow controller takes tasks that @@ -255,7 +259,7 @@ class Workflow(object): if states.is_completed(t_ex.state) and not t_ex.processed: t_ex.processed = True - if cmds: + if cmds or self._get_backlog(): dispatcher.dispatch_workflow_commands(self.wf_ex, cmds) else: self.check_and_complete() diff --git a/mistral/tests/unit/engine/test_commands.py b/mistral/tests/unit/engine/test_commands.py index 81c9abd80..d1a0ecd36 100644 --- a/mistral/tests/unit/engine/test_commands.py +++ b/mistral/tests/unit/engine/test_commands.py @@ -108,6 +108,28 @@ class SimpleEngineCommandsTest(base.EngineTestCase): state=states.SUCCESS ) + # Let's resume the workflow and wait till it succeeds. 
+ self.engine.resume_workflow(wf_ex.id) + + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + task_execs = wf_ex.task_executions + + self.assertEqual(2, len(task_execs)) + self._assert_single_item( + task_execs, + name='task1', + state=states.SUCCESS + ) + self._assert_single_item( + task_execs, + name='task2', + state=states.SUCCESS + ) + WORKBOOK2 = """ --- diff --git a/mistral/workflow/commands.py b/mistral/workflow/commands.py index 03b16804a..5d37f00d9 100644 --- a/mistral/workflow/commands.py +++ b/mistral/workflow/commands.py @@ -34,6 +34,13 @@ class WorkflowCommand(object): self.ctx = ctx or {} self.triggered_by = triggered_by + def to_dict(self): + return { + 'task_name': self.task_spec.get_name(), + 'ctx': self.ctx, + 'triggered_by': self.triggered_by + } + class Noop(WorkflowCommand): """No-operation command.""" @@ -41,6 +48,13 @@ class Noop(WorkflowCommand): def __repr__(self): return "NOOP [workflow=%s]" % self.wf_ex.name + def to_dict(self): + d = super(Noop, self).to_dict() + + d['cmd_name'] = 'noop' + + return d + class RunTask(WorkflowCommand): """Instruction to run a workflow task.""" @@ -74,9 +88,18 @@ class RunTask(WorkflowCommand): ) ) + def to_dict(self): + d = super(RunTask, self).to_dict() + + d['cmd_name'] = self.task_spec.get_name() + d['wait'] = self.wait + d['unique_key'] = self.unique_key + + return d + class RunExistingTask(WorkflowCommand): - """Command for running already existent task.""" + """Command to run an existing workflow task.""" def __init__(self, wf_ex, wf_spec, task_ex, reset=True, triggered_by=None): super(RunExistingTask, self).__init__( @@ -91,6 +114,16 @@ class RunExistingTask(WorkflowCommand): self.reset = reset self.unique_key = task_ex.unique_key + def to_dict(self): + d = super(RunExistingTask, self).to_dict() + + d['cmd_name'] = 'run_existing_task' + d['task_ex_id'] = self.task_ex.id + d['reset'] = self.reset + d['unique_key'] = self.unique_key + + return d + class SetWorkflowState(WorkflowCommand): """Instruction to change a workflow state.""" @@ -108,6 +141,14 @@ class SetWorkflowState(WorkflowCommand): self.new_state = new_state self.msg = msg + def to_dict(self): + d = super(SetWorkflowState, self).to_dict() + + d['new_state'] = self.new_state + d['msg'] = self.msg + + return d + class FailWorkflow(SetWorkflowState): """Instruction to fail a workflow.""" @@ -127,6 +168,13 @@ class FailWorkflow(SetWorkflowState): def __repr__(self): return "Fail [workflow=%s]" % self.wf_ex.name + def to_dict(self): + d = super(FailWorkflow, self).to_dict() + + d['cmd_name'] = 'fail' + + return d + class SucceedWorkflow(SetWorkflowState): """Instruction to succeed a workflow.""" @@ -146,6 +194,13 @@ class SucceedWorkflow(SetWorkflowState): def __repr__(self): return "Succeed [workflow=%s]" % self.wf_ex.name + def to_dict(self): + d = super(SucceedWorkflow, self).to_dict() + + d['cmd_name'] = 'succeed' + + return d + class PauseWorkflow(SetWorkflowState): """Instruction to pause a workflow.""" @@ -165,6 +220,13 @@ class PauseWorkflow(SetWorkflowState): def __repr__(self): return "Pause [workflow=%s]" % self.wf_ex.name + def to_dict(self): + d = super(PauseWorkflow, self).to_dict() + + d['cmd_name'] = 'pause' + + return d + RESERVED_CMDS = dict(zip( tasks.RESERVED_TASK_NAMES, [ @@ -180,6 +242,13 @@ def get_command_class(cmd_name): return RESERVED_CMDS[cmd_name] if cmd_name in RESERVED_CMDS else None +# TODO(rakhmerov): IMO the way how we instantiate commands is weird. 
+# If we look at how we implement the logic of saving commands to +# dicts (to_dict) and restoring back from dicts then we'll see +# the lack of symmetry and unified way to do that depending on a +# command. Also RunExistingTask turns to be a special case that +# is not processed with this method at all. Might be a 'bad smell'. +# This all makes me think that we need to do some refactoring here. def create_command(cmd_name, wf_ex, wf_spec, task_spec, ctx, params=None, triggered_by=None): cmd_cls = get_command_class(cmd_name) or RunTask @@ -201,3 +270,23 @@ def create_command(cmd_name, wf_ex, wf_spec, task_spec, ctx, ctx, triggered_by=triggered_by ) + + +def restore_command_from_dict(wf_ex, cmd_dict): + cmd_name = cmd_dict['cmd_name'] + + wf_spec = spec_parser.get_workflow_spec_by_execution_id(wf_ex.id) + task_spec = wf_spec.get_tasks()[cmd_dict['task_name']] + ctx = cmd_dict['ctx'] + params = {'msg': cmd_dict.get('msg')} if 'msg' in cmd_dict else None + triggered_by = cmd_dict.get('triggered_by') + + return create_command( + cmd_name, + wf_ex, + wf_spec, + task_spec, + ctx, + params, + triggered_by + ) diff --git a/mistral/workflow/direct_workflow.py b/mistral/workflow/direct_workflow.py index 847e9d519..017a610d1 100644 --- a/mistral/workflow/direct_workflow.py +++ b/mistral/workflow/direct_workflow.py @@ -29,7 +29,7 @@ LOG = logging.getLogger(__name__) class DirectWorkflowController(base.WorkflowController): - """'Direct workflow' handler. + """'Direct workflow' controller. This handler implements the workflow pattern which is based on direct transitions between tasks, i.e. after each task completion diff --git a/mistral/workflow/states.py b/mistral/workflow/states.py index 1c657e896..861f7af05 100644 --- a/mistral/workflow/states.py +++ b/mistral/workflow/states.py @@ -15,6 +15,7 @@ """Valid task and workflow states.""" +# TODO(rakhmerov): need docs explaining what each state means. IDLE = 'IDLE' WAITING = 'WAITING' RUNNING = 'RUNNING'
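To make the bug in the commit message concrete: the broken scenario is a direct workflow whose 'on-XXX' clause lists the 'pause' engine command followed by another task, similar to the workbooks defined in test_commands.py. The definition below is an illustrative reconstruction in the same style as those test workbooks, not a verbatim copy of the test data:

WORKBOOK_EXAMPLE = """
---
version: '2.0'

name: wb

workflows:
  wf1:
    type: direct

    tasks:
      task1:
        action: std.noop
        on-success:
          - pause
          - task2

      task2:
        action: std.noop
"""

Before this change, the RunTask command for task2 was produced together with the pause command but was never processed once the workflow entered PAUSED, so a later resume had nothing left to run. With the backlog it is serialized into the execution's runtime_context and dispatched on resume, which is what the new assertions in SimpleEngineCommandsTest check.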
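The dispatcher change is easiest to follow as a two-step flow: commands that arrive while the execution is PAUSED are serialized (via to_dict()) into runtime_context under 'backlog_commands', and the next dispatch, i.e. the first one after resume, drains that backlog before looking at any new commands. The sketch below is a deliberately simplified, standalone model of that flow; it does not import Mistral, and SimpleCommand, SimpleExecution and the dispatch() loop are invented for illustration, while BACKLOG_KEY, to_dict() and the 'backlog_commands' key mirror the patch.

# Simplified, standalone model of the command backlog introduced above.
# Only the runtime_context['backlog_commands'] idea is taken from the
# patch; the classes and the dispatch loop here are illustrative only.

BACKLOG_KEY = 'backlog_commands'


class SimpleCommand(object):
    def __init__(self, name):
        self.name = name

    def to_dict(self):
        # Mirrors WorkflowCommand.to_dict(): just enough data to rebuild
        # the command later.
        return {'cmd_name': self.name}

    @staticmethod
    def from_dict(d):
        # Stand-in for commands.restore_command_from_dict().
        return SimpleCommand(d['cmd_name'])


class SimpleExecution(object):
    def __init__(self):
        self.state = 'RUNNING'
        self.runtime_context = {}


def dispatch(wf_ex, cmds):
    # 1. Drain the backlog first (this is what happens right after resume).
    backlog = [
        SimpleCommand.from_dict(d)
        for d in wf_ex.runtime_context.pop(BACKLOG_KEY, [])
    ]

    for cmd in backlog + cmds:
        if wf_ex.state == 'PAUSED':
            # 2. Anything that arrives while paused is parked for later.
            wf_ex.runtime_context.setdefault(BACKLOG_KEY, []).append(
                cmd.to_dict()
            )
            continue

        if cmd.name == 'pause':
            wf_ex.state = 'PAUSED'
        else:
            print('running %s' % cmd.name)


wf_ex = SimpleExecution()

# 'task2' follows 'pause', so it is parked in the backlog instead of running.
dispatch(wf_ex, [SimpleCommand('task1'),
                 SimpleCommand('pause'),
                 SimpleCommand('task2')])

print(wf_ex.runtime_context)  # {'backlog_commands': [{'cmd_name': 'task2'}]}

# Resuming: the next dispatch call drains the backlog and runs 'task2'.
wf_ex.state = 'RUNNING'
dispatch(wf_ex, [])

This also explains why _continue_workflow now dispatches when either new commands or a non-empty backlog exist: after resume the controller may produce no new commands at all, yet the parked post-pause commands still have to be run.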