diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_base.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_base.py old mode 100644 new mode 100755 index d495d2f54..616e4d401 --- a/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_base.py +++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_base.py @@ -16,9 +16,11 @@ import collections import logging import six +import time from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver import zmq_address +from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._i18n import _LE LOG = logging.getLogger(__name__) @@ -181,28 +183,48 @@ class MatchmakerDummy(MatchmakerBase): self._cache = collections.defaultdict(list) self._publishers = set() self._routers = set() + self._address = {} + self.executor = zmq_async.get_executor(method=self._loop) + self.executor.execute() def register_publisher(self, hostname, expire=-1): if hostname not in self._publishers: self._publishers.add(hostname) + self._address[hostname] = expire def unregister_publisher(self, hostname): if hostname in self._publishers: self._publishers.remove(hostname) + if hostname in self._address: + self._address.pop(hostname) def get_publishers(self): - return list(self._publishers) + hosts = [host for host in self._publishers + if self._address[host] > 0] + return hosts def register_router(self, hostname, expire=-1): if hostname not in self._routers: self._routers.add(hostname) + self._address[hostname] = expire def unregister_router(self, hostname): if hostname in self._routers: self._routers.remove(hostname) + if hostname in self._address: + self._address.pop(hostname) def get_routers(self): - return list(self._routers) + hosts = [host for host in self._routers + if self._address[host] > 0] + return hosts + + def _loop(self): + for hostname in self._address: + expire = self._address[hostname] + if expire > 0: + self._address[hostname] = expire - 1 + time.sleep(1) def register(self, target, hostname, listener_type, expire=-1): if target.server: @@ -214,6 +236,8 @@ class MatchmakerDummy(MatchmakerBase): if hostname not in self._cache[key]: self._cache[key].append(hostname) + self._address[hostname] = expire + def unregister(self, target, hostname, listener_type): if target.server: key = zmq_address.target_to_key(target, listener_type) @@ -224,16 +248,21 @@ class MatchmakerDummy(MatchmakerBase): if hostname in self._cache[key]: self._cache[key].remove(hostname) + if hostname in self._address: + self._address.pop(hostname) + def get_hosts(self, target, listener_type): hosts = [] if target.server: key = zmq_address.target_to_key(target, listener_type) - hosts.extend(self._cache[key]) + hosts.extend([host for host in self._cache[key] + if self._address[host] > 0]) if not hosts: key = zmq_address.prefix_str(target.topic, listener_type) - hosts.extend(self._cache[key]) + hosts.extend([host for host in self._cache[key] + if self._address[host] > 0]) LOG.debug("[Dummy] get_hosts for target %(target)s: %(hosts)s", {"target": target, "hosts": hosts}) @@ -246,8 +275,10 @@ class MatchmakerDummy(MatchmakerBase): return self.get_hosts(target, listener_type) def get_hosts_fanout(self, target, listener_type): + hosts = [] key = zmq_address.target_to_key(target, listener_type) - hosts = list(self._cache[key]) + hosts.extend([host for host in self._cache[key] + if self._address[host] > 0]) LOG.debug("[Dummy] get_hosts_fanout for target %(target)s: %(hosts)s", {"target": target, "hosts": hosts}) diff --git a/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py b/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py old mode 100644 new mode 100755 index 7c117ff83..a20cdb4c7 --- a/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py +++ b/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py @@ -65,21 +65,41 @@ class TestImplMatchmaker(test_utils.BaseTestCase): self.host2 = b"test_host2" def test_register(self): - self.test_matcher.register(self.target, self.host1, "test") + self.test_matcher.register( + self.target, + self.host1, + "test", + expire=self.conf.oslo_messaging_zmq.zmq_target_expire) self.assertEqual([self.host1], self.test_matcher.get_hosts(self.target, "test")) def test_register_two_hosts(self): - self.test_matcher.register(self.target, self.host1, "test") - self.test_matcher.register(self.target, self.host2, "test") + self.test_matcher.register( + self.target, + self.host1, + "test", + expire=self.conf.oslo_messaging_zmq.zmq_target_expire) + self.test_matcher.register( + self.target, + self.host2, + "test", + expire=self.conf.oslo_messaging_zmq.zmq_target_expire) self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"), [self.host1, self.host2]) def test_register_unregister(self): - self.test_matcher.register(self.target, self.host1, "test") - self.test_matcher.register(self.target, self.host2, "test") + self.test_matcher.register( + self.target, + self.host1, + "test", + expire=self.conf.oslo_messaging_zmq.zmq_target_expire) + self.test_matcher.register( + self.target, + self.host2, + "test", + expire=self.conf.oslo_messaging_zmq.zmq_target_expire) self.test_matcher.unregister(self.target, self.host2, "test") @@ -87,8 +107,16 @@ class TestImplMatchmaker(test_utils.BaseTestCase): [self.host1]) def test_register_two_same_hosts(self): - self.test_matcher.register(self.target, self.host1, "test") - self.test_matcher.register(self.target, self.host1, "test") + self.test_matcher.register( + self.target, + self.host1, + "test", + expire=self.conf.oslo_messaging_zmq.zmq_target_expire) + self.test_matcher.register( + self.target, + self.host1, + "test", + expire=self.conf.oslo_messaging_zmq.zmq_target_expire) self.assertEqual([self.host1], self.test_matcher.get_hosts(self.target, "test")) diff --git a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py old mode 100644 new mode 100755 index ec302c6c7..cc72608a8 --- a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py +++ b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py @@ -63,7 +63,9 @@ class TestPubSub(zmq_common.ZmqBaseTestCase): self.publisher = zmq_publisher_proxy.PublisherProxy( self.conf, self.driver.matchmaker) - self.driver.matchmaker.register_publisher((self.publisher.host, '')) + self.driver.matchmaker.register_publisher( + (self.publisher.host, ''), + expire=self.conf.oslo_messaging_zmq.zmq_target_expire) self.listeners = [] for _ in range(self.LISTENERS_COUNT):