Integrating new RPC layer with Mistral

At this point all tests pass, but there is no test for kombu driver.
Komu driver must be consider as expermiental before new tests are introduced.

TODO (next commits):
 - tests

Partially implements blueprint mistral-alternative-rpc

Co-Authored-By: Dawid Deja <dawid.deja@intel.com>
Change-Id: I1f5ca1f1e8c741efcb61480ccbec8e50ad993cba
This commit is contained in:
Nikolay Mahotkin 2015-06-26 14:12:03 +03:00 committed by Dawid Deja
parent fc6d712840
commit 95e6b34b17
10 changed files with 79 additions and 105 deletions

View File

@ -25,9 +25,10 @@ eventlet.monkey_patch(
thread=False if '--use-debugger' in sys.argv else True, thread=False if '--use-debugger' in sys.argv else True,
time=True) time=True)
import os import os
import six import six
import time
# If ../mistral/__init__.py exists, add ../ to Python search path, so that # If ../mistral/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python... # it will override what happens to be installed in /usr/(local/)lib/python...
@ -39,7 +40,6 @@ if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'mistral', '__init__.py')):
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
import oslo_messaging as messaging
if six.PY3 is True: if six.PY3 is True:
import socketserver import socketserver
@ -51,7 +51,6 @@ from wsgiref.simple_server import WSGIServer
from mistral.api import app from mistral.api import app
from mistral import config from mistral import config
from mistral import context as ctx
from mistral.db.v2 import api as db_api from mistral.db.v2 import api as db_api
from mistral.engine import default_engine as def_eng from mistral.engine import default_engine as def_eng
from mistral.engine import default_executor as def_executor from mistral.engine import default_executor as def_executor
@ -59,6 +58,7 @@ from mistral.engine.rpc import rpc
from mistral.services import expiration_policy from mistral.services import expiration_policy
from mistral.services import scheduler from mistral.services import scheduler
from mistral.utils import profiler from mistral.utils import profiler
from mistral.utils import rpc_utils
from mistral import version from mistral import version
@ -67,51 +67,33 @@ CONF = cfg.CONF
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
def launch_executor(transport): def launch_executor():
profiler.setup('mistral-executor', cfg.CONF.executor.host) profiler.setup('mistral-executor', cfg.CONF.executor.host)
target = messaging.Target(
topic=cfg.CONF.executor.topic,
server=cfg.CONF.executor.host
)
executor_v2 = def_executor.DefaultExecutor(rpc.get_engine_client()) executor_v2 = def_executor.DefaultExecutor(rpc.get_engine_client())
executor_endpoint = rpc.ExecutorServer(executor_v2)
endpoints = [rpc.ExecutorServer(executor_v2)] executor_server = rpc.get_rpc_server_driver()(
rpc_utils.get_rpc_info_from_oslo(CONF.executor)
server = messaging.get_rpc_server(
transport,
target,
endpoints,
executor='eventlet',
serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer())
) )
executor_server.register_endpoint(executor_endpoint)
executor_v2.register_membership() executor_v2.register_membership()
try: try:
server.start() executor_server.run()
while True:
time.sleep(604800)
except (KeyboardInterrupt, SystemExit): except (KeyboardInterrupt, SystemExit):
pass pass
finally: finally:
print("Stopping executor service...") print("Stopping executor service...")
server.stop()
server.wait()
def launch_engine(transport): def launch_engine():
profiler.setup('mistral-engine', cfg.CONF.engine.host) profiler.setup('mistral-engine', cfg.CONF.engine.host)
target = messaging.Target(
topic=cfg.CONF.engine.topic,
server=cfg.CONF.engine.host
)
engine_v2 = def_eng.DefaultEngine(rpc.get_engine_client()) engine_v2 = def_eng.DefaultEngine(rpc.get_engine_client())
endpoints = [rpc.EngineServer(engine_v2)] engine_endpoint = rpc.EngineServer(engine_v2)
# Setup scheduler in engine. # Setup scheduler in engine.
db_api.setup_db() db_api.setup_db()
@ -120,37 +102,30 @@ def launch_engine(transport):
# Setup expiration policy # Setup expiration policy
expiration_policy.setup() expiration_policy.setup()
server = messaging.get_rpc_server( engine_server = rpc.get_rpc_server_driver()(
transport, rpc_utils.get_rpc_info_from_oslo(CONF.engine)
target,
endpoints,
executor='eventlet',
serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer())
) )
engine_server.register_endpoint(engine_endpoint)
engine_v2.register_membership() engine_v2.register_membership()
try: try:
server.start() engine_server.run()
while True:
time.sleep(604800)
except (KeyboardInterrupt, SystemExit): except (KeyboardInterrupt, SystemExit):
pass pass
finally: finally:
print("Stopping engine service...") print("Stopping engine service...")
server.stop()
server.wait()
class ThreadingWSGIServer(socketserver.ThreadingMixIn, WSGIServer): class ThreadingWSGIServer(socketserver.ThreadingMixIn, WSGIServer):
pass pass
def launch_api(transport): def launch_api():
host = cfg.CONF.api.host host = cfg.CONF.api.host
port = cfg.CONF.api.port port = cfg.CONF.api.port
server = simple_server.make_server( api_server = simple_server.make_server(
host, host,
port, port,
app.setup_app(), app.setup_app(),
@ -160,12 +135,12 @@ def launch_api(transport):
LOG.info("Mistral API is serving on http://%s:%s (PID=%s)" % LOG.info("Mistral API is serving on http://%s:%s (PID=%s)" %
(host, port, os.getpid())) (host, port, os.getpid()))
server.serve_forever() api_server.serve_forever()
def launch_any(transport, options): def launch_any(options):
# Launch the servers on different threads. # Launch the servers on different threads.
threads = [eventlet.spawn(LAUNCH_OPTIONS[option], transport) threads = [eventlet.spawn(LAUNCH_OPTIONS[option])
for option in options] for option in options]
print('Server started.') print('Server started.')
@ -250,11 +225,11 @@ def main():
# servers are launched on the same process. Otherwise, messages do not # servers are launched on the same process. Otherwise, messages do not
# get delivered if the Mistral servers are launched on different # get delivered if the Mistral servers are launched on different
# processes because the "fake" transport is using an in process queue. # processes because the "fake" transport is using an in process queue.
transport = rpc.get_transport() rpc.get_transport()
if cfg.CONF.server == ['all']: if cfg.CONF.server == ['all']:
# Launch all servers. # Launch all servers.
launch_any(transport, LAUNCH_OPTIONS.keys()) launch_any(LAUNCH_OPTIONS.keys())
else: else:
# Validate launch option. # Validate launch option.
if set(cfg.CONF.server) - set(LAUNCH_OPTIONS.keys()): if set(cfg.CONF.server) - set(LAUNCH_OPTIONS.keys()):
@ -262,7 +237,7 @@ def main():
'api, engine, and executor.') 'api, engine, and executor.')
# Launch distinct set of server(s). # Launch distinct set of server(s).
launch_any(transport, set(cfg.CONF.server)) launch_any(set(cfg.CONF.server))
except RuntimeError as excp: except RuntimeError as excp:
sys.stderr.write("ERROR: %s\n" % excp) sys.stderr.write("ERROR: %s\n" % excp)

