Merge "[zmq] Make second ROUTER socket optional for proxy"

This commit is contained in:
Jenkins 2016-09-28 16:37:56 +00:00 committed by Gerrit Code Review
commit 080df375ca
11 changed files with 252 additions and 172 deletions

@ -1,4 +1,4 @@
# Copyright 2015 Mirantis, Inc.
# Copyright 2015-2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -12,86 +12,36 @@
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import logging
from oslo_config import cfg
from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
from oslo_messaging._drivers.zmq_driver.proxy import zmq_queue_proxy
from oslo_messaging._drivers.zmq_driver import zmq_options
from oslo_messaging._i18n import _LI
CONF = cfg.CONF
zmq_options.register_opts(CONF)
opt_group = cfg.OptGroup(name='zmq_proxy_opts',
title='ZeroMQ proxy options')
CONF.register_opts(zmq_proxy.zmq_proxy_opts, group=opt_group)
USAGE = """ Usage: ./zmq-proxy.py [-h] [] ...
Usage example:
python oslo_messaging/_cmd/zmq-proxy.py"""
LOG = logging.getLogger(__name__)
def main():
parser = argparse.ArgumentParser(
description='ZeroMQ proxy service',
usage=USAGE
)
parser.add_argument('-c', '--config-file', dest='config_file', type=str,
help='Path to configuration file')
parser.add_argument('-l', '--log-file', dest='log_file', type=str,
help='Path to log file')
conf = cfg.CONF
opt_group = cfg.OptGroup(name='zmq_proxy_opts',
title='ZeroMQ proxy options')
conf.register_opts(zmq_proxy.zmq_proxy_opts, group=opt_group)
zmq_options.register_opts(conf)
zmq_proxy.parse_command_line_args(conf)
parser.add_argument('-H', '--host', dest='host', type=str,
help='Host FQDN for current proxy')
parser.add_argument('-f', '--frontend-port', dest='frontend_port',
type=int,
help='Front-end ROUTER port number')
parser.add_argument('-b', '--backend-port', dest='backend_port', type=int,
help='Back-end ROUTER port number')
parser.add_argument('-p', '--publisher-port', dest='publisher_port',
type=int,
help='Front-end PUBLISHER port number')
parser.add_argument('-d', '--debug', dest='debug', type=bool,
default=False,
help='Turn on DEBUG logging level instead of INFO')
args = parser.parse_args()
if args.config_file:
cfg.CONF(['--config-file', args.config_file])
log_kwargs = {'level': logging.DEBUG if args.debug else logging.INFO,
'format': '%(asctime)s %(name)s %(levelname)-8s %(message)s'}
if args.log_file:
log_kwargs.update({'filename': args.log_file})
logging.basicConfig(**log_kwargs)
if args.host:
CONF.zmq_proxy_opts.host = args.host
if args.frontend_port:
CONF.set_override('frontend_port', args.frontend_port,
group='zmq_proxy_opts')
if args.backend_port:
CONF.set_override('backend_port', args.backend_port,
group='zmq_proxy_opts')
if args.publisher_port:
CONF.set_override('publisher_port', args.publisher_port,
group='zmq_proxy_opts')
reactor = zmq_proxy.ZmqProxy(CONF, zmq_queue_proxy.UniversalQueueProxy)
reactor = zmq_proxy.ZmqProxy(conf)
try:
while True:
reactor.run()
except (KeyboardInterrupt, SystemExit):
LOG.info(_LI("Exit proxy by interrupt signal."))
finally:
reactor.close()
if __name__ == "__main__":
main()

