From 488594936a52145c778c89fc88adca722ae8bd72 Mon Sep 17 00:00:00 2001
From: Mehdi Abaakouk <sileht@redhat.com>
Date: Thu, 8 Dec 2016 10:05:17 +0100
Subject: [PATCH] kafka: return to poller when timeout is reach

consume() must return only if user timeout is reached and not
when driver consumer_timeout is reached.

Change-Id: I6b2b2a28038a194224e79fa37285436ca6787a0a
---
 oslo_messaging/_drivers/impl_kafka.py | 31 +++++++++++++++++----------
 1 file changed, 20 insertions(+), 11 deletions(-)

diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py
index 0a8c02ca2..59d2ff71e 100644
--- a/oslo_messaging/_drivers/impl_kafka.py
+++ b/oslo_messaging/_drivers/impl_kafka.py
@@ -194,18 +194,27 @@ class Connection(object):
 
         :param timeout: poll timeout in seconds
         """
-        if self._consume_loop_stopped:
-            return None
 
-        timeout = timeout if timeout >= 0 else self.consumer_timeout
-        try:
-            messages = self._poll_messages(timeout)
-        except kafka.errors.ConsumerTimeout as e:
-            raise driver_common.Timeout(e.message)
-        except Exception:
-            LOG.exception(_LE("Failed to consume messages"))
-            messages = None
-        return messages
+        def _raise_timeout(exc):
+            raise driver_common.Timeout(exc.message)
+
+        timer = driver_common.DecayingTimer(duration=timeout)
+        timer.start()
+
+        poll_timeout = (self.consumer_timeout if timeout is None
+                        else min(timeout, self.consumer_timeout))
+
+        while True:
+            if self._consume_loop_stopped:
+                return
+            try:
+                return self._poll_messages(poll_timeout)
+            except kafka.errors.ConsumerTimeout as exc:
+                poll_timeout = timer.check_return(
+                    _raise_timeout, exc, maximum=self.consumer_timeout)
+            except Exception:
+                LOG.exception(_LE("Failed to consume messages"))
+                return
 
     def stop_consuming(self):
         self._consume_loop_stopped = True