Merge "[zmq] Fix fanout without PUB/SUB"

This commit is contained in:
Jenkins 2016-09-21 14:51:48 +00:00 committed by Gerrit Code Review
commit b1529e8236
5 changed files with 145 additions and 100 deletions
oslo_messaging

@ -37,12 +37,17 @@ class SocketsManager(object):
return self.matchmaker.get_hosts_retry( return self.matchmaker.get_hosts_retry(
target, zmq_names.socket_type_str(self.listener_type)) target, zmq_names.socket_type_str(self.listener_type))
def get_hosts_fanout(self, target):
return self.matchmaker.get_hosts_fanout_retry(
target, zmq_names.socket_type_str(self.listener_type))
@staticmethod @staticmethod
def _key_from_target(target): def _key_from_target(target):
return target.topic if target.fanout else str(target) return target.topic if target.fanout else str(target)
def _get_hosts_and_connect(self, socket, target): def _get_hosts_and_connect(self, socket, target):
hosts = self.get_hosts(target) get_hosts = self.get_hosts_fanout if target.fanout else self.get_hosts
hosts = get_hosts(target)
self._connect_to_hosts(socket, target, hosts) self._connect_to_hosts(socket, target, hosts)
def _track_socket(self, socket, target): def _track_socket(self, socket, target):

@ -13,6 +13,7 @@
import abc import abc
import collections import collections
import logging
import six import six
@ -20,6 +21,8 @@ from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._i18n import _LE from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__)
class MatchmakerUnavailable(rpc_common.RPCException): class MatchmakerUnavailable(rpc_common.RPCException):
"""Exception is raised on connection error to matchmaker service""" """Exception is raised on connection error to matchmaker service"""
@ -147,6 +150,28 @@ class MatchmakerBase(object):
:returns: a list of "hostname:port" hosts :returns: a list of "hostname:port" hosts
""" """
@abc.abstractmethod
def get_hosts_fanout(self, target, listener_type):
"""Get all hosts for fanout from nameserver by target.
:param target: the default target for invocations
:type target: Target
:param listener_type: listener socket type ROUTER, SUB etc.
:type listener_type: str
:returns: a list of "hostname:port" hosts
"""
@abc.abstractmethod
def get_hosts_fanout_retry(self, target, listener_type):
"""Retry if not host for fanout - used on client first time connection.
:param target: the default target for invocations
:type target: Target
:param listener_type: listener socket type ROUTER, SUB etc.
:type listener_type: str
:returns: a list of "hostname:port" hosts
"""
class MatchmakerDummy(MatchmakerBase): class MatchmakerDummy(MatchmakerBase):
@ -180,20 +205,56 @@ class MatchmakerDummy(MatchmakerBase):
return list(self._routers) return list(self._routers)
def register(self, target, hostname, listener_type, expire=-1): def register(self, target, hostname, listener_type, expire=-1):
key = zmq_address.target_to_key(target, listener_type) if target.server:
key = zmq_address.target_to_key(target, listener_type)
if hostname not in self._cache[key]:
self._cache[key].append(hostname)
key = zmq_address.prefix_str(target.topic, listener_type)
if hostname not in self._cache[key]: if hostname not in self._cache[key]:
self._cache[key].append(hostname) self._cache[key].append(hostname)
def unregister(self, target, hostname, listener_type): def unregister(self, target, hostname, listener_type):
key = zmq_address.target_to_key(target, listener_type) if target.server:
key = zmq_address.target_to_key(target, listener_type)
if hostname in self._cache[key]:
self._cache[key].remove(hostname)
key = zmq_address.prefix_str(target.topic, listener_type)
if hostname in self._cache[key]: if hostname in self._cache[key]:
self._cache[key].remove(hostname) self._cache[key].remove(hostname)
def get_hosts(self, target, listener_type): def get_hosts(self, target, listener_type):
key = zmq_address.target_to_key(target, listener_type) hosts = []
return self._cache[key]
if target.server:
key = zmq_address.target_to_key(target, listener_type)
hosts.extend(self._cache[key])
if not hosts:
key = zmq_address.prefix_str(target.topic, listener_type)
hosts.extend(self._cache[key])
LOG.debug("[Dummy] get_hosts for target %(target)s: %(hosts)s",
{"target": target, "hosts": hosts})
return hosts
def get_hosts_retry(self, target, listener_type): def get_hosts_retry(self, target, listener_type):
# Do not complicate dummy matchmaker # Do not complicate dummy matchmaker
# This method will act smarter in real world matchmakers # This method will act smarter in real world matchmakers
return self.get_hosts(target, listener_type) return self.get_hosts(target, listener_type)
def get_hosts_fanout(self, target, listener_type):
key = zmq_address.prefix_str(target.topic, listener_type)
hosts = list(self._cache[key])
LOG.debug("[Dummy] get_hosts_fanout for target %(target)s: %(hosts)s",
{"target": target, "hosts": hosts})
return hosts
def get_hosts_fanout_retry(self, target, listener_type):
# Do not complicate dummy matchmaker
# This method will act smarter in real world matchmakers
return self.get_hosts_fanout(target, listener_type)

