Fix target resolution mismatch in neutron, nova, heat

Some tempest tests were failing because of NoSuchMethod,
UnsupportedVersion and other missed endpoint errors.

This fix provides new listener per each target and
more straight-forward matchmaker target resolution logic.

Change-Id: I4bfb42048630a0eab075e462ad1e22ebe9a45820
Closes-Bug: #1501682
This commit is contained in:
Oleksii Zamiatin 2015-10-08 22:26:01 +03:00
parent b93d208543
commit ea106e9a09
17 changed files with 286 additions and 101 deletions

@ -229,7 +229,7 @@ class ZmqDriver(base.BaseDriver):
:param target: Message destination target :param target: Message destination target
:type target: oslo_messaging.Target :type target: oslo_messaging.Target
""" """
server = self.server.get() server = zmq_server.ZmqServer(self, self.conf, self.matchmaker)
server.listen(target) server.listen(target)
return server return server

@ -16,7 +16,6 @@ import logging
import os import os
from oslo_utils import excutils from oslo_utils import excutils
import six
from stevedore import driver from stevedore import driver
from oslo_messaging._drivers.zmq_driver.broker import zmq_queue_proxy from oslo_messaging._drivers.zmq_driver.broker import zmq_queue_proxy
@ -51,11 +50,8 @@ class ZmqBroker(object):
).driver(self.conf) ).driver(self.conf)
self.context = zmq.Context() self.context = zmq.Context()
self.queue = six.moves.queue.Queue() self.proxies = [zmq_queue_proxy.UniversalQueueProxy(
self.proxies = [zmq_queue_proxy.OutgoingQueueProxy( conf, self.context, self.matchmaker)
conf, self.context, self.queue, self.matchmaker),
zmq_queue_proxy.IncomingQueueProxy(
conf, self.context, self.queue)
] ]
def _create_ipc_dirs(self): def _create_ipc_dirs(self):

