Merge "Fix race condition in refreshing "join" task state"

This commit is contained in:
Zuul 2018-11-15 20:30:33 +00:00 committed by Gerrit Code Review
commit a0c8da92dd

View File

@ -409,32 +409,43 @@ def _refresh_task_state(task_ex_id):
wf_ctrl = wf_base.get_controller(wf_ex, wf_spec) wf_ctrl = wf_base.get_controller(wf_ex, wf_spec)
log_state = wf_ctrl.get_logical_task_state(task_ex) with db_api.named_lock(task_ex.id):
# NOTE: we have to use this lock to prevent two (or more) such
# methods from changing task state and starting its action or
# workflow. Checking task state outside of this section is a
# performance optimization because locking is pretty expensive.
db_api.refresh(task_ex)
state = log_state.state if (states.is_completed(task_ex.state)
state_info = log_state.state_info or task_ex.state == states.RUNNING):
return
# Update 'triggered_by' because it could have changed. log_state = wf_ctrl.get_logical_task_state(task_ex)
task_ex.runtime_context['triggered_by'] = log_state.triggered_by
if state == states.RUNNING: state = log_state.state
continue_task(task_ex) state_info = log_state.state_info
elif state == states.ERROR:
complete_task(task_ex, state, state_info) # Update 'triggered_by' because it could have changed.
elif state == states.WAITING: task_ex.runtime_context['triggered_by'] = log_state.triggered_by
LOG.info(
"Task execution is still in WAITING state" if state == states.RUNNING:
" [task_ex_id=%s, task_name=%s]", continue_task(task_ex)
task_ex_id, elif state == states.ERROR:
task_ex.name complete_task(task_ex, state, state_info)
) elif state == states.WAITING:
else: LOG.info(
# Must never get here. "Task execution is still in WAITING state"
raise RuntimeError( " [task_ex_id=%s, task_name=%s]",
'Unexpected logical task state [task_ex_id=%s, ' task_ex_id,
'task_name=%s, state=%s]' % task_ex.name
(task_ex_id, task_ex.name, state) )
) else:
# Must never get here.
raise RuntimeError(
'Unexpected logical task state [task_ex_id=%s, '
'task_name=%s, state=%s]' %
(task_ex_id, task_ex.name, state)
)
def _schedule_refresh_task_state(task_ex_id, delay=0): def _schedule_refresh_task_state(task_ex_id, delay=0):