diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index 0eb2c1308..c1106cf28 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -147,6 +147,21 @@ def _on_action_update(action_ex): try: task.on_action_update(action_ex) + + if states.is_paused(action_ex.state): + wf_handler.pause_workflow(wf_ex) + + if states.is_running(action_ex.state): + # If any subworkflow of the parent workflow is paused, + # then keep the parent workflow execution paused. + for task_ex in wf_ex.task_executions: + if states.is_paused(task_ex.state): + return + + # Otherwise if no other subworkflow is paused, + # then resume the parent workflow execution. + wf_handler.resume_workflow(wf_ex) + except exc.MistralException as e: wf_ex = task_ex.workflow_execution diff --git a/mistral/engine/workflow_handler.py b/mistral/engine/workflow_handler.py index 06852ecd7..db1b5758f 100644 --- a/mistral/engine/workflow_handler.py +++ b/mistral/engine/workflow_handler.py @@ -123,9 +123,21 @@ def _check_and_complete(wf_ex_id): def pause_workflow(wf_ex, msg=None): - wf = workflows.Workflow(wf_ex=wf_ex) + # Pause subworkflows first. + for task_ex in wf_ex.task_executions: + sub_wf_exs = db_api.get_workflow_executions( + task_execution_id=task_ex.id + ) - wf.set_state(states.PAUSED, msg) + for sub_wf_ex in sub_wf_exs: + if not states.is_completed(sub_wf_ex.state): + pause_workflow(sub_wf_ex, msg=msg) + + # If all subworkflows paused successfully, pause the main workflow. + # If any subworkflows failed to pause for temporary reason, this + # allows pause to be executed again on the main workflow. + wf = workflows.Workflow(wf_ex=wf_ex) + wf.pause(msg=msg) def rerun_workflow(wf_ex, task_ex, reset=True, env=None): @@ -146,8 +158,19 @@ def resume_workflow(wf_ex, env=None): if not states.is_paused_or_idle(wf_ex.state): return wf_ex.get_clone() - wf = workflows.Workflow(wf_ex=wf_ex) + # Resume subworkflows first. + for task_ex in wf_ex.task_executions: + sub_wf_exs = db_api.get_workflow_executions( + task_execution_id=task_ex.id + ) + for sub_wf_ex in sub_wf_exs: + if not states.is_completed(sub_wf_ex.state): + resume_workflow(sub_wf_ex) + + # Resume current workflow here so to trigger continue workflow only + # after all other subworkflows are placed back in running state. + wf = workflows.Workflow(wf_ex=wf_ex) wf.resume(env=env) diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index 5d0f49a35..7084349e6 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -125,6 +125,29 @@ class Workflow(object): return self._cancel_workflow(msg) + def pause(self, msg=None): + """Pause workflow. + + :param msg: Additional explaining message. + """ + + assert self.wf_ex + + if states.is_paused(self.wf_ex.state): + return + + # Set the state of this workflow to paused. + self.set_state(states.PAUSED, state_info=msg) + + # If workflow execution is a subworkflow, + # schedule update to the task execution. + if self.wf_ex.task_execution_id: + # Import the task_handler module here to avoid circular reference. + from mistral.engine import task_handler + task_handler.schedule_on_action_update(self.wf_ex) + + return + def resume(self, env=None): """Resume workflow. @@ -135,7 +158,7 @@ class Workflow(object): wf_service.update_workflow_execution_env(self.wf_ex, env) - self.set_state(states.RUNNING, recursive=True) + self.set_state(states.RUNNING) wf_ctrl = wf_base.get_controller(self.wf_ex) @@ -144,6 +167,13 @@ class Workflow(object): self._continue_workflow(cmds) + # If workflow execution is a subworkflow, + # schedule update to the task execution. + if self.wf_ex.task_execution_id: + # Import the task_handler module here to avoid circular reference. + from mistral.engine import task_handler + task_handler.schedule_on_action_update(self.wf_ex) + def prepare_input(self, input_dict): for k, v in self.wf_spec.get_input().items(): if k not in input_dict or input_dict[k] is utils.NotDefined: diff --git a/mistral/tests/unit/engine/base.py b/mistral/tests/unit/engine/base.py index 394418a65..93014c7bd 100644 --- a/mistral/tests/unit/engine/base.py +++ b/mistral/tests/unit/engine/base.py @@ -249,6 +249,10 @@ class EngineTestCase(base.DbTestCase): timeout ) + def await_workflow_running(self, ex_id, delay=DEFAULT_DELAY, + timeout=DEFAULT_TIMEOUT): + self.await_workflow_state(ex_id, states.RUNNING, delay, timeout) + def await_workflow_success(self, ex_id, delay=DEFAULT_DELAY, timeout=DEFAULT_TIMEOUT): self.await_workflow_state(ex_id, states.SUCCESS, delay, timeout) diff --git a/mistral/tests/unit/engine/test_default_engine.py b/mistral/tests/unit/engine/test_default_engine.py index df2e31a79..3e1fd250f 100644 --- a/mistral/tests/unit/engine/test_default_engine.py +++ b/mistral/tests/unit/engine/test_default_engine.py @@ -361,7 +361,7 @@ class DefaultEngineTest(base.DbTestCase): self.assertEqual(1, len(task_execs)) self.assertEqual(states.PAUSED, task_execs[0].state) - self.assertEqual(states.RUNNING, wf_ex.state) + self.assertEqual(states.PAUSED, wf_ex.state) action_execs = db_api.get_action_executions( task_execution_id=task1_ex.id diff --git a/mistral/tests/unit/engine/test_direct_workflow.py b/mistral/tests/unit/engine/test_direct_workflow.py index 988c153fb..68a634eb3 100644 --- a/mistral/tests/unit/engine/test_direct_workflow.py +++ b/mistral/tests/unit/engine/test_direct_workflow.py @@ -155,10 +155,12 @@ class DirectWorkflowEngineTest(base.EngineTestCase): states.SUCCESS, self.engine.resume_workflow(wf_ex.id).state ) + self.assertRaises( exc.WorkflowException, self.engine.pause_workflow, wf_ex.id ) + self.assertEqual( states.SUCCESS, self.engine.stop_workflow(wf_ex.id, states.ERROR).state diff --git a/mistral/tests/unit/engine/test_subworkflows_pause_resume.py b/mistral/tests/unit/engine/test_subworkflows_pause_resume.py new file mode 100644 index 000000000..dbf788515 --- /dev/null +++ b/mistral/tests/unit/engine/test_subworkflows_pause_resume.py @@ -0,0 +1,1985 @@ +# Copyright 2015 - StackStorm, Inc. +# Copyright 2016 - Brocade Communications Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.db.v2 import api as db_api +from mistral.services import workbooks as wb_service +from mistral.tests.unit.engine import base +from mistral.workflow import states +from mistral_lib import actions as ml_actions + + +class SubworkflowPauseResumeTest(base.EngineTestCase): + + def test_pause_resume_cascade_down_to_subworkflow(self): + workbook = """ + version: '2.0' + name: wb + workflows: + wf1: + tasks: + task1: + workflow: wf2 + on-success: + - task3 + task2: + workflow: wf3 + on-success: + - task3 + task3: + join: all + action: std.noop + wf2: + tasks: + task1: + action: std.async_noop + on-success: + - task2 + task2: + action: std.noop + wf3: + tasks: + task1: + action: std.async_noop + on-success: + - task2 + task2: + action: std.noop + """ + + wb_service.create_workbook_v2(workbook) + + # Start workflow execution. + wf_1_ex = self.engine.start_workflow('wb.wf1') + + self.await_workflow_state(wf_1_ex.id, states.RUNNING) + + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() + + # Get objects for the parent workflow execution. + wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + + wf_1_task_execs = wf_1_ex.task_executions + + wf_1_task_1_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task1' + ) + + wf_1_task_1_action_exs = wf_1_task_1_ex.executions + + wf_1_task_2_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task2' + ) + + wf_1_task_2_action_exs = wf_1_task_2_ex.executions + + # Get objects for the subworkflow executions. + wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2') + + wf_2_task_execs = wf_2_ex.task_executions + + wf_2_task_1_ex = self._assert_single_item( + wf_2_ex.task_executions, + name='task1' + ) + + wf_2_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_task_1_ex.id + ) + + wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') + + wf_3_task_execs = wf_3_ex.task_executions + + wf_3_task_1_ex = self._assert_single_item( + wf_3_ex.task_executions, + name='task1' + ) + + wf_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_3_task_1_ex.id + ) + + self.assertEqual(states.RUNNING, wf_1_ex.state) + self.assertEqual(2, len(wf_1_task_execs)) + self.assertEqual(states.RUNNING, wf_1_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_1_task_2_ex.state) + self.assertEqual(1, len(wf_1_task_1_action_exs)) + self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[0].state) + self.assertEqual(wf_1_task_1_action_exs[0].id, wf_2_ex.id) + self.assertEqual(1, len(wf_1_task_2_action_exs)) + self.assertEqual(states.RUNNING, wf_1_task_2_action_exs[0].state) + self.assertEqual(wf_1_task_2_action_exs[0].id, wf_3_ex.id) + self.assertEqual(states.RUNNING, wf_2_ex.state) + self.assertEqual(1, len(wf_2_task_execs)) + self.assertEqual(states.RUNNING, wf_2_task_1_ex.state) + self.assertEqual(1, len(wf_2_task_1_action_exs)) + self.assertEqual(states.RUNNING, wf_2_task_1_action_exs[0].state) + self.assertEqual(states.RUNNING, wf_3_ex.state) + self.assertEqual(1, len(wf_3_task_execs)) + self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) + self.assertEqual(1, len(wf_3_task_1_action_exs)) + self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) + + # Pause the main workflow. + self.engine.pause_workflow(wf_1_ex.id) + + self.await_workflow_paused(wf_1_ex.id) + self.await_workflow_paused(wf_2_ex.id) + self.await_workflow_paused(wf_3_ex.id) + + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() + + # Get objects for the parent workflow execution. + wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + + wf_1_task_execs = wf_1_ex.task_executions + + wf_1_task_1_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task1' + ) + + wf_1_task_1_action_exs = wf_1_task_1_ex.executions + + wf_1_task_2_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task2' + ) + + wf_1_task_2_action_exs = wf_1_task_2_ex.executions + + # Get objects for the subworkflow executions. + wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2') + + wf_2_task_execs = wf_2_ex.task_executions + + wf_2_task_1_ex = self._assert_single_item( + wf_2_ex.task_executions, + name='task1' + ) + + wf_2_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_task_1_ex.id + ) + + wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') + + wf_3_task_execs = wf_3_ex.task_executions + + wf_3_task_1_ex = self._assert_single_item( + wf_3_ex.task_executions, + name='task1' + ) + + wf_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_3_task_1_ex.id + ) + + self.assertEqual(states.PAUSED, wf_2_ex.state) + self.assertEqual(states.RUNNING, wf_2_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_2_task_1_action_exs[0].state) + self.assertEqual(states.PAUSED, wf_3_ex.state) + self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) + self.assertEqual(states.PAUSED, wf_1_task_1_action_exs[0].state) + self.assertEqual(states.PAUSED, wf_1_task_1_ex.state) + self.assertEqual(states.PAUSED, wf_1_task_2_action_exs[0].state) + self.assertEqual(states.PAUSED, wf_1_task_2_ex.state) + self.assertEqual(states.PAUSED, wf_1_ex.state) + + # Resume the main workflow. + self.engine.resume_workflow(wf_1_ex.id) + + self.await_workflow_running(wf_1_ex.id) + self.await_workflow_running(wf_2_ex.id) + self.await_workflow_running(wf_3_ex.id) + + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() + + # Get objects for the parent workflow execution. + wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + + wf_1_task_execs = wf_1_ex.task_executions + + wf_1_task_1_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task1' + ) + + wf_1_task_1_action_exs = wf_1_task_1_ex.executions + + wf_1_task_2_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task2' + ) + + wf_1_task_2_action_exs = wf_1_task_2_ex.executions + + # Get objects for the subworkflow executions. + wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2') + + wf_2_task_execs = wf_2_ex.task_executions + + wf_2_task_1_ex = self._assert_single_item( + wf_2_ex.task_executions, + name='task1' + ) + + wf_2_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_task_1_ex.id + ) + + wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') + + wf_3_task_execs = wf_3_ex.task_executions + + wf_3_task_1_ex = self._assert_single_item( + wf_3_ex.task_executions, + name='task1' + ) + + wf_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_3_task_1_ex.id + ) + + self.assertEqual(states.RUNNING, wf_2_ex.state) + self.assertEqual(states.RUNNING, wf_2_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_2_task_1_action_exs[0].state) + self.assertEqual(states.RUNNING, wf_3_ex.state) + self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) + self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[0].state) + self.assertEqual(states.RUNNING, wf_1_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_1_task_2_action_exs[0].state) + self.assertEqual(states.RUNNING, wf_1_task_2_ex.state) + self.assertEqual(states.RUNNING, wf_1_ex.state) + + # Complete action executions of the subworkflows. + self.engine.on_action_complete( + wf_2_task_1_action_exs[0].id, + ml_actions.Result(data={'result': 'foobar'}) + ) + + self.engine.on_action_complete( + wf_3_task_1_action_exs[0].id, + ml_actions.Result(data={'result': 'foobar'}) + ) + + self.await_workflow_success(wf_2_ex.id) + self.await_workflow_success(wf_3_ex.id) + self.await_workflow_success(wf_1_ex.id) + + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() + + # Get objects for the parent workflow execution. + wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + + wf_1_task_execs = wf_1_ex.task_executions + + wf_1_task_1_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task1' + ) + + wf_1_task_1_action_exs = wf_1_task_1_ex.executions + + wf_1_task_2_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task2' + ) + + wf_1_task_2_action_exs = wf_1_task_2_ex.executions + + wf_1_task_3_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task3' + ) + + # Get objects for the subworkflow executions. + wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2') + + wf_2_task_execs = wf_2_ex.task_executions + + wf_2_task_1_ex = self._assert_single_item( + wf_2_ex.task_executions, + name='task1' + ) + + wf_2_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_task_1_ex.id + ) + + wf_2_task_2_ex = self._assert_single_item( + wf_2_ex.task_executions, + name='task2' + ) + + wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') + + wf_3_task_execs = wf_3_ex.task_executions + + wf_3_task_1_ex = self._assert_single_item( + wf_3_ex.task_executions, + name='task1' + ) + + wf_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_3_task_1_ex.id + ) + + wf_3_task_2_ex = self._assert_single_item( + wf_3_ex.task_executions, + name='task2' + ) + + self.assertEqual(states.SUCCESS, wf_1_ex.state) + self.assertEqual(3, len(wf_1_task_execs)) + self.assertEqual(states.SUCCESS, wf_1_task_1_ex.state) + self.assertEqual(states.SUCCESS, wf_1_task_2_ex.state) + self.assertEqual(states.SUCCESS, wf_1_task_3_ex.state) + self.assertEqual(states.SUCCESS, wf_1_task_1_action_exs[0].state) + self.assertEqual(states.SUCCESS, wf_1_task_2_action_exs[0].state) + self.assertEqual(states.SUCCESS, wf_2_ex.state) + self.assertEqual(2, len(wf_2_task_execs)) + self.assertEqual(states.SUCCESS, wf_2_task_1_ex.state) + self.assertEqual(states.SUCCESS, wf_2_task_2_ex.state) + self.assertEqual(states.SUCCESS, wf_3_ex.state) + self.assertEqual(2, len(wf_3_task_execs)) + self.assertEqual(states.SUCCESS, wf_3_task_1_ex.state) + self.assertEqual(states.SUCCESS, wf_3_task_2_ex.state) + + def test_pause_resume_cascade_up_from_subworkflow(self): + workbook = """ + version: '2.0' + name: wb + workflows: + wf1: + tasks: + task1: + workflow: wf2 + on-success: + - task3 + task2: + workflow: wf3 + on-success: + - task3 + task3: + join: all + action: std.noop + wf2: + tasks: + task1: + action: std.async_noop + on-success: + - task2 + task2: + action: std.noop + wf3: + tasks: + task1: + action: std.async_noop + on-success: + - task2 + task2: + action: std.noop + """ + + wb_service.create_workbook_v2(workbook) + + # Start workflow execution. + wf_1_ex = self.engine.start_workflow('wb.wf1') + + self.await_workflow_state(wf_1_ex.id, states.RUNNING) + + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() + + # Get objects for the parent workflow execution. + wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + + wf_1_task_execs = wf_1_ex.task_executions + + wf_1_task_1_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task1' + ) + + wf_1_task_1_action_exs = wf_1_task_1_ex.executions + + wf_1_task_2_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task2' + ) + + wf_1_task_2_action_exs = wf_1_task_2_ex.executions + + # Get objects for the subworkflow executions. + wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2') + + wf_2_task_execs = wf_2_ex.task_executions + + wf_2_task_1_ex = self._assert_single_item( + wf_2_ex.task_executions, + name='task1' + ) + + wf_2_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_task_1_ex.id + ) + + wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') + + wf_3_task_execs = wf_3_ex.task_executions + + wf_3_task_1_ex = self._assert_single_item( + wf_3_ex.task_executions, + name='task1' + ) + + wf_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_3_task_1_ex.id + ) + + self.assertEqual(states.RUNNING, wf_1_ex.state) + self.assertEqual(2, len(wf_1_task_execs)) + self.assertEqual(states.RUNNING, wf_1_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_1_task_2_ex.state) + self.assertEqual(1, len(wf_1_task_1_action_exs)) + self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[0].state) + self.assertEqual(wf_1_task_1_action_exs[0].id, wf_2_ex.id) + self.assertEqual(1, len(wf_1_task_2_action_exs)) + self.assertEqual(states.RUNNING, wf_1_task_2_action_exs[0].state) + self.assertEqual(wf_1_task_2_action_exs[0].id, wf_3_ex.id) + self.assertEqual(states.RUNNING, wf_2_ex.state) + self.assertEqual(1, len(wf_2_task_execs)) + self.assertEqual(states.RUNNING, wf_2_task_1_ex.state) + self.assertEqual(1, len(wf_2_task_1_action_exs)) + self.assertEqual(states.RUNNING, wf_2_task_1_action_exs[0].state) + self.assertEqual(states.RUNNING, wf_3_ex.state) + self.assertEqual(1, len(wf_3_task_execs)) + self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) + self.assertEqual(1, len(wf_3_task_1_action_exs)) + self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) + + # Pause the subworkflow. + self.engine.pause_workflow(wf_2_ex.id) + + self.await_workflow_paused(wf_2_ex.id) + + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() + + # Get objects for the parent workflow execution. + wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + + wf_1_task_execs = wf_1_ex.task_executions + + wf_1_task_1_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task1' + ) + + wf_1_task_1_action_exs = wf_1_task_1_ex.executions + + wf_1_task_2_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task2' + ) + + wf_1_task_2_action_exs = wf_1_task_2_ex.executions + + # Get objects for the subworkflow executions. + wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2') + + wf_2_task_execs = wf_2_ex.task_executions + + wf_2_task_1_ex = self._assert_single_item( + wf_2_ex.task_executions, + name='task1' + ) + + wf_2_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_task_1_ex.id + ) + + wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') + + wf_3_task_execs = wf_3_ex.task_executions + + wf_3_task_1_ex = self._assert_single_item( + wf_3_ex.task_executions, + name='task1' + ) + + wf_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_3_task_1_ex.id + ) + + self.assertEqual(states.PAUSED, wf_2_ex.state) + self.assertEqual(states.RUNNING, wf_2_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_2_task_1_action_exs[0].state) + self.assertEqual(states.PAUSED, wf_3_ex.state) + self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) + self.assertEqual(states.PAUSED, wf_1_task_1_action_exs[0].state) + self.assertEqual(states.PAUSED, wf_1_task_1_ex.state) + self.assertEqual(states.PAUSED, wf_1_task_2_action_exs[0].state) + self.assertEqual(states.PAUSED, wf_1_task_2_ex.state) + self.assertEqual(states.PAUSED, wf_1_ex.state) + + # Resume the 1st subworkflow. + self.engine.resume_workflow(wf_2_ex.id) + + self.await_workflow_running(wf_2_ex.id) + + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() + + # Get objects for the parent workflow execution. + wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + + wf_1_task_execs = wf_1_ex.task_executions + + wf_1_task_1_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task1' + ) + + wf_1_task_1_action_exs = wf_1_task_1_ex.executions + + wf_1_task_2_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task2' + ) + + wf_1_task_2_action_exs = wf_1_task_2_ex.executions + + # Get objects for the subworkflow executions. + wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2') + + wf_2_task_execs = wf_2_ex.task_executions + + wf_2_task_1_ex = self._assert_single_item( + wf_2_ex.task_executions, + name='task1' + ) + + wf_2_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_task_1_ex.id + ) + + wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') + + wf_3_task_execs = wf_3_ex.task_executions + + wf_3_task_1_ex = self._assert_single_item( + wf_3_ex.task_executions, + name='task1' + ) + + wf_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_3_task_1_ex.id + ) + + self.assertEqual(states.RUNNING, wf_2_ex.state) + self.assertEqual(states.RUNNING, wf_2_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_2_task_1_action_exs[0].state) + self.assertEqual(states.PAUSED, wf_3_ex.state) + self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) + self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[0].state) + self.assertEqual(states.RUNNING, wf_1_task_1_ex.state) + self.assertEqual(states.PAUSED, wf_1_task_2_action_exs[0].state) + self.assertEqual(states.PAUSED, wf_1_task_2_ex.state) + self.assertEqual(states.PAUSED, wf_1_ex.state) + + # Complete action execution of 1st subworkflow. + self.engine.on_action_complete( + wf_2_task_1_action_exs[0].id, + ml_actions.Result(data={'result': 'foobar'}) + ) + + self.await_workflow_success(wf_2_ex.id) + self.await_task_success(wf_1_task_1_ex.id) + + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() + + # Get objects for the parent workflow execution. + wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + + wf_1_task_execs = wf_1_ex.task_executions + + wf_1_task_1_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task1' + ) + + wf_1_task_1_action_exs = wf_1_task_1_ex.executions + + wf_1_task_2_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task2' + ) + + wf_1_task_2_action_exs = wf_1_task_2_ex.executions + + # Get objects for the subworkflow executions. + wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2') + + wf_2_task_execs = wf_2_ex.task_executions + + wf_2_task_1_ex = self._assert_single_item( + wf_2_ex.task_executions, + name='task1' + ) + + wf_2_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_task_1_ex.id + ) + + wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') + + wf_3_task_execs = wf_3_ex.task_executions + + wf_3_task_1_ex = self._assert_single_item( + wf_3_ex.task_executions, + name='task1' + ) + + wf_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_3_task_1_ex.id + ) + + self.assertEqual(states.SUCCESS, wf_2_ex.state) + self.assertEqual(states.SUCCESS, wf_2_task_1_ex.state) + self.assertEqual(states.SUCCESS, wf_2_task_1_action_exs[0].state) + self.assertEqual(states.PAUSED, wf_3_ex.state) + self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) + self.assertEqual(states.SUCCESS, wf_1_task_1_action_exs[0].state) + self.assertEqual(states.SUCCESS, wf_1_task_1_ex.state) + self.assertEqual(states.PAUSED, wf_1_task_2_action_exs[0].state) + self.assertEqual(states.PAUSED, wf_1_task_2_ex.state) + self.assertEqual(states.PAUSED, wf_1_ex.state) + + # Resume the 2nd subworkflow. + self.engine.resume_workflow(wf_3_ex.id) + + self.await_workflow_running(wf_3_ex.id) + + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() + + # Get objects for the parent workflow execution. + wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + + wf_1_task_execs = wf_1_ex.task_executions + + wf_1_task_1_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task1' + ) + + wf_1_task_1_action_exs = wf_1_task_1_ex.executions + + wf_1_task_2_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task2' + ) + + wf_1_task_2_action_exs = wf_1_task_2_ex.executions + + # Get objects for the subworkflow executions. + wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2') + + wf_2_task_execs = wf_2_ex.task_executions + + wf_2_task_1_ex = self._assert_single_item( + wf_2_ex.task_executions, + name='task1' + ) + + wf_2_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_task_1_ex.id + ) + + wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') + + wf_3_task_execs = wf_3_ex.task_executions + + wf_3_task_1_ex = self._assert_single_item( + wf_3_ex.task_executions, + name='task1' + ) + + wf_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_3_task_1_ex.id + ) + + self.assertEqual(states.SUCCESS, wf_2_ex.state) + self.assertEqual(states.SUCCESS, wf_2_task_1_ex.state) + self.assertEqual(states.SUCCESS, wf_2_task_1_action_exs[0].state) + self.assertEqual(states.RUNNING, wf_3_ex.state) + self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) + self.assertEqual(states.SUCCESS, wf_1_task_1_action_exs[0].state) + self.assertEqual(states.SUCCESS, wf_1_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_1_task_2_action_exs[0].state) + self.assertEqual(states.RUNNING, wf_1_task_2_ex.state) + self.assertEqual(states.RUNNING, wf_1_ex.state) + + # Complete action execution of 2nd subworkflow. + self.engine.on_action_complete( + wf_3_task_1_action_exs[0].id, + ml_actions.Result(data={'result': 'foobar'}) + ) + + self.await_workflow_success(wf_3_ex.id) + self.await_workflow_success(wf_1_ex.id) + + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() + + # Get objects for the parent workflow execution. + wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + + wf_1_task_execs = wf_1_ex.task_executions + + wf_1_task_1_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task1' + ) + + wf_1_task_1_action_exs = wf_1_task_1_ex.executions + + wf_1_task_2_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task2' + ) + + wf_1_task_2_action_exs = wf_1_task_2_ex.executions + + wf_1_task_3_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task3' + ) + + # Get objects for the subworkflow executions. + wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2') + + wf_2_task_execs = wf_2_ex.task_executions + + wf_2_task_1_ex = self._assert_single_item( + wf_2_ex.task_executions, + name='task1' + ) + + wf_2_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_task_1_ex.id + ) + + wf_2_task_2_ex = self._assert_single_item( + wf_2_ex.task_executions, + name='task2' + ) + + wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') + + wf_3_task_execs = wf_3_ex.task_executions + + wf_3_task_1_ex = self._assert_single_item( + wf_3_ex.task_executions, + name='task1' + ) + + wf_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_3_task_1_ex.id + ) + + wf_3_task_2_ex = self._assert_single_item( + wf_3_ex.task_executions, + name='task2' + ) + + self.assertEqual(states.SUCCESS, wf_1_ex.state) + self.assertEqual(3, len(wf_1_task_execs)) + self.assertEqual(states.SUCCESS, wf_1_task_1_ex.state) + self.assertEqual(states.SUCCESS, wf_1_task_2_ex.state) + self.assertEqual(states.SUCCESS, wf_1_task_3_ex.state) + self.assertEqual(states.SUCCESS, wf_1_task_1_action_exs[0].state) + self.assertEqual(states.SUCCESS, wf_1_task_2_action_exs[0].state) + self.assertEqual(states.SUCCESS, wf_2_ex.state) + self.assertEqual(2, len(wf_2_task_execs)) + self.assertEqual(states.SUCCESS, wf_2_task_1_ex.state) + self.assertEqual(states.SUCCESS, wf_2_task_2_ex.state) + self.assertEqual(states.SUCCESS, wf_3_ex.state) + self.assertEqual(2, len(wf_3_task_execs)) + self.assertEqual(states.SUCCESS, wf_3_task_1_ex.state) + self.assertEqual(states.SUCCESS, wf_3_task_2_ex.state) + + def test_pause_resume_cascade_down_to_with_items_subworkflows(self): + workbook = """ + version: '2.0' + name: wb + workflows: + wf1: + tasks: + task1: + with-items: i in <% range(3) %> + workflow: wf2 + on-success: + - task3 + task2: + workflow: wf3 + on-success: + - task3 + task3: + join: all + action: std.noop + wf2: + tasks: + task1: + action: std.async_noop + on-success: + - task2 + task2: + action: std.noop + wf3: + tasks: + task1: + action: std.async_noop + on-success: + - task2 + task2: + action: std.noop + """ + + wb_service.create_workbook_v2(workbook) + + # Start workflow execution. + wf_1_ex = self.engine.start_workflow('wb.wf1') + + self.await_workflow_state(wf_1_ex.id, states.RUNNING) + + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() + + # Get objects for the parent workflow execution. + wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + + wf_1_task_execs = wf_1_ex.task_executions + + wf_1_task_1_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task1' + ) + + wf_1_task_1_action_exs = sorted( + wf_1_task_1_ex.executions, + key=lambda x: x['runtime_context']['index'] + ) + + wf_1_task_2_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task2' + ) + + wf_1_task_2_action_exs = wf_1_task_2_ex.executions + + # Get objects for the with-items subworkflow executions. + wf_2_ex_1 = db_api.get_workflow_execution( + wf_1_task_1_action_exs[0].id + ) + + wf_2_ex_1_task_execs = wf_2_ex_1.task_executions + + wf_2_ex_1_task_1_ex = self._assert_single_item( + wf_2_ex_1.task_executions, + name='task1' + ) + + wf_2_ex_1_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_ex_1_task_1_ex.id + ) + + wf_2_ex_2 = db_api.get_workflow_execution( + wf_1_task_1_action_exs[1].id + ) + + wf_2_ex_2_task_execs = wf_2_ex_2.task_executions + + wf_2_ex_2_task_1_ex = self._assert_single_item( + wf_2_ex_2.task_executions, + name='task1' + ) + + wf_2_ex_2_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_ex_2_task_1_ex.id + ) + + wf_2_ex_3 = db_api.get_workflow_execution( + wf_1_task_1_action_exs[2].id + ) + + wf_2_ex_3_task_execs = wf_2_ex_3.task_executions + + wf_2_ex_3_task_1_ex = self._assert_single_item( + wf_2_ex_3.task_executions, + name='task1' + ) + + wf_2_ex_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_ex_3_task_1_ex.id + ) + + # Get objects for the wf3 subworkflow execution. + wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') + + wf_3_task_execs = wf_3_ex.task_executions + + wf_3_task_1_ex = self._assert_single_item( + wf_3_ex.task_executions, + name='task1' + ) + + wf_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_3_task_1_ex.id + ) + + # Check state of parent workflow execution. + self.assertEqual(states.RUNNING, wf_1_ex.state) + self.assertEqual(2, len(wf_1_task_execs)) + self.assertEqual(states.RUNNING, wf_1_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_1_task_2_ex.state) + self.assertEqual(3, len(wf_1_task_1_action_exs)) + + # Check state of wf2 (1) subworkflow execution. + self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[0].state) + self.assertEqual(wf_1_task_1_action_exs[0].id, wf_2_ex_1.id) + self.assertEqual(states.RUNNING, wf_2_ex_1.state) + self.assertEqual(1, len(wf_2_ex_1_task_execs)) + self.assertEqual(states.RUNNING, wf_2_ex_1_task_1_ex.state) + self.assertEqual(1, len(wf_2_ex_1_task_1_action_exs)) + self.assertEqual(states.RUNNING, wf_2_ex_1_task_1_action_exs[0].state) + + # Check state of wf2 (2) subworkflow execution. + self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[1].state) + self.assertEqual(wf_1_task_1_action_exs[1].id, wf_2_ex_2.id) + self.assertEqual(states.RUNNING, wf_2_ex_2.state) + self.assertEqual(1, len(wf_2_ex_2_task_execs)) + self.assertEqual(states.RUNNING, wf_2_ex_2_task_1_ex.state) + self.assertEqual(1, len(wf_2_ex_2_task_1_action_exs)) + self.assertEqual(states.RUNNING, wf_2_ex_2_task_1_action_exs[0].state) + + # Check state of wf2 (3) subworkflow execution. + self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[2].state) + self.assertEqual(wf_1_task_1_action_exs[2].id, wf_2_ex_3.id) + self.assertEqual(states.RUNNING, wf_2_ex_3.state) + self.assertEqual(1, len(wf_2_ex_3_task_execs)) + self.assertEqual(states.RUNNING, wf_2_ex_3_task_1_ex.state) + self.assertEqual(1, len(wf_2_ex_3_task_1_action_exs)) + self.assertEqual(states.RUNNING, wf_2_ex_3_task_1_action_exs[0].state) + + # Check state of wf3 subworkflow execution. + self.assertEqual(1, len(wf_1_task_2_action_exs)) + self.assertEqual(states.RUNNING, wf_1_task_2_action_exs[0].state) + self.assertEqual(wf_1_task_2_action_exs[0].id, wf_3_ex.id) + self.assertEqual(states.RUNNING, wf_3_ex.state) + self.assertEqual(1, len(wf_3_task_execs)) + self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) + self.assertEqual(1, len(wf_3_task_1_action_exs)) + self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) + + # Pause the main workflow. + self.engine.pause_workflow(wf_1_ex.id) + self.await_workflow_paused(wf_2_ex_1.id) + self.await_workflow_paused(wf_2_ex_2.id) + self.await_workflow_paused(wf_2_ex_3.id) + self.await_workflow_paused(wf_3_ex.id) + self.await_task_paused(wf_1_task_1_ex.id) + self.await_task_paused(wf_1_task_2_ex.id) + self.await_workflow_paused(wf_1_ex.id) + + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() + + # Get objects for the parent workflow execution. + wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + + wf_1_task_execs = wf_1_ex.task_executions + + wf_1_task_1_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task1' + ) + + wf_1_task_1_action_exs = sorted( + wf_1_task_1_ex.executions, + key=lambda x: x['runtime_context']['index'] + ) + + wf_1_task_2_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task2' + ) + + wf_1_task_2_action_exs = wf_1_task_2_ex.executions + + # Get objects for the with-items subworkflow executions. + wf_2_ex_1 = db_api.get_workflow_execution( + wf_1_task_1_action_exs[0].id + ) + + wf_2_ex_1_task_execs = wf_2_ex_1.task_executions + + wf_2_ex_1_task_1_ex = self._assert_single_item( + wf_2_ex_1.task_executions, + name='task1' + ) + + wf_2_ex_1_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_ex_1_task_1_ex.id + ) + + wf_2_ex_2 = db_api.get_workflow_execution( + wf_1_task_1_action_exs[1].id + ) + + wf_2_ex_2_task_execs = wf_2_ex_2.task_executions + + wf_2_ex_2_task_1_ex = self._assert_single_item( + wf_2_ex_2.task_executions, + name='task1' + ) + + wf_2_ex_2_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_ex_2_task_1_ex.id + ) + + wf_2_ex_3 = db_api.get_workflow_execution( + wf_1_task_1_action_exs[2].id + ) + + wf_2_ex_3_task_execs = wf_2_ex_3.task_executions + + wf_2_ex_3_task_1_ex = self._assert_single_item( + wf_2_ex_3.task_executions, + name='task1' + ) + + wf_2_ex_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_ex_3_task_1_ex.id + ) + + # Get objects for the wf3 subworkflow execution. + wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') + + wf_3_task_execs = wf_3_ex.task_executions + + wf_3_task_1_ex = self._assert_single_item( + wf_3_ex.task_executions, + name='task1' + ) + + wf_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_3_task_1_ex.id + ) + + # Check state of parent workflow execution. + self.assertEqual(states.PAUSED, wf_1_ex.state) + self.assertEqual(states.PAUSED, wf_1_task_1_ex.state) + self.assertEqual(states.PAUSED, wf_1_task_2_ex.state) + + # Check state of wf2 (1) subworkflow execution. + self.assertEqual(states.PAUSED, wf_1_task_1_action_exs[0].state) + self.assertEqual(states.PAUSED, wf_2_ex_1.state) + self.assertEqual(states.RUNNING, wf_2_ex_1_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_2_ex_1_task_1_action_exs[0].state) + + # Check state of wf2 (2) subworkflow execution. + self.assertEqual(states.PAUSED, wf_1_task_1_action_exs[1].state) + self.assertEqual(states.PAUSED, wf_2_ex_2.state) + self.assertEqual(states.RUNNING, wf_2_ex_2_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_2_ex_2_task_1_action_exs[0].state) + + # Check state of wf2 (3) subworkflow execution. + self.assertEqual(states.PAUSED, wf_1_task_1_action_exs[2].state) + self.assertEqual(states.PAUSED, wf_2_ex_3.state) + self.assertEqual(states.RUNNING, wf_2_ex_3_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_2_ex_3_task_1_action_exs[0].state) + + # Check state of wf3 subworkflow execution. + self.assertEqual(states.PAUSED, wf_1_task_2_action_exs[0].state) + self.assertEqual(states.PAUSED, wf_3_ex.state) + self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) + + # Resume the main workflow. + self.engine.resume_workflow(wf_1_ex.id) + self.await_workflow_running(wf_2_ex_1.id) + self.await_workflow_running(wf_2_ex_2.id) + self.await_workflow_running(wf_2_ex_3.id) + self.await_workflow_running(wf_3_ex.id) + self.await_task_running(wf_1_task_1_ex.id) + self.await_task_running(wf_1_task_2_ex.id) + self.await_workflow_running(wf_1_ex.id) + + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() + + # Get objects for the parent workflow execution. + wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + + wf_1_task_execs = wf_1_ex.task_executions + + wf_1_task_1_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task1' + ) + + wf_1_task_1_action_exs = sorted( + wf_1_task_1_ex.executions, + key=lambda x: x['runtime_context']['index'] + ) + + wf_1_task_2_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task2' + ) + + wf_1_task_2_action_exs = wf_1_task_2_ex.executions + + # Get objects for the with-items subworkflow executions. + wf_2_ex_1 = db_api.get_workflow_execution( + wf_1_task_1_action_exs[0].id + ) + + wf_2_ex_1_task_execs = wf_2_ex_1.task_executions + + wf_2_ex_1_task_1_ex = self._assert_single_item( + wf_2_ex_1.task_executions, + name='task1' + ) + + wf_2_ex_1_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_ex_1_task_1_ex.id + ) + + wf_2_ex_2 = db_api.get_workflow_execution( + wf_1_task_1_action_exs[1].id + ) + + wf_2_ex_2_task_execs = wf_2_ex_2.task_executions + + wf_2_ex_2_task_1_ex = self._assert_single_item( + wf_2_ex_2.task_executions, + name='task1' + ) + + wf_2_ex_2_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_ex_2_task_1_ex.id + ) + + wf_2_ex_3 = db_api.get_workflow_execution( + wf_1_task_1_action_exs[2].id + ) + + wf_2_ex_3_task_execs = wf_2_ex_3.task_executions + + wf_2_ex_3_task_1_ex = self._assert_single_item( + wf_2_ex_3.task_executions, + name='task1' + ) + + wf_2_ex_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_ex_3_task_1_ex.id + ) + + # Get objects for the wf3 subworkflow execution. + wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') + + wf_3_task_execs = wf_3_ex.task_executions + + wf_3_task_1_ex = self._assert_single_item( + wf_3_ex.task_executions, + name='task1' + ) + + wf_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_3_task_1_ex.id + ) + + # Check state of parent workflow execution. + self.assertEqual(states.RUNNING, wf_1_ex.state) + self.assertEqual(states.RUNNING, wf_1_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_1_task_2_ex.state) + + # Check state of wf2 (1) subworkflow execution. + self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[0].state) + self.assertEqual(states.RUNNING, wf_2_ex_1.state) + self.assertEqual(states.RUNNING, wf_2_ex_1_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_2_ex_1_task_1_action_exs[0].state) + + # Check state of wf2 (2) subworkflow execution. + self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[1].state) + self.assertEqual(states.RUNNING, wf_2_ex_2.state) + self.assertEqual(states.RUNNING, wf_2_ex_2_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_2_ex_2_task_1_action_exs[0].state) + + # Check state of wf2 (3) subworkflow execution. + self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[2].state) + self.assertEqual(states.RUNNING, wf_2_ex_3.state) + self.assertEqual(states.RUNNING, wf_2_ex_3_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_2_ex_3_task_1_action_exs[0].state) + + # Check state of wf3 subworkflow execution. + self.assertEqual(states.RUNNING, wf_1_task_2_action_exs[0].state) + self.assertEqual(states.RUNNING, wf_3_ex.state) + self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) + + # Complete action execution of subworkflows. + self.engine.on_action_complete( + wf_2_ex_1_task_1_action_exs[0].id, + ml_actions.Result(data={'result': 'foobar'}) + ) + + self.engine.on_action_complete( + wf_2_ex_2_task_1_action_exs[0].id, + ml_actions.Result(data={'result': 'foobar'}) + ) + + self.engine.on_action_complete( + wf_2_ex_3_task_1_action_exs[0].id, + ml_actions.Result(data={'result': 'foobar'}) + ) + + self.engine.on_action_complete( + wf_3_task_1_action_exs[0].id, + ml_actions.Result(data={'result': 'foobar'}) + ) + + self.await_workflow_success(wf_2_ex_1.id) + self.await_workflow_success(wf_2_ex_2.id) + self.await_workflow_success(wf_2_ex_3.id) + self.await_workflow_success(wf_3_ex.id) + self.await_task_success(wf_1_task_1_ex.id) + self.await_task_success(wf_1_task_2_ex.id) + self.await_workflow_success(wf_1_ex.id) + + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() + + # Get objects for the parent workflow execution. + wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + + wf_1_task_execs = wf_1_ex.task_executions + + wf_1_task_1_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task1' + ) + + wf_1_task_1_action_exs = sorted( + wf_1_task_1_ex.executions, + key=lambda x: x['runtime_context']['index'] + ) + + wf_1_task_2_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task2' + ) + + wf_1_task_2_action_exs = wf_1_task_2_ex.executions + + # Get objects for the with-items subworkflow executions. + wf_2_ex_1 = db_api.get_workflow_execution( + wf_1_task_1_action_exs[0].id + ) + + wf_2_ex_1_task_execs = wf_2_ex_1.task_executions + + wf_2_ex_1_task_1_ex = self._assert_single_item( + wf_2_ex_1.task_executions, + name='task1' + ) + + wf_2_ex_1_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_ex_1_task_1_ex.id + ) + + wf_2_ex_2 = db_api.get_workflow_execution( + wf_1_task_1_action_exs[1].id + ) + + wf_2_ex_2_task_execs = wf_2_ex_2.task_executions + + wf_2_ex_2_task_1_ex = self._assert_single_item( + wf_2_ex_2.task_executions, + name='task1' + ) + + wf_2_ex_2_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_ex_2_task_1_ex.id + ) + + wf_2_ex_3 = db_api.get_workflow_execution( + wf_1_task_1_action_exs[2].id + ) + + wf_2_ex_3_task_execs = wf_2_ex_3.task_executions + + wf_2_ex_3_task_1_ex = self._assert_single_item( + wf_2_ex_3.task_executions, + name='task1' + ) + + wf_2_ex_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_ex_3_task_1_ex.id + ) + + # Get objects for the wf3 subworkflow execution. + wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') + + wf_3_task_execs = wf_3_ex.task_executions + + wf_3_task_1_ex = self._assert_single_item( + wf_3_ex.task_executions, + name='task1' + ) + + wf_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_3_task_1_ex.id + ) + + # Check state of parent workflow execution. + self.assertEqual(states.SUCCESS, wf_1_ex.state) + self.assertEqual(states.SUCCESS, wf_1_task_1_ex.state) + self.assertEqual(states.SUCCESS, wf_1_task_2_ex.state) + + # Check state of wf2 (1) subworkflow execution. + self.assertEqual(states.SUCCESS, wf_1_task_1_action_exs[0].state) + self.assertEqual(states.SUCCESS, wf_2_ex_1.state) + self.assertEqual(states.SUCCESS, wf_2_ex_1_task_1_ex.state) + self.assertEqual(states.SUCCESS, wf_2_ex_1_task_1_action_exs[0].state) + + # Check state of wf2 (2) subworkflow execution. + self.assertEqual(states.SUCCESS, wf_1_task_1_action_exs[1].state) + self.assertEqual(states.SUCCESS, wf_2_ex_2.state) + self.assertEqual(states.SUCCESS, wf_2_ex_2_task_1_ex.state) + self.assertEqual(states.SUCCESS, wf_2_ex_2_task_1_action_exs[0].state) + + # Check state of wf2 (3) subworkflow execution. + self.assertEqual(states.SUCCESS, wf_1_task_1_action_exs[2].state) + self.assertEqual(states.SUCCESS, wf_2_ex_3.state) + self.assertEqual(states.SUCCESS, wf_2_ex_3_task_1_ex.state) + self.assertEqual(states.SUCCESS, wf_2_ex_3_task_1_action_exs[0].state) + + # Check state of wf3 subworkflow execution. + self.assertEqual(states.SUCCESS, wf_1_task_2_action_exs[0].state) + self.assertEqual(states.SUCCESS, wf_3_ex.state) + self.assertEqual(states.SUCCESS, wf_3_task_1_ex.state) + self.assertEqual(states.SUCCESS, wf_3_task_1_action_exs[0].state) + + def test_pause_resume_cascade_up_from_with_items_subworkflow(self): + workbook = """ + version: '2.0' + name: wb + workflows: + wf1: + tasks: + task1: + with-items: i in <% range(3) %> + workflow: wf2 + on-success: + - task3 + task2: + workflow: wf3 + on-success: + - task3 + task3: + join: all + action: std.noop + wf2: + tasks: + task1: + action: std.async_noop + on-success: + - task2 + task2: + action: std.noop + wf3: + tasks: + task1: + action: std.async_noop + on-success: + - task2 + task2: + action: std.noop + """ + + wb_service.create_workbook_v2(workbook) + + # Start workflow execution. + wf_1_ex = self.engine.start_workflow('wb.wf1') + + self.await_workflow_state(wf_1_ex.id, states.RUNNING) + + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() + + # Get objects for the parent workflow execution. + wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + + wf_1_task_execs = wf_1_ex.task_executions + + wf_1_task_1_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task1' + ) + + wf_1_task_1_action_exs = sorted( + wf_1_task_1_ex.executions, + key=lambda x: x['runtime_context']['index'] + ) + + wf_1_task_2_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task2' + ) + + wf_1_task_2_action_exs = wf_1_task_2_ex.executions + + # Get objects for the with-items subworkflow executions. + wf_2_ex_1 = db_api.get_workflow_execution( + wf_1_task_1_action_exs[0].id + ) + + wf_2_ex_1_task_execs = wf_2_ex_1.task_executions + + wf_2_ex_1_task_1_ex = self._assert_single_item( + wf_2_ex_1.task_executions, + name='task1' + ) + + wf_2_ex_1_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_ex_1_task_1_ex.id + ) + + wf_2_ex_2 = db_api.get_workflow_execution( + wf_1_task_1_action_exs[1].id + ) + + wf_2_ex_2_task_execs = wf_2_ex_2.task_executions + + wf_2_ex_2_task_1_ex = self._assert_single_item( + wf_2_ex_2.task_executions, + name='task1' + ) + + wf_2_ex_2_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_ex_2_task_1_ex.id + ) + + wf_2_ex_3 = db_api.get_workflow_execution( + wf_1_task_1_action_exs[2].id + ) + + wf_2_ex_3_task_execs = wf_2_ex_3.task_executions + + wf_2_ex_3_task_1_ex = self._assert_single_item( + wf_2_ex_3.task_executions, + name='task1' + ) + + wf_2_ex_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_ex_3_task_1_ex.id + ) + + # Get objects for the wf3 subworkflow execution. + wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') + + wf_3_task_execs = wf_3_ex.task_executions + + wf_3_task_1_ex = self._assert_single_item( + wf_3_ex.task_executions, + name='task1' + ) + + wf_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_3_task_1_ex.id + ) + + # Check state of parent workflow execution. + self.assertEqual(states.RUNNING, wf_1_ex.state) + self.assertEqual(2, len(wf_1_task_execs)) + self.assertEqual(states.RUNNING, wf_1_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_1_task_2_ex.state) + self.assertEqual(3, len(wf_1_task_1_action_exs)) + + # Check state of wf2 (1) subworkflow execution. + self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[0].state) + self.assertEqual(wf_1_task_1_action_exs[0].id, wf_2_ex_1.id) + self.assertEqual(states.RUNNING, wf_2_ex_1.state) + self.assertEqual(1, len(wf_2_ex_1_task_execs)) + self.assertEqual(states.RUNNING, wf_2_ex_1_task_1_ex.state) + self.assertEqual(1, len(wf_2_ex_1_task_1_action_exs)) + self.assertEqual(states.RUNNING, wf_2_ex_1_task_1_action_exs[0].state) + + # Check state of wf2 (2) subworkflow execution. + self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[1].state) + self.assertEqual(wf_1_task_1_action_exs[1].id, wf_2_ex_2.id) + self.assertEqual(states.RUNNING, wf_2_ex_2.state) + self.assertEqual(1, len(wf_2_ex_2_task_execs)) + self.assertEqual(states.RUNNING, wf_2_ex_2_task_1_ex.state) + self.assertEqual(1, len(wf_2_ex_2_task_1_action_exs)) + self.assertEqual(states.RUNNING, wf_2_ex_2_task_1_action_exs[0].state) + + # Check state of wf2 (3) subworkflow execution. + self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[2].state) + self.assertEqual(wf_1_task_1_action_exs[2].id, wf_2_ex_3.id) + self.assertEqual(states.RUNNING, wf_2_ex_3.state) + self.assertEqual(1, len(wf_2_ex_3_task_execs)) + self.assertEqual(states.RUNNING, wf_2_ex_3_task_1_ex.state) + self.assertEqual(1, len(wf_2_ex_3_task_1_action_exs)) + self.assertEqual(states.RUNNING, wf_2_ex_3_task_1_action_exs[0].state) + + # Check state of wf3 subworkflow execution. + self.assertEqual(1, len(wf_1_task_2_action_exs)) + self.assertEqual(states.RUNNING, wf_1_task_2_action_exs[0].state) + self.assertEqual(wf_1_task_2_action_exs[0].id, wf_3_ex.id) + self.assertEqual(states.RUNNING, wf_3_ex.state) + self.assertEqual(1, len(wf_3_task_execs)) + self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) + self.assertEqual(1, len(wf_3_task_1_action_exs)) + self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) + + # Pause one of the subworkflows in the with-items task. + self.engine.pause_workflow(wf_2_ex_1.id) + self.await_workflow_paused(wf_2_ex_1.id) + self.await_workflow_paused(wf_2_ex_2.id) + self.await_workflow_paused(wf_2_ex_3.id) + self.await_workflow_paused(wf_3_ex.id) + self.await_task_paused(wf_1_task_1_ex.id) + self.await_task_paused(wf_1_task_2_ex.id) + self.await_workflow_paused(wf_1_ex.id) + + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() + + # Get objects for the parent workflow execution. + wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + + wf_1_task_execs = wf_1_ex.task_executions + + wf_1_task_1_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task1' + ) + + wf_1_task_1_action_exs = sorted( + wf_1_task_1_ex.executions, + key=lambda x: x['runtime_context']['index'] + ) + + wf_1_task_2_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task2' + ) + + wf_1_task_2_action_exs = wf_1_task_2_ex.executions + + # Get objects for the with-items subworkflow executions. + wf_2_ex_1 = db_api.get_workflow_execution( + wf_1_task_1_action_exs[0].id + ) + + wf_2_ex_1_task_execs = wf_2_ex_1.task_executions + + wf_2_ex_1_task_1_ex = self._assert_single_item( + wf_2_ex_1.task_executions, + name='task1' + ) + + wf_2_ex_1_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_ex_1_task_1_ex.id + ) + + wf_2_ex_2 = db_api.get_workflow_execution( + wf_1_task_1_action_exs[1].id + ) + + wf_2_ex_2_task_execs = wf_2_ex_2.task_executions + + wf_2_ex_2_task_1_ex = self._assert_single_item( + wf_2_ex_2.task_executions, + name='task1' + ) + + wf_2_ex_2_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_ex_2_task_1_ex.id + ) + + wf_2_ex_3 = db_api.get_workflow_execution( + wf_1_task_1_action_exs[2].id + ) + + wf_2_ex_3_task_execs = wf_2_ex_3.task_executions + + wf_2_ex_3_task_1_ex = self._assert_single_item( + wf_2_ex_3.task_executions, + name='task1' + ) + + wf_2_ex_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_ex_3_task_1_ex.id + ) + + # Get objects for the wf3 subworkflow execution. + wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') + + wf_3_task_execs = wf_3_ex.task_executions + + wf_3_task_1_ex = self._assert_single_item( + wf_3_ex.task_executions, + name='task1' + ) + + wf_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_3_task_1_ex.id + ) + + # Check state of parent workflow execution. + self.assertEqual(states.PAUSED, wf_1_ex.state) + self.assertEqual(states.PAUSED, wf_1_task_1_ex.state) + self.assertEqual(states.PAUSED, wf_1_task_2_ex.state) + + # Check state of wf2 (1) subworkflow execution. + self.assertEqual(states.PAUSED, wf_1_task_1_action_exs[0].state) + self.assertEqual(states.PAUSED, wf_2_ex_1.state) + self.assertEqual(states.RUNNING, wf_2_ex_1_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_2_ex_1_task_1_action_exs[0].state) + + # Check state of wf2 (2) subworkflow execution. + self.assertEqual(states.PAUSED, wf_1_task_1_action_exs[1].state) + self.assertEqual(states.PAUSED, wf_2_ex_2.state) + self.assertEqual(states.RUNNING, wf_2_ex_2_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_2_ex_2_task_1_action_exs[0].state) + + # Check state of wf2 (3) subworkflow execution. + self.assertEqual(states.PAUSED, wf_1_task_1_action_exs[2].state) + self.assertEqual(states.PAUSED, wf_2_ex_3.state) + self.assertEqual(states.RUNNING, wf_2_ex_3_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_2_ex_3_task_1_action_exs[0].state) + + # Check state of wf3 subworkflow execution. + self.assertEqual(states.PAUSED, wf_1_task_2_action_exs[0].state) + self.assertEqual(states.PAUSED, wf_3_ex.state) + self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) + + # Resume one of the subworkflows in the with-items task. + self.engine.resume_workflow(wf_2_ex_1.id) + self.await_workflow_running(wf_2_ex_1.id) + self.await_workflow_paused(wf_2_ex_2.id) + self.await_workflow_paused(wf_2_ex_3.id) + self.await_workflow_paused(wf_3_ex.id) + self.await_task_paused(wf_1_task_1_ex.id) + self.await_task_paused(wf_1_task_2_ex.id) + self.await_workflow_paused(wf_1_ex.id) + + # Complete action execution of the subworkflow that is resumed. + self.engine.on_action_complete( + wf_2_ex_1_task_1_action_exs[0].id, + ml_actions.Result(data={'result': 'foobar'}) + ) + + self.await_workflow_success(wf_2_ex_1.id) + self.await_workflow_paused(wf_2_ex_2.id) + self.await_workflow_paused(wf_2_ex_3.id) + self.await_workflow_paused(wf_3_ex.id) + self.await_task_paused(wf_1_task_1_ex.id) + self.await_task_paused(wf_1_task_2_ex.id) + self.await_workflow_paused(wf_1_ex.id) + + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() + + # Get objects for the parent workflow execution. + wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + + wf_1_task_execs = wf_1_ex.task_executions + + wf_1_task_1_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task1' + ) + + wf_1_task_1_action_exs = sorted( + wf_1_task_1_ex.executions, + key=lambda x: x['runtime_context']['index'] + ) + + wf_1_task_2_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task2' + ) + + wf_1_task_2_action_exs = wf_1_task_2_ex.executions + + # Get objects for the with-items subworkflow executions. + wf_2_ex_1 = db_api.get_workflow_execution( + wf_1_task_1_action_exs[0].id + ) + + wf_2_ex_1_task_execs = wf_2_ex_1.task_executions + + wf_2_ex_1_task_1_ex = self._assert_single_item( + wf_2_ex_1.task_executions, + name='task1' + ) + + wf_2_ex_1_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_ex_1_task_1_ex.id + ) + + wf_2_ex_2 = db_api.get_workflow_execution( + wf_1_task_1_action_exs[1].id + ) + + wf_2_ex_2_task_execs = wf_2_ex_2.task_executions + + wf_2_ex_2_task_1_ex = self._assert_single_item( + wf_2_ex_2.task_executions, + name='task1' + ) + + wf_2_ex_2_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_ex_2_task_1_ex.id + ) + + wf_2_ex_3 = db_api.get_workflow_execution( + wf_1_task_1_action_exs[2].id + ) + + wf_2_ex_3_task_execs = wf_2_ex_3.task_executions + + wf_2_ex_3_task_1_ex = self._assert_single_item( + wf_2_ex_3.task_executions, + name='task1' + ) + + wf_2_ex_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_ex_3_task_1_ex.id + ) + + # Get objects for the wf3 subworkflow execution. + wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') + + wf_3_task_execs = wf_3_ex.task_executions + + wf_3_task_1_ex = self._assert_single_item( + wf_3_ex.task_executions, + name='task1' + ) + + wf_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_3_task_1_ex.id + ) + + # Check state of parent workflow execution. + self.assertEqual(states.PAUSED, wf_1_ex.state) + self.assertEqual(states.PAUSED, wf_1_task_1_ex.state) + self.assertEqual(states.PAUSED, wf_1_task_2_ex.state) + + # Check state of wf2 (1) subworkflow execution. + self.assertEqual(states.SUCCESS, wf_1_task_1_action_exs[0].state) + self.assertEqual(states.SUCCESS, wf_2_ex_1.state) + self.assertEqual(states.SUCCESS, wf_2_ex_1_task_1_ex.state) + self.assertEqual(states.SUCCESS, wf_2_ex_1_task_1_action_exs[0].state) + + # Check state of wf2 (2) subworkflow execution. + self.assertEqual(states.PAUSED, wf_1_task_1_action_exs[1].state) + self.assertEqual(states.PAUSED, wf_2_ex_2.state) + self.assertEqual(states.RUNNING, wf_2_ex_2_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_2_ex_2_task_1_action_exs[0].state) + + # Check state of wf2 (3) subworkflow execution. + self.assertEqual(states.PAUSED, wf_1_task_1_action_exs[2].state) + self.assertEqual(states.PAUSED, wf_2_ex_3.state) + self.assertEqual(states.RUNNING, wf_2_ex_3_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_2_ex_3_task_1_action_exs[0].state) + + # Check state of wf3 subworkflow execution. + self.assertEqual(states.PAUSED, wf_1_task_2_action_exs[0].state) + self.assertEqual(states.PAUSED, wf_3_ex.state) + self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) + + # Resume one of the remaining subworkflows. + self.engine.resume_workflow(wf_2_ex_2.id) + self.engine.resume_workflow(wf_2_ex_3.id) + self.engine.resume_workflow(wf_3_ex.id) + self.await_workflow_running(wf_2_ex_2.id) + self.await_workflow_running(wf_2_ex_3.id) + self.await_workflow_running(wf_3_ex.id) + self.await_task_running(wf_1_task_1_ex.id) + self.await_task_running(wf_1_task_2_ex.id) + self.await_workflow_running(wf_1_ex.id) + + # Complete action executions of the remaining subworkflows. + self.engine.on_action_complete( + wf_2_ex_2_task_1_action_exs[0].id, + ml_actions.Result(data={'result': 'foobar'}) + ) + + self.engine.on_action_complete( + wf_2_ex_3_task_1_action_exs[0].id, + ml_actions.Result(data={'result': 'foobar'}) + ) + + self.engine.on_action_complete( + wf_3_task_1_action_exs[0].id, + ml_actions.Result(data={'result': 'foobar'}) + ) + + self.await_workflow_success(wf_2_ex_1.id) + self.await_workflow_success(wf_2_ex_2.id) + self.await_workflow_success(wf_2_ex_3.id) + self.await_workflow_success(wf_3_ex.id) + self.await_task_success(wf_1_task_1_ex.id) + self.await_task_success(wf_1_task_2_ex.id) + self.await_workflow_success(wf_1_ex.id) + + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() + + # Get objects for the parent workflow execution. + wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + + wf_1_task_execs = wf_1_ex.task_executions + + wf_1_task_1_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task1' + ) + + wf_1_task_1_action_exs = sorted( + wf_1_task_1_ex.executions, + key=lambda x: x['runtime_context']['index'] + ) + + wf_1_task_2_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task2' + ) + + wf_1_task_2_action_exs = wf_1_task_2_ex.executions + + # Get objects for the with-items subworkflow executions. + wf_2_ex_1 = db_api.get_workflow_execution( + wf_1_task_1_action_exs[0].id + ) + + wf_2_ex_1_task_execs = wf_2_ex_1.task_executions + + wf_2_ex_1_task_1_ex = self._assert_single_item( + wf_2_ex_1.task_executions, + name='task1' + ) + + wf_2_ex_1_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_ex_1_task_1_ex.id + ) + + wf_2_ex_2 = db_api.get_workflow_execution( + wf_1_task_1_action_exs[1].id + ) + + wf_2_ex_2_task_execs = wf_2_ex_2.task_executions + + wf_2_ex_2_task_1_ex = self._assert_single_item( + wf_2_ex_2.task_executions, + name='task1' + ) + + wf_2_ex_2_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_ex_2_task_1_ex.id + ) + + wf_2_ex_3 = db_api.get_workflow_execution( + wf_1_task_1_action_exs[2].id + ) + + wf_2_ex_3_task_execs = wf_2_ex_3.task_executions + + wf_2_ex_3_task_1_ex = self._assert_single_item( + wf_2_ex_3.task_executions, + name='task1' + ) + + wf_2_ex_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_ex_3_task_1_ex.id + ) + + # Get objects for the wf3 subworkflow execution. + wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') + + wf_3_task_execs = wf_3_ex.task_executions + + wf_3_task_1_ex = self._assert_single_item( + wf_3_ex.task_executions, + name='task1' + ) + + wf_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_3_task_1_ex.id + ) + + # Check state of parent workflow execution. + self.assertEqual(states.SUCCESS, wf_1_ex.state) + self.assertEqual(states.SUCCESS, wf_1_task_1_ex.state) + self.assertEqual(states.SUCCESS, wf_1_task_2_ex.state) + + # Check state of wf2 (1) subworkflow execution. + self.assertEqual(states.SUCCESS, wf_1_task_1_action_exs[0].state) + self.assertEqual(states.SUCCESS, wf_2_ex_1.state) + self.assertEqual(states.SUCCESS, wf_2_ex_1_task_1_ex.state) + self.assertEqual(states.SUCCESS, wf_2_ex_1_task_1_action_exs[0].state) + + # Check state of wf2 (2) subworkflow execution. + self.assertEqual(states.SUCCESS, wf_1_task_1_action_exs[1].state) + self.assertEqual(states.SUCCESS, wf_2_ex_2.state) + self.assertEqual(states.SUCCESS, wf_2_ex_2_task_1_ex.state) + self.assertEqual(states.SUCCESS, wf_2_ex_2_task_1_action_exs[0].state) + + # Check state of wf2 (3) subworkflow execution. + self.assertEqual(states.SUCCESS, wf_1_task_1_action_exs[2].state) + self.assertEqual(states.SUCCESS, wf_2_ex_3.state) + self.assertEqual(states.SUCCESS, wf_2_ex_3_task_1_ex.state) + self.assertEqual(states.SUCCESS, wf_2_ex_3_task_1_action_exs[0].state) + + # Check state of wf3 subworkflow execution. + self.assertEqual(states.SUCCESS, wf_1_task_2_action_exs[0].state) + self.assertEqual(states.SUCCESS, wf_3_ex.state) + self.assertEqual(states.SUCCESS, wf_3_task_1_ex.state) + self.assertEqual(states.SUCCESS, wf_3_task_1_action_exs[0].state) diff --git a/mistral/tests/unit/engine/test_task_pause_resume.py b/mistral/tests/unit/engine/test_task_pause_resume.py index 8d7ea8cba..cb5df1abd 100644 --- a/mistral/tests/unit/engine/test_task_pause_resume.py +++ b/mistral/tests/unit/engine/test_task_pause_resume.py @@ -83,7 +83,7 @@ class TaskPauseResumeTest(base.EngineTestCase): task_execution_id=task_1_ex.id ) - self.assertEqual(states.RUNNING, wf_ex.state) + self.assertEqual(states.PAUSED, wf_ex.state) self.assertEqual(1, len(task_execs)) self.assertEqual(states.PAUSED, task_1_ex.state) self.assertEqual(1, len(task_1_action_exs)) @@ -174,8 +174,9 @@ class TaskPauseResumeTest(base.EngineTestCase): name='task1' ) - task_1_action_exs = db_api.get_action_executions( - task_execution_id=task_1_ex.id + task_1_action_exs = sorted( + db_api.get_action_executions(task_execution_id=task_1_ex.id), + key=lambda x: x['runtime_context']['index'] ) self.assertEqual(states.RUNNING, wf_ex.state) @@ -201,11 +202,12 @@ class TaskPauseResumeTest(base.EngineTestCase): name='task1' ) - task_1_action_exs = db_api.get_action_executions( - task_execution_id=task_1_ex.id + task_1_action_exs = sorted( + db_api.get_action_executions(task_execution_id=task_1_ex.id), + key=lambda x: x['runtime_context']['index'] ) - self.assertEqual(states.RUNNING, wf_ex.state) + self.assertEqual(states.PAUSED, wf_ex.state) self.assertEqual(1, len(task_execs)) self.assertEqual(states.PAUSED, task_1_ex.state) self.assertEqual(3, len(task_1_action_exs)) @@ -234,11 +236,12 @@ class TaskPauseResumeTest(base.EngineTestCase): name='task1' ) - task_1_action_exs = db_api.get_action_executions( - task_execution_id=task_1_ex.id + task_1_action_exs = sorted( + db_api.get_action_executions(task_execution_id=task_1_ex.id), + key=lambda x: x['runtime_context']['index'] ) - self.assertEqual(states.RUNNING, wf_ex.state) + self.assertEqual(states.PAUSED, wf_ex.state) self.assertEqual(1, len(task_execs)) self.assertEqual(states.PAUSED, task_1_ex.state) self.assertEqual(3, len(task_1_action_exs)) @@ -259,8 +262,9 @@ class TaskPauseResumeTest(base.EngineTestCase): name='task1' ) - task_1_action_exs = db_api.get_action_executions( - task_execution_id=task_1_ex.id + task_1_action_exs = sorted( + db_api.get_action_executions(task_execution_id=task_1_ex.id), + key=lambda x: x['runtime_context']['index'] ) self.assertEqual(states.RUNNING, wf_ex.state) @@ -287,8 +291,9 @@ class TaskPauseResumeTest(base.EngineTestCase): task_1_ex = self._assert_single_item(task_execs, name='task1') - task_1_action_exs = db_api.get_action_executions( - task_execution_id=task_1_ex.id + task_1_action_exs = sorted( + db_api.get_action_executions(task_execution_id=task_1_ex.id), + key=lambda x: x['runtime_context']['index'] ) task_2_ex = self._assert_single_item(task_execs, name='task2')