diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_central_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_central_proxy.py index 02bff5a83..cf86e5c73 100644 --- a/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_central_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_central_proxy.py @@ -13,6 +13,9 @@ # under the License. import logging +import uuid + +import six from oslo_messaging._drivers.zmq_driver.proxy.central \ import zmq_publisher_proxy @@ -23,9 +26,10 @@ from oslo_messaging._drivers.zmq_driver import zmq_socket from oslo_messaging._drivers.zmq_driver import zmq_updater from oslo_messaging._i18n import _LI, _LE -zmq = zmq_async.import_zmq() LOG = logging.getLogger(__name__) +zmq = zmq_async.import_zmq() + def check_message_format(func): def _check_message_format(*args, **kwargs): @@ -39,27 +43,27 @@ def check_message_format(func): class SingleRouterProxy(object): + PROXY_TYPE = "ROUTER" + def __init__(self, conf, context, matchmaker): self.conf = conf self.context = context - super(SingleRouterProxy, self).__init__() self.matchmaker = matchmaker - host = conf.zmq_proxy_opts.host + + LOG.info(_LI("Running %s proxy") % self.PROXY_TYPE) self.poller = zmq_async.get_poller() port = conf.zmq_proxy_opts.frontend_port - self.fe_router_socket = zmq_socket.ZmqFixedPortSocket( - conf, context, zmq.ROUTER, host, - conf.zmq_proxy_opts.frontend_port) if port != 0 else \ - zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER, - host) + self.fe_router_socket = self._create_router_socket(conf, context, port) self.poller.register(self.fe_router_socket, self._receive_message) - self.publisher = zmq_publisher_proxy.PublisherProxy( - conf, matchmaker) + self.publisher = zmq_publisher_proxy.PublisherProxy(conf, matchmaker) + self.router_sender = zmq_sender.CentralRouterSender() + self.ack_sender = zmq_sender.CentralAckSender() + self._router_updater = self._create_router_updater() def run(self): @@ -67,15 +71,28 @@ class SingleRouterProxy(object): if message is None: return - msg_type = int(message[zmq_names.MESSAGE_TYPE_IDX]) + message_type = int(message[zmq_names.MESSAGE_TYPE_IDX]) if self.conf.oslo_messaging_zmq.use_pub_sub and \ - msg_type in (zmq_names.CAST_FANOUT_TYPE, - zmq_names.NOTIFY_TYPE): + message_type in zmq_names.MULTISEND_TYPES: self.publisher.send_request(message) + if socket is self.fe_router_socket and \ + self.conf.zmq_proxy_opts.ack_pub_sub: + self.ack_sender.send_message(socket, message) else: self.router_sender.send_message( self._get_socket_to_dispatch_on(socket), message) + @staticmethod + def _create_router_socket(conf, context, port): + host = conf.zmq_proxy_opts.host + identity = six.b(host) + b"/zmq-proxy/" + six.b(str(uuid.uuid4())) + if port != 0: + return zmq_socket.ZmqFixedPortSocket(conf, context, zmq.ROUTER, + host, port, identity=identity) + else: + return zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER, + host, identity=identity) + def _create_router_updater(self): return RouterUpdater( self.conf, self.matchmaker, self.publisher.host, @@ -105,15 +122,11 @@ class SingleRouterProxy(object): class DoubleRouterProxy(SingleRouterProxy): + PROXY_TYPE = "ROUTER-ROUTER" + def __init__(self, conf, context, matchmaker): - LOG.info(_LI('Running double router proxy')) port = conf.zmq_proxy_opts.backend_port - host = conf.zmq_proxy_opts.host - self.be_router_socket = zmq_socket.ZmqFixedPortSocket( - conf, context, zmq.ROUTER, host, - conf.zmq_proxy_opts.backend_port) if port != 0 else \ - zmq_socket.ZmqRandomPortSocket( - conf, context, zmq.ROUTER, host) + self.be_router_socket = self._create_router_socket(conf, context, port) super(DoubleRouterProxy, self).__init__(conf, context, matchmaker) self.poller.register(self.be_router_socket, self._receive_message) diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py index 886da5464..bfd6f98ea 100644 --- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py @@ -23,9 +23,10 @@ from oslo_messaging._drivers.zmq_driver.proxy.central import zmq_central_proxy from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._i18n import _LI -zmq = zmq_async.import_zmq() LOG = logging.getLogger(__name__) +zmq = zmq_async.import_zmq() + USAGE = """ Usage: ./zmq-proxy.py [-h] [] ... @@ -46,6 +47,11 @@ zmq_proxy_opts = [ cfg.IntOpt('publisher_port', default=0, help='Publisher port number. Zero means random.'), + + cfg.BoolOpt('ack_pub_sub', default=False, + help='Use acknowledgements for notifying senders about ' + 'receiving their fanout messages. ' + 'The option is ignored if PUB/SUB is disabled.') ] @@ -70,9 +76,11 @@ def parse_command_line_args(conf): parser.add_argument('-p', '--publisher-port', dest='publisher_port', type=int, help='Front-end PUBLISHER port number') + parser.add_argument('-a', '--ack-pub-sub', dest='ack_pub_sub', + action='store_true', + help='Acknowledge PUB/SUB messages') - parser.add_argument('-d', '--debug', dest='debug', type=bool, - default=False, + parser.add_argument('-d', '--debug', dest='debug', action='store_true', help='Turn on DEBUG logging level instead of INFO') args = parser.parse_args() @@ -87,7 +95,7 @@ def parse_command_line_args(conf): logging.basicConfig(**log_kwargs) if args.host: - conf.zmq_proxy_opts.host = args.host + conf.set_override('host', args.host, group='zmq_proxy_opts') if args.frontend_port: conf.set_override('frontend_port', args.frontend_port, group='zmq_proxy_opts') @@ -97,6 +105,9 @@ def parse_command_line_args(conf): if args.publisher_port: conf.set_override('publisher_port', args.publisher_port, group='zmq_proxy_opts') + if args.ack_pub_sub: + conf.set_override('ack_pub_sub', args.ack_pub_sub, + group='zmq_proxy_opts') class ZmqProxy(object): diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_sender.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_sender.py index 3499292ff..f1583d8f1 100644 --- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_sender.py +++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_sender.py @@ -20,9 +20,10 @@ import six from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names -zmq = zmq_async.import_zmq() LOG = logging.getLogger(__name__) +zmq = zmq_async.import_zmq() + @six.add_metaclass(abc.ABCMeta) class Sender(object): @@ -39,25 +40,48 @@ class CentralRouterSender(Sender): routing_key = multipart_message[zmq_names.ROUTING_KEY_IDX] reply_id = multipart_message[zmq_names.REPLY_ID_IDX] message_id = multipart_message[zmq_names.MESSAGE_ID_IDX] + socket.send(routing_key, zmq.SNDMORE) socket.send(b'', zmq.SNDMORE) socket.send(reply_id, zmq.SNDMORE) socket.send(multipart_message[zmq_names.MESSAGE_TYPE_IDX], zmq.SNDMORE) + socket.send_multipart(multipart_message[zmq_names.MESSAGE_ID_IDX:]) + LOG.debug("Dispatching %(msg_type)s message %(msg_id)s - from %(rid)s " - "to -> %(rkey)s" % + "-> to %(rkey)s", {"msg_type": zmq_names.message_type_str(message_type), "msg_id": message_id, "rkey": routing_key, "rid": reply_id}) - socket.send_multipart(multipart_message[zmq_names.MESSAGE_ID_IDX:]) + + +class CentralAckSender(Sender): + + def send_message(self, socket, multipart_message): + message_type = zmq_names.ACK_TYPE + message_id = multipart_message[zmq_names.MESSAGE_ID_IDX] + routing_key = socket.handle.identity + reply_id = multipart_message[zmq_names.REPLY_ID_IDX] + + socket.send(reply_id, zmq.SNDMORE) + socket.send(b'', zmq.SNDMORE) + socket.send(routing_key, zmq.SNDMORE) + socket.send(six.b(str(message_type)), zmq.SNDMORE) + socket.send_string(message_id) + + LOG.debug("Sending %(msg_type)s for %(msg_id)s to %(rid)s " + "[from %(rkey)s]", + {"msg_type": zmq_names.message_type_str(message_type), + "msg_id": message_id, + "rid": reply_id, + "rkey": routing_key}) class CentralPublisherSender(Sender): def send_message(self, socket, multipart_message): message_type = int(multipart_message[zmq_names.MESSAGE_TYPE_IDX]) - assert message_type in (zmq_names.CAST_FANOUT_TYPE, - zmq_names.NOTIFY_TYPE), "Fanout expected!" + assert message_type in zmq_names.MULTISEND_TYPES, "Fanout expected!" topic_filter = multipart_message[zmq_names.ROUTING_KEY_IDX] message_id = multipart_message[zmq_names.MESSAGE_ID_IDX] diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py index 8ff3d7897..437e891f4 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py @@ -108,6 +108,9 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer): message_id, socket, message_type) except (zmq.ZMQError, AssertionError, ValueError) as e: LOG.error(_LE("Receiving message failure: %s"), str(e)) + # NOTE(gdavoian): drop the left parts of a broken message + if socket.getsockopt(zmq.RCVMORE): + socket.recv_multipart() def cleanup(self): LOG.info(_LI("[%s] Destroy DEALER consumer"), self.host) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py index 80317f54e..fe09bc99a 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py @@ -79,6 +79,9 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): message_id, socket, message_type) except (zmq.ZMQError, AssertionError, ValueError) as e: LOG.error(_LE("Receiving message failed: %s"), str(e)) + # NOTE(gdavoian): drop the left parts of a broken message + if socket.getsockopt(zmq.RCVMORE): + socket.recv_multipart() def cleanup(self): LOG.info(_LI("[%s] Destroy ROUTER consumer"), self.host) diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py index 9e0f14cf3..7f0fcc21f 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py @@ -213,10 +213,10 @@ class ZmqPortBusy(exceptions.MessagingException): class ZmqRandomPortSocket(ZmqSocket): def __init__(self, conf, context, socket_type, host=None, - high_watermark=0): + high_watermark=0, identity=None): super(ZmqRandomPortSocket, self).__init__( conf, context, socket_type, immediate=False, - high_watermark=high_watermark) + high_watermark=high_watermark, identity=identity) self.bind_address = zmq_address.get_tcp_random_address(self.conf) if host is None: host = conf.oslo_messaging_zmq.rpc_zmq_host @@ -235,10 +235,10 @@ class ZmqRandomPortSocket(ZmqSocket): class ZmqFixedPortSocket(ZmqSocket): def __init__(self, conf, context, socket_type, host, port, - high_watermark=0): + high_watermark=0, identity=None): super(ZmqFixedPortSocket, self).__init__( conf, context, socket_type, immediate=False, - high_watermark=high_watermark) + high_watermark=high_watermark, identity=identity) self.connect_address = zmq_address.combine_address(host, port) self.bind_address = zmq_address.get_tcp_direct_address( zmq_address.combine_address(