Add description param to execution creation API

Now, we can create execution with description parameter using API, for
easily identify the execution.

We can also update the description of execution, but we can not update
state and description at the same time.

Change-Id: I1cf8ca73cdf229b6f3b6584a43cba70db6ac36ab
Implements: blueprint mistral-execution-description
This commit is contained in:
LingxianKong 2015-06-11 21:45:47 +08:00
parent 06a4afed63
commit d316fe6aee
8 changed files with 69 additions and 23 deletions

View File

@ -42,6 +42,9 @@ class Execution(resource.Resource):
workflow_name = wtypes.text workflow_name = wtypes.text
"reference to workflow definition" "reference to workflow definition"
description = wtypes.text
"description of workflow execution."
params = wtypes.text params = wtypes.text
"params define workflow type specific parameters. For example, reverse \ "params define workflow type specific parameters. For example, reverse \
workflow takes one parameter 'task_name' that defines a target task." workflow takes one parameter 'task_name' that defines a target task."
@ -93,6 +96,7 @@ class Execution(resource.Resource):
def sample(cls): def sample(cls):
return cls(id='123e4567-e89b-12d3-a456-426655440000', return cls(id='123e4567-e89b-12d3-a456-426655440000',
workflow_name='flow', workflow_name='flow',
description='this is the first execution.',
state='SUCCESS', state='SUCCESS',
input='{}', input='{}',
output='{}', output='{}',
@ -134,17 +138,25 @@ class ExecutionsController(rest.RestController):
(id, execution)) (id, execution))
db_api.ensure_workflow_execution_exists(id) db_api.ensure_workflow_execution_exists(id)
# Currently we can change only state.
if not execution.state:
raise exc.DataAccessException(
"Only state of execution can change. "
"Missing 'state' property."
)
new_state = execution.state new_state = execution.state
new_description = execution.description
msg = execution.state_info msg = execution.state_info
if new_state == states.PAUSED: # Currently we can change only state or description.
if (not (new_state or new_description) or
(new_state and new_description)):
raise exc.DataAccessException(
"Only state or description of execution can be changed. "
"But they can not be changed at the same time."
)
if new_description:
wf_ex = db_api.update_workflow_execution(
id,
description=new_description
)
elif new_state == states.PAUSED:
wf_ex = rpc.get_engine_client().pause_workflow(id) wf_ex = rpc.get_engine_client().pause_workflow(id)
elif new_state == states.RUNNING: elif new_state == states.RUNNING:
wf_ex = rpc.get_engine_client().resume_workflow(id) wf_ex = rpc.get_engine_client().resume_workflow(id)
@ -177,6 +189,7 @@ class ExecutionsController(rest.RestController):
result = engine.start_workflow( result = engine.start_workflow(
exec_dict['workflow_name'], exec_dict['workflow_name'],
exec_dict.get('input'), exec_dict.get('input'),
exec_dict.get('description', ''),
**exec_dict.get('params') or {} **exec_dict.get('params') or {}
) )

View File

@ -29,11 +29,12 @@ class Engine(object):
"""Engine interface.""" """Engine interface."""
@abc.abstractmethod @abc.abstractmethod
def start_workflow(self, wf_name, wf_input, **params): def start_workflow(self, wf_name, wf_input, description='', **params):
"""Starts the specified workflow. """Starts the specified workflow.
:param wf_name: Workflow name. :param wf_name: Workflow name.
:param wf_input: Workflow input data as a dictionary. :param wf_input: Workflow input data as a dictionary.
:param description: Execution description.
:param params: Additional workflow type specific parameters. :param params: Additional workflow type specific parameters.
:return: Workflow execution object. :return: Workflow execution object.
""" """

View File

