Fix 'pause' engine command

* Commands going after 'pause' in 'on-XXX' clauses
  were never processed after workflow resume. The
  solution is to introduce a notion of a workflow
  execution backlog where we can save these commands
  in a serialized form so that the engine dispatcher
  could see and process them after resume.
* Other minor changes

Change-Id: I963b5660daf528d1caf6a785311de4fb272cafd0
Closes-Bug: #1714054
This commit is contained in:
Renat Akhmerov 2018-02-19 17:35:11 +02:00
parent e3cb610c08
commit 9726189c43
8 changed files with 176 additions and 16 deletions

View File

@ -20,6 +20,8 @@ from mistral import exceptions as exc
from mistral.workflow import commands from mistral.workflow import commands
from mistral.workflow import states from mistral.workflow import states
BACKLOG_KEY = 'backlog_commands'
def _compare_task_commands(a, b): def _compare_task_commands(a, b):
if not isinstance(a, commands.RunTask) or not a.is_waiting(): if not isinstance(a, commands.RunTask) or not a.is_waiting():
@ -67,7 +69,8 @@ def _rearrange_commands(cmds):
cmds.sort(key=functools.cmp_to_key(_compare_task_commands)) cmds.sort(key=functools.cmp_to_key(_compare_task_commands))
return cmds return cmds
elif state_cmd_idx == 0: elif (state_cmd_idx == 0 and
not isinstance(state_cmd, commands.PauseWorkflow)):
return cmds[0:1] return cmds[0:1]
res = cmds[0:state_cmd_idx] res = cmds[0:state_cmd_idx]
@ -76,26 +79,66 @@ def _rearrange_commands(cmds):
res.append(state_cmd) res.append(state_cmd)
# If the previously found state changing command is 'pause' then we need
# to also add a tail of the initial command list to the result so that
# we can save them to the command backlog.
if isinstance(state_cmd, commands.PauseWorkflow):
res.extend(cmds[state_cmd_idx + 1:])
return res return res
def _save_command_to_backlog(wf_ex, cmd):
backlog_cmds = wf_ex.runtime_context.get(BACKLOG_KEY, [])
if not backlog_cmds:
wf_ex.runtime_context[BACKLOG_KEY] = backlog_cmds
backlog_cmds.append(cmd.to_dict())
def _poll_commands_from_backlog(wf_ex):
backlog_cmds = wf_ex.runtime_context.pop(BACKLOG_KEY, [])
if not backlog_cmds:
return []
return [
commands.restore_command_from_dict(wf_ex, cmd_dict)
for cmd_dict in backlog_cmds
]
@profiler.trace('dispatcher-dispatch-commands', hide_args=True) @profiler.trace('dispatcher-dispatch-commands', hide_args=True)
def dispatch_workflow_commands(wf_ex, wf_cmds): def dispatch_workflow_commands(wf_ex, wf_cmds):
# TODO(rakhmerov): I don't like these imports but otherwise we have # Run commands from the backlog.
# import cycles. _process_commands(wf_ex, _poll_commands_from_backlog(wf_ex))
# Run new commands.
_process_commands(wf_ex, wf_cmds)
def _process_commands(wf_ex, cmds):
if not cmds:
return
from mistral.engine import task_handler from mistral.engine import task_handler
from mistral.engine import workflow_handler as wf_handler from mistral.engine import workflow_handler as wf_handler
if not wf_cmds: for cmd in _rearrange_commands(cmds):
return if states.is_completed(wf_ex.state):
break
if wf_ex.state == states.PAUSED:
# Save all commands after 'pause' to the backlog so that
# they can be processed after the workflow is resumed.
_save_command_to_backlog(wf_ex, cmd)
continue
for cmd in _rearrange_commands(wf_cmds):
if isinstance(cmd, (commands.RunTask, commands.RunExistingTask)): if isinstance(cmd, (commands.RunTask, commands.RunExistingTask)):
task_handler.run_task(cmd) task_handler.run_task(cmd)
elif isinstance(cmd, commands.SetWorkflowState): elif isinstance(cmd, commands.SetWorkflowState):
wf_handler.set_workflow_state(wf_ex, cmd.new_state, cmd.msg) wf_handler.set_workflow_state(wf_ex, cmd.new_state, cmd.msg)
else: else:
raise exc.MistralError('Unsupported workflow command: %s' % cmd) raise exc.MistralError('Unsupported workflow command: %s' % cmd)
if wf_ex.state != states.RUNNING:
break

