Merge "Add sync task execution"

This commit is contained in:
Jenkins 2014-01-28 13:05:22 +00:00 committed by Gerrit Code Review
commit 3af480bb27
10 changed files with 185 additions and 21 deletions

View File

@ -16,16 +16,20 @@
from mistral.engine.actions import actions
from mistral.engine.actions import action_types
from mistral.engine.actions import action_helper as a_h
import mistral.exceptions as exc
def create_action(task):
action_type = task['service_dsl']['type']
action_type = a_h.get_action_type(task)
if not action_types.is_valid(action_type):
raise exc.InvalidActionException("Action type is not supported: %s" %
action_type)
return _get_mapping()[action_type](task)
action = _get_mapping()[action_type](task)
action_dsl = task['service_dsl']['actions'][action.name]
action.result_helper = action_dsl.get('parameters', {}).get('response', {})
return action
def _get_mapping():
@ -37,6 +41,7 @@ def _get_mapping():
def get_rest_action(task):
action_type = a_h.get_action_type(task)
action_name = task['task_dsl']['action'].split(':')[1]
action_dsl = task['service_dsl']['actions'][action_name]
task_params = task['task_dsl'].get('parameters', None)
@ -48,8 +53,9 @@ def get_rest_action(task):
headers.update(action_dsl.get('headers', {}))
method = action_dsl['parameters'].get('method', "GET")
return actions.RestAction(url, params=task_params,
method=method, headers=headers)
return actions.RestAction(action_type, action_name, url,
params=task_params, method=method,
headers=headers)
def get_mistral_rest_action(task):
@ -64,6 +70,7 @@ def get_mistral_rest_action(task):
def get_amqp_action(task):
action_type = a_h.get_action_type(task)
action_name = task['task_dsl']['action'].split(':')[1]
action_params = task['service_dsl']['actions'][action_name]['parameters']
task_params = task['task_dsl'].get('parameters', None)
@ -79,6 +86,6 @@ def get_amqp_action(task):
exchange = action_params.get('exchange')
queue_name = action_params['queue_name']
return actions.OsloRPCAction(host, userid, password, virtual_host,
message, routing_key, port, exchange,
queue_name)
return actions.OsloRPCAction(action_type, host, userid, password,
virtual_host, message, routing_key, port,
exchange, queue_name)

View File

@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.engine.actions import action_types as a_t
from mistral import exceptions as exc
from mistral.engine import states
from mistral.utils import yaql_utils
def get_action_type(task):
return task['service_dsl']['type']
def is_task_synchronous(task):
return get_action_type(task) != a_t.MISTRAL_REST_API
def extract_state_result(action, action_result):
# All non-Mistral tasks are sync-auto because service doesn't know
# about Mistral and we need to receive the result immediately
if action.type != a_t.MISTRAL_REST_API:
if action.result_helper.get('select'):
result = yaql_utils.evaluate(action.result_helper['select'],
action_result)
# TODO(nmakhotkin) get state for other actions
state = states.get_state_by_http_status_code(action.status)
else:
raise exc.InvalidActionException("Cannot get the result of sync "
"task without YAQL expression")
return state, result
raise exc.InvalidActionException("Error. Wrong type of action to "
"retrieve the result")

View File

@ -24,12 +24,25 @@ LOG = logging.getLogger(__name__)
class BaseAction(object):
status = None
def __init__(self, action_type, action_name):
self.type = action_type
self.name = action_name
# Result_helper is a dict for retrieving result within YAQL expression
# and it belongs to action (for defining this attribute immediately
# at action creation).
self.result_helper = {}
def run(self):
pass
class RestAction(BaseAction):
def __init__(self, url, params={}, method="GET", headers={}):
def __init__(self, action_type, action_name, url, params={},
method="GET", headers={}):
super(RestAction, self).__init__(action_type, action_name)
self.url = url
self.params = params
self.method = method
@ -44,6 +57,7 @@ class RestAction(BaseAction):
LOG.info("Received HTTP response:\n%s\n%s" %
(resp.status_code, resp.content))
# Return rather json than text, but response can contain text also.
self.status = resp.status_code
try:
return resp.json()
except:
@ -52,9 +66,10 @@ class RestAction(BaseAction):
class OsloRPCAction(BaseAction):
def __init__(self, host, userid, password, virtual_host,
message, routing_key=None, port=5672, exchange=None,
queue_name=None):
def __init__(self, action_type, action_name, host, userid, password,
virtual_host, message, routing_key=None, port=5672,
exchange=None, queue_name=None):
super(OsloRPCAction, self).__init__(action_type, action_name)
self.host = host
self.port = port
self.userid = userid
@ -92,4 +107,5 @@ class OsloRPCAction(BaseAction):
amqp_conn.close()
def callback(self, msg):
pass
#TODO (nmakhotkin) set status
self.status = None

View File

@ -13,9 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral import exceptions as exc
from mistral.engine import abstract_engine as abs_eng
from mistral.engine import states
from mistral.engine.actions import action_factory as a_f
from mistral.engine.actions import action_helper as a_h
from mistral.db import api as db_api
from mistral.openstack.common import log as logging
@ -36,6 +38,22 @@ class LocalEngine(abs_eng.AbstractEngine):
LOG.info("Task is started - %s" % task['name'])
db_api.task_update(task['workbook_name'], task['execution_id'],
task['id'], {'state': states.RUNNING})
if a_h.is_task_synchronous(task):
# In case of sync execution we run task
# and change state right after that.
action_result = action.run()
state, result = a_h.extract_state_result(action, action_result)
# TODO(nmakhotkin) save the result in the context with key
# action.result_helper['store_as']
if states.is_valid(state):
return cls.convey_task_result(task['workbook_name'],
task['execution_id'],
task['id'], state, result)
else:
raise exc.EngineException("Action has returned invalid "
"state: %s" % state)
return action.run()

