diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 1ae65af0d..73512742f 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -466,7 +466,6 @@ class Connection(object):
     pools = {}
 
     def __init__(self, conf, url, purpose):
-        self.consumers = []
         self.conf = conf
         self.driver_conf = self.conf.oslo_messaging_rabbit
         self.max_retries = self.driver_conf.rabbit_max_retries
@@ -525,7 +524,8 @@ class Connection(object):
 
         self._initial_pid = os.getpid()
 
-        self.do_consume = True
+        self._consumers = []
+        self._new_consumers = []
         self._consume_loop_stopped = False
         self.channel = None
 
@@ -705,7 +705,7 @@ class Connection(object):
             a new channel, we use it the reconfigure our consumers.
             """
             self._set_current_channel(new_channel)
-            for consumer in self.consumers:
+            for consumer in self._consumers:
                 consumer.declare(new_channel)
 
             LOG.info(_LI('Reconnected to AMQP server on '
@@ -780,7 +780,7 @@ class Connection(object):
             except recoverable_errors:
                 self._set_current_channel(None)
                 self.ensure_connection()
-        self.consumers = []
+        self._consumers = []
 
     def _heartbeat_supported_and_enabled(self):
         if self.driver_conf.heartbeat_timeout_threshold <= 0:
@@ -859,7 +859,8 @@ class Connection(object):
 
         def _declare_consumer():
             consumer.declare(self.channel)
-            self.consumers.append(consumer)
+            self._consumers.append(consumer)
+            self._new_consumers.append(consumer)
             return consumer
 
         with self._connection_lock:
@@ -877,7 +878,7 @@ class Connection(object):
             raise rpc_common.Timeout()
 
         def _recoverable_error_callback(exc):
-            self.do_consume = True
+            self._new_consumers = self._consumers
             timer.check_return(_raise_timeout, exc)
 
         def _error_callback(exc):
@@ -886,10 +887,11 @@ class Connection(object):
                       exc)
 
         def _consume():
-            if self.do_consume:
-                for tag, consumer in enumerate(self.consumers):
-                    consumer.consume(tag=tag)
-                self.do_consume = False
+            if self._new_consumers:
+                for tag, consumer in enumerate(self._consumers):
+                    if consumer in self._new_consumers:
+                        consumer.consume(tag=tag)
+                self._new_consumers = []
 
             poll_timeout = (self._poll_timeout if timeout is None
                             else min(timeout, self._poll_timeout))
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index 54ada6f8d..c346019ee 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -179,12 +179,10 @@ class TestRabbitConsume(test_utils.BaseTestCase):
         transport = oslo_messaging.get_transport(self.conf,
                                                  'kombu+memory:////')
         self.addCleanup(transport.cleanup)
-        deadline = time.time() + 3
+        deadline = time.time() + 6
         with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn:
-            # FIXME(sileht): the deadline should be 6 seconds, not 3
-            # consuming with no consumer have never worked
-            # https://bugs.launchpad.net/oslo.messaging/+bug/1450342
-            # conn.consume(timeout=3)
+            self.assertRaises(driver_common.Timeout,
+                              conn.consume, timeout=3)
 
             # kombu memory transport doesn't really raise error
             # so just simulate a real driver behavior
@@ -192,10 +190,8 @@ class TestRabbitConsume(test_utils.BaseTestCase):
             conn.declare_fanout_consumer("notif.info", lambda msg: True)
             with mock.patch('kombu.connection.Connection.drain_events',
                             side_effect=IOError):
-                try:
-                    conn.consume(timeout=3)
-                except driver_common.Timeout:
-                    pass
+                self.assertRaises(driver_common.Timeout,
+                                  conn.consume, timeout=3)
 
         self.assertEqual(0, int(deadline - time.time()))
 
diff --git a/tests/drivers/test_impl_rabbit.py b/tests/drivers/test_impl_rabbit.py
index f27fe31dc..515e49bec 100644
--- a/tests/drivers/test_impl_rabbit.py
+++ b/tests/drivers/test_impl_rabbit.py
@@ -87,12 +87,10 @@ class TestRabbitConsume(test_utils.BaseTestCase):
     def test_consume_timeout(self):
         transport = messaging.get_transport(self.conf, 'kombu+memory:////')
         self.addCleanup(transport.cleanup)
-        deadline = time.time() + 3
+        deadline = time.time() + 6
         with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn:
-            # FIXME(sileht): the deadline should be 6 seconds, not 3
-            # consuming with no consumer have never worked
-            # https://bugs.launchpad.net/oslo.messaging/+bug/1450342
-            # conn.consume(timeout=3)
+            self.assertRaises(driver_common.Timeout,
+                              conn.consume, timeout=3)
 
             # kombu memory transport doesn't really raise error
             # so just simulate a real driver behavior
@@ -100,10 +98,8 @@ class TestRabbitConsume(test_utils.BaseTestCase):
             conn.declare_fanout_consumer("notif.info", lambda msg: True)
             with mock.patch('kombu.connection.Connection.drain_events',
                             side_effect=IOError):
-                try:
-                    conn.consume(timeout=3)
-                except driver_common.Timeout:
-                    pass
+                self.assertRaises(driver_common.Timeout,
+                                  conn.consume, timeout=3)
 
         self.assertEqual(0, int(deadline - time.time()))