[zmq] Don't skip non-direct message types

If using the router proxy we need to be able to
dispatch all types of messages over the routers,
not limiting them to direct types only.

Also added fanout possibility to do on a client-side,
so we can reduce latency on a proxy, and not using
pub-sub for fanout when we don't want it.

Change-Id: Ic88c306c1d386dd582cbccfc5719fba5668a9db8
This commit is contained in:
ozamiatin 2016-05-16 15:43:27 +03:00 committed by Oleksii Zamiatin
parent 8ee19159d2
commit 31691745d4
3 changed files with 17 additions and 5 deletions
oslo_messaging/_drivers
impl_zmq.py
zmq_driver

@ -87,7 +87,7 @@ zmq_opts = [
'PUB/SUB always uses proxy.'),
cfg.BoolOpt('use_router_proxy', default=True,
help='Use ROUTER remote proxy for direct methods.'),
help='Use ROUTER remote proxy.'),
cfg.PortOpt('rpc_zmq_min_port',
default=49153,

@ -73,7 +73,7 @@ class UniversalQueueProxy(object):
if self.conf.use_pub_sub and msg_type in (zmq_names.CAST_FANOUT_TYPE,
zmq_names.NOTIFY_TYPE):
self.pub_publisher.send_request(message)
elif msg_type in zmq_names.DIRECT_TYPES:
else:
self._redirect_message(self.be_router_socket
if socket is self.fe_router_socket
else self.fe_router_socket, message)

@ -35,6 +35,7 @@ class DealerPublisherProxy(object):
"""Used when publishing to a proxy. """
def __init__(self, conf, matchmaker, socket_to_proxy):
self.conf = conf
self.sockets_manager = zmq_publisher_base.SocketsManager(
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
self.socket = socket_to_proxy
@ -45,10 +46,17 @@ class DealerPublisherProxy(object):
raise zmq_publisher_base.UnsupportedSendPattern(
request.msg_type)
routing_key = self.routing_table.get_routable_host(request.target) \
if request.msg_type in zmq_names.DIRECT_TYPES else \
zmq_address.target_to_subscribe_filter(request.target)
if self.conf.use_pub_sub:
routing_key = self.routing_table.get_routable_host(request.target) \
if request.msg_type in zmq_names.DIRECT_TYPES else \
zmq_address.target_to_subscribe_filter(request.target)
self._do_send_request(request, routing_key)
else:
routing_keys = self.routing_table.get_all_hosts(request.target)
for routing_key in routing_keys:
self._do_send_request(request, routing_key)
def _do_send_request(self, request, routing_key):
self.socket.send(b'', zmq.SNDMORE)
self.socket.send(six.b(str(request.msg_type)), zmq.SNDMORE)
self.socket.send(six.b(routing_key), zmq.SNDMORE)
@ -132,6 +140,10 @@ class RoutingTable(object):
self.routing_table = {}
self.routable_hosts = {}
def get_all_hosts(self, target):
self._update_routing_table(target)
return list(self.routable_hosts.get(str(target)) or [])
def get_routable_host(self, target):
self._update_routing_table(target)
hosts_for_target = self.routable_hosts[str(target)]