Merge "[kafka] Add several bootstrap servers support"
This commit is contained in:
commit
ad1bea351d
oslo_messaging
@ -97,21 +97,18 @@ class Connection(object):
|
||||
|
||||
def _parse_url(self):
|
||||
driver_conf = self.conf.oslo_messaging_kafka
|
||||
try:
|
||||
self.host = self.url.hosts[0].hostname
|
||||
except (NameError, IndexError):
|
||||
self.host = driver_conf.kafka_default_host
|
||||
|
||||
try:
|
||||
self.port = self.url.hosts[0].port
|
||||
except (NameError, IndexError):
|
||||
self.port = driver_conf.kafka_default_port
|
||||
self.hostaddrs = []
|
||||
|
||||
if self.host is None:
|
||||
self.host = driver_conf.kafka_default_host
|
||||
for host in self.url.hosts:
|
||||
if host.hostname:
|
||||
self.hostaddrs.append("%s:%s" % (
|
||||
host.hostname,
|
||||
host.port or driver_conf.kafka_default_port))
|
||||
|
||||
if self.port is None:
|
||||
self.port = driver_conf.kafka_default_port
|
||||
if not self.hostaddrs:
|
||||
self.hostaddrs.append("%s:%s" % (driver_conf.kafka_default_host,
|
||||
driver_conf.kafka_default_port))
|
||||
|
||||
def notify_send(self, topic, ctxt, msg, retry):
|
||||
"""Send messages to Kafka broker.
|
||||
@ -215,7 +212,7 @@ class Connection(object):
|
||||
return
|
||||
try:
|
||||
self.kafka_client = kafka.KafkaClient(
|
||||
"%s:%s" % (self.host, str(self.port)))
|
||||
self.hostaddrs)
|
||||
self.producer = kafka.SimpleProducer(self.kafka_client)
|
||||
except KafkaError as e:
|
||||
LOG.exception(_LE("Kafka Connection is not available: %s"), e)
|
||||
@ -227,7 +224,7 @@ class Connection(object):
|
||||
self.kafka_client.ensure_topic_exists(topic)
|
||||
self.consumer = kafka.KafkaConsumer(
|
||||
*topics, group_id=group,
|
||||
bootstrap_servers=["%s:%s" % (self.host, str(self.port))],
|
||||
bootstrap_servers=self.hostaddrs,
|
||||
fetch_message_max_bytes=self.fetch_messages_max_bytes)
|
||||
self._consume_loop_stopped = False
|
||||
|
||||
|
@ -57,13 +57,17 @@ class TestKafkaTransportURL(test_utils.BaseTestCase):
|
||||
|
||||
scenarios = [
|
||||
('none', dict(url=None,
|
||||
expected=[dict(host='localhost', port=9092)])),
|
||||
expected=dict(hostaddrs=['localhost:9092']))),
|
||||
('empty', dict(url='kafka:///',
|
||||
expected=[dict(host='localhost', port=9092)])),
|
||||
expected=dict(hostaddrs=['localhost:9092']))),
|
||||
('host', dict(url='kafka://127.0.0.1',
|
||||
expected=[dict(host='127.0.0.1', port=9092)])),
|
||||
expected=dict(hostaddrs=['127.0.0.1:9092']))),
|
||||
('port', dict(url='kafka://localhost:1234',
|
||||
expected=[dict(host='localhost', port=1234)])),
|
||||
expected=dict(hostaddrs=['localhost:1234']))),
|
||||
('two', dict(url='kafka://localhost:1234,localhost2:1234',
|
||||
expected=dict(hostaddrs=['localhost:1234',
|
||||
'localhost2:1234']))),
|
||||
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
@ -76,8 +80,7 @@ class TestKafkaTransportURL(test_utils.BaseTestCase):
|
||||
driver = transport._driver
|
||||
|
||||
conn = driver._get_connection(kafka_driver.PURPOSE_SEND)
|
||||
self.assertEqual(self.expected[0]['host'], conn.host)
|
||||
self.assertEqual(self.expected[0]['port'], conn.port)
|
||||
self.assertEqual(self.expected['hostaddrs'], conn.hostaddrs)
|
||||
|
||||
|
||||
class TestKafkaDriver(test_utils.BaseTestCase):
|
||||
|
Loading…
x
Reference in New Issue
Block a user