diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py index 0ec03ceb3..f3661af85 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py @@ -108,9 +108,9 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer): # message, since the old one might be lost; # for the CALL message also try to resend its reply # (of course, if it was already obtained and cached). - message.acknowledge() + message._acknowledge() if msg_type == zmq_names.CALL_TYPE: - message.reply_from_cache() + message._reply_from_cache() return None self.messages_cache.add(message_id) @@ -120,6 +120,10 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer): "msg_type": zmq_names.message_type_str(msg_type), "msg_id": message_id} ) + # NOTE(gdavoian): send an immediate ack, since it may + # be too late to wait until the message will be + # dispatched and processed by a RPC server + message._acknowledge() return message else: diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py index 493e50940..50c85d9c4 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py @@ -49,6 +49,9 @@ class ZmqIncomingMessage(base.RpcIncomingMessage): self.replies_cache = replies_cache def acknowledge(self): + """Acknowledge is not supported publicly (used only internally).""" + + def _acknowledge(self): if self.ack_sender is not None: ack = zmq_response.Ack(message_id=self.message_id, reply_id=self.reply_id) @@ -66,11 +69,11 @@ class ZmqIncomingMessage(base.RpcIncomingMessage): if self.replies_cache is not None: self.replies_cache.add(self.message_id, reply) - def reply_from_cache(self): + def _reply_from_cache(self): if self.reply_sender is not None and self.replies_cache is not None: reply = self.replies_cache.get(self.message_id) if reply is not None: self.reply_sender.send(self.socket, reply) def requeue(self): - """Requeue is not supported""" + """Requeue is not supported.""" diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_ttl_cache.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_ttl_cache.py index 49edfbc37..91d5d4141 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_ttl_cache.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_ttl_cache.py @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. +import logging import threading import time @@ -19,6 +20,8 @@ import six from oslo_messaging._drivers.zmq_driver import zmq_async +LOG = logging.getLogger(__name__) + zmq = zmq_async.import_zmq() @@ -71,10 +74,16 @@ class TTLCache(object): def _update_cache(self): with self._lock: current_time = time.time() + old_size = len(self._cache) self._cache = \ {key: (value, expiration_time) for key, (value, expiration_time) in six.iteritems(self._cache) if not self._is_expired(expiration_time, current_time)} + new_size = len(self._cache) + LOG.debug('Updated cache: current size %(new_size)s ' + '(%(size_difference)s records removed)', + {'new_size': new_size, + 'size_difference': old_size - new_size}) time.sleep(self._ttl) def cleanup(self): diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py b/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py index 7f5f434ee..dea640c19 100644 --- a/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py +++ b/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py @@ -103,8 +103,8 @@ class TestZmqAckManager(test_utils.BaseTestCase): time.sleep(1) @mock.patch.object( - zmq_incoming_message.ZmqIncomingMessage, 'acknowledge', - side_effect=zmq_incoming_message.ZmqIncomingMessage.acknowledge, + zmq_incoming_message.ZmqIncomingMessage, '_acknowledge', + side_effect=zmq_incoming_message.ZmqIncomingMessage._acknowledge, autospec=True ) def test_cast_success_without_retries(self, received_ack_mock): @@ -121,7 +121,7 @@ class TestZmqAckManager(test_utils.BaseTestCase): def test_cast_success_with_one_retry(self): with mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, - 'acknowledge') as lost_ack_mock: + '_acknowledge') as lost_ack_mock: result = self.driver.send( self.target, {}, self.message, wait_for_reply=False ) @@ -134,8 +134,8 @@ class TestZmqAckManager(test_utils.BaseTestCase): self.assertEqual(0, self.set_result.call_count) self.listener._received.clear() with mock.patch.object( - zmq_incoming_message.ZmqIncomingMessage, 'acknowledge', - side_effect=zmq_incoming_message.ZmqIncomingMessage.acknowledge, + zmq_incoming_message.ZmqIncomingMessage, '_acknowledge', + side_effect=zmq_incoming_message.ZmqIncomingMessage._acknowledge, autospec=True ) as received_ack_mock: self.ack_manager._pool.shutdown(wait=True) @@ -146,7 +146,7 @@ class TestZmqAckManager(test_utils.BaseTestCase): def test_cast_success_with_two_retries(self): with mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, - 'acknowledge') as lost_ack_mock: + '_acknowledge') as lost_ack_mock: result = self.driver.send( self.target, {}, self.message, wait_for_reply=False ) @@ -164,8 +164,8 @@ class TestZmqAckManager(test_utils.BaseTestCase): self.assertEqual(2, lost_ack_mock.call_count) self.assertEqual(0, self.set_result.call_count) with mock.patch.object( - zmq_incoming_message.ZmqIncomingMessage, 'acknowledge', - side_effect=zmq_incoming_message.ZmqIncomingMessage.acknowledge, + zmq_incoming_message.ZmqIncomingMessage, '_acknowledge', + side_effect=zmq_incoming_message.ZmqIncomingMessage._acknowledge, autospec=True ) as received_ack_mock: self.ack_manager._pool.shutdown(wait=True) @@ -174,7 +174,7 @@ class TestZmqAckManager(test_utils.BaseTestCase): self.assertEqual(1, received_ack_mock.call_count) self.assertEqual(2, self.set_result.call_count) - @mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, 'acknowledge') + @mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, '_acknowledge') def test_cast_failure_exhausted_retries(self, lost_ack_mock): result = self.driver.send( self.target, {}, self.message, wait_for_reply=False @@ -188,8 +188,8 @@ class TestZmqAckManager(test_utils.BaseTestCase): self.assertEqual(1, self.set_result.call_count) @mock.patch.object( - zmq_incoming_message.ZmqIncomingMessage, 'acknowledge', - side_effect=zmq_incoming_message.ZmqIncomingMessage.acknowledge, + zmq_incoming_message.ZmqIncomingMessage, '_acknowledge', + side_effect=zmq_incoming_message.ZmqIncomingMessage._acknowledge, autospec=True ) @mock.patch.object( @@ -197,7 +197,13 @@ class TestZmqAckManager(test_utils.BaseTestCase): side_effect=zmq_incoming_message.ZmqIncomingMessage.reply, autospec=True ) - def test_call_success_without_retries(self, received_reply_mock, + @mock.patch.object( + zmq_incoming_message.ZmqIncomingMessage, '_reply_from_cache', + side_effect=zmq_incoming_message.ZmqIncomingMessage._reply_from_cache, + autospec=True + ) + def test_call_success_without_retries(self, unused_reply_from_cache_mock, + received_reply_mock, received_ack_mock): result = self.driver.send( self.target, {}, self.message, wait_for_reply=True, timeout=10 @@ -210,12 +216,14 @@ class TestZmqAckManager(test_utils.BaseTestCase): self.assertEqual(1, received_ack_mock.call_count) self.assertEqual(3, self.set_result.call_count) received_reply_mock.assert_called_once_with(mock.ANY, reply=True) + self.assertEqual(0, unused_reply_from_cache_mock.call_count) - @mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, 'acknowledge') + @mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, '_acknowledge') @mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, 'reply') - def test_call_failure_exhausted_retries_and_timeout_error(self, - lost_reply_mock, - lost_ack_mock): + @mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, + '_reply_from_cache') + def test_call_failure_exhausted_retries(self, lost_reply_from_cache_mock, + lost_reply_mock, lost_ack_mock): self.assertRaises(oslo_messaging.MessagingTimeout, self.driver.send, self.target, {}, self.message, @@ -227,3 +235,4 @@ class TestZmqAckManager(test_utils.BaseTestCase): self.assertEqual(3, lost_ack_mock.call_count) self.assertEqual(2, self.set_result.call_count) lost_reply_mock.assert_called_once_with(reply=True) + self.assertEqual(2, lost_reply_from_cache_mock.call_count)