diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index 7ab43fb31..3da557338 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -132,7 +132,7 @@ class ZmqDriver(base.BaseDriver): return conf.oslo_messaging_zmq.rpc_zmq_matchmaker if matchmaker_backend not in zmq_options.MATCHMAKER_BACKENDS: raise rpc_common.RPCException( - _LE("Incorrect matchmaker backend name %(backend_name)s!" + _LE("Incorrect matchmaker backend name %(backend_name)s! " "Available names are: %(available_names)s") % {"backend_name": matchmaker_backend, "available_names": zmq_options.MATCHMAKER_BACKENDS}) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py index dd992e085..ba58d3ad0 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py @@ -14,6 +14,7 @@ import abc import logging +import threading import six @@ -31,10 +32,11 @@ class SenderBase(object): def __init__(self, conf): self.conf = conf + self._lock = threading.Lock() @abc.abstractmethod def send(self, socket, message): - pass + """Send a message via a socket in a thread-safe manner.""" class RequestSender(SenderBase): @@ -52,11 +54,8 @@ class ReplySender(SenderBase): class RequestSenderProxy(SenderBase): def send(self, socket, request): - socket.send(b'', zmq.SNDMORE) - socket.send(six.b(str(request.msg_type)), zmq.SNDMORE) - socket.send(request.routing_key, zmq.SNDMORE) - socket.send_string(request.message_id, zmq.SNDMORE) - socket.send_dumped([request.context, request.message]) + with self._lock: + self._send(socket, request) LOG.debug("->[proxy:%(addr)s] Sending %(msg_type)s message " "%(msg_id)s to target %(target)s", @@ -65,47 +64,60 @@ class RequestSenderProxy(SenderBase): "msg_id": request.message_id, "target": request.target}) + def _send(self, socket, request): + socket.send(b'', zmq.SNDMORE) + socket.send(six.b(str(request.msg_type)), zmq.SNDMORE) + socket.send(request.routing_key, zmq.SNDMORE) + socket.send_string(request.message_id, zmq.SNDMORE) + socket.send_dumped([request.context, request.message]) + class AckSenderProxy(AckSender): def send(self, socket, ack): assert ack.msg_type == zmq_names.ACK_TYPE, "Ack expected!" - socket.send(b'', zmq.SNDMORE) - socket.send(six.b(str(ack.msg_type)), zmq.SNDMORE) - socket.send(ack.reply_id, zmq.SNDMORE) - socket.send_string(ack.message_id) + with self._lock: + self._send(socket, ack) LOG.debug("->[proxy:%(addr)s] Sending %(msg_type)s for %(msg_id)s", {"addr": list(socket.connections), "msg_type": zmq_names.message_type_str(ack.msg_type), "msg_id": ack.message_id}) + def _send(self, socket, ack): + socket.send(b'', zmq.SNDMORE) + socket.send(six.b(str(ack.msg_type)), zmq.SNDMORE) + socket.send(ack.reply_id, zmq.SNDMORE) + socket.send_string(ack.message_id) + class ReplySenderProxy(SenderBase): def send(self, socket, reply): assert reply.msg_type == zmq_names.REPLY_TYPE, "Reply expected!" - socket.send(b'', zmq.SNDMORE) - socket.send(six.b(str(reply.msg_type)), zmq.SNDMORE) - socket.send(reply.reply_id, zmq.SNDMORE) - socket.send_string(reply.message_id, zmq.SNDMORE) - socket.send_dumped([reply.reply_body, reply.failure]) + with self._lock: + self._send(socket, reply) LOG.debug("->[proxy:%(addr)s] Sending %(msg_type)s for %(msg_id)s", {"addr": list(socket.connections), "msg_type": zmq_names.message_type_str(reply.msg_type), "msg_id": reply.message_id}) + def _send(self, socket, reply): + socket.send(b'', zmq.SNDMORE) + socket.send(six.b(str(reply.msg_type)), zmq.SNDMORE) + socket.send(reply.reply_id, zmq.SNDMORE) + socket.send_string(reply.message_id, zmq.SNDMORE) + socket.send_dumped([reply.reply_body, reply.failure]) + class RequestSenderDirect(SenderBase): def send(self, socket, request): - socket.send(b'', zmq.SNDMORE) - socket.send(six.b(str(request.msg_type)), zmq.SNDMORE) - socket.send_string(request.message_id, zmq.SNDMORE) - socket.send_dumped([request.context, request.message]) + with self._lock: + self._send(socket, request) LOG.debug("Sending %(msg_type)s message %(msg_id)s to " "target %(target)s", @@ -113,28 +125,42 @@ class RequestSenderDirect(SenderBase): "msg_id": request.message_id, "target": request.target}) + def _send(self, socket, request): + socket.send(b'', zmq.SNDMORE) + socket.send(six.b(str(request.msg_type)), zmq.SNDMORE) + socket.send_string(request.message_id, zmq.SNDMORE) + socket.send_dumped([request.context, request.message]) + class AckSenderDirect(AckSender): def send(self, socket, ack): assert ack.msg_type == zmq_names.ACK_TYPE, "Ack expected!" - # not implemented yet + with self._lock: + self._send(socket, ack) LOG.debug("Sending %(msg_type)s for %(msg_id)s", {"msg_type": zmq_names.message_type_str(ack.msg_type), "msg_id": ack.message_id}) + def _send(self, socket, ack): + raise NotImplementedError() + class ReplySenderDirect(SenderBase): def send(self, socket, reply): assert reply.msg_type == zmq_names.REPLY_TYPE, "Reply expected!" - socket.send(reply.reply_id, zmq.SNDMORE) - socket.send(b'', zmq.SNDMORE) - socket.send_dumped(reply.to_dict()) + with self._lock: + self._send(socket, reply) LOG.debug("Sending %(msg_type)s for %(msg_id)s", {"msg_type": zmq_names.message_type_str(reply.msg_type), "msg_id": reply.message_id}) + + def _send(self, socket, reply): + socket.send(reply.reply_id, zmq.SNDMORE) + socket.send(b'', zmq.SNDMORE) + socket.send_dumped(reply.to_dict()) diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_options.py b/oslo_messaging/_drivers/zmq_driver/zmq_options.py index eec2004db..4d481518b 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_options.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_options.py @@ -87,7 +87,7 @@ zmq_opts = [ help='Use PUB/SUB pattern for fanout methods. ' 'PUB/SUB always uses proxy.'), - cfg.BoolOpt('use_router_proxy', default=False, + cfg.BoolOpt('use_router_proxy', default=True, deprecated_group='DEFAULT', help='Use ROUTER remote proxy.'), @@ -115,7 +115,7 @@ zmq_opts = [ help='Default serialization mechanism for ' 'serializing/deserializing outgoing/incoming messages'), - cfg.BoolOpt('zmq_immediate', default=False, + cfg.BoolOpt('zmq_immediate', default=True, help='This option configures round-robin mode in zmq socket. ' 'True means not keeping a queue when server side ' 'disconnects. False means to keep queue and messages '