diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 420587c45..e95edfc2e 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -17,6 +17,7 @@ __all__ = ['AMQPDriverBase'] import logging import threading +import time import uuid import cachetools @@ -70,16 +71,13 @@ class AMQPIncomingMessage(base.IncomingMessage): # Otherwise use the msg_id for backward compatibility. if self.reply_q: msg['_msg_id'] = self.msg_id - try: - if ending: - LOG.debug("sending reply msg_id: %(msg_id)s " - "reply queue: %(reply_q)s" % { - 'msg_id': self.msg_id, - 'unique_id': unique_id, - 'reply_q': self.reply_q}) - conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg)) - except rpc_amqp.AMQPDestinationNotFound: - self._obsolete_reply_queues.add(self.reply_q, self.msg_id) + if ending: + LOG.debug("sending reply msg_id: %(msg_id)s " + "reply queue: %(reply_q)s" % { + 'msg_id': self.msg_id, + 'unique_id': unique_id, + 'reply_q': self.reply_q}) + conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg)) else: # TODO(sileht): look at which version of oslo-incubator rpc # send need this, but I guess this is older than icehouse @@ -93,20 +91,52 @@ class AMQPIncomingMessage(base.IncomingMessage): # because reply should not be expected by caller side return - # NOTE(sileht): return without hold the a connection if possible + # NOTE(sileht): return without using a connection if possible if (self.reply_q and not self._obsolete_reply_queues.reply_q_valid(self.reply_q, self.msg_id)): return - with self.listener.driver._get_connection( - rpc_common.PURPOSE_SEND) as conn: - if self.listener.driver.send_single_reply: - self._send_reply(conn, reply, failure, log_failure=log_failure, - ending=True) - else: - self._send_reply(conn, reply, failure, log_failure=log_failure) - self._send_reply(conn, ending=True) + # NOTE(sileht): we read the configuration value from the driver + # to be able to backport this change in previous version that + # still have the qpid driver + duration = self.listener.driver.missing_destination_retry_timeout + timer = rpc_common.DecayingTimer(duration=duration) + timer.start() + + first_reply_sent = False + while True: + try: + with self.listener.driver._get_connection( + rpc_common.PURPOSE_SEND) as conn: + if self.listener.driver.send_single_reply: + self._send_reply(conn, reply, failure, + log_failure=log_failure, + ending=True) + else: + if not first_reply_sent: + self._send_reply(conn, reply, failure, + log_failure=log_failure) + first_reply_sent = True + self._send_reply(conn, ending=True) + return + except rpc_amqp.AMQPDestinationNotFound: + if timer.check_return() > 0: + LOG.info(_LI("The reply %(msg_id)s cannot be sent " + "%(reply_q)s reply queue don't exist, " + "retrying...") % { + 'msg_id': self.msg_id, + 'reply_q': self.reply_q}) + time.sleep(0.25) + else: + self._obsolete_reply_queues.add(self.reply_q, self.msg_id) + LOG.info(_LI("The reply %(msg_id)s cannot be sent " + "%(reply_q)s reply queue don't exist after " + "%(duration)s sec abandoning...") % { + 'msg_id': self.msg_id, + 'reply_q': self.reply_q, + 'duration': duration}) + return def acknowledge(self): self.acknowledge_callback() @@ -345,6 +375,7 @@ class ReplyWaiter(object): class AMQPDriverBase(base.BaseDriver): + missing_destination_retry_timeout = 0 def __init__(self, conf, url, connection_pool, default_exchange=None, allowed_remote_exmods=None, diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index d96a39838..cdd642ea6 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -1043,32 +1043,20 @@ class Connection(object): self._publish(exchange, msg, routing_key=routing_key, timeout=timeout) - def _publish_and_retry_on_missing_exchange(self, exchange, msg, - routing_key=None, timeout=None): - """Publisher that retry if the exchange is missing. - """ - + def _publish_and_raises_on_missing_exchange(self, exchange, msg, + routing_key=None, + timeout=None): + """Publisher that raises exception if exchange is missing.""" if not exchange.passive: raise RuntimeError("_publish_and_retry_on_missing_exchange() must " "be called with an passive exchange.") - # TODO(sileht): use @retrying - # NOTE(sileht): no need to wait the application expect a response - # before timeout is exshauted - duration = ( - timeout if timeout is not None - else self.kombu_reconnect_timeout - ) - - timer = rpc_common.DecayingTimer(duration=duration) - timer.start() - - while True: - try: - self._publish(exchange, msg, routing_key=routing_key, - timeout=timeout) - return - except self.connection.channel_errors as exc: + try: + self._publish(exchange, msg, routing_key=routing_key, + timeout=timeout) + return + except self.connection.channel_errors as exc: + if exc.code == 404: # NOTE(noelbk/sileht): # If rabbit dies, the consumer can be disconnected before the # publisher sends, and if the consumer hasn't declared the @@ -1077,24 +1065,9 @@ class Connection(object): # So we set passive=True to the publisher exchange and catch # the 404 kombu ChannelError and retry until the exchange # appears - if exc.code == 404 and timer.check_return() > 0: - LOG.info(_LI("The exchange %(exchange)s to send to " - "%(routing_key)s doesn't exist yet, " - "retrying...") % { - 'exchange': exchange.name, - 'routing_key': routing_key}) - time.sleep(0.25) - continue - elif exc.code == 404: - msg = _("The exchange %(exchange)s to send to " - "%(routing_key)s still doesn't exist after " - "%(duration)s sec abandoning...") % { - 'duration': duration, - 'exchange': exchange.name, - 'routing_key': routing_key} - LOG.info(msg) - raise rpc_amqp.AMQPDestinationNotFound(msg) - raise + raise rpc_amqp.AMQPDestinationNotFound( + "exchange %s doesn't exists" % exchange.name) + raise def direct_send(self, msg_id, msg): """Send a 'direct' message.""" @@ -1104,7 +1077,7 @@ class Connection(object): auto_delete=True, passive=True) - self._ensure_publishing(self._publish_and_retry_on_missing_exchange, + self._ensure_publishing(self._publish_and_raises_on_missing_exchange, exchange, msg, routing_key=msg_id) def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None): @@ -1160,6 +1133,9 @@ class RabbitDriver(amqpdriver.AMQPDriverBase): conf.register_opts(rpc_amqp.amqp_opts, group=opt_group) conf.register_opts(base.base_opts, group=opt_group) + self.missing_destination_retry_timeout = ( + conf.oslo_messaging_rabbit.kombu_reconnect_timeout) + connection_pool = pool.ConnectionPool( conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size, url, Connection)