@ -198,34 +198,32 @@ class MatchmakerRedis(zmq_matchmaker_base.MatchmakerBase):
@redis_connection_warn @redis_connection_warn
def register(self, target, hostname, listener_type, expire=-1): def register(self, target, hostname, listener_type, expire=-1):
if target.topic and target.server: if target.server:
key = zmq_address.target_to_key(target, listener_type) key = zmq_address.target_to_key(target, listener_type)
self._add_key_with_expire(key, hostname, expire) self._add_key_with_expire(key, hostname, expire)
if target.topic: key = zmq_address.prefix_str(target.topic, listener_type)
key = zmq_address.prefix_str(target.topic, listener_type) self._add_key_with_expire(key, hostname, expire)
self._add_key_with_expire(key, hostname, expire)
@no_reraise @no_reraise
@redis_connection_warn @redis_connection_warn
def unregister(self, target, hostname, listener_type): def unregister(self, target, hostname, listener_type):
if target.topic and target.server: if target.server:
key = zmq_address.target_to_key(target, listener_type) key = zmq_address.target_to_key(target, listener_type)
self._redis.srem(key, hostname) self._redis.srem(key, hostname)
if target.topic: key = zmq_address.prefix_str(target.topic, listener_type)
key = zmq_address.prefix_str(target.topic, listener_type) self._redis.srem(key, hostname)
self._redis.srem(key, hostname)
@redis_connection_warn @redis_connection_warn
def get_hosts(self, target, listener_type): def get_hosts(self, target, listener_type):
hosts = [] hosts = []
if target.topic and target.server: if target.server:
key = zmq_address.target_to_key(target, listener_type) key = zmq_address.target_to_key(target, listener_type)
hosts.extend(self._get_hosts_by_key(key)) hosts.extend(self._get_hosts_by_key(key))
if not hosts and target.topic: if not hosts:
key = zmq_address.prefix_str(target.topic, listener_type) key = zmq_address.prefix_str(target.topic, listener_type)
hosts.extend(self._get_hosts_by_key(key)) hosts.extend(self._get_hosts_by_key(key))
@ -239,14 +237,8 @@ class MatchmakerRedis(zmq_matchmaker_base.MatchmakerBase):
@redis_connection_warn @redis_connection_warn
def get_hosts_fanout(self, target, listener_type): def get_hosts_fanout(self, target, listener_type):
hosts = []
if target.topic and target.server:
key = zmq_address.target_to_key(target, listener_type)
hosts.extend(self._get_hosts_by_key(key))
key = zmq_address.prefix_str(target.topic, listener_type) key = zmq_address.prefix_str(target.topic, listener_type)
hosts.extend(self._get_hosts_by_key(key)) hosts = list(self._get_hosts_by_key(key))
LOG.debug("[Redis] get_hosts_fanout for target %(target)s: %(hosts)s", LOG.debug("[Redis] get_hosts_fanout for target %(target)s: %(hosts)s",
{"target": target, "hosts": hosts}) {"target": target, "hosts": hosts})

@ -40,6 +40,10 @@ class ZmqServer(base.PollStyleListener):
self.target = target self.target = target
self.poller = poller or zmq_async.get_poller() self.poller = poller or zmq_async.get_poller()
LOG.info(_LI('[%(host)s] Run server %(target)s'),
{'host': self.conf.oslo_messaging_zmq.rpc_zmq_host,
'target': self.target})
self.router_consumer = zmq_router_consumer.RouterConsumer( self.router_consumer = zmq_router_consumer.RouterConsumer(
conf, self.poller, self) \ conf, self.poller, self) \
if not conf.oslo_messaging_zmq.use_router_proxy else None if not conf.oslo_messaging_zmq.use_router_proxy else None
@ -66,15 +70,22 @@ class ZmqServer(base.PollStyleListener):
def stop(self): def stop(self):
self.poller.close() self.poller.close()
LOG.info(_LI("Stop server %(target)s"), {'target': self.target})
for consumer in self.consumers: for consumer in self.consumers:
consumer.stop() consumer.stop()
LOG.info(_LI('[%(host)s] Stop server %(target)s'),
{'host': self.conf.oslo_messaging_zmq.rpc_zmq_host,
'target': self.target})
def cleanup(self): def cleanup(self):
self.poller.close() self.poller.close()
for consumer in self.consumers: for consumer in self.consumers:
consumer.cleanup() consumer.cleanup()
LOG.info(_LI('[%(host)s] Destroy server %(target)s'),
{'host': self.conf.oslo_messaging_zmq.rpc_zmq_host,
'target': self.target})
class ZmqNotificationServer(base.PollStyleListener): class ZmqNotificationServer(base.PollStyleListener):

