diff --git a/mistral/tests/unit/engine/rpc_backend/test_rpc.py b/mistral/tests/unit/engine/rpc_backend/test_rpc.py index 231c87507..393aed55c 100644 --- a/mistral/tests/unit/engine/rpc_backend/test_rpc.py +++ b/mistral/tests/unit/engine/rpc_backend/test_rpc.py @@ -13,6 +13,7 @@ # limitations under the License. from oslo_config import cfg +import oslo_messaging as messaging from mistral.tests.unit.engine import base from mistral.utils import rpc_utils @@ -43,3 +44,31 @@ class RPCTest(base.EngineTestCase): }, rpc_info ) + + def test_get_transport_url_config(self): + conf = cfg.CONF + transport_url = 'rabbit://user:supersecret@not_localhost:1234/' + + transport = messaging.TransportURL.parse(conf, transport_url) + + rpc_info = rpc_utils._get_rpc_info_from_transport_url( + transport, + conf.engine + ) + + self.assertDictEqual( + { + 'topic': 'mistral_engine', + 'port': 1234, + 'server_id': '0.0.0.0', + 'user_id': 'user', + 'virtual_host': '/', + 'host': 'not_localhost', + 'exchange': 'openstack', + 'password': 'supersecret', + 'durable_queues': False, + 'auto_delete': False, + 'timeout': 60 + }, + rpc_info + ) diff --git a/mistral/utils/rpc_utils.py b/mistral/utils/rpc_utils.py index 78d821937..6b6859221 100644 --- a/mistral/utils/rpc_utils.py +++ b/mistral/utils/rpc_utils.py @@ -14,6 +14,7 @@ # limitations under the License. from oslo_config import cfg +import oslo_messaging as messaging CONF = cfg.CONF @@ -21,23 +22,69 @@ CONF.import_opt('rpc_response_timeout', 'mistral.config') def get_rpc_info_from_oslo(additional_conf=None): - if CONF.rpc_backend in ['rabbit', 'fake']: - return _get_rabbit_info_from_oslo(additional_conf) + transport = messaging.TransportURL.parse(CONF, CONF.transport_url) + + rpc_backend = _get_rpc_backend(transport) + + if rpc_backend in ['rabbit', 'fake']: + return _get_rabbit_info(transport, additional_conf) else: # TODO(nmakhotkin) Implement. raise NotImplementedError +def _get_rpc_backend(transport): + if transport: + return transport.transport + + return CONF.rpc_backend + + +def _get_rabbit_info(transport, additional_conf): + if transport and len(transport.hosts) == 1: + # TODO(ddeja): Handle multiple hosts. + return _get_rpc_info_from_transport_url(transport, additional_conf) + + return _get_rabbit_info_from_oslo(additional_conf) + + def _get_rabbit_info_from_oslo(additional_conf): + return _prepare_rabbit_conf_dict( + CONF.oslo_messaging_rabbit.rabbit_userid, + CONF.oslo_messaging_rabbit.rabbit_password, + additional_conf.topic, + additional_conf.host, + CONF.oslo_messaging_rabbit.rabbit_host, + CONF.oslo_messaging_rabbit.rabbit_port, + CONF.oslo_messaging_rabbit.rabbit_virtual_host, + ) + + +def _get_rpc_info_from_transport_url(transport, additional_conf): + transport_host = transport.hosts[0] + + return _prepare_rabbit_conf_dict( + transport_host.username, + transport_host.password, + additional_conf.topic, + additional_conf.host, + transport_host.hostname, + transport_host.port, + transport.virtual_host or '/', + ) + + +def _prepare_rabbit_conf_dict(user_id, password, topic, server_id, host, port, + virtual_host): return { - 'user_id': CONF.oslo_messaging_rabbit.rabbit_userid, - 'password': CONF.oslo_messaging_rabbit.rabbit_password, + 'user_id': user_id, + 'password': password, 'exchange': CONF.control_exchange, - 'topic': additional_conf.topic, - 'server_id': additional_conf.host, - 'host': CONF.oslo_messaging_rabbit.rabbit_host, - 'port': CONF.oslo_messaging_rabbit.rabbit_port, - 'virtual_host': CONF.oslo_messaging_rabbit.rabbit_virtual_host, + 'topic': topic, + 'server_id': server_id, + 'host': host, + 'port': port, + 'virtual_host': virtual_host, 'durable_queues': CONF.oslo_messaging_rabbit.amqp_durable_queues, 'auto_delete': CONF.oslo_messaging_rabbit.amqp_auto_delete, 'timeout': CONF.rpc_response_timeout