From bdbfb823010cefe94945ba118199d9ecccdbcab7 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 18 Jun 2019 20:00:05 +0300 Subject: [PATCH] Reformat rerun logic for tasks with join Change-Id: I055bc2d5a4bdf839f1e262e49563616d8deff92f Closes-Bug: #1833262 Signed-off-by: Oleg Ovcharuk --- mistral/engine/policies.py | 4 +- mistral/engine/tasks.py | 11 +++- .../unit/engine/test_direct_workflow_rerun.py | 66 ++++++++++++++----- 3 files changed, 62 insertions(+), 19 deletions(-) diff --git a/mistral/engine/policies.py b/mistral/engine/policies.py index c99032a8c..9bb14b6a1 100644 --- a/mistral/engine/policies.py +++ b/mistral/engine/policies.py @@ -395,8 +395,8 @@ class RetryPolicy(base.TaskPolicy): runtime_context[context_key] = policy_context # NOTE(vgvoleg): join tasks in direct workflows can't be - # retried as is, because this tasks can't start without - # the correct logical state. + # retried as-is, because these tasks can't start without + # a correct logical state. if hasattr(task_spec, "get_join") and task_spec.get_join(): from mistral.engine import task_handler as t_h _log_task_delay(task_ex, self.delay, states.WAITING) diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py index 3629b8cb5..f18b8fe84 100644 --- a/mistral/engine/tasks.py +++ b/mistral/engine/tasks.py @@ -39,7 +39,6 @@ from mistral.workflow import commands from mistral.workflow import data_flow from mistral.workflow import states - LOG = logging.getLogger(__name__) @@ -520,6 +519,16 @@ class RegularTask(Task): @profiler.trace('task-run-existing') def _run_existing(self): + # NOTE(vgvoleg): join tasks in direct workflows can't be + # rerun as-is, because these tasks can't start without + # a correct logical state. + if self.rerun and hasattr(self.task_spec, "get_join") \ + and self.task_spec.get_join(): + from mistral.engine import task_handler as t_h + self.set_state(states.WAITING, 'Task is waiting.') + t_h._schedule_refresh_task_state(self.task_ex.id) + return + if self.waiting: return diff --git a/mistral/tests/unit/engine/test_direct_workflow_rerun.py b/mistral/tests/unit/engine/test_direct_workflow_rerun.py index 5c67ec89b..54f9a0021 100644 --- a/mistral/tests/unit/engine/test_direct_workflow_rerun.py +++ b/mistral/tests/unit/engine/test_direct_workflow_rerun.py @@ -161,6 +161,10 @@ workflows: type: direct tasks: + t0: + action: std.noop + on-success: + - t3 t1: action: std.echo output="Task 1" on-success: @@ -818,12 +822,14 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(states.ERROR, wf_ex.state) self.assertIsNotNone(wf_ex.state_info) - self.assertEqual(3, len(task_execs)) + self.assertEqual(4, len(task_execs)) + task_0_ex = self._assert_single_item(task_execs, name='t0') task_1_ex = self._assert_single_item(task_execs, name='t1') task_2_ex = self._assert_single_item(task_execs, name='t2') task_3_ex = self._assert_single_item(task_execs, name='t3') + self.assertEqual(states.SUCCESS, task_0_ex.state) self.assertEqual(states.SUCCESS, task_1_ex.state) self.assertEqual(states.SUCCESS, task_2_ex.state) self.assertEqual(states.ERROR, task_3_ex.state) @@ -847,12 +853,23 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(states.SUCCESS, wf_ex.state) self.assertIsNone(wf_ex.state_info) - self.assertEqual(3, len(task_execs)) + self.assertEqual(4, len(task_execs)) + task_0_ex = self._assert_single_item(task_execs, name='t0') task_1_ex = self._assert_single_item(task_execs, name='t1') task_2_ex = self._assert_single_item(task_execs, name='t2') task_3_ex = self._assert_single_item(task_execs, name='t3') + # Check action executions of task 0. + self.assertEqual(states.SUCCESS, task_0_ex.state) + + task_0_action_exs = db_api.get_action_executions( + task_execution_id=task_0_ex.id + ) + + self.assertEqual(1, len(task_0_action_exs)) + self.assertEqual(states.SUCCESS, task_0_action_exs[0].state) + # Check action executions of task 1. self.assertEqual(states.SUCCESS, task_1_ex.state) @@ -906,16 +923,6 @@ class DirectWorkflowRerunTest(base.EngineTestCase): # Run workflow and fail task. wf_ex = self.engine.start_workflow('wb1.wf1') - with db_api.transaction(): - wf_ex = db_api.get_workflow_execution(wf_ex.id) - task_execs = wf_ex.task_executions - - task_1_ex = self._assert_single_item(task_execs, name='t1') - task_2_ex = self._assert_single_item(task_execs, name='t2') - - self.await_task_error(task_1_ex.id) - self.await_task_error(task_2_ex.id) - self.await_workflow_error(wf_ex.id) with db_api.transaction(): @@ -924,7 +931,11 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(states.ERROR, wf_ex.state) self.assertIsNotNone(wf_ex.state_info) - self.assertEqual(2, len(task_execs)) + self.assertEqual(4, len(task_execs)) + + task_0_ex = self._assert_single_item(task_execs, name='t0') + + self.assertEqual(states.SUCCESS, task_0_ex.state) task_1_ex = self._assert_single_item(task_execs, name='t1') @@ -936,6 +947,11 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(states.ERROR, task_2_ex.state) self.assertIsNotNone(task_2_ex.state_info) + task_3_ex = self._assert_single_item(task_execs, name='t3') + + self.assertEqual(states.ERROR, task_3_ex.state) + self.assertIsNotNone(task_3_ex.state_info) + # Resume workflow and re-run failed task. wf_ex = self.engine.rerun_workflow(task_1_ex.id) @@ -959,16 +975,24 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(states.ERROR, wf_ex.state) self.assertIsNotNone(wf_ex.state_info) - self.assertEqual(3, len(task_execs)) + self.assertEqual(4, len(task_execs)) + task_0_ex = self._assert_single_item(task_execs, name='t0') task_1_ex = self._assert_single_item(task_execs, name='t1') task_2_ex = self._assert_single_item(task_execs, name='t2') task_3_ex = self._assert_single_item(task_execs, name='t3') + self.assertEqual(states.SUCCESS, task_0_ex.state) self.assertEqual(states.SUCCESS, task_1_ex.state) self.assertEqual(states.ERROR, task_2_ex.state) self.assertEqual(states.ERROR, task_3_ex.state) + # Check that join task did not start any action execution + task_3_action_exs = db_api.get_action_executions( + task_execution_id=task_3_ex.id + ) + self.assertEqual(0, len(task_3_action_exs)) + # Resume workflow and re-run failed task. wf_ex = self.engine.rerun_workflow(task_2_ex.id) @@ -987,12 +1011,22 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(states.SUCCESS, wf_ex.state) self.assertIsNone(wf_ex.state_info) - self.assertEqual(3, len(task_execs)) + self.assertEqual(4, len(task_execs)) + task_0_ex = self._assert_single_item(task_execs, name='t0') task_1_ex = self._assert_single_item(task_execs, name='t1') task_2_ex = self._assert_single_item(task_execs, name='t2') task_3_ex = self._assert_single_item(task_execs, name='t3') + # Check action executions of task 0. + self.assertEqual(states.SUCCESS, task_0_ex.state) + self.assertIsNone(task_0_ex.state_info) + + task_0_action_exs = db_api.get_action_executions( + task_execution_id=task_0_ex.id + ) + + self.assertEqual(1, len(task_0_action_exs)) # Check action executions of task 1. self.assertEqual(states.SUCCESS, task_1_ex.state) self.assertIsNone(task_1_ex.state_info) @@ -1021,7 +1055,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self._assert_single_item(task_2_action_exs, state=states.SUCCESS) self._assert_single_item(task_2_action_exs, state=states.ERROR) - # Check action executions of task 3. + # Check there is exactly 1 action execution of task 3. self.assertEqual(states.SUCCESS, task_3_ex.state) task_3_action_exs = db_api.get_action_executions(