View File

@ -19,8 +19,11 @@ import pika
from mistral.openstack.common import log as logging
from mistral.db import api as db_api
from mistral import exceptions as exc
from mistral.engine import engine
from mistral.engine import states
from mistral.engine.actions import action_factory as a_f
from mistral.engine.actions import action_helper as a_h
LOG = logging.getLogger(__name__)
@ -28,8 +31,21 @@ LOG = logging.getLogger(__name__)
def do_task_action(task):
LOG.info("Starting task action [task_id=%s, action='%s', service='%s'" %
(task['id'], task['task_dsl']['action'], task['service_dsl']))
action = a_f.create_action(task)
if a_h.is_task_synchronous(task):
action_result = action.run()
state, result = a_h.extract_state_result(action, action_result)
# TODO(nmakhotkin) save the result in the context with key
# action.result_helper['store_as']
a_f.create_action(task).run()
if states.is_valid(state):
return engine.convey_task_result(task['workbook_name'],
task['execution_id'],
task['id'], state, result)
else:
raise exc.EngineException("Action has returned invalid "
"state: %s" % state)
action.run()
def handle_task_error(task, exc):

View File

@ -35,3 +35,10 @@ def is_finished(state):
def is_stopped_or_finished(state):
return state == STOPPED or is_finished(state)
def get_state_by_http_status_code(status_code):
if not status_code or status_code >= 400:
return ERROR
else:
return SUCCESS

View File

@ -1,6 +1,6 @@
Services:
MyRest:
type: REST_API
type: MISTRAL_REST_API
parameters:
baseUrl: http://some_host
actions:
@ -14,14 +14,14 @@ Services:
backup-vm:
parameters:
url: url_for_backup
url: /url_for_backup
method: GET
task-parameters:
server_id:
attach-volume:
parameters:
url: url_for_attach
url: /url_for_attach
method: GET
task-parameters:
size:
@ -29,11 +29,25 @@ Services:
format-volume:
parameters:
url: url_for_format
url: /url_for_format
method: GET
task-parameters:
volume_id:
server_id:
Nova:
type: REST_API
parameters:
baseUrl: http://path_to_nova
actions:
create-vm:
parameters:
url: /url_for_create
response:
select: '$.server_id'
store_as: vm_id
task-parameters:
flavor_id:
image_id:
Workflow:
tasks:
@ -62,6 +76,12 @@ Workflow:
parameters:
server_id:
create-vm-nova:
action: Nova:create-vm
parameters:
image_id: 1234
flavor_id: 2
events:
create-vms:
type: periodic

View File

@ -44,7 +44,7 @@ class TestLocalEngine(test_base.DbTestCase):
'definition': get_cfg("test_rest.yaml")
}))
@mock.patch.object(actions.RestAction, "run",
mock.MagicMock(return_value="result"))
mock.MagicMock(return_value={'state': states.RUNNING}))
def test_engine_one_task(self):
execution = ENGINE.start_workflow_execution(WB_NAME,
"create-vms")
@ -65,7 +65,7 @@ class TestLocalEngine(test_base.DbTestCase):
'definition': get_cfg("test_rest.yaml")
}))
@mock.patch.object(actions.RestAction, "run",
mock.MagicMock(return_value="result"))
mock.MagicMock(return_value={'state': states.RUNNING}))
def test_engine_multiple_tasks(self):
execution = ENGINE.start_workflow_execution(WB_NAME,
"backup-vms")
@ -99,3 +99,18 @@ class TestLocalEngine(test_base.DbTestCase):
self.assertEqual(states.SUCCESS,
ENGINE.get_workflow_execution_state(WB_NAME,
execution['id']))
@mock.patch.object(actions.RestAction, "run",
mock.MagicMock(return_value={'state': states.SUCCESS}))
@mock.patch.object(db_api, "workbook_get",
mock.MagicMock(return_value={
'definition': get_cfg("test_rest.yaml")
}))
@mock.patch.object(states, "get_state_by_http_status_code",
mock.MagicMock(return_value=states.SUCCESS))
def test_engine_sync_task(self):
execution = ENGINE.start_workflow_execution(WB_NAME, "create-vm-nova")
task = db_api.tasks_get(WB_NAME, execution['id'])[0]
execution = db_api.execution_get(WB_NAME, execution['id'])
self.assertEqual(execution['state'], states.SUCCESS)
self.assertEqual(task['state'], states.SUCCESS)

View File

@ -30,10 +30,10 @@ class DSLParserTest(unittest2.TestCase):
def test_services(self):
service = self.dsl.get_service("MyRest")
self.assertEqual(service["type"], "REST_API")
self.assertEqual(service["type"], "MISTRAL_REST_API")
self.assertIn("baseUrl", service["parameters"])
services = self.dsl.get_services()
self.assertEqual(len(services), 1)
self.assertEqual(len(services), 2)
service_names = self.dsl.get_service_names()
self.assertEqual(service_names[0], "MyRest")

View File

@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
def evaluate(expression_str, data):
#TODO(nmakhotkin) evaluate YAQL expression and return the result
pass