From a9814d4cb2f1512378b67b6119ba6d0cdc850cb8 Mon Sep 17 00:00:00 2001
From: Gevorg Davoian <gdavoian@mirantis.com>
Date: Thu, 15 Sep 2016 17:53:04 +0300
Subject: [PATCH] [zmq] Send immediate ack after message receiving

During rally tests it was discovered that the latency between
receiving a message and dispatching it can reach up to 10 minutes.
This behaviour breaks the mechanism of acks and retries in the zmq
driver, which causes test failures. This patch tries to fix these
problems by moving ack sending from the rpc server's thread to the
zmq dealer consumer's thread (immediately after a message is
received).

Change-Id: If33d14006ffa947baf5c34c8b1f61f336432374f
---
 .../server/consumers/zmq_dealer_consumer.py   |  8 +++-
 .../zmq_driver/server/zmq_incoming_message.py |  7 +++-
 .../zmq_driver/server/zmq_ttl_cache.py        |  9 ++++
 .../tests/drivers/zmq/test_zmq_ack_manager.py | 41 +++++++++++--------
 4 files changed, 45 insertions(+), 20 deletions(-)

diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py
index 0ec03ceb3..f3661af85 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py
@@ -108,9 +108,9 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer):
                     # message, since the old one might be lost;
                     # for the CALL message also try to resend its reply
                     # (of course, if it was already obtained and cached).
-                    message.acknowledge()
+                    message._acknowledge()
                     if msg_type == zmq_names.CALL_TYPE:
-                        message.reply_from_cache()
+                        message._reply_from_cache()
                     return None
 
                 self.messages_cache.add(message_id)
@@ -120,6 +120,10 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer):
                      "msg_type": zmq_names.message_type_str(msg_type),
                      "msg_id": message_id}
                 )
+                # NOTE(gdavoian): send an immediate ack, since it may
+                # be too late to wait until the message is
+                # dispatched and processed by an RPC server
+                message._acknowledge()
                 return message
 
             else:
diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py
index 493e50940..50c85d9c4 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py
@@ -49,6 +49,9 @@ class ZmqIncomingMessage(base.RpcIncomingMessage):
         self.replies_cache = replies_cache
 
     def acknowledge(self):
+        """Acknowledge is not supported publicly (used only internally)."""
+
+    def _acknowledge(self):
         if self.ack_sender is not None:
             ack = zmq_response.Ack(message_id=self.message_id,
                                    reply_id=self.reply_id)
@@ -66,11 +69,11 @@ class ZmqIncomingMessage(base.RpcIncomingMessage):
             if self.replies_cache is not None:
                 self.replies_cache.add(self.message_id, reply)
 
-    def reply_from_cache(self):
+    def _reply_from_cache(self):
         if self.reply_sender is not None and self.replies_cache is not None:
             reply = self.replies_cache.get(self.message_id)
             if reply is not None:
                 self.reply_sender.send(self.socket, reply)
 
     def requeue(self):
-        """Requeue is not supported"""
+        """Requeue is not supported."""
diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_ttl_cache.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_ttl_cache.py
index 49edfbc37..91d5d4141 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/zmq_ttl_cache.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_ttl_cache.py
@@ -12,6 +12,7 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import logging
 import threading
 import time
 
@@ -19,6 +20,8 @@ import six
 
 from oslo_messaging._drivers.zmq_driver import zmq_async
 
+LOG = logging.getLogger(__name__)
+
 zmq = zmq_async.import_zmq()
 
 
@@ -71,10 +74,16 @@ class TTLCache(object):
     def _update_cache(self):
         with self._lock:
             current_time = time.time()
+            old_size = len(self._cache)
             self._cache = \
                 {key: (value, expiration_time) for
                  key, (value, expiration_time) in six.iteritems(self._cache)
                  if not self._is_expired(expiration_time, current_time)}
+            new_size = len(self._cache)
+            LOG.debug('Updated cache: current size %(new_size)s '
+                      '(%(size_difference)s records removed)',
+                      {'new_size': new_size,
+                       'size_difference': old_size - new_size})
         time.sleep(self._ttl)
 
     def cleanup(self):
diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py b/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py
index 7f5f434ee..dea640c19 100644
--- a/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py
+++ b/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py
@@ -103,8 +103,8 @@ class TestZmqAckManager(test_utils.BaseTestCase):
         time.sleep(1)
 
     @mock.patch.object(
-        zmq_incoming_message.ZmqIncomingMessage, 'acknowledge',
-        side_effect=zmq_incoming_message.ZmqIncomingMessage.acknowledge,
+        zmq_incoming_message.ZmqIncomingMessage, '_acknowledge',
+        side_effect=zmq_incoming_message.ZmqIncomingMessage._acknowledge,
         autospec=True
     )
     def test_cast_success_without_retries(self, received_ack_mock):
@@ -121,7 +121,7 @@ class TestZmqAckManager(test_utils.BaseTestCase):
 
     def test_cast_success_with_one_retry(self):
         with mock.patch.object(zmq_incoming_message.ZmqIncomingMessage,
-                               'acknowledge') as lost_ack_mock:
+                               '_acknowledge') as lost_ack_mock:
             result = self.driver.send(
                 self.target, {}, self.message, wait_for_reply=False
             )
