diff --git a/oslo/messaging/_drivers/amqp.py b/oslo/messaging/_drivers/amqp.py
index 9be6f0eaf..6b9c88ec9 100644
--- a/oslo/messaging/_drivers/amqp.py
+++ b/oslo/messaging/_drivers/amqp.py
@@ -318,12 +318,16 @@ class _MsgIdCache(object):
         """AMQP consumers may read same message twice when exceptions occur
            before ack is returned. This method prevents doing it.
         """
+        if UNIQUE_ID in message_data:
+            msg_id = message_data.get(UNIQUE_ID)
+            if msg_id in self.prev_msgids:
+                raise rpc_common.DuplicateMessageError(msg_id=msg_id)
+
+    def add(self, message_data):
         if UNIQUE_ID in message_data:
             msg_id = message_data.pop(UNIQUE_ID)
             if msg_id not in self.prev_msgids:
                 self.prev_msgids.append(msg_id)
-            else:
-                raise rpc_common.DuplicateMessageError(msg_id=msg_id)
 
 
 def _add_unique_id(msg):
diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py
index 1b1cac494..8ccba92ac 100644
--- a/oslo/messaging/_drivers/amqpdriver.py
+++ b/oslo/messaging/_drivers/amqpdriver.py
@@ -37,7 +37,8 @@ class AMQPIncomingMessage(base.IncomingMessage):
 
         self.msg_id = msg_id
         self.reply_q = reply_q
-        self.acknowledge = message.acknowledge
+        self.acknowledge_callback = message.acknowledge
+        self.requeue_callback = message.requeue
 
     def _send_reply(self, conn, reply=None, failure=None,
                     ending=False, log_failure=True):
@@ -65,6 +66,19 @@ class AMQPIncomingMessage(base.IncomingMessage):
             self._send_reply(conn, reply, failure, log_failure=log_failure)
             self._send_reply(conn, ending=True)
 
+    def acknowledge(self):
+        self.listener.msg_id_cache.add(self.message)
+        self.acknowledge_callback()
+
+    def requeue(self):
+        # NOTE(sileht): In case of the connection is lost between receiving the
+        # message and requeing it, this requeue call fail
+        # but because the message is not acknowledged and not added to the
+        # msg_id_cache, the message will be reconsumed, the only difference is
+        # the message stay at the beginning of the queue instead of moving to
+        # the end.
+        self.requeue_callback()
+
 
 class AMQPListener(base.Listener):
 
diff --git a/oslo/messaging/_drivers/base.py b/oslo/messaging/_drivers/base.py
index 83b9b9ef2..dcf08287e 100644
--- a/oslo/messaging/_drivers/base.py
+++ b/oslo/messaging/_drivers/base.py
@@ -40,6 +40,10 @@ class IncomingMessage(object):
     def acknowledge(self):
         "Acknowledge the message."
 
+    @abc.abstractmethod
+    def requeue(self):
+        "Requeue the message."
+
 
 @six.add_metaclass(abc.ABCMeta)
 class Listener(object):
diff --git a/oslo/messaging/_drivers/impl_fake.py b/oslo/messaging/_drivers/impl_fake.py
index a588db035..2bfbe16b3 100644
--- a/oslo/messaging/_drivers/impl_fake.py
+++ b/oslo/messaging/_drivers/impl_fake.py
@@ -26,8 +26,9 @@ from oslo.messaging._drivers import base
 
 
 class FakeIncomingMessage(base.IncomingMessage):
-    def __init__(self, listener, ctxt, message, reply_q):
+    def __init__(self, listener, ctxt, message, reply_q, requeue):
         super(FakeIncomingMessage, self).__init__(listener, ctxt, message)
+        self.requeue_callback = requeue
         self._reply_q = reply_q
 
     def reply(self, reply=None, failure=None, log_failure=True):
@@ -35,6 +36,9 @@ class FakeIncomingMessage(base.IncomingMessage):
             failure = failure[1] if failure else None
             self._reply_q.put((reply, failure))
 
