Add kafka driver vhost emulation
Emulate vhost support by adding the virtual host name to the topic created on the kafka server. Also, update connection management for producer/consumer. This patch: * updates target to topic generation * add consumer and producer connection classes * remove connection pool * update driver test Change-Id: Idd164444c04e9f465a43ee909af840a41bb090c0
This commit is contained in:
parent
e43240168f
commit
1ccdccddaa
@ -33,7 +33,6 @@ import tenacity
|
||||
from oslo_messaging._drivers import base
|
||||
from oslo_messaging._drivers import common as driver_common
|
||||
from oslo_messaging._drivers.kafka_driver import kafka_options
|
||||
from oslo_messaging._drivers import pool as driver_pool
|
||||
from oslo_messaging._i18n import _LE
|
||||
from oslo_messaging._i18n import _LW
|
||||
from oslo_serialization import jsonutils
|
||||
@ -81,17 +80,21 @@ def pack_message(ctxt, msg):
|
||||
return msg
|
||||
|
||||
|
||||
def target_to_topic(target, priority=None):
|
||||
def concat(sep, items):
|
||||
return sep.join(filter(bool, items))
|
||||
|
||||
|
||||
def target_to_topic(target, priority=None, vhost=None):
|
||||
"""Convert target into topic string
|
||||
|
||||
:param target: Message destination target
|
||||
:type target: oslo_messaging.Target
|
||||
:param priority: Notification priority
|
||||
:type priority: string
|
||||
:param priority: Notification vhost
|
||||
:type priority: string
|
||||
"""
|
||||
if not priority:
|
||||
return target.topic
|
||||
return target.topic + '.' + priority
|
||||
return concat(".", [target.topic, priority, vhost])
|
||||
|
||||
|
||||
def retry_on_retriable_kafka_error(exc):
|
||||
@ -114,22 +117,12 @@ def with_reconnect(retries=None):
|
||||
|
||||
class Connection(object):
|
||||
|
||||
def __init__(self, conf, url, purpose):
|
||||
def __init__(self, conf, url):
|
||||
|
||||
self.client = None
|
||||
driver_conf = conf.oslo_messaging_kafka
|
||||
self.batch_size = driver_conf.producer_batch_size
|
||||
self.linger_ms = driver_conf.producer_batch_timeout * 1000
|
||||
self.conf = conf
|
||||
self.producer = None
|
||||
self.producer_lock = threading.Lock()
|
||||
self.consumer = None
|
||||
self.consumer_timeout = driver_conf.kafka_consumer_timeout
|
||||
self.max_fetch_bytes = driver_conf.kafka_max_fetch_bytes
|
||||
self.group_id = driver_conf.consumer_group
|
||||
self.url = url
|
||||
self.virtual_host = url.virtual_host
|
||||
self._parse_url()
|
||||
self._consume_loop_stopped = False
|
||||
|
||||
def _parse_url(self):
|
||||
driver_conf = self.conf.oslo_messaging_kafka
|
||||
@ -145,33 +138,22 @@ class Connection(object):
|
||||
self.hostaddrs.append("%s:%s" % (driver_conf.kafka_default_host,
|
||||
driver_conf.kafka_default_port))
|
||||
|
||||
def notify_send(self, topic, ctxt, msg, retry):
|
||||
"""Send messages to Kafka broker.
|
||||
def reset(self):
|
||||
"""Reset a connection so it can be used again."""
|
||||
pass
|
||||
|
||||
:param topic: String of the topic
|
||||
:param ctxt: context for the messages
|
||||
:param msg: messages for publishing
|
||||
:param retry: the number of retry
|
||||
"""
|
||||
retry = retry if retry >= 0 else None
|
||||
message = pack_message(ctxt, msg)
|
||||
message = jsonutils.dumps(message)
|
||||
|
||||
@with_reconnect(retries=retry)
|
||||
def wrapped_with_reconnect():
|
||||
self._ensure_producer()
|
||||
# NOTE(sileht): This returns a future, we can use get()
|
||||
# if we want to block like other driver
|
||||
future = self.producer.send(topic, message)
|
||||
future.get()
|
||||
class ConsumerConnection(Connection):
|
||||
|
||||
try:
|
||||
wrapped_with_reconnect()
|
||||
except Exception:
|
||||
# NOTE(sileht): if something goes wrong close the producer
|
||||
# connection
|
||||
self._close_producer()
|
||||
raise
|
||||
def __init__(self, conf, url):
|
||||
|
||||
super(ConsumerConnection, self).__init__(conf, url)
|
||||
driver_conf = self.conf.oslo_messaging_kafka
|
||||
self.consumer = None
|
||||
self.consumer_timeout = driver_conf.kafka_consumer_timeout
|
||||
self.max_fetch_bytes = driver_conf.kafka_max_fetch_bytes
|
||||
self.group_id = driver_conf.consumer_group
|
||||
self._consume_loop_stopped = False
|
||||
|
||||
@with_reconnect()
|
||||
def _poll_messages(self, timeout):
|
||||
@ -215,16 +197,67 @@ class Connection(object):
|
||||
def stop_consuming(self):
|
||||
self._consume_loop_stopped = True
|
||||
|
||||
def reset(self):
|
||||
"""Reset a connection so it can be used again."""
|
||||
pass
|
||||
|
||||
def close(self):
|
||||
self._close_producer()
|
||||
if self.consumer:
|
||||
self.consumer.close()
|
||||
self.consumer = None
|
||||
|
||||
@with_reconnect()
|
||||
def declare_topic_consumer(self, topics, group=None):
|
||||
# TODO(Support for manual/auto_commit functionality)
|
||||
# When auto_commit is False, consumer can manually notify
|
||||
# the completion of the subscription.
|
||||
# Currently we don't support for non auto commit option
|
||||
self.consumer = kafka.KafkaConsumer(
|
||||
*topics, group_id=(group or self.group_id),
|
||||
bootstrap_servers=self.hostaddrs,
|
||||
max_partition_fetch_bytes=self.max_fetch_bytes,
|
||||
selector=KAFKA_SELECTOR
|
||||
)
|
||||
|
||||
|
||||
class ProducerConnection(Connection):
|
||||
|
||||
def __init__(self, conf, url):
|
||||
|
||||
super(ProducerConnection, self).__init__(conf, url)
|
||||
driver_conf = self.conf.oslo_messaging_kafka
|
||||
self.batch_size = driver_conf.producer_batch_size
|
||||
self.linger_ms = driver_conf.producer_batch_timeout * 1000
|
||||
self.producer = None
|
||||
self.producer_lock = threading.Lock()
|
||||
|
||||
def notify_send(self, topic, ctxt, msg, retry):
|
||||
"""Send messages to Kafka broker.
|
||||
|
||||
:param topic: String of the topic
|
||||
:param ctxt: context for the messages
|
||||
:param msg: messages for publishing
|
||||
:param retry: the number of retry
|
||||
"""
|
||||
retry = retry if retry >= 0 else None
|
||||
message = pack_message(ctxt, msg)
|
||||
message = jsonutils.dumps(message)
|
||||
|
||||
@with_reconnect(retries=retry)
|
||||
def wrapped_with_reconnect():
|
||||
self._ensure_producer()
|
||||
# NOTE(sileht): This returns a future, we can use get()
|
||||
# if we want to block like other driver
|
||||
future = self.producer.send(topic, message)
|
||||
future.get()
|
||||
|
||||
try:
|
||||
wrapped_with_reconnect()
|
||||
except Exception:
|
||||
# NOTE(sileht): if something goes wrong close the producer
|
||||
# connection
|
||||
self._close_producer()
|
||||
raise
|
||||
|
||||
def close(self):
|
||||
self._close_producer()
|
||||
|
||||
def _close_producer(self):
|
||||
with self.producer_lock:
|
||||
if self.producer:
|
||||
@ -243,19 +276,6 @@ class Connection(object):
|
||||
batch_size=self.batch_size,
|
||||
selector=KAFKA_SELECTOR)
|
||||
|
||||
@with_reconnect()
|
||||
def declare_topic_consumer(self, topics, group=None):
|
||||
# TODO(Support for manual/auto_commit functionality)
|
||||
# When auto_commit is False, consumer can manually notify
|
||||
# the completion of the subscription.
|
||||
# Currently we don't support for non auto commit option
|
||||
self.consumer = kafka.KafkaConsumer(
|
||||
*topics, group_id=(group or self.group_id),
|
||||
bootstrap_servers=self.hostaddrs,
|
||||
max_partition_fetch_bytes=self.max_fetch_bytes,
|
||||
selector=KAFKA_SELECTOR
|
||||
)
|
||||
|
||||
|
||||
class OsloKafkaMessage(base.RpcIncomingMessage):
|
||||
|
||||
@ -314,17 +334,12 @@ class KafkaDriver(base.BaseDriver):
|
||||
super(KafkaDriver, self).__init__(
|
||||
conf, url, default_exchange, allowed_remote_exmods)
|
||||
|
||||
# the pool configuration properties
|
||||
max_size = self.conf.oslo_messaging_kafka.pool_size
|
||||
min_size = self.conf.oslo_messaging_kafka.conn_pool_min_size
|
||||
ttl = self.conf.oslo_messaging_kafka.conn_pool_ttl
|
||||
|
||||
self.connection_pool = driver_pool.ConnectionPool(
|
||||
self.conf, max_size, min_size, ttl,
|
||||
self._url, Connection)
|
||||
self.listeners = []
|
||||
self.virtual_host = url.virtual_host
|
||||
self.pconn = ProducerConnection(conf, url)
|
||||
|
||||
def cleanup(self):
|
||||
self.pconn.close()
|
||||
for c in self.listeners:
|
||||
c.close()
|
||||
self.listeners = []
|
||||
@ -351,8 +366,9 @@ class KafkaDriver(base.BaseDriver):
|
||||
N means N retries
|
||||
:type retry: int
|
||||
"""
|
||||
with self._get_connection(purpose=driver_common.PURPOSE_SEND) as conn:
|
||||
conn.notify_send(target_to_topic(target), ctxt, message, retry)
|
||||
self.pconn.notify_send(target_to_topic(target,
|
||||
vhost=self.virtual_host),
|
||||
ctxt, message, retry)
|
||||
|
||||
def listen(self, target, batch_size, batch_timeout):
|
||||
raise NotImplementedError(
|
||||
@ -370,7 +386,7 @@ class KafkaDriver(base.BaseDriver):
|
||||
:param pool: consumer group of Kafka consumers
|
||||
:type pool: string
|
||||
"""
|
||||
conn = self._get_connection(purpose=driver_common.PURPOSE_LISTEN)
|
||||
conn = ConsumerConnection(self.conf, self._url)
|
||||
topics = set()
|
||||
for target, priority in targets_and_priorities:
|
||||
topics.add(target_to_topic(target, priority))
|
||||
@ -380,6 +396,3 @@ class KafkaDriver(base.BaseDriver):
|
||||
listener = KafkaListener(conn)
|
||||
return base.PollStyleListenerAdapter(listener, batch_size,
|
||||
batch_timeout)
|
||||
|
||||
def _get_connection(self, purpose):
|
||||
return driver_common.ConnectionContext(self.connection_pool, purpose)
|
||||
|
@ -33,12 +33,18 @@ KAFKA_OPTS = [
|
||||
help='Default timeout(s) for Kafka consumers'),
|
||||
|
||||
cfg.IntOpt('pool_size', default=10,
|
||||
deprecated_for_removal=True,
|
||||
deprecated_reason='Driver no longer uses connection pool. ',
|
||||
help='Pool Size for Kafka Consumers'),
|
||||
|
||||
cfg.IntOpt('conn_pool_min_size', default=2,
|
||||
deprecated_for_removal=True,
|
||||
deprecated_reason='Driver no longer uses connection pool. ',
|
||||
help='The pool size limit for connections expiration policy'),
|
||||
|
||||
cfg.IntOpt('conn_pool_ttl', default=1200,
|
||||
deprecated_for_removal=True,
|
||||
deprecated_reason='Driver no longer uses connection pool. ',
|
||||
help='The time-to-live in sec of idle connections in the pool'),
|
||||
|
||||
cfg.StrOpt('consumer_group', default="oslo_messaging_consumer",
|
||||
|
@ -17,7 +17,6 @@ from six.moves import mock
|
||||
import testscenarios
|
||||
|
||||
import oslo_messaging
|
||||
from oslo_messaging._drivers import common as common_driver
|
||||
from oslo_messaging._drivers import impl_kafka as kafka_driver
|
||||
from oslo_messaging.tests import utils as test_utils
|
||||
|
||||
@ -39,16 +38,24 @@ class TestKafkaTransportURL(test_utils.BaseTestCase):
|
||||
|
||||
scenarios = [
|
||||
('none', dict(url=None,
|
||||
expected=dict(hostaddrs=['localhost:9092']))),
|
||||
expected=dict(hostaddrs=['localhost:9092'],
|
||||
vhost=None))),
|
||||
('empty', dict(url='kafka:///',
|
||||
expected=dict(hostaddrs=['localhost:9092']))),
|
||||
expected=dict(hostaddrs=['localhost:9092'],
|
||||
vhost=''))),
|
||||
('host', dict(url='kafka://127.0.0.1',
|
||||
expected=dict(hostaddrs=['127.0.0.1:9092']))),
|
||||
expected=dict(hostaddrs=['127.0.0.1:9092'],
|
||||
vhost=None))),
|
||||
('port', dict(url='kafka://localhost:1234',
|
||||
expected=dict(hostaddrs=['localhost:1234']))),
|
||||
expected=dict(hostaddrs=['localhost:1234'],
|
||||
vhost=None))),
|
||||
('vhost', dict(url='kafka://localhost:1234/my_host',
|
||||
expected=dict(hostaddrs=['localhost:1234'],
|
||||
vhost='my_host'))),
|
||||
('two', dict(url='kafka://localhost:1234,localhost2:1234',
|
||||
expected=dict(hostaddrs=['localhost:1234',
|
||||
'localhost2:1234']))),
|
||||
'localhost2:1234'],
|
||||
vhost=None))),
|
||||
|
||||
]
|
||||
|
||||
@ -62,8 +69,8 @@ class TestKafkaTransportURL(test_utils.BaseTestCase):
|
||||
self.addCleanup(transport.cleanup)
|
||||
driver = transport._driver
|
||||
|
||||
conn = driver._get_connection(common_driver.PURPOSE_SEND)
|
||||
self.assertEqual(self.expected['hostaddrs'], conn.hostaddrs)
|
||||
self.assertEqual(self.expected['hostaddrs'], driver.pconn.hostaddrs)
|
||||
self.assertEqual(self.expected['vhost'], driver.virtual_host)
|
||||
|
||||
|
||||
class TestKafkaDriver(test_utils.BaseTestCase):
|
||||
@ -130,10 +137,11 @@ class TestKafkaConnection(test_utils.BaseTestCase):
|
||||
self.driver = transport._driver
|
||||
|
||||
def test_notify(self):
|
||||
conn = self.driver._get_connection(common_driver.PURPOSE_SEND)
|
||||
|
||||
with mock.patch("kafka.KafkaProducer") as fake_producer_class:
|
||||
fake_producer = fake_producer_class.return_value
|
||||
conn.notify_send("fake_topic", {"fake_ctxt": "fake_param"},
|
||||
{"fake_text": "fake_message_1"}, 10)
|
||||
self.driver.pconn.notify_send("fake_topic",
|
||||
{"fake_ctxt": "fake_param"},
|
||||
{"fake_text": "fake_message_1"},
|
||||
10)
|
||||
self.assertEqual(2, len(fake_producer.send.mock_calls))
|
||||
|
2
tox.ini
2
tox.ini
@ -53,7 +53,7 @@ commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args=
|
||||
[testenv:py27-func-kafka]
|
||||
setenv =
|
||||
{[testenv]setenv}
|
||||
TRANSPORT_URL=kafka://127.0.0.1:9092//
|
||||
TRANSPORT_URL=kafka://127.0.0.1:9092/
|
||||
OS_GROUP_REGEX=oslo_messaging.tests.functional
|
||||
commands = {toxinidir}/setup-test-env-kafka.sh python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user