diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py
new file mode 100644
index 000000000..9be417bdd
--- /dev/null
+++ b/oslo_messaging/_drivers/impl_kafka.py
@@ -0,0 +1,363 @@
+# Copyright (C) 2015 Cisco Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import threading
+
+import kafka
+from kafka.common import KafkaError
+from oslo_config import cfg
+from oslo_log import log as logging
+from oslo_serialization import jsonutils
+
+from oslo_messaging._drivers import base
+from oslo_messaging._drivers import common as driver_common
+from oslo_messaging._drivers import pool as driver_pool
+from oslo_messaging._i18n import _LE
+from oslo_messaging._i18n import _LW
+
+
+LOG = logging.getLogger(__name__)
+
+PURPOSE_SEND = 'send'
+PURPOSE_LISTEN = 'listen'
+
+kafka_opts = [
+    cfg.StrOpt('kafka_default_host', default='localhost',
+               help='Default Kafka broker host'),
+
+    cfg.IntOpt('kafka_default_port', default=9092,
+               help='Default Kafka broker port'),
+
+    cfg.IntOpt('kafka_max_fetch_bytes', default=1024 * 1024,
+               help='Max fetch bytes of Kafka consumer'),
+
+    cfg.FloatOpt('kafka_consumer_timeout', default=1.0,
+                 help='Default timeout (in seconds) for Kafka consumers'),
+
+    cfg.IntOpt('pool_size', default=10,
+               help='Pool size for Kafka consumers'),
+]
+
+CONF = cfg.CONF
+
+
+def pack_context_with_message(ctxt, msg):
+    """Pack context into msg."""
+    if isinstance(ctxt, dict):
+        context_d = ctxt
+    else:
+        context_d = ctxt.to_dict()
+
+    return {'message': msg, 'context': context_d}
+
+
+def target_to_topic(target):
+    """Convert a target into a topic string.
+
+    :param target: Message destination target
+    :type target: oslo_messaging.Target
+    """
+    if target.exchange is None:
+        return target.topic
+    return "%s_%s" % (target.exchange, target.topic)
+
+
+class Connection(object):
+
+    def __init__(self, conf, url, purpose):
+
+        driver_conf = conf.oslo_messaging_kafka
+
+        self.conf = conf
+        self.kafka_client = None
+        self.producer = None
+        self.consumer = None
+        self.fetch_messages_max_bytes = driver_conf.kafka_max_fetch_bytes
+        self.consumer_timeout = driver_conf.kafka_consumer_timeout
+        self.url = url
+        self._parse_url()
+        # TODO: support manual/auto_commit functionality.
+        # When auto_commit is False, the consumer can manually notify
+        # the completion of a subscription.
+        # Currently the non-auto-commit option is not supported.
+        self.auto_commit = True
+        self._consume_loop_stopped = False
+
+    def _parse_url(self):
+        driver_conf = self.conf.oslo_messaging_kafka
+        try:
+            self.host = self.url.hosts[0].hostname
+        except (AttributeError, IndexError):
+            self.host = driver_conf.kafka_default_host
+
+        try:
+            self.port = self.url.hosts[0].port
+        except (AttributeError, IndexError):
+            self.port = driver_conf.kafka_default_port
+
+        if self.host is None:
+            self.host = driver_conf.kafka_default_host
+
+        if self.port is None:
+            self.port = driver_conf.kafka_default_port
+
+    def notify_send(self, topic, ctxt, msg, retry):
+        """Send messages to the Kafka broker.
+
+        :param topic: String of the topic
+        :param ctxt: context for the messages
+        :param msg: messages for publishing
+        :param retry: the number of retries
+        """
+        message = pack_context_with_message(ctxt, msg)
+        self._ensure_connection()
+        self._send_and_retry(message, topic, retry)
+
+    def _send_and_retry(self, message, topic, retry):
+        current_retry = 0
+        if not isinstance(message, str):
+            message = jsonutils.dumps(message)
+        while message is not None:
+            try:
+                self._send(message, topic)
+                message = None
+            except Exception:
+                LOG.warning(_LW("Failed to publish a message of topic %s"),
+                            topic)
+                current_retry += 1
+                if retry is not None and current_retry >= retry:
+                    LOG.exception(_LE("Failed to send data: reached the "
+                                      "maximum number of retries"))
+                    message = None
+
+    def _send(self, message, topic):
+        self.producer.send_messages(topic, message)
+
+    def consume(self, timeout=None):
+        """Receive a batch of messages in a single fetch.
+
+        This function deliberately contains no subscription loop;
+        that lets the caller control the rate of consumption.
+        """
+        duration = (self.consumer_timeout if timeout is None else timeout)
+        timer = driver_common.DecayingTimer(duration=duration)
+        timer.start()
+
+        def _raise_timeout():
+            LOG.debug('Timed out waiting for Kafka response')
+            raise driver_common.Timeout()
+
+        poll_timeout = (self.consumer_timeout if timeout is None
+                        else min(timeout, self.consumer_timeout))
+
+        while True:
+            if self._consume_loop_stopped:
+                return
+            try:
+                next_timeout = poll_timeout * 1000.0
+                # TODO: use the configure() method instead.
+                # Currently KafkaConsumer does not support updating
+                # only the fetch_max_wait_ms parameter.
+                self.consumer._config['fetch_max_wait_ms'] = next_timeout
+                messages = list(self.consumer.fetch_messages())
+            except Exception as e:
+                LOG.exception(_LE("Failed to consume messages: %s"), e)
+                messages = None
+
+            if not messages:
+                poll_timeout = timer.check_return(
+                    _raise_timeout, maximum=self.consumer_timeout)
+                continue
+
+            return messages
+
+    def stop_consuming(self):
+        self._consume_loop_stopped = True
+
+    def reset(self):
+        """Reset a connection so it can be used again."""
+        if self.kafka_client:
+            self.kafka_client.close()
+            self.kafka_client = None
+        if self.producer:
+            self.producer.stop()
+        self.producer = None
+        self.consumer = None
+
+    def close(self):
+        if self.kafka_client:
+            self.kafka_client.close()
+            self.kafka_client = None
+        if self.producer:
+            self.producer.stop()
+        self.producer = None
+        self.consumer = None
+
+    def commit(self):
+        """Commit offsets on behalf of subscribers in the same group.
+
+        After messages are consumed, commit is called to prevent
+        other subscribers belonging to the same group from re-consuming
+        the same messages.
+
+        Currently the self.auto_commit option is always True,
+        so calling this function is not required.
+ """ + self.consumer.commit() + + def _ensure_connection(self): + if self.kafka_client: + return + try: + self.kafka_client = kafka.KafkaClient( + "%s:%s" % (self.host, str(self.port))) + self.producer = kafka.SimpleProducer(self.kafka_client) + except KafkaError as e: + LOG.exception(_LE("Kafka Connection is not available: %s"), e) + self.kafka_client = None + + def declare_topic_consumer(self, topics, group=None): + self.consumer = kafka.KafkaConsumer( + *topics, group_id=group, + metadata_broker_list=["%s:%s" % (self.host, str(self.port))], + # auto_commit_enable=self.auto_commit, + fetch_message_max_bytes=self.fetch_messages_max_bytes) + + +class OsloKafkaMessage(base.IncomingMessage): + + def __init__(self, listener, ctxt, message): + super(OsloKafkaMessage, self).__init__(listener, ctxt, message) + + def requeue(self): + LOG.warn(_LW("requeue is not supported")) + + def reply(self, reply=None, failure=None, log_failure=True): + LOG.warn(_LW("reply is not supported")) + + +class KafkaListener(base.Listener): + + def __init__(self, driver, conn): + super(KafkaListener, self).__init__(driver) + self._stopped = threading.Event() + self.conn = conn + self.incoming_queue = [] + + def poll(self, timeout=None): + while not self._stopped.is_set(): + if self.incoming_queue: + return self.incoming_queue.pop(0) + try: + messages = self.conn.consume(timeout=timeout) + for msg in messages: + message = msg.value + message = jsonutils.loads(message) + self.incoming_queue.append(OsloKafkaMessage( + listener=self, ctxt=message['context'], + message=message['message'])) + except driver_common.Timeout: + return None + + def stop(self): + self._stopped.set() + self.conn.stop_consuming() + + def cleanup(self): + self.conn.close() + + def commit(self): + # TODO(Support for manually/auto commit functionality) + # It's better to allow users to commit manually and support for + # self.auto_commit = False option. For now, this commit function + # is meaningless since user couldn't call this function and + # auto_commit option is always True. + self.conn.commit() + + +class KafkaDriver(base.BaseDriver): + """Note: Current implementation of this driver is experimental. + We will have functional and/or integrated testing enabled for this driver. 
+ """ + + def __init__(self, conf, url, default_exchange=None, + allowed_remote_exmods=None): + + opt_group = cfg.OptGroup(name='oslo_messaging_kafka', + title='Kafka driver options') + conf.register_group(opt_group) + conf.register_opts(kafka_opts, group=opt_group) + + super(KafkaDriver, self).__init__( + conf, url, default_exchange, allowed_remote_exmods) + + self.connection_pool = driver_pool.ConnectionPool( + self.conf, self.conf.oslo_messaging_kafka.pool_size, + self._url, Connection) + self.listeners = [] + + def cleanup(self): + for c in self.listeners: + c.close() + self.listeners = [] + + def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, + retry=None): + raise NotImplementedError( + 'The RPC implementation for Kafka is not implemented') + + def send_notification(self, target, ctxt, message, version, retry=None): + """Send notification to Kafka brokers + + :param target: Message destination target + :type target: oslo_messaging.Target + :param ctxt: Message context + :type ctxt: dict + :param message: Message payload to pass + :type message: dict + :param version: Messaging API version (currently not used) + :type version: str + :param retry: an optional default kafka consumer retries configuration + None means to retry forever + 0 means no retry + N means N retries + :type retry: int + """ + with self._get_connection(purpose=PURPOSE_SEND) as conn: + conn.notify_send(target_to_topic(target), ctxt, message, retry) + + def listen(self, target): + raise NotImplementedError( + 'The RPC implementation for Kafka is not implemented') + + def listen_for_notifications(self, targets_and_priorities, pool=None): + """Listen to a specified list of targets on Kafka brokers + + :param targets_and_priorities: List of pairs (target, priority) + priority is not used for kafka driver + target.exchange_target.topic is used as + a kafka topic + :type targets_and_priorities: list + :param pool: consumer group of Kafka consumers + :type pool: string + """ + conn = self._get_connection(purpose=PURPOSE_LISTEN) + topics = [] + for target, priority in targets_and_priorities: + topics.append(target_to_topic(target)) + + conn.declare_topic_consumer(topics, pool) + + listener = KafkaListener(self, conn) + return listener + + def _get_connection(self, purpose): + return driver_common.ConnectionContext(self.connection_pool, purpose) diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py new file mode 100644 index 000000000..6f25b2c64 --- /dev/null +++ b/oslo_messaging/tests/drivers/test_impl_kafka.py @@ -0,0 +1,288 @@ +# Copyright (C) 2015 Cisco Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+import json
+import time
+
+import kafka
+from kafka.common import KafkaError
+import mock
+import testscenarios
+from testtools.testcase import unittest
+
+import oslo_messaging
+from oslo_messaging._drivers import common as driver_common
+from oslo_messaging._drivers import impl_kafka as kafka_driver
+from oslo_messaging.tests import utils as test_utils
+
+load_tests = testscenarios.load_tests_apply_scenarios
+
+KAFKA_BROKER = 'localhost:9092'
+KAFKA_BROKER_URL = 'kafka://localhost:9092'
+
+
+def _is_kafka_service_running():
+    """Check whether the Kafka service is running."""
+    kafka_running = True
+    try:
+        broker = KAFKA_BROKER
+        kafka.KafkaClient(broker)
+    except KafkaError:
+        # Kafka service is not running.
+        kafka_running = False
+    return kafka_running
+
+
+class TestKafkaDriverLoad(test_utils.BaseTestCase):
+
+    def setUp(self):
+        super(TestKafkaDriverLoad, self).setUp()
+        self.messaging_conf.transport_driver = 'kafka'
+
+    def test_driver_load(self):
+        transport = oslo_messaging.get_transport(self.conf)
+        self.assertIsInstance(transport._driver, kafka_driver.KafkaDriver)
+
+
+class TestKafkaTransportURL(test_utils.BaseTestCase):
+
+    scenarios = [
+        ('none', dict(url=None,
+                      expected=[dict(host='localhost', port=9092)])),
+        ('empty', dict(url='kafka:///',
+                       expected=[dict(host='localhost', port=9092)])),
+        ('host', dict(url='kafka://127.0.0.1',
+                      expected=[dict(host='127.0.0.1', port=9092)])),
+        ('port', dict(url='kafka://localhost:1234',
+                      expected=[dict(host='localhost', port=1234)])),
+    ]
+
+    def setUp(self):
+        super(TestKafkaTransportURL, self).setUp()
+        self.messaging_conf.transport_driver = 'kafka'
+
+    def test_transport_url(self):
+        transport = oslo_messaging.get_transport(self.conf, self.url)
+        self.addCleanup(transport.cleanup)
+        driver = transport._driver
+
+        conn = driver._get_connection(kafka_driver.PURPOSE_SEND)
+        self.assertEqual(self.expected[0]['host'], conn.host)
+        self.assertEqual(self.expected[0]['port'], conn.port)
+
+
+class TestKafkaDriver(test_utils.BaseTestCase):
+    """Unit test cases for the kafka driver."""
+
+    def setUp(self):
+        super(TestKafkaDriver, self).setUp()
+        self.messaging_conf.transport_driver = 'kafka'
+        transport = oslo_messaging.get_transport(self.conf)
+        self.driver = transport._driver
+
+    def test_send(self):
+        target = oslo_messaging.Target(topic="topic_test")
+        self.assertRaises(NotImplementedError,
+                          self.driver.send, target, {}, {})
+
+    def test_send_notification(self):
+        target = oslo_messaging.Target(topic="topic_test")
+
+        with mock.patch.object(
+                kafka_driver.Connection, 'notify_send') as fake_send:
+            self.driver.send_notification(target, {}, {}, None)
+            self.assertEqual(1, len(fake_send.mock_calls))
+
+    def test_listen(self):
+        target = oslo_messaging.Target(topic="topic_test")
+        self.assertRaises(NotImplementedError, self.driver.listen, target)
+
+
+class TestKafkaConnection(test_utils.BaseTestCase):
+
+    def setUp(self):
+        super(TestKafkaConnection, self).setUp()
+        self.messaging_conf.transport_driver = 'kafka'
+        transport = oslo_messaging.get_transport(self.conf)
+        self.driver = transport._driver
+
+    @mock.patch.object(kafka_driver.Connection, '_ensure_connection')
+    @mock.patch.object(kafka_driver.Connection, '_send')
+    def test_notify(self, fake_send, fake_ensure_connection):
+        conn = self.driver._get_connection(kafka_driver.PURPOSE_SEND)
+        conn.notify_send("fake_topic", {"fake_ctxt": "fake_param"},
+                         {"fake_text": "fake_message_1"}, 10)
+        self.assertEqual(1, len(fake_send.mock_calls))
+
+    @mock.patch.object(kafka_driver.Connection, '_ensure_connection')
+    @mock.patch.object(kafka_driver.Connection, '_send')
+    def test_notify_with_retry(self, fake_send, fake_ensure_connection):
+        conn = self.driver._get_connection(kafka_driver.PURPOSE_SEND)
+        fake_send.side_effect = KafkaError("fake_exception")
+        conn.notify_send("fake_topic", {"fake_ctxt": "fake_param"},
+                         {"fake_text": "fake_message_2"}, 10)
+        self.assertEqual(10, len(fake_send.mock_calls))
+
+    @mock.patch.object(kafka_driver.Connection, '_ensure_connection')
+    @mock.patch.object(kafka_driver.Connection, '_parse_url')
+    def test_consume(self, fake_parse_url, fake_ensure_connection):
+        fake_message = {
+            "context": {"fake": "fake_context_1"},
+            "message": {"fake": "fake_message_1"}}
+
+        conn = kafka_driver.Connection(
+            self.conf, '', kafka_driver.PURPOSE_LISTEN)
+
+        conn.consumer = mock.MagicMock()
+        conn.consumer.fetch_messages = mock.MagicMock(
+            return_value=iter([json.dumps(fake_message)]))
+
+        self.assertEqual(fake_message, json.loads(conn.consume()[0]))
+        self.assertEqual(1, len(conn.consumer.fetch_messages.mock_calls))
+
+    @mock.patch.object(kafka_driver.Connection, '_ensure_connection')
+    @mock.patch.object(kafka_driver.Connection, '_parse_url')
+    def test_consume_timeout(self, fake_parse_url, fake_ensure_connection):
+        deadline = time.time() + 3
+        conn = kafka_driver.Connection(
+            self.conf, '', kafka_driver.PURPOSE_LISTEN)
+
+        conn.consumer = mock.MagicMock()
+        conn.consumer.fetch_messages = mock.MagicMock(return_value=iter([]))
+
+        self.assertRaises(driver_common.Timeout, conn.consume, timeout=3)
+        self.assertEqual(0, int(deadline - time.time()))
+
+    @mock.patch.object(kafka_driver.Connection, '_ensure_connection')
+    @mock.patch.object(kafka_driver.Connection, '_parse_url')
+    def test_consume_with_default_timeout(
+            self, fake_parse_url, fake_ensure_connection):
+        deadline = time.time() + 1
+        conn = kafka_driver.Connection(
+            self.conf, '', kafka_driver.PURPOSE_LISTEN)
+
+        conn.consumer = mock.MagicMock()
+        conn.consumer.fetch_messages = mock.MagicMock(return_value=iter([]))
+
+        self.assertRaises(driver_common.Timeout, conn.consume)
+        self.assertEqual(0, int(deadline - time.time()))
+
+    @mock.patch.object(kafka_driver.Connection, '_ensure_connection')
+    @mock.patch.object(kafka_driver.Connection, '_parse_url')
+    def test_consume_timeout_without_consumers(
+            self, fake_parse_url, fake_ensure_connection):
+        deadline = time.time() + 3
+        conn = kafka_driver.Connection(
+            self.conf, '', kafka_driver.PURPOSE_LISTEN)
+        conn.consumer = mock.MagicMock(return_value=None)
+
+        self.assertRaises(driver_common.Timeout, conn.consume, timeout=3)
+        self.assertEqual(0, int(deadline - time.time()))
+
+
+class TestKafkaListener(test_utils.BaseTestCase):
+
+    def setUp(self):
+        super(TestKafkaListener, self).setUp()
+        self.messaging_conf.transport_driver = 'kafka'
+        transport = oslo_messaging.get_transport(self.conf)
+        self.driver = transport._driver
+
+    @mock.patch.object(kafka_driver.Connection, '_ensure_connection')
+    @mock.patch.object(kafka_driver.Connection, 'declare_topic_consumer')
+    def test_create_listener(self, fake_consumer, fake_ensure_connection):
+        fake_target = oslo_messaging.Target(topic='fake_topic')
+        fake_targets_and_priorities = [(fake_target, 'info')]
+        self.driver.listen_for_notifications(fake_targets_and_priorities)
+        self.assertEqual(1, len(fake_consumer.mock_calls))
+
+    @mock.patch.object(kafka_driver.Connection, '_ensure_connection')
+    @mock.patch.object(kafka_driver.Connection,
+                       'declare_topic_consumer')
+    def test_stop_listener(self, fake_consumer, fake_ensure_connection):
+        fake_target = oslo_messaging.Target(topic='fake_topic')
+        fake_targets_and_priorities = [(fake_target, 'info')]
+        listener = self.driver.listen_for_notifications(
+            fake_targets_and_priorities)
+        listener.conn.consume = mock.MagicMock()
+        listener.conn.consume.return_value = (
+            iter([kafka.common.KafkaMessage(
+                topic='fake_topic', partition=0, offset=0,
+                key=None, value='{"message": {"fake": "fake_message_1"},'
+                                '"context": {"fake": "fake_context_1"}}')]))
+        listener.poll()
+        self.assertEqual(1, len(listener.conn.consume.mock_calls))
+        listener.conn.stop_consuming = mock.MagicMock()
+        listener.stop()
+        fake_response = listener.poll()
+        self.assertEqual(1, len(listener.conn.consume.mock_calls))
+        self.assertIsNone(fake_response)
+
+
+class TestWithRealKafkaBroker(test_utils.BaseTestCase):
+
+    def setUp(self):
+        super(TestWithRealKafkaBroker, self).setUp()
+        self.messaging_conf.transport_driver = 'kafka'
+        transport = oslo_messaging.get_transport(self.conf, KAFKA_BROKER_URL)
+        self.driver = transport._driver
+
+    @unittest.skipUnless(
+        _is_kafka_service_running(), "Kafka service is not available")
+    def test_send_and_receive_message(self):
+        target = oslo_messaging.Target(
+            topic="fake_topic", exchange='fake_exchange')
+        targets_and_priorities = [(target, 'fake_info')]
+
+        listener = self.driver.listen_for_notifications(
+            targets_and_priorities)
+        fake_context = {"fake_context_key": "fake_context_value"}
+        fake_message = {"fake_message_key": "fake_message_value"}
+        self.driver.send_notification(
+            target, fake_context, fake_message, None)
+
+        received_message = listener.poll()
+        self.assertEqual(fake_context, received_message.ctxt)
+        self.assertEqual(fake_message, received_message.message)
+
+    @unittest.skipUnless(
+        _is_kafka_service_running(), "Kafka service is not available")
+    def test_send_and_receive_message_without_exchange(self):
+        target = oslo_messaging.Target(topic="fake_no_exchange_topic")
+        targets_and_priorities = [(target, 'fake_info')]
+
+        listener = self.driver.listen_for_notifications(
+            targets_and_priorities)
+        fake_context = {"fake_context_key": "fake_context_value"}
+        fake_message = {"fake_message_key": "fake_message_value"}
+        self.driver.send_notification(
+            target, fake_context, fake_message, None)
+
+        received_message = listener.poll()
+        self.assertEqual(fake_context, received_message.ctxt)
+        self.assertEqual(fake_message, received_message.message)
+
+    @unittest.skipUnless(
+        _is_kafka_service_running(), "Kafka service is not available")
+    def test_receive_message_from_empty_topic_with_timeout(self):
+        target = oslo_messaging.Target(
+            topic="fake_empty_topic", exchange='fake_empty_exchange')
+        targets_and_priorities = [(target, 'fake_info')]
+
+        listener = self.driver.listen_for_notifications(
+            targets_and_priorities)
+
+        deadline = time.time() + 3
+        received_message = listener.poll(timeout=3)
+        self.assertEqual(0, int(deadline - time.time()))
+        self.assertIsNone(received_message)
diff --git a/setup.cfg b/setup.cfg
index cbed37743..b45466ca9 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -29,6 +29,9 @@ oslo.messaging.drivers =
     zmq = oslo_messaging._drivers.impl_zmq:ZmqDriver
     amqp = oslo_messaging._drivers.protocols.amqp.driver:ProtonDriver
 
+    # This driver supports notification usage only
+    kafka = oslo_messaging._drivers.impl_kafka:KafkaDriver
+
     # To avoid confusion
     kombu = oslo_messaging._drivers.impl_rabbit:RabbitDriver
diff --git a/test-requirements.txt b/test-requirements.txt
index 89cda423c..1387e1a6b 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -21,6 +21,9 @@ redis>=2.10.0
 # for test_impl_zmq
 pyzmq>=14.3.1 # LGPL+BSD
 
+# for test_impl_kafka
+kafka-python>=0.9.2 # Apache-2.0
+
 # when we can require tox>= 1.4, this can go into tox.ini:
 #  [testenv:cover]
 #  deps = {[testenv]deps} coverage
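
Reviewer note: as a quick orientation, here is a minimal usage sketch for the sending side. It is not part of the patch; the broker URL, publisher_id, topic, and payload are illustrative assumptions. It publishes a notification through the standard oslo.messaging notifier API, which hands the payload to KafkaDriver.send_notification() above.

    from oslo_config import cfg

    import oslo_messaging

    # Illustrative broker location; any reachable Kafka broker URL works.
    transport = oslo_messaging.get_transport(cfg.CONF,
                                             'kafka://localhost:9092')

    # driver='messaging' selects the notify driver that routes through the
    # transport driver, i.e. KafkaDriver.send_notification() in this patch.
    notifier = oslo_messaging.Notifier(transport,
                                       driver='messaging',
                                       publisher_id='example-host',
                                       topic='notifications')

    # The messaging notify driver appends the priority to the topic, so this
    # should land on a 'notifications.info' Kafka topic (no exchange is set,
    # so target_to_topic() returns the topic string unchanged).
    notifier.info({}, 'example.event.start', {'key': 'value'})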
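
And the receiving side, again a hedged sketch rather than part of the patch: a notification listener built with the public API, where the pool argument maps to a Kafka consumer group per listen_for_notifications() above. The endpoint class, group name, and topic are illustrative.

    from oslo_config import cfg

    import oslo_messaging


    class ExampleEndpoint(object):

        def info(self, ctxt, publisher_id, event_type, payload, metadata):
            # Invoked for notifications sent with the 'info' priority.
            print(payload)


    transport = oslo_messaging.get_transport(cfg.CONF,
                                             'kafka://localhost:9092')
    targets = [oslo_messaging.Target(topic='notifications')]

    # pool='example-group' is passed down as the group_id of the
    # KafkaConsumer, so listeners sharing a pool share a consumer group.
    listener = oslo_messaging.get_notification_listener(
        transport, targets, [ExampleEndpoint()], pool='example-group')
    listener.start()  # with the default 'blocking' executor, start()
                      # consumes messages in the calling thread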