From 7fe91cc01a7a75c3b3334111fe21aad80986ff08 Mon Sep 17 00:00:00 2001 From: gord chung <gord@live.ca> Date: Fri, 10 Nov 2017 13:18:41 -0500 Subject: [PATCH] fix batch handling for some reason there are two timeouts. in the batch scenario, all the time wasted waiting on initial 'get' is never accounted for so the batch timeout is always longer than it is declared. Change-Id: I6132c770cccdf0ffad9f178f7463288cf954d672 --- oslo_messaging/_drivers/base.py | 28 ++++++++-------------------- 1 file changed, 8 insertions(+), 20 deletions(-) diff --git a/oslo_messaging/_drivers/base.py b/oslo_messaging/_drivers/base.py index d183614f9..c09ab6f42 100644 --- a/oslo_messaging/_drivers/base.py +++ b/oslo_messaging/_drivers/base.py @@ -46,27 +46,15 @@ def batch_poll_helper(func): driver_prefetch = in_self.prefetch_size if driver_prefetch > 0: batch_size = min(batch_size, driver_prefetch) + timeout = batch_timeout or timeout - with timeutils.StopWatch(timeout) as timeout_watch: - # poll first message - msg = func(in_self, timeout=timeout_watch.leftover(True)) - if msg is not None: - incomings.append(msg) - if batch_size == 1 or msg is None: - return incomings - - # update batch_timeout according to timeout for whole operation - timeout_left = timeout_watch.leftover(True) - if timeout_left is not None and ( - batch_timeout is None or timeout_left < batch_timeout): - batch_timeout = timeout_left - - with timeutils.StopWatch(batch_timeout) as batch_timeout_watch: - # poll remained batch messages - while len(incomings) < batch_size and msg is not None: - msg = func(in_self, timeout=batch_timeout_watch.leftover(True)) - if msg is not None: - incomings.append(msg) + with timeutils.StopWatch(timeout) as watch: + while True: + message = func(in_self, timeout=watch.leftover(True)) + if message is not None: + incomings.append(message) + if len(incomings) == batch_size or message is None: + break return incomings return wrapper