+    def requeue(self):
+        self.requeue_callback()
+
 
 class FakeListener(base.Listener):
 
@@ -46,10 +50,11 @@ class FakeListener(base.Listener):
     def poll(self):
         while True:
             for target in self._targets:
-                (ctxt, message, reply_q) = self._exchange.poll(target)
+                (ctxt, message, reply_q, requeue) = \
+                    self._exchange.poll(target)
                 if message is not None:
-                    message = FakeIncomingMessage(self, ctxt, message, reply_q)
-                    message.acknowledge()
+                    message = FakeIncomingMessage(self, ctxt, message,
+                                                  reply_q, requeue)
                     return message
             time.sleep(.05)
 
@@ -58,7 +63,7 @@ class FakeExchange(object):
 
     def __init__(self, name):
         self.name = name
-        self._queues_lock = threading.Lock()
+        self._queues_lock = threading.RLock()
         self._topic_queues = {}
         self._server_queues = {}
 
@@ -78,8 +83,13 @@ class FakeExchange(object):
                 queues = [self._get_server_queue(topic, server)]
             else:
                 queues = [self._get_topic_queue(topic)]
+
+            def requeue():
+                self.deliver_message(topic, ctxt, message, server=server,
+                                     fanout=fanout, reply_q=reply_q)
+
             for queue in queues:
-                queue.append((ctxt, message, reply_q))
+                queue.append((ctxt, message, reply_q, requeue))
 
     def poll(self, target):
         with self._queues_lock:
@@ -87,7 +97,7 @@ class FakeExchange(object):
                 queue = self._get_server_queue(target.topic, target.server)
             else:
                 queue = self._get_topic_queue(target.topic)
-            return queue.pop(0) if queue else (None, None, None)
+            return queue.pop(0) if queue else (None, None, None, None)
 
 
 class FakeDriver(base.BaseDriver):
diff --git a/oslo/messaging/_drivers/impl_qpid.py b/oslo/messaging/_drivers/impl_qpid.py
index bac02c7e5..c53ad8946 100644
--- a/oslo/messaging/_drivers/impl_qpid.py
+++ b/oslo/messaging/_drivers/impl_qpid.py
@@ -99,6 +99,10 @@ class QpidMessage(dict):
     def acknowledge(self):
         self._session.acknowledge(self._raw_message)
 
+    def requeue(self):
+        raise NotImplementedError('The QPID driver does not yet support '
+                                  'requeuing messages')
+
 
 class ConsumerBase(object):
     """Consumer base class."""
diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py
index 0bc6a8bda..be41a4ec1 100644
--- a/oslo/messaging/_drivers/impl_rabbit.py
+++ b/oslo/messaging/_drivers/impl_rabbit.py
@@ -128,6 +128,9 @@ class RabbitMessage(dict):
     def acknowledge(self):
         self._raw_message.ack()
 
+    def requeue(self):
+        self._raw_message.requeue()
+
 
 class ConsumerBase(object):
     """Consumer base class."""
diff --git a/oslo/messaging/_drivers/impl_zmq.py b/oslo/messaging/_drivers/impl_zmq.py
index d9869389c..6b2b53957 100644
--- a/oslo/messaging/_drivers/impl_zmq.py
+++ b/oslo/messaging/_drivers/impl_zmq.py
@@ -843,6 +843,10 @@ class ZmqIncomingMessage(base.IncomingMessage):
         with self.condition:
             self.condition.notify()
 
+    def requeue(self):
+        raise NotImplementedError('The ZeroMQ driver does not yet support '
+                                  'requeuing messages')
+
 
 class ZmqListener(base.Listener):
 
@@ -960,6 +964,8 @@ class ZmqDriver(base.BaseDriver):
         return listener
 
     def listen_for_notifications(self, targets_and_priorities):
+        # NOTE(sileht): this listener implementation is limited
+        # because zeromq doesn't support requeing message
         conn = create_connection(self.conf)
 
         listener = ZmqListener(self, None)
