Merge "Fixing working with-items and retry together"
This commit is contained in:
commit
8072adf3db
@ -780,3 +780,85 @@ class WithItemsEngineTest(base.EngineTestCase):
|
|||||||
self.assertIn('Mistral', result)
|
self.assertIn('Mistral', result)
|
||||||
|
|
||||||
self.assertEqual(states.SUCCESS, task_ex.state)
|
self.assertEqual(states.SUCCESS, task_ex.state)
|
||||||
|
|
||||||
|
def test_with_items_retry_policy(self):
|
||||||
|
workflow = """---
|
||||||
|
version: "2.0"
|
||||||
|
|
||||||
|
with_items_retry:
|
||||||
|
tasks:
|
||||||
|
task1:
|
||||||
|
with-items: i in [1, 2, 3]
|
||||||
|
action: std.fail
|
||||||
|
retry:
|
||||||
|
count: 3
|
||||||
|
delay: 1
|
||||||
|
on-error: task2
|
||||||
|
|
||||||
|
task2:
|
||||||
|
action: std.echo output="With-items failed"
|
||||||
|
"""
|
||||||
|
wf_service.create_workflows(workflow)
|
||||||
|
|
||||||
|
# Start workflow.
|
||||||
|
wf_ex = self.engine.start_workflow('with_items_retry', {})
|
||||||
|
|
||||||
|
self._await(
|
||||||
|
lambda: self.is_execution_success(wf_ex.id)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Note: We need to reread execution to access related tasks.
|
||||||
|
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||||
|
|
||||||
|
tasks = wf_ex.task_executions
|
||||||
|
self.assertEqual(2, len(tasks))
|
||||||
|
|
||||||
|
task1 = self._assert_single_item(tasks, name='task1')
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
2,
|
||||||
|
task1.runtime_context['retry_task_policy']['retry_no']
|
||||||
|
)
|
||||||
|
self.assertEqual(9, len(task1.executions))
|
||||||
|
self._assert_multiple_items(task1.executions, 3, accepted=True)
|
||||||
|
|
||||||
|
def test_with_items_retry_policy_concurrency(self):
|
||||||
|
workflow = """---
|
||||||
|
version: "2.0"
|
||||||
|
|
||||||
|
with_items_retry_concurrency:
|
||||||
|
tasks:
|
||||||
|
task1:
|
||||||
|
with-items: i in [1, 2, 3, 4]
|
||||||
|
action: std.fail
|
||||||
|
retry:
|
||||||
|
count: 3
|
||||||
|
delay: 1
|
||||||
|
concurrency: 2
|
||||||
|
on-error: task2
|
||||||
|
|
||||||
|
task2:
|
||||||
|
action: std.echo output="With-items failed"
|
||||||
|
"""
|
||||||
|
wf_service.create_workflows(workflow)
|
||||||
|
|
||||||
|
# Start workflow.
|
||||||
|
wf_ex = self.engine.start_workflow(
|
||||||
|
'with_items_retry_concurrency',
|
||||||
|
{}
|
||||||
|
)
|
||||||
|
|
||||||
|
self._await(
|
||||||
|
lambda: self.is_execution_success(wf_ex.id),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Note: We need to reread execution to access related tasks.
|
||||||
|
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||||
|
|
||||||
|
tasks = wf_ex.task_executions
|
||||||
|
self.assertEqual(2, len(tasks))
|
||||||
|
|
||||||
|
task1 = self._assert_single_item(tasks, name='task1')
|
||||||
|
|
||||||
|
self.assertEqual(12, len(task1.executions))
|
||||||
|
self._assert_multiple_items(task1.executions, 4, accepted=True)
|
||||||
|
@ -72,10 +72,12 @@ def _get_indices_if_rerun(unaccepted_executions):
|
|||||||
:return: a list of numbers.
|
:return: a list of numbers.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
return [
|
return sorted(
|
||||||
ex.runtime_context['with_items_index']
|
set([
|
||||||
for ex in unaccepted_executions
|
ex.runtime_context['with_items_index']
|
||||||
]
|
for ex in unaccepted_executions
|
||||||
|
])
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def _get_unaccepted_act_exs(task_ex):
|
def _get_unaccepted_act_exs(task_ex):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user