@ -14,65 +14,69 @@
import logging import logging
import six
from oslo_messaging._drivers.zmq_driver.broker import zmq_base_proxy from oslo_messaging._drivers.zmq_driver.broker import zmq_base_proxy
from oslo_messaging._drivers.zmq_driver.client.publishers\ from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_dealer_publisher import zmq_dealer_publisher
from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LI from oslo_messaging._i18n import _LI
zmq = zmq_async.import_zmq(zmq_concurrency='native') zmq = zmq_async.import_zmq(zmq_concurrency='native')
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class OutgoingQueueProxy(zmq_base_proxy.BaseProxy): class UniversalQueueProxy(zmq_base_proxy.BaseProxy):
def __init__(self, conf, context, matchmaker):
super(UniversalQueueProxy, self).__init__(conf, context)
self.poller = zmq_async.get_poller(zmq_concurrency='native')
self.router_socket = context.socket(zmq.ROUTER)
self.router_socket.bind(zmq_address.get_broker_address(conf))
self.poller.register(self.router_socket, self._receive_in_request)
LOG.info(_LI("Polling at universal proxy"))
def __init__(self, conf, context, queue, matchmaker):
super(OutgoingQueueProxy, self).__init__(conf, context)
self.queue = queue
self.matchmaker = matchmaker self.matchmaker = matchmaker
self.publisher = zmq_dealer_publisher.DealerPublisher( reply_receiver = zmq_dealer_publisher.ReplyReceiver(self.poller)
conf, matchmaker) self.publisher = zmq_dealer_publisher.DealerPublisherProxy(
LOG.info(_LI("Polling at outgoing proxy ...")) conf, matchmaker, reply_receiver)
def run(self): def run(self):
try: message, socket = self.poller.poll(self.conf.rpc_poll_timeout)
request = self.queue.get(timeout=self.conf.rpc_poll_timeout) if message is None:
LOG.info(_LI("Redirecting request %s to TCP publisher ...")
% request)
self.publisher.send_request(request)
except six.moves.queue.Empty:
return return
if socket == self.router_socket:
self._redirect_in_request(message)
else:
self._redirect_reply(message)
class IncomingQueueProxy(zmq_base_proxy.BaseProxy): def _redirect_in_request(self, request):
LOG.info(_LI("-> Redirecting request %s to TCP publisher")
% request)
self.publisher.send_request(request)
def __init__(self, conf, context, queue): def _redirect_reply(self, reply):
super(IncomingQueueProxy, self).__init__(conf, context) LOG.info(_LI("Reply proxy %s") % reply)
self.poller = zmq_async.get_poller( if reply[zmq_names.IDX_REPLY_TYPE] == zmq_names.ACK_TYPE:
zmq_concurrency='native') LOG.info(_LI("Acknowledge dropped %s") % reply)
self.queue = queue
self.socket = context.socket(zmq.ROUTER)
self.socket.bind(zmq_address.get_broker_address(conf))
self.poller.register(self.socket, self.receive_request)
LOG.info(_LI("Polling at incoming proxy ..."))
def run(self):
request, socket = self.poller.poll(self.conf.rpc_poll_timeout)
if request is None:
return return
LOG.info(_LI("Received request and queue it: %s") % str(request)) LOG.info(_LI("<- Redirecting reply to ROUTER: reply: %s")
% reply[zmq_names.IDX_REPLY_BODY:])
self.queue.put(request) self.router_socket.send_multipart(reply[zmq_names.IDX_REPLY_BODY:])
def receive_request(self, socket): def _receive_in_request(self, socket):
reply_id = socket.recv() reply_id = socket.recv()
assert reply_id is not None, "Valid id expected" assert reply_id is not None, "Valid id expected"
empty = socket.recv() empty = socket.recv()
assert empty == b'', "Empty delimiter expected" assert empty == b'', "Empty delimiter expected"
return socket.recv_pyobj() envelope = socket.recv_pyobj()
if envelope[zmq_names.FIELD_MSG_TYPE] == zmq_names.CALL_TYPE:
envelope[zmq_names.FIELD_REPLY_ID] = reply_id
payload = socket.recv_multipart()
payload.insert(0, envelope)
return payload

