diff --git a/mistral/db/v2/api.py b/mistral/db/v2/api.py index fe88d9afe..c94a2b944 100644 --- a/mistral/db/v2/api.py +++ b/mistral/db/v2/api.py @@ -312,6 +312,10 @@ def get_incomplete_task_executions(**kwargs): return IMPL.get_incomplete_task_executions(**kwargs) +def get_incomplete_task_executions_count(**kwargs): + return IMPL.get_incomplete_task_executions_count(**kwargs) + + def create_task_execution(values): return IMPL.create_task_execution(values) diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py index 3d51e8500..6b00343ca 100644 --- a/mistral/db/v2/sqlalchemy/api.py +++ b/mistral/db/v2/sqlalchemy/api.py @@ -797,7 +797,7 @@ def get_task_executions(**kwargs): return _get_task_executions(**kwargs) -def get_incomplete_task_executions(**kwargs): +def _get_incomplete_task_executions_query(kwargs): query = b.model_query(models.TaskExecution) query = query.filter_by(**kwargs) @@ -811,9 +811,21 @@ def get_incomplete_task_executions(**kwargs): ) ) + return query + + +def get_incomplete_task_executions(**kwargs): + query = _get_incomplete_task_executions_query(kwargs) + return query.all() +def get_incomplete_task_executions_count(**kwargs): + query = _get_incomplete_task_executions_query(kwargs) + + return query.count() + + @b.session_aware() def create_task_execution(values, session=None): task_ex = models.TaskExecution() diff --git a/mistral/engine/action_handler.py b/mistral/engine/action_handler.py index e18a9b898..775b5afb6 100644 --- a/mistral/engine/action_handler.py +++ b/mistral/engine/action_handler.py @@ -52,6 +52,7 @@ def on_action_complete(action_ex, result): task_handler.schedule_on_action_complete(action_ex) +@profiler.trace('action-handler-build-action') def _build_action(action_ex): if isinstance(action_ex, models.WorkflowExecution): return actions.WorkflowAction(None, action_ex=action_ex) diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index 3cde89651..48db27cee 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -280,11 +280,11 @@ class Workflow(object): # Workflow is not completed if there are any incomplete task # executions. - incomplete_tasks = db_api.get_incomplete_task_executions( + incomplete_tasks_count = db_api.get_incomplete_task_executions_count( workflow_execution_id=self.wf_ex.id, ) - if incomplete_tasks: + if incomplete_tasks_count > 0: return wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec) diff --git a/mistral/tests/unit/db/v2/test_sqlalchemy_db_api.py b/mistral/tests/unit/db/v2/test_sqlalchemy_db_api.py index a686568e2..c31161dfc 100644 --- a/mistral/tests/unit/db/v2/test_sqlalchemy_db_api.py +++ b/mistral/tests/unit/db/v2/test_sqlalchemy_db_api.py @@ -1401,6 +1401,51 @@ class TaskExecutionTest(SQLAlchemyTest): created.id ) + def test_get_incomplete_task_executions(self): + wf_ex = db_api.create_workflow_execution(WF_EXECS[0]) + + values = copy.deepcopy(TASK_EXECS[0]) + values.update({'workflow_execution_id': wf_ex.id}) + values['state'] = 'RUNNING' + + task_ex1 = db_api.create_task_execution(values) + + task_execs = db_api.get_incomplete_task_executions( + workflow_execution_id=wf_ex.id + ) + + self.assertEqual(1, len(task_execs)) + self.assertEqual(task_ex1, task_execs[0]) + self.assertEqual( + 1, + db_api.get_incomplete_task_executions_count( + workflow_execution_id=wf_ex.id + ) + ) + + # Add one more task. + + values = copy.deepcopy(TASK_EXECS[1]) + values.update({'workflow_execution_id': wf_ex.id}) + values['state'] = 'SUCCESS' + + db_api.create_task_execution(values) + + # It should be still one incompleted task. + + task_execs = db_api.get_incomplete_task_executions( + workflow_execution_id=wf_ex.id + ) + + self.assertEqual(1, len(task_execs)) + self.assertEqual(task_ex1, task_execs[0]) + self.assertEqual( + 1, + db_api.get_incomplete_task_executions_count( + workflow_execution_id=wf_ex.id + ) + ) + def test_task_execution_repr(self): wf_ex = db_api.create_workflow_execution(WF_EXECS[0])