diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py index 44d5b6ae3..42575fcd9 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py @@ -14,6 +14,8 @@ import logging +import tenacity + from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ import zmq_dealer_publisher_base from oslo_messaging._drivers.zmq_driver.client import zmq_receivers @@ -55,7 +57,7 @@ class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase): """ def __init__(self, conf, matchmaker): - sender = zmq_senders.RequestSenderDirect(conf) + sender = zmq_senders.RequestSenderDirect(conf, async=True) receiver = zmq_receivers.ReceiverDirect(conf) super(DealerPublisherDirect, self).__init__(conf, matchmaker, sender, receiver) @@ -90,11 +92,16 @@ class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase): self.receiver.unregister_socket(socket) def send_request(self, socket, request): - if request.msg_type in zmq_names.MULTISEND_TYPES: - for _ in range(socket.connections_count()): + @tenacity.retry(retry=tenacity.retry_if_exception_type(zmq.Again), + stop=tenacity.stop_after_delay( + self.conf.rpc_response_timeout)) + def send_retrying(): + if request.msg_type in zmq_names.MULTISEND_TYPES: + for _ in range(socket.connections_count()): + self.sender.send(socket, request) + else: self.sender.send(socket, request) - else: - self.sender.send(socket, request) + return send_retrying() def cleanup(self): self.routing_table.cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py index f63e1d716..9f3f6d72b 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py @@ -31,8 +31,9 @@ zmq = zmq_async.import_zmq() class SenderBase(object): """Base request/response sending interface.""" - def __init__(self, conf): + def __init__(self, conf, async=False): self.conf = conf + self.async = async self._lock = threading.Lock() self._send_versions = zmq_version.get_method_versions(self, 'send') @@ -155,11 +156,12 @@ class RequestSenderDirect(RequestSenderBase): "msg_version": request.message_version}) def _send_v_1_0(self, socket, request): - socket.send(b'', zmq.SNDMORE) - socket.send_string('1.0', zmq.SNDMORE) - socket.send(six.b(str(request.msg_type)), zmq.SNDMORE) - socket.send_string(request.message_id, zmq.SNDMORE) - socket.send_dumped([request.context, request.message]) + flags = zmq.NOBLOCK if self.async else 0 + socket.send(b'', zmq.SNDMORE | flags) + socket.send_string('1.0', zmq.SNDMORE | flags) + socket.send(six.b(str(request.msg_type)), zmq.SNDMORE | flags) + socket.send_string(request.message_id, zmq.SNDMORE | flags) + socket.send_dumped([request.context, request.message], flags) class AckSenderDirect(AckSenderBase): diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py index e352d8fa0..c9c7cead8 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py @@ -56,6 +56,11 @@ class ZmqSocket(object): # Put messages to only connected queues self.handle.setsockopt(zmq.IMMEDIATE, 1 if immediate else 0) + # Setup timeout on socket sending + if hasattr(self.conf, 'rpc_response_timeout'): + self.handle.setsockopt(zmq.SNDTIMEO, + self.conf.rpc_response_timeout * 1000) + # Configure TCP keep alive keepalive = self.conf.oslo_messaging_zmq.zmq_tcp_keepalive if keepalive < 0: