diff --git a/mistral/tests/unit/engine/test_with_items.py b/mistral/tests/unit/engine/test_with_items.py index 0c7d79ca5..f0c3cce86 100644 --- a/mistral/tests/unit/engine/test_with_items.py +++ b/mistral/tests/unit/engine/test_with_items.py @@ -780,3 +780,85 @@ class WithItemsEngineTest(base.EngineTestCase): self.assertIn('Mistral', result) self.assertEqual(states.SUCCESS, task_ex.state) + + def test_with_items_retry_policy(self): + workflow = """--- + version: "2.0" + + with_items_retry: + tasks: + task1: + with-items: i in [1, 2, 3] + action: std.fail + retry: + count: 3 + delay: 1 + on-error: task2 + + task2: + action: std.echo output="With-items failed" + """ + wf_service.create_workflows(workflow) + + # Start workflow. + wf_ex = self.engine.start_workflow('with_items_retry', {}) + + self._await( + lambda: self.is_execution_success(wf_ex.id) + ) + + # Note: We need to reread execution to access related tasks. + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + tasks = wf_ex.task_executions + self.assertEqual(2, len(tasks)) + + task1 = self._assert_single_item(tasks, name='task1') + + self.assertEqual( + 2, + task1.runtime_context['retry_task_policy']['retry_no'] + ) + self.assertEqual(9, len(task1.executions)) + self._assert_multiple_items(task1.executions, 3, accepted=True) + + def test_with_items_retry_policy_concurrency(self): + workflow = """--- + version: "2.0" + + with_items_retry_concurrency: + tasks: + task1: + with-items: i in [1, 2, 3, 4] + action: std.fail + retry: + count: 3 + delay: 1 + concurrency: 2 + on-error: task2 + + task2: + action: std.echo output="With-items failed" + """ + wf_service.create_workflows(workflow) + + # Start workflow. + wf_ex = self.engine.start_workflow( + 'with_items_retry_concurrency', + {} + ) + + self._await( + lambda: self.is_execution_success(wf_ex.id), + ) + + # Note: We need to reread execution to access related tasks. + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + tasks = wf_ex.task_executions + self.assertEqual(2, len(tasks)) + + task1 = self._assert_single_item(tasks, name='task1') + + self.assertEqual(12, len(task1.executions)) + self._assert_multiple_items(task1.executions, 4, accepted=True) diff --git a/mistral/workflow/with_items.py b/mistral/workflow/with_items.py index 7797719f1..245c67c7d 100644 --- a/mistral/workflow/with_items.py +++ b/mistral/workflow/with_items.py @@ -72,10 +72,12 @@ def _get_indices_if_rerun(unaccepted_executions): :return: a list of numbers. """ - return [ - ex.runtime_context['with_items_index'] - for ex in unaccepted_executions - ] + return sorted( + set([ + ex.runtime_context['with_items_index'] + for ex in unaccepted_executions + ]) + ) def _get_unaccepted_act_exs(task_ex):