Fix REST API dangling transactions

Make all DB API public function session aware

Change-Id: I2dcba176fa016076372c4dde22cc70986daddc2b
Closes-Bug: #1640469
This commit is contained in:
Xavier Hardy 2016-11-10 17:18:36 +01:00 committed by Renat Akhmerov
parent f1071f5908
commit c70779041d
35 changed files with 1587 additions and 1084 deletions

View File

@ -169,7 +169,7 @@ def _paginate_query(model, limit=None, marker=None, sort_keys=None,
return query
def _delete_all(model, session=None, **kwargs):
def _delete_all(model, **kwargs):
# NOTE(kong): Because we use 'in_' operator in _secure_query(), delete()
# method will raise error with default parameter. Please refer to
# http://docs.sqlalchemy.org/en/rel_1_0/orm/query.html#sqlalchemy.orm.query.Query.delete
@ -323,7 +323,8 @@ def insert_or_ignore(model_cls, values, session=None):
# Workbook definitions.
def get_workbook(name):
@b.session_aware()
def get_workbook(name, session=None):
wb = _get_db_object_by_name(models.Workbook, name)
if not wb:
@ -334,11 +335,13 @@ def get_workbook(name):
return wb
def load_workbook(name):
@b.session_aware()
def load_workbook(name, session=None):
return _get_db_object_by_name(models.Workbook, name)
def get_workbooks(**kwargs):
@b.session_aware()
def get_workbooks(session=None, **kwargs):
return _get_collection_sorted_by_name(models.Workbook, **kwargs)
@ -383,13 +386,14 @@ def delete_workbook(name, session=None):
@b.session_aware()
def delete_workbooks(**kwargs):
def delete_workbooks(session=None, **kwargs):
return _delete_all(models.Workbook, **kwargs)
# Workflow definitions.
def get_workflow_definition(identifier):
@b.session_aware()
def get_workflow_definition(identifier, session=None):
"""Gets workflow definition by name or uuid.
:param identifier: Identifier could be in the format of plain string or
@ -409,7 +413,8 @@ def get_workflow_definition(identifier):
return wf_def
def get_workflow_definition_by_id(id):
@b.session_aware()
def get_workflow_definition_by_id(id, session=None):
wf_def = _get_db_object_by_id(models.WorkflowDefinition, id)
if not wf_def:
@ -420,11 +425,14 @@ def get_workflow_definition_by_id(id):
return wf_def
def load_workflow_definition(name):
@b.session_aware()
def load_workflow_definition(name, session=None):
return _get_db_object_by_name(models.WorkflowDefinition, name)
def get_workflow_definitions(sort_keys=['created_at'], fields=None, **kwargs):
@b.session_aware()
def get_workflow_definitions(sort_keys=['created_at'], fields=None,
session=None, **kwargs):
if fields and 'input' in fields:
fields.remove('input')
fields.append('spec')
@ -544,13 +552,14 @@ def delete_workflow_definition(identifier, session=None):
@b.session_aware()
def delete_workflow_definitions(**kwargs):
def delete_workflow_definitions(session=None, **kwargs):
return _delete_all(models.WorkflowDefinition, **kwargs)
# Action definitions.
def get_action_definition_by_id(id):
@b.session_aware()
def get_action_definition_by_id(id, session=None):
action_def = _get_db_object_by_id(models.ActionDefinition, id)
if not action_def:
@ -561,7 +570,8 @@ def get_action_definition_by_id(id):
return action_def
def get_action_definition(identifier):
@b.session_aware()
def get_action_definition(identifier, session=None):
a_def = _get_db_object_by_name_or_id(
models.ActionDefinition,
identifier
@ -575,11 +585,13 @@ def get_action_definition(identifier):
return a_def
def load_action_definition(name):
@b.session_aware()
def load_action_definition(name, session=None):
return _get_db_object_by_name(models.ActionDefinition, name)
def get_action_definitions(**kwargs):
@b.session_aware()
def get_action_definitions(session=None, **kwargs):
return _get_collection_sorted_by_name(
model=models.ActionDefinition,
**kwargs
@ -627,13 +639,14 @@ def delete_action_definition(identifier, session=None):
@b.session_aware()
def delete_action_definitions(**kwargs):
def delete_action_definitions(session=None, **kwargs):
return _delete_all(models.ActionDefinition, **kwargs)
# Action executions.
def get_action_execution(id):
@b.session_aware()
def get_action_execution(id, session=None):
a_ex = _get_db_object_by_id(models.ActionExecution, id)
if not a_ex:
@ -644,15 +657,18 @@ def get_action_execution(id):
return a_ex
def load_action_execution(id):
@b.session_aware()
def load_action_execution(id, session=None):
return _get_db_object_by_id(models.ActionExecution, id)
def ensure_action_execution_exists(id):
@b.session_aware()
def ensure_action_execution_exists(id, session=None):
get_action_execution(id)
def get_action_executions(**kwargs):
@b.session_aware()
def get_action_executions(session=None, **kwargs):
return _get_action_executions(**kwargs)
@ -697,7 +713,7 @@ def delete_action_execution(id, session=None):
@b.session_aware()
def delete_action_executions(**kwargs):
def delete_action_executions(session=None, **kwargs):
return _delete_all(models.ActionExecution, **kwargs)
@ -707,7 +723,8 @@ def _get_action_executions(**kwargs):
# Workflow executions.
def get_workflow_execution(id):
@b.session_aware()
def get_workflow_execution(id, session=None):
wf_ex = _get_db_object_by_id(models.WorkflowExecution, id)
if not wf_ex:
@ -718,15 +735,18 @@ def get_workflow_execution(id):
return wf_ex
def load_workflow_execution(id):
@b.session_aware()
def load_workflow_execution(id, session=None):
return _get_db_object_by_id(models.WorkflowExecution, id)
def ensure_workflow_execution_exists(id):
@b.session_aware()
def ensure_workflow_execution_exists(id, session=None):
get_workflow_execution(id)
def get_workflow_executions(**kwargs):
@b.session_aware()
def get_workflow_executions(session=None, **kwargs):
return _get_collection_sorted_by_time(
models.WorkflowExecution,
**kwargs
@ -774,13 +794,14 @@ def delete_workflow_execution(id, session=None):
@b.session_aware()
def delete_workflow_executions(**kwargs):
def delete_workflow_executions(session=None, **kwargs):
return _delete_all(models.WorkflowExecution, **kwargs)
# Tasks executions.
def get_task_execution(id):
@b.session_aware()
def get_task_execution(id, session=None):
task_ex = _get_db_object_by_id(models.TaskExecution, id)
if not task_ex:
@ -791,11 +812,13 @@ def get_task_execution(id):
return task_ex
def load_task_execution(id):
@b.session_aware()
def load_task_execution(id, session=None):
return _get_db_object_by_id(models.TaskExecution, id)
def get_task_executions(**kwargs):
@b.session_aware()
def get_task_executions(session=None, **kwargs):
return _get_task_executions(**kwargs)
@ -815,7 +838,8 @@ def _get_completed_task_executions_query(kwargs):
return query
def get_completed_task_executions(**kwargs):
@b.session_aware()
def get_completed_task_executions(session=None, **kwargs):
query = _get_completed_task_executions_query(kwargs)
return query.all()
@ -838,13 +862,15 @@ def _get_incomplete_task_executions_query(kwargs):
return query
def get_incomplete_task_executions(**kwargs):
@b.session_aware()
def get_incomplete_task_executions(session=None, **kwargs):
query = _get_incomplete_task_executions_query(kwargs)
return query.all()
def get_incomplete_task_executions_count(**kwargs):
@b.session_aware()
def get_incomplete_task_executions_count(session=None, **kwargs):
query = _get_incomplete_task_executions_query(kwargs)
return query.count()
@ -891,7 +917,7 @@ def delete_task_execution(id, session=None):
@b.session_aware()
def delete_task_executions(**kwargs):
def delete_task_executions(session=None, **kwargs):
return _delete_all(models.TaskExecution, **kwargs)
@ -973,7 +999,8 @@ def get_delayed_call(id, session=None):
return delayed_call
def get_delayed_calls(**kwargs):
@b.session_aware()
def get_delayed_calls(session=None, **kwargs):
return _get_collection(
model=models.DelayedCall,
**kwargs
@ -981,7 +1008,7 @@ def get_delayed_calls(**kwargs):
@b.session_aware()
def delete_delayed_calls(**kwargs):
def delete_delayed_calls(session=None, **kwargs):
return _delete_all(models.DelayedCall, **kwargs)
@ -1006,7 +1033,8 @@ def get_expired_executions(time, session=None):
# Cron triggers.
def get_cron_trigger(name):
@b.session_aware()
def get_cron_trigger(name, session=None):
cron_trigger = _get_db_object_by_name(models.CronTrigger, name)
if not cron_trigger:
@ -1017,11 +1045,13 @@ def get_cron_trigger(name):
return cron_trigger
def load_cron_trigger(name):
@b.session_aware()
def load_cron_trigger(name, session=None):
return _get_db_object_by_name(models.CronTrigger, name)
def get_cron_triggers(insecure=False, **kwargs):
@b.session_aware()
def get_cron_triggers(insecure=False, session=None, **kwargs):
return _get_collection_sorted_by_name(
models.CronTrigger,
insecure=insecure,
@ -1120,13 +1150,14 @@ def delete_cron_trigger(name, session=None):
@b.session_aware()
def delete_cron_triggers(**kwargs):
def delete_cron_triggers(session=None, **kwargs):
return _delete_all(models.CronTrigger, **kwargs)
# Environments.
def get_environment(name):
@b.session_aware()
def get_environment(name, session=None):
env = _get_db_object_by_name(models.Environment, name)
if not env:
@ -1137,11 +1168,13 @@ def get_environment(name):
return env
def load_environment(name):
@b.session_aware()
def load_environment(name, session=None):
return _get_db_object_by_name(models.Environment, name)
def get_environments(**kwargs):
@b.session_aware()
def get_environments(session=None, **kwargs):
return _get_collection_sorted_by_name(models.Environment, **kwargs)
@ -1188,7 +1221,7 @@ def delete_environment(name, session=None):
@b.session_aware()
def delete_environments(**kwargs):
def delete_environments(session=None, **kwargs):
return _delete_all(models.Environment, **kwargs)
@ -1245,7 +1278,8 @@ def create_resource_member(values, session=None):
return res_member
def get_resource_member(resource_id, res_type, member_id):
@b.session_aware()
def get_resource_member(resource_id, res_type, member_id, session=None):
query = _secure_query(models.ResourceMember).filter_by(
resource_type=res_type
)
@ -1267,7 +1301,8 @@ def get_resource_member(resource_id, res_type, member_id):
return res_member
def get_resource_members(resource_id, res_type):
@b.session_aware()
def get_resource_members(resource_id, res_type, session=None):
query = _secure_query(models.ResourceMember).filter_by(
resource_type=res_type
)
@ -1334,7 +1369,7 @@ def delete_resource_member(resource_id, res_type, member_id, session=None):
@b.session_aware()
def delete_resource_members(**kwargs):
def delete_resource_members(session=None, **kwargs):
return _delete_all(models.ResourceMember, **kwargs)
@ -1352,7 +1387,8 @@ def _get_accepted_resources(res_type):
# Event triggers.
def get_event_trigger(id, insecure=False):
@b.session_aware()
def get_event_trigger(id, insecure=False, session=None):
event_trigger = _get_event_trigger(id, insecure)
if not event_trigger:
@ -1363,7 +1399,8 @@ def get_event_trigger(id, insecure=False):
return event_trigger
def get_event_triggers(insecure=False, **kwargs):
@b.session_aware()
def get_event_triggers(insecure=False, session=None, **kwargs):
return _get_collection_sorted_by_time(
model=models.EventTrigger,
insecure=insecure,
@ -1411,7 +1448,7 @@ def delete_event_trigger(id, session=None):
@b.session_aware()
def delete_event_triggers(**kwargs):
def delete_event_triggers(session=None, **kwargs):
return _delete_all(models.EventTrigger, **kwargs)
@ -1422,7 +1459,8 @@ def _get_event_trigger(id, insecure=False):
return _get_db_object_by_id(models.EventTrigger, id)
def ensure_event_trigger_exists(id):
@b.session_aware()
def ensure_event_trigger_exists(id, session=None):
get_event_trigger(id)
@ -1447,7 +1485,8 @@ def create_named_lock(name, session=None):
return lock_id
def get_named_locks(**kwargs):
@b.session_aware()
def get_named_locks(session=None, **kwargs):
return _get_collection(models.NamedLock, **kwargs)

View File

@ -412,10 +412,13 @@ def _get_environment(params):
def _send_result_to_parent_workflow(wf_ex_id):
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex_id)
wf_output = wf_ex.output
if wf_ex.state == states.SUCCESS:
result = wf_utils.Result(data=wf_ex.output)
result = wf_utils.Result(data=wf_output)
elif wf_ex.state == states.ERROR:
err_msg = (
wf_ex.state_info or

View File

@ -1175,6 +1175,7 @@ ACTION_EXECS = [
class ActionExecutionTest(SQLAlchemyTest):
def test_create_and_get_and_load_action_execution(self):
with db_api.transaction():
created = db_api.create_action_execution(ACTION_EXECS[0])
fetched = db_api.get_action_execution(created.id)
@ -1236,6 +1237,7 @@ class ActionExecutionTest(SQLAlchemyTest):
self.assertEqual(updated, fetched)
def test_get_action_executions(self):
with db_api.transaction():
created0 = db_api.create_action_execution(WF_EXECS[0])
db_api.create_action_execution(ACTION_EXECS[1])
@ -1247,6 +1249,7 @@ class ActionExecutionTest(SQLAlchemyTest):
self.assertEqual(created0, fetched[0])
def test_delete_action_execution(self):
with db_api.transaction():
created = db_api.create_action_execution(ACTION_EXECS[0])
fetched = db_api.get_action_execution(created.id)
@ -1331,6 +1334,7 @@ WF_EXECS = [
class WorkflowExecutionTest(SQLAlchemyTest):
def test_create_and_get_and_load_workflow_execution(self):
with db_api.transaction():
created = db_api.create_workflow_execution(WF_EXECS[0])
fetched = db_api.get_workflow_execution(created.id)
@ -1341,7 +1345,9 @@ class WorkflowExecutionTest(SQLAlchemyTest):
self.assertEqual(created, fetched)
self.assertIsNone(db_api.load_workflow_execution("not-existing-id"))
self.assertIsNone(
db_api.load_workflow_execution("not-existing-id")
)
def test_update_workflow_execution(self):
with db_api.transaction():
@ -1395,6 +1401,7 @@ class WorkflowExecutionTest(SQLAlchemyTest):
self.assertEqual(updated, fetched)
def test_get_workflow_executions(self):
with db_api.transaction():
created0 = db_api.create_workflow_execution(WF_EXECS[0])
db_api.create_workflow_execution(WF_EXECS[1])
@ -1406,6 +1413,7 @@ class WorkflowExecutionTest(SQLAlchemyTest):
self.assertEqual(created0, fetched[0])
def test_filter_workflow_execution_by_equal_value(self):
with db_api.transaction():
db_api.create_workflow_execution(WF_EXECS[0])
created = db_api.create_workflow_execution(WF_EXECS[1])
@ -1420,6 +1428,7 @@ class WorkflowExecutionTest(SQLAlchemyTest):
self.assertEqual(created, fetched[0])
def test_filter_workflow_execution_by_not_equal_value(self):
with db_api.transaction():
created0 = db_api.create_workflow_execution(WF_EXECS[0])
created1 = db_api.create_workflow_execution(WF_EXECS[1])
@ -1435,6 +1444,7 @@ class WorkflowExecutionTest(SQLAlchemyTest):
self.assertEqual(created1, fetched[0])
def test_filter_workflow_execution_by_greater_than_value(self):
with db_api.transaction():
created0 = db_api.create_workflow_execution(WF_EXECS[0])
created1 = db_api.create_workflow_execution(WF_EXECS[1])
@ -1449,6 +1459,7 @@ class WorkflowExecutionTest(SQLAlchemyTest):
self.assertEqual(created1, fetched[0])
def test_filter_workflow_execution_by_greater_than_equal_value(self):
with db_api.transaction():
created0 = db_api.create_workflow_execution(WF_EXECS[0])
created1 = db_api.create_workflow_execution(WF_EXECS[1])
@ -1464,6 +1475,7 @@ class WorkflowExecutionTest(SQLAlchemyTest):
self.assertEqual(created1, fetched[1])
def test_filter_workflow_execution_by_less_than_value(self):
with db_api.transaction():
created0 = db_api.create_workflow_execution(WF_EXECS[0])
created1 = db_api.create_workflow_execution(WF_EXECS[1])
@ -1478,6 +1490,7 @@ class WorkflowExecutionTest(SQLAlchemyTest):
self.assertEqual(created0, fetched[0])
def test_filter_workflow_execution_by_less_than_equal_value(self):
with db_api.transaction():
created0 = db_api.create_workflow_execution(WF_EXECS[0])
created1 = db_api.create_workflow_execution(WF_EXECS[1])
@ -1493,6 +1506,7 @@ class WorkflowExecutionTest(SQLAlchemyTest):
self.assertEqual(created1, fetched[1])
def test_filter_workflow_execution_by_values_in_list(self):
with db_api.transaction():
created0 = db_api.create_workflow_execution(WF_EXECS[0])
db_api.create_workflow_execution(WF_EXECS[1])
@ -1507,6 +1521,7 @@ class WorkflowExecutionTest(SQLAlchemyTest):
self.assertEqual(created0, fetched[0])
def test_filter_workflow_execution_by_values_notin_list(self):
with db_api.transaction():
created0 = db_api.create_workflow_execution(WF_EXECS[0])
created1 = db_api.create_workflow_execution(WF_EXECS[1])
@ -1521,6 +1536,7 @@ class WorkflowExecutionTest(SQLAlchemyTest):
self.assertEqual(created1, fetched[0])
def test_filter_workflow_execution_by_multiple_columns(self):
with db_api.transaction():
created0 = db_api.create_workflow_execution(WF_EXECS[0])
created1 = db_api.create_workflow_execution(WF_EXECS[1])
@ -1541,6 +1557,7 @@ class WorkflowExecutionTest(SQLAlchemyTest):
self.assertEqual(created1, fetched[0])
def test_delete_workflow_execution(self):
with db_api.transaction():
created = db_api.create_workflow_execution(WF_EXECS[0])
fetched = db_api.get_workflow_execution(created.id)
@ -1652,6 +1669,7 @@ TASK_EXECS = [
class TaskExecutionTest(SQLAlchemyTest):
def test_create_and_get_and_load_task_execution(self):
with db_api.transaction():
wf_ex = db_api.create_workflow_execution(WF_EXECS[0])
values = copy.deepcopy(TASK_EXECS[0])
@ -2440,6 +2458,7 @@ class TXTest(SQLAlchemyTest):
self.assertFalse(self.is_db_session_open())
with db_api.transaction():
fetched = db_api.get_workflow_execution(created.id)
self.assertEqual(created, fetched)
@ -2447,6 +2466,7 @@ class TXTest(SQLAlchemyTest):
fetched_wb = db_api.get_workbook(created_wb.name)
self.assertEqual(created_wb, fetched_wb)
self.assertFalse(self.is_db_session_open())

View File

@ -64,10 +64,15 @@ class ActionContextTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.SUCCESS, wf_ex.state)
task_ex = self._assert_single_item(wf_ex.task_executions, name='task1')
task_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1'
)
action_ex = self._assert_single_item(task_ex.executions)
headers = {

View File

@ -109,6 +109,7 @@ class ActionDefaultTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.SUCCESS, wf_ex.state)
@ -134,6 +135,7 @@ class ActionDefaultTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.SUCCESS, wf_ex.state)
@ -171,6 +173,7 @@ class ActionDefaultTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.SUCCESS, wf_ex.state)
@ -209,6 +212,7 @@ class ActionDefaultTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.SUCCESS, wf_ex.state)

View File

@ -98,10 +98,9 @@ class AdhocActionsTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.maxDiff = None
self.assertDictEqual(
{
'workflow_result': 'a+b and a+b',
@ -118,17 +117,15 @@ class AdhocActionsTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.maxDiff = None
self.assertDictEqual(
{
expected_output = {
'workflow_result': 'a+b and a+b',
'concat_task_result': 'a+b and a+b'
},
wf_ex.output
)
}
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertDictEqual(expected_output, wf_ex.output)
def test_run_adhoc_action_without_sufficient_input_value(self):
wf_ex = self.engine.start_workflow(

View File

@ -62,11 +62,14 @@ class SimpleEngineCommandsTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(1, len(wf_ex.task_executions))
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
self._assert_single_item(
wf_ex.task_executions,
task_execs,
name='task1',
state=states.SUCCESS
)
@ -76,11 +79,14 @@ class SimpleEngineCommandsTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(1, len(wf_ex.task_executions))
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
self._assert_single_item(
wf_ex.task_executions,
task_execs,
name='task1',
state=states.SUCCESS
)
@ -90,11 +96,14 @@ class SimpleEngineCommandsTest(base.EngineTestCase):
self.await_workflow_paused(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(1, len(wf_ex.task_executions))
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
self._assert_single_item(
wf_ex.task_executions,
task_execs,
name='task1',
state=states.SUCCESS
)
@ -139,11 +148,14 @@ class SimpleEngineWorkflowLevelCommandsTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(1, len(wf_ex.task_executions))
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
self._assert_single_item(
wf_ex.task_executions,
task_execs,
name='task1',
state=states.SUCCESS
)
@ -153,11 +165,14 @@ class SimpleEngineWorkflowLevelCommandsTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(1, len(wf_ex.task_executions))
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
self._assert_single_item(
wf_ex.task_executions,
task_execs,
name='task1',
state=states.SUCCESS
)
@ -167,11 +182,14 @@ class SimpleEngineWorkflowLevelCommandsTest(base.EngineTestCase):
self.await_workflow_paused(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(1, len(wf_ex.task_executions))
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
self._assert_single_item(
wf_ex.task_executions,
task_execs,
name='task1',
state=states.SUCCESS
)
@ -249,11 +267,14 @@ class OrderEngineCommandsTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(1, len(wf_ex.task_executions))
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
self._assert_single_item(
wf_ex.task_executions,
task_execs,
name='task1',
state=states.SUCCESS
)
@ -263,18 +284,18 @@ class OrderEngineCommandsTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(2, len(wf_ex.task_executions))
task_execs = wf_ex.task_executions
self.assertEqual(2, len(task_execs))
self._assert_single_item(
wf_ex.task_executions,
task_execs,
name='task1',
state=states.SUCCESS
)
task2_db = self._assert_single_item(
wf_ex.task_executions,
name='task2'
)
task2_db = self._assert_single_item(task_execs, name='task2')
self.await_task_success(task2_db.id)
self.await_workflow_error(wf_ex.id)
@ -284,11 +305,14 @@ class OrderEngineCommandsTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(1, len(wf_ex.task_executions))
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
self._assert_single_item(
wf_ex.task_executions,
task_execs,
name='task1',
state=states.SUCCESS
)
@ -298,18 +322,18 @@ class OrderEngineCommandsTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(2, len(wf_ex.task_executions))
task_execs = wf_ex.task_executions
self.assertEqual(2, len(task_execs))
self._assert_single_item(
wf_ex.task_executions,
task_execs,
name='task1',
state=states.SUCCESS
)
task2_db = self._assert_single_item(
wf_ex.task_executions,
name='task2'
)
task2_db = self._assert_single_item(task_execs, name='task2')
self.await_task_error(task2_db.id)
self.await_workflow_success(wf_ex.id)
@ -351,11 +375,14 @@ class SimpleEngineCmdsWithMsgTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(1, len(wf_ex.task_executions))
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
self._assert_single_item(
wf_ex.task_executions,
task_execs,
name='task1',
state=states.SUCCESS
)
@ -367,11 +394,14 @@ class SimpleEngineCmdsWithMsgTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(1, len(wf_ex.task_executions))
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
self._assert_single_item(
wf_ex.task_executions,
task_execs,
name='task1',
state=states.SUCCESS
)
@ -383,11 +413,14 @@ class SimpleEngineCmdsWithMsgTest(base.EngineTestCase):
self.await_workflow_paused(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(1, len(wf_ex.task_executions))
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
self._assert_single_item(
wf_ex.task_executions,
task_execs,
name='task1',
state=states.SUCCESS
)
@ -433,11 +466,14 @@ class SimpleEngineWorkflowLevelCmdsWithMsgTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(1, len(wf_ex.task_executions))
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
self._assert_single_item(
wf_ex.task_executions,
task_execs,
name='task1',
state=states.SUCCESS
)
@ -449,11 +485,14 @@ class SimpleEngineWorkflowLevelCmdsWithMsgTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(1, len(wf_ex.task_executions))
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
self._assert_single_item(
wf_ex.task_executions,
task_execs,
name='task1',
state=states.SUCCESS
)
@ -465,11 +504,14 @@ class SimpleEngineWorkflowLevelCmdsWithMsgTest(base.EngineTestCase):
self.await_workflow_paused(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(1, len(wf_ex.task_executions))
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
self._assert_single_item(
wf_ex.task_executions,
task_execs,
name='task1',
state=states.SUCCESS
)

View File

@ -67,13 +67,14 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.SUCCESS, wf_ex.state)
tasks = wf_ex.task_executions
self.assertEqual(states.SUCCESS, wf_ex.state)
task1 = self._assert_single_item(tasks, name='task1')
task2 = self._assert_single_item(tasks, name='task2')
task3 = self._assert_single_item(tasks, name='task3')
@ -136,13 +137,14 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.SUCCESS, wf_ex.state)
tasks = wf_ex.task_executions
self.assertEqual(states.SUCCESS, wf_ex.state)
task1 = self._assert_single_item(tasks, name='task1')
task2 = self._assert_single_item(tasks, name='task2')
task3 = self._assert_single_item(tasks, name='task3')
@ -171,9 +173,18 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
self.assertDictEqual(exp_published_arr[1], task2.published)
self.assertDictEqual(exp_published_arr[2], task3.published)
self.assertIn(exp_published_arr[0]['progress'], notify_published_arr)
self.assertIn(exp_published_arr[1]['progress'], notify_published_arr)
self.assertIn(exp_published_arr[2]['progress'], notify_published_arr)
self.assertIn(
exp_published_arr[0]['progress'],
notify_published_arr
)
self.assertIn(
exp_published_arr[1]['progress'],
notify_published_arr
)
self.assertIn(
exp_published_arr[2]['progress'],
notify_published_arr
)
def test_parallel_tasks(self):
parallel_tasks_wf = """---
@ -201,13 +212,15 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.SUCCESS, wf_ex.state)
wf_output = wf_ex.output
tasks = wf_ex.task_executions
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertEqual(2, len(tasks))
task1 = self._assert_single_item(tasks, name='task1')
@ -219,8 +232,8 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
self.assertDictEqual({'var1': 1}, task1.published)
self.assertDictEqual({'var2': 2}, task2.published)
self.assertEqual(1, wf_ex.output['var1'])
self.assertEqual(2, wf_ex.output['var2'])
self.assertEqual(1, wf_output['var1'])
self.assertEqual(2, wf_output['var2'])
def test_parallel_tasks_complex(self):
parallel_tasks_complex_wf = """---
@ -277,13 +290,15 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.SUCCESS, wf_ex.state)
wf_output = wf_ex.output
tasks = wf_ex.task_executions
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertEqual(6, len(tasks))
task1 = self._assert_single_item(tasks, name='task1')
@ -306,12 +321,12 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
self.assertDictEqual({'var2': 2}, task2.published)
self.assertDictEqual({'var21': 21}, task21.published)
self.assertEqual(1, wf_ex.output['var1'])
self.assertEqual(12, wf_ex.output['var12'])
self.assertNotIn('var13', wf_ex.output)
self.assertEqual(14, wf_ex.output['var14'])
self.assertEqual(2, wf_ex.output['var2'])
self.assertEqual(21, wf_ex.output['var21'])
self.assertEqual(1, wf_output['var1'])
self.assertEqual(12, wf_output['var12'])
self.assertNotIn('var13', wf_output)
self.assertEqual(14, wf_output['var14'])
self.assertEqual(2, wf_output['var2'])
self.assertEqual(21, wf_output['var21'])
def test_sequential_tasks_publishing_same_var(self):
var_overwrite_wf = """---
@ -358,13 +373,14 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.SUCCESS, wf_ex.state)
tasks = wf_ex.task_executions
self.assertEqual(states.SUCCESS, wf_ex.state)
task1 = self._assert_single_item(tasks, name='task1')
task2 = self._assert_single_item(tasks, name='task2')
task3 = self._assert_single_item(tasks, name='task3')
@ -416,12 +432,13 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.SUCCESS, wf_ex.state)
tasks = wf_ex.task_executions
self.assertEqual(states.SUCCESS, wf_ex.state)
task1 = self._assert_single_item(tasks, name='task1')
task2 = self._assert_single_item(tasks, name='task2')
task3 = self._assert_single_item(tasks, name='task3')
@ -469,10 +486,12 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
task4 = self._assert_single_item(tasks, name='task4')
self.assertDictEqual(
@ -502,6 +521,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -541,10 +561,14 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task1 = self._assert_single_item(wf_ex.task_executions, name='task1')
task1 = self._assert_single_item(
wf_ex.task_executions,
name='task1'
)
result = data_flow.get_task_execution_result(task1)
@ -575,20 +599,28 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.ERROR, wf_ex.state)
wf_output = wf_ex.output
tasks = wf_ex.task_executions
self.assertEqual(states.ERROR, wf_ex.state)
task1 = self._assert_single_item(tasks, name='task1')
self.assertEqual(states.ERROR, task1.state)
self.assertEqual('hello_from_error', task1.published['hi'])
self.assertIn('Fail action expected exception', task1.published['err'])
self.assertEqual('hello_from_error', wf_ex.output['out'])
self.assertIn('Fail action expected exception', wf_ex.output['result'])
self.assertIn(
'Fail action expected exception',
task1.published['err']
)
self.assertEqual('hello_from_error', wf_output['out'])
self.assertIn(
'Fail action expected exception',
wf_output['result']
)
def test_output_on_error_wb_yaql_failed(self):
wb_def = """---
@ -627,17 +659,19 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIn('Failed to evaluate expression in output-on-error!',
wf_ex.state_info)
self.assertIn('$.message', wf_ex.state_info)
tasks = wf_ex.task_executions
task1 = self._assert_single_item(tasks, name='task1')
self.assertIn('task(task1).result.message', task1.state_info)

View File

@ -118,12 +118,15 @@ class DefaultEngineTest(base.DbTestCase):
self.assertEqual('my execution', wf_ex.description)
self.assertIn('__execution', wf_ex.context)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(1, len(wf_ex.task_executions))
task_execs = wf_ex.task_executions
task_ex = wf_ex.task_executions[0]
self.assertEqual(1, len(task_execs))
task_ex = task_execs[0]
self.assertEqual('wb.wf', task_ex.workflow_name)
self.assertEqual('task1', task_ex.name)
@ -158,11 +161,14 @@ class DefaultEngineTest(base.DbTestCase):
self.assertIn('__execution', wf_ex.context)
# Note: We need to reread execution to access related tasks.
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(1, len(wf_ex.task_executions))
task_execs = wf_ex.task_executions
task_ex = wf_ex.task_executions[0]
self.assertEqual(1, len(task_execs))
task_ex = task_execs[0]
self.assertEqual('wb.wf', task_ex.workflow_name)
self.assertEqual('task1', task_ex.name)
@ -299,12 +305,15 @@ class DefaultEngineTest(base.DbTestCase):
self.assertIsNotNone(wf_ex)
self.assertEqual(states.RUNNING, wf_ex.state)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(1, len(wf_ex.task_executions))
task_execs = wf_ex.task_executions
task1_ex = wf_ex.task_executions[0]
self.assertEqual(1, len(task_execs))
task1_ex = task_execs[0]
self.assertEqual('task1', task1_ex.name)
self.assertEqual(states.RUNNING, task1_ex.state)
@ -340,17 +349,17 @@ class DefaultEngineTest(base.DbTestCase):
self.assertDictEqual({'output': 'Hey'}, task1_action_ex.input)
self.assertDictEqual({'result': 'Hey'}, task1_action_ex.output)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertIsNotNone(wf_ex)
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertEqual(2, len(wf_ex.task_executions))
task_execs = wf_ex.task_executions
task2_ex = self._assert_single_item(
wf_ex.task_executions,
name='task2'
)
self.assertEqual(2, len(task_execs))
task2_ex = self._assert_single_item(task_execs, name='task2')
self.assertEqual(states.RUNNING, task2_ex.state)
@ -371,10 +380,13 @@ class DefaultEngineTest(base.DbTestCase):
wf_utils.Result(data='Hi')
)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertIsNotNone(wf_ex)
task_execs = wf_ex.task_executions
# Workflow completion check is done separate with scheduler
# but scheduler doesn't start in this test (in fact, it's just
# a DB test)so the workflow is expected to be in running state.
@ -390,10 +402,10 @@ class DefaultEngineTest(base.DbTestCase):
self.assertDictEqual({'output': 'Hi'}, task2_action_ex.input)
self.assertDictEqual({'result': 'Hi'}, task2_action_ex.output)
self.assertEqual(2, len(wf_ex.task_executions))
self.assertEqual(2, len(task_execs))
self._assert_single_item(wf_ex.task_executions, name='task1')
self._assert_single_item(wf_ex.task_executions, name='task2')
self._assert_single_item(task_execs, name='task1')
self._assert_single_item(task_execs, name='task2')
def test_stop_workflow_fail(self):
# Start workflow.

View File

@ -74,6 +74,9 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
wf_ex = self._run_workflow(wf_text)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
task1 = self._assert_single_item(tasks, name='task1')
@ -116,7 +119,9 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
task1 = self._assert_single_item(tasks, name='task1')
@ -271,6 +276,9 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIn('Can not evaluate YAQL expression', wf_ex.state_info)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(2, len(task_execs))
@ -324,13 +332,15 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
# and async action execution are RUNNING.
wf_ex = self._run_workflow(wf_text, states.RUNNING)
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertEqual(1, len(wf_ex.task_executions))
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_1_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1'
)
task_execs = wf_ex.task_executions
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertEqual(1, len(task_execs))
task_1_ex = self._assert_single_item(task_execs, name='task1')
self.assertEqual(states.RUNNING, task_1_ex.state)
@ -347,13 +357,14 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
wf_utils.Result(data='foobar')
)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIn('Can not evaluate YAQL expression', wf_ex.state_info)
task_execs = wf_ex.task_executions
self.assertEqual(2, len(task_execs))
# 'task1' must be in SUCCESS.
@ -478,11 +489,16 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIn('Can not evaluate YAQL expression', wf_ex.state_info)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
# Assert that there is only one task execution and it's SUCCESS.
self.assertEqual(1, len(wf_ex.task_executions))
self.assertEqual(1, len(task_execs))
task_1_ex = self._assert_single_item(
wf_ex.task_executions,
task_execs,
name='task1'
)
@ -518,12 +534,15 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
wf_ex = self._run_workflow(wf_text, states.RUNNING)
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertEqual(1, len(wf_ex.task_executions))
task_1_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1'
)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
task_1_ex = self._assert_single_item(task_execs, name='task1')
self.assertEqual(states.RUNNING, task_1_ex.state)
@ -541,16 +560,16 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
)
# Assert that task1 is SUCCESS and workflow is ERROR.
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIn('Can not evaluate YAQL expression', wf_ex.state_info)
self.assertEqual(1, len(wf_ex.task_executions))
self.assertEqual(1, len(task_execs))
task_1_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1'
)
task_1_ex = self._assert_single_item(task_execs, name='task1')
self.assertEqual(states.ERROR, task_1_ex.state)

View File

@ -228,14 +228,17 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(2, len(wf_ex.task_executions))
self.assertEqual(2, len(task_execs))
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
self.assertEqual(states.SUCCESS, task_1_ex.state)
self.assertEqual(states.ERROR, task_2_ex.state)
@ -252,15 +255,18 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
# Wait for the workflow to succeed.
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.assertEqual(3, len(wf_ex.task_executions))
self.assertEqual(3, len(task_execs))
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
task_3_ex = self._assert_single_item(wf_ex.task_executions, name='t3')
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
task_3_ex = self._assert_single_item(task_execs, name='t3')
# Check action executions of task 1.
self.assertEqual(states.SUCCESS, task_1_ex.state)
@ -460,13 +466,14 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info)
task_execs = wf_ex.task_executions
self.assertEqual(2, len(task_execs))
task_1_ex = self._assert_single_item(
@ -513,13 +520,14 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info)
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
task_1_ex = self._assert_single_item(task_execs, name='t1')
@ -543,13 +551,14 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id, delay=10)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
task_execs = wf_ex.task_executions
self.assertEqual(2, len(task_execs))
task_1_ex = self._assert_single_item(task_execs, name='t1')
@ -685,13 +694,16 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(1, len(wf_ex.task_executions))
self.assertEqual(1, len(task_execs))
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
task_1_ex = self._assert_single_item(task_execs, name='t1')
self.assertEqual(states.ERROR, task_1_ex.state)
self.assertIsNotNone(task_1_ex.state_info)
@ -703,9 +715,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(3, len(task_1_action_exs))
# Update env in workflow execution with the following.
updated_env = {
'var1': 'foobar'
}
updated_env = {'var1': 'foobar'}
# Resume workflow and re-run failed task.
self.engine.rerun_workflow(
@ -721,14 +731,17 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id, delay=10)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.assertEqual(2, len(wf_ex.task_executions))
self.assertEqual(2, len(task_execs))
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
# Check action executions of task 1.
self.assertEqual(states.SUCCESS, task_1_ex.state)
@ -785,15 +798,18 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(3, len(wf_ex.task_executions))
self.assertEqual(3, len(task_execs))
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
task_3_ex = self._assert_single_item(wf_ex.task_executions, name='t3')
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
task_3_ex = self._assert_single_item(task_execs, name='t3')
self.assertEqual(states.SUCCESS, task_1_ex.state)
self.assertEqual(states.SUCCESS, task_2_ex.state)
@ -811,15 +827,18 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
# Wait for the workflow to succeed.
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.assertEqual(3, len(wf_ex.task_executions))
self.assertEqual(3, len(task_execs))
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
task_3_ex = self._assert_single_item(wf_ex.task_executions, name='t3')
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
task_3_ex = self._assert_single_item(task_execs, name='t3')
# Check action executions of task 1.
self.assertEqual(states.SUCCESS, task_1_ex.state)
@ -846,7 +865,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertIsNone(task_3_ex.state_info)
task_3_action_exs = db_api.get_action_executions(
task_execution_id=wf_ex.task_executions[2].id
task_execution_id=task_execs[2].id
)
self.assertEqual(2, len(task_3_action_exs))
@ -1022,13 +1041,16 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(1, len(wf_ex.task_executions))
self.assertEqual(1, len(task_execs))
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
task_1_ex = self._assert_single_item(task_execs, name='t1')
self.await_task_error(task_1_ex.id)
@ -1111,14 +1133,17 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id, delay=10)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.assertEqual(2, len(wf_ex.task_executions))
self.assertEqual(2, len(task_execs))
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
# Check action executions of task 1.
self.assertEqual(states.SUCCESS, task_1_ex.state)
@ -1131,8 +1156,10 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
# The single action execution that succeeded should not re-run.
self.assertEqual(12, len(task_1_action_exs))
self.assertListEqual(['Task 1.0', 'Task 1.1', 'Task 1.2'],
task_1_ex.published.get('v1'))
self.assertListEqual(
['Task 1.0', 'Task 1.1', 'Task 1.2'],
task_1_ex.published.get('v1')
)
# Check action executions of task 2.
self.assertEqual(states.SUCCESS, task_2_ex.state)
@ -1163,14 +1190,17 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(2, len(wf_ex.task_executions))
self.assertEqual(2, len(task_execs))
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
self.assertEqual(states.SUCCESS, task_1_ex.state)
self.assertEqual(states.ERROR, task_2_ex.state)
@ -1178,6 +1208,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
# Resume workflow and re-run failed task.
self.engine.rerun_workflow(task_2_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.RUNNING, wf_ex.state)
@ -1186,15 +1217,18 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
# Wait for the workflow to succeed.
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.assertEqual(3, len(wf_ex.task_executions))
self.assertEqual(3, len(task_execs))
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
task_3_ex = self._assert_single_item(wf_ex.task_executions, name='t3')
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
task_3_ex = self._assert_single_item(task_execs, name='t3')
# Check action executions of task 1.
self.assertEqual(states.SUCCESS, task_1_ex.state)
@ -1221,7 +1255,8 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, task_3_ex.state)
task_3_action_exs = db_api.get_action_executions(
task_execution_id=task_3_ex.id)
task_execution_id=task_3_ex.id
)
self.assertEqual(1, len(task_3_action_exs))
self.assertEqual(states.SUCCESS, task_3_action_exs[0].state)
@ -1246,32 +1281,37 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(2, len(wf_ex.task_executions))
self.assertEqual(2, len(task_execs))
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
self.assertEqual(states.SUCCESS, task_1_ex.state)
self.assertEqual(states.ERROR, task_2_ex.state)
self.assertIsNotNone(task_2_ex.state_info)
with db_api.transaction():
# Get subworkflow and related task
sub_wf_exs = db_api.get_workflow_executions(
task_execution_id=task_2_ex.id
)
sub_wf_ex = sub_wf_exs[0]
sub_wf_task_execs = sub_wf_ex.task_executions
self.assertEqual(states.ERROR, sub_wf_ex.state)
self.assertIsNotNone(sub_wf_ex.state_info)
self.assertEqual(1, len(sub_wf_ex.task_executions))
self.assertEqual(1, len(sub_wf_task_execs))
sub_wf_task_ex = self._assert_single_item(
sub_wf_ex.task_executions,
sub_wf_task_execs,
name='wf2_t1'
)
@ -1294,14 +1334,17 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
# Wait for the subworkflow to succeed.
self.await_workflow_success(sub_wf_ex.id)
with db_api.transaction():
sub_wf_ex = db_api.get_workflow_execution(sub_wf_ex.id)
sub_wf_task_execs = sub_wf_ex.task_executions
self.assertEqual(states.SUCCESS, sub_wf_ex.state)
self.assertIsNone(sub_wf_ex.state_info)
self.assertEqual(1, len(sub_wf_ex.task_executions))
self.assertEqual(1, len(sub_wf_task_execs))
sub_wf_task_ex = self._assert_single_item(
sub_wf_ex.task_executions,
sub_wf_task_execs,
name='wf2_t1'
)
@ -1310,7 +1353,8 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertIsNone(sub_wf_task_ex.state_info)
sub_wf_task_ex_action_exs = db_api.get_action_executions(
task_execution_id=sub_wf_task_ex.id)
task_execution_id=sub_wf_task_ex.id
)
self.assertEqual(2, len(sub_wf_task_ex_action_exs))
self.assertEqual(states.ERROR, sub_wf_task_ex_action_exs[0].state)
@ -1319,21 +1363,25 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
# Wait for the main workflow to succeed.
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.assertEqual(3, len(wf_ex.task_executions))
self.assertEqual(3, len(task_execs))
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
task_3_ex = self._assert_single_item(wf_ex.task_executions, name='t3')
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
task_3_ex = self._assert_single_item(task_execs, name='t3')
# Check action executions of task 1.
self.assertEqual(states.SUCCESS, task_1_ex.state)
task_1_action_exs = db_api.get_action_executions(
task_execution_id=task_1_ex.id)
task_execution_id=task_1_ex.id
)
self.assertEqual(1, len(task_1_action_exs))
self.assertEqual(states.SUCCESS, task_1_action_exs[0].state)

View File

@ -102,11 +102,7 @@ class EnvironmentTest(base.EngineTestCase):
@mock.patch.object(rpc.ExecutorClient, 'run_action', MOCK_RUN_AT_TARGET)
def _test_subworkflow(self, env):
wf2_ex = self.engine.start_workflow(
'my_wb.wf2',
{},
env=env
)
wf2_ex = self.engine.start_workflow('my_wb.wf2', {}, env=env)
# Execution of 'wf2'.
self.assertIsNotNone(wf2_ex)
@ -142,21 +138,26 @@ class EnvironmentTest(base.EngineTestCase):
# Wait till workflow 'wf1' is completed.
self.await_workflow_success(wf1_ex.id)
with db_api.transaction():
wf1_ex = db_api.get_workflow_execution(wf1_ex.id)
expected_wf1_output = {'final_result': "'Bonnie & Clyde'"}
self.assertDictEqual(wf1_ex.output, expected_wf1_output)
self.assertDictEqual(
{'final_result': "'Bonnie & Clyde'"},
wf1_ex.output
)
# Wait till workflow 'wf2' is completed.
self.await_workflow_success(wf2_ex.id)
with db_api.transaction():
wf2_ex = db_api.get_workflow_execution(wf2_ex.id)
expected_wf2_output = {'slogan': "'Bonnie & Clyde' is a cool movie!\n"}
self.assertDictEqual(wf2_ex.output, expected_wf2_output)
self.assertDictEqual(
{'slogan': "'Bonnie & Clyde' is a cool movie!\n"},
wf2_ex.output
)
with db_api.transaction():
# Check if target is resolved.
wf1_task_execs = db_api.get_task_executions(
workflow_execution_id=wf1_ex.id

View File

@ -82,6 +82,7 @@ class ErrorHandlingEngineTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
@ -109,6 +110,7 @@ class ErrorHandlingEngineTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
@ -138,6 +140,7 @@ class ErrorHandlingEngineTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
# Now we need to make sure that task is in ERROR state but action
@ -195,6 +198,7 @@ class ErrorHandlingEngineTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
# Now we need to make sure that task is in ERROR state but action
@ -202,8 +206,9 @@ class ErrorHandlingEngineTest(base.EngineTestCase):
# must not affect action state.
task_execs = wf_ex.task_executions
# NOTE: task3 must not run because on-error handler triggers only
# on error outcome of an action (or workflow) associated with a task.
# NOTE: task3 must not run because on-error handler triggers
# only on error outcome of an action (or workflow) associated
# with a task.
self.assertEqual(1, len(task_execs))
task_ex = self._assert_single_item(
@ -244,6 +249,7 @@ class ErrorHandlingEngineTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
# Now we need to make sure that task and action are in SUCCESS
@ -295,6 +301,7 @@ class ErrorHandlingEngineTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
@ -342,6 +349,7 @@ class ErrorHandlingEngineTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions

View File

@ -93,6 +93,7 @@ class ErrorResultTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -106,8 +107,8 @@ class ErrorResultTest(base.EngineTestCase):
self.assertEqual(states.ERROR, task1.state)
self.assertEqual(states.SUCCESS, task2.state)
# "publish" clause is ignored in case of ERROR so task execution field
# must be empty.
# "publish" clause is ignored in case of ERROR so task execution
# field must be empty.
self.assertDictEqual({}, task1.published)
self.assertEqual(2, data_flow.get_task_execution_result(task1))
@ -125,6 +126,7 @@ class ErrorResultTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -138,8 +140,8 @@ class ErrorResultTest(base.EngineTestCase):
self.assertEqual(states.ERROR, task1.state)
self.assertEqual(states.SUCCESS, task3.state)
# "publish" clause is ignored in case of ERROR so task execution field
# must be empty.
# "publish" clause is ignored in case of ERROR so task execution
# field must be empty.
self.assertDictEqual({}, task1.published)
self.assertEqual(3, data_flow.get_task_execution_result(task1))
@ -157,6 +159,7 @@ class ErrorResultTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -168,7 +171,10 @@ class ErrorResultTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, task1.state)
# "publish" clause is ignored in case of ERROR so task execution field
# must be empty.
# "publish" clause is ignored in case of ERROR so task execution
# field must be empty.
self.assertDictEqual({'p_var': 'success'}, task1.published)
self.assertEqual('success', data_flow.get_task_execution_result(task1))
self.assertEqual(
'success',
data_flow.get_task_execution_result(task1)
)

View File

@ -191,17 +191,19 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertIn(
'Failed to handle action completion [error=Size of',
wf_ex.state_info
)
self.assertIn('wf=wf, task=task1', wf_ex.state_info)
task_ex = self._assert_single_item(wf_ex.task_executions, name='task1')
task_ex = self._assert_single_item(task_execs, name='task1')
self.assertIn(
"Size of 'published' is 1KB which exceeds the limit of 0KB",

View File

@ -95,8 +95,10 @@ class JavaScriptEngineTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.SUCCESS, task_ex.state)

View File

@ -202,13 +202,17 @@ class JoinEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {})
self._await(
lambda:
len(db_api.get_workflow_execution(wf_ex.id).task_executions) == 4
def _num_of_tasks():
return len(
db_api.get_task_executions(workflow_execution_id=wf_ex.id)
)
self._await(lambda: _num_of_tasks() == 4)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
task1 = self._assert_single_item(tasks, name='task1')
@ -857,7 +861,10 @@ class JoinEngineTest(base.EngineTestCase):
mtd_name = 'mistral.engine.task_handler._refresh_task_state'
self._assert_single_item(calls, target_method_name=mtd_name)
cnt = sum([1 for c in calls if c.target_method_name == mtd_name])
# There can be 2 calls with different value of 'processing' flag.
self.assertTrue(cnt == 1 or cnt == 2)
# Stop the workflow.
self.engine.stop_workflow(wf_ex.id, state=states.CANCELLED)
@ -912,7 +919,10 @@ class JoinEngineTest(base.EngineTestCase):
mtd_name = 'mistral.engine.task_handler._refresh_task_state'
self._assert_single_item(calls, target_method_name=mtd_name)
cnt = sum([1 for c in calls if c.target_method_name == mtd_name])
# There can be 2 calls with different value of 'processing' flag.
self.assertTrue(cnt == 1 or cnt == 2)
# Stop the workflow.
db_api.delete_workflow_execution(wf_ex.id)

View File

@ -82,9 +82,11 @@ class NoopTaskEngineTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
wf_output = wf_ex.output
tasks = wf_ex.task_executions
self.assertEqual(4, len(tasks))
@ -99,7 +101,7 @@ class NoopTaskEngineTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, task3.state)
self.assertEqual(states.SUCCESS, task4.state)
self.assertDictEqual({'result': 4}, wf_ex.output)
self.assertDictEqual({'result': 4}, wf_output)
def test_noop_task2(self):
wf_service.create_workflows(WF)
@ -109,9 +111,11 @@ class NoopTaskEngineTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
wf_output = wf_ex.output
tasks = wf_ex.task_executions
self.assertEqual(4, len(tasks))
@ -126,4 +130,4 @@ class NoopTaskEngineTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, task3.state)
self.assertEqual(states.SUCCESS, task5.state)
self.assertDictEqual({'result': 5}, wf_ex.output)
self.assertDictEqual({'result': 5}, wf_output)

View File

@ -424,8 +424,10 @@ class PoliciesTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1', {})
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.RUNNING_DELAYED, task_ex.state)
@ -440,15 +442,17 @@ class PoliciesTest(base.EngineTestCase):
wb_service.create_workbook_v2(WAIT_BEFORE_FROM_VAR)
# Start workflow.
exec_db = self.engine.start_workflow('wb.wf1', {'wait_before': 1})
wf_ex = self.engine.start_workflow('wb.wf1', {'wait_before': 1})
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_workflow_execution(exec_db.id)
task_db = exec_db.task_executions[0]
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.RUNNING_DELAYED, task_db.state)
task_ex = wf_ex.task_executions[0]
self.await_workflow_success(exec_db.id)
self.assertEqual(states.RUNNING_DELAYED, task_ex.state)
self.await_workflow_success(wf_ex.id)
def test_wait_before_policy_two_tasks(self):
wf_text = """---
@ -472,6 +476,7 @@ class PoliciesTest(base.EngineTestCase):
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(2, len(task_execs))
@ -484,8 +489,10 @@ class PoliciesTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1', {})
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.RUNNING, task_ex.state)
@ -500,8 +507,10 @@ class PoliciesTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1', {'wait_after': 2})
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.RUNNING, task_ex.state)
@ -518,8 +527,10 @@ class PoliciesTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1', {})
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.RUNNING, task_ex.state)
@ -530,7 +541,9 @@ class PoliciesTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(
@ -544,8 +557,10 @@ class PoliciesTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1', {'count': 3, 'delay': 1})
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.RUNNING, task_ex.state)
@ -572,20 +587,25 @@ class PoliciesTest(base.EngineTestCase):
count: 3
delay: 1
"""
wb_service.create_workbook_v2(retry_wb)
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1', {})
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.await_task_success(task_ex.id)
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(
@ -616,15 +636,19 @@ class PoliciesTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1', {})
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.await_task_error(task_ex.id)
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(
@ -655,15 +679,19 @@ class PoliciesTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1', {})
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.await_task_error(task_ex.id)
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(
@ -695,15 +723,19 @@ class PoliciesTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1', {})
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.await_task_success(task_ex.id)
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(
@ -732,15 +764,18 @@ class PoliciesTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1', {})
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.await_task_success(task_ex.id)
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(
@ -768,15 +803,18 @@ class PoliciesTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1', {})
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.await_task_error(task_ex.id)
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(
@ -811,13 +849,16 @@ class PoliciesTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb.main', {})
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.await_task_error(task_ex.id)
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
@ -854,15 +895,19 @@ class PoliciesTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1', {})
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.await_task_success(task_ex.id)
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
wf_output = wf_ex.output
task_ex = wf_ex.task_executions[0]
self.assertDictEqual(
@ -870,7 +915,7 @@ class PoliciesTest(base.EngineTestCase):
task_ex.runtime_context['retry_task_policy']
)
self.assertDictEqual({'result': 'mocked result'}, wf_ex.output)
self.assertDictEqual({'result': 'mocked result'}, wf_output)
@mock.patch.object(
std_actions.EchoAction,
@ -899,24 +944,26 @@ class PoliciesTest(base.EngineTestCase):
count: 3
delay: 1
"""
wf_service.create_workflows(retry_wf)
wf_ex = self.engine.start_workflow('wf1', {})
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
retry_task = self._assert_single_item(
wf_ex.task_executions,
name='task2'
)
wf_output = wf_ex.output
task_execs = wf_ex.task_executions
retry_task = self._assert_single_item(task_execs, name='task2')
self.assertDictEqual(
{'retry_no': 1},
retry_task.runtime_context['retry_task_policy']
)
self.assertDictEqual({'result': 'value'}, wf_ex.output)
self.assertDictEqual({'result': 'value'}, wf_output)
def test_timeout_policy(self):
wb_service.create_workbook_v2(TIMEOUT_WB)
@ -924,17 +971,22 @@ class PoliciesTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1', {})
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.RUNNING, task_ex.state)
self.await_task_error(task_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self._assert_single_item(wf_ex.task_executions, name='task1')
task_execs = wf_ex.task_executions
self._assert_single_item(task_execs, name='task1')
self.await_workflow_success(wf_ex.id)
@ -944,8 +996,10 @@ class PoliciesTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1', {})
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.RUNNING, task_ex.state)
@ -955,11 +1009,13 @@ class PoliciesTest(base.EngineTestCase):
# Wait until timeout exceeds.
self._sleep(1)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks_db = wf_ex.task_executions
task_execs = wf_ex.task_executions
# Make sure that engine did not create extra tasks.
self.assertEqual(1, len(tasks_db))
self.assertEqual(1, len(task_execs))
def test_timeout_policy_from_var(self):
wb_service.create_workbook_v2(TIMEOUT_FROM_VAR)
@ -967,8 +1023,10 @@ class PoliciesTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1', {'timeout': 1})
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(states.RUNNING, task_ex.state)
@ -984,11 +1042,12 @@ class PoliciesTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1', {})
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1'
)
task_execs = wf_ex.task_executions
task_ex = self._assert_single_item(task_execs, name='task1')
self.assertEqual(states.IDLE, task_ex.state)
@ -998,20 +1057,22 @@ class PoliciesTest(base.EngineTestCase):
self.engine.resume_workflow(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self._assert_single_item(wf_ex.task_executions, name='task1')
task_execs = wf_ex.task_executions
self._assert_single_item(task_execs, name='task1')
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1'
)
next_task_ex = self._assert_single_item(
wf_ex.task_executions,
name='task2'
)
task_execs = wf_ex.task_executions
task_ex = self._assert_single_item(task_execs, name='task1')
next_task_ex = self._assert_single_item(task_execs, name='task2')
self.assertEqual(states.SUCCESS, task_ex.state)
self.assertEqual(states.SUCCESS, next_task_ex.state)
@ -1022,11 +1083,12 @@ class PoliciesTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1', {})
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1'
)
task_execs = wf_ex.task_executions
task_ex = self._assert_single_item(task_execs, name='task1')
self.assertEqual(states.IDLE, task_ex.state)
@ -1042,24 +1104,27 @@ class PoliciesTest(base.EngineTestCase):
self.await_workflow_paused(wf_ex.id)
task_ex = db_api.get_task_execution(task_ex.id)
self.assertEqual(states.IDLE, task_ex.state)
self.engine.resume_workflow(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self._assert_single_item(wf_ex.task_executions, name='task1')
task_execs = wf_ex.task_executions
self._assert_single_item(task_execs, name='task1')
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1'
)
next_task_ex = self._assert_single_item(
wf_ex.task_executions,
name='task2'
)
task_execs = wf_ex.task_executions
task_ex = self._assert_single_item(task_execs, name='task1')
next_task_ex = self._assert_single_item(task_execs, name='task2')
self.assertEqual(states.SUCCESS, task_ex.state)
self.assertEqual(states.SUCCESS, next_task_ex.state)
@ -1072,14 +1137,14 @@ class PoliciesTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1'
)
task_execs = wf_ex.task_executions
task_ex = self._assert_single_item(task_execs, name='task1')
self.assertEqual(states.SUCCESS, task_ex.state)
self.assertEqual(4, task_ex.runtime_context['concurrency'])
def test_concurrency_is_in_runtime_context_from_var(self):
@ -1088,12 +1153,12 @@ class PoliciesTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1', {'concurrency': 4})
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1'
)
task_execs = wf_ex.task_executions
task_ex = self._assert_single_item(task_execs, name='task1')
self.assertEqual(4, task_ex.runtime_context['concurrency'])
@ -1143,6 +1208,7 @@ class PoliciesTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)