@@ -134,8 +134,8 @@ class TestZmqAckManager(test_utils.BaseTestCase):
             self.assertEqual(0, self.set_result.call_count)
             self.listener._received.clear()
         with mock.patch.object(
-            zmq_incoming_message.ZmqIncomingMessage, 'acknowledge',
-            side_effect=zmq_incoming_message.ZmqIncomingMessage.acknowledge,
+            zmq_incoming_message.ZmqIncomingMessage, '_acknowledge',
+            side_effect=zmq_incoming_message.ZmqIncomingMessage._acknowledge,
             autospec=True
         ) as received_ack_mock:
             self.ack_manager._pool.shutdown(wait=True)
@@ -146,7 +146,7 @@ class TestZmqAckManager(test_utils.BaseTestCase):
 
     def test_cast_success_with_two_retries(self):
         with mock.patch.object(zmq_incoming_message.ZmqIncomingMessage,
-                               'acknowledge') as lost_ack_mock:
+                               '_acknowledge') as lost_ack_mock:
             result = self.driver.send(
                 self.target, {}, self.message, wait_for_reply=False
             )
@@ -164,8 +164,8 @@ class TestZmqAckManager(test_utils.BaseTestCase):
             self.assertEqual(2, lost_ack_mock.call_count)
             self.assertEqual(0, self.set_result.call_count)
         with mock.patch.object(
-            zmq_incoming_message.ZmqIncomingMessage, 'acknowledge',
-            side_effect=zmq_incoming_message.ZmqIncomingMessage.acknowledge,
+            zmq_incoming_message.ZmqIncomingMessage, '_acknowledge',
+            side_effect=zmq_incoming_message.ZmqIncomingMessage._acknowledge,
             autospec=True
         ) as received_ack_mock:
             self.ack_manager._pool.shutdown(wait=True)
@@ -174,7 +174,7 @@ class TestZmqAckManager(test_utils.BaseTestCase):
             self.assertEqual(1, received_ack_mock.call_count)
             self.assertEqual(2, self.set_result.call_count)
 
-    @mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, 'acknowledge')
+    @mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, '_acknowledge')
     def test_cast_failure_exhausted_retries(self, lost_ack_mock):
         result = self.driver.send(
             self.target, {}, self.message, wait_for_reply=False
@@ -188,8 +188,8 @@ class TestZmqAckManager(test_utils.BaseTestCase):
         self.assertEqual(1, self.set_result.call_count)
 
     @mock.patch.object(
-        zmq_incoming_message.ZmqIncomingMessage, 'acknowledge',
-        side_effect=zmq_incoming_message.ZmqIncomingMessage.acknowledge,
+        zmq_incoming_message.ZmqIncomingMessage, '_acknowledge',
+        side_effect=zmq_incoming_message.ZmqIncomingMessage._acknowledge,
         autospec=True
     )
     @mock.patch.object(
@@ -197,7 +197,13 @@ class TestZmqAckManager(test_utils.BaseTestCase):
         side_effect=zmq_incoming_message.ZmqIncomingMessage.reply,
         autospec=True
     )
-    def test_call_success_without_retries(self, received_reply_mock,
+    @mock.patch.object(
+        zmq_incoming_message.ZmqIncomingMessage, '_reply_from_cache',
+        side_effect=zmq_incoming_message.ZmqIncomingMessage._reply_from_cache,
+        autospec=True
+    )
+    def test_call_success_without_retries(self, unused_reply_from_cache_mock,
+                                          received_reply_mock,
                                           received_ack_mock):
         result = self.driver.send(
             self.target, {}, self.message, wait_for_reply=True, timeout=10
@@ -210,12 +216,14 @@ class TestZmqAckManager(test_utils.BaseTestCase):
         self.assertEqual(1, received_ack_mock.call_count)
         self.assertEqual(3, self.set_result.call_count)
         received_reply_mock.assert_called_once_with(mock.ANY, reply=True)
+        self.assertEqual(0, unused_reply_from_cache_mock.call_count)
 
-    @mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, 'acknowledge')
+    @mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, '_acknowledge')
     @mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, 'reply')
-    def test_call_failure_exhausted_retries_and_timeout_error(self,
-                                                              lost_reply_mock,
-                                                              lost_ack_mock):
+    @mock.patch.object(zmq_incoming_message.ZmqIncomingMessage,
+                       '_reply_from_cache')
+    def test_call_failure_exhausted_retries(self, lost_reply_from_cache_mock,
+                                            lost_reply_mock, lost_ack_mock):
         self.assertRaises(oslo_messaging.MessagingTimeout,
                           self.driver.send,
                           self.target, {}, self.message,
@@ -227,3 +235,4 @@ class TestZmqAckManager(test_utils.BaseTestCase):
         self.assertEqual(3, lost_ack_mock.call_count)
         self.assertEqual(2, self.set_result.call_count)
         lost_reply_mock.assert_called_once_with(reply=True)
+        self.assertEqual(2, lost_reply_from_cache_mock.call_count)