@ -45,7 +45,7 @@ class DefaultEngine(base.Engine):
self._engine_client = engine_client self._engine_client = engine_client
@u.log_exec(LOG) @u.log_exec(LOG)
def start_workflow(self, wf_name, wf_input, **params): def start_workflow(self, wf_name, wf_input, description='', **params):
wf_exec_id = None wf_exec_id = None
try: try:
@ -61,6 +61,7 @@ class DefaultEngine(base.Engine):
wf_def, wf_def,
wf_spec, wf_spec,
wf_input, wf_input,
description,
params params
) )
wf_exec_id = wf_ex.id wf_exec_id = wf_ex.id
@ -363,9 +364,11 @@ class DefaultEngine(base.Engine):
return params return params
@staticmethod @staticmethod
def _create_workflow_execution(wf_def, wf_spec, wf_input, params): def _create_workflow_execution(wf_def, wf_spec, wf_input, description,
params):
wf_ex = db_api.create_workflow_execution({ wf_ex = db_api.create_workflow_execution({
'name': wf_def.name, 'name': wf_def.name,
'description': description,
'workflow_name': wf_def.name, 'workflow_name': wf_def.name,
'spec': wf_spec.to_dict(), 'spec': wf_spec.to_dict(),
'params': params or {}, 'params': params or {},

View File

@ -77,7 +77,8 @@ class EngineServer(object):
def __init__(self, engine): def __init__(self, engine):
self._engine = engine self._engine = engine
def start_workflow(self, rpc_ctx, workflow_name, workflow_input, params): def start_workflow(self, rpc_ctx, workflow_name, workflow_input,
description, params):
"""Receives calls over RPC to start workflows on engine. """Receives calls over RPC to start workflows on engine.
:param rpc_ctx: RPC request context. :param rpc_ctx: RPC request context.
@ -86,13 +87,14 @@ class EngineServer(object):
LOG.info( LOG.info(
"Received RPC request 'start_workflow'[rpc_ctx=%s," "Received RPC request 'start_workflow'[rpc_ctx=%s,"
" workflow_name=%s, workflow_input=%s, params=%s]" " workflow_name=%s, workflow_input=%s, description=%s, params=%s]"
% (rpc_ctx, workflow_name, workflow_input, params) % (rpc_ctx, workflow_name, workflow_input, description, params)
) )
return self._engine.start_workflow( return self._engine.start_workflow(
workflow_name, workflow_name,
workflow_input, workflow_input,
description,
**params **params
) )
@ -223,7 +225,7 @@ class EngineClient(base.Engine):
) )
@wrap_messaging_exception @wrap_messaging_exception
def start_workflow(self, wf_name, wf_input, **params): def start_workflow(self, wf_name, wf_input, exec_desc, **params):
"""Starts workflow sending a request to engine over RPC. """Starts workflow sending a request to engine over RPC.
:return: Workflow execution. :return: Workflow execution.
@ -233,6 +235,7 @@ class EngineClient(base.Engine):
'start_workflow', 'start_workflow',
workflow_name=wf_name, workflow_name=wf_name,
workflow_input=wf_input or {}, workflow_input=wf_input or {},
description=exec_desc,
params=params params=params
) )

View File

@ -503,6 +503,7 @@ def run_workflow(wf_name, wf_input, wf_params):
rpc.get_engine_client().start_workflow( rpc.get_engine_client().start_workflow(
wf_name, wf_name,
wf_input, wf_input,
"sub-workflow execution",
**wf_params **wf_params
) )

View File

@ -41,6 +41,7 @@ class MistralPeriodicTasks(periodic_task.PeriodicTasks):
rpc.get_engine_client().start_workflow( rpc.get_engine_client().start_workflow(
t.workflow.name, t.workflow.name,
t.workflow_input, t.workflow_input,
description="workflow execution by cron trigger.",
**t.workflow_params **t.workflow_params
) )
finally: finally:

View File