View File

@ -143,27 +143,36 @@ class EngineActionRaceConditionTest(base.EngineTestCase):
wf_ex = self.engine.start_workflow('wf', None)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertEqual(states.RUNNING, wf_ex.task_executions[0].state)
self.assertEqual(states.RUNNING, task_execs[0].state)
self.wait_for_action()
with db_api.transaction():
# Here's the point when the action is blocked but already running.
# Do the same check again, it should always pass.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertEqual(states.RUNNING, wf_ex.task_executions[0].state)
self.assertEqual(states.RUNNING, task_execs[0].state)
self.unblock_action()
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertDictEqual({'result': 'test'}, wf_ex.output)
wf_output = wf_ex.output
self.assertDictEqual({'result': 'test'}, wf_output)
# TODO(rakhmerov): Should periodically fail now because of poor
# transaction isolation support in SQLite. Requires more research

View File

@ -84,13 +84,16 @@ class ReverseWorkflowEngineTest(base.EngineTestCase):
# Wait till workflow 'wf1' is completed.
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(1, len(wf_ex.task_executions))
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
self.assertEqual(1, len(db_api.get_task_executions()))
task_ex = self._assert_single_item(
wf_ex.task_executions,
task_execs,
name='task1',
state=states.SUCCESS
)
@ -114,13 +117,16 @@ class ReverseWorkflowEngineTest(base.EngineTestCase):
# Wait till workflow 'wf1' is completed.
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(2, len(wf_ex.task_executions))
task_execs = wf_ex.task_executions
self.assertEqual(2, len(task_execs))
self.assertEqual(2, len(db_api.get_task_executions()))
task1_ex = self._assert_single_item(
wf_ex.task_executions,
task_execs,
name='task1',
state=states.SUCCESS
)
@ -128,7 +134,7 @@ class ReverseWorkflowEngineTest(base.EngineTestCase):
self.assertDictEqual({'result1': 'a'}, task1_ex.published)
task2_ex = self._assert_single_item(
wf_ex.task_executions,
task_execs,
name='task2',
state=states.SUCCESS
)

