Fix incorrect in-depth search of affected tasks

In case if the join task is not exist (every upstream task have
ERROR state) workflow will stuck in RUNNING state because of
the bug in affected tasks search. This patch fixes this bug.

Change-Id: If9f0c9bea587b486998af1c18e282bedba453499
Closes-Bug: #1862161
Signed-off-by: Oleg Ovcharuk <vgvoleg@gmail.com>
This commit is contained in:
Oleg Ovcharuk 2020-02-06 14:43:00 +03:00
parent 1217e4ca61
commit de633d5d48
2 changed files with 53 additions and 3 deletions

View File

@ -1139,3 +1139,53 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
wf_ex = self.engine.start_workflow('wf')
self.await_workflow_success(wf_ex.id)
def test_unexisting_join_task_does_not_stuck_wf_running(self):
wf_text = """---
version: '2.0'
wf:
tasks:
branch1:
action: std.noop
on-success: branch1-23_merge
branch2:
action: std.async_noop
on-success: branch2-3_merge
branch3:
action: std.fail
on-success: branch2-3_merge
branch2-3_merge:
action: std.noop
on-success: branch1-23_merge
join: all
branch1-23_merge:
action: std.noop
join: all
"""
wf_service.create_workflows(wf_text)
# Start workflow.
wf_ex = self.engine.start_workflow('wf')
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
t_ex = self._assert_single_item(
task_execs,
name='branch2'
)
t_action_exs = db_api.get_action_executions(
task_execution_id=t_ex.id
)
self.engine.on_action_complete(
t_action_exs[0].id,
ml_actions.Result(error="Error!")
)
self.await_workflow_error(wf_ex.id)

View File

@ -1,4 +1,5 @@
# Copyright 2015 - Mirantis, Inc.
# Copyright 2020 - NetCracker Technology Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -235,9 +236,8 @@ class DirectWorkflowController(base.WorkflowController):
if not self.wf_spec.get_tasks()[t_name]:
continue
if t_name in all_joins:
if t_name in t_execs_cache:
res.add(t_execs_cache[t_name])
if t_name in all_joins and t_name in t_execs_cache:
res.add(t_execs_cache[t_name])
continue
clauses.update(self.wf_spec.find_outbound_task_names(t_name))