Fix how Mistral calculates workflow output
* Workflow output sometimes is not calculated correctly due to the race condition between different transactions: the one that checks workflow completion (i.e. calls "check_and_complete") and the one that processes action execution completion (i.e. calls "on_action_complete"). Calculating output sometimes was based on stale data cached by the SQLAlchemy session. To fix this, we just need to expire all objects in the session so that they are refreshed automatically if we read their state in order to make required calculations. See the bug description for more details on how the problem was observed. * Added another test for direct workflow that formally checks calculation of workflow output. It doesn't pretend to test the aforementioned issue (it can be reproduced only with a big number of attempts, and/or under load). It's for the sake of the test module completeness. Change-Id: I4a7e7fd9a4bbb6e93df169b4b40bc2d83ccfce89 Closes-Bug: #1792090
This commit is contained in:
parent
38a54260b0
commit
dfdff78315
@ -62,6 +62,10 @@ def refresh(model):
|
|||||||
IMPL.refresh(model)
|
IMPL.refresh(model)
|
||||||
|
|
||||||
|
|
||||||
|
def expire_all():
|
||||||
|
IMPL.expire_all()
|
||||||
|
|
||||||
|
|
||||||
# Locking.
|
# Locking.
|
||||||
|
|
||||||
|
|
||||||
|
@ -125,6 +125,11 @@ def refresh(model, session=None):
|
|||||||
session.refresh(model)
|
session.refresh(model)
|
||||||
|
|
||||||
|
|
||||||
|
@b.session_aware()
|
||||||
|
def expire_all(session=None):
|
||||||
|
session.expire_all()
|
||||||
|
|
||||||
|
|
||||||
@b.session_aware()
|
@b.session_aware()
|
||||||
def acquire_lock(model, id, session=None):
|
def acquire_lock(model, id, session=None):
|
||||||
# Expire all so all objects queried after lock is acquired
|
# Expire all so all objects queried after lock is acquired
|
||||||
|
@ -414,6 +414,16 @@ class Workflow(object):
|
|||||||
if incomplete_tasks_count > 0:
|
if incomplete_tasks_count > 0:
|
||||||
return incomplete_tasks_count
|
return incomplete_tasks_count
|
||||||
|
|
||||||
|
LOG.debug("Workflow completed [id=%s]", self.wf_ex.id)
|
||||||
|
|
||||||
|
# NOTE(rakhmerov): Once we know that the workflow has completed,
|
||||||
|
# we need to expire all the objects in the DB session to make sure
|
||||||
|
# to read the most relevant data from the DB (that's already been
|
||||||
|
# committed in parallel transactions). Otherwise, some data like
|
||||||
|
# workflow context may be stale and decisions made upon it will be
|
||||||
|
# wrong.
|
||||||
|
db_api.expire_all()
|
||||||
|
|
||||||
wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec)
|
wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec)
|
||||||
|
|
||||||
if wf_ctrl.any_cancels():
|
if wf_ctrl.any_cancels():
|
||||||
|
@ -816,6 +816,44 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
|
|||||||
|
|
||||||
self.assertDictEqual({}, wf_ex.output)
|
self.assertDictEqual({}, wf_ex.output)
|
||||||
|
|
||||||
|
def test_output_expression(self):
|
||||||
|
wf_text = """---
|
||||||
|
version: '2.0'
|
||||||
|
|
||||||
|
wf:
|
||||||
|
output:
|
||||||
|
continue_flag: <% $.continue_flag %>
|
||||||
|
|
||||||
|
task-defaults:
|
||||||
|
on-error:
|
||||||
|
- task2
|
||||||
|
|
||||||
|
tasks:
|
||||||
|
task1:
|
||||||
|
action: std.fail
|
||||||
|
on-success: task3
|
||||||
|
|
||||||
|
task2:
|
||||||
|
action: std.noop
|
||||||
|
publish:
|
||||||
|
continue_flag: false
|
||||||
|
|
||||||
|
task3:
|
||||||
|
action: std.noop
|
||||||
|
"""
|
||||||
|
|
||||||
|
wf_service.create_workflows(wf_text)
|
||||||
|
|
||||||
|
wf_ex = self.engine.start_workflow('wf')
|
||||||
|
|
||||||
|
self.await_workflow_success(wf_ex.id)
|
||||||
|
|
||||||
|
with db_api.transaction():
|
||||||
|
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||||
|
|
||||||
|
self.assertEqual(2, len(wf_ex.task_executions))
|
||||||
|
self.assertDictEqual({'continue_flag': False}, wf_ex.output)
|
||||||
|
|
||||||
def test_triggered_by(self):
|
def test_triggered_by(self):
|
||||||
wf_text = """---
|
wf_text = """---
|
||||||
version: '2.0'
|
version: '2.0'
|
||||||
|
13
releasenotes/notes/fix_workflow_output-cee5df431679de6b.yaml
Normal file
13
releasenotes/notes/fix_workflow_output-cee5df431679de6b.yaml
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
---
|
||||||
|
|
||||||
|
fixes:
|
||||||
|
- |
|
||||||
|
Workflow output sometimes was not calculated correctly due to
|
||||||
|
the race condition between different transactions: the one that
|
||||||
|
checks workflow completion (i.e. calls "check_and_complete") and
|
||||||
|
the one that processes action execution completion (i.e. calls
|
||||||
|
"on_action_complete"). Calculating output sometimes was based on
|
||||||
|
stale data cached by the SQLAlchemy session. To fix this, we just
|
||||||
|
need to expire all objects in the session so that they are
|
||||||
|
refreshed automatically if we read their state in order to make
|
||||||
|
required calculations. The corresponding change was made.
|
Loading…
x
Reference in New Issue
Block a user