View File

@ -89,15 +89,20 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
# Run workflow and fail task.
wf_ex = self.engine.start_workflow('wb1.wf1', {}, task_name='t3')
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(2, len(wf_ex.task_executions))
self.assertEqual(2, len(task_execs))
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
self.assertEqual(states.SUCCESS, task_1_ex.state)
self.assertEqual(states.ERROR, task_2_ex.state)
@ -105,6 +110,7 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
# Resume workflow and re-run failed task.
self.engine.rerun_workflow(task_2_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.RUNNING, wf_ex.state)
@ -112,21 +118,26 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
# Wait for the workflow to succeed.
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.assertEqual(3, len(wf_ex.task_executions))
self.assertEqual(3, len(task_execs))
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
task_3_ex = self._assert_single_item(wf_ex.task_executions, name='t3')
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
task_3_ex = self._assert_single_item(task_execs, name='t3')
# Check action executions of task 1.
self.assertEqual(states.SUCCESS, task_1_ex.state)
task_1_action_exs = db_api.get_action_executions(
task_execution_id=task_1_ex.id)
task_execution_id=task_1_ex.id
)
self.assertEqual(1, len(task_1_action_exs))
self.assertEqual(states.SUCCESS, task_1_action_exs[0].state)
@ -136,7 +147,8 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
self.assertIsNone(task_2_ex.state_info)
task_2_action_exs = db_api.get_action_executions(
task_execution_id=task_2_ex.id)
task_execution_id=task_2_ex.id
)
self.assertEqual(2, len(task_2_action_exs))
self.assertEqual(states.ERROR, task_2_action_exs[0].state)
@ -146,7 +158,8 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, task_3_ex.state)
task_3_action_exs = db_api.get_action_executions(
task_execution_id=task_3_ex.id)
task_execution_id=task_3_ex.id
)
self.assertEqual(1, len(task_3_action_exs))
self.assertEqual(states.SUCCESS, task_3_action_exs[0].state)
@ -181,16 +194,20 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
)
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(2, len(wf_ex.task_executions))
self.assertEqual(2, len(task_execs))
self.assertDictEqual(env, wf_ex.params['env'])
self.assertDictEqual(env, wf_ex.context['__env'])
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
self.assertEqual(states.SUCCESS, task_1_ex.state)
self.assertEqual(states.ERROR, task_2_ex.state)
@ -204,6 +221,7 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
# Resume workflow and re-run failed task.
self.engine.rerun_workflow(task_2_ex.id, env=updated_env)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.RUNNING, wf_ex.state)
@ -213,21 +231,26 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
# Wait for the workflow to succeed.
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.assertEqual(3, len(wf_ex.task_executions))
self.assertEqual(3, len(task_execs))
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
task_3_ex = self._assert_single_item(wf_ex.task_executions, name='t3')
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
task_3_ex = self._assert_single_item(task_execs, name='t3')
# Check action executions of task 1.
self.assertEqual(states.SUCCESS, task_1_ex.state)
task_1_action_exs = db_api.get_action_executions(
task_execution_id=task_1_ex.id)
task_execution_id=task_1_ex.id
)
self.assertEqual(1, len(task_1_action_exs))
self.assertEqual(states.SUCCESS, task_1_action_exs[0].state)
@ -242,7 +265,8 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
self.assertIsNone(task_2_ex.state_info)
task_2_action_exs = db_api.get_action_executions(
task_execution_id=task_2_ex.id)
task_execution_id=task_2_ex.id
)
self.assertEqual(2, len(task_2_action_exs))
self.assertEqual(states.ERROR, task_2_action_exs[0].state)
@ -262,7 +286,8 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, task_3_ex.state)
task_3_action_exs = db_api.get_action_executions(
task_execution_id=task_3_ex.id)
task_execution_id=task_3_ex.id
)
self.assertEqual(1, len(task_3_action_exs))
self.assertEqual(states.SUCCESS, task_3_action_exs[0].state)
@ -287,15 +312,20 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
# Run workflow and fail task.
wf_ex = self.engine.start_workflow('wb1.wf1', {}, task_name='t3')
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(2, len(wf_ex.task_executions))
self.assertEqual(2, len(task_execs))
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
self.assertEqual(states.SUCCESS, task_1_ex.state)
self.assertEqual(states.ERROR, task_2_ex.state)

