Merge "Abstract the acknowledge layer of a message"

This commit is contained in:
Jenkins 2014-02-17 16:39:24 +00:00 committed by Gerrit Code Review
commit 7473d18ebe
5 changed files with 38 additions and 11 deletions

@ -32,10 +32,12 @@ LOG = logging.getLogger(__name__)
class AMQPIncomingMessage(base.IncomingMessage): class AMQPIncomingMessage(base.IncomingMessage):
def __init__(self, listener, ctxt, message, msg_id, reply_q): def __init__(self, listener, ctxt, message, msg_id, reply_q):
super(AMQPIncomingMessage, self).__init__(listener, ctxt, message) super(AMQPIncomingMessage, self).__init__(listener, ctxt,
dict(message))
self.msg_id = msg_id self.msg_id = msg_id
self.reply_q = reply_q self.reply_q = reply_q
self.acknowledge = message.acknowledge
def _send_reply(self, conn, reply=None, failure=None, def _send_reply(self, conn, reply=None, failure=None,
ending=False, log_failure=True): ending=False, log_failure=True):
@ -74,7 +76,7 @@ class AMQPListener(base.Listener):
def __call__(self, message): def __call__(self, message):
# FIXME(markmc): logging isn't driver specific # FIXME(markmc): logging isn't driver specific
rpc_common._safe_log(LOG.debug, 'received %s', message) rpc_common._safe_log(LOG.debug, 'received %s', dict(message))
self.msg_id_cache.check_duplicate_message(message) self.msg_id_cache.check_duplicate_message(message)
ctxt = rpc_amqp.unpack_context(self.conf, message) ctxt = rpc_amqp.unpack_context(self.conf, message)
@ -88,7 +90,9 @@ class AMQPListener(base.Listener):
def poll(self): def poll(self):
while True: while True:
if self.incoming: if self.incoming:
return self.incoming.pop(0) message = self.incoming.pop(0)
message.acknowledge()
return message
self.conn.consume(limit=1) self.conn.consume(limit=1)
@ -156,6 +160,7 @@ class ReplyWaiter(object):
conn.declare_direct_consumer(reply_q, self) conn.declare_direct_consumer(reply_q, self)
def __call__(self, message): def __call__(self, message):
message.acknowledge()
self.incoming.append(message) self.incoming.append(message)
def listen(self, msg_id): def listen(self, msg_id):

@ -37,6 +37,9 @@ class IncomingMessage(object):
def reply(self, reply=None, failure=None, log_failure=True): def reply(self, reply=None, failure=None, log_failure=True):
"Send a reply or failure back to the client." "Send a reply or failure back to the client."
def acknowledge(self):
"Acknowledge the message."
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
class Listener(object): class Listener(object):

@ -26,7 +26,6 @@ from oslo.messaging._drivers import base
class FakeIncomingMessage(base.IncomingMessage): class FakeIncomingMessage(base.IncomingMessage):
def __init__(self, listener, ctxt, message, reply_q): def __init__(self, listener, ctxt, message, reply_q):
super(FakeIncomingMessage, self).__init__(listener, ctxt, message) super(FakeIncomingMessage, self).__init__(listener, ctxt, message)
self._reply_q = reply_q self._reply_q = reply_q
@ -49,7 +48,9 @@ class FakeListener(base.Listener):
for target in self._targets: for target in self._targets:
(ctxt, message, reply_q) = self._exchange.poll(target) (ctxt, message, reply_q) = self._exchange.poll(target)
if message is not None: if message is not None:
return FakeIncomingMessage(self, ctxt, message, reply_q) message = FakeIncomingMessage(self, ctxt, message, reply_q)
message.acknowledge()
return message
time.sleep(.05) time.sleep(.05)

@ -88,6 +88,17 @@ def raise_invalid_topology_version(conf):
raise Exception(msg) raise Exception(msg)
class QpidMessage(dict):
def __init__(self, session, raw_message):
super(QpidMessage, self).__init__(
rpc_common.deserialize_msg(raw_message.content))
self._raw_message = raw_message
self._session = session
def acknowledge(self):
self._session.acknowledge(self._raw_message)
class ConsumerBase(object): class ConsumerBase(object):
"""Consumer base class.""" """Consumer base class."""
@ -183,11 +194,9 @@ class ConsumerBase(object):
message = self.receiver.fetch() message = self.receiver.fetch()
try: try:
self._unpack_json_msg(message) self._unpack_json_msg(message)
msg = rpc_common.deserialize_msg(message.content) self.callback(QpidMessage(message))
self.callback(msg)
except Exception: except Exception:
LOG.exception(_("Failed to process message... skipping it.")) LOG.exception(_("Failed to process message... skipping it."))
finally:
self.session.acknowledge(message) self.session.acknowledge(message)
def get_receiver(self): def get_receiver(self):

@ -118,6 +118,16 @@ def _get_queue_arguments(conf):
return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {} return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
class RabbitMessage(dict):
def __init__(self, raw_message):
super(RabbitMessage, self).__init__(
rpc_common.deserialize_msg(raw_message.payload))
self._raw_message = raw_message
def acknowledge(self):
self._raw_message.ack()
class ConsumerBase(object): class ConsumerBase(object):
"""Consumer base class.""" """Consumer base class."""
@ -151,12 +161,11 @@ class ConsumerBase(object):
""" """
try: try:
msg = rpc_common.deserialize_msg(message.payload) callback(RabbitMessage(message))
callback(msg)
except Exception: except Exception:
LOG.exception(_("Failed to process message" LOG.exception(_("Failed to process message"
" ... skipping it.")) " ... skipping it."))
message.ack() message.ack()
def consume(self, *args, **kwargs): def consume(self, *args, **kwargs):
"""Actually declare the consumer on the amqp channel. This will """Actually declare the consumer on the amqp channel. This will