@ -29,12 +29,10 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend):
def __init__(self, conf, matchmaker): def __init__(self, conf, matchmaker):
super(DealerPublisher, self).__init__(conf, matchmaker, zmq.DEALER) super(DealerPublisher, self).__init__(conf, matchmaker, zmq.DEALER)
self.ack_receiver = AcknowledgementReceiver()
def send_request(self, request): def send_request(self, request):
if request.msg_type == zmq_names.CALL_TYPE: self._check_request_pattern(request)
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
dealer_socket, hosts = self._check_hosts_connections(request.target) dealer_socket, hosts = self._check_hosts_connections(request.target)
@ -47,25 +45,26 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend):
% request.msg_type) % request.msg_type)
return return
self.ack_receiver.track_socket(dealer_socket.handle)
if request.msg_type in zmq_names.MULTISEND_TYPES: if request.msg_type in zmq_names.MULTISEND_TYPES:
for _ in range(dealer_socket.connections_count()): for _ in range(dealer_socket.connections_count()):
self._send_request(dealer_socket, request) self._send_request(dealer_socket, request)
else: else:
self._send_request(dealer_socket, request) self._send_request(dealer_socket, request)
def _check_request_pattern(self, request):
if request.msg_type == zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
def _send_request(self, socket, request): def _send_request(self, socket, request):
socket.send(b'', zmq.SNDMORE) socket.send(b'', zmq.SNDMORE)
socket.send_pyobj(request) socket.send_pyobj(request)
LOG.info(_LI("Sending message %(message)s to a target %(target)s") LOG.info(_LI("Sending message_id %(message)s to a target %(target)s")
% {"message": request.message, % {"message": request.message_id,
"target": request.target}) "target": request.target})
def cleanup(self): def cleanup(self):
self.ack_receiver.cleanup()
super(DealerPublisher, self).cleanup() super(DealerPublisher, self).cleanup()
@ -81,7 +80,10 @@ class DealerPublisherLight(zmq_publisher_base.PublisherBase):
if request.msg_type == zmq_names.CALL_TYPE: if request.msg_type == zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
envelope = request.create_envelope()
self.socket.send(b'', zmq.SNDMORE) self.socket.send(b'', zmq.SNDMORE)
self.socket.send_pyobj(envelope, zmq.SNDMORE)
self.socket.send_pyobj(request) self.socket.send_pyobj(request)
def cleanup(self): def cleanup(self):
@ -89,6 +91,67 @@ class DealerPublisherLight(zmq_publisher_base.PublisherBase):
self.socket.close() self.socket.close()
class DealerPublisherProxy(DealerPublisher):
def __init__(self, conf, matchmaker, reply_receiver):
super(DealerPublisherProxy, self).__init__(conf, matchmaker)
self.reply_receiver = reply_receiver
def send_request(self, multipart_message):
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
LOG.info(_LI("Envelope: %s") % envelope)
target = envelope[zmq_names.FIELD_TARGET]
dealer_socket, hosts = self._check_hosts_connections(target)
if not dealer_socket.connections:
# NOTE(ozamiatin): Here we can provide
# a queue for keeping messages to send them later
# when some listener appears. However such approach
# being more reliable will consume additional memory.
LOG.warning(_LW("Request %s was dropped because no connection")
% envelope[zmq_names.FIELD_MSG_TYPE])
return
self.reply_receiver.track_socket(dealer_socket.handle)
LOG.info(_LI("Sending message %(message)s to a target %(target)s")
% {"message": envelope[zmq_names.FIELD_MSG_ID],
"target": envelope[zmq_names.FIELD_TARGET]})
if envelope[zmq_names.FIELD_MSG_TYPE] in zmq_names.MULTISEND_TYPES:
for _ in range(dealer_socket.connections_count()):
self._send_request(dealer_socket, multipart_message)
else:
self._send_request(dealer_socket, multipart_message)
def _send_request(self, socket, multipart_message):
socket.send(b'', zmq.SNDMORE)
socket.send_pyobj(
multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE],
zmq.SNDMORE)
socket.send(multipart_message[zmq_names.MULTIPART_IDX_BODY])
class ReplyReceiver(object):
def __init__(self, poller):
self.poller = poller
LOG.info(_LI("Reply waiter created in broker"))
def _receive_reply(self, socket):
return socket.recv_multipart()
def track_socket(self, socket):
self.poller.register(socket, self._receive_reply)
def cleanup(self):
self.poller.close()
class AcknowledgementReceiver(object): class AcknowledgementReceiver(object):
def __init__(self): def __init__(self):

@ -89,6 +89,13 @@ class PublisherBase(object):
:param request: Message data and destination container object :param request: Message data and destination container object
:type request: zmq_request.Request :type request: zmq_request.Request
""" """
LOG.info(_LI("Sending %(type)s message_id %(message)s to a target"
"%(target)s key: %(key)s, host:%(host)s")
% {"type": request.msg_type,
"message": request.message_id,
"target": request.target,
"key": zmq_address.target_to_key(request.target),
"host": request.host})
socket.send_pyobj(request) socket.send_pyobj(request)
def cleanup(self): def cleanup(self):

