Move each drivers options into its own group
All drivers options are current stored into the DEFAULT group. This change makes the configuration clearer by putting driver options into a group named oslo_messaging_<driver>. Closes-bug: #1417040 Change-Id: I96a9682afe7eb0caf1fbf47bbb0291833aec245b
This commit is contained in:
parent
f5b9defce1
commit
824313ac9c
@ -42,11 +42,13 @@ amqp_opts = [
|
||||
help='Use durable queues in AMQP.'),
|
||||
cfg.BoolOpt('amqp_auto_delete',
|
||||
default=False,
|
||||
deprecated_group='DEFAULT',
|
||||
help='Auto-delete queues in AMQP.'),
|
||||
|
||||
# FIXME(markmc): this was toplevel in openstack.common.rpc
|
||||
cfg.IntOpt('rpc_conn_pool_size',
|
||||
default=30,
|
||||
deprecated_group='DEFAULT',
|
||||
help='Size of RPC connection pool.'),
|
||||
]
|
||||
|
||||
@ -56,11 +58,11 @@ LOG = logging.getLogger(__name__)
|
||||
|
||||
class ConnectionPool(pool.Pool):
|
||||
"""Class that implements a Pool of Connections."""
|
||||
def __init__(self, conf, url, connection_cls):
|
||||
def __init__(self, conf, rpc_conn_pool_size, url, connection_cls):
|
||||
self.connection_cls = connection_cls
|
||||
self.conf = conf
|
||||
self.url = url
|
||||
super(ConnectionPool, self).__init__(self.conf.rpc_conn_pool_size)
|
||||
super(ConnectionPool, self).__init__(rpc_conn_pool_size)
|
||||
self.reply_proxy = None
|
||||
|
||||
# TODO(comstud): Timeout connections not used in a while
|
||||
|
@ -41,41 +41,52 @@ LOG = logging.getLogger(__name__)
|
||||
qpid_opts = [
|
||||
cfg.StrOpt('qpid_hostname',
|
||||
default='localhost',
|
||||
deprecated_group='DEFAULT',
|
||||
help='Qpid broker hostname.'),
|
||||
cfg.IntOpt('qpid_port',
|
||||
default=5672,
|
||||
deprecated_group='DEFAULT',
|
||||
help='Qpid broker port.'),
|
||||
cfg.ListOpt('qpid_hosts',
|
||||
default=['$qpid_hostname:$qpid_port'],
|
||||
deprecated_group='DEFAULT',
|
||||
help='Qpid HA cluster host:port pairs.'),
|
||||
cfg.StrOpt('qpid_username',
|
||||
default='',
|
||||
deprecated_group='DEFAULT',
|
||||
help='Username for Qpid connection.'),
|
||||
cfg.StrOpt('qpid_password',
|
||||
default='',
|
||||
deprecated_group='DEFAULT',
|
||||
help='Password for Qpid connection.',
|
||||
secret=True),
|
||||
cfg.StrOpt('qpid_sasl_mechanisms',
|
||||
default='',
|
||||
deprecated_group='DEFAULT',
|
||||
help='Space separated list of SASL mechanisms to use for '
|
||||
'auth.'),
|
||||
cfg.IntOpt('qpid_heartbeat',
|
||||
default=60,
|
||||
deprecated_group='DEFAULT',
|
||||
help='Seconds between connection keepalive heartbeats.'),
|
||||
cfg.StrOpt('qpid_protocol',
|
||||
default='tcp',
|
||||
deprecated_group='DEFAULT',
|
||||
help="Transport to use, either 'tcp' or 'ssl'."),
|
||||
cfg.BoolOpt('qpid_tcp_nodelay',
|
||||
default=True,
|
||||
deprecated_group='DEFAULT',
|
||||
help='Whether to disable the Nagle algorithm.'),
|
||||
cfg.IntOpt('qpid_receiver_capacity',
|
||||
default=1,
|
||||
deprecated_group='DEFAULT',
|
||||
help='The number of prefetched messages held by receiver.'),
|
||||
# NOTE(russellb) If any additional versions are added (beyond 1 and 2),
|
||||
# this file could probably use some additional refactoring so that the
|
||||
# differences between each version are split into different classes.
|
||||
cfg.IntOpt('qpid_topology_version',
|
||||
default=1,
|
||||
deprecated_group='DEFAULT',
|
||||
help="The qpid topology version to use. Version 1 is what "
|
||||
"was originally used by impl_qpid. Version 2 includes "
|
||||
"some backwards-incompatible changes that allow broker "
|
||||
@ -459,6 +470,7 @@ class Connection(object):
|
||||
self.session = None
|
||||
self.consumers = {}
|
||||
self.conf = conf
|
||||
self.driver_conf = conf.oslo_messaging_qpid
|
||||
|
||||
self._consume_loop_stopped = False
|
||||
|
||||
@ -476,7 +488,7 @@ class Connection(object):
|
||||
self.brokers_params.append(params)
|
||||
else:
|
||||
# Old configuration format
|
||||
for adr in self.conf.qpid_hosts:
|
||||
for adr in self.driver_conf.qpid_hosts:
|
||||
hostname, port = netutils.parse_host_port(
|
||||
adr, default_port=5672)
|
||||
|
||||
@ -485,8 +497,8 @@ class Connection(object):
|
||||
|
||||
params = {
|
||||
'host': '%s:%d' % (hostname, port),
|
||||
'username': self.conf.qpid_username,
|
||||
'password': self.conf.qpid_password,
|
||||
'username': self.driver_conf.qpid_username,
|
||||
'password': self.driver_conf.qpid_password,
|
||||
}
|
||||
self.brokers_params.append(params)
|
||||
|
||||
@ -505,12 +517,12 @@ class Connection(object):
|
||||
self.connection.username = broker['username']
|
||||
self.connection.password = broker['password']
|
||||
|
||||
self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
|
||||
self.connection.sasl_mechanisms = self.driver_conf.qpid_sasl_mechanisms
|
||||
# Reconnection is done by self.reconnect()
|
||||
self.connection.reconnect = False
|
||||
self.connection.heartbeat = self.conf.qpid_heartbeat
|
||||
self.connection.transport = self.conf.qpid_protocol
|
||||
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
|
||||
self.connection.heartbeat = self.driver_conf.qpid_heartbeat
|
||||
self.connection.transport = self.driver_conf.qpid_protocol
|
||||
self.connection.tcp_nodelay = self.driver_conf.qpid_tcp_nodelay
|
||||
self.connection.open()
|
||||
|
||||
def _register_consumer(self, consumer):
|
||||
@ -633,7 +645,8 @@ class Connection(object):
|
||||
"%(err_str)s"), log_info)
|
||||
|
||||
def _declare_consumer():
|
||||
consumer = consumer_cls(self.conf, self.session, topic, callback)
|
||||
consumer = consumer_cls(self.driver_conf, self.session, topic,
|
||||
callback)
|
||||
self._register_consumer(consumer)
|
||||
return consumer
|
||||
|
||||
@ -693,7 +706,8 @@ class Connection(object):
|
||||
"'%(topic)s': %(err_str)s"), log_info)
|
||||
|
||||
def _publisher_send():
|
||||
publisher = cls(self.conf, self.session, topic=topic, **kwargs)
|
||||
publisher = cls(self.driver_conf, self.session, topic=topic,
|
||||
**kwargs)
|
||||
publisher.send(msg)
|
||||
|
||||
return self.ensure(_connect_error, _publisher_send, retry=retry)
|
||||
@ -764,10 +778,15 @@ class QpidDriver(amqpdriver.AMQPDriverBase):
|
||||
|
||||
def __init__(self, conf, url,
|
||||
default_exchange=None, allowed_remote_exmods=None):
|
||||
conf.register_opts(qpid_opts)
|
||||
conf.register_opts(rpc_amqp.amqp_opts)
|
||||
opt_group = cfg.OptGroup(name='oslo_messaging_qpid',
|
||||
title='QPID driver options')
|
||||
conf.register_group(opt_group)
|
||||
conf.register_opts(qpid_opts, group=opt_group)
|
||||
conf.register_opts(rpc_amqp.amqp_opts, group=opt_group)
|
||||
|
||||
connection_pool = rpc_amqp.ConnectionPool(conf, url, Connection)
|
||||
connection_pool = rpc_amqp.ConnectionPool(
|
||||
conf, conf.oslo_messaging_qpid.rpc_conn_pool_size,
|
||||
url, Connection)
|
||||
|
||||
super(QpidDriver, self).__init__(conf, url,
|
||||
connection_pool,
|
||||
|
@ -42,6 +42,7 @@ from oslo_messaging import exceptions
|
||||
rabbit_opts = [
|
||||
cfg.StrOpt('kombu_ssl_version',
|
||||
default='',
|
||||
deprecated_group='DEFAULT',
|
||||
help='SSL version to use (valid only if SSL enabled). '
|
||||
'Valid values are TLSv1 and SSLv23. SSLv2, SSLv3, '
|
||||
'TLSv1_1, and TLSv1_2 may be available on some '
|
||||
@ -49,57 +50,72 @@ rabbit_opts = [
|
||||
),
|
||||
cfg.StrOpt('kombu_ssl_keyfile',
|
||||
default='',
|
||||
deprecated_group='DEFAULT',
|
||||
help='SSL key file (valid only if SSL enabled).'),
|
||||
cfg.StrOpt('kombu_ssl_certfile',
|
||||
default='',
|
||||
deprecated_group='DEFAULT',
|
||||
help='SSL cert file (valid only if SSL enabled).'),
|
||||
cfg.StrOpt('kombu_ssl_ca_certs',
|
||||
default='',
|
||||
deprecated_group='DEFAULT',
|
||||
help='SSL certification authority file '
|
||||
'(valid only if SSL enabled).'),
|
||||
cfg.FloatOpt('kombu_reconnect_delay',
|
||||
default=1.0,
|
||||
deprecated_group='DEFAULT',
|
||||
help='How long to wait before reconnecting in response to an '
|
||||
'AMQP consumer cancel notification.'),
|
||||
cfg.StrOpt('rabbit_host',
|
||||
default='localhost',
|
||||
deprecated_group='DEFAULT',
|
||||
help='The RabbitMQ broker address where a single node is '
|
||||
'used.'),
|
||||
cfg.IntOpt('rabbit_port',
|
||||
default=5672,
|
||||
deprecated_group='DEFAULT',
|
||||
help='The RabbitMQ broker port where a single node is used.'),
|
||||
cfg.ListOpt('rabbit_hosts',
|
||||
default=['$rabbit_host:$rabbit_port'],
|
||||
deprecated_group='DEFAULT',
|
||||
help='RabbitMQ HA cluster host:port pairs.'),
|
||||
cfg.BoolOpt('rabbit_use_ssl',
|
||||
default=False,
|
||||
deprecated_group='DEFAULT',
|
||||
help='Connect over SSL for RabbitMQ.'),
|
||||
cfg.StrOpt('rabbit_userid',
|
||||
default='guest',
|
||||
deprecated_group='DEFAULT',
|
||||
help='The RabbitMQ userid.'),
|
||||
cfg.StrOpt('rabbit_password',
|
||||
default='guest',
|
||||
deprecated_group='DEFAULT',
|
||||
help='The RabbitMQ password.',
|
||||
secret=True),
|
||||
cfg.StrOpt('rabbit_login_method',
|
||||
default='AMQPLAIN',
|
||||
deprecated_group='DEFAULT',
|
||||
help='The RabbitMQ login method.'),
|
||||
cfg.StrOpt('rabbit_virtual_host',
|
||||
default='/',
|
||||
deprecated_group='DEFAULT',
|
||||
help='The RabbitMQ virtual host.'),
|
||||
cfg.IntOpt('rabbit_retry_interval',
|
||||
default=1,
|
||||
help='How frequently to retry connecting with RabbitMQ.'),
|
||||
cfg.IntOpt('rabbit_retry_backoff',
|
||||
default=2,
|
||||
deprecated_group='DEFAULT',
|
||||
help='How long to backoff for between retries when connecting '
|
||||
'to RabbitMQ.'),
|
||||
cfg.IntOpt('rabbit_max_retries',
|
||||
default=0,
|
||||
deprecated_group='DEFAULT',
|
||||
help='Maximum number of RabbitMQ connection retries. '
|
||||
'Default is 0 (infinite retry count).'),
|
||||
cfg.BoolOpt('rabbit_ha_queues',
|
||||
default=False,
|
||||
deprecated_group='DEFAULT',
|
||||
help='Use HA queues in RabbitMQ (x-ha-policy: all). '
|
||||
'If you change this option, you must wipe the '
|
||||
'RabbitMQ database.'),
|
||||
@ -107,6 +123,7 @@ rabbit_opts = [
|
||||
# NOTE(sileht): deprecated option since oslo_messaging 1.5.0,
|
||||
cfg.BoolOpt('fake_rabbit',
|
||||
default=False,
|
||||
deprecated_group='DEFAULT',
|
||||
help='Deprecated, use rpc_backend=kombu+memory or '
|
||||
'rpc_backend=fake'),
|
||||
]
|
||||
@ -447,25 +464,26 @@ class Connection(object):
|
||||
self.consumers = []
|
||||
self.consumer_num = itertools.count(1)
|
||||
self.conf = conf
|
||||
self.max_retries = self.conf.rabbit_max_retries
|
||||
self.driver_conf = self.conf.oslo_messaging_rabbit
|
||||
self.max_retries = self.driver_conf.rabbit_max_retries
|
||||
# Try forever?
|
||||
if self.max_retries <= 0:
|
||||
self.max_retries = None
|
||||
self.interval_start = self.conf.rabbit_retry_interval
|
||||
self.interval_stepping = self.conf.rabbit_retry_backoff
|
||||
self.interval_start = self.driver_conf.rabbit_retry_interval
|
||||
self.interval_stepping = self.driver_conf.rabbit_retry_backoff
|
||||
# max retry-interval = 30 seconds
|
||||
self.interval_max = 30
|
||||
|
||||
self._ssl_params = self._fetch_ssl_params()
|
||||
self._login_method = self.conf.rabbit_login_method
|
||||
self._login_method = self.driver_conf.rabbit_login_method
|
||||
|
||||
if url.virtual_host is not None:
|
||||
virtual_host = url.virtual_host
|
||||
else:
|
||||
virtual_host = self.conf.rabbit_virtual_host
|
||||
virtual_host = self.driver_conf.rabbit_virtual_host
|
||||
|
||||
self._url = ''
|
||||
if self.conf.fake_rabbit:
|
||||
if self.driver_conf.fake_rabbit:
|
||||
LOG.warn("Deprecated: fake_rabbit option is deprecated, set "
|
||||
"rpc_backend to kombu+memory or use the fake "
|
||||
"driver instead.")
|
||||
@ -487,13 +505,13 @@ class Connection(object):
|
||||
transport = url.transport.replace('kombu+', '')
|
||||
self._url = "%s://%s" % (transport, virtual_host)
|
||||
else:
|
||||
for adr in self.conf.rabbit_hosts:
|
||||
for adr in self.driver_conf.rabbit_hosts:
|
||||
hostname, port = netutils.parse_host_port(
|
||||
adr, default_port=self.conf.rabbit_port)
|
||||
adr, default_port=self.driver_conf.rabbit_port)
|
||||
self._url += '%samqp://%s:%s@%s:%s/%s' % (
|
||||
";" if self._url else '',
|
||||
parse.quote(self.conf.rabbit_userid),
|
||||
parse.quote(self.conf.rabbit_password),
|
||||
parse.quote(self.driver_conf.rabbit_userid),
|
||||
parse.quote(self.driver_conf.rabbit_password),
|
||||
hostname, port,
|
||||
virtual_host)
|
||||
|
||||
@ -561,15 +579,15 @@ class Connection(object):
|
||||
ssl_params = dict()
|
||||
|
||||
# http://docs.python.org/library/ssl.html - ssl.wrap_socket
|
||||
if self.conf.kombu_ssl_version:
|
||||
if self.driver_conf.kombu_ssl_version:
|
||||
ssl_params['ssl_version'] = self.validate_ssl_version(
|
||||
self.conf.kombu_ssl_version)
|
||||
if self.conf.kombu_ssl_keyfile:
|
||||
ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
|
||||
if self.conf.kombu_ssl_certfile:
|
||||
ssl_params['certfile'] = self.conf.kombu_ssl_certfile
|
||||
if self.conf.kombu_ssl_ca_certs:
|
||||
ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs
|
||||
self.driver_conf.kombu_ssl_version)
|
||||
if self.driver_conf.kombu_ssl_keyfile:
|
||||
ssl_params['keyfile'] = self.driver_conf.kombu_ssl_keyfile
|
||||
if self.driver_conf.kombu_ssl_certfile:
|
||||
ssl_params['certfile'] = self.driver_conf.kombu_ssl_certfile
|
||||
if self.driver_conf.kombu_ssl_ca_certs:
|
||||
ssl_params['ca_certs'] = self.driver_conf.kombu_ssl_ca_certs
|
||||
# We might want to allow variations in the
|
||||
# future with this?
|
||||
ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
|
||||
@ -602,8 +620,9 @@ class Connection(object):
|
||||
def on_error(exc, interval):
|
||||
error_callback and error_callback(exc)
|
||||
|
||||
interval = (self.conf.kombu_reconnect_delay + interval
|
||||
if self.conf.kombu_reconnect_delay > 0 else interval)
|
||||
interval = (self.driver_conf.kombu_reconnect_delay + interval
|
||||
if self.driver_conf.kombu_reconnect_delay > 0
|
||||
else interval)
|
||||
|
||||
info = {'hostname': self.connection.hostname,
|
||||
'port': self.connection.port,
|
||||
@ -628,8 +647,8 @@ class Connection(object):
|
||||
# use kombu for HA connection, the interval_step
|
||||
# should sufficient, because the underlying kombu transport
|
||||
# connection object freed.
|
||||
if self.conf.kombu_reconnect_delay > 0:
|
||||
time.sleep(self.conf.kombu_reconnect_delay)
|
||||
if self.driver_conf.kombu_reconnect_delay > 0:
|
||||
time.sleep(self.driver_conf.kombu_reconnect_delay)
|
||||
|
||||
def on_reconnection(new_channel):
|
||||
"""Callback invoked when the kombu reconnects and creates
|
||||
@ -706,8 +725,8 @@ class Connection(object):
|
||||
"%(err_str)s"), log_info)
|
||||
|
||||
def _declare_consumer():
|
||||
consumer = consumer_cls(self.conf, self.channel, topic, callback,
|
||||
six.next(self.consumer_num))
|
||||
consumer = consumer_cls(self.driver_conf, self.channel, topic,
|
||||
callback, six.next(self.consumer_num))
|
||||
self.consumers.append(consumer)
|
||||
return consumer
|
||||
|
||||
@ -766,7 +785,8 @@ class Connection(object):
|
||||
"'%(topic)s': %(err_str)s"), log_info)
|
||||
|
||||
def _publish():
|
||||
publisher = cls(self.conf, self.channel, topic=topic, **kwargs)
|
||||
publisher = cls(self.driver_conf, self.channel, topic=topic,
|
||||
**kwargs)
|
||||
publisher.send(msg, timeout)
|
||||
|
||||
self.ensure(_error_callback, _publish, retry=retry)
|
||||
@ -851,10 +871,15 @@ class RabbitDriver(amqpdriver.AMQPDriverBase):
|
||||
def __init__(self, conf, url,
|
||||
default_exchange=None,
|
||||
allowed_remote_exmods=None):
|
||||
conf.register_opts(rabbit_opts)
|
||||
conf.register_opts(rpc_amqp.amqp_opts)
|
||||
opt_group = cfg.OptGroup(name='oslo_messaging_rabbit',
|
||||
title='RabbitMQ driver options')
|
||||
conf.register_group(opt_group)
|
||||
conf.register_opts(rabbit_opts, group=opt_group)
|
||||
conf.register_opts(rpc_amqp.amqp_opts, group=opt_group)
|
||||
|
||||
connection_pool = rpc_amqp.ConnectionPool(conf, url, Connection)
|
||||
connection_pool = rpc_amqp.ConnectionPool(
|
||||
conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size,
|
||||
url, Connection)
|
||||
|
||||
super(RabbitDriver, self).__init__(conf, url,
|
||||
connection_pool,
|
||||
|
@ -20,9 +20,9 @@ import sys
|
||||
import fixtures
|
||||
|
||||
|
||||
def _import_opts(conf, module, opts):
|
||||
def _import_opts(conf, module, opts, group=None):
|
||||
__import__(module)
|
||||
conf.register_opts(getattr(sys.modules[module], opts))
|
||||
conf.register_opts(getattr(sys.modules[module], opts), group=group)
|
||||
|
||||
|
||||
class ConfFixture(fixtures.Fixture):
|
||||
@ -45,11 +45,17 @@ class ConfFixture(fixtures.Fixture):
|
||||
def __init__(self, conf):
|
||||
self.conf = conf
|
||||
_import_opts(self.conf,
|
||||
'oslo_messaging._drivers.impl_rabbit', 'rabbit_opts')
|
||||
'oslo_messaging._drivers.impl_rabbit', 'rabbit_opts',
|
||||
'oslo_messaging_rabbit')
|
||||
_import_opts(self.conf,
|
||||
'oslo_messaging._drivers.impl_qpid', 'qpid_opts')
|
||||
'oslo_messaging._drivers.amqp', 'amqp_opts',
|
||||
'oslo_messaging_rabbit')
|
||||
_import_opts(self.conf,
|
||||
'oslo_messaging._drivers.amqp', 'amqp_opts')
|
||||
'oslo_messaging._drivers.impl_qpid', 'qpid_opts',
|
||||
'oslo_messaging_qpid')
|
||||
_import_opts(self.conf,
|
||||
'oslo_messaging._drivers.amqp', 'amqp_opts',
|
||||
'oslo_messaging_qpid')
|
||||
_import_opts(self.conf, 'oslo_messaging.rpc.client', '_client_opts')
|
||||
_import_opts(self.conf, 'oslo_messaging.transport', '_transport_opts')
|
||||
_import_opts(self.conf,
|
||||
|
@ -34,9 +34,6 @@ from oslo_messaging.rpc import client
|
||||
from oslo_messaging import transport
|
||||
|
||||
_global_opt_lists = [
|
||||
amqp.amqp_opts,
|
||||
impl_qpid.qpid_opts,
|
||||
impl_rabbit.rabbit_opts,
|
||||
impl_zmq.zmq_opts,
|
||||
matchmaker.matchmaker_opts,
|
||||
base._pool_opts,
|
||||
@ -50,6 +47,10 @@ _opts = [
|
||||
('matchmaker_redis', matchmaker_redis.matchmaker_redis_opts),
|
||||
('matchmaker_ring', matchmaker_ring.matchmaker_opts),
|
||||
('oslo_messaging_amqp', amqp_opts.amqp1_opts),
|
||||
('oslo_messaging_rabbit', list(itertools.chain(amqp.amqp_opts,
|
||||
impl_rabbit.rabbit_opts))),
|
||||
('oslo_messaging_qpid', list(itertools.chain(amqp.amqp_opts,
|
||||
impl_qpid.qpid_opts)))
|
||||
]
|
||||
|
||||
|
||||
|
@ -187,7 +187,8 @@ class TestQpidInvalidTopologyVersion(_QpidBaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestQpidInvalidTopologyVersion, self).setUp()
|
||||
self.config(qpid_topology_version=-1)
|
||||
self.config(qpid_topology_version=-1,
|
||||
group='oslo_messaging_qpid')
|
||||
|
||||
def test_invalid_topology_version(self):
|
||||
def consumer_callback(msg):
|
||||
@ -199,11 +200,11 @@ class TestQpidInvalidTopologyVersion(_QpidBaseTestCase):
|
||||
# 1. qpid driver raises Exception(msg) for invalid topology version
|
||||
# 2. flake8 - H202 assertRaises Exception too broad
|
||||
exception_msg = ("Invalid value for qpid_topology_version: %d" %
|
||||
self.conf.qpid_topology_version)
|
||||
self.conf.oslo_messaging_qpid.qpid_topology_version)
|
||||
recvd_exc_msg = ''
|
||||
|
||||
try:
|
||||
self.consumer_cls(self.conf,
|
||||
self.consumer_cls(self.conf.oslo_messaging_qpid,
|
||||
self.session_receive,
|
||||
msgid_or_topic,
|
||||
consumer_callback,
|
||||
@ -215,7 +216,7 @@ class TestQpidInvalidTopologyVersion(_QpidBaseTestCase):
|
||||
|
||||
recvd_exc_msg = ''
|
||||
try:
|
||||
self.publisher_cls(self.conf,
|
||||
self.publisher_cls(self.conf.oslo_messaging_qpid,
|
||||
self.session_send,
|
||||
topic=msgid_or_topic,
|
||||
**self.publisher_kwargs)
|
||||
@ -258,13 +259,15 @@ class TestQpidDirectConsumerPublisher(_QpidBaseTestCase):
|
||||
self.msgid = str(random.randint(1, 100))
|
||||
|
||||
# create a DirectConsumer and DirectPublisher class objects
|
||||
self.dir_cons = qpid_driver.DirectConsumer(self.conf,
|
||||
self.session_receive,
|
||||
self.msgid,
|
||||
self.consumer_callback)
|
||||
self.dir_pub = qpid_driver.DirectPublisher(self.conf,
|
||||
self.session_send,
|
||||
self.msgid)
|
||||
self.dir_cons = qpid_driver.DirectConsumer(
|
||||
self.conf.oslo_messaging_qpid,
|
||||
self.session_receive,
|
||||
self.msgid,
|
||||
self.consumer_callback)
|
||||
self.dir_pub = qpid_driver.DirectPublisher(
|
||||
self.conf.oslo_messaging_qpid,
|
||||
self.session_send,
|
||||
self.msgid)
|
||||
|
||||
def try_send_msg(no_msgs):
|
||||
for i in range(no_msgs):
|
||||
@ -418,7 +421,7 @@ class TestQpidTopicAndFanout(_QpidBaseTestCase):
|
||||
|
||||
def test_qpid_topic_and_fanout(self):
|
||||
for receiver_id in range(self.no_receivers):
|
||||
consumer = self.consumer_cls(self.conf,
|
||||
consumer = self.consumer_cls(self.conf.oslo_messaging_qpid,
|
||||
self.session_receive,
|
||||
self.receive_topic,
|
||||
self.consumer_callback,
|
||||
@ -431,7 +434,7 @@ class TestQpidTopicAndFanout(_QpidBaseTestCase):
|
||||
self._receiver_threads.append(thread)
|
||||
|
||||
for sender_id in range(self.no_senders):
|
||||
publisher = self.publisher_cls(self.conf,
|
||||
publisher = self.publisher_cls(self.conf.oslo_messaging_qpid,
|
||||
self.session_send,
|
||||
topic=self.topic,
|
||||
**self.publisher_kwargs)
|
||||
@ -483,7 +486,8 @@ class TestDriverInterface(_QpidBaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestDriverInterface, self).setUp()
|
||||
self.config(qpid_topology_version=2)
|
||||
self.config(qpid_topology_version=2,
|
||||
group='oslo_messaging_qpid')
|
||||
transport = oslo_messaging.get_transport(self.conf)
|
||||
self.driver = transport._driver
|
||||
|
||||
@ -554,7 +558,8 @@ class TestQpidReconnectOrder(test_utils.BaseTestCase):
|
||||
brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
|
||||
brokers_count = len(brokers)
|
||||
|
||||
self.config(qpid_hosts=brokers)
|
||||
self.config(qpid_hosts=brokers,
|
||||
group='oslo_messaging_qpid')
|
||||
|
||||
with mock.patch('qpid.messaging.Connection') as conn_mock:
|
||||
# starting from the first broker in the list
|
||||
@ -777,7 +782,8 @@ class QPidHATestCase(test_utils.BaseTestCase):
|
||||
|
||||
self.config(qpid_hosts=self.brokers,
|
||||
qpid_username=None,
|
||||
qpid_password=None)
|
||||
qpid_password=None,
|
||||
group='oslo_messaging_qpid')
|
||||
|
||||
hostname_sets = set()
|
||||
self.info = {'attempt': 0,
|
||||
|
@ -41,7 +41,7 @@ class TestDeprecatedRabbitDriverLoad(test_utils.BaseTestCase):
|
||||
super(TestDeprecatedRabbitDriverLoad, self).setUp(
|
||||
conf=cfg.ConfigOpts())
|
||||
self.messaging_conf.transport_driver = 'rabbit'
|
||||
self.config(fake_rabbit=True)
|
||||
self.config(fake_rabbit=True, group="oslo_messaging_rabbit")
|
||||
|
||||
def test_driver_load(self):
|
||||
transport = oslo_messaging.get_transport(self.conf)
|
||||
@ -673,7 +673,8 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
|
||||
self.config(rabbit_hosts=self.brokers,
|
||||
rabbit_retry_interval=0.01,
|
||||
rabbit_retry_backoff=0.01,
|
||||
kombu_reconnect_delay=0)
|
||||
kombu_reconnect_delay=0,
|
||||
group="oslo_messaging_rabbit")
|
||||
|
||||
self.kombu_connect = mock.Mock()
|
||||
self.useFixture(mockpatch.Patch(
|
||||
|
@ -29,13 +29,15 @@ class OptsTestCase(test_utils.BaseTestCase):
|
||||
super(OptsTestCase, self).setUp()
|
||||
|
||||
def _test_list_opts(self, result):
|
||||
self.assertEqual(4, len(result))
|
||||
self.assertEqual(6, len(result))
|
||||
|
||||
groups = [g for (g, l) in result]
|
||||
self.assertIn(None, groups)
|
||||
self.assertIn('matchmaker_ring', groups)
|
||||
self.assertIn('matchmaker_redis', groups)
|
||||
self.assertIn('oslo_messaging_amqp', groups)
|
||||
self.assertIn('oslo_messaging_rabbit', groups)
|
||||
self.assertIn('oslo_messaging_qpid', groups)
|
||||
|
||||
opt_names = [o.name for (g, l) in result for o in l]
|
||||
self.assertIn('rpc_backend', opt_names)
|
||||
|
@ -187,7 +187,8 @@ class TestQpidInvalidTopologyVersion(_QpidBaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestQpidInvalidTopologyVersion, self).setUp()
|
||||
self.config(qpid_topology_version=-1)
|
||||
self.config(qpid_topology_version=-1,
|
||||
group='oslo_messaging_qpid')
|
||||
|
||||
def test_invalid_topology_version(self):
|
||||
def consumer_callback(msg):
|
||||
@ -199,11 +200,11 @@ class TestQpidInvalidTopologyVersion(_QpidBaseTestCase):
|
||||
# 1. qpid driver raises Exception(msg) for invalid topology version
|
||||
# 2. flake8 - H202 assertRaises Exception too broad
|
||||
exception_msg = ("Invalid value for qpid_topology_version: %d" %
|
||||
self.conf.qpid_topology_version)
|
||||
self.conf.oslo_messaging_qpid.qpid_topology_version)
|
||||
recvd_exc_msg = ''
|
||||
|
||||
try:
|
||||
self.consumer_cls(self.conf,
|
||||
self.consumer_cls(self.conf.oslo_messaging_qpid,
|
||||
self.session_receive,
|
||||
msgid_or_topic,
|
||||
consumer_callback,
|
||||
@ -215,7 +216,7 @@ class TestQpidInvalidTopologyVersion(_QpidBaseTestCase):
|
||||
|
||||
recvd_exc_msg = ''
|
||||
try:
|
||||
self.publisher_cls(self.conf,
|
||||
self.publisher_cls(self.conf.oslo_messaging_qpid,
|
||||
self.session_send,
|
||||
topic=msgid_or_topic,
|
||||
**self.publisher_kwargs)
|
||||
@ -258,13 +259,15 @@ class TestQpidDirectConsumerPublisher(_QpidBaseTestCase):
|
||||
self.msgid = str(random.randint(1, 100))
|
||||
|
||||
# create a DirectConsumer and DirectPublisher class objects
|
||||
self.dir_cons = qpid_driver.DirectConsumer(self.conf,
|
||||
self.session_receive,
|
||||
self.msgid,
|
||||
self.consumer_callback)
|
||||
self.dir_pub = qpid_driver.DirectPublisher(self.conf,
|
||||
self.session_send,
|
||||
self.msgid)
|
||||
self.dir_cons = qpid_driver.DirectConsumer(
|
||||
self.conf.oslo_messaging_qpid,
|
||||
self.session_receive,
|
||||
self.msgid,
|
||||
self.consumer_callback)
|
||||
self.dir_pub = qpid_driver.DirectPublisher(
|
||||
self.conf.oslo_messaging_qpid,
|
||||
self.session_send,
|
||||
self.msgid)
|
||||
|
||||
def try_send_msg(no_msgs):
|
||||
for i in range(no_msgs):
|
||||
@ -418,7 +421,7 @@ class TestQpidTopicAndFanout(_QpidBaseTestCase):
|
||||
|
||||
def test_qpid_topic_and_fanout(self):
|
||||
for receiver_id in range(self.no_receivers):
|
||||
consumer = self.consumer_cls(self.conf,
|
||||
consumer = self.consumer_cls(self.conf.oslo_messaging_qpid,
|
||||
self.session_receive,
|
||||
self.receive_topic,
|
||||
self.consumer_callback,
|
||||
@ -431,7 +434,7 @@ class TestQpidTopicAndFanout(_QpidBaseTestCase):
|
||||
self._receiver_threads.append(thread)
|
||||
|
||||
for sender_id in range(self.no_senders):
|
||||
publisher = self.publisher_cls(self.conf,
|
||||
publisher = self.publisher_cls(self.conf.oslo_messaging_qpid,
|
||||
self.session_send,
|
||||
topic=self.topic,
|
||||
**self.publisher_kwargs)
|
||||
@ -483,7 +486,8 @@ class TestDriverInterface(_QpidBaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestDriverInterface, self).setUp()
|
||||
self.config(qpid_topology_version=2)
|
||||
self.config(qpid_topology_version=2,
|
||||
group='oslo_messaging_qpid')
|
||||
transport = messaging.get_transport(self.conf)
|
||||
self.driver = transport._driver
|
||||
|
||||
@ -554,7 +558,8 @@ class TestQpidReconnectOrder(test_utils.BaseTestCase):
|
||||
brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
|
||||
brokers_count = len(brokers)
|
||||
|
||||
self.config(qpid_hosts=brokers)
|
||||
self.config(qpid_hosts=brokers,
|
||||
group='oslo_messaging_qpid')
|
||||
|
||||
with mock.patch('qpid.messaging.Connection') as conn_mock:
|
||||
# starting from the first broker in the list
|
||||
@ -777,7 +782,8 @@ class QPidHATestCase(test_utils.BaseTestCase):
|
||||
|
||||
self.config(qpid_hosts=self.brokers,
|
||||
qpid_username=None,
|
||||
qpid_password=None)
|
||||
qpid_password=None,
|
||||
group='oslo_messaging_qpid')
|
||||
|
||||
hostname_sets = set()
|
||||
self.info = {'attempt': 0,
|
||||
|
@ -41,7 +41,7 @@ class TestDeprecatedRabbitDriverLoad(test_utils.BaseTestCase):
|
||||
super(TestDeprecatedRabbitDriverLoad, self).setUp(
|
||||
conf=cfg.ConfigOpts())
|
||||
self.messaging_conf.transport_driver = 'rabbit'
|
||||
self.config(fake_rabbit=True)
|
||||
self.config(fake_rabbit=True, group="oslo_messaging_rabbit")
|
||||
|
||||
def test_driver_load(self):
|
||||
transport = messaging.get_transport(self.conf)
|
||||
@ -686,7 +686,8 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
|
||||
self.config(rabbit_hosts=self.brokers,
|
||||
rabbit_retry_interval=0.01,
|
||||
rabbit_retry_backoff=0.01,
|
||||
kombu_reconnect_delay=0)
|
||||
kombu_reconnect_delay=0,
|
||||
group="oslo_messaging_rabbit")
|
||||
|
||||
self.kombu_connect = mock.Mock()
|
||||
self.useFixture(mockpatch.Patch(
|
||||
|
Loading…
Reference in New Issue
Block a user