From feefead2881ee04eb22da69932970b0469b5a6b2 Mon Sep 17 00:00:00 2001
From: Gevorg Davoian <gdavoian@mirantis.com>
Date: Mon, 31 Oct 2016 19:50:36 +0200
Subject: [PATCH] [zmq] Send fanouts without pub/sub in background

Change-Id: Ibfab90bb1dac06cd54671bc9a358927b3519ce63
---
 .../dealer/zmq_dealer_publisher_direct.py     |  1 -
 .../dealer/zmq_dealer_publisher_proxy.py      |  1 -
 .../client/publishers/zmq_publisher_base.py   |  7 ++--
 .../zmq_driver/client/zmq_ack_manager.py      | 25 ++++++-----
 .../zmq_driver/client/zmq_client_base.py      | 31 +++++++++-----
 .../client/zmq_publisher_manager.py           | 41 +++++++++++++++++-
 .../zmq_driver/client/zmq_routing_table.py    | 42 ++++++++++++-------
 .../server/consumers/zmq_dealer_consumer.py   |  8 ++--
 .../_drivers/zmq_driver/zmq_options.py        |  2 +-
 .../tests/drivers/zmq/test_zmq_ack_manager.py | 12 +++---
 10 files changed, 114 insertions(+), 56 deletions(-)

diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py
index 3f9031238..df0f22db7 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py
@@ -20,7 +20,6 @@ from oslo_messaging._drivers.zmq_driver.client import zmq_senders
 from oslo_messaging._drivers.zmq_driver import zmq_async
 from oslo_messaging._drivers.zmq_driver import zmq_names
 
-
 zmq = zmq_async.import_zmq()
 
 
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
index 4fd4fc11c..63a3d4996 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
@@ -28,7 +28,6 @@ from oslo_messaging._drivers.zmq_driver import zmq_async
 from oslo_messaging._drivers.zmq_driver import zmq_names
 from oslo_messaging._drivers.zmq_driver import zmq_updater
 
-
 zmq = zmq_async.import_zmq()
 
 
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py
index 25f4eab19..5de32bbe5 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py
@@ -1,4 +1,4 @@
-#    Copyright 2015 Mirantis, Inc.
+#    Copyright 2015-2016 Mirantis, Inc.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 #    not use this file except in compliance with the License. You may obtain
@@ -19,7 +19,6 @@ import six
 import oslo_messaging
 from oslo_messaging._drivers.zmq_driver import zmq_async
 
-
 zmq = zmq_async.import_zmq()
 
 
@@ -89,5 +88,7 @@ class PublisherBase(object):
         )
 
     def cleanup(self):
-        """Cleanup publisher. Close allocated connections."""
+        """Cleanup publisher: stop receiving responses, close allocated
+        connections etc.
+        """
         self.receiver.stop()
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_ack_manager.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_ack_manager.py
index 5109e3584..02aab771c 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_ack_manager.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_ack_manager.py
@@ -29,10 +29,7 @@ zmq = zmq_async.import_zmq()
 class AckManager(zmq_publisher_manager.PublisherManagerBase):
 
     def __init__(self, publisher):
-        super(AckManager, self).__init__(publisher)
-        self._pool = zmq_async.get_pool(
-            size=self.conf.oslo_messaging_zmq.rpc_thread_pool_size
-        )
+        super(AckManager, self).__init__(publisher, with_pool=True)
 
     @staticmethod
     def _check_ack(ack, request):
@@ -98,7 +95,7 @@ class AckManager(zmq_publisher_manager.PublisherManagerBase):
         ack_future = self._schedule_request_for_ack(request)
         if ack_future is None:
             self.publisher._raise_timeout(request)
-        self._pool.submit(self._wait_for_ack, request, ack_future)
+        self.pool.submit(self._wait_for_ack, request, ack_future)
         try:
             return self.publisher.receive_reply(ack_future.socket, request)
         finally:
@@ -106,14 +103,16 @@ class AckManager(zmq_publisher_manager.PublisherManagerBase):
                 ack_future.set_result(None)
 
     def send_cast(self, request):
-        self._pool.submit(self._wait_for_ack, request)
+        self.pool.submit(self._wait_for_ack, request)
 
-    def send_fanout(self, request):
-        self._send_request(request)
+    send_fanout = _send_request
+    send_notify = _send_request
 
-    def send_notify(self, request):
-        self._send_request(request)
 
