Correction in workflow state change handling

In case of workflow definition is deleted attached execution
could not change their state.
Solution is to remove unnecessary workflow definition queries
before execution state changes.

Change-Id: I2900388c8155e5ff0c117e0799df975588f80379
Closes-Bug: #1668523
This commit is contained in:
Istvan Imre 2017-02-28 11:52:03 +01:00
parent 086a3d43fa
commit 11e275a3cb
3 changed files with 57 additions and 43 deletions

View File

@ -34,11 +34,14 @@ _CHECK_AND_COMPLETE_PATH = (
@profiler.trace('workflow-handler-start-workflow')
def start_workflow(wf_identifier, wf_input, desc, params):
wf = workflows.Workflow(
db_api.get_workflow_definition(wf_identifier)
)
wf = workflows.Workflow()
wf.start(wf_input, desc=desc, params=params)
wf.start(
wf_def=db_api.get_workflow_definition(wf_identifier),
input_dict=wf_input,
desc=desc,
params=params
)
_schedule_check_and_complete(wf.wf_ex)
@ -46,10 +49,7 @@ def start_workflow(wf_identifier, wf_input, desc, params):
def stop_workflow(wf_ex, state, msg=None):
wf = workflows.Workflow(
db_api.get_workflow_definition(wf_ex.workflow_id),
wf_ex=wf_ex
)
wf = workflows.Workflow(wf_ex=wf_ex)
# In this case we should not try to handle possible errors. Instead,
# we need to let them pop up since the typical way of failing objects
@ -86,10 +86,7 @@ def _check_and_complete(wf_ex_id):
if not wf_ex or states.is_completed(wf_ex.state):
return
wf = workflows.Workflow(
db_api.get_workflow_definition(wf_ex.workflow_id),
wf_ex=wf_ex
)
wf = workflows.Workflow(wf_ex=wf_ex)
try:
incomplete_tasks_count = wf.check_and_complete()
@ -121,10 +118,7 @@ def _check_and_complete(wf_ex_id):
def pause_workflow(wf_ex, msg=None):
wf = workflows.Workflow(
db_api.get_workflow_definition(wf_ex.workflow_id),
wf_ex=wf_ex
)
wf = workflows.Workflow(wf_ex=wf_ex)
wf.set_state(states.PAUSED, msg)
@ -133,10 +127,7 @@ def rerun_workflow(wf_ex, task_ex, reset=True, env=None):
if wf_ex.state == states.PAUSED:
return wf_ex.get_clone()
wf = workflows.Workflow(
db_api.get_workflow_definition(wf_ex.workflow_id),
wf_ex=wf_ex
)
wf = workflows.Workflow(wf_ex=wf_ex)
wf.rerun(task_ex, reset=reset, env=env)
@ -150,10 +141,7 @@ def resume_workflow(wf_ex, env=None):
if not states.is_paused_or_idle(wf_ex.state):
return wf_ex.get_clone()
wf = workflows.Workflow(
db_api.get_workflow_definition(wf_ex.workflow_id),
wf_ex=wf_ex
)
wf = workflows.Workflow(wf_ex=wf_ex)
wf.resume(env=env)

View File

@ -54,8 +54,7 @@ class Workflow(object):
Mistral engine or its components in order to manipulate with workflows.
"""
def __init__(self, wf_def, wf_ex=None):
self.wf_def = wf_def
def __init__(self, wf_ex=None):
self.wf_ex = wf_ex
if wf_ex:
@ -64,16 +63,13 @@ class Workflow(object):
wf_ex.id
)
else:
# New workflow execution.
self.wf_spec = spec_parser.get_workflow_spec_by_definition_id(
wf_def.id,
wf_def.updated_at
)
self.wf_spec = None
@profiler.trace('workflow-start')
def start(self, input_dict, desc='', params=None):
def start(self, wf_def, input_dict, desc='', params=None):
"""Start workflow.
:param wf_def: Workflow definition.
:param input_dict: Workflow input.
:param desc: Workflow execution description.
:param params: Workflow type specific parameters.
@ -81,17 +77,23 @@ class Workflow(object):
assert not self.wf_ex
# New workflow execution.
self.wf_spec = spec_parser.get_workflow_spec_by_definition_id(
wf_def.id,
wf_def.updated_at
)
wf_trace.info(
self.wf_ex,
"Starting workflow [name=%s, input=%s]" %
(self.wf_def.name, utils.cut(input_dict))
(wf_def.name, utils.cut(input_dict))
)
# TODO(rakhmerov): This call implicitly changes input_dict! Fix it!
# After fix we need to move validation after adding risky fields.
eng_utils.validate_input(self.wf_def, input_dict, self.wf_spec)
eng_utils.validate_input(wf_def, input_dict, self.wf_spec)
self._create_execution(input_dict, desc, params)
self._create_execution(wf_def, input_dict, desc, params)
self.set_state(states.RUNNING)
@ -202,12 +204,12 @@ class Workflow(object):
)
return final_context
def _create_execution(self, input_dict, desc, params):
def _create_execution(self, wf_def, input_dict, desc, params):
self.wf_ex = db_api.create_workflow_execution({
'name': self.wf_def.name,
'name': wf_def.name,
'description': desc,
'workflow_name': self.wf_def.name,
'workflow_id': self.wf_def.id,
'workflow_name': wf_def.name,
'workflow_id': wf_def.id,
'spec': self.wf_spec.to_dict(),
'state': states.IDLE,
'output': {},
@ -272,10 +274,7 @@ class Workflow(object):
self.wf_ex.task_execution_id
)
parent_wf = Workflow(
db_api.get_workflow_definition(parent_task_ex.workflow_id),
parent_task_ex.workflow_execution
)
parent_wf = Workflow(wf_ex=parent_task_ex.workflow_execution)
parent_wf.lock()

View File

@ -71,6 +71,33 @@ class WorkflowCancelTest(base.EngineTestCase):
self.assertEqual(1, len(task_execs))
self.assertEqual(states.SUCCESS, task_1_ex.state)
def test_cancel_workflow_if_definition_deleted(self):
workflow = """
version: '2.0'
wf:
type: direct
tasks:
task1:
action: std.echo output="foo"
wait-before: 5
"""
wf = wf_service.create_workflows(workflow)[0]
wf_ex = self.engine.start_workflow('wf', {})
with db_api.transaction():
db_api.delete_workflow_definition(wf.id)
self.engine.stop_workflow(
wf_ex.id,
states.CANCELLED,
"Cancelled by user."
)
self.await_workflow_cancelled(wf_ex.id)
def test_cancel_paused_workflow(self):
workflow = """
version: '2.0'