Merge "Added matchmaker timeouts and retries"
This commit is contained in:
commit
543d303bfd
oslo_messaging
_drivers/zmq_driver
tests/functional
@ -40,17 +40,17 @@ class ReqPublisher(zmq_publisher_base.PublisherBase):
|
|||||||
if request.msg_type != zmq_names.CALL_TYPE:
|
if request.msg_type != zmq_names.CALL_TYPE:
|
||||||
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
|
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
|
||||||
|
|
||||||
socket = self._connect_to_host(request.target)
|
socket = self._connect_to_host(request.target, request.timeout)
|
||||||
self._send_request(socket, request)
|
self._send_request(socket, request)
|
||||||
return self._receive_reply(socket, request)
|
return self._receive_reply(socket, request)
|
||||||
|
|
||||||
def _connect_to_host(self, target):
|
def _connect_to_host(self, target, timeout=0):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.zmq_context = zmq.Context()
|
self.zmq_context = zmq.Context()
|
||||||
socket = self.zmq_context.socket(zmq.REQ)
|
socket = self.zmq_context.socket(zmq.REQ)
|
||||||
|
|
||||||
host = self.matchmaker.get_single_host(target)
|
host = self.matchmaker.get_single_host(target, timeout)
|
||||||
connect_address = zmq_address.get_tcp_direct_address(host)
|
connect_address = zmq_address.get_tcp_direct_address(host)
|
||||||
|
|
||||||
LOG.info(_LI("Connecting REQ to %s") % connect_address)
|
LOG.info(_LI("Connecting REQ to %s") % connect_address)
|
||||||
|
@ -15,6 +15,7 @@ import abc
|
|||||||
import collections
|
import collections
|
||||||
import logging
|
import logging
|
||||||
import random
|
import random
|
||||||
|
import retrying
|
||||||
|
|
||||||
import six
|
import six
|
||||||
|
|
||||||
@ -62,30 +63,70 @@ class MatchMakerBase(object):
|
|||||||
:returns: a list of "hostname:port" hosts
|
:returns: a list of "hostname:port" hosts
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def get_single_host(self, target):
|
def get_single_host(self, target, timeout=None, retry=0):
|
||||||
"""Get a single host by target.
|
"""Get a single host by target.
|
||||||
|
|
||||||
:param target: the target for messages
|
:param target: the target for messages
|
||||||
:type target: Target
|
:type target: Target
|
||||||
|
:param timeout: matchmaker query timeout
|
||||||
|
:type timeout: integer
|
||||||
|
:param retry: the number of retries to do
|
||||||
|
None or -1 means retry forever
|
||||||
|
0 means do not retry
|
||||||
|
N means retry N times
|
||||||
|
:type retry: integer
|
||||||
:returns: a "hostname:port" host
|
:returns: a "hostname:port" host
|
||||||
"""
|
"""
|
||||||
|
|
||||||
hosts = self.get_hosts(target)
|
if not isinstance(timeout, int) and timeout is not None:
|
||||||
if not hosts:
|
raise ValueError(
|
||||||
err_msg = "No hosts were found for target %s." % target
|
"timeout must be integer, not {0}".format(type(timeout)))
|
||||||
LOG.error(err_msg)
|
if not isinstance(retry, int) and retry is not None:
|
||||||
raise oslo_messaging.InvalidTarget(err_msg, target)
|
raise ValueError(
|
||||||
|
"retry must be integer, not {0}".format(type(retry)))
|
||||||
|
|
||||||
if len(hosts) == 1:
|
if timeout is None or timeout < 0:
|
||||||
host = hosts[0]
|
full_timeout = 0
|
||||||
LOG.info(_LI("A single host %(host)s found for target %(target)s.")
|
retry_timeout = 0
|
||||||
% {"host": host, "target": target})
|
|
||||||
else:
|
else:
|
||||||
host = random.choice(hosts)
|
retry_timeout = timeout * 1000
|
||||||
LOG.warning(_LW("Multiple hosts %(hosts)s were found for target "
|
|
||||||
" %(target)s. Using the random one - %(host)s.")
|
if retry is None or retry < 0:
|
||||||
|
full_timeout = None
|
||||||
|
else:
|
||||||
|
full_timeout = retry * retry_timeout
|
||||||
|
|
||||||
|
_retry = retrying.retry(stop_max_delay=full_timeout,
|
||||||
|
wait_fixed=retry_timeout)
|
||||||
|
|
||||||
|
@_retry
|
||||||
|
def _get_single_host():
|
||||||
|
hosts = self.get_hosts(target)
|
||||||
|
try:
|
||||||
|
if not hosts:
|
||||||
|
err_msg = "No hosts were found for target %s." % target
|
||||||
|
LOG.error(err_msg)
|
||||||
|
raise oslo_messaging.InvalidTarget(err_msg, target)
|
||||||
|
|
||||||
|
if len(hosts) == 1:
|
||||||
|
host = hosts[0]
|
||||||
|
LOG.info(_LI(
|
||||||
|
"A single host %(host)s found for target %(target)s.")
|
||||||
|
% {"host": host, "target": target})
|
||||||
|
else:
|
||||||
|
host = random.choice(hosts)
|
||||||
|
LOG.warning(_LW(
|
||||||
|
"Multiple hosts %(hosts)s were found for target "
|
||||||
|
" %(target)s. Using the random one - %(host)s.")
|
||||||
% {"hosts": hosts, "target": target, "host": host})
|
% {"hosts": hosts, "target": target, "host": host})
|
||||||
return host
|
return host
|
||||||
|
except oslo_messaging.InvalidTarget as ex:
|
||||||
|
if timeout:
|
||||||
|
raise oslo_messaging.MessagingTimeout()
|
||||||
|
else:
|
||||||
|
raise ex
|
||||||
|
|
||||||
|
return _get_single_host()
|
||||||
|
|
||||||
|
|
||||||
class DummyMatchMaker(MatchMakerBase):
|
class DummyMatchMaker(MatchMakerBase):
|
||||||
|
@ -101,8 +101,6 @@ class CallTestCase(utils.SkipIfNoTransportURL):
|
|||||||
self.assertEqual(0, s.endpoint.ival)
|
self.assertEqual(0, s.endpoint.ival)
|
||||||
|
|
||||||
def test_timeout(self):
|
def test_timeout(self):
|
||||||
if self.url.startswith("zmq"):
|
|
||||||
self.skipTest("Skip CallTestCase.test_timeout for ZMQ driver")
|
|
||||||
transport = self.useFixture(utils.TransportFixture(self.url))
|
transport = self.useFixture(utils.TransportFixture(self.url))
|
||||||
target = oslo_messaging.Target(topic="no_such_topic")
|
target = oslo_messaging.Target(topic="no_such_topic")
|
||||||
c = utils.ClientStub(transport.transport, target, timeout=1)
|
c = utils.ClientStub(transport.transport, target, timeout=1)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user