View File

@ -121,6 +121,7 @@ class RunActionEngineTest(base.EngineTestCase):
self.await_action_success(action_ex.id)
with db_api.transaction():
action_ex = db_api.get_action_execution(action_ex.id)
self.assertEqual(states.SUCCESS, action_ex.state)
@ -149,15 +150,21 @@ class RunActionEngineTest(base.EngineTestCase):
self.assertEqual('Hello!', action_ex.output['result'])
self.assertEqual(states.SUCCESS, action_ex.state)
db_action_ex = db_api.get_action_execution(action_ex.id)
self.assertEqual(states.SUCCESS, db_action_ex.state)
self.assertEqual({'result': 'Hello!'}, db_action_ex.output)
with db_api.transaction():
action_ex = db_api.get_action_execution(action_ex.id)
self.assertEqual(states.SUCCESS, action_ex.state)
self.assertEqual({'result': 'Hello!'}, action_ex.output)
def test_run_action_run_sync_error(self):
# Start action.
self.assertRaises(
exc.InputException,
self.engine.start_action, 'std.async_noop', {}, run_sync=True)
self.engine.start_action,
'std.async_noop',
{},
run_sync=True
)
def test_run_action_async(self):
action_ex = self.engine.start_action('std.async_noop', {})
@ -176,6 +183,7 @@ class RunActionEngineTest(base.EngineTestCase):
self.await_action_error(action_ex.id)
with db_api.transaction():
action_ex = db_api.get_action_execution(action_ex.id)
self.assertEqual(states.ERROR, action_ex.state)
@ -189,6 +197,7 @@ class RunActionEngineTest(base.EngineTestCase):
self.await_action_error(action_ex.id)
with db_api.transaction():
action_ex = db_api.get_action_execution(action_ex.id)
self.assertEqual(states.ERROR, action_ex.state)

