Merge "Enable user to use transport_url in kombu driver"
This commit is contained in:
commit
f0fbb30d76
@ -13,6 +13,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
import oslo_messaging as messaging
|
||||
|
||||
from mistral.tests.unit.engine import base
|
||||
from mistral.utils import rpc_utils
|
||||
@ -43,3 +44,31 @@ class RPCTest(base.EngineTestCase):
|
||||
},
|
||||
rpc_info
|
||||
)
|
||||
|
||||
def test_get_transport_url_config(self):
|
||||
conf = cfg.CONF
|
||||
transport_url = 'rabbit://user:supersecret@not_localhost:1234/'
|
||||
|
||||
transport = messaging.TransportURL.parse(conf, transport_url)
|
||||
|
||||
rpc_info = rpc_utils._get_rpc_info_from_transport_url(
|
||||
transport,
|
||||
conf.engine
|
||||
)
|
||||
|
||||
self.assertDictEqual(
|
||||
{
|
||||
'topic': 'mistral_engine',
|
||||
'port': 1234,
|
||||
'server_id': '0.0.0.0',
|
||||
'user_id': 'user',
|
||||
'virtual_host': '/',
|
||||
'host': 'not_localhost',
|
||||
'exchange': 'openstack',
|
||||
'password': 'supersecret',
|
||||
'durable_queues': False,
|
||||
'auto_delete': False,
|
||||
'timeout': 60
|
||||
},
|
||||
rpc_info
|
||||
)
|
||||
|
@ -14,6 +14,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
import oslo_messaging as messaging
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
@ -21,23 +22,69 @@ CONF.import_opt('rpc_response_timeout', 'mistral.config')
|
||||
|
||||
|
||||
def get_rpc_info_from_oslo(additional_conf=None):
|
||||
if CONF.rpc_backend in ['rabbit', 'fake']:
|
||||
return _get_rabbit_info_from_oslo(additional_conf)
|
||||
transport = messaging.TransportURL.parse(CONF, CONF.transport_url)
|
||||
|
||||
rpc_backend = _get_rpc_backend(transport)
|
||||
|
||||
if rpc_backend in ['rabbit', 'fake']:
|
||||
return _get_rabbit_info(transport, additional_conf)
|
||||
else:
|
||||
# TODO(nmakhotkin) Implement.
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
def _get_rpc_backend(transport):
|
||||
if transport:
|
||||
return transport.transport
|
||||
|
||||
return CONF.rpc_backend
|
||||
|
||||
|
||||
def _get_rabbit_info(transport, additional_conf):
|
||||
if transport and len(transport.hosts) == 1:
|
||||
# TODO(ddeja): Handle multiple hosts.
|
||||
return _get_rpc_info_from_transport_url(transport, additional_conf)
|
||||
|
||||
return _get_rabbit_info_from_oslo(additional_conf)
|
||||
|
||||
|
||||
def _get_rabbit_info_from_oslo(additional_conf):
|
||||
return _prepare_rabbit_conf_dict(
|
||||
CONF.oslo_messaging_rabbit.rabbit_userid,
|
||||
CONF.oslo_messaging_rabbit.rabbit_password,
|
||||
additional_conf.topic,
|
||||
additional_conf.host,
|
||||
CONF.oslo_messaging_rabbit.rabbit_host,
|
||||
CONF.oslo_messaging_rabbit.rabbit_port,
|
||||
CONF.oslo_messaging_rabbit.rabbit_virtual_host,
|
||||
)
|
||||
|
||||
|
||||
def _get_rpc_info_from_transport_url(transport, additional_conf):
|
||||
transport_host = transport.hosts[0]
|
||||
|
||||
return _prepare_rabbit_conf_dict(
|
||||
transport_host.username,
|
||||
transport_host.password,
|
||||
additional_conf.topic,
|
||||
additional_conf.host,
|
||||
transport_host.hostname,
|
||||
transport_host.port,
|
||||
transport.virtual_host or '/',
|
||||
)
|
||||
|
||||
|
||||
def _prepare_rabbit_conf_dict(user_id, password, topic, server_id, host, port,
|
||||
virtual_host):
|
||||
return {
|
||||
'user_id': CONF.oslo_messaging_rabbit.rabbit_userid,
|
||||
'password': CONF.oslo_messaging_rabbit.rabbit_password,
|
||||
'user_id': user_id,
|
||||
'password': password,
|
||||
'exchange': CONF.control_exchange,
|
||||
'topic': additional_conf.topic,
|
||||
'server_id': additional_conf.host,
|
||||
'host': CONF.oslo_messaging_rabbit.rabbit_host,
|
||||
'port': CONF.oslo_messaging_rabbit.rabbit_port,
|
||||
'virtual_host': CONF.oslo_messaging_rabbit.rabbit_virtual_host,
|
||||
'topic': topic,
|
||||
'server_id': server_id,
|
||||
'host': host,
|
||||
'port': port,
|
||||
'virtual_host': virtual_host,
|
||||
'durable_queues': CONF.oslo_messaging_rabbit.amqp_durable_queues,
|
||||
'auto_delete': CONF.oslo_messaging_rabbit.amqp_auto_delete,
|
||||
'timeout': CONF.rpc_response_timeout
|
||||
|
Loading…
Reference in New Issue
Block a user