-    def cleanup(self):
-        self._pool.shutdown(wait=True)
-        super(AckManager, self).cleanup()
+class AckManagerAsyncMultisend(AckManager):
+
+    def _send_request_async(self, request):
+        self.pool.submit(self._send_request, request)
+
+    send_fanout = _send_request_async
+    send_notify = _send_request_async
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py
index 3f8a9f88b..445530b48 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py
@@ -71,23 +71,34 @@ class ZmqClientBase(object):
 
     @staticmethod
     def _create_publisher_direct(conf, matchmaker):
-        publisher_direct = zmq_dealer_publisher_direct.DealerPublisherDirect(
-            conf, matchmaker)
-        return zmq_publisher_manager.PublisherManagerDynamic(publisher_direct)
+        publisher_direct = \
+            zmq_dealer_publisher_direct.DealerPublisherDirect(conf, matchmaker)
+        publisher_manager_cls = zmq_publisher_manager.PublisherManagerDynamic \
+            if conf.oslo_messaging_zmq.use_pub_sub else \
+            zmq_publisher_manager.PublisherManagerDynamicAsyncMultisend
+        return publisher_manager_cls(publisher_direct)
 
     @staticmethod
     def _create_publisher_proxy(conf, matchmaker):
-        publisher_proxy = zmq_dealer_publisher_proxy.DealerPublisherProxy(
-            conf, matchmaker)
-        return zmq_ack_manager.AckManager(publisher_proxy) \
-            if conf.oslo_messaging_zmq.rpc_use_acks else \
-            zmq_publisher_manager.PublisherManagerStatic(publisher_proxy)
+        publisher_proxy = \
+            zmq_dealer_publisher_proxy.DealerPublisherProxy(conf, matchmaker)
+        if conf.oslo_messaging_zmq.rpc_use_acks:
+            ack_manager_cls = zmq_ack_manager.AckManager \
+                if conf.oslo_messaging_zmq.use_pub_sub else \
+                zmq_ack_manager.AckManagerAsyncMultisend
+            return ack_manager_cls(publisher_proxy)
+        else:
+            publisher_manager_cls = \
+                zmq_publisher_manager.PublisherManagerStatic \
+                if conf.oslo_messaging_zmq.use_pub_sub else \
+                zmq_publisher_manager.PublisherManagerStaticAsyncMultisend
+            return publisher_manager_cls(publisher_proxy)
 
     @staticmethod
     def _create_publisher_proxy_dynamic(conf, matchmaker):
         publisher_proxy = \
-            zmq_dealer_publisher_proxy.DealerPublisherProxyDynamic(
-                conf, matchmaker)
+            zmq_dealer_publisher_proxy.DealerPublisherProxyDynamic(conf,
+                                                                   matchmaker)
         return zmq_publisher_manager.PublisherManagerDynamic(publisher_proxy)
 
     def cleanup(self):
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_publisher_manager.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_publisher_manager.py
index e3f77bdda..b9afc4ed7 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_publisher_manager.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_publisher_manager.py
@@ -63,14 +63,21 @@ class PublisherManagerBase(object):
 
     Publisher knows how to establish connection, how to send message,
     and how to receive reply. PublisherManager coordinates all these steps
-    regarding retrying logic in AckManager implementations
+    regarding retrying logic in AckManager implementations. May also have an
+    additional thread pool for scheduling background tasks.
     """
 
-    def __init__(self, publisher):
+    def __init__(self, publisher, with_pool=False):
         self.publisher = publisher
         self.conf = publisher.conf
         self.sender = publisher.sender
         self.receiver = publisher.receiver
+        if with_pool:
+            self.pool = zmq_async.get_pool(
+                size=self.conf.oslo_messaging_zmq.rpc_thread_pool_size
+            )
+        else:
+            self.pool = None
 
     @abc.abstractmethod
     def send_call(self, request):
@@ -105,6 +112,8 @@ class PublisherManagerBase(object):
         """
 
     def cleanup(self):
+        if self.pool:
+            self.pool.shutdown(wait=True)
         self.publisher.cleanup()
 
 
@@ -129,6 +138,20 @@ class PublisherManagerDynamic(PublisherManagerBase):
     send_notify = _send
 
 
+class PublisherManagerDynamicAsyncMultisend(PublisherManagerDynamic):
+
+    def __init__(self, publisher):
+        super(PublisherManagerDynamicAsyncMultisend, self).__init__(
+            publisher, with_pool=True
+        )
+
+    def _send_async(self, request):
+        self.pool.submit(self._send, request)
+
+    send_fanout = _send_async
+    send_notify = _send_async
+
+
 class PublisherManagerStatic(PublisherManagerBase):
 
     @target_not_found_timeout
