[zmq][matchmaker] Distinguish targets by listener types

In order to have a possibility to pass messages via different
pipelines (not over DEALER/ROUTER only) we need an information
in the name service about socket type assigned to the target.

Change-Id: I7cdba6c2c91af7f63ecca30c94faecef2c2eff8b
Closes-Bug: #1497326
This commit is contained in:
Oleksii Zamiatin 2015-11-11 12:13:44 +02:00
parent 497811b722
commit 517ae12b17
10 changed files with 76 additions and 55 deletions

@ -34,7 +34,8 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend):
self._check_request_pattern(request)
dealer_socket, hosts = self._check_hosts_connections(request.target)
dealer_socket, hosts = self._check_hosts_connections(
request.target, zmq_names.socket_type_str(zmq.ROUTER))
if not dealer_socket.connections:
# NOTE(ozamiatin): Here we can provide
@ -104,7 +105,8 @@ class DealerPublisherProxy(DealerPublisher):
LOG.info(_LI("Envelope: %s") % envelope)
target = envelope[zmq_names.FIELD_TARGET]
dealer_socket, hosts = self._check_hosts_connections(target)
dealer_socket, hosts = self._check_hosts_connections(
target, zmq_names.socket_type_str(zmq.ROUTER))
if not dealer_socket.connections:
# NOTE(ozamiatin): Here we can provide

@ -35,7 +35,8 @@ class PubPublisher(zmq_publisher_base.PublisherMultisend):
if request.msg_type not in zmq_names.NOTIFY_TYPES:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
pub_socket, hosts = self._check_hosts_connections(request.target)
pub_socket, hosts = self._check_hosts_connections(
request.target, zmq_names.socket_type_str(zmq.SUB))
self._send_request(pub_socket, request)
def _send_request(self, socket, request):

