diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 91af6634e..46e91c918 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -35,11 +35,27 @@ from oslo_messaging._i18n import _LW LOG = logging.getLogger(__name__) +# Minimum/Maximum sleep between a poll and ack/requeue +# Maximum should be small enough to not get rejected ack, +# minimum should be big enough to not burn the CPU. +ACK_REQUEUE_EVERY_SECONDS_MIN = 0.001 +ACK_REQUEUE_EVERY_SECONDS_MAX = 1.0 + + +def do_pending_tasks(tasks): + while True: + try: + task = tasks.get(block=False) + except moves.queue.Empty: + break + else: + task() + class AMQPIncomingMessage(base.RpcIncomingMessage): def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q, - obsolete_reply_queues): + obsolete_reply_queues, pending_message_actions): super(AMQPIncomingMessage, self).__init__(ctxt, message) self.listener = listener @@ -47,6 +63,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): self.msg_id = msg_id self.reply_q = reply_q self._obsolete_reply_queues = obsolete_reply_queues + self._pending_tasks = pending_message_actions self.stopwatch = timeutils.StopWatch() self.stopwatch.start() @@ -116,7 +133,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): return def acknowledge(self): - self.message.acknowledge() + self._pending_tasks.put(self.message.acknowledge) self.listener.msg_id_cache.add(self.unique_id) def requeue(self): @@ -126,7 +143,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): # msg_id_cache, the message will be reconsumed, the only difference is # the message stay at the beginning of the queue instead of moving to # the end. - self.message.requeue() + self._pending_tasks.put(self.message.requeue) class ObsoleteReplyQueuesCache(object): @@ -184,6 +201,8 @@ class AMQPListener(base.PollStyleListener): self.incoming = [] self._stopped = threading.Event() self._obsolete_reply_queues = ObsoleteReplyQueuesCache() + self._pending_tasks = moves.queue.Queue() + self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN def __call__(self, message): ctxt = rpc_amqp.unpack_context(message) @@ -194,27 +213,45 @@ class AMQPListener(base.PollStyleListener): 'msg_id': ctxt.msg_id}) else: LOG.debug("received message with unique_id: %s", unique_id) - self.incoming.append(AMQPIncomingMessage(self, - ctxt.to_dict(), - message, - unique_id, - ctxt.msg_id, - ctxt.reply_q, - self._obsolete_reply_queues)) + + self.incoming.append(AMQPIncomingMessage( + self, + ctxt.to_dict(), + message, + unique_id, + ctxt.msg_id, + ctxt.reply_q, + self._obsolete_reply_queues, + self._pending_tasks)) @base.batch_poll_helper def poll(self, timeout=None): + stopwatch = timeutils.StopWatch(duration=timeout).start() + while not self._stopped.is_set(): + do_pending_tasks(self._pending_tasks) + if self.incoming: return self.incoming.pop(0) - try: - self.conn.consume(timeout=timeout) - except rpc_common.Timeout: + + left = stopwatch.leftover(return_none=True) + if left is None: + left = self._current_timeout + if left <= 0: return None + try: + self.conn.consume(timeout=min(self._current_timeout, left)) + except rpc_common.Timeout: + self._current_timeout = max(self._current_timeout * 2, + ACK_REQUEUE_EVERY_SECONDS_MAX) + else: + self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN + def stop(self): self._stopped.set() self.conn.stop_consuming() + do_pending_tasks(self._pending_tasks) def cleanup(self): # Closes listener connection @@ -269,6 +306,7 @@ class ReplyWaiter(object): self.allowed_remote_exmods = allowed_remote_exmods self.msg_id_cache = rpc_amqp._MsgIdCache() self.waiters = ReplyWaiters() + self._pending_tasks = moves.queue.Queue() self.conn.declare_direct_consumer(reply_q, self) @@ -283,17 +321,26 @@ class ReplyWaiter(object): self.conn.stop_consuming() self._thread.join() self._thread = None + do_pending_tasks(self._pending_tasks) def poll(self): + current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN while not self._thread_exit_event.is_set(): + do_pending_tasks(self._pending_tasks) try: - self.conn.consume() + # ack every ACK_REQUEUE_EVERY_SECONDS_MAX seconds + self.conn.consume(timeout=current_timeout) + except rpc_common.Timeout: + current_timeout = max(current_timeout * 2, + ACK_REQUEUE_EVERY_SECONDS_MAX) except Exception: LOG.exception(_LE("Failed to process incoming message, " "retrying...")) + else: + current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN def __call__(self, message): - message.acknowledge() + self._pending_tasks.put(message.acknowledge) incoming_msg_id = message.pop('_msg_id', None) if message.get('ending'): LOG.debug("received reply msg_id: %s", incoming_msg_id)