From c3df59e1ab8d7d68b5837b0d46547b2eafa67560 Mon Sep 17 00:00:00 2001 From: Gevorg Davoian <gdavoian@mirantis.com> Date: Tue, 13 Sep 2016 13:58:25 +0300 Subject: [PATCH] [zmq] Fix fanout without PUB/SUB This patch fix incorrect fanout behavior when use_pub_sub=False. In addition it also makes dummy matchmaker behave similar to (already fixed here) redis one in order to make future testing more realistic (relevant tests were also fixed and refactored). Change-Id: Ie131de189972250ea2d9b99fe65b5908b7144569 Closes-Bug: #1622968 --- .../zmq_driver/client/zmq_sockets_manager.py | 7 +- .../matchmaker/zmq_matchmaker_base.py | 69 +++++++++- .../matchmaker/zmq_matchmaker_redis.py | 26 ++-- .../_drivers/zmq_driver/server/zmq_server.py | 13 +- .../tests/drivers/zmq/test_impl_zmq.py | 130 +++++++----------- 5 files changed, 145 insertions(+), 100 deletions(-) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py index 0c26fdaac..6ad89fdf1 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py @@ -37,12 +37,17 @@ class SocketsManager(object): return self.matchmaker.get_hosts_retry( target, zmq_names.socket_type_str(self.listener_type)) + def get_hosts_fanout(self, target): + return self.matchmaker.get_hosts_fanout_retry( + target, zmq_names.socket_type_str(self.listener_type)) + @staticmethod def _key_from_target(target): return target.topic if target.fanout else str(target) def _get_hosts_and_connect(self, socket, target): - hosts = self.get_hosts(target) + get_hosts = self.get_hosts_fanout if target.fanout else self.get_hosts + hosts = get_hosts(target) self._connect_to_hosts(socket, target, hosts) def _track_socket(self, socket, target): diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_base.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_base.py index 8376db8d9..22eaf898a 100644 --- a/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_base.py +++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_base.py @@ -13,6 +13,7 @@ import abc import collections +import logging import six @@ -20,6 +21,8 @@ from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._i18n import _LE +LOG = logging.getLogger(__name__) + class MatchmakerUnavailable(rpc_common.RPCException): """Exception is raised on connection error to matchmaker service""" @@ -147,6 +150,28 @@ class MatchmakerBase(object): :returns: a list of "hostname:port" hosts """ + @abc.abstractmethod + def get_hosts_fanout(self, target, listener_type): + """Get all hosts for fanout from nameserver by target. + + :param target: the default target for invocations + :type target: Target + :param listener_type: listener socket type ROUTER, SUB etc. + :type listener_type: str + :returns: a list of "hostname:port" hosts + """ + + @abc.abstractmethod + def get_hosts_fanout_retry(self, target, listener_type): + """Retry if not host for fanout - used on client first time connection. + + :param target: the default target for invocations + :type target: Target + :param listener_type: listener socket type ROUTER, SUB etc. + :type listener_type: str + :returns: a list of "hostname:port" hosts + """ + class MatchmakerDummy(MatchmakerBase): @@ -180,20 +205,56 @@ class MatchmakerDummy(MatchmakerBase): return list(self._routers) def register(self, target, hostname, listener_type, expire=-1): - key = zmq_address.target_to_key(target, listener_type) + if target.server: + key = zmq_address.target_to_key(target, listener_type) + if hostname not in self._cache[key]: + self._cache[key].append(hostname) + + key = zmq_address.prefix_str(target.topic, listener_type) if hostname not in self._cache[key]: self._cache[key].append(hostname) def unregister(self, target, hostname, listener_type): - key = zmq_address.target_to_key(target, listener_type) + if target.server: + key = zmq_address.target_to_key(target, listener_type) + if hostname in self._cache[key]: + self._cache[key].remove(hostname) + + key = zmq_address.prefix_str(target.topic, listener_type) if hostname in self._cache[key]: self._cache[key].remove(hostname) def get_hosts(self, target, listener_type): - key = zmq_address.target_to_key(target, listener_type) - return self._cache[key] + hosts = [] + + if target.server: + key = zmq_address.target_to_key(target, listener_type) + hosts.extend(self._cache[key]) + + if not hosts: + key = zmq_address.prefix_str(target.topic, listener_type) + hosts.extend(self._cache[key]) + + LOG.debug("[Dummy] get_hosts for target %(target)s: %(hosts)s", + {"target": target, "hosts": hosts}) + + return hosts def get_hosts_retry(self, target, listener_type): # Do not complicate dummy matchmaker # This method will act smarter in real world matchmakers return self.get_hosts(target, listener_type) + + def get_hosts_fanout(self, target, listener_type): + key = zmq_address.prefix_str(target.topic, listener_type) + hosts = list(self._cache[key]) + + LOG.debug("[Dummy] get_hosts_fanout for target %(target)s: %(hosts)s", + {"target": target, "hosts": hosts}) + + return hosts + + def get_hosts_fanout_retry(self, target, listener_type): + # Do not complicate dummy matchmaker + # This method will act smarter in real world matchmakers + return self.get_hosts_fanout(target, listener_type) diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py index 22ad912fc..e3efff9a6 100644 --- a/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py +++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py @@ -198,34 +198,32 @@ class MatchmakerRedis(zmq_matchmaker_base.MatchmakerBase): @redis_connection_warn def register(self, target, hostname, listener_type, expire=-1): - if target.topic and target.server: + if target.server: key = zmq_address.target_to_key(target, listener_type) self._add_key_with_expire(key, hostname, expire) - if target.topic: - key = zmq_address.prefix_str(target.topic, listener_type) - self._add_key_with_expire(key, hostname, expire) + key = zmq_address.prefix_str(target.topic, listener_type) + self._add_key_with_expire(key, hostname, expire) @no_reraise @redis_connection_warn def unregister(self, target, hostname, listener_type): - if target.topic and target.server: + if target.server: key = zmq_address.target_to_key(target, listener_type) self._redis.srem(key, hostname) - if target.topic: - key = zmq_address.prefix_str(target.topic, listener_type) - self._redis.srem(key, hostname) + key = zmq_address.prefix_str(target.topic, listener_type) + self._redis.srem(key, hostname) @redis_connection_warn def get_hosts(self, target, listener_type): hosts = [] - if target.topic and target.server: + if target.server: key = zmq_address.target_to_key(target, listener_type) hosts.extend(self._get_hosts_by_key(key)) - if not hosts and target.topic: + if not hosts: key = zmq_address.prefix_str(target.topic, listener_type) hosts.extend(self._get_hosts_by_key(key)) @@ -239,14 +237,8 @@ class MatchmakerRedis(zmq_matchmaker_base.MatchmakerBase): @redis_connection_warn def get_hosts_fanout(self, target, listener_type): - hosts = [] - - if target.topic and target.server: - key = zmq_address.target_to_key(target, listener_type) - hosts.extend(self._get_hosts_by_key(key)) - key = zmq_address.prefix_str(target.topic, listener_type) - hosts.extend(self._get_hosts_by_key(key)) + hosts = list(self._get_hosts_by_key(key)) LOG.debug("[Redis] get_hosts_fanout for target %(target)s: %(hosts)s", {"target": target, "hosts": hosts}) diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py index b40bdc098..fca7495f8 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py @@ -40,6 +40,10 @@ class ZmqServer(base.PollStyleListener): self.target = target self.poller = poller or zmq_async.get_poller() + LOG.info(_LI('[%(host)s] Run server %(target)s'), + {'host': self.conf.oslo_messaging_zmq.rpc_zmq_host, + 'target': self.target}) + self.router_consumer = zmq_router_consumer.RouterConsumer( conf, self.poller, self) \ if not conf.oslo_messaging_zmq.use_router_proxy else None @@ -66,15 +70,22 @@ class ZmqServer(base.PollStyleListener): def stop(self): self.poller.close() - LOG.info(_LI("Stop server %(target)s"), {'target': self.target}) for consumer in self.consumers: consumer.stop() + LOG.info(_LI('[%(host)s] Stop server %(target)s'), + {'host': self.conf.oslo_messaging_zmq.rpc_zmq_host, + 'target': self.target}) + def cleanup(self): self.poller.close() for consumer in self.consumers: consumer.cleanup() + LOG.info(_LI('[%(host)s] Destroy server %(target)s'), + {'host': self.conf.oslo_messaging_zmq.rpc_zmq_host, + 'target': self.target}) + class ZmqNotificationServer(base.PollStyleListener): diff --git a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py index 04d86d9de..5c2d7e49e 100644 --- a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py +++ b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py @@ -13,7 +13,6 @@ # under the License. import testtools -import time import oslo_messaging from oslo_messaging._drivers import impl_zmq @@ -68,86 +67,63 @@ class TestConfZmqDriverLoad(test_utils.BaseTestCase): class TestZmqBasics(zmq_common.ZmqBaseTestCase): - def test_send_receive_raises(self): - """Call() without method.""" - target = oslo_messaging.Target(topic='testtopic') - self.listener.listen(target) - self.assertRaises( - KeyError, - self.driver.send, - target, {}, {'tx_id': 1}, - wait_for_reply=True, - timeout=60) + @testtools.skipIf(zmq is None, "zmq not available") + def setUp(self): + super(TestZmqBasics, self).setUp() + self.target = oslo_messaging.Target(topic='topic') + self.ctxt = {'key': 'value'} + self.message = {'method': 'qwerty', 'args': {'int': 1, 'bool': True}} - def test_send_receive_topic(self): - """Call() with topic.""" + def test_send_call_without_method_failure(self): + self.message.pop('method') + self.listener.listen(self.target) + self.assertRaises(KeyError, self.driver.send, + self.target, self.ctxt, self.message, + wait_for_reply=True, timeout=10) - target = oslo_messaging.Target(topic='testtopic') - self.listener.listen(target) - result = self.driver.send( - target, {}, - {'method': 'hello-world', 'tx_id': 1}, - wait_for_reply=True, - timeout=60) + def _check_listener_received(self): + self.assertTrue(self.listener._received.isSet()) + self.assertEqual(self.ctxt, self.listener.message.ctxt) + self.assertEqual(self.message, self.listener.message.message) + + def test_send_call_success(self): + self.listener.listen(self.target) + result = self.driver.send(self.target, self.ctxt, self.message, + wait_for_reply=True, timeout=10) self.assertTrue(result) + self._check_listener_received() - def test_send_noreply(self): - """Cast() with topic.""" - - target = oslo_messaging.Target(topic='testtopic', server="my@server") - self.listener.listen(target) - time.sleep(0.01) - result = self.driver.send( - target, {}, - {'method': 'hello-world', 'tx_id': 1}, - wait_for_reply=False) - - self.listener._received.wait(5) - - self.assertIsNone(result) - self.assertTrue(self.listener._received.isSet()) - method = self.listener.message.message[u'method'] - self.assertEqual(u'hello-world', method) - - def test_send_fanout(self): - target = oslo_messaging.Target(topic='testtopic', fanout=True) - - self.listener.listen(target) - - result = self.driver.send( - target, {}, - {'method': 'hello-world', 'tx_id': 1}, - wait_for_reply=False) - - self.listener._received.wait(5) - - self.assertIsNone(result) - self.assertTrue(self.listener._received.isSet()) - method = self.listener.message.message[u'method'] - self.assertEqual(u'hello-world', method) - - def test_send_receive_direct(self): - """Call() without topic.""" - - target = oslo_messaging.Target(server='127.0.0.1') - self.listener.listen(target) - message = {'method': 'hello-world', 'tx_id': 1} - context = {} - result = self.driver.send(target, context, message, - wait_for_reply=True, - timeout=60) + def test_send_call_direct_success(self): + self.target.server = 'server' + self.listener.listen(self.target) + result = self.driver.send(self.target, self.ctxt, self.message, + wait_for_reply=True, timeout=10) self.assertTrue(result) + self._check_listener_received() - def test_send_receive_notification(self): - """Notify() test""" - - target = oslo_messaging.Target(topic='t1', - server='notification@server') - self.listener.listen_notifications([(target, 'info')]) - - message = {'method': 'hello-world', 'tx_id': 1} - context = {} - target.topic += '.info' - self.driver.send_notification(target, context, message, '3.0') + def test_send_cast_direct_success(self): + self.target.server = 'server' + self.listener.listen(self.target) + result = self.driver.send(self.target, self.ctxt, self.message, + wait_for_reply=False) self.listener._received.wait(5) - self.assertTrue(self.listener._received.isSet()) + self.assertIsNone(result) + self._check_listener_received() + + def test_send_fanout_success(self): + self.target.fanout = True + self.listener.listen(self.target) + result = self.driver.send(self.target, self.ctxt, self.message, + wait_for_reply=False) + self.listener._received.wait(5) + self.assertIsNone(result) + self._check_listener_received() + + def test_send_notify_success(self): + self.listener.listen_notifications([(self.target, 'info')]) + self.target.topic += '.info' + result = self.driver.send_notification(self.target, self.ctxt, + self.message, '3.0') + self.listener._received.wait(5) + self.assertIsNone(result) + self._check_listener_received()