diff --git a/tests/functional/__init__.py b/tests/functional/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/functional/test_functional.py b/tests/functional/test_functional.py
new file mode 100644
index 000000000..5c4077478
--- /dev/null
+++ b/tests/functional/test_functional.py
@@ -0,0 +1,279 @@
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from oslo import messaging
+
+from testtools import matchers
+
+from tests.functional.utils import ClientStub
+from tests.functional.utils import IsValidDistributionOf
+from tests.functional.utils import NotificationFixture
+from tests.functional.utils import RpcServerGroupFixture
+from tests.functional.utils import SkipIfNoTransportURL
+from tests.functional.utils import TransportFixture
+
+
+class CallTestCase(SkipIfNoTransportURL):
+    def test_specific_server(self):
+        group = self.useFixture(RpcServerGroupFixture(self.url))
+        client = group.client(1)
+        client.append(text='open')
+        self.assertEqual('openstack', client.append(text='stack'))
+        client.add(increment=2)
+        self.assertEqual(12, client.add(increment=10))
+        self.assertEqual(9, client.subtract(increment=3))
+        self.assertEqual('openstack', group.servers[1].endpoint.sval)
+        self.assertEqual(9, group.servers[1].endpoint.ival)
+        for i in [0, 2]:
+            self.assertEqual('', group.servers[i].endpoint.sval)
+            self.assertEqual(0, group.servers[i].endpoint.ival)
+
+    def test_server_in_group(self):
+        group = self.useFixture(RpcServerGroupFixture(self.url))
+
+        client = group.client()
+        data = [c for c in 'abcdefghijklmn']
+        for i in data:
+            client.append(text=i)
+
+        for s in group.servers:
+            self.assertThat(len(s.endpoint.sval), matchers.GreaterThan(0))
+        actual = [[c for c in s.endpoint.sval] for s in group.servers]
+        self.assertThat(actual, IsValidDistributionOf(data))
+
+    def test_different_exchanges(self):
+        t = self.useFixture(TransportFixture(self.url))
+        # If the different exchanges are not honoured, then the
+        # teardown may hang unless we broadcast all control messages
+        # to each server
+        group1 = self.useFixture(RpcServerGroupFixture(self.url, transport=t,
+                                                       use_fanout_ctrl=True))
+        group2 = self.useFixture(RpcServerGroupFixture(self.url, exchange="a",
+                                                       transport=t,
+                                                       use_fanout_ctrl=True))
+        group3 = self.useFixture(RpcServerGroupFixture(self.url, exchange="b",
+                                                       transport=t,
+                                                       use_fanout_ctrl=True))
+
+        client1 = group1.client(1)
+        data1 = [c for c in 'abcdefghijklmn']
+        for i in data1:
+            client1.append(text=i)
+
+        client2 = group2.client()
+        data2 = [c for c in 'opqrstuvwxyz']
+        for i in data2:
+            client2.append(text=i)
+
+        actual1 = [[c for c in s.endpoint.sval] for s in group1.servers]
+        self.assertThat(actual1, IsValidDistributionOf(data1))
+        actual1 = [c for c in group1.servers[1].endpoint.sval]
+        self.assertThat([actual1], IsValidDistributionOf(data1))
+        for s in group1.servers:
+            expected = len(data1) if group1.servers.index(s) == 1 else 0
+            self.assertEqual(expected, len(s.endpoint.sval))
+            self.assertEqual(0, s.endpoint.ival)
+
+        actual2 = [[c for c in s.endpoint.sval] for s in group2.servers]
+        for s in group2.servers:
+            self.assertThat(len(s.endpoint.sval), matchers.GreaterThan(0))
+            self.assertEqual(0, s.endpoint.ival)
+        self.assertThat(actual2, IsValidDistributionOf(data2))
+
+        for s in group3.servers:
+            self.assertEqual(0, len(s.endpoint.sval))
+            self.assertEqual(0, s.endpoint.ival)
+
+    def test_timeout(self):
+        transport = self.useFixture(TransportFixture(self.url))
+        target = messaging.Target(topic="no_such_topic")
+        c = ClientStub(transport.transport, target, timeout=1)
+        self.assertThat(c.ping, matchers.raises(messaging.MessagingTimeout))
+
+    def test_exception(self):
+        group = self.useFixture(RpcServerGroupFixture(self.url))
+        client = group.client(1)
+        client.add(increment=2)
+        f = lambda: client.subtract(increment=3)
+        self.assertThat(f, matchers.raises(ValueError))
+
+
+class CastTestCase(SkipIfNoTransportURL):
+    # Note: casts return immediately, so these tests utilise a special
+    # internal sync() cast to ensure prior casts are complete before
+    # making the necessary assertions.
+
+    def test_specific_server(self):
+        group = self.useFixture(RpcServerGroupFixture(self.url))
+        client = group.client(1, cast=True)
+        client.append(text='open')
+        client.append(text='stack')
+        client.add(increment=2)
+        client.add(increment=10)
+        group.sync()
+
+        self.assertEqual('openstack', group.servers[1].endpoint.sval)
+        self.assertEqual(12, group.servers[1].endpoint.ival)
+        for i in [0, 2]:
+            self.assertEqual('', group.servers[i].endpoint.sval)
+            self.assertEqual(0, group.servers[i].endpoint.ival)
+
+    def test_server_in_group(self):
+        group = self.useFixture(RpcServerGroupFixture(self.url))
+        client = group.client(cast=True)
+        for i in range(20):
+            client.add(increment=1)
+        group.sync()
+        total = 0
+        for s in group.servers:
+            ival = s.endpoint.ival
+            self.assertThat(ival, matchers.GreaterThan(0))
+            self.assertThat(ival, matchers.LessThan(20))
+            total += ival
+        self.assertEqual(20, total)
+
+    def test_fanout(self):
+        group = self.useFixture(RpcServerGroupFixture(self.url))
+        client = group.client('all', cast=True)
+        client.append(text='open')
+        client.append(text='stack')
+        client.add(increment=2)
+        client.add(increment=10)
+        group.sync(server='all')
+        for s in group.servers:
+            self.assertEqual('openstack', s.endpoint.sval)
+            self.assertEqual(12, s.endpoint.ival)
+
+
+class NotifyTestCase(SkipIfNoTransportURL):
+    # NOTE(sileht): Each test must not use the same topics
+    # to be run in parallel
+
+    def test_simple(self):
+        transport = self.useFixture(TransportFixture(self.url))
+        listener = self.useFixture(NotificationFixture(transport.transport,
+                                                       ['test_simple']))
+        transport.wait()
+        notifier = listener.notifier('abc')
+
+        notifier.info({}, 'test', 'Hello World!')
+        event = listener.events.get(timeout=1)
+        self.assertEqual('info', event[0])
+        self.assertEqual('test', event[1])
+        self.assertEqual('Hello World!', event[2])
+        self.assertEqual('abc', event[3])
+
+    def test_multiple_topics(self):
+        transport = self.useFixture(TransportFixture(self.url))
+        listener = self.useFixture(NotificationFixture(transport.transport,
+                                                       ['a', 'b']))
+        transport.wait()
+        a = listener.notifier('pub-a', topic='a')
+        b = listener.notifier('pub-b', topic='b')
+
+        sent = {
+            'pub-a': [a, 'test-a', 'payload-a'],
+            'pub-b': [b, 'test-b', 'payload-b']
+        }
+        for e in sent.values():
+            e[0].info({}, e[1], e[2])
+
+        received = {}
+        while len(received) < len(sent):
+            e = listener.events.get(timeout=1)
+            received[e[3]] = e
+
+        for key in received:
+            actual = received[key]
+            expected = sent[key]
+            self.assertEqual('info', actual[0])
+            self.assertEqual(expected[1], actual[1])
+            self.assertEqual(expected[2], actual[2])
+
+    def test_multiple_servers(self):
+        transport = self.useFixture(TransportFixture(self.url))
+        listener_a = self.useFixture(NotificationFixture(transport.transport,
+                                                         ['test-topic']))
+        listener_b = self.useFixture(NotificationFixture(transport.transport,
+                                                         ['test-topic']))
+        transport.wait()
+        n = listener_a.notifier('pub')
+
+        events_out = [('test-%s' % c, 'payload-%s' % c) for c in 'abcdefgh']
+
+        for event_type, payload in events_out:
+            n.info({}, event_type, payload)
+
+        events_in = [[(e[1], e[2]) for e in listener_a.get_events()],
+                     [(e[1], e[2]) for e in listener_b.get_events()]]
+
+        self.assertThat(events_in, IsValidDistributionOf(events_out))
+        for stream in events_in:
+            self.assertThat(len(stream), matchers.GreaterThan(0))
+
+    def test_independent_topics(self):
+        transport = self.useFixture(TransportFixture(self.url))
+        listener_a = self.useFixture(NotificationFixture(transport.transport,
+                                                         ['1']))
+        listener_b = self.useFixture(NotificationFixture(transport.transport,
+                                                         ['2']))
+        transport.wait()
+
+        a = listener_a.notifier('pub-1', topic='1')
+        b = listener_b.notifier('pub-2', topic='2')
+
+        a_out = [('test-1-%s' % c, 'payload-1-%s' % c) for c in 'abcdefgh']
+        for event_type, payload in a_out:
+            a.info({}, event_type, payload)
+
+        b_out = [('test-2-%s' % c, 'payload-2-%s' % c) for c in 'ijklmnop']
+        for event_type, payload in b_out:
+            b.info({}, event_type, payload)
+
+        for expected in a_out:
+            actual = listener_a.events.get(timeout=0.5)
+            self.assertEqual('info', actual[0])
+            self.assertEqual(expected[0], actual[1])
+            self.assertEqual(expected[1], actual[2])
+            self.assertEqual('pub-1', actual[3])
+
+        for expected in b_out:
+            actual = listener_b.events.get(timeout=0.5)
+            self.assertEqual('info', actual[0])
+            self.assertEqual(expected[0], actual[1])
+            self.assertEqual(expected[1], actual[2])
+            self.assertEqual('pub-2', actual[3])
+
+    def test_all_categories(self):
+        transport = self.useFixture(TransportFixture(self.url))
+        listener = self.useFixture(NotificationFixture(
+            transport.transport, ['test_all_categories']))
+        transport.wait()
+        n = listener.notifier('abc')
+
+        cats = ['debug', 'audit', 'info', 'warn', 'error', 'critical']
+        events = [(getattr(n, c), c, 'type-' + c, c + '-data') for c in cats]
+        for e in events:
+            e[0]({}, e[2], e[3])
+
+        # order between events with different categories is not guaranteed
+        received = {}
+        for expected in events:
+            e = listener.events.get(timeout=0.5)
+            received[e[0]] = e
+
+        for expected in events:
+            actual = received[expected[1]]
+            self.assertEqual(expected[1], actual[0])
+            self.assertEqual(expected[2], actual[1])
+            self.assertEqual(expected[3], actual[2])
diff --git a/tests/functional/utils.py b/tests/functional/utils.py
new file mode 100644
index 000000000..8c5751895
--- /dev/null
+++ b/tests/functional/utils.py
@@ -0,0 +1,343 @@
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import os
+import threading
+import time
+import uuid
+
+import fixtures
+from six import moves
+
+from oslo.config import cfg
+from oslo import messaging
+from oslo.messaging.notify import notifier
+from tests import utils as test_utils
+
+
+class TestServerEndpoint(object):
+    """This MessagingServer that will be used during functional testing."""
+
+    def __init__(self):
+        self.ival = 0
+        self.sval = ''
+
+    def add(self, ctxt, increment):
+        self.ival += increment
+        return self.ival
+
+    def subtract(self, ctxt, increment):
+        if self.ival < increment:
+            raise ValueError("ival can't go negative!")
+        self.ival -= increment
+        return self.ival
+
+    def append(self, ctxt, text):
+        self.sval += text
+        return self.sval
+
+
+class TransportFixture(fixtures.Fixture):
+    """Fixture defined to setup the oslo.messaging transport."""
+
+    def __init__(self, url):
+        self.url = url
+
+    def setUp(self):
+        super(TransportFixture, self).setUp()
+        self.transport = messaging.get_transport(cfg.CONF, url=self.url)
+
+    def cleanUp(self):
+        self.transport.cleanup()
+        super(TransportFixture, self).cleanUp()
+
+    def wait(self):
+        if self.url.startswith("rabbit") or self.url.startswith("qpid"):
+            time.sleep(0.5)
+
+
+class RpcServerFixture(fixtures.Fixture):
+    """Fixture to setup the TestServerEndpoint."""
+
+    def __init__(self, transport, target, endpoint=None, ctrl_target=None):
+        super(RpcServerFixture, self).__init__()
+        self.transport = transport
+        self.target = target
+        self.endpoint = endpoint or TestServerEndpoint()
+        self.syncq = moves.queue.Queue()
+        self.ctrl_target = ctrl_target or self.target
+
+    def setUp(self):
+        super(RpcServerFixture, self).setUp()
+        endpoints = [self.endpoint, self]
+        self.server = messaging.get_rpc_server(self.transport,
+                                               self.target,
+                                               endpoints)
+        self._ctrl = messaging.RPCClient(self.transport, self.ctrl_target)
+        self._start()
+
+    def cleanUp(self):
+        self._stop()
+        super(RpcServerFixture, self).cleanUp()
+
+    def _start(self):
+        self.thread = threading.Thread(target=self.server.start)
+        self.thread.daemon = True
+        self.thread.start()
+
+    def _stop(self):
+        self.server.stop()
+        self._ctrl.cast({}, 'ping')
+        self.server.wait()
+        self.thread.join()
+
+    def ping(self, ctxt):
+        pass
+
+    def sync(self, ctxt, item):
+        self.syncq.put(item)
+
+
+class RpcServerGroupFixture(fixtures.Fixture):
+    def __init__(self, url, topic=None, names=None, exchange=None,
+                 transport=None, use_fanout_ctrl=False):
+        self.url = url
+        # NOTE(sileht): topic and servier_name must be uniq
+        # to be able to run all tests in parallel
+        self.topic = topic or str(uuid.uuid4())
+        self.names = names or ["server_%i_%s" % (i, uuid.uuid4())
+                               for i in range(3)]
+        self.exchange = exchange
+        self.targets = [self._target(server=n) for n in self.names]
+        self.transport = transport
+        self.use_fanout_ctrl = use_fanout_ctrl
+
+    def setUp(self):
+        super(RpcServerGroupFixture, self).setUp()
+        if not self.transport:
+            self.transport = self.useFixture(TransportFixture(self.url))
+        self.servers = [self.useFixture(self._server(t)) for t in self.targets]
+        self.transport.wait()
+
+    def _target(self, server=None, fanout=False):
+        t = messaging.Target(exchange=self.exchange, topic=self.topic)
+        t.server = server
+        t.fanout = fanout
+        return t
+
+    def _server(self, target):
+        ctrl = None
+        if self.use_fanout_ctrl:
+            ctrl = self._target(fanout=True)
+        return RpcServerFixture(self.transport.transport, target,
+                                ctrl_target=ctrl)
+
+    def client(self, server=None, cast=False):
+        if server:
+            if server == 'all':
+                target = self._target(fanout=True)
+            elif server >= 0 and server < len(self.targets):
+                target = self.targets[server]
+            else:
+                raise ValueError("Invalid value for server: %r" % server)
+        else:
+            target = self._target()
+        return ClientStub(self.transport.transport, target, cast=cast,
+                          timeout=5)
+
+    def sync(self, server=None):
+        if server:
+            if server == 'all':
+                c = self.client(server='all', cast=True)
+                c.sync(item='x')
+                for s in self.servers:
+                    s.syncq.get(timeout=5)
+            elif server >= 0 and server < len(self.targets):
+                c = self.client(server=server, cast=True)
+                c.sync(item='x')
+                self.servers[server].syncq.get(timeout=5)
+            else:
+                raise ValueError("Invalid value for server: %r" % server)
+        else:
+            for i in range(len(self.servers)):
+                self.client(i).ping()
+
+
+class RpcCall(object):
+    def __init__(self, client, method, context):
+        self.client = client
+        self.method = method
+        self.context = context
+
+    def __call__(self, **kwargs):
+        self.context['time'] = time.ctime()
+        self.context['cast'] = False
+        result = self.client.call(self.context, self.method, **kwargs)
+        return result
+
+
+class RpcCast(RpcCall):
+    def __call__(self, **kwargs):
+        self.context['time'] = time.ctime()
+        self.context['cast'] = True
+        self.client.cast(self.context, self.method, **kwargs)
+
+
+class ClientStub(object):
+    def __init__(self, transport, target, cast=False, name=None, **kwargs):
+        self.name = name or "functional-tests"
+        self.cast = cast
+        self.client = messaging.RPCClient(transport, target, **kwargs)
+
+    def __getattr__(self, name):
+        context = {"application": self.name}
+        if self.cast:
+            return RpcCast(self.client, name, context)
+        else:
+            return RpcCall(self.client, name, context)
+
+
+class InvalidDistribution(object):
+    def __init__(self, original, received):
+        self.original = original
+        self.received = received
+        self.missing = []
+        self.extra = []
+        self.wrong_order = []
+
+    def describe(self):
+        text = "Sent %s, got %s; " % (self.original, self.received)
+        e1 = ["%r was missing" % m for m in self.missing]
+        e2 = ["%r was not expected" % m for m in self.extra]
+        e3 = ["%r expected before %r" % (m[0], m[1]) for m in self.wrong_order]
+        return text + ", ".join(e1 + e2 + e3)
+
+    def __len__(self):
+        return len(self.extra) + len(self.missing) + len(self.wrong_order)
+
+    def get_details(self):
+        return {}
+
+
+class IsValidDistributionOf(object):
+    """Test whether a given list can be split into particular
+    sub-lists. All items in the original list must be in exactly one
+    sub-list, and must appear in that sub-list in the same order with
+    respect to any other items as in the original list.
+    """
+    def __init__(self, original):
+        self.original = original
+
+    def __str__(self):
+        return 'IsValidDistribution(%s)' % self.original
+
+    def match(self, actual):
+        errors = InvalidDistribution(self.original, actual)
+        received = [[i for i in l] for l in actual]
+
+        def _remove(obj, lists):
+            for l in lists:
+                if obj in l:
+                    front = l[0]
+                    l.remove(obj)
+                    return front
+            return None
+
+        for item in self.original:
+            o = _remove(item, received)
+            if not o:
+                errors.missing += item
+            elif item != o:
+                errors.wrong_order.append([item, o])
+        for l in received:
+            errors.extra += l
+        return errors or None
+
+
+class SkipIfNoTransportURL(test_utils.BaseTestCase):
+    def setUp(self):
+        super(SkipIfNoTransportURL, self).setUp()
+        self.url = os.environ.get('TRANSPORT_URL')
+        if not self.url:
+            self.skipTest("No transport url configured")
+
+
+class NotificationFixture(fixtures.Fixture):
+    def __init__(self, transport, topics):
+        super(NotificationFixture, self).__init__()
+        self.transport = transport
+        self.topics = topics
+        self.events = moves.queue.Queue()
+        self.name = str(id(self))
+
+    def setUp(self):
+        super(NotificationFixture, self).setUp()
+        targets = [messaging.Target(topic=t) for t in self.topics]
+        # add a special topic for internal notifications
+        targets.append(messaging.Target(topic=self.name))
+        self.server = messaging.get_notification_listener(self.transport,
+                                                          targets,
+                                                          [self])
+        self._ctrl = self.notifier('internal', topic=self.name)
+        self._start()
+
+    def cleanUp(self):
+        self._stop()
+        super(NotificationFixture, self).cleanUp()
+
+    def _start(self):
+        self.thread = threading.Thread(target=self.server.start)
+        self.thread.daemon = True
+        self.thread.start()
+
+    def _stop(self):
+        self.server.stop()
+        self._ctrl.sample({}, 'shutdown', 'shutdown')
+        self.server.wait()
+        self.thread.join()
+
+    def notifier(self, publisher, topic=None):
+        return notifier.Notifier(self.transport,
+                                 publisher,
+                                 driver='messaging',
+                                 topic=topic or self.topics[0])
+
+    def debug(self, ctxt, publisher, event_type, payload, metadata):
+        self.events.put(['debug', event_type, payload, publisher])
+
+    def audit(self, ctxt, publisher, event_type, payload, metadata):
+        self.events.put(['audit', event_type, payload, publisher])
+
+    def info(self, ctxt, publisher, event_type, payload, metadata):
+        self.events.put(['info', event_type, payload, publisher])
+
+    def warn(self, ctxt, publisher, event_type, payload, metadata):
+        self.events.put(['warn', event_type, payload, publisher])
+
+    def error(self, ctxt, publisher, event_type, payload, metadata):
+        self.events.put(['error', event_type, payload, publisher])
+
+    def critical(self, ctxt, publisher, event_type, payload, metadata):
+        self.events.put(['critical', event_type, payload, publisher])
+
+    def sample(self, ctxt, publisher, event_type, payload, metadata):
+        pass  # Just used for internal shutdown control
+
+    def get_events(self, timeout=0.5):
+        results = []
+        try:
+            while True:
+                results.append(self.events.get(timeout=timeout))
+        except moves.queue.Empty:
+            pass
+        return results
diff --git a/tox.ini b/tox.ini
index fafc5c902..36dbf09b3 100644
--- a/tox.ini
+++ b/tox.ini
@@ -36,6 +36,9 @@ deps = -r{toxinidir}/requirements-py3.txt
 deps = -r{toxinidir}/amqp1-requirements.txt
         {[testenv]deps}
 
+[testenv:py27-rabbit]
+setenv = TRANSPORT_URL=rabbit://guest:guest@localhost
+
 [flake8]
 show-source = True
 ignore = H237,H402,H405,H904