@ -13,7 +13,6 @@
# under the License. # under the License.
import testtools import testtools
import time
import oslo_messaging import oslo_messaging
from oslo_messaging._drivers import impl_zmq from oslo_messaging._drivers import impl_zmq
@ -68,86 +67,63 @@ class TestConfZmqDriverLoad(test_utils.BaseTestCase):
class TestZmqBasics(zmq_common.ZmqBaseTestCase): class TestZmqBasics(zmq_common.ZmqBaseTestCase):
def test_send_receive_raises(self): @testtools.skipIf(zmq is None, "zmq not available")
"""Call() without method.""" def setUp(self):
target = oslo_messaging.Target(topic='testtopic') super(TestZmqBasics, self).setUp()
self.listener.listen(target) self.target = oslo_messaging.Target(topic='topic')
self.assertRaises( self.ctxt = {'key': 'value'}
KeyError, self.message = {'method': 'qwerty', 'args': {'int': 1, 'bool': True}}
self.driver.send,
target, {}, {'tx_id': 1},
wait_for_reply=True,
timeout=60)
def test_send_receive_topic(self): def test_send_call_without_method_failure(self):
"""Call() with topic.""" self.message.pop('method')
self.listener.listen(self.target)
self.assertRaises(KeyError, self.driver.send,
self.target, self.ctxt, self.message,
wait_for_reply=True, timeout=10)
target = oslo_messaging.Target(topic='testtopic') def _check_listener_received(self):
self.listener.listen(target) self.assertTrue(self.listener._received.isSet())
result = self.driver.send( self.assertEqual(self.ctxt, self.listener.message.ctxt)
target, {}, self.assertEqual(self.message, self.listener.message.message)
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=True, def test_send_call_success(self):
timeout=60) self.listener.listen(self.target)
result = self.driver.send(self.target, self.ctxt, self.message,
wait_for_reply=True, timeout=10)
self.assertTrue(result) self.assertTrue(result)
self._check_listener_received()
def test_send_noreply(self): def test_send_call_direct_success(self):
"""Cast() with topic.""" self.target.server = 'server'
self.listener.listen(self.target)
target = oslo_messaging.Target(topic='testtopic', server="my@server") result = self.driver.send(self.target, self.ctxt, self.message,
self.listener.listen(target) wait_for_reply=True, timeout=10)
time.sleep(0.01)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=False)
self.listener._received.wait(5)
self.assertIsNone(result)
self.assertTrue(self.listener._received.isSet())
method = self.listener.message.message[u'method']
self.assertEqual(u'hello-world', method)
def test_send_fanout(self):
target = oslo_messaging.Target(topic='testtopic', fanout=True)
self.listener.listen(target)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=False)
self.listener._received.wait(5)
self.assertIsNone(result)
self.assertTrue(self.listener._received.isSet())
method = self.listener.message.message[u'method']
self.assertEqual(u'hello-world', method)
def test_send_receive_direct(self):
"""Call() without topic."""
target = oslo_messaging.Target(server='127.0.0.1')
self.listener.listen(target)
message = {'method': 'hello-world', 'tx_id': 1}
context = {}
result = self.driver.send(target, context, message,
wait_for_reply=True,
timeout=60)
self.assertTrue(result) self.assertTrue(result)
self._check_listener_received()
def test_send_receive_notification(self): def test_send_cast_direct_success(self):
"""Notify() test""" self.target.server = 'server'
self.listener.listen(self.target)
target = oslo_messaging.Target(topic='t1', result = self.driver.send(self.target, self.ctxt, self.message,
server='notification@server') wait_for_reply=False)
self.listener.listen_notifications([(target, 'info')])
message = {'method': 'hello-world', 'tx_id': 1}
context = {}
target.topic += '.info'
self.driver.send_notification(target, context, message, '3.0')
self.listener._received.wait(5) self.listener._received.wait(5)
self.assertTrue(self.listener._received.isSet()) self.assertIsNone(result)
self._check_listener_received()
def test_send_fanout_success(self):
self.target.fanout = True
self.listener.listen(self.target)
result = self.driver.send(self.target, self.ctxt, self.message,
wait_for_reply=False)
self.listener._received.wait(5)
self.assertIsNone(result)
self._check_listener_received()
def test_send_notify_success(self):
self.listener.listen_notifications([(self.target, 'info')])
self.target.topic += '.info'
result = self.driver.send_notification(self.target, self.ctxt,
self.message, '3.0')
self.listener._received.wait(5)
self.assertIsNone(result)
self._check_listener_received()