Merge "Performance: remove unnecessary workflow execution update"
This commit is contained in:
commit
78dcb2d419
mistral
releasenotes/notes
@ -1266,6 +1266,7 @@ def get_expired_executions(expiration_time, limit=None, columns=(),
|
||||
@b.session_aware()
|
||||
def get_running_expired_sync_actions(expiration_time, session=None):
|
||||
query = b.model_query(models.ActionExecution)
|
||||
|
||||
query = query.filter(
|
||||
models.ActionExecution.last_heartbeat < expiration_time
|
||||
)
|
||||
|
@ -98,11 +98,20 @@ def _save_command_to_backlog(wf_ex, cmd):
|
||||
|
||||
|
||||
def _poll_commands_from_backlog(wf_ex):
|
||||
backlog_cmds = wf_ex.runtime_context.pop(BACKLOG_KEY, [])
|
||||
|
||||
if not backlog_cmds:
|
||||
# NOTE: We need to always use a guard condition that checks
|
||||
# if a persistent structure is empty and, as in this case,
|
||||
# return immediately w/o doing any further manipulations.
|
||||
# Otherwise, if we do pop() operation with a default value
|
||||
# then the ORM framework will consider it a modification of
|
||||
# the persistent object and generate a corresponding SQL
|
||||
# UPDATE operation. In this particular case it will increase
|
||||
# contention for workflow executions table drastically and
|
||||
# decrease performance.
|
||||
if not wf_ex.runtime_context.get(BACKLOG_KEY):
|
||||
return []
|
||||
|
||||
backlog_cmds = wf_ex.runtime_context.pop(BACKLOG_KEY)
|
||||
|
||||
return [
|
||||
commands.restore_command_from_dict(wf_ex, cmd_dict)
|
||||
for cmd_dict in backlog_cmds
|
||||
|
@ -34,25 +34,32 @@ SCHEDULER_KEY = 'handle_expired_actions_key'
|
||||
def handle_expired_actions():
|
||||
LOG.debug("Running heartbeat checker...")
|
||||
|
||||
try:
|
||||
interval = CONF.action_heartbeat.check_interval
|
||||
max_missed = CONF.action_heartbeat.max_missed_heartbeats
|
||||
exp_date = utils.utc_now_sec() - datetime.timedelta(
|
||||
seconds=max_missed * interval
|
||||
)
|
||||
interval = CONF.action_heartbeat.check_interval
|
||||
max_missed = CONF.action_heartbeat.max_missed_heartbeats
|
||||
|
||||
exp_date = utils.utc_now_sec() - datetime.timedelta(
|
||||
seconds=max_missed * interval
|
||||
)
|
||||
|
||||
try:
|
||||
with db_api.transaction():
|
||||
action_exs = db_api.get_running_expired_sync_actions(exp_date)
|
||||
|
||||
LOG.debug("Found {} running and expired actions.".format(
|
||||
len(action_exs))
|
||||
)
|
||||
|
||||
if action_exs:
|
||||
LOG.info("Actions executions to transit to error, because "
|
||||
"heartbeat wasn't received: {}".format(action_exs))
|
||||
LOG.info(
|
||||
"Actions executions to transit to error, because "
|
||||
"heartbeat wasn't received: {}".format(action_exs)
|
||||
)
|
||||
|
||||
for action_ex in action_exs:
|
||||
result = mistral_lib.Result(
|
||||
error="Heartbeat wasn't received."
|
||||
)
|
||||
|
||||
action_handler.on_action_complete(action_ex, result)
|
||||
finally:
|
||||
schedule(interval)
|
||||
@ -62,14 +69,19 @@ def setup():
|
||||
interval = CONF.action_heartbeat.check_interval
|
||||
max_missed = CONF.action_heartbeat.max_missed_heartbeats
|
||||
enabled = interval and max_missed
|
||||
|
||||
if not enabled:
|
||||
LOG.info("Action heartbeat reporting disabled.")
|
||||
|
||||
return
|
||||
|
||||
wait_time = interval * max_missed
|
||||
LOG.debug("First run of action execution checker, wait before "
|
||||
"checking to make sure executors have time to send "
|
||||
"heartbeats. ({} seconds)".format(wait_time))
|
||||
|
||||
LOG.debug(
|
||||
"First run of action execution checker, wait before "
|
||||
"checking to make sure executors have time to send "
|
||||
"heartbeats. ({} seconds)".format(wait_time)
|
||||
)
|
||||
|
||||
schedule(wait_time)
|
||||
|
||||
|
10
releasenotes/notes/remove_unnecessary_workflow_execution_update-bdc9526bd39539c4.yaml
Normal file
10
releasenotes/notes/remove_unnecessary_workflow_execution_update-bdc9526bd39539c4.yaml
Normal file
@ -0,0 +1,10 @@
|
||||
---
|
||||
fixes:
|
||||
- |
|
||||
Eliminated an unnecessary update of the workflow execution object
|
||||
when processing "on_action_complete" operation. W/o this fix all
|
||||
such transactions would have to compete for the workflow executions
|
||||
table that causes lots of DB deadlocks (on MySQL) and transaction
|
||||
retries. In some cases the number of retries even exceeds the limit
|
||||
(currently hardcoded 50) and such tasks can be fixed only with the
|
||||
integrity checker over time.
|
Loading…
x
Reference in New Issue
Block a user