Merge "[zmq][matchmaker] Distinguish targets by listener types"
This commit is contained in:
commit
3f6ef7be46
oslo_messaging
_drivers/zmq_driver
client/publishers
zmq_dealer_publisher.pyzmq_pub_publisher.pyzmq_publisher_base.pyzmq_push_publisher.pyzmq_req_publisher.py
matchmaker
server/consumers
zmq_address.pytests/drivers/zmq/matchmaker
@ -34,7 +34,8 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend):
|
||||
|
||||
self._check_request_pattern(request)
|
||||
|
||||
dealer_socket, hosts = self._check_hosts_connections(request.target)
|
||||
dealer_socket, hosts = self._check_hosts_connections(
|
||||
request.target, zmq_names.socket_type_str(zmq.ROUTER))
|
||||
|
||||
if not dealer_socket.connections:
|
||||
# NOTE(ozamiatin): Here we can provide
|
||||
@ -104,7 +105,8 @@ class DealerPublisherProxy(DealerPublisher):
|
||||
LOG.info(_LI("Envelope: %s") % envelope)
|
||||
|
||||
target = envelope[zmq_names.FIELD_TARGET]
|
||||
dealer_socket, hosts = self._check_hosts_connections(target)
|
||||
dealer_socket, hosts = self._check_hosts_connections(
|
||||
target, zmq_names.socket_type_str(zmq.ROUTER))
|
||||
|
||||
if not dealer_socket.connections:
|
||||
# NOTE(ozamiatin): Here we can provide
|
||||
|
@ -35,7 +35,8 @@ class PubPublisher(zmq_publisher_base.PublisherMultisend):
|
||||
if request.msg_type not in zmq_names.NOTIFY_TYPES:
|
||||
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
|
||||
|
||||
pub_socket, hosts = self._check_hosts_connections(request.target)
|
||||
pub_socket, hosts = self._check_hosts_connections(
|
||||
request.target, zmq_names.socket_type_str(zmq.SUB))
|
||||
self._send_request(pub_socket, request)
|
||||
|
||||
def _send_request(self, socket, request):
|
||||
|
@ -90,11 +90,10 @@ class PublisherBase(object):
|
||||
:type request: zmq_request.Request
|
||||
"""
|
||||
LOG.info(_LI("Sending %(type)s message_id %(message)s to a target"
|
||||
"%(target)s key: %(key)s, host:%(host)s")
|
||||
"%(target)s host:%(host)s")
|
||||
% {"type": request.msg_type,
|
||||
"message": request.message_id,
|
||||
"target": request.target,
|
||||
"key": zmq_address.target_to_key(request.target),
|
||||
"host": request.host})
|
||||
socket.send_pyobj(request)
|
||||
|
||||
@ -122,10 +121,11 @@ class PublisherMultisend(PublisherBase):
|
||||
self.socket_type = socket_type
|
||||
self.matchmaker = matchmaker
|
||||
|
||||
def _check_hosts_connections(self, target):
|
||||
def _check_hosts_connections(self, target, listener_type):
|
||||
# TODO(ozamiatin): Place for significant optimization
|
||||
# Matchmaker cache should be implemented
|
||||
hosts = self.matchmaker.get_hosts(target)
|
||||
hosts = self.matchmaker.get_hosts(
|
||||
target, listener_type)
|
||||
if str(target) in self.outbound_sockets:
|
||||
socket = self.outbound_sockets[str(target)]
|
||||
else:
|
||||
|
@ -35,7 +35,8 @@ class PushPublisher(zmq_publisher_base.PublisherMultisend):
|
||||
if request.msg_type == zmq_names.CALL_TYPE:
|
||||
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
|
||||
|
||||
push_socket, hosts = self._check_hosts_connections(request.target)
|
||||
push_socket, hosts = self._check_hosts_connections(
|
||||
request.target, zmq_names.socket_type_str(zmq.PULL))
|
||||
|
||||
if not push_socket.connections:
|
||||
LOG.warning(_LW("Request %s was dropped because no connection")
|
||||
|
@ -50,7 +50,8 @@ class ReqPublisher(zmq_publisher_base.PublisherBase):
|
||||
return self._receive_reply(socket, request)
|
||||
|
||||
def _resolve_host_address(self, target, timeout=0):
|
||||
host = self.matchmaker.get_single_host(target, timeout)
|
||||
host = self.matchmaker.get_single_host(
|
||||
target, zmq_names.socket_type_str(zmq.ROUTER), timeout)
|
||||
return zmq_address.get_tcp_direct_address(host)
|
||||
|
||||
def _connect_to_host(self, target, timeout=0):
|
||||
|
@ -20,6 +20,7 @@ import retrying
|
||||
import six
|
||||
|
||||
import oslo_messaging
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._i18n import _LI, _LW
|
||||
|
||||
|
||||
@ -35,27 +36,31 @@ class MatchMakerBase(object):
|
||||
self.conf = conf
|
||||
|
||||
@abc.abstractmethod
|
||||
def register(self, target, hostname):
|
||||
def register(self, target, hostname, listener_type):
|
||||
"""Register target on nameserver.
|
||||
|
||||
:param target: the target for host
|
||||
:type target: Target
|
||||
:param hostname: host for the topic in "host:port" format
|
||||
:type hostname: String
|
||||
:param listener_type: Listener socket type ROUTER, SUB etc.
|
||||
:type listener_type: String
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def unregister(self, target, hostname):
|
||||
def unregister(self, target, hostname, listener_type):
|
||||
"""Unregister target from nameserver.
|
||||
|
||||
:param target: the target for host
|
||||
:type target: Target
|
||||
:param hostname: host for the topic in "host:port" format
|
||||
:type hostname: String
|
||||
:param listener_type: Listener socket type ROUTER, SUB etc.
|
||||
:type listener_type: String
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_hosts(self, target):
|
||||
def get_hosts(self, target, listener_type):
|
||||
"""Get all hosts from nameserver by target.
|
||||
|
||||
:param target: the default target for invocations
|
||||
@ -63,7 +68,7 @@ class MatchMakerBase(object):
|
||||
:returns: a list of "hostname:port" hosts
|
||||
"""
|
||||
|
||||
def get_single_host(self, target, timeout=None, retry=0):
|
||||
def get_single_host(self, target, listener_type, timeout=None, retry=0):
|
||||
"""Get a single host by target.
|
||||
|
||||
:param target: the target for messages
|
||||
@ -101,7 +106,7 @@ class MatchMakerBase(object):
|
||||
|
||||
@_retry
|
||||
def _get_single_host():
|
||||
hosts = self.get_hosts(target)
|
||||
hosts = self.get_hosts(target, listener_type)
|
||||
try:
|
||||
if not hosts:
|
||||
err_msg = "No hosts were found for target %s." % target
|
||||
@ -136,16 +141,16 @@ class DummyMatchMaker(MatchMakerBase):
|
||||
|
||||
self._cache = collections.defaultdict(list)
|
||||
|
||||
def register(self, target, hostname):
|
||||
key = str(target)
|
||||
def register(self, target, hostname, listener_type):
|
||||
key = zmq_address.target_to_key(target, listener_type)
|
||||
if hostname not in self._cache[key]:
|
||||
self._cache[key].append(hostname)
|
||||
|
||||
def unregister(self, target, hostname):
|
||||
key = str(target)
|
||||
def unregister(self, target, hostname, listener_type):
|
||||
key = zmq_address.target_to_key(target, listener_type)
|
||||
if hostname in self._cache[key]:
|
||||
self._cache[key].remove(hostname)
|
||||
|
||||
def get_hosts(self, target):
|
||||
key = str(target)
|
||||
def get_hosts(self, target, listener_type):
|
||||
key = zmq_address.target_to_key(target, listener_type)
|
||||
return self._cache[key]
|
||||
|
@ -52,27 +52,29 @@ class RedisMatchMaker(base.MatchMakerBase):
|
||||
def _get_hosts_by_key(self, key):
|
||||
return self._redis.lrange(key, 0, -1)
|
||||
|
||||
def register(self, target, hostname):
|
||||
def register(self, target, hostname, listener_type):
|
||||
|
||||
if target.topic and target.server:
|
||||
key = zmq_address.target_to_key(target)
|
||||
key = zmq_address.target_to_key(target, listener_type)
|
||||
if hostname not in self._get_hosts_by_key(key):
|
||||
self._redis.lpush(key, hostname)
|
||||
|
||||
if target.topic:
|
||||
if hostname not in self._get_hosts_by_key(target.topic):
|
||||
self._redis.lpush(target.topic, hostname)
|
||||
key = zmq_address.prefix_str(target.topic, listener_type)
|
||||
if hostname not in self._get_hosts_by_key(key):
|
||||
self._redis.lpush(key, hostname)
|
||||
|
||||
if target.server:
|
||||
if hostname not in self._get_hosts_by_key(target.server):
|
||||
self._redis.lpush(target.server, hostname)
|
||||
key = zmq_address.prefix_str(target.server, listener_type)
|
||||
if hostname not in self._get_hosts_by_key(key):
|
||||
self._redis.lpush(key, hostname)
|
||||
|
||||
def unregister(self, target, hostname):
|
||||
key = zmq_address.target_to_key(target)
|
||||
def unregister(self, target, hostname, listener_type):
|
||||
key = zmq_address.target_to_key(target, listener_type)
|
||||
self._redis.lrem(key, 0, hostname)
|
||||
|
||||
def get_hosts(self, target):
|
||||
def get_hosts(self, target, listener_type):
|
||||
hosts = []
|
||||
key = zmq_address.target_to_key(target)
|
||||
key = zmq_address.target_to_key(target, listener_type)
|
||||
hosts.extend(self._get_hosts_by_key(key))
|
||||
return hosts
|
||||
|
@ -64,13 +64,14 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
|
||||
LOG.info("[%s] Listen to target %s" % (self.host, target))
|
||||
|
||||
self.targets.append(target)
|
||||
self.matchmaker.register(target=target,
|
||||
hostname=self.host)
|
||||
self.matchmaker.register(target, self.host,
|
||||
zmq_names.socket_type_str(zmq.ROUTER))
|
||||
|
||||
def cleanup(self):
|
||||
super(RouterConsumer, self).cleanup()
|
||||
for target in self.targets:
|
||||
self.matchmaker.unregister(target, self.host)
|
||||
self.matchmaker.unregister(target, self.host,
|
||||
zmq_names.socket_type_str(zmq.ROUTER))
|
||||
|
||||
def _receive_request(self, socket):
|
||||
reply_id = socket.recv()
|
||||
|
@ -29,12 +29,20 @@ def get_broker_address(conf):
|
||||
return "ipc://%s/zmq-broker" % conf.rpc_zmq_ipc_dir
|
||||
|
||||
|
||||
def target_to_key(target):
|
||||
def prefix_str(key, listener_type):
|
||||
return listener_type + "_" + key
|
||||
|
||||
|
||||
def target_to_key(target, listener_type):
|
||||
|
||||
def prefix(key):
|
||||
return prefix_str(key, listener_type)
|
||||
|
||||
if target.topic and target.server:
|
||||
attributes = ['topic', 'server']
|
||||
key = ".".join(getattr(target, attr) for attr in attributes)
|
||||
return key
|
||||
return prefix(key)
|
||||
if target.topic:
|
||||
return target.topic
|
||||
return prefix(target.topic)
|
||||
if target.server:
|
||||
return target.server
|
||||
return prefix(target.server)
|
||||
|
@ -62,47 +62,47 @@ class TestImplMatchmaker(test_utils.BaseTestCase):
|
||||
self.host2 = b"test_host2"
|
||||
|
||||
def test_register(self):
|
||||
self.test_matcher.register(self.target, self.host1)
|
||||
self.test_matcher.register(self.target, self.host1, "test")
|
||||
|
||||
self.assertEqual(self.test_matcher.get_hosts(self.target),
|
||||
self.assertEqual(self.test_matcher.get_hosts(self.target, "test"),
|
||||
[self.host1])
|
||||
self.assertEqual(self.test_matcher.get_single_host(self.target),
|
||||
self.assertEqual(self.test_matcher.get_single_host(self.target, "test"),
|
||||
self.host1)
|
||||
|
||||
def test_register_two_hosts(self):
|
||||
self.test_matcher.register(self.target, self.host1)
|
||||
self.test_matcher.register(self.target, self.host2)
|
||||
self.test_matcher.register(self.target, self.host1, "test")
|
||||
self.test_matcher.register(self.target, self.host2, "test")
|
||||
|
||||
self.assertItemsEqual(self.test_matcher.get_hosts(self.target),
|
||||
self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"),
|
||||
[self.host1, self.host2])
|
||||
self.assertIn(self.test_matcher.get_single_host(self.target),
|
||||
self.assertIn(self.test_matcher.get_single_host(self.target, "test"),
|
||||
[self.host1, self.host2])
|
||||
|
||||
def test_register_unsibscribe(self):
|
||||
self.test_matcher.register(self.target, self.host1)
|
||||
self.test_matcher.register(self.target, self.host2)
|
||||
self.test_matcher.register(self.target, self.host1, "test")
|
||||
self.test_matcher.register(self.target, self.host2, "test")
|
||||
|
||||
self.test_matcher.unregister(self.target, self.host2)
|
||||
self.test_matcher.unregister(self.target, self.host2, "test")
|
||||
|
||||
self.assertItemsEqual(self.test_matcher.get_hosts(self.target),
|
||||
self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"),
|
||||
[self.host1])
|
||||
self.assertNotIn(self.test_matcher.get_single_host(self.target),
|
||||
self.assertNotIn(self.test_matcher.get_single_host(self.target, "test"),
|
||||
[self.host2])
|
||||
|
||||
def test_register_two_same_hosts(self):
|
||||
self.test_matcher.register(self.target, self.host1)
|
||||
self.test_matcher.register(self.target, self.host1)
|
||||
self.test_matcher.register(self.target, self.host1, "test")
|
||||
self.test_matcher.register(self.target, self.host1, "test")
|
||||
|
||||
self.assertEqual(self.test_matcher.get_hosts(self.target),
|
||||
self.assertEqual(self.test_matcher.get_hosts(self.target, "test"),
|
||||
[self.host1])
|
||||
self.assertEqual(self.test_matcher.get_single_host(self.target),
|
||||
self.assertEqual(self.test_matcher.get_single_host(self.target, "test"),
|
||||
self.host1)
|
||||
|
||||
def test_get_hosts_wrong_topic(self):
|
||||
target = oslo_messaging.Target(topic="no_such_topic")
|
||||
self.assertEqual(self.test_matcher.get_hosts(target), [])
|
||||
self.assertEqual(self.test_matcher.get_hosts(target, "test"), [])
|
||||
|
||||
def test_get_single_host_wrong_topic(self):
|
||||
target = oslo_messaging.Target(topic="no_such_topic")
|
||||
self.assertRaises(oslo_messaging.InvalidTarget,
|
||||
self.test_matcher.get_single_host, target)
|
||||
self.test_matcher.get_single_host, target, "test")
|
||||
|
Loading…
x
Reference in New Issue
Block a user