Setup backend scenarios for functional tests

This patchset introduces scenarios for the functional tests to unify the
setup and configuration of alternate messaging backends for RPC and
Notifications. The scenarios are defined to reduce redundancy in
the testing of backends and to demonstrate functional correctness
across driver combinations.

Current driver support:
rabbit - RPC, Notify
amqp   - RPC
kafka  - Notify

                  RPC       Notify
               ---------  ----------
scenario01       rabbit     rabbit
scenario02       rabbit     kafka
scenario03        amqp      rabbit
scenario04        amqp      kafka

It is anticipated that additional scenarios will be defined as new
drivers are introduced and/or new messaging intermediarites are supported.

Note: The removal of python2 jobs are also included patch

Change-Id: I0f86416623a0b718516147f0660b4df2b74cf867
This commit is contained in:
Andy Smith 2019-08-21 10:39:49 -04:00
parent 32a1b6e948
commit ab78c8e3dd
8 changed files with 186 additions and 171 deletions

View File

@ -1,23 +1,31 @@
- job: - job:
name: oslo.messaging-tox-py36-func-amqp1 name: oslo.messaging-tox-py36-func-scenario01
parent: openstack-tox-py36 parent: openstack-tox-py36
vars: vars:
tox_envlist: py36-func-amqp1 tox_envlist: py36-func-scenario01
- job:
name: oslo.messaging-tox-py36-func-kafka
parent: openstack-tox-py36
vars:
tox_envlist: py36-func-kafka
bindep_profile: kafka
- job:
name: oslo.messaging-tox-py36-func-rabbit
parent: openstack-tox-py36
vars:
tox_envlist: py36-func-rabbit
bindep_profile: rabbit bindep_profile: rabbit
- job:
name: oslo.messaging-tox-py36-func-scenario02
parent: openstack-tox-py36
vars:
tox_envlist: py36-func-scenario02
bindep_profile: rabbit kafka
- job:
name: oslo.messaging-tox-py36-func-scenario03
parent: openstack-tox-py36
vars:
tox_envlist: py36-func-scenario03
bindep_profile: rabbit
- job:
name: oslo.messaging-tox-py36-func-scenario04
parent: openstack-tox-py36
vars:
tox_envlist: py36-func-scenario04
bindep_profile: kafka
# Begin v3 native jobs # Begin v3 native jobs
# See https://docs.openstack.org/devstack/latest/ # See https://docs.openstack.org/devstack/latest/
@ -60,16 +68,6 @@
'{{ devstack_log_dir }}/qdrouterd.log': logs '{{ devstack_log_dir }}/qdrouterd.log': logs
- job:
name: oslo.messaging-src-dsvm-full-amqp1-centos-7
description: |
Run the oslo.messaging-src-dsvm-full-amqp1-hybrid test on a
centos 7 node.
parent: oslo.messaging-src-dsvm-full-amqp1-hybrid
# nodeset: centos-7
nodeset: devstack-single-node-centos-7
- job: - job:
name: oslo.messaging-src-dsvm-full-kafka-hybrid name: oslo.messaging-src-dsvm-full-kafka-hybrid
description: | description: |
@ -84,15 +82,6 @@
zuul_copy_output: zuul_copy_output:
'{{ devstack_log_dir }}/server.log': logs '{{ devstack_log_dir }}/server.log': logs
- job:
name: oslo.messaging-src-dsvm-full-kafka-centos-7
description: |
Run the oslo.messaging-src-dsvm-full-kafka-hybrid test on a
centos 7 node.
parent: oslo.messaging-src-dsvm-full-kafka-hybrid
# nodeset: centos-7
nodeset: devstack-single-node-centos-7
# End v3 native jobs # End v3 native jobs
@ -139,23 +128,18 @@
- release-notes-jobs-python3 - release-notes-jobs-python3
check: check:
jobs: jobs:
- oslo.messaging-tox-py36-func-rabbit: - oslo.messaging-tox-py36-func-scenario01
- oslo.messaging-tox-py36-func-scenario02:
voting: false voting: false
- oslo.messaging-tox-py36-func-amqp1: - oslo.messaging-tox-py36-func-scenario03:
voting: false voting: false
- oslo.messaging-tox-py36-func-kafka: - oslo.messaging-tox-py36-func-scenario04:
voting: false voting: false
- oslo.messaging-src-dsvm-full-rabbit - oslo.messaging-src-dsvm-full-rabbit
- oslo.messaging-src-dsvm-full-amqp1-hybrid: - oslo.messaging-src-dsvm-full-amqp1-hybrid:
voting: false voting: false
- oslo.messaging-src-dsvm-full-amqp1-centos-7:
voting: false
- oslo.messaging-src-dsvm-full-kafka-hybrid: - oslo.messaging-src-dsvm-full-kafka-hybrid:
voting: false voting: false
- oslo.messaging-src-dsvm-full-kafka-centos-7:
voting: false
- oslo.messaging-src-grenade-dsvm: - oslo.messaging-src-grenade-dsvm:
voting: false voting: false
- oslo.messaging-src-grenade-dsvm-multinode: - oslo.messaging-src-grenade-dsvm-multinode:
@ -163,5 +147,5 @@
gate: gate:
jobs: jobs:
- oslo.messaging-tox-py36-func-rabbit - oslo.messaging-tox-py36-func-scenario01
- oslo.messaging-src-dsvm-full-rabbit - oslo.messaging-src-dsvm-full-rabbit

