diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index b51724040..badff50d5 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -277,6 +277,7 @@ class Consumer(object):
                                                     rabbit_queue_ttl)
 
         self.queue = None
+        self._declared_on = None
         self.exchange = kombu.entity.Exchange(
             name=exchange_name,
             type=type,
@@ -285,6 +286,7 @@ class Consumer(object):
 
     def declare(self, conn):
         """Re-declare the queue after a rabbit (re)connect."""
+
         self.queue = kombu.entity.Queue(
             name=self.queue_name,
             channel=conn.channel,
@@ -308,17 +310,41 @@ class Consumer(object):
                 self.queue.declare()
             else:
                 raise
+        self._declared_on = conn.channel
 
-    def consume(self, tag):
+    def consume(self, conn, tag):
         """Actually declare the consumer on the amqp channel.  This will
         start the flow of messages from the queue.  Using the
         Connection.consume() will process the messages,
         calling the appropriate callback.
         """
 
-        self.queue.consume(callback=self._callback,
-                           consumer_tag=six.text_type(tag),
-                           nowait=self.nowait)
+        # Ensure we are on the correct channel before consuming
+        if conn.channel != self._declared_on:
+            self.declare(conn)
+        try:
+            self.queue.consume(callback=self._callback,
+                               consumer_tag=six.text_type(tag),
+                               nowait=self.nowait)
+        except conn.connection.channel_errors as exc:
+            # We retries once because of some races that we can
+            # recover before informing the deployer
+            # bugs.launchpad.net/oslo.messaging/+bug/1581148
+            # bugs.launchpad.net/oslo.messaging/+bug/1609766
+            # bugs.launchpad.net/neutron/+bug/1318721
+
+            # At any channel error, the RabbitMQ closes
+            # the channel, but the amqp-lib quietly re-open
+            # it. So, we must reset all tags and declare
+            # all consumers again.
+            conn._new_tags = set(conn._consumers.values())
+            if exc.code == 404:
+                self.declare(conn)
+                self.queue.consume(callback=self._callback,
+                                   consumer_tag=six.text_type(tag),
+                                   nowait=self.nowait)
+            else:
+                raise
 
     def cancel(self, tag):
         LOG.trace('ConsumerBase.cancel: canceling %s', tag)
@@ -754,8 +780,6 @@ class Connection(object):
 
             self.set_transport_socket_timeout()
             self._set_current_channel(new_channel)
-            for consumer in self._consumers:
-                consumer.declare(self)
 
             LOG.info(_LI('[%(connection_id)s] Reconnected to AMQP server on '
                          '%(hostname)s:%(port)s via [%(transport)s] client'
@@ -832,6 +856,8 @@ class Connection(object):
             if self.purpose == rpc_common.PURPOSE_LISTEN:
                 self._set_qos(new_channel)
             self._producer = kombu.messaging.Producer(new_channel)
+            for consumer in self._consumers:
+                consumer.declare(self)
 
     def _set_qos(self, channel):
         """Set QoS prefetch count on the channel"""
@@ -1042,31 +1068,11 @@ class Connection(object):
             if not self.connection.connected:
                 raise self.connection.recoverable_connection_errors[0]
 
-            consume_max_retries = 2
-            while self._new_tags and consume_max_retries:
+            while self._new_tags:
                 for consumer, tag in self._consumers.items():
                     if tag in self._new_tags:
-                        try:
-                            consumer.consume(tag=tag)
-                            self._new_tags.remove(tag)
-                        except self.connection.channel_errors as exc:
-                            # NOTE(kbespalov): during the interval between
-                            # a queue declaration and consumer declaration
-                            # the queue can disappear. In this case
-                            # we must redeclare queue and try to re-consume.
-                            # More details is here:
-                            # bugs.launchpad.net/oslo.messaging/+bug/1581148
-                            if exc.code == 404 and consume_max_retries:
-                                consumer.declare(self)
-                                # NOTE(kbespalov): the broker closes a channel
-                                # at any channel error. The py-amqp catches
-                                # this situation and re-open a new channel.
-                                # So, we must re-declare all consumers again.
-                                self._new_tags = set(self._consumers.values())
-                                consume_max_retries -= 1
-                                break
-                            else:
-                                raise
+                        consumer.consume(self, tag=tag)
+                        self._new_tags.remove(tag)
 
             poll_timeout = (self._poll_timeout if timeout is None
                             else min(timeout, self._poll_timeout))