View File

@ -50,7 +50,8 @@ rpc_impl_opt = cfg.StrOpt(
'rpc_implementation', 'rpc_implementation',
default='oslo', default='oslo',
choices=['oslo', 'kombu'], choices=['oslo', 'kombu'],
help='Specifies RPC implementation for RPC client and server.' help='Specifies RPC implementation for RPC client and server. Support of '
'kombu driver is experimental.'
) )
pecan_opts = [ pecan_opts = [

View File

@ -17,13 +17,12 @@ from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
import oslo_messaging as messaging import oslo_messaging as messaging
from oslo_messaging.rpc import client from oslo_messaging.rpc import client
from oslo_messaging.rpc import dispatcher
from oslo_messaging.rpc import server
from stevedore import driver from stevedore import driver
from mistral import context as auth_ctx from mistral import context as auth_ctx
from mistral.engine import base from mistral.engine import base
from mistral import exceptions as exc from mistral import exceptions as exc
from mistral.utils import rpc_utils
from mistral.workflow import utils as wf_utils from mistral.workflow import utils as wf_utils
@ -38,16 +37,6 @@ _ENGINE_CLIENT = None
_EXECUTOR_CLIENT = None _EXECUTOR_CLIENT = None
def get_rpc_server(transport, target, endpoints, executor='blocking',
serializer=None):
return server.RPCServer(
transport,
target,
dispatcher.RPCDispatcher(endpoints, serializer),
executor
)
def cleanup(): def cleanup():
"""Intended to be used by tests to recreate all RPC related objects.""" """Intended to be used by tests to recreate all RPC related objects."""
@ -73,7 +62,9 @@ def get_engine_client():
global _ENGINE_CLIENT global _ENGINE_CLIENT
if not _ENGINE_CLIENT: if not _ENGINE_CLIENT:
_ENGINE_CLIENT = EngineClient(get_transport()) _ENGINE_CLIENT = EngineClient(
rpc_utils.get_rpc_info_from_oslo(cfg.CONF.engine)
)
return _ENGINE_CLIENT return _ENGINE_CLIENT
@ -82,7 +73,9 @@ def get_executor_client():
global _EXECUTOR_CLIENT global _EXECUTOR_CLIENT
if not _EXECUTOR_CLIENT: if not _EXECUTOR_CLIENT:
_EXECUTOR_CLIENT = ExecutorClient(get_transport()) _EXECUTOR_CLIENT = ExecutorClient(
rpc_utils.get_rpc_info_from_oslo(cfg.CONF.executor)
)
return _EXECUTOR_CLIENT return _EXECUTOR_CLIENT
@ -316,19 +309,12 @@ def wrap_messaging_exception(method):
class EngineClient(base.Engine): class EngineClient(base.Engine):
"""RPC Engine client.""" """RPC Engine client."""
def __init__(self, transport): def __init__(self, rpc_conf_dict):
"""Constructs an RPC client for engine. """Constructs an RPC client for engine.
:param transport: Messaging transport. :param rpc_conf_dict: Dict containing RPC configuration.
""" """
serializer = auth_ctx.RpcContextSerializer( self._client = get_rpc_client_driver()(rpc_conf_dict)
auth_ctx.JsonPayloadSerializer())
self._client = messaging.RPCClient(
transport,
messaging.Target(topic=cfg.CONF.engine.topic),
serializer=serializer
)
@wrap_messaging_exception @wrap_messaging_exception
def start_workflow(self, wf_identifier, wf_input, description='', def start_workflow(self, wf_identifier, wf_input, description='',
@ -337,7 +323,7 @@ class EngineClient(base.Engine):
:return: Workflow execution. :return: Workflow execution.
""" """
return self._client.call( return self._client.sync_call(
auth_ctx.ctx(), auth_ctx.ctx(),
'start_workflow', 'start_workflow',
workflow_identifier=wf_identifier, workflow_identifier=wf_identifier,
@ -353,7 +339,7 @@ class EngineClient(base.Engine):
:return: Action execution. :return: Action execution.
""" """
return self._client.call( return self._client.sync_call(
auth_ctx.ctx(), auth_ctx.ctx(),
'start_action', 'start_action',
action_name=action_name, action_name=action_name,
@ -363,7 +349,7 @@ class EngineClient(base.Engine):
) )
def on_task_state_change(self, task_ex_id, state, state_info=None): def on_task_state_change(self, task_ex_id, state, state_info=None):
return self._client.call( return self._client.sync_call(
auth_ctx.ctx(), auth_ctx.ctx(),
'on_task_state_change', 'on_task_state_change',
task_ex_id=task_ex_id, task_ex_id=task_ex_id,
@ -389,7 +375,7 @@ class EngineClient(base.Engine):
:return: Task. :return: Task.
""" """
return self._client.call( return self._client.sync_call(
auth_ctx.ctx(), auth_ctx.ctx(),
'on_action_complete', 'on_action_complete',
action_ex_id=action_ex_id, action_ex_id=action_ex_id,
@ -405,7 +391,7 @@ class EngineClient(base.Engine):
:return: Workflow execution. :return: Workflow execution.
""" """
return self._client.call( return self._client.sync_call(
auth_ctx.ctx(), auth_ctx.ctx(),
'pause_workflow', 'pause_workflow',
execution_id=wf_ex_id execution_id=wf_ex_id
@ -442,7 +428,7 @@ class EngineClient(base.Engine):
:return: Workflow execution. :return: Workflow execution.
""" """
return self._client.call( return self._client.sync_call(
auth_ctx.ctx(), auth_ctx.ctx(),
'resume_workflow', 'resume_workflow',
wf_ex_id=wf_ex_id, wf_ex_id=wf_ex_id,
@ -463,7 +449,7 @@ class EngineClient(base.Engine):
:return: Workflow execution, model.Execution :return: Workflow execution, model.Execution
""" """
return self._client.call( return self._client.sync_call(
auth_ctx.ctx(), auth_ctx.ctx(),
'stop_workflow', 'stop_workflow',
execution_id=wf_ex_id, execution_id=wf_ex_id,
@ -480,7 +466,7 @@ class EngineClient(base.Engine):
:return: Workflow execution. :return: Workflow execution.
""" """
return self._client.call( return self._client.sync_call(
auth_ctx.ctx(), auth_ctx.ctx(),
'rollback_workflow', 'rollback_workflow',
execution_id=wf_ex_id execution_id=wf_ex_id
@ -522,23 +508,14 @@ class ExecutorServer(object):
class ExecutorClient(base.Executor): class ExecutorClient(base.Executor):
"""RPC Executor client.""" """RPC Executor client."""
def __init__(self, transport): def __init__(self, rpc_conf_dict):
"""Constructs an RPC client for the Executor. """Constructs an RPC client for the Executor.
:param transport: Messaging transport. :param rpc_conf_dict: Dict containing RPC configuration.
:type transport: Transport.
""" """
self.topic = cfg.CONF.executor.topic self.topic = cfg.CONF.executor.topic
self._client = get_rpc_client_driver()(rpc_conf_dict)
serializer = auth_ctx.RpcContextSerializer(
auth_ctx.JsonPayloadSerializer()
)
self._client = messaging.RPCClient(
transport,
messaging.Target(),
serializer=serializer
)
def run_action(self, action_ex_id, action_class_str, attributes, def run_action(self, action_ex_id, action_class_str, attributes,
action_params, target=None, async=True): action_params, target=None, async=True):
@ -561,9 +538,8 @@ class ExecutorClient(base.Executor):
'params': action_params 'params': action_params
} }
call_ctx = self._client.prepare(topic=self.topic, server=target) rpc_client_method = (self._client.async_call
if async else self._client.sync_call)
rpc_client_method = call_ctx.cast if async else call_ctx.call
res = rpc_client_method(auth_ctx.ctx(), 'run_action', **kwargs) res = rpc_client_method(auth_ctx.ctx(), 'run_action', **kwargs)

View File

@ -19,7 +19,9 @@ import datetime
import json import json
import mock import mock
from oslo_config import cfg from oslo_config import cfg
import oslo_messaging
from mistral.db.v2 import api as db_api from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models from mistral.db.v2.sqlalchemy import models
@ -29,6 +31,9 @@ from mistral.tests.unit.api import base
from mistral.workflow import states from mistral.workflow import states
from mistral.workflow import utils as wf_utils from mistral.workflow import utils as wf_utils
# This line is needed for correct initialization of messaging config.
oslo_messaging.get_transport(cfg.CONF)
ACTION_EX_DB = models.ActionExecution( ACTION_EX_DB = models.ActionExecution(
id='123', id='123',
@ -146,6 +151,7 @@ MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError())
MOCK_DELETE = mock.MagicMock(return_value=None) MOCK_DELETE = mock.MagicMock(return_value=None)
@mock.patch.object(rpc, '_IMPL_CLIENT', mock.Mock())
class TestActionExecutionsController(base.APITest): class TestActionExecutionsController(base.APITest):
def setUp(self): def setUp(self):
super(TestActionExecutionsController, self).setUp() super(TestActionExecutionsController, self).setUp()

View File

@ -19,6 +19,8 @@ import datetime
import json import json
import mock import mock
from oslo_config import cfg
import oslo_messaging
import uuid import uuid
from webtest import app as webtest_app from webtest import app as webtest_app
@ -31,6 +33,9 @@ from mistral.tests.unit.api import base
from mistral import utils from mistral import utils
from mistral.workflow import states from mistral.workflow import states
# This line is needed for correct initialization of messaging config.
oslo_messaging.get_transport(cfg.CONF)
WF_EX = models.WorkflowExecution( WF_EX = models.WorkflowExecution(
id='123e4567-e89b-12d3-a456-426655440000', id='123e4567-e89b-12d3-a456-426655440000',
@ -117,6 +122,7 @@ MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError())
MOCK_ACTION_EXC = mock.MagicMock(side_effect=exc.ActionException()) MOCK_ACTION_EXC = mock.MagicMock(side_effect=exc.ActionException())
@mock.patch.object(rpc, '_IMPL_CLIENT', mock.Mock())
class TestExecutionsController(base.APITest): class TestExecutionsController(base.APITest):
@mock.patch.object(db_api, 'get_workflow_execution', MOCK_WF_EX) @mock.patch.object(db_api, 'get_workflow_execution', MOCK_WF_EX)
def test_get(self): def test_get(self):

View File

@ -99,11 +99,10 @@ class EngineTestCase(base.DbTestCase):
# Drop all RPC objects (transport, clients). # Drop all RPC objects (transport, clients).
rpc.cleanup() rpc.cleanup()
transport = rpc.get_transport() transport = rpc.get_transport()
self.engine_client = rpc.EngineClient(transport) self.engine_client = rpc.get_engine_client()
self.executor_client = rpc.ExecutorClient(transport) self.executor_client = rpc.get_executor_client()
self.engine = def_eng.DefaultEngine(self.engine_client) self.engine = def_eng.DefaultEngine(self.engine_client)
self.executor = def_exec.DefaultExecutor(self.engine_client) self.executor = def_exec.DefaultExecutor(self.engine_client)

View File

@ -19,10 +19,13 @@ from mistral.utils import rpc_utils
class RPCTest(base.EngineTestCase): class RPCTest(base.EngineTestCase):
def setUp(self):
super(RPCTest, self).setUp()
def test_get_rabbit_config(self): def test_get_rabbit_config(self):
conf = cfg.CONF conf = cfg.CONF
rpc_info = rpc_utils.get_rabbit_info_from_oslo(conf.engine) rpc_info = rpc_utils._get_rabbit_info_from_oslo(conf.engine)
self.assertDictEqual( self.assertDictEqual(
{ {
@ -34,8 +37,8 @@ class RPCTest(base.EngineTestCase):
'host': 'localhost', 'host': 'localhost',
'exchange': 'openstack', 'exchange': 'openstack',
'password': 'guest', 'password': 'guest',
'auto_delete': False,
'durable_queues': False, 'durable_queues': False,
'auto_delete': False
}, },
rpc_info rpc_info
) )

View File

@ -452,7 +452,7 @@ class DefaultEngineTest(base.DbTestCase):
class DefaultEngineWithTransportTest(eng_test_base.EngineTestCase): class DefaultEngineWithTransportTest(eng_test_base.EngineTestCase):
def test_engine_client_remote_error(self): def test_engine_client_remote_error(self):
mocked = mock.Mock() mocked = mock.Mock()
mocked.call.side_effect = rpc_client.RemoteError( mocked.sync_call.side_effect = rpc_client.RemoteError(
'InputException', 'InputException',
'Input is wrong' 'Input is wrong'
) )
@ -468,7 +468,7 @@ class DefaultEngineWithTransportTest(eng_test_base.EngineTestCase):
def test_engine_client_remote_error_arbitrary(self): def test_engine_client_remote_error_arbitrary(self):
mocked = mock.Mock() mocked = mock.Mock()
mocked.call.side_effect = KeyError('wrong key') mocked.sync_call.side_effect = KeyError('wrong key')
self.engine_client._client = mocked self.engine_client._client = mocked
exception = self.assertRaises( exception = self.assertRaises(

View File

@ -19,7 +19,15 @@ from oslo_config import cfg
CONF = cfg.CONF CONF = cfg.CONF
def get_rabbit_info_from_oslo(additional_conf): def get_rpc_info_from_oslo(additional_conf=None):
if CONF.rpc_backend in ['rabbit', 'fake']:
return _get_rabbit_info_from_oslo(additional_conf)
else:
# TODO(nmakhotkin) Implement.
raise NotImplementedError
def _get_rabbit_info_from_oslo(additional_conf):
return { return {
'user_id': CONF.oslo_messaging_rabbit.rabbit_userid, 'user_id': CONF.oslo_messaging_rabbit.rabbit_userid,
'password': CONF.oslo_messaging_rabbit.rabbit_password, 'password': CONF.oslo_messaging_rabbit.rabbit_password,