diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py index 578f008a6..5e6381dd5 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py @@ -43,7 +43,6 @@ class ConsumerBase(object): def stop(self): """Stop consumer polling/updates""" - pass @abc.abstractmethod def receive_message(self, target): diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py index f3661af85..8ff3d7897 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py @@ -18,6 +18,7 @@ import uuid import six from oslo_messaging._drivers import common as rpc_common +from oslo_messaging._drivers.zmq_driver.client import zmq_response from oslo_messaging._drivers.zmq_driver.client import zmq_senders from oslo_messaging._drivers.zmq_driver.client import zmq_sockets_manager from oslo_messaging._drivers.zmq_driver.server.consumers \ @@ -38,11 +39,7 @@ zmq = zmq_async.import_zmq() class DealerConsumer(zmq_consumer_base.SingleSocketConsumer): def __init__(self, conf, poller, server): - self.ack_sender = zmq_senders.AckSenderProxy(conf) self.reply_sender = zmq_senders.ReplySenderProxy(conf) - self.messages_cache = zmq_ttl_cache.TTLCache( - ttl=conf.oslo_messaging_zmq.rpc_message_ttl - ) self.sockets_manager = zmq_sockets_manager.SocketsManager( conf, server.matchmaker, zmq.ROUTER, zmq.DEALER) self.host = None @@ -68,77 +65,117 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer): LOG.error(_LE("Failed connecting to ROUTER socket %(e)s") % e) raise rpc_common.RPCException(str(e)) - def _receive_request(self, socket): - empty = socket.recv() - assert empty == b'', 'Bad format: empty delimiter expected' - reply_id = socket.recv() - msg_type = int(socket.recv()) - message_id = socket.recv_string() - context, message = socket.recv_loaded() - return reply_id, msg_type, message_id, context, message + def _reply(self, rpc_message, reply, failure): + if failure is not None: + failure = rpc_common.serialize_remote_exception(failure) + reply = zmq_response.Reply(message_id=rpc_message.message_id, + reply_id=rpc_message.reply_id, + reply_body=reply, + failure=failure) + self.reply_sender.send(rpc_message.socket, reply) + return reply + + def _create_message(self, context, message, reply_id, message_id, socket, + message_type): + if message_type == zmq_names.CALL_TYPE: + message = zmq_incoming_message.ZmqIncomingMessage( + context, message, reply_id=reply_id, message_id=message_id, + socket=socket, reply_method=self._reply + ) + else: + message = zmq_incoming_message.ZmqIncomingMessage(context, message) + + LOG.debug("[%(host)s] Received %(msg_type)s message %(msg_id)s", + {"host": self.host, + "msg_type": zmq_names.message_type_str(message_type), + "msg_id": message_id}) + return message def receive_message(self, socket): try: - reply_id, msg_type, message_id, context, message = \ - self._receive_request(socket) + empty = socket.recv() + assert empty == b'', "Empty delimiter expected!" + reply_id = socket.recv() + assert reply_id != b'', "Valid reply id expected!" + message_type = int(socket.recv()) + assert message_type in zmq_names.REQUEST_TYPES, \ + "Request message type expected!" + message_id = socket.recv_string() + assert message_id != '', "Valid message id expected!" + context, message = socket.recv_loaded() - if msg_type == zmq_names.CALL_TYPE or \ - msg_type in zmq_names.NON_BLOCKING_TYPES: - - ack_sender = self.ack_sender \ - if self.conf.oslo_messaging_zmq.rpc_use_acks else None - reply_sender = self.reply_sender \ - if msg_type == zmq_names.CALL_TYPE else None - - message = zmq_incoming_message.ZmqIncomingMessage( - context, message, reply_id, message_id, socket, - ack_sender, reply_sender, self.messages_cache - ) - - # drop a duplicate message - if message_id in self.messages_cache: - LOG.warning( - _LW("[%(host)s] Dropping duplicate %(msg_type)s " - "message %(msg_id)s"), - {"host": self.host, - "msg_type": zmq_names.message_type_str(msg_type), - "msg_id": message_id} - ) - # NOTE(gdavoian): send yet another ack for the non-CALL - # message, since the old one might be lost; - # for the CALL message also try to resend its reply - # (of course, if it was already obtained and cached). - message._acknowledge() - if msg_type == zmq_names.CALL_TYPE: - message._reply_from_cache() - return None - - self.messages_cache.add(message_id) - LOG.debug( - "[%(host)s] Received %(msg_type)s message %(msg_id)s", - {"host": self.host, - "msg_type": zmq_names.message_type_str(msg_type), - "msg_id": message_id} - ) - # NOTE(gdavoian): send an immediate ack, since it may - # be too late to wait until the message will be - # dispatched and processed by a RPC server - message._acknowledge() - return message - - else: - LOG.error(_LE("Unknown message type: %s"), - zmq_names.message_type_str(msg_type)) + return self._create_message(context, message, reply_id, + message_id, socket, message_type) except (zmq.ZMQError, AssertionError, ValueError) as e: LOG.error(_LE("Receiving message failure: %s"), str(e)) def cleanup(self): LOG.info(_LI("[%s] Destroy DEALER consumer"), self.host) - self.messages_cache.cleanup() self.connection_updater.cleanup() super(DealerConsumer, self).cleanup() +class DealerConsumerWithAcks(DealerConsumer): + + def __init__(self, conf, poller, server): + super(DealerConsumerWithAcks, self).__init__(conf, poller, server) + self.ack_sender = zmq_senders.AckSenderProxy(conf) + self.messages_cache = zmq_ttl_cache.TTLCache( + ttl=conf.oslo_messaging_zmq.rpc_message_ttl + ) + + def _acknowledge(self, reply_id, message_id, socket): + ack = zmq_response.Ack(message_id=message_id, + reply_id=reply_id) + self.ack_sender.send(socket, ack) + + def _reply(self, rpc_message, reply, failure): + reply = super(DealerConsumerWithAcks, self)._reply(rpc_message, + reply, failure) + self.messages_cache.add(rpc_message.message_id, reply) + return reply + + def _reply_from_cache(self, message_id, socket): + reply = self.messages_cache.get(message_id) + if reply is not None: + self.reply_sender.send(socket, reply) + + def _create_message(self, context, message, reply_id, message_id, socket, + message_type): + # drop a duplicate message + if message_id in self.messages_cache: + LOG.warning( + _LW("[%(host)s] Dropping duplicate %(msg_type)s " + "message %(msg_id)s"), + {"host": self.host, + "msg_type": zmq_names.message_type_str(message_type), + "msg_id": message_id} + ) + # NOTE(gdavoian): send yet another ack for the non-CALL + # message, since the old one might be lost; + # for the CALL message also try to resend its reply + # (of course, if it was already obtained and cached). + self._acknowledge(reply_id, message_id, socket) + if message_type == zmq_names.CALL_TYPE: + self._reply_from_cache(message_id, socket) + return None + + self.messages_cache.add(message_id) + + # NOTE(gdavoian): send an immediate ack, since it may + # be too late to wait until the message will be + # dispatched and processed by a RPC server + self._acknowledge(reply_id, message_id, socket) + + return super(DealerConsumerWithAcks, self)._create_message( + context, message, reply_id, message_id, socket, message_type + ) + + def cleanup(self): + self.messages_cache.cleanup() + super(DealerConsumerWithAcks, self).cleanup() + + class ConsumerConnectionUpdater(zmq_updater.ConnectionUpdater): def _update_connection(self): diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py index e4836342c..80317f54e 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py @@ -1,4 +1,4 @@ -# Copyright 2015 Mirantis, Inc. +# Copyright 2015-2016 Mirantis, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -14,6 +14,8 @@ import logging +from oslo_messaging._drivers import common as rpc_common +from oslo_messaging._drivers.zmq_driver.client import zmq_response from oslo_messaging._drivers.zmq_driver.client import zmq_senders from oslo_messaging._drivers.zmq_driver.server.consumers \ import zmq_consumer_base @@ -30,43 +32,51 @@ zmq = zmq_async.import_zmq() class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): def __init__(self, conf, poller, server): - self.ack_sender = zmq_senders.AckSenderDirect(conf) self.reply_sender = zmq_senders.ReplySenderDirect(conf) super(RouterConsumer, self).__init__(conf, poller, server, zmq.ROUTER) LOG.info(_LI("[%s] Run ROUTER consumer"), self.host) - def _receive_request(self, socket): - reply_id = socket.recv() - empty = socket.recv() - assert empty == b'', 'Bad format: empty delimiter expected' - msg_type = int(socket.recv()) - message_id = socket.recv_string() - context, message = socket.recv_loaded() - return reply_id, msg_type, message_id, context, message + def _reply(self, rpc_message, reply, failure): + if failure is not None: + failure = rpc_common.serialize_remote_exception(failure) + reply = zmq_response.Reply(message_id=rpc_message.message_id, + reply_id=rpc_message.reply_id, + reply_body=reply, + failure=failure) + self.reply_sender.send(rpc_message.socket, reply) + return reply + + def _create_message(self, context, message, reply_id, message_id, socket, + message_type): + if message_type == zmq_names.CALL_TYPE: + message = zmq_incoming_message.ZmqIncomingMessage( + context, message, reply_id=reply_id, message_id=message_id, + socket=socket, reply_method=self._reply + ) + else: + message = zmq_incoming_message.ZmqIncomingMessage(context, message) + + LOG.debug("[%(host)s] Received %(msg_type)s message %(msg_id)s", + {"host": self.host, + "msg_type": zmq_names.message_type_str(message_type), + "msg_id": message_id}) + return message def receive_message(self, socket): try: - reply_id, msg_type, message_id, context, message = \ - self._receive_request(socket) + reply_id = socket.recv() + assert reply_id != b'', "Valid reply id expected!" + empty = socket.recv() + assert empty == b'', "Empty delimiter expected!" + message_type = int(socket.recv()) + assert message_type in zmq_names.REQUEST_TYPES, \ + "Request message type expected!" + message_id = socket.recv_string() + assert message_id != '', "Valid message id expected!" + context, message = socket.recv_loaded() - LOG.debug("[%(host)s] Received %(msg_type)s message %(msg_id)s", - {"host": self.host, - "msg_type": zmq_names.message_type_str(msg_type), - "msg_id": message_id}) - - if msg_type == zmq_names.CALL_TYPE or \ - msg_type in zmq_names.NON_BLOCKING_TYPES: - ack_sender = self.ack_sender \ - if self.conf.oslo_messaging_zmq.rpc_use_acks else None - reply_sender = self.reply_sender \ - if msg_type == zmq_names.CALL_TYPE else None - return zmq_incoming_message.ZmqIncomingMessage( - context, message, reply_id, message_id, socket, - ack_sender, reply_sender - ) - else: - LOG.error(_LE("Unknown message type: %s"), - zmq_names.message_type_str(msg_type)) + return self._create_message(context, message, reply_id, + message_id, socket, message_type) except (zmq.ZMQError, AssertionError, ValueError) as e: LOG.error(_LE("Receiving message failed: %s"), str(e)) diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py index 50c85d9c4..7ab09394a 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py @@ -1,4 +1,4 @@ -# Copyright 2015 Mirantis, Inc. +# Copyright 2015-2016 Mirantis, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -12,68 +12,29 @@ # License for the specific language governing permissions and limitations # under the License. -import logging +import six from oslo_messaging._drivers import base -from oslo_messaging._drivers import common as rpc_common -from oslo_messaging._drivers.zmq_driver.client import zmq_response -from oslo_messaging._drivers.zmq_driver import zmq_async - -LOG = logging.getLogger(__name__) - -zmq = zmq_async.import_zmq() class ZmqIncomingMessage(base.RpcIncomingMessage): """Base class for RPC-messages via ZMQ-driver. - Each message may send either acks/replies or just nothing - (if acks are disabled and replies are not supported). + Behaviour of messages is fully defined by consumers + which produced them from obtained raw data. """ - def __init__(self, context, message, reply_id=None, message_id=None, - socket=None, ack_sender=None, reply_sender=None, - replies_cache=None): - - if ack_sender is not None or reply_sender is not None: - assert socket is not None, "Valid socket expected!" - assert message_id is not None, "Valid message ID expected!" - assert reply_id is not None, "Valid reply ID expected!" - + def __init__(self, context, message, **kwargs): super(ZmqIncomingMessage, self).__init__(context, message) - - self.reply_id = reply_id - self.message_id = message_id - self.socket = socket - self.ack_sender = ack_sender - self.reply_sender = reply_sender - self.replies_cache = replies_cache + self._reply_method = kwargs.pop('reply_method', + lambda self, reply, failure: None) + for key, value in six.iteritems(kwargs): + setattr(self, key, value) def acknowledge(self): - """Acknowledge is not supported publicly (used only internally).""" - - def _acknowledge(self): - if self.ack_sender is not None: - ack = zmq_response.Ack(message_id=self.message_id, - reply_id=self.reply_id) - self.ack_sender.send(self.socket, ack) + """Acknowledge is not supported.""" def reply(self, reply=None, failure=None): - if self.reply_sender is not None: - if failure is not None: - failure = rpc_common.serialize_remote_exception(failure) - reply = zmq_response.Reply(message_id=self.message_id, - reply_id=self.reply_id, - reply_body=reply, - failure=failure) - self.reply_sender.send(self.socket, reply) - if self.replies_cache is not None: - self.replies_cache.add(self.message_id, reply) - - def _reply_from_cache(self): - if self.reply_sender is not None and self.replies_cache is not None: - reply = self.replies_cache.get(self.message_id) - if reply is not None: - self.reply_sender.send(self.socket, reply) + self._reply_method(self, reply=reply, failure=failure) def requeue(self): """Requeue is not supported.""" diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py index fca7495f8..f62abf853 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py @@ -1,4 +1,4 @@ -# Copyright 2015 Mirantis, Inc. +# Copyright 2015-2016 Mirantis, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -44,14 +44,20 @@ class ZmqServer(base.PollStyleListener): {'host': self.conf.oslo_messaging_zmq.rpc_zmq_host, 'target': self.target}) - self.router_consumer = zmq_router_consumer.RouterConsumer( - conf, self.poller, self) \ - if not conf.oslo_messaging_zmq.use_router_proxy else None - self.dealer_consumer = zmq_dealer_consumer.DealerConsumer( - conf, self.poller, self) \ - if conf.oslo_messaging_zmq.use_router_proxy else None - self.sub_consumer = zmq_sub_consumer.SubConsumer( - conf, self.poller, self) \ + if conf.oslo_messaging_zmq.use_router_proxy: + self.router_consumer = None + dealer_consumer_cls = \ + zmq_dealer_consumer.DealerConsumerWithAcks \ + if conf.oslo_messaging_zmq.rpc_use_acks else \ + zmq_dealer_consumer.DealerConsumer + self.dealer_consumer = dealer_consumer_cls(conf, self.poller, self) + else: + self.router_consumer = \ + zmq_router_consumer.RouterConsumer(conf, self.poller, self) + self.dealer_consumer = None + + self.sub_consumer = \ + zmq_sub_consumer.SubConsumer(conf, self.poller, self) \ if conf.oslo_messaging_zmq.use_pub_sub else None self.consumers = [] diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_names.py b/oslo_messaging/_drivers/zmq_driver/zmq_names.py index 83361a2d6..31b208031 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_names.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_names.py @@ -1,4 +1,4 @@ -# Copyright 2015 Mirantis, Inc. +# Copyright 2015-2016 Mirantis, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -48,7 +48,7 @@ RESPONSE_TYPES = (REPLY_TYPE, ACK_TYPE) MESSAGE_TYPES = REQUEST_TYPES + RESPONSE_TYPES MULTISEND_TYPES = (CAST_FANOUT_TYPE, NOTIFY_TYPE) -DIRECT_TYPES = (CALL_TYPE, CAST_TYPE, REPLY_TYPE, ACK_TYPE) +DIRECT_TYPES = (CALL_TYPE, CAST_TYPE) + RESPONSE_TYPES CAST_TYPES = (CAST_TYPE, CAST_FANOUT_TYPE) NOTIFY_TYPES = (NOTIFY_TYPE,) NON_BLOCKING_TYPES = CAST_TYPES + NOTIFY_TYPES @@ -73,4 +73,4 @@ def message_type_str(message_type): NOTIFY_TYPE: "NOTIFY", REPLY_TYPE: "REPLY", ACK_TYPE: "ACK"} - return msg_type_str[message_type] + return msg_type_str.get(message_type, "UNKNOWN") diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py b/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py index d1b45acca..800f9ed5f 100644 --- a/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py +++ b/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py @@ -20,7 +20,8 @@ import oslo_messaging from oslo_messaging._drivers.zmq_driver.client import zmq_receivers from oslo_messaging._drivers.zmq_driver.client import zmq_senders from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy -from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message +from oslo_messaging._drivers.zmq_driver.server.consumers.zmq_dealer_consumer \ + import DealerConsumerWithAcks from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_options from oslo_messaging.tests.drivers.zmq import zmq_common @@ -100,11 +101,9 @@ class TestZmqAckManager(test_utils.BaseTestCase): # and all parties to be ready for messaging time.sleep(1) - @mock.patch.object( - zmq_incoming_message.ZmqIncomingMessage, '_acknowledge', - side_effect=zmq_incoming_message.ZmqIncomingMessage._acknowledge, - autospec=True - ) + @mock.patch.object(DealerConsumerWithAcks, '_acknowledge', + side_effect=DealerConsumerWithAcks._acknowledge, + autospec=True) def test_cast_success_without_retries(self, received_ack_mock): result = self.driver.send( self.target, {}, self.message, wait_for_reply=False @@ -118,7 +117,7 @@ class TestZmqAckManager(test_utils.BaseTestCase): self.assertEqual(2, self.set_result.call_count) def test_cast_success_with_one_retry(self): - with mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, + with mock.patch.object(DealerConsumerWithAcks, '_acknowledge') as lost_ack_mock: result = self.driver.send( self.target, {}, self.message, wait_for_reply=False @@ -131,11 +130,9 @@ class TestZmqAckManager(test_utils.BaseTestCase): self.assertEqual(1, lost_ack_mock.call_count) self.assertEqual(0, self.set_result.call_count) self.listener._received.clear() - with mock.patch.object( - zmq_incoming_message.ZmqIncomingMessage, '_acknowledge', - side_effect=zmq_incoming_message.ZmqIncomingMessage._acknowledge, - autospec=True - ) as received_ack_mock: + with mock.patch.object(DealerConsumerWithAcks, '_acknowledge', + side_effect=DealerConsumerWithAcks._acknowledge, + autospec=True) as received_ack_mock: self.ack_manager._pool.shutdown(wait=True) self.assertFalse(self.listener._received.isSet()) self.assertEqual(2, self.send.call_count) @@ -143,7 +140,7 @@ class TestZmqAckManager(test_utils.BaseTestCase): self.assertEqual(2, self.set_result.call_count) def test_cast_success_with_two_retries(self): - with mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, + with mock.patch.object(DealerConsumerWithAcks, '_acknowledge') as lost_ack_mock: result = self.driver.send( self.target, {}, self.message, wait_for_reply=False @@ -161,18 +158,16 @@ class TestZmqAckManager(test_utils.BaseTestCase): self.assertEqual(2, self.send.call_count) self.assertEqual(2, lost_ack_mock.call_count) self.assertEqual(0, self.set_result.call_count) - with mock.patch.object( - zmq_incoming_message.ZmqIncomingMessage, '_acknowledge', - side_effect=zmq_incoming_message.ZmqIncomingMessage._acknowledge, - autospec=True - ) as received_ack_mock: + with mock.patch.object(DealerConsumerWithAcks, '_acknowledge', + side_effect=DealerConsumerWithAcks._acknowledge, + autospec=True) as received_ack_mock: self.ack_manager._pool.shutdown(wait=True) self.assertFalse(self.listener._received.isSet()) self.assertEqual(3, self.send.call_count) self.assertEqual(1, received_ack_mock.call_count) self.assertEqual(2, self.set_result.call_count) - @mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, '_acknowledge') + @mock.patch.object(DealerConsumerWithAcks, '_acknowledge') def test_cast_failure_exhausted_retries(self, lost_ack_mock): result = self.driver.send( self.target, {}, self.message, wait_for_reply=False @@ -185,21 +180,15 @@ class TestZmqAckManager(test_utils.BaseTestCase): self.assertEqual(3, lost_ack_mock.call_count) self.assertEqual(1, self.set_result.call_count) - @mock.patch.object( - zmq_incoming_message.ZmqIncomingMessage, '_acknowledge', - side_effect=zmq_incoming_message.ZmqIncomingMessage._acknowledge, - autospec=True - ) - @mock.patch.object( - zmq_incoming_message.ZmqIncomingMessage, 'reply', - side_effect=zmq_incoming_message.ZmqIncomingMessage.reply, - autospec=True - ) - @mock.patch.object( - zmq_incoming_message.ZmqIncomingMessage, '_reply_from_cache', - side_effect=zmq_incoming_message.ZmqIncomingMessage._reply_from_cache, - autospec=True - ) + @mock.patch.object(DealerConsumerWithAcks, '_acknowledge', + side_effect=DealerConsumerWithAcks._acknowledge, + autospec=True) + @mock.patch.object(DealerConsumerWithAcks, '_reply', + side_effect=DealerConsumerWithAcks._reply, + autospec=True) + @mock.patch.object(DealerConsumerWithAcks, '_reply_from_cache', + side_effect=DealerConsumerWithAcks._reply_from_cache, + autospec=True) def test_call_success_without_retries(self, unused_reply_from_cache_mock, received_reply_mock, received_ack_mock): @@ -213,13 +202,13 @@ class TestZmqAckManager(test_utils.BaseTestCase): self.assertEqual(1, self.send.call_count) self.assertEqual(1, received_ack_mock.call_count) self.assertEqual(3, self.set_result.call_count) - received_reply_mock.assert_called_once_with(mock.ANY, reply=True) + received_reply_mock.assert_called_once_with(mock.ANY, mock.ANY, + reply=True, failure=None) self.assertEqual(0, unused_reply_from_cache_mock.call_count) - @mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, '_acknowledge') - @mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, 'reply') - @mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, - '_reply_from_cache') + @mock.patch.object(DealerConsumerWithAcks, '_acknowledge') + @mock.patch.object(DealerConsumerWithAcks, '_reply') + @mock.patch.object(DealerConsumerWithAcks, '_reply_from_cache') def test_call_failure_exhausted_retries(self, lost_reply_from_cache_mock, lost_reply_mock, lost_ack_mock): self.assertRaises(oslo_messaging.MessagingTimeout, @@ -232,5 +221,6 @@ class TestZmqAckManager(test_utils.BaseTestCase): self.assertEqual(3, self.send.call_count) self.assertEqual(3, lost_ack_mock.call_count) self.assertEqual(2, self.set_result.call_count) - lost_reply_mock.assert_called_once_with(reply=True) + lost_reply_mock.assert_called_once_with(mock.ANY, + reply=True, failure=None) self.assertEqual(2, lost_reply_from_cache_mock.call_count)