Merge "Add functional and unit 0mq driver tests"
This commit is contained in:
commit
fa68eaa280
@ -32,7 +32,7 @@ from oslo.config import cfg
|
||||
from oslo.messaging._drivers import base
|
||||
from oslo.messaging._drivers import common as rpc_common
|
||||
from oslo.messaging._executors import impl_eventlet # FIXME(markmc)
|
||||
from oslo.messaging._i18n import _
|
||||
from oslo.messaging._i18n import _, _LE
|
||||
from oslo.serialization import jsonutils
|
||||
from oslo.utils import excutils
|
||||
from oslo.utils import importutils
|
||||
@ -180,6 +180,10 @@ class ZmqSocket(object):
|
||||
self.sock.setsockopt(zmq.UNSUBSCRIBE, msg_filter)
|
||||
self.subscriptions.remove(msg_filter)
|
||||
|
||||
@property
|
||||
def closed(self):
|
||||
return self.sock is None or self.sock.closed
|
||||
|
||||
def close(self):
|
||||
if self.sock is None or self.sock.closed:
|
||||
return
|
||||
@ -257,7 +261,10 @@ class RpcContext(rpc_common.CommonRpcContext):
|
||||
|
||||
@classmethod
|
||||
def marshal(self, ctx):
|
||||
ctx_data = ctx.to_dict()
|
||||
if not isinstance(ctx, dict):
|
||||
ctx_data = ctx.to_dict()
|
||||
else:
|
||||
ctx_data = ctx
|
||||
return _serialize(ctx_data)
|
||||
|
||||
@classmethod
|
||||
@ -395,7 +402,7 @@ class ZmqBaseReactor(ConsumerBase):
|
||||
def consume_in_thread(self):
|
||||
def _consume(sock):
|
||||
LOG.info(_("Consuming socket"))
|
||||
while True:
|
||||
while not sock.closed:
|
||||
self.consume(sock)
|
||||
|
||||
for k in self.proxies.keys():
|
||||
@ -408,12 +415,12 @@ class ZmqBaseReactor(ConsumerBase):
|
||||
t.wait()
|
||||
|
||||
def close(self):
|
||||
for s in self.sockets:
|
||||
s.close()
|
||||
|
||||
for t in self.threads:
|
||||
t.kill()
|
||||
|
||||
for s in self.sockets:
|
||||
s.close()
|
||||
|
||||
|
||||
class ZmqProxy(ZmqBaseReactor):
|
||||
"""A consumer class implementing a topic-based proxy.
|
||||
@ -612,9 +619,15 @@ class Connection(rpc_common.Connection):
|
||||
self.topics.append(topic)
|
||||
|
||||
def close(self):
|
||||
_get_matchmaker().stop_heartbeat()
|
||||
mm = _get_matchmaker()
|
||||
mm.stop_heartbeat()
|
||||
for topic in self.topics:
|
||||
_get_matchmaker().unregister(topic, CONF.rpc_zmq_host)
|
||||
try:
|
||||
mm.unregister(topic, CONF.rpc_zmq_host)
|
||||
except Exception as err:
|
||||
LOG.error(_LE('Unable to unregister topic %(topic)s'
|
||||
' from matchmaker: %(err)s') %
|
||||
{'topic': topic, 'err': err})
|
||||
|
||||
self.reactor.close()
|
||||
self.topics = []
|
||||
@ -634,6 +647,7 @@ def _cast(addr, context, topic, msg, timeout=None, envelope=False,
|
||||
payload = [RpcContext.marshal(context), msg]
|
||||
|
||||
with Timeout(timeout_cast, exception=rpc_common.Timeout):
|
||||
conn = None
|
||||
try:
|
||||
conn = ZmqClient(addr)
|
||||
|
||||
@ -642,7 +656,7 @@ def _cast(addr, context, topic, msg, timeout=None, envelope=False,
|
||||
except zmq.ZMQError:
|
||||
raise RPCException("Cast failed. ZMQ Socket Exception")
|
||||
finally:
|
||||
if 'conn' in vars():
|
||||
if conn is not None:
|
||||
conn.close()
|
||||
|
||||
|
||||
@ -684,12 +698,14 @@ def _call(addr, context, topic, msg, timeout=None,
|
||||
zmq.SUB, subscribe=msg_id, bind=False
|
||||
)
|
||||
|
||||
LOG.debug("Sending cast")
|
||||
LOG.debug("Sending cast: %s", topic)
|
||||
_cast(addr, context, topic, payload, envelope=envelope)
|
||||
|
||||
LOG.debug("Cast sent; Waiting reply")
|
||||
# Blocks until receives reply
|
||||
msg = msg_waiter.recv()
|
||||
if msg is None:
|
||||
raise rpc_common.Timeout()
|
||||
LOG.debug("Received message: %s", msg)
|
||||
LOG.debug("Unpacking response")
|
||||
|
||||
@ -789,7 +805,7 @@ class ZmqIncomingMessage(base.IncomingMessage):
|
||||
self.condition.notify()
|
||||
|
||||
def requeue(self):
|
||||
pass
|
||||
LOG.debug("WARNING: requeue not supported")
|
||||
|
||||
|
||||
class ZmqListener(base.Listener):
|
||||
@ -858,19 +874,11 @@ class ZmqDriver(base.BaseDriver):
|
||||
raise NotImplementedError('The ZeroMQ driver currently only works '
|
||||
'with oslo.config.cfg.CONF')
|
||||
|
||||
self.listeners = []
|
||||
|
||||
def _send(self, target, ctxt, message,
|
||||
wait_for_reply=None, timeout=None, envelope=False):
|
||||
|
||||
# FIXME(markmc): remove this temporary hack
|
||||
class Context(object):
|
||||
def __init__(self, d):
|
||||
self.d = d
|
||||
|
||||
def to_dict(self):
|
||||
return self.d
|
||||
|
||||
context = Context(ctxt)
|
||||
|
||||
if wait_for_reply:
|
||||
method = _call
|
||||
else:
|
||||
@ -884,7 +892,7 @@ class ZmqDriver(base.BaseDriver):
|
||||
elif target.server:
|
||||
topic = '%s.%s' % (topic, target.server)
|
||||
|
||||
reply = _multi_send(method, context, topic, message,
|
||||
reply = _multi_send(method, ctxt, topic, message,
|
||||
envelope=envelope,
|
||||
allowed_remote_exmods=self._allowed_remote_exmods)
|
||||
|
||||
@ -916,6 +924,7 @@ class ZmqDriver(base.BaseDriver):
|
||||
conn.create_consumer(target.topic, listener, fanout=True)
|
||||
|
||||
conn.consume_in_thread()
|
||||
self.listeners.append(conn)
|
||||
|
||||
return listener
|
||||
|
||||
@ -934,8 +943,11 @@ class ZmqDriver(base.BaseDriver):
|
||||
conn.create_consumer('%s-%s' % (target.topic, priority),
|
||||
listener)
|
||||
conn.consume_in_thread()
|
||||
self.listeners.append(conn)
|
||||
|
||||
return listener
|
||||
|
||||
def cleanup(self):
|
||||
pass
|
||||
for c in self.listeners:
|
||||
c.close()
|
||||
self.listeners = []
|
||||
|
@ -21,6 +21,9 @@ qpid-python
|
||||
# for test_matchmaker_redis
|
||||
redis>=2.10.0
|
||||
|
||||
# for test_impl_zmq
|
||||
pyzmq>=14.3.1
|
||||
|
||||
# when we can require tox>= 1.4, this can go into tox.ini:
|
||||
# [testenv:cover]
|
||||
# deps = {[testenv]deps} coverage
|
||||
|
504
tests/drivers/test_impl_zmq.py
Normal file
504
tests/drivers/test_impl_zmq.py
Normal file
@ -0,0 +1,504 @@
|
||||
# Copyright 2014 Canonical, Ltd.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import socket
|
||||
|
||||
import fixtures
|
||||
import mock
|
||||
import testtools
|
||||
|
||||
from oslo import messaging
|
||||
from oslo.utils import importutils
|
||||
from tests import utils as test_utils
|
||||
|
||||
# NOTE(jamespage) the zmq driver implementation is currently tied
|
||||
# to eventlet so we have to monkey_patch to support testing
|
||||
# eventlet is not yet py3 compatible, so skip if not installed
|
||||
eventlet = importutils.try_import('eventlet')
|
||||
if eventlet:
|
||||
eventlet.monkey_patch()
|
||||
|
||||
impl_zmq = importutils.try_import('oslo.messaging._drivers.impl_zmq')
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_unused_port():
|
||||
"""Returns an unused port on localhost."""
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
s.bind(('localhost', 0))
|
||||
port = s.getsockname()[1]
|
||||
s.close()
|
||||
return port
|
||||
|
||||
|
||||
class TestConfZmqDriverLoad(test_utils.BaseTestCase):
|
||||
|
||||
@testtools.skipIf(impl_zmq is None, "zmq not available")
|
||||
def setUp(self):
|
||||
super(TestConfZmqDriverLoad, self).setUp()
|
||||
self.messaging_conf.transport_driver = 'zmq'
|
||||
|
||||
def test_driver_load(self):
|
||||
transport = messaging.get_transport(self.conf)
|
||||
self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver)
|
||||
|
||||
|
||||
class stopRpc(object):
|
||||
def __init__(self, attrs):
|
||||
self.attrs = attrs
|
||||
|
||||
def __call__(self):
|
||||
if self.attrs['reactor']:
|
||||
self.attrs['reactor'].close()
|
||||
if self.attrs['driver']:
|
||||
self.attrs['driver'].cleanup()
|
||||
|
||||
|
||||
class TestZmqBasics(test_utils.BaseTestCase):
|
||||
|
||||
@testtools.skipIf(impl_zmq is None, "zmq not available")
|
||||
def setUp(self):
|
||||
super(TestZmqBasics, self).setUp()
|
||||
self.messaging_conf.transport_driver = 'zmq'
|
||||
# Get driver
|
||||
transport = messaging.get_transport(self.conf)
|
||||
self.driver = transport._driver
|
||||
|
||||
# Set config values
|
||||
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
|
||||
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
|
||||
'rpc_zmq_host': '127.0.0.1',
|
||||
'rpc_response_timeout': 5,
|
||||
'rpc_zmq_port': get_unused_port(),
|
||||
'rpc_zmq_ipc_dir': self.internal_ipc_dir}
|
||||
self.config(**kwargs)
|
||||
|
||||
# Start RPC
|
||||
LOG.info("Running internal zmq receiver.")
|
||||
self.reactor = impl_zmq.ZmqProxy(self.conf)
|
||||
self.reactor.consume_in_thread()
|
||||
|
||||
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
|
||||
self.addCleanup(stopRpc(self.__dict__))
|
||||
|
||||
def test_start_stop_listener(self):
|
||||
target = messaging.Target(topic='testtopic')
|
||||
listener = self.driver.listen(target)
|
||||
result = listener.poll(0.01)
|
||||
self.assertEqual(result, None)
|
||||
|
||||
def test_send_receive_raises(self):
|
||||
"""Call() without method."""
|
||||
target = messaging.Target(topic='testtopic')
|
||||
self.driver.listen(target)
|
||||
self.assertRaises(
|
||||
KeyError,
|
||||
self.driver.send,
|
||||
target, {}, {'tx_id': 1}, wait_for_reply=True)
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqIncomingMessage')
|
||||
def test_send_receive_topic(self, mock_msg):
|
||||
"""Call() with method."""
|
||||
mock_msg.return_value = msg = mock.MagicMock()
|
||||
msg.received = received = mock.MagicMock()
|
||||
received.failure = False
|
||||
received.reply = True
|
||||
msg.condition = condition = mock.MagicMock()
|
||||
condition.wait.return_value = True
|
||||
|
||||
target = messaging.Target(topic='testtopic')
|
||||
self.driver.listen(target)
|
||||
result = self.driver.send(
|
||||
target, {},
|
||||
{'method': 'hello-world', 'tx_id': 1},
|
||||
wait_for_reply=True)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq._call', autospec=True)
|
||||
def test_send_receive_fanout(self, mock_call):
|
||||
target = messaging.Target(topic='testtopic', fanout=True)
|
||||
self.driver.listen(target)
|
||||
|
||||
mock_call.__name__ = '_call'
|
||||
mock_call.return_value = [True]
|
||||
|
||||
result = self.driver.send(
|
||||
target, {},
|
||||
{'method': 'hello-world', 'tx_id': 1},
|
||||
wait_for_reply=True)
|
||||
|
||||
self.assertEqual(result, True)
|
||||
mock_call.assert_called_once_with(
|
||||
'tcp://127.0.0.1:%s' % self.conf['rpc_zmq_port'],
|
||||
{}, 'fanout~testtopic.127.0.0.1',
|
||||
{'tx_id': 1, 'method': 'hello-world'},
|
||||
None, False, [])
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq._call', autospec=True)
|
||||
def test_send_receive_direct(self, mock_call):
|
||||
# Also verifies fix for bug http://pad.lv/1301723
|
||||
target = messaging.Target(topic='testtopic', server='localhost')
|
||||
self.driver.listen(target)
|
||||
|
||||
mock_call.__name__ = '_call'
|
||||
mock_call.return_value = [True]
|
||||
|
||||
result = self.driver.send(
|
||||
target, {},
|
||||
{'method': 'hello-world', 'tx_id': 1},
|
||||
wait_for_reply=True)
|
||||
|
||||
self.assertEqual(result, True)
|
||||
mock_call.assert_called_once_with(
|
||||
'tcp://localhost:%s' % self.conf['rpc_zmq_port'],
|
||||
{}, 'testtopic.localhost',
|
||||
{'tx_id': 1, 'method': 'hello-world'},
|
||||
None, False, [])
|
||||
|
||||
|
||||
class TestZmqSocket(test_utils.BaseTestCase):
|
||||
|
||||
@testtools.skipIf(impl_zmq is None, "zmq not available")
|
||||
def setUp(self):
|
||||
super(TestZmqSocket, self).setUp()
|
||||
self.messaging_conf.transport_driver = 'zmq'
|
||||
# Get driver
|
||||
transport = messaging.get_transport(self.conf)
|
||||
self.driver = transport._driver
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqSocket.subscribe')
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.zmq.Context')
|
||||
def test_zmqsocket_init_type_pull(self, mock_context, mock_subscribe):
|
||||
mock_ctxt = mock.Mock()
|
||||
mock_context.return_value = mock_ctxt
|
||||
mock_sock = mock.Mock()
|
||||
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
|
||||
mock_sock.connect = mock.Mock()
|
||||
mock_sock.bind = mock.Mock()
|
||||
addr = '127.0.0.1'
|
||||
|
||||
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PULL, bind=False,
|
||||
subscribe=None)
|
||||
self.assertTrue(sock.can_recv)
|
||||
self.assertFalse(sock.can_send)
|
||||
self.assertFalse(sock.can_sub)
|
||||
self.assertTrue(mock_sock.connect.called)
|
||||
self.assertFalse(mock_sock.bind.called)
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqSocket.subscribe')
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.zmq.Context')
|
||||
def test_zmqsocket_init_type_sub(self, mock_context, mock_subscribe):
|
||||
mock_ctxt = mock.Mock()
|
||||
mock_context.return_value = mock_ctxt
|
||||
mock_sock = mock.Mock()
|
||||
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
|
||||
mock_sock.connect = mock.Mock()
|
||||
mock_sock.bind = mock.Mock()
|
||||
addr = '127.0.0.1'
|
||||
|
||||
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.SUB, bind=False,
|
||||
subscribe=None)
|
||||
self.assertTrue(sock.can_recv)
|
||||
self.assertFalse(sock.can_send)
|
||||
self.assertTrue(sock.can_sub)
|
||||
self.assertTrue(mock_sock.connect.called)
|
||||
self.assertFalse(mock_sock.bind.called)
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqSocket.subscribe')
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.zmq.Context')
|
||||
def test_zmqsocket_init_type_push(self, mock_context, mock_subscribe):
|
||||
mock_ctxt = mock.Mock()
|
||||
mock_context.return_value = mock_ctxt
|
||||
mock_sock = mock.Mock()
|
||||
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
|
||||
mock_sock.connect = mock.Mock()
|
||||
mock_sock.bind = mock.Mock()
|
||||
addr = '127.0.0.1'
|
||||
|
||||
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUSH, bind=False,
|
||||
subscribe=None)
|
||||
self.assertFalse(sock.can_recv)
|
||||
self.assertTrue(sock.can_send)
|
||||
self.assertFalse(sock.can_sub)
|
||||
self.assertTrue(mock_sock.connect.called)
|
||||
self.assertFalse(mock_sock.bind.called)
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqSocket.subscribe')
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.zmq.Context')
|
||||
def test_zmqsocket_init_type_pub(self, mock_context, mock_subscribe):
|
||||
mock_ctxt = mock.Mock()
|
||||
mock_context.return_value = mock_ctxt
|
||||
mock_sock = mock.Mock()
|
||||
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
|
||||
mock_sock.connect = mock.Mock()
|
||||
mock_sock.bind = mock.Mock()
|
||||
addr = '127.0.0.1'
|
||||
|
||||
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUB, bind=False,
|
||||
subscribe=None)
|
||||
self.assertFalse(sock.can_recv)
|
||||
self.assertTrue(sock.can_send)
|
||||
self.assertFalse(sock.can_sub)
|
||||
self.assertTrue(mock_sock.connect.called)
|
||||
self.assertFalse(mock_sock.bind.called)
|
||||
|
||||
|
||||
class TestZmqIncomingMessage(test_utils.BaseTestCase):
|
||||
|
||||
@testtools.skipIf(impl_zmq is None, "zmq not available")
|
||||
def setUp(self):
|
||||
super(TestZmqIncomingMessage, self).setUp()
|
||||
self.messaging_conf.transport_driver = 'zmq'
|
||||
# Get driver
|
||||
transport = messaging.get_transport(self.conf)
|
||||
self.driver = transport._driver
|
||||
|
||||
def test_zmqincomingmessage(self):
|
||||
msg = impl_zmq.ZmqIncomingMessage(mock.Mock(), None, 'msg.foo')
|
||||
msg.reply("abc")
|
||||
self.assertIsInstance(
|
||||
msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply)
|
||||
self.assertIsInstance(
|
||||
msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply)
|
||||
self.assertEqual(msg.received.reply, "abc")
|
||||
msg.requeue()
|
||||
|
||||
|
||||
class TestZmqConnection(test_utils.BaseTestCase):
|
||||
|
||||
@testtools.skipIf(impl_zmq is None, "zmq not available")
|
||||
def setUp(self):
|
||||
super(TestZmqConnection, self).setUp()
|
||||
self.messaging_conf.transport_driver = 'zmq'
|
||||
# Get driver
|
||||
transport = messaging.get_transport(self.conf)
|
||||
self.driver = transport._driver
|
||||
|
||||
# Set config values
|
||||
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
|
||||
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
|
||||
'rpc_zmq_host': '127.0.0.1',
|
||||
'rpc_response_timeout': 5,
|
||||
'rpc_zmq_port': get_unused_port(),
|
||||
'rpc_zmq_ipc_dir': self.internal_ipc_dir}
|
||||
self.config(**kwargs)
|
||||
|
||||
# Start RPC
|
||||
LOG.info("Running internal zmq receiver.")
|
||||
self.reactor = impl_zmq.ZmqProxy(self.conf)
|
||||
self.reactor.consume_in_thread()
|
||||
|
||||
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
|
||||
self.addCleanup(stopRpc(self.__dict__))
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
|
||||
def test_zmqconnection_create_consumer(self, mock_reactor):
|
||||
|
||||
mock_reactor.register = mock.Mock()
|
||||
conn = impl_zmq.Connection(self.driver)
|
||||
topic = 'topic.foo'
|
||||
context = mock.Mock()
|
||||
inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
|
||||
(self.internal_ipc_dir))
|
||||
# No Fanout
|
||||
conn.create_consumer(topic, context)
|
||||
conn.reactor.register.assert_called_with(context, inaddr,
|
||||
impl_zmq.zmq.PULL,
|
||||
subscribe=None, in_bind=False)
|
||||
|
||||
# Reset for next bunch of checks
|
||||
conn.reactor.register.reset_mock()
|
||||
|
||||
# Fanout
|
||||
inaddr = ('ipc://%s/zmq_topic_fanout~topic' %
|
||||
(self.internal_ipc_dir))
|
||||
conn.create_consumer(topic, context, fanout='subscriber.foo')
|
||||
conn.reactor.register.assert_called_with(context, inaddr,
|
||||
impl_zmq.zmq.SUB,
|
||||
subscribe='subscriber.foo',
|
||||
in_bind=False)
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
|
||||
def test_zmqconnection_create_consumer_topic_exists(self, mock_reactor):
|
||||
mock_reactor.register = mock.Mock()
|
||||
conn = impl_zmq.Connection(self.driver)
|
||||
topic = 'topic.foo'
|
||||
context = mock.Mock()
|
||||
inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
|
||||
(self.internal_ipc_dir))
|
||||
|
||||
conn.create_consumer(topic, context)
|
||||
conn.reactor.register.assert_called_with(
|
||||
context, inaddr, impl_zmq.zmq.PULL, subscribe=None, in_bind=False)
|
||||
conn.reactor.register.reset_mock()
|
||||
# Call again with same topic
|
||||
conn.create_consumer(topic, context)
|
||||
self.assertFalse(conn.reactor.register.called)
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq._get_matchmaker',
|
||||
autospec=True)
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
|
||||
def test_zmqconnection_close(self, mock_reactor, mock_getmatchmaker):
|
||||
conn = impl_zmq.Connection(self.driver)
|
||||
conn.reactor.close = mock.Mock()
|
||||
mock_getmatchmaker.return_value.stop_heartbeat = mock.Mock()
|
||||
conn.close()
|
||||
self.assertTrue(mock_getmatchmaker.return_value.stop_heartbeat.called)
|
||||
self.assertTrue(conn.reactor.close.called)
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
|
||||
def test_zmqconnection_wait(self, mock_reactor):
|
||||
conn = impl_zmq.Connection(self.driver)
|
||||
conn.reactor.wait = mock.Mock()
|
||||
conn.wait()
|
||||
self.assertTrue(conn.reactor.wait.called)
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq._get_matchmaker',
|
||||
autospec=True)
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
|
||||
def test_zmqconnection_consume_in_thread(self, mock_reactor,
|
||||
mock_getmatchmaker):
|
||||
mock_getmatchmaker.return_value.start_heartbeat = mock.Mock()
|
||||
conn = impl_zmq.Connection(self.driver)
|
||||
conn.reactor.consume_in_thread = mock.Mock()
|
||||
conn.consume_in_thread()
|
||||
self.assertTrue(mock_getmatchmaker.return_value.start_heartbeat.called)
|
||||
self.assertTrue(conn.reactor.consume_in_thread.called)
|
||||
|
||||
|
||||
class TestZmqListener(test_utils.BaseTestCase):
|
||||
|
||||
@testtools.skipIf(impl_zmq is None, "zmq not available")
|
||||
def setUp(self):
|
||||
super(TestZmqListener, self).setUp()
|
||||
self.messaging_conf.transport_driver = 'zmq'
|
||||
# Get driver
|
||||
transport = messaging.get_transport(self.conf)
|
||||
self.driver = transport._driver
|
||||
|
||||
# Set config values
|
||||
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
|
||||
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
|
||||
'rpc_zmq_host': '127.0.0.1',
|
||||
'rpc_response_timeout': 5,
|
||||
'rpc_zmq_port': get_unused_port(),
|
||||
'rpc_zmq_ipc_dir': self.internal_ipc_dir}
|
||||
self.config(**kwargs)
|
||||
|
||||
# Start RPC
|
||||
LOG.info("Running internal zmq receiver.")
|
||||
self.reactor = impl_zmq.ZmqProxy(self.conf)
|
||||
self.reactor.consume_in_thread()
|
||||
|
||||
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
|
||||
self.addCleanup(stopRpc(self.__dict__))
|
||||
|
||||
def test_zmqlistener_no_msg(self):
|
||||
listener = impl_zmq.ZmqListener(self.driver)
|
||||
# Timeout = 0 should return straight away since the queue is empty
|
||||
listener.poll(timeout=0)
|
||||
|
||||
def test_zmqlistener_w_msg(self):
|
||||
listener = impl_zmq.ZmqListener(self.driver)
|
||||
kwargs = {'a': 1, 'b': 2}
|
||||
m = mock.Mock()
|
||||
ctxt = mock.Mock(autospec=impl_zmq.RpcContext)
|
||||
eventlet.spawn_n(listener.dispatch, ctxt, 0,
|
||||
m.fake_method, 'name.space', **kwargs)
|
||||
resp = listener.poll(timeout=10)
|
||||
msg = {'method': m.fake_method, 'namespace': 'name.space',
|
||||
'args': kwargs}
|
||||
self.assertEqual(resp.message, msg)
|
||||
|
||||
|
||||
class TestZmqDriver(test_utils.BaseTestCase):
|
||||
|
||||
@testtools.skipIf(impl_zmq is None, "zmq not available")
|
||||
def setUp(self):
|
||||
super(TestZmqDriver, self).setUp()
|
||||
self.messaging_conf.transport_driver = 'zmq'
|
||||
# Get driver
|
||||
transport = messaging.get_transport(self.conf)
|
||||
self.driver = transport._driver
|
||||
|
||||
# Set config values
|
||||
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
|
||||
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
|
||||
'rpc_zmq_host': '127.0.0.1',
|
||||
'rpc_response_timeout': 5,
|
||||
'rpc_zmq_port': get_unused_port(),
|
||||
'rpc_zmq_ipc_dir': self.internal_ipc_dir}
|
||||
self.config(**kwargs)
|
||||
|
||||
# Start RPC
|
||||
LOG.info("Running internal zmq receiver.")
|
||||
self.reactor = impl_zmq.ZmqProxy(self.conf)
|
||||
self.reactor.consume_in_thread()
|
||||
|
||||
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
|
||||
self.addCleanup(stopRpc(self.__dict__))
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq._cast', autospec=True)
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq._multi_send', autospec=True)
|
||||
def test_zmqdriver_send(self, mock_multi_send, mock_cast):
|
||||
context = mock.Mock(autospec=impl_zmq.RpcContext)
|
||||
topic = 'testtopic'
|
||||
msg = 'jeronimo'
|
||||
self.driver.send(messaging.Target(topic=topic), context, msg,
|
||||
False, 0, False)
|
||||
mock_multi_send.assert_called_with(mock_cast, context, topic, msg,
|
||||
allowed_remote_exmods=[],
|
||||
envelope=False)
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq._cast', autospec=True)
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq._multi_send', autospec=True)
|
||||
def test_zmqdriver_send_notification(self, mock_multi_send, mock_cast):
|
||||
context = mock.Mock(autospec=impl_zmq.RpcContext)
|
||||
topic = 'testtopic.foo'
|
||||
topic_reformat = 'testtopic-foo'
|
||||
msg = 'jeronimo'
|
||||
self.driver.send_notification(messaging.Target(topic=topic), context,
|
||||
msg, False, False)
|
||||
mock_multi_send.assert_called_with(mock_cast, context, topic_reformat,
|
||||
msg, allowed_remote_exmods=[],
|
||||
envelope=False)
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqListener', autospec=True)
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.Connection', autospec=True)
|
||||
def test_zmqdriver_listen(self, mock_connection, mock_listener):
|
||||
mock_listener.return_value = listener = mock.Mock()
|
||||
mock_connection.return_value = conn = mock.Mock()
|
||||
conn.create_consumer = mock.Mock()
|
||||
conn.consume_in_thread = mock.Mock()
|
||||
topic = 'testtopic.foo'
|
||||
self.driver.listen(messaging.Target(topic=topic))
|
||||
conn.create_consumer.assert_called_with(topic, listener, fanout=True)
|
||||
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqListener', autospec=True)
|
||||
@mock.patch('oslo.messaging._drivers.impl_zmq.Connection', autospec=True)
|
||||
def test_zmqdriver_listen_for_notification(self, mock_connection,
|
||||
mock_listener):
|
||||
mock_listener.return_value = listener = mock.Mock()
|
||||
mock_connection.return_value = conn = mock.Mock()
|
||||
conn.create_consumer = mock.Mock()
|
||||
conn.consume_in_thread = mock.Mock()
|
||||
topic = 'testtopic.foo'
|
||||
data = [(messaging.Target(topic=topic), 0)]
|
||||
# NOTE(jamespage): Pooling not supported, just pass None for now.
|
||||
self.driver.listen_for_notifications(data, None)
|
||||
conn.create_consumer.assert_called_with("%s-%s" % (topic, 0), listener)
|
@ -64,7 +64,7 @@ class ListenerSetupMixin(object):
|
||||
|
||||
def wait_for(self, expect_messages):
|
||||
while expect_messages != self._received_msgs:
|
||||
pass
|
||||
yield
|
||||
|
||||
def stop(self):
|
||||
for listener in self.listeners:
|
||||
|
@ -17,10 +17,6 @@ import logging
|
||||
import logging.config
|
||||
import os
|
||||
import sys
|
||||
try:
|
||||
import threading
|
||||
except ImportError:
|
||||
threading = None
|
||||
|
||||
import mock
|
||||
import testscenarios
|
||||
@ -39,13 +35,6 @@ logging.AUDIT = logging.INFO + 1
|
||||
logging.addLevelName(logging.AUDIT, 'AUDIT')
|
||||
|
||||
|
||||
def get_thread_ident():
|
||||
if threading is not None:
|
||||
return threading.current_thread().ident
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
class TestLogNotifier(test_utils.BaseTestCase):
|
||||
|
||||
scenarios = [
|
||||
@ -62,6 +51,10 @@ class TestLogNotifier(test_utils.BaseTestCase):
|
||||
super(TestLogNotifier, self).setUp()
|
||||
self.addCleanup(messaging.notify._impl_test.reset)
|
||||
self.config(notification_driver=['test'])
|
||||
# NOTE(jamespage) disable thread information logging for testing
|
||||
# as this causes test failures when zmq tests monkey_patch via
|
||||
# eventlet
|
||||
logging.logThreads = 0
|
||||
|
||||
@mock.patch('oslo.utils.timeutils.utcnow')
|
||||
def test_logger(self, mock_utcnow):
|
||||
@ -93,7 +86,7 @@ class TestLogNotifier(test_utils.BaseTestCase):
|
||||
{'process': os.getpid(),
|
||||
'funcName': None,
|
||||
'name': 'foo',
|
||||
'thread': get_thread_ident(),
|
||||
'thread': None,
|
||||
'levelno': levelno,
|
||||
'processName': 'MainProcess',
|
||||
'pathname': '/foo/bar',
|
||||
@ -149,7 +142,7 @@ class TestLogNotifier(test_utils.BaseTestCase):
|
||||
{'process': os.getpid(),
|
||||
'funcName': 'test_logging_conf',
|
||||
'name': 'default',
|
||||
'thread': get_thread_ident(),
|
||||
'thread': None,
|
||||
'levelno': levelno,
|
||||
'processName': 'MainProcess',
|
||||
'pathname': pathname,
|
||||
|
Loading…
x
Reference in New Issue
Block a user