@ -14,6 +14,9 @@
import contextlib import contextlib
import logging import logging
import uuid
import six
import oslo_messaging import oslo_messaging
from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers import common as rpc_common
@ -40,24 +43,34 @@ class ReqPublisher(zmq_publisher_base.PublisherBase):
if request.msg_type != zmq_names.CALL_TYPE: if request.msg_type != zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
socket = self._connect_to_host(request.target, request.timeout) socket, connect_address = self._connect_to_host(request.target,
request.timeout)
request.host = connect_address
self._send_request(socket, request) self._send_request(socket, request)
return self._receive_reply(socket, request) return self._receive_reply(socket, request)
def _resolve_host_address(self, target, timeout=0):
host = self.matchmaker.get_single_host(target, timeout)
return zmq_address.get_tcp_direct_address(host)
def _connect_to_host(self, target, timeout=0): def _connect_to_host(self, target, timeout=0):
try: try:
self.zmq_context = zmq.Context() self.zmq_context = zmq.Context()
socket = self.zmq_context.socket(zmq.REQ) socket = self.zmq_context.socket(zmq.REQ)
host = self.matchmaker.get_single_host(target, timeout) if six.PY3:
connect_address = zmq_address.get_tcp_direct_address(host) socket.setsockopt_string(zmq.IDENTITY, str(uuid.uuid1()))
else:
socket.identity = str(uuid.uuid1())
connect_address = self._resolve_host_address(target, timeout)
LOG.info(_LI("Connecting REQ to %s") % connect_address) LOG.info(_LI("Connecting REQ to %s") % connect_address)
socket.connect(connect_address) socket.connect(connect_address)
self.outbound_sockets[str(target)] = socket self.outbound_sockets[str(target)] = socket
return socket return socket, connect_address
except zmq.ZMQError as e: except zmq.ZMQError as e:
errmsg = _LE("Error connecting to socket: %s") % str(e) errmsg = _LE("Error connecting to socket: %s") % str(e)
@ -77,6 +90,7 @@ class ReqPublisher(zmq_publisher_base.PublisherBase):
if reply is None: if reply is None:
raise oslo_messaging.MessagingTimeout( raise oslo_messaging.MessagingTimeout(
"Timeout %s seconds was reached" % request.timeout) "Timeout %s seconds was reached" % request.timeout)
LOG.info(_LI("Received reply %s") % reply)
if reply[zmq_names.FIELD_FAILURE]: if reply[zmq_names.FIELD_FAILURE]:
raise rpc_common.deserialize_remote_exception( raise rpc_common.deserialize_remote_exception(
reply[zmq_names.FIELD_FAILURE], reply[zmq_names.FIELD_FAILURE],
@ -87,3 +101,26 @@ class ReqPublisher(zmq_publisher_base.PublisherBase):
def close(self): def close(self):
# For contextlib compatibility # For contextlib compatibility
self.cleanup() self.cleanup()
class ReqPublisherLight(ReqPublisher):
def __init__(self, conf, matchmaker):
super(ReqPublisherLight, self).__init__(conf, matchmaker)
def _resolve_host_address(self, target, timeout=0):
return zmq_address.get_broker_address(self.conf)
def _send_request(self, socket, request):
LOG.info(_LI("Sending %(type)s message_id %(message)s"
" to a target %(target)s, host:%(host)s")
% {"type": request.msg_type,
"message": request.message_id,
"target": request.target,
"host": request.host})
envelope = request.create_envelope()
socket.send_pyobj(envelope, zmq.SNDMORE)
socket.send_pyobj(request)

@ -37,16 +37,18 @@ class ZmqClient(object):
if self.conf.zmq_use_broker: if self.conf.zmq_use_broker:
self.dealer_publisher = zmq_dealer_publisher.DealerPublisherLight( self.dealer_publisher = zmq_dealer_publisher.DealerPublisherLight(
conf, zmq_address.get_broker_address(self.conf)) conf, zmq_address.get_broker_address(self.conf))
self.req_publisher_cls = zmq_req_publisher.ReqPublisherLight
else: else:
self.dealer_publisher = zmq_dealer_publisher.DealerPublisher( self.dealer_publisher = zmq_dealer_publisher.DealerPublisher(
conf, matchmaker) conf, matchmaker)
self.req_publisher_cls = zmq_req_publisher.ReqPublisher
def send_call(self, target, context, message, timeout=None, retry=None): def send_call(self, target, context, message, timeout=None, retry=None):
with contextlib.closing(zmq_request.CallRequest( with contextlib.closing(zmq_request.CallRequest(
target, context=context, message=message, target, context=context, message=message,
timeout=timeout, retry=retry, timeout=timeout, retry=retry,
allowed_remote_exmods=self.allowed_remote_exmods)) as request: allowed_remote_exmods=self.allowed_remote_exmods)) as request:
with contextlib.closing(zmq_req_publisher.ReqPublisher( with contextlib.closing(self.req_publisher_cls(
self.conf, self.matchmaker)) as req_publisher: self.conf, self.matchmaker)) as req_publisher:
return req_publisher.send_request(request) return req_publisher.send_request(request)

@ -63,6 +63,12 @@ class Request(object):
self.message = message self.message = message
self.retry = retry self.retry = retry
self.message_id = str(uuid.uuid1()) self.message_id = str(uuid.uuid1())
self.proxy_reply_id = None
def create_envelope(self):
return {'msg_type': self.msg_type,
'message_id': self.message_id,
'target': self.target}
@abc.abstractproperty @abc.abstractproperty
def msg_type(self): def msg_type(self):
@ -86,6 +92,11 @@ class RpcRequest(Request):
super(RpcRequest, self).__init__(*args, **kwargs) super(RpcRequest, self).__init__(*args, **kwargs)
def create_envelope(self):
envelope = super(RpcRequest, self).create_envelope()
envelope['timeout'] = self.timeout
return envelope
class CallRequest(RpcRequest): class CallRequest(RpcRequest):

@ -17,6 +17,7 @@ from oslo_config import cfg
from oslo_utils import importutils from oslo_utils import importutils
from oslo_messaging._drivers.zmq_driver.matchmaker import base from oslo_messaging._drivers.zmq_driver.matchmaker import base
from oslo_messaging._drivers.zmq_driver import zmq_address
redis = importutils.try_import('redis') redis = importutils.try_import('redis')
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -48,34 +49,30 @@ class RedisMatchMaker(base.MatchMakerBase):
password=self.conf.matchmaker_redis.password, password=self.conf.matchmaker_redis.password,
) )
def _target_to_key(self, target):
attributes = ['topic', 'exchange', 'server']
prefix = "ZMQ-target"
key = ":".join((getattr(target, attr) or "*") for attr in attributes)
return "%s-%s" % (prefix, key)
def _get_keys_by_pattern(self, pattern):
return self._redis.keys(pattern)
def _get_hosts_by_key(self, key): def _get_hosts_by_key(self, key):
return self._redis.lrange(key, 0, -1) return self._redis.lrange(key, 0, -1)
def register(self, target, hostname): def register(self, target, hostname):
key = self._target_to_key(target)
if hostname not in self._get_hosts_by_key(key): if target.topic and target.server:
self._redis.lpush(key, hostname) key = zmq_address.target_to_key(target)
if hostname not in self._get_hosts_by_key(key):
self._redis.lpush(key, hostname)
if target.topic:
if hostname not in self._get_hosts_by_key(target.topic):
self._redis.lpush(target.topic, hostname)
if target.server:
if hostname not in self._get_hosts_by_key(target.server):
self._redis.lpush(target.server, hostname)
def unregister(self, target, hostname): def unregister(self, target, hostname):
key = self._target_to_key(target) key = zmq_address.target_to_key(target)
self._redis.lrem(key, 0, hostname) self._redis.lrem(key, 0, hostname)
def get_hosts(self, target): def get_hosts(self, target):
pattern = self._target_to_key(target)
if "*" not in pattern:
# pattern have no placeholders, so this is valid key
return self._get_hosts_by_key(pattern)
hosts = [] hosts = []
for key in self._get_keys_by_pattern(pattern): key = zmq_address.target_to_key(target)
hosts.extend(self._get_hosts_by_key(key)) hosts.extend(self._get_hosts_by_key(key))
return hosts return hosts

