[zmq] Send immediate ack after message receiving

During rally tests it was discovered that the latency between
receiving message and dispatching it can reach up to 10 minutes.
This behaviour breaks the mechanism of acks and retries in zmq driver,
which causes test failures. This patch tries to fix these problems
by moving ack sending from rpc server's thread to zmq dealer
consumer's thread (immediately after message receiving).

Change-Id: If33d14006ffa947baf5c34c8b1f61f336432374f
This commit is contained in:
Gevorg Davoian 2016-09-15 17:53:04 +03:00
parent afaa4d935d
commit a9814d4cb2
4 changed files with 45 additions and 20 deletions
oslo_messaging

@ -108,9 +108,9 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer):
# message, since the old one might be lost;
# for the CALL message also try to resend its reply
# (of course, if it was already obtained and cached).
message.acknowledge()
message._acknowledge()
if msg_type == zmq_names.CALL_TYPE:
message.reply_from_cache()
message._reply_from_cache()
return None
self.messages_cache.add(message_id)
@ -120,6 +120,10 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer):
"msg_type": zmq_names.message_type_str(msg_type),
"msg_id": message_id}
)
# NOTE(gdavoian): send an immediate ack, since it may
# be too late to wait until the message will be
# dispatched and processed by a RPC server
message._acknowledge()
return message
else:

@ -49,6 +49,9 @@ class ZmqIncomingMessage(base.RpcIncomingMessage):
self.replies_cache = replies_cache
def acknowledge(self):
"""Acknowledge is not supported publicly (used only internally)."""
def _acknowledge(self):
if self.ack_sender is not None:
ack = zmq_response.Ack(message_id=self.message_id,
reply_id=self.reply_id)
@ -66,11 +69,11 @@ class ZmqIncomingMessage(base.RpcIncomingMessage):
if self.replies_cache is not None:
self.replies_cache.add(self.message_id, reply)
def reply_from_cache(self):
def _reply_from_cache(self):
if self.reply_sender is not None and self.replies_cache is not None:
reply = self.replies_cache.get(self.message_id)
if reply is not None:
self.reply_sender.send(self.socket, reply)
def requeue(self):
"""Requeue is not supported"""
"""Requeue is not supported."""

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import logging
import threading
import time
@ -19,6 +20,8 @@ import six
from oslo_messaging._drivers.zmq_driver import zmq_async
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
@ -71,10 +74,16 @@ class TTLCache(object):
def _update_cache(self):
with self._lock:
current_time = time.time()
old_size = len(self._cache)
self._cache = \
{key: (value, expiration_time) for
key, (value, expiration_time) in six.iteritems(self._cache)
if not self._is_expired(expiration_time, current_time)}
new_size = len(self._cache)
LOG.debug('Updated cache: current size %(new_size)s '
'(%(size_difference)s records removed)',
{'new_size': new_size,
'size_difference': old_size - new_size})
time.sleep(self._ttl)
def cleanup(self):