View File

@ -76,6 +76,7 @@ class TestSafeRerun(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -121,6 +122,7 @@ class TestSafeRerun(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -155,6 +157,7 @@ class TestSafeRerun(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)

View File

@ -135,16 +135,20 @@ class ExecutionStateInfoTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.ERROR, wf_ex.state)
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
task_1_ex = self._assert_single_item(task_execs, name='t1')
self.assertEqual(states.ERROR, task_1_ex.state)
task_1_action_exs = db_api.get_action_executions(
task_execution_id=task_1_ex.id)
task_execution_id=task_1_ex.id
)
self.assertEqual(3, len(task_1_action_exs))
self.assertIn(task_1_action_exs[0].id, wf_ex.state_info)

View File

@ -145,21 +145,27 @@ class SubworkflowsTest(base.EngineTestCase):
# Wait till workflow 'wf1' is completed.
self.await_workflow_success(wf1_ex.id)
with db_api.transaction():
wf1_ex = db_api.get_workflow_execution(wf1_ex.id)
wf1_output = wf1_ex.output
self.assertDictEqual(
{'final_result': "'Bonnie & Clyde'"},
wf1_ex.output
wf1_output
)
# Wait till workflow 'wf2' is completed.
self.await_workflow_success(wf2_ex.id, timeout=4)
with db_api.transaction():
wf2_ex = db_api.get_workflow_execution(wf2_ex.id)
wf2_output = wf2_ex.output
self.assertDictEqual(
{'slogan': "'Bonnie & Clyde' is a cool movie!"},
wf2_ex.output
wf2_output
)
# Check project_id in tasks.

View File

@ -60,6 +60,7 @@ class TaskDefaultsDirectWorkflowEngineTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -109,6 +110,7 @@ class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -152,16 +154,17 @@ class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
self.assertEqual(1, len(tasks))
self._assert_single_item(tasks, name='task1', state=states.ERROR)
task_ex = db_api.get_task_execution(tasks[0].id)
self.assertIn("Task timed out", task_ex.state_info)
def test_task_defaults_wait_policies(self):
@ -195,13 +198,13 @@ class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase):
2
)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
self.assertEqual(1, len(tasks))
self._assert_single_item(tasks, name='task1', state=states.SUCCESS)
def test_task_defaults_requires(self):
@ -234,6 +237,7 @@ class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)

View File

@ -71,12 +71,15 @@ class TaskPublishTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.ERROR, wf_ex.state)
self.assertEqual(1, len(wf_ex.task_executions))
task_execs = wf_ex.task_executions
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
self.assertEqual(states.ERROR, wf_ex.state)
self.assertEqual(1, len(task_execs))
task_1_ex = self._assert_single_item(task_execs, name='t1')
# Task 1 should have failed.
self.assertEqual(states.ERROR, task_1_ex.state)
@ -84,7 +87,8 @@ class TaskPublishTest(base.EngineTestCase):
# Action execution of task 1 should have succeeded.
task_1_action_exs = db_api.get_action_executions(
task_execution_id=task_1_ex.id)
task_execution_id=task_1_ex.id
)
self.assertEqual(1, len(task_1_action_exs))
self.assertEqual(states.SUCCESS, task_1_action_exs[0].state)