@@ -146,3 +169,17 @@ class PublisherManagerStatic(PublisherManagerBase):
     send_cast = _send
     send_fanout = _send
     send_notify = _send
+
+
+class PublisherManagerStaticAsyncMultisend(PublisherManagerStatic):
+
+    def __init__(self, publisher):
+        super(PublisherManagerStaticAsyncMultisend, self).__init__(
+            publisher, with_pool=True
+        )
+
+    def _send_async(self, request):
+        self.pool.submit(self._send, request)
+
+    send_fanout = _send_async
+    send_notify = _send_async
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py
index c7bcbe845..346edf813 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py
@@ -39,6 +39,7 @@ class RoutingTableAdaptor(object):
         self.routing_table_updater = RoutingTableUpdater(
             conf, matchmaker, self.routing_table)
         self.round_robin_targets = {}
+        self._lock = threading.Lock()
 
     def get_round_robin_host(self, target):
         target_key = zmq_address.target_to_key(
@@ -47,20 +48,25 @@ class RoutingTableAdaptor(object):
         LOG.debug("Processing target %s for round-robin." % target_key)
 
         if target_key not in self.round_robin_targets:
-            LOG.debug("Target %s is not in cache. Check matchmaker server."
-                      % target_key)
-            hosts = self.matchmaker.get_hosts_retry(
-                target, zmq_names.socket_type_str(self.listener_type))
-            LOG.debug("Received hosts %s" % hosts)
-            self.routing_table.update_hosts(target_key, hosts)
-            self.round_robin_targets[target_key] = \
-                self.routing_table.get_hosts_round_robin(target_key)
+            self._fetch_round_robin_hosts_from_matchmaker(target, target_key)
 
         rr_gen = self.round_robin_targets[target_key]
         host = next(rr_gen)
         LOG.debug("Host resolved for the current connection is %s" % host)
         return host
 
+    def _fetch_round_robin_hosts_from_matchmaker(self, target, target_key):
+        with self._lock:
+            if target_key not in self.round_robin_targets:
+                LOG.debug("Target %s is not in cache. Check matchmaker server."
+                          % target_key)
+                hosts = self.matchmaker.get_hosts_retry(
+                    target, zmq_names.socket_type_str(self.listener_type))
+                LOG.debug("Received hosts %s" % hosts)
+                self.routing_table.update_hosts(target_key, hosts)
+                self.round_robin_targets[target_key] = \
+                    self.routing_table.get_hosts_round_robin(target_key)
+
     def get_fanout_hosts(self, target):
         target_key = zmq_address.prefix_str(
             target.topic, zmq_names.socket_type_str(self.listener_type))
@@ -68,16 +74,20 @@ class RoutingTableAdaptor(object):
         LOG.debug("Processing target %s for fanout." % target_key)
 
         if not self.routing_table.contains(target_key):
-            LOG.debug("Target %s is not in cache. Check matchmaker server."
-                      % target_key)
-            hosts = self.matchmaker.get_hosts_fanout(
-                target, zmq_names.socket_type_str(self.listener_type))
-            LOG.debug("Received hosts %s" % hosts)
-            self.routing_table.update_hosts(target_key, hosts)
-        else:
-            LOG.debug("Target %s has been found in cache." % target_key)
+            self._fetch_fanout_hosts_from_matchmaker(target, target_key)
+
         return self.routing_table.get_hosts_fanout(target_key)
 
+    def _fetch_fanout_hosts_from_matchmaker(self, target, target_key):
+        with self._lock:
+            if not self.routing_table.contains(target_key):
+                LOG.debug("Target %s is not in cache. Check matchmaker server."
+                          % target_key)
+                hosts = self.matchmaker.get_hosts_fanout(
+                    target, zmq_names.socket_type_str(self.listener_type))
+                LOG.debug("Received hosts %s" % hosts)
+                self.routing_table.update_hosts(target_key, hosts)
+
     def cleanup(self):
         self.routing_table_updater.cleanup()
 
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py
index efe2d5c29..57c51ca02 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py
@@ -154,11 +154,12 @@ class DealerConsumerWithAcks(DealerConsumer):
                  "msg_type": zmq_names.message_type_str(message_type),
                  "msg_id": message_id}
             )
-            # NOTE(gdavoian): send yet another ack for the non-CALL
+            # NOTE(gdavoian): send yet another ack for the direct
             # message, since the old one might be lost;
             # for the CALL message also try to resend its reply
             # (of course, if it was already obtained and cached).
