diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py
index 0a8c02ca2..59d2ff71e 100644
--- a/oslo_messaging/_drivers/impl_kafka.py
+++ b/oslo_messaging/_drivers/impl_kafka.py
@@ -194,18 +194,27 @@ class Connection(object):
 
         :param timeout: poll timeout in seconds
         """
-        if self._consume_loop_stopped:
-            return None
 
-        timeout = timeout if timeout >= 0 else self.consumer_timeout
-        try:
-            messages = self._poll_messages(timeout)
-        except kafka.errors.ConsumerTimeout as e:
-            raise driver_common.Timeout(e.message)
-        except Exception:
-            LOG.exception(_LE("Failed to consume messages"))
-            messages = None
-        return messages
+        def _raise_timeout(exc):
+            raise driver_common.Timeout(exc.message)
+
+        timer = driver_common.DecayingTimer(duration=timeout)
+        timer.start()
+
+        poll_timeout = (self.consumer_timeout if timeout is None
+                        else min(timeout, self.consumer_timeout))
+
+        while True:
+            if self._consume_loop_stopped:
+                return
+            try:
+                return self._poll_messages(poll_timeout)
+            except kafka.errors.ConsumerTimeout as exc:
+                poll_timeout = timer.check_return(
+                    _raise_timeout, exc, maximum=self.consumer_timeout)
+            except Exception:
+                LOG.exception(_LE("Failed to consume messages"))
+                return
 
     def stop_consuming(self):
         self._consume_loop_stopped = True