View File

@ -52,7 +52,7 @@ class LoggingNotificationHandlerTestCase(utils.SkipIfNoTransportURL):
# NOTE(gtt): Using different topic to make tests run in parallel # NOTE(gtt): Using different topic to make tests run in parallel
topic = 'test_logging_%s_driver_%s' % (self.priority, self.driver) topic = 'test_logging_%s_driver_%s' % (self.priority, self.driver)
if self.url.startswith("kafka://"): if self.notify_url.startswith("kafka://"):
self.conf.set_override('consumer_group', str(uuid.uuid4()), self.conf.set_override('consumer_group', str(uuid.uuid4()),
group='oslo_messaging_kafka') group='oslo_messaging_kafka')
@ -61,9 +61,9 @@ class LoggingNotificationHandlerTestCase(utils.SkipIfNoTransportURL):
group='oslo_messaging_notifications') group='oslo_messaging_notifications')
listener = self.useFixture( listener = self.useFixture(
utils.NotificationFixture(self.conf, self.url, [topic])) utils.NotificationFixture(self.conf, self.notify_url, [topic]))
log_notify = oslo_messaging.LoggingNotificationHandler(self.url) log_notify = oslo_messaging.LoggingNotificationHandler(self.notify_url)
log = logging.getLogger(topic) log = logging.getLogger(topic)
log.setLevel(logging.DEBUG) log.setLevel(logging.DEBUG)
@ -72,7 +72,7 @@ class LoggingNotificationHandlerTestCase(utils.SkipIfNoTransportURL):
log_method = getattr(log, self.priority) log_method = getattr(log, self.priority)
log_method('Test logging at priority: %s' % self.priority) log_method('Test logging at priority: %s' % self.priority)
events = listener.get_events(timeout=5) events = listener.get_events(timeout=15)
self.assertEqual(1, len(events)) self.assertEqual(1, len(events))
info_event = events[0] info_event = events[0]

View File

