From d1dac1c11d357aa8391de7e62f4d003eb820948d Mon Sep 17 00:00:00 2001 From: dparalen Date: Thu, 31 Aug 2017 16:32:53 +0200 Subject: [PATCH] Class-level _exchanges in FakeExchangeManager The FakeExchangeManager uses an instance-level storage for FakeExchanges mapping[1]. When a client--server pair is created, each keeps their own instance of FakeDriver -> FakeExchangeManager -> FakeExchange, each of which has their own (instance-level) copy of e.g _server_queues[2], making it impossible for them to communicate. This patch makes the _exchanges mapping a class-level attribute in order to keep the registered exchanges shared between all Manager instances, allowing client and server communication (within a single process). The test_server unit-tests had to be refactored to explicitly pass an exchange name when building a target. This is required for an exchange name change to have any effect during a test case run time when compared to passing the exchange name through the URL. This issue was revealed with this patch. [1] https://github.com/openstack/oslo.messaging/blob/master/oslo_messaging/_drivers/impl_fake.py#L145,#L148 [2] https://github.com/openstack/oslo.messaging/blob/master/oslo_messaging/_drivers/impl_fake.py#L88,#L92 Change-Id: I8dff66f4cafeb1f4c57dbfbfaba5d49e50f55fee Closes-Bug: #1714055 --- oslo_messaging/_drivers/impl_fake.py | 5 +- oslo_messaging/tests/notify/test_listener.py | 4 ++ oslo_messaging/tests/rpc/test_server.py | 75 +++++++++++++------- 3 files changed, 55 insertions(+), 29 deletions(-) diff --git a/oslo_messaging/_drivers/impl_fake.py b/oslo_messaging/_drivers/impl_fake.py index f25e8b7d1..6898350e7 100644 --- a/oslo_messaging/_drivers/impl_fake.py +++ b/oslo_messaging/_drivers/impl_fake.py @@ -142,10 +142,11 @@ class FakeExchange(object): class FakeExchangeManager(object): + _exchanges_lock = threading.Lock() + _exchanges = {} + def __init__(self, default_exchange): self._default_exchange = default_exchange - self._exchanges_lock = threading.Lock() - self._exchanges = {} def get_exchange(self, name): if name is None: diff --git a/oslo_messaging/tests/notify/test_listener.py b/oslo_messaging/tests/notify/test_listener.py index 6582ccd03..8467c0bf0 100644 --- a/oslo_messaging/tests/notify/test_listener.py +++ b/oslo_messaging/tests/notify/test_listener.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import fixtures import threading from oslo_config import cfg @@ -131,6 +132,9 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): def setUp(self): super(TestNotifyListener, self).setUp(conf=cfg.ConfigOpts()) ListenerSetupMixin.setUp(self) + self.useFixture(fixtures.MonkeyPatch( + 'oslo_messaging._drivers.impl_fake.FakeExchangeManager._exchanges', + new_value={})) @mock.patch('debtcollector.deprecate') def test_constructor(self, deprecate): diff --git a/oslo_messaging/tests/rpc/test_server.py b/oslo_messaging/tests/rpc/test_server.py index 22793242b..4d5c6d5b8 100644 --- a/oslo_messaging/tests/rpc/test_server.py +++ b/oslo_messaging/tests/rpc/test_server.py @@ -35,9 +35,11 @@ load_tests = testscenarios.load_tests_apply_scenarios class ServerSetupMixin(object): class Server(object): - def __init__(self, transport, topic, server, endpoint, serializer): + def __init__(self, transport, topic, server, endpoint, serializer, + exchange): self.controller = ServerSetupMixin.ServerController() - target = oslo_messaging.Target(topic=topic, server=server) + target = oslo_messaging.Target(topic=topic, server=server, + exchange=exchange) self.server = oslo_messaging.get_rpc_server(transport, target, [endpoint, @@ -81,25 +83,25 @@ class ServerSetupMixin(object): def __init__(self): self.serializer = self.TestSerializer() - def _setup_server(self, transport, endpoint, topic=None, server=None): + def _setup_server(self, transport, endpoint, topic=None, server=None, + exchange=None): server = self.Server(transport, topic=topic or 'testtopic', server=server or 'testserver', endpoint=endpoint, - serializer=self.serializer) + serializer=self.serializer, + exchange=exchange) server.start() return server - def _stop_server(self, client, server, topic=None): - if topic is not None: - client = client.prepare(topic=topic) + def _stop_server(self, client, server, topic=None, exchange=None): client.cast({}, 'stop') server.wait() - def _setup_client(self, transport, topic='testtopic'): - return oslo_messaging.RPCClient(transport, - oslo_messaging.Target(topic=topic), + def _setup_client(self, transport, topic='testtopic', exchange=None): + target = oslo_messaging.Target(topic=topic, exchange=exchange) + return oslo_messaging.RPCClient(transport, target=target, serializer=self.serializer) @@ -111,6 +113,11 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin): def setUp(self): super(TestRPCServer, self).setUp(conf=cfg.ConfigOpts()) + # FakeExchangeManager uses a class-level exchanges mapping; "reset" it + # before tests assert amount of items stored + self.useFixture(fixtures.MonkeyPatch( + 'oslo_messaging._drivers.impl_fake.FakeExchangeManager._exchanges', + new_value={})) @mock.patch('warnings.warn') def test_constructor(self, warn): @@ -300,14 +307,20 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin): self.assertEqual(['dsfoo', 'dsbar'], endpoint.pings) def test_call(self): - transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') + # NOTE(milan): using a separate transport instance for each the client + # and the server to be able to check independent transport instances + # can communicate over same exchange&topic + transport_srv = oslo_messaging.get_rpc_transport(self.conf, + url='fake:') + transport_cli = oslo_messaging.get_rpc_transport(self.conf, + url='fake:') class TestEndpoint(object): def ping(self, ctxt, arg): return arg - server_thread = self._setup_server(transport, TestEndpoint()) - client = self._setup_client(transport) + server_thread = self._setup_server(transport_srv, TestEndpoint()) + client = self._setup_client(transport_cli) self.assertIsNone(client.call({}, 'ping', arg=None)) self.assertEqual(0, client.call({}, 'ping', arg=0)) @@ -498,8 +511,8 @@ class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin): single_server = params['server1'] == params['server2'] return not (single_topic and single_server) - # fanout to multiple servers on same topic and exchange - # each endpoint will receive both messages + # fanout to multiple servers on same topic and exchange each endpoint + # will receive both messages def fanout_to_servers(scenario): params = scenario[1] fanout = params['fanout1'] or params['fanout2'] @@ -536,14 +549,16 @@ class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin): def setUp(self): super(TestMultipleServers, self).setUp(conf=cfg.ConfigOpts()) + self.useFixture(fixtures.MonkeyPatch( + 'oslo_messaging._drivers.impl_fake.FakeExchangeManager._exchanges', + new_value={})) def test_multiple_servers(self): - url1 = 'fake:///' + (self.exchange1 or '') - url2 = 'fake:///' + (self.exchange2 or '') - - transport1 = oslo_messaging.get_rpc_transport(self.conf, url=url1) - if url1 != url2: - transport2 = oslo_messaging.get_rpc_transport(self.conf, url=url1) + transport1 = oslo_messaging.get_rpc_transport(self.conf, + url='fake:') + if self.exchange1 != self.exchange2: + transport2 = oslo_messaging.get_rpc_transport(self.conf, + url='fake:') else: transport2 = transport1 @@ -563,12 +578,18 @@ class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin): endpoint1 = endpoint2 = TestEndpoint() server1 = self._setup_server(transport1, endpoint1, - topic=self.topic1, server=self.server1) + topic=self.topic1, + exchange=self.exchange1, + server=self.server1) server2 = self._setup_server(transport2, endpoint2, - topic=self.topic2, server=self.server2) + topic=self.topic2, + exchange=self.exchange2, + server=self.server2) - client1 = self._setup_client(transport1, topic=self.topic1) - client2 = self._setup_client(transport2, topic=self.topic2) + client1 = self._setup_client(transport1, topic=self.topic1, + exchange=self.exchange1) + client2 = self._setup_client(transport2, topic=self.topic2, + exchange=self.exchange2) client1 = client1.prepare(server=self.server1) client2 = client2.prepare(server=self.server2) @@ -584,9 +605,9 @@ class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin): (client2.call if self.call2 else client2.cast)({}, 'ping', arg='2') self._stop_server(client1.prepare(fanout=None), - server1, topic=self.topic1) + server1, topic=self.topic1, exchange=self.exchange1) self._stop_server(client2.prepare(fanout=None), - server2, topic=self.topic2) + server2, topic=self.topic2, exchange=self.exchange2) def check(pings, expect): self.assertEqual(len(expect), len(pings))