diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index e5538e5e9..7bc7c43e9 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -156,7 +156,7 @@ rabbit_opts = [
 LOG = logging.getLogger(__name__)
 
 
-def _get_queue_arguments(conf):
+def _get_queue_arguments(rabbit_ha_queues):
     """Construct the arguments for declaring a queue.
 
     If the rabbit_ha_queues option is set, we declare a mirrored queue
@@ -167,7 +167,7 @@ def _get_queue_arguments(conf):
     Setting x-ha-policy to all means that the queue will be mirrored
     to all nodes in the cluster.
     """
-    return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
+    return {'x-ha-policy': 'all'} if rabbit_ha_queues else {}
 
 
 class RabbitMessage(dict):
@@ -186,8 +186,8 @@ class RabbitMessage(dict):
 class Consumer(object):
     """Consumer class."""
 
-    def __init__(self, conf, exchange_name, queue_name, routing_key, type,
-                 durable, auto_delete, callback, nowait=True):
+    def __init__(self, exchange_name, queue_name, routing_key, type, durable,
+                 auto_delete, callback, nowait=True, rabbit_ha_queues=None):
         """Init the Publisher class with the exchange_name, routing_key,
         type, durable auto_delete
         """
@@ -199,7 +199,7 @@ class Consumer(object):
         self.callback = callback
         self.type = type
         self.nowait = nowait
-        self.queue_arguments = _get_queue_arguments(conf)
+        self.queue_arguments = _get_queue_arguments(rabbit_ha_queues)
 
         self.queue = None
         self.exchange = kombu.entity.Exchange(
@@ -377,26 +377,50 @@ class Connection(object):
     pools = {}
 
     def __init__(self, conf, url, purpose):
-        self.conf = conf
-        self.driver_conf = self.conf.oslo_messaging_rabbit
-        self.max_retries = self.driver_conf.rabbit_max_retries
+        # NOTE(viktors): Parse config options
+        driver_conf = conf.oslo_messaging_rabbit
+
+        self.max_retries = driver_conf.rabbit_max_retries
+        self.interval_start = driver_conf.rabbit_retry_interval
+        self.interval_stepping = driver_conf.rabbit_retry_backoff
+
+        self.login_method = driver_conf.rabbit_login_method
+        self.fake_rabbit = driver_conf.fake_rabbit
+        self.virtual_host = driver_conf.rabbit_virtual_host
+        self.rabbit_hosts = driver_conf.rabbit_hosts
+        self.rabbit_port = driver_conf.rabbit_port
+        self.rabbit_userid = driver_conf.rabbit_userid
+        self.rabbit_password = driver_conf.rabbit_password
+        self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
+        self.heartbeat_timeout_threshold = \
+            driver_conf.heartbeat_timeout_threshold
+        self.heartbeat_rate = driver_conf.heartbeat_rate
+        self.kombu_reconnect_delay = driver_conf.kombu_reconnect_delay
+        self.amqp_durable_queues = driver_conf.amqp_durable_queues
+        self.amqp_auto_delete = driver_conf.amqp_auto_delete
+        self.rabbit_use_ssl = driver_conf.rabbit_use_ssl
+        self.kombu_reconnect_timeout = driver_conf.kombu_reconnect_timeout
+
+        if self.rabbit_use_ssl:
+            self.kombu_ssl_version = driver_conf.kombu_ssl_version
+            self.kombu_ssl_keyfile = driver_conf.kombu_ssl_keyfile
+            self.kombu_ssl_certfile = driver_conf.kombu_ssl_certfile
+            self.kombu_ssl_ca_certs = driver_conf.kombu_ssl_ca_certs
+
         # Try forever?
         if self.max_retries <= 0:
             self.max_retries = None
-        self.interval_start = self.driver_conf.rabbit_retry_interval
-        self.interval_stepping = self.driver_conf.rabbit_retry_backoff
+
         # max retry-interval = 30 seconds
         self.interval_max = 30
 
-        self._login_method = self.driver_conf.rabbit_login_method
-
         if url.virtual_host is not None:
             virtual_host = url.virtual_host
         else:
-            virtual_host = self.driver_conf.rabbit_virtual_host
+            virtual_host = self.virtual_host
 
         self._url = ''
-        if self.driver_conf.fake_rabbit:
+        if self.fake_rabbit:
             LOG.warn("Deprecated: fake_rabbit option is deprecated, set "
                      "rpc_backend to kombu+memory or use the fake "
                      "driver instead.")
@@ -423,13 +447,13 @@ class Connection(object):
             transport = url.transport.replace('kombu+', '')
             self._url = "%s://%s" % (transport, virtual_host)
         else:
-            for adr in self.driver_conf.rabbit_hosts:
+            for adr in self.rabbit_hosts:
                 hostname, port = netutils.parse_host_port(
-                    adr, default_port=self.driver_conf.rabbit_port)
+                    adr, default_port=self.rabbit_port)
                 self._url += '%samqp://%s:%s@%s:%s/%s' % (
                     ";" if self._url else '',
-                    parse.quote(self.driver_conf.rabbit_userid),
-                    parse.quote(self.driver_conf.rabbit_password),
+                    parse.quote(self.rabbit_userid),
+                    parse.quote(self.rabbit_password),
                     self._parse_url_hostname(hostname), port,
                     virtual_host)
 
@@ -450,9 +474,9 @@ class Connection(object):
 
         self.connection = kombu.connection.Connection(
             self._url, ssl=self._fetch_ssl_params(),
-            login_method=self._login_method,
+            login_method=self.login_method,
             failover_strategy="shuffle",
-            heartbeat=self.driver_conf.heartbeat_timeout_threshold,
+            heartbeat=self.heartbeat_timeout_threshold,
             transport_options={'confirm_publish': True})
 
         LOG.info(_LI('Connecting to AMQP server on %(hostname)s:%(port)s'),
@@ -467,8 +491,8 @@ class Connection(object):
         # (heatbeat_timeout/heartbeat_rate/2.0, default kombu
         # heartbeat_rate is 2)
         self._heartbeat_wait_timeout = (
-            float(self.driver_conf.heartbeat_timeout_threshold) /
-            float(self.driver_conf.heartbeat_rate) / 2.0)
+            float(self.heartbeat_timeout_threshold) /
+            float(self.heartbeat_rate) / 2.0)
         self._heartbeat_support_log_emitted = False
 
         # NOTE(sileht): just ensure the connection is setuped at startup
@@ -538,19 +562,19 @@ class Connection(object):
         """Handles fetching what ssl params should be used for the connection
         (if any).
         """
-        if self.driver_conf.rabbit_use_ssl:
+        if self.rabbit_use_ssl:
             ssl_params = dict()
 
             # http://docs.python.org/library/ssl.html - ssl.wrap_socket
-            if self.driver_conf.kombu_ssl_version:
+            if self.kombu_ssl_version:
                 ssl_params['ssl_version'] = self.validate_ssl_version(
-                    self.driver_conf.kombu_ssl_version)
-            if self.driver_conf.kombu_ssl_keyfile:
-                ssl_params['keyfile'] = self.driver_conf.kombu_ssl_keyfile
-            if self.driver_conf.kombu_ssl_certfile:
-                ssl_params['certfile'] = self.driver_conf.kombu_ssl_certfile
-            if self.driver_conf.kombu_ssl_ca_certs:
-                ssl_params['ca_certs'] = self.driver_conf.kombu_ssl_ca_certs
+                    self.kombu_ssl_version)
+            if self.kombu_ssl_keyfile:
+                ssl_params['keyfile'] = self.kombu_ssl_keyfile
+            if self.kombu_ssl_certfile:
+                ssl_params['certfile'] = self.kombu_ssl_certfile
+            if self.kombu_ssl_ca_certs:
+                ssl_params['ca_certs'] = self.kombu_ssl_ca_certs
                 # We might want to allow variations in the
                 # future with this?
                 ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