@ -38,12 +38,17 @@ class ThreadingPoller(zmq_poller.ZmqPoller):
self.recv_methods = {} self.recv_methods = {}
def register(self, socket, recv_method=None): def register(self, socket, recv_method=None):
if socket in self.recv_methods:
return
if recv_method is not None: if recv_method is not None:
self.recv_methods[socket] = recv_method self.recv_methods[socket] = recv_method
self.poller.register(socket, zmq.POLLIN) self.poller.register(socket, zmq.POLLIN)
def poll(self, timeout=None): def poll(self, timeout=None):
timeout *= 1000 # zmq poller waits milliseconds
if timeout:
timeout *= 1000 # zmq poller waits milliseconds
sockets = None sockets = None
try: try:

@ -19,6 +19,7 @@ import six
from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging._i18n import _LE, _LI from oslo_messaging._i18n import _LE, _LI
@ -44,7 +45,7 @@ class ConsumerBase(object):
self.sockets.append(socket) self.sockets.append(socket)
self.poller.register(socket, self.receive_message) self.poller.register(socket, self.receive_message)
LOG.info(_LI("Run %(stype)s consumer on %(addr)s:%(port)d"), LOG.info(_LI("Run %(stype)s consumer on %(addr)s:%(port)d"),
{"stype": socket_type, {"stype": zmq_names.socket_type_str(socket_type),
"addr": socket.bind_address, "addr": socket.bind_address,
"port": socket.port}) "port": socket.port})
return socket return socket

