diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py
index 0297e1e30..939a3cec2 100644
--- a/oslo/messaging/_drivers/impl_rabbit.py
+++ b/oslo/messaging/_drivers/impl_rabbit.py
@@ -15,7 +15,6 @@
 import functools
 import itertools
 import logging
-import random
 import socket
 import ssl
 import time
@@ -24,8 +23,10 @@ import uuid
 import kombu
 import kombu.connection
 import kombu.entity
+import kombu.exceptions
 import kombu.messaging
 import six
+from six.moves.urllib import parse
 
 from oslo.config import cfg
 from oslo.messaging._drivers import amqp as rpc_amqp
@@ -35,6 +36,7 @@ from oslo.messaging._i18n import _
 from oslo.messaging import exceptions
 from oslo.utils import netutils
 
+
 rabbit_opts = [
     cfg.StrOpt('kombu_ssl_version',
                default='',
@@ -424,6 +426,7 @@ class Connection(object):
 
     def __init__(self, conf, url):
         self.consumers = []
+        self.consumer_num = itertools.count(1)
         self.conf = conf
         self.max_retries = self.conf.rabbit_max_retries
         # Try forever?
@@ -433,62 +436,62 @@ class Connection(object):
         self.interval_stepping = self.conf.rabbit_retry_backoff
         # max retry-interval = 30 seconds
         self.interval_max = 30
-        self.memory_transport = False
 
-        ssl_params = self._fetch_ssl_params()
+        self._ssl_params = self._fetch_ssl_params()
+        self._login_method = self.conf.rabbit_login_method
 
         if url.virtual_host is not None:
             virtual_host = url.virtual_host
         else:
             virtual_host = self.conf.rabbit_virtual_host
 
-        self.brokers_params = []
-        if url.hosts:
+        self._url = ''
+        if self.conf.fake_rabbit:
+            # TODO(sileht): use memory://virtual_host into
+            # unit tests to remove cfg.CONF.fake_rabbit
+            self._url = 'memory://%s/' % virtual_host
+        elif url.hosts:
             for host in url.hosts:
-                params = {
-                    'hostname': host.hostname,
-                    'port': host.port or 5672,
-                    'userid': host.username or '',
-                    'password': host.password or '',
-                    'login_method': self.conf.rabbit_login_method,
-                    'virtual_host': virtual_host
-                }
-                if self.conf.fake_rabbit:
-                    params['transport'] = 'memory'
-                if self.conf.rabbit_use_ssl:
-                    params['ssl'] = ssl_params
-
-                self.brokers_params.append(params)
+                transport = url.transport.replace('kombu+', '')
+                transport = url.transport.replace('rabbit', 'amqp')
+                self._url += '%s%s://%s:%s@%s:%s/%s' % (
+                    ";" if self._url else '',
+                    transport,
+                    parse.quote(host.username or ''),
+                    parse.quote(host.password or ''),
+                    host.hostname or '', str(host.port or 5672),
+                    virtual_host)
         else:
-            # Old configuration format
             for adr in self.conf.rabbit_hosts:
                 hostname, port = netutils.parse_host_port(
                     adr, default_port=self.conf.rabbit_port)
+                self._url += '%samqp://%s:%s@%s:%s/%s' % (
+                    ";" if self._url else '',
+                    parse.quote(self.conf.rabbit_userid),
+                    parse.quote(self.conf.rabbit_password),
+                    hostname, port,
+                    virtual_host)
 
-                params = {
-                    'hostname': hostname,
-                    'port': port,
-                    'userid': self.conf.rabbit_userid,
-                    'password': self.conf.rabbit_password,
-                    'login_method': self.conf.rabbit_login_method,
-                    'virtual_host': virtual_host
-                }
+        self.do_consume = True
 
-                if self.conf.fake_rabbit:
-                    params['transport'] = 'memory'
-                if self.conf.rabbit_use_ssl:
-                    params['ssl'] = ssl_params
+        self.channel = None
+        self.connection = kombu.connection.Connection(
+            self._url, ssl=self._ssl_params, login_method=self._login_method,
+            failover_strategy="shuffle")
 
-                self.brokers_params.append(params)
+        LOG.info(_('Connecting to AMQP server on %(hostname)s:%(port)d'),
+                 {'hostname': self.connection.hostname,
+                  'port': self.connection.port})
+        # NOTE(sileht): just ensure the connection is setuped at startup
+        self.ensure(error_callback=None,
+                    method=lambda channel: True)
+        LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
+                 {'hostname': self.connection.hostname,
+                  'port': self.connection.port})
 
-        random.shuffle(self.brokers_params)
-        self.brokers = itertools.cycle(self.brokers_params)
-
-        self.memory_transport = self.conf.fake_rabbit
-
-        self.connection = None
-        self.do_consume = None
-        self.reconnect()
+        if self.conf.fake_rabbit:
+            # Kludge to speed up tests.
+            self.connection.transport.polling_interval = 0.0
 
     # FIXME(markmc): use oslo sslutils when it is available as a library
     _SSL_PROTOCOLS = {
@@ -531,153 +534,87 @@ class Connection(object):
             ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
 
         # Return the extended behavior or just have the default behavior
-        return ssl_params or True
+        return ssl_params or None
 
-    def _connect(self, broker):
-        """Connect to rabbit.  Re-establish any queues that may have
-        been declared before if we are reconnecting.  Exceptions should
-        be handled by the caller.
+    def _setup_new_channel(self, new_channel):
+        """Callback invoked when the kombu connection have created
+        a new channel, we use it the reconfigure our consumers.
         """
-        LOG.info(_("Connecting to AMQP server on "
-                   "%(hostname)s:%(port)d"), broker)
-        self.connection = kombu.connection.BrokerConnection(**broker)
-        self.connection_errors = self.connection.connection_errors
-        self.channel_errors = self.connection.channel_errors
-        if self.memory_transport:
-            # Kludge to speed up tests.
-            self.connection.transport.polling_interval = 0.0
-        self.do_consume = True
         self.consumer_num = itertools.count(1)
-        self.connection.connect()
-        self.channel = self.connection.channel()
-        # work around 'memory' transport bug in 1.1.3
-        if self.memory_transport:
-            self.channel._new_queue('ae.undeliver')
         for consumer in self.consumers:
-            consumer.reconnect(self.channel)
-        LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
-                 broker)
+            consumer.reconnect(new_channel)
+
+    def ensure(self, error_callback, method, retry=None,
+               timeout_is_error=True):
+        """Will retry up to retry number of times.
+        retry = None means use the value of rabbit_max_retries
+        retry = -1 means to retry forever
+        retry = 0 means no retry
+        retry = N means N retries
+        """
+
+        if retry is None:
+            retry = self.max_retries
+        if retry is None or retry < 0:
+            retry = None
+
+        def on_error(exc, interval):
+            error_callback and error_callback(exc)
+
+            info = {'hostname': self.connection.hostname,
+                    'port': self.connection.port,
+                    'err_str': exc, 'sleep_time': interval}
+
+            if 'Socket closed' in six.text_type(exc):
+                LOG.error(_('AMQP server %(hostname)s:%(port)d closed'
+                            ' the connection. Check login credentials:'
+                            ' %(err_str)s'), info)
+            else:
+                LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
+                            'unreachable: %(err_str)s. Trying again in '
+                            '%(sleep_time)d seconds.'), info)
 
-    def _disconnect(self):
-        if self.connection:
             # XXX(nic): when reconnecting to a RabbitMQ cluster
             # with mirrored queues in use, the attempt to release the
             # connection can hang "indefinitely" somewhere deep down
             # in Kombu.  Blocking the thread for a bit prior to
             # release seems to kludge around the problem where it is
             # otherwise reproduceable.
+            # TODO(sileht): Check if this is useful since we
+            # use kombu for HA connection, the interval_step
+            # should sufficient, because the underlying kombu transport
+            # connection object freed.
             if self.conf.kombu_reconnect_delay > 0:
                 LOG.info(_("Delaying reconnect for %1.1f seconds...") %
                          self.conf.kombu_reconnect_delay)
                 time.sleep(self.conf.kombu_reconnect_delay)
 
-            try:
-                self.connection.release()
-            except self.connection_errors:
-                pass
-            self.connection = None
-
-    def reconnect(self, retry=None):
-        """Handles reconnecting and re-establishing queues.
-        Will retry up to retry number of times.
-        retry = None means use the value of rabbit_max_retries
-        retry = -1 means to retry forever
-        retry = 0 means no retry
-        retry = N means N retries
-        Sleep between tries, starting at self.interval_start
-        seconds, backing off self.interval_stepping number of seconds
-        each attempt.
-        """
-
-        attempt = 0
-        loop_forever = False
-        if retry is None:
-            retry = self.max_retries
-        if retry is None or retry < 0:
-            loop_forever = True
-
-        while True:
-            self._disconnect()
-
-            broker = six.next(self.brokers)
-            attempt += 1
-            try:
-                self._connect(broker)
-                return
-            except IOError as ex:
-                e = ex
-            except self.connection_errors as ex:
-                e = ex
-            except Exception as ex:
-                # NOTE(comstud): Unfortunately it's possible for amqplib
-                # to return an error not covered by its transport
-                # connection_errors in the case of a timeout waiting for
-                # a protocol response.  (See paste link in LP888621)
-                # So, we check all exceptions for 'timeout' in them
-                # and try to reconnect in this case.
-                if 'timeout' not in six.text_type(e):
-                    raise
-                e = ex
-
-            log_info = {}
-            log_info['err_str'] = e
-            log_info['retry'] = retry or 0
-            log_info.update(broker)
-
-            if not loop_forever and attempt > retry:
-                msg = _('Unable to connect to AMQP server on '
-                        '%(hostname)s:%(port)d after %(retry)d '
-                        'tries: %(err_str)s') % log_info
-                LOG.error(msg)
-                raise exceptions.MessageDeliveryFailure(msg)
-            else:
-                if attempt == 1:
-                    sleep_time = self.interval_start or 1
-                elif attempt > 1:
-                    sleep_time += self.interval_stepping
-
-                sleep_time = min(sleep_time, self.interval_max)
-
-                log_info['sleep_time'] = sleep_time
-                if 'Socket closed' in six.text_type(e):
-                    LOG.error(_('AMQP server %(hostname)s:%(port)d closed'
-                                ' the connection. Check login credentials:'
-                                ' %(err_str)s'), log_info)
-                else:
-                    LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
-                                'unreachable: %(err_str)s. Trying again in '
-                                '%(sleep_time)d seconds.'), log_info)
-                time.sleep(sleep_time)
-
-    def ensure(self, error_callback, method, retry=None):
-        while True:
-            try:
-                return method()
-            except self.connection_errors as e:
-                if error_callback:
-                    error_callback(e)
-            except self.channel_errors as e:
-                if error_callback:
-                    error_callback(e)
-            except (socket.timeout, IOError) as e:
-                if error_callback:
-                    error_callback(e)
-            except Exception as e:
-                # NOTE(comstud): Unfortunately it's possible for amqplib
-                # to return an error not covered by its transport
-                # connection_errors in the case of a timeout waiting for
-                # a protocol response.  (See paste link in LP888621)
-                # So, we check all exceptions for 'timeout' in them
-                # and try to reconnect in this case.
-                if 'timeout' not in six.text_type(e):
-                    raise
-                if error_callback:
-                    error_callback(e)
-            self.reconnect(retry=retry)
-
-    def get_channel(self):
-        """Convenience call for bin/clear_rabbit_queues."""
-        return self.channel
+        recoverable_errors = (self.connection.recoverable_channel_errors +
+                              self.connection.recoverable_connection_errors)
+        try:
+            autoretry_method = self.connection.autoretry(
+                method, channel=self.channel,
+                max_retries=retry,
+                errback=on_error,
+                interval_start=self.interval_start or 1,
+                interval_step=self.interval_stepping,
+                on_revive=self._setup_new_channel,
+            )
+            ret, channel = autoretry_method()
+            self.channel = channel
+            return ret
+        except recoverable_errors as exc:
+            # NOTE(sileht): number of retry exceeded and the connection
+            # is still broken
+            msg = _('Unable to connect to AMQP server on '
+                    '%(hostname)s:%(port)d after %(retry)d '
+                    'tries: %(err_str)s') % {
+                        'hostname': self.connection.hostname,
+                        'port': self.connection.port,
+                        'err_str': exc,
+                        'retry': retry}
+            LOG.error(msg)
+            raise exceptions.MessageDeliveryFailure(msg)
 
     def close(self):
         """Close/release this connection."""
@@ -689,10 +626,8 @@ class Connection(object):
         """Reset a connection so it can be used again."""
         self.channel.close()
         self.channel = self.connection.channel()
-        # work around 'memory' transport bug in 1.1.3
-        if self.memory_transport:
-            self.channel._new_queue('ae.undeliver')
         self.consumers = []
+        self._setup_new_channel(self.channel)
 
     def declare_consumer(self, consumer_cls, topic, callback):
         """Create a Consumer using the class that was passed in and
@@ -704,8 +639,8 @@ class Connection(object):
             LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
                       "%(err_str)s"), log_info)
 
