Fix race condition in refreshing "join" task state

* Previously we used periodic jobs to refresh the state of "join" tasks
  and there was a guarantee that only one such job could run at a
  time, so there was no need for locking. Now we allow more than one
  such job to run in parallel processes (and threads), so we have to
  lock the task execution, check the task state again and update it
  if needed.

Change-Id: Icaad486d9c3f830db0314dedb44664940cca0014
Closes-Bug: #1803483
This commit is contained in:
Renat Akhmerov 2018-11-15 11:32:50 +07:00
parent 0d85973f52
commit 05ce6f893d

View File

@ -409,32 +409,43 @@ def _refresh_task_state(task_ex_id):
wf_ctrl = wf_base.get_controller(wf_ex, wf_spec)
log_state = wf_ctrl.get_logical_task_state(task_ex)
with db_api.named_lock(task_ex.id):
# NOTE: we have to use this lock to prevent two (or more) such
# methods from changing task state and starting its action or
# workflow. Checking task state outside of this section is a
# performance optimization because locking is pretty expensive.
db_api.refresh(task_ex)
state = log_state.state
state_info = log_state.state_info
if (states.is_completed(task_ex.state)
or task_ex.state == states.RUNNING):
return
# Update 'triggered_by' because it could have changed.
task_ex.runtime_context['triggered_by'] = log_state.triggered_by
log_state = wf_ctrl.get_logical_task_state(task_ex)
if state == states.RUNNING:
continue_task(task_ex)
elif state == states.ERROR:
complete_task(task_ex, state, state_info)
elif state == states.WAITING:
LOG.info(
"Task execution is still in WAITING state"
" [task_ex_id=%s, task_name=%s]",
task_ex_id,
task_ex.name
)
else:
# Must never get here.
raise RuntimeError(
'Unexpected logical task state [task_ex_id=%s, '
'task_name=%s, state=%s]' %
(task_ex_id, task_ex.name, state)
)
state = log_state.state
state_info = log_state.state_info
# Update 'triggered_by' because it could have changed.
task_ex.runtime_context['triggered_by'] = log_state.triggered_by
if state == states.RUNNING:
continue_task(task_ex)
elif state == states.ERROR:
complete_task(task_ex, state, state_info)
elif state == states.WAITING:
LOG.info(
"Task execution is still in WAITING state"
" [task_ex_id=%s, task_name=%s]",
task_ex_id,
task_ex.name
)
else:
# Must never get here.
raise RuntimeError(
'Unexpected logical task state [task_ex_id=%s, '
'task_name=%s, state=%s]' %
(task_ex_id, task_ex.name, state)
)
def _schedule_refresh_task_state(task_ex_id, delay=0):