@ -43,11 +43,7 @@ class RouterIncomingMessage(base.IncomingMessage):
"""Reply is not needed for non-call messages""" """Reply is not needed for non-call messages"""
def acknowledge(self): def acknowledge(self):
LOG.info("Sending acknowledge for %s", self.msg_id) LOG.info("Not sending acknowledge for %s", self.msg_id)
ack_message = {zmq_names.FIELD_ID: self.msg_id}
self.socket.send(self.reply_id, zmq.SNDMORE)
self.socket.send(b'', zmq.SNDMORE)
self.socket.send_pyobj(ack_message)
def requeue(self): def requeue(self):
"""Requeue is not supported""" """Requeue is not supported"""
@ -61,11 +57,11 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
self.targets = [] self.targets = []
self.host = zmq_address.combine_address(self.conf.rpc_zmq_host, self.host = zmq_address.combine_address(self.conf.rpc_zmq_host,
self.port) self.port)
LOG.info("[%s] Run ROUTER consumer" % self.host)
def listen(self, target): def listen(self, target):
LOG.info("Listen to target %s on %s:%d" % LOG.info("[%s] Listen to target %s" % (self.host, target))
(target, self.address, self.port))
self.targets.append(target) self.targets.append(target)
self.matchmaker.register(target=target, self.matchmaker.register(target=target,
@ -76,21 +72,25 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
for target in self.targets: for target in self.targets:
self.matchmaker.unregister(target, self.host) self.matchmaker.unregister(target, self.host)
def _receive_request(self, socket):
reply_id = socket.recv()
empty = socket.recv()
assert empty == b'', 'Bad format: empty delimiter expected'
request = socket.recv_pyobj()
return request, reply_id
def receive_message(self, socket): def receive_message(self, socket):
try: try:
reply_id = socket.recv() request, reply_id = self._receive_request(socket)
empty = socket.recv() LOG.info(_LI("[%(host)s] Received %(type)s, %(id)s, %(target)s")
assert empty == b'', 'Bad format: empty delimiter expected' % {"host": self.host,
request = socket.recv_pyobj() "type": request.msg_type,
"id": request.message_id,
LOG.info(_LI("Received %(msg_type)s message %(msg)s") "target": request.target})
% {"msg_type": request.msg_type,
"msg": str(request.message)})
if request.msg_type == zmq_names.CALL_TYPE: if request.msg_type == zmq_names.CALL_TYPE:
return zmq_incoming_message.ZmqIncomingRequest( return zmq_incoming_message.ZmqIncomingRequest(
self.server, request.context, request.message, socket, self.server, socket, reply_id, request, self.poller)
reply_id, self.poller)
elif request.msg_type in zmq_names.NON_BLOCKING_TYPES: elif request.msg_type in zmq_names.NON_BLOCKING_TYPES:
return RouterIncomingMessage( return RouterIncomingMessage(
self.server, request.context, request.message, socket, self.server, request.context, request.message, socket,
@ -100,3 +100,20 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
except zmq.ZMQError as e: except zmq.ZMQError as e:
LOG.error(_LE("Receiving message failed: %s") % str(e)) LOG.error(_LE("Receiving message failed: %s") % str(e))
class RouterConsumerBroker(RouterConsumer):
def __init__(self, conf, poller, server):
super(RouterConsumerBroker, self).__init__(conf, poller, server)
def _receive_request(self, socket):
reply_id = socket.recv()
empty = socket.recv()
assert empty == b'', 'Bad format: empty delimiter expected'
envelope = socket.recv_pyobj()
request = socket.recv_pyobj()
if zmq_names.FIELD_REPLY_ID in envelope:
request.proxy_reply_id = envelope[zmq_names.FIELD_REPLY_ID]
return request, reply_id

@ -28,10 +28,12 @@ zmq = zmq_async.import_zmq()
class ZmqIncomingRequest(base.IncomingMessage): class ZmqIncomingRequest(base.IncomingMessage):
def __init__(self, listener, context, message, socket, rep_id, poller): def __init__(self, listener, socket, rep_id, request, poller):
super(ZmqIncomingRequest, self).__init__(listener, context, message) super(ZmqIncomingRequest, self).__init__(listener, request.context,
request.message)
self.reply_socket = socket self.reply_socket = socket
self.reply_id = rep_id self.reply_id = rep_id
self.request = request
self.received = None self.received = None
self.poller = poller self.poller = poller
@ -39,15 +41,21 @@ class ZmqIncomingRequest(base.IncomingMessage):
if failure is not None: if failure is not None:
failure = rpc_common.serialize_remote_exception(failure, failure = rpc_common.serialize_remote_exception(failure,
log_failure) log_failure)
message_reply = {zmq_names.FIELD_REPLY: reply, message_reply = {zmq_names.FIELD_TYPE: zmq_names.REPLY_TYPE,
zmq_names.FIELD_REPLY: reply,
zmq_names.FIELD_FAILURE: failure, zmq_names.FIELD_FAILURE: failure,
zmq_names.FIELD_LOG_FAILURE: log_failure} zmq_names.FIELD_LOG_FAILURE: log_failure,
zmq_names.FIELD_ID: self.request.proxy_reply_id}
LOG.info("Replying %s REP", (str(message_reply))) LOG.info("Replying %s REP", (str(self.request.message_id)))
self.received = True self.received = True
self.reply_socket.send(self.reply_id, zmq.SNDMORE) self.reply_socket.send(self.reply_id, zmq.SNDMORE)
self.reply_socket.send(b'', zmq.SNDMORE) self.reply_socket.send(b'', zmq.SNDMORE)
if self.request.proxy_reply_id:
self.reply_socket.send_string(zmq_names.REPLY_TYPE, zmq.SNDMORE)
self.reply_socket.send(self.request.proxy_reply_id, zmq.SNDMORE)
self.reply_socket.send(b'', zmq.SNDMORE)
self.reply_socket.send_pyobj(message_reply) self.reply_socket.send_pyobj(message_reply)
self.poller.resume_polling(self.reply_socket) self.poller.resume_polling(self.reply_socket)

@ -31,8 +31,12 @@ class ZmqServer(base.Listener):
super(ZmqServer, self).__init__(driver) super(ZmqServer, self).__init__(driver)
self.matchmaker = matchmaker self.matchmaker = matchmaker
self.poller = zmq_async.get_poller() self.poller = zmq_async.get_poller()
self.rpc_consumer = zmq_router_consumer.RouterConsumer( if conf.zmq_use_broker:
conf, self.poller, self) self.rpc_consumer = zmq_router_consumer.RouterConsumerBroker(
conf, self.poller, self)
else:
self.rpc_consumer = zmq_router_consumer.RouterConsumer(
conf, self.poller, self)
self.notify_consumer = self.rpc_consumer self.notify_consumer = self.rpc_consumer
self.consumers = [self.rpc_consumer] self.consumers = [self.rpc_consumer]

@ -27,3 +27,14 @@ def get_tcp_random_address(conf):
def get_broker_address(conf): def get_broker_address(conf):
return "ipc://%s/zmq-broker" % conf.rpc_zmq_ipc_dir return "ipc://%s/zmq-broker" % conf.rpc_zmq_ipc_dir
def target_to_key(target):
if target.topic and target.server:
attributes = ['topic', 'server']
key = ".".join(getattr(target, attr) for attr in attributes)
return key
if target.topic:
return target.topic
if target.server:
return target.server

@ -17,10 +17,23 @@ from oslo_messaging._drivers.zmq_driver import zmq_async
zmq = zmq_async.import_zmq() zmq = zmq_async.import_zmq()
FIELD_TYPE = 'type'
FIELD_FAILURE = 'failure' FIELD_FAILURE = 'failure'
FIELD_REPLY = 'reply' FIELD_REPLY = 'reply'
FIELD_LOG_FAILURE = 'log_failure' FIELD_LOG_FAILURE = 'log_failure'
FIELD_ID = 'id' FIELD_ID = 'id'
FIELD_MSG_ID = 'message_id'
FIELD_MSG_TYPE = 'msg_type'
FIELD_REPLY_ID = 'reply_id'
FIELD_TARGET = 'target'
IDX_REPLY_TYPE = 1
IDX_REPLY_BODY = 2
MULTIPART_IDX_ENVELOPE = 0
MULTIPART_IDX_BODY = 1
CALL_TYPE = 'call' CALL_TYPE = 'call'
CAST_TYPE = 'cast' CAST_TYPE = 'cast'
@ -28,6 +41,9 @@ CAST_FANOUT_TYPE = 'cast-f'
NOTIFY_TYPE = 'notify' NOTIFY_TYPE = 'notify'
NOTIFY_FANOUT_TYPE = 'notify-f' NOTIFY_FANOUT_TYPE = 'notify-f'
REPLY_TYPE = 'reply'
ACK_TYPE = 'ack'
MESSAGE_TYPES = (CALL_TYPE, MESSAGE_TYPES = (CALL_TYPE,
CAST_TYPE, CAST_TYPE,
CAST_FANOUT_TYPE, CAST_FANOUT_TYPE,

@ -57,6 +57,9 @@ class ZmqSocket(object):
def send_pyobj(self, *args, **kwargs): def send_pyobj(self, *args, **kwargs):
self.handle.send_pyobj(*args, **kwargs) self.handle.send_pyobj(*args, **kwargs)
def send_multipart(self, *args, **kwargs):
self.handle.send_multipart(*args, **kwargs)
def recv(self, *args, **kwargs): def recv(self, *args, **kwargs):
return self.handle.recv(*args, **kwargs) return self.handle.recv(*args, **kwargs)
@ -69,6 +72,9 @@ class ZmqSocket(object):
def recv_pyobj(self, *args, **kwargs): def recv_pyobj(self, *args, **kwargs):
return self.handle.recv_pyobj(*args, **kwargs) return self.handle.recv_pyobj(*args, **kwargs)
def recv_multipart(self, *args, **kwargs):
return self.handle.recv_multipart(*args, **kwargs)
def close(self, *args, **kwargs): def close(self, *args, **kwargs):
self.handle.close(*args, **kwargs) self.handle.close(*args, **kwargs)