diff --git a/doc/source/index.rst b/doc/source/index.rst
index c4ff4301c..19156babe 100644
--- a/doc/source/index.rst
+++ b/doc/source/index.rst
@@ -15,6 +15,7 @@ Contents
    server
    rpcclient
    notifier
+   notification_listener
    serializer
    exceptions
    opts
diff --git a/doc/source/notification_listener.rst b/doc/source/notification_listener.rst
new file mode 100644
index 000000000..4fa06617d
--- /dev/null
+++ b/doc/source/notification_listener.rst
@@ -0,0 +1,14 @@
+---------------------
+Notification Listener
+---------------------
+
+.. automodule:: oslo.messaging.notify.listener
+
+.. currentmodule:: oslo.messaging
+
+.. autofunction:: get_notification_listener
+
+.. autoclass:: MessageHandlingServer
+   :members:
+
+.. autofunction:: get_local_context
diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py
index 9935690d5..8e9813207 100644
--- a/oslo/messaging/_drivers/amqpdriver.py
+++ b/oslo/messaging/_drivers/amqpdriver.py
@@ -404,6 +404,16 @@ class AMQPDriverBase(base.BaseDriver):
 
         return listener
 
+    def listen_for_notifications(self, targets_and_priorities):
+        conn = self._get_connection(pooled=False)
+
+        listener = AMQPListener(self, conn)
+        for target, priority in targets_and_priorities:
+            conn.declare_topic_consumer('%s.%s' % (target.topic, priority),
+                                        callback=listener,
+                                        exchange_name=target.exchange)
+        return listener
+
     def cleanup(self):
         if self._connection_pool:
             self._connection_pool.empty()
diff --git a/oslo/messaging/_drivers/base.py b/oslo/messaging/_drivers/base.py
index b34997f4a..9803e1935 100644
--- a/oslo/messaging/_drivers/base.py
+++ b/oslo/messaging/_drivers/base.py
@@ -73,6 +73,12 @@ class BaseDriver(object):
     def listen(self, target):
         """Construct a Listener for the given target."""
 
+    @abc.abstractmethod
+    def listen_for_notifications(self, targets_and_priorities):
+        """Construct a notification Listener for the given list of
+        tuple of (target, priority).
+        """
+
     @abc.abstractmethod
     def cleanup(self):
         """Release all resources."""
diff --git a/oslo/messaging/_drivers/impl_fake.py b/oslo/messaging/_drivers/impl_fake.py
index 06943f6a2..b43d8ee9d 100644
--- a/oslo/messaging/_drivers/impl_fake.py
+++ b/oslo/messaging/_drivers/impl_fake.py
@@ -161,5 +161,15 @@ class FakeDriver(base.BaseDriver):
                                  messaging.Target(topic=target.topic)])
         return listener
 
+    def listen_for_notifications(self, targets_and_priorities):
+        # TODO(sileht): Handle the target.exchange
+        exchange = self._get_exchange(self._default_exchange)
+
+        targets = [messaging.Target(topic='%s.%s' % (target.topic, priority))
+                   for target, priority in targets_and_priorities]
+        listener = FakeListener(self, exchange, targets)
+
+        return listener
+
     def cleanup(self):
         pass
diff --git a/oslo/messaging/_drivers/impl_zmq.py b/oslo/messaging/_drivers/impl_zmq.py
index 0e65f9e66..a3c53c42b 100644
--- a/oslo/messaging/_drivers/impl_zmq.py
+++ b/oslo/messaging/_drivers/impl_zmq.py
@@ -959,5 +959,19 @@ class ZmqDriver(base.BaseDriver):
 
         return listener
 
+    def listen_for_notifications(self, targets_and_priorities):
+        conn = create_connection(self.conf)
+
+        listener = ZmqListener(self, None)
+        for target, priority in targets_and_priorities:
+            # NOTE(ewindisch): dot-priority in rpc notifier does not
+            # work with our assumptions.
+            # NOTE(sileht): create_consumer doesn't support target.exchange
+            conn.create_consumer('%s-%s' % (target.topic, priority),
+                                 listener)
+        conn.consume_in_thread()
+
+        return listener
+
     def cleanup(self):
         cleanup()
