Merge "Refactor processing reply in ReplyWaiter"
This commit is contained in:
commit
02fae06de5
@ -216,23 +216,24 @@ class ReplyWaiter(object):
|
||||
_('Timed out waiting for a reply to message ID %s.') % msg_id)
|
||||
|
||||
def _process_reply(self, data):
|
||||
result = None
|
||||
ending = False
|
||||
self.msg_id_cache.check_duplicate_message(data)
|
||||
if data['failure']:
|
||||
failure = data['failure']
|
||||
result = rpc_common.deserialize_remote_exception(
|
||||
failure, self.allowed_remote_exmods)
|
||||
elif data.get('ending', False):
|
||||
ending = True
|
||||
else:
|
||||
result = data['result']
|
||||
result = data.get('result', None)
|
||||
|
||||
ending = data.get('ending', False)
|
||||
return result, ending
|
||||
|
||||
def wait(self, msg_id, timeout):
|
||||
# NOTE(sileht): for each msg_id we receive two amqp message
|
||||
# first one with the payload, a second one to ensure the other
|
||||
# have finish to send the payload
|
||||
# NOTE(viktors): We are going to remove this behavior in the N
|
||||
# release, but we need to keep backward compatibility, so we should
|
||||
# support both cases for now.
|
||||
timer = rpc_common.DecayingTimer(duration=timeout)
|
||||
timer.start()
|
||||
final_reply = None
|
||||
@ -245,7 +246,10 @@ class ReplyWaiter(object):
|
||||
self._raise_timeout_exception(msg_id)
|
||||
|
||||
reply, ending = self._process_reply(message)
|
||||
if not ending:
|
||||
if reply is not None:
|
||||
# NOTE(viktors): This can be either first _send_reply() with an
|
||||
# empty `result` field or a second _send_reply() with
|
||||
# ending=True and no `result` field.
|
||||
final_reply = reply
|
||||
return final_reply
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user