diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index e4d0617d9..47967d71c 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -167,8 +167,34 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
                                      'duration': duration})
                     return
 
+    def heartbeat(self):
+        with self.listener.driver._get_connection(
+                rpc_common.PURPOSE_SEND) as conn:
+            self._send_reply(conn, None, None, ending=False)
+
+    # NOTE(sileht): Those have already be ack in RpcListener IO thread
+    # We keep them as noop until all drivers do the same
     def acknowledge(self):
-        self._message_operations_handler.do(self.message.acknowledge)
+        pass
+
+    def requeue(self):
+        pass
+
+
+class NotificationAMQPIncomingMessage(AMQPIncomingMessage):
+    def acknowledge(self):
+        def _do_ack():
+            try:
+                self.message.acknowledge()
+            except Exception as exc:
+                # NOTE(kgiusti): this failure is likely due to a loss of the
+                # connection to the broker.  Not much we can do in this case,
+                # especially considering the Notification has already been
+                # dispatched. This *could* result in message duplication
+                # (unacked msg is returned to the queue by the broker), but the
+                # driver tries to catch that using the msg_id_cache.
+                LOG.warning("Failed to acknowledge received message: %s", exc)
+        self._message_operations_handler.do(_do_ack)
         self.listener.msg_id_cache.add(self.unique_id)
 
     def requeue(self):
@@ -178,12 +204,12 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
         # msg_id_cache, the message will be reconsumed, the only difference is
         # the message stay at the beginning of the queue instead of moving to
         # the end.
-        self._message_operations_handler.do(self.message.requeue)
-
-    def heartbeat(self):
-        with self.listener.driver._get_connection(
-                rpc_common.PURPOSE_SEND) as conn:
-            self._send_reply(conn, None, None, ending=False)
+        def _do_requeue():
+            try:
+                self.message.requeue()
+            except Exception as exc:
+                LOG.warning("Failed to requeue received message: %s", exc)
+        self._message_operations_handler.do(_do_requeue)
 
 
 class ObsoleteReplyQueuesCache(object):
@@ -256,7 +282,7 @@ class AMQPListener(base.PollStyleListener):
         else:
             LOG.debug("received message with unique_id: %s", unique_id)
 
-        self.incoming.append(AMQPIncomingMessage(
+        self.incoming.append(self.message_cls(
             self,
             ctxt.to_dict(),
             message,
@@ -319,6 +345,41 @@ class AMQPListener(base.PollStyleListener):
         self.conn.close()
 
 
+class RpcAMQPListener(AMQPListener):
+    message_cls = AMQPIncomingMessage
+
+    def __call__(self, message):
+        # NOTE(kgiusti): In the original RPC implementation the RPC server
+        # would acknowledge the request THEN process it.  The goal of this was
+        # to prevent duplication if the ack failed.  Should the ack fail the
+        # request would be discarded since the broker would not remove the
+        # request from the queue since no ack was received.  That would lead to
+        # the request being redelivered at some point. However this approach
+        # meant that the ack was issued from the dispatch thread, not the
+        # consumer thread, which is bad since kombu is not thread safe.  So a
+        # change was made to schedule the ack to be sent on the consumer thread
+        # - breaking the ability to catch ack errors before dispatching the
+        # request.  To fix this we do the actual ack here in the consumer
+        # callback and avoid the upcall if the ack fails.  See
+        # https://bugs.launchpad.net/oslo.messaging/+bug/1695746
+        # for all the gory details...
+        try:
+            message.acknowledge()
+        except Exception as exc:
+            LOG.warning("Discarding RPC request due to failed acknowlege: %s",
+                        exc)
+        else:
+            # NOTE(kgiusti): be aware that even if the acknowledge call
+            # succeeds there is no guarantee the broker actually gets the ACK
+            # since acknowledge() simply writes the ACK to the socket (there is
+            # no ACK confirmation coming back from the broker)
+            super(RpcAMQPListener, self).__call__(message)
+
+
+class NotificationAMQPListener(AMQPListener):
+    message_cls = NotificationAMQPIncomingMessage
+
+
 class ReplyWaiters(object):
 
     WAKE_UP = object()
@@ -590,7 +651,7 @@ class AMQPDriverBase(base.BaseDriver):
     def listen(self, target, batch_size, batch_timeout):
         conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
 
-        listener = AMQPListener(self, conn)
+        listener = RpcAMQPListener(self, conn)
 
         conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                     topic=target.topic,
@@ -608,7 +669,7 @@ class AMQPDriverBase(base.BaseDriver):
                                  batch_size, batch_timeout):
         conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
 
-        listener = AMQPListener(self, conn)
+        listener = NotificationAMQPListener(self, conn)
         for target, priority in targets_and_priorities:
             conn.declare_topic_consumer(
                 exchange_name=self._get_exchange(target),
diff --git a/oslo_messaging/rpc/server.py b/oslo_messaging/rpc/server.py
index 18d5869d3..9dac110cc 100644
--- a/oslo_messaging/rpc/server.py
+++ b/oslo_messaging/rpc/server.py
@@ -152,6 +152,9 @@ class RPCServer(msg_server.MessageHandlingServer):
 
     def _process_incoming(self, incoming):
         message = incoming[0]
+
+        # TODO(sileht): We should remove that at some point and do
+        # this directly in the driver
         try:
             message.acknowledge()
         except Exception: