[zmq] SUB-PUB local proxy
Central + local proxies combination optimizes following: * Makes it possible to shorten the number of TCP connections to a single subscription from node. * PUB/SUB limitation of 10k consumers can be bumped up to 10k nodes not only to 10k services * Fanout happens in 2 steps therefore central proxy takes less load and blocks for a shorter period of time since it needs to fanout to less consumers (local proxies) not directly to every single service. Change-Id: I57d87bc8310354142ab69a9f2d3e0a0cf5b972b8
This commit is contained in:
parent
96b9618228
commit
1121a6bb60
@ -13,49 +13,32 @@
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
import six
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.proxy.central \
|
||||
import zmq_publisher_proxy
|
||||
from oslo_messaging._drivers.zmq_driver.proxy \
|
||||
import zmq_base_proxy
|
||||
from oslo_messaging._drivers.zmq_driver.proxy import zmq_sender
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_socket
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_updater
|
||||
from oslo_messaging._i18n import _LI, _LE
|
||||
from oslo_messaging._i18n import _LI
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
def check_message_format(func):
|
||||
def _check_message_format(*args, **kwargs):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except Exception as e:
|
||||
LOG.error(_LE("Received message with wrong format"))
|
||||
LOG.exception(e)
|
||||
return _check_message_format
|
||||
|
||||
|
||||
class SingleRouterProxy(object):
|
||||
class SingleRouterProxy(zmq_base_proxy.ProxyBase):
|
||||
|
||||
PROXY_TYPE = "ROUTER"
|
||||
|
||||
def __init__(self, conf, context, matchmaker):
|
||||
self.conf = conf
|
||||
self.context = context
|
||||
self.matchmaker = matchmaker
|
||||
|
||||
LOG.info(_LI("Running %s proxy") % self.PROXY_TYPE)
|
||||
|
||||
self.poller = zmq_async.get_poller()
|
||||
super(SingleRouterProxy, self).__init__(conf, context, matchmaker)
|
||||
|
||||
port = conf.zmq_proxy_opts.frontend_port
|
||||
self.fe_router_socket = self._create_router_socket(conf, context, port)
|
||||
self.fe_router_socket = zmq_base_proxy.create_socket(
|
||||
conf, context, port, zmq.ROUTER)
|
||||
|
||||
self.poller.register(self.fe_router_socket, self._receive_message)
|
||||
|
||||
@ -82,17 +65,6 @@ class SingleRouterProxy(object):
|
||||
self.router_sender.send_message(
|
||||
self._get_socket_to_dispatch_on(socket), message)
|
||||
|
||||
@staticmethod
|
||||
def _create_router_socket(conf, context, port):
|
||||
host = conf.zmq_proxy_opts.host
|
||||
identity = six.b(host) + b"/zmq-proxy/" + six.b(str(uuid.uuid4()))
|
||||
if port != 0:
|
||||
return zmq_socket.ZmqFixedPortSocket(conf, context, zmq.ROUTER,
|
||||
host, port, identity=identity)
|
||||
else:
|
||||
return zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER,
|
||||
host, identity=identity)
|
||||
|
||||
def _create_router_updater(self):
|
||||
return RouterUpdater(
|
||||
self.conf, self.matchmaker, self.publisher.host,
|
||||
@ -102,20 +74,9 @@ class SingleRouterProxy(object):
|
||||
def _get_socket_to_dispatch_on(self, socket):
|
||||
return self.fe_router_socket
|
||||
|
||||
@staticmethod
|
||||
@check_message_format
|
||||
def _receive_message(socket):
|
||||
message = socket.recv_multipart()
|
||||
assert len(message) > zmq_names.MESSAGE_ID_IDX, "Not enough parts"
|
||||
assert message[zmq_names.REPLY_ID_IDX] != b'', "Valid id expected"
|
||||
message_type = int(message[zmq_names.MESSAGE_TYPE_IDX])
|
||||
assert message_type in zmq_names.MESSAGE_TYPES, "Known type expected!"
|
||||
assert message[zmq_names.EMPTY_IDX] == b'', "Empty delimiter expected"
|
||||
return message
|
||||
|
||||
def cleanup(self):
|
||||
super(SingleRouterProxy, self).cleanup()
|
||||
self._router_updater.cleanup()
|
||||
self.poller.close()
|
||||
self.fe_router_socket.close()
|
||||
self.publisher.cleanup()
|
||||
|
||||
@ -126,7 +87,8 @@ class DoubleRouterProxy(SingleRouterProxy):
|
||||
|
||||
def __init__(self, conf, context, matchmaker):
|
||||
port = conf.zmq_proxy_opts.backend_port
|
||||
self.be_router_socket = self._create_router_socket(conf, context, port)
|
||||
self.be_router_socket = zmq_base_proxy.create_socket(
|
||||
conf, context, port, zmq.ROUTER)
|
||||
super(DoubleRouterProxy, self).__init__(conf, context, matchmaker)
|
||||
self.poller.register(self.be_router_socket, self._receive_message)
|
||||
|
||||
|
@ -37,7 +37,7 @@ class PublisherProxy(object):
|
||||
Target object.
|
||||
"""
|
||||
|
||||
def __init__(self, conf, matchmaker):
|
||||
def __init__(self, conf, matchmaker, sender=None):
|
||||
super(PublisherProxy, self).__init__()
|
||||
self.conf = conf
|
||||
self.zmq_context = zmq.Context()
|
||||
@ -51,7 +51,7 @@ class PublisherProxy(object):
|
||||
self.conf, self.zmq_context, zmq.PUB, conf.zmq_proxy_opts.host)
|
||||
|
||||
self.host = self.socket.connect_address
|
||||
self.sender = zmq_sender.CentralPublisherSender()
|
||||
self.sender = sender or zmq_sender.CentralPublisherSender()
|
||||
|
||||
def send_request(self, multipart_message):
|
||||
self.sender.send_message(self.socket, multipart_message)
|
||||
|
@ -0,0 +1,63 @@
|
||||
# Copyright 2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.proxy.central \
|
||||
import zmq_publisher_proxy
|
||||
from oslo_messaging._drivers.zmq_driver.proxy \
|
||||
import zmq_base_proxy
|
||||
from oslo_messaging._drivers.zmq_driver.proxy import zmq_sender
|
||||
from oslo_messaging._drivers.zmq_driver.server.consumers \
|
||||
import zmq_sub_consumer
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_socket
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
class LocalPublisherProxy(zmq_base_proxy.ProxyBase):
|
||||
|
||||
PROXY_TYPE = "L-PUBLISHER"
|
||||
|
||||
def __init__(self, conf, context, matchmaker):
|
||||
wrapper = zmq_sub_consumer.SubscriptionMatchmakerWrapper(conf,
|
||||
matchmaker)
|
||||
super(LocalPublisherProxy, self).__init__(conf, context, wrapper)
|
||||
self.fe_sub = zmq_socket.ZmqSocket(conf, context, zmq.SUB, False)
|
||||
self.fe_sub.setsockopt(zmq.SUBSCRIBE, b'')
|
||||
self.connection_updater = zmq_sub_consumer.SubscriberConnectionUpdater(
|
||||
conf, self.matchmaker, self.fe_sub)
|
||||
self.poller.register(self.fe_sub, self.receive_message)
|
||||
self.publisher = zmq_publisher_proxy.PublisherProxy(
|
||||
conf, matchmaker, sender=zmq_sender.LocalPublisherSender())
|
||||
|
||||
def run(self):
|
||||
message, socket = self.poller.poll()
|
||||
if message is None:
|
||||
return
|
||||
self.publisher.send_request(message)
|
||||
|
||||
@staticmethod
|
||||
def receive_message(socket):
|
||||
return socket.recv_multipart()
|
||||
|
||||
def cleanup(self):
|
||||
super(LocalPublisherProxy, self).cleanup()
|
||||
self.fe_sub.close()
|
||||
self.connection_updater.cleanup()
|
||||
self.publisher.cleanup()
|
76
oslo_messaging/_drivers/zmq_driver/proxy/zmq_base_proxy.py
Normal file
76
oslo_messaging/_drivers/zmq_driver/proxy/zmq_base_proxy.py
Normal file
@ -0,0 +1,76 @@
|
||||
# Copyright 2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
import six
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_socket
|
||||
from oslo_messaging._i18n import _LI, _LE
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
def check_message_format(func):
|
||||
def _check_message_format(*args, **kwargs):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except Exception as e:
|
||||
LOG.error(_LE("Received message with wrong format"))
|
||||
LOG.exception(e)
|
||||
return _check_message_format
|
||||
|
||||
|
||||
def create_socket(conf, context, port, socket_type):
|
||||
host = conf.zmq_proxy_opts.host
|
||||
identity = six.b(host) + b"/zmq-proxy/" + six.b(str(uuid.uuid4()))
|
||||
if port != 0:
|
||||
return zmq_socket.ZmqFixedPortSocket(conf, context, socket_type,
|
||||
host, port, identity=identity)
|
||||
else:
|
||||
return zmq_socket.ZmqRandomPortSocket(conf, context, socket_type,
|
||||
host, identity=identity)
|
||||
|
||||
|
||||
class ProxyBase(object):
|
||||
|
||||
PROXY_TYPE = "UNDEFINED"
|
||||
|
||||
def __init__(self, conf, context, matchmaker):
|
||||
self.conf = conf
|
||||
self.context = context
|
||||
self.matchmaker = matchmaker
|
||||
|
||||
LOG.info(_LI("Running %s proxy") % self.PROXY_TYPE)
|
||||
|
||||
self.poller = zmq_async.get_poller()
|
||||
|
||||
@staticmethod
|
||||
@check_message_format
|
||||
def _receive_message(socket):
|
||||
message = socket.recv_multipart()
|
||||
assert len(message) > zmq_names.MESSAGE_ID_IDX, "Not enough parts"
|
||||
assert message[zmq_names.REPLY_ID_IDX] != b'', "Valid id expected"
|
||||
message_type = int(message[zmq_names.MESSAGE_TYPE_IDX])
|
||||
assert message_type in zmq_names.MESSAGE_TYPES, "Known type expected!"
|
||||
assert message[zmq_names.EMPTY_IDX] == b'', "Empty delimiter expected"
|
||||
return message
|
||||
|
||||
def cleanup(self):
|
||||
self.poller.close()
|
@ -21,6 +21,7 @@ from stevedore import driver
|
||||
|
||||
from oslo_messaging._drivers import impl_zmq
|
||||
from oslo_messaging._drivers.zmq_driver.proxy.central import zmq_central_proxy
|
||||
from oslo_messaging._drivers.zmq_driver.proxy.local import zmq_local_proxy
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._i18n import _LI
|
||||
from oslo_messaging import transport
|
||||
@ -50,6 +51,9 @@ zmq_proxy_opts = [
|
||||
cfg.IntOpt('publisher_port', default=0,
|
||||
help='Publisher port number. Zero means random.'),
|
||||
|
||||
cfg.BoolOpt('local_publisher', default=False,
|
||||
help='Specify publisher/subscriber local proxy.'),
|
||||
|
||||
cfg.BoolOpt('ack_pub_sub', default=False,
|
||||
help='Use acknowledgements for notifying senders about '
|
||||
'receiving their fanout messages. '
|
||||
@ -80,7 +84,10 @@ def parse_command_line_args(conf):
|
||||
help='Back-end ROUTER port number')
|
||||
parser.add_argument('-p', '--publisher-port', dest='publisher_port',
|
||||
type=int,
|
||||
help='Front-end PUBLISHER port number')
|
||||
help='Back-end PUBLISHER port number')
|
||||
parser.add_argument('-lp', '--local-publisher', dest='local_publisher',
|
||||
action='store_true',
|
||||
help='Specify publisher/subscriber local proxy.')
|
||||
parser.add_argument('-a', '--ack-pub-sub', dest='ack_pub_sub',
|
||||
action='store_true',
|
||||
help='Acknowledge PUB/SUB messages')
|
||||
@ -112,6 +119,9 @@ def parse_command_line_args(conf):
|
||||
if args.publisher_port:
|
||||
conf.set_override('publisher_port', args.publisher_port,
|
||||
group='zmq_proxy_opts')
|
||||
if args.local_publisher:
|
||||
conf.set_override('local_publisher', args.local_publisher,
|
||||
group='zmq_proxy_opts')
|
||||
if args.ack_pub_sub:
|
||||
conf.set_override('ack_pub_sub', args.ack_pub_sub,
|
||||
group='zmq_proxy_opts')
|
||||
@ -172,7 +182,10 @@ class ZmqProxy(object):
|
||||
self.proxy = self._choose_proxy_implementation()
|
||||
|
||||
def _choose_proxy_implementation(self):
|
||||
if self.conf.zmq_proxy_opts.frontend_port != 0 and \
|
||||
if self.conf.zmq_proxy_opts.local_publisher:
|
||||
return zmq_local_proxy.LocalPublisherProxy(self.conf, self.context,
|
||||
self.matchmaker)
|
||||
elif self.conf.zmq_proxy_opts.frontend_port != 0 and \
|
||||
self.conf.zmq_proxy_opts.backend_port == 0:
|
||||
return zmq_central_proxy.SingleRouterProxy(self.conf, self.context,
|
||||
self.matchmaker)
|
||||
|
@ -86,8 +86,23 @@ class CentralPublisherSender(Sender):
|
||||
message_id = multipart_message[zmq_names.MESSAGE_ID_IDX]
|
||||
|
||||
socket.send(topic_filter, zmq.SNDMORE)
|
||||
socket.send(six.b(str(message_type)), zmq.SNDMORE)
|
||||
socket.send_multipart(multipart_message[zmq_names.MESSAGE_ID_IDX:])
|
||||
|
||||
LOG.debug("Publishing message %(message_id)s on [%(topic)s]",
|
||||
{"topic": topic_filter,
|
||||
"message_id": message_id})
|
||||
|
||||
|
||||
class LocalPublisherSender(Sender):
|
||||
|
||||
TOPIC_IDX = 0
|
||||
MSG_TYPE_IDX = 1
|
||||
MSG_ID_IDX = 2
|
||||
|
||||
def send_message(self, socket, multipart_message):
|
||||
socket.send_multipart(multipart_message)
|
||||
|
||||
LOG.debug("Publishing message %(message_id)s on [%(topic)s]",
|
||||
{"topic": multipart_message[self.TOPIC_IDX],
|
||||
"message_id": multipart_message[self.MSG_ID_IDX]})
|
||||
|
@ -1,4 +1,4 @@
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
# Copyright 2015-2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -22,6 +22,7 @@ from oslo_messaging._drivers.zmq_driver.server.consumers \
|
||||
from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_socket
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_updater
|
||||
from oslo_messaging._i18n import _LE, _LI
|
||||
@ -35,7 +36,8 @@ class SubConsumer(zmq_consumer_base.ConsumerBase):
|
||||
|
||||
def __init__(self, conf, poller, server):
|
||||
super(SubConsumer, self).__init__(conf, poller, server)
|
||||
self.matchmaker = server.matchmaker
|
||||
self.matchmaker = SubscriptionMatchmakerWrapper(conf,
|
||||
server.matchmaker)
|
||||
self.target = server.target
|
||||
self.socket = zmq_socket.ZmqSocket(self.conf, self.context, zmq.SUB,
|
||||
immediate=False,
|
||||
@ -61,11 +63,15 @@ class SubConsumer(zmq_consumer_base.ConsumerBase):
|
||||
|
||||
def _receive_request(self, socket):
|
||||
topic_filter = socket.recv()
|
||||
message_type = int(socket.recv())
|
||||
message_id = socket.recv()
|
||||
context, message = socket.recv_loaded()
|
||||
LOG.debug("[%(host)s] Received on topic %(filter)s message %(msg_id)s",
|
||||
{'host': self.host, 'filter': topic_filter,
|
||||
'msg_id': message_id})
|
||||
LOG.debug("[%(host)s] Received on topic %(filter)s message %(msg_id)s "
|
||||
"%(msg_type)s",
|
||||
{'host': self.host,
|
||||
'filter': topic_filter,
|
||||
'msg_id': message_id,
|
||||
'msg_type': zmq_names.message_type_str(message_type)})
|
||||
return context, message
|
||||
|
||||
def receive_message(self, socket):
|
||||
@ -83,6 +89,20 @@ class SubConsumer(zmq_consumer_base.ConsumerBase):
|
||||
super(SubConsumer, self).cleanup()
|
||||
|
||||
|
||||
class SubscriptionMatchmakerWrapper(object):
|
||||
|
||||
def __init__(self, conf, matchmaker):
|
||||
self.conf = conf
|
||||
self.matchmaker = matchmaker
|
||||
|
||||
def get_publishers(self):
|
||||
conf_publishers = self.conf.oslo_messaging_zmq.subscribe_on
|
||||
LOG.debug("Publishers taken from configuration %s", conf_publishers)
|
||||
if conf_publishers:
|
||||
return [(publisher, None) for publisher in conf_publishers]
|
||||
return self.matchmaker.get_publishers()
|
||||
|
||||
|
||||
class SubscriberConnectionUpdater(zmq_updater.ConnectionUpdater):
|
||||
|
||||
def _update_connection(self):
|
||||
|
@ -181,7 +181,14 @@ zmq_opts = [
|
||||
'of any problems occurred: positive value N means '
|
||||
'at most N retries, 0 means no retries, None or -1 '
|
||||
'(or any other negative values) mean to retry forever. '
|
||||
'This option is used only if acknowledgments are enabled.')
|
||||
'This option is used only if acknowledgments are '
|
||||
'enabled.'),
|
||||
|
||||
cfg.ListOpt('subscribe_on',
|
||||
default=[],
|
||||
help='List of publisher hosts SubConsumer can subscribe on. '
|
||||
'This option has higher priority then the default '
|
||||
'publishers list taken from the matchmaker.'),
|
||||
]
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user