diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py
index ec3296b5c..007727c9f 100644
--- a/oslo_messaging/_drivers/impl_kafka.py
+++ b/oslo_messaging/_drivers/impl_kafka.py
@@ -119,24 +119,39 @@ class Connection(object):
 
     def __init__(self, conf, url):
 
-        self.conf = conf
+        self.driver_conf = conf.oslo_messaging_kafka
+        self.security_protocol = self.driver_conf.security_protocol
+        self.sasl_mechanism = self.driver_conf.sasl_mechanism
+        self.ssl_cafile = self.driver_conf.ssl_cafile
         self.url = url
         self.virtual_host = url.virtual_host
         self._parse_url()
 
     def _parse_url(self):
-        driver_conf = self.conf.oslo_messaging_kafka
         self.hostaddrs = []
+        self.username = None
+        self.password = None
 
         for host in self.url.hosts:
+            # NOTE(ansmith): connections and failover are transparently
+            # managed by the client library. Credentials will be
+            # selectd from first host encountered in transport_url
+            if self.username is None:
+                self.username = host.username
+                self.password = host.password
+            else:
+                if self.username != host.username:
+                    LOG.warning(_LW("Different transport usernames detected"))
+
             if host.hostname:
                 self.hostaddrs.append("%s:%s" % (
                     host.hostname,
-                    host.port or driver_conf.kafka_default_port))
+                    host.port or self.driver_conf.kafka_default_port))
 
         if not self.hostaddrs:
-            self.hostaddrs.append("%s:%s" % (driver_conf.kafka_default_host,
-                                             driver_conf.kafka_default_port))
+            self.hostaddrs.append("%s:%s" %
+                                  (self.driver_conf.kafka_default_host,
+                                   self.driver_conf.kafka_default_port))
 
     def reset(self):
         """Reset a connection so it can be used again."""
@@ -148,13 +163,12 @@ class ConsumerConnection(Connection):
     def __init__(self, conf, url):
 
         super(ConsumerConnection, self).__init__(conf, url)
-        driver_conf = self.conf.oslo_messaging_kafka
         self.consumer = None
-        self.consumer_timeout = driver_conf.kafka_consumer_timeout
-        self.max_fetch_bytes = driver_conf.kafka_max_fetch_bytes
-        self.group_id = driver_conf.consumer_group
-        self.enable_auto_commit = driver_conf.enable_auto_commit
-        self.max_poll_records = driver_conf.max_poll_records
+        self.consumer_timeout = self.driver_conf.kafka_consumer_timeout
+        self.max_fetch_bytes = self.driver_conf.kafka_max_fetch_bytes
+        self.group_id = self.driver_conf.consumer_group
+        self.enable_auto_commit = self.driver_conf.enable_auto_commit
+        self.max_poll_records = self.driver_conf.max_poll_records
         self._consume_loop_stopped = False
 
     @with_reconnect()
@@ -216,6 +230,11 @@ class ConsumerConnection(Connection):
             bootstrap_servers=self.hostaddrs,
             max_partition_fetch_bytes=self.max_fetch_bytes,
             max_poll_records=self.max_poll_records,
+            security_protocol=self.security_protocol,
+            sasl_mechanism=self.sasl_mechanism,
+            sasl_plain_username=self.username,
+            sasl_plain_password=self.password,
+            ssl_cafile=self.ssl_cafile,
             selector=KAFKA_SELECTOR
         )
 
@@ -225,9 +244,8 @@ class ProducerConnection(Connection):
     def __init__(self, conf, url):
 
         super(ProducerConnection, self).__init__(conf, url)
-        driver_conf = self.conf.oslo_messaging_kafka
-        self.batch_size = driver_conf.producer_batch_size
-        self.linger_ms = driver_conf.producer_batch_timeout * 1000
+        self.batch_size = self.driver_conf.producer_batch_size
+        self.linger_ms = self.driver_conf.producer_batch_timeout * 1000
         self.producer = None
         self.producer_lock = threading.Lock()
 
@@ -278,6 +296,11 @@ class ProducerConnection(Connection):
                 bootstrap_servers=self.hostaddrs,
                 linger_ms=self.linger_ms,
                 batch_size=self.batch_size,
+                security_protocol=self.security_protocol,
+                sasl_mechanism=self.sasl_mechanism,
+                sasl_plain_username=self.username,
+                sasl_plain_password=self.password,
+                ssl_cafile=self.ssl_cafile,
                 selector=KAFKA_SELECTOR)
 
 