View File

@ -188,6 +188,7 @@ class WithItemsEngineTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -243,6 +244,7 @@ class WithItemsEngineTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -268,14 +270,13 @@ class WithItemsEngineTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
task1 = self._assert_single_item(tasks, name='task1')
with db_api.transaction():
task1 = db_api.get_task_execution(task1.id)
task1 = self._assert_single_item(tasks, name='task1')
result = data_flow.get_task_execution_result(task1)
@ -317,6 +318,7 @@ class WithItemsEngineTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -333,15 +335,13 @@ class WithItemsEngineTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
task1 = self._assert_single_item(tasks, name='task1')
with db_api.transaction():
task1 = db_api.get_task_execution(task1.id)
result = data_flow.get_task_execution_result(task1)
self.assertIsInstance(result, list)
@ -363,6 +363,7 @@ class WithItemsEngineTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -372,9 +373,6 @@ class WithItemsEngineTest(base.EngineTestCase):
# Since we know that we can receive results in random order,
# check is not depend on order of items.
with db_api.transaction():
task1_ex = db_api.get_task_execution(task1_ex.id)
result = data_flow.get_task_execution_result(task1_ex)
self.assertIsInstance(result, list)
@ -392,7 +390,9 @@ class WithItemsEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb1.wf1_with_items', WF_INPUT_URLS)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
act_exs = task_ex.executions
@ -406,10 +406,9 @@ class WithItemsEngineTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex.id)
result = data_flow.get_task_execution_result(task_ex)
self.assertIsInstance(result, list)
@ -452,6 +451,7 @@ class WithItemsEngineTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -487,6 +487,7 @@ class WithItemsEngineTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -496,9 +497,6 @@ class WithItemsEngineTest(base.EngineTestCase):
state=states.SUCCESS
)
with db_api.transaction():
task1_ex = db_api.get_task_execution(task1_ex.id)
result = data_flow.get_task_execution_result(task1_ex)
# Since we know that we can receive results in random order,
@ -558,11 +556,14 @@ class WithItemsEngineTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
task1_ex = self._assert_single_item(
wf_ex.task_executions,
task_execs,
name='task1',
state=states.SUCCESS
)
@ -580,6 +581,7 @@ class WithItemsEngineTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -593,9 +595,6 @@ class WithItemsEngineTest(base.EngineTestCase):
state=states.SUCCESS
)
with db_api.transaction():
task1_ex = db_api.get_task_execution(task1_ex.id)
result = data_flow.get_task_execution_result(task1_ex)
self.assertIsInstance(result, list)
@ -1038,6 +1037,7 @@ class WithItemsEngineTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -1047,12 +1047,14 @@ class WithItemsEngineTest(base.EngineTestCase):
task1_ex = self._assert_single_item(task_execs, name='task1')
task1_executions = task1_ex.executions
self.assertEqual(
2,
task1_ex.runtime_context['retry_task_policy']['retry_no']
)
self.assertEqual(9, len(task1_ex.executions))
self._assert_multiple_items(task1_ex.executions, 3, accepted=True)
self.assertEqual(9, len(task1_executions))
self._assert_multiple_items(task1_executions, 3, accepted=True)
@testtools.skip('Restore concurrency support.')
def test_with_items_retry_policy_concurrency(self):
@ -1115,14 +1117,14 @@ class WithItemsEngineTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
task1 = self._assert_single_item(tasks, name='task1')
with db_api.transaction():
task1 = db_api.get_task_execution(task1.id)
task1 = self._assert_single_item(
wf_ex.task_executions,
name='task1'
)
result = data_flow.get_task_execution_result(task1)
@ -1165,6 +1167,7 @@ class WithItemsEngineTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -1299,6 +1302,7 @@ class WithItemsEngineTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)

View File

