diff --git a/mistral/tests/unit/engine/test_direct_workflow.py b/mistral/tests/unit/engine/test_direct_workflow.py index 19e6dc5c6..c9e1bce1e 100644 --- a/mistral/tests/unit/engine/test_direct_workflow.py +++ b/mistral/tests/unit/engine/test_direct_workflow.py @@ -1139,3 +1139,53 @@ class DirectWorkflowEngineTest(base.EngineTestCase): wf_ex = self.engine.start_workflow('wf') self.await_workflow_success(wf_ex.id) + + def test_unexisting_join_task_does_not_stuck_wf_running(self): + wf_text = """--- + version: '2.0' + + wf: + tasks: + branch1: + action: std.noop + on-success: branch1-23_merge + branch2: + action: std.async_noop + on-success: branch2-3_merge + branch3: + action: std.fail + on-success: branch2-3_merge + branch2-3_merge: + action: std.noop + on-success: branch1-23_merge + join: all + branch1-23_merge: + action: std.noop + join: all + """ + + wf_service.create_workflows(wf_text) + + # Start workflow. + wf_ex = self.engine.start_workflow('wf') + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + task_execs = wf_ex.task_executions + + t_ex = self._assert_single_item( + task_execs, + name='branch2' + ) + + t_action_exs = db_api.get_action_executions( + task_execution_id=t_ex.id + ) + + self.engine.on_action_complete( + t_action_exs[0].id, + ml_actions.Result(error="Error!") + ) + + self.await_workflow_error(wf_ex.id) diff --git a/mistral/workflow/direct_workflow.py b/mistral/workflow/direct_workflow.py index 42f8a861a..9e40baa25 100644 --- a/mistral/workflow/direct_workflow.py +++ b/mistral/workflow/direct_workflow.py @@ -1,4 +1,5 @@ # Copyright 2015 - Mirantis, Inc. +# Copyright 2020 - NetCracker Technology Corp. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -235,9 +236,8 @@ class DirectWorkflowController(base.WorkflowController): if not self.wf_spec.get_tasks()[t_name]: continue - if t_name in all_joins: - if t_name in t_execs_cache: - res.add(t_execs_cache[t_name]) + if t_name in all_joins and t_name in t_execs_cache: + res.add(t_execs_cache[t_name]) continue clauses.update(self.wf_spec.find_outbound_task_names(t_name))