Fixing engine transaction model and error handling

* Transaction in on_action_complete() must not be splitted into 2 parts,
  it caused the bug with after task completion logic
* Fix executor behavior so that it doesn't send an error back to engine
  if a error came from engine itself. It should report back only errors
  occurred with an action itself.
* YAQL and other expected Mistral exceptions in transitions should not
  lead to transaction rollback and rollback of action result. For example
  if action result came and it's valid but while evaluating transition
  conditions we got a YAQL exception then action result should be stored
  normally w/o transaction rollback and corresponding task and workflow
  should fail with corresponding state_info.
* Fixed all tests
* Minor cosmetic changes

Closes-Bug: #1524477

Change-Id: I09086e40a5902bbb6c977bf195cb035e31f21246
This commit is contained in:
Renat Akhmerov 2016-03-28 13:44:54 +06:00
parent 9a1a157274
commit ad07ba0d68
11 changed files with 474 additions and 331 deletions

28
AUTHORS
View File

@ -1,43 +1,71 @@
Abhishek Chanda <abhishek@cloudscaling.com>
Alexander Kuznetsov <akuznetsov@mirantis.com>
Anastasia Kuznetsova <akuznetsova@mirantis.com>
Andreas Jaeger <aj@suse.com>
Angus Salkeld <angus.salkeld@rackspace.com>
Ankita Wagh <ankita_wagh@symmactoolkit-c02lr80ufd57.symc.symantec.com>
Antoine Musso <hashar@free.fr>
Bertrand Lallau <bertrand.lallau@gmail.com>
Bhaskar Duvvuri <dbasu84@gmail.com>
Boris Pavlovic <boris@pavlovic.me>
Bryan Havenstein <bryan.havenstein@ericsson.com>
Chaozhe.Chen <chaozhe.chen@easystack.cn>
Christian Berendt <berendt@b1-systems.de>
Claudiu Belu <cbelu@cloudbasesolutions.com>
Dan Prince <dprince@redhat.com>
Daryl Mowrer <dmowrer@us.ibm.com>
David C Kennedy <david.c.kennedy@hp.com>
David Charles Kennedy <dkennedy@hp.com>
Dawid Deja <dawid.deja@intel.com>
Dmitri Zimine <dz@stackstorm.com>
Doug Hellmann <doug@doughellmann.com>
Ed Cranford <ed.cranford@rackspace.com>
Gal Margalit <gal.margalit@alcatel-lucent.com>
Guy Paz <guy.paz@alcatel-lucent.com>
Jeremy Stanley <fungi@yuggoth.org>
Jiri Tomasek <jtomasek@redhat.com>
Kevin Pouget <kpouget@altair.com>
Kirill Izotov <enykeev@stackstorm.com>
Lakshmi Kannan <lakshmi@stackstorm.com>
Limor <limor.bortman@nokia.com>
Limor Stotland <limor.bortman@alcatel-lucent.com>
Lingxian Kong <konglingxian@huawei.com>
Liu Sheng <liusheng@huawei.com>
LiuNanke <nanke.liu@easystack.cn>
Manas Kelshikar <manas@stackstorm.com>
Michael Krotscheck <krotscheck@gmail.com>
Michal Gershenzon <michal.gershenzon@alcatel-lucent.com>
Monty Taylor <mordred@inaugust.com>
Moshe Elisha <moshe.elisha@alcatel-lucent.com>
Nikolay Mahotkin <nmakhotkin@mirantis.com>
Noa Koffman <noa.koffman@alcatel-lucent.com>
Oleksii Chuprykov <ochuprykov@mirantis.com>
Pierre-Arthur MATHIEU <pierre-arthur.mathieu@hp.com>
Ray Chen <chenrano2002@gmail.com>
Renat Akhmerov <rakhmerov@mirantis.com>
Renat Akhmerov <renat.akhmerov@gmail.com>
Rico Lin <rico.lin.guanyu@gmail.com>
Rinat Sabitov <rinat.sabitov@gmail.com>
Sergey Kolekonov <skolekonov@mirantis.com>
Sergey Murashov <smurashov@mirantis.com>
Shuquan Huang <huang.shuquan@99cloud.net>
Thierry Carrez <thierry@openstack.org>
Thomas Herve <therve@redhat.com>
Timur Nurlygayanov <tnurlygayanov@mirantis.com>
Venkata Mahesh Kotha <venkatamaheshkotha@gmail.com>
Winson Chan <wcchan@stackstorm.com>
Yaroslav Lobankov <ylobankov@mirantis.com>
Zhao Lei <zhaolei@cn.fujitsu.com>
Zhenguo Niu <niuzhenguo@huawei.com>
ZhiQiang Fan <aji.zqfan@gmail.com>
ZhiQiang Fan <zhiqiang.fan@huawei.com>
Zhu Rong <zhu.rong@99cloud.net>
caoyue <yue.cao@easystack.cn>
cheneydc <dongc@neunn.com>
hardik <hardik.parekh@nectechnologies.in>
hparekh <hardik.parekh@nectechnologies.in>
keliang <ke.liang@easystack.cn>
syed ahsan shamim zaidi <ahsanmohsin04@yahoo.com>
tengqm <tengqim@cn.ibm.com>
wangzhh <wangzhh@awcloud.com>
zhangguoqing <zhang.guoqing@99cloud.net>

View File

@ -29,7 +29,7 @@ from mistral.workflow import utils as wf_utils
def create_action_execution(action_def, action_input, task_ex=None,
index=0, description=''):
# TODO(rakhmerov): We can avoid hitting DB at all when calling something
# TODO(rakhmerov): We can avoid hitting DB at all when calling things like
# create_action_execution(), these operations can be just done using
# SQLAlchemy session (1-level cache) and session flush (on TX commit) would
# send necessary SQL queries to DB. Currently, session flush happens

View File

@ -24,6 +24,7 @@ from mistral.engine import action_handler
from mistral.engine import base
from mistral.engine import task_handler
from mistral.engine import workflow_handler as wf_handler
from mistral import exceptions as exc
from mistral.services import action_manager as a_m
from mistral.services import executions as wf_ex_service
from mistral.services import workflows as wf_service
@ -55,6 +56,9 @@ class DefaultEngine(base.Engine, coordination.Service):
wf_ex_id = None
try:
# Create a persistent workflow execution in a separate transaction
# so that we can return it even in case of unexpected errors that
# lead to transaction rollback.
with db_api.transaction():
# The new workflow execution will be in an IDLE
# state on initial record creation.
@ -65,10 +69,6 @@ class DefaultEngine(base.Engine, coordination.Service):
params
)
# Separate workflow execution creation and dispatching command
# transactions in order to be able to return workflow execution
# with corresponding error message in state_info when error occurs
# at dispatching commands.
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex_id)
wf_handler.set_execution_state(wf_ex, states.RUNNING)
@ -161,14 +161,10 @@ class DefaultEngine(base.Engine, coordination.Service):
self._on_task_state_change(task_ex, wf_ex, wf_spec)
def _on_task_state_change(self, task_ex, wf_ex, wf_spec,
task_state=states.SUCCESS):
def _on_task_state_change(self, task_ex, wf_ex, wf_spec):
task_spec = wf_spec.get_tasks()[task_ex.name]
# We must be sure that if task is completed,
# it was also completed in previous transaction.
if (task_handler.is_task_completed(task_ex, task_spec)
and states.is_completed(task_state)):
if task_handler.is_task_completed(task_ex, task_spec):
task_handler.after_task_complete(task_ex, task_spec, wf_spec)
# Ignore DELAYED state.
@ -178,8 +174,21 @@ class DefaultEngine(base.Engine, coordination.Service):
wf_ctrl = wf_base.get_controller(wf_ex, wf_spec)
# Calculate commands to process next.
try:
cmds = wf_ctrl.continue_workflow()
except exc.YaqlEvaluationException as e:
LOG.error(
'YAQL error occurred while calculating next workflow '
'commands [wf_ex_id=%s, task_ex_id=%s]: %s',
wf_ex.id, task_ex.id, e
)
wf_handler.fail_workflow(wf_ex, str(e))
return
# Mark task as processed after all decisions have been made
# upon its completion.
task_ex.processed = True
self._dispatch_workflow_commands(wf_ex, cmds, wf_spec)
@ -235,6 +244,7 @@ class DefaultEngine(base.Engine, coordination.Service):
wf_ex_id = action_ex.task_execution.workflow_execution_id
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
task_ex = task_handler.on_action_complete(
@ -248,30 +258,13 @@ class DefaultEngine(base.Engine, coordination.Service):
if states.is_paused_or_completed(wf_ex.state):
return action_ex.get_clone()
prev_task_state = task_ex.state
# Separate the task transition in a separate transaction. The task
# has already completed for better or worst. The task state should
# not be affected by errors during transition on conditions such as
# on-success and on-error.
with db_api.transaction():
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
action_ex = db_api.get_action_execution(action_ex_id)
task_ex = action_ex.task_execution
self._on_task_state_change(
task_ex,
wf_ex,
wf_spec,
task_state=prev_task_state
)
self._on_task_state_change(task_ex, wf_ex, wf_spec)
return action_ex.get_clone()
except Exception as e:
# TODO(dzimine): try to find out which command caused failure.
# TODO(rakhmerov): Need to refactor logging in a more elegant way.
LOG.error(
"Failed to handle action execution result [id=%s]: %s\n%s",
'Failed to handle action execution result [id=%s]: %s\n%s',
action_ex_id, e, traceback.format_exc()
)
@ -301,12 +294,13 @@ class DefaultEngine(base.Engine, coordination.Service):
wf_ctrl = wf_base.get_controller(wf_ex)
# TODO(rakhmerov): Add YAQL error handling.
# Calculate commands to process next.
cmds = wf_ctrl.continue_workflow(task_ex=task_ex, reset=reset, env=env)
# When resuming a workflow we need to ignore all 'pause'
# commands because workflow controller takes tasks that
# completed within the period when the workflow was pause.
# completed within the period when the workflow was paused.
cmds = list(
filter(
lambda c: not isinstance(c, commands.PauseWorkflow),
@ -323,6 +317,7 @@ class DefaultEngine(base.Engine, coordination.Service):
t_ex.processed = True
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
self._dispatch_workflow_commands(wf_ex, cmds, wf_spec)
if not cmds:
@ -378,9 +373,9 @@ class DefaultEngine(base.Engine, coordination.Service):
raise e
@u.log_exec(LOG)
def stop_workflow(self, execution_id, state, message=None):
def stop_workflow(self, wf_ex_id, state, message=None):
with db_api.transaction():
wf_ex = wf_handler.lock_workflow_execution(execution_id)
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
return self._stop_workflow(wf_ex, state, message)
@ -390,13 +385,16 @@ class DefaultEngine(base.Engine, coordination.Service):
wf_ctrl = wf_base.get_controller(wf_ex)
final_context = {}
try:
final_context = wf_ctrl.evaluate_workflow_final_context()
except Exception as e:
LOG.warning(
"Failed to get final context for %s: %s" % (wf_ex, e)
'Failed to get final context for %s: %s' % (wf_ex, e)
)
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
return wf_handler.succeed_workflow(
wf_ex,
final_context,
@ -409,7 +407,7 @@ class DefaultEngine(base.Engine, coordination.Service):
return wf_ex
@u.log_exec(LOG)
def rollback_workflow(self, execution_id):
def rollback_workflow(self, wf_ex_id):
# TODO(rakhmerov): Implement.
raise NotImplementedError
@ -421,12 +419,26 @@ class DefaultEngine(base.Engine, coordination.Service):
if isinstance(cmd, commands.RunTask) and cmd.is_waiting():
task_handler.defer_task(cmd)
elif isinstance(cmd, commands.RunTask):
task_handler.run_new_task(cmd, wf_spec)
task_ex = task_handler.run_new_task(cmd, wf_spec)
if task_ex.state == states.ERROR:
wf_handler.fail_workflow(
wf_ex,
'Failed to start task [task_ex=%s]: %s' %
(task_ex, task_ex.state_info)
)
elif isinstance(cmd, commands.RunExistingTask):
task_handler.run_existing_task(
task_ex = task_handler.run_existing_task(
cmd.task_ex.id,
reset=cmd.reset
)
if task_ex.state == states.ERROR:
wf_handler.fail_workflow(
wf_ex,
'Failed to start task [task_ex=%s]: %s' %
(task_ex, task_ex.state_info)
)
elif isinstance(cmd, commands.SetWorkflowState):
if states.is_completed(cmd.new_state):
self._stop_workflow(cmd.wf_ex, cmd.new_state, cmd.msg)
@ -441,33 +453,28 @@ class DefaultEngine(base.Engine, coordination.Service):
if wf_ex.state != states.RUNNING:
break
# TODO(rakhmerov): This method may not be needed at all because error
# handling is now implemented too roughly w/o distinguishing different
# errors. On most errors (like YAQLException) we shouldn't rollback
# transactions, we just need to fail corresponding execution objects
# where a problem happened (action, task or workflow).
@staticmethod
def _fail_workflow(wf_ex_id, err, action_ex_id=None):
def _fail_workflow(wf_ex_id, exc):
"""Private helper to fail workflow on exceptions."""
err_msg = str(err)
with db_api.transaction():
wf_ex = db_api.load_workflow_execution(wf_ex_id)
if wf_ex is None:
LOG.error(
"Cant fail workflow execution with id='%s': not found.",
"Can't fail workflow execution with id='%s': not found.",
wf_ex_id
)
return
return None
wf_handler.set_execution_state(wf_ex, states.ERROR, err_msg)
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
if action_ex_id:
# Note(dzimine): Don't call self.engine_client:
# 1) to avoid computing and triggering next tasks
# 2) to avoid a loop in case of error in transport
action_ex = db_api.get_action_execution(action_ex_id)
task_handler.on_action_complete(
action_ex,
spec_parser.get_workflow_spec(wf_ex.spec),
wf_utils.Result(error=err_msg)
)
if not states.is_paused_or_completed(wf_ex.state):
wf_handler.set_execution_state(wf_ex, states.ERROR, str(exc))
return wf_ex

View File

@ -19,7 +19,6 @@ from oslo_log import log as logging
from mistral.actions import action_factory as a_f
from mistral import coordination
from mistral.engine import base
from mistral import exceptions as exc
from mistral.utils import inspect_utils as i_u
from mistral.workflow import utils as wf_utils
@ -37,7 +36,7 @@ class DefaultExecutor(base.Executor, coordination.Service):
action_params):
"""Runs action.
:param action_ex_id: Corresponding task id.
:param action_ex_id: Action execution id.
:param action_class_str: Path to action class in dot notation.
:param attributes: Attributes of action class which will be set to.
:param action_params: Action parameters.
@ -51,14 +50,29 @@ class DefaultExecutor(base.Executor, coordination.Service):
action_ex_id,
error_result
)
else:
return None
return error_result
action_cls = a_f.construct_action_class(action_class_str, attributes)
# Instantiate action.
try:
action = action_cls(**action_params)
except Exception as e:
msg = ("Failed to initialize action %s. Action init params = %s."
" Actual init params = %s. More info: %s"
% (action_class_str, i_u.get_arg_list(action_cls.__init__),
action_params.keys(), e))
LOG.warning(msg)
return send_error_back(msg)
# Run action.
try:
result = action.run()
# Note: it's made for backwards compatibility with already
@ -67,24 +81,25 @@ class DefaultExecutor(base.Executor, coordination.Service):
if not isinstance(result, wf_utils.Result):
result = wf_utils.Result(data=result)
if action_ex_id and (action.is_sync() or result.is_error()):
self._engine_client.on_action_complete(action_ex_id, result)
return result
except TypeError as e:
msg = ("Failed to initialize action %s. Action init params = %s."
" Actual init params = %s. More info: %s"
% (action_class_str, i_u.get_arg_list(action_cls.__init__),
action_params.keys(), e))
LOG.warning(msg)
except exc.ActionException as e:
except Exception as e:
msg = ("Failed to run action [action_ex_id=%s, action_cls='%s',"
" attributes='%s', params='%s']\n %s"
% (action_ex_id, action_cls, attributes, action_params, e))
LOG.exception(msg)
except Exception as e:
msg = str(e)
# Send error info to engine.
return send_error_back(msg)
# Send action result.
try:
if action_ex_id and (action.is_sync() or result.is_error()):
self._engine_client.on_action_complete(action_ex_id, result)
except Exception as e:
msg = ("Exception occurred when calling engine on_action_complete"
" [action_ex_id=%s, action_cls='%s',"
" attributes='%s', params='%s']\n %s"
% (action_ex_id, action_cls, attributes, action_params, e))
LOG.exception(msg)
return result

View File

@ -24,6 +24,7 @@ from mistral.engine import base
from mistral import exceptions as exc
from mistral.workflow import utils as wf_utils
LOG = logging.getLogger(__name__)
@ -164,6 +165,8 @@ class EngineServer(object):
:param rpc_ctx: RPC request context.
:param action_ex_id: Action execution id.
:param result_data: Action result data.
:param result_error: Action result error.
:return: Action execution.
"""

View File

@ -46,6 +46,9 @@ def run_existing_task(task_ex_id, reset=True):
"""This function runs existing task execution.
It is needed mostly by scheduler.
:param task_ex_id: Task execution id.
:param reset: Reset action executions for the task.
"""
task_ex = db_api.get_task_execution(task_ex_id)
task_spec = spec_parser.get_task_spec(task_ex.spec)
@ -54,15 +57,16 @@ def run_existing_task(task_ex_id, reset=True):
# Throw exception if the existing task already succeeded.
if task_ex.state == states.SUCCESS:
raise exc.EngineException('Reruning existing task that already '
'succeeded is not supported.')
raise exc.EngineException(
'Rerunning existing task that already succeeded is not supported.'
)
# Exit if the existing task failed and reset is not instructed.
# For a with-items task without reset, re-running the existing
# task will re-run the failed and unstarted items.
if (task_ex.state == states.ERROR and not reset and
not task_spec.get_with_items()):
return
return task_ex
# Reset nested executions only if task is not already RUNNING.
if task_ex.state != states.RUNNING:
@ -84,14 +88,27 @@ def run_existing_task(task_ex_id, reset=True):
_run_existing_task(task_ex, task_spec, wf_spec)
return task_ex
def _run_existing_task(task_ex, task_spec, wf_spec):
try:
input_dicts = _get_input_dictionaries(
wf_spec,
task_ex,
task_spec,
task_ex.in_context
)
except exc.MistralException as e:
LOG.error(
'An error while calculating task action inputs'
' [task_execution_id=%s]: %s',
task_ex.id, e
)
set_task_state(task_ex, states.ERROR, str(e))
return
# In some cases we can have no input, e.g. in case of 'with-items'.
if input_dicts:
@ -113,8 +130,15 @@ def defer_task(wf_cmd):
wf_ex = wf_cmd.wf_ex
task_spec = wf_cmd.task_spec
if not wf_utils.find_task_executions_by_spec(wf_ex, task_spec):
_create_task_execution(wf_ex, task_spec, ctx, state=states.WAITING)
if wf_utils.find_task_executions_by_spec(wf_ex, task_spec):
return None
return _create_task_execution(
wf_ex,
task_spec,
ctx,
state=states.WAITING
)
def run_new_task(wf_cmd, wf_spec):
@ -149,23 +173,25 @@ def run_new_task(wf_cmd, wf_spec):
# Policies could possibly change task state.
if task_ex.state != states.RUNNING:
return
return task_ex
_run_existing_task(task_ex, task_spec, wf_spec)
return task_ex
def on_action_complete(action_ex, wf_spec, result):
"""Handles event of action result arrival.
Given action result this method performs analysis of the workflow
execution and identifies commands (including tasks) that can be
scheduled for execution.
Given action result this method changes corresponding task execution
object. This method must never be called for the case of individual
action which is not associated with any tasks.
:param action_ex: Action execution objects the result belongs to.
:param wf_spec: Workflow specification.
:param result: Task action/workflow output wrapped into
mistral.workflow.utils.Result instance.
:return List of engine commands that need to be performed.
:return Task execution object.
"""
task_ex = action_ex.task_execution
@ -177,7 +203,18 @@ def on_action_complete(action_ex, wf_spec, result):
task_spec = wf_spec.get_tasks()[task_ex.name]
try:
result = action_handler.transform_result(result, task_ex, task_spec)
except exc.YaqlEvaluationException as e:
err_msg = str(e)
LOG.error(
'YAQL error while transforming action result'
' [action_execution_id=%s, result=%s]: %s',
action_ex.id, result, err_msg
)
result = wf_utils.Result(error=err_msg)
# Ignore workflow executions because they're handled during
# workflow completion.
@ -195,6 +232,7 @@ def on_action_complete(action_ex, wf_spec, result):
_complete_task(task_ex, task_spec, task_state, task_state_info)
else:
with_items.increase_capacity(task_ex)
if with_items.is_completed(task_ex):
_complete_task(
task_ex,
@ -405,7 +443,10 @@ def _schedule_run_action(task_ex, task_spec, action_input, index, wf_spec):
)
action_ex = action_handler.create_action_execution(
action_def, action_input, task_ex, index
action_def,
action_input,
task_ex,
index
)
target = expr.evaluate_recursively(
@ -506,11 +547,14 @@ def _complete_task(task_ex, task_spec, state, state_info=None):
set_task_state(task_ex, state, state_info)
try:
data_flow.publish_variables(
task_ex,
task_spec
data_flow.publish_variables(task_ex, task_spec)
except exc.MistralException as e:
LOG.error(
'An error while publishing task variables'
' [task_execution_id=%s]: %s',
task_ex.id, str(e)
)
except Exception as e:
set_task_state(task_ex, states.ERROR, str(e))
if not task_spec.get_keep_result():
@ -518,7 +562,6 @@ def _complete_task(task_ex, task_spec, state, state_info=None):
def set_task_state(task_ex, state, state_info, processed=None):
# TODO(rakhmerov): How do we log task result?
wf_trace.info(
task_ex.workflow_execution,
"Task execution '%s' [%s -> %s]" %

View File

@ -146,17 +146,17 @@ class EngineTestCase(base.DbTestCase):
def is_task_in_state(self, task_ex_id, state):
return db_api.get_task_execution(task_ex_id).state == state
def is_execution_in_state(self, wf_ex_id, state):
return db_api.get_workflow_execution(wf_ex_id).state == state
def is_execution_in_state(self, ex_id, state):
return db_api.get_workflow_execution(ex_id).state == state
def is_execution_success(self, wf_ex_id):
return self.is_execution_in_state(wf_ex_id, states.SUCCESS)
def is_execution_success(self, ex_id):
return self.is_execution_in_state(ex_id, states.SUCCESS)
def is_execution_error(self, wf_ex_id):
return self.is_execution_in_state(wf_ex_id, states.ERROR)
def is_execution_error(self, ex_id):
return self.is_execution_in_state(ex_id, states.ERROR)
def is_execution_paused(self, wf_ex_id):
return self.is_execution_in_state(wf_ex_id, states.PAUSED)
def is_execution_paused(self, ex_id):
return self.is_execution_in_state(ex_id, states.PAUSED)
def is_task_success(self, task_ex_id):
return self.is_task_in_state(task_ex_id, states.SUCCESS)

View File

@ -29,12 +29,14 @@ cfg.CONF.set_default('auth_enable', False, group='pecan')
class DirectWorkflowEngineTest(base.EngineTestCase):
def _run_workflow(self, workflow_yaml, state=states.ERROR):
wf_service.create_workflows(workflow_yaml)
def _run_workflow(self, wf_text, expected_state=states.ERROR):
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf', {})
self._await(lambda: self.is_execution_in_state(wf_ex.id, state))
self._await(
lambda: self.is_execution_in_state(wf_ex.id, expected_state)
)
return db_api.get_workflow_execution(wf_ex.id)
@ -274,17 +276,18 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIn('Can not evaluate YAQL expression', wf_ex.state_info)
# Assert that there is only one task execution and it's SUCCESS.
self.assertEqual(1, len(wf_ex.task_executions))
task_execs = wf_ex.task_executions
self.assertEqual(2, len(task_execs))
# 'task1' should be in SUCCESS.
task_1_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1'
task_execs,
name='task1',
state=states.SUCCESS
)
self.assertEqual(states.SUCCESS, task_1_ex.state)
# Assert that there is only one action execution and it's SUCCESS.
# 'task1' should have exactly one action execution (in SUCCESS).
task_1_action_exs = db_api.get_action_executions(
task_execution_id=task_1_ex.id
)
@ -292,6 +295,19 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
self.assertEqual(1, len(task_1_action_exs))
self.assertEqual(states.SUCCESS, task_1_action_exs[0].state)
# 'task2' should exist but in ERROR.
task_2_ex = self._assert_single_item(
task_execs,
name='task2',
state=states.ERROR
)
# 'task2' must not have action executions.
self.assertEqual(
0,
len(db_api.get_action_executions(task_execution_id=task_2_ex.id))
)
def test_async_next_task_with_input_yaql_error(self):
wf_text = """
version: '2.0'
@ -331,29 +347,28 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
self.assertEqual(states.RUNNING, task_1_action_exs[0].state)
# Update async action execution result.
result = wf_utils.Result(data='foobar')
self.assertRaises(
exc.YaqlEvaluationException,
self.engine.on_action_complete,
self.engine.on_action_complete(
task_1_action_exs[0].id,
result
wf_utils.Result(data='foobar')
)
# Assert that task1 is SUCCESS and workflow is ERROR.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIn('Can not evaluate YAQL expression', wf_ex.state_info)
self.assertEqual(1, len(wf_ex.task_executions))
task_execs = wf_ex.task_executions
self.assertEqual(2, len(task_execs))
# 'task1' must be in SUCCESS.
task_1_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1'
task_execs,
name='task1',
state=states.SUCCESS
)
self.assertEqual(states.SUCCESS, task_1_ex.state)
# 'task1' must have exactly one action execution (in SUCCESS).
task_1_action_exs = db_api.get_action_executions(
task_execution_id=task_1_ex.id
)
@ -361,6 +376,19 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
self.assertEqual(1, len(task_1_action_exs))
self.assertEqual(states.SUCCESS, task_1_action_exs[0].state)
# 'task2' must be in ERROR.
task_2_ex = self._assert_single_item(
task_execs,
name='task2',
state=states.ERROR
)
# 'task2' must not have action executions.
self.assertEqual(
0,
len(db_api.get_action_executions(task_execution_id=task_2_ex.id))
)
def test_messed_yaql_in_first_task(self):
wf_text = """
version: '2.0'
@ -511,13 +539,9 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
self.assertEqual(states.RUNNING, task_1_action_exs[0].state)
# Update async action execution result.
result = wf_utils.Result(data='foobar')
self.assertRaises(
exc.YaqlEvaluationException,
self.engine.on_action_complete,
self.engine.on_action_complete(
task_1_action_exs[0].id,
result
wf_utils.Result(data='foobar')
)
# Assert that task1 is SUCCESS and workflow is ERROR.

View File

@ -37,6 +37,7 @@ wf:
input:
- workflow_input: '__WORKFLOW_INPUT__'
- action_output_length: 0
tasks:
task1:
action: my_action
@ -79,8 +80,10 @@ def expect_size_limit_exception(field_name):
def generate_workflow(tokens):
new_wf = WF
long_string = ''.join('A' for _ in range(1024))
for token in tokens:
new_wf = new_wf.replace(token, long_string)
return new_wf
@ -136,11 +139,11 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {})
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIn(
"Size of 'input' is 1KB which exceeds the limit of 0KB",
wf_ex.state_info
)
self.assertEqual(states.ERROR, wf_ex.state)
def test_action_output_limit(self):
wf_service.create_workflows(WF)
@ -175,7 +178,7 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase):
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertIn(
"Failure caused by error in tasks: task1",
'Failure caused by error in tasks: task1',
wf_ex.state_info
)

View File

@ -33,7 +33,7 @@ from mistral.workflow import utils as wf_utils
# the change in value is not permanent.
cfg.CONF.set_default('auth_enable', False, group='pecan')
WORKBOOK = """
WB = """
---
version: "2.0"
@ -55,7 +55,7 @@ workflows:
"""
WORKBOOK_WITH_STATIC_VAR = """
WB_WITH_STATIC_VAR = """
---
version: "2.0"
@ -78,7 +78,7 @@ workflows:
"""
WORKBOOK_MULTI_ARRAY = """
WB_MULTI_ARRAY = """
---
version: "2.0"
@ -104,7 +104,7 @@ workflows:
"""
WORKBOOK_ACTION_CONTEXT = """
WB_ACTION_CONTEXT = """
---
version: "2.0"
name: wb1
@ -123,7 +123,7 @@ workflows:
"""
WORKFLOW_INPUT = {
WF_INPUT = {
'names_info': [
{'name': 'John'},
{'name': 'Ivan'},
@ -140,7 +140,7 @@ WF_INPUT_URLS = {
]
}
WORKFLOW_INPUT_ONE_ITEM = {
WF_INPUT_ONE_ITEM = {
'names_info': [
{'name': 'Guy'}
]
@ -153,6 +153,7 @@ class RandomSleepEchoAction(action_base.Action):
def run(self):
utils.random_sleep(1)
return self.output
def test(self):
@ -176,10 +177,10 @@ class WithItemsEngineTest(base.EngineTestCase):
if ex.state == states.RUNNING])
def test_with_items_simple(self):
wb_service.create_workbook_v2(WORKBOOK)
wb_service.create_workbook_v2(WB)
# Start workflow.
wf_ex = self.engine.start_workflow('wb1.with_items', WORKFLOW_INPUT)
wf_ex = self.engine.start_workflow('wb1.with_items', WF_INPUT)
self._await(
lambda: self.is_execution_success(wf_ex.id),
@ -188,15 +189,17 @@ class WithItemsEngineTest(base.EngineTestCase):
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
task1 = self._assert_single_item(tasks, name='task1')
with_items_context = task1.runtime_context['with_items_context']
task_execs = wf_ex.task_executions
self.assertEqual(3, with_items_context['count'])
task1_ex = self._assert_single_item(task_execs, name='task1')
with_items_ctx = task1_ex.runtime_context['with_items_context']
self.assertEqual(3, with_items_ctx['count'])
# Since we know that we can receive results in random order,
# check is not depend on order of items.
result = data_flow.get_task_execution_result(task1)
result = data_flow.get_task_execution_result(task1_ex)
self.assertIsInstance(result, list)
@ -204,15 +207,15 @@ class WithItemsEngineTest(base.EngineTestCase):
self.assertIn('Ivan', result)
self.assertIn('Mistral', result)
published = task1.published
published = task1_ex.published
self.assertIn(published['result'], ['John', 'Ivan', 'Mistral'])
self.assertEqual(1, len(tasks))
self.assertEqual(states.SUCCESS, task1.state)
self.assertEqual(1, len(task_execs))
self.assertEqual(states.SUCCESS, task1_ex.state)
def test_with_items_fail(self):
workflow = """---
wf_text = """---
version: "2.0"
with_items:
@ -227,23 +230,21 @@ class WithItemsEngineTest(base.EngineTestCase):
task2:
action: std.echo output="With-items failed"
"""
wf_service.create_workflows(workflow)
wf_service.create_workflows(wf_text)
# Start workflow.
wf_ex = self.engine.start_workflow('with_items', {})
self._await(
lambda: self.is_execution_success(wf_ex.id),
)
self._await(lambda: self.is_execution_success(wf_ex.id))
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
self.assertEqual(2, len(tasks))
self.assertEqual(2, len(wf_ex.task_executions))
def test_with_items_sub_workflow_fail(self):
workbook = """---
wb_text = """---
version: "2.0"
name: wb1
@ -263,36 +264,34 @@ class WithItemsEngineTest(base.EngineTestCase):
subworkflow:
type: direct
tasks:
fail:
action: std.fail
"""
wb_service.create_workbook_v2(workbook)
wb_service.create_workbook_v2(wb_text)
# Start workflow.
wf_ex = self.engine.start_workflow('wb1.with_items', {})
self._await(
lambda: self.is_execution_success(wf_ex.id),
)
self._await(lambda: self.is_execution_success(wf_ex.id))
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
self.assertEqual(2, len(tasks))
self.assertEqual(2, len(wf_ex.task_executions))
def test_with_items_static_var(self):
wb_service.create_workbook_v2(WORKBOOK_WITH_STATIC_VAR)
wb_service.create_workbook_v2(WB_WITH_STATIC_VAR)
wf_input = copy.deepcopy(WORKFLOW_INPUT)
wf_input = copy.deepcopy(WF_INPUT)
wf_input.update({'greeting': 'Hello'})
# Start workflow.
wf_ex = self.engine.start_workflow('wb1.with_items', wf_input)
self._await(
lambda: self.is_execution_success(wf_ex.id),
)
self._await(lambda: self.is_execution_success(wf_ex.id))
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -311,26 +310,25 @@ class WithItemsEngineTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, task1.state)
def test_with_items_multi_array(self):
wb_service.create_workbook_v2(WORKBOOK_MULTI_ARRAY)
wb_service.create_workbook_v2(WB_MULTI_ARRAY)
wf_input = {'arrayI': ['a', 'b', 'c'], 'arrayJ': [1, 2, 3]}
# Start workflow.
wf_ex = self.engine.start_workflow('wb1.with_items', wf_input)
self._await(
lambda: self.is_execution_success(wf_ex.id),
)
self._await(lambda: self.is_execution_success(wf_ex.id))
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
task1 = self._assert_single_item(tasks, name='task1')
task_execs = wf_ex.task_executions
task1_ex = self._assert_single_item(task_execs, name='task1')
# Since we know that we can receive results in random order,
# check is not depend on order of items.
result = data_flow.get_task_execution_result(task1)
result = data_flow.get_task_execution_result(task1_ex)
self.assertIsInstance(result, list)
@ -338,30 +336,28 @@ class WithItemsEngineTest(base.EngineTestCase):
self.assertIn('b 2', result)
self.assertIn('c 3', result)
self.assertEqual(1, len(tasks))
self.assertEqual(states.SUCCESS, task1.state)
self.assertEqual(1, len(task_execs))
self.assertEqual(states.SUCCESS, task1_ex.state)
def test_with_items_action_context(self):
wb_service.create_workbook_v2(WORKBOOK_ACTION_CONTEXT)
wb_service.create_workbook_v2(WB_ACTION_CONTEXT)
# Start workflow.
wf_ex = self.engine.start_workflow(
'wb1.wf1_with_items', WF_INPUT_URLS
)
wf_ex = self.engine.start_workflow('wb1.wf1_with_items', WF_INPUT_URLS)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
act_exs = task_ex.executions
self.engine.on_action_complete(act_exs[0].id, wf_utils.Result("Ivan"))
self.engine.on_action_complete(act_exs[1].id, wf_utils.Result("John"))
self.engine.on_action_complete(
act_exs[2].id, wf_utils.Result("Mistral")
act_exs[2].id,
wf_utils.Result("Mistral")
)
self._await(
lambda: self.is_execution_success(wf_ex.id),
)
self._await(lambda: self.is_execution_success(wf_ex.id))
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -378,7 +374,7 @@ class WithItemsEngineTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, task_ex.state)
def test_with_items_empty_list(self):
workbook = """---
wb_text = """---
version: "2.0"
name: wb1
@ -400,29 +396,29 @@ class WithItemsEngineTest(base.EngineTestCase):
task2:
action: std.echo output="Hi!"
"""
wb_service.create_workbook_v2(workbook)
wb_service.create_workbook_v2(wb_text)
# Start workflow.
wf_input = {'names_info': []}
wf_ex = self.engine.start_workflow('wb1.with_items', wf_input)
self._await(
lambda: self.is_execution_success(wf_ex.id),
)
self._await(lambda: self.is_execution_success(wf_ex.id))
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
task1 = self._assert_single_item(tasks, name='task1')
task2 = self._assert_single_item(tasks, name='task2')
task_execs = wf_ex.task_executions
self.assertEqual(2, len(tasks))
self.assertEqual(states.SUCCESS, task1.state)
self.assertEqual(states.SUCCESS, task2.state)
task1_ex = self._assert_single_item(task_execs, name='task1')
task2_ex = self._assert_single_item(task_execs, name='task2')
self.assertEqual(2, len(task_execs))
self.assertEqual(states.SUCCESS, task1_ex.state)
self.assertEqual(states.SUCCESS, task2_ex.state)
def test_with_items_plain_list(self):
workbook = """---
wb_text = """---
version: "2.0"
name: wb1
@ -436,7 +432,8 @@ class WithItemsEngineTest(base.EngineTestCase):
with-items: i in [1, 2, 3]
action: std.echo output=<% $.i %>
"""
wb_service.create_workbook_v2(workbook)
wb_service.create_workbook_v2(wb_text)
# Start workflow.
wf_ex = self.engine.start_workflow('wb1.with_items', {})
@ -446,11 +443,13 @@ class WithItemsEngineTest(base.EngineTestCase):
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
task1 = self._assert_single_item(tasks, name='task1')
self.assertEqual(states.SUCCESS, task1.state)
task1_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1',
state=states.SUCCESS
)
result = data_flow.get_task_execution_result(task1)
result = data_flow.get_task_execution_result(task1_ex)
# Since we know that we can receive results in random order,
# check is not depend on order of items.
@ -459,7 +458,7 @@ class WithItemsEngineTest(base.EngineTestCase):
self.assertIn(3, result)
def test_with_items_plain_list_wrong(self):
workbook = """---
wb_text = """---
version: "2.0"
name: wb1
@ -477,13 +476,13 @@ class WithItemsEngineTest(base.EngineTestCase):
exception = self.assertRaises(
exc.InvalidModelException,
wb_service.create_workbook_v2, workbook
wb_service.create_workbook_v2, wb_text
)
self.assertIn("Invalid array in 'with-items'", exception.message)
def test_with_items_results_order(self):
workbook = """---
wb_text = """---
version: "2.0"
name: wb1
@ -502,60 +501,57 @@ class WithItemsEngineTest(base.EngineTestCase):
# Register random sleep action in the DB.
test_base.register_action_class('sleep_echo', RandomSleepEchoAction)
wb_service.create_workbook_v2(workbook)
wb_service.create_workbook_v2(wb_text)
# Start workflow.
wf_ex = self.engine.start_workflow('wb1.with_items', {})
self._await(
lambda: self.is_execution_success(wf_ex.id),
)
self._await(lambda: self.is_execution_success(wf_ex.id))
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
task1 = self._assert_single_item(tasks, name='task1')
task1_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1',
state=states.SUCCESS
)
self.assertEqual(states.SUCCESS, task1.state)
published = task1.published
published = task1_ex.published
# Now we can check order of results explicitly.
self.assertEqual([1, 2, 3], published['one_two_three'])
def test_with_items_results_one_item_as_list(self):
wb_service.create_workbook_v2(WORKBOOK)
wb_service.create_workbook_v2(WB)
# Start workflow.
wf_ex = self.engine.start_workflow('wb1.with_items',
WORKFLOW_INPUT_ONE_ITEM)
wf_ex = self.engine.start_workflow('wb1.with_items', WF_INPUT_ONE_ITEM)
self._await(
lambda: self.is_execution_success(wf_ex.id),
)
self._await(lambda: self.is_execution_success(wf_ex.id))
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
task1 = self._assert_single_item(tasks, name='task1')
task_execs = wf_ex.task_executions
result = data_flow.get_task_execution_result(task1)
self.assertEqual(1, len(task_execs))
task1_ex = self._assert_single_item(
task_execs,
name='task1',
state=states.SUCCESS
)
result = data_flow.get_task_execution_result(task1_ex)
self.assertIsInstance(result, list)
self.assertIn('Guy', result)
published = task1.published
self.assertIn(published['result'], ['Guy'])
self.assertEqual(1, len(tasks))
self.assertEqual(states.SUCCESS, task1.state)
self.assertIn(task1_ex.published['result'], ['Guy'])
def test_with_items_concurrency_1(self):
workflow_with_concurrency_1 = """---
wf_with_concurrency_1 = """---
version: "2.0"
concurrency_test:
@ -569,16 +565,18 @@ class WithItemsEngineTest(base.EngineTestCase):
action: std.async_noop
with-items: name in <% $.names %>
concurrency: 1
"""
wf_service.create_workflows(workflow_with_concurrency_1)
wf_service.create_workflows(wf_with_concurrency_1)
# Start workflow.
wf_ex = self.engine.start_workflow('concurrency_test', {})
wf_ex = db_api.get_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
wf_ex = db_api.get_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
task_ex = db_api.get_task_execution(task_ex.id)
self.assert_capacity(0, task_ex)
self.assertEqual(1, self.get_running_action_exs_number(task_ex))
@ -589,6 +587,7 @@ class WithItemsEngineTest(base.EngineTestCase):
)
task_ex = db_api.get_task_execution(task_ex.id)
self.assert_capacity(0, task_ex)
self.assertEqual(1, self.get_running_action_exs_number(task_ex))
@ -599,6 +598,7 @@ class WithItemsEngineTest(base.EngineTestCase):
)
task_ex = db_api.get_task_execution(task_ex.id)
self.assert_capacity(0, task_ex)
self.assertEqual(1, self.get_running_action_exs_number(task_ex))
@ -609,13 +609,13 @@ class WithItemsEngineTest(base.EngineTestCase):
)
task_ex = db_api.get_task_execution(task_ex.id)
self.assert_capacity(1, task_ex)
self._await(
lambda: self.is_execution_success(wf_ex.id),
)
self._await(lambda: self.is_execution_success(wf_ex.id))
task_ex = db_api.get_task_execution(task_ex.id)
# Since we know that we can receive results in random order,
# check is not depend on order of items.
result = data_flow.get_task_execution_result(task_ex)
@ -629,7 +629,7 @@ class WithItemsEngineTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, task_ex.state)
def test_with_items_concurrency_yaql(self):
workflow_with_concurrency_yaql = """---
wf_with_concurrency_yaql = """---
version: "2.0"
concurrency_test:
@ -644,9 +644,9 @@ class WithItemsEngineTest(base.EngineTestCase):
action: std.echo output=<% $.name %>
with-items: name in <% $.names %>
concurrency: <% $.concurrency %>
"""
wf_service.create_workflows(workflow_with_concurrency_yaql)
wf_service.create_workflows(wf_with_concurrency_yaql)
# Start workflow.
wf_ex = self.engine.start_workflow(
@ -654,13 +654,14 @@ class WithItemsEngineTest(base.EngineTestCase):
{'concurrency': 2}
)
self._await(
lambda: self.is_execution_success(wf_ex.id),
)
self._await(lambda: self.is_execution_success(wf_ex.id))
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.SUCCESS, task_ex.state)
# Since we know that we can receive results in random order,
# check is not depend on order of items.
result = data_flow.get_task_execution_result(task_ex)
@ -671,10 +672,8 @@ class WithItemsEngineTest(base.EngineTestCase):
self.assertIn('Ivan', result)
self.assertIn('Mistral', result)
self.assertEqual(states.SUCCESS, task_ex.state)
def test_with_items_concurrency_yaql_wrong_type(self):
workflow_with_concurrency_yaql = """---
wf_with_concurrency_yaql = """---
version: "2.0"
concurrency_test:
@ -689,9 +688,9 @@ class WithItemsEngineTest(base.EngineTestCase):
action: std.echo output=<% $.name %>
with-items: name in <% $.names %>
concurrency: <% $.concurrency %>
"""
wf_service.create_workflows(workflow_with_concurrency_yaql)
wf_service.create_workflows(wf_with_concurrency_yaql)
# Start workflow.
wf_ex = self.engine.start_workflow(
@ -700,13 +699,13 @@ class WithItemsEngineTest(base.EngineTestCase):
)
self.assertIn(
"Invalid data type in ConcurrencyPolicy",
'Invalid data type in ConcurrencyPolicy',
wf_ex.state_info
)
self.assertEqual(states.ERROR, wf_ex.state)
def test_with_items_concurrency_2(self):
workflow_with_concurrency_2 = """---
wf_with_concurrency_2 = """---
version: "2.0"
concurrency_test:
@ -722,10 +721,11 @@ class WithItemsEngineTest(base.EngineTestCase):
concurrency: 2
"""
wf_service.create_workflows(workflow_with_concurrency_2)
wf_service.create_workflows(wf_with_concurrency_2)
# Start workflow.
wf_ex = self.engine.start_workflow('concurrency_test', {})
wf_ex = db_api.get_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
@ -739,6 +739,7 @@ class WithItemsEngineTest(base.EngineTestCase):
)
task_ex = db_api.get_task_execution(task_ex.id)
self.assert_capacity(0, task_ex)
self.assertEqual(2, self.get_running_action_exs_number(task_ex))
@ -749,6 +750,7 @@ class WithItemsEngineTest(base.EngineTestCase):
)
task_ex = db_api.get_task_execution(task_ex.id)
self.assert_capacity(0, task_ex)
self.assertEqual(2, self.get_running_action_exs_number(task_ex))
@ -759,6 +761,7 @@ class WithItemsEngineTest(base.EngineTestCase):
)
task_ex = db_api.get_task_execution(task_ex.id)
self.assert_capacity(1, task_ex)
# 4th iteration complete.
@ -768,16 +771,17 @@ class WithItemsEngineTest(base.EngineTestCase):
)
task_ex = db_api.get_task_execution(task_ex.id)
self.assert_capacity(2, task_ex)
self._await(
lambda: self.is_execution_success(wf_ex.id),
)
self._await(lambda: self.is_execution_success(wf_ex.id))
task_ex = db_api.get_task_execution(task_ex.id)
# Since we know that we can receive results in random order,
# check is not depend on order of items.
result = data_flow.get_task_execution_result(task_ex)
self.assertIsInstance(result, list)
self.assertIn('John', result)
@ -788,7 +792,7 @@ class WithItemsEngineTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, task_ex.state)
def test_with_items_concurrency_2_fail(self):
workflow_with_concurrency_2_fail = """---
wf_with_concurrency_2_fail = """---
version: "2.0"
concurrency_test_fail:
@ -805,14 +809,13 @@ class WithItemsEngineTest(base.EngineTestCase):
action: std.echo output="With-items failed"
"""
wf_service.create_workflows(workflow_with_concurrency_2_fail)
wf_service.create_workflows(wf_with_concurrency_2_fail)
# Start workflow.
wf_ex = self.engine.start_workflow('concurrency_test_fail', {})
self._await(
lambda: self.is_execution_success(wf_ex.id),
)
self._await(lambda: self.is_execution_success(wf_ex.id))
wf_ex = db_api.get_execution(wf_ex.id)
task_exs = wf_ex.task_executions
@ -822,12 +825,12 @@ class WithItemsEngineTest(base.EngineTestCase):
task_2 = self._assert_single_item(task_exs, name='task2')
self.assertEqual(
"With-items failed",
'With-items failed',
data_flow.get_task_execution_result(task_2)
)
def test_with_items_concurrency_3(self):
workflow_with_concurrency_3 = """---
wf_with_concurrency_3 = """---
version: "2.0"
concurrency_test:
@ -843,10 +846,12 @@ class WithItemsEngineTest(base.EngineTestCase):
concurrency: 3
"""
wf_service.create_workflows(workflow_with_concurrency_3)
wf_service.create_workflows(wf_with_concurrency_3)
# Start workflow.
wf_ex = self.engine.start_workflow('concurrency_test', {})
wf_ex = db_api.get_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
@ -860,6 +865,7 @@ class WithItemsEngineTest(base.EngineTestCase):
)
task_ex = db_api.get_task_execution(task_ex.id)
self.assert_capacity(1, task_ex)
# 2nd iteration complete.
@ -869,6 +875,7 @@ class WithItemsEngineTest(base.EngineTestCase):
)
task_ex = db_api.get_task_execution(task_ex.id)
self.assert_capacity(2, task_ex)
# 3rd iteration complete.
@ -878,26 +885,27 @@ class WithItemsEngineTest(base.EngineTestCase):
)
task_ex = db_api.get_task_execution(task_ex.id)
self.assert_capacity(3, task_ex)
self._await(
lambda: self.is_execution_success(wf_ex.id),
)
self._await(lambda: self.is_execution_success(wf_ex.id))
task_ex = db_api.get_task_execution(task_ex.id)
self.assertEqual(states.SUCCESS, task_ex.state)
# Since we know that we can receive results in random order,
# check is not depend on order of items.
result = data_flow.get_task_execution_result(task_ex)
self.assertIsInstance(result, list)
self.assertIn('John', result)
self.assertIn('Ivan', result)
self.assertIn('Mistral', result)
self.assertEqual(states.SUCCESS, task_ex.state)
def test_with_items_concurrency_gt_list_length(self):
workflow_definition = """---
wf_definition = """---
version: "2.0"
concurrency_test:
@ -913,26 +921,29 @@ class WithItemsEngineTest(base.EngineTestCase):
concurrency: 3
"""
wf_service.create_workflows(workflow_definition)
wf_service.create_workflows(wf_definition)
# Start workflow.
wf_ex = self.engine.start_workflow('concurrency_test', {})
self._await(
lambda: self.is_execution_success(wf_ex.id),
)
self._await(lambda: self.is_execution_success(wf_ex.id))
wf_ex = db_api.get_execution(wf_ex.id)
task_ex = self._assert_single_item(wf_ex.task_executions, name='task1')
task_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1',
state=states.SUCCESS
)
result = data_flow.get_task_execution_result(task_ex)
self.assertEqual(states.SUCCESS, task_ex.state)
self.assertIsInstance(result, list)
self.assertIn('John', result)
self.assertIn('Ivan', result)
def test_with_items_retry_policy(self):
workflow = """---
wf_text = """---
version: "2.0"
with_items_retry:
@ -948,32 +959,32 @@ class WithItemsEngineTest(base.EngineTestCase):
task2:
action: std.echo output="With-items failed"
"""
wf_service.create_workflows(workflow)
wf_service.create_workflows(wf_text)
# Start workflow.
wf_ex = self.engine.start_workflow('with_items_retry', {})
self._await(
lambda: self.is_execution_success(wf_ex.id)
)
self._await(lambda: self.is_execution_success(wf_ex.id))
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
self.assertEqual(2, len(tasks))
task_execs = wf_ex.task_executions
task1 = self._assert_single_item(tasks, name='task1')
self.assertEqual(2, len(task_execs))
task1_ex = self._assert_single_item(task_execs, name='task1')
self.assertEqual(
2,
task1.runtime_context['retry_task_policy']['retry_no']
task1_ex.runtime_context['retry_task_policy']['retry_no']
)
self.assertEqual(9, len(task1.executions))
self._assert_multiple_items(task1.executions, 3, accepted=True)
self.assertEqual(9, len(task1_ex.executions))
self._assert_multiple_items(task1_ex.executions, 3, accepted=True)
def test_with_items_retry_policy_concurrency(self):
workflow = """---
wf_text = """---
version: "2.0"
with_items_retry_concurrency:
@ -990,31 +1001,28 @@ class WithItemsEngineTest(base.EngineTestCase):
task2:
action: std.echo output="With-items failed"
"""
wf_service.create_workflows(workflow)
wf_service.create_workflows(wf_text)
# Start workflow.
wf_ex = self.engine.start_workflow(
'with_items_retry_concurrency',
{}
)
wf_ex = self.engine.start_workflow('with_items_retry_concurrency', {})
self._await(
lambda: self.is_execution_success(wf_ex.id),
)
self._await(lambda: self.is_execution_success(wf_ex.id))
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
self.assertEqual(2, len(tasks))
task_execs = wf_ex.task_executions
task1 = self._assert_single_item(tasks, name='task1')
self.assertEqual(2, len(task_execs))
self.assertEqual(12, len(task1.executions))
self._assert_multiple_items(task1.executions, 4, accepted=True)
task1_ex = self._assert_single_item(task_execs, name='task1')
self.assertEqual(12, len(task1_ex.executions))
self._assert_multiple_items(task1_ex.executions, 4, accepted=True)
def test_with_items_env(self):
workflow = """---
wf_text = """---
version: "2.0"
with_items_env:
@ -1023,19 +1031,17 @@ class WithItemsEngineTest(base.EngineTestCase):
with-items: i in [1, 2, 3, 4]
action: std.echo output="<% $.i %>.<% env().name %>"
"""
wf_service.create_workflows(workflow)
env = {'name': 'Mistral'}
wf_service.create_workflows(wf_text)
# Start workflow.
wf_ex = self.engine.start_workflow(
'with_items_env',
{},
env=env
env={'name': 'Mistral'}
)
self._await(
lambda: self.is_execution_success(wf_ex.id),
)
self._await(lambda: self.is_execution_success(wf_ex.id))
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -1058,7 +1064,7 @@ class WithItemsEngineTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, task1.state)
def test_with_items_two_tasks_second_starts_on_success(self):
workbook = """---
wb_text = """---
version: "2.0"
name: wb1
@ -1076,7 +1082,8 @@ class WithItemsEngineTest(base.EngineTestCase):
with-items: i in [3, 4]
action: std.echo output=<% $.i %>
"""
wb_service.create_workbook_v2(workbook)
wb_service.create_workbook_v2(wb_text)
# Start workflow.
wf_ex = self.engine.start_workflow('wb1.with_items', {})
@ -1086,14 +1093,21 @@ class WithItemsEngineTest(base.EngineTestCase):
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
task1 = self._assert_single_item(tasks, name='task1')
task2 = self._assert_single_item(tasks, name='task2')
self.assertEqual(states.SUCCESS, task1.state)
self.assertEqual(states.SUCCESS, task2.state)
task_execs = wf_ex.task_executions
result_task1 = data_flow.get_task_execution_result(task1)
result_task2 = data_flow.get_task_execution_result(task2)
task1_ex = self._assert_single_item(
task_execs,
name='task1',
state=states.SUCCESS
)
task2_ex = self._assert_single_item(
task_execs,
name='task2',
state=states.SUCCESS
)
result_task1 = data_flow.get_task_execution_result(task1_ex)
result_task2 = data_flow.get_task_execution_result(task2_ex)
# Since we know that we can receive results in random order,
# check is not depend on order of items.
@ -1103,45 +1117,51 @@ class WithItemsEngineTest(base.EngineTestCase):
self.assertIn(4, result_task2)
def test_with_items_subflow_concurrency_gt_list_length(self):
workbook_definition = """---
wb_text = """---
version: "2.0"
name: wb1
workflows:
main:
type: direct
input:
- names
tasks:
task1:
with-items: name in <% $.names %>
workflow: subflow1 name=<% $.name %>
concurrency: 3
subflow1:
type: direct
input:
- name
output:
result: <% task(task1).result %>
tasks:
task1:
action: std.echo output=<% $.name %>
"""
wb_service.create_workbook_v2(workbook_definition)
wb_service.create_workbook_v2(wb_text)
# Start workflow.
names = ["Peter", "Susan", "Edmund", "Lucy", "Aslan", "Caspian"]
wf_ex = self.engine.start_workflow('wb1.main', {'names': names})
self._await(
lambda: self.is_execution_success(wf_ex.id),
)
self._await(lambda: self.is_execution_success(wf_ex.id))
wf_ex = db_api.get_execution(wf_ex.id)
task_ex = self._assert_single_item(wf_ex.task_executions, name='task1')
self.assertEqual(states.SUCCESS, task_ex.state)
task_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1',
state=states.SUCCESS
)
result = [
item['result']

View File

@ -63,15 +63,15 @@ class RunExistingTask(WorkflowCommand):
"""Command for running already existent task."""
def __init__(self, task_ex, reset=True):
wf_ex = task_ex.workflow_execution
task_spec = spec_parser.get_task_spec(task_ex.spec)
super(RunExistingTask, self).__init__(
task_ex.workflow_execution,
spec_parser.get_task_spec(task_ex.spec),
task_ex.in_context
)
self.task_ex = task_ex
self.reset = reset
super(RunExistingTask, self).__init__(
wf_ex, task_spec, task_ex.in_context
)
class SetWorkflowState(WorkflowCommand):
"""Instruction to change a workflow state."""