Merge "Reformat rerun logic for tasks with join"

This commit is contained in:
Zuul 2019-11-11 14:42:37 +00:00 committed by Gerrit Code Review
commit 1c7e242975
3 changed files with 62 additions and 19 deletions

View File

@ -416,8 +416,8 @@ class RetryPolicy(base.TaskPolicy):
runtime_context[context_key] = policy_context runtime_context[context_key] = policy_context
# NOTE(vgvoleg): join tasks in direct workflows can't be # NOTE(vgvoleg): join tasks in direct workflows can't be
# retried as is, because this tasks can't start without # retried as-is, because these tasks can't start without
# the correct logical state. # a correct logical state.
if hasattr(task_spec, "get_join") and task_spec.get_join(): if hasattr(task_spec, "get_join") and task_spec.get_join():
from mistral.engine import task_handler as t_h from mistral.engine import task_handler as t_h

View File

@ -39,7 +39,6 @@ from mistral.workflow import data_flow
from mistral.workflow import states from mistral.workflow import states
from mistral_lib import utils from mistral_lib import utils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -534,6 +533,16 @@ class RegularTask(Task):
@profiler.trace('task-run-existing') @profiler.trace('task-run-existing')
def _run_existing(self): def _run_existing(self):
# NOTE(vgvoleg): join tasks in direct workflows can't be
# rerun as-is, because these tasks can't start without
# a correct logical state.
if self.rerun and hasattr(self.task_spec, "get_join") \
and self.task_spec.get_join():
from mistral.engine import task_handler as t_h
self.set_state(states.WAITING, 'Task is waiting.')
t_h._schedule_refresh_task_state(self.task_ex.id)
return
if self.waiting: if self.waiting:
return return

View File

