Add option to run actions locally on the engine

Make executor pluggable and allow option to run the executor
locally on the engine or remotely over RPC.

Change-Id: I7cfb13068aa1d1f88136eaa092e629c34b78adf2
Implements: blueprint mistral-actions-run-by-engine
This commit is contained in:
Winson Chan 2017-04-13 18:17:53 +00:00
parent a753759cad
commit 62d8c5edaf
20 changed files with 520 additions and 140 deletions

View File

@ -44,9 +44,9 @@ from oslo_service import service
from mistral.api import service as api_service from mistral.api import service as api_service
from mistral import config from mistral import config
from mistral.engine import engine_server from mistral.engine import engine_server
from mistral.engine import executor_server
from mistral.engine.rpc_backend import rpc from mistral.engine.rpc_backend import rpc
from mistral.event_engine import event_engine_server from mistral.event_engine import event_engine_server
from mistral.executors import executor_server
from mistral import version from mistral import version

View File

@ -169,6 +169,16 @@ engine_opts = [
] ]
executor_opts = [ executor_opts = [
cfg.StrOpt(
'type',
choices=['local', 'remote'],
default='remote',
help=(
'Type of executor. Use local to run the executor within the '
'engine server. Use remote if the executor is launched as '
'a separate server to run action executions.'
)
),
cfg.StrOpt( cfg.StrOpt(
'host', 'host',
default='0.0.0.0', default='0.0.0.0',

View File

@ -15,7 +15,9 @@
import functools import functools
from mistral.engine.rpc_backend import rpc from oslo_config import cfg
from mistral.executors import base as exe
from mistral import utils from mistral import utils
@ -44,14 +46,16 @@ def _get_queue():
def _run_actions(): def _run_actions():
executor = exe.get_executor(cfg.CONF.executor.type)
for action_ex, action_def, target in _get_queue(): for action_ex, action_def, target in _get_queue():
rpc.get_executor_client().run_action( executor.run_action(
action_ex.id, action_ex.id,
action_def.action_class, action_def.action_class,
action_def.attributes or {}, action_def.attributes or {},
action_ex.input, action_ex.input,
target, action_ex.runtime_context.get('safe_rerun', False),
safe_rerun=action_ex.runtime_context.get('safe_rerun', False) target=target
) )

View File

@ -15,15 +15,16 @@
import abc import abc
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging
from osprofiler import profiler from osprofiler import profiler
import six import six
from mistral.db.v2 import api as db_api from mistral.db.v2 import api as db_api
from mistral.engine import action_queue from mistral.engine import action_queue
from mistral.engine.rpc_backend import rpc
from mistral.engine import utils as engine_utils from mistral.engine import utils as engine_utils
from mistral.engine import workflow_handler as wf_handler from mistral.engine import workflow_handler as wf_handler
from mistral import exceptions as exc from mistral import exceptions as exc
from mistral.executors import base as exe
from mistral import expressions as expr from mistral import expressions as expr
from mistral.lang import parser as spec_parser from mistral.lang import parser as spec_parser
from mistral.services import action_manager as a_m from mistral.services import action_manager as a_m
@ -34,6 +35,9 @@ from mistral.workflow import states
from mistral.workflow import utils as wf_utils from mistral.workflow import utils as wf_utils
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
class Action(object): class Action(object):
"""Action. """Action.
@ -251,14 +255,16 @@ class PythonAction(Action):
action_ex_id=action_ex_id action_ex_id=action_ex_id
) )
result = rpc.get_executor_client().run_action( executor = exe.get_executor(cfg.CONF.executor.type)
result = executor.run_action(
self.action_ex.id if self.action_ex else None, self.action_ex.id if self.action_ex else None,
self.action_def.action_class, self.action_def.action_class,
self.action_def.attributes or {}, self.action_def.attributes or {},
input_dict, input_dict,
target, safe_rerun=safe_rerun,
async_=False, target=target,
safe_rerun=safe_rerun async_=False
) )
return self._prepare_output(result) return self._prepare_output(result)
@ -528,6 +534,7 @@ def resolve_action_definition(action_spec_name, wf_name=None,
:param wf_spec_name: Workflow name according to a spec. :param wf_spec_name: Workflow name according to a spec.
:return: Action definition (python or ad-hoc). :return: Action definition (python or ad-hoc).
""" """
action_db = None action_db = None
if wf_name and wf_name != wf_spec_name: if wf_name and wf_name != wf_spec_name:

View File

@ -128,26 +128,6 @@ class Engine(object):
raise NotImplementedError raise NotImplementedError
@six.add_metaclass(abc.ABCMeta)
class Executor(object):
"""Action executor interface."""
@abc.abstractmethod
def run_action(self, action_ex_id, action_class_str, attributes,
action_params, safe_rerun, redelivered=False):
"""Runs action.
:param action_ex_id: Corresponding action execution id.
:param action_class_str: Path to action class in dot notation.
:param attributes: Attributes of action class which will be set to.
:param action_params: Action parameters.
:param safe_rerun: Tells if given action can be safely rerun.
:param redelivered: Tells if given action was run before on another
executor.
"""
raise NotImplementedError()
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
class EventEngine(object): class EventEngine(object):
"""Action event trigger interface.""" """Action event trigger interface."""

View File

@ -1,5 +1,6 @@
# Copyright 2014 - Mirantis, Inc. # Copyright 2014 - Mirantis, Inc.
# Copyright 2015 - StackStorm, Inc. # Copyright 2015 - StackStorm, Inc.
# Copyright 2017 - Brocade Communications Systems, Inc.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -21,8 +22,9 @@ from osprofiler import profiler
from stevedore import driver from stevedore import driver
from mistral import context as auth_ctx from mistral import context as auth_ctx
from mistral.engine import base from mistral.engine import base as eng
from mistral import exceptions as exc from mistral import exceptions as exc
from mistral.executors import base as exe
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -149,7 +151,7 @@ def wrap_messaging_exception(method):
return decorator return decorator
class EngineClient(base.Engine): class EngineClient(eng.Engine):
"""RPC Engine client.""" """RPC Engine client."""
def __init__(self, rpc_conf_dict): def __init__(self, rpc_conf_dict):
@ -317,50 +319,50 @@ class EngineClient(base.Engine):
) )
class ExecutorClient(base.Executor): class ExecutorClient(exe.Executor):
"""RPC Executor client.""" """RPC Executor client."""
def __init__(self, rpc_conf_dict): def __init__(self, rpc_conf_dict):
"""Constructs an RPC client for the Executor. """Constructs an RPC client for the Executor."""
:param rpc_conf_dict: Dict containing RPC configuration.
"""
self.topic = cfg.CONF.executor.topic self.topic = cfg.CONF.executor.topic
self._client = get_rpc_client_driver()(rpc_conf_dict) self._client = get_rpc_client_driver()(rpc_conf_dict)
@profiler.trace('executor-client-run-action') @profiler.trace('executor-client-run-action')
def run_action(self, action_ex_id, action_class_str, attributes, def run_action(self, action_ex_id, action_cls_str, action_cls_attrs,
action_params, target=None, async_=True, safe_rerun=False): params, safe_rerun, redelivered=False,
target=None, async_=True):
"""Sends a request to run action to executor. """Sends a request to run action to executor.
:param action_ex_id: Action execution id. :param action_ex_id: Action execution id.
:param action_class_str: Action class name. :param action_cls_str: Action class name.
:param attributes: Action class attributes. :param action_cls_attrs: Action class attributes.
:param action_params: Action input parameters. :param params: Action input parameters.
:param target: Target (group of action executors).
:param async: If True, run action in asynchronous mode (w/o waiting
for completion).
:param safe_rerun: If true, action would be re-run if executor dies :param safe_rerun: If true, action would be re-run if executor dies
during execution. during execution.
:param redelivered: Tells if given action was run before on another
executor.
:param target: Target (group of action executors).
:param async_: If True, run action in asynchronous mode (w/o waiting
for completion).
:return: Action result. :return: Action result.
""" """
kwargs = { rpc_kwargs = {
'action_ex_id': action_ex_id, 'action_ex_id': action_ex_id,
'action_class_str': action_class_str, 'action_cls_str': action_cls_str,
'attributes': attributes, 'action_cls_attrs': action_cls_attrs,
'params': action_params, 'params': params,
'safe_rerun': safe_rerun 'safe_rerun': safe_rerun
} }
rpc_client_method = (self._client.async_call rpc_client_method = (self._client.async_call
if async_ else self._client.sync_call) if async_ else self._client.sync_call)
return rpc_client_method(auth_ctx.ctx(), 'run_action', **kwargs) return rpc_client_method(auth_ctx.ctx(), 'run_action', **rpc_kwargs)
class EventEngineClient(base.EventEngine): class EventEngineClient(eng.EventEngine):
"""RPC EventEngine client.""" """RPC EventEngine client."""
def __init__(self, rpc_conf_dict): def __init__(self, rpc_conf_dict):

View File

67
mistral/executors/base.py Normal file
View File

@ -0,0 +1,67 @@
# Copyright 2017 - Brocade Communications Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import six
from stevedore import driver
_EXECUTORS = {}
def cleanup():
global _EXECUTORS
_EXECUTORS = {}
def get_executor(exec_type):
global _EXECUTORS
if not _EXECUTORS.get(exec_type):
mgr = driver.DriverManager(
'mistral.executors',
exec_type,
invoke_on_load=True
)
_EXECUTORS[exec_type] = mgr.driver
return _EXECUTORS[exec_type]
@six.add_metaclass(abc.ABCMeta)
class Executor(object):
"""Action executor interface."""
@abc.abstractmethod
def run_action(self, action_ex_id, action_cls_str, action_cls_attrs,
params, safe_rerun, redelivered=False,
target=None, async_=True):
"""Runs action.
:param action_ex_id: Corresponding action execution id.
:param action_cls_str: Path to action class in dot notation.
:param action_cls_attrs: Attributes of action class which
will be set to.
:param params: Action parameters.
:param safe_rerun: Tells if given action can be safely rerun.
:param redelivered: Tells if given action was run before on another
executor.
:param target: Target (group of action executors).
:param async_: If True, run action in asynchronous mode (w/o waiting
for completion).
:return: Action result.
"""
raise NotImplementedError()

View File

@ -17,9 +17,9 @@ from oslo_log import log as logging
from osprofiler import profiler from osprofiler import profiler
from mistral.actions import action_factory as a_f from mistral.actions import action_factory as a_f
from mistral.engine import base
from mistral.engine.rpc_backend import rpc from mistral.engine.rpc_backend import rpc
from mistral import exceptions as exc from mistral import exceptions as exc
from mistral.executors import base
from mistral.utils import inspect_utils as i_u from mistral.utils import inspect_utils as i_u
from mistral.workflow import utils as wf_utils from mistral.workflow import utils as wf_utils
@ -31,18 +31,24 @@ class DefaultExecutor(base.Executor):
def __init__(self): def __init__(self):
self._engine_client = rpc.get_engine_client() self._engine_client = rpc.get_engine_client()
@profiler.trace('executor-run-action', hide_args=True) @profiler.trace('default-executor-run-action', hide_args=True)
def run_action(self, action_ex_id, action_class_str, attributes, def run_action(self, action_ex_id, action_cls_str, action_cls_attrs,
action_params, safe_rerun, redelivered=False): params, safe_rerun, redelivered=False,
target=None, async_=True):
"""Runs action. """Runs action.
:param action_ex_id: Action execution id. :param action_ex_id: Action execution id.
:param action_class_str: Path to action class in dot notation. :param action_cls_str: Path to action class in dot notation.
:param attributes: Attributes of action class which will be set to. :param action_cls_attrs: Attributes of action class which
:param action_params: Action parameters. will be set to.
:param params: Action parameters.
:param safe_rerun: Tells if given action can be safely rerun. :param safe_rerun: Tells if given action can be safely rerun.
:param redelivered: Tells if given action was run before on another :param redelivered: Tells if given action was run before on another
executor. executor.
:param target: Target (group of action executors).
:param async_: If True, run action in asynchronous mode (w/o waiting
for completion).
:return: Action result.
""" """
def send_error_back(error_msg): def send_error_back(error_msg):
@ -60,31 +66,38 @@ class DefaultExecutor(base.Executor):
if redelivered and not safe_rerun: if redelivered and not safe_rerun:
msg = ( msg = (
"Request to run action %s was redelivered, but action %s" "Request to run action %s was redelivered, but action %s "
" cannot be re-run safely. The only safe thing to do is fail" "cannot be re-run safely. The only safe thing to do is fail "
" action." "action." % (action_cls_str, action_cls_str)
% (action_class_str, action_class_str)
) )
return send_error_back(msg) return send_error_back(msg)
action_cls = a_f.construct_action_class(action_class_str, attributes) # Load action module.
action_cls = a_f.construct_action_class(
action_cls_str,
action_cls_attrs
)
# Instantiate action. # Instantiate action.
try: try:
action = action_cls(**action_params) action = action_cls(**params)
except Exception as e: except Exception as e:
msg = ("Failed to initialize action %s. Action init params = %s." msg = (
" Actual init params = %s. More info: %s" "Failed to initialize action %s. Action init params = %s. "
% (action_class_str, i_u.get_arg_list(action_cls.__init__), "Actual init params = %s. More info: %s" % (
action_params.keys(), e)) action_cls_str,
LOG.exception(msg) i_u.get_arg_list(action_cls.__init__),
params.keys(),
e
)
)
LOG.warning(msg)
return send_error_back(msg) return send_error_back(msg)
# Run action. # Run action.
try: try:
result = action.run() result = action.run()
@ -95,15 +108,22 @@ class DefaultExecutor(base.Executor):
result = wf_utils.Result(data=result) result = wf_utils.Result(data=result)
except Exception as e: except Exception as e:
msg = ("Failed to run action [action_ex_id=%s, action_cls='%s'," msg = (
" attributes='%s', params='%s']\n %s" "Failed to run action [action_ex_id=%s, action_cls='%s', "
% (action_ex_id, action_cls, attributes, action_params, e)) "attributes='%s', params='%s']\n %s" % (
action_ex_id,
action_cls,
action_cls_attrs,
params,
e
)
)
LOG.exception(msg) LOG.exception(msg)
return send_error_back(msg) return send_error_back(msg)
# Send action result. # Send action result.
try: try:
if action_ex_id and (action.is_sync() or result.is_error()): if action_ex_id and (action.is_sync() or result.is_error()):
self._engine_client.on_action_complete( self._engine_client.on_action_complete(
@ -118,22 +138,36 @@ class DefaultExecutor(base.Executor):
# such as message bus or network. One known case is when the action # such as message bus or network. One known case is when the action
# returns a bad result (e.g. invalid unicode) which can't be # returns a bad result (e.g. invalid unicode) which can't be
# serialized. # serialized.
msg = ("Failed to call engine's on_action_complete() method due" msg = (
" to a Mistral exception" "Failed to complete action due to a Mistral exception "
" [action_ex_id=%s, action_cls='%s'," "[action_ex_id=%s, action_cls='%s', "
" attributes='%s', params='%s']\n %s" "attributes='%s', params='%s']\n %s" % (
% (action_ex_id, action_cls, attributes, action_params, e)) action_ex_id,
action_cls,
action_cls_attrs,
params,
e
)
)
LOG.exception(msg) LOG.exception(msg)
return send_error_back(msg) return send_error_back(msg)
except Exception as e: except Exception as e:
# If it's not a Mistral exception all we can do is only # If it's not a Mistral exception all we can do is only
# log the error. # log the error.
msg = ("Failed to call engine's on_action_complete() method due" msg = (
" to an unexpected exception" "Failed to complete action due to an unexpected exception "
" [action_ex_id=%s, action_cls='%s'," "[action_ex_id=%s, action_cls='%s', "
" attributes='%s', params='%s']\n %s" "attributes='%s', params='%s']\n %s" % (
% (action_ex_id, action_cls, attributes, action_params, e)) action_ex_id,
action_cls,
action_cls_attrs,
params,
e
)
)
LOG.exception(msg) LOG.exception(msg)
return result return result

View File

@ -15,8 +15,8 @@
from oslo_log import log as logging from oslo_log import log as logging
from mistral import config as cfg from mistral import config as cfg
from mistral.engine import default_executor
from mistral.engine.rpc_backend import rpc from mistral.engine.rpc_backend import rpc
from mistral.executors import default_executor as exe
from mistral.service import base as service_base from mistral.service import base as service_base
from mistral import utils from mistral import utils
from mistral.utils import profiler as profiler_utils from mistral.utils import profiler as profiler_utils
@ -59,14 +59,14 @@ class ExecutorServer(service_base.MistralService):
if self._rpc_server: if self._rpc_server:
self._rpc_server.stop(graceful) self._rpc_server.stop(graceful)
def run_action(self, rpc_ctx, action_ex_id, action_class_str, def run_action(self, rpc_ctx, action_ex_id, action_cls_str,
attributes, params, safe_rerun): action_cls_attrs, params, safe_rerun):
"""Receives calls over RPC to run action on executor. """Receives calls over RPC to run action on executor.
:param rpc_ctx: RPC request context dictionary. :param rpc_ctx: RPC request context dictionary.
:param action_ex_id: Action execution id. :param action_ex_id: Action execution id.
:param action_class_str: Action class name. :param action_cls_str: Action class name.
:param attributes: Action class attributes. :param action_cls_attrs: Action class attributes.
:param params: Action input parameters. :param params: Action input parameters.
:param safe_rerun: Tells if given action can be safely rerun. :param safe_rerun: Tells if given action can be safely rerun.
:return: Action result. :return: Action result.
@ -74,17 +74,20 @@ class ExecutorServer(service_base.MistralService):
LOG.info( LOG.info(
"Received RPC request 'run_action'[action_ex_id=%s, " "Received RPC request 'run_action'[action_ex_id=%s, "
"action_class=%s, attributes=%s, params=%s]" "action_cls_str=%s, action_cls_attrs=%s, params=%s]" % (
% (action_ex_id, action_class_str, attributes, action_ex_id,
utils.cut(params)) action_cls_str,
action_cls_attrs,
utils.cut(params)
)
) )
redelivered = rpc_ctx.redelivered or False redelivered = rpc_ctx.redelivered or False
return self.executor.run_action( return self.executor.run_action(
action_ex_id, action_ex_id,
action_class_str, action_cls_str,
attributes, action_cls_attrs,
params, params,
safe_rerun, safe_rerun,
redelivered redelivered
@ -93,6 +96,6 @@ class ExecutorServer(service_base.MistralService):
def get_oslo_service(setup_profiler=True): def get_oslo_service(setup_profiler=True):
return ExecutorServer( return ExecutorServer(
default_executor.DefaultExecutor(), exe.DefaultExecutor(),
setup_profiler=setup_profiler setup_profiler=setup_profiler
) )

View File

@ -0,0 +1,29 @@
# Copyright 2017 - Brocade Communications Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_log import log as logging
from mistral.engine.rpc_backend import rpc
LOG = logging.getLogger(__name__)
class RemoteExecutor(rpc.ExecutorClient):
"""Executor that passes execution request to a remote executor."""
def __init__(self):
self.topic = cfg.CONF.executor.topic
self._client = rpc.get_rpc_client_driver()(cfg.CONF.executor)

View File

@ -22,8 +22,9 @@ from oslo_service import service
from mistral.db.v2 import api as db_api from mistral.db.v2 import api as db_api
from mistral.engine import engine_server from mistral.engine import engine_server
from mistral.engine import executor_server
from mistral.engine.rpc_backend import rpc from mistral.engine.rpc_backend import rpc
from mistral.executors import base as exe
from mistral.executors import executor_server
from mistral.tests.unit import base from mistral.tests.unit import base
from mistral.workflow import states from mistral.workflow import states
@ -57,40 +58,42 @@ class EngineTestCase(base.DbTestCase):
# Drop all RPC objects (transport, clients). # Drop all RPC objects (transport, clients).
rpc.cleanup() rpc.cleanup()
exe.cleanup()
self.engine_client = rpc.get_engine_client() self.threads = []
# Start remote executor.
if cfg.CONF.executor.type == 'remote':
LOG.info("Starting remote executor threads...")
self.executor_client = rpc.get_executor_client() self.executor_client = rpc.get_executor_client()
exe_svc = executor_server.get_oslo_service(setup_profiler=False)
self.executor = exe_svc.executor
self.threads.append(eventlet.spawn(launch_service, exe_svc))
self.addCleanup(exe_svc.stop, True)
LOG.info("Starting engine and executor threads...") # Start engine.
LOG.info("Starting engine threads...")
engine_service = engine_server.get_oslo_service(setup_profiler=False) self.engine_client = rpc.get_engine_client()
executor_service = executor_server.get_oslo_service( eng_svc = engine_server.get_oslo_service(setup_profiler=False)
setup_profiler=False self.engine = eng_svc.engine
) self.threads.append(eventlet.spawn(launch_service, eng_svc))
self.addCleanup(eng_svc.stop, True)
self.engine = engine_service.engine
self.executor = executor_service.executor
self.threads = [
eventlet.spawn(launch_service, executor_service),
eventlet.spawn(launch_service, engine_service)
]
self.addOnException(self.print_executions) self.addOnException(self.print_executions)
self.addCleanup(executor_service.stop, True)
self.addCleanup(engine_service.stop, True)
self.addCleanup(self.kill_threads) self.addCleanup(self.kill_threads)
# Make sure that both services fully started, otherwise # Make sure that both services fully started, otherwise
# the test may run too early. # the test may run too early.
executor_service.wait_started() if cfg.CONF.executor.type == 'remote':
engine_service.wait_started() exe_svc.wait_started()
eng_svc.wait_started()
def kill_threads(self): def kill_threads(self):
LOG.info("Finishing engine and executor threads...") LOG.info("Finishing engine and executor threads...")
[thread.kill() for thread in self.threads] for thread in self.threads:
thread.kill()
@staticmethod @staticmethod
def print_executions(exc_info=None): def print_executions(exc_info=None):

View File

@ -23,8 +23,8 @@ from oslo_utils import uuidutils
from mistral.db.v2 import api as db_api from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models from mistral.db.v2.sqlalchemy import models
from mistral.engine import default_engine as d_eng from mistral.engine import default_engine as d_eng
from mistral.engine.rpc_backend import rpc
from mistral import exceptions as exc from mistral import exceptions as exc
from mistral.executors import base as exe
from mistral.services import workbooks as wb_service from mistral.services import workbooks as wb_service
from mistral.tests.unit import base from mistral.tests.unit import base
from mistral.tests.unit.engine import base as eng_test_base from mistral.tests.unit.engine import base as eng_test_base
@ -93,7 +93,7 @@ MOCK_ENVIRONMENT = mock.MagicMock(return_value=ENVIRONMENT_DB)
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError()) MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError())
@mock.patch.object(rpc, 'get_executor_client', mock.Mock()) @mock.patch.object(exe, 'get_executor', mock.Mock())
class DefaultEngineTest(base.DbTestCase): class DefaultEngineTest(base.DbTestCase):
def setUp(self): def setUp(self):
super(DefaultEngineTest, self).setUp() super(DefaultEngineTest, self).setUp()

View File

@ -16,8 +16,8 @@ import mock
from oslo_config import cfg from oslo_config import cfg
from mistral.db.v2 import api as db_api from mistral.db.v2 import api as db_api
from mistral.engine import default_executor from mistral.executors import default_executor as d_exe
from mistral.engine.rpc_backend import rpc from mistral.executors import remote_executor as r_exe
from mistral.services import workbooks as wb_service from mistral.services import workbooks as wb_service
from mistral.tests.unit.engine import base from mistral.tests.unit.engine import base
@ -77,16 +77,17 @@ workflows:
""" """
def _run_at_target(action_ex_id, action_class_str, attributes, def _run_at_target(action_ex_id, action_cls_str, action_cls_attrs,
action_params, target=None, async_=True, safe_rerun=False): params, safe_rerun, target=None, async_=True):
# We'll just call executor directly for testing purposes. # We'll just call executor directly for testing purposes.
executor = default_executor.DefaultExecutor() executor = d_exe.DefaultExecutor()
executor.run_action( executor.run_action(
action_ex_id, action_ex_id,
action_class_str, action_cls_str,
attributes, action_cls_attrs,
action_params, params,
safe_rerun safe_rerun
) )
@ -100,7 +101,7 @@ class EnvironmentTest(base.EngineTestCase):
wb_service.create_workbook_v2(WORKBOOK) wb_service.create_workbook_v2(WORKBOOK)
@mock.patch.object(rpc.ExecutorClient, 'run_action', MOCK_RUN_AT_TARGET) @mock.patch.object(r_exe.RemoteExecutor, 'run_action', MOCK_RUN_AT_TARGET)
def _test_subworkflow(self, env): def _test_subworkflow(self, env):
wf2_ex = self.engine.start_workflow('my_wb.wf2', {}, env=env) wf2_ex = self.engine.start_workflow('my_wb.wf2', {}, env=env)
@ -169,13 +170,13 @@ class EnvironmentTest(base.EngineTestCase):
for t_ex in wf1_task_execs: for t_ex in wf1_task_execs:
a_ex = t_ex.action_executions[0] a_ex = t_ex.action_executions[0]
rpc.ExecutorClient.run_action.assert_any_call( r_exe.RemoteExecutor.run_action.assert_any_call(
a_ex.id, a_ex.id,
'mistral.actions.std_actions.EchoAction', 'mistral.actions.std_actions.EchoAction',
{}, {},
a_ex.input, a_ex.input,
TARGET, False,
safe_rerun=False target=TARGET
) )
def test_subworkflow_env_task_input(self): def test_subworkflow_env_task_input(self):

View File

@ -16,8 +16,8 @@
import mock import mock
from mistral.db.v2 import api as db_api from mistral.db.v2 import api as db_api
from mistral.engine import default_executor from mistral.executors import default_executor as d_exe
from mistral.engine.rpc_backend import rpc from mistral.executors import remote_executor as r_exe
from mistral.services import workflows as wf_service from mistral.services import workflows as wf_service
from mistral.tests.unit.engine import base from mistral.tests.unit.engine import base
from mistral.workflow import data_flow from mistral.workflow import data_flow
@ -25,9 +25,9 @@ from mistral.workflow import states
def _run_at_target(action_ex_id, action_class_str, attributes, def _run_at_target(action_ex_id, action_class_str, attributes,
action_params, target=None, async_=True, safe_rerun=False): action_params, safe_rerun, target=None, async_=True):
# We'll just call executor directly for testing purposes. # We'll just call executor directly for testing purposes.
executor = default_executor.DefaultExecutor() executor = d_exe.DefaultExecutor()
executor.run_action( executor.run_action(
action_ex_id, action_ex_id,
@ -43,7 +43,8 @@ MOCK_RUN_AT_TARGET = mock.MagicMock(side_effect=_run_at_target)
class TestSafeRerun(base.EngineTestCase): class TestSafeRerun(base.EngineTestCase):
@mock.patch.object(rpc.ExecutorClient, 'run_action', MOCK_RUN_AT_TARGET)
@mock.patch.object(r_exe.RemoteExecutor, 'run_action', MOCK_RUN_AT_TARGET)
def test_safe_rerun_true(self): def test_safe_rerun_true(self):
wf_text = """--- wf_text = """---
version: '2.0' version: '2.0'
@ -89,7 +90,7 @@ class TestSafeRerun(base.EngineTestCase):
self.assertEqual(task1.state, states.SUCCESS) self.assertEqual(task1.state, states.SUCCESS)
self.assertEqual(task2.state, states.SUCCESS) self.assertEqual(task2.state, states.SUCCESS)
@mock.patch.object(rpc.ExecutorClient, 'run_action', MOCK_RUN_AT_TARGET) @mock.patch.object(r_exe.RemoteExecutor, 'run_action', MOCK_RUN_AT_TARGET)
def test_safe_rerun_false(self): def test_safe_rerun_false(self):
wf_text = """--- wf_text = """---
version: '2.0' version: '2.0'
@ -135,7 +136,7 @@ class TestSafeRerun(base.EngineTestCase):
self.assertEqual(task1.state, states.ERROR) self.assertEqual(task1.state, states.ERROR)
self.assertEqual(task3.state, states.SUCCESS) self.assertEqual(task3.state, states.SUCCESS)
@mock.patch.object(rpc.ExecutorClient, 'run_action', MOCK_RUN_AT_TARGET) @mock.patch.object(r_exe.RemoteExecutor, 'run_action', MOCK_RUN_AT_TARGET)
def test_safe_rerun_with_items(self): def test_safe_rerun_with_items(self):
wf_text = """--- wf_text = """---
version: '2.0' version: '2.0'

View File

View File

@ -0,0 +1,24 @@
# Copyright 2017 - Brocade Communications Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from mistral.tests.unit.engine import base as engine_test_base
LOG = logging.getLogger(__name__)
class ExecutorTestCase(engine_test_base.EngineTestCase):
pass

View File

@ -0,0 +1,171 @@
# Copyright 2017 - Brocade Communications Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from oslo_config import cfg
from oslo_log import log as logging
from mistral.actions import std_actions
from mistral.db.v2 import api as db_api
from mistral.executors import base as exe
from mistral.executors import remote_executor as r_exe
from mistral.services import workbooks as wb_svc
from mistral.tests.unit.executors import base
from mistral.workflow import states
LOG = logging.getLogger(__name__)
# Use the set_default method to set value otherwise in certain test cases
# the change in value is not permanent.
cfg.CONF.set_default('auth_enable', False, group='pecan')
@mock.patch.object(
r_exe.RemoteExecutor,
'run_action',
mock.MagicMock(return_value=None)
)
class LocalExecutorTestCase(base.ExecutorTestCase):
@classmethod
def setUpClass(cls):
super(LocalExecutorTestCase, cls).setUpClass()
cfg.CONF.set_default('type', 'local', group='executor')
@classmethod
def tearDownClass(cls):
exe.cleanup()
cfg.CONF.set_default('type', 'remote', group='executor')
super(LocalExecutorTestCase, cls).tearDownClass()
@mock.patch.object(
std_actions.EchoAction,
'run',
mock.MagicMock(
side_effect=[
'Task 1', # Mock task1 success.
'Task 2', # Mock task2 success.
'Task 3' # Mock task3 success.
]
)
)
def test_run(self):
wb_def = """
version: '2.0'
name: wb1
workflows:
wf1:
type: direct
tasks:
t1:
action: std.echo output="Task 1"
on-success:
- t2
t2:
action: std.echo output="Task 2"
on-success:
- t3
t3:
action: std.echo output="Task 3"
"""
wb_svc.create_workbook_v2(wb_def)
wf_ex = self.engine.start_workflow('wb1.wf1', {})
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.assertEqual(3, len(task_execs))
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
task_3_ex = self._assert_single_item(task_execs, name='t3')
self.assertEqual(states.SUCCESS, task_1_ex.state)
self.assertEqual(states.SUCCESS, task_2_ex.state)
self.assertEqual(states.SUCCESS, task_3_ex.state)
# Make sure the remote executor is not called.
self.assertFalse(r_exe.RemoteExecutor.run_action.called)
@mock.patch.object(
std_actions.EchoAction,
'run',
mock.MagicMock(
side_effect=[
'Task 1.0', # Mock task1 success.
'Task 1.1', # Mock task1 success.
'Task 1.2', # Mock task1 success.
'Task 2' # Mock task2 success.
]
)
)
def test_run_with_items(self):
wb_def = """
version: '2.0'
name: wb1
workflows:
wf1:
type: direct
tasks:
t1:
with-items: i in <% list(range(0, 3)) %>
action: std.echo output="Task 1.<% $.i %>"
publish:
v1: <% task(t1).result %>
on-success:
- t2
t2:
action: std.echo output="Task 2"
"""
wb_svc.create_workbook_v2(wb_def)
wf_ex = self.engine.start_workflow('wb1.wf1', {})
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertEqual(2, len(wf_ex.task_executions))
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
self.assertEqual(states.SUCCESS, task_1_ex.state)
self.assertEqual(states.SUCCESS, task_2_ex.state)
with db_api.transaction():
task_1_action_exs = db_api.get_action_executions(
task_execution_id=task_1_ex.id
)
self.assertEqual(3, len(task_1_action_exs))
# Make sure the remote executor is not called.
self.assertFalse(r_exe.RemoteExecutor.run_action.called)

View File

@ -0,0 +1,40 @@
# Copyright 2017 - Brocade Communications Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from mistral.executors import base as exe
from mistral.executors import default_executor as d_exe
from mistral.executors import remote_executor as r_exe
from mistral.tests.unit.executors import base
LOG = logging.getLogger(__name__)
class PluginTestCase(base.ExecutorTestCase):
def tearDown(self):
exe.cleanup()
super(PluginTestCase, self).tearDown()
def test_get_local_executor(self):
executor = exe.get_executor('local')
self.assertIsInstance(executor, d_exe.DefaultExecutor)
def test_get_remote_executor(self):
executor = exe.get_executor('remote')
self.assertIsInstance(executor, r_exe.RemoteExecutor)

View File

@ -70,6 +70,10 @@ mistral.actions =
std.sleep = mistral.actions.std_actions:SleepAction std.sleep = mistral.actions.std_actions:SleepAction
std.test_dict = mistral.actions.std_actions:TestDictAction std.test_dict = mistral.actions.std_actions:TestDictAction
mistral.executors =
local = mistral.executors.default_executor:DefaultExecutor
remote = mistral.executors.remote_executor:RemoteExecutor
mistral.expression.functions = mistral.expression.functions =
global = mistral.utils.expression_utils:global_ global = mistral.utils.expression_utils:global_
json_pp = mistral.utils.expression_utils:json_pp_ json_pp = mistral.utils.expression_utils:json_pp_