diff --git a/mistral/cmd/launch.py b/mistral/cmd/launch.py index 204f6c833..e13ab70ca 100644 --- a/mistral/cmd/launch.py +++ b/mistral/cmd/launch.py @@ -25,9 +25,10 @@ eventlet.monkey_patch( thread=False if '--use-debugger' in sys.argv else True, time=True) + import os import six -import time + # If ../mistral/__init__.py exists, add ../ to Python search path, so that # it will override what happens to be installed in /usr/(local/)lib/python... @@ -39,7 +40,6 @@ if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'mistral', '__init__.py')): from oslo_config import cfg from oslo_log import log as logging -import oslo_messaging as messaging if six.PY3 is True: import socketserver @@ -51,7 +51,6 @@ from wsgiref.simple_server import WSGIServer from mistral.api import app from mistral import config -from mistral import context as ctx from mistral.db.v2 import api as db_api from mistral.engine import default_engine as def_eng from mistral.engine import default_executor as def_executor @@ -59,6 +58,7 @@ from mistral.engine.rpc import rpc from mistral.services import expiration_policy from mistral.services import scheduler from mistral.utils import profiler +from mistral.utils import rpc_utils from mistral import version @@ -67,51 +67,33 @@ CONF = cfg.CONF LOG = logging.getLogger(__name__) -def launch_executor(transport): +def launch_executor(): profiler.setup('mistral-executor', cfg.CONF.executor.host) - target = messaging.Target( - topic=cfg.CONF.executor.topic, - server=cfg.CONF.executor.host - ) - executor_v2 = def_executor.DefaultExecutor(rpc.get_engine_client()) + executor_endpoint = rpc.ExecutorServer(executor_v2) - endpoints = [rpc.ExecutorServer(executor_v2)] - - server = messaging.get_rpc_server( - transport, - target, - endpoints, - executor='eventlet', - serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer()) + executor_server = rpc.get_rpc_server_driver()( + rpc_utils.get_rpc_info_from_oslo(CONF.executor) ) + executor_server.register_endpoint(executor_endpoint) executor_v2.register_membership() try: - server.start() - while True: - time.sleep(604800) + executor_server.run() except (KeyboardInterrupt, SystemExit): pass finally: print("Stopping executor service...") - server.stop() - server.wait() -def launch_engine(transport): +def launch_engine(): profiler.setup('mistral-engine', cfg.CONF.engine.host) - target = messaging.Target( - topic=cfg.CONF.engine.topic, - server=cfg.CONF.engine.host - ) - engine_v2 = def_eng.DefaultEngine(rpc.get_engine_client()) - endpoints = [rpc.EngineServer(engine_v2)] + engine_endpoint = rpc.EngineServer(engine_v2) # Setup scheduler in engine. db_api.setup_db() @@ -120,37 +102,30 @@ def launch_engine(transport): # Setup expiration policy expiration_policy.setup() - server = messaging.get_rpc_server( - transport, - target, - endpoints, - executor='eventlet', - serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer()) + engine_server = rpc.get_rpc_server_driver()( + rpc_utils.get_rpc_info_from_oslo(CONF.engine) ) + engine_server.register_endpoint(engine_endpoint) engine_v2.register_membership() try: - server.start() - while True: - time.sleep(604800) + engine_server.run() except (KeyboardInterrupt, SystemExit): pass finally: print("Stopping engine service...") - server.stop() - server.wait() class ThreadingWSGIServer(socketserver.ThreadingMixIn, WSGIServer): pass -def launch_api(transport): +def launch_api(): host = cfg.CONF.api.host port = cfg.CONF.api.port - server = simple_server.make_server( + api_server = simple_server.make_server( host, port, app.setup_app(), @@ -160,12 +135,12 @@ def launch_api(transport): LOG.info("Mistral API is serving on http://%s:%s (PID=%s)" % (host, port, os.getpid())) - server.serve_forever() + api_server.serve_forever() -def launch_any(transport, options): +def launch_any(options): # Launch the servers on different threads. - threads = [eventlet.spawn(LAUNCH_OPTIONS[option], transport) + threads = [eventlet.spawn(LAUNCH_OPTIONS[option]) for option in options] print('Server started.') @@ -250,11 +225,11 @@ def main(): # servers are launched on the same process. Otherwise, messages do not # get delivered if the Mistral servers are launched on different # processes because the "fake" transport is using an in process queue. - transport = rpc.get_transport() + rpc.get_transport() if cfg.CONF.server == ['all']: # Launch all servers. - launch_any(transport, LAUNCH_OPTIONS.keys()) + launch_any(LAUNCH_OPTIONS.keys()) else: # Validate launch option. if set(cfg.CONF.server) - set(LAUNCH_OPTIONS.keys()): @@ -262,7 +237,7 @@ def main(): 'api, engine, and executor.') # Launch distinct set of server(s). - launch_any(transport, set(cfg.CONF.server)) + launch_any(set(cfg.CONF.server)) except RuntimeError as excp: sys.stderr.write("ERROR: %s\n" % excp) diff --git a/mistral/config.py b/mistral/config.py index b945d74fa..9d70ecd09 100644 --- a/mistral/config.py +++ b/mistral/config.py @@ -50,7 +50,8 @@ rpc_impl_opt = cfg.StrOpt( 'rpc_implementation', default='oslo', choices=['oslo', 'kombu'], - help='Specifies RPC implementation for RPC client and server.' + help='Specifies RPC implementation for RPC client and server. Support of ' + 'kombu driver is experimental.' ) pecan_opts = [ diff --git a/mistral/engine/rpc/rpc.py b/mistral/engine/rpc/rpc.py index 1dbf55adc..65869cb85 100644 --- a/mistral/engine/rpc/rpc.py +++ b/mistral/engine/rpc/rpc.py @@ -17,13 +17,12 @@ from oslo_config import cfg from oslo_log import log as logging import oslo_messaging as messaging from oslo_messaging.rpc import client -from oslo_messaging.rpc import dispatcher -from oslo_messaging.rpc import server from stevedore import driver from mistral import context as auth_ctx from mistral.engine import base from mistral import exceptions as exc +from mistral.utils import rpc_utils from mistral.workflow import utils as wf_utils @@ -38,16 +37,6 @@ _ENGINE_CLIENT = None _EXECUTOR_CLIENT = None -def get_rpc_server(transport, target, endpoints, executor='blocking', - serializer=None): - return server.RPCServer( - transport, - target, - dispatcher.RPCDispatcher(endpoints, serializer), - executor - ) - - def cleanup(): """Intended to be used by tests to recreate all RPC related objects.""" @@ -73,7 +62,9 @@ def get_engine_client(): global _ENGINE_CLIENT if not _ENGINE_CLIENT: - _ENGINE_CLIENT = EngineClient(get_transport()) + _ENGINE_CLIENT = EngineClient( + rpc_utils.get_rpc_info_from_oslo(cfg.CONF.engine) + ) return _ENGINE_CLIENT @@ -82,7 +73,9 @@ def get_executor_client(): global _EXECUTOR_CLIENT if not _EXECUTOR_CLIENT: - _EXECUTOR_CLIENT = ExecutorClient(get_transport()) + _EXECUTOR_CLIENT = ExecutorClient( + rpc_utils.get_rpc_info_from_oslo(cfg.CONF.executor) + ) return _EXECUTOR_CLIENT @@ -316,19 +309,12 @@ def wrap_messaging_exception(method): class EngineClient(base.Engine): """RPC Engine client.""" - def __init__(self, transport): + def __init__(self, rpc_conf_dict): """Constructs an RPC client for engine. - :param transport: Messaging transport. + :param rpc_conf_dict: Dict containing RPC configuration. """ - serializer = auth_ctx.RpcContextSerializer( - auth_ctx.JsonPayloadSerializer()) - - self._client = messaging.RPCClient( - transport, - messaging.Target(topic=cfg.CONF.engine.topic), - serializer=serializer - ) + self._client = get_rpc_client_driver()(rpc_conf_dict) @wrap_messaging_exception def start_workflow(self, wf_identifier, wf_input, description='', @@ -337,7 +323,7 @@ class EngineClient(base.Engine): :return: Workflow execution. """ - return self._client.call( + return self._client.sync_call( auth_ctx.ctx(), 'start_workflow', workflow_identifier=wf_identifier, @@ -353,7 +339,7 @@ class EngineClient(base.Engine): :return: Action execution. """ - return self._client.call( + return self._client.sync_call( auth_ctx.ctx(), 'start_action', action_name=action_name, @@ -363,7 +349,7 @@ class EngineClient(base.Engine): ) def on_task_state_change(self, task_ex_id, state, state_info=None): - return self._client.call( + return self._client.sync_call( auth_ctx.ctx(), 'on_task_state_change', task_ex_id=task_ex_id, @@ -389,7 +375,7 @@ class EngineClient(base.Engine): :return: Task. """ - return self._client.call( + return self._client.sync_call( auth_ctx.ctx(), 'on_action_complete', action_ex_id=action_ex_id, @@ -405,7 +391,7 @@ class EngineClient(base.Engine): :return: Workflow execution. """ - return self._client.call( + return self._client.sync_call( auth_ctx.ctx(), 'pause_workflow', execution_id=wf_ex_id @@ -442,7 +428,7 @@ class EngineClient(base.Engine): :return: Workflow execution. """ - return self._client.call( + return self._client.sync_call( auth_ctx.ctx(), 'resume_workflow', wf_ex_id=wf_ex_id, @@ -463,7 +449,7 @@ class EngineClient(base.Engine): :return: Workflow execution, model.Execution """ - return self._client.call( + return self._client.sync_call( auth_ctx.ctx(), 'stop_workflow', execution_id=wf_ex_id, @@ -480,7 +466,7 @@ class EngineClient(base.Engine): :return: Workflow execution. """ - return self._client.call( + return self._client.sync_call( auth_ctx.ctx(), 'rollback_workflow', execution_id=wf_ex_id @@ -522,23 +508,14 @@ class ExecutorServer(object): class ExecutorClient(base.Executor): """RPC Executor client.""" - def __init__(self, transport): + def __init__(self, rpc_conf_dict): """Constructs an RPC client for the Executor. - :param transport: Messaging transport. - :type transport: Transport. + :param rpc_conf_dict: Dict containing RPC configuration. """ + self.topic = cfg.CONF.executor.topic - - serializer = auth_ctx.RpcContextSerializer( - auth_ctx.JsonPayloadSerializer() - ) - - self._client = messaging.RPCClient( - transport, - messaging.Target(), - serializer=serializer - ) + self._client = get_rpc_client_driver()(rpc_conf_dict) def run_action(self, action_ex_id, action_class_str, attributes, action_params, target=None, async=True): @@ -561,9 +538,8 @@ class ExecutorClient(base.Executor): 'params': action_params } - call_ctx = self._client.prepare(topic=self.topic, server=target) - - rpc_client_method = call_ctx.cast if async else call_ctx.call + rpc_client_method = (self._client.async_call + if async else self._client.sync_call) res = rpc_client_method(auth_ctx.ctx(), 'run_action', **kwargs) diff --git a/mistral/tests/unit/api/v2/test_action_executions.py b/mistral/tests/unit/api/v2/test_action_executions.py index b3a7276e8..851b45b0b 100644 --- a/mistral/tests/unit/api/v2/test_action_executions.py +++ b/mistral/tests/unit/api/v2/test_action_executions.py @@ -19,7 +19,9 @@ import datetime import json import mock + from oslo_config import cfg +import oslo_messaging from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models @@ -29,6 +31,9 @@ from mistral.tests.unit.api import base from mistral.workflow import states from mistral.workflow import utils as wf_utils +# This line is needed for correct initialization of messaging config. +oslo_messaging.get_transport(cfg.CONF) + ACTION_EX_DB = models.ActionExecution( id='123', @@ -146,6 +151,7 @@ MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError()) MOCK_DELETE = mock.MagicMock(return_value=None) +@mock.patch.object(rpc, '_IMPL_CLIENT', mock.Mock()) class TestActionExecutionsController(base.APITest): def setUp(self): super(TestActionExecutionsController, self).setUp() diff --git a/mistral/tests/unit/api/v2/test_executions.py b/mistral/tests/unit/api/v2/test_executions.py index 3b503264a..5f2e7687c 100644 --- a/mistral/tests/unit/api/v2/test_executions.py +++ b/mistral/tests/unit/api/v2/test_executions.py @@ -19,6 +19,8 @@ import datetime import json import mock +from oslo_config import cfg +import oslo_messaging import uuid from webtest import app as webtest_app @@ -31,6 +33,9 @@ from mistral.tests.unit.api import base from mistral import utils from mistral.workflow import states +# This line is needed for correct initialization of messaging config. +oslo_messaging.get_transport(cfg.CONF) + WF_EX = models.WorkflowExecution( id='123e4567-e89b-12d3-a456-426655440000', @@ -117,6 +122,7 @@ MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError()) MOCK_ACTION_EXC = mock.MagicMock(side_effect=exc.ActionException()) +@mock.patch.object(rpc, '_IMPL_CLIENT', mock.Mock()) class TestExecutionsController(base.APITest): @mock.patch.object(db_api, 'get_workflow_execution', MOCK_WF_EX) def test_get(self): diff --git a/mistral/tests/unit/engine/base.py b/mistral/tests/unit/engine/base.py index 568ca3b38..89d2401ab 100644 --- a/mistral/tests/unit/engine/base.py +++ b/mistral/tests/unit/engine/base.py @@ -99,11 +99,10 @@ class EngineTestCase(base.DbTestCase): # Drop all RPC objects (transport, clients). rpc.cleanup() - transport = rpc.get_transport() - self.engine_client = rpc.EngineClient(transport) - self.executor_client = rpc.ExecutorClient(transport) + self.engine_client = rpc.get_engine_client() + self.executor_client = rpc.get_executor_client() self.engine = def_eng.DefaultEngine(self.engine_client) self.executor = def_exec.DefaultExecutor(self.engine_client) diff --git a/mistral/tests/unit/engine/rpc_direct/__init__.py b/mistral/tests/unit/engine/rpc/__init__.py similarity index 100% rename from mistral/tests/unit/engine/rpc_direct/__init__.py rename to mistral/tests/unit/engine/rpc/__init__.py diff --git a/mistral/tests/unit/engine/rpc_direct/test_rpc.py b/mistral/tests/unit/engine/rpc/test_rpc.py similarity index 88% rename from mistral/tests/unit/engine/rpc_direct/test_rpc.py rename to mistral/tests/unit/engine/rpc/test_rpc.py index e5824303f..91a94dd66 100644 --- a/mistral/tests/unit/engine/rpc_direct/test_rpc.py +++ b/mistral/tests/unit/engine/rpc/test_rpc.py @@ -19,10 +19,13 @@ from mistral.utils import rpc_utils class RPCTest(base.EngineTestCase): + def setUp(self): + super(RPCTest, self).setUp() + def test_get_rabbit_config(self): conf = cfg.CONF - rpc_info = rpc_utils.get_rabbit_info_from_oslo(conf.engine) + rpc_info = rpc_utils._get_rabbit_info_from_oslo(conf.engine) self.assertDictEqual( { @@ -34,8 +37,8 @@ class RPCTest(base.EngineTestCase): 'host': 'localhost', 'exchange': 'openstack', 'password': 'guest', - 'auto_delete': False, 'durable_queues': False, + 'auto_delete': False }, rpc_info ) diff --git a/mistral/tests/unit/engine/test_default_engine.py b/mistral/tests/unit/engine/test_default_engine.py index 375f55d57..28ff8c307 100644 --- a/mistral/tests/unit/engine/test_default_engine.py +++ b/mistral/tests/unit/engine/test_default_engine.py @@ -452,7 +452,7 @@ class DefaultEngineTest(base.DbTestCase): class DefaultEngineWithTransportTest(eng_test_base.EngineTestCase): def test_engine_client_remote_error(self): mocked = mock.Mock() - mocked.call.side_effect = rpc_client.RemoteError( + mocked.sync_call.side_effect = rpc_client.RemoteError( 'InputException', 'Input is wrong' ) @@ -468,7 +468,7 @@ class DefaultEngineWithTransportTest(eng_test_base.EngineTestCase): def test_engine_client_remote_error_arbitrary(self): mocked = mock.Mock() - mocked.call.side_effect = KeyError('wrong key') + mocked.sync_call.side_effect = KeyError('wrong key') self.engine_client._client = mocked exception = self.assertRaises( diff --git a/mistral/utils/rpc_utils.py b/mistral/utils/rpc_utils.py index ad006a765..efd03dbfd 100644 --- a/mistral/utils/rpc_utils.py +++ b/mistral/utils/rpc_utils.py @@ -19,7 +19,15 @@ from oslo_config import cfg CONF = cfg.CONF -def get_rabbit_info_from_oslo(additional_conf): +def get_rpc_info_from_oslo(additional_conf=None): + if CONF.rpc_backend in ['rabbit', 'fake']: + return _get_rabbit_info_from_oslo(additional_conf) + else: + # TODO(nmakhotkin) Implement. + raise NotImplementedError + + +def _get_rabbit_info_from_oslo(additional_conf): return { 'user_id': CONF.oslo_messaging_rabbit.rabbit_userid, 'password': CONF.oslo_messaging_rabbit.rabbit_password,