@ -103,8 +103,8 @@ class TestZmqAckManager(test_utils.BaseTestCase):
time.sleep(1)
@mock.patch.object(
zmq_incoming_message.ZmqIncomingMessage, 'acknowledge',
side_effect=zmq_incoming_message.ZmqIncomingMessage.acknowledge,
zmq_incoming_message.ZmqIncomingMessage, '_acknowledge',
side_effect=zmq_incoming_message.ZmqIncomingMessage._acknowledge,
autospec=True
)
def test_cast_success_without_retries(self, received_ack_mock):
@ -121,7 +121,7 @@ class TestZmqAckManager(test_utils.BaseTestCase):
def test_cast_success_with_one_retry(self):
with mock.patch.object(zmq_incoming_message.ZmqIncomingMessage,
'acknowledge') as lost_ack_mock:
'_acknowledge') as lost_ack_mock:
result = self.driver.send(
self.target, {}, self.message, wait_for_reply=False
)
@ -134,8 +134,8 @@ class TestZmqAckManager(test_utils.BaseTestCase):
self.assertEqual(0, self.set_result.call_count)
self.listener._received.clear()
with mock.patch.object(
zmq_incoming_message.ZmqIncomingMessage, 'acknowledge',
side_effect=zmq_incoming_message.ZmqIncomingMessage.acknowledge,
zmq_incoming_message.ZmqIncomingMessage, '_acknowledge',
side_effect=zmq_incoming_message.ZmqIncomingMessage._acknowledge,
autospec=True
) as received_ack_mock:
self.ack_manager._pool.shutdown(wait=True)
@ -146,7 +146,7 @@ class TestZmqAckManager(test_utils.BaseTestCase):
def test_cast_success_with_two_retries(self):
with mock.patch.object(zmq_incoming_message.ZmqIncomingMessage,
'acknowledge') as lost_ack_mock:
'_acknowledge') as lost_ack_mock:
result = self.driver.send(
self.target, {}, self.message, wait_for_reply=False
)
@ -164,8 +164,8 @@ class TestZmqAckManager(test_utils.BaseTestCase):
self.assertEqual(2, lost_ack_mock.call_count)
self.assertEqual(0, self.set_result.call_count)
with mock.patch.object(
zmq_incoming_message.ZmqIncomingMessage, 'acknowledge',
side_effect=zmq_incoming_message.ZmqIncomingMessage.acknowledge,
zmq_incoming_message.ZmqIncomingMessage, '_acknowledge',
side_effect=zmq_incoming_message.ZmqIncomingMessage._acknowledge,
autospec=True
) as received_ack_mock:
self.ack_manager._pool.shutdown(wait=True)
@ -174,7 +174,7 @@ class TestZmqAckManager(test_utils.BaseTestCase):
self.assertEqual(1, received_ack_mock.call_count)
self.assertEqual(2, self.set_result.call_count)
@mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, 'acknowledge')
@mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, '_acknowledge')
def test_cast_failure_exhausted_retries(self, lost_ack_mock):
result = self.driver.send(
self.target, {}, self.message, wait_for_reply=False
@ -188,8 +188,8 @@ class TestZmqAckManager(test_utils.BaseTestCase):
self.assertEqual(1, self.set_result.call_count)
@mock.patch.object(
zmq_incoming_message.ZmqIncomingMessage, 'acknowledge',
side_effect=zmq_incoming_message.ZmqIncomingMessage.acknowledge,
zmq_incoming_message.ZmqIncomingMessage, '_acknowledge',
side_effect=zmq_incoming_message.ZmqIncomingMessage._acknowledge,
autospec=True
)
@mock.patch.object(
@ -197,7 +197,13 @@ class TestZmqAckManager(test_utils.BaseTestCase):
side_effect=zmq_incoming_message.ZmqIncomingMessage.reply,
autospec=True
)
def test_call_success_without_retries(self, received_reply_mock,
@mock.patch.object(
zmq_incoming_message.ZmqIncomingMessage, '_reply_from_cache',
side_effect=zmq_incoming_message.ZmqIncomingMessage._reply_from_cache,
autospec=True
)
def test_call_success_without_retries(self, unused_reply_from_cache_mock,
received_reply_mock,
received_ack_mock):
result = self.driver.send(
self.target, {}, self.message, wait_for_reply=True, timeout=10
@ -210,12 +216,14 @@ class TestZmqAckManager(test_utils.BaseTestCase):
self.assertEqual(1, received_ack_mock.call_count)
self.assertEqual(3, self.set_result.call_count)
received_reply_mock.assert_called_once_with(mock.ANY, reply=True)
self.assertEqual(0, unused_reply_from_cache_mock.call_count)
@mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, 'acknowledge')
@mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, '_acknowledge')
@mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, 'reply')
def test_call_failure_exhausted_retries_and_timeout_error(self,
lost_reply_mock,
lost_ack_mock):
@mock.patch.object(zmq_incoming_message.ZmqIncomingMessage,
'_reply_from_cache')
def test_call_failure_exhausted_retries(self, lost_reply_from_cache_mock,
lost_reply_mock, lost_ack_mock):
self.assertRaises(oslo_messaging.MessagingTimeout,
self.driver.send,
self.target, {}, self.message,
@ -227,3 +235,4 @@ class TestZmqAckManager(test_utils.BaseTestCase):
self.assertEqual(3, lost_ack_mock.call_count)
self.assertEqual(2, self.set_result.call_count)
lost_reply_mock.assert_called_once_with(reply=True)
self.assertEqual(2, lost_reply_from_cache_mock.call_count)