Merge "Don't hold the connection when reply fail"
This commit is contained in:
commit
d1e2fb3be6
oslo_messaging/_drivers
@ -17,6 +17,7 @@ __all__ = ['AMQPDriverBase']
|
||||
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import cachetools
|
||||
@ -70,16 +71,13 @@ class AMQPIncomingMessage(base.IncomingMessage):
|
||||
# Otherwise use the msg_id for backward compatibility.
|
||||
if self.reply_q:
|
||||
msg['_msg_id'] = self.msg_id
|
||||
try:
|
||||
if ending:
|
||||
LOG.debug("sending reply msg_id: %(msg_id)s "
|
||||
"reply queue: %(reply_q)s" % {
|
||||
'msg_id': self.msg_id,
|
||||
'unique_id': unique_id,
|
||||
'reply_q': self.reply_q})
|
||||
conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
|
||||
except rpc_amqp.AMQPDestinationNotFound:
|
||||
self._obsolete_reply_queues.add(self.reply_q, self.msg_id)
|
||||
if ending:
|
||||
LOG.debug("sending reply msg_id: %(msg_id)s "
|
||||
"reply queue: %(reply_q)s" % {
|
||||
'msg_id': self.msg_id,
|
||||
'unique_id': unique_id,
|
||||
'reply_q': self.reply_q})
|
||||
conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
|
||||
else:
|
||||
# TODO(sileht): look at which version of oslo-incubator rpc
|
||||
# send need this, but I guess this is older than icehouse
|
||||
@ -93,20 +91,52 @@ class AMQPIncomingMessage(base.IncomingMessage):
|
||||
# because reply should not be expected by caller side
|
||||
return
|
||||
|
||||
# NOTE(sileht): return without hold the a connection if possible
|
||||
# NOTE(sileht): return without using a connection if possible
|
||||
if (self.reply_q and
|
||||
not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
|
||||
self.msg_id)):
|
||||
return
|
||||
|
||||
with self.listener.driver._get_connection(
|
||||
rpc_common.PURPOSE_SEND) as conn:
|
||||
if self.listener.driver.send_single_reply:
|
||||
self._send_reply(conn, reply, failure, log_failure=log_failure,
|
||||
ending=True)
|
||||
else:
|
||||
self._send_reply(conn, reply, failure, log_failure=log_failure)
|
||||
self._send_reply(conn, ending=True)
|
||||
# NOTE(sileht): we read the configuration value from the driver
|
||||
# to be able to backport this change in previous version that
|
||||
# still have the qpid driver
|
||||
duration = self.listener.driver.missing_destination_retry_timeout
|
||||
timer = rpc_common.DecayingTimer(duration=duration)
|
||||
timer.start()
|
||||
|
||||
first_reply_sent = False
|
||||
while True:
|
||||
try:
|
||||
with self.listener.driver._get_connection(
|
||||
rpc_common.PURPOSE_SEND) as conn:
|
||||
if self.listener.driver.send_single_reply:
|
||||
self._send_reply(conn, reply, failure,
|
||||
log_failure=log_failure,
|
||||
ending=True)
|
||||
else:
|
||||
if not first_reply_sent:
|
||||
self._send_reply(conn, reply, failure,
|
||||
log_failure=log_failure)
|
||||
first_reply_sent = True
|
||||
self._send_reply(conn, ending=True)
|
||||
return
|
||||
except rpc_amqp.AMQPDestinationNotFound:
|
||||
if timer.check_return() > 0:
|
||||
LOG.info(_LI("The reply %(msg_id)s cannot be sent "
|
||||
"%(reply_q)s reply queue don't exist, "
|
||||
"retrying...") % {
|
||||
'msg_id': self.msg_id,
|
||||
'reply_q': self.reply_q})
|
||||
time.sleep(0.25)
|
||||
else:
|
||||
self._obsolete_reply_queues.add(self.reply_q, self.msg_id)
|
||||
LOG.info(_LI("The reply %(msg_id)s cannot be sent "
|
||||
"%(reply_q)s reply queue don't exist after "
|
||||
"%(duration)s sec abandoning...") % {
|
||||
'msg_id': self.msg_id,
|
||||
'reply_q': self.reply_q,
|
||||
'duration': duration})
|
||||
return
|
||||
|
||||
def acknowledge(self):
|
||||
self.acknowledge_callback()
|
||||
@ -345,6 +375,7 @@ class ReplyWaiter(object):
|
||||
|
||||
|
||||
class AMQPDriverBase(base.BaseDriver):
|
||||
missing_destination_retry_timeout = 0
|
||||
|
||||
def __init__(self, conf, url, connection_pool,
|
||||
default_exchange=None, allowed_remote_exmods=None,
|
||||
|
@ -1043,32 +1043,20 @@ class Connection(object):
|
||||
|
||||
self._publish(exchange, msg, routing_key=routing_key, timeout=timeout)
|
||||
|
||||
def _publish_and_retry_on_missing_exchange(self, exchange, msg,
|
||||
routing_key=None, timeout=None):
|
||||
"""Publisher that retry if the exchange is missing.
|
||||
"""
|
||||
|
||||
def _publish_and_raises_on_missing_exchange(self, exchange, msg,
|
||||
routing_key=None,
|
||||
timeout=None):
|
||||
"""Publisher that raises exception if exchange is missing."""
|
||||
if not exchange.passive:
|
||||
raise RuntimeError("_publish_and_retry_on_missing_exchange() must "
|
||||
"be called with an passive exchange.")
|
||||
|
||||
# TODO(sileht): use @retrying
|
||||
# NOTE(sileht): no need to wait the application expect a response
|
||||
# before timeout is exshauted
|
||||
duration = (
|
||||
timeout if timeout is not None
|
||||
else self.kombu_reconnect_timeout
|
||||
)
|
||||
|
||||
timer = rpc_common.DecayingTimer(duration=duration)
|
||||
timer.start()
|
||||
|
||||
while True:
|
||||
try:
|
||||
self._publish(exchange, msg, routing_key=routing_key,
|
||||
timeout=timeout)
|
||||
return
|
||||
except self.connection.channel_errors as exc:
|
||||
try:
|
||||
self._publish(exchange, msg, routing_key=routing_key,
|
||||
timeout=timeout)
|
||||
return
|
||||
except self.connection.channel_errors as exc:
|
||||
if exc.code == 404:
|
||||
# NOTE(noelbk/sileht):
|
||||
# If rabbit dies, the consumer can be disconnected before the
|
||||
# publisher sends, and if the consumer hasn't declared the
|
||||
@ -1077,24 +1065,9 @@ class Connection(object):
|
||||
# So we set passive=True to the publisher exchange and catch
|
||||
# the 404 kombu ChannelError and retry until the exchange
|
||||
# appears
|
||||
if exc.code == 404 and timer.check_return() > 0:
|
||||
LOG.info(_LI("The exchange %(exchange)s to send to "
|
||||
"%(routing_key)s doesn't exist yet, "
|
||||
"retrying...") % {
|
||||
'exchange': exchange.name,
|
||||
'routing_key': routing_key})
|
||||
time.sleep(0.25)
|
||||
continue
|
||||
elif exc.code == 404:
|
||||
msg = _("The exchange %(exchange)s to send to "
|
||||
"%(routing_key)s still doesn't exist after "
|
||||
"%(duration)s sec abandoning...") % {
|
||||
'duration': duration,
|
||||
'exchange': exchange.name,
|
||||
'routing_key': routing_key}
|
||||
LOG.info(msg)
|
||||
raise rpc_amqp.AMQPDestinationNotFound(msg)
|
||||
raise
|
||||
raise rpc_amqp.AMQPDestinationNotFound(
|
||||
"exchange %s doesn't exists" % exchange.name)
|
||||
raise
|
||||
|
||||
def direct_send(self, msg_id, msg):
|
||||
"""Send a 'direct' message."""
|
||||
@ -1104,7 +1077,7 @@ class Connection(object):
|
||||
auto_delete=True,
|
||||
passive=True)
|
||||
|
||||
self._ensure_publishing(self._publish_and_retry_on_missing_exchange,
|
||||
self._ensure_publishing(self._publish_and_raises_on_missing_exchange,
|
||||
exchange, msg, routing_key=msg_id)
|
||||
|
||||
def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
|
||||
@ -1160,6 +1133,9 @@ class RabbitDriver(amqpdriver.AMQPDriverBase):
|
||||
conf.register_opts(rpc_amqp.amqp_opts, group=opt_group)
|
||||
conf.register_opts(base.base_opts, group=opt_group)
|
||||
|
||||
self.missing_destination_retry_timeout = (
|
||||
conf.oslo_messaging_rabbit.kombu_reconnect_timeout)
|
||||
|
||||
connection_pool = pool.ConnectionPool(
|
||||
conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size,
|
||||
url, Connection)
|
||||
|
Loading…
x
Reference in New Issue
Block a user