Merge "[zmq] Dynamic connections send failure"
This commit is contained in:
commit
bf966f6f1f
oslo_messaging/_drivers/zmq_driver
@ -14,6 +14,8 @@
|
||||
|
||||
import logging
|
||||
|
||||
import tenacity
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
||||
import zmq_dealer_publisher_base
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
|
||||
@ -55,7 +57,7 @@ class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase):
|
||||
"""
|
||||
|
||||
def __init__(self, conf, matchmaker):
|
||||
sender = zmq_senders.RequestSenderDirect(conf)
|
||||
sender = zmq_senders.RequestSenderDirect(conf, async=True)
|
||||
receiver = zmq_receivers.ReceiverDirect(conf)
|
||||
super(DealerPublisherDirect, self).__init__(conf, matchmaker,
|
||||
sender, receiver)
|
||||
@ -90,11 +92,16 @@ class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase):
|
||||
self.receiver.unregister_socket(socket)
|
||||
|
||||
def send_request(self, socket, request):
|
||||
if request.msg_type in zmq_names.MULTISEND_TYPES:
|
||||
for _ in range(socket.connections_count()):
|
||||
@tenacity.retry(retry=tenacity.retry_if_exception_type(zmq.Again),
|
||||
stop=tenacity.stop_after_delay(
|
||||
self.conf.rpc_response_timeout))
|
||||
def send_retrying():
|
||||
if request.msg_type in zmq_names.MULTISEND_TYPES:
|
||||
for _ in range(socket.connections_count()):
|
||||
self.sender.send(socket, request)
|
||||
else:
|
||||
self.sender.send(socket, request)
|
||||
else:
|
||||
self.sender.send(socket, request)
|
||||
return send_retrying()
|
||||
|
||||
def cleanup(self):
|
||||
self.routing_table.cleanup()
|
||||
|
@ -31,8 +31,9 @@ zmq = zmq_async.import_zmq()
|
||||
class SenderBase(object):
|
||||
"""Base request/response sending interface."""
|
||||
|
||||
def __init__(self, conf):
|
||||
def __init__(self, conf, async=False):
|
||||
self.conf = conf
|
||||
self.async = async
|
||||
self._lock = threading.Lock()
|
||||
self._send_versions = zmq_version.get_method_versions(self, 'send')
|
||||
|
||||
@ -155,11 +156,12 @@ class RequestSenderDirect(RequestSenderBase):
|
||||
"msg_version": request.message_version})
|
||||
|
||||
def _send_v_1_0(self, socket, request):
|
||||
socket.send(b'', zmq.SNDMORE)
|
||||
socket.send_string('1.0', zmq.SNDMORE)
|
||||
socket.send(six.b(str(request.msg_type)), zmq.SNDMORE)
|
||||
socket.send_string(request.message_id, zmq.SNDMORE)
|
||||
socket.send_dumped([request.context, request.message])
|
||||
flags = zmq.NOBLOCK if self.async else 0
|
||||
socket.send(b'', zmq.SNDMORE | flags)
|
||||
socket.send_string('1.0', zmq.SNDMORE | flags)
|
||||
socket.send(six.b(str(request.msg_type)), zmq.SNDMORE | flags)
|
||||
socket.send_string(request.message_id, zmq.SNDMORE | flags)
|
||||
socket.send_dumped([request.context, request.message], flags)
|
||||
|
||||
|
||||
class AckSenderDirect(AckSenderBase):
|
||||
|
@ -56,6 +56,11 @@ class ZmqSocket(object):
|
||||
# Put messages to only connected queues
|
||||
self.handle.setsockopt(zmq.IMMEDIATE, 1 if immediate else 0)
|
||||
|
||||
# Setup timeout on socket sending
|
||||
if hasattr(self.conf, 'rpc_response_timeout'):
|
||||
self.handle.setsockopt(zmq.SNDTIMEO,
|
||||
self.conf.rpc_response_timeout * 1000)
|
||||
|
||||
# Configure TCP keep alive
|
||||
keepalive = self.conf.oslo_messaging_zmq.zmq_tcp_keepalive
|
||||
if keepalive < 0:
|
||||
|
Loading…
x
Reference in New Issue
Block a user