diff --git a/mistral/engine/policies.py b/mistral/engine/policies.py index 47c87bc2e..d954f6a2a 100644 --- a/mistral/engine/policies.py +++ b/mistral/engine/policies.py @@ -438,32 +438,32 @@ class ConcurrencyPolicy(base.TaskPolicy): def _continue_task(task_ex_id): from mistral.engine import task_handler - # TODO(rakhmerov): It must be done in TX after Scheduler is fixed. - task_handler.continue_task(db_api.get_task_execution(task_ex_id)) + with db_api.transaction(): + task_handler.continue_task(db_api.get_task_execution(task_ex_id)) def _complete_task(task_ex_id, state, state_info): from mistral.engine import task_handler - # TODO(rakhmerov): It must be done in TX after Scheduler is fixed. - task_handler.complete_task( - db_api.get_task_execution(task_ex_id), - state, - state_info - ) + with db_api.transaction(): + task_handler.complete_task( + db_api.get_task_execution(task_ex_id), + state, + state_info + ) def _fail_task_if_incomplete(task_ex_id, timeout): from mistral.engine import task_handler - # TODO(rakhmerov): It must be done in TX after Scheduler is fixed. - task_ex = db_api.get_task_execution(task_ex_id) + with db_api.transaction(): + task_ex = db_api.get_task_execution(task_ex_id) - if not states.is_completed(task_ex.state): - msg = 'Task timed out [timeout(s)=%s].' % timeout + if not states.is_completed(task_ex.state): + msg = 'Task timed out [timeout(s)=%s].' % timeout - task_handler.complete_task( - db_api.get_task_execution(task_ex_id), - states.ERROR, - msg - ) + task_handler.complete_task( + db_api.get_task_execution(task_ex_id), + states.ERROR, + msg + ) diff --git a/mistral/services/scheduler.py b/mistral/services/scheduler.py index cc00f3dc0..6fd2e922d 100644 --- a/mistral/services/scheduler.py +++ b/mistral/services/scheduler.py @@ -162,23 +162,19 @@ class CallScheduler(periodic_task.PeriodicTasks): ) for (target_auth_context, target_method, method_args) in delayed_calls: - # TODO(rakhmerov): https://bugs.launchpad.net/mistral/+bug/1484521 - # Transaction is needed here because some of the - # target_method can use the DB. - with db_api.transaction(): - try: - # Set the correct context for the method. - context.set_ctx( - context.MistralContext(target_auth_context) - ) + try: + # Set the correct context for the method. + context.set_ctx( + context.MistralContext(target_auth_context) + ) - # Call the method. - target_method(**method_args) - except Exception as e: - LOG.exception("Delayed call failed [exception=%s]: %s", e) - finally: - # Remove context. - context.set_ctx(None) + # Call the method. + target_method(**method_args) + except Exception as e: + LOG.exception("Delayed call failed [exception=%s]: %s", e) + finally: + # Remove context. + context.set_ctx(None) with db_api.transaction(): for call in calls_to_make: diff --git a/mistral/tests/unit/engine/base.py b/mistral/tests/unit/engine/base.py index 45619f007..56bfadcdc 100644 --- a/mistral/tests/unit/engine/base.py +++ b/mistral/tests/unit/engine/base.py @@ -31,7 +31,7 @@ LOG = logging.getLogger(__name__) # Default delay and timeout in seconds for await_xxx() functions. DEFAULT_DELAY = 1 -DEFAULT_TIMEOUT = 20 +DEFAULT_TIMEOUT = 30 def launch_engine_server(transport, engine): @@ -114,7 +114,7 @@ class EngineTestCase(base.DbTestCase): eventlet.spawn(launch_executor_server, transport, self.executor), ] - self.addOnException(self.print_workflow_executions) + self.addOnException(self.print_executions) # Start scheduler. scheduler_thread_group = scheduler.setup() @@ -128,9 +128,10 @@ class EngineTestCase(base.DbTestCase): [thread.kill() for thread in self.threads] @staticmethod - def print_workflow_executions(exc_info): + def print_executions(exc_info): print("\nEngine test case exception occurred: %s" % exc_info[1]) print("Exception type: %s" % exc_info[0]) + print("\nPrinting workflow executions...") wf_execs = db_api.get_workflow_executions() @@ -167,6 +168,22 @@ class EngineTestCase(base.DbTestCase): a.output) ) + print("\nPrinting standalone action executions...") + + a_execs = db_api.get_action_executions(task_execution_id=None) + + for a in a_execs: + print( + "\t\t%s [id=%s, state=%s, state_info=%s, accepted=%s," + " output=%s]" % + (a.name, + a.id, + a.state, + a.state_info, + a.accepted, + a.output) + ) + # Various methods for abstract execution objects. def is_execution_in_state(self, ex_id, state):