diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index 420587c45..e95edfc2e 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -17,6 +17,7 @@ __all__ = ['AMQPDriverBase']
 
 import logging
 import threading
+import time
 import uuid
 
 import cachetools
@@ -70,16 +71,13 @@ class AMQPIncomingMessage(base.IncomingMessage):
         # Otherwise use the msg_id for backward compatibility.
         if self.reply_q:
             msg['_msg_id'] = self.msg_id
-            try:
-                if ending:
-                    LOG.debug("sending reply msg_id: %(msg_id)s "
-                              "reply queue: %(reply_q)s" % {
-                                  'msg_id': self.msg_id,
-                                  'unique_id': unique_id,
-                                  'reply_q': self.reply_q})
-                conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
-            except rpc_amqp.AMQPDestinationNotFound:
-                self._obsolete_reply_queues.add(self.reply_q, self.msg_id)
+            if ending:
+                LOG.debug("sending reply msg_id: %(msg_id)s "
+                          "reply queue: %(reply_q)s" % {
+                              'msg_id': self.msg_id,
+                              'unique_id': unique_id,
+                              'reply_q': self.reply_q})
+            conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
         else:
             # TODO(sileht): look at which version of oslo-incubator rpc
             # send need this, but I guess this is older than icehouse
@@ -93,20 +91,52 @@ class AMQPIncomingMessage(base.IncomingMessage):
             #    because reply should not be expected by caller side
             return
 
-        # NOTE(sileht): return without hold the a connection if possible
+        # NOTE(sileht): return without using a connection if possible
         if (self.reply_q and
             not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
                                                           self.msg_id)):
             return
 
-        with self.listener.driver._get_connection(
-                rpc_common.PURPOSE_SEND) as conn:
-            if self.listener.driver.send_single_reply:
-                self._send_reply(conn, reply, failure, log_failure=log_failure,
-                                 ending=True)
-            else:
-                self._send_reply(conn, reply, failure, log_failure=log_failure)
-                self._send_reply(conn, ending=True)
+        # NOTE(sileht): we read the configuration value from the driver
+        # to be able to backport this change in previous version that
+        # still have the qpid driver
+        duration = self.listener.driver.missing_destination_retry_timeout
+        timer = rpc_common.DecayingTimer(duration=duration)
+        timer.start()
+
+        first_reply_sent = False
+        while True:
+            try:
+                with self.listener.driver._get_connection(
+                        rpc_common.PURPOSE_SEND) as conn:
+                    if self.listener.driver.send_single_reply:
+                        self._send_reply(conn, reply, failure,
+                                         log_failure=log_failure,
+                                         ending=True)
+                    else:
+                        if not first_reply_sent:
+                            self._send_reply(conn, reply, failure,
+                                             log_failure=log_failure)
+                            first_reply_sent = True
+                        self._send_reply(conn, ending=True)
+                return
+            except rpc_amqp.AMQPDestinationNotFound:
+                if timer.check_return() > 0:
+                    LOG.info(_LI("The reply %(msg_id)s cannot be sent  "
+                                 "%(reply_q)s reply queue don't exist, "
+                                 "retrying...") % {
+                                     'msg_id': self.msg_id,
+                                     'reply_q': self.reply_q})
+                    time.sleep(0.25)
+                else:
+                    self._obsolete_reply_queues.add(self.reply_q, self.msg_id)
+                    LOG.info(_LI("The reply %(msg_id)s cannot be sent  "
+                                 "%(reply_q)s reply queue don't exist after "
+                                 "%(duration)s sec abandoning...") % {
+                                     'msg_id': self.msg_id,
+                                     'reply_q': self.reply_q,
+                                     'duration': duration})
+                    return
 
     def acknowledge(self):
         self.acknowledge_callback()
@@ -345,6 +375,7 @@ class ReplyWaiter(object):
 
 
 class AMQPDriverBase(base.BaseDriver):
+    missing_destination_retry_timeout = 0
 
     def __init__(self, conf, url, connection_pool,
                  default_exchange=None, allowed_remote_exmods=None,
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index d96a39838..cdd642ea6 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -1043,32 +1043,20 @@ class Connection(object):
 
         self._publish(exchange, msg, routing_key=routing_key, timeout=timeout)
 
-    def _publish_and_retry_on_missing_exchange(self, exchange, msg,
-                                               routing_key=None, timeout=None):
-        """Publisher that retry if the exchange is missing.
-        """
-
+    def _publish_and_raises_on_missing_exchange(self, exchange, msg,
+                                                routing_key=None,
+                                                timeout=None):
+        """Publisher that raises exception if exchange is missing."""
         if not exchange.passive:
             raise RuntimeError("_publish_and_retry_on_missing_exchange() must "
                                "be called with an passive exchange.")
 
-        # TODO(sileht): use @retrying
-        # NOTE(sileht): no need to wait the application expect a response
-        # before timeout is exshauted
-        duration = (
-            timeout if timeout is not None
-            else self.kombu_reconnect_timeout
-        )
-
-        timer = rpc_common.DecayingTimer(duration=duration)
-        timer.start()
-
-        while True:
-            try:
-                self._publish(exchange, msg, routing_key=routing_key,
-                              timeout=timeout)
-                return
-            except self.connection.channel_errors as exc:
+        try:
+            self._publish(exchange, msg, routing_key=routing_key,
+                          timeout=timeout)
+            return
+        except self.connection.channel_errors as exc:
+            if exc.code == 404:
                 # NOTE(noelbk/sileht):
                 # If rabbit dies, the consumer can be disconnected before the
                 # publisher sends, and if the consumer hasn't declared the
@@ -1077,24 +1065,9 @@ class Connection(object):
                 # So we set passive=True to the publisher exchange and catch
                 # the 404 kombu ChannelError and retry until the exchange
                 # appears
-                if exc.code == 404 and timer.check_return() > 0:
-                    LOG.info(_LI("The exchange %(exchange)s to send to "
-                                 "%(routing_key)s doesn't exist yet, "
-                                 "retrying...") % {
-                                     'exchange': exchange.name,
-                                     'routing_key': routing_key})
-                    time.sleep(0.25)
-                    continue
-                elif exc.code == 404:
-                    msg = _("The exchange %(exchange)s to send to "
-                            "%(routing_key)s still doesn't exist after "
-                            "%(duration)s sec abandoning...") % {
-                                'duration': duration,
-                                'exchange': exchange.name,
-                                'routing_key': routing_key}
-                    LOG.info(msg)
-                    raise rpc_amqp.AMQPDestinationNotFound(msg)
-                raise
+                raise rpc_amqp.AMQPDestinationNotFound(
+                    "exchange %s doesn't exists" % exchange.name)
+            raise
 
     def direct_send(self, msg_id, msg):
         """Send a 'direct' message."""
@@ -1104,7 +1077,7 @@ class Connection(object):
                                          auto_delete=True,
                                          passive=True)
 
-        self._ensure_publishing(self._publish_and_retry_on_missing_exchange,
+        self._ensure_publishing(self._publish_and_raises_on_missing_exchange,
                                 exchange, msg, routing_key=msg_id)
 
     def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
@@ -1160,6 +1133,9 @@ class RabbitDriver(amqpdriver.AMQPDriverBase):
         conf.register_opts(rpc_amqp.amqp_opts, group=opt_group)
         conf.register_opts(base.base_opts, group=opt_group)
 
+        self.missing_destination_retry_timeout = (
+            conf.oslo_messaging_rabbit.kombu_reconnect_timeout)
+
         connection_pool = pool.ConnectionPool(
             conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size,
             url, Connection)