diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_central_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_central_proxy.py index 3eecca4df..bd4937f27 100644 --- a/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_central_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_central_proxy.py @@ -13,49 +13,32 @@ # under the License. import logging -import uuid - -import six from oslo_messaging._drivers.zmq_driver.proxy.central \ import zmq_publisher_proxy +from oslo_messaging._drivers.zmq_driver.proxy \ + import zmq_base_proxy from oslo_messaging._drivers.zmq_driver.proxy import zmq_sender from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names -from oslo_messaging._drivers.zmq_driver import zmq_socket from oslo_messaging._drivers.zmq_driver import zmq_updater -from oslo_messaging._i18n import _LI, _LE +from oslo_messaging._i18n import _LI LOG = logging.getLogger(__name__) zmq = zmq_async.import_zmq() -def check_message_format(func): - def _check_message_format(*args, **kwargs): - try: - return func(*args, **kwargs) - except Exception as e: - LOG.error(_LE("Received message with wrong format")) - LOG.exception(e) - return _check_message_format - - -class SingleRouterProxy(object): +class SingleRouterProxy(zmq_base_proxy.ProxyBase): PROXY_TYPE = "ROUTER" def __init__(self, conf, context, matchmaker): - self.conf = conf - self.context = context - self.matchmaker = matchmaker - - LOG.info(_LI("Running %s proxy") % self.PROXY_TYPE) - - self.poller = zmq_async.get_poller() + super(SingleRouterProxy, self).__init__(conf, context, matchmaker) port = conf.zmq_proxy_opts.frontend_port - self.fe_router_socket = self._create_router_socket(conf, context, port) + self.fe_router_socket = zmq_base_proxy.create_socket( + conf, context, port, zmq.ROUTER) self.poller.register(self.fe_router_socket, self._receive_message) @@ -82,17 +65,6 @@ class SingleRouterProxy(object): self.router_sender.send_message( self._get_socket_to_dispatch_on(socket), message) - @staticmethod - def _create_router_socket(conf, context, port): - host = conf.zmq_proxy_opts.host - identity = six.b(host) + b"/zmq-proxy/" + six.b(str(uuid.uuid4())) - if port != 0: - return zmq_socket.ZmqFixedPortSocket(conf, context, zmq.ROUTER, - host, port, identity=identity) - else: - return zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER, - host, identity=identity) - def _create_router_updater(self): return RouterUpdater( self.conf, self.matchmaker, self.publisher.host, @@ -102,20 +74,9 @@ class SingleRouterProxy(object): def _get_socket_to_dispatch_on(self, socket): return self.fe_router_socket - @staticmethod - @check_message_format - def _receive_message(socket): - message = socket.recv_multipart() - assert len(message) > zmq_names.MESSAGE_ID_IDX, "Not enough parts" - assert message[zmq_names.REPLY_ID_IDX] != b'', "Valid id expected" - message_type = int(message[zmq_names.MESSAGE_TYPE_IDX]) - assert message_type in zmq_names.MESSAGE_TYPES, "Known type expected!" - assert message[zmq_names.EMPTY_IDX] == b'', "Empty delimiter expected" - return message - def cleanup(self): + super(SingleRouterProxy, self).cleanup() self._router_updater.cleanup() - self.poller.close() self.fe_router_socket.close() self.publisher.cleanup() @@ -126,7 +87,8 @@ class DoubleRouterProxy(SingleRouterProxy): def __init__(self, conf, context, matchmaker): port = conf.zmq_proxy_opts.backend_port - self.be_router_socket = self._create_router_socket(conf, context, port) + self.be_router_socket = zmq_base_proxy.create_socket( + conf, context, port, zmq.ROUTER) super(DoubleRouterProxy, self).__init__(conf, context, matchmaker) self.poller.register(self.be_router_socket, self._receive_message) diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_publisher_proxy.py index 09f578552..ea1fced2b 100644 --- a/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_publisher_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_publisher_proxy.py @@ -37,7 +37,7 @@ class PublisherProxy(object): Target object. """ - def __init__(self, conf, matchmaker): + def __init__(self, conf, matchmaker, sender=None): super(PublisherProxy, self).__init__() self.conf = conf self.zmq_context = zmq.Context() @@ -51,7 +51,7 @@ class PublisherProxy(object): self.conf, self.zmq_context, zmq.PUB, conf.zmq_proxy_opts.host) self.host = self.socket.connect_address - self.sender = zmq_sender.CentralPublisherSender() + self.sender = sender or zmq_sender.CentralPublisherSender() def send_request(self, multipart_message): self.sender.send_message(self.socket, multipart_message) diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/local/__init__.py b/oslo_messaging/_drivers/zmq_driver/proxy/local/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/local/zmq_local_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/local/zmq_local_proxy.py new file mode 100644 index 000000000..361f662d2 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/proxy/local/zmq_local_proxy.py @@ -0,0 +1,63 @@ +# Copyright 2016 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from oslo_messaging._drivers.zmq_driver.proxy.central \ + import zmq_publisher_proxy +from oslo_messaging._drivers.zmq_driver.proxy \ + import zmq_base_proxy +from oslo_messaging._drivers.zmq_driver.proxy import zmq_sender +from oslo_messaging._drivers.zmq_driver.server.consumers \ + import zmq_sub_consumer +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_socket + + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +class LocalPublisherProxy(zmq_base_proxy.ProxyBase): + + PROXY_TYPE = "L-PUBLISHER" + + def __init__(self, conf, context, matchmaker): + wrapper = zmq_sub_consumer.SubscriptionMatchmakerWrapper(conf, + matchmaker) + super(LocalPublisherProxy, self).__init__(conf, context, wrapper) + self.fe_sub = zmq_socket.ZmqSocket(conf, context, zmq.SUB, False) + self.fe_sub.setsockopt(zmq.SUBSCRIBE, b'') + self.connection_updater = zmq_sub_consumer.SubscriberConnectionUpdater( + conf, self.matchmaker, self.fe_sub) + self.poller.register(self.fe_sub, self.receive_message) + self.publisher = zmq_publisher_proxy.PublisherProxy( + conf, matchmaker, sender=zmq_sender.LocalPublisherSender()) + + def run(self): + message, socket = self.poller.poll() + if message is None: + return + self.publisher.send_request(message) + + @staticmethod + def receive_message(socket): + return socket.recv_multipart() + + def cleanup(self): + super(LocalPublisherProxy, self).cleanup() + self.fe_sub.close() + self.connection_updater.cleanup() + self.publisher.cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_base_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_base_proxy.py new file mode 100644 index 000000000..4bfe521a5 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_base_proxy.py @@ -0,0 +1,76 @@ +# Copyright 2016 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import uuid + +import six + +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names +from oslo_messaging._drivers.zmq_driver import zmq_socket +from oslo_messaging._i18n import _LI, _LE + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +def check_message_format(func): + def _check_message_format(*args, **kwargs): + try: + return func(*args, **kwargs) + except Exception as e: + LOG.error(_LE("Received message with wrong format")) + LOG.exception(e) + return _check_message_format + + +def create_socket(conf, context, port, socket_type): + host = conf.zmq_proxy_opts.host + identity = six.b(host) + b"/zmq-proxy/" + six.b(str(uuid.uuid4())) + if port != 0: + return zmq_socket.ZmqFixedPortSocket(conf, context, socket_type, + host, port, identity=identity) + else: + return zmq_socket.ZmqRandomPortSocket(conf, context, socket_type, + host, identity=identity) + + +class ProxyBase(object): + + PROXY_TYPE = "UNDEFINED" + + def __init__(self, conf, context, matchmaker): + self.conf = conf + self.context = context + self.matchmaker = matchmaker + + LOG.info(_LI("Running %s proxy") % self.PROXY_TYPE) + + self.poller = zmq_async.get_poller() + + @staticmethod + @check_message_format + def _receive_message(socket): + message = socket.recv_multipart() + assert len(message) > zmq_names.MESSAGE_ID_IDX, "Not enough parts" + assert message[zmq_names.REPLY_ID_IDX] != b'', "Valid id expected" + message_type = int(message[zmq_names.MESSAGE_TYPE_IDX]) + assert message_type in zmq_names.MESSAGE_TYPES, "Known type expected!" + assert message[zmq_names.EMPTY_IDX] == b'', "Empty delimiter expected" + return message + + def cleanup(self): + self.poller.close() diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py index 76ade412f..f69fbe6c0 100644 --- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py @@ -21,6 +21,7 @@ from stevedore import driver from oslo_messaging._drivers import impl_zmq from oslo_messaging._drivers.zmq_driver.proxy.central import zmq_central_proxy +from oslo_messaging._drivers.zmq_driver.proxy.local import zmq_local_proxy from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._i18n import _LI from oslo_messaging import transport @@ -50,6 +51,9 @@ zmq_proxy_opts = [ cfg.IntOpt('publisher_port', default=0, help='Publisher port number. Zero means random.'), + cfg.BoolOpt('local_publisher', default=False, + help='Specify publisher/subscriber local proxy.'), + cfg.BoolOpt('ack_pub_sub', default=False, help='Use acknowledgements for notifying senders about ' 'receiving their fanout messages. ' @@ -80,7 +84,10 @@ def parse_command_line_args(conf): help='Back-end ROUTER port number') parser.add_argument('-p', '--publisher-port', dest='publisher_port', type=int, - help='Front-end PUBLISHER port number') + help='Back-end PUBLISHER port number') + parser.add_argument('-lp', '--local-publisher', dest='local_publisher', + action='store_true', + help='Specify publisher/subscriber local proxy.') parser.add_argument('-a', '--ack-pub-sub', dest='ack_pub_sub', action='store_true', help='Acknowledge PUB/SUB messages') @@ -112,6 +119,9 @@ def parse_command_line_args(conf): if args.publisher_port: conf.set_override('publisher_port', args.publisher_port, group='zmq_proxy_opts') + if args.local_publisher: + conf.set_override('local_publisher', args.local_publisher, + group='zmq_proxy_opts') if args.ack_pub_sub: conf.set_override('ack_pub_sub', args.ack_pub_sub, group='zmq_proxy_opts') @@ -172,7 +182,10 @@ class ZmqProxy(object): self.proxy = self._choose_proxy_implementation() def _choose_proxy_implementation(self): - if self.conf.zmq_proxy_opts.frontend_port != 0 and \ + if self.conf.zmq_proxy_opts.local_publisher: + return zmq_local_proxy.LocalPublisherProxy(self.conf, self.context, + self.matchmaker) + elif self.conf.zmq_proxy_opts.frontend_port != 0 and \ self.conf.zmq_proxy_opts.backend_port == 0: return zmq_central_proxy.SingleRouterProxy(self.conf, self.context, self.matchmaker) diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_sender.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_sender.py index f1583d8f1..8318d3b70 100644 --- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_sender.py +++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_sender.py @@ -86,8 +86,23 @@ class CentralPublisherSender(Sender): message_id = multipart_message[zmq_names.MESSAGE_ID_IDX] socket.send(topic_filter, zmq.SNDMORE) + socket.send(six.b(str(message_type)), zmq.SNDMORE) socket.send_multipart(multipart_message[zmq_names.MESSAGE_ID_IDX:]) LOG.debug("Publishing message %(message_id)s on [%(topic)s]", {"topic": topic_filter, "message_id": message_id}) + + +class LocalPublisherSender(Sender): + + TOPIC_IDX = 0 + MSG_TYPE_IDX = 1 + MSG_ID_IDX = 2 + + def send_message(self, socket, multipart_message): + socket.send_multipart(multipart_message) + + LOG.debug("Publishing message %(message_id)s on [%(topic)s]", + {"topic": multipart_message[self.TOPIC_IDX], + "message_id": multipart_message[self.MSG_ID_IDX]}) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py index 89b630ed7..c843d3c44 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py @@ -1,4 +1,4 @@ -# Copyright 2015 Mirantis, Inc. +# Copyright 2015-2016 Mirantis, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -22,6 +22,7 @@ from oslo_messaging._drivers.zmq_driver.server.consumers \ from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_socket from oslo_messaging._drivers.zmq_driver import zmq_updater from oslo_messaging._i18n import _LE, _LI @@ -35,7 +36,8 @@ class SubConsumer(zmq_consumer_base.ConsumerBase): def __init__(self, conf, poller, server): super(SubConsumer, self).__init__(conf, poller, server) - self.matchmaker = server.matchmaker + self.matchmaker = SubscriptionMatchmakerWrapper(conf, + server.matchmaker) self.target = server.target self.socket = zmq_socket.ZmqSocket(self.conf, self.context, zmq.SUB, immediate=False, @@ -61,11 +63,15 @@ class SubConsumer(zmq_consumer_base.ConsumerBase): def _receive_request(self, socket): topic_filter = socket.recv() + message_type = int(socket.recv()) message_id = socket.recv() context, message = socket.recv_loaded() - LOG.debug("[%(host)s] Received on topic %(filter)s message %(msg_id)s", - {'host': self.host, 'filter': topic_filter, - 'msg_id': message_id}) + LOG.debug("[%(host)s] Received on topic %(filter)s message %(msg_id)s " + "%(msg_type)s", + {'host': self.host, + 'filter': topic_filter, + 'msg_id': message_id, + 'msg_type': zmq_names.message_type_str(message_type)}) return context, message def receive_message(self, socket): @@ -83,6 +89,20 @@ class SubConsumer(zmq_consumer_base.ConsumerBase): super(SubConsumer, self).cleanup() +class SubscriptionMatchmakerWrapper(object): + + def __init__(self, conf, matchmaker): + self.conf = conf + self.matchmaker = matchmaker + + def get_publishers(self): + conf_publishers = self.conf.oslo_messaging_zmq.subscribe_on + LOG.debug("Publishers taken from configuration %s", conf_publishers) + if conf_publishers: + return [(publisher, None) for publisher in conf_publishers] + return self.matchmaker.get_publishers() + + class SubscriberConnectionUpdater(zmq_updater.ConnectionUpdater): def _update_connection(self): diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_options.py b/oslo_messaging/_drivers/zmq_driver/zmq_options.py index 4d481518b..f5cac0f16 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_options.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_options.py @@ -181,7 +181,14 @@ zmq_opts = [ 'of any problems occurred: positive value N means ' 'at most N retries, 0 means no retries, None or -1 ' '(or any other negative values) mean to retry forever. ' - 'This option is used only if acknowledgments are enabled.') + 'This option is used only if acknowledgments are ' + 'enabled.'), + + cfg.ListOpt('subscribe_on', + default=[], + help='List of publisher hosts SubConsumer can subscribe on. ' + 'This option has higher priority then the default ' + 'publishers list taken from the matchmaker.'), ]