diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py index 602e5a99d..922607c7e 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py @@ -34,7 +34,8 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend): self._check_request_pattern(request) - dealer_socket, hosts = self._check_hosts_connections(request.target) + dealer_socket, hosts = self._check_hosts_connections( + request.target, zmq_names.socket_type_str(zmq.ROUTER)) if not dealer_socket.connections: # NOTE(ozamiatin): Here we can provide @@ -104,7 +105,8 @@ class DealerPublisherProxy(DealerPublisher): LOG.info(_LI("Envelope: %s") % envelope) target = envelope[zmq_names.FIELD_TARGET] - dealer_socket, hosts = self._check_hosts_connections(target) + dealer_socket, hosts = self._check_hosts_connections( + target, zmq_names.socket_type_str(zmq.ROUTER)) if not dealer_socket.connections: # NOTE(ozamiatin): Here we can provide diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py index 228724b6c..1cd3360eb 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py @@ -35,7 +35,8 @@ class PubPublisher(zmq_publisher_base.PublisherMultisend): if request.msg_type not in zmq_names.NOTIFY_TYPES: raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) - pub_socket, hosts = self._check_hosts_connections(request.target) + pub_socket, hosts = self._check_hosts_connections( + request.target, zmq_names.socket_type_str(zmq.SUB)) self._send_request(pub_socket, request) def _send_request(self, socket, request): diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py index 0a8098af3..46e8ef535 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py @@ -90,11 +90,10 @@ class PublisherBase(object): :type request: zmq_request.Request """ LOG.info(_LI("Sending %(type)s message_id %(message)s to a target" - "%(target)s key: %(key)s, host:%(host)s") + "%(target)s host:%(host)s") % {"type": request.msg_type, "message": request.message_id, "target": request.target, - "key": zmq_address.target_to_key(request.target), "host": request.host}) socket.send_pyobj(request) @@ -122,10 +121,11 @@ class PublisherMultisend(PublisherBase): self.socket_type = socket_type self.matchmaker = matchmaker - def _check_hosts_connections(self, target): + def _check_hosts_connections(self, target, listener_type): # TODO(ozamiatin): Place for significant optimization # Matchmaker cache should be implemented - hosts = self.matchmaker.get_hosts(target) + hosts = self.matchmaker.get_hosts( + target, listener_type) if str(target) in self.outbound_sockets: socket = self.outbound_sockets[str(target)] else: diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py index b8fc4fe51..7fcb46961 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py @@ -35,7 +35,8 @@ class PushPublisher(zmq_publisher_base.PublisherMultisend): if request.msg_type == zmq_names.CALL_TYPE: raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) - push_socket, hosts = self._check_hosts_connections(request.target) + push_socket, hosts = self._check_hosts_connections( + request.target, zmq_names.socket_type_str(zmq.PULL)) if not push_socket.connections: LOG.warning(_LW("Request %s was dropped because no connection") diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py index c6063b8ab..ace229ba5 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py @@ -50,7 +50,8 @@ class ReqPublisher(zmq_publisher_base.PublisherBase): return self._receive_reply(socket, request) def _resolve_host_address(self, target, timeout=0): - host = self.matchmaker.get_single_host(target, timeout) + host = self.matchmaker.get_single_host( + target, zmq_names.socket_type_str(zmq.ROUTER), timeout) return zmq_address.get_tcp_direct_address(host) def _connect_to_host(self, target, timeout=0): diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py index 8b2365b41..7b9b69d79 100644 --- a/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py +++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py @@ -20,6 +20,7 @@ import retrying import six import oslo_messaging +from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._i18n import _LI, _LW @@ -35,27 +36,31 @@ class MatchMakerBase(object): self.conf = conf @abc.abstractmethod - def register(self, target, hostname): + def register(self, target, hostname, listener_type): """Register target on nameserver. :param target: the target for host :type target: Target :param hostname: host for the topic in "host:port" format :type hostname: String + :param listener_type: Listener socket type ROUTER, SUB etc. + :type listener_type: String """ @abc.abstractmethod - def unregister(self, target, hostname): + def unregister(self, target, hostname, listener_type): """Unregister target from nameserver. :param target: the target for host :type target: Target :param hostname: host for the topic in "host:port" format :type hostname: String + :param listener_type: Listener socket type ROUTER, SUB etc. + :type listener_type: String """ @abc.abstractmethod - def get_hosts(self, target): + def get_hosts(self, target, listener_type): """Get all hosts from nameserver by target. :param target: the default target for invocations @@ -63,7 +68,7 @@ class MatchMakerBase(object): :returns: a list of "hostname:port" hosts """ - def get_single_host(self, target, timeout=None, retry=0): + def get_single_host(self, target, listener_type, timeout=None, retry=0): """Get a single host by target. :param target: the target for messages @@ -101,7 +106,7 @@ class MatchMakerBase(object): @_retry def _get_single_host(): - hosts = self.get_hosts(target) + hosts = self.get_hosts(target, listener_type) try: if not hosts: err_msg = "No hosts were found for target %s." % target @@ -136,16 +141,16 @@ class DummyMatchMaker(MatchMakerBase): self._cache = collections.defaultdict(list) - def register(self, target, hostname): - key = str(target) + def register(self, target, hostname, listener_type): + key = zmq_address.target_to_key(target, listener_type) if hostname not in self._cache[key]: self._cache[key].append(hostname) - def unregister(self, target, hostname): - key = str(target) + def unregister(self, target, hostname, listener_type): + key = zmq_address.target_to_key(target, listener_type) if hostname in self._cache[key]: self._cache[key].remove(hostname) - def get_hosts(self, target): - key = str(target) + def get_hosts(self, target, listener_type): + key = zmq_address.target_to_key(target, listener_type) return self._cache[key] diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py index cbf4e1066..576566a2f 100644 --- a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py +++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py @@ -52,27 +52,29 @@ class RedisMatchMaker(base.MatchMakerBase): def _get_hosts_by_key(self, key): return self._redis.lrange(key, 0, -1) - def register(self, target, hostname): + def register(self, target, hostname, listener_type): if target.topic and target.server: - key = zmq_address.target_to_key(target) + key = zmq_address.target_to_key(target, listener_type) if hostname not in self._get_hosts_by_key(key): self._redis.lpush(key, hostname) if target.topic: - if hostname not in self._get_hosts_by_key(target.topic): - self._redis.lpush(target.topic, hostname) + key = zmq_address.prefix_str(target.topic, listener_type) + if hostname not in self._get_hosts_by_key(key): + self._redis.lpush(key, hostname) if target.server: - if hostname not in self._get_hosts_by_key(target.server): - self._redis.lpush(target.server, hostname) + key = zmq_address.prefix_str(target.server, listener_type) + if hostname not in self._get_hosts_by_key(key): + self._redis.lpush(key, hostname) - def unregister(self, target, hostname): - key = zmq_address.target_to_key(target) + def unregister(self, target, hostname, listener_type): + key = zmq_address.target_to_key(target, listener_type) self._redis.lrem(key, 0, hostname) - def get_hosts(self, target): + def get_hosts(self, target, listener_type): hosts = [] - key = zmq_address.target_to_key(target) + key = zmq_address.target_to_key(target, listener_type) hosts.extend(self._get_hosts_by_key(key)) return hosts diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py index 9c529dac0..a501ef7a7 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py @@ -64,13 +64,14 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): LOG.info("[%s] Listen to target %s" % (self.host, target)) self.targets.append(target) - self.matchmaker.register(target=target, - hostname=self.host) + self.matchmaker.register(target, self.host, + zmq_names.socket_type_str(zmq.ROUTER)) def cleanup(self): super(RouterConsumer, self).cleanup() for target in self.targets: - self.matchmaker.unregister(target, self.host) + self.matchmaker.unregister(target, self.host, + zmq_names.socket_type_str(zmq.ROUTER)) def _receive_request(self, socket): reply_id = socket.recv() diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_address.py b/oslo_messaging/_drivers/zmq_driver/zmq_address.py index afc92490f..b0ca2eba9 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_address.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_address.py @@ -29,12 +29,20 @@ def get_broker_address(conf): return "ipc://%s/zmq-broker" % conf.rpc_zmq_ipc_dir -def target_to_key(target): +def prefix_str(key, listener_type): + return listener_type + "_" + key + + +def target_to_key(target, listener_type): + + def prefix(key): + return prefix_str(key, listener_type) + if target.topic and target.server: attributes = ['topic', 'server'] key = ".".join(getattr(target, attr) for attr in attributes) - return key + return prefix(key) if target.topic: - return target.topic + return prefix(target.topic) if target.server: - return target.server + return prefix(target.server) diff --git a/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py b/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py index ba5f1f399..5751e5ba3 100644 --- a/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py +++ b/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py @@ -62,47 +62,47 @@ class TestImplMatchmaker(test_utils.BaseTestCase): self.host2 = b"test_host2" def test_register(self): - self.test_matcher.register(self.target, self.host1) + self.test_matcher.register(self.target, self.host1, "test") - self.assertEqual(self.test_matcher.get_hosts(self.target), + self.assertEqual(self.test_matcher.get_hosts(self.target, "test"), [self.host1]) - self.assertEqual(self.test_matcher.get_single_host(self.target), + self.assertEqual(self.test_matcher.get_single_host(self.target, "test"), self.host1) def test_register_two_hosts(self): - self.test_matcher.register(self.target, self.host1) - self.test_matcher.register(self.target, self.host2) + self.test_matcher.register(self.target, self.host1, "test") + self.test_matcher.register(self.target, self.host2, "test") - self.assertItemsEqual(self.test_matcher.get_hosts(self.target), + self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"), [self.host1, self.host2]) - self.assertIn(self.test_matcher.get_single_host(self.target), + self.assertIn(self.test_matcher.get_single_host(self.target, "test"), [self.host1, self.host2]) def test_register_unsibscribe(self): - self.test_matcher.register(self.target, self.host1) - self.test_matcher.register(self.target, self.host2) + self.test_matcher.register(self.target, self.host1, "test") + self.test_matcher.register(self.target, self.host2, "test") - self.test_matcher.unregister(self.target, self.host2) + self.test_matcher.unregister(self.target, self.host2, "test") - self.assertItemsEqual(self.test_matcher.get_hosts(self.target), + self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"), [self.host1]) - self.assertNotIn(self.test_matcher.get_single_host(self.target), + self.assertNotIn(self.test_matcher.get_single_host(self.target, "test"), [self.host2]) def test_register_two_same_hosts(self): - self.test_matcher.register(self.target, self.host1) - self.test_matcher.register(self.target, self.host1) + self.test_matcher.register(self.target, self.host1, "test") + self.test_matcher.register(self.target, self.host1, "test") - self.assertEqual(self.test_matcher.get_hosts(self.target), + self.assertEqual(self.test_matcher.get_hosts(self.target, "test"), [self.host1]) - self.assertEqual(self.test_matcher.get_single_host(self.target), + self.assertEqual(self.test_matcher.get_single_host(self.target, "test"), self.host1) def test_get_hosts_wrong_topic(self): target = oslo_messaging.Target(topic="no_such_topic") - self.assertEqual(self.test_matcher.get_hosts(target), []) + self.assertEqual(self.test_matcher.get_hosts(target, "test"), []) def test_get_single_host_wrong_topic(self): target = oslo_messaging.Target(topic="no_such_topic") self.assertRaises(oslo_messaging.InvalidTarget, - self.test_matcher.get_single_host, target) + self.test_matcher.get_single_host, target, "test")