@ -55,7 +55,7 @@ class Request(object):
:type retry: int
"""
if self.msg_type not in zmq_names.MESSAGE_TYPES:
if self.msg_type not in zmq_names.REQUEST_TYPES:
raise RuntimeError("Unknown message type!")
self.target = target

@ -1,4 +1,4 @@
# Copyright 2015 Mirantis, Inc.
# Copyright 2015-2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -14,114 +14,123 @@
import logging
import six
from oslo_messaging._drivers.zmq_driver.proxy import zmq_publisher_proxy
from oslo_messaging._drivers.zmq_driver.proxy.central \
import zmq_publisher_proxy
from oslo_messaging._drivers.zmq_driver.proxy import zmq_sender
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging._drivers.zmq_driver import zmq_updater
from oslo_messaging._i18n import _LE, _LI
from oslo_messaging._i18n import _LI, _LE
zmq = zmq_async.import_zmq()
LOG = logging.getLogger(__name__)
class UniversalQueueProxy(object):
def check_message_format(func):
def _check_message_format(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
LOG.error(_LE("Received message with wrong format"))
LOG.exception(e)
return _check_message_format
class SingleRouterProxy(object):
def __init__(self, conf, context, matchmaker):
self.conf = conf
self.context = context
super(UniversalQueueProxy, self).__init__()
super(SingleRouterProxy, self).__init__()
self.matchmaker = matchmaker
host = conf.zmq_proxy_opts.host
self.poller = zmq_async.get_poller()
port = conf.zmq_proxy_opts.frontend_port
host = conf.zmq_proxy_opts.host
self.fe_router_socket = zmq_socket.ZmqFixedPortSocket(
conf, context, zmq.ROUTER, host,
conf.zmq_proxy_opts.frontend_port) if port != 0 else \
zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER, host)
zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER,
host)
port = conf.zmq_proxy_opts.backend_port
self.be_router_socket = zmq_socket.ZmqFixedPortSocket(
conf, context, zmq.ROUTER, host,
conf.zmq_proxy_opts.backend_port) if port != 0 else \
zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER, host)
self.poller.register(self.fe_router_socket, self._receive_message)
self.poller.register(self.fe_router_socket, self._receive_in_request)
self.poller.register(self.be_router_socket, self._receive_in_request)
self.pub_publisher = zmq_publisher_proxy.PublisherProxy(
self.publisher = zmq_publisher_proxy.PublisherProxy(
conf, matchmaker)
self._router_updater = RouterUpdater(
conf, matchmaker, self.pub_publisher.host,
self.fe_router_socket.connect_address,
self.be_router_socket.connect_address)
self.router_sender = zmq_sender.CentralRouterSender()
self._router_updater = self._create_router_updater()
def run(self):
message, socket = self.poller.poll()
if message is None:
return
msg_type = message[0]
msg_type = int(message[zmq_names.MESSAGE_TYPE_IDX])
if self.conf.oslo_messaging_zmq.use_pub_sub and \
msg_type in (zmq_names.CAST_FANOUT_TYPE,
zmq_names.NOTIFY_TYPE):
self.pub_publisher.send_request(message)
self.publisher.send_request(message)
else:
self._redirect_message(self.be_router_socket
if socket is self.fe_router_socket
else self.fe_router_socket, message)
self.router_sender.send_message(
self._get_socket_to_dispatch_on(socket), message)
def _create_router_updater(self):
return RouterUpdater(
self.conf, self.matchmaker, self.publisher.host,
self.fe_router_socket.connect_address,
self.fe_router_socket.connect_address)
def _get_socket_to_dispatch_on(self, socket):
return self.fe_router_socket
@staticmethod
def _receive_in_request(socket):
try:
reply_id = socket.recv()
assert reply_id is not None, "Valid id expected"
empty = socket.recv()
assert empty == b'', "Empty delimiter expected"
msg_type = int(socket.recv())
routing_key = socket.recv()
payload = socket.recv_multipart()
payload.insert(0, reply_id)
payload.insert(0, routing_key)
payload.insert(0, msg_type)
return payload
except (AssertionError, ValueError):
LOG.error(_LE("Received message with wrong format"))
if socket.getsockopt(zmq.RCVMORE):
# NOTE(ozamiatin): Drop the left parts of broken message
socket.recv_multipart()
except zmq.ZMQError as e:
LOG.exception(e)
return None
@staticmethod
def _redirect_message(socket, multipart_message):
message_type = multipart_message.pop(0)
routing_key = multipart_message.pop(0)
reply_id = multipart_message.pop(0)
message_id = multipart_message[0]
socket.send(routing_key, zmq.SNDMORE)
socket.send(b'', zmq.SNDMORE)
socket.send(reply_id, zmq.SNDMORE)
socket.send(six.b(str(message_type)), zmq.SNDMORE)
LOG.debug("Dispatching %(msg_type)s message %(msg_id)s - from %(rid)s "
"to -> %(rkey)s" %
{"msg_type": zmq_names.message_type_str(message_type),
"msg_id": message_id,
"rkey": routing_key,
"rid": reply_id})
socket.send_multipart(multipart_message)
@check_message_format
def _receive_message(socket):
message = socket.recv_multipart()
assert len(message) > zmq_names.MESSAGE_ID_IDX, "Not enough parts"
assert message[zmq_names.REPLY_ID_IDX] != b'', "Valid id expected"
message_type = int(message[zmq_names.MESSAGE_TYPE_IDX])
assert message_type in zmq_names.MESSAGE_TYPES, "Known type expected!"
assert message[zmq_names.EMPTY_IDX] == b'', "Empty delimiter expected"
return message
def cleanup(self):
self._router_updater.cleanup()
self.poller.close()
self.fe_router_socket.close()
self.publisher.cleanup()
class DoubleRouterProxy(SingleRouterProxy):
def __init__(self, conf, context, matchmaker):
LOG.info(_LI('Running double router proxy'))
port = conf.zmq_proxy_opts.backend_port
host = conf.zmq_proxy_opts.host
self.be_router_socket = zmq_socket.ZmqFixedPortSocket(
conf, context, zmq.ROUTER, host,
conf.zmq_proxy_opts.backend_port) if port != 0 else \
zmq_socket.ZmqRandomPortSocket(
conf, context, zmq.ROUTER, host)
super(DoubleRouterProxy, self).__init__(conf, context, matchmaker)
self.poller.register(self.be_router_socket, self._receive_message)
def _create_router_updater(self):
return RouterUpdater(
self.conf, self.matchmaker, self.publisher.host,
self.fe_router_socket.connect_address,
self.be_router_socket.connect_address)
def _get_socket_to_dispatch_on(self, socket):
return self.be_router_socket \
if socket is self.fe_router_socket \
else self.fe_router_socket
def cleanup(self):
super(DoubleRouterProxy, self).cleanup()
self.be_router_socket.close()
self.pub_publisher.cleanup()
self._router_updater.cleanup()
class RouterUpdater(zmq_updater.UpdaterBase):

@ -1,4 +1,4 @@
# Copyright 2015 Mirantis, Inc.
# Copyright 2015-2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -14,8 +14,8 @@
import logging
from oslo_messaging._drivers.zmq_driver.proxy import zmq_sender
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
LOG = logging.getLogger(__name__)
@ -44,7 +44,6 @@ class PublisherProxy(object):
self.matchmaker = matchmaker
port = conf.zmq_proxy_opts.publisher_port
self.socket = zmq_socket.ZmqFixedPortSocket(
self.conf, self.zmq_context, zmq.PUB, conf.zmq_proxy_opts.host,
port) if port != 0 else \
@ -52,23 +51,10 @@ class PublisherProxy(object):
self.conf, self.zmq_context, zmq.PUB, conf.zmq_proxy_opts.host)
self.host = self.socket.connect_address
self.sender = zmq_sender.CentralPublisherSender()
def send_request(self, multipart_message):
message_type = multipart_message.pop(0)
assert message_type in (zmq_names.CAST_FANOUT_TYPE,
zmq_names.NOTIFY_TYPE), "Fanout expected!"
topic_filter = multipart_message.pop(0)
reply_id = multipart_message.pop(0)
message_id = multipart_message.pop(0)
assert reply_id is not None, "Reply id expected!"
self.socket.send(topic_filter, zmq.SNDMORE)
self.socket.send(message_id, zmq.SNDMORE)
self.socket.send_multipart(multipart_message)
LOG.debug("Publishing message %(message_id)s on [%(topic)s]",
{"topic": topic_filter,
"message_id": message_id})
self.sender.send_message(self.socket, multipart_message)
def cleanup(self):
self.socket.close()

@ -1,4 +1,4 @@
# Copyright 2015 Mirantis, Inc.
# Copyright 2015-2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -12,12 +12,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import logging
import socket
from oslo_config import cfg
from stevedore import driver
from oslo_config import cfg
from oslo_messaging._drivers.zmq_driver.proxy.central import zmq_central_proxy
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._i18n import _LI
@ -25,6 +27,12 @@ zmq = zmq_async.import_zmq()
LOG = logging.getLogger(__name__)
USAGE = """ Usage: ./zmq-proxy.py [-h] [] ...
Usage example:
python oslo_messaging/_cmd/zmq-proxy.py"""
zmq_proxy_opts = [
cfg.StrOpt('host', default=socket.gethostname(),
help='Hostname (FQDN) of current proxy'
@ -41,6 +49,56 @@ zmq_proxy_opts = [
]
def parse_command_line_args(conf):
parser = argparse.ArgumentParser(
description='ZeroMQ proxy service',
usage=USAGE
)
parser.add_argument('-c', '--config-file', dest='config_file', type=str,
help='Path to configuration file')
parser.add_argument('-l', '--log-file', dest='log_file', type=str,
help='Path to log file')
parser.add_argument('-H', '--host', dest='host', type=str,
help='Host FQDN for current proxy')
parser.add_argument('-f', '--frontend-port', dest='frontend_port',
type=int,
help='Front-end ROUTER port number')
parser.add_argument('-b', '--backend-port', dest='backend_port', type=int,
help='Back-end ROUTER port number')
parser.add_argument('-p', '--publisher-port', dest='publisher_port',
type=int,
help='Front-end PUBLISHER port number')
parser.add_argument('-d', '--debug', dest='debug', type=bool,
default=False,
help='Turn on DEBUG logging level instead of INFO')
args = parser.parse_args()
if args.config_file:
conf(['--config-file', args.config_file])
log_kwargs = {'level': logging.DEBUG if args.debug else logging.INFO,
'format': '%(asctime)s %(name)s %(levelname)-8s %(message)s'}
if args.log_file:
log_kwargs.update({'filename': args.log_file})
logging.basicConfig(**log_kwargs)
if args.host:
conf.zmq_proxy_opts.host = args.host
if args.frontend_port:
conf.set_override('frontend_port', args.frontend_port,
group='zmq_proxy_opts')
if args.backend_port:
conf.set_override('backend_port', args.backend_port,
group='zmq_proxy_opts')
if args.publisher_port:
conf.set_override('publisher_port', args.publisher_port,
group='zmq_proxy_opts')
class ZmqProxy(object):
"""Wrapper class for Publishers and Routers proxies.
The main reason to have a proxy is high complexity of TCP sockets number
@ -80,7 +138,7 @@ class ZmqProxy(object):
"""
def __init__(self, conf, proxy_cls):
def __init__(self, conf):
super(ZmqProxy, self).__init__()
self.conf = conf
self.matchmaker = driver.DriverManager(
@ -88,7 +146,16 @@ class ZmqProxy(object):
self.conf.oslo_messaging_zmq.rpc_zmq_matchmaker,
).driver(self.conf)
self.context = zmq.Context()
self.proxy = proxy_cls(conf, self.context, self.matchmaker)
self.proxy = self._choose_proxy_implementation()
def _choose_proxy_implementation(self):
if self.conf.zmq_proxy_opts.frontend_port != 0 and \
self.conf.zmq_proxy_opts.backend_port == 0:
return zmq_central_proxy.SingleRouterProxy(self.conf, self.context,
self.matchmaker)
else:
return zmq_central_proxy.DoubleRouterProxy(self.conf, self.context,
self.matchmaker)
def run(self):
self.proxy.run()

@ -0,0 +1,69 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import logging
import six
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
zmq = zmq_async.import_zmq()
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class Sender(object):
@abc.abstractmethod
def send_message(self, socket, multipart_message):
"""Send message to a socket from multipart list"""
class CentralRouterSender(Sender):
def send_message(self, socket, multipart_message):
message_type = int(multipart_message[zmq_names.MESSAGE_TYPE_IDX])
routing_key = multipart_message[zmq_names.ROUTING_KEY_IDX]
reply_id = multipart_message[zmq_names.REPLY_ID_IDX]
message_id = multipart_message[zmq_names.MESSAGE_ID_IDX]
socket.send(routing_key, zmq.SNDMORE)
socket.send(b'', zmq.SNDMORE)
socket.send(reply_id, zmq.SNDMORE)
socket.send(multipart_message[zmq_names.MESSAGE_TYPE_IDX], zmq.SNDMORE)
LOG.debug("Dispatching %(msg_type)s message %(msg_id)s - from %(rid)s "
"to -> %(rkey)s" %
{"msg_type": zmq_names.message_type_str(message_type),
"msg_id": message_id,
"rkey": routing_key,
"rid": reply_id})
socket.send_multipart(multipart_message[zmq_names.MESSAGE_ID_IDX:])
class CentralPublisherSender(Sender):
def send_message(self, socket, multipart_message):
message_type = int(multipart_message[zmq_names.MESSAGE_TYPE_IDX])
assert message_type in (zmq_names.CAST_FANOUT_TYPE,
zmq_names.NOTIFY_TYPE), "Fanout expected!"
topic_filter = multipart_message[zmq_names.ROUTING_KEY_IDX]
message_id = multipart_message[zmq_names.MESSAGE_ID_IDX]
socket.send(topic_filter, zmq.SNDMORE)
socket.send_multipart(multipart_message[zmq_names.MESSAGE_ID_IDX:])
LOG.debug("Publishing message %(message_id)s on [%(topic)s]",
{"topic": topic_filter,
"message_id": message_id})

@ -27,10 +27,6 @@ def get_tcp_random_address(conf):
return "tcp://%s" % conf.oslo_messaging_zmq.rpc_zmq_bind_address
def get_broker_address(conf):
return "ipc://%s/zmq-broker" % conf.oslo_messaging_zmq.rpc_zmq_ipc_dir
def prefix_str(key, listener_type):
return listener_type + "/" + key

@ -23,13 +23,14 @@ FIELD_REPLY_BODY = 'reply_body'
FIELD_FAILURE = 'failure'
IDX_REPLY_TYPE = 1
IDX_REPLY_BODY = 2
MULTIPART_IDX_ENVELOPE = 0
MULTIPART_IDX_BODY = 1
REPLY_ID_IDX = 0
EMPTY_IDX = 1
MESSAGE_TYPE_IDX = 2
ROUTING_KEY_IDX = 3
MESSAGE_ID_IDX = 4
DEFAULT_TYPE = 0
CALL_TYPE = 1
CAST_TYPE = 2
CAST_FANOUT_TYPE = 3
@ -37,13 +38,17 @@ NOTIFY_TYPE = 4
REPLY_TYPE = 5
ACK_TYPE = 6
MESSAGE_TYPES = (CALL_TYPE,
REQUEST_TYPES = (CALL_TYPE,
CAST_TYPE,
CAST_FANOUT_TYPE,
NOTIFY_TYPE)
RESPONSE_TYPES = (REPLY_TYPE, ACK_TYPE)
MESSAGE_TYPES = REQUEST_TYPES + RESPONSE_TYPES
MULTISEND_TYPES = (CAST_FANOUT_TYPE, NOTIFY_TYPE)
DIRECT_TYPES = (CALL_TYPE, CAST_TYPE, REPLY_TYPE)
DIRECT_TYPES = (CALL_TYPE, CAST_TYPE, REPLY_TYPE, ACK_TYPE)
CAST_TYPES = (CAST_TYPE, CAST_FANOUT_TYPE)
NOTIFY_TYPES = (NOTIFY_TYPE,)
NON_BLOCKING_TYPES = CAST_TYPES + NOTIFY_TYPES

@ -22,8 +22,9 @@ import testscenarios
from oslo_config import cfg
import oslo_messaging
from oslo_messaging._drivers.zmq_driver.proxy.central \
import zmq_publisher_proxy
from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
from oslo_messaging._drivers.zmq_driver.proxy import zmq_publisher_proxy
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
@ -82,9 +83,8 @@ class TestPubSub(zmq_common.ZmqBaseTestCase):
message = {'method': 'hello-world'}
self.publisher.send_request(
[zmq_names.CAST_FANOUT_TYPE,
[b'', b'', zmq_names.CAST_FANOUT_TYPE,
zmq_address.target_to_subscribe_filter(target),
b"message",
b"0000-0000",
self.dumps([context, message])])

@ -20,7 +20,6 @@ import oslo_messaging
from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
from oslo_messaging._drivers.zmq_driver.client import zmq_senders
from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
from oslo_messaging._drivers.zmq_driver.proxy import zmq_queue_proxy
from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_options
@ -70,8 +69,7 @@ class TestZmqAckManager(test_utils.BaseTestCase):
self.driver = transport._driver
# prepare and launch proxy
self.proxy = zmq_proxy.ZmqProxy(self.conf,
zmq_queue_proxy.UniversalQueueProxy)
self.proxy = zmq_proxy.ZmqProxy(self.conf)
vars(self.driver.matchmaker).update(vars(self.proxy.matchmaker))
self.executor = zmq_async.get_executor(self.proxy.run)
self.executor.execute()