From feefead2881ee04eb22da69932970b0469b5a6b2 Mon Sep 17 00:00:00 2001 From: Gevorg Davoian Date: Mon, 31 Oct 2016 19:50:36 +0200 Subject: [PATCH] [zmq] Send fanouts without pub/sub in background Change-Id: Ibfab90bb1dac06cd54671bc9a358927b3519ce63 --- .../dealer/zmq_dealer_publisher_direct.py | 1 - .../dealer/zmq_dealer_publisher_proxy.py | 1 - .../client/publishers/zmq_publisher_base.py | 7 ++-- .../zmq_driver/client/zmq_ack_manager.py | 25 ++++++----- .../zmq_driver/client/zmq_client_base.py | 31 +++++++++----- .../client/zmq_publisher_manager.py | 41 +++++++++++++++++- .../zmq_driver/client/zmq_routing_table.py | 42 ++++++++++++------- .../server/consumers/zmq_dealer_consumer.py | 8 ++-- .../_drivers/zmq_driver/zmq_options.py | 2 +- .../tests/drivers/zmq/test_zmq_ack_manager.py | 12 +++--- 10 files changed, 114 insertions(+), 56 deletions(-) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py index 3f9031238..df0f22db7 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py @@ -20,7 +20,6 @@ from oslo_messaging._drivers.zmq_driver.client import zmq_senders from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names - zmq = zmq_async.import_zmq() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py index 4fd4fc11c..63a3d4996 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py @@ -28,7 +28,6 @@ from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_updater - zmq = zmq_async.import_zmq() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py index 25f4eab19..5de32bbe5 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py @@ -1,4 +1,4 @@ -# Copyright 2015 Mirantis, Inc. +# Copyright 2015-2016 Mirantis, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -19,7 +19,6 @@ import six import oslo_messaging from oslo_messaging._drivers.zmq_driver import zmq_async - zmq = zmq_async.import_zmq() @@ -89,5 +88,7 @@ class PublisherBase(object): ) def cleanup(self): - """Cleanup publisher. Close allocated connections.""" + """Cleanup publisher: stop receiving responses, close allocated + connections etc. + """ self.receiver.stop() diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_ack_manager.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_ack_manager.py index 5109e3584..02aab771c 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_ack_manager.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_ack_manager.py @@ -29,10 +29,7 @@ zmq = zmq_async.import_zmq() class AckManager(zmq_publisher_manager.PublisherManagerBase): def __init__(self, publisher): - super(AckManager, self).__init__(publisher) - self._pool = zmq_async.get_pool( - size=self.conf.oslo_messaging_zmq.rpc_thread_pool_size - ) + super(AckManager, self).__init__(publisher, with_pool=True) @staticmethod def _check_ack(ack, request): @@ -98,7 +95,7 @@ class AckManager(zmq_publisher_manager.PublisherManagerBase): ack_future = self._schedule_request_for_ack(request) if ack_future is None: self.publisher._raise_timeout(request) - self._pool.submit(self._wait_for_ack, request, ack_future) + self.pool.submit(self._wait_for_ack, request, ack_future) try: return self.publisher.receive_reply(ack_future.socket, request) finally: @@ -106,14 +103,16 @@ class AckManager(zmq_publisher_manager.PublisherManagerBase): ack_future.set_result(None) def send_cast(self, request): - self._pool.submit(self._wait_for_ack, request) + self.pool.submit(self._wait_for_ack, request) - def send_fanout(self, request): - self._send_request(request) + send_fanout = _send_request + send_notify = _send_request - def send_notify(self, request): - self._send_request(request) - def cleanup(self): - self._pool.shutdown(wait=True) - super(AckManager, self).cleanup() +class AckManagerAsyncMultisend(AckManager): + + def _send_request_async(self, request): + self.pool.submit(self._send_request, request) + + send_fanout = _send_request_async + send_notify = _send_request_async diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py index 3f8a9f88b..445530b48 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py @@ -71,23 +71,34 @@ class ZmqClientBase(object): @staticmethod def _create_publisher_direct(conf, matchmaker): - publisher_direct = zmq_dealer_publisher_direct.DealerPublisherDirect( - conf, matchmaker) - return zmq_publisher_manager.PublisherManagerDynamic(publisher_direct) + publisher_direct = \ + zmq_dealer_publisher_direct.DealerPublisherDirect(conf, matchmaker) + publisher_manager_cls = zmq_publisher_manager.PublisherManagerDynamic \ + if conf.oslo_messaging_zmq.use_pub_sub else \ + zmq_publisher_manager.PublisherManagerDynamicAsyncMultisend + return publisher_manager_cls(publisher_direct) @staticmethod def _create_publisher_proxy(conf, matchmaker): - publisher_proxy = zmq_dealer_publisher_proxy.DealerPublisherProxy( - conf, matchmaker) - return zmq_ack_manager.AckManager(publisher_proxy) \ - if conf.oslo_messaging_zmq.rpc_use_acks else \ - zmq_publisher_manager.PublisherManagerStatic(publisher_proxy) + publisher_proxy = \ + zmq_dealer_publisher_proxy.DealerPublisherProxy(conf, matchmaker) + if conf.oslo_messaging_zmq.rpc_use_acks: + ack_manager_cls = zmq_ack_manager.AckManager \ + if conf.oslo_messaging_zmq.use_pub_sub else \ + zmq_ack_manager.AckManagerAsyncMultisend + return ack_manager_cls(publisher_proxy) + else: + publisher_manager_cls = \ + zmq_publisher_manager.PublisherManagerStatic \ + if conf.oslo_messaging_zmq.use_pub_sub else \ + zmq_publisher_manager.PublisherManagerStaticAsyncMultisend + return publisher_manager_cls(publisher_proxy) @staticmethod def _create_publisher_proxy_dynamic(conf, matchmaker): publisher_proxy = \ - zmq_dealer_publisher_proxy.DealerPublisherProxyDynamic( - conf, matchmaker) + zmq_dealer_publisher_proxy.DealerPublisherProxyDynamic(conf, + matchmaker) return zmq_publisher_manager.PublisherManagerDynamic(publisher_proxy) def cleanup(self): diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_publisher_manager.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_publisher_manager.py index e3f77bdda..b9afc4ed7 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_publisher_manager.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_publisher_manager.py @@ -63,14 +63,21 @@ class PublisherManagerBase(object): Publisher knows how to establish connection, how to send message, and how to receive reply. PublisherManager coordinates all these steps - regarding retrying logic in AckManager implementations + regarding retrying logic in AckManager implementations. May also have an + additional thread pool for scheduling background tasks. """ - def __init__(self, publisher): + def __init__(self, publisher, with_pool=False): self.publisher = publisher self.conf = publisher.conf self.sender = publisher.sender self.receiver = publisher.receiver + if with_pool: + self.pool = zmq_async.get_pool( + size=self.conf.oslo_messaging_zmq.rpc_thread_pool_size + ) + else: + self.pool = None @abc.abstractmethod def send_call(self, request): @@ -105,6 +112,8 @@ class PublisherManagerBase(object): """ def cleanup(self): + if self.pool: + self.pool.shutdown(wait=True) self.publisher.cleanup() @@ -129,6 +138,20 @@ class PublisherManagerDynamic(PublisherManagerBase): send_notify = _send +class PublisherManagerDynamicAsyncMultisend(PublisherManagerDynamic): + + def __init__(self, publisher): + super(PublisherManagerDynamicAsyncMultisend, self).__init__( + publisher, with_pool=True + ) + + def _send_async(self, request): + self.pool.submit(self._send, request) + + send_fanout = _send_async + send_notify = _send_async + + class PublisherManagerStatic(PublisherManagerBase): @target_not_found_timeout @@ -146,3 +169,17 @@ class PublisherManagerStatic(PublisherManagerBase): send_cast = _send send_fanout = _send send_notify = _send + + +class PublisherManagerStaticAsyncMultisend(PublisherManagerStatic): + + def __init__(self, publisher): + super(PublisherManagerStaticAsyncMultisend, self).__init__( + publisher, with_pool=True + ) + + def _send_async(self, request): + self.pool.submit(self._send, request) + + send_fanout = _send_async + send_notify = _send_async diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py index c7bcbe845..346edf813 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py @@ -39,6 +39,7 @@ class RoutingTableAdaptor(object): self.routing_table_updater = RoutingTableUpdater( conf, matchmaker, self.routing_table) self.round_robin_targets = {} + self._lock = threading.Lock() def get_round_robin_host(self, target): target_key = zmq_address.target_to_key( @@ -47,20 +48,25 @@ class RoutingTableAdaptor(object): LOG.debug("Processing target %s for round-robin." % target_key) if target_key not in self.round_robin_targets: - LOG.debug("Target %s is not in cache. Check matchmaker server." - % target_key) - hosts = self.matchmaker.get_hosts_retry( - target, zmq_names.socket_type_str(self.listener_type)) - LOG.debug("Received hosts %s" % hosts) - self.routing_table.update_hosts(target_key, hosts) - self.round_robin_targets[target_key] = \ - self.routing_table.get_hosts_round_robin(target_key) + self._fetch_round_robin_hosts_from_matchmaker(target, target_key) rr_gen = self.round_robin_targets[target_key] host = next(rr_gen) LOG.debug("Host resolved for the current connection is %s" % host) return host + def _fetch_round_robin_hosts_from_matchmaker(self, target, target_key): + with self._lock: + if target_key not in self.round_robin_targets: + LOG.debug("Target %s is not in cache. Check matchmaker server." + % target_key) + hosts = self.matchmaker.get_hosts_retry( + target, zmq_names.socket_type_str(self.listener_type)) + LOG.debug("Received hosts %s" % hosts) + self.routing_table.update_hosts(target_key, hosts) + self.round_robin_targets[target_key] = \ + self.routing_table.get_hosts_round_robin(target_key) + def get_fanout_hosts(self, target): target_key = zmq_address.prefix_str( target.topic, zmq_names.socket_type_str(self.listener_type)) @@ -68,16 +74,20 @@ class RoutingTableAdaptor(object): LOG.debug("Processing target %s for fanout." % target_key) if not self.routing_table.contains(target_key): - LOG.debug("Target %s is not in cache. Check matchmaker server." - % target_key) - hosts = self.matchmaker.get_hosts_fanout( - target, zmq_names.socket_type_str(self.listener_type)) - LOG.debug("Received hosts %s" % hosts) - self.routing_table.update_hosts(target_key, hosts) - else: - LOG.debug("Target %s has been found in cache." % target_key) + self._fetch_fanout_hosts_from_matchmaker(target, target_key) + return self.routing_table.get_hosts_fanout(target_key) + def _fetch_fanout_hosts_from_matchmaker(self, target, target_key): + with self._lock: + if not self.routing_table.contains(target_key): + LOG.debug("Target %s is not in cache. Check matchmaker server." + % target_key) + hosts = self.matchmaker.get_hosts_fanout( + target, zmq_names.socket_type_str(self.listener_type)) + LOG.debug("Received hosts %s" % hosts) + self.routing_table.update_hosts(target_key, hosts) + def cleanup(self): self.routing_table_updater.cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py index efe2d5c29..57c51ca02 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py @@ -154,11 +154,12 @@ class DealerConsumerWithAcks(DealerConsumer): "msg_type": zmq_names.message_type_str(message_type), "msg_id": message_id} ) - # NOTE(gdavoian): send yet another ack for the non-CALL + # NOTE(gdavoian): send yet another ack for the direct # message, since the old one might be lost; # for the CALL message also try to resend its reply # (of course, if it was already obtained and cached). - self._acknowledge(reply_id, message_id, socket) + if message_type in zmq_names.DIRECT_TYPES: + self._acknowledge(reply_id, message_id, socket) if message_type == zmq_names.CALL_TYPE: self._reply_from_cache(message_id, socket) return None @@ -168,7 +169,8 @@ class DealerConsumerWithAcks(DealerConsumer): # NOTE(gdavoian): send an immediate ack, since it may # be too late to wait until the message will be # dispatched and processed by a RPC server - self._acknowledge(reply_id, message_id, socket) + if message_type in zmq_names.DIRECT_TYPES: + self._acknowledge(reply_id, message_id, socket) return super(DealerConsumerWithAcks, self)._create_message( context, message, reply_id, message_id, socket, message_type diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_options.py b/oslo_messaging/_drivers/zmq_driver/zmq_options.py index 756234c0d..7fe7afabe 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_options.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_options.py @@ -82,7 +82,7 @@ zmq_opts = [ help='Update period in seconds of a name service record ' 'about existing target.'), - cfg.BoolOpt('use_pub_sub', default=True, + cfg.BoolOpt('use_pub_sub', default=False, deprecated_group='DEFAULT', help='Use PUB/SUB pattern for fanout methods. ' 'PUB/SUB always uses proxy.'), diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py b/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py index 800f9ed5f..bea9870ec 100644 --- a/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py +++ b/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py @@ -109,7 +109,7 @@ class TestZmqAckManager(test_utils.BaseTestCase): self.target, {}, self.message, wait_for_reply=False ) self.assertIsNone(result) - self.ack_manager._pool.shutdown(wait=True) + self.ack_manager.pool.shutdown(wait=True) self.assertTrue(self.listener._received.isSet()) self.assertEqual(self.message, self.listener.message.message) self.assertEqual(1, self.send.call_count) @@ -133,7 +133,7 @@ class TestZmqAckManager(test_utils.BaseTestCase): with mock.patch.object(DealerConsumerWithAcks, '_acknowledge', side_effect=DealerConsumerWithAcks._acknowledge, autospec=True) as received_ack_mock: - self.ack_manager._pool.shutdown(wait=True) + self.ack_manager.pool.shutdown(wait=True) self.assertFalse(self.listener._received.isSet()) self.assertEqual(2, self.send.call_count) self.assertEqual(1, received_ack_mock.call_count) @@ -161,7 +161,7 @@ class TestZmqAckManager(test_utils.BaseTestCase): with mock.patch.object(DealerConsumerWithAcks, '_acknowledge', side_effect=DealerConsumerWithAcks._acknowledge, autospec=True) as received_ack_mock: - self.ack_manager._pool.shutdown(wait=True) + self.ack_manager.pool.shutdown(wait=True) self.assertFalse(self.listener._received.isSet()) self.assertEqual(3, self.send.call_count) self.assertEqual(1, received_ack_mock.call_count) @@ -173,7 +173,7 @@ class TestZmqAckManager(test_utils.BaseTestCase): self.target, {}, self.message, wait_for_reply=False ) self.assertIsNone(result) - self.ack_manager._pool.shutdown(wait=True) + self.ack_manager.pool.shutdown(wait=True) self.assertTrue(self.listener._received.isSet()) self.assertEqual(self.message, self.listener.message.message) self.assertEqual(3, self.send.call_count) @@ -196,7 +196,7 @@ class TestZmqAckManager(test_utils.BaseTestCase): self.target, {}, self.message, wait_for_reply=True, timeout=10 ) self.assertIsNotNone(result) - self.ack_manager._pool.shutdown(wait=True) + self.ack_manager.pool.shutdown(wait=True) self.assertTrue(self.listener._received.isSet()) self.assertEqual(self.message, self.listener.message.message) self.assertEqual(1, self.send.call_count) @@ -215,7 +215,7 @@ class TestZmqAckManager(test_utils.BaseTestCase): self.driver.send, self.target, {}, self.message, wait_for_reply=True, timeout=20) - self.ack_manager._pool.shutdown(wait=True) + self.ack_manager.pool.shutdown(wait=True) self.assertTrue(self.listener._received.isSet()) self.assertEqual(self.message, self.listener.message.message) self.assertEqual(3, self.send.call_count)