@@ -591,8 +615,8 @@ class Connection(object):
 
             recoverable_error_callback and recoverable_error_callback(exc)
 
-            interval = (self.driver_conf.kombu_reconnect_delay + interval
-                        if self.driver_conf.kombu_reconnect_delay > 0
+            interval = (self.kombu_reconnect_delay + interval
+                        if self.kombu_reconnect_delay > 0
                         else interval)
 
             info = {'err_str': exc, 'sleep_time': interval}
@@ -617,8 +641,8 @@ class Connection(object):
             # use kombu for HA connection, the interval_step
             # should sufficient, because the underlying kombu transport
             # connection object freed.
-            if self.driver_conf.kombu_reconnect_delay > 0:
-                time.sleep(self.driver_conf.kombu_reconnect_delay)
+            if self.kombu_reconnect_delay > 0:
+                time.sleep(self.kombu_reconnect_delay)
 
         def on_reconnection(new_channel):
             """Callback invoked when the kombu reconnects and creates
@@ -712,7 +736,7 @@ class Connection(object):
             self._consumers = []
 
     def _heartbeat_supported_and_enabled(self):
-        if self.driver_conf.heartbeat_timeout_threshold <= 0:
+        if self.heartbeat_timeout_threshold <= 0:
             return False
 
         if self.connection.supports_heartbeats:
@@ -741,9 +765,9 @@ class Connection(object):
         # NOTE(sileht): we are suposed to send at least one heartbeat
         # every heartbeat_timeout_threshold, so no need to way more
         with self._transport_socket_timeout(
-                self.driver_conf.heartbeat_timeout_threshold):
+                self.heartbeat_timeout_threshold):
             self.connection.heartbeat_check(
-                rate=self.driver_conf.heartbeat_rate)
+                rate=self.heartbeat_rate)
 
     def _heartbeat_start(self):
         if self._heartbeat_supported_and_enabled():
@@ -880,28 +904,28 @@ class Connection(object):
         responses for call/multicall
         """
 
-        consumer = Consumer(self.driver_conf,
-                            exchange_name=topic,
+        consumer = Consumer(exchange_name=topic,
                             queue_name=topic,
                             routing_key=topic,
                             type='direct',
                             durable=False,
                             auto_delete=True,
-                            callback=callback)
+                            callback=callback,
+                            rabbit_ha_queues=self.rabbit_ha_queues)
 
         self.declare_consumer(consumer)
 
     def declare_topic_consumer(self, exchange_name, topic, callback=None,
                                queue_name=None):
         """Create a 'topic' consumer."""
-        consumer = Consumer(self.driver_conf,
-                            exchange_name=exchange_name,
+        consumer = Consumer(exchange_name=exchange_name,
                             queue_name=queue_name or topic,
                             routing_key=topic,
                             type='topic',
-                            durable=self.driver_conf.amqp_durable_queues,
-                            auto_delete=self.driver_conf.amqp_auto_delete,
-                            callback=callback)
+                            durable=self.amqp_durable_queues,
+                            auto_delete=self.amqp_auto_delete,
+                            callback=callback,
+                            rabbit_ha_queues=self.rabbit_ha_queues)
 
         self.declare_consumer(consumer)
 
@@ -912,15 +936,14 @@ class Connection(object):
         exchange_name = '%s_fanout' % topic
         queue_name = '%s_fanout_%s' % (topic, unique)
 
-        consumer = Consumer(self.driver_conf,
-                            exchange_name=exchange_name,
+        consumer = Consumer(exchange_name=exchange_name,
                             queue_name=queue_name,
                             routing_key=topic,
                             type='fanout',
                             durable=False,
                             auto_delete=True,
                             callback=callback,
-                            nowait=False)
+                            rabbit_ha_queues=self.rabbit_ha_queues)
 
         self.declare_consumer(consumer)
 
@@ -955,7 +978,7 @@ class Connection(object):
         # a answer before timeout is reached
         transport_timeout = timeout
 
-        heartbeat_timeout = self.driver_conf.heartbeat_timeout_threshold
+        heartbeat_timeout = self.heartbeat_timeout_threshold
         if (self._heartbeat_supported_and_enabled() and (
                 transport_timeout is None or
                 transport_timeout > heartbeat_timeout)):
@@ -999,7 +1022,7 @@ class Connection(object):
                 auto_delete=exchange.auto_delete,
                 name=routing_key,
                 routing_key=routing_key,
-                queue_arguments=_get_queue_arguments(self.driver_conf))
+                queue_arguments=_get_queue_arguments(self.rabbit_ha_queues))
             queue.declare()
             self.PUBLISHER_DECLARED_QUEUES[self.channel].add(queue_indentifier)
 
