Merge "Display the reply queue's name in timeout logs"

This commit is contained in:
Zuul 2024-02-09 14:39:10 +00:00 committed by Gerrit Code Review
commit ebdc7db19e
3 changed files with 29 additions and 12 deletions

View File

@ -583,9 +583,11 @@ class ReplyWaiter(object):
self.waiters.remove(msg_id) self.waiters.remove(msg_id)
@staticmethod @staticmethod
def _raise_timeout_exception(msg_id): def _raise_timeout_exception(msg_id, reply_q):
raise oslo_messaging.MessagingTimeout( raise oslo_messaging.MessagingTimeout(
'Timed out waiting for a reply to message ID %s.', msg_id) 'Timed out waiting for a reply %(reply_q)s '
'to message ID %(msg_id)s.',
{'msg_id': msg_id, 'reply_q': reply_q})
def _process_reply(self, data): def _process_reply(self, data):
self.msg_id_cache.check_duplicate_message(data) self.msg_id_cache.check_duplicate_message(data)
@ -599,7 +601,7 @@ class ReplyWaiter(object):
ending = data.get('ending', False) ending = data.get('ending', False)
return result, ending return result, ending
def wait(self, msg_id, timeout, call_monitor_timeout): def wait(self, msg_id, timeout, call_monitor_timeout, reply_q):
# NOTE(sileht): for each msg_id we receive two amqp message # NOTE(sileht): for each msg_id we receive two amqp message
# first one with the payload, a second one to ensure the other # first one with the payload, a second one to ensure the other
# have finish to send the payload # have finish to send the payload
@ -617,16 +619,26 @@ class ReplyWaiter(object):
final_reply = None final_reply = None
ending = False ending = False
while not ending: while not ending:
timeout = timer.check_return(self._raise_timeout_exception, msg_id) timeout = timer.check_return(
self._raise_timeout_exception,
msg_id,
reply_q
)
if call_monitor_timer and timeout > 0: if call_monitor_timer and timeout > 0:
cm_timeout = call_monitor_timer.check_return( cm_timeout = call_monitor_timer.check_return(
self._raise_timeout_exception, msg_id) self._raise_timeout_exception,
msg_id,
reply_q
)
if cm_timeout < timeout: if cm_timeout < timeout:
timeout = cm_timeout timeout = cm_timeout
try: try:
message = self.waiters.get(msg_id, timeout=timeout) message = self.waiters.get(msg_id, timeout=timeout)
except queue.Empty: except queue.Empty:
self._raise_timeout_exception(msg_id) self._raise_timeout_exception(
msg_id,
reply_q
)
reply, ending = self._process_reply(message) reply, ending = self._process_reply(message)
if reply is not None: if reply is not None:
@ -700,6 +712,7 @@ class AMQPDriverBase(base.BaseDriver):
envelope=True, notify=False, retry=None, transport_options=None): envelope=True, notify=False, retry=None, transport_options=None):
msg = message msg = message
reply_q = None
if 'method' in msg: if 'method' in msg:
LOG.debug('Calling RPC method %s on target %s', msg.get('method'), LOG.debug('Calling RPC method %s on target %s', msg.get('method'),
target.topic) target.topic)
@ -707,13 +720,13 @@ class AMQPDriverBase(base.BaseDriver):
LOG.debug('Sending message to topic %s', target.topic) LOG.debug('Sending message to topic %s', target.topic)
if wait_for_reply: if wait_for_reply:
_reply_q = self._get_reply_q() reply_q = self._get_reply_q()
msg_id = uuid.uuid4().hex msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id}) msg.update({'_msg_id': msg_id})
msg.update({'_reply_q': _reply_q}) msg.update({'_reply_q': reply_q})
msg.update({'_timeout': call_monitor_timeout}) msg.update({'_timeout': call_monitor_timeout})
LOG.info('Expecting reply to msg %s in queue %s', msg_id, LOG.info('Expecting reply to msg %s in queue %s', msg_id,
_reply_q) reply_q)
rpc_amqp._add_unique_id(msg) rpc_amqp._add_unique_id(msg)
unique_id = msg[rpc_amqp.UNIQUE_ID] unique_id = msg[rpc_amqp.UNIQUE_ID]
@ -756,7 +769,7 @@ class AMQPDriverBase(base.BaseDriver):
if wait_for_reply: if wait_for_reply:
result = self._waiter.wait(msg_id, timeout, result = self._waiter.wait(msg_id, timeout,
call_monitor_timeout) call_monitor_timeout, reply_q)
if isinstance(result, Exception): if isinstance(result, Exception):
raise result raise result
return result return result

View File

@ -668,7 +668,7 @@ class TestRacyWaitForReply(test_utils.BaseTestCase):
wait_conditions = [] wait_conditions = []
orig_reply_waiter = amqpdriver.ReplyWaiter.wait orig_reply_waiter = amqpdriver.ReplyWaiter.wait
def reply_waiter(self, msg_id, timeout, call_monitor_timeout): def reply_waiter(self, msg_id, timeout, call_monitor_timeout, reply_q):
if wait_conditions: if wait_conditions:
cond = wait_conditions.pop() cond = wait_conditions.pop()
with cond: with cond:
@ -676,7 +676,7 @@ class TestRacyWaitForReply(test_utils.BaseTestCase):
with cond: with cond:
cond.wait() cond.wait()
return orig_reply_waiter(self, msg_id, timeout, return orig_reply_waiter(self, msg_id, timeout,
call_monitor_timeout) call_monitor_timeout, reply_q)
self.useFixture(fixtures.MockPatchObject( self.useFixture(fixtures.MockPatchObject(
amqpdriver.ReplyWaiter, 'wait', reply_waiter)) amqpdriver.ReplyWaiter, 'wait', reply_waiter))

View File

@ -0,0 +1,4 @@
---
features:
- |
The name of the ``reply_q`` is now logged when a timeout occurs while waiting for a reply.