rabbit: make ack/requeue thread-safe
ack/requeue messages are currently done in the MessageHandlingServer._process_incoming(). But _process_incoming() in run by a futurist Executor. That can be a threading or an eventlet executor. With eventlet, we don't really share the socket between threads. But with threading executor and expecialy ssl, this can't work, if you write data with two different threads to the socket. This change moves back the message ack/requeue to the polling threads that handle the connection, instead of the threads we spawn for the application. Oslo Messaging now always use a connection in the same thread. Change-Id: I5c0e6def6b34f4d195fb1f8dbb26eda0f21ff34e
This commit is contained in:
parent
296d93d586
commit
7e71ac821f
@ -35,11 +35,27 @@ from oslo_messaging._i18n import _LW
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
# Minimum/Maximum sleep between a poll and ack/requeue
|
||||
# Maximum should be small enough to not get rejected ack,
|
||||
# minimum should be big enough to not burn the CPU.
|
||||
ACK_REQUEUE_EVERY_SECONDS_MIN = 0.001
|
||||
ACK_REQUEUE_EVERY_SECONDS_MAX = 1.0
|
||||
|
||||
|
||||
def do_pending_tasks(tasks):
|
||||
while True:
|
||||
try:
|
||||
task = tasks.get(block=False)
|
||||
except moves.queue.Empty:
|
||||
break
|
||||
else:
|
||||
task()
|
||||
|
||||
|
||||
class AMQPIncomingMessage(base.RpcIncomingMessage):
|
||||
|
||||
def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q,
|
||||
obsolete_reply_queues):
|
||||
obsolete_reply_queues, pending_message_actions):
|
||||
super(AMQPIncomingMessage, self).__init__(ctxt, message)
|
||||
self.listener = listener
|
||||
|
||||
@ -47,6 +63,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
|
||||
self.msg_id = msg_id
|
||||
self.reply_q = reply_q
|
||||
self._obsolete_reply_queues = obsolete_reply_queues
|
||||
self._pending_tasks = pending_message_actions
|
||||
self.stopwatch = timeutils.StopWatch()
|
||||
self.stopwatch.start()
|
||||
|
||||
@ -116,7 +133,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
|
||||
return
|
||||
|
||||
def acknowledge(self):
|
||||
self.message.acknowledge()
|
||||
self._pending_tasks.put(self.message.acknowledge)
|
||||
self.listener.msg_id_cache.add(self.unique_id)
|
||||
|
||||
def requeue(self):
|
||||
@ -126,7 +143,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
|
||||
# msg_id_cache, the message will be reconsumed, the only difference is
|
||||
# the message stay at the beginning of the queue instead of moving to
|
||||
# the end.
|
||||
self.message.requeue()
|
||||
self._pending_tasks.put(self.message.requeue)
|
||||
|
||||
|
||||
class ObsoleteReplyQueuesCache(object):
|
||||
@ -184,6 +201,8 @@ class AMQPListener(base.PollStyleListener):
|
||||
self.incoming = []
|
||||
self._stopped = threading.Event()
|
||||
self._obsolete_reply_queues = ObsoleteReplyQueuesCache()
|
||||
self._pending_tasks = moves.queue.Queue()
|
||||
self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN
|
||||
|
||||
def __call__(self, message):
|
||||
ctxt = rpc_amqp.unpack_context(message)
|
||||
@ -194,27 +213,45 @@ class AMQPListener(base.PollStyleListener):
|
||||
'msg_id': ctxt.msg_id})
|
||||
else:
|
||||
LOG.debug("received message with unique_id: %s", unique_id)
|
||||
self.incoming.append(AMQPIncomingMessage(self,
|
||||
ctxt.to_dict(),
|
||||
message,
|
||||
unique_id,
|
||||
ctxt.msg_id,
|
||||
ctxt.reply_q,
|
||||
self._obsolete_reply_queues))
|
||||
|
||||
self.incoming.append(AMQPIncomingMessage(
|
||||
self,
|
||||
ctxt.to_dict(),
|
||||
message,
|
||||
unique_id,
|
||||
ctxt.msg_id,
|
||||
ctxt.reply_q,
|
||||
self._obsolete_reply_queues,
|
||||
self._pending_tasks))
|
||||
|
||||
@base.batch_poll_helper
|
||||
def poll(self, timeout=None):
|
||||
stopwatch = timeutils.StopWatch(duration=timeout).start()
|
||||
|
||||
while not self._stopped.is_set():
|
||||
do_pending_tasks(self._pending_tasks)
|
||||
|
||||
if self.incoming:
|
||||
return self.incoming.pop(0)
|
||||
try:
|
||||
self.conn.consume(timeout=timeout)
|
||||
except rpc_common.Timeout:
|
||||
|
||||
left = stopwatch.leftover(return_none=True)
|
||||
if left is None:
|
||||
left = self._current_timeout
|
||||
if left <= 0:
|
||||
return None
|
||||
|
||||
try:
|
||||
self.conn.consume(timeout=min(self._current_timeout, left))
|
||||
except rpc_common.Timeout:
|
||||
self._current_timeout = max(self._current_timeout * 2,
|
||||
ACK_REQUEUE_EVERY_SECONDS_MAX)
|
||||
else:
|
||||
self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN
|
||||
|
||||
def stop(self):
|
||||
self._stopped.set()
|
||||
self.conn.stop_consuming()
|
||||
do_pending_tasks(self._pending_tasks)
|
||||
|
||||
def cleanup(self):
|
||||
# Closes listener connection
|
||||
@ -269,6 +306,7 @@ class ReplyWaiter(object):
|
||||
self.allowed_remote_exmods = allowed_remote_exmods
|
||||
self.msg_id_cache = rpc_amqp._MsgIdCache()
|
||||
self.waiters = ReplyWaiters()
|
||||
self._pending_tasks = moves.queue.Queue()
|
||||
|
||||
self.conn.declare_direct_consumer(reply_q, self)
|
||||
|
||||
@ -283,17 +321,26 @@ class ReplyWaiter(object):
|
||||
self.conn.stop_consuming()
|
||||
self._thread.join()
|
||||
self._thread = None
|
||||
do_pending_tasks(self._pending_tasks)
|
||||
|
||||
def poll(self):
|
||||
current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN
|
||||
while not self._thread_exit_event.is_set():
|
||||
do_pending_tasks(self._pending_tasks)
|
||||
try:
|
||||
self.conn.consume()
|
||||
# ack every ACK_REQUEUE_EVERY_SECONDS_MAX seconds
|
||||
self.conn.consume(timeout=current_timeout)
|
||||
except rpc_common.Timeout:
|
||||
current_timeout = max(current_timeout * 2,
|
||||
ACK_REQUEUE_EVERY_SECONDS_MAX)
|
||||
except Exception:
|
||||
LOG.exception(_LE("Failed to process incoming message, "
|
||||
"retrying..."))
|
||||
else:
|
||||
current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN
|
||||
|
||||
def __call__(self, message):
|
||||
message.acknowledge()
|
||||
self._pending_tasks.put(message.acknowledge)
|
||||
incoming_msg_id = message.pop('_msg_id', None)
|
||||
if message.get('ending'):
|
||||
LOG.debug("received reply msg_id: %s", incoming_msg_id)
|
||||
|
Loading…
x
Reference in New Issue
Block a user