@ -50,25 +50,25 @@ class WorkflowCancelTest(base.EngineTestCase):
self.await_workflow_cancelled(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_1_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1'
)
task_execs = wf_ex.task_executions
task_1_ex = self._assert_single_item(task_execs, name='task1')
self.await_task_success(task_1_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_1_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1'
)
task_execs = wf_ex.task_executions
task_1_ex = self._assert_single_item(task_execs, name='task1')
self.assertEqual(states.CANCELLED, wf_ex.state)
self.assertEqual("Cancelled by user.", wf_ex.state_info)
self.assertEqual(1, len(wf_ex.task_executions))
self.assertEqual(1, len(task_execs))
self.assertEqual(states.SUCCESS, task_1_ex.state)
def test_cancel_paused_workflow(self):
@ -104,25 +104,28 @@ class WorkflowCancelTest(base.EngineTestCase):
self.await_workflow_cancelled(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_1_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1'
)
task_execs = wf_ex.task_executions
task_1_ex = self._assert_single_item(task_execs, name='task1')
self.await_task_success(task_1_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
task_1_ex = self._assert_single_item(
wf_ex.task_executions,
task_execs,
name='task1'
)
self.assertEqual(states.CANCELLED, wf_ex.state)
self.assertEqual("Cancelled by user.", wf_ex.state_info)
self.assertEqual(1, len(wf_ex.task_executions))
self.assertEqual(1, len(task_execs))
self.assertEqual(states.SUCCESS, task_1_ex.state)
def test_cancel_completed_workflow(self):
@ -148,16 +151,16 @@ class WorkflowCancelTest(base.EngineTestCase):
"Cancelled by user."
)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_1_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1'
)
task_execs = wf_ex.task_executions
task_1_ex = self._assert_single_item(task_execs, name='task1')
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.assertEqual(1, len(wf_ex.task_executions))
self.assertEqual(1, len(task_execs))
self.assertEqual(states.SUCCESS, task_1_ex.state)
def test_cancel_parent_workflow(self):
@ -198,15 +201,21 @@ class WorkflowCancelTest(base.EngineTestCase):
self.await_workflow_cancelled(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = self._assert_single_item(wf_ex.task_executions, name='taskx')
task_execs = wf_ex.task_executions
task_ex = self._assert_single_item(task_execs, name='taskx')
self.await_task_cancelled(task_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = self._assert_single_item(wf_ex.task_executions, name='taskx')
task_execs = wf_ex.task_executions
task_ex = self._assert_single_item(task_execs, name='taskx')
subwf_execs = db_api.get_workflow_executions(
task_execution_id=task_ex.id
@ -250,10 +259,14 @@ class WorkflowCancelTest(base.EngineTestCase):
self.engine.start_workflow('wb.wf', {})
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wb.wf')
task_ex = self._assert_single_item(wf_ex.task_executions, name='taskx')
task_ex = self._assert_single_item(
wf_ex.task_executions,
name='taskx'
)
subwf_ex = self._assert_single_item(wf_execs, name='wb.subwf')
self.engine.stop_workflow(
@ -266,10 +279,14 @@ class WorkflowCancelTest(base.EngineTestCase):
self.await_task_cancelled(task_ex.id)
self.await_workflow_cancelled(wf_ex.id)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wb.wf')
task_ex = self._assert_single_item(wf_ex.task_executions, name='taskx')
task_ex = self._assert_single_item(
wf_ex.task_executions,
name='taskx'
)
subwf_ex = self._assert_single_item(wf_execs, name='wb.subwf')
self.assertEqual(states.CANCELLED, subwf_ex.state)
@ -315,18 +332,29 @@ class WorkflowCancelTest(base.EngineTestCase):
"Cancelled by user."
)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = self._assert_single_item(wf_ex.task_executions, name='taskx')
task_execs = wf_ex.task_executions
task_ex = self._assert_single_item(task_execs, name='taskx')
self.await_workflow_cancelled(wf_ex.id)
self.await_task_cancelled(task_ex.id)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wb.wf')
task_ex = self._assert_single_item(wf_ex.task_executions, name='taskx')
subwf_exs = self._assert_multiple_items(wf_execs, 2, name='wb.subwf')
task_ex = self._assert_single_item(
wf_ex.task_executions,
name='taskx'
)
subwf_exs = self._assert_multiple_items(
wf_execs,
2,
name='wb.subwf'
)
self.assertEqual(states.CANCELLED, subwf_exs[0].state)
self.assertEqual("Cancelled by user.", subwf_exs[0].state_info)
@ -368,11 +396,19 @@ class WorkflowCancelTest(base.EngineTestCase):
self.engine.start_workflow('wb.wf', {})
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wb.wf')
task_ex = self._assert_single_item(wf_ex.task_executions, name='taskx')
subwf_exs = self._assert_multiple_items(wf_execs, 2, name='wb.subwf')
task_ex = self._assert_single_item(
wf_ex.task_executions,
name='taskx'
)
subwf_exs = self._assert_multiple_items(
wf_execs,
2,
name='wb.subwf'
)
self.engine.stop_workflow(
subwf_exs[0].id,
@ -385,11 +421,19 @@ class WorkflowCancelTest(base.EngineTestCase):
self.await_task_cancelled(task_ex.id)
self.await_workflow_cancelled(wf_ex.id)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wb.wf')
task_ex = self._assert_single_item(wf_ex.task_executions, name='taskx')
subwf_exs = self._assert_multiple_items(wf_execs, 2, name='wb.subwf')
task_ex = self._assert_single_item(
wf_ex.task_executions,
name='taskx'
)
subwf_exs = self._assert_multiple_items(
wf_execs,
2,
name='wb.subwf'
)
self.assertEqual(states.CANCELLED, subwf_exs[0].state)
self.assertEqual("Cancelled by user.", subwf_exs[0].state_info)
@ -431,11 +475,19 @@ class WorkflowCancelTest(base.EngineTestCase):
self.engine.start_workflow('wb.wf', {})
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wb.wf')
task_ex = self._assert_single_item(wf_ex.task_executions, name='taskx')
subwf_exs = self._assert_multiple_items(wf_execs, 2, name='wb.subwf')
task_ex = self._assert_single_item(
wf_ex.task_executions,
name='taskx'
)
subwf_exs = self._assert_multiple_items(
wf_execs,
2,
name='wb.subwf'
)
self.engine.stop_workflow(
subwf_exs[0].id,
@ -454,11 +506,19 @@ class WorkflowCancelTest(base.EngineTestCase):
self.await_task_error(task_ex.id)
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wb.wf')
task_ex = self._assert_single_item(wf_ex.task_executions, name='taskx')
subwf_exs = self._assert_multiple_items(wf_execs, 2, name='wb.subwf')
task_ex = self._assert_single_item(
wf_ex.task_executions,
name='taskx'
)
subwf_exs = self._assert_multiple_items(
wf_execs,
2,
name='wb.subwf'
)
self.assertEqual(states.CANCELLED, subwf_exs[0].state)
self.assertEqual("Cancelled by user.", subwf_exs[0].state_info)
@ -500,11 +560,19 @@ class WorkflowCancelTest(base.EngineTestCase):
self.engine.start_workflow('wb.wf', {})
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wb.wf')
task_ex = self._assert_single_item(wf_ex.task_executions, name='taskx')
subwf_exs = self._assert_multiple_items(wf_execs, 2, name='wb.subwf')
task_ex = self._assert_single_item(
wf_ex.task_executions,
name='taskx'
)
subwf_exs = self._assert_multiple_items(
wf_execs,
2,
name='wb.subwf'
)
self.engine.stop_workflow(
subwf_exs[1].id,
@ -523,11 +591,19 @@ class WorkflowCancelTest(base.EngineTestCase):
self.await_task_error(task_ex.id)
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wb.wf')
task_ex = self._assert_single_item(wf_ex.task_executions, name='taskx')
subwf_exs = self._assert_multiple_items(wf_execs, 2, name='wb.subwf')
task_ex = self._assert_single_item(
wf_ex.task_executions,
name='taskx'
)
subwf_exs = self._assert_multiple_items(
wf_execs,
2,
name='wb.subwf'
)
self.assertEqual(states.CANCELLED, subwf_exs[0].state)
self.assertEqual("Cancelled by user.", subwf_exs[0].state_info)

View File

@ -199,23 +199,30 @@ class WorkflowResumeTest(base.EngineTestCase):
self.await_workflow_paused(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.PAUSED, wf_ex.state)
self.assertEqual(2, len(wf_ex.task_executions))
self.assertEqual(2, len(task_execs))
self.engine.resume_workflow(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(2, len(wf_ex.task_executions))
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertEqual(2, len(wf_ex.task_executions))
self.assertEqual(2, len(task_execs))
def test_resume_reverse(self):
wb_service.create_workbook_v2(RESUME_WORKBOOK_REVERSE)
@ -227,14 +234,16 @@ class WorkflowResumeTest(base.EngineTestCase):
task_name='task2'
)
# Note: We need to reread execution to access related tasks.
self.engine.pause_workflow(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.PAUSED, wf_ex.state)
self.assertEqual(1, len(wf_ex.task_executions))
self.assertEqual(1, len(task_execs))
self.engine.resume_workflow(wf_ex.id)
@ -243,10 +252,14 @@ class WorkflowResumeTest(base.EngineTestCase):
self.assertEqual(states.RUNNING, wf_ex.state)
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertEqual(2, len(wf_ex.task_executions))
self.assertEqual(2, len(task_execs))
def test_resume_two_branches(self):
wb_service.create_workbook_v2(WORKBOOK_TWO_BRANCHES)
@ -256,21 +269,27 @@ class WorkflowResumeTest(base.EngineTestCase):
self.await_workflow_paused(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.PAUSED, wf_ex.state)
self.assertEqual(3, len(wf_ex.task_executions))
self.assertEqual(3, len(task_execs))
wf_ex = self.engine.resume_workflow(wf_ex.id)
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.SUCCESS, wf_ex.state)
# We can see 3 tasks in execution.
self.assertEqual(3, len(wf_ex.task_executions))
self.assertEqual(3, len(task_execs))
def test_resume_two_start_tasks(self):
wb_service.create_workbook_v2(WORKBOOK_TWO_START_TASKS)
@ -280,12 +299,13 @@ class WorkflowResumeTest(base.EngineTestCase):
self.await_workflow_paused(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.PAUSED, wf_ex.state)
task_execs = wf_ex.task_executions
self.assertEqual(states.PAUSED, wf_ex.state)
# The exact number of tasks depends on which of two tasks
# 'task1' and 'task2' completed earlier.
self.assertGreaterEqual(len(task_execs), 2)
@ -300,10 +320,13 @@ class WorkflowResumeTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertEqual(3, len(wf_ex.task_executions))
self.assertEqual(3, len(task_execs))
def test_resume_different_task_states(self):
wb_service.create_workbook_v2(WORKBOOK_DIFFERENT_TASK_STATES)
@ -313,12 +336,13 @@ class WorkflowResumeTest(base.EngineTestCase):
self.await_workflow_paused(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.PAUSED, wf_ex.state)
task_execs = wf_ex.task_executions
self.assertEqual(states.PAUSED, wf_ex.state)
self.assertEqual(3, len(task_execs))
task2_ex = self._assert_single_item(task_execs, name='task2')
@ -345,10 +369,13 @@ class WorkflowResumeTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.SUCCESS, wf_ex.state, wf_ex.state_info)
self.assertEqual(4, len(wf_ex.task_executions))
self.assertEqual(4, len(task_execs))
def test_resume_fails(self):
# Start and pause workflow.
@ -390,20 +417,16 @@ class WorkflowResumeTest(base.EngineTestCase):
self.await_workflow_paused(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_1_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1'
)
task_execs = wf_ex.task_executions
task_2_ex = self._assert_single_item(
wf_ex.task_executions,
name='task2'
)
task_1_ex = self._assert_single_item(task_execs, name='task1')
task_2_ex = self._assert_single_item(task_execs, name='task2')
self.assertEqual(states.PAUSED, wf_ex.state)
self.assertEqual(2, len(wf_ex.task_executions))
self.assertEqual(2, len(task_execs))
self.assertDictEqual(env, wf_ex.params['env'])
self.assertDictEqual(env, wf_ex.context['__env'])
self.assertEqual(states.SUCCESS, task_1_ex.state)
@ -420,17 +443,17 @@ class WorkflowResumeTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertDictEqual(updated_env, wf_ex.params['env'])
self.assertDictEqual(updated_env, wf_ex.context['__env'])
self.assertEqual(3, len(wf_ex.task_executions))
self.assertEqual(3, len(task_execs))
# Check result of task2.
task_2_ex = self._assert_single_item(
wf_ex.task_executions,
name='task2'
)
task_2_ex = self._assert_single_item(task_execs, name='task2')
self.assertEqual(states.SUCCESS, task_2_ex.state)
@ -445,7 +468,7 @@ class WorkflowResumeTest(base.EngineTestCase):
# Check result of task3.
task_3_ex = self._assert_single_item(
wf_ex.task_executions,
task_execs,
name='task3'
)

View File

@ -55,9 +55,11 @@ class WorkflowVariablesTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
wf_output = wf_ex.output
tasks = wf_ex.task_executions
task1 = self._assert_single_item(tasks, name='task1')
@ -69,5 +71,5 @@ class WorkflowVariablesTest(base.EngineTestCase):
'literal_var': 'Literal value',
'yaql_var': 'Hello Renat'
},
wf_ex.output
wf_output
)

View File

@ -66,13 +66,14 @@ class YAQLFunctionsEngineTest(engine_test_base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.SUCCESS, wf_ex.state)
tasks = wf_ex.task_executions
self.assertEqual(states.SUCCESS, wf_ex.state)
task1 = self._assert_single_item(
tasks,
name='task1',