diff --git a/oslo_messaging/_cmd/zmq_proxy.py b/oslo_messaging/_cmd/zmq_proxy.py index 03ccea19d..a76b0b534 100644 --- a/oslo_messaging/_cmd/zmq_proxy.py +++ b/oslo_messaging/_cmd/zmq_proxy.py @@ -18,14 +18,15 @@ import logging from oslo_config import cfg from oslo_messaging._drivers import impl_zmq -from oslo_messaging._drivers.zmq_driver.broker import zmq_proxy -from oslo_messaging._drivers.zmq_driver.broker import zmq_queue_proxy -from oslo_messaging import server +from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy +from oslo_messaging._drivers.zmq_driver.proxy import zmq_queue_proxy CONF = cfg.CONF CONF.register_opts(impl_zmq.zmq_opts) -CONF.register_opts(server._pool_opts) -CONF.rpc_zmq_native = True + +opt_group = cfg.OptGroup(name='zmq_proxy_opts', + title='ZeroMQ proxy options') +CONF.register_opts(zmq_proxy.zmq_proxy_opts, group=opt_group) USAGE = """ Usage: ./zmq-proxy.py [-h] [] ... @@ -42,9 +43,20 @@ def main(): parser.add_argument('--config-file', dest='config_file', type=str, help='Path to configuration file') + + parser.add_argument('--host', dest='host', type=str, + help='Host FQDN for current proxy') + parser.add_argument('--frontend-port', dest='frontend_port', type=int, + help='Front-end ROUTER port number') + parser.add_argument('--backend-port', dest='backend_port', type=int, + help='Back-end ROUTER port number') + parser.add_argument('--publisher-port', dest='publisher_port', type=int, + help='Back-end PUBLISHER port number') + parser.add_argument('-d', '--debug', dest='debug', type=bool, default=False, help="Turn on DEBUG logging level instead of INFO") + args = parser.parse_args() if args.config_file: @@ -57,6 +69,18 @@ def main(): format='%(asctime)s %(name)s ' '%(levelname)-8s %(message)s') + if args.host: + CONF.zmq_proxy_opts.host = args.host + if args.frontend_port: + CONF.set_override('frontend_port', args.frontend_port, + group='zmq_proxy_opts') + if args.backend_port: + CONF.set_override('backend_port', args.backend_port, + group='zmq_proxy_opts') + if args.publisher_port: + CONF.set_override('publisher_port', args.publisher_port, + group='zmq_proxy_opts') + reactor = zmq_proxy.ZmqProxy(CONF, zmq_queue_proxy.UniversalQueueProxy) try: diff --git a/oslo_messaging/_drivers/zmq_driver/broker/__init__.py b/oslo_messaging/_drivers/zmq_driver/proxy/__init__.py similarity index 100% rename from oslo_messaging/_drivers/zmq_driver/broker/__init__.py rename to oslo_messaging/_drivers/zmq_driver/proxy/__init__.py diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py similarity index 85% rename from oslo_messaging/_drivers/zmq_driver/broker/zmq_proxy.py rename to oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py index 77a47d22f..b35a7f9be 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py @@ -13,9 +13,11 @@ # under the License. import logging +import socket from stevedore import driver +from oslo_config import cfg from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._i18n import _LI @@ -23,6 +25,22 @@ zmq = zmq_async.import_zmq() LOG = logging.getLogger(__name__) +zmq_proxy_opts = [ + cfg.StrOpt('host', default=socket.gethostname(), + help='Hostname (FQDN) of current proxy' + ' an ethernet interface, or IP address.'), + + cfg.IntOpt('frontend_port', default=0, + help='Front-end ROUTER port number. Zero means random.'), + + cfg.IntOpt('backend_port', default=0, + help='Back-end ROUTER port number. Zero means random.'), + + cfg.IntOpt('publisher_port', default=0, + help='Publisher port number. Zero means random.'), +] + + class ZmqProxy(object): """Wrapper class for Publishers and Routers proxies. The main reason to have a proxy is high complexity of TCP sockets number diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_publisher_proxy.py similarity index 83% rename from oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py rename to oslo_messaging/_drivers/zmq_driver/proxy/zmq_publisher_proxy.py index 68b9de234..727b41903 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_publisher_proxy.py @@ -14,7 +14,6 @@ import logging -from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_socket @@ -24,7 +23,7 @@ LOG = logging.getLogger(__name__) zmq = zmq_async.import_zmq() -class PubPublisherProxy(object): +class PublisherProxy(object): """PUB/SUB based request publisher The publisher intended to be used for Fanout and Notify @@ -39,16 +38,20 @@ class PubPublisherProxy(object): """ def __init__(self, conf, matchmaker): - super(PubPublisherProxy, self).__init__() + super(PublisherProxy, self).__init__() self.conf = conf self.zmq_context = zmq.Context() self.matchmaker = matchmaker - self.socket = zmq_socket.ZmqRandomPortSocket( - self.conf, self.zmq_context, zmq.PUB) + port = conf.zmq_proxy_opts.publisher_port - self.host = zmq_address.combine_address(self.conf.rpc_zmq_host, - self.socket.port) + self.socket = zmq_socket.ZmqFixedPortSocket( + self.conf, self.zmq_context, zmq.PUB, conf.zmq_proxy_opts.host, + port) if port != 0 else \ + zmq_socket.ZmqRandomPortSocket( + self.conf, self.zmq_context, zmq.PUB, conf.zmq_proxy_opts.host) + + self.host = self.socket.connect_address def send_request(self, multipart_message): message_type = multipart_message.pop(0) diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py similarity index 77% rename from oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py rename to oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py index bd3e61312..2e053f5c9 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py @@ -16,9 +16,7 @@ import logging import six -from oslo_messaging._drivers.zmq_driver.client.publishers \ - import zmq_pub_publisher -from oslo_messaging._drivers.zmq_driver import zmq_address +from oslo_messaging._drivers.zmq_driver.proxy import zmq_publisher_proxy from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_socket @@ -38,27 +36,31 @@ class UniversalQueueProxy(object): self.matchmaker = matchmaker self.poller = zmq_async.get_poller() - self.fe_router_socket = zmq_socket.ZmqRandomPortSocket( - conf, context, zmq.ROUTER) - self.be_router_socket = zmq_socket.ZmqRandomPortSocket( - conf, context, zmq.ROUTER) + port = conf.zmq_proxy_opts.frontend_port + host = conf.zmq_proxy_opts.host + self.fe_router_socket = zmq_socket.ZmqFixedPortSocket( + conf, context, zmq.ROUTER, host, + conf.zmq_proxy_opts.frontend_port) if port != 0 else \ + zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER, host) + + port = conf.zmq_proxy_opts.backend_port + self.be_router_socket = zmq_socket.ZmqFixedPortSocket( + conf, context, zmq.ROUTER, host, + conf.zmq_proxy_opts.backend_port) if port != 0 else \ + zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER, host) self.poller.register(self.fe_router_socket.handle, self._receive_in_request) self.poller.register(self.be_router_socket.handle, self._receive_in_request) - self.fe_router_address = zmq_address.combine_address( - self.conf.rpc_zmq_host, self.fe_router_socket.port) - self.be_router_address = zmq_address.combine_address( - self.conf.rpc_zmq_host, self.be_router_socket.port) - - self.pub_publisher = zmq_pub_publisher.PubPublisherProxy( + self.pub_publisher = zmq_publisher_proxy.PublisherProxy( conf, matchmaker) self._router_updater = RouterUpdater( - conf, matchmaker, self.pub_publisher.host, self.fe_router_address, - self.be_router_address) + conf, matchmaker, self.pub_publisher.host, + self.fe_router_socket.connect_address, + self.be_router_socket.connect_address) def run(self): message, socket = self.poller.poll() @@ -102,16 +104,17 @@ class UniversalQueueProxy(object): socket.send(b'', zmq.SNDMORE) socket.send(reply_id, zmq.SNDMORE) socket.send(six.b(str(message_type)), zmq.SNDMORE) - LOG.debug("Dispatching message %s" % message_id) + LOG.debug("Dispatching %(msg_type)s message %(msg_id)s - to %(rkey)s" % + {"msg_type": zmq_names.message_type_str(message_type), + "msg_id": message_id, + "rkey": routing_key}) socket.send_multipart(multipart_message) def cleanup(self): self.fe_router_socket.close() self.be_router_socket.close() self.pub_publisher.cleanup() - self.matchmaker.unregister_publisher( - (self.pub_publisher.host, self.fe_router_address)) - self.matchmaker.unregister_router(self.be_router_address) + self._router_updater.cleanup() class RouterUpdater(zmq_updater.UpdaterBase): @@ -138,3 +141,10 @@ class RouterUpdater(zmq_updater.UpdaterBase): expire=self.conf.zmq_target_expire) LOG.info(_LI("[Backend ROUTER:%(router)s] Update ROUTER"), {"router": self.be_router_address}) + + def cleanup(self): + super(RouterUpdater, self).cleanup() + self.matchmaker.unregister_publisher( + (self.publisher_address, self.fe_router_address)) + self.matchmaker.unregister_router( + self.be_router_address) diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py index 1179e241b..c50ffe4ed 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py @@ -150,23 +150,50 @@ class ZmqSocket(object): self.connect_to_address(address) -class ZmqPortRangeExceededException(exceptions.MessagingException): - """Raised by ZmqRandomPortSocket - wrapping zmq.ZMQBindError""" +class ZmqPortBusy(exceptions.MessagingException): + """Raised when binding to a port failure""" + + def __init__(self, port_number): + super(ZmqPortBusy, self).__init__() + self.port_number = port_number class ZmqRandomPortSocket(ZmqSocket): - def __init__(self, conf, context, socket_type, high_watermark=0): + def __init__(self, conf, context, socket_type, host=None, + high_watermark=0): super(ZmqRandomPortSocket, self).__init__(conf, context, socket_type, high_watermark) self.bind_address = zmq_address.get_tcp_random_address(self.conf) - + if host is None: + host = conf.rpc_zmq_host try: self.port = self.handle.bind_to_random_port( self.bind_address, min_port=conf.rpc_zmq_min_port, max_port=conf.rpc_zmq_max_port, max_tries=conf.rpc_zmq_bind_port_retries) + self.connect_address = zmq_address.combine_address(host, self.port) except zmq.ZMQBindError: LOG.error(_LE("Random ports range exceeded!")) - raise ZmqPortRangeExceededException() + raise ZmqPortBusy(port_number=0) + + +class ZmqFixedPortSocket(ZmqSocket): + + def __init__(self, conf, context, socket_type, host, port, + high_watermark=0): + super(ZmqFixedPortSocket, self).__init__(conf, context, socket_type, + high_watermark) + self.connect_address = zmq_address.combine_address(host, port) + self.bind_address = zmq_address.get_tcp_direct_address( + zmq_address.combine_address(conf.rpc_zmq_bind_address, port)) + self.host = host + self.port = port + + try: + self.handle.bind(self.bind_address) + except zmq.ZMQError as e: + LOG.exception(e) + LOG.error(_LE("Chosen port %d is being busy.") % self.port) + raise ZmqPortBusy(port_number=port) diff --git a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py index f9f6f5265..76b61cf1c 100644 --- a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py +++ b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py @@ -45,7 +45,7 @@ class ZmqTestPortsRange(zmq_common.ZmqBaseTestCase): target = oslo_messaging.Target(topic='testtopic_' + str(i)) new_listener = self.driver.listen(target, None, None) listeners.append(new_listener) - except zmq_socket.ZmqPortRangeExceededException: + except zmq_socket.ZmqPortBusy: pass self.assertLessEqual(len(listeners), 5) diff --git a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py index 50e9d1b6e..02519def9 100644 --- a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py +++ b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py @@ -13,15 +13,17 @@ # under the License. import json -import msgpack import time +import msgpack import six import testscenarios +from oslo_config import cfg + import oslo_messaging -from oslo_messaging._drivers.zmq_driver.client.publishers \ - import zmq_pub_publisher +from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy +from oslo_messaging._drivers.zmq_driver.proxy import zmq_publisher_proxy from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names @@ -31,6 +33,10 @@ load_tests = testscenarios.load_tests_apply_scenarios zmq = zmq_async.import_zmq() +opt_group = cfg.OptGroup(name='zmq_proxy_opts', + title='ZeroMQ proxy options') +cfg.CONF.register_opts(zmq_proxy.zmq_proxy_opts, group=opt_group) + class TestPubSub(zmq_common.ZmqBaseTestCase): @@ -50,7 +56,10 @@ class TestPubSub(zmq_common.ZmqBaseTestCase): 'rpc_zmq_serialization': self.serialization} self.config(**kwargs) - self.publisher = zmq_pub_publisher.PubPublisherProxy( + self.config(host="127.0.0.1", group="zmq_proxy_opts") + self.config(publisher_port="0", group="zmq_proxy_opts") + + self.publisher = zmq_publisher_proxy.PublisherProxy( self.conf, self.driver.matchmaker) self.driver.matchmaker.register_publisher( (self.publisher.host, "")) @@ -59,6 +68,12 @@ class TestPubSub(zmq_common.ZmqBaseTestCase): for i in range(self.LISTENERS_COUNT): self.listeners.append(zmq_common.TestServerListener(self.driver)) + def tearDown(self): + super(TestPubSub, self).tearDown() + self.publisher.cleanup() + for listener in self.listeners: + listener.stop() + def _send_request(self, target): # Needed only in test env to give listener a chance to connect # before request fires diff --git a/setup-test-env-zmq-proxy.sh b/setup-test-env-zmq-proxy.sh index e40dbb3cc..ebce12ca7 100755 --- a/setup-test-env-zmq-proxy.sh +++ b/setup-test-env-zmq-proxy.sh @@ -13,6 +13,8 @@ export ZMQ_IPC_DIR=${DATADIR} export ZMQ_USE_PUB_SUB=false export ZMQ_USE_ROUTER_PROXY=true +export ZMQ_PROXY_HOST=127.0.0.1 + cat > ${DATADIR}/zmq.conf <<EOF [DEFAULT] transport_url=${TRANSPORT_URL} @@ -22,6 +24,9 @@ use_pub_sub=${ZMQ_USE_PUB_SUB} use_router_proxy=${ZMQ_USE_ROUTER_PROXY} [matchmaker_redis] port=${ZMQ_REDIS_PORT} + +[zmq_proxy_opts] +host=${ZMQ_PROXY_HOST} EOF redis-server --port $ZMQ_REDIS_PORT &