@ -161,6 +161,10 @@ workflows:
type: direct type: direct
tasks: tasks:
t0:
action: std.noop
on-success:
- t3
t1: t1:
action: std.echo output="Task 1" action: std.echo output="Task 1"
on-success: on-success:
@ -818,12 +822,14 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.ERROR, wf_ex.state) self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info) self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(3, len(task_execs)) self.assertEqual(4, len(task_execs))
task_0_ex = self._assert_single_item(task_execs, name='t0')
task_1_ex = self._assert_single_item(task_execs, name='t1') task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2') task_2_ex = self._assert_single_item(task_execs, name='t2')
task_3_ex = self._assert_single_item(task_execs, name='t3') task_3_ex = self._assert_single_item(task_execs, name='t3')
self.assertEqual(states.SUCCESS, task_0_ex.state)
self.assertEqual(states.SUCCESS, task_1_ex.state) self.assertEqual(states.SUCCESS, task_1_ex.state)
self.assertEqual(states.SUCCESS, task_2_ex.state) self.assertEqual(states.SUCCESS, task_2_ex.state)
self.assertEqual(states.ERROR, task_3_ex.state) self.assertEqual(states.ERROR, task_3_ex.state)
@ -847,12 +853,23 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, wf_ex.state) self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertIsNone(wf_ex.state_info) self.assertIsNone(wf_ex.state_info)
self.assertEqual(3, len(task_execs)) self.assertEqual(4, len(task_execs))
task_0_ex = self._assert_single_item(task_execs, name='t0')
task_1_ex = self._assert_single_item(task_execs, name='t1') task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2') task_2_ex = self._assert_single_item(task_execs, name='t2')
task_3_ex = self._assert_single_item(task_execs, name='t3') task_3_ex = self._assert_single_item(task_execs, name='t3')
# Check action executions of task 0.
self.assertEqual(states.SUCCESS, task_0_ex.state)
task_0_action_exs = db_api.get_action_executions(
task_execution_id=task_0_ex.id
)
self.assertEqual(1, len(task_0_action_exs))
self.assertEqual(states.SUCCESS, task_0_action_exs[0].state)
# Check action executions of task 1. # Check action executions of task 1.
self.assertEqual(states.SUCCESS, task_1_ex.state) self.assertEqual(states.SUCCESS, task_1_ex.state)
@ -906,16 +923,6 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
# Run workflow and fail task. # Run workflow and fail task.
wf_ex = self.engine.start_workflow('wb1.wf1') wf_ex = self.engine.start_workflow('wb1.wf1')
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
self.await_task_error(task_1_ex.id)
self.await_task_error(task_2_ex.id)
self.await_workflow_error(wf_ex.id) self.await_workflow_error(wf_ex.id)
with db_api.transaction(): with db_api.transaction():
@ -924,7 +931,11 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.ERROR, wf_ex.state) self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info) self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(2, len(task_execs)) self.assertEqual(4, len(task_execs))
task_0_ex = self._assert_single_item(task_execs, name='t0')
self.assertEqual(states.SUCCESS, task_0_ex.state)
task_1_ex = self._assert_single_item(task_execs, name='t1') task_1_ex = self._assert_single_item(task_execs, name='t1')
@ -936,6 +947,11 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.ERROR, task_2_ex.state) self.assertEqual(states.ERROR, task_2_ex.state)
self.assertIsNotNone(task_2_ex.state_info) self.assertIsNotNone(task_2_ex.state_info)
task_3_ex = self._assert_single_item(task_execs, name='t3')
self.assertEqual(states.ERROR, task_3_ex.state)
self.assertIsNotNone(task_3_ex.state_info)
# Resume workflow and re-run failed task. # Resume workflow and re-run failed task.
wf_ex = self.engine.rerun_workflow(task_1_ex.id) wf_ex = self.engine.rerun_workflow(task_1_ex.id)
@ -959,16 +975,24 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.ERROR, wf_ex.state) self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info) self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(3, len(task_execs)) self.assertEqual(4, len(task_execs))
task_0_ex = self._assert_single_item(task_execs, name='t0')
task_1_ex = self._assert_single_item(task_execs, name='t1') task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2') task_2_ex = self._assert_single_item(task_execs, name='t2')
task_3_ex = self._assert_single_item(task_execs, name='t3') task_3_ex = self._assert_single_item(task_execs, name='t3')
self.assertEqual(states.SUCCESS, task_0_ex.state)
self.assertEqual(states.SUCCESS, task_1_ex.state) self.assertEqual(states.SUCCESS, task_1_ex.state)
self.assertEqual(states.ERROR, task_2_ex.state) self.assertEqual(states.ERROR, task_2_ex.state)
self.assertEqual(states.ERROR, task_3_ex.state) self.assertEqual(states.ERROR, task_3_ex.state)
# Check that join task did not start any action execution
task_3_action_exs = db_api.get_action_executions(
task_execution_id=task_3_ex.id
)
self.assertEqual(0, len(task_3_action_exs))
# Resume workflow and re-run failed task. # Resume workflow and re-run failed task.
wf_ex = self.engine.rerun_workflow(task_2_ex.id) wf_ex = self.engine.rerun_workflow(task_2_ex.id)
@ -987,12 +1011,22 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, wf_ex.state) self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertIsNone(wf_ex.state_info) self.assertIsNone(wf_ex.state_info)
self.assertEqual(3, len(task_execs)) self.assertEqual(4, len(task_execs))
task_0_ex = self._assert_single_item(task_execs, name='t0')
task_1_ex = self._assert_single_item(task_execs, name='t1') task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2') task_2_ex = self._assert_single_item(task_execs, name='t2')
task_3_ex = self._assert_single_item(task_execs, name='t3') task_3_ex = self._assert_single_item(task_execs, name='t3')
# Check action executions of task 0.
self.assertEqual(states.SUCCESS, task_0_ex.state)
self.assertIsNone(task_0_ex.state_info)
task_0_action_exs = db_api.get_action_executions(
task_execution_id=task_0_ex.id
)
self.assertEqual(1, len(task_0_action_exs))
# Check action executions of task 1. # Check action executions of task 1.
self.assertEqual(states.SUCCESS, task_1_ex.state) self.assertEqual(states.SUCCESS, task_1_ex.state)
self.assertIsNone(task_1_ex.state_info) self.assertIsNone(task_1_ex.state_info)
@ -1021,7 +1055,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self._assert_single_item(task_2_action_exs, state=states.SUCCESS) self._assert_single_item(task_2_action_exs, state=states.SUCCESS)
self._assert_single_item(task_2_action_exs, state=states.ERROR) self._assert_single_item(task_2_action_exs, state=states.ERROR)
# Check action executions of task 3. # Check there is exactly 1 action execution of task 3.
self.assertEqual(states.SUCCESS, task_3_ex.state) self.assertEqual(states.SUCCESS, task_3_ex.state)
task_3_action_exs = db_api.get_action_executions( task_3_action_exs = db_api.get_action_executions(