Merge "kafka: return to poller when timeout is reach"
This commit is contained in:
commit
234e920b6f
@ -194,18 +194,27 @@ class Connection(object):
|
||||
|
||||
:param timeout: poll timeout in seconds
|
||||
"""
|
||||
if self._consume_loop_stopped:
|
||||
return None
|
||||
|
||||
timeout = timeout if timeout >= 0 else self.consumer_timeout
|
||||
try:
|
||||
messages = self._poll_messages(timeout)
|
||||
except kafka.errors.ConsumerTimeout as e:
|
||||
raise driver_common.Timeout(e.message)
|
||||
except Exception:
|
||||
LOG.exception(_LE("Failed to consume messages"))
|
||||
messages = None
|
||||
return messages
|
||||
def _raise_timeout(exc):
|
||||
raise driver_common.Timeout(exc.message)
|
||||
|
||||
timer = driver_common.DecayingTimer(duration=timeout)
|
||||
timer.start()
|
||||
|
||||
poll_timeout = (self.consumer_timeout if timeout is None
|
||||
else min(timeout, self.consumer_timeout))
|
||||
|
||||
while True:
|
||||
if self._consume_loop_stopped:
|
||||
return
|
||||
try:
|
||||
return self._poll_messages(poll_timeout)
|
||||
except kafka.errors.ConsumerTimeout as exc:
|
||||
poll_timeout = timer.check_return(
|
||||
_raise_timeout, exc, maximum=self.consumer_timeout)
|
||||
except Exception:
|
||||
LOG.exception(_LE("Failed to consume messages"))
|
||||
return
|
||||
|
||||
def stop_consuming(self):
|
||||
self._consume_loop_stopped = True
|
||||
|
Loading…
x
Reference in New Issue
Block a user