Merge "Add driver independent functional tests"
This commit is contained in:
commit
6dc19e0b93
0
tests/functional/__init__.py
Normal file
0
tests/functional/__init__.py
Normal file
279
tests/functional/test_functional.py
Normal file
279
tests/functional/test_functional.py
Normal file
@ -0,0 +1,279 @@
|
|||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from oslo import messaging
|
||||||
|
|
||||||
|
from testtools import matchers
|
||||||
|
|
||||||
|
from tests.functional.utils import ClientStub
|
||||||
|
from tests.functional.utils import IsValidDistributionOf
|
||||||
|
from tests.functional.utils import NotificationFixture
|
||||||
|
from tests.functional.utils import RpcServerGroupFixture
|
||||||
|
from tests.functional.utils import SkipIfNoTransportURL
|
||||||
|
from tests.functional.utils import TransportFixture
|
||||||
|
|
||||||
|
|
||||||
|
class CallTestCase(SkipIfNoTransportURL):
|
||||||
|
def test_specific_server(self):
|
||||||
|
group = self.useFixture(RpcServerGroupFixture(self.url))
|
||||||
|
client = group.client(1)
|
||||||
|
client.append(text='open')
|
||||||
|
self.assertEqual('openstack', client.append(text='stack'))
|
||||||
|
client.add(increment=2)
|
||||||
|
self.assertEqual(12, client.add(increment=10))
|
||||||
|
self.assertEqual(9, client.subtract(increment=3))
|
||||||
|
self.assertEqual('openstack', group.servers[1].endpoint.sval)
|
||||||
|
self.assertEqual(9, group.servers[1].endpoint.ival)
|
||||||
|
for i in [0, 2]:
|
||||||
|
self.assertEqual('', group.servers[i].endpoint.sval)
|
||||||
|
self.assertEqual(0, group.servers[i].endpoint.ival)
|
||||||
|
|
||||||
|
def test_server_in_group(self):
|
||||||
|
group = self.useFixture(RpcServerGroupFixture(self.url))
|
||||||
|
|
||||||
|
client = group.client()
|
||||||
|
data = [c for c in 'abcdefghijklmn']
|
||||||
|
for i in data:
|
||||||
|
client.append(text=i)
|
||||||
|
|
||||||
|
for s in group.servers:
|
||||||
|
self.assertThat(len(s.endpoint.sval), matchers.GreaterThan(0))
|
||||||
|
actual = [[c for c in s.endpoint.sval] for s in group.servers]
|
||||||
|
self.assertThat(actual, IsValidDistributionOf(data))
|
||||||
|
|
||||||
|
def test_different_exchanges(self):
|
||||||
|
t = self.useFixture(TransportFixture(self.url))
|
||||||
|
# If the different exchanges are not honoured, then the
|
||||||
|
# teardown may hang unless we broadcast all control messages
|
||||||
|
# to each server
|
||||||
|
group1 = self.useFixture(RpcServerGroupFixture(self.url, transport=t,
|
||||||
|
use_fanout_ctrl=True))
|
||||||
|
group2 = self.useFixture(RpcServerGroupFixture(self.url, exchange="a",
|
||||||
|
transport=t,
|
||||||
|
use_fanout_ctrl=True))
|
||||||
|
group3 = self.useFixture(RpcServerGroupFixture(self.url, exchange="b",
|
||||||
|
transport=t,
|
||||||
|
use_fanout_ctrl=True))
|
||||||
|
|
||||||
|
client1 = group1.client(1)
|
||||||
|
data1 = [c for c in 'abcdefghijklmn']
|
||||||
|
for i in data1:
|
||||||
|
client1.append(text=i)
|
||||||
|
|
||||||
|
client2 = group2.client()
|
||||||
|
data2 = [c for c in 'opqrstuvwxyz']
|
||||||
|
for i in data2:
|
||||||
|
client2.append(text=i)
|
||||||
|
|
||||||
|
actual1 = [[c for c in s.endpoint.sval] for s in group1.servers]
|
||||||
|
self.assertThat(actual1, IsValidDistributionOf(data1))
|
||||||
|
actual1 = [c for c in group1.servers[1].endpoint.sval]
|
||||||
|
self.assertThat([actual1], IsValidDistributionOf(data1))
|
||||||
|
for s in group1.servers:
|
||||||
|
expected = len(data1) if group1.servers.index(s) == 1 else 0
|
||||||
|
self.assertEqual(expected, len(s.endpoint.sval))
|
||||||
|
self.assertEqual(0, s.endpoint.ival)
|
||||||
|
|
||||||
|
actual2 = [[c for c in s.endpoint.sval] for s in group2.servers]
|
||||||
|
for s in group2.servers:
|
||||||
|
self.assertThat(len(s.endpoint.sval), matchers.GreaterThan(0))
|
||||||
|
self.assertEqual(0, s.endpoint.ival)
|
||||||
|
self.assertThat(actual2, IsValidDistributionOf(data2))
|
||||||
|
|
||||||
|
for s in group3.servers:
|
||||||
|
self.assertEqual(0, len(s.endpoint.sval))
|
||||||
|
self.assertEqual(0, s.endpoint.ival)
|
||||||
|
|
||||||
|
def test_timeout(self):
|
||||||
|
transport = self.useFixture(TransportFixture(self.url))
|
||||||
|
target = messaging.Target(topic="no_such_topic")
|
||||||
|
c = ClientStub(transport.transport, target, timeout=1)
|
||||||
|
self.assertThat(c.ping, matchers.raises(messaging.MessagingTimeout))
|
||||||
|
|
||||||
|
def test_exception(self):
|
||||||
|
group = self.useFixture(RpcServerGroupFixture(self.url))
|
||||||
|
client = group.client(1)
|
||||||
|
client.add(increment=2)
|
||||||
|
f = lambda: client.subtract(increment=3)
|
||||||
|
self.assertThat(f, matchers.raises(ValueError))
|
||||||
|
|
||||||
|
|
||||||
|
class CastTestCase(SkipIfNoTransportURL):
|
||||||
|
# Note: casts return immediately, so these tests utilise a special
|
||||||
|
# internal sync() cast to ensure prior casts are complete before
|
||||||
|
# making the necessary assertions.
|
||||||
|
|
||||||
|
def test_specific_server(self):
|
||||||
|
group = self.useFixture(RpcServerGroupFixture(self.url))
|
||||||
|
client = group.client(1, cast=True)
|
||||||
|
client.append(text='open')
|
||||||
|
client.append(text='stack')
|
||||||
|
client.add(increment=2)
|
||||||
|
client.add(increment=10)
|
||||||
|
group.sync()
|
||||||
|
|
||||||
|
self.assertEqual('openstack', group.servers[1].endpoint.sval)
|
||||||
|
self.assertEqual(12, group.servers[1].endpoint.ival)
|
||||||
|
for i in [0, 2]:
|
||||||
|
self.assertEqual('', group.servers[i].endpoint.sval)
|
||||||
|
self.assertEqual(0, group.servers[i].endpoint.ival)
|
||||||
|
|
||||||
|
def test_server_in_group(self):
|
||||||
|
group = self.useFixture(RpcServerGroupFixture(self.url))
|
||||||
|
client = group.client(cast=True)
|
||||||
|
for i in range(20):
|
||||||
|
client.add(increment=1)
|
||||||
|
group.sync()
|
||||||
|
total = 0
|
||||||
|
for s in group.servers:
|
||||||
|
ival = s.endpoint.ival
|
||||||
|
self.assertThat(ival, matchers.GreaterThan(0))
|
||||||
|
self.assertThat(ival, matchers.LessThan(20))
|
||||||
|
total += ival
|
||||||
|
self.assertEqual(20, total)
|
||||||
|
|
||||||
|
def test_fanout(self):
|
||||||
|
group = self.useFixture(RpcServerGroupFixture(self.url))
|
||||||
|
client = group.client('all', cast=True)
|
||||||
|
client.append(text='open')
|
||||||
|
client.append(text='stack')
|
||||||
|
client.add(increment=2)
|
||||||
|
client.add(increment=10)
|
||||||
|
group.sync(server='all')
|
||||||
|
for s in group.servers:
|
||||||
|
self.assertEqual('openstack', s.endpoint.sval)
|
||||||
|
self.assertEqual(12, s.endpoint.ival)
|
||||||
|
|
||||||
|
|
||||||
|
class NotifyTestCase(SkipIfNoTransportURL):
|
||||||
|
# NOTE(sileht): Each test must not use the same topics
|
||||||
|
# to be run in parallel
|
||||||
|
|
||||||
|
def test_simple(self):
|
||||||
|
transport = self.useFixture(TransportFixture(self.url))
|
||||||
|
listener = self.useFixture(NotificationFixture(transport.transport,
|
||||||
|
['test_simple']))
|
||||||
|
transport.wait()
|
||||||
|
notifier = listener.notifier('abc')
|
||||||
|
|
||||||
|
notifier.info({}, 'test', 'Hello World!')
|
||||||
|
event = listener.events.get(timeout=1)
|
||||||
|
self.assertEqual('info', event[0])
|
||||||
|
self.assertEqual('test', event[1])
|
||||||
|
self.assertEqual('Hello World!', event[2])
|
||||||
|
self.assertEqual('abc', event[3])
|
||||||
|
|
||||||
|
def test_multiple_topics(self):
|
||||||
|
transport = self.useFixture(TransportFixture(self.url))
|
||||||
|
listener = self.useFixture(NotificationFixture(transport.transport,
|
||||||
|
['a', 'b']))
|
||||||
|
transport.wait()
|
||||||
|
a = listener.notifier('pub-a', topic='a')
|
||||||
|
b = listener.notifier('pub-b', topic='b')
|
||||||
|
|
||||||
|
sent = {
|
||||||
|
'pub-a': [a, 'test-a', 'payload-a'],
|
||||||
|
'pub-b': [b, 'test-b', 'payload-b']
|
||||||
|
}
|
||||||
|
for e in sent.values():
|
||||||
|
e[0].info({}, e[1], e[2])
|
||||||
|
|
||||||
|
received = {}
|
||||||
|
while len(received) < len(sent):
|
||||||
|
e = listener.events.get(timeout=1)
|
||||||
|
received[e[3]] = e
|
||||||
|
|
||||||
|
for key in received:
|
||||||
|
actual = received[key]
|
||||||
|
expected = sent[key]
|
||||||
|
self.assertEqual('info', actual[0])
|
||||||
|
self.assertEqual(expected[1], actual[1])
|
||||||
|
self.assertEqual(expected[2], actual[2])
|
||||||
|
|
||||||
|
def test_multiple_servers(self):
|
||||||
|
transport = self.useFixture(TransportFixture(self.url))
|
||||||
|
listener_a = self.useFixture(NotificationFixture(transport.transport,
|
||||||
|
['test-topic']))
|
||||||
|
listener_b = self.useFixture(NotificationFixture(transport.transport,
|
||||||
|
['test-topic']))
|
||||||
|
transport.wait()
|
||||||
|
n = listener_a.notifier('pub')
|
||||||
|
|
||||||
|
events_out = [('test-%s' % c, 'payload-%s' % c) for c in 'abcdefgh']
|
||||||
|
|
||||||
|
for event_type, payload in events_out:
|
||||||
|
n.info({}, event_type, payload)
|
||||||
|
|
||||||
|
events_in = [[(e[1], e[2]) for e in listener_a.get_events()],
|
||||||
|
[(e[1], e[2]) for e in listener_b.get_events()]]
|
||||||
|
|
||||||
|
self.assertThat(events_in, IsValidDistributionOf(events_out))
|
||||||
|
for stream in events_in:
|
||||||
|
self.assertThat(len(stream), matchers.GreaterThan(0))
|
||||||
|
|
||||||
|
def test_independent_topics(self):
|
||||||
|
transport = self.useFixture(TransportFixture(self.url))
|
||||||
|
listener_a = self.useFixture(NotificationFixture(transport.transport,
|
||||||
|
['1']))
|
||||||
|
listener_b = self.useFixture(NotificationFixture(transport.transport,
|
||||||
|
['2']))
|
||||||
|
transport.wait()
|
||||||
|
|
||||||
|
a = listener_a.notifier('pub-1', topic='1')
|
||||||
|
b = listener_b.notifier('pub-2', topic='2')
|
||||||
|
|
||||||
|
a_out = [('test-1-%s' % c, 'payload-1-%s' % c) for c in 'abcdefgh']
|
||||||
|
for event_type, payload in a_out:
|
||||||
|
a.info({}, event_type, payload)
|
||||||
|
|
||||||
|
b_out = [('test-2-%s' % c, 'payload-2-%s' % c) for c in 'ijklmnop']
|
||||||
|
for event_type, payload in b_out:
|
||||||
|
b.info({}, event_type, payload)
|
||||||
|
|
||||||
|
for expected in a_out:
|
||||||
|
actual = listener_a.events.get(timeout=0.5)
|
||||||
|
self.assertEqual('info', actual[0])
|
||||||
|
self.assertEqual(expected[0], actual[1])
|
||||||
|
self.assertEqual(expected[1], actual[2])
|
||||||
|
self.assertEqual('pub-1', actual[3])
|
||||||
|
|
||||||
|
for expected in b_out:
|
||||||
|
actual = listener_b.events.get(timeout=0.5)
|
||||||
|
self.assertEqual('info', actual[0])
|
||||||
|
self.assertEqual(expected[0], actual[1])
|
||||||
|
self.assertEqual(expected[1], actual[2])
|
||||||
|
self.assertEqual('pub-2', actual[3])
|
||||||
|
|
||||||
|
def test_all_categories(self):
|
||||||
|
transport = self.useFixture(TransportFixture(self.url))
|
||||||
|
listener = self.useFixture(NotificationFixture(
|
||||||
|
transport.transport, ['test_all_categories']))
|
||||||
|
transport.wait()
|
||||||
|
n = listener.notifier('abc')
|
||||||
|
|
||||||
|
cats = ['debug', 'audit', 'info', 'warn', 'error', 'critical']
|
||||||
|
events = [(getattr(n, c), c, 'type-' + c, c + '-data') for c in cats]
|
||||||
|
for e in events:
|
||||||
|
e[0]({}, e[2], e[3])
|
||||||
|
|
||||||
|
# order between events with different categories is not guaranteed
|
||||||
|
received = {}
|
||||||
|
for expected in events:
|
||||||
|
e = listener.events.get(timeout=0.5)
|
||||||
|
received[e[0]] = e
|
||||||
|
|
||||||
|
for expected in events:
|
||||||
|
actual = received[expected[1]]
|
||||||
|
self.assertEqual(expected[1], actual[0])
|
||||||
|
self.assertEqual(expected[2], actual[1])
|
||||||
|
self.assertEqual(expected[3], actual[2])
|
343
tests/functional/utils.py
Normal file
343
tests/functional/utils.py
Normal file
@ -0,0 +1,343 @@
|
|||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import os
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
import fixtures
|
||||||
|
from six import moves
|
||||||
|
|
||||||
|
from oslo.config import cfg
|
||||||
|
from oslo import messaging
|
||||||
|
from oslo.messaging.notify import notifier
|
||||||
|
from tests import utils as test_utils
|
||||||
|
|
||||||
|
|
||||||
|
class TestServerEndpoint(object):
|
||||||
|
"""This MessagingServer that will be used during functional testing."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.ival = 0
|
||||||
|
self.sval = ''
|
||||||
|
|
||||||
|
def add(self, ctxt, increment):
|
||||||
|
self.ival += increment
|
||||||
|
return self.ival
|
||||||
|
|
||||||
|
def subtract(self, ctxt, increment):
|
||||||
|
if self.ival < increment:
|
||||||
|
raise ValueError("ival can't go negative!")
|
||||||
|
self.ival -= increment
|
||||||
|
return self.ival
|
||||||
|
|
||||||
|
def append(self, ctxt, text):
|
||||||
|
self.sval += text
|
||||||
|
return self.sval
|
||||||
|
|
||||||
|
|
||||||
|
class TransportFixture(fixtures.Fixture):
|
||||||
|
"""Fixture defined to setup the oslo.messaging transport."""
|
||||||
|
|
||||||
|
def __init__(self, url):
|
||||||
|
self.url = url
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TransportFixture, self).setUp()
|
||||||
|
self.transport = messaging.get_transport(cfg.CONF, url=self.url)
|
||||||
|
|
||||||
|
def cleanUp(self):
|
||||||
|
self.transport.cleanup()
|
||||||
|
super(TransportFixture, self).cleanUp()
|
||||||
|
|
||||||
|
def wait(self):
|
||||||
|
if self.url.startswith("rabbit") or self.url.startswith("qpid"):
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
|
||||||
|
class RpcServerFixture(fixtures.Fixture):
|
||||||
|
"""Fixture to setup the TestServerEndpoint."""
|
||||||
|
|
||||||
|
def __init__(self, transport, target, endpoint=None, ctrl_target=None):
|
||||||
|
super(RpcServerFixture, self).__init__()
|
||||||
|
self.transport = transport
|
||||||
|
self.target = target
|
||||||
|
self.endpoint = endpoint or TestServerEndpoint()
|
||||||
|
self.syncq = moves.queue.Queue()
|
||||||
|
self.ctrl_target = ctrl_target or self.target
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(RpcServerFixture, self).setUp()
|
||||||
|
endpoints = [self.endpoint, self]
|
||||||
|
self.server = messaging.get_rpc_server(self.transport,
|
||||||
|
self.target,
|
||||||
|
endpoints)
|
||||||
|
self._ctrl = messaging.RPCClient(self.transport, self.ctrl_target)
|
||||||
|
self._start()
|
||||||
|
|
||||||
|
def cleanUp(self):
|
||||||
|
self._stop()
|
||||||
|
super(RpcServerFixture, self).cleanUp()
|
||||||
|
|
||||||
|
def _start(self):
|
||||||
|
self.thread = threading.Thread(target=self.server.start)
|
||||||
|
self.thread.daemon = True
|
||||||
|
self.thread.start()
|
||||||
|
|
||||||
|
def _stop(self):
|
||||||
|
self.server.stop()
|
||||||
|
self._ctrl.cast({}, 'ping')
|
||||||
|
self.server.wait()
|
||||||
|
self.thread.join()
|
||||||
|
|
||||||
|
def ping(self, ctxt):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def sync(self, ctxt, item):
|
||||||
|
self.syncq.put(item)
|
||||||
|
|
||||||
|
|
||||||
|
class RpcServerGroupFixture(fixtures.Fixture):
|
||||||
|
def __init__(self, url, topic=None, names=None, exchange=None,
|
||||||
|
transport=None, use_fanout_ctrl=False):
|
||||||
|
self.url = url
|
||||||
|
# NOTE(sileht): topic and servier_name must be uniq
|
||||||
|
# to be able to run all tests in parallel
|
||||||
|
self.topic = topic or str(uuid.uuid4())
|
||||||
|
self.names = names or ["server_%i_%s" % (i, uuid.uuid4())
|
||||||
|
for i in range(3)]
|
||||||
|
self.exchange = exchange
|
||||||
|
self.targets = [self._target(server=n) for n in self.names]
|
||||||
|
self.transport = transport
|
||||||
|
self.use_fanout_ctrl = use_fanout_ctrl
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(RpcServerGroupFixture, self).setUp()
|
||||||
|
if not self.transport:
|
||||||
|
self.transport = self.useFixture(TransportFixture(self.url))
|
||||||
|
self.servers = [self.useFixture(self._server(t)) for t in self.targets]
|
||||||
|
self.transport.wait()
|
||||||
|
|
||||||
|
def _target(self, server=None, fanout=False):
|
||||||
|
t = messaging.Target(exchange=self.exchange, topic=self.topic)
|
||||||
|
t.server = server
|
||||||
|
t.fanout = fanout
|
||||||
|
return t
|
||||||
|
|
||||||
|
def _server(self, target):
|
||||||
|
ctrl = None
|
||||||
|
if self.use_fanout_ctrl:
|
||||||
|
ctrl = self._target(fanout=True)
|
||||||
|
return RpcServerFixture(self.transport.transport, target,
|
||||||
|
ctrl_target=ctrl)
|
||||||
|
|
||||||
|
def client(self, server=None, cast=False):
|
||||||
|
if server:
|
||||||
|
if server == 'all':
|
||||||
|
target = self._target(fanout=True)
|
||||||
|
elif server >= 0 and server < len(self.targets):
|
||||||
|
target = self.targets[server]
|
||||||
|
else:
|
||||||
|
raise ValueError("Invalid value for server: %r" % server)
|
||||||
|
else:
|
||||||
|
target = self._target()
|
||||||
|
return ClientStub(self.transport.transport, target, cast=cast,
|
||||||
|
timeout=5)
|
||||||
|
|
||||||
|
def sync(self, server=None):
|
||||||
|
if server:
|
||||||
|
if server == 'all':
|
||||||
|
c = self.client(server='all', cast=True)
|
||||||
|
c.sync(item='x')
|
||||||
|
for s in self.servers:
|
||||||
|
s.syncq.get(timeout=5)
|
||||||
|
elif server >= 0 and server < len(self.targets):
|
||||||
|
c = self.client(server=server, cast=True)
|
||||||
|
c.sync(item='x')
|
||||||
|
self.servers[server].syncq.get(timeout=5)
|
||||||
|
else:
|
||||||
|
raise ValueError("Invalid value for server: %r" % server)
|
||||||
|
else:
|
||||||
|
for i in range(len(self.servers)):
|
||||||
|
self.client(i).ping()
|
||||||
|
|
||||||
|
|
||||||
|
class RpcCall(object):
|
||||||
|
def __init__(self, client, method, context):
|
||||||
|
self.client = client
|
||||||
|
self.method = method
|
||||||
|
self.context = context
|
||||||
|
|
||||||
|
def __call__(self, **kwargs):
|
||||||
|
self.context['time'] = time.ctime()
|
||||||
|
self.context['cast'] = False
|
||||||
|
result = self.client.call(self.context, self.method, **kwargs)
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
class RpcCast(RpcCall):
|
||||||
|
def __call__(self, **kwargs):
|
||||||
|
self.context['time'] = time.ctime()
|
||||||
|
self.context['cast'] = True
|
||||||
|
self.client.cast(self.context, self.method, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
class ClientStub(object):
|
||||||
|
def __init__(self, transport, target, cast=False, name=None, **kwargs):
|
||||||
|
self.name = name or "functional-tests"
|
||||||
|
self.cast = cast
|
||||||
|
self.client = messaging.RPCClient(transport, target, **kwargs)
|
||||||
|
|
||||||
|
def __getattr__(self, name):
|
||||||
|
context = {"application": self.name}
|
||||||
|
if self.cast:
|
||||||
|
return RpcCast(self.client, name, context)
|
||||||
|
else:
|
||||||
|
return RpcCall(self.client, name, context)
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidDistribution(object):
|
||||||
|
def __init__(self, original, received):
|
||||||
|
self.original = original
|
||||||
|
self.received = received
|
||||||
|
self.missing = []
|
||||||
|
self.extra = []
|
||||||
|
self.wrong_order = []
|
||||||
|
|
||||||
|
def describe(self):
|
||||||
|
text = "Sent %s, got %s; " % (self.original, self.received)
|
||||||
|
e1 = ["%r was missing" % m for m in self.missing]
|
||||||
|
e2 = ["%r was not expected" % m for m in self.extra]
|
||||||
|
e3 = ["%r expected before %r" % (m[0], m[1]) for m in self.wrong_order]
|
||||||
|
return text + ", ".join(e1 + e2 + e3)
|
||||||
|
|
||||||
|
def __len__(self):
|
||||||
|
return len(self.extra) + len(self.missing) + len(self.wrong_order)
|
||||||
|
|
||||||
|
def get_details(self):
|
||||||
|
return {}
|
||||||
|
|
||||||
|
|
||||||
|
class IsValidDistributionOf(object):
|
||||||
|
"""Test whether a given list can be split into particular
|
||||||
|
sub-lists. All items in the original list must be in exactly one
|
||||||
|
sub-list, and must appear in that sub-list in the same order with
|
||||||
|
respect to any other items as in the original list.
|
||||||
|
"""
|
||||||
|
def __init__(self, original):
|
||||||
|
self.original = original
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return 'IsValidDistribution(%s)' % self.original
|
||||||
|
|
||||||
|
def match(self, actual):
|
||||||
|
errors = InvalidDistribution(self.original, actual)
|
||||||
|
received = [[i for i in l] for l in actual]
|
||||||
|
|
||||||
|
def _remove(obj, lists):
|
||||||
|
for l in lists:
|
||||||
|
if obj in l:
|
||||||
|
front = l[0]
|
||||||
|
l.remove(obj)
|
||||||
|
return front
|
||||||
|
return None
|
||||||
|
|
||||||
|
for item in self.original:
|
||||||
|
o = _remove(item, received)
|
||||||
|
if not o:
|
||||||
|
errors.missing += item
|
||||||
|
elif item != o:
|
||||||
|
errors.wrong_order.append([item, o])
|
||||||
|
for l in received:
|
||||||
|
errors.extra += l
|
||||||
|
return errors or None
|
||||||
|
|
||||||
|
|
||||||
|
class SkipIfNoTransportURL(test_utils.BaseTestCase):
|
||||||
|
def setUp(self):
|
||||||
|
super(SkipIfNoTransportURL, self).setUp()
|
||||||
|
self.url = os.environ.get('TRANSPORT_URL')
|
||||||
|
if not self.url:
|
||||||
|
self.skipTest("No transport url configured")
|
||||||
|
|
||||||
|
|
||||||
|
class NotificationFixture(fixtures.Fixture):
|
||||||
|
def __init__(self, transport, topics):
|
||||||
|
super(NotificationFixture, self).__init__()
|
||||||
|
self.transport = transport
|
||||||
|
self.topics = topics
|
||||||
|
self.events = moves.queue.Queue()
|
||||||
|
self.name = str(id(self))
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(NotificationFixture, self).setUp()
|
||||||
|
targets = [messaging.Target(topic=t) for t in self.topics]
|
||||||
|
# add a special topic for internal notifications
|
||||||
|
targets.append(messaging.Target(topic=self.name))
|
||||||
|
self.server = messaging.get_notification_listener(self.transport,
|
||||||
|
targets,
|
||||||
|
[self])
|
||||||
|
self._ctrl = self.notifier('internal', topic=self.name)
|
||||||
|
self._start()
|
||||||
|
|
||||||
|
def cleanUp(self):
|
||||||
|
self._stop()
|
||||||
|
super(NotificationFixture, self).cleanUp()
|
||||||
|
|
||||||
|
def _start(self):
|
||||||
|
self.thread = threading.Thread(target=self.server.start)
|
||||||
|
self.thread.daemon = True
|
||||||
|
self.thread.start()
|
||||||
|
|
||||||
|
def _stop(self):
|
||||||
|
self.server.stop()
|
||||||
|
self._ctrl.sample({}, 'shutdown', 'shutdown')
|
||||||
|
self.server.wait()
|
||||||
|
self.thread.join()
|
||||||
|
|
||||||
|
def notifier(self, publisher, topic=None):
|
||||||
|
return notifier.Notifier(self.transport,
|
||||||
|
publisher,
|
||||||
|
driver='messaging',
|
||||||
|
topic=topic or self.topics[0])
|
||||||
|
|
||||||
|
def debug(self, ctxt, publisher, event_type, payload, metadata):
|
||||||
|
self.events.put(['debug', event_type, payload, publisher])
|
||||||
|
|
||||||
|
def audit(self, ctxt, publisher, event_type, payload, metadata):
|
||||||
|
self.events.put(['audit', event_type, payload, publisher])
|
||||||
|
|
||||||
|
def info(self, ctxt, publisher, event_type, payload, metadata):
|
||||||
|
self.events.put(['info', event_type, payload, publisher])
|
||||||
|
|
||||||
|
def warn(self, ctxt, publisher, event_type, payload, metadata):
|
||||||
|
self.events.put(['warn', event_type, payload, publisher])
|
||||||
|
|
||||||
|
def error(self, ctxt, publisher, event_type, payload, metadata):
|
||||||
|
self.events.put(['error', event_type, payload, publisher])
|
||||||
|
|
||||||
|
def critical(self, ctxt, publisher, event_type, payload, metadata):
|
||||||
|
self.events.put(['critical', event_type, payload, publisher])
|
||||||
|
|
||||||
|
def sample(self, ctxt, publisher, event_type, payload, metadata):
|
||||||
|
pass # Just used for internal shutdown control
|
||||||
|
|
||||||
|
def get_events(self, timeout=0.5):
|
||||||
|
results = []
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
results.append(self.events.get(timeout=timeout))
|
||||||
|
except moves.queue.Empty:
|
||||||
|
pass
|
||||||
|
return results
|
3
tox.ini
3
tox.ini
@ -36,6 +36,9 @@ deps = -r{toxinidir}/requirements-py3.txt
|
|||||||
deps = -r{toxinidir}/amqp1-requirements.txt
|
deps = -r{toxinidir}/amqp1-requirements.txt
|
||||||
{[testenv]deps}
|
{[testenv]deps}
|
||||||
|
|
||||||
|
[testenv:py27-rabbit]
|
||||||
|
setenv = TRANSPORT_URL=rabbit://guest:guest@localhost
|
||||||
|
|
||||||
[flake8]
|
[flake8]
|
||||||
show-source = True
|
show-source = True
|
||||||
ignore = H237,H402,H405,H904
|
ignore = H237,H402,H405,H904
|
||||||
|
Loading…
x
Reference in New Issue
Block a user