From 492ffe9774ee4297bbf1a8954146cf218dfc90b3 Mon Sep 17 00:00:00 2001 From: ozamiatin <ozamiatin@mirantis.com> Date: Wed, 11 Jan 2017 13:19:45 +0200 Subject: [PATCH] [zmq] Distinguish Round-Robin/Fanout socket sending mode For zmq.DEALER socket there are two round-robin modes exist, controlled by zmq.IMMEDIATE option. Immediate True means sending only to running servers and False means to send to all connected servers (may be not up and running at the moment). If we do all messaging over DEALER socket as we do for direct connections, immediate=True better suits for direct CALL/CAST message types and immediate=False is better for fanout. This patch fixes things for dynamic connections as for static we already used the approach. Change-Id: I70c7aeb3c7a2c63c128bec394827577cab580def --- .../client/publishers/dealer/zmq_dealer_publisher_direct.py | 6 ++++-- .../_drivers/zmq_driver/client/zmq_sockets_manager.py | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py index ccecda631..86c92a32b 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py @@ -72,12 +72,14 @@ class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase): socket.connect_to_host(host) def acquire_connection(self, request): - socket = self.sockets_manager.get_socket() if request.msg_type in zmq_names.DIRECT_TYPES: + socket = self.sockets_manager.get_socket() self._get_round_robin_host_connection(request.target, socket) + return socket elif request.msg_type in zmq_names.MULTISEND_TYPES: + socket = self.sockets_manager.get_socket(immediate=False) self._get_fanout_connection(request.target, socket) - return socket + return socket def _finally_unregister(self, socket, request): super(DealerPublisherDirect, self)._finally_unregister(socket, request) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py index b94fc6fb2..7ce0b705e 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py @@ -33,9 +33,9 @@ class SocketsManager(object): self.socket_to_routers = None self.sockets = {} - def get_socket(self): + def get_socket(self, immediate=True): return zmq_socket.ZmqSocket(self.conf, self.zmq_context, - self.socket_type, immediate=False) + self.socket_type, immediate=immediate) def get_cached_socket(self, target_key, hosts=None, immediate=True): hosts = [] if hosts is None else hosts