@ -28,7 +28,7 @@ class CallTestCase(utils.SkipIfNoTransportURL):
def setUp(self): def setUp(self):
super(CallTestCase, self).setUp(conf=cfg.ConfigOpts()) super(CallTestCase, self).setUp(conf=cfg.ConfigOpts())
if self.url.startswith("kafka://"): if self.rpc_url.startswith("kafka://"):
self.skipTest("kafka does not support RPC API") self.skipTest("kafka does not support RPC API")
self.conf.prog = "test_prog" self.conf.prog = "test_prog"
@ -39,7 +39,7 @@ class CallTestCase(utils.SkipIfNoTransportURL):
def test_specific_server(self): def test_specific_server(self):
group = self.useFixture(utils.RpcServerGroupFixture( group = self.useFixture(utils.RpcServerGroupFixture(
self.conf, self.url) self.conf, self.rpc_url)
) )
client = group.client(1) client = group.client(1)
client.append(text='open') client.append(text='open')
@ -55,7 +55,7 @@ class CallTestCase(utils.SkipIfNoTransportURL):
def test_server_in_group(self): def test_server_in_group(self):
group = self.useFixture( group = self.useFixture(
utils.RpcServerGroupFixture(self.conf, self.url) utils.RpcServerGroupFixture(self.conf, self.rpc_url)
) )
client = group.client() client = group.client()
@ -73,13 +73,13 @@ class CallTestCase(utils.SkipIfNoTransportURL):
# teardown may hang unless we broadcast all control messages # teardown may hang unless we broadcast all control messages
# to each server # to each server
group1 = self.useFixture( group1 = self.useFixture(
utils.RpcServerGroupFixture(self.conf, self.url, utils.RpcServerGroupFixture(self.conf, self.rpc_url,
use_fanout_ctrl=True)) use_fanout_ctrl=True))
group2 = self.useFixture( group2 = self.useFixture(
utils.RpcServerGroupFixture(self.conf, self.url, exchange="a", utils.RpcServerGroupFixture(self.conf, self.rpc_url, exchange="a",
use_fanout_ctrl=True)) use_fanout_ctrl=True))
group3 = self.useFixture( group3 = self.useFixture(
utils.RpcServerGroupFixture(self.conf, self.url, exchange="b", utils.RpcServerGroupFixture(self.conf, self.rpc_url, exchange="b",
use_fanout_ctrl=True)) use_fanout_ctrl=True))
client1 = group1.client(1) client1 = group1.client(1)
@ -113,7 +113,7 @@ class CallTestCase(utils.SkipIfNoTransportURL):
def test_timeout(self): def test_timeout(self):
transport = self.useFixture( transport = self.useFixture(
utils.RPCTransportFixture(self.conf, self.url) utils.RPCTransportFixture(self.conf, self.rpc_url)
) )
target = oslo_messaging.Target(topic="no_such_topic") target = oslo_messaging.Target(topic="no_such_topic")
c = utils.ClientStub(transport.transport, target, timeout=1) c = utils.ClientStub(transport.transport, target, timeout=1)
@ -122,7 +122,7 @@ class CallTestCase(utils.SkipIfNoTransportURL):
def test_exception(self): def test_exception(self):
group = self.useFixture( group = self.useFixture(
utils.RpcServerGroupFixture(self.conf, self.url) utils.RpcServerGroupFixture(self.conf, self.rpc_url)
) )
client = group.client(1) client = group.client(1)
client.add(increment=2) client.add(increment=2)
@ -130,12 +130,12 @@ class CallTestCase(utils.SkipIfNoTransportURL):
def test_timeout_with_concurrently_queues(self): def test_timeout_with_concurrently_queues(self):
transport = self.useFixture( transport = self.useFixture(
utils.RPCTransportFixture(self.conf, self.url) utils.RPCTransportFixture(self.conf, self.rpc_url)
) )
target = oslo_messaging.Target(topic="topic_" + str(uuid.uuid4()), target = oslo_messaging.Target(topic="topic_" + str(uuid.uuid4()),
server="server_" + str(uuid.uuid4())) server="server_" + str(uuid.uuid4()))
server = self.useFixture( server = self.useFixture(
utils.RpcServerFixture(self.conf, self.url, target, utils.RpcServerFixture(self.conf, self.rpc_url, target,
executor="threading")) executor="threading"))
client = utils.ClientStub(transport.transport, target, client = utils.ClientStub(transport.transport, target,
cast=False, timeout=5) cast=False, timeout=5)
@ -153,11 +153,11 @@ class CallTestCase(utils.SkipIfNoTransportURL):
self.assertEqual(10, server.endpoint.ival) self.assertEqual(10, server.endpoint.ival)
def test_mandatory_call(self): def test_mandatory_call(self):
if not self.url.startswith("rabbit://"): if not self.rpc_url.startswith("rabbit://"):
self.skipTest("backend does not support call monitoring") self.skipTest("backend does not support call monitoring")
transport = self.useFixture(utils.RPCTransportFixture(self.conf, transport = self.useFixture(utils.RPCTransportFixture(self.conf,
self.url)) self.rpc_url))
target = oslo_messaging.Target(topic='topic_' + str(uuid.uuid4()), target = oslo_messaging.Target(topic='topic_' + str(uuid.uuid4()),
server='server_' + str(uuid.uuid4())) server='server_' + str(uuid.uuid4()))
@ -187,12 +187,12 @@ class CallTestCase(utils.SkipIfNoTransportURL):
client2.delay) client2.delay)
def test_monitor_long_call(self): def test_monitor_long_call(self):
if not (self.url.startswith("rabbit://") or if not (self.rpc_url.startswith("rabbit://") or
self.url.startswith("amqp://")): self.rpc_url.startswith("amqp://")):
self.skipTest("backend does not support call monitoring") self.skipTest("backend does not support call monitoring")
transport = self.useFixture(utils.RPCTransportFixture(self.conf, transport = self.useFixture(utils.RPCTransportFixture(self.conf,
self.url)) self.rpc_url))
target = oslo_messaging.Target(topic='topic_' + str(uuid.uuid4()), target = oslo_messaging.Target(topic='topic_' + str(uuid.uuid4()),
server='server_' + str(uuid.uuid4())) server='server_' + str(uuid.uuid4()))
@ -202,7 +202,7 @@ class CallTestCase(utils.SkipIfNoTransportURL):
return seconds return seconds
self.useFixture( self.useFixture(
utils.RpcServerFixture(self.conf, self.url, target, utils.RpcServerFixture(self.conf, self.rpc_url, target,
executor='threading', executor='threading',
endpoint=_endpoint())) endpoint=_endpoint()))
@ -238,10 +238,10 @@ class CallTestCase(utils.SkipIfNoTransportURL):
return echo return echo
transport = self.useFixture( transport = self.useFixture(
utils.RPCTransportFixture(self.conf, self.url) utils.RPCTransportFixture(self.conf, self.rpc_url)
) )
self.useFixture( self.useFixture(
utils.RpcServerFixture(self.conf, self.url, target, utils.RpcServerFixture(self.conf, self.rpc_url, target,
executor="threading", executor="threading",
endpoint=_endpoint(target))) endpoint=_endpoint(target)))
client1 = utils.ClientStub(transport.transport, target, client1 = utils.ClientStub(transport.transport, target,
@ -280,7 +280,7 @@ class CallTestCase(utils.SkipIfNoTransportURL):
target = oslo_messaging.Target(topic="topic_" + str(uuid.uuid4()), target = oslo_messaging.Target(topic="topic_" + str(uuid.uuid4()),
server="server_" + str(uuid.uuid4())) server="server_" + str(uuid.uuid4()))
transport = self.useFixture( transport = self.useFixture(
utils.RPCTransportFixture(self.conf, self.url) utils.RPCTransportFixture(self.conf, self.rpc_url)
) )
self.assertRaises(TypeError, self.assertRaises(TypeError,
oslo_messaging.get_rpc_server, oslo_messaging.get_rpc_server,
@ -297,12 +297,12 @@ class CastTestCase(utils.SkipIfNoTransportURL):
def setUp(self): def setUp(self):
super(CastTestCase, self).setUp() super(CastTestCase, self).setUp()
if self.url.startswith("kafka://"): if self.rpc_url.startswith("kafka://"):
self.skipTest("kafka does not support RPC API") self.skipTest("kafka does not support RPC API")
def test_specific_server(self): def test_specific_server(self):
group = self.useFixture( group = self.useFixture(
utils.RpcServerGroupFixture(self.conf, self.url) utils.RpcServerGroupFixture(self.conf, self.rpc_url)
) )
client = group.client(1, cast=True) client = group.client(1, cast=True)
client.append(text='open') client.append(text='open')
@ -321,10 +321,10 @@ class CastTestCase(utils.SkipIfNoTransportURL):
self.assertEqual(0, group.servers[i].endpoint.ival) self.assertEqual(0, group.servers[i].endpoint.ival)
def test_server_in_group(self): def test_server_in_group(self):
if self.url.startswith("amqp:"): if self.rpc_url.startswith("amqp:"):
self.skipTest("QPID-6307") self.skipTest("QPID-6307")
group = self.useFixture( group = self.useFixture(
utils.RpcServerGroupFixture(self.conf, self.url) utils.RpcServerGroupFixture(self.conf, self.rpc_url)
) )
client = group.client(cast=True) client = group.client(cast=True)
for i in range(20): for i in range(20):
@ -343,7 +343,7 @@ class CastTestCase(utils.SkipIfNoTransportURL):
def test_fanout(self): def test_fanout(self):
group = self.useFixture( group = self.useFixture(
utils.RpcServerGroupFixture(self.conf, self.url) utils.RpcServerGroupFixture(self.conf, self.rpc_url)
) )
client = group.client('all', cast=True) client = group.client('all', cast=True)
client.append(text='open') client.append(text='open')
@ -367,13 +367,14 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
def test_simple(self): def test_simple(self):
get_timeout = 1 get_timeout = 1
if self.url.startswith("kafka://"): if self.notify_url.startswith("kafka://"):
get_timeout = 5 get_timeout = 5
self.conf.set_override('consumer_group', 'test_simple', self.conf.set_override('consumer_group', 'test_simple',
group='oslo_messaging_kafka') group='oslo_messaging_kafka')
listener = self.useFixture( listener = self.useFixture(
utils.NotificationFixture(self.conf, self.url, ['test_simple'])) utils.NotificationFixture(self.conf, self.notify_url,
['test_simple']))
notifier = listener.notifier('abc') notifier = listener.notifier('abc')
notifier.info({}, 'test', 'Hello World!') notifier.info({}, 'test', 'Hello World!')
@ -385,13 +386,13 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
def test_multiple_topics(self): def test_multiple_topics(self):
get_timeout = 1 get_timeout = 1
if self.url.startswith("kafka://"): if self.notify_url.startswith("kafka://"):
get_timeout = 5 get_timeout = 5
self.conf.set_override('consumer_group', 'test_multiple_topics', self.conf.set_override('consumer_group', 'test_multiple_topics',
group='oslo_messaging_kafka') group='oslo_messaging_kafka')
listener = self.useFixture( listener = self.useFixture(
utils.NotificationFixture(self.conf, self.url, ['a', 'b'])) utils.NotificationFixture(self.conf, self.notify_url, ['a', 'b']))
a = listener.notifier('pub-a', topics=['a']) a = listener.notifier('pub-a', topics=['a'])
b = listener.notifier('pub-b', topics=['b']) b = listener.notifier('pub-b', topics=['b'])
@ -416,9 +417,9 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
def test_multiple_servers(self): def test_multiple_servers(self):
timeout = 0.5 timeout = 0.5
if self.url.startswith("amqp:"): if self.notify_url.startswith("amqp:"):
self.skipTest("QPID-6307") self.skipTest("QPID-6307")
if self.url.startswith("kafka://"): if self.notify_url.startswith("kafka://"):
self.skipTest("Kafka: needs to be fixed") self.skipTest("Kafka: needs to be fixed")
timeout = 5 timeout = 5
self.conf.set_override('consumer_group', self.conf.set_override('consumer_group',
@ -426,10 +427,12 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
group='oslo_messaging_kafka') group='oslo_messaging_kafka')
listener_a = self.useFixture( listener_a = self.useFixture(
utils.NotificationFixture(self.conf, self.url, ['test-topic'])) utils.NotificationFixture(self.conf, self.notify_url,
['test-topic']))
listener_b = self.useFixture( listener_b = self.useFixture(
utils.NotificationFixture(self.conf, self.url, ['test-topic'])) utils.NotificationFixture(self.conf, self.notify_url,
['test-topic']))
n = listener_a.notifier('pub') n = listener_a.notifier('pub')
@ -446,20 +449,20 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
def test_independent_topics(self): def test_independent_topics(self):
get_timeout = 0.5 get_timeout = 0.5
if self.url.startswith("kafka://"): if self.notify_url.startswith("kafka://"):
get_timeout = 5 get_timeout = 5
self.conf.set_override('consumer_group', self.conf.set_override('consumer_group',
'test_independent_topics_a', 'test_independent_topics_a',
group='oslo_messaging_kafka') group='oslo_messaging_kafka')
listener_a = self.useFixture( listener_a = self.useFixture(
utils.NotificationFixture(self.conf, self.url, ['1'])) utils.NotificationFixture(self.conf, self.notify_url, ['1']))
if self.url.startswith("kafka://"): if self.notify_url.startswith("kafka://"):
self.conf.set_override('consumer_group', self.conf.set_override('consumer_group',
'test_independent_topics_b', 'test_independent_topics_b',
group='oslo_messaging_kafka') group='oslo_messaging_kafka')
listener_b = self.useFixture( listener_b = self.useFixture(
utils.NotificationFixture(self.conf, self.url, ['2'])) utils.NotificationFixture(self.conf, self.notify_url, ['2']))
a = listener_a.notifier('pub-1', topics=['1']) a = listener_a.notifier('pub-1', topics=['1'])
b = listener_b.notifier('pub-2', topics=['2']) b = listener_b.notifier('pub-2', topics=['2'])
@ -484,13 +487,13 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
def test_all_categories(self): def test_all_categories(self):
get_timeout = 1 get_timeout = 1
if self.url.startswith("kafka://"): if self.notify_url.startswith("kafka://"):
get_timeout = 5 get_timeout = 5
self.conf.set_override('consumer_group', 'test_all_categories', self.conf.set_override('consumer_group', 'test_all_categories',
group='oslo_messaging_kafka') group='oslo_messaging_kafka')
listener = self.useFixture(utils.NotificationFixture( listener = self.useFixture(utils.NotificationFixture(
self.conf, self.url, ['test_all_categories'])) self.conf, self.notify_url, ['test_all_categories']))
n = listener.notifier('abc') n = listener.notifier('abc')
cats = ['debug', 'audit', 'info', 'warn', 'error', 'critical'] cats = ['debug', 'audit', 'info', 'warn', 'error', 'critical']
@ -513,20 +516,20 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
def test_simple_batch(self): def test_simple_batch(self):
get_timeout = 3 get_timeout = 3
batch_timeout = 2 batch_timeout = 2
if self.url.startswith("amqp:"): if self.notify_url.startswith("amqp:"):
backend = os.environ.get("AMQP1_BACKEND") backend = os.environ.get("AMQP1_BACKEND")
if backend == "qdrouterd": if backend == "qdrouterd":
# end-to-end acknowledgement with router intermediary # end-to-end acknowledgement with router intermediary
# sender pends until batch_size or timeout reached # sender pends until batch_size or timeout reached
self.skipTest("qdrouterd backend") self.skipTest("qdrouterd backend")
if self.url.startswith("kafka://"): if self.notify_url.startswith("kafka://"):
get_timeout = 10 get_timeout = 10
batch_timeout = 5 batch_timeout = 5
self.conf.set_override('consumer_group', 'test_simple_batch', self.conf.set_override('consumer_group', 'test_simple_batch',
group='oslo_messaging_kafka') group='oslo_messaging_kafka')
listener = self.useFixture( listener = self.useFixture(
utils.BatchNotificationFixture(self.conf, self.url, utils.BatchNotificationFixture(self.conf, self.notify_url,
['test_simple_batch'], ['test_simple_batch'],
batch_size=100, batch_size=100,
batch_timeout=batch_timeout)) batch_timeout=batch_timeout))
@ -542,10 +545,10 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
def test_compression(self): def test_compression(self):
get_timeout = 1 get_timeout = 1
if self.url.startswith("amqp:"): if self.notify_url.startswith("amqp:"):
self.conf.set_override('kombu_compression', 'gzip', self.conf.set_override('kombu_compression', 'gzip',
group='oslo_messaging_rabbit') group='oslo_messaging_rabbit')
if self.url.startswith("kafka://"): if self.notify_url.startswith("kafka://"):
get_timeout = 5 get_timeout = 5
self.conf.set_override('compression_codec', 'gzip', self.conf.set_override('compression_codec', 'gzip',
group='oslo_messaging_kafka') group='oslo_messaging_kafka')
@ -553,7 +556,7 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
group='oslo_messaging_kafka') group='oslo_messaging_kafka')
listener = self.useFixture( listener = self.useFixture(
utils.NotificationFixture(self.conf, self.url, utils.NotificationFixture(self.conf, self.notify_url,
['test_compression'])) ['test_compression']))
notifier = listener.notifier('abc') notifier = listener.notifier('abc')

View File

@ -305,16 +305,14 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase):
def setUp(self, conf=cfg.CONF): def setUp(self, conf=cfg.CONF):
super(SkipIfNoTransportURL, self).setUp(conf=conf) super(SkipIfNoTransportURL, self).setUp(conf=conf)
driver = os.environ.get("TRANSPORT_DRIVER") self.rpc_url = os.environ.get('RPC_TRANSPORT_URL')
if driver: self.notify_url = os.environ.get('NOTIFY_TRANSPORT_URL')
self.url = os.environ.get('PIFPAF_URL')
else:
self.url = os.environ.get('TRANSPORT_URL')
if not self.url: if not (self.rpc_url or self.notify_url):
self.skipTest("No transport url configured") self.skipTest("No transport url configured")
transport_url = oslo_messaging.TransportURL.parse(conf, self.url) transport_url = oslo_messaging.TransportURL.parse(conf,
self.notify_url)
kafka_options.register_opts(conf, transport_url) kafka_options.register_opts(conf, transport_url)

View File

@ -1,20 +0,0 @@
#!/bin/bash
set -e
. tools/functions.sh
SCALA_VERSION=${SCALA_VERSION:-"2.12"}
KAFKA_VERSION=${KAFKA_VERSION:-"2.0.0"}
if [[ -z "$(which kafka-server-start)" ]] && [[ -z $(which kafka-server-start.sh) ]]; then
DATADIR=$(mktemp -d /tmp/OSLOMSG-KAFKA.XXXXX)
trap "clean_exit $DATADIR" EXIT
tarball=kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz
wget http://archive.apache.org/dist/kafka/${KAFKA_VERSION}/$tarball -O $DATADIR/$tarball
tar -xzf $DATADIR/$tarball -C $DATADIR
export PATH=$DATADIR/kafka_${SCALA_VERSION}-${KAFKA_VERSION}/bin:$PATH
fi
pifpaf run kafka -- $*

69
tools/setup-scenario-env.sh Executable file
View File

@ -0,0 +1,69 @@
#!/bin/bash
set -e
. tools/functions.sh
SCENARIO=${SCENARIO:-"scenario01"}
function _setup_kafka {
SCALA_VERSION=${SCALA_VERSION:-"2.12"}
KAFKA_VERSION=${KAFKA_VERSION:-"2.0.0"}
if [[ -z "$(which kafka-server-start)" ]] && [[ -z $(which kafka-server-start.sh) ]]; then
DATADIR=$(mktemp -d /tmp/OSLOMSG-KAFKA.XXXXX)
trap "clean_exit $DATADIR" EXIT
tarball=kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz
wget http://archive.apache.org/dist/kafka/${KAFKA_VERSION}/$tarball -O $DATADIR/$tarball
tar -xzf $DATADIR/$tarball -C $DATADIR
export PATH=$DATADIR/kafka_${SCALA_VERSION}-${KAFKA_VERSION}/bin:$PATH
fi
}
function _setup_global_site_package_path {
MAJOR=$(python -c 'import sys; print(sys.version_info.major)')
MINOR=$(python -c 'import sys; print(sys.version_info.minor)')
if [ -f "/etc/debian_version" ]; then
PRE="dist"
else
PRE="site"
fi
# qdrouterd needs access to global site packages
# create path file and place in virtual env working directory
SITEDIR=${WORKDIR}/${ENVNAME}/lib/python${MAJOR}.${MINOR}/site-packages
cat > ${SITEDIR}/dispatch.pth <<EOF
/usr/lib/python${MAJOR}.${MINOR}/${PRE}-packages
EOF
}
case $SCENARIO in
scenario01)
export RPC_TRANSPORT_URL=rabbit://pifpaf:secret@127.0.0.1:5682/
export NOTIFY_TRANSPORT_URL=rabbit://pifpaf:secret@127.0.0.1:5682/
RUN="--env-prefix RABBITMQ run rabbitmq"
;;
scenario02)
_setup_kafka
export RPC_TRANSPORT_URL=rabbit://pifpaf:secret@127.0.0.1:5682/
export NOTIFY_TRANSPORT_URL=kafka://127.0.0.1:9092/
RUN="--env-prefix RABBITMQ run rabbitmq -- pifpaf --env-prefix KAFKA run kafka"
;;
scenario03)
_setup_global_site_package_path
export RPC_TRANSPORT_URL=amqp://stackqpid:secretqpid@127.0.0.1:5692/
export NOTIFY_TRANSPORT_URL=rabbit://pifpaf:secret@127.0.0.1:5682/
RUN="--env-prefix RABBITMQ run rabbitmq -- pifpaf --debug --env-prefix QDR run qdrouterd --username stackqpid --password secretqpid --port 5692"
;;
scenario04)
_setup_global_site_package_path
_setup_kafka
export RPC_TRANSPORT_URL=amqp://stackqpid:secretqpid@127.0.0.1:5692/
export NOTIFY_TRANSPORT_URL=kafka://127.0.0.1:9092/
RUN="--env-prefix KAFKA run kafka -- pifpaf --debug --env-prefix QDR run qdrouterd --username stackqpid --password secretqpid --port 5692"
;;
*) ;;
esac
pifpaf $RUN -- $*

