diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py index 33c45d9bd..cc241c213 100644 --- a/oslo_messaging/tests/drivers/test_impl_kafka.py +++ b/oslo_messaging/tests/drivers/test_impl_kafka.py @@ -16,7 +16,6 @@ from kafka.common import KafkaError import mock from oslo_serialization import jsonutils import testscenarios -from testtools.testcase import unittest import time import oslo_messaging @@ -26,21 +25,6 @@ from oslo_messaging.tests import utils as test_utils load_tests = testscenarios.load_tests_apply_scenarios -KAFKA_BROKER = 'localhost:9092' -KAFKA_BROKER_URL = 'kafka://localhost:9092' - - -def _is_kafka_service_running(): - """Checks whether the Kafka service is running or not""" - kafka_running = True - try: - broker = KAFKA_BROKER - kafka.KafkaClient(broker) - except KafkaError: - # Kafka service is not running. - kafka_running = False - return kafka_running - class TestKafkaDriverLoad(test_utils.BaseTestCase): @@ -252,62 +236,3 @@ class TestKafkaListener(test_utils.BaseTestCase): fake_response = listener.poll() self.assertEqual(1, len(listener.conn.consume.mock_calls)) self.assertEqual([], fake_response) - - -class TestWithRealKafkaBroker(test_utils.BaseTestCase): - - def setUp(self): - super(TestWithRealKafkaBroker, self).setUp() - self.messaging_conf.transport_driver = 'kafka' - transport = oslo_messaging.get_transport(self.conf, KAFKA_BROKER_URL) - self.driver = transport._driver - - @unittest.skipUnless( - _is_kafka_service_running(), "Kafka service is not available") - def test_send_and_receive_message(self): - target = oslo_messaging.Target( - topic="fake_topic", exchange='fake_exchange') - targets_and_priorities = [(target, 'fake_info')] - - listener = self.driver.listen_for_notifications( - targets_and_priorities, None, None, None)._poll_style_listener - fake_context = {"fake_context_key": "fake_context_value"} - fake_message = {"fake_message_key": "fake_message_value"} - self.driver.send_notification( - target, fake_context, fake_message, None) - - received_message = listener.poll()[0] - self.assertEqual(fake_context, received_message.ctxt) - self.assertEqual(fake_message, received_message.message) - - @unittest.skipUnless( - _is_kafka_service_running(), "Kafka service is not available") - def test_send_and_receive_message_without_exchange(self): - target = oslo_messaging.Target(topic="fake_no_exchange_topic") - targets_and_priorities = [(target, 'fake_info')] - - listener = self.driver.listen_for_notifications( - targets_and_priorities, None, None, None)._poll_style_listener - fake_context = {"fake_context_key": "fake_context_value"} - fake_message = {"fake_message_key": "fake_message_value"} - self.driver.send_notification( - target, fake_context, fake_message, None) - - received_message = listener.poll()[0] - self.assertEqual(fake_context, received_message.ctxt) - self.assertEqual(fake_message, received_message.message) - - @unittest.skipUnless( - _is_kafka_service_running(), "Kafka service is not available") - def test_receive_message_from_empty_topic_with_timeout(self): - target = oslo_messaging.Target( - topic="fake_empty_topic", exchange='fake_empty_exchange') - targets_and_priorities = [(target, 'fake_info')] - - listener = self.driver.listen_for_notifications( - targets_and_priorities, None, None, None)._poll_style_listener - - deadline = time.time() + 3 - received_message = listener.poll(batch_timeout=3) - self.assertEqual(0, int(deadline - time.time())) - self.assertEqual([], received_message) diff --git a/oslo_messaging/tests/functional/test_functional.py b/oslo_messaging/tests/functional/test_functional.py index ba9f2989c..820539b7e 100644 --- a/oslo_messaging/tests/functional/test_functional.py +++ b/oslo_messaging/tests/functional/test_functional.py @@ -28,6 +28,8 @@ class CallTestCase(utils.SkipIfNoTransportURL): def setUp(self): super(CallTestCase, self).setUp(conf=cfg.ConfigOpts()) + if self.url.startswith("kafka://"): + self.skipTest("kafka does not support RPC API") self.conf.prog = "test_prog" self.conf.project = "test_project" @@ -156,6 +158,11 @@ class CastTestCase(utils.SkipIfNoTransportURL): # internal sync() cast to ensure prior casts are complete before # making the necessary assertions. + def setUp(self): + super(CastTestCase, self).setUp() + if self.url.startswith("kafka://"): + self.skipTest("kafka does not support RPC API") + def test_specific_server(self): group = self.useFixture( utils.RpcServerGroupFixture(self.conf, self.url) diff --git a/oslo_messaging/tests/functional/test_kafka.py b/oslo_messaging/tests/functional/test_kafka.py new file mode 100644 index 000000000..705f99932 --- /dev/null +++ b/oslo_messaging/tests/functional/test_kafka.py @@ -0,0 +1,72 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import time + +from oslo_config import cfg + +import oslo_messaging +from oslo_messaging.tests.functional import utils + + +class TestWithRealKafkaBroker(utils.SkipIfNoTransportURL): + def setUp(self): + super(TestWithRealKafkaBroker, self).setUp(conf=cfg.ConfigOpts()) + if not self.url.startswith('kafka://'): + self.skipTest("TRANSPORT_URL is not set to kafka driver") + transport = oslo_messaging.get_transport(self.conf, self.url) + self.driver = transport._driver + + def test_send_and_receive_message(self): + target = oslo_messaging.Target( + topic="fake_topic", exchange='fake_exchange') + targets_and_priorities = [(target, 'fake_info')] + + listener = self.driver.listen_for_notifications( + targets_and_priorities, None, None, None)._poll_style_listener + fake_context = {"fake_context_key": "fake_context_value"} + fake_message = {"fake_message_key": "fake_message_value"} + self.driver.send_notification( + target, fake_context, fake_message, None) + + received_message = listener.poll()[0] + self.assertEqual(fake_context, received_message.ctxt) + self.assertEqual(fake_message, received_message.message) + + def test_send_and_receive_message_without_exchange(self): + target = oslo_messaging.Target(topic="fake_no_exchange_topic") + targets_and_priorities = [(target, 'fake_info')] + + listener = self.driver.listen_for_notifications( + targets_and_priorities, None, None, None)._poll_style_listener + fake_context = {"fake_context_key": "fake_context_value"} + fake_message = {"fake_message_key": "fake_message_value"} + self.driver.send_notification( + target, fake_context, fake_message, None) + + received_message = listener.poll()[0] + self.assertEqual(fake_context, received_message.ctxt) + self.assertEqual(fake_message, received_message.message) + + def test_receive_message_from_empty_topic_with_timeout(self): + target = oslo_messaging.Target( + topic="fake_empty_topic", exchange='fake_empty_exchange') + targets_and_priorities = [(target, 'fake_info')] + + listener = self.driver.listen_for_notifications( + targets_and_priorities, None, None, None)._poll_style_listener + + deadline = time.time() + 3 + received_message = listener.poll(batch_timeout=3) + self.assertEqual(0, int(deadline - time.time())) + self.assertEqual([], received_message) diff --git a/setup-test-env-kafka.sh b/setup-test-env-kafka.sh new file mode 100755 index 000000000..ca981945b --- /dev/null +++ b/setup-test-env-kafka.sh @@ -0,0 +1,16 @@ +#!/bin/bash +set -e + +. tools/functions.sh + +DATADIR=$(mktemp -d /tmp/OSLOMSG-KAFKA.XXXXX) +trap "clean_exit $DATADIR" EXIT + +SCALA_VERSION="2.11" +KAFKA_VERSION="0.10.1.0" +tarball=kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz + +wget http://apache.crihan.fr/dist/kafka/${KAFKA_VERSION}/$tarball -O $DATADIR/$tarball +tar -xzf $DATADIR/$tarball -C $DATADIR +export PATH=$DATADIR/kafka_${SCALA_VERSION}-${KAFKA_VERSION}/bin:$PATH +pifpaf run kafka -- $* diff --git a/tox.ini b/tox.ini index 5f60b2eb7..70a5fc083 100644 --- a/tox.ini +++ b/tox.ini @@ -54,6 +54,12 @@ setenv = TRANSPORT_DRIVER=pika commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}' +[testenv:py27-func-kafka] +setenv = + {[testenv]setenv} + TRANSPORT_DRIVER=kafka +commands = {toxinidir}/setup-test-env-kafka.sh python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}' + [testenv:py27-func-amqp1] setenv = {[testenv]setenv}