Merge "Fix incorrect in-depth search of affected tasks"
This commit is contained in:
commit
dbfc0bea22
@ -1139,3 +1139,53 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
|
||||
wf_ex = self.engine.start_workflow('wf')
|
||||
|
||||
self.await_workflow_success(wf_ex.id)
|
||||
|
||||
def test_unexisting_join_task_does_not_stuck_wf_running(self):
|
||||
wf_text = """---
|
||||
version: '2.0'
|
||||
|
||||
wf:
|
||||
tasks:
|
||||
branch1:
|
||||
action: std.noop
|
||||
on-success: branch1-23_merge
|
||||
branch2:
|
||||
action: std.async_noop
|
||||
on-success: branch2-3_merge
|
||||
branch3:
|
||||
action: std.fail
|
||||
on-success: branch2-3_merge
|
||||
branch2-3_merge:
|
||||
action: std.noop
|
||||
on-success: branch1-23_merge
|
||||
join: all
|
||||
branch1-23_merge:
|
||||
action: std.noop
|
||||
join: all
|
||||
"""
|
||||
|
||||
wf_service.create_workflows(wf_text)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('wf')
|
||||
|
||||
with db_api.transaction():
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
task_execs = wf_ex.task_executions
|
||||
|
||||
t_ex = self._assert_single_item(
|
||||
task_execs,
|
||||
name='branch2'
|
||||
)
|
||||
|
||||
t_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=t_ex.id
|
||||
)
|
||||
|
||||
self.engine.on_action_complete(
|
||||
t_action_exs[0].id,
|
||||
ml_actions.Result(error="Error!")
|
||||
)
|
||||
|
||||
self.await_workflow_error(wf_ex.id)
|
||||
|
@ -1,4 +1,5 @@
|
||||
# Copyright 2015 - Mirantis, Inc.
|
||||
# Copyright 2020 - NetCracker Technology Corp.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@ -235,9 +236,8 @@ class DirectWorkflowController(base.WorkflowController):
|
||||
if not self.wf_spec.get_tasks()[t_name]:
|
||||
continue
|
||||
|
||||
if t_name in all_joins:
|
||||
if t_name in t_execs_cache:
|
||||
res.add(t_execs_cache[t_name])
|
||||
if t_name in all_joins and t_name in t_execs_cache:
|
||||
res.add(t_execs_cache[t_name])
|
||||
continue
|
||||
|
||||
clauses.update(self.wf_spec.find_outbound_task_names(t_name))
|
||||
|
Loading…
Reference in New Issue
Block a user