diff --git a/oslo/messaging/_drivers/impl_zmq.py b/oslo/messaging/_drivers/impl_zmq.py index e45a39ead..5537e2cdd 100644 --- a/oslo/messaging/_drivers/impl_zmq.py +++ b/oslo/messaging/_drivers/impl_zmq.py @@ -32,7 +32,7 @@ from oslo.config import cfg from oslo.messaging._drivers import base from oslo.messaging._drivers import common as rpc_common from oslo.messaging._executors import impl_eventlet # FIXME(markmc) -from oslo.messaging._i18n import _ +from oslo.messaging._i18n import _, _LE from oslo.serialization import jsonutils from oslo.utils import excutils from oslo.utils import importutils @@ -180,6 +180,10 @@ class ZmqSocket(object): self.sock.setsockopt(zmq.UNSUBSCRIBE, msg_filter) self.subscriptions.remove(msg_filter) + @property + def closed(self): + return self.sock is None or self.sock.closed + def close(self): if self.sock is None or self.sock.closed: return @@ -257,7 +261,10 @@ class RpcContext(rpc_common.CommonRpcContext): @classmethod def marshal(self, ctx): - ctx_data = ctx.to_dict() + if not isinstance(ctx, dict): + ctx_data = ctx.to_dict() + else: + ctx_data = ctx return _serialize(ctx_data) @classmethod @@ -395,7 +402,7 @@ class ZmqBaseReactor(ConsumerBase): def consume_in_thread(self): def _consume(sock): LOG.info(_("Consuming socket")) - while True: + while not sock.closed: self.consume(sock) for k in self.proxies.keys(): @@ -408,12 +415,12 @@ class ZmqBaseReactor(ConsumerBase): t.wait() def close(self): - for s in self.sockets: - s.close() - for t in self.threads: t.kill() + for s in self.sockets: + s.close() + class ZmqProxy(ZmqBaseReactor): """A consumer class implementing a topic-based proxy. @@ -612,9 +619,15 @@ class Connection(rpc_common.Connection): self.topics.append(topic) def close(self): - _get_matchmaker().stop_heartbeat() + mm = _get_matchmaker() + mm.stop_heartbeat() for topic in self.topics: - _get_matchmaker().unregister(topic, CONF.rpc_zmq_host) + try: + mm.unregister(topic, CONF.rpc_zmq_host) + except Exception as err: + LOG.error(_LE('Unable to unregister topic %(topic)s' + ' from matchmaker: %(err)s') % + {'topic': topic, 'err': err}) self.reactor.close() self.topics = [] @@ -634,6 +647,7 @@ def _cast(addr, context, topic, msg, timeout=None, envelope=False, payload = [RpcContext.marshal(context), msg] with Timeout(timeout_cast, exception=rpc_common.Timeout): + conn = None try: conn = ZmqClient(addr) @@ -642,7 +656,7 @@ def _cast(addr, context, topic, msg, timeout=None, envelope=False, except zmq.ZMQError: raise RPCException("Cast failed. ZMQ Socket Exception") finally: - if 'conn' in vars(): + if conn is not None: conn.close() @@ -684,12 +698,14 @@ def _call(addr, context, topic, msg, timeout=None, zmq.SUB, subscribe=msg_id, bind=False ) - LOG.debug("Sending cast") + LOG.debug("Sending cast: %s", topic) _cast(addr, context, topic, payload, envelope=envelope) LOG.debug("Cast sent; Waiting reply") # Blocks until receives reply msg = msg_waiter.recv() + if msg is None: + raise rpc_common.Timeout() LOG.debug("Received message: %s", msg) LOG.debug("Unpacking response") @@ -789,7 +805,7 @@ class ZmqIncomingMessage(base.IncomingMessage): self.condition.notify() def requeue(self): - pass + LOG.debug("WARNING: requeue not supported") class ZmqListener(base.Listener): @@ -858,19 +874,11 @@ class ZmqDriver(base.BaseDriver): raise NotImplementedError('The ZeroMQ driver currently only works ' 'with oslo.config.cfg.CONF') + self.listeners = [] + def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None, envelope=False): - # FIXME(markmc): remove this temporary hack - class Context(object): - def __init__(self, d): - self.d = d - - def to_dict(self): - return self.d - - context = Context(ctxt) - if wait_for_reply: method = _call else: @@ -884,7 +892,7 @@ class ZmqDriver(base.BaseDriver): elif target.server: topic = '%s.%s' % (topic, target.server) - reply = _multi_send(method, context, topic, message, + reply = _multi_send(method, ctxt, topic, message, envelope=envelope, allowed_remote_exmods=self._allowed_remote_exmods) @@ -916,6 +924,7 @@ class ZmqDriver(base.BaseDriver): conn.create_consumer(target.topic, listener, fanout=True) conn.consume_in_thread() + self.listeners.append(conn) return listener @@ -934,8 +943,11 @@ class ZmqDriver(base.BaseDriver): conn.create_consumer('%s-%s' % (target.topic, priority), listener) conn.consume_in_thread() + self.listeners.append(conn) return listener def cleanup(self): - pass + for c in self.listeners: + c.close() + self.listeners = [] diff --git a/test-requirements.txt b/test-requirements.txt index 3105e4cfe..e58ceca10 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -21,6 +21,9 @@ qpid-python # for test_matchmaker_redis redis>=2.10.0 +# for test_impl_zmq +pyzmq>=14.3.1 + # when we can require tox>= 1.4, this can go into tox.ini: # [testenv:cover] # deps = {[testenv]deps} coverage diff --git a/tests/drivers/test_impl_zmq.py b/tests/drivers/test_impl_zmq.py new file mode 100644 index 000000000..3103f42aa --- /dev/null +++ b/tests/drivers/test_impl_zmq.py @@ -0,0 +1,504 @@ +# Copyright 2014 Canonical, Ltd. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import socket + +import fixtures +import mock +import testtools + +from oslo import messaging +from oslo.utils import importutils +from tests import utils as test_utils + +# NOTE(jamespage) the zmq driver implementation is currently tied +# to eventlet so we have to monkey_patch to support testing +# eventlet is not yet py3 compatible, so skip if not installed +eventlet = importutils.try_import('eventlet') +if eventlet: + eventlet.monkey_patch() + +impl_zmq = importutils.try_import('oslo.messaging._drivers.impl_zmq') + +LOG = logging.getLogger(__name__) + + +def get_unused_port(): + """Returns an unused port on localhost.""" + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.bind(('localhost', 0)) + port = s.getsockname()[1] + s.close() + return port + + +class TestConfZmqDriverLoad(test_utils.BaseTestCase): + + @testtools.skipIf(impl_zmq is None, "zmq not available") + def setUp(self): + super(TestConfZmqDriverLoad, self).setUp() + self.messaging_conf.transport_driver = 'zmq' + + def test_driver_load(self): + transport = messaging.get_transport(self.conf) + self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver) + + +class stopRpc(object): + def __init__(self, attrs): + self.attrs = attrs + + def __call__(self): + if self.attrs['reactor']: + self.attrs['reactor'].close() + if self.attrs['driver']: + self.attrs['driver'].cleanup() + + +class TestZmqBasics(test_utils.BaseTestCase): + + @testtools.skipIf(impl_zmq is None, "zmq not available") + def setUp(self): + super(TestZmqBasics, self).setUp() + self.messaging_conf.transport_driver = 'zmq' + # Get driver + transport = messaging.get_transport(self.conf) + self.driver = transport._driver + + # Set config values + self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path + kwargs = {'rpc_zmq_bind_address': '127.0.0.1', + 'rpc_zmq_host': '127.0.0.1', + 'rpc_response_timeout': 5, + 'rpc_zmq_port': get_unused_port(), + 'rpc_zmq_ipc_dir': self.internal_ipc_dir} + self.config(**kwargs) + + # Start RPC + LOG.info("Running internal zmq receiver.") + self.reactor = impl_zmq.ZmqProxy(self.conf) + self.reactor.consume_in_thread() + + self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1') + self.addCleanup(stopRpc(self.__dict__)) + + def test_start_stop_listener(self): + target = messaging.Target(topic='testtopic') + listener = self.driver.listen(target) + result = listener.poll(0.01) + self.assertEqual(result, None) + + def test_send_receive_raises(self): + """Call() without method.""" + target = messaging.Target(topic='testtopic') + self.driver.listen(target) + self.assertRaises( + KeyError, + self.driver.send, + target, {}, {'tx_id': 1}, wait_for_reply=True) + + @mock.patch('oslo.messaging._drivers.impl_zmq.ZmqIncomingMessage') + def test_send_receive_topic(self, mock_msg): + """Call() with method.""" + mock_msg.return_value = msg = mock.MagicMock() + msg.received = received = mock.MagicMock() + received.failure = False + received.reply = True + msg.condition = condition = mock.MagicMock() + condition.wait.return_value = True + + target = messaging.Target(topic='testtopic') + self.driver.listen(target) + result = self.driver.send( + target, {}, + {'method': 'hello-world', 'tx_id': 1}, + wait_for_reply=True) + self.assertEqual(result, True) + + @mock.patch('oslo.messaging._drivers.impl_zmq._call', autospec=True) + def test_send_receive_fanout(self, mock_call): + target = messaging.Target(topic='testtopic', fanout=True) + self.driver.listen(target) + + mock_call.__name__ = '_call' + mock_call.return_value = [True] + + result = self.driver.send( + target, {}, + {'method': 'hello-world', 'tx_id': 1}, + wait_for_reply=True) + + self.assertEqual(result, True) + mock_call.assert_called_once_with( + 'tcp://127.0.0.1:%s' % self.conf['rpc_zmq_port'], + {}, 'fanout~testtopic.127.0.0.1', + {'tx_id': 1, 'method': 'hello-world'}, + None, False, []) + + @mock.patch('oslo.messaging._drivers.impl_zmq._call', autospec=True) + def test_send_receive_direct(self, mock_call): + # Also verifies fix for bug http://pad.lv/1301723 + target = messaging.Target(topic='testtopic', server='localhost') + self.driver.listen(target) + + mock_call.__name__ = '_call' + mock_call.return_value = [True] + + result = self.driver.send( + target, {}, + {'method': 'hello-world', 'tx_id': 1}, + wait_for_reply=True) + + self.assertEqual(result, True) + mock_call.assert_called_once_with( + 'tcp://localhost:%s' % self.conf['rpc_zmq_port'], + {}, 'testtopic.localhost', + {'tx_id': 1, 'method': 'hello-world'}, + None, False, []) + + +class TestZmqSocket(test_utils.BaseTestCase): + + @testtools.skipIf(impl_zmq is None, "zmq not available") + def setUp(self): + super(TestZmqSocket, self).setUp() + self.messaging_conf.transport_driver = 'zmq' + # Get driver + transport = messaging.get_transport(self.conf) + self.driver = transport._driver + + @mock.patch('oslo.messaging._drivers.impl_zmq.ZmqSocket.subscribe') + @mock.patch('oslo.messaging._drivers.impl_zmq.zmq.Context') + def test_zmqsocket_init_type_pull(self, mock_context, mock_subscribe): + mock_ctxt = mock.Mock() + mock_context.return_value = mock_ctxt + mock_sock = mock.Mock() + mock_ctxt.socket = mock.Mock(return_value=mock_sock) + mock_sock.connect = mock.Mock() + mock_sock.bind = mock.Mock() + addr = '127.0.0.1' + + sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PULL, bind=False, + subscribe=None) + self.assertTrue(sock.can_recv) + self.assertFalse(sock.can_send) + self.assertFalse(sock.can_sub) + self.assertTrue(mock_sock.connect.called) + self.assertFalse(mock_sock.bind.called) + + @mock.patch('oslo.messaging._drivers.impl_zmq.ZmqSocket.subscribe') + @mock.patch('oslo.messaging._drivers.impl_zmq.zmq.Context') + def test_zmqsocket_init_type_sub(self, mock_context, mock_subscribe): + mock_ctxt = mock.Mock() + mock_context.return_value = mock_ctxt + mock_sock = mock.Mock() + mock_ctxt.socket = mock.Mock(return_value=mock_sock) + mock_sock.connect = mock.Mock() + mock_sock.bind = mock.Mock() + addr = '127.0.0.1' + + sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.SUB, bind=False, + subscribe=None) + self.assertTrue(sock.can_recv) + self.assertFalse(sock.can_send) + self.assertTrue(sock.can_sub) + self.assertTrue(mock_sock.connect.called) + self.assertFalse(mock_sock.bind.called) + + @mock.patch('oslo.messaging._drivers.impl_zmq.ZmqSocket.subscribe') + @mock.patch('oslo.messaging._drivers.impl_zmq.zmq.Context') + def test_zmqsocket_init_type_push(self, mock_context, mock_subscribe): + mock_ctxt = mock.Mock() + mock_context.return_value = mock_ctxt + mock_sock = mock.Mock() + mock_ctxt.socket = mock.Mock(return_value=mock_sock) + mock_sock.connect = mock.Mock() + mock_sock.bind = mock.Mock() + addr = '127.0.0.1' + + sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUSH, bind=False, + subscribe=None) + self.assertFalse(sock.can_recv) + self.assertTrue(sock.can_send) + self.assertFalse(sock.can_sub) + self.assertTrue(mock_sock.connect.called) + self.assertFalse(mock_sock.bind.called) + + @mock.patch('oslo.messaging._drivers.impl_zmq.ZmqSocket.subscribe') + @mock.patch('oslo.messaging._drivers.impl_zmq.zmq.Context') + def test_zmqsocket_init_type_pub(self, mock_context, mock_subscribe): + mock_ctxt = mock.Mock() + mock_context.return_value = mock_ctxt + mock_sock = mock.Mock() + mock_ctxt.socket = mock.Mock(return_value=mock_sock) + mock_sock.connect = mock.Mock() + mock_sock.bind = mock.Mock() + addr = '127.0.0.1' + + sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUB, bind=False, + subscribe=None) + self.assertFalse(sock.can_recv) + self.assertTrue(sock.can_send) + self.assertFalse(sock.can_sub) + self.assertTrue(mock_sock.connect.called) + self.assertFalse(mock_sock.bind.called) + + +class TestZmqIncomingMessage(test_utils.BaseTestCase): + + @testtools.skipIf(impl_zmq is None, "zmq not available") + def setUp(self): + super(TestZmqIncomingMessage, self).setUp() + self.messaging_conf.transport_driver = 'zmq' + # Get driver + transport = messaging.get_transport(self.conf) + self.driver = transport._driver + + def test_zmqincomingmessage(self): + msg = impl_zmq.ZmqIncomingMessage(mock.Mock(), None, 'msg.foo') + msg.reply("abc") + self.assertIsInstance( + msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply) + self.assertIsInstance( + msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply) + self.assertEqual(msg.received.reply, "abc") + msg.requeue() + + +class TestZmqConnection(test_utils.BaseTestCase): + + @testtools.skipIf(impl_zmq is None, "zmq not available") + def setUp(self): + super(TestZmqConnection, self).setUp() + self.messaging_conf.transport_driver = 'zmq' + # Get driver + transport = messaging.get_transport(self.conf) + self.driver = transport._driver + + # Set config values + self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path + kwargs = {'rpc_zmq_bind_address': '127.0.0.1', + 'rpc_zmq_host': '127.0.0.1', + 'rpc_response_timeout': 5, + 'rpc_zmq_port': get_unused_port(), + 'rpc_zmq_ipc_dir': self.internal_ipc_dir} + self.config(**kwargs) + + # Start RPC + LOG.info("Running internal zmq receiver.") + self.reactor = impl_zmq.ZmqProxy(self.conf) + self.reactor.consume_in_thread() + + self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1') + self.addCleanup(stopRpc(self.__dict__)) + + @mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True) + def test_zmqconnection_create_consumer(self, mock_reactor): + + mock_reactor.register = mock.Mock() + conn = impl_zmq.Connection(self.driver) + topic = 'topic.foo' + context = mock.Mock() + inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' % + (self.internal_ipc_dir)) + # No Fanout + conn.create_consumer(topic, context) + conn.reactor.register.assert_called_with(context, inaddr, + impl_zmq.zmq.PULL, + subscribe=None, in_bind=False) + + # Reset for next bunch of checks + conn.reactor.register.reset_mock() + + # Fanout + inaddr = ('ipc://%s/zmq_topic_fanout~topic' % + (self.internal_ipc_dir)) + conn.create_consumer(topic, context, fanout='subscriber.foo') + conn.reactor.register.assert_called_with(context, inaddr, + impl_zmq.zmq.SUB, + subscribe='subscriber.foo', + in_bind=False) + + @mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True) + def test_zmqconnection_create_consumer_topic_exists(self, mock_reactor): + mock_reactor.register = mock.Mock() + conn = impl_zmq.Connection(self.driver) + topic = 'topic.foo' + context = mock.Mock() + inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' % + (self.internal_ipc_dir)) + + conn.create_consumer(topic, context) + conn.reactor.register.assert_called_with( + context, inaddr, impl_zmq.zmq.PULL, subscribe=None, in_bind=False) + conn.reactor.register.reset_mock() + # Call again with same topic + conn.create_consumer(topic, context) + self.assertFalse(conn.reactor.register.called) + + @mock.patch('oslo.messaging._drivers.impl_zmq._get_matchmaker', + autospec=True) + @mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True) + def test_zmqconnection_close(self, mock_reactor, mock_getmatchmaker): + conn = impl_zmq.Connection(self.driver) + conn.reactor.close = mock.Mock() + mock_getmatchmaker.return_value.stop_heartbeat = mock.Mock() + conn.close() + self.assertTrue(mock_getmatchmaker.return_value.stop_heartbeat.called) + self.assertTrue(conn.reactor.close.called) + + @mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True) + def test_zmqconnection_wait(self, mock_reactor): + conn = impl_zmq.Connection(self.driver) + conn.reactor.wait = mock.Mock() + conn.wait() + self.assertTrue(conn.reactor.wait.called) + + @mock.patch('oslo.messaging._drivers.impl_zmq._get_matchmaker', + autospec=True) + @mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True) + def test_zmqconnection_consume_in_thread(self, mock_reactor, + mock_getmatchmaker): + mock_getmatchmaker.return_value.start_heartbeat = mock.Mock() + conn = impl_zmq.Connection(self.driver) + conn.reactor.consume_in_thread = mock.Mock() + conn.consume_in_thread() + self.assertTrue(mock_getmatchmaker.return_value.start_heartbeat.called) + self.assertTrue(conn.reactor.consume_in_thread.called) + + +class TestZmqListener(test_utils.BaseTestCase): + + @testtools.skipIf(impl_zmq is None, "zmq not available") + def setUp(self): + super(TestZmqListener, self).setUp() + self.messaging_conf.transport_driver = 'zmq' + # Get driver + transport = messaging.get_transport(self.conf) + self.driver = transport._driver + + # Set config values + self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path + kwargs = {'rpc_zmq_bind_address': '127.0.0.1', + 'rpc_zmq_host': '127.0.0.1', + 'rpc_response_timeout': 5, + 'rpc_zmq_port': get_unused_port(), + 'rpc_zmq_ipc_dir': self.internal_ipc_dir} + self.config(**kwargs) + + # Start RPC + LOG.info("Running internal zmq receiver.") + self.reactor = impl_zmq.ZmqProxy(self.conf) + self.reactor.consume_in_thread() + + self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1') + self.addCleanup(stopRpc(self.__dict__)) + + def test_zmqlistener_no_msg(self): + listener = impl_zmq.ZmqListener(self.driver) + # Timeout = 0 should return straight away since the queue is empty + listener.poll(timeout=0) + + def test_zmqlistener_w_msg(self): + listener = impl_zmq.ZmqListener(self.driver) + kwargs = {'a': 1, 'b': 2} + m = mock.Mock() + ctxt = mock.Mock(autospec=impl_zmq.RpcContext) + eventlet.spawn_n(listener.dispatch, ctxt, 0, + m.fake_method, 'name.space', **kwargs) + resp = listener.poll(timeout=10) + msg = {'method': m.fake_method, 'namespace': 'name.space', + 'args': kwargs} + self.assertEqual(resp.message, msg) + + +class TestZmqDriver(test_utils.BaseTestCase): + + @testtools.skipIf(impl_zmq is None, "zmq not available") + def setUp(self): + super(TestZmqDriver, self).setUp() + self.messaging_conf.transport_driver = 'zmq' + # Get driver + transport = messaging.get_transport(self.conf) + self.driver = transport._driver + + # Set config values + self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path + kwargs = {'rpc_zmq_bind_address': '127.0.0.1', + 'rpc_zmq_host': '127.0.0.1', + 'rpc_response_timeout': 5, + 'rpc_zmq_port': get_unused_port(), + 'rpc_zmq_ipc_dir': self.internal_ipc_dir} + self.config(**kwargs) + + # Start RPC + LOG.info("Running internal zmq receiver.") + self.reactor = impl_zmq.ZmqProxy(self.conf) + self.reactor.consume_in_thread() + + self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1') + self.addCleanup(stopRpc(self.__dict__)) + + @mock.patch('oslo.messaging._drivers.impl_zmq._cast', autospec=True) + @mock.patch('oslo.messaging._drivers.impl_zmq._multi_send', autospec=True) + def test_zmqdriver_send(self, mock_multi_send, mock_cast): + context = mock.Mock(autospec=impl_zmq.RpcContext) + topic = 'testtopic' + msg = 'jeronimo' + self.driver.send(messaging.Target(topic=topic), context, msg, + False, 0, False) + mock_multi_send.assert_called_with(mock_cast, context, topic, msg, + allowed_remote_exmods=[], + envelope=False) + + @mock.patch('oslo.messaging._drivers.impl_zmq._cast', autospec=True) + @mock.patch('oslo.messaging._drivers.impl_zmq._multi_send', autospec=True) + def test_zmqdriver_send_notification(self, mock_multi_send, mock_cast): + context = mock.Mock(autospec=impl_zmq.RpcContext) + topic = 'testtopic.foo' + topic_reformat = 'testtopic-foo' + msg = 'jeronimo' + self.driver.send_notification(messaging.Target(topic=topic), context, + msg, False, False) + mock_multi_send.assert_called_with(mock_cast, context, topic_reformat, + msg, allowed_remote_exmods=[], + envelope=False) + + @mock.patch('oslo.messaging._drivers.impl_zmq.ZmqListener', autospec=True) + @mock.patch('oslo.messaging._drivers.impl_zmq.Connection', autospec=True) + def test_zmqdriver_listen(self, mock_connection, mock_listener): + mock_listener.return_value = listener = mock.Mock() + mock_connection.return_value = conn = mock.Mock() + conn.create_consumer = mock.Mock() + conn.consume_in_thread = mock.Mock() + topic = 'testtopic.foo' + self.driver.listen(messaging.Target(topic=topic)) + conn.create_consumer.assert_called_with(topic, listener, fanout=True) + + @mock.patch('oslo.messaging._drivers.impl_zmq.ZmqListener', autospec=True) + @mock.patch('oslo.messaging._drivers.impl_zmq.Connection', autospec=True) + def test_zmqdriver_listen_for_notification(self, mock_connection, + mock_listener): + mock_listener.return_value = listener = mock.Mock() + mock_connection.return_value = conn = mock.Mock() + conn.create_consumer = mock.Mock() + conn.consume_in_thread = mock.Mock() + topic = 'testtopic.foo' + data = [(messaging.Target(topic=topic), 0)] + # NOTE(jamespage): Pooling not supported, just pass None for now. + self.driver.listen_for_notifications(data, None) + conn.create_consumer.assert_called_with("%s-%s" % (topic, 0), listener) diff --git a/tests/notify/test_listener.py b/tests/notify/test_listener.py index 45bdbece3..72e142e7e 100644 --- a/tests/notify/test_listener.py +++ b/tests/notify/test_listener.py @@ -64,7 +64,7 @@ class ListenerSetupMixin(object): def wait_for(self, expect_messages): while expect_messages != self._received_msgs: - pass + yield def stop(self): for listener in self.listeners: diff --git a/tests/notify/test_logger.py b/tests/notify/test_logger.py index 8f73ef3bc..668e96312 100644 --- a/tests/notify/test_logger.py +++ b/tests/notify/test_logger.py @@ -17,10 +17,6 @@ import logging import logging.config import os import sys -try: - import threading -except ImportError: - threading = None import mock import testscenarios @@ -39,13 +35,6 @@ logging.AUDIT = logging.INFO + 1 logging.addLevelName(logging.AUDIT, 'AUDIT') -def get_thread_ident(): - if threading is not None: - return threading.current_thread().ident - else: - return None - - class TestLogNotifier(test_utils.BaseTestCase): scenarios = [ @@ -62,6 +51,10 @@ class TestLogNotifier(test_utils.BaseTestCase): super(TestLogNotifier, self).setUp() self.addCleanup(messaging.notify._impl_test.reset) self.config(notification_driver=['test']) + # NOTE(jamespage) disable thread information logging for testing + # as this causes test failures when zmq tests monkey_patch via + # eventlet + logging.logThreads = 0 @mock.patch('oslo.utils.timeutils.utcnow') def test_logger(self, mock_utcnow): @@ -93,7 +86,7 @@ class TestLogNotifier(test_utils.BaseTestCase): {'process': os.getpid(), 'funcName': None, 'name': 'foo', - 'thread': get_thread_ident(), + 'thread': None, 'levelno': levelno, 'processName': 'MainProcess', 'pathname': '/foo/bar', @@ -149,7 +142,7 @@ class TestLogNotifier(test_utils.BaseTestCase): {'process': os.getpid(), 'funcName': 'test_logging_conf', 'name': 'default', - 'thread': get_thread_ident(), + 'thread': None, 'levelno': levelno, 'processName': 'MainProcess', 'pathname': pathname,