From 80a1bed67b85543f4511b0eb5fb83464d4f9901e Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Fri, 26 Oct 2018 17:13:52 +0700 Subject: [PATCH] Simplify workflow and join completion logic * action_queue module is replaced with the more generic post_tx_queue module that allows to register operations that must run after the main DB transaction associated with processing a workflow event such as completing action. * Instead of calling workflow completion check from all places where task may possibly complete, Mistral now registers a post transactional operation that runs after the main DB transaction (to make sure at least one needed consistent DB read) right inside the task completion logic. It reduces clutter significantly. * Workflow completion check is now registered only if the just completed task may lead to workflow completion, i.e. if it's the last one in a workflow branch. * Join now checks delayed calls to reduce a number of join completion checks created with scheduler and also uses post transactional queue for that. Closes-Bug: #1801872 Change-Id: I90741d4121c48c42606dfa850cfe824557b095d0 --- mistral/db/v2/api.py | 4 + mistral/db/v2/sqlalchemy/api.py | 13 ++ mistral/engine/action_queue.py | 133 ------------------ mistral/engine/actions.py | 26 ++-- mistral/engine/default_engine.py | 56 ++------ mistral/engine/policies.py | 21 +-- mistral/engine/post_tx_queue.py | 131 +++++++++++++++++ mistral/engine/task_handler.py | 118 ++++++++-------- mistral/engine/tasks.py | 36 +++++ mistral/engine/workflow_handler.py | 18 ++- mistral/engine/workflows.py | 37 +++-- mistral/services/action_execution_checker.py | 4 +- .../tests/unit/engine/test_default_engine.py | 5 + mistral/workflow/base.py | 5 + mistral/workflow/direct_workflow.py | 7 + ...oin_completion_check-77a47c5d8953096d.yaml | 10 ++ 16 files changed, 338 insertions(+), 286 deletions(-) delete mode 100644 mistral/engine/action_queue.py create mode 100644 mistral/engine/post_tx_queue.py create mode 100644 releasenotes/notes/simplify_workflow_and_join_completion_check-77a47c5d8953096d.yaml diff --git a/mistral/db/v2/api.py b/mistral/db/v2/api.py index 589ee7443..44fd64513 100644 --- a/mistral/db/v2/api.py +++ b/mistral/db/v2/api.py @@ -382,6 +382,10 @@ def get_delayed_calls(**kwargs): return IMPL.get_delayed_calls(**kwargs) +def get_delayed_calls_count(**kwargs): + return IMPL.get_delayed_calls_count(**kwargs) + + def delete_delayed_calls(**kwargs): return IMPL.delete_delayed_calls(**kwargs) diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py index 14b46531f..ee19d503d 100644 --- a/mistral/db/v2/sqlalchemy/api.py +++ b/mistral/db/v2/sqlalchemy/api.py @@ -284,6 +284,14 @@ def _get_collection(model, insecure=False, limit=None, marker=None, return query.all() +def _get_count(model, insecure=False, **filters): + query = b.model_query(model) if insecure else _secure_query(model) + + query = db_filters.apply_filters(query, model, **filters) + + return query.count() + + def _get_db_object_by_name(model, name, columns=()): query = _secure_query(model, *columns) @@ -1134,6 +1142,11 @@ def get_delayed_calls(session=None, **kwargs): return _get_collection(model=models.DelayedCall, **kwargs) +@b.session_aware() +def get_delayed_calls_count(session=None, **kwargs): + return _get_count(model=models.DelayedCall, **kwargs) + + @b.session_aware() def delete_delayed_calls(session=None, **kwargs): return _delete_all(models.DelayedCall, **kwargs) diff --git a/mistral/engine/action_queue.py b/mistral/engine/action_queue.py deleted file mode 100644 index 96dcd6f3c..000000000 --- a/mistral/engine/action_queue.py +++ /dev/null @@ -1,133 +0,0 @@ -# Copyright 2016 - Nokia Networks. -# Copyright 2016 - Brocade Communications Systems, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import eventlet -import functools - -from oslo_config import cfg - -from mistral import context -from mistral.executors import base as exe -from mistral.rpc import clients as rpc -from mistral import utils - - -_THREAD_LOCAL_NAME = "__action_queue_thread_local" - -# Action queue operations. -_RUN_ACTION = "run_action" -_ON_ACTION_COMPLETE = "on_action_complete" - - -def _prepare(): - utils.set_thread_local(_THREAD_LOCAL_NAME, list()) - - -def _clear(): - utils.set_thread_local(_THREAD_LOCAL_NAME, None) - - -def _get_queue(): - queue = utils.get_thread_local(_THREAD_LOCAL_NAME) - - if queue is None: - raise RuntimeError( - 'Action queue is not initialized for the current thread.' - ' Most likely some transactional method is not decorated' - ' with action_queue.process()' - ) - - return queue - - -def _process_queue(queue): - executor = exe.get_executor(cfg.CONF.executor.type) - - for operation, args in queue: - if operation == _RUN_ACTION: - action_ex, action_def, target, execution_context, timeout = args - - executor.run_action( - action_ex.id, - action_def.action_class, - action_def.attributes or {}, - action_ex.input, - action_ex.runtime_context.get('safe_rerun', False), - execution_context, - target=target, - timeout=timeout - ) - elif operation == _ON_ACTION_COMPLETE: - action_ex_id, result, wf_action = args - - rpc.get_engine_client().on_action_complete( - action_ex_id, - result, - wf_action - ) - - -def process(func): - """Decorator that processes (runs) all actions in the action queue. - - Various engine methods may cause new actions to be scheduled. All - such methods must be decorated with this decorator. It makes sure - to run all the actions in the queue and clean up the queue. - """ - @functools.wraps(func) - def decorate(*args, **kw): - _prepare() - - try: - res = func(*args, **kw) - - queue = _get_queue() - auth_ctx = context.ctx() if context.has_ctx() else None - - # NOTE(rakhmerov): Since we make RPC calls to the engine itself - # we need to process the action queue asynchronously in a new - # thread. Otherwise, if we have one engine process the engine - # may send a request to itself while already processing - # another one. In conjunction with blocking RPC it will lead - # to a deadlock (and RPC timeout). - def _within_new_thread(): - old_auth_ctx = context.ctx() if context.has_ctx() else None - - context.set_ctx(auth_ctx) - - try: - _process_queue(queue) - finally: - context.set_ctx(old_auth_ctx) - - eventlet.spawn(_within_new_thread) - finally: - _clear() - - return res - - return decorate - - -def schedule_run_action(action_ex, action_def, target, execution_context, - timeout): - args = (action_ex, action_def, target, execution_context, timeout) - _get_queue().append((_RUN_ACTION, args)) - - -def schedule_on_action_complete(action_ex_id, result, wf_action=False): - _get_queue().append( - (_ON_ACTION_COMPLETE, (action_ex_id, result, wf_action)) - ) diff --git a/mistral/engine/actions.py b/mistral/engine/actions.py index 8e5affea9..d7ba33dcf 100644 --- a/mistral/engine/actions.py +++ b/mistral/engine/actions.py @@ -21,7 +21,7 @@ from osprofiler import profiler import six from mistral.db.v2 import api as db_api -from mistral.engine import action_queue +from mistral.engine import post_tx_queue from mistral.engine import utils as engine_utils from mistral.engine import workflow_handler as wf_handler from mistral import exceptions as exc @@ -254,13 +254,23 @@ class PythonAction(Action): execution_context = self._prepare_execution_context() - action_queue.schedule_run_action( - self.action_ex, - self.action_def, - target, - execution_context, - timeout=timeout - ) + # Register an asynchronous command to send the action to + # run on an executor outside of the main DB transaction. + def _run_action(): + executor = exe.get_executor(cfg.CONF.executor.type) + + executor.run_action( + self.action_ex.id, + self.action_def.action_class, + self.action_def.attributes or {}, + self.action_ex.input, + self.action_ex.runtime_context.get('safe_rerun', False), + execution_context, + target=target, + timeout=timeout + ) + + post_tx_queue.register_operation(_run_action) @profiler.trace('action-run', hide_args=True) def run(self, input_dict, target, index=0, desc='', save=True, diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index 1bb29e109..cb6c9eede 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -23,8 +23,8 @@ from mistral.db import utils as db_utils from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models as db_models from mistral.engine import action_handler -from mistral.engine import action_queue from mistral.engine import base +from mistral.engine import post_tx_queue from mistral.engine import workflow_handler as wf_handler from mistral import exceptions from mistral import utils as u @@ -40,7 +40,7 @@ LOG = logging.getLogger(__name__) class DefaultEngine(base.Engine): @db_utils.retry_on_db_error - @action_queue.process + @post_tx_queue.run @profiler.trace('engine-start-workflow', hide_args=True) def start_workflow(self, wf_identifier, wf_namespace='', wf_ex_id=None, wf_input=None, description='', **params): @@ -79,7 +79,7 @@ class DefaultEngine(base.Engine): return wf_ex.get_clone() @db_utils.retry_on_db_error - @action_queue.process + @post_tx_queue.run def start_action(self, action_name, action_input, description=None, **params): with db_api.transaction(): @@ -134,7 +134,7 @@ class DefaultEngine(base.Engine): return db_api.create_action_execution(values) @db_utils.retry_on_db_error - @action_queue.process + @post_tx_queue.run @profiler.trace('engine-on-action-complete', hide_args=True) def on_action_complete(self, action_ex_id, result, wf_action=False, async_=False): @@ -146,26 +146,10 @@ class DefaultEngine(base.Engine): action_handler.on_action_complete(action_ex, result) - result = action_ex.get_clone() - - # Need to see if checking workflow completion makes sense. - wf_ex_id = None - - if (action_ex.task_execution_id - and states.is_completed(action_ex.task_execution.state)): - wf_ex_id = action_ex.task_execution.workflow_execution_id - - # Note: We must do this check in a new transaction to make sure - # that at least one of the parallel transactions will do a consistent - # read from the DB. - if wf_ex_id: - with db_api.transaction(): - wf_handler.check_and_complete(wf_ex_id) - - return result + return action_ex.get_clone() @db_utils.retry_on_db_error - @action_queue.process + @post_tx_queue.run @profiler.trace('engine-on-action-update', hide_args=True) def on_action_update(self, action_ex_id, state, wf_action=False, async_=False): @@ -177,25 +161,10 @@ class DefaultEngine(base.Engine): action_handler.on_action_update(action_ex, state) - result = action_ex.get_clone() - - wf_ex_id = None - - if (action_ex.task_execution_id - and states.is_completed(action_ex.task_execution.state)): - wf_ex_id = action_ex.task_execution.workflow_execution_id - - # Note: We must do this check in a new transaction to make sure - # that at least one of the parallel transactions will do a consistent - # read from the DB. - if wf_ex_id: - with db_api.transaction(): - wf_handler.check_and_complete(wf_ex_id) - - return result + return action_ex.get_clone() @db_utils.retry_on_db_error - @action_queue.process + @post_tx_queue.run def pause_workflow(self, wf_ex_id): with db_api.transaction(): wf_ex = db_api.get_workflow_execution(wf_ex_id) @@ -205,7 +174,7 @@ class DefaultEngine(base.Engine): return wf_ex.get_clone() @db_utils.retry_on_db_error - @action_queue.process + @post_tx_queue.run def rerun_workflow(self, task_ex_id, reset=True, env=None): with db_api.transaction(): task_ex = db_api.get_task_execution(task_ex_id) @@ -217,7 +186,7 @@ class DefaultEngine(base.Engine): return wf_ex.get_clone() @db_utils.retry_on_db_error - @action_queue.process + @post_tx_queue.run def resume_workflow(self, wf_ex_id, env=None): with db_api.transaction(): wf_ex = db_api.get_workflow_execution(wf_ex_id) @@ -227,7 +196,7 @@ class DefaultEngine(base.Engine): return wf_ex.get_clone() @db_utils.retry_on_db_error - @action_queue.process + @post_tx_queue.run def stop_workflow(self, wf_ex_id, state, message=None): with db_api.transaction(): wf_ex = db_api.get_workflow_execution(wf_ex_id) @@ -241,10 +210,11 @@ class DefaultEngine(base.Engine): raise NotImplementedError @db_utils.retry_on_db_error - @action_queue.process + @post_tx_queue.run def report_running_actions(self, action_ex_ids): with db_api.transaction(): now = u.utc_now_sec() + for exec_id in action_ex_ids: try: db_api.update_action_execution( diff --git a/mistral/engine/policies.py b/mistral/engine/policies.py index a0dd790d1..b3e5ea352 100644 --- a/mistral/engine/policies.py +++ b/mistral/engine/policies.py @@ -15,8 +15,8 @@ from mistral.db import utils as db_utils from mistral.db.v2 import api as db_api -from mistral.engine import action_queue from mistral.engine import base +from mistral.engine import post_tx_queue from mistral.engine import workflow_handler as wf_handler from mistral import expressions from mistral.services import scheduler @@ -511,7 +511,7 @@ class ConcurrencyPolicy(base.TaskPolicy): @db_utils.retry_on_db_error -@action_queue.process +@post_tx_queue.run def _continue_task(task_ex_id): from mistral.engine import task_handler @@ -520,38 +520,25 @@ def _continue_task(task_ex_id): @db_utils.retry_on_db_error -@action_queue.process +@post_tx_queue.run def _complete_task(task_ex_id, state, state_info): from mistral.engine import task_handler with db_api.transaction(): task_ex = db_api.get_task_execution(task_ex_id) - wf_ex_id = task_ex.workflow_execution_id - task_handler.complete_task(task_ex, state, state_info) - with db_api.transaction(): - wf_handler.check_and_complete(wf_ex_id) - @db_utils.retry_on_db_error -@action_queue.process +@post_tx_queue.run def _fail_task_if_incomplete(task_ex_id, timeout): from mistral.engine import task_handler with db_api.transaction(): task_ex = db_api.get_task_execution(task_ex_id) - wf_ex_id = None - if not states.is_completed(task_ex.state): msg = 'Task timed out [timeout(s)=%s].' % timeout - wf_ex_id = task_ex.workflow_execution_id - task_handler.complete_task(task_ex, states.ERROR, msg) - - if wf_ex_id: - with db_api.transaction(): - wf_handler.check_and_complete(wf_ex_id) diff --git a/mistral/engine/post_tx_queue.py b/mistral/engine/post_tx_queue.py new file mode 100644 index 000000000..a1086a44a --- /dev/null +++ b/mistral/engine/post_tx_queue.py @@ -0,0 +1,131 @@ +# Copyright 2015 - Mirantis, Inc. +# Copyright 2016 - Brocade Communications Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import eventlet +import functools +from oslo_log import log as logging + +from mistral import context +from mistral.db import utils as db_utils +from mistral.db.v2 import api as db_api +from mistral import utils + + +""" +This module contains a mini framework for scheduling operations while +performing transactional processing of a workflow event such as +completing a workflow action. The scheduled operations will run after +the main DB transaction, in a new transaction, if needed. +""" + +LOG = logging.getLogger(__name__) + + +_THREAD_LOCAL_NAME = "__operation_queue_thread_local" + + +def _prepare(): + # Register two queues: transactional and non transactional operations. + utils.set_thread_local(_THREAD_LOCAL_NAME, (list(), list())) + + +def _clear(): + utils.set_thread_local(_THREAD_LOCAL_NAME, None) + + +def register_operation(func, args=None, in_tx=False): + """Register an operation.""" + + _get_queues()[0 if in_tx else 1].append((func, args or [])) + + +def _get_queues(): + queues = utils.get_thread_local(_THREAD_LOCAL_NAME) + + if queues is None: + raise RuntimeError( + 'Operation queue is not initialized for the current thread.' + ' Most likely some engine method is not decorated with' + ' operation_queue.run()' + ) + + return queues + + +def run(func): + """Decorator that runs all operations registered in the operation queue. + + Various engine methods may register such operations. All such methods must + be decorated with this decorator. + """ + @functools.wraps(func) + def decorate(*args, **kw): + _prepare() + + try: + res = func(*args, **kw) + + queues = _get_queues() + + tx_queue = queues[0] + non_tx_queue = queues[1] + + if not tx_queue and not non_tx_queue: + return res + + auth_ctx = context.ctx() if context.has_ctx() else None + + def _within_new_thread(): + old_auth_ctx = context.ctx() if context.has_ctx() else None + + context.set_ctx(auth_ctx) + + try: + if tx_queue: + _process_tx_queue(tx_queue) + + if non_tx_queue: + _process_non_tx_queue(non_tx_queue) + finally: + context.set_ctx(old_auth_ctx) + + eventlet.spawn(_within_new_thread) + finally: + _clear() + + return res + + return decorate + + +@db_utils.retry_on_db_error +@run +def _process_tx_queue(queue): + with db_api.transaction(): + for func, args in queue: + try: + func(*args) + except Exception: + LOG.exception("Failed to run transactional engine operation.") + + raise + + +def _process_non_tx_queue(queue): + for func, args in queue: + try: + func(*args) + except Exception: + LOG.exception("Failed to run non-transactional engine operation.") diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index 3b9e3da03..bf58c1c4f 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -22,7 +22,7 @@ import traceback as tb from mistral.db import utils as db_utils from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models -from mistral.engine import action_queue +from mistral.engine import post_tx_queue from mistral.engine import tasks from mistral.engine import workflow_handler as wf_handler from mistral import exceptions as exc @@ -274,6 +274,7 @@ def complete_task(task_ex, state, state_info): _check_affected_tasks(task) +@profiler.trace('task-handler-check-affected-tasks', hide_args=True) def _check_affected_tasks(task): if not task.is_completed(): return @@ -295,8 +296,28 @@ def _check_affected_tasks(task): task_ex.name ) + def _schedule_if_needed(t_ex_id): + # NOTE(rakhmerov): we need to minimize the number of delayed calls + # that refresh state of "join" tasks. We'll check if corresponding + # calls are already scheduled. Note that we must ignore delayed calls + # that are currently being processed because of a possible race with + # the transaction that deletes delayed calls, i.e. the call may still + # exist in DB (the deleting transaction didn't commit yet) but it has + # already been processed and the task state hasn't changed. + cnt = db_api.get_delayed_calls_count( + key=_get_refresh_state_job_key(t_ex_id), + processing=False + ) + + if cnt == 0: + _schedule_refresh_task_state(t_ex_id) + for t_ex in affected_task_execs: - _schedule_refresh_task_state(t_ex) + post_tx_queue.register_operation( + _schedule_if_needed, + args=[t_ex.id], + in_tx=True + ) def _build_task_from_execution(wf_spec, task_ex): @@ -364,7 +385,7 @@ def _create_task(wf_ex, wf_spec, task_spec, ctx, task_ex=None, @db_utils.retry_on_db_error -@action_queue.process +@post_tx_queue.run @profiler.trace('task-handler-refresh-task-state', hide_args=True) def _refresh_task_state(task_ex_id): with db_api.transaction(): @@ -373,6 +394,10 @@ def _refresh_task_state(task_ex_id): if not task_ex: return + if (states.is_completed(task_ex.state) + or task_ex.state == states.RUNNING): + return + wf_ex = task_ex.workflow_execution if states.is_completed(wf_ex.state): @@ -384,46 +409,35 @@ def _refresh_task_state(task_ex_id): wf_ctrl = wf_base.get_controller(wf_ex, wf_spec) - with db_api.named_lock(task_ex.id): - db_api.refresh(task_ex) + log_state = wf_ctrl.get_logical_task_state(task_ex) - if (states.is_completed(task_ex.state) - or task_ex.state == states.RUNNING): - return + state = log_state.state + state_info = log_state.state_info - log_state = wf_ctrl.get_logical_task_state(task_ex) + # Update 'triggered_by' because it could have changed. + task_ex.runtime_context['triggered_by'] = log_state.triggered_by - state = log_state.state - state_info = log_state.state_info - - # Update 'triggered_by' because it could have changed. - task_ex.runtime_context['triggered_by'] = log_state.triggered_by - - if state == states.RUNNING: - continue_task(task_ex) - elif state == states.ERROR: - complete_task(task_ex, state, state_info) - elif state == states.WAITING: - LOG.info( - "Task execution is still in WAITING state" - " [task_ex_id=%s, task_name=%s]", - task_ex_id, - task_ex.name - ) - else: - # Must never get here. - raise RuntimeError( - 'Unexpected logical task state [task_ex_id=%s, ' - 'task_name=%s, state=%s]' % - (task_ex_id, task_ex.name, state) - ) - - if states.is_completed(task_ex.state): - with db_api.transaction(): - wf_handler.check_and_complete(wf_ex.id) + if state == states.RUNNING: + continue_task(task_ex) + elif state == states.ERROR: + complete_task(task_ex, state, state_info) + elif state == states.WAITING: + LOG.info( + "Task execution is still in WAITING state" + " [task_ex_id=%s, task_name=%s]", + task_ex_id, + task_ex.name + ) + else: + # Must never get here. + raise RuntimeError( + 'Unexpected logical task state [task_ex_id=%s, ' + 'task_name=%s, state=%s]' % + (task_ex_id, task_ex.name, state) + ) -def _schedule_refresh_task_state(task_ex, delay=0): +def _schedule_refresh_task_state(task_ex_id, delay=0): """Schedules task preconditions check. This method provides transactional decoupling of task preconditions @@ -436,17 +450,17 @@ def _schedule_refresh_task_state(task_ex, delay=0): we'll have in this case (time between transactions) whereas scheduler is a special component that is designed to be resistant to failures. - :param task_ex: Task execution. + :param task_ex_id: Task execution ID. :param delay: Delay. """ - key = _get_refresh_state_job_key(task_ex.id) + key = _get_refresh_state_job_key(task_ex_id) scheduler.schedule_call( None, _REFRESH_TASK_STATE_PATH, delay, key=key, - task_ex_id=task_ex.id + task_ex_id=task_ex_id ) @@ -455,7 +469,7 @@ def _get_refresh_state_job_key(task_ex_id): @db_utils.retry_on_db_error -@action_queue.process +@post_tx_queue.run def _scheduled_on_action_complete(action_ex_id, wf_action): with db_api.transaction(): if wf_action: @@ -465,15 +479,6 @@ def _scheduled_on_action_complete(action_ex_id, wf_action): _on_action_complete(action_ex) - wf_ex_id = None - - if states.is_completed(action_ex.task_execution.state): - wf_ex_id = action_ex.task_execution.workflow_execution_id - - if wf_ex_id: - with db_api.transaction(): - wf_handler.check_and_complete(wf_ex_id) - def schedule_on_action_complete(action_ex, delay=0): """Schedules task completion check. @@ -510,7 +515,7 @@ def schedule_on_action_complete(action_ex, delay=0): @db_utils.retry_on_db_error -@action_queue.process +@post_tx_queue.run def _scheduled_on_action_update(action_ex_id, wf_action): with db_api.transaction(): if wf_action: @@ -520,15 +525,6 @@ def _scheduled_on_action_update(action_ex_id, wf_action): _on_action_update(action_ex) - wf_ex_id = None - - if states.is_completed(action_ex.task_execution.state): - wf_ex_id = action_ex.task_execution.workflow_execution_id - - if wf_ex_id: - with db_api.transaction(): - wf_handler.check_and_complete(wf_ex_id) - def schedule_on_action_update(action_ex, delay=0): """Schedules task update check. diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py index 097b291f0..ffc864a7e 100644 --- a/mistral/engine/tasks.py +++ b/mistral/engine/tasks.py @@ -26,6 +26,8 @@ from mistral.db.v2 import api as db_api from mistral.engine import actions from mistral.engine import dispatcher from mistral.engine import policies +from mistral.engine import post_tx_queue +from mistral.engine import workflow_handler as wf_handler from mistral import exceptions as exc from mistral import expressions as expr from mistral.notifiers import base as notif @@ -118,6 +120,23 @@ class Task(object): This method puts task to a waiting state. """ + + # NOTE(rakhmerov): using named locks may cause problems under load + # with MySQL that raises a lot of deadlocks in case of high + # parallelism so it makes sense to do a fast check if the object + # already exists in DB outside of the lock. + if not self.task_ex: + t_execs = db_api.get_task_executions( + workflow_execution_id=self.wf_ex.id, + unique_key=self.unique_key, + state=states.WAITING + ) + + self.task_ex = t_execs[0] if t_execs else None + + if self.task_ex: + return + with db_api.named_lock(self.unique_key): if not self.task_ex: t_execs = db_api.get_task_executions( @@ -249,11 +268,25 @@ class Task(object): # upon its completion. self.task_ex.processed = True + self.register_workflow_completion_check() + # Publish task event. self.notify(old_task_state, self.task_ex.state) dispatcher.dispatch_workflow_commands(self.wf_ex, cmds) + def register_workflow_completion_check(self): + wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec) + + # Register an asynchronous command to check workflow completion + # in a separate transaction if the task may potentially lead to + # workflow completion. + def _check(): + wf_handler.check_and_complete(self.wf_ex.id) + + if wf_ctrl.may_complete_workflow(self.task_ex): + post_tx_queue.register_operation(_check, in_tx=True) + @profiler.trace('task-update') def update(self, state, state_info=None): """Update task and set specified state. @@ -290,6 +323,9 @@ class Task(object): self.set_state(state, state_info) + if states.is_completed(self.task_ex.state): + self.register_workflow_completion_check() + # Publish event. self.notify(old_task_state, self.task_ex.state) diff --git a/mistral/engine/workflow_handler.py b/mistral/engine/workflow_handler.py index 1176267fe..b9e14b011 100644 --- a/mistral/engine/workflow_handler.py +++ b/mistral/engine/workflow_handler.py @@ -20,6 +20,7 @@ from osprofiler import profiler import traceback as tb from mistral.db.v2 import api as db_api +from mistral.engine import post_tx_queue from mistral.engine import workflows from mistral import exceptions as exc from mistral.services import scheduler @@ -108,6 +109,7 @@ def check_and_complete(wf_ex_id): force_fail_workflow(wf.wf_ex, msg) +@post_tx_queue.run @profiler.trace('workflow-handler-check-and-fix-integrity') def _check_and_fix_integrity(wf_ex_id): check_after_seconds = CONF.engine.execution_integrity_check_delay @@ -125,15 +127,13 @@ def _check_and_fix_integrity(wf_ex_id): if states.is_completed(wf_ex.state): return - _schedule_check_and_fix_integrity(wf_ex, delay=60) + _schedule_check_and_fix_integrity(wf_ex, delay=120) running_task_execs = db_api.get_task_executions( workflow_execution_id=wf_ex.id, state=states.RUNNING ) - any_completed = False - for t_ex in running_task_execs: # The idea is that we take the latest known timestamp of the task # execution and consider it eligible for checking and fixing only @@ -182,13 +182,6 @@ def _check_and_fix_integrity(wf_ex_id): child_executions[-1] ) - if states.is_completed(t_ex.state): - any_completed = True - - if any_completed: - with db_api.transaction(): - check_and_complete(wf_ex_id) - def pause_workflow(wf_ex, msg=None): # Pause subworkflows first. @@ -272,6 +265,11 @@ def _schedule_check_and_fix_integrity(wf_ex, delay=0): :param wf_ex: Workflow execution. :param delay: Minimum amount of time before the check should be made. """ + + if CONF.engine.execution_integrity_check_delay < 0: + # Never check integrity if it's a negative value. + return + key = _get_integrity_check_key(wf_ex) scheduler.schedule_call( diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index 6a64376ad..f0402741b 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -23,14 +23,15 @@ import six from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models as db_models -from mistral.engine import action_queue from mistral.engine import dispatcher +from mistral.engine import post_tx_queue from mistral.engine import utils as engine_utils from mistral import exceptions as exc from mistral import expressions as expr from mistral.lang import parser as spec_parser from mistral.notifiers import base as notif from mistral.notifiers import notification_events as events +from mistral.rpc import clients as rpc from mistral.services import triggers from mistral.services import workflows as wf_service from mistral import utils @@ -374,7 +375,7 @@ class Workflow(object): if wf_ex is None: # Do nothing because the state was updated previously. - return + return False self.wf_ex = wf_ex self.wf_ex.state_info = json.dumps(state_info) \ @@ -406,6 +407,8 @@ class Workflow(object): triggers.on_workflow_complete(self.wf_ex) + return True + @profiler.trace('workflow-check-and-complete') def check_and_complete(self): """Completes the workflow if it needs to be completed. @@ -457,14 +460,17 @@ class Workflow(object): return 0 def _succeed_workflow(self, final_context, msg=None): - self.wf_ex.output = data_flow.evaluate_workflow_output( + output = data_flow.evaluate_workflow_output( self.wf_ex, self.wf_spec.get_output(), final_context ) - # Set workflow execution to success until after output is evaluated. - self.set_state(states.SUCCESS, msg) + # Set workflow execution to success after output is evaluated. + if not self.set_state(states.SUCCESS, msg): + return + + self.wf_ex.output = output # Publish event. self.notify(events.WORKFLOW_SUCCEEDED) @@ -492,7 +498,8 @@ class Workflow(object): ) LOG.error(msg) - self.set_state(states.ERROR, state_info=msg) + if not self.set_state(states.ERROR, state_info=msg): + return # When we set an ERROR state we should safely set output value getting # w/o exceptions due to field size limitations. @@ -524,7 +531,8 @@ class Workflow(object): if states.is_completed(self.wf_ex.state): return - self.set_state(states.CANCELLED, state_info=msg) + if not self.set_state(states.CANCELLED, state_info=msg): + return # When we set an ERROR state we should safely set output value getting # w/o exceptions due to field size limitations. @@ -564,11 +572,16 @@ class Workflow(object): " if a workflow is not in SUCCESS, ERROR or CANCELLED state." ) - action_queue.schedule_on_action_complete( - self.wf_ex.id, - result, - wf_action=True - ) + # Register a command executed in a separate thread to send the result + # to the parent workflow outside of the main DB transaction. + def _send_result(): + rpc.get_engine_client().on_action_complete( + self.wf_ex.id, + result, + wf_action=True + ) + + post_tx_queue.register_operation(_send_result) def _get_environment(params): diff --git a/mistral/services/action_execution_checker.py b/mistral/services/action_execution_checker.py index 3cf4fcbf8..c888214ab 100644 --- a/mistral/services/action_execution_checker.py +++ b/mistral/services/action_execution_checker.py @@ -17,7 +17,7 @@ import datetime from mistral.db import utils as db_utils from mistral.db.v2 import api as db_api from mistral.engine import action_handler -from mistral.engine import action_queue +from mistral.engine import post_tx_queue from mistral.services import scheduler from mistral import utils from mistral_lib import actions as mistral_lib @@ -30,7 +30,7 @@ SCHEDULER_KEY = 'handle_expired_actions_key' @db_utils.retry_on_db_error -@action_queue.process +@post_tx_queue.run def handle_expired_actions(): LOG.debug("Running heartbeat checker...") diff --git a/mistral/tests/unit/engine/test_default_engine.py b/mistral/tests/unit/engine/test_default_engine.py index 9004410fc..59d5cdda6 100644 --- a/mistral/tests/unit/engine/test_default_engine.py +++ b/mistral/tests/unit/engine/test_default_engine.py @@ -532,6 +532,11 @@ class DefaultEngineTest(base.DbTestCase): ml_actions.Result(data='Hi') ) + self._await( + lambda: + db_api.get_workflow_execution(wf_ex.id).state == states.SUCCESS + ) + with db_api.transaction(): wf_ex = db_api.get_workflow_execution(wf_ex.id) diff --git a/mistral/workflow/base.py b/mistral/workflow/base.py index ebc576e78..c8831039e 100644 --- a/mistral/workflow/base.py +++ b/mistral/workflow/base.py @@ -229,6 +229,11 @@ class WorkflowController(object): """ raise NotImplementedError + def may_complete_workflow(self, task_ex): + """Determines if the task execution may lead to workflow completion.""" + + return states.is_completed(task_ex.state) + @abc.abstractmethod def _find_next_commands(self, task_ex): """Finds commands that should run next. diff --git a/mistral/workflow/direct_workflow.py b/mistral/workflow/direct_workflow.py index ca1aae2fe..756382435 100644 --- a/mistral/workflow/direct_workflow.py +++ b/mistral/workflow/direct_workflow.py @@ -236,6 +236,13 @@ class DirectWorkflowController(base.WorkflowController): filter(is_end_task, batch) ) + def may_complete_workflow(self, task_ex): + res = super(DirectWorkflowController, self).may_complete_workflow( + task_ex + ) + + return res and not self._has_outbound_tasks(task_ex) + def _has_outbound_tasks(self, task_ex): # In order to determine if there are outbound tasks we just need # to calculate next task names (based on task outbound context) diff --git a/releasenotes/notes/simplify_workflow_and_join_completion_check-77a47c5d8953096d.yaml b/releasenotes/notes/simplify_workflow_and_join_completion_check-77a47c5d8953096d.yaml new file mode 100644 index 000000000..0357ee799 --- /dev/null +++ b/releasenotes/notes/simplify_workflow_and_join_completion_check-77a47c5d8953096d.yaml @@ -0,0 +1,10 @@ +--- +fixes: + - | + Workflow and join completion check logic is now simplified with using + post transactional queue of operations which is a more generic version of + action_queue module previously serving for scheduling action runs outside + of the main DB transaction. Workflow completion check is now registered + only once when a task completes which reduces clutter and it's registered + only if the task may potentially lead to workflow completion. +