diff --git a/oslo/messaging/notify/__init__.py b/oslo/messaging/notify/__init__.py
index 4b87d72c3..92d726ba7 100644
--- a/oslo/messaging/notify/__init__.py
+++ b/oslo/messaging/notify/__init__.py
@@ -15,8 +15,10 @@
 
 __all__ = ['Notifier',
            'LoggingNotificationHandler',
-           'get_notification_listener']
+           'get_notification_listener',
+           'NotificationResult']
 
 from .notifier import *
 from .listener import *
 from .logger import *
+from .dispatcher import NotificationResult
diff --git a/oslo/messaging/notify/dispatcher.py b/oslo/messaging/notify/dispatcher.py
index 1bcf84c93..87deee854 100644
--- a/oslo/messaging/notify/dispatcher.py
+++ b/oslo/messaging/notify/dispatcher.py
@@ -28,6 +28,11 @@ LOG = logging.getLogger(__name__)
 PRIORITIES = ['audit', 'debug', 'info', 'warn', 'error', 'critical', 'sample']
 
 
+class NotificationResult(object):
+    HANDLED = 'handled'
+    REQUEUE = 'requeue'
+
+
 class NotificationDispatcher(object):
     """A message dispatcher which understands Notification messages.
 
@@ -59,8 +64,15 @@ class NotificationDispatcher(object):
 
     @contextlib.contextmanager
     def __call__(self, incoming):
-        yield lambda: self._dispatch_and_handle_error(incoming)
-        incoming.acknowledge()
+        result_wrapper = []
+
+        yield lambda: result_wrapper.append(
+            self._dispatch_and_handle_error(incoming))
+
+        if result_wrapper[0] == NotificationResult.HANDLED:
+            incoming.acknowledge()
+        else:
+            incoming.requeue()
 
     def _dispatch_and_handle_error(self, incoming):
         """Dispatch a notification message to the appropriate endpoint method.
@@ -69,12 +81,13 @@ class NotificationDispatcher(object):
         :type ctxt: IncomingMessage
         """
         try:
-            self._dispatch(incoming.ctxt, incoming.message)
+            return self._dispatch(incoming.ctxt, incoming.message)
         except Exception:
             # sys.exc_info() is deleted by LOG.exception().
             exc_info = sys.exc_info()
             LOG.error('Exception during message handling',
                       exc_info=exc_info)
+            return NotificationResult.HANDLED
 
     def _dispatch(self, ctxt, message):
         """Dispatch an RPC message to the appropriate endpoint method.
@@ -99,6 +112,10 @@ class NotificationDispatcher(object):
         for callback in self._callbacks_by_priority.get(priority, []):
             localcontext.set_local_context(ctxt)
             try:
-                callback(ctxt, publisher_id, event_type, payload)
+                ret = callback(ctxt, publisher_id, event_type, payload)
+                ret = NotificationResult.HANDLED if ret is None else ret
+                if ret != NotificationResult.HANDLED:
+                    return ret
             finally:
                 localcontext.clear_local_context()
+        return NotificationResult.HANDLED
diff --git a/oslo/messaging/notify/listener.py b/oslo/messaging/notify/listener.py
index f7384c148..06a2e24ce 100644
--- a/oslo/messaging/notify/listener.py
+++ b/oslo/messaging/notify/listener.py
@@ -73,6 +73,24 @@ priority
 Parameters to endpoint methods are the request context supplied by the client,
 the publisher_id of the notification message, the event_type, the payload.
 