-            self._acknowledge(reply_id, message_id, socket)
+            if message_type in zmq_names.DIRECT_TYPES:
+                self._acknowledge(reply_id, message_id, socket)
             if message_type == zmq_names.CALL_TYPE:
                 self._reply_from_cache(message_id, socket)
             return None
@@ -168,7 +169,8 @@ class DealerConsumerWithAcks(DealerConsumer):
         # NOTE(gdavoian): send an immediate ack, since it may
         # be too late to wait until the message will be
         # dispatched and processed by a RPC server
-        self._acknowledge(reply_id, message_id, socket)
+        if message_type in zmq_names.DIRECT_TYPES:
+            self._acknowledge(reply_id, message_id, socket)
 
         return super(DealerConsumerWithAcks, self)._create_message(
             context, message, reply_id, message_id, socket, message_type
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_options.py b/oslo_messaging/_drivers/zmq_driver/zmq_options.py
index 756234c0d..7fe7afabe 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_options.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_options.py
@@ -82,7 +82,7 @@ zmq_opts = [
                help='Update period in seconds of a name service record '
                     'about existing target.'),
 
-    cfg.BoolOpt('use_pub_sub', default=True,
+    cfg.BoolOpt('use_pub_sub', default=False,
                 deprecated_group='DEFAULT',
                 help='Use PUB/SUB pattern for fanout methods. '
                      'PUB/SUB always uses proxy.'),
diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py b/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py
index 800f9ed5f..bea9870ec 100644
--- a/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py
+++ b/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py
@@ -109,7 +109,7 @@ class TestZmqAckManager(test_utils.BaseTestCase):
             self.target, {}, self.message, wait_for_reply=False
         )
         self.assertIsNone(result)
-        self.ack_manager._pool.shutdown(wait=True)
+        self.ack_manager.pool.shutdown(wait=True)
         self.assertTrue(self.listener._received.isSet())
         self.assertEqual(self.message, self.listener.message.message)
         self.assertEqual(1, self.send.call_count)
@@ -133,7 +133,7 @@ class TestZmqAckManager(test_utils.BaseTestCase):
         with mock.patch.object(DealerConsumerWithAcks, '_acknowledge',
                                side_effect=DealerConsumerWithAcks._acknowledge,
                                autospec=True) as received_ack_mock:
-            self.ack_manager._pool.shutdown(wait=True)
+            self.ack_manager.pool.shutdown(wait=True)
             self.assertFalse(self.listener._received.isSet())
             self.assertEqual(2, self.send.call_count)
             self.assertEqual(1, received_ack_mock.call_count)
@@ -161,7 +161,7 @@ class TestZmqAckManager(test_utils.BaseTestCase):
         with mock.patch.object(DealerConsumerWithAcks, '_acknowledge',
                                side_effect=DealerConsumerWithAcks._acknowledge,
                                autospec=True) as received_ack_mock:
-            self.ack_manager._pool.shutdown(wait=True)
+            self.ack_manager.pool.shutdown(wait=True)
             self.assertFalse(self.listener._received.isSet())
             self.assertEqual(3, self.send.call_count)
             self.assertEqual(1, received_ack_mock.call_count)
@@ -173,7 +173,7 @@ class TestZmqAckManager(test_utils.BaseTestCase):
             self.target, {}, self.message, wait_for_reply=False
         )
         self.assertIsNone(result)
-        self.ack_manager._pool.shutdown(wait=True)
+        self.ack_manager.pool.shutdown(wait=True)
         self.assertTrue(self.listener._received.isSet())
         self.assertEqual(self.message, self.listener.message.message)
         self.assertEqual(3, self.send.call_count)
@@ -196,7 +196,7 @@ class TestZmqAckManager(test_utils.BaseTestCase):
             self.target, {}, self.message, wait_for_reply=True, timeout=10
         )
         self.assertIsNotNone(result)
-        self.ack_manager._pool.shutdown(wait=True)
+        self.ack_manager.pool.shutdown(wait=True)
         self.assertTrue(self.listener._received.isSet())
         self.assertEqual(self.message, self.listener.message.message)
         self.assertEqual(1, self.send.call_count)
@@ -215,7 +215,7 @@ class TestZmqAckManager(test_utils.BaseTestCase):
                           self.driver.send,
                           self.target, {}, self.message,
                           wait_for_reply=True, timeout=20)
-        self.ack_manager._pool.shutdown(wait=True)
+        self.ack_manager.pool.shutdown(wait=True)
         self.assertTrue(self.listener._received.isSet())
         self.assertEqual(self.message, self.listener.message.message)
         self.assertEqual(3, self.send.call_count)