diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py
index b9c25970a..41129e7c2 100644
--- a/oslo/messaging/_drivers/amqpdriver.py
+++ b/oslo/messaging/_drivers/amqpdriver.py
@@ -31,30 +31,6 @@ from oslo.messaging._i18n import _LI
 LOG = logging.getLogger(__name__)
 
 
-class _DecayingTimer(object):
-    def __init__(self, duration=None):
-        self._duration = duration
-        self._ends_at = None
-
-    def start(self):
-        if self._duration is not None:
-            self._ends_at = time.time() + max(0, self._duration)
-        return self
-
-    def check_return(self, msg_id):
-        if self._duration is None:
-            return None
-        if self._ends_at is None:
-            raise RuntimeError("Can not check/return a timeout from a timer"
-                               " that has not been started")
-        left = self._ends_at - time.time()
-        if left <= 0:
-            raise messaging.MessagingTimeout('Timed out waiting for a '
-                                             'reply to message ID %s'
-                                             % msg_id)
-        return left
-
-
 class AMQPIncomingMessage(base.IncomingMessage):
 
     def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q):
@@ -231,6 +207,11 @@ class ReplyWaiter(object):
     def unlisten(self, msg_id):
         self.waiters.remove(msg_id)
 
+    @staticmethod
+    def _raise_timeout_exception(msg_id):
+        raise messaging.MessagingTimeout(
+            'Timed out waiting for a reply to message ID %s' % msg_id)
+
     def _process_reply(self, data):
         result = None
         ending = False
@@ -256,15 +237,15 @@ class ReplyWaiter(object):
 
                 self.waiters.put(incoming_msg_id, message_data)
 
+            timeout = timer.check_return(self._raise_timeout_exception, msg_id)
             try:
-                self.conn.consume(limit=1, timeout=timer.check_return(msg_id))
+                self.conn.consume(limit=1, timeout=timeout)
             except rpc_common.Timeout:
-                raise messaging.MessagingTimeout('Timed out waiting for a '
-                                                 'reply to message ID %s'
-                                                 % msg_id)
+                self._raise_timeout_exception(msg_id)
 
     def _poll_queue(self, msg_id, timer):
-        message = self.waiters.get(msg_id, timeout=timer.check_return(msg_id))
+        timeout = timer.check_return(self._raise_timeout_exception, msg_id)
+        message = self.waiters.get(msg_id, timeout=timeout)
         if message is self.waiters.WAKE_UP:
             return None, None, True  # lock was released
 
@@ -293,7 +274,7 @@ class ReplyWaiter(object):
         # have the first thread take responsibility for passing replies not
         # intended for itself to the appropriate thread.
         #
-        timer = _DecayingTimer(duration=timeout).start()
+        timer = rpc_common.DecayingTimer(duration=timeout).start()
         final_reply = None
         while True:
             if self.conn_lock.acquire(False):
diff --git a/oslo/messaging/_drivers/common.py b/oslo/messaging/_drivers/common.py
index 2645fbd39..4d0b7a1e7 100644
--- a/oslo/messaging/_drivers/common.py
+++ b/oslo/messaging/_drivers/common.py
@@ -18,6 +18,7 @@
 import copy
 import logging
 import sys
+import time
 import traceback
 
 import six
@@ -349,3 +350,28 @@ def deserialize_msg(msg):
     raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
 
     return raw_msg
+
+
+class DecayingTimer(object):
+    def __init__(self, duration=None):
+        self._duration = duration
+        self._ends_at = None
+
+    def start(self):
+        if self._duration is not None:
+            self._ends_at = time.time() + max(0, self._duration)
+        return self
+
+    def check_return(self, timeout_callback, *args, **kwargs):
+        if self._duration is None:
+            return None
+        if self._ends_at is None:
+            raise RuntimeError("Can not check/return a timeout from a timer"
+                               " that has not been started")
+
+        maximum = kwargs.pop('maximum', None)
+        left = self._ends_at - time.time()
+        if left <= 0:
+            timeout_callback(*args, **kwargs)
+
+        return left if maximum is None else min(left, maximum)
diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py
index 2ae5c8374..05219ffcd 100644
--- a/oslo/messaging/_drivers/impl_rabbit.py
+++ b/oslo/messaging/_drivers/impl_rabbit.py
@@ -694,19 +694,15 @@ class Connection(object):
     def iterconsume(self, limit=None, timeout=None):
         """Return an iterator that will consume from all queues/consumers."""
 
-        if timeout is None:
-            deadline = None
-        else:
-            deadline = time.time() + timeout
+        timer = rpc_common.DecayingTimer(duration=timeout).start()
 
-        def _raise_timeout_if_deadline_is_reached(exc):
-            if deadline is not None and deadline - time.time() < 0:
-                LOG.debug('Timed out waiting for RPC response: %s', exc)
-                raise rpc_common.Timeout()
+        def _raise_timeout(exc):
+            LOG.debug('Timed out waiting for RPC response: %s', exc)
+            raise rpc_common.Timeout()
 
         def _error_callback(exc):
             self.do_consume = True
-            _raise_timeout_if_deadline_is_reached(exc)
+            timer.check_return(_raise_timeout, exc)
             LOG.exception(_('Failed to consume message from queue: %s'),
                           exc)
 
@@ -718,11 +714,14 @@ class Connection(object):
                     queue.consume(nowait=True)
                 queues_tail.consume(nowait=False)
                 self.do_consume = False
+
+            poll_timeout = 1 if timeout is None else min(timeout, 1)
             while True:
                 try:
-                    return self.connection.drain_events(timeout=1)
+                    return self.connection.drain_events(timeout=poll_timeout)
                 except socket.timeout as exc:
-                    _raise_timeout_if_deadline_is_reached(exc)
+                    poll_timeout = timer.check_return(_raise_timeout, exc,
+                                                      maximum=1)
 
         for iteration in itertools.count(0):
             if limit and iteration >= limit: