From 31691745d43be5ea74772a8a2e4072616fdf0814 Mon Sep 17 00:00:00 2001 From: ozamiatin <ozamiatin@mirantis.com> Date: Mon, 16 May 2016 15:43:27 +0300 Subject: [PATCH] [zmq] Don't skip non-direct message types If using the router proxy we need to be able to dispatch all types of messages over the routers, not limiting them to direct types only. Also added fanout possibility to do on a client-side, so we can reduce latency on a proxy, and not using pub-sub for fanout when we don't want it. Change-Id: Ic88c306c1d386dd582cbccfc5719fba5668a9db8 --- oslo_messaging/_drivers/impl_zmq.py | 2 +- .../zmq_driver/broker/zmq_queue_proxy.py | 2 +- .../dealer/zmq_dealer_publisher_proxy.py | 18 +++++++++++++++--- 3 files changed, 17 insertions(+), 5 deletions(-) diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index 5a3e3888c..529063def 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -87,7 +87,7 @@ zmq_opts = [ 'PUB/SUB always uses proxy.'), cfg.BoolOpt('use_router_proxy', default=True, - help='Use ROUTER remote proxy for direct methods.'), + help='Use ROUTER remote proxy.'), cfg.PortOpt('rpc_zmq_min_port', default=49153, diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py index 61f3e37a0..215f0a347 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py @@ -73,7 +73,7 @@ class UniversalQueueProxy(object): if self.conf.use_pub_sub and msg_type in (zmq_names.CAST_FANOUT_TYPE, zmq_names.NOTIFY_TYPE): self.pub_publisher.send_request(message) - elif msg_type in zmq_names.DIRECT_TYPES: + else: self._redirect_message(self.be_router_socket if socket is self.fe_router_socket else self.fe_router_socket, message) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py index 1064e7279..5cba7820a 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py @@ -35,6 +35,7 @@ class DealerPublisherProxy(object): """Used when publishing to a proxy. """ def __init__(self, conf, matchmaker, socket_to_proxy): + self.conf = conf self.sockets_manager = zmq_publisher_base.SocketsManager( conf, matchmaker, zmq.ROUTER, zmq.DEALER) self.socket = socket_to_proxy @@ -45,10 +46,17 @@ class DealerPublisherProxy(object): raise zmq_publisher_base.UnsupportedSendPattern( request.msg_type) - routing_key = self.routing_table.get_routable_host(request.target) \ - if request.msg_type in zmq_names.DIRECT_TYPES else \ - zmq_address.target_to_subscribe_filter(request.target) + if self.conf.use_pub_sub: + routing_key = self.routing_table.get_routable_host(request.target) \ + if request.msg_type in zmq_names.DIRECT_TYPES else \ + zmq_address.target_to_subscribe_filter(request.target) + self._do_send_request(request, routing_key) + else: + routing_keys = self.routing_table.get_all_hosts(request.target) + for routing_key in routing_keys: + self._do_send_request(request, routing_key) + def _do_send_request(self, request, routing_key): self.socket.send(b'', zmq.SNDMORE) self.socket.send(six.b(str(request.msg_type)), zmq.SNDMORE) self.socket.send(six.b(routing_key), zmq.SNDMORE) @@ -132,6 +140,10 @@ class RoutingTable(object): self.routing_table = {} self.routable_hosts = {} + def get_all_hosts(self, target): + self._update_routing_table(target) + return list(self.routable_hosts.get(str(target)) or []) + def get_routable_host(self, target): self._update_routing_table(target) hosts_for_target = self.routable_hosts[str(target)]