@ -1,5 +1,6 @@
# Copyright 2013 - Mirantis, Inc. # Copyright 2013 - Mirantis, Inc.
# Copyright 2015 - StackStorm, Inc. # Copyright 2015 - StackStorm, Inc.
# Copyright 2015 Huawei Technologies Co., Ltd.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -29,6 +30,7 @@ from mistral.workflow import states
WF_EX = models.WorkflowExecution( WF_EX = models.WorkflowExecution(
id='123', id='123',
workflow_name='some', workflow_name='some',
description='execution description.',
spec={'name': 'some'}, spec={'name': 'some'},
state=states.RUNNING, state=states.RUNNING,
state_info=None, state_info=None,
@ -48,7 +50,7 @@ WF_EX_JSON = {
'state_info': None, 'state_info': None,
'created_at': '1970-01-01 00:00:00', 'created_at': '1970-01-01 00:00:00',
'updated_at': '1970-01-01 00:00:00', 'updated_at': '1970-01-01 00:00:00',
'workflow_name': 'some' 'workflow_name': 'some',
} }
UPDATED_WF_EX = copy.copy(WF_EX) UPDATED_WF_EX = copy.copy(WF_EX)
@ -57,6 +59,9 @@ UPDATED_WF_EX['state'] = states.PAUSED
UPDATED_WF_EX_JSON = copy.copy(WF_EX_JSON) UPDATED_WF_EX_JSON = copy.copy(WF_EX_JSON)
UPDATED_WF_EX_JSON['state'] = states.PAUSED UPDATED_WF_EX_JSON['state'] = states.PAUSED
WF_EX_JSON_WITH_DESC = copy.copy(WF_EX_JSON)
WF_EX_JSON_WITH_DESC['description'] = "execution description."
MOCK_WF_EX = mock.MagicMock(return_value=WF_EX) MOCK_WF_EX = mock.MagicMock(return_value=WF_EX)
MOCK_WF_EXECUTIONS = mock.MagicMock(return_value=[WF_EX]) MOCK_WF_EXECUTIONS = mock.MagicMock(return_value=[WF_EX])
MOCK_UPDATED_WF_EX = mock.MagicMock(return_value=UPDATED_WF_EX) MOCK_UPDATED_WF_EX = mock.MagicMock(return_value=UPDATED_WF_EX)
@ -74,7 +79,7 @@ class TestExecutionsController(base.FunctionalTest):
self.maxDiff = None self.maxDiff = None
self.assertEqual(resp.status_int, 200) self.assertEqual(resp.status_int, 200)
self.assertDictEqual(WF_EX_JSON, resp.json) self.assertDictEqual(WF_EX_JSON_WITH_DESC, resp.json)
@mock.patch.object(db_api, 'get_workflow_execution', MOCK_NOT_FOUND) @mock.patch.object(db_api, 'get_workflow_execution', MOCK_NOT_FOUND)
def test_get_not_found(self): def test_get_not_found(self):
@ -92,8 +97,11 @@ class TestExecutionsController(base.FunctionalTest):
def test_put(self): def test_put(self):
resp = self.app.put_json('/v2/executions/123', UPDATED_WF_EX_JSON) resp = self.app.put_json('/v2/executions/123', UPDATED_WF_EX_JSON)
UPDATED_WF_EX_WITH_DESC = copy.copy(UPDATED_WF_EX_JSON)
UPDATED_WF_EX_WITH_DESC['description'] = 'execution description.'
self.assertEqual(resp.status_int, 200) self.assertEqual(resp.status_int, 200)
self.assertDictEqual(UPDATED_WF_EX_JSON, resp.json) self.assertDictEqual(UPDATED_WF_EX_WITH_DESC, resp.json)
@mock.patch.object( @mock.patch.object(
db_api, db_api,
@ -113,6 +121,8 @@ class TestExecutionsController(base.FunctionalTest):
resp = self.app.put_json('/v2/executions/123', update_exec) resp = self.app.put_json('/v2/executions/123', update_exec)
update_exec['description'] = "execution description."
self.assertEqual(resp.status_int, 200) self.assertEqual(resp.status_int, 200)
self.assertDictEqual(update_exec, resp.json) self.assertDictEqual(update_exec, resp.json)
mock_pw.assert_called_once_with('123', 'ERROR', "Force") mock_pw.assert_called_once_with('123', 'ERROR', "Force")
@ -127,20 +137,29 @@ class TestExecutionsController(base.FunctionalTest):
self.assertEqual(resp.status_int, 404) self.assertEqual(resp.status_int, 404)
def test_put_both_state_and_description(self):
self.assertRaises(
webtest_app.AppError,
self.app.put_json,
'/v2/executions/123',
WF_EX_JSON_WITH_DESC
)
@mock.patch.object(rpc.EngineClient, 'start_workflow') @mock.patch.object(rpc.EngineClient, 'start_workflow')
def test_post(self, f): def test_post(self, f):
f.return_value = WF_EX.to_dict() f.return_value = WF_EX.to_dict()
resp = self.app.post_json('/v2/executions', WF_EX_JSON) resp = self.app.post_json('/v2/executions', WF_EX_JSON_WITH_DESC)
self.assertEqual(resp.status_int, 201) self.assertEqual(resp.status_int, 201)
self.assertDictEqual(WF_EX_JSON, resp.json) self.assertDictEqual(WF_EX_JSON_WITH_DESC, resp.json)
exec_dict = execution.Execution(**WF_EX_JSON).to_dict() exec_dict = execution.Execution(**WF_EX_JSON_WITH_DESC).to_dict()
f.assert_called_once_with( f.assert_called_once_with(
exec_dict['workflow_name'], exec_dict['workflow_name'],
exec_dict['input'], exec_dict['input'],
exec_dict['description'],
**exec_dict['params'] **exec_dict['params']
) )
@ -170,7 +189,7 @@ class TestExecutionsController(base.FunctionalTest):
self.assertEqual(resp.status_int, 200) self.assertEqual(resp.status_int, 200)
self.assertEqual(len(resp.json['executions']), 1) self.assertEqual(len(resp.json['executions']), 1)
self.assertDictEqual(WF_EX_JSON, resp.json['executions'][0]) self.assertDictEqual(WF_EX_JSON_WITH_DESC, resp.json['executions'][0])
@mock.patch.object(db_api, 'get_workflow_executions', MOCK_EMPTY) @mock.patch.object(db_api, 'get_workflow_executions', MOCK_EMPTY)
def test_get_all_empty(self): def test_get_all_empty(self):

View File

@ -111,11 +111,13 @@ class DefaultEngineTest(base.DbTestCase):
wf_ex = self.engine.start_workflow( wf_ex = self.engine.start_workflow(
'wb.wf', 'wb.wf',
wf_input, wf_input,
'my execution',
task_name='task2' task_name='task2'
) )
self.assertIsNotNone(wf_ex) self.assertIsNotNone(wf_ex)
self.assertEqual(states.RUNNING, wf_ex.state) self.assertEqual(states.RUNNING, wf_ex.state)
self.assertEqual('my execution', wf_ex.description)
self._assert_dict_contains_subset(wf_input, wf_ex.context) self._assert_dict_contains_subset(wf_input, wf_ex.context)
self.assertIn('__execution', wf_ex.context) self.assertIn('__execution', wf_ex.context)
@ -433,5 +435,8 @@ class DefaultEngineWithTransportTest(eng_test_base.EngineTestCase):
self.assertRaises( self.assertRaises(
exc.InputException, exc.InputException,
self.engine_client.start_workflow, 'some_wf', {} self.engine_client.start_workflow,
'some_wf',
{},
'some_description'
) )