-        def _declare_consumer():
-            consumer = consumer_cls(self.conf, self.channel, topic, callback,
+        def _declare_consumer(channel):
+            consumer = consumer_cls(self.conf, channel, topic, callback,
                                     six.next(self.consumer_num))
             self.consumers.append(consumer)
             return consumer
@@ -716,15 +651,11 @@ class Connection(object):
         """Return an iterator that will consume from all queues/consumers."""
 
         def _error_callback(exc):
-            if isinstance(exc, socket.timeout):
-                LOG.debug('Timed out waiting for RPC response: %s', exc)
-                raise rpc_common.Timeout()
-            else:
-                LOG.exception(_('Failed to consume message from queue: %s'),
-                              exc)
-                self.do_consume = True
+            LOG.exception(_('Failed to consume message from queue: %s'),
+                          exc)
+            self.do_consume = True
 
-        def _consume():
+        def _consume(channel):
             if self.do_consume:
                 queues_head = self.consumers[:-1]  # not fanout.
                 queues_tail = self.consumers[-1]  # fanout
@@ -732,7 +663,11 @@ class Connection(object):
                     queue.consume(nowait=True)
                 queues_tail.consume(nowait=False)
                 self.do_consume = False
-            return self.connection.drain_events(timeout=timeout)
+            try:
+                return self.connection.drain_events(timeout=timeout)
+            except socket.timeout as exc:
+                LOG.debug('Timed out waiting for RPC response: %s', exc)
+                raise rpc_common.Timeout()
 
         for iteration in itertools.count(0):
             if limit and iteration >= limit:
@@ -748,8 +683,8 @@ class Connection(object):
             LOG.exception(_("Failed to publish message to topic "
                           "'%(topic)s': %(err_str)s"), log_info)
 
-        def _publish():
-            publisher = cls(self.conf, self.channel, topic=topic, **kwargs)
+        def _publish(channel):
+            publisher = cls(self.conf, channel, topic=topic, **kwargs)
             publisher.send(msg, timeout)
 
         self.ensure(_error_callback, _publish, retry=retry)
diff --git a/tests/drivers/test_impl_rabbit.py b/tests/drivers/test_impl_rabbit.py
index de331ee8f..dc0d1a3f8 100644
--- a/tests/drivers/test_impl_rabbit.py
+++ b/tests/drivers/test_impl_rabbit.py
@@ -13,7 +13,6 @@
 #    under the License.
 
 import datetime
-import operator
 import sys
 import threading
 import uuid
@@ -21,6 +20,7 @@ import uuid
 import fixtures
 import kombu
 import mock
+from oslotest import mockpatch
 import testscenarios
 
 from oslo import messaging
@@ -49,87 +49,43 @@ class TestRabbitTransportURL(test_utils.BaseTestCase):
 
     scenarios = [
         ('none', dict(url=None,
-                      expected=[dict(hostname='localhost',
-                                     port=5672,
-                                     userid='guest',
-                                     password='guest',
-                                     virtual_host='/')])),
+                      expected=["amqp://guest:guest@localhost:5672//"])),
         ('empty',
          dict(url='rabbit:///',
-              expected=[dict(hostname='localhost',
-                             port=5672,
-                             userid='guest',
-                             password='guest',
-                             virtual_host='')])),
+              expected=['amqp://guest:guest@localhost:5672/'])),
         ('localhost',
          dict(url='rabbit://localhost/',
-              expected=[dict(hostname='localhost',
-                             port=5672,
-                             userid='',
-                             password='',
-                             virtual_host='')])),
+              expected=['amqp://:@localhost:5672/'])),
         ('virtual_host',
          dict(url='rabbit:///vhost',
-              expected=[dict(hostname='localhost',
-                             port=5672,
-                             userid='guest',
-                             password='guest',
-                             virtual_host='vhost')])),
+              expected=['amqp://guest:guest@localhost:5672/vhost'])),
         ('no_creds',
          dict(url='rabbit://host/virtual_host',
-              expected=[dict(hostname='host',
-                             port=5672,
-                             userid='',
-                             password='',
-                             virtual_host='virtual_host')])),
+              expected=['amqp://:@host:5672/virtual_host'])),
         ('no_port',
          dict(url='rabbit://user:password@host/virtual_host',
-              expected=[dict(hostname='host',
-                             port=5672,
-                             userid='user',
-                             password='password',
-                             virtual_host='virtual_host')])),
+              expected=['amqp://user:password@host:5672/virtual_host'])),
         ('full_url',
          dict(url='rabbit://user:password@host:10/virtual_host',
-              expected=[dict(hostname='host',
-                             port=10,
-                             userid='user',
-                             password='password',
-                             virtual_host='virtual_host')])),
+              expected=['amqp://user:password@host:10/virtual_host'])),
         ('full_two_url',
          dict(url='rabbit://user:password@host:10,'
               'user2:password2@host2:12/virtual_host',
-              expected=[dict(hostname='host',
-                             port=10,
-                             userid='user',
-                             password='password',
-                             virtual_host='virtual_host'),
-                        dict(hostname='host2',
-                             port=12,
-                             userid='user2',
-                             password='password2',
-                             virtual_host='virtual_host')
-                        ]
+              expected=["amqp://user:password@host:10/virtual_host",
+                        "amqp://user2:password2@host2:12/virtual_host"]
               )),
