Merge "Adapt functional tests to pika-driver"
This commit is contained in:
commit
fb90b4a243
@ -55,7 +55,7 @@ class LoggingNotificationHandlerTestCase(utils.SkipIfNoTransportURL):
|
||||
self.conf.notification_topics = [topic]
|
||||
|
||||
listener = self.useFixture(
|
||||
utils.NotificationFixture(self.url, [topic]))
|
||||
utils.NotificationFixture(self.conf, self.url, [topic]))
|
||||
|
||||
log_notify = oslo_messaging.LoggingNotificationHandler(self.url)
|
||||
|
||||
|
@ -27,11 +27,16 @@ class CallTestCase(utils.SkipIfNoTransportURL):
|
||||
def setUp(self):
|
||||
super(CallTestCase, self).setUp(conf=cfg.ConfigOpts())
|
||||
|
||||
self.conf.prog="test_prog"
|
||||
self.conf.project="test_project"
|
||||
|
||||
self.config(heartbeat_timeout_threshold=0,
|
||||
group='oslo_messaging_rabbit')
|
||||
|
||||
def test_specific_server(self):
|
||||
group = self.useFixture(utils.RpcServerGroupFixture(self.url))
|
||||
group = self.useFixture(utils.RpcServerGroupFixture(
|
||||
self.conf, self.url)
|
||||
)
|
||||
client = group.client(1)
|
||||
client.append(text='open')
|
||||
self.assertEqual('openstack', client.append(text='stack'))
|
||||
@ -45,7 +50,9 @@ class CallTestCase(utils.SkipIfNoTransportURL):
|
||||
self.assertEqual(0, group.servers[i].endpoint.ival)
|
||||
|
||||
def test_server_in_group(self):
|
||||
group = self.useFixture(utils.RpcServerGroupFixture(self.url))
|
||||
group = self.useFixture(
|
||||
utils.RpcServerGroupFixture(self.conf, self.url)
|
||||
)
|
||||
|
||||
client = group.client()
|
||||
data = [c for c in 'abcdefghijklmn']
|
||||
@ -62,13 +69,13 @@ class CallTestCase(utils.SkipIfNoTransportURL):
|
||||
# teardown may hang unless we broadcast all control messages
|
||||
# to each server
|
||||
group1 = self.useFixture(
|
||||
utils.RpcServerGroupFixture(self.url,
|
||||
utils.RpcServerGroupFixture(self.conf, self.url,
|
||||
use_fanout_ctrl=True))
|
||||
group2 = self.useFixture(
|
||||
utils.RpcServerGroupFixture(self.url, exchange="a",
|
||||
utils.RpcServerGroupFixture(self.conf, self.url, exchange="a",
|
||||
use_fanout_ctrl=True))
|
||||
group3 = self.useFixture(
|
||||
utils.RpcServerGroupFixture(self.url, exchange="b",
|
||||
utils.RpcServerGroupFixture(self.conf, self.url, exchange="b",
|
||||
use_fanout_ctrl=True))
|
||||
|
||||
client1 = group1.client(1)
|
||||
@ -101,24 +108,31 @@ class CallTestCase(utils.SkipIfNoTransportURL):
|
||||
self.assertEqual(0, s.endpoint.ival)
|
||||
|
||||
def test_timeout(self):
|
||||
transport = self.useFixture(utils.TransportFixture(self.url))
|
||||
transport = self.useFixture(
|
||||
utils.TransportFixture(self.conf, self.url)
|
||||
)
|
||||
target = oslo_messaging.Target(topic="no_such_topic")
|
||||
c = utils.ClientStub(transport.transport, target, timeout=1)
|
||||
self.assertThat(c.ping,
|
||||
matchers.raises(oslo_messaging.MessagingTimeout))
|
||||
|
||||
def test_exception(self):
|
||||
group = self.useFixture(utils.RpcServerGroupFixture(self.url))
|
||||
group = self.useFixture(
|
||||
utils.RpcServerGroupFixture(self.conf, self.url)
|
||||
)
|
||||
client = group.client(1)
|
||||
client.add(increment=2)
|
||||
self.assertRaises(ValueError, client.subtract, increment=3)
|
||||
|
||||
def test_timeout_with_concurrently_queues(self):
|
||||
transport = self.useFixture(utils.TransportFixture(self.url))
|
||||
transport = self.useFixture(
|
||||
utils.TransportFixture(self.conf, self.url)
|
||||
)
|
||||
target = oslo_messaging.Target(topic="topic_" + str(uuid.uuid4()),
|
||||
server="server_" + str(uuid.uuid4()))
|
||||
server = self.useFixture(
|
||||
utils.RpcServerFixture(self.url, target, executor="threading"))
|
||||
utils.RpcServerFixture(self.conf, self.url, target,
|
||||
executor="threading"))
|
||||
client = utils.ClientStub(transport.transport, target,
|
||||
cast=False, timeout=5)
|
||||
|
||||
@ -141,7 +155,9 @@ class CastTestCase(utils.SkipIfNoTransportURL):
|
||||
# making the necessary assertions.
|
||||
|
||||
def test_specific_server(self):
|
||||
group = self.useFixture(utils.RpcServerGroupFixture(self.url))
|
||||
group = self.useFixture(
|
||||
utils.RpcServerGroupFixture(self.conf, self.url)
|
||||
)
|
||||
client = group.client(1, cast=True)
|
||||
client.append(text='open')
|
||||
client.append(text='stack')
|
||||
@ -159,7 +175,9 @@ class CastTestCase(utils.SkipIfNoTransportURL):
|
||||
def test_server_in_group(self):
|
||||
if self.url.startswith("amqp:"):
|
||||
self.skipTest("QPID-6307")
|
||||
group = self.useFixture(utils.RpcServerGroupFixture(self.url))
|
||||
group = self.useFixture(
|
||||
utils.RpcServerGroupFixture(self.conf, self.url)
|
||||
)
|
||||
client = group.client(cast=True)
|
||||
for i in range(20):
|
||||
client.add(increment=1)
|
||||
@ -176,7 +194,9 @@ class CastTestCase(utils.SkipIfNoTransportURL):
|
||||
self.assertEqual(20, total)
|
||||
|
||||
def test_fanout(self):
|
||||
group = self.useFixture(utils.RpcServerGroupFixture(self.url))
|
||||
group = self.useFixture(
|
||||
utils.RpcServerGroupFixture(self.conf, self.url)
|
||||
)
|
||||
client = group.client('all', cast=True)
|
||||
client.append(text='open')
|
||||
client.append(text='stack')
|
||||
@ -195,7 +215,7 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
|
||||
|
||||
def test_simple(self):
|
||||
listener = self.useFixture(
|
||||
utils.NotificationFixture(self.url, ['test_simple']))
|
||||
utils.NotificationFixture(self.conf, self.url, ['test_simple']))
|
||||
notifier = listener.notifier('abc')
|
||||
|
||||
notifier.info({}, 'test', 'Hello World!')
|
||||
@ -207,7 +227,7 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
|
||||
|
||||
def test_multiple_topics(self):
|
||||
listener = self.useFixture(
|
||||
utils.NotificationFixture(self.url, ['a', 'b']))
|
||||
utils.NotificationFixture(self.conf, self.url, ['a', 'b']))
|
||||
a = listener.notifier('pub-a', topic='a')
|
||||
b = listener.notifier('pub-b', topic='b')
|
||||
|
||||
@ -234,10 +254,10 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
|
||||
if self.url.startswith("amqp:"):
|
||||
self.skipTest("QPID-6307")
|
||||
listener_a = self.useFixture(
|
||||
utils.NotificationFixture(self.url, ['test-topic']))
|
||||
utils.NotificationFixture(self.conf, self.url, ['test-topic']))
|
||||
|
||||
listener_b = self.useFixture(
|
||||
utils.NotificationFixture(self.url, ['test-topic']))
|
||||
utils.NotificationFixture(self.conf, self.url, ['test-topic']))
|
||||
|
||||
n = listener_a.notifier('pub')
|
||||
|
||||
@ -254,9 +274,9 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
|
||||
|
||||
def test_independent_topics(self):
|
||||
listener_a = self.useFixture(
|
||||
utils.NotificationFixture(self.url, ['1']))
|
||||
utils.NotificationFixture(self.conf, self.url, ['1']))
|
||||
listener_b = self.useFixture(
|
||||
utils.NotificationFixture(self.url, ['2']))
|
||||
utils.NotificationFixture(self.conf, self.url, ['2']))
|
||||
|
||||
a = listener_a.notifier('pub-1', topic='1')
|
||||
b = listener_b.notifier('pub-2', topic='2')
|
||||
@ -285,7 +305,7 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
|
||||
|
||||
def test_all_categories(self):
|
||||
listener = self.useFixture(utils.NotificationFixture(
|
||||
self.url, ['test_all_categories']))
|
||||
self.conf, self.url, ['test_all_categories']))
|
||||
n = listener.notifier('abc')
|
||||
|
||||
cats = ['debug', 'audit', 'info', 'warn', 'error', 'critical']
|
||||
|
@ -52,12 +52,13 @@ class TestServerEndpoint(object):
|
||||
class TransportFixture(fixtures.Fixture):
|
||||
"""Fixture defined to setup the oslo_messaging transport."""
|
||||
|
||||
def __init__(self, url):
|
||||
def __init__(self, conf, url):
|
||||
self.conf = conf
|
||||
self.url = url
|
||||
|
||||
def setUp(self):
|
||||
super(TransportFixture, self).setUp()
|
||||
self.transport = oslo_messaging.get_transport(cfg.CONF, url=self.url)
|
||||
self.transport = oslo_messaging.get_transport(self.conf, url=self.url)
|
||||
|
||||
def cleanUp(self):
|
||||
try:
|
||||
@ -74,9 +75,10 @@ class TransportFixture(fixtures.Fixture):
|
||||
class RpcServerFixture(fixtures.Fixture):
|
||||
"""Fixture to setup the TestServerEndpoint."""
|
||||
|
||||
def __init__(self, url, target, endpoint=None, ctrl_target=None,
|
||||
def __init__(self, conf, url, target, endpoint=None, ctrl_target=None,
|
||||
executor='eventlet'):
|
||||
super(RpcServerFixture, self).__init__()
|
||||
self.conf = conf
|
||||
self.url = url
|
||||
self.target = target
|
||||
self.endpoint = endpoint or TestServerEndpoint()
|
||||
@ -87,7 +89,7 @@ class RpcServerFixture(fixtures.Fixture):
|
||||
def setUp(self):
|
||||
super(RpcServerFixture, self).setUp()
|
||||
endpoints = [self.endpoint, self]
|
||||
transport = self.useFixture(TransportFixture(self.url))
|
||||
transport = self.useFixture(TransportFixture(self.conf, self.url))
|
||||
self.server = oslo_messaging.get_rpc_server(
|
||||
transport=transport.transport,
|
||||
target=self.target,
|
||||
@ -119,8 +121,9 @@ class RpcServerFixture(fixtures.Fixture):
|
||||
|
||||
|
||||
class RpcServerGroupFixture(fixtures.Fixture):
|
||||
def __init__(self, url, topic=None, names=None, exchange=None,
|
||||
def __init__(self, conf, url, topic=None, names=None, exchange=None,
|
||||
use_fanout_ctrl=False):
|
||||
self.conf = conf
|
||||
self.url = url
|
||||
# NOTE(sileht): topic and servier_name must be uniq
|
||||
# to be able to run all tests in parallel
|
||||
@ -145,7 +148,8 @@ class RpcServerGroupFixture(fixtures.Fixture):
|
||||
ctrl = None
|
||||
if self.use_fanout_ctrl:
|
||||
ctrl = self._target(fanout=True)
|
||||
server = RpcServerFixture(self.url, target, ctrl_target=ctrl)
|
||||
server = RpcServerFixture(self.conf, self.url, target,
|
||||
ctrl_target=ctrl)
|
||||
return server
|
||||
|
||||
def client(self, server=None, cast=False):
|
||||
@ -159,7 +163,7 @@ class RpcServerGroupFixture(fixtures.Fixture):
|
||||
else:
|
||||
raise ValueError("Invalid value for server: %r" % server)
|
||||
|
||||
transport = self.useFixture(TransportFixture(self.url))
|
||||
transport = self.useFixture(TransportFixture(self.conf, self.url))
|
||||
client = ClientStub(transport.transport, target, cast=cast,
|
||||
timeout=5)
|
||||
transport.wait()
|
||||
@ -289,8 +293,9 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase):
|
||||
|
||||
|
||||
class NotificationFixture(fixtures.Fixture):
|
||||
def __init__(self, url, topics):
|
||||
def __init__(self, conf, url, topics):
|
||||
super(NotificationFixture, self).__init__()
|
||||
self.conf = conf
|
||||
self.url = url
|
||||
self.topics = topics
|
||||
self.events = moves.queue.Queue()
|
||||
@ -301,7 +306,7 @@ class NotificationFixture(fixtures.Fixture):
|
||||
targets = [oslo_messaging.Target(topic=t) for t in self.topics]
|
||||
# add a special topic for internal notifications
|
||||
targets.append(oslo_messaging.Target(topic=self.name))
|
||||
transport = self.useFixture(TransportFixture(self.url))
|
||||
transport = self.useFixture(TransportFixture(self.conf, self.url))
|
||||
self.server = oslo_messaging.get_notification_listener(
|
||||
transport.transport,
|
||||
targets,
|
||||
@ -324,7 +329,7 @@ class NotificationFixture(fixtures.Fixture):
|
||||
self.thread.join()
|
||||
|
||||
def notifier(self, publisher, topic=None):
|
||||
transport = self.useFixture(TransportFixture(self.url))
|
||||
transport = self.useFixture(TransportFixture(self.conf, self.url))
|
||||
n = notifier.Notifier(transport.transport,
|
||||
publisher,
|
||||
driver='messaging',
|
||||
|
@ -13,6 +13,8 @@
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
|
||||
import os
|
||||
|
||||
import argparse
|
||||
import datetime
|
||||
import logging
|
||||
@ -240,6 +242,8 @@ def main():
|
||||
cfg.CONF.heartbeat_interval = 5
|
||||
cfg.CONF.notification_topics = "notif"
|
||||
cfg.CONF.notification_driver = "messaging"
|
||||
cfg.CONF.prog = os.path.basename(__file__)
|
||||
cfg.CONF.project = 'oslo.messaging'
|
||||
|
||||
transport = messaging.get_transport(cfg.CONF, url=args.url)
|
||||
target = messaging.Target(topic='profiler_topic', server='profiler_server')
|
||||
|
Loading…
Reference in New Issue
Block a user