Merge "rabbit: more precise iterconsume timeout"

This commit is contained in:
Jenkins 2014-12-10 12:59:34 +00:00 committed by Gerrit Code Review
commit afed2e8579
3 changed files with 47 additions and 41 deletions

@ -31,30 +31,6 @@ from oslo.messaging._i18n import _LI
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class _DecayingTimer(object):
def __init__(self, duration=None):
self._duration = duration
self._ends_at = None
def start(self):
if self._duration is not None:
self._ends_at = time.time() + max(0, self._duration)
return self
def check_return(self, msg_id):
if self._duration is None:
return None
if self._ends_at is None:
raise RuntimeError("Can not check/return a timeout from a timer"
" that has not been started")
left = self._ends_at - time.time()
if left <= 0:
raise messaging.MessagingTimeout('Timed out waiting for a '
'reply to message ID %s'
% msg_id)
return left
class AMQPIncomingMessage(base.IncomingMessage): class AMQPIncomingMessage(base.IncomingMessage):
def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q): def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q):
@ -231,6 +207,11 @@ class ReplyWaiter(object):
def unlisten(self, msg_id): def unlisten(self, msg_id):
self.waiters.remove(msg_id) self.waiters.remove(msg_id)
@staticmethod
def _raise_timeout_exception(msg_id):
raise messaging.MessagingTimeout(
'Timed out waiting for a reply to message ID %s' % msg_id)
def _process_reply(self, data): def _process_reply(self, data):
result = None result = None
ending = False ending = False
@ -256,15 +237,15 @@ class ReplyWaiter(object):
self.waiters.put(incoming_msg_id, message_data) self.waiters.put(incoming_msg_id, message_data)
timeout = timer.check_return(self._raise_timeout_exception, msg_id)
try: try:
self.conn.consume(limit=1, timeout=timer.check_return(msg_id)) self.conn.consume(limit=1, timeout=timeout)
except rpc_common.Timeout: except rpc_common.Timeout:
raise messaging.MessagingTimeout('Timed out waiting for a ' self._raise_timeout_exception(msg_id)
'reply to message ID %s'
% msg_id)
def _poll_queue(self, msg_id, timer): def _poll_queue(self, msg_id, timer):
message = self.waiters.get(msg_id, timeout=timer.check_return(msg_id)) timeout = timer.check_return(self._raise_timeout_exception, msg_id)
message = self.waiters.get(msg_id, timeout=timeout)
if message is self.waiters.WAKE_UP: if message is self.waiters.WAKE_UP:
return None, None, True # lock was released return None, None, True # lock was released
@ -293,7 +274,7 @@ class ReplyWaiter(object):
# have the first thread take responsibility for passing replies not # have the first thread take responsibility for passing replies not
# intended for itself to the appropriate thread. # intended for itself to the appropriate thread.
# #
timer = _DecayingTimer(duration=timeout).start() timer = rpc_common.DecayingTimer(duration=timeout).start()
final_reply = None final_reply = None
while True: while True:
if self.conn_lock.acquire(False): if self.conn_lock.acquire(False):

@ -18,6 +18,7 @@
import copy import copy
import logging import logging
import sys import sys
import time
import traceback import traceback
import six import six
@ -349,3 +350,28 @@ def deserialize_msg(msg):
raw_msg = jsonutils.loads(msg[_MESSAGE_KEY]) raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
return raw_msg return raw_msg
class DecayingTimer(object):
def __init__(self, duration=None):
self._duration = duration
self._ends_at = None
def start(self):
if self._duration is not None:
self._ends_at = time.time() + max(0, self._duration)
return self
def check_return(self, timeout_callback, *args, **kwargs):
if self._duration is None:
return None
if self._ends_at is None:
raise RuntimeError("Can not check/return a timeout from a timer"
" that has not been started")
maximum = kwargs.pop('maximum', None)
left = self._ends_at - time.time()
if left <= 0:
timeout_callback(*args, **kwargs)
return left if maximum is None else min(left, maximum)

@ -694,19 +694,15 @@ class Connection(object):
def iterconsume(self, limit=None, timeout=None): def iterconsume(self, limit=None, timeout=None):
"""Return an iterator that will consume from all queues/consumers.""" """Return an iterator that will consume from all queues/consumers."""
if timeout is None: timer = rpc_common.DecayingTimer(duration=timeout).start()
deadline = None
else:
deadline = time.time() + timeout
def _raise_timeout_if_deadline_is_reached(exc): def _raise_timeout(exc):
if deadline is not None and deadline - time.time() < 0: LOG.debug('Timed out waiting for RPC response: %s', exc)
LOG.debug('Timed out waiting for RPC response: %s', exc) raise rpc_common.Timeout()
raise rpc_common.Timeout()
def _error_callback(exc): def _error_callback(exc):
self.do_consume = True self.do_consume = True
_raise_timeout_if_deadline_is_reached(exc) timer.check_return(_raise_timeout, exc)
LOG.exception(_('Failed to consume message from queue: %s'), LOG.exception(_('Failed to consume message from queue: %s'),
exc) exc)
@ -718,11 +714,14 @@ class Connection(object):
queue.consume(nowait=True) queue.consume(nowait=True)
queues_tail.consume(nowait=False) queues_tail.consume(nowait=False)
self.do_consume = False self.do_consume = False
poll_timeout = 1 if timeout is None else min(timeout, 1)
while True: while True:
try: try:
return self.connection.drain_events(timeout=1) return self.connection.drain_events(timeout=poll_timeout)
except socket.timeout as exc: except socket.timeout as exc:
_raise_timeout_if_deadline_is_reached(exc) poll_timeout = timer.check_return(_raise_timeout, exc,
maximum=1)
for iteration in itertools.count(0): for iteration in itertools.count(0):
if limit and iteration >= limit: if limit and iteration >= limit: