diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py index 671d5f51f..cbde52486 100644 --- a/mistral/db/v2/sqlalchemy/api.py +++ b/mistral/db/v2/sqlalchemy/api.py @@ -1266,6 +1266,7 @@ def get_expired_executions(expiration_time, limit=None, columns=(), @b.session_aware() def get_running_expired_sync_actions(expiration_time, session=None): query = b.model_query(models.ActionExecution) + query = query.filter( models.ActionExecution.last_heartbeat < expiration_time ) diff --git a/mistral/engine/dispatcher.py b/mistral/engine/dispatcher.py index 4099f9db0..c4d18e031 100644 --- a/mistral/engine/dispatcher.py +++ b/mistral/engine/dispatcher.py @@ -98,11 +98,20 @@ def _save_command_to_backlog(wf_ex, cmd): def _poll_commands_from_backlog(wf_ex): - backlog_cmds = wf_ex.runtime_context.pop(BACKLOG_KEY, []) - - if not backlog_cmds: + # NOTE: We need to always use a guard condition that checks + # if a persistent structure is empty and, as in this case, + # return immediately w/o doing any further manipulations. + # Otherwise, if we do pop() operation with a default value + # then the ORM framework will consider it a modification of + # the persistent object and generate a corresponding SQL + # UPDATE operation. In this particular case it will increase + # contention for workflow executions table drastically and + # decrease performance. + if not wf_ex.runtime_context.get(BACKLOG_KEY): return [] + backlog_cmds = wf_ex.runtime_context.pop(BACKLOG_KEY) + return [ commands.restore_command_from_dict(wf_ex, cmd_dict) for cmd_dict in backlog_cmds diff --git a/mistral/services/action_execution_checker.py b/mistral/services/action_execution_checker.py index 2b69299cd..3cf4fcbf8 100644 --- a/mistral/services/action_execution_checker.py +++ b/mistral/services/action_execution_checker.py @@ -34,25 +34,32 @@ SCHEDULER_KEY = 'handle_expired_actions_key' def handle_expired_actions(): LOG.debug("Running heartbeat checker...") - try: - interval = CONF.action_heartbeat.check_interval - max_missed = CONF.action_heartbeat.max_missed_heartbeats - exp_date = utils.utc_now_sec() - datetime.timedelta( - seconds=max_missed * interval - ) + interval = CONF.action_heartbeat.check_interval + max_missed = CONF.action_heartbeat.max_missed_heartbeats + exp_date = utils.utc_now_sec() - datetime.timedelta( + seconds=max_missed * interval + ) + + try: with db_api.transaction(): action_exs = db_api.get_running_expired_sync_actions(exp_date) + LOG.debug("Found {} running and expired actions.".format( len(action_exs)) ) + if action_exs: - LOG.info("Actions executions to transit to error, because " - "heartbeat wasn't received: {}".format(action_exs)) + LOG.info( + "Actions executions to transit to error, because " + "heartbeat wasn't received: {}".format(action_exs) + ) + for action_ex in action_exs: result = mistral_lib.Result( error="Heartbeat wasn't received." ) + action_handler.on_action_complete(action_ex, result) finally: schedule(interval) @@ -62,14 +69,19 @@ def setup(): interval = CONF.action_heartbeat.check_interval max_missed = CONF.action_heartbeat.max_missed_heartbeats enabled = interval and max_missed + if not enabled: LOG.info("Action heartbeat reporting disabled.") + return wait_time = interval * max_missed - LOG.debug("First run of action execution checker, wait before " - "checking to make sure executors have time to send " - "heartbeats. ({} seconds)".format(wait_time)) + + LOG.debug( + "First run of action execution checker, wait before " + "checking to make sure executors have time to send " + "heartbeats. ({} seconds)".format(wait_time) + ) schedule(wait_time) diff --git a/releasenotes/notes/remove_unnecessary_workflow_execution_update-bdc9526bd39539c4.yaml b/releasenotes/notes/remove_unnecessary_workflow_execution_update-bdc9526bd39539c4.yaml new file mode 100644 index 000000000..d51e49d0e --- /dev/null +++ b/releasenotes/notes/remove_unnecessary_workflow_execution_update-bdc9526bd39539c4.yaml @@ -0,0 +1,10 @@ +--- +fixes: + - | + Eliminated an unnecessary update of the workflow execution object + when processing "on_action_complete" operation. W/o this fix all + such transactions would have to compete for the workflow executions + table that causes lots of DB deadlocks (on MySQL) and transaction + retries. In some cases the number of retries even exceeds the limit + (currently hardcoded 50) and such tasks can be fixed only with the + integrity checker over time.