diff --git a/oslo/messaging/notify/__init__.py b/oslo/messaging/notify/__init__.py
index b368e337a..4b87d72c3 100644
--- a/oslo/messaging/notify/__init__.py
+++ b/oslo/messaging/notify/__init__.py
@@ -14,7 +14,9 @@
 #    under the License.
 
 __all__ = ['Notifier',
-           'LoggingNotificationHandler']
+           'LoggingNotificationHandler',
+           'get_notification_listener']
 
 from .notifier import *
+from .listener import *
 from .logger import *
diff --git a/oslo/messaging/notify/dispatcher.py b/oslo/messaging/notify/dispatcher.py
new file mode 100644
index 000000000..f36c3925f
--- /dev/null
+++ b/oslo/messaging/notify/dispatcher.py
@@ -0,0 +1,83 @@
+# Copyright 2011 OpenStack Foundation.
+# All Rights Reserved.
+# Copyright 2013 eNovance
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import itertools
+import logging
+
+from oslo.messaging import localcontext
+from oslo.messaging import serializer as msg_serializer
+
+
+LOG = logging.getLogger(__name__)
+
+PRIORITIES = ['audit', 'debug', 'info', 'warn', 'error', 'critical', 'sample']
+
+
+class NotificationDispatcher(object):
+    """A message dispatcher which understands Notification messages.
+
+    A MessageHandlingServer is constructed by passing a callable dispatcher
+    which is invoked with context and message dictionaries each time a message
+    is received.
+
+    NotifcationDispatcher is one such dispatcher which pass a raw notification
+    message to the endpoints
+    """
+
+    def __init__(self, targets, endpoints, serializer):
+        self.targets = targets
+        self.endpoints = endpoints
+        self.serializer = serializer or msg_serializer.NoOpSerializer()
+
+        self._callbacks_by_priority = {}
+        for endpoint, prio in itertools.product(endpoints, PRIORITIES):
+            if hasattr(endpoint, prio):
+                method = getattr(endpoint, prio)
+                self._callbacks_by_priority.setdefault(prio, []).append(method)
+
+        priorities = self._callbacks_by_priority.keys()
+        self._targets_priorities = set(itertools.product(self.targets,
+                                                         priorities))
+
+    def _listen(self, transport):
+        return transport._listen_for_notifications(self._targets_priorities)
+
+    def __call__(self, ctxt, message):
+        """Dispatch an RPC message to the appropriate endpoint method.
+
+        :param ctxt: the request context
+        :type ctxt: dict
+        :param message: the message payload
+        :type message: dict
+        """
+        ctxt = self.serializer.deserialize_context(ctxt)
+
+        publisher_id = message.get('publisher_id')
+        event_type = message.get('event_type')
+        priority = message.get('priority', '').lower()
+        if priority not in PRIORITIES:
+            LOG.warning('Unknown priority "%s"' % priority)
+            return
+
+        payload = self.serializer.deserialize_entity(ctxt,
+                                                     message.get('payload'))
+
+        for callback in self._callbacks_by_priority.get(priority, []):
+            localcontext.set_local_context(ctxt)
+            try:
+                callback(ctxt, publisher_id, event_type, payload)
+            finally:
+                localcontext.clear_local_context()
diff --git a/oslo/messaging/notify/listener.py b/oslo/messaging/notify/listener.py
new file mode 100644
index 000000000..f7384c148
--- /dev/null
+++ b/oslo/messaging/notify/listener.py
@@ -0,0 +1,105 @@
+# Copyright 2011 OpenStack Foundation.
+# All Rights Reserved.
+# Copyright 2013 eNovance
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+"""
+A notification listener exposes a number of endpoints, each of which
+contain a set of methods. Each method corresponds to a notification priority.
+
+To create a notification listener, you supply a transport, list of targets and
+a list of endpoints.
+
+A transport can be obtained simply by calling the get_transport() method::
+
+    transport = messaging.get_transport(conf)
+
+which will load the appropriate transport driver according to the user's
+messaging configuration configuration. See get_transport() for more details.
+
+The target supplied when creating a notification listener expresses the topic
+and - optionally - the exchange to listen on. See Target for more details
+on these attributes.
+
+Notification listener have start(), stop() and wait() messages to begin
+handling requests, stop handling requests and wait for all in-process
+requests to complete.
+
+Each notification listener is associated with an executor which integrates the
+listener with a specific I/O handling framework. Currently, there are blocking
+and eventlet executors available.
+
+A simple example of a notification listener with multiple endpoints might be::
+
+    from oslo.config import cfg
+    from oslo import messaging
+
+    class NotificationEndpoint(object):
+        def warn(self, ctxt, publisher_id, event_type, payload):
+            do_something(payload)
+
+    class ErrorEndpoint(object):
+        def error(self, ctxt, publisher_id, event_type, payload):
+            do_something(payload)
+
+    transport = messaging.get_transport(cfg.CONF)
+    targets = [
+        messaging.Target(topic='notifications')
+        messaging.Target(topic='notifications_bis')
+    ]
+    endpoints = [
+        NotificationEndpoint(),
+        ErrorEndpoint(),
+    ]
+    server = messaging.get_notification_listener(transport, targets, endpoints)
+    server.start()
+    server.wait()
+
+A notifier sends a notification on a topic with a priority, the notification
+listener will receive this notification if the topic of this one have been set
+in one of the targets and if an endpoint implements the method named like the
+priority
+
+Parameters to endpoint methods are the request context supplied by the client,
+the publisher_id of the notification message, the event_type, the payload.
+
+By supplying a serializer object, a listener can deserialize a request context
+and arguments from - and serialize return values to - primitive types.
+"""
+
+from oslo.messaging.notify import dispatcher as notify_dispatcher
+from oslo.messaging import server as msg_server
+
+
+def get_notification_listener(transport, targets, endpoints,
+                              executor='blocking', serializer=None):
+    """Construct a notification listener
+
+    The executor parameter controls how incoming messages will be received and
+    dispatched. By default, the most simple executor is used - the blocking
+    executor.
+
+    :param transport: the messaging transport
+    :type transport: Transport
+    :param targets: the exchanges and topics to listen on
+    :type targets: list of Target
+    :param endpoints: a list of endpoint objects
+    :type endpoints: list
+    :param executor: name of a message executor - e.g. 'eventlet', 'blocking'
+    :type executor: str
+    :param serializer: an optional entity serializer
+    :type serializer: Serializer
+    """
+    dispatcher = notify_dispatcher.NotificationDispatcher(targets, endpoints,
+                                                          serializer)
+    return msg_server.MessageHandlingServer(transport, dispatcher, executor)
diff --git a/oslo/messaging/transport.py b/oslo/messaging/transport.py
index 9e8e9d78c..7c8a3be97 100644
--- a/oslo/messaging/transport.py
+++ b/oslo/messaging/transport.py
@@ -99,6 +99,14 @@ class Transport(object):
                                            target)
         return self._driver.listen(target)
 