+An endpoint method can return explicitly messaging.NotificationResult.HANDLED
+to acknowledge a message or messaging.NotificationResult.REQUEUE to requeue the
+message.
+
+The message is acknowledge only if all endpoints return
+messaging.NotificationResult.HANDLED
+
+If nothing is returned by an endpoint, this is considered like
+messaging.NotificationResult.HANDLED
+
+messaging.NotificationResult values needs to be handled by drivers:
+
+* HANDLED: supported by all drivers
+* REQUEUE: supported by drivers: fake://, rabbit://
+
+In case of an unsupported driver nothing is done to the message and a
+NotImplementedError is raised and logged.
+
 By supplying a serializer object, a listener can deserialize a request context
 and arguments from - and serialize return values to - primitive types.
 """
diff --git a/tests/test_notify_dispatcher.py b/tests/test_notify_dispatcher.py
index 24b6e6b32..3e2c966a1 100644
--- a/tests/test_notify_dispatcher.py
+++ b/tests/test_notify_dispatcher.py
@@ -41,24 +41,58 @@ class TestDispatcher(test_utils.BaseTestCase):
         ('no_endpoints',
          dict(endpoints=[],
               endpoints_expect_calls=[],
-              priority='info')),
+              priority='info',
+              ex=None,
+              return_value=messaging.NotificationResult.HANDLED)),
         ('one_endpoints',
          dict(endpoints=[['warn']],
               endpoints_expect_calls=['warn'],
-              priority='warn')),
+              priority='warn',
+              ex=None,
+              return_value=messaging.NotificationResult.HANDLED)),
         ('two_endpoints_only_one_match',
          dict(endpoints=[['warn'], ['info']],
               endpoints_expect_calls=[None, 'info'],
-              priority='info')),
+              priority='info',
+              ex=None,
+              return_value=messaging.NotificationResult.HANDLED)),
         ('two_endpoints_both_match',
          dict(endpoints=[['debug', 'info'], ['info', 'debug']],
               endpoints_expect_calls=['debug', 'debug'],
-              priority='debug')),
+              priority='debug',
+              ex=None,
+              return_value=messaging.NotificationResult.HANDLED)),
+        ('no_return_value',
+         dict(endpoints=[['warn']],
+              endpoints_expect_calls=['warn'],
+              priority='warn',
+              ex=None, return_value=None)),
+        ('requeue',
+         dict(endpoints=[['debug', 'warn']],
+              endpoints_expect_calls=['debug'],
+              priority='debug', msg=notification_msg,
+              ex=None,
+              return_value=messaging.NotificationResult.REQUEUE)),
+        ('exception',
+         dict(endpoints=[['debug', 'warn']],
+              endpoints_expect_calls=['debug'],
+              priority='debug', msg=notification_msg,
+              ex=Exception,
+              return_value=messaging.NotificationResult.HANDLED)),
     ]
 
     def test_dispatcher(self):
-        endpoints = [mock.Mock(spec=endpoint_methods)
-                     for endpoint_methods in self.endpoints]
+        endpoints = []
+        for endpoint_methods in self.endpoints:
+            e = mock.Mock(spec=endpoint_methods)
+            endpoints.append(e)
+            for m in endpoint_methods:
+                method = getattr(e, m)
+                if self.ex:
+                    method.side_effect = self.ex()
+                else:
+                    method.return_value = self.return_value
+
         msg = notification_msg.copy()
         msg['priority'] = self.priority
 
@@ -89,6 +123,17 @@ class TestDispatcher(test_utils.BaseTestCase):
                 else:
                     self.assertEqual(endpoints[i].call_count, 0)
 
+        if self.ex:
+            self.assertEqual(incoming.acknowledge.call_count, 1)
+            self.assertEqual(incoming.requeue.call_count, 0)
+        elif self.return_value == messaging.NotificationResult.HANDLED \
+                or self.return_value is None:
+            self.assertEqual(incoming.acknowledge.call_count, 1)
+            self.assertEqual(incoming.requeue.call_count, 0)
+        elif self.return_value == messaging.NotificationResult.REQUEUE:
+            self.assertEqual(incoming.acknowledge.call_count, 0)
+            self.assertEqual(incoming.requeue.call_count, 1)
+
     @mock.patch('oslo.messaging.notify.dispatcher.LOG')
     def test_dispatcher_unknown_prio(self, mylog):
         msg = notification_msg.copy()
diff --git a/tests/test_notify_listener.py b/tests/test_notify_listener.py
index 7bb30b41b..3b35a3bb1 100644
--- a/tests/test_notify_listener.py
+++ b/tests/test_notify_listener.py
@@ -123,7 +123,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
         transport = messaging.get_transport(self.conf, url='fake:')
 
         endpoint = mock.Mock()
-        endpoint.info = mock.Mock()
+        endpoint.info.return_value = None
         listener_thread = self._setup_listener(transport, [endpoint], 1)
 
         notifier = self._setup_notifier(transport)
@@ -138,7 +138,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
         transport = messaging.get_transport(self.conf, url='fake:')
 
         endpoint = mock.Mock()
-        endpoint.info = mock.Mock()
+        endpoint.info.return_value = None
         topics = ["topic1", "topic2"]
         listener_thread = self._setup_listener(transport, [endpoint], 2,
                                                topics=topics)
@@ -157,9 +157,9 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
         transport = messaging.get_transport(self.conf, url='fake:')
 
         endpoint1 = mock.Mock()
-        endpoint1.info = mock.Mock()
+        endpoint1.info.return_value = None
         endpoint2 = mock.Mock()
-        endpoint2.info = mock.Mock()
+        endpoint2.info.return_value = messaging.NotificationResult.HANDLED
         listener_thread = self._setup_listener(transport,
                                                [endpoint1, endpoint2], 1)
         notifier = self._setup_notifier(transport)
@@ -171,3 +171,25 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
             {}, 'testpublisher', 'an_event.start', 'test')
         endpoint2.info.assert_called_once_with(
             {}, 'testpublisher', 'an_event.start', 'test')
+
+    def test_requeue(self):
+        transport = messaging.get_transport(self.conf, url='fake:')
+        endpoint = mock.Mock()
+        endpoint.info = mock.Mock()
+
+        def side_effect_requeue(*args, **kwargs):
+            if endpoint.info.call_count == 1:
+                return messaging.NotificationResult.REQUEUE
+            return messaging.NotificationResult.HANDLED
+
+        endpoint.info.side_effect = side_effect_requeue
+        listener_thread = self._setup_listener(transport,
+                                               [endpoint], 2)
+        notifier = self._setup_notifier(transport)
+        notifier.info({}, 'an_event.start', 'test')
+
+        self._stop_listener(listener_thread)
+
+        expected = [mock.call({}, 'testpublisher', 'an_event.start', 'test'),
+                    mock.call({}, 'testpublisher', 'an_event.start', 'test')]
+        self.assertEqual(endpoint.info.call_args_list, expected)
diff --git a/tests/test_rabbit.py b/tests/test_rabbit.py
index 8c0874191..77afd41bb 100644
--- a/tests/test_rabbit.py
+++ b/tests/test_rabbit.py
@@ -206,6 +206,7 @@ class TestSendReceive(test_utils.BaseTestCase):
             senders[i].start()
 
             received = listener.poll()
+            received.message.pop('_unique_id')
             self.assertIsNotNone(received)
             self.assertEqual(received.ctxt, self.ctxt)
             self.assertEqual(received.message, {'tx_id': i})
@@ -302,12 +303,14 @@ class TestRacyWaitForReply(test_utils.BaseTestCase):
         senders[0].start()
 
         msgs.append(listener.poll())
+        msgs[-1].message.pop('_unique_id')
         self.assertEqual(msgs[-1].message, {'tx_id': 0})
 
         # Start the second guy, receive his message
         senders[1].start()
 
         msgs.append(listener.poll())
+        msgs[-1].message.pop('_unique_id')
         self.assertEqual(msgs[-1].message, {'tx_id': 1})
 
         # Reply to both in order, making the second thread queue
@@ -602,6 +605,7 @@ class TestReplyWireFormat(test_utils.BaseTestCase):
         producer.publish(msg)
 
         received = listener.poll()
+        received.message.pop('_unique_id')
         self.assertIsNotNone(received)
         self.assertEqual(self.expected_ctxt, received.ctxt)
         self.assertEqual(self.expected, received.message)