From 75cba16d15b720e7a2ca769ef799a3c26326590d Mon Sep 17 00:00:00 2001 From: Victor Sergeyev <vsergeyev@mirantis.com> Date: Thu, 7 May 2015 13:03:48 +0300 Subject: [PATCH] Refactor processing reply in ReplyWaiter At the moment for each msg_id we receive two amqp message - first one with the payload, a second one to ensure the other have finish to send the payload. This was made, because a long time ago 'reply' allowed generator as payload to send multiple messages on one 'rpc.call' - [1] It's a bad idea - to double RPC messages for each call, so we are going to remove this the second AMQP message sending. This patch allows receiver side to proceed correctly old case - two AMQP messages (first with data and second with 'ending' parameter) same as the new one (a single message with data 'ending' parameter) Blueprint: remove-double-reply [1] - https://github.com/openstack/oslo-incubator/blob/stable/icehouse/openstack/common/rpc/amqp.py#L464 Change-Id: Ic09fe619694c300c4502acb7157d7ecdd47c5fd7 --- oslo_messaging/_drivers/amqpdriver.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index ceeb07810..6e19dd897 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -216,23 +216,24 @@ class ReplyWaiter(object): _('Timed out waiting for a reply to message ID %s.') % msg_id) def _process_reply(self, data): - result = None - ending = False self.msg_id_cache.check_duplicate_message(data) if data['failure']: failure = data['failure'] result = rpc_common.deserialize_remote_exception( failure, self.allowed_remote_exmods) - elif data.get('ending', False): - ending = True else: - result = data['result'] + result = data.get('result', None) + + ending = data.get('ending', False) return result, ending def wait(self, msg_id, timeout): # NOTE(sileht): for each msg_id we receive two amqp message # first one with the payload, a second one to ensure the other # have finish to send the payload + # NOTE(viktors): We are going to remove this behavior in the N + # release, but we need to keep backward compatibility, so we should + # support both cases for now. timer = rpc_common.DecayingTimer(duration=timeout) timer.start() final_reply = None @@ -245,7 +246,10 @@ class ReplyWaiter(object): self._raise_timeout_exception(msg_id) reply, ending = self._process_reply(message) - if not ending: + if reply is not None: + # NOTE(viktors): This can be either first _send_reply() with an + # empty `result` field or a second _send_reply() with + # ending=True and no `result` field. final_reply = reply return final_reply