@ -90,11 +90,10 @@ class PublisherBase(object):
:type request: zmq_request.Request
"""
LOG.info(_LI("Sending %(type)s message_id %(message)s to a target"
"%(target)s key: %(key)s, host:%(host)s")
"%(target)s host:%(host)s")
% {"type": request.msg_type,
"message": request.message_id,
"target": request.target,
"key": zmq_address.target_to_key(request.target),
"host": request.host})
socket.send_pyobj(request)
@ -122,10 +121,11 @@ class PublisherMultisend(PublisherBase):
self.socket_type = socket_type
self.matchmaker = matchmaker
def _check_hosts_connections(self, target):
def _check_hosts_connections(self, target, listener_type):
# TODO(ozamiatin): Place for significant optimization
# Matchmaker cache should be implemented
hosts = self.matchmaker.get_hosts(target)
hosts = self.matchmaker.get_hosts(
target, listener_type)
if str(target) in self.outbound_sockets:
socket = self.outbound_sockets[str(target)]
else:

@ -35,7 +35,8 @@ class PushPublisher(zmq_publisher_base.PublisherMultisend):
if request.msg_type == zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
push_socket, hosts = self._check_hosts_connections(request.target)
push_socket, hosts = self._check_hosts_connections(
request.target, zmq_names.socket_type_str(zmq.PULL))
if not push_socket.connections:
LOG.warning(_LW("Request %s was dropped because no connection")

@ -50,7 +50,8 @@ class ReqPublisher(zmq_publisher_base.PublisherBase):
return self._receive_reply(socket, request)
def _resolve_host_address(self, target, timeout=0):
host = self.matchmaker.get_single_host(target, timeout)
host = self.matchmaker.get_single_host(
target, zmq_names.socket_type_str(zmq.ROUTER), timeout)
return zmq_address.get_tcp_direct_address(host)
def _connect_to_host(self, target, timeout=0):

@ -20,6 +20,7 @@ import retrying
import six
import oslo_messaging
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._i18n import _LI, _LW
@ -35,27 +36,31 @@ class MatchMakerBase(object):
self.conf = conf
@abc.abstractmethod
def register(self, target, hostname):
def register(self, target, hostname, listener_type):
"""Register target on nameserver.
:param target: the target for host
:type target: Target
:param hostname: host for the topic in "host:port" format
:type hostname: String
:param listener_type: Listener socket type ROUTER, SUB etc.
:type listener_type: String
"""
@abc.abstractmethod
def unregister(self, target, hostname):
def unregister(self, target, hostname, listener_type):
"""Unregister target from nameserver.
:param target: the target for host
:type target: Target
:param hostname: host for the topic in "host:port" format
:type hostname: String
:param listener_type: Listener socket type ROUTER, SUB etc.
:type listener_type: String
"""
@abc.abstractmethod
def get_hosts(self, target):
def get_hosts(self, target, listener_type):
"""Get all hosts from nameserver by target.
:param target: the default target for invocations
@ -63,7 +68,7 @@ class MatchMakerBase(object):
:returns: a list of "hostname:port" hosts
"""
def get_single_host(self, target, timeout=None, retry=0):
def get_single_host(self, target, listener_type, timeout=None, retry=0):
"""Get a single host by target.
:param target: the target for messages
@ -101,7 +106,7 @@ class MatchMakerBase(object):
@_retry
def _get_single_host():
hosts = self.get_hosts(target)
hosts = self.get_hosts(target, listener_type)
try:
if not hosts:
err_msg = "No hosts were found for target %s." % target
@ -136,16 +141,16 @@ class DummyMatchMaker(MatchMakerBase):
self._cache = collections.defaultdict(list)
def register(self, target, hostname):
key = str(target)
def register(self, target, hostname, listener_type):
key = zmq_address.target_to_key(target, listener_type)
if hostname not in self._cache[key]:
self._cache[key].append(hostname)
def unregister(self, target, hostname):
key = str(target)
def unregister(self, target, hostname, listener_type):
key = zmq_address.target_to_key(target, listener_type)
if hostname in self._cache[key]:
self._cache[key].remove(hostname)
def get_hosts(self, target):
key = str(target)
def get_hosts(self, target, listener_type):
key = zmq_address.target_to_key(target, listener_type)
return self._cache[key]

@ -52,27 +52,29 @@ class RedisMatchMaker(base.MatchMakerBase):
def _get_hosts_by_key(self, key):
return self._redis.lrange(key, 0, -1)
def register(self, target, hostname):
def register(self, target, hostname, listener_type):
if target.topic and target.server:
key = zmq_address.target_to_key(target)
key = zmq_address.target_to_key(target, listener_type)
if hostname not in self._get_hosts_by_key(key):
self._redis.lpush(key, hostname)
if target.topic:
if hostname not in self._get_hosts_by_key(target.topic):
self._redis.lpush(target.topic, hostname)
key = zmq_address.prefix_str(target.topic, listener_type)
if hostname not in self._get_hosts_by_key(key):
self._redis.lpush(key, hostname)
if target.server:
if hostname not in self._get_hosts_by_key(target.server):
self._redis.lpush(target.server, hostname)
key = zmq_address.prefix_str(target.server, listener_type)
if hostname not in self._get_hosts_by_key(key):
self._redis.lpush(key, hostname)
def unregister(self, target, hostname):
key = zmq_address.target_to_key(target)
def unregister(self, target, hostname, listener_type):
key = zmq_address.target_to_key(target, listener_type)
self._redis.lrem(key, 0, hostname)
def get_hosts(self, target):
def get_hosts(self, target, listener_type):
hosts = []
key = zmq_address.target_to_key(target)
key = zmq_address.target_to_key(target, listener_type)
hosts.extend(self._get_hosts_by_key(key))
return hosts