View File

@ -1,41 +0,0 @@
#!/bin/bash
#
# Configuration files for the qdrouterd and artemis backends generated
# by pifpaf driver fixtures (https://github.com/jd/pifpaf)
set -e
. tools/functions.sh
ARTEMIS_VERSION=${ARTEMIS_VERSION:-"2.6.4"}
if [[ -z "$(which artemis)" ]]; then
DATADIR=$(mktemp -d /tmp/OSLOMSG-ARTEMIS.XXXXX)
trap "clean_exit $DATADIR" EXIT
tarball=apache-artemis-${ARTEMIS_VERSION}-bin.tar.gz
wget http://archive.apache.org/dist/activemq/activemq-artemis/${ARTEMIS_VERSION}/$tarball -O $DATADIR/$tarball
tar -xzf $DATADIR/$tarball -C $DATADIR
export PATH=$DATADIR/apache-artemis-${ARTEMIS_VERSION}/bin:$PATH
fi
# TODO(ansmith) look to move this to pifpaf driver
function _setup_global_site_package_path {
MAJOR=$(python -c 'import sys; print(sys.version_info.major)')
MINOR=$(python -c 'import sys; print(sys.version_info.minor)')
if [ -f "/etc/debian_version" ]; then
PRE="dist"
else
PRE="site"
fi
# qdrouterd needs access to global site packages
# create path file and place in virtual env working directory
SITEDIR=${WORKDIR}/${ENVNAME}/lib/python${MAJOR}.${MINOR}/site-packages
cat > ${SITEDIR}/dispatch.pth <<EOF
/usr/lib/python${MAJOR}.${MINOR}/${PRE}-packages
EOF
}
_setup_global_site_package_path
pifpaf --env-prefix ARTEMIS run artemis -- pifpaf --debug --env-prefix QDR run qdrouterd --username stackqpid --password secretqpid -- $*

