Merge "Class-level _exchanges in FakeExchangeManager"
This commit is contained in:
commit
6bbb3bc09d
@ -142,10 +142,11 @@ class FakeExchange(object):
|
|||||||
|
|
||||||
|
|
||||||
class FakeExchangeManager(object):
|
class FakeExchangeManager(object):
|
||||||
|
_exchanges_lock = threading.Lock()
|
||||||
|
_exchanges = {}
|
||||||
|
|
||||||
def __init__(self, default_exchange):
|
def __init__(self, default_exchange):
|
||||||
self._default_exchange = default_exchange
|
self._default_exchange = default_exchange
|
||||||
self._exchanges_lock = threading.Lock()
|
|
||||||
self._exchanges = {}
|
|
||||||
|
|
||||||
def get_exchange(self, name):
|
def get_exchange(self, name):
|
||||||
if name is None:
|
if name is None:
|
||||||
|
@ -13,6 +13,7 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
import fixtures
|
||||||
import threading
|
import threading
|
||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
@ -131,6 +132,9 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
|
|||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TestNotifyListener, self).setUp(conf=cfg.ConfigOpts())
|
super(TestNotifyListener, self).setUp(conf=cfg.ConfigOpts())
|
||||||
ListenerSetupMixin.setUp(self)
|
ListenerSetupMixin.setUp(self)
|
||||||
|
self.useFixture(fixtures.MonkeyPatch(
|
||||||
|
'oslo_messaging._drivers.impl_fake.FakeExchangeManager._exchanges',
|
||||||
|
new_value={}))
|
||||||
|
|
||||||
@mock.patch('debtcollector.deprecate')
|
@mock.patch('debtcollector.deprecate')
|
||||||
def test_constructor(self, deprecate):
|
def test_constructor(self, deprecate):
|
||||||
|
@ -35,9 +35,11 @@ load_tests = testscenarios.load_tests_apply_scenarios
|
|||||||
class ServerSetupMixin(object):
|
class ServerSetupMixin(object):
|
||||||
|
|
||||||
class Server(object):
|
class Server(object):
|
||||||
def __init__(self, transport, topic, server, endpoint, serializer):
|
def __init__(self, transport, topic, server, endpoint, serializer,
|
||||||
|
exchange):
|
||||||
self.controller = ServerSetupMixin.ServerController()
|
self.controller = ServerSetupMixin.ServerController()
|
||||||
target = oslo_messaging.Target(topic=topic, server=server)
|
target = oslo_messaging.Target(topic=topic, server=server,
|
||||||
|
exchange=exchange)
|
||||||
self.server = oslo_messaging.get_rpc_server(transport,
|
self.server = oslo_messaging.get_rpc_server(transport,
|
||||||
target,
|
target,
|
||||||
[endpoint,
|
[endpoint,
|
||||||
@ -81,25 +83,25 @@ class ServerSetupMixin(object):
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.serializer = self.TestSerializer()
|
self.serializer = self.TestSerializer()
|
||||||
|
|
||||||
def _setup_server(self, transport, endpoint, topic=None, server=None):
|
def _setup_server(self, transport, endpoint, topic=None, server=None,
|
||||||
|
exchange=None):
|
||||||
server = self.Server(transport,
|
server = self.Server(transport,
|
||||||
topic=topic or 'testtopic',
|
topic=topic or 'testtopic',
|
||||||
server=server or 'testserver',
|
server=server or 'testserver',
|
||||||
endpoint=endpoint,
|
endpoint=endpoint,
|
||||||
serializer=self.serializer)
|
serializer=self.serializer,
|
||||||
|
exchange=exchange)
|
||||||
|
|
||||||
server.start()
|
server.start()
|
||||||
return server
|
return server
|
||||||
|
|
||||||
def _stop_server(self, client, server, topic=None):
|
def _stop_server(self, client, server, topic=None, exchange=None):
|
||||||
if topic is not None:
|
|
||||||
client = client.prepare(topic=topic)
|
|
||||||
client.cast({}, 'stop')
|
client.cast({}, 'stop')
|
||||||
server.wait()
|
server.wait()
|
||||||
|
|
||||||
def _setup_client(self, transport, topic='testtopic'):
|
def _setup_client(self, transport, topic='testtopic', exchange=None):
|
||||||
return oslo_messaging.RPCClient(transport,
|
target = oslo_messaging.Target(topic=topic, exchange=exchange)
|
||||||
oslo_messaging.Target(topic=topic),
|
return oslo_messaging.RPCClient(transport, target=target,
|
||||||
serializer=self.serializer)
|
serializer=self.serializer)
|
||||||
|
|
||||||
|
|
||||||
@ -111,6 +113,11 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
|
|||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TestRPCServer, self).setUp(conf=cfg.ConfigOpts())
|
super(TestRPCServer, self).setUp(conf=cfg.ConfigOpts())
|
||||||
|
# FakeExchangeManager uses a class-level exchanges mapping; "reset" it
|
||||||
|
# before tests assert amount of items stored
|
||||||
|
self.useFixture(fixtures.MonkeyPatch(
|
||||||
|
'oslo_messaging._drivers.impl_fake.FakeExchangeManager._exchanges',
|
||||||
|
new_value={}))
|
||||||
|
|
||||||
@mock.patch('warnings.warn')
|
@mock.patch('warnings.warn')
|
||||||
def test_constructor(self, warn):
|
def test_constructor(self, warn):
|
||||||
@ -300,14 +307,20 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
|
|||||||
self.assertEqual(['dsfoo', 'dsbar'], endpoint.pings)
|
self.assertEqual(['dsfoo', 'dsbar'], endpoint.pings)
|
||||||
|
|
||||||
def test_call(self):
|
def test_call(self):
|
||||||
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
|
# NOTE(milan): using a separate transport instance for each the client
|
||||||
|
# and the server to be able to check independent transport instances
|
||||||
|
# can communicate over same exchange&topic
|
||||||
|
transport_srv = oslo_messaging.get_rpc_transport(self.conf,
|
||||||
|
url='fake:')
|
||||||
|
transport_cli = oslo_messaging.get_rpc_transport(self.conf,
|
||||||
|
url='fake:')
|
||||||
|
|
||||||
class TestEndpoint(object):
|
class TestEndpoint(object):
|
||||||
def ping(self, ctxt, arg):
|
def ping(self, ctxt, arg):
|
||||||
return arg
|
return arg
|
||||||
|
|
||||||
server_thread = self._setup_server(transport, TestEndpoint())
|
server_thread = self._setup_server(transport_srv, TestEndpoint())
|
||||||
client = self._setup_client(transport)
|
client = self._setup_client(transport_cli)
|
||||||
|
|
||||||
self.assertIsNone(client.call({}, 'ping', arg=None))
|
self.assertIsNone(client.call({}, 'ping', arg=None))
|
||||||
self.assertEqual(0, client.call({}, 'ping', arg=0))
|
self.assertEqual(0, client.call({}, 'ping', arg=0))
|
||||||
@ -498,8 +511,8 @@ class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
|
|||||||
single_server = params['server1'] == params['server2']
|
single_server = params['server1'] == params['server2']
|
||||||
return not (single_topic and single_server)
|
return not (single_topic and single_server)
|
||||||
|
|
||||||
# fanout to multiple servers on same topic and exchange
|
# fanout to multiple servers on same topic and exchange each endpoint
|
||||||
# each endpoint will receive both messages
|
# will receive both messages
|
||||||
def fanout_to_servers(scenario):
|
def fanout_to_servers(scenario):
|
||||||
params = scenario[1]
|
params = scenario[1]
|
||||||
fanout = params['fanout1'] or params['fanout2']
|
fanout = params['fanout1'] or params['fanout2']
|
||||||
@ -536,14 +549,16 @@ class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
|
|||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TestMultipleServers, self).setUp(conf=cfg.ConfigOpts())
|
super(TestMultipleServers, self).setUp(conf=cfg.ConfigOpts())
|
||||||
|
self.useFixture(fixtures.MonkeyPatch(
|
||||||
|
'oslo_messaging._drivers.impl_fake.FakeExchangeManager._exchanges',
|
||||||
|
new_value={}))
|
||||||
|
|
||||||
def test_multiple_servers(self):
|
def test_multiple_servers(self):
|
||||||
url1 = 'fake:///' + (self.exchange1 or '')
|
transport1 = oslo_messaging.get_rpc_transport(self.conf,
|
||||||
url2 = 'fake:///' + (self.exchange2 or '')
|
url='fake:')
|
||||||
|
if self.exchange1 != self.exchange2:
|
||||||
transport1 = oslo_messaging.get_rpc_transport(self.conf, url=url1)
|
transport2 = oslo_messaging.get_rpc_transport(self.conf,
|
||||||
if url1 != url2:
|
url='fake:')
|
||||||
transport2 = oslo_messaging.get_rpc_transport(self.conf, url=url1)
|
|
||||||
else:
|
else:
|
||||||
transport2 = transport1
|
transport2 = transport1
|
||||||
|
|
||||||
@ -563,12 +578,18 @@ class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
|
|||||||
endpoint1 = endpoint2 = TestEndpoint()
|
endpoint1 = endpoint2 = TestEndpoint()
|
||||||
|
|
||||||
server1 = self._setup_server(transport1, endpoint1,
|
server1 = self._setup_server(transport1, endpoint1,
|
||||||
topic=self.topic1, server=self.server1)
|
topic=self.topic1,
|
||||||
|
exchange=self.exchange1,
|
||||||
|
server=self.server1)
|
||||||
server2 = self._setup_server(transport2, endpoint2,
|
server2 = self._setup_server(transport2, endpoint2,
|
||||||
topic=self.topic2, server=self.server2)
|
topic=self.topic2,
|
||||||
|
exchange=self.exchange2,
|
||||||
|
server=self.server2)
|
||||||
|
|
||||||
client1 = self._setup_client(transport1, topic=self.topic1)
|
client1 = self._setup_client(transport1, topic=self.topic1,
|
||||||
client2 = self._setup_client(transport2, topic=self.topic2)
|
exchange=self.exchange1)
|
||||||
|
client2 = self._setup_client(transport2, topic=self.topic2,
|
||||||
|
exchange=self.exchange2)
|
||||||
|
|
||||||
client1 = client1.prepare(server=self.server1)
|
client1 = client1.prepare(server=self.server1)
|
||||||
client2 = client2.prepare(server=self.server2)
|
client2 = client2.prepare(server=self.server2)
|
||||||
@ -584,9 +605,9 @@ class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
|
|||||||
(client2.call if self.call2 else client2.cast)({}, 'ping', arg='2')
|
(client2.call if self.call2 else client2.cast)({}, 'ping', arg='2')
|
||||||
|
|
||||||
self._stop_server(client1.prepare(fanout=None),
|
self._stop_server(client1.prepare(fanout=None),
|
||||||
server1, topic=self.topic1)
|
server1, topic=self.topic1, exchange=self.exchange1)
|
||||||
self._stop_server(client2.prepare(fanout=None),
|
self._stop_server(client2.prepare(fanout=None),
|
||||||
server2, topic=self.topic2)
|
server2, topic=self.topic2, exchange=self.exchange2)
|
||||||
|
|
||||||
def check(pings, expect):
|
def check(pings, expect):
|
||||||
self.assertEqual(len(expect), len(pings))
|
self.assertEqual(len(expect), len(pings))
|
||||||
|
Loading…
Reference in New Issue
Block a user