From ab78c8e3dde1a598159630059b5c78568a9c7425 Mon Sep 17 00:00:00 2001 From: Andy Smith Date: Wed, 21 Aug 2019 10:39:49 -0400 Subject: [PATCH] Setup backend scenarios for functional tests This patchset introduces scenarios for the functional tests to unify the setup and configuration of alternate messaging backends for RPC and Notifications. The scenarios are defined to reduce redundancy in the testing of backends and to demonstrate functional correctness across driver combinations. Current driver support: rabbit - RPC, Notify amqp - RPC kafka - Notify RPC Notify --------- ---------- scenario01 rabbit rabbit scenario02 rabbit kafka scenario03 amqp rabbit scenario04 amqp kafka It is anticipated that additional scenarios will be defined as new drivers are introduced and/or new messaging intermediarites are supported. Note: The removal of python2 jobs are also included patch Change-Id: I0f86416623a0b718516147f0660b4df2b74cf867 --- .zuul.yaml | 72 ++++++--------- .../tests/functional/notify/test_logger.py | 8 +- .../tests/functional/test_functional.py | 91 ++++++++++--------- oslo_messaging/tests/functional/utils.py | 12 +-- setup-test-env-kafka.sh | 20 ---- tools/setup-scenario-env.sh | 69 ++++++++++++++ tools/setup-test-env-amqp1.sh | 41 --------- tox.ini | 44 ++++++--- 8 files changed, 186 insertions(+), 171 deletions(-) delete mode 100755 setup-test-env-kafka.sh create mode 100755 tools/setup-scenario-env.sh delete mode 100755 tools/setup-test-env-amqp1.sh diff --git a/.zuul.yaml b/.zuul.yaml index 766436019..cc6255e60 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -1,23 +1,31 @@ - job: - name: oslo.messaging-tox-py36-func-amqp1 + name: oslo.messaging-tox-py36-func-scenario01 parent: openstack-tox-py36 vars: - tox_envlist: py36-func-amqp1 - -- job: - name: oslo.messaging-tox-py36-func-kafka - parent: openstack-tox-py36 - vars: - tox_envlist: py36-func-kafka - bindep_profile: kafka - -- job: - name: oslo.messaging-tox-py36-func-rabbit - parent: openstack-tox-py36 - vars: - tox_envlist: py36-func-rabbit + tox_envlist: py36-func-scenario01 bindep_profile: rabbit +- job: + name: oslo.messaging-tox-py36-func-scenario02 + parent: openstack-tox-py36 + vars: + tox_envlist: py36-func-scenario02 + bindep_profile: rabbit kafka + +- job: + name: oslo.messaging-tox-py36-func-scenario03 + parent: openstack-tox-py36 + vars: + tox_envlist: py36-func-scenario03 + bindep_profile: rabbit + +- job: + name: oslo.messaging-tox-py36-func-scenario04 + parent: openstack-tox-py36 + vars: + tox_envlist: py36-func-scenario04 + bindep_profile: kafka + # Begin v3 native jobs # See https://docs.openstack.org/devstack/latest/ @@ -60,16 +68,6 @@ '{{ devstack_log_dir }}/qdrouterd.log': logs -- job: - name: oslo.messaging-src-dsvm-full-amqp1-centos-7 - description: | - Run the oslo.messaging-src-dsvm-full-amqp1-hybrid test on a - centos 7 node. - parent: oslo.messaging-src-dsvm-full-amqp1-hybrid - # nodeset: centos-7 - nodeset: devstack-single-node-centos-7 - - - job: name: oslo.messaging-src-dsvm-full-kafka-hybrid description: | @@ -84,15 +82,6 @@ zuul_copy_output: '{{ devstack_log_dir }}/server.log': logs -- job: - name: oslo.messaging-src-dsvm-full-kafka-centos-7 - description: | - Run the oslo.messaging-src-dsvm-full-kafka-hybrid test on a - centos 7 node. - parent: oslo.messaging-src-dsvm-full-kafka-hybrid - # nodeset: centos-7 - nodeset: devstack-single-node-centos-7 - # End v3 native jobs @@ -139,23 +128,18 @@ - release-notes-jobs-python3 check: jobs: - - oslo.messaging-tox-py36-func-rabbit: + - oslo.messaging-tox-py36-func-scenario01 + - oslo.messaging-tox-py36-func-scenario02: voting: false - - oslo.messaging-tox-py36-func-amqp1: + - oslo.messaging-tox-py36-func-scenario03: voting: false - - oslo.messaging-tox-py36-func-kafka: + - oslo.messaging-tox-py36-func-scenario04: voting: false - - oslo.messaging-src-dsvm-full-rabbit - oslo.messaging-src-dsvm-full-amqp1-hybrid: voting: false - - oslo.messaging-src-dsvm-full-amqp1-centos-7: - voting: false - oslo.messaging-src-dsvm-full-kafka-hybrid: voting: false - - oslo.messaging-src-dsvm-full-kafka-centos-7: - voting: false - - oslo.messaging-src-grenade-dsvm: voting: false - oslo.messaging-src-grenade-dsvm-multinode: @@ -163,5 +147,5 @@ gate: jobs: - - oslo.messaging-tox-py36-func-rabbit + - oslo.messaging-tox-py36-func-scenario01 - oslo.messaging-src-dsvm-full-rabbit diff --git a/oslo_messaging/tests/functional/notify/test_logger.py b/oslo_messaging/tests/functional/notify/test_logger.py index feb1ee01d..80ba22bca 100644 --- a/oslo_messaging/tests/functional/notify/test_logger.py +++ b/oslo_messaging/tests/functional/notify/test_logger.py @@ -52,7 +52,7 @@ class LoggingNotificationHandlerTestCase(utils.SkipIfNoTransportURL): # NOTE(gtt): Using different topic to make tests run in parallel topic = 'test_logging_%s_driver_%s' % (self.priority, self.driver) - if self.url.startswith("kafka://"): + if self.notify_url.startswith("kafka://"): self.conf.set_override('consumer_group', str(uuid.uuid4()), group='oslo_messaging_kafka') @@ -61,9 +61,9 @@ class LoggingNotificationHandlerTestCase(utils.SkipIfNoTransportURL): group='oslo_messaging_notifications') listener = self.useFixture( - utils.NotificationFixture(self.conf, self.url, [topic])) + utils.NotificationFixture(self.conf, self.notify_url, [topic])) - log_notify = oslo_messaging.LoggingNotificationHandler(self.url) + log_notify = oslo_messaging.LoggingNotificationHandler(self.notify_url) log = logging.getLogger(topic) log.setLevel(logging.DEBUG) @@ -72,7 +72,7 @@ class LoggingNotificationHandlerTestCase(utils.SkipIfNoTransportURL): log_method = getattr(log, self.priority) log_method('Test logging at priority: %s' % self.priority) - events = listener.get_events(timeout=5) + events = listener.get_events(timeout=15) self.assertEqual(1, len(events)) info_event = events[0] diff --git a/oslo_messaging/tests/functional/test_functional.py b/oslo_messaging/tests/functional/test_functional.py index 384b37265..a1fcdd16d 100644 --- a/oslo_messaging/tests/functional/test_functional.py +++ b/oslo_messaging/tests/functional/test_functional.py @@ -28,7 +28,7 @@ class CallTestCase(utils.SkipIfNoTransportURL): def setUp(self): super(CallTestCase, self).setUp(conf=cfg.ConfigOpts()) - if self.url.startswith("kafka://"): + if self.rpc_url.startswith("kafka://"): self.skipTest("kafka does not support RPC API") self.conf.prog = "test_prog" @@ -39,7 +39,7 @@ class CallTestCase(utils.SkipIfNoTransportURL): def test_specific_server(self): group = self.useFixture(utils.RpcServerGroupFixture( - self.conf, self.url) + self.conf, self.rpc_url) ) client = group.client(1) client.append(text='open') @@ -55,7 +55,7 @@ class CallTestCase(utils.SkipIfNoTransportURL): def test_server_in_group(self): group = self.useFixture( - utils.RpcServerGroupFixture(self.conf, self.url) + utils.RpcServerGroupFixture(self.conf, self.rpc_url) ) client = group.client() @@ -73,13 +73,13 @@ class CallTestCase(utils.SkipIfNoTransportURL): # teardown may hang unless we broadcast all control messages # to each server group1 = self.useFixture( - utils.RpcServerGroupFixture(self.conf, self.url, + utils.RpcServerGroupFixture(self.conf, self.rpc_url, use_fanout_ctrl=True)) group2 = self.useFixture( - utils.RpcServerGroupFixture(self.conf, self.url, exchange="a", + utils.RpcServerGroupFixture(self.conf, self.rpc_url, exchange="a", use_fanout_ctrl=True)) group3 = self.useFixture( - utils.RpcServerGroupFixture(self.conf, self.url, exchange="b", + utils.RpcServerGroupFixture(self.conf, self.rpc_url, exchange="b", use_fanout_ctrl=True)) client1 = group1.client(1) @@ -113,7 +113,7 @@ class CallTestCase(utils.SkipIfNoTransportURL): def test_timeout(self): transport = self.useFixture( - utils.RPCTransportFixture(self.conf, self.url) + utils.RPCTransportFixture(self.conf, self.rpc_url) ) target = oslo_messaging.Target(topic="no_such_topic") c = utils.ClientStub(transport.transport, target, timeout=1) @@ -122,7 +122,7 @@ class CallTestCase(utils.SkipIfNoTransportURL): def test_exception(self): group = self.useFixture( - utils.RpcServerGroupFixture(self.conf, self.url) + utils.RpcServerGroupFixture(self.conf, self.rpc_url) ) client = group.client(1) client.add(increment=2) @@ -130,12 +130,12 @@ class CallTestCase(utils.SkipIfNoTransportURL): def test_timeout_with_concurrently_queues(self): transport = self.useFixture( - utils.RPCTransportFixture(self.conf, self.url) + utils.RPCTransportFixture(self.conf, self.rpc_url) ) target = oslo_messaging.Target(topic="topic_" + str(uuid.uuid4()), server="server_" + str(uuid.uuid4())) server = self.useFixture( - utils.RpcServerFixture(self.conf, self.url, target, + utils.RpcServerFixture(self.conf, self.rpc_url, target, executor="threading")) client = utils.ClientStub(transport.transport, target, cast=False, timeout=5) @@ -153,11 +153,11 @@ class CallTestCase(utils.SkipIfNoTransportURL): self.assertEqual(10, server.endpoint.ival) def test_mandatory_call(self): - if not self.url.startswith("rabbit://"): + if not self.rpc_url.startswith("rabbit://"): self.skipTest("backend does not support call monitoring") transport = self.useFixture(utils.RPCTransportFixture(self.conf, - self.url)) + self.rpc_url)) target = oslo_messaging.Target(topic='topic_' + str(uuid.uuid4()), server='server_' + str(uuid.uuid4())) @@ -187,12 +187,12 @@ class CallTestCase(utils.SkipIfNoTransportURL): client2.delay) def test_monitor_long_call(self): - if not (self.url.startswith("rabbit://") or - self.url.startswith("amqp://")): + if not (self.rpc_url.startswith("rabbit://") or + self.rpc_url.startswith("amqp://")): self.skipTest("backend does not support call monitoring") transport = self.useFixture(utils.RPCTransportFixture(self.conf, - self.url)) + self.rpc_url)) target = oslo_messaging.Target(topic='topic_' + str(uuid.uuid4()), server='server_' + str(uuid.uuid4())) @@ -202,7 +202,7 @@ class CallTestCase(utils.SkipIfNoTransportURL): return seconds self.useFixture( - utils.RpcServerFixture(self.conf, self.url, target, + utils.RpcServerFixture(self.conf, self.rpc_url, target, executor='threading', endpoint=_endpoint())) @@ -238,10 +238,10 @@ class CallTestCase(utils.SkipIfNoTransportURL): return echo transport = self.useFixture( - utils.RPCTransportFixture(self.conf, self.url) + utils.RPCTransportFixture(self.conf, self.rpc_url) ) self.useFixture( - utils.RpcServerFixture(self.conf, self.url, target, + utils.RpcServerFixture(self.conf, self.rpc_url, target, executor="threading", endpoint=_endpoint(target))) client1 = utils.ClientStub(transport.transport, target, @@ -280,7 +280,7 @@ class CallTestCase(utils.SkipIfNoTransportURL): target = oslo_messaging.Target(topic="topic_" + str(uuid.uuid4()), server="server_" + str(uuid.uuid4())) transport = self.useFixture( - utils.RPCTransportFixture(self.conf, self.url) + utils.RPCTransportFixture(self.conf, self.rpc_url) ) self.assertRaises(TypeError, oslo_messaging.get_rpc_server, @@ -297,12 +297,12 @@ class CastTestCase(utils.SkipIfNoTransportURL): def setUp(self): super(CastTestCase, self).setUp() - if self.url.startswith("kafka://"): + if self.rpc_url.startswith("kafka://"): self.skipTest("kafka does not support RPC API") def test_specific_server(self): group = self.useFixture( - utils.RpcServerGroupFixture(self.conf, self.url) + utils.RpcServerGroupFixture(self.conf, self.rpc_url) ) client = group.client(1, cast=True) client.append(text='open') @@ -321,10 +321,10 @@ class CastTestCase(utils.SkipIfNoTransportURL): self.assertEqual(0, group.servers[i].endpoint.ival) def test_server_in_group(self): - if self.url.startswith("amqp:"): + if self.rpc_url.startswith("amqp:"): self.skipTest("QPID-6307") group = self.useFixture( - utils.RpcServerGroupFixture(self.conf, self.url) + utils.RpcServerGroupFixture(self.conf, self.rpc_url) ) client = group.client(cast=True) for i in range(20): @@ -343,7 +343,7 @@ class CastTestCase(utils.SkipIfNoTransportURL): def test_fanout(self): group = self.useFixture( - utils.RpcServerGroupFixture(self.conf, self.url) + utils.RpcServerGroupFixture(self.conf, self.rpc_url) ) client = group.client('all', cast=True) client.append(text='open') @@ -367,13 +367,14 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): def test_simple(self): get_timeout = 1 - if self.url.startswith("kafka://"): + if self.notify_url.startswith("kafka://"): get_timeout = 5 self.conf.set_override('consumer_group', 'test_simple', group='oslo_messaging_kafka') listener = self.useFixture( - utils.NotificationFixture(self.conf, self.url, ['test_simple'])) + utils.NotificationFixture(self.conf, self.notify_url, + ['test_simple'])) notifier = listener.notifier('abc') notifier.info({}, 'test', 'Hello World!') @@ -385,13 +386,13 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): def test_multiple_topics(self): get_timeout = 1 - if self.url.startswith("kafka://"): + if self.notify_url.startswith("kafka://"): get_timeout = 5 self.conf.set_override('consumer_group', 'test_multiple_topics', group='oslo_messaging_kafka') listener = self.useFixture( - utils.NotificationFixture(self.conf, self.url, ['a', 'b'])) + utils.NotificationFixture(self.conf, self.notify_url, ['a', 'b'])) a = listener.notifier('pub-a', topics=['a']) b = listener.notifier('pub-b', topics=['b']) @@ -416,9 +417,9 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): def test_multiple_servers(self): timeout = 0.5 - if self.url.startswith("amqp:"): + if self.notify_url.startswith("amqp:"): self.skipTest("QPID-6307") - if self.url.startswith("kafka://"): + if self.notify_url.startswith("kafka://"): self.skipTest("Kafka: needs to be fixed") timeout = 5 self.conf.set_override('consumer_group', @@ -426,10 +427,12 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): group='oslo_messaging_kafka') listener_a = self.useFixture( - utils.NotificationFixture(self.conf, self.url, ['test-topic'])) + utils.NotificationFixture(self.conf, self.notify_url, + ['test-topic'])) listener_b = self.useFixture( - utils.NotificationFixture(self.conf, self.url, ['test-topic'])) + utils.NotificationFixture(self.conf, self.notify_url, + ['test-topic'])) n = listener_a.notifier('pub') @@ -446,20 +449,20 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): def test_independent_topics(self): get_timeout = 0.5 - if self.url.startswith("kafka://"): + if self.notify_url.startswith("kafka://"): get_timeout = 5 self.conf.set_override('consumer_group', 'test_independent_topics_a', group='oslo_messaging_kafka') listener_a = self.useFixture( - utils.NotificationFixture(self.conf, self.url, ['1'])) + utils.NotificationFixture(self.conf, self.notify_url, ['1'])) - if self.url.startswith("kafka://"): + if self.notify_url.startswith("kafka://"): self.conf.set_override('consumer_group', 'test_independent_topics_b', group='oslo_messaging_kafka') listener_b = self.useFixture( - utils.NotificationFixture(self.conf, self.url, ['2'])) + utils.NotificationFixture(self.conf, self.notify_url, ['2'])) a = listener_a.notifier('pub-1', topics=['1']) b = listener_b.notifier('pub-2', topics=['2']) @@ -484,13 +487,13 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): def test_all_categories(self): get_timeout = 1 - if self.url.startswith("kafka://"): + if self.notify_url.startswith("kafka://"): get_timeout = 5 self.conf.set_override('consumer_group', 'test_all_categories', group='oslo_messaging_kafka') listener = self.useFixture(utils.NotificationFixture( - self.conf, self.url, ['test_all_categories'])) + self.conf, self.notify_url, ['test_all_categories'])) n = listener.notifier('abc') cats = ['debug', 'audit', 'info', 'warn', 'error', 'critical'] @@ -513,20 +516,20 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): def test_simple_batch(self): get_timeout = 3 batch_timeout = 2 - if self.url.startswith("amqp:"): + if self.notify_url.startswith("amqp:"): backend = os.environ.get("AMQP1_BACKEND") if backend == "qdrouterd": # end-to-end acknowledgement with router intermediary # sender pends until batch_size or timeout reached self.skipTest("qdrouterd backend") - if self.url.startswith("kafka://"): + if self.notify_url.startswith("kafka://"): get_timeout = 10 batch_timeout = 5 self.conf.set_override('consumer_group', 'test_simple_batch', group='oslo_messaging_kafka') listener = self.useFixture( - utils.BatchNotificationFixture(self.conf, self.url, + utils.BatchNotificationFixture(self.conf, self.notify_url, ['test_simple_batch'], batch_size=100, batch_timeout=batch_timeout)) @@ -542,10 +545,10 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): def test_compression(self): get_timeout = 1 - if self.url.startswith("amqp:"): + if self.notify_url.startswith("amqp:"): self.conf.set_override('kombu_compression', 'gzip', group='oslo_messaging_rabbit') - if self.url.startswith("kafka://"): + if self.notify_url.startswith("kafka://"): get_timeout = 5 self.conf.set_override('compression_codec', 'gzip', group='oslo_messaging_kafka') @@ -553,7 +556,7 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): group='oslo_messaging_kafka') listener = self.useFixture( - utils.NotificationFixture(self.conf, self.url, + utils.NotificationFixture(self.conf, self.notify_url, ['test_compression'])) notifier = listener.notifier('abc') diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py index 700c16277..5f6f9c7e6 100644 --- a/oslo_messaging/tests/functional/utils.py +++ b/oslo_messaging/tests/functional/utils.py @@ -305,16 +305,14 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase): def setUp(self, conf=cfg.CONF): super(SkipIfNoTransportURL, self).setUp(conf=conf) - driver = os.environ.get("TRANSPORT_DRIVER") - if driver: - self.url = os.environ.get('PIFPAF_URL') - else: - self.url = os.environ.get('TRANSPORT_URL') + self.rpc_url = os.environ.get('RPC_TRANSPORT_URL') + self.notify_url = os.environ.get('NOTIFY_TRANSPORT_URL') - if not self.url: + if not (self.rpc_url or self.notify_url): self.skipTest("No transport url configured") - transport_url = oslo_messaging.TransportURL.parse(conf, self.url) + transport_url = oslo_messaging.TransportURL.parse(conf, + self.notify_url) kafka_options.register_opts(conf, transport_url) diff --git a/setup-test-env-kafka.sh b/setup-test-env-kafka.sh deleted file mode 100755 index 40cc35db3..000000000 --- a/setup-test-env-kafka.sh +++ /dev/null @@ -1,20 +0,0 @@ -#!/bin/bash -set -e - -. tools/functions.sh - -SCALA_VERSION=${SCALA_VERSION:-"2.12"} -KAFKA_VERSION=${KAFKA_VERSION:-"2.0.0"} - -if [[ -z "$(which kafka-server-start)" ]] && [[ -z $(which kafka-server-start.sh) ]]; then - DATADIR=$(mktemp -d /tmp/OSLOMSG-KAFKA.XXXXX) - trap "clean_exit $DATADIR" EXIT - - tarball=kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz - - wget http://archive.apache.org/dist/kafka/${KAFKA_VERSION}/$tarball -O $DATADIR/$tarball - tar -xzf $DATADIR/$tarball -C $DATADIR - export PATH=$DATADIR/kafka_${SCALA_VERSION}-${KAFKA_VERSION}/bin:$PATH -fi - -pifpaf run kafka -- $* diff --git a/tools/setup-scenario-env.sh b/tools/setup-scenario-env.sh new file mode 100755 index 000000000..026cb1abf --- /dev/null +++ b/tools/setup-scenario-env.sh @@ -0,0 +1,69 @@ +#!/bin/bash +set -e + +. tools/functions.sh + +SCENARIO=${SCENARIO:-"scenario01"} + +function _setup_kafka { + + SCALA_VERSION=${SCALA_VERSION:-"2.12"} + KAFKA_VERSION=${KAFKA_VERSION:-"2.0.0"} + + if [[ -z "$(which kafka-server-start)" ]] && [[ -z $(which kafka-server-start.sh) ]]; then + DATADIR=$(mktemp -d /tmp/OSLOMSG-KAFKA.XXXXX) + trap "clean_exit $DATADIR" EXIT + + tarball=kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz + + wget http://archive.apache.org/dist/kafka/${KAFKA_VERSION}/$tarball -O $DATADIR/$tarball + tar -xzf $DATADIR/$tarball -C $DATADIR + export PATH=$DATADIR/kafka_${SCALA_VERSION}-${KAFKA_VERSION}/bin:$PATH + fi +} + +function _setup_global_site_package_path { + MAJOR=$(python -c 'import sys; print(sys.version_info.major)') + MINOR=$(python -c 'import sys; print(sys.version_info.minor)') + if [ -f "/etc/debian_version" ]; then + PRE="dist" + else + PRE="site" + fi + # qdrouterd needs access to global site packages + # create path file and place in virtual env working directory + SITEDIR=${WORKDIR}/${ENVNAME}/lib/python${MAJOR}.${MINOR}/site-packages + cat > ${SITEDIR}/dispatch.pth < ${SITEDIR}/dispatch.pth <