diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index ceeb07810..6e19dd897 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -216,23 +216,24 @@ class ReplyWaiter(object): _('Timed out waiting for a reply to message ID %s.') % msg_id) def _process_reply(self, data): - result = None - ending = False self.msg_id_cache.check_duplicate_message(data) if data['failure']: failure = data['failure'] result = rpc_common.deserialize_remote_exception( failure, self.allowed_remote_exmods) - elif data.get('ending', False): - ending = True else: - result = data['result'] + result = data.get('result', None) + + ending = data.get('ending', False) return result, ending def wait(self, msg_id, timeout): # NOTE(sileht): for each msg_id we receive two amqp message # first one with the payload, a second one to ensure the other # have finish to send the payload + # NOTE(viktors): We are going to remove this behavior in the N + # release, but we need to keep backward compatibility, so we should + # support both cases for now. timer = rpc_common.DecayingTimer(duration=timeout) timer.start() final_reply = None @@ -245,7 +246,10 @@ class ReplyWaiter(object): self._raise_timeout_exception(msg_id) reply, ending = self._process_reply(message) - if not ending: + if reply is not None: + # NOTE(viktors): This can be either first _send_reply() with an + # empty `result` field or a second _send_reply() with + # ending=True and no `result` field. final_reply = reply return final_reply