@ -64,13 +64,14 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
LOG.info("[%s] Listen to target %s" % (self.host, target))
self.targets.append(target)
self.matchmaker.register(target=target,
hostname=self.host)
self.matchmaker.register(target, self.host,
zmq_names.socket_type_str(zmq.ROUTER))
def cleanup(self):
super(RouterConsumer, self).cleanup()
for target in self.targets:
self.matchmaker.unregister(target, self.host)
self.matchmaker.unregister(target, self.host,
zmq_names.socket_type_str(zmq.ROUTER))
def _receive_request(self, socket):
reply_id = socket.recv()

@ -29,12 +29,20 @@ def get_broker_address(conf):
return "ipc://%s/zmq-broker" % conf.rpc_zmq_ipc_dir
def target_to_key(target):
def prefix_str(key, listener_type):
return listener_type + "_" + key
def target_to_key(target, listener_type):
def prefix(key):
return prefix_str(key, listener_type)
if target.topic and target.server:
attributes = ['topic', 'server']
key = ".".join(getattr(target, attr) for attr in attributes)
return key
return prefix(key)
if target.topic:
return target.topic
return prefix(target.topic)
if target.server:
return target.server
return prefix(target.server)

@ -62,47 +62,47 @@ class TestImplMatchmaker(test_utils.BaseTestCase):
self.host2 = b"test_host2"
def test_register(self):
self.test_matcher.register(self.target, self.host1)
self.test_matcher.register(self.target, self.host1, "test")
self.assertEqual(self.test_matcher.get_hosts(self.target),
self.assertEqual(self.test_matcher.get_hosts(self.target, "test"),
[self.host1])
self.assertEqual(self.test_matcher.get_single_host(self.target),
self.assertEqual(self.test_matcher.get_single_host(self.target, "test"),
self.host1)
def test_register_two_hosts(self):
self.test_matcher.register(self.target, self.host1)
self.test_matcher.register(self.target, self.host2)
self.test_matcher.register(self.target, self.host1, "test")
self.test_matcher.register(self.target, self.host2, "test")
self.assertItemsEqual(self.test_matcher.get_hosts(self.target),
self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"),
[self.host1, self.host2])
self.assertIn(self.test_matcher.get_single_host(self.target),
self.assertIn(self.test_matcher.get_single_host(self.target, "test"),
[self.host1, self.host2])
def test_register_unsibscribe(self):
self.test_matcher.register(self.target, self.host1)
self.test_matcher.register(self.target, self.host2)
self.test_matcher.register(self.target, self.host1, "test")
self.test_matcher.register(self.target, self.host2, "test")
self.test_matcher.unregister(self.target, self.host2)
self.test_matcher.unregister(self.target, self.host2, "test")
self.assertItemsEqual(self.test_matcher.get_hosts(self.target),
self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"),
[self.host1])
self.assertNotIn(self.test_matcher.get_single_host(self.target),
self.assertNotIn(self.test_matcher.get_single_host(self.target, "test"),
[self.host2])
def test_register_two_same_hosts(self):
self.test_matcher.register(self.target, self.host1)
self.test_matcher.register(self.target, self.host1)
self.test_matcher.register(self.target, self.host1, "test")
self.test_matcher.register(self.target, self.host1, "test")
self.assertEqual(self.test_matcher.get_hosts(self.target),
self.assertEqual(self.test_matcher.get_hosts(self.target, "test"),
[self.host1])
self.assertEqual(self.test_matcher.get_single_host(self.target),
self.assertEqual(self.test_matcher.get_single_host(self.target, "test"),
self.host1)
def test_get_hosts_wrong_topic(self):
target = oslo_messaging.Target(topic="no_such_topic")
self.assertEqual(self.test_matcher.get_hosts(target), [])
self.assertEqual(self.test_matcher.get_hosts(target, "test"), [])
def test_get_single_host_wrong_topic(self):
target = oslo_messaging.Target(topic="no_such_topic")
self.assertRaises(oslo_messaging.InvalidTarget,
self.test_matcher.get_single_host, target)
self.test_matcher.get_single_host, target, "test")