44
tox.ini
View File

@ -42,27 +42,49 @@ commands =
rm -fr doc/build rm -fr doc/build
sphinx-build -W --keep-going -b html doc/source doc/build/html sphinx-build -W --keep-going -b html doc/source doc/build/html
[testenv:py36-func-rabbit] #
# The following functional test scenarios are defined for the
# testing of the messaging backends and to demonstrated the functiona
# correctness across driver combinations (e.g. RPC and Notify)
#
# RPC Notify
# -------- --------
# scenario01 rabbit rabbit
# scenario02 rabbit kafka
# scenario03 amqp rabbit
# scenario04 amqp kafka
#
[testenv:py36-func-scenario01]
basepython = python3.6
setenv = setenv =
{[testenv]setenv} {[testenv]setenv}
TRANSPORT_DRIVER=rabbit SCENARIO=scenario01
commands = pifpaf run rabbitmq -- stestr run --slowest {posargs:oslo_messaging.tests.functional} commands = {toxinidir}/tools/setup-scenario-env.sh stestr run --slowest {posargs:oslo_messaging.tests.functional}
[testenv:py36-func-amqp1] [testenv:py36-func-scenario02]
basepython = python3.6
setenv = setenv =
{[testenv]setenv} {[testenv]setenv}
TRANSPORT_URL=amqp://stackqpid:secretqpid@127.0.0.1:5672// SCENARIO=scenario02
commands = {toxinidir}/tools/setup-scenario-env.sh stestr run --slowest {posargs:oslo_messaging.tests.functional}
[testenv:py36-func-scenario03]
basepython = python3.6
setenv =
{[testenv]setenv}
SCENARIO=scenario03
ENVNAME={envname} ENVNAME={envname}
WORKDIR={toxworkdir} WORKDIR={toxworkdir}
commands = {toxinidir}/tools/setup-test-env-amqp1.sh stestr run --slowest {posargs:oslo_messaging.tests.functional} commands = {toxinidir}/tools/setup-scenario-env.sh stestr run --slowest {posargs:oslo_messaging.tests.functional}
[testenv:py36-func-kafka] [testenv:py36-func-scenario04]
basepython = python3.6
setenv = setenv =
{[testenv]setenv} {[testenv]setenv}
TRANSPORT_URL=kafka://127.0.0.1:9092/ SCENARIO=scenario04
OS_GROUP_REGEX=oslo_messaging.tests.functional ENVNAME={envname}
commands = {toxinidir}/setup-test-env-kafka.sh stestr run --slowest {posargs:oslo_messaging.tests.functional} WORKDIR={toxworkdir}
commands = {toxinidir}/tools/setup-scenario-env.sh stestr run --slowest {posargs:oslo_messaging.tests.functional}
[testenv:bandit] [testenv:bandit]
# NOTE(kgiusti): This is required for the integration test job of the bandit # NOTE(kgiusti): This is required for the integration test job of the bandit