Remove unnecessary database transaction from Scheduler

Change-Id: I08f0fcd67e0cd0e40e76ed6cfc7bb214096a2c16
Closes-Bug: #1484521
This commit is contained in:
Renat Akhmerov 2016-05-27 13:37:02 +07:00
parent 5e042c7403
commit 3641b46d15
3 changed files with 49 additions and 36 deletions

View File

@ -438,32 +438,32 @@ class ConcurrencyPolicy(base.TaskPolicy):
def _continue_task(task_ex_id): def _continue_task(task_ex_id):
from mistral.engine import task_handler from mistral.engine import task_handler
# TODO(rakhmerov): It must be done in TX after Scheduler is fixed. with db_api.transaction():
task_handler.continue_task(db_api.get_task_execution(task_ex_id)) task_handler.continue_task(db_api.get_task_execution(task_ex_id))
def _complete_task(task_ex_id, state, state_info): def _complete_task(task_ex_id, state, state_info):
from mistral.engine import task_handler from mistral.engine import task_handler
# TODO(rakhmerov): It must be done in TX after Scheduler is fixed. with db_api.transaction():
task_handler.complete_task( task_handler.complete_task(
db_api.get_task_execution(task_ex_id), db_api.get_task_execution(task_ex_id),
state, state,
state_info state_info
) )
def _fail_task_if_incomplete(task_ex_id, timeout): def _fail_task_if_incomplete(task_ex_id, timeout):
from mistral.engine import task_handler from mistral.engine import task_handler
# TODO(rakhmerov): It must be done in TX after Scheduler is fixed. with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex_id) task_ex = db_api.get_task_execution(task_ex_id)
if not states.is_completed(task_ex.state): if not states.is_completed(task_ex.state):
msg = 'Task timed out [timeout(s)=%s].' % timeout msg = 'Task timed out [timeout(s)=%s].' % timeout
task_handler.complete_task( task_handler.complete_task(
db_api.get_task_execution(task_ex_id), db_api.get_task_execution(task_ex_id),
states.ERROR, states.ERROR,
msg msg
) )

View File

@ -162,23 +162,19 @@ class CallScheduler(periodic_task.PeriodicTasks):
) )
for (target_auth_context, target_method, method_args) in delayed_calls: for (target_auth_context, target_method, method_args) in delayed_calls:
# TODO(rakhmerov): https://bugs.launchpad.net/mistral/+bug/1484521 try:
# Transaction is needed here because some of the # Set the correct context for the method.
# target_method can use the DB. context.set_ctx(
with db_api.transaction(): context.MistralContext(target_auth_context)
try: )
# Set the correct context for the method.
context.set_ctx(
context.MistralContext(target_auth_context)
)
# Call the method. # Call the method.
target_method(**method_args) target_method(**method_args)
except Exception as e: except Exception as e:
LOG.exception("Delayed call failed [exception=%s]: %s", e) LOG.exception("Delayed call failed [exception=%s]: %s", e)
finally: finally:
# Remove context. # Remove context.
context.set_ctx(None) context.set_ctx(None)
with db_api.transaction(): with db_api.transaction():
for call in calls_to_make: for call in calls_to_make:

View File

@ -31,7 +31,7 @@ LOG = logging.getLogger(__name__)
# Default delay and timeout in seconds for await_xxx() functions. # Default delay and timeout in seconds for await_xxx() functions.
DEFAULT_DELAY = 1 DEFAULT_DELAY = 1
DEFAULT_TIMEOUT = 20 DEFAULT_TIMEOUT = 30
def launch_engine_server(transport, engine): def launch_engine_server(transport, engine):
@ -114,7 +114,7 @@ class EngineTestCase(base.DbTestCase):
eventlet.spawn(launch_executor_server, transport, self.executor), eventlet.spawn(launch_executor_server, transport, self.executor),
] ]
self.addOnException(self.print_workflow_executions) self.addOnException(self.print_executions)
# Start scheduler. # Start scheduler.
scheduler_thread_group = scheduler.setup() scheduler_thread_group = scheduler.setup()
@ -128,9 +128,10 @@ class EngineTestCase(base.DbTestCase):
[thread.kill() for thread in self.threads] [thread.kill() for thread in self.threads]
@staticmethod @staticmethod
def print_workflow_executions(exc_info): def print_executions(exc_info):
print("\nEngine test case exception occurred: %s" % exc_info[1]) print("\nEngine test case exception occurred: %s" % exc_info[1])
print("Exception type: %s" % exc_info[0]) print("Exception type: %s" % exc_info[0])
print("\nPrinting workflow executions...") print("\nPrinting workflow executions...")
wf_execs = db_api.get_workflow_executions() wf_execs = db_api.get_workflow_executions()
@ -167,6 +168,22 @@ class EngineTestCase(base.DbTestCase):
a.output) a.output)
) )
print("\nPrinting standalone action executions...")
a_execs = db_api.get_action_executions(task_execution_id=None)
for a in a_execs:
print(
"\t\t%s [id=%s, state=%s, state_info=%s, accepted=%s,"
" output=%s]" %
(a.name,
a.id,
a.state,
a.state_info,
a.accepted,
a.output)
)
# Various methods for abstract execution objects. # Various methods for abstract execution objects.
def is_execution_in_state(self, ex_id, state): def is_execution_in_state(self, ex_id, state):