From 380689f22323c407f53c1b31177d563a63f3bc1d Mon Sep 17 00:00:00 2001 From: Nikolay Mahotkin Date: Wed, 15 Jan 2014 18:01:46 +0400 Subject: [PATCH] Add sync task execution * Non-Mistral actions gets a JSON response from which result can be extracted with YAQL-expression * Unit test * Feature is added to both engines Change-Id: Iae6aa031e6b5700b3a08ee184f5eae291458a8c4 --- mistral/engine/actions/action_factory.py | 21 ++++++--- mistral/engine/actions/action_helper.py | 45 +++++++++++++++++++ mistral/engine/actions/actions.py | 26 ++++++++--- mistral/engine/local/engine.py | 18 ++++++++ mistral/engine/scalable/executor/executor.py | 18 +++++++- mistral/engine/states.py | 7 +++ mistral/tests/resources/test_rest.yaml | 28 ++++++++++-- .../tests/unit/engine/local/test_engine.py | 19 +++++++- mistral/tests/unit/test_parser.py | 4 +- mistral/utils/yaql_utils.py | 20 +++++++++ 10 files changed, 185 insertions(+), 21 deletions(-) create mode 100644 mistral/engine/actions/action_helper.py create mode 100644 mistral/utils/yaql_utils.py diff --git a/mistral/engine/actions/action_factory.py b/mistral/engine/actions/action_factory.py index d9041bbdd..ea88cbcda 100644 --- a/mistral/engine/actions/action_factory.py +++ b/mistral/engine/actions/action_factory.py @@ -16,16 +16,20 @@ from mistral.engine.actions import actions from mistral.engine.actions import action_types +from mistral.engine.actions import action_helper as a_h import mistral.exceptions as exc def create_action(task): - action_type = task['service_dsl']['type'] + action_type = a_h.get_action_type(task) if not action_types.is_valid(action_type): raise exc.InvalidActionException("Action type is not supported: %s" % action_type) - return _get_mapping()[action_type](task) + action = _get_mapping()[action_type](task) + action_dsl = task['service_dsl']['actions'][action.name] + action.result_helper = action_dsl.get('parameters', {}).get('response', {}) + return action def _get_mapping(): @@ -37,6 +41,7 @@ def _get_mapping(): def get_rest_action(task): + action_type = a_h.get_action_type(task) action_name = task['task_dsl']['action'].split(':')[1] action_dsl = task['service_dsl']['actions'][action_name] task_params = task['task_dsl'].get('parameters', None) @@ -48,8 +53,9 @@ def get_rest_action(task): headers.update(action_dsl.get('headers', {})) method = action_dsl['parameters'].get('method', "GET") - return actions.RestAction(url, params=task_params, - method=method, headers=headers) + return actions.RestAction(action_type, action_name, url, + params=task_params, method=method, + headers=headers) def get_mistral_rest_action(task): @@ -64,6 +70,7 @@ def get_mistral_rest_action(task): def get_amqp_action(task): + action_type = a_h.get_action_type(task) action_name = task['task_dsl']['action'].split(':')[1] action_params = task['service_dsl']['actions'][action_name]['parameters'] task_params = task['task_dsl'].get('parameters', None) @@ -79,6 +86,6 @@ def get_amqp_action(task): exchange = action_params.get('exchange') queue_name = action_params['queue_name'] - return actions.OsloRPCAction(host, userid, password, virtual_host, - message, routing_key, port, exchange, - queue_name) + return actions.OsloRPCAction(action_type, host, userid, password, + virtual_host, message, routing_key, port, + exchange, queue_name) diff --git a/mistral/engine/actions/action_helper.py b/mistral/engine/actions/action_helper.py new file mode 100644 index 000000000..b2f544961 --- /dev/null +++ b/mistral/engine/actions/action_helper.py @@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.engine.actions import action_types as a_t +from mistral import exceptions as exc +from mistral.engine import states +from mistral.utils import yaql_utils + + +def get_action_type(task): + return task['service_dsl']['type'] + + +def is_task_synchronous(task): + return get_action_type(task) != a_t.MISTRAL_REST_API + + +def extract_state_result(action, action_result): + # All non-Mistral tasks are sync-auto because service doesn't know + # about Mistral and we need to receive the result immediately + if action.type != a_t.MISTRAL_REST_API: + if action.result_helper.get('select'): + result = yaql_utils.evaluate(action.result_helper['select'], + action_result) + # TODO(nmakhotkin) get state for other actions + state = states.get_state_by_http_status_code(action.status) + else: + raise exc.InvalidActionException("Cannot get the result of sync " + "task without YAQL expression") + return state, result + raise exc.InvalidActionException("Error. Wrong type of action to " + "retrieve the result") diff --git a/mistral/engine/actions/actions.py b/mistral/engine/actions/actions.py index b4cdf1c2e..31e2a46fe 100644 --- a/mistral/engine/actions/actions.py +++ b/mistral/engine/actions/actions.py @@ -24,12 +24,25 @@ LOG = logging.getLogger(__name__) class BaseAction(object): + status = None + + def __init__(self, action_type, action_name): + self.type = action_type + self.name = action_name + + # Result_helper is a dict for retrieving result within YAQL expression + # and it belongs to action (for defining this attribute immediately + # at action creation). + self.result_helper = {} + def run(self): pass class RestAction(BaseAction): - def __init__(self, url, params={}, method="GET", headers={}): + def __init__(self, action_type, action_name, url, params={}, + method="GET", headers={}): + super(RestAction, self).__init__(action_type, action_name) self.url = url self.params = params self.method = method @@ -44,6 +57,7 @@ class RestAction(BaseAction): LOG.info("Received HTTP response:\n%s\n%s" % (resp.status_code, resp.content)) # Return rather json than text, but response can contain text also. + self.status = resp.status_code try: return resp.json() except: @@ -52,9 +66,10 @@ class RestAction(BaseAction): class OsloRPCAction(BaseAction): - def __init__(self, host, userid, password, virtual_host, - message, routing_key=None, port=5672, exchange=None, - queue_name=None): + def __init__(self, action_type, action_name, host, userid, password, + virtual_host, message, routing_key=None, port=5672, + exchange=None, queue_name=None): + super(OsloRPCAction, self).__init__(action_type, action_name) self.host = host self.port = port self.userid = userid @@ -92,4 +107,5 @@ class OsloRPCAction(BaseAction): amqp_conn.close() def callback(self, msg): - pass + #TODO (nmakhotkin) set status + self.status = None diff --git a/mistral/engine/local/engine.py b/mistral/engine/local/engine.py index 340ecdbc8..cb5636345 100644 --- a/mistral/engine/local/engine.py +++ b/mistral/engine/local/engine.py @@ -13,9 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +from mistral import exceptions as exc from mistral.engine import abstract_engine as abs_eng from mistral.engine import states from mistral.engine.actions import action_factory as a_f +from mistral.engine.actions import action_helper as a_h from mistral.db import api as db_api from mistral.openstack.common import log as logging @@ -36,6 +38,22 @@ class LocalEngine(abs_eng.AbstractEngine): LOG.info("Task is started - %s" % task['name']) db_api.task_update(task['workbook_name'], task['execution_id'], task['id'], {'state': states.RUNNING}) + if a_h.is_task_synchronous(task): + # In case of sync execution we run task + # and change state right after that. + action_result = action.run() + state, result = a_h.extract_state_result(action, action_result) + # TODO(nmakhotkin) save the result in the context with key + # action.result_helper['store_as'] + + if states.is_valid(state): + return cls.convey_task_result(task['workbook_name'], + task['execution_id'], + task['id'], state, result) + else: + raise exc.EngineException("Action has returned invalid " + "state: %s" % state) + return action.run() diff --git a/mistral/engine/scalable/executor/executor.py b/mistral/engine/scalable/executor/executor.py index 79c05a7bd..facf60199 100644 --- a/mistral/engine/scalable/executor/executor.py +++ b/mistral/engine/scalable/executor/executor.py @@ -19,8 +19,11 @@ import pika from mistral.openstack.common import log as logging from mistral.db import api as db_api +from mistral import exceptions as exc +from mistral.engine import engine from mistral.engine import states from mistral.engine.actions import action_factory as a_f +from mistral.engine.actions import action_helper as a_h LOG = logging.getLogger(__name__) @@ -28,8 +31,21 @@ LOG = logging.getLogger(__name__) def do_task_action(task): LOG.info("Starting task action [task_id=%s, action='%s', service='%s'" % (task['id'], task['task_dsl']['action'], task['service_dsl'])) + action = a_f.create_action(task) + if a_h.is_task_synchronous(task): + action_result = action.run() + state, result = a_h.extract_state_result(action, action_result) + # TODO(nmakhotkin) save the result in the context with key + # action.result_helper['store_as'] - a_f.create_action(task).run() + if states.is_valid(state): + return engine.convey_task_result(task['workbook_name'], + task['execution_id'], + task['id'], state, result) + else: + raise exc.EngineException("Action has returned invalid " + "state: %s" % state) + action.run() def handle_task_error(task, exc): diff --git a/mistral/engine/states.py b/mistral/engine/states.py index e59c3c26b..6577f14c3 100644 --- a/mistral/engine/states.py +++ b/mistral/engine/states.py @@ -35,3 +35,10 @@ def is_finished(state): def is_stopped_or_finished(state): return state == STOPPED or is_finished(state) + + +def get_state_by_http_status_code(status_code): + if not status_code or status_code >= 400: + return ERROR + else: + return SUCCESS diff --git a/mistral/tests/resources/test_rest.yaml b/mistral/tests/resources/test_rest.yaml index 83a0c8b4f..6609eb08b 100644 --- a/mistral/tests/resources/test_rest.yaml +++ b/mistral/tests/resources/test_rest.yaml @@ -1,6 +1,6 @@ Services: MyRest: - type: REST_API + type: MISTRAL_REST_API parameters: baseUrl: http://some_host actions: @@ -14,14 +14,14 @@ Services: backup-vm: parameters: - url: url_for_backup + url: /url_for_backup method: GET task-parameters: server_id: attach-volume: parameters: - url: url_for_attach + url: /url_for_attach method: GET task-parameters: size: @@ -29,11 +29,25 @@ Services: format-volume: parameters: - url: url_for_format + url: /url_for_format method: GET task-parameters: volume_id: server_id: + Nova: + type: REST_API + parameters: + baseUrl: http://path_to_nova + actions: + create-vm: + parameters: + url: /url_for_create + response: + select: '$.server_id' + store_as: vm_id + task-parameters: + flavor_id: + image_id: Workflow: tasks: @@ -62,6 +76,12 @@ Workflow: parameters: server_id: + create-vm-nova: + action: Nova:create-vm + parameters: + image_id: 1234 + flavor_id: 2 + events: create-vms: type: periodic diff --git a/mistral/tests/unit/engine/local/test_engine.py b/mistral/tests/unit/engine/local/test_engine.py index 13b7806a3..44e1b0fc4 100644 --- a/mistral/tests/unit/engine/local/test_engine.py +++ b/mistral/tests/unit/engine/local/test_engine.py @@ -44,7 +44,7 @@ class TestLocalEngine(test_base.DbTestCase): 'definition': get_cfg("test_rest.yaml") })) @mock.patch.object(actions.RestAction, "run", - mock.MagicMock(return_value="result")) + mock.MagicMock(return_value={'state': states.RUNNING})) def test_engine_one_task(self): execution = ENGINE.start_workflow_execution(WB_NAME, "create-vms") @@ -65,7 +65,7 @@ class TestLocalEngine(test_base.DbTestCase): 'definition': get_cfg("test_rest.yaml") })) @mock.patch.object(actions.RestAction, "run", - mock.MagicMock(return_value="result")) + mock.MagicMock(return_value={'state': states.RUNNING})) def test_engine_multiple_tasks(self): execution = ENGINE.start_workflow_execution(WB_NAME, "backup-vms") @@ -99,3 +99,18 @@ class TestLocalEngine(test_base.DbTestCase): self.assertEqual(states.SUCCESS, ENGINE.get_workflow_execution_state(WB_NAME, execution['id'])) + + @mock.patch.object(actions.RestAction, "run", + mock.MagicMock(return_value={'state': states.SUCCESS})) + @mock.patch.object(db_api, "workbook_get", + mock.MagicMock(return_value={ + 'definition': get_cfg("test_rest.yaml") + })) + @mock.patch.object(states, "get_state_by_http_status_code", + mock.MagicMock(return_value=states.SUCCESS)) + def test_engine_sync_task(self): + execution = ENGINE.start_workflow_execution(WB_NAME, "create-vm-nova") + task = db_api.tasks_get(WB_NAME, execution['id'])[0] + execution = db_api.execution_get(WB_NAME, execution['id']) + self.assertEqual(execution['state'], states.SUCCESS) + self.assertEqual(task['state'], states.SUCCESS) diff --git a/mistral/tests/unit/test_parser.py b/mistral/tests/unit/test_parser.py index 3fa896937..4676b8dc7 100644 --- a/mistral/tests/unit/test_parser.py +++ b/mistral/tests/unit/test_parser.py @@ -30,10 +30,10 @@ class DSLParserTest(unittest2.TestCase): def test_services(self): service = self.dsl.get_service("MyRest") - self.assertEqual(service["type"], "REST_API") + self.assertEqual(service["type"], "MISTRAL_REST_API") self.assertIn("baseUrl", service["parameters"]) services = self.dsl.get_services() - self.assertEqual(len(services), 1) + self.assertEqual(len(services), 2) service_names = self.dsl.get_service_names() self.assertEqual(service_names[0], "MyRest") diff --git a/mistral/utils/yaql_utils.py b/mistral/utils/yaql_utils.py new file mode 100644 index 000000000..dff8ed2bb --- /dev/null +++ b/mistral/utils/yaql_utils.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def evaluate(expression_str, data): + #TODO(nmakhotkin) evaluate YAQL expression and return the result + pass