Add tests for rabbit driver wire protcol

Add two test cases:

  - send messages using the rabbit driver and check that the message
    gets sent to the expected queue in the expected format

  - send messages to a given queue in the correct format and check that
    we can happily consume them using the rabbit driver

i.e. these are tests that check that the driver sends and receives
messages according to the wire protocol.

Change-Id: I6a48085585981b17609b31f216bdca438e90e068
This commit is contained in:
Mark McLoughlin 2013-07-31 11:36:28 +01:00
parent 8b366b3001
commit 8572d78faf

@ -14,11 +14,21 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import uuid
import fixtures
import kombu
import testscenarios
from oslo import messaging
from oslo.messaging._drivers import impl_rabbit as rabbit_driver
from oslo.messaging.openstack.common import jsonutils
from oslo.messaging import transport as msg_transport
from tests import utils as test_utils
load_tests = testscenarios.load_tests_apply_scenarios
class TestRabbitDriver(test_utils.BaseTestCase):
@ -53,3 +63,287 @@ class TestRabbitDriver(test_utils.BaseTestCase):
self.assertTrue(received is not None)
self.assertEquals(received.ctxt, {})
self.assertEquals(received.message, {'foo': 'bar'})
def _declare_queue(target):
connection = kombu.connection.BrokerConnection(transport='memory')
# Kludge to speed up tests.
connection.transport.polling_interval = 0.0
connection.connect()
channel = connection.channel()
# work around 'memory' transport bug in 1.1.3
channel._new_queue('ae.undeliver')
if target.fanout:
exchange = kombu.entity.Exchange(name=target.topic + '_fanout',
type='fanout',
durable=False,
auto_delete=True)
queue = kombu.entity.Queue(name=target.topic + '_fanout_12345',
channel=channel,
exchange=exchange,
routing_key=target.topic)
if target.server:
exchange = kombu.entity.Exchange(name='openstack',
type='topic',
durable=False,
auto_delete=False)
topic = '%s.%s' % (target.topic, target.server)
queue = kombu.entity.Queue(name=topic,
channel=channel,
exchange=exchange,
routing_key=topic)
else:
exchange = kombu.entity.Exchange(name='openstack',
type='topic',
durable=False,
auto_delete=False)
queue = kombu.entity.Queue(name=target.topic,
channel=channel,
exchange=exchange,
routing_key=target.topic)
queue.declare()
return connection, channel, queue
class TestRequestWireFormat(test_utils.BaseTestCase):
_target = [
('topic_target',
dict(topic='testtopic', server=None, fanout=False)),
('server_target',
dict(topic='testtopic', server='testserver', fanout=False)),
# NOTE(markmc): https://github.com/celery/kombu/issues/195
('fanout_target',
dict(topic='testtopic', server=None, fanout=True,
skip_msg='Requires kombu>2.5.12 to fix kombu issue #195')),
]
_msg = [
('empty_msg',
dict(msg={}, expected={})),
('primitive_msg',
dict(msg={'foo': 'bar'}, expected={'foo': 'bar'})),
('complex_msg',
dict(msg={'a': {'b': datetime.datetime(1920, 2, 3, 4, 5, 6, 7)}},
expected={'a': {'b': '1920-02-03T04:05:06.000007'}})),
]
_context = [
('empty_ctxt', dict(ctxt={}, expected_ctxt={})),
('user_project_ctxt',
dict(ctxt={'user': 'mark', 'project': 'snarkybunch'},
expected_ctxt={'_context_user': 'mark',
'_context_project': 'snarkybunch'})),
]
@classmethod
def generate_scenarios(cls):
cls.scenarios = testscenarios.multiply_scenarios(cls._msg,
cls._context,
cls._target)
def setUp(self):
super(TestRequestWireFormat, self).setUp()
self.conf.register_opts(msg_transport._transport_opts)
self.conf.register_opts(rabbit_driver.rabbit_opts)
self.config(rpc_backend='rabbit')
self.config(fake_rabbit=True)
self.uuids = []
self.orig_uuid4 = uuid.uuid4
self.useFixture(fixtures.MonkeyPatch('uuid.uuid4', self.mock_uuid4))
def mock_uuid4(self):
self.uuids.append(self.orig_uuid4())
return self.uuids[-1]
def test_request_wire_format(self):
if hasattr(self, 'skip_msg'):
self.skipTest(self.skip_msg)
transport = messaging.get_transport(self.conf)
self.addCleanup(transport.cleanup)
driver = transport._driver
target = messaging.Target(topic=self.topic,
server=self.server,
fanout=self.fanout)
connection, channel, queue = _declare_queue(target)
self.addCleanup(connection.release)
driver.send(target, self.ctxt, self.msg)
msgs = []
def callback(msg):
msg = channel.message_to_python(msg)
msg.ack()
msgs.append(msg.payload)
queue.consume(callback=callback,
consumer_tag='1',
nowait=False)
connection.drain_events()
self.assertEqual(1, len(msgs))
self.assertTrue('oslo.message' in msgs[0])
received = msgs[0]
received['oslo.message'] = jsonutils.loads(received['oslo.message'])
expected_msg = {
'_msg_id': self.uuids[0].hex,
'_unique_id': self.uuids[1].hex,
'_reply_q': 'reply_' + self.uuids[2].hex,
}
expected_msg.update(self.expected)
expected_msg.update(self.expected_ctxt)
expected = {
'oslo.version': '2.0',
'oslo.message': expected_msg,
}
self.assertEqual(expected, received)
TestRequestWireFormat.generate_scenarios()
def _create_producer(target):
connection = kombu.connection.BrokerConnection(transport='memory')
# Kludge to speed up tests.
connection.transport.polling_interval = 0.0
connection.connect()
channel = connection.channel()
# work around 'memory' transport bug in 1.1.3
channel._new_queue('ae.undeliver')
if target.fanout:
exchange = kombu.entity.Exchange(name=target.topic + '_fanout',
type='fanout',
durable=False,
auto_delete=True)
producer = kombu.messaging.Producer(exchange=exchange,
channel=channel,
routing_key=target.topic)
elif target.server:
exchange = kombu.entity.Exchange(name='openstack',
type='topic',
durable=False,
auto_delete=False)
topic = '%s.%s' % (target.topic, target.server)
producer = kombu.messaging.Producer(exchange=exchange,
channel=channel,
routing_key=topic)
else:
exchange = kombu.entity.Exchange(name='openstack',
type='topic',
durable=False,
auto_delete=False)
producer = kombu.messaging.Producer(exchange=exchange,
channel=channel,
routing_key=target.topic)
return connection, producer
class TestReplyWireFormat(test_utils.BaseTestCase):
_target = [
('topic_target',
dict(topic='testtopic', server=None, fanout=False)),
('server_target',
dict(topic='testtopic', server='testserver', fanout=False)),
# NOTE(markmc): https://github.com/celery/kombu/issues/195
('fanout_target',
dict(topic='testtopic', server=None, fanout=True,
skip_msg='Requires kombu>2.5.12 to fix kombu issue #195')),
]
_msg = [
('empty_msg',
dict(msg={}, expected={})),
('primitive_msg',
dict(msg={'foo': 'bar'}, expected={'foo': 'bar'})),
('complex_msg',
dict(msg={'a': {'b': '1920-02-03T04:05:06.000007'}},
expected={'a': {'b': '1920-02-03T04:05:06.000007'}})),
]
_context = [
('empty_ctxt', dict(ctxt={}, expected_ctxt={})),
('user_project_ctxt',
dict(ctxt={'_context_user': 'mark',
'_context_project': 'snarkybunch'},
expected_ctxt={'user': 'mark', 'project': 'snarkybunch'})),
]
@classmethod
def generate_scenarios(cls):
cls.scenarios = testscenarios.multiply_scenarios(cls._msg,
cls._context,
cls._target)
def setUp(self):
super(TestReplyWireFormat, self).setUp()
self.conf.register_opts(msg_transport._transport_opts)
self.conf.register_opts(rabbit_driver.rabbit_opts)
self.config(rpc_backend='rabbit')
self.config(fake_rabbit=True)
def test_reply_wire_format(self):
if hasattr(self, 'skip_msg'):
self.skipTest(self.skip_msg)
transport = messaging.get_transport(self.conf)
self.addCleanup(transport.cleanup)
driver = transport._driver
target = messaging.Target(topic=self.topic,
server=self.server,
fanout=self.fanout)
listener = driver.listen(target)
connection, producer = _create_producer(target)
self.addCleanup(connection.release)
msg = {
'oslo.version': '2.0',
'oslo.message': {}
}
msg['oslo.message'].update(self.msg)
msg['oslo.message'].update(self.ctxt)
msg['oslo.message'].update({
'_msg_id': uuid.uuid4().hex,
'_unique_id': uuid.uuid4().hex,
'_reply_q': 'reply_' + uuid.uuid4().hex,
})
msg['oslo.message'] = jsonutils.dumps(msg['oslo.message'])
producer.publish(msg)
received = listener.poll()
self.assertTrue(received is not None)
self.assertEqual(self.expected_ctxt, received.ctxt)
self.assertEqual(self.expected, received.message)
TestReplyWireFormat.generate_scenarios()