diff --git a/oslo_messaging/_drivers/kafka_driver/kafka_options.py b/oslo_messaging/_drivers/kafka_driver/kafka_options.py
index e435866d0..d68b7efeb 100644
--- a/oslo_messaging/_drivers/kafka_driver/kafka_options.py
+++ b/oslo_messaging/_drivers/kafka_driver/kafka_options.py
@@ -63,7 +63,20 @@ KAFKA_OPTS = [
                 help='Enable asynchronous consumer commits'),
 
     cfg.IntOpt('max_poll_records', default=500,
-               help='The maximum number of records returned in a poll call')
+               help='The maximum number of records returned in a poll call'),
+
+    cfg.StrOpt('security_protocol', default='PLAINTEXT',
+               choices=('PLAINTEXT', 'SASL_PLAINTEXT', 'SSL', 'SASL_SSL'),
+               help='Protocol used to communicate with brokers'),
+
+    cfg.StrOpt('sasl_mechanism',
+               default='PLAIN',
+               help='Mechanism when security protocol is SASL'),
+
+    cfg.StrOpt('ssl_cafile',
+               default='',
+               help='CA certificate PEM file used to verify the server'
+               ' certificate')
 ]
 
 
diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py
index a34ce639b..a4860bd55 100644
--- a/oslo_messaging/tests/drivers/test_impl_kafka.py
+++ b/oslo_messaging/tests/drivers/test_impl_kafka.py
@@ -39,24 +39,47 @@ class TestKafkaTransportURL(test_utils.BaseTestCase):
     scenarios = [
         ('none', dict(url=None,
                       expected=dict(hostaddrs=['localhost:9092'],
+                                    username=None,
+                                    password=None,
                                     vhost=None))),
         ('empty', dict(url='kafka:///',
                        expected=dict(hostaddrs=['localhost:9092'],
+                                     username=None,
+                                     password=None,
                                      vhost=''))),
         ('host', dict(url='kafka://127.0.0.1',
                       expected=dict(hostaddrs=['127.0.0.1:9092'],
+                                    username=None,
+                                    password=None,
                                     vhost=None))),
         ('port', dict(url='kafka://localhost:1234',
                       expected=dict(hostaddrs=['localhost:1234'],
+                                    username=None,
+                                    password=None,
                                     vhost=None))),
         ('vhost', dict(url='kafka://localhost:1234/my_host',
                        expected=dict(hostaddrs=['localhost:1234'],
+                                     username=None,
+                                     password=None,
                                      vhost='my_host'))),
         ('two', dict(url='kafka://localhost:1234,localhost2:1234',
                      expected=dict(hostaddrs=['localhost:1234',
                                               'localhost2:1234'],
+                                   username=None,
+                                   password=None,
                                    vhost=None))),
-
+        ('user', dict(url='kafka://stack:stacksecret@localhost:9092/my_host',
+                      expected=dict(hostaddrs=['localhost:9092'],
+                                    username='stack',
+                                    password='stacksecret',
+                                    vhost='my_host'))),
+        ('user2', dict(url='kafka://stack:stacksecret@localhost:9092,'
+                       'stack2:stacksecret2@localhost:1234/my_host',
+                       expected=dict(hostaddrs=['localhost:9092',
+                                                'localhost:1234'],
+                                     username='stack',
+                                     password='stacksecret',
+                                     vhost='my_host'))),
     ]
 
     def setUp(self):
@@ -70,6 +93,8 @@ class TestKafkaTransportURL(test_utils.BaseTestCase):
         driver = transport._driver
 
         self.assertEqual(self.expected['hostaddrs'], driver.pconn.hostaddrs)
+        self.assertEqual(self.expected['username'], driver.pconn.username)
+        self.assertEqual(self.expected['password'], driver.pconn.password)
         self.assertEqual(self.expected['vhost'], driver.virtual_host)
 
 
@@ -119,6 +144,11 @@ class TestKafkaDriver(test_utils.BaseTestCase):
                 bootstrap_servers=['localhost:9092'],
                 max_partition_fetch_bytes=mock.ANY,
                 max_poll_records=mock.ANY,
+                security_protocol='PLAINTEXT',
+                sasl_mechanism='PLAIN',
+                sasl_plain_username=mock.ANY,
+                sasl_plain_password=mock.ANY,
+                ssl_cafile='',
                 selector=mock.ANY
             )