Merge "Fix 'pause' engine command"
commit f3ab048b4b
@@ -20,6 +20,8 @@ from mistral import exceptions as exc
 from mistral.workflow import commands
 from mistral.workflow import states
 
+BACKLOG_KEY = 'backlog_commands'
+
 
 def _compare_task_commands(a, b):
     if not isinstance(a, commands.RunTask) or not a.is_waiting():
@@ -67,7 +69,8 @@ def _rearrange_commands(cmds):
         cmds.sort(key=functools.cmp_to_key(_compare_task_commands))
 
         return cmds
-    elif state_cmd_idx == 0:
+    elif (state_cmd_idx == 0 and
+          not isinstance(state_cmd, commands.PauseWorkflow)):
         return cmds[0:1]
 
     res = cmds[0:state_cmd_idx]
@@ -76,26 +79,66 @@ def _rearrange_commands(cmds):
 
     res.append(state_cmd)
 
+    # If the previously found state changing command is 'pause' then we need
+    # to also add a tail of the initial command list to the result so that
+    # we can save them to the command backlog.
+    if isinstance(state_cmd, commands.PauseWorkflow):
+        res.extend(cmds[state_cmd_idx + 1:])
+
     return res
 
 
+def _save_command_to_backlog(wf_ex, cmd):
+    backlog_cmds = wf_ex.runtime_context.get(BACKLOG_KEY, [])
+
+    if not backlog_cmds:
+        wf_ex.runtime_context[BACKLOG_KEY] = backlog_cmds
+
+    backlog_cmds.append(cmd.to_dict())
+
+
+def _poll_commands_from_backlog(wf_ex):
+    backlog_cmds = wf_ex.runtime_context.pop(BACKLOG_KEY, [])
+
+    if not backlog_cmds:
+        return []
+
+    return [
+        commands.restore_command_from_dict(wf_ex, cmd_dict)
+        for cmd_dict in backlog_cmds
+    ]
+
+
 @profiler.trace('dispatcher-dispatch-commands', hide_args=True)
 def dispatch_workflow_commands(wf_ex, wf_cmds):
     # TODO(rakhmerov): I don't like these imports but otherwise we have
     # import cycles.
+    # Run commands from the backlog.
+    _process_commands(wf_ex, _poll_commands_from_backlog(wf_ex))
+
+    # Run new commands.
+    _process_commands(wf_ex, wf_cmds)
+
+
+def _process_commands(wf_ex, cmds):
+    if not cmds:
+        return
+
     from mistral.engine import task_handler
     from mistral.engine import workflow_handler as wf_handler
 
-    if not wf_cmds:
-        return
-
-    for cmd in _rearrange_commands(wf_cmds):
+    for cmd in _rearrange_commands(cmds):
+        if states.is_completed(wf_ex.state):
+            break
+
+        if wf_ex.state == states.PAUSED:
+            # Save all commands after 'pause' to the backlog so that
+            # they can be processed after the workflow is resumed.
+            _save_command_to_backlog(wf_ex, cmd)
+
+            continue
+
         if isinstance(cmd, (commands.RunTask, commands.RunExistingTask)):
            task_handler.run_task(cmd)
         elif isinstance(cmd, commands.SetWorkflowState):
            wf_handler.set_workflow_state(wf_ex, cmd.new_state, cmd.msg)
         else:
            raise exc.MistralError('Unsupported workflow command: %s' % cmd)
 
         if wf_ex.state != states.RUNNING:
            break
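For orientation, the backlog introduced above is simply a list of serialized commands kept under a well-known key in the workflow execution's runtime_context: commands that arrive while the workflow is PAUSED are parked there, and the next dispatch drains them exactly once before running any new commands. The standalone sketch below is not part of this change; FakeCommand and the plain dict standing in for wf_ex.runtime_context are hypothetical stand-ins used only to illustrate the save/poll semantics.

    # Sketch of the backlog round-trip (hypothetical names, plain dict
    # in place of wf_ex.runtime_context).
    BACKLOG_KEY = 'backlog_commands'


    class FakeCommand(object):
        def __init__(self, task_name):
            self.task_name = task_name

        def to_dict(self):
            # Mirrors the idea of WorkflowCommand.to_dict(): only plain,
            # serializable data goes into the backlog.
            return {'cmd_name': self.task_name, 'task_name': self.task_name}


    def save_to_backlog(runtime_context, cmd):
        # Same idea as _save_command_to_backlog(): lazily create the list
        # under BACKLOG_KEY and append the serialized command.
        backlog = runtime_context.setdefault(BACKLOG_KEY, [])
        backlog.append(cmd.to_dict())


    def poll_from_backlog(runtime_context):
        # Same idea as _poll_commands_from_backlog(): remove the key and
        # return whatever accumulated while the workflow was paused.
        return runtime_context.pop(BACKLOG_KEY, [])


    runtime_context = {}

    save_to_backlog(runtime_context, FakeCommand('task2'))
    save_to_backlog(runtime_context, FakeCommand('task3'))

    print(poll_from_backlog(runtime_context))  # two serialized commands
    print(poll_from_backlog(runtime_context))  # [] - the backlog is drained once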
@@ -260,9 +260,9 @@ class WaitAfterPolicy(base.TaskPolicy):
         end_state = task_ex.state
         end_state_info = task_ex.state_info
 
-        # TODO(rakhmerov): Policies probably needs to have tasks.Task
-        # interface in order to change manage task state safely.
-        # Set task state to 'DELAYED'.
+        # TODO(rakhmerov): Policies probably need to have tasks.Task
+        # interface in order to manage task state safely.
+        # Set task state to 'RUNNING_DELAYED'.
         task_ex.state = states.RUNNING_DELAYED
         task_ex.state_info = (
             'Suspended by wait-after policy for %s seconds' % self.delay
@@ -391,6 +391,7 @@ class RetryPolicy(base.TaskPolicy):
         _log_task_delay(task_ex, self.delay)
 
+        data_flow.invalidate_task_execution_result(task_ex)
 
         task_ex.state = states.RUNNING_DELAYED
 
         policy_context['retry_no'] = retry_no + 1
@@ -356,7 +356,7 @@ class RegularTask(Task):
     @profiler.trace('regular-task-on-action-complete', hide_args=True)
     def on_action_complete(self, action_ex):
         state = action_ex.state
-        # TODO(rakhmerov): Here we can define more informative messages
+        # TODO(rakhmerov): Here we can define more informative messages for
         # cases when action is successful and when it's not. For example,
         # in state_info we can specify the cause action.
         state_info = (None if state == states.SUCCESS
@@ -195,6 +195,7 @@ class Workflow(object):
        if self.wf_ex.task_execution_id:
            # Import the task_handler module here to avoid circular reference.
            from mistral.engine import task_handler

            task_handler.schedule_on_action_update(self.wf_ex)

    def prepare_input(self, input_dict):
@@ -239,6 +240,9 @@ class Workflow(object):
 
         self._continue_workflow(cmds)
 
+    def _get_backlog(self):
+        return self.wf_ex.runtime_context.get(dispatcher.BACKLOG_KEY)
+
     def _continue_workflow(self, cmds):
         # When resuming a workflow we need to ignore all 'pause'
         # commands because workflow controller takes tasks that
@@ -255,7 +259,7 @@ class Workflow(object):
             if states.is_completed(t_ex.state) and not t_ex.processed:
                 t_ex.processed = True
 
-        if cmds:
+        if cmds or self._get_backlog():
             dispatcher.dispatch_workflow_commands(self.wf_ex, cmds)
         else:
             self.check_and_complete()
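The guard above is the reason a paused-then-resumed workflow does not stall: on resume the workflow controller may legitimately produce no new commands, but anything parked in the backlog still has to be dispatched. A minimal sketch of that decision, with hypothetical names and a plain dict in place of runtime_context (not part of this change):

    def should_dispatch(new_cmds, runtime_context, backlog_key='backlog_commands'):
        # Mirrors the new "if cmds or self._get_backlog():" check: dispatch
        # when there are fresh commands or when backlogged ones are waiting.
        return bool(new_cmds) or bool(runtime_context.get(backlog_key))


    print(should_dispatch([], {'backlog_commands': [{'cmd_name': 'task2'}]}))  # True
    print(should_dispatch([], {}))                                             # False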
@@ -108,6 +108,28 @@ class SimpleEngineCommandsTest(base.EngineTestCase):
             state=states.SUCCESS
         )
 
+        # Let's resume the workflow and wait till it succeeds.
+        self.engine.resume_workflow(wf_ex.id)
+
+        self.await_workflow_success(wf_ex.id)
+
+        with db_api.transaction():
+            wf_ex = db_api.get_workflow_execution(wf_ex.id)
+
+            task_execs = wf_ex.task_executions
+
+        self.assertEqual(2, len(task_execs))
+        self._assert_single_item(
+            task_execs,
+            name='task1',
+            state=states.SUCCESS
+        )
+        self._assert_single_item(
+            task_execs,
+            name='task2',
+            state=states.SUCCESS
+        )
+
 
 WORKBOOK2 = """
 ---
@@ -34,6 +34,13 @@ class WorkflowCommand(object):
         self.ctx = ctx or {}
         self.triggered_by = triggered_by
 
+    def to_dict(self):
+        return {
+            'task_name': self.task_spec.get_name(),
+            'ctx': self.ctx,
+            'triggered_by': self.triggered_by
+        }
+
 
 class Noop(WorkflowCommand):
     """No-operation command."""
@@ -41,6 +48,13 @@ class Noop(WorkflowCommand):
     def __repr__(self):
         return "NOOP [workflow=%s]" % self.wf_ex.name
 
+    def to_dict(self):
+        d = super(Noop, self).to_dict()
+
+        d['cmd_name'] = 'noop'
+
+        return d
+
 
 class RunTask(WorkflowCommand):
     """Instruction to run a workflow task."""
@@ -74,9 +88,18 @@ class RunTask(WorkflowCommand):
             )
         )
 
+    def to_dict(self):
+        d = super(RunTask, self).to_dict()
+
+        d['cmd_name'] = self.task_spec.get_name()
+        d['wait'] = self.wait
+        d['unique_key'] = self.unique_key
+
+        return d
+
 
 class RunExistingTask(WorkflowCommand):
-    """Command for running already existent task."""
+    """Command to run an existing workflow task."""
 
     def __init__(self, wf_ex, wf_spec, task_ex, reset=True, triggered_by=None):
         super(RunExistingTask, self).__init__(
@@ -91,6 +114,16 @@ class RunExistingTask(WorkflowCommand):
         self.reset = reset
         self.unique_key = task_ex.unique_key
 
+    def to_dict(self):
+        d = super(RunExistingTask, self).to_dict()
+
+        d['cmd_name'] = 'run_existing_task'
+        d['task_ex_id'] = self.task_ex.id
+        d['reset'] = self.reset
+        d['unique_key'] = self.unique_key
+
+        return d
+
 
 class SetWorkflowState(WorkflowCommand):
     """Instruction to change a workflow state."""
@@ -108,6 +141,14 @@ class SetWorkflowState(WorkflowCommand):
         self.new_state = new_state
         self.msg = msg
 
+    def to_dict(self):
+        d = super(SetWorkflowState, self).to_dict()
+
+        d['new_state'] = self.new_state
+        d['msg'] = self.msg
+
+        return d
+
 
 class FailWorkflow(SetWorkflowState):
     """Instruction to fail a workflow."""
@@ -127,6 +168,13 @@ class FailWorkflow(SetWorkflowState):
     def __repr__(self):
         return "Fail [workflow=%s]" % self.wf_ex.name
 
+    def to_dict(self):
+        d = super(FailWorkflow, self).to_dict()
+
+        d['cmd_name'] = 'fail'
+
+        return d
+
 
 class SucceedWorkflow(SetWorkflowState):
     """Instruction to succeed a workflow."""
@@ -146,6 +194,13 @@ class SucceedWorkflow(SetWorkflowState):
     def __repr__(self):
         return "Succeed [workflow=%s]" % self.wf_ex.name
 
+    def to_dict(self):
+        d = super(SucceedWorkflow, self).to_dict()
+
+        d['cmd_name'] = 'succeed'
+
+        return d
+
 
 class PauseWorkflow(SetWorkflowState):
     """Instruction to pause a workflow."""
@@ -165,6 +220,13 @@ class PauseWorkflow(SetWorkflowState):
     def __repr__(self):
         return "Pause [workflow=%s]" % self.wf_ex.name
 
+    def to_dict(self):
+        d = super(PauseWorkflow, self).to_dict()
+
+        d['cmd_name'] = 'pause'
+
+        return d
+
 
 RESERVED_CMDS = dict(zip(
     tasks.RESERVED_TASK_NAMES, [
@@ -180,6 +242,13 @@ def get_command_class(cmd_name):
     return RESERVED_CMDS[cmd_name] if cmd_name in RESERVED_CMDS else None
 
 
+# TODO(rakhmerov): IMO the way how we instantiate commands is weird.
+# If we look at how we implement the logic of saving commands to
+# dicts (to_dict) and restoring back from dicts then we'll see
+# the lack of symmetry and unified way to do that depending on a
+# command. Also RunExistingTask turns to be a special case that
+# is not processed with this method at all. Might be a 'bad smell'.
+# This all makes me think that we need to do some refactoring here.
 def create_command(cmd_name, wf_ex, wf_spec, task_spec, ctx,
                    params=None, triggered_by=None):
     cmd_cls = get_command_class(cmd_name) or RunTask
@@ -201,3 +270,23 @@ def create_command(cmd_name, wf_ex, wf_spec, task_spec, ctx,
         ctx,
         triggered_by=triggered_by
     )
+
+
+def restore_command_from_dict(wf_ex, cmd_dict):
+    cmd_name = cmd_dict['cmd_name']
+
+    wf_spec = spec_parser.get_workflow_spec_by_execution_id(wf_ex.id)
+    task_spec = wf_spec.get_tasks()[cmd_dict['task_name']]
+    ctx = cmd_dict['ctx']
+    params = {'msg': cmd_dict.get('msg')} if 'msg' in cmd_dict else None
+    triggered_by = cmd_dict.get('triggered_by')
+
+    return create_command(
+        cmd_name,
+        wf_ex,
+        wf_spec,
+        task_spec,
+        ctx,
+        params,
+        triggered_by
+    )
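The serialization contract these hunks add is intentionally simple: each command dumps only plain data plus a 'cmd_name' discriminator, and restore_command_from_dict() uses that name to pick the class again (which is also what the TODO above is complaining about). A toy round-trip, with hypothetical classes that are not Mistral code, to show the idea:

    class ToyCommand(object):
        cmd_name = None

        def __init__(self, task_name):
            self.task_name = task_name

        def to_dict(self):
            # Plain data plus a discriminator, like WorkflowCommand.to_dict().
            return {'cmd_name': self.cmd_name, 'task_name': self.task_name}


    class ToyPause(ToyCommand):
        cmd_name = 'pause'


    class ToyFail(ToyCommand):
        cmd_name = 'fail'


    TOY_REGISTRY = {cls.cmd_name: cls for cls in (ToyPause, ToyFail)}


    def restore_from_dict(d):
        # Analogous to restore_command_from_dict(): look the class up by
        # 'cmd_name' and rebuild the command from the remaining fields.
        return TOY_REGISTRY[d['cmd_name']](d['task_name'])


    cmd = ToyPause('task1')
    restored = restore_from_dict(cmd.to_dict())

    print(type(restored).__name__, restored.task_name)  # ToyPause task1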
@@ -29,7 +29,7 @@ LOG = logging.getLogger(__name__)
 
 
 class DirectWorkflowController(base.WorkflowController):
-    """'Direct workflow' handler.
+    """'Direct workflow' controller.
 
     This handler implements the workflow pattern which is based on
     direct transitions between tasks, i.e. after each task completion
@@ -15,6 +15,7 @@
 
 """Valid task and workflow states."""
 
+# TODO(rakhmerov): need docs explaining what each state means.
 IDLE = 'IDLE'
 WAITING = 'WAITING'
 RUNNING = 'RUNNING'