diff --git a/oslo/messaging/_drivers/amqp.py b/oslo/messaging/_drivers/amqp.py
index 6b9c88ec9..4f3056770 100644
--- a/oslo/messaging/_drivers/amqp.py
+++ b/oslo/messaging/_drivers/amqp.py
@@ -318,16 +318,17 @@ class _MsgIdCache(object):
         """AMQP consumers may read same message twice when exceptions occur
            before ack is returned. This method prevents doing it.
         """
-        if UNIQUE_ID in message_data:
-            msg_id = message_data.get(UNIQUE_ID)
-            if msg_id in self.prev_msgids:
-                raise rpc_common.DuplicateMessageError(msg_id=msg_id)
-
-    def add(self, message_data):
-        if UNIQUE_ID in message_data:
+        try:
             msg_id = message_data.pop(UNIQUE_ID)
-            if msg_id not in self.prev_msgids:
-                self.prev_msgids.append(msg_id)
+        except KeyError:
+            return
+        if msg_id in self.prev_msgids:
+            raise rpc_common.DuplicateMessageError(msg_id=msg_id)
+        return msg_id
+
+    def add(self, msg_id):
+        if msg_id and msg_id not in self.prev_msgids:
+            self.prev_msgids.append(msg_id)
 
 
 def _add_unique_id(msg):
diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py
index 8ccba92ac..b8b9fcefe 100644
--- a/oslo/messaging/_drivers/amqpdriver.py
+++ b/oslo/messaging/_drivers/amqpdriver.py
@@ -31,10 +31,11 @@ LOG = logging.getLogger(__name__)
 
 class AMQPIncomingMessage(base.IncomingMessage):
 
-    def __init__(self, listener, ctxt, message, msg_id, reply_q):
+    def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q):
         super(AMQPIncomingMessage, self).__init__(listener, ctxt,
                                                   dict(message))
 
+        self.unique_id = unique_id
         self.msg_id = msg_id
         self.reply_q = reply_q
         self.acknowledge_callback = message.acknowledge
@@ -67,7 +68,7 @@ class AMQPIncomingMessage(base.IncomingMessage):
             self._send_reply(conn, ending=True)
 
     def acknowledge(self):
-        self.listener.msg_id_cache.add(self.message)
+        self.listener.msg_id_cache.add(self.unique_id)
         self.acknowledge_callback()
 
     def requeue(self):
@@ -92,12 +93,13 @@ class AMQPListener(base.Listener):
         # FIXME(markmc): logging isn't driver specific
         rpc_common._safe_log(LOG.debug, 'received %s', dict(message))
 
-        self.msg_id_cache.check_duplicate_message(message)
+        unique_id = self.msg_id_cache.check_duplicate_message(message)
         ctxt = rpc_amqp.unpack_context(self.conf, message)
 
         self.incoming.append(AMQPIncomingMessage(self,
                                                  ctxt.to_dict(),
                                                  message,
+                                                 unique_id,
                                                  ctxt.msg_id,
                                                  ctxt.reply_q))
 
diff --git a/tests/test_rabbit.py b/tests/test_rabbit.py
index 77afd41bb..8c0874191 100644
--- a/tests/test_rabbit.py
+++ b/tests/test_rabbit.py
@@ -206,7 +206,6 @@ class TestSendReceive(test_utils.BaseTestCase):
             senders[i].start()
 
             received = listener.poll()
-            received.message.pop('_unique_id')
             self.assertIsNotNone(received)
             self.assertEqual(received.ctxt, self.ctxt)
             self.assertEqual(received.message, {'tx_id': i})
@@ -303,14 +302,12 @@ class TestRacyWaitForReply(test_utils.BaseTestCase):
         senders[0].start()
 
         msgs.append(listener.poll())
-        msgs[-1].message.pop('_unique_id')
         self.assertEqual(msgs[-1].message, {'tx_id': 0})
 
         # Start the second guy, receive his message
         senders[1].start()
 
         msgs.append(listener.poll())
-        msgs[-1].message.pop('_unique_id')
         self.assertEqual(msgs[-1].message, {'tx_id': 1})
 
         # Reply to both in order, making the second thread queue
@@ -605,7 +602,6 @@ class TestReplyWireFormat(test_utils.BaseTestCase):
         producer.publish(msg)
 
         received = listener.poll()
-        received.message.pop('_unique_id')
         self.assertIsNotNone(received)
         self.assertEqual(self.expected_ctxt, received.ctxt)
         self.assertEqual(self.expected, received.message)