Correctly handle missing RabbitMQ queues
Currently, setting the '[oslo_messaging] direct_mandatory_flag' config option to 'True' (the default) will result in a 'MessageUndeliverable' exception being raised when sending a reply if a RabbitMQ queue is missing [1]. It was the responsibility of the application to handle this exception, however, many applications are not doing so. This has resulted in a number of bug reports. Start handling this error condition, using a retry loop to attempt to resend the message and work around any temporary glitches. Since attempting to send a reply will will no longer raise an exception, there is little benefit in retaining the '[oslo_messaging] direct_mandatory_flag' config option: users setting this to False will simply not benefit from the retry logic and improved logging added here. This option is already deprecated though and will be fully removed in a future release. [1] https://www.rabbitmq.com/channels.html Change-Id: Id5cddbefbe24ef100f1cc522f44430df77d217cb Closes-Bug: #1905965
This commit is contained in:
parent
e18553f505
commit
4937949dff
@ -66,7 +66,8 @@ flag is used`_.
|
|||||||
through the *Connection* class.
|
through the *Connection* class.
|
||||||
|
|
||||||
With mandatory flag RabbitMQ raises a callback if the message is not routed to
|
With mandatory flag RabbitMQ raises a callback if the message is not routed to
|
||||||
any queue.
|
any queue. This callback will be used to loop for a timeout and let's a chance
|
||||||
|
to sender to recover.
|
||||||
|
|
||||||
.. _Exchange is a AMQP mechanism: https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchanges
|
.. _Exchange is a AMQP mechanism: https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchanges
|
||||||
.. _queues: https://www.rabbitmq.com/queues.html
|
.. _queues: https://www.rabbitmq.com/queues.html
|
||||||
|
@ -145,39 +145,67 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
|
|||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
with self.listener.driver._get_connection(
|
with self.listener.driver._get_connection(
|
||||||
rpc_common.PURPOSE_SEND) as conn:
|
rpc_common.PURPOSE_SEND,
|
||||||
|
) as conn:
|
||||||
self._send_reply(conn, reply, failure)
|
self._send_reply(conn, reply, failure)
|
||||||
|
|
||||||
return
|
return
|
||||||
except rpc_amqp.AMQPDestinationNotFound:
|
except oslo_messaging.MessageUndeliverable:
|
||||||
if timer.check_return() > 0:
|
# queue not found
|
||||||
LOG.debug(("The reply %(msg_id)s cannot be sent "
|
if timer.check_return() <= 0:
|
||||||
"%(reply_q)s reply queue doesn't exist, "
|
self._obsolete_reply_queues.add(self.reply_q, self.msg_id)
|
||||||
"retrying..."), {
|
LOG.error(
|
||||||
|
'The reply %(msg_id)s failed to send after '
|
||||||
|
'%(duration)d seconds due to a missing queue '
|
||||||
|
'(%(reply_q)s). Abandoning...', {
|
||||||
|
'msg_id': self.msg_id,
|
||||||
|
'duration': duration,
|
||||||
|
'reply_q': self.reply_q})
|
||||||
|
return
|
||||||
|
|
||||||
|
LOG.debug(
|
||||||
|
'The reply %(msg_id)s could not be sent due to a missing '
|
||||||
|
'queue (%(reply_q)s). Retrying...', {
|
||||||
'msg_id': self.msg_id,
|
'msg_id': self.msg_id,
|
||||||
'reply_q': self.reply_q})
|
'reply_q': self.reply_q})
|
||||||
time.sleep(0.25)
|
time.sleep(0.25)
|
||||||
else:
|
except rpc_amqp.AMQPDestinationNotFound as exc:
|
||||||
|
# exchange not found/down
|
||||||
|
if timer.check_return() <= 0:
|
||||||
self._obsolete_reply_queues.add(self.reply_q, self.msg_id)
|
self._obsolete_reply_queues.add(self.reply_q, self.msg_id)
|
||||||
infos = {
|
LOG.error(
|
||||||
|
'The reply %(msg_id)s failed to send after '
|
||||||
|
'%(duration)d seconds due to a broker issue '
|
||||||
|
'(%(exc)s). Abandoning...', {
|
||||||
'msg_id': self.msg_id,
|
'msg_id': self.msg_id,
|
||||||
'reply_q': self.reply_q,
|
'duration': duration,
|
||||||
'duration': duration
|
'exc': exc})
|
||||||
}
|
|
||||||
LOG.info("The reply %(msg_id)s cannot be sent "
|
|
||||||
"%(reply_q)s reply queue don't exist after "
|
|
||||||
"%(duration)s sec abandoning...", infos)
|
|
||||||
return
|
return
|
||||||
|
|
||||||
|
LOG.debug(
|
||||||
|
'The reply %(msg_id)s could not be sent due to a broker '
|
||||||
|
'issue (%(exc)s). Retrying...', {
|
||||||
|
'msg_id': self.msg_id,
|
||||||
|
'exc': exc})
|
||||||
|
time.sleep(0.25)
|
||||||
|
|
||||||
def heartbeat(self):
|
def heartbeat(self):
|
||||||
# generate a keep alive for RPC call monitoring
|
# generate a keep alive for RPC call monitoring
|
||||||
with self.listener.driver._get_connection(
|
with self.listener.driver._get_connection(
|
||||||
rpc_common.PURPOSE_SEND) as conn:
|
rpc_common.PURPOSE_SEND,
|
||||||
|
) as conn:
|
||||||
try:
|
try:
|
||||||
self._send_reply(conn, None, None, ending=False)
|
self._send_reply(conn, None, None, ending=False)
|
||||||
except rpc_amqp.AMQPDestinationNotFound:
|
except oslo_messaging.MessageUndeliverable:
|
||||||
# internal exception that indicates queue/exchange gone -
|
# internal exception that indicates queue gone -
|
||||||
# broker unreachable.
|
# broker unreachable.
|
||||||
raise MessageDeliveryFailure("Heartbeat send failed")
|
raise MessageDeliveryFailure(
|
||||||
|
"Heartbeat send failed. Missing queue")
|
||||||
|
except rpc_amqp.AMQPDestinationNotFound:
|
||||||
|
# internal exception that indicates exchange gone -
|
||||||
|
# broker unreachable.
|
||||||
|
raise MessageDeliveryFailure(
|
||||||
|
"Heartbeat send failed. Missing exchange")
|
||||||
|
|
||||||
# NOTE(sileht): Those have already be ack in RpcListener IO thread
|
# NOTE(sileht): Those have already be ack in RpcListener IO thread
|
||||||
# We keep them as noop until all drivers do the same
|
# We keep them as noop until all drivers do the same
|
||||||
|
@ -177,6 +177,8 @@ rabbit_opts = [
|
|||||||
'flag for direct send. The direct send is used as reply, '
|
'flag for direct send. The direct send is used as reply, '
|
||||||
'so the MessageUndeliverable exception is raised '
|
'so the MessageUndeliverable exception is raised '
|
||||||
'in case the client queue does not exist.'
|
'in case the client queue does not exist.'
|
||||||
|
'MessageUndeliverable exception will be used to loop for a '
|
||||||
|
'timeout to lets a chance to sender to recover.'
|
||||||
'This flag is deprecated and it will not be possible to '
|
'This flag is deprecated and it will not be possible to '
|
||||||
'deactivate this functionality anymore'),
|
'deactivate this functionality anymore'),
|
||||||
cfg.BoolOpt('enable_cancel_on_failover',
|
cfg.BoolOpt('enable_cancel_on_failover',
|
||||||
@ -516,6 +518,7 @@ class Connection(object):
|
|||||||
# if it was already monkey patched by eventlet/greenlet.
|
# if it was already monkey patched by eventlet/greenlet.
|
||||||
global threading
|
global threading
|
||||||
threading = stdlib_threading
|
threading = stdlib_threading
|
||||||
|
|
||||||
self.direct_mandatory_flag = driver_conf.direct_mandatory_flag
|
self.direct_mandatory_flag = driver_conf.direct_mandatory_flag
|
||||||
|
|
||||||
if self.ssl:
|
if self.ssl:
|
||||||
|
@ -0,0 +1,5 @@
|
|||||||
|
---
|
||||||
|
features:
|
||||||
|
- |
|
||||||
|
Adding retry strategy based on the mandatory flag. Missing exchanges and
|
||||||
|
queues are now identified separately for logging purposes.
|
Loading…
Reference in New Issue
Block a user