From 28005e90485f3a8e8612202fd37452e50c68fed7 Mon Sep 17 00:00:00 2001 From: Nikolay Mahotkin Date: Fri, 14 Aug 2015 15:27:31 +0300 Subject: [PATCH] Fixing working with-items and retry together Closes-Bug: #1484483 Change-Id: I0242fde87140d9dbff738b6e139a19ba300b30fc --- mistral/tests/unit/engine/test_with_items.py | 82 ++++++++++++++++++++ mistral/workflow/with_items.py | 10 ++- 2 files changed, 88 insertions(+), 4 deletions(-) diff --git a/mistral/tests/unit/engine/test_with_items.py b/mistral/tests/unit/engine/test_with_items.py index 0c7d79ca5..f0c3cce86 100644 --- a/mistral/tests/unit/engine/test_with_items.py +++ b/mistral/tests/unit/engine/test_with_items.py @@ -780,3 +780,85 @@ class WithItemsEngineTest(base.EngineTestCase): self.assertIn('Mistral', result) self.assertEqual(states.SUCCESS, task_ex.state) + + def test_with_items_retry_policy(self): + workflow = """--- + version: "2.0" + + with_items_retry: + tasks: + task1: + with-items: i in [1, 2, 3] + action: std.fail + retry: + count: 3 + delay: 1 + on-error: task2 + + task2: + action: std.echo output="With-items failed" + """ + wf_service.create_workflows(workflow) + + # Start workflow. + wf_ex = self.engine.start_workflow('with_items_retry', {}) + + self._await( + lambda: self.is_execution_success(wf_ex.id) + ) + + # Note: We need to reread execution to access related tasks. + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + tasks = wf_ex.task_executions + self.assertEqual(2, len(tasks)) + + task1 = self._assert_single_item(tasks, name='task1') + + self.assertEqual( + 2, + task1.runtime_context['retry_task_policy']['retry_no'] + ) + self.assertEqual(9, len(task1.executions)) + self._assert_multiple_items(task1.executions, 3, accepted=True) + + def test_with_items_retry_policy_concurrency(self): + workflow = """--- + version: "2.0" + + with_items_retry_concurrency: + tasks: + task1: + with-items: i in [1, 2, 3, 4] + action: std.fail + retry: + count: 3 + delay: 1 + concurrency: 2 + on-error: task2 + + task2: + action: std.echo output="With-items failed" + """ + wf_service.create_workflows(workflow) + + # Start workflow. + wf_ex = self.engine.start_workflow( + 'with_items_retry_concurrency', + {} + ) + + self._await( + lambda: self.is_execution_success(wf_ex.id), + ) + + # Note: We need to reread execution to access related tasks. + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + tasks = wf_ex.task_executions + self.assertEqual(2, len(tasks)) + + task1 = self._assert_single_item(tasks, name='task1') + + self.assertEqual(12, len(task1.executions)) + self._assert_multiple_items(task1.executions, 4, accepted=True) diff --git a/mistral/workflow/with_items.py b/mistral/workflow/with_items.py index 7797719f1..245c67c7d 100644 --- a/mistral/workflow/with_items.py +++ b/mistral/workflow/with_items.py @@ -72,10 +72,12 @@ def _get_indices_if_rerun(unaccepted_executions): :return: a list of numbers. """ - return [ - ex.runtime_context['with_items_index'] - for ex in unaccepted_executions - ] + return sorted( + set([ + ex.runtime_context['with_items_index'] + for ex in unaccepted_executions + ]) + ) def _get_unaccepted_act_exs(task_ex):