View File

@ -260,9 +260,9 @@ class WaitAfterPolicy(base.TaskPolicy):
end_state = task_ex.state end_state = task_ex.state
end_state_info = task_ex.state_info end_state_info = task_ex.state_info
# TODO(rakhmerov): Policies probably needs to have tasks.Task # TODO(rakhmerov): Policies probably need to have tasks.Task
# interface in order to change manage task state safely. # interface in order to manage task state safely.
# Set task state to 'DELAYED'. # Set task state to 'RUNNING_DELAYED'.
task_ex.state = states.RUNNING_DELAYED task_ex.state = states.RUNNING_DELAYED
task_ex.state_info = ( task_ex.state_info = (
'Suspended by wait-after policy for %s seconds' % self.delay 'Suspended by wait-after policy for %s seconds' % self.delay
@ -391,6 +391,7 @@ class RetryPolicy(base.TaskPolicy):
_log_task_delay(task_ex, self.delay) _log_task_delay(task_ex, self.delay)
data_flow.invalidate_task_execution_result(task_ex) data_flow.invalidate_task_execution_result(task_ex)
task_ex.state = states.RUNNING_DELAYED task_ex.state = states.RUNNING_DELAYED
policy_context['retry_no'] = retry_no + 1 policy_context['retry_no'] = retry_no + 1

View File

@ -356,7 +356,7 @@ class RegularTask(Task):
@profiler.trace('regular-task-on-action-complete', hide_args=True) @profiler.trace('regular-task-on-action-complete', hide_args=True)
def on_action_complete(self, action_ex): def on_action_complete(self, action_ex):
state = action_ex.state state = action_ex.state
# TODO(rakhmerov): Here we can define more informative messages # TODO(rakhmerov): Here we can define more informative messages for
# cases when action is successful and when it's not. For example, # cases when action is successful and when it's not. For example,
# in state_info we can specify the cause action. # in state_info we can specify the cause action.
state_info = (None if state == states.SUCCESS state_info = (None if state == states.SUCCESS

View File

@ -195,6 +195,7 @@ class Workflow(object):
if self.wf_ex.task_execution_id: if self.wf_ex.task_execution_id:
# Import the task_handler module here to avoid circular reference. # Import the task_handler module here to avoid circular reference.
from mistral.engine import task_handler from mistral.engine import task_handler
task_handler.schedule_on_action_update(self.wf_ex) task_handler.schedule_on_action_update(self.wf_ex)
def prepare_input(self, input_dict): def prepare_input(self, input_dict):
@ -239,6 +240,9 @@ class Workflow(object):
self._continue_workflow(cmds) self._continue_workflow(cmds)
def _get_backlog(self):
return self.wf_ex.runtime_context.get(dispatcher.BACKLOG_KEY)
def _continue_workflow(self, cmds): def _continue_workflow(self, cmds):
# When resuming a workflow we need to ignore all 'pause' # When resuming a workflow we need to ignore all 'pause'
# commands because workflow controller takes tasks that # commands because workflow controller takes tasks that
@ -255,7 +259,7 @@ class Workflow(object):
if states.is_completed(t_ex.state) and not t_ex.processed: if states.is_completed(t_ex.state) and not t_ex.processed:
t_ex.processed = True t_ex.processed = True
if cmds: if cmds or self._get_backlog():
dispatcher.dispatch_workflow_commands(self.wf_ex, cmds) dispatcher.dispatch_workflow_commands(self.wf_ex, cmds)
else: else:
self.check_and_complete() self.check_and_complete()

View File

@ -108,6 +108,28 @@ class SimpleEngineCommandsTest(base.EngineTestCase):
state=states.SUCCESS state=states.SUCCESS
) )
# Let's resume the workflow and wait till it succeeds.
self.engine.resume_workflow(wf_ex.id)
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(2, len(task_execs))
self._assert_single_item(
task_execs,
name='task1',
state=states.SUCCESS
)
self._assert_single_item(
task_execs,
name='task2',
state=states.SUCCESS
)
WORKBOOK2 = """ WORKBOOK2 = """
--- ---

View File

@ -34,6 +34,13 @@ class WorkflowCommand(object):
self.ctx = ctx or {} self.ctx = ctx or {}
self.triggered_by = triggered_by self.triggered_by = triggered_by
def to_dict(self):
return {
'task_name': self.task_spec.get_name(),
'ctx': self.ctx,
'triggered_by': self.triggered_by
}
class Noop(WorkflowCommand): class Noop(WorkflowCommand):
"""No-operation command.""" """No-operation command."""
@ -41,6 +48,13 @@ class Noop(WorkflowCommand):
def __repr__(self): def __repr__(self):
return "NOOP [workflow=%s]" % self.wf_ex.name return "NOOP [workflow=%s]" % self.wf_ex.name
def to_dict(self):
d = super(Noop, self).to_dict()
d['cmd_name'] = 'noop'
return d
class RunTask(WorkflowCommand): class RunTask(WorkflowCommand):
"""Instruction to run a workflow task.""" """Instruction to run a workflow task."""
@ -74,9 +88,18 @@ class RunTask(WorkflowCommand):
) )
) )
def to_dict(self):
d = super(RunTask, self).to_dict()
d['cmd_name'] = self.task_spec.get_name()
d['wait'] = self.wait
d['unique_key'] = self.unique_key
return d
class RunExistingTask(WorkflowCommand): class RunExistingTask(WorkflowCommand):
"""Command for running already existent task.""" """Command to run an existing workflow task."""
def __init__(self, wf_ex, wf_spec, task_ex, reset=True, triggered_by=None): def __init__(self, wf_ex, wf_spec, task_ex, reset=True, triggered_by=None):
super(RunExistingTask, self).__init__( super(RunExistingTask, self).__init__(
@ -91,6 +114,16 @@ class RunExistingTask(WorkflowCommand):
self.reset = reset self.reset = reset
self.unique_key = task_ex.unique_key self.unique_key = task_ex.unique_key
def to_dict(self):
d = super(RunExistingTask, self).to_dict()
d['cmd_name'] = 'run_existing_task'
d['task_ex_id'] = self.task_ex.id
d['reset'] = self.reset
d['unique_key'] = self.unique_key
return d
class SetWorkflowState(WorkflowCommand): class SetWorkflowState(WorkflowCommand):
"""Instruction to change a workflow state.""" """Instruction to change a workflow state."""
@ -108,6 +141,14 @@ class SetWorkflowState(WorkflowCommand):
self.new_state = new_state self.new_state = new_state
self.msg = msg self.msg = msg
def to_dict(self):
d = super(SetWorkflowState, self).to_dict()
d['new_state'] = self.new_state
d['msg'] = self.msg
return d
class FailWorkflow(SetWorkflowState): class FailWorkflow(SetWorkflowState):
"""Instruction to fail a workflow.""" """Instruction to fail a workflow."""
@ -127,6 +168,13 @@ class FailWorkflow(SetWorkflowState):
def __repr__(self): def __repr__(self):
return "Fail [workflow=%s]" % self.wf_ex.name return "Fail [workflow=%s]" % self.wf_ex.name
def to_dict(self):
d = super(FailWorkflow, self).to_dict()
d['cmd_name'] = 'fail'
return d
class SucceedWorkflow(SetWorkflowState): class SucceedWorkflow(SetWorkflowState):
"""Instruction to succeed a workflow.""" """Instruction to succeed a workflow."""
@ -146,6 +194,13 @@ class SucceedWorkflow(SetWorkflowState):
def __repr__(self): def __repr__(self):
return "Succeed [workflow=%s]" % self.wf_ex.name return "Succeed [workflow=%s]" % self.wf_ex.name
def to_dict(self):
d = super(SucceedWorkflow, self).to_dict()
d['cmd_name'] = 'succeed'
return d
class PauseWorkflow(SetWorkflowState): class PauseWorkflow(SetWorkflowState):
"""Instruction to pause a workflow.""" """Instruction to pause a workflow."""
@ -165,6 +220,13 @@ class PauseWorkflow(SetWorkflowState):
def __repr__(self): def __repr__(self):
return "Pause [workflow=%s]" % self.wf_ex.name return "Pause [workflow=%s]" % self.wf_ex.name
def to_dict(self):
d = super(PauseWorkflow, self).to_dict()
d['cmd_name'] = 'pause'
return d
RESERVED_CMDS = dict(zip( RESERVED_CMDS = dict(zip(
tasks.RESERVED_TASK_NAMES, [ tasks.RESERVED_TASK_NAMES, [
@ -180,6 +242,13 @@ def get_command_class(cmd_name):
return RESERVED_CMDS[cmd_name] if cmd_name in RESERVED_CMDS else None return RESERVED_CMDS[cmd_name] if cmd_name in RESERVED_CMDS else None
# TODO(rakhmerov): IMO the way how we instantiate commands is weird.
# If we look at how we implement the logic of saving commands to
# dicts (to_dict) and restoring back from dicts then we'll see
# the lack of symmetry and unified way to do that depending on a
# command. Also RunExistingTask turns to be a special case that
# is not processed with this method at all. Might be a 'bad smell'.
# This all makes me think that we need to do some refactoring here.
def create_command(cmd_name, wf_ex, wf_spec, task_spec, ctx, def create_command(cmd_name, wf_ex, wf_spec, task_spec, ctx,
params=None, triggered_by=None): params=None, triggered_by=None):
cmd_cls = get_command_class(cmd_name) or RunTask cmd_cls = get_command_class(cmd_name) or RunTask
@ -201,3 +270,23 @@ def create_command(cmd_name, wf_ex, wf_spec, task_spec, ctx,
ctx, ctx,
triggered_by=triggered_by triggered_by=triggered_by
) )
def restore_command_from_dict(wf_ex, cmd_dict):
cmd_name = cmd_dict['cmd_name']
wf_spec = spec_parser.get_workflow_spec_by_execution_id(wf_ex.id)
task_spec = wf_spec.get_tasks()[cmd_dict['task_name']]
ctx = cmd_dict['ctx']
params = {'msg': cmd_dict.get('msg')} if 'msg' in cmd_dict else None
triggered_by = cmd_dict.get('triggered_by')
return create_command(
cmd_name,
wf_ex,
wf_spec,
task_spec,
ctx,
params,
triggered_by
)

View File

@ -29,7 +29,7 @@ LOG = logging.getLogger(__name__)
class DirectWorkflowController(base.WorkflowController): class DirectWorkflowController(base.WorkflowController):
"""'Direct workflow' handler. """'Direct workflow' controller.
This handler implements the workflow pattern which is based on This handler implements the workflow pattern which is based on
direct transitions between tasks, i.e. after each task completion direct transitions between tasks, i.e. after each task completion

View File

@ -15,6 +15,7 @@
"""Valid task and workflow states.""" """Valid task and workflow states."""
# TODO(rakhmerov): need docs explaining what each state means.
IDLE = 'IDLE' IDLE = 'IDLE'
WAITING = 'WAITING' WAITING = 'WAITING'
RUNNING = 'RUNNING' RUNNING = 'RUNNING'