+    def _listen_for_notifications(self, targets_and_priorities):
+        for target, priority in targets_and_priorities:
+            if not target.topic:
+                raise exceptions.InvalidTarget('A target must have '
+                                               'topic specified',
+                                               target)
+        return self._driver.listen_for_notifications(targets_and_priorities)
+
     def cleanup(self):
         """Release all resources associated with this transport."""
         self._driver.cleanup()
diff --git a/tests/test_notify_dispatcher.py b/tests/test_notify_dispatcher.py
new file mode 100644
index 000000000..e1c1f9abb
--- /dev/null
+++ b/tests/test_notify_dispatcher.py
@@ -0,0 +1,98 @@
+
+# Copyright 2013 eNovance
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import itertools
+
+import mock
+import testscenarios
+
+from oslo import messaging
+from oslo.messaging.notify import dispatcher as notify_dispatcher
+from oslo.messaging.openstack.common import timeutils
+from tests import utils as test_utils
+
+load_tests = testscenarios.load_tests_apply_scenarios
+
+
+notification_msg = dict(
+    publisher_id="publisher_id",
+    event_type="compute.start",
+    payload={"info": "fuu"},
+    message_id="uuid",
+    timestamp=str(timeutils.utcnow())
+)
+
+
+class TestDispatcher(test_utils.BaseTestCase):
+
+    scenarios = [
+        ('no_endpoints',
+         dict(endpoints=[],
+              endpoints_expect_calls=[],
+              priority='info')),
+        ('one_endpoints',
+         dict(endpoints=[['warn']],
+              endpoints_expect_calls=['warn'],
+              priority='warn')),
+        ('two_endpoints_only_one_match',
+         dict(endpoints=[['warn'], ['info']],
+              endpoints_expect_calls=[None, 'info'],
+              priority='info')),
+        ('two_endpoints_both_match',
+         dict(endpoints=[['debug', 'info'], ['info', 'debug']],
+              endpoints_expect_calls=['debug', 'debug'],
+              priority='debug')),
+    ]
+
+    def test_dispatcher(self):
+        endpoints = [mock.Mock(spec=endpoint_methods)
+                     for endpoint_methods in self.endpoints]
+        msg = notification_msg.copy()
+        msg['priority'] = self.priority
+
+        targets = [messaging.Target(topic='notifications')]
+        dispatcher = notify_dispatcher.NotificationDispatcher(targets,
+                                                              endpoints,
+                                                              None)
+
+        # check it listen on wanted topics
+        self.assertEqual(sorted(dispatcher._targets_priorities),
+                         sorted(set((targets[0], prio)
+                                    for prio in itertools.chain.from_iterable(
+                                        self.endpoints))))
+
+        dispatcher({}, msg)
+
+        # check endpoint callbacks are called or not
+        for i, endpoint_methods in enumerate(self.endpoints):
+            for m in endpoint_methods:
+                if m == self.endpoints_expect_calls[i]:
+                    method = getattr(endpoints[i], m)
+                    expected = [mock.call({}, msg['publisher_id'],
+                                          msg['event_type'],
+                                          msg['payload'])]
+                    self.assertEqual(method.call_args_list, expected)
+                else:
+                    self.assertEqual(endpoints[i].call_count, 0)
+
+    @mock.patch('oslo.messaging.notify.dispatcher.LOG')
+    def test_dispatcher_unknown_prio(self, mylog):
+        msg = notification_msg.copy()
+        msg['priority'] = 'what???'
+        dispatcher = notify_dispatcher.NotificationDispatcher([mock.Mock()],
+                                                              [mock.Mock()],
+                                                              None)
+        dispatcher({}, msg)
+        mylog.warning.assert_called_once_with('Unknown priority "what???"')
diff --git a/tests/test_notify_listener.py b/tests/test_notify_listener.py
new file mode 100644
index 000000000..7bb30b41b
--- /dev/null
+++ b/tests/test_notify_listener.py
@@ -0,0 +1,173 @@
+
+# Copyright 2013 eNovance
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import threading
+
+import mock
+from oslo.config import cfg
+import testscenarios
+
+from oslo import messaging
+from oslo.messaging.notify import dispatcher
+from tests import utils as test_utils
+
+load_tests = testscenarios.load_tests_apply_scenarios
+
+
+class ListenerSetupMixin(object):
+
+    class Listener(object):
+        def __init__(self, transport, topics, endpoints, expect_messages):
+            targets = [messaging.Target(topic=topic)
+                       for topic in topics]
+            self._expect_messages = expect_messages
+            self._received_msgs = 0
+            self._listener = messaging.get_notification_listener(
+                transport, targets, endpoints + [self])
+
+        def info(self, ctxt, publisher_id, event_type, payload):
+            self._received_msgs += 1
+            if self._expect_messages == self._received_msgs:
+                # Check start() does nothing with a running listener
+                self._listener.start()
+                self._listener.stop()
+                self._listener.wait()
+
+        def start(self):
+            self._listener.start()
+
+    def _setup_listener(self, transport, endpoints, expect_messages,
+                        topics=None):
+        listener = self.Listener(transport,
+                                 topics=topics or ['testtopic'],
+                                 expect_messages=expect_messages,
+                                 endpoints=endpoints)
+
+        thread = threading.Thread(target=listener.start)
+        thread.daemon = True
+        thread.start()
+        return thread
+
+    def _stop_listener(self, thread):
+        thread.join(timeout=5)
+
+    def _setup_notifier(self, transport, topic='testtopic',
+                        publisher_id='testpublisher'):
+        return messaging.Notifier(transport, topic=topic,
+                                  driver='messaging',
+                                  publisher_id=publisher_id)
+
+
+class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
+
+    def __init__(self, *args):
+        super(TestNotifyListener, self).__init__(*args)
+        ListenerSetupMixin.__init__(self)
+
+    def setUp(self):
+        super(TestNotifyListener, self).setUp(conf=cfg.ConfigOpts())
+
+    def test_constructor(self):
+        transport = messaging.get_transport(self.conf, url='fake:')
+        target = messaging.Target(topic='foo')
+        endpoints = [object()]
+
+        listener = messaging.get_notification_listener(transport, [target],
+                                                       endpoints)
+
+        self.assertIs(listener.conf, self.conf)
+        self.assertIs(listener.transport, transport)
+        self.assertIsInstance(listener.dispatcher,
+                              dispatcher.NotificationDispatcher)
+        self.assertIs(listener.dispatcher.endpoints, endpoints)
+        self.assertIs(listener.executor, 'blocking')
+
+    def test_no_target_topic(self):
+        transport = messaging.get_transport(self.conf, url='fake:')
+
+        listener = messaging.get_notification_listener(transport,
+                                                       [messaging.Target()],
+                                                       [mock.Mock()])
+        try:
+            listener.start()
+        except Exception as ex:
+            self.assertIsInstance(ex, messaging.InvalidTarget, ex)
+        else:
+            self.assertTrue(False)
+
+    def test_unknown_executor(self):
+        transport = messaging.get_transport(self.conf, url='fake:')
+
+        try:
+            messaging.get_notification_listener(transport, [], [],
+                                                executor='foo')
+        except Exception as ex:
+            self.assertIsInstance(ex, messaging.ExecutorLoadFailure)
+            self.assertEqual(ex.executor, 'foo')
+        else:
+            self.assertTrue(False)
+
+    def test_one_topic(self):
+        transport = messaging.get_transport(self.conf, url='fake:')
+
+        endpoint = mock.Mock()
+        endpoint.info = mock.Mock()
+        listener_thread = self._setup_listener(transport, [endpoint], 1)
+
+        notifier = self._setup_notifier(transport)
+        notifier.info({}, 'an_event.start', 'test message')
+
+        self._stop_listener(listener_thread)
+
+        endpoint.info.assert_called_once_with(
+            {}, 'testpublisher', 'an_event.start', 'test message')
+
+    def test_two_topics(self):
+        transport = messaging.get_transport(self.conf, url='fake:')
+
+        endpoint = mock.Mock()
+        endpoint.info = mock.Mock()
+        topics = ["topic1", "topic2"]
+        listener_thread = self._setup_listener(transport, [endpoint], 2,
+                                               topics=topics)
+        notifier = self._setup_notifier(transport, topic='topic1')
+        notifier.info({}, 'an_event.start1', 'test')
+        notifier = self._setup_notifier(transport, topic='topic2')
+        notifier.info({}, 'an_event.start2', 'test')
+
+        self._stop_listener(listener_thread)
+
+        expected = [mock.call({}, 'testpublisher', 'an_event.start1', 'test'),
+                    mock.call({}, 'testpublisher', 'an_event.start2', 'test')]
+        self.assertEqual(sorted(endpoint.info.call_args_list), expected)
+
+    def test_two_endpoints(self):
+        transport = messaging.get_transport(self.conf, url='fake:')
+
+        endpoint1 = mock.Mock()
+        endpoint1.info = mock.Mock()
+        endpoint2 = mock.Mock()
+        endpoint2.info = mock.Mock()
+        listener_thread = self._setup_listener(transport,
+                                               [endpoint1, endpoint2], 1)
+        notifier = self._setup_notifier(transport)
+        notifier.info({}, 'an_event.start', 'test')
+
+        self._stop_listener(listener_thread)
+
+        endpoint1.info.assert_called_once_with(
+            {}, 'testpublisher', 'an_event.start', 'test')
+        endpoint2.info.assert_called_once_with(
+            {}, 'testpublisher', 'an_event.start', 'test')
diff --git a/tests/test_rabbit.py b/tests/test_rabbit.py
index 4e1273cb6..8c0874191 100644
--- a/tests/test_rabbit.py
+++ b/tests/test_rabbit.py
@@ -108,6 +108,11 @@ class TestRabbitTransportURL(test_utils.BaseTestCase):
         self._driver.listen(self._target)
         self.assertEqual(self._server_params[0], self.expected)
 
+    def test_transport_url_listen_for_notification(self):
+        self._driver.listen_for_notifications(
+            [(messaging.Target(topic='topic'), 'info')])
+        self.assertEqual(self._server_params[0], self.expected)
+
     def test_transport_url_send(self):
         self._driver.send(self._target, {}, {})
         self.assertEqual(self._server_params[0], self.expected)