-
     ]
 
-    def test_transport_url(self):
-        self.messaging_conf.in_memory = True
+    @mock.patch('oslo.messaging._drivers.impl_rabbit.Connection.ensure')
+    def test_transport_url(self, fake_ensure):
+        self.messaging_conf.in_memory = False
 
         transport = messaging.get_transport(self.conf, self.url)
         self.addCleanup(transport.cleanup)
         driver = transport._driver
 
-        brokers_params = driver._get_connection().brokers_params[:]
-        brokers_params = [dict((k, v) for k, v in broker.items()
-                               if k not in ['transport', 'login_method'])
-                          for broker in brokers_params]
-
-        self.assertEqual(sorted(self.expected,
-                                key=operator.itemgetter('hostname')),
-                         sorted(brokers_params,
-                                key=operator.itemgetter('hostname')))
+        urls = driver._get_connection()._url.split(";")
+        self.assertEqual(sorted(self.expected), sorted(urls))
 
 
 class TestSendReceive(test_utils.BaseTestCase):
@@ -674,62 +630,38 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
         self.brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
         self.config(rabbit_hosts=self.brokers)
 
-        hostname_sets = set()
-        self.info = {'attempt': 0,
-                     'fail': False}
-
-        def _connect(myself, params):
-            # do as little work that is enough to pass connection attempt
-            myself.connection = kombu.connection.BrokerConnection(**params)
-            myself.connection_errors = myself.connection.connection_errors
-
-            hostname = params['hostname']
-            self.assertNotIn(hostname, hostname_sets)
-            hostname_sets.add(hostname)
-
-            self.info['attempt'] += 1
-            if self.info['fail']:
-                raise IOError('fake fail')
-
-        # just make sure connection instantiation does not fail with an
-        # exception
-        self.stubs.Set(rabbit_driver.Connection, '_connect', _connect)
+        self.kombu_connect = mock.Mock()
+        self.useFixture(mockpatch.Patch(
+            'kombu.connection.Connection.connect',
+            side_effect=self.kombu_connect))
+        self.useFixture(mockpatch.Patch(
+            'kombu.connection.Connection.channel'))
 
         # starting from the first broker in the list
         url = messaging.TransportURL.parse(self.conf, None)
         self.connection = rabbit_driver.Connection(self.conf, url)
         self.addCleanup(self.connection.close)
 
