diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py index 8e9813207..158248883 100644 --- a/oslo/messaging/_drivers/amqpdriver.py +++ b/oslo/messaging/_drivers/amqpdriver.py @@ -32,10 +32,12 @@ LOG = logging.getLogger(__name__) class AMQPIncomingMessage(base.IncomingMessage): def __init__(self, listener, ctxt, message, msg_id, reply_q): - super(AMQPIncomingMessage, self).__init__(listener, ctxt, message) + super(AMQPIncomingMessage, self).__init__(listener, ctxt, + dict(message)) self.msg_id = msg_id self.reply_q = reply_q + self.acknowledge = message.acknowledge def _send_reply(self, conn, reply=None, failure=None, ending=False, log_failure=True): @@ -74,7 +76,7 @@ class AMQPListener(base.Listener): def __call__(self, message): # FIXME(markmc): logging isn't driver specific - rpc_common._safe_log(LOG.debug, 'received %s', message) + rpc_common._safe_log(LOG.debug, 'received %s', dict(message)) self.msg_id_cache.check_duplicate_message(message) ctxt = rpc_amqp.unpack_context(self.conf, message) @@ -88,7 +90,9 @@ class AMQPListener(base.Listener): def poll(self): while True: if self.incoming: - return self.incoming.pop(0) + message = self.incoming.pop(0) + message.acknowledge() + return message self.conn.consume(limit=1) @@ -156,6 +160,7 @@ class ReplyWaiter(object): conn.declare_direct_consumer(reply_q, self) def __call__(self, message): + message.acknowledge() self.incoming.append(message) def listen(self, msg_id): diff --git a/oslo/messaging/_drivers/base.py b/oslo/messaging/_drivers/base.py index 9803e1935..83b9b9ef2 100644 --- a/oslo/messaging/_drivers/base.py +++ b/oslo/messaging/_drivers/base.py @@ -37,6 +37,9 @@ class IncomingMessage(object): def reply(self, reply=None, failure=None, log_failure=True): "Send a reply or failure back to the client." + def acknowledge(self): + "Acknowledge the message." + @six.add_metaclass(abc.ABCMeta) class Listener(object): diff --git a/oslo/messaging/_drivers/impl_fake.py b/oslo/messaging/_drivers/impl_fake.py index b43d8ee9d..a588db035 100644 --- a/oslo/messaging/_drivers/impl_fake.py +++ b/oslo/messaging/_drivers/impl_fake.py @@ -26,7 +26,6 @@ from oslo.messaging._drivers import base class FakeIncomingMessage(base.IncomingMessage): - def __init__(self, listener, ctxt, message, reply_q): super(FakeIncomingMessage, self).__init__(listener, ctxt, message) self._reply_q = reply_q @@ -49,7 +48,9 @@ class FakeListener(base.Listener): for target in self._targets: (ctxt, message, reply_q) = self._exchange.poll(target) if message is not None: - return FakeIncomingMessage(self, ctxt, message, reply_q) + message = FakeIncomingMessage(self, ctxt, message, reply_q) + message.acknowledge() + return message time.sleep(.05) diff --git a/oslo/messaging/_drivers/impl_qpid.py b/oslo/messaging/_drivers/impl_qpid.py index de0dd7941..4ebfa60e4 100644 --- a/oslo/messaging/_drivers/impl_qpid.py +++ b/oslo/messaging/_drivers/impl_qpid.py @@ -88,6 +88,17 @@ def raise_invalid_topology_version(conf): raise Exception(msg) +class QpidMessage(dict): + def __init__(self, session, raw_message): + super(QpidMessage, self).__init__( + rpc_common.deserialize_msg(raw_message.content)) + self._raw_message = raw_message + self._session = session + + def acknowledge(self): + self._session.acknowledge(self._raw_message) + + class ConsumerBase(object): """Consumer base class.""" @@ -183,11 +194,9 @@ class ConsumerBase(object): message = self.receiver.fetch() try: self._unpack_json_msg(message) - msg = rpc_common.deserialize_msg(message.content) - self.callback(msg) + self.callback(QpidMessage(message)) except Exception: LOG.exception(_("Failed to process message... skipping it.")) - finally: self.session.acknowledge(message) def get_receiver(self): diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py index d8ecac927..1744b74ac 100644 --- a/oslo/messaging/_drivers/impl_rabbit.py +++ b/oslo/messaging/_drivers/impl_rabbit.py @@ -118,6 +118,16 @@ def _get_queue_arguments(conf): return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {} +class RabbitMessage(dict): + def __init__(self, raw_message): + super(RabbitMessage, self).__init__( + rpc_common.deserialize_msg(raw_message.payload)) + self._raw_message = raw_message + + def acknowledge(self): + self._raw_message.ack() + + class ConsumerBase(object): """Consumer base class.""" @@ -151,12 +161,11 @@ class ConsumerBase(object): """ try: - msg = rpc_common.deserialize_msg(message.payload) - callback(msg) + callback(RabbitMessage(message)) except Exception: LOG.exception(_("Failed to process message" " ... skipping it.")) - message.ack() + message.ack() def consume(self, *args, **kwargs): """Actually declare the consumer on the amqp channel. This will