Merge "Added session.flush() before update_on_match()"
This commit is contained in:
@@ -129,6 +129,7 @@ MISTRAL_TITLE = """
|
||||
|| \\/ || \\\ || || || \\\ ||
|
||||
|| || || \\\ || || || /\\\ ||
|
||||
|| || || __// ||_// || \\\__// \\\_ ||
|
||||
|
||||
Mistral Workflow Service, version %s
|
||||
""" % version.version_string()
|
||||
|
||||
|
@@ -143,6 +143,51 @@ def _lock_entity(model, id):
|
||||
return _secure_query(model).with_for_update().filter(model.id == id).one()
|
||||
|
||||
|
||||
@b.session_aware()
|
||||
def update_on_match(id, specimen, values, session=None):
|
||||
"""Updates a model with the given values if it matches the given specimen.
|
||||
|
||||
:param id: ID of a persistent model.
|
||||
:param specimen: Specimen used to match the
|
||||
:param values: Values to set to the model if fields of the object
|
||||
match the specimen.
|
||||
:param session: Session.
|
||||
:return: Persistent object attached to the session.
|
||||
"""
|
||||
|
||||
assert id is not None
|
||||
assert specimen is not None
|
||||
|
||||
# We need to flush the session because when we do update_on_match()
|
||||
# it doesn't always update the state of the persistent object properly
|
||||
# when it merges a specimen state into it. Some fields get wiped out from
|
||||
# the history of ORM events that must be flushed later. For example, it
|
||||
# doesn't work well in case of Postgres.
|
||||
# See https://bugs.launchpad.net/mistral/+bug/1736821
|
||||
session.flush()
|
||||
|
||||
model = None
|
||||
model_class = type(specimen)
|
||||
|
||||
# Use WHERE clause to exclude possible conflicts if the state has
|
||||
# already been changed.
|
||||
try:
|
||||
model = b.model_query(model_class).update_on_match(
|
||||
specimen=specimen,
|
||||
surrogate_key='id',
|
||||
values=values
|
||||
)
|
||||
except oslo_sqlalchemy.update_match.NoRowsMatched:
|
||||
LOG.info(
|
||||
"Can't change state of persistent object "
|
||||
"because it has already been changed. [model_class=%, id=%s, "
|
||||
"specimen=%s, values=%s]",
|
||||
model_class, id, specimen, values
|
||||
)
|
||||
|
||||
return model
|
||||
|
||||
|
||||
def _secure_query(model, *columns):
|
||||
query = b.model_query(model, columns)
|
||||
|
||||
@@ -831,32 +876,10 @@ def delete_workflow_executions(session=None, **kwargs):
|
||||
return _delete_all(models.WorkflowExecution, **kwargs)
|
||||
|
||||
|
||||
@b.session_aware()
|
||||
def update_workflow_execution_state(id, cur_state, state, session=None):
|
||||
wf_ex = None
|
||||
def update_workflow_execution_state(id, cur_state, state):
|
||||
specimen = models.WorkflowExecution(id=id, state=cur_state)
|
||||
|
||||
# Use WHERE clause to exclude possible conflicts if the state has
|
||||
# already been changed.
|
||||
try:
|
||||
specimen = models.WorkflowExecution(
|
||||
id=id,
|
||||
state=cur_state
|
||||
)
|
||||
|
||||
wf_ex = b.model_query(
|
||||
models.WorkflowExecution).update_on_match(
|
||||
specimen=specimen,
|
||||
surrogate_key='id',
|
||||
values={'state': state}
|
||||
)
|
||||
except oslo_sqlalchemy.update_match.NoRowsMatched:
|
||||
LOG.info(
|
||||
"Can't change workflow execution state from %s to %s, "
|
||||
"because it has already been changed. [execution_id=%s]",
|
||||
cur_state, state, id
|
||||
)
|
||||
|
||||
return wf_ex
|
||||
return update_on_match(id, specimen, {'state': state})
|
||||
|
||||
|
||||
# Tasks executions.
|
||||
@@ -987,32 +1010,10 @@ def delete_task_executions(session=None, **kwargs):
|
||||
return _delete_all(models.TaskExecution, **kwargs)
|
||||
|
||||
|
||||
@b.session_aware()
|
||||
def update_task_execution_state(id, cur_state, state, session=None):
|
||||
wf_ex = None
|
||||
def update_task_execution_state(id, cur_state, state):
|
||||
specimen = models.TaskExecution(id=id, state=cur_state)
|
||||
|
||||
# Use WHERE clause to exclude possible conflicts if the state has
|
||||
# already been changed.
|
||||
try:
|
||||
specimen = models.TaskExecution(
|
||||
id=id,
|
||||
state=cur_state
|
||||
)
|
||||
|
||||
wf_ex = b.model_query(
|
||||
models.TaskExecution).update_on_match(
|
||||
specimen=specimen,
|
||||
surrogate_key='id',
|
||||
values={'state': state}
|
||||
)
|
||||
except oslo_sqlalchemy.update_match.NoRowsMatched:
|
||||
LOG.info(
|
||||
"Can't change task execution state from %s to %s, "
|
||||
"because it has already been changed. [execution_id=%s]",
|
||||
cur_state, state, id
|
||||
)
|
||||
|
||||
return wf_ex
|
||||
return update_on_match(id, specimen, {'state': state})
|
||||
|
||||
|
||||
# Delayed calls.
|
||||
|
@@ -285,9 +285,7 @@ def add_openstack_data_to_context(wf_ex):
|
||||
def add_execution_to_context(wf_ex):
|
||||
wf_ex.context = wf_ex.context or {}
|
||||
|
||||
wf_ex.context['__execution'] = {
|
||||
'id': wf_ex.id
|
||||
}
|
||||
wf_ex.context['__execution'] = {'id': wf_ex.id}
|
||||
|
||||
|
||||
def add_environment_to_context(wf_ex):
|
||||
|
Reference in New Issue
Block a user