From 698248363c6efd15ed5de571956bae5aec29796c Mon Sep 17 00:00:00 2001
From: Renat Akhmerov <renat.akhmerov@gmail.com>
Date: Mon, 12 Sep 2016 16:18:03 +0300
Subject: [PATCH] Using count() instead of all() for getting incompleted tasks

* Workflow completion check logic used heavy select query which
  actually returned objects whereas all we need is to count them.
  This patch adds get_incompleted_task_executions_count() methods
  on DB API to address this.

Change-Id: I796ba175210f41ded7877f310efeb30bd5045531
---
 mistral/db/v2/api.py                          |  4 ++
 mistral/db/v2/sqlalchemy/api.py               | 14 +++++-
 mistral/engine/action_handler.py              |  1 +
 mistral/engine/workflows.py                   |  4 +-
 .../unit/db/v2/test_sqlalchemy_db_api.py      | 45 +++++++++++++++++++
 5 files changed, 65 insertions(+), 3 deletions(-)

diff --git a/mistral/db/v2/api.py b/mistral/db/v2/api.py
index fe88d9afe..c94a2b944 100644
--- a/mistral/db/v2/api.py
+++ b/mistral/db/v2/api.py
@@ -312,6 +312,10 @@ def get_incomplete_task_executions(**kwargs):
     return IMPL.get_incomplete_task_executions(**kwargs)
 
 
+def get_incomplete_task_executions_count(**kwargs):
+    return IMPL.get_incomplete_task_executions_count(**kwargs)
+
+
 def create_task_execution(values):
     return IMPL.create_task_execution(values)
 
diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py
index 3d51e8500..6b00343ca 100644
--- a/mistral/db/v2/sqlalchemy/api.py
+++ b/mistral/db/v2/sqlalchemy/api.py
@@ -797,7 +797,7 @@ def get_task_executions(**kwargs):
     return _get_task_executions(**kwargs)
 
 
-def get_incomplete_task_executions(**kwargs):
+def _get_incomplete_task_executions_query(kwargs):
     query = b.model_query(models.TaskExecution)
 
     query = query.filter_by(**kwargs)
@@ -811,9 +811,21 @@ def get_incomplete_task_executions(**kwargs):
         )
     )
 
+    return query
+
+
+def get_incomplete_task_executions(**kwargs):
+    query = _get_incomplete_task_executions_query(kwargs)
+
     return query.all()
 
 
+def get_incomplete_task_executions_count(**kwargs):
+    query = _get_incomplete_task_executions_query(kwargs)
+
+    return query.count()
+
+
 @b.session_aware()
 def create_task_execution(values, session=None):
     task_ex = models.TaskExecution()
diff --git a/mistral/engine/action_handler.py b/mistral/engine/action_handler.py
index e18a9b898..775b5afb6 100644
--- a/mistral/engine/action_handler.py
+++ b/mistral/engine/action_handler.py
@@ -52,6 +52,7 @@ def on_action_complete(action_ex, result):
         task_handler.schedule_on_action_complete(action_ex)
 
 
+@profiler.trace('action-handler-build-action')
 def _build_action(action_ex):
     if isinstance(action_ex, models.WorkflowExecution):
         return actions.WorkflowAction(None, action_ex=action_ex)
diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py
index 3cde89651..48db27cee 100644
--- a/mistral/engine/workflows.py
+++ b/mistral/engine/workflows.py
@@ -280,11 +280,11 @@ class Workflow(object):
 
         # Workflow is not completed if there are any incomplete task
         # executions.
-        incomplete_tasks = db_api.get_incomplete_task_executions(
+        incomplete_tasks_count = db_api.get_incomplete_task_executions_count(
             workflow_execution_id=self.wf_ex.id,
         )
 
-        if incomplete_tasks:
+        if incomplete_tasks_count > 0:
             return
 
         wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec)
diff --git a/mistral/tests/unit/db/v2/test_sqlalchemy_db_api.py b/mistral/tests/unit/db/v2/test_sqlalchemy_db_api.py
index a686568e2..c31161dfc 100644
--- a/mistral/tests/unit/db/v2/test_sqlalchemy_db_api.py
+++ b/mistral/tests/unit/db/v2/test_sqlalchemy_db_api.py
@@ -1401,6 +1401,51 @@ class TaskExecutionTest(SQLAlchemyTest):
             created.id
         )
 
+    def test_get_incomplete_task_executions(self):
+        wf_ex = db_api.create_workflow_execution(WF_EXECS[0])
+
+        values = copy.deepcopy(TASK_EXECS[0])
+        values.update({'workflow_execution_id': wf_ex.id})
+        values['state'] = 'RUNNING'
+
+        task_ex1 = db_api.create_task_execution(values)
+
+        task_execs = db_api.get_incomplete_task_executions(
+            workflow_execution_id=wf_ex.id
+        )
+
+        self.assertEqual(1, len(task_execs))
+        self.assertEqual(task_ex1, task_execs[0])
+        self.assertEqual(
+            1,
+            db_api.get_incomplete_task_executions_count(
+                workflow_execution_id=wf_ex.id
+            )
+        )
+
+        # Add one more task.
+
+        values = copy.deepcopy(TASK_EXECS[1])
+        values.update({'workflow_execution_id': wf_ex.id})
+        values['state'] = 'SUCCESS'
+
+        db_api.create_task_execution(values)
+
+        # It should be still one incompleted task.
+
+        task_execs = db_api.get_incomplete_task_executions(
+            workflow_execution_id=wf_ex.id
+        )
+
+        self.assertEqual(1, len(task_execs))
+        self.assertEqual(task_ex1, task_execs[0])
+        self.assertEqual(
+            1,
+            db_api.get_incomplete_task_executions_count(
+                workflow_execution_id=wf_ex.id
+            )
+        )
+
     def test_task_execution_repr(self):
         wf_ex = db_api.create_workflow_execution(WF_EXECS[0])