diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py index ab171930f..d4dbaa9ab 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py @@ -40,17 +40,17 @@ class ReqPublisher(zmq_publisher_base.PublisherBase): if request.msg_type != zmq_names.CALL_TYPE: raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) - socket = self._connect_to_host(request.target) + socket = self._connect_to_host(request.target, request.timeout) self._send_request(socket, request) return self._receive_reply(socket, request) - def _connect_to_host(self, target): + def _connect_to_host(self, target, timeout=0): try: self.zmq_context = zmq.Context() socket = self.zmq_context.socket(zmq.REQ) - host = self.matchmaker.get_single_host(target) + host = self.matchmaker.get_single_host(target, timeout) connect_address = zmq_address.get_tcp_direct_address(host) LOG.info(_LI("Connecting REQ to %s") % connect_address) diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py index 876520de1..8b2365b41 100644 --- a/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py +++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py @@ -15,6 +15,7 @@ import abc import collections import logging import random +import retrying import six @@ -62,30 +63,70 @@ class MatchMakerBase(object): :returns: a list of "hostname:port" hosts """ - def get_single_host(self, target): + def get_single_host(self, target, timeout=None, retry=0): """Get a single host by target. :param target: the target for messages :type target: Target + :param timeout: matchmaker query timeout + :type timeout: integer + :param retry: the number of retries to do + None or -1 means retry forever + 0 means do not retry + N means retry N times + :type retry: integer :returns: a "hostname:port" host """ - hosts = self.get_hosts(target) - if not hosts: - err_msg = "No hosts were found for target %s." % target - LOG.error(err_msg) - raise oslo_messaging.InvalidTarget(err_msg, target) + if not isinstance(timeout, int) and timeout is not None: + raise ValueError( + "timeout must be integer, not {0}".format(type(timeout))) + if not isinstance(retry, int) and retry is not None: + raise ValueError( + "retry must be integer, not {0}".format(type(retry))) - if len(hosts) == 1: - host = hosts[0] - LOG.info(_LI("A single host %(host)s found for target %(target)s.") - % {"host": host, "target": target}) + if timeout is None or timeout < 0: + full_timeout = 0 + retry_timeout = 0 else: - host = random.choice(hosts) - LOG.warning(_LW("Multiple hosts %(hosts)s were found for target " - " %(target)s. Using the random one - %(host)s.") + retry_timeout = timeout * 1000 + + if retry is None or retry < 0: + full_timeout = None + else: + full_timeout = retry * retry_timeout + + _retry = retrying.retry(stop_max_delay=full_timeout, + wait_fixed=retry_timeout) + + @_retry + def _get_single_host(): + hosts = self.get_hosts(target) + try: + if not hosts: + err_msg = "No hosts were found for target %s." % target + LOG.error(err_msg) + raise oslo_messaging.InvalidTarget(err_msg, target) + + if len(hosts) == 1: + host = hosts[0] + LOG.info(_LI( + "A single host %(host)s found for target %(target)s.") + % {"host": host, "target": target}) + else: + host = random.choice(hosts) + LOG.warning(_LW( + "Multiple hosts %(hosts)s were found for target " + " %(target)s. Using the random one - %(host)s.") % {"hosts": hosts, "target": target, "host": host}) - return host + return host + except oslo_messaging.InvalidTarget as ex: + if timeout: + raise oslo_messaging.MessagingTimeout() + else: + raise ex + + return _get_single_host() class DummyMatchMaker(MatchMakerBase): diff --git a/oslo_messaging/tests/functional/test_functional.py b/oslo_messaging/tests/functional/test_functional.py index 4ff5d5867..d6c2797a2 100644 --- a/oslo_messaging/tests/functional/test_functional.py +++ b/oslo_messaging/tests/functional/test_functional.py @@ -101,8 +101,6 @@ class CallTestCase(utils.SkipIfNoTransportURL): self.assertEqual(0, s.endpoint.ival) def test_timeout(self): - if self.url.startswith("zmq"): - self.skipTest("Skip CallTestCase.test_timeout for ZMQ driver") transport = self.useFixture(utils.TransportFixture(self.url)) target = oslo_messaging.Target(topic="no_such_topic") c = utils.ClientStub(transport.transport, target, timeout=1)