-        self.info.update({'attempt': 0,
-                          'fail': True})
-        hostname_sets.clear()
-
-    def test_reconnect_order(self):
-        self.assertRaises(messaging.MessageDeliveryFailure,
-                          self.connection.reconnect,
-                          retry=len(self.brokers) - 1)
-        self.assertEqual(len(self.brokers), self.info['attempt'])
-
     def test_ensure_four_retry(self):
         mock_callback = mock.Mock(side_effect=IOError)
         self.assertRaises(messaging.MessageDeliveryFailure,
                           self.connection.ensure, None, mock_callback,
                           retry=4)
-        self.assertEqual(5, self.info['attempt'])
-        self.assertEqual(1, mock_callback.call_count)
+        self.assertEqual(5, self.kombu_connect.call_count)
+        self.assertEqual(6, mock_callback.call_count)
 
     def test_ensure_one_retry(self):
         mock_callback = mock.Mock(side_effect=IOError)
         self.assertRaises(messaging.MessageDeliveryFailure,
                           self.connection.ensure, None, mock_callback,
                           retry=1)
-        self.assertEqual(2, self.info['attempt'])
-        self.assertEqual(1, mock_callback.call_count)
+        self.assertEqual(2, self.kombu_connect.call_count)
+        self.assertEqual(3, mock_callback.call_count)
 
     def test_ensure_no_retry(self):
         mock_callback = mock.Mock(side_effect=IOError)
         self.assertRaises(messaging.MessageDeliveryFailure,
                           self.connection.ensure, None, mock_callback,
                           retry=0)
-        self.assertEqual(1, self.info['attempt'])
-        self.assertEqual(1, mock_callback.call_count)
+        self.assertEqual(1, self.kombu_connect.call_count)
+        self.assertEqual(2, mock_callback.call_count)