@@ -1019,7 +1042,7 @@ class Connection(object):
         # before timeout is exshauted
         duration = (
             timeout if timeout is not None
-            else self.conf.oslo_messaging_rabbit.kombu_reconnect_timeout
+            else self.kombu_reconnect_timeout
         )
 
         timer = rpc_common.DecayingTimer(duration=duration)
@@ -1065,8 +1088,8 @@ class Connection(object):
         exchange = kombu.entity.Exchange(
             name=exchange_name,
             type='topic',
-            durable=self.driver_conf.amqp_durable_queues,
-            auto_delete=self.driver_conf.amqp_auto_delete)
+            durable=self.amqp_durable_queues,
+            auto_delete=self.amqp_auto_delete)
 
         self._ensure_publishing(self._publish, exchange, msg,
                                 routing_key=topic, retry=retry)
@@ -1085,8 +1108,8 @@ class Connection(object):
         exchange = kombu.entity.Exchange(
             name=exchange_name,
             type='topic',
-            durable=self.driver_conf.amqp_durable_queues,
-            auto_delete=self.driver_conf.amqp_auto_delete)
+            durable=self.amqp_durable_queues,
+            auto_delete=self.amqp_auto_delete)
 
         self._ensure_publishing(self._publish_and_creates_default_queue,
                                 exchange, msg, routing_key=topic, retry=retry)