From 99b843767d0ab1f37dc5662c0efc0263fcd88977 Mon Sep 17 00:00:00 2001
From: Ildar Svetlov <isvetlov@gmail.com>
Date: Thu, 14 Apr 2016 19:45:32 +0300
Subject: [PATCH] [kafka] Add several bootstrap servers support

At that moment kafka driver can use only url with one "host:port"
for the bootstrap server defining, but kafka client supports
set of host:port adresses: "host1:port1,host2:port2", ... .
This patch implement this functional in kafka driver for the better HA.

List self.hostaddrs stores strings "host:port" of Connection.
It collects from self.url.hosts

Change-Id: I5eece66ca6bd069a0df8c8629b4ac815f69a7c7d
Closes-Bug: #1572017
---
 oslo_messaging/_drivers/impl_kafka.py         | 25 ++++++++-----------
 .../tests/drivers/test_impl_kafka.py          | 15 ++++++-----
 2 files changed, 20 insertions(+), 20 deletions(-)

diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py
index 471734cfb..018362062 100644
--- a/oslo_messaging/_drivers/impl_kafka.py
+++ b/oslo_messaging/_drivers/impl_kafka.py
@@ -97,21 +97,18 @@ class Connection(object):
 
     def _parse_url(self):
         driver_conf = self.conf.oslo_messaging_kafka
-        try:
-            self.host = self.url.hosts[0].hostname
-        except (NameError, IndexError):
-            self.host = driver_conf.kafka_default_host
 
-        try:
-            self.port = self.url.hosts[0].port
-        except (NameError, IndexError):
-            self.port = driver_conf.kafka_default_port
+        self.hostaddrs = []
 
-        if self.host is None:
-            self.host = driver_conf.kafka_default_host
+        for host in self.url.hosts:
+            if host.hostname:
+                self.hostaddrs.append("%s:%s" % (
+                    host.hostname,
+                    host.port or driver_conf.kafka_default_port))
 
-        if self.port is None:
-            self.port = driver_conf.kafka_default_port
+        if not self.hostaddrs:
+            self.hostaddrs.append("%s:%s" % (driver_conf.kafka_default_host,
+                                             driver_conf.kafka_default_port))
 
     def notify_send(self, topic, ctxt, msg, retry):
         """Send messages to Kafka broker.
@@ -215,7 +212,7 @@ class Connection(object):
             return
         try:
             self.kafka_client = kafka.KafkaClient(
-                "%s:%s" % (self.host, str(self.port)))
+                self.hostaddrs)
             self.producer = kafka.SimpleProducer(self.kafka_client)
         except KafkaError as e:
             LOG.exception(_LE("Kafka Connection is not available: %s"), e)
@@ -227,7 +224,7 @@ class Connection(object):
             self.kafka_client.ensure_topic_exists(topic)
         self.consumer = kafka.KafkaConsumer(
             *topics, group_id=group,
-            bootstrap_servers=["%s:%s" % (self.host, str(self.port))],
+            bootstrap_servers=self.hostaddrs,
             fetch_message_max_bytes=self.fetch_messages_max_bytes)
         self._consume_loop_stopped = False
 
diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py
index e5528cc30..a1d7e6c57 100644
--- a/oslo_messaging/tests/drivers/test_impl_kafka.py
+++ b/oslo_messaging/tests/drivers/test_impl_kafka.py
@@ -57,13 +57,17 @@ class TestKafkaTransportURL(test_utils.BaseTestCase):
 
     scenarios = [
         ('none', dict(url=None,
-                      expected=[dict(host='localhost', port=9092)])),
+                      expected=dict(hostaddrs=['localhost:9092']))),
         ('empty', dict(url='kafka:///',
-                       expected=[dict(host='localhost', port=9092)])),
+                       expected=dict(hostaddrs=['localhost:9092']))),
         ('host', dict(url='kafka://127.0.0.1',
-                      expected=[dict(host='127.0.0.1', port=9092)])),
+                      expected=dict(hostaddrs=['127.0.0.1:9092']))),
         ('port', dict(url='kafka://localhost:1234',
-                      expected=[dict(host='localhost', port=1234)])),
+                      expected=dict(hostaddrs=['localhost:1234']))),
+        ('two', dict(url='kafka://localhost:1234,localhost2:1234',
+                     expected=dict(hostaddrs=['localhost:1234',
+                                              'localhost2:1234']))),
+
     ]
 
     def setUp(self):
@@ -76,8 +80,7 @@ class TestKafkaTransportURL(test_utils.BaseTestCase):
         driver = transport._driver
 
         conn = driver._get_connection(kafka_driver.PURPOSE_SEND)
-        self.assertEqual(self.expected[0]['host'], conn.host)
-        self.assertEqual(self.expected[0]['port'], conn.port)
+        self.assertEqual(self.expected['hostaddrs'], conn.hostaddrs)
 
 
 class TestKafkaDriver(test_utils.BaseTestCase):