From fe6cf0f8a20625fc777f7c8ab131e44e2006d225 Mon Sep 17 00:00:00 2001 From: ozamiatin Date: Sat, 24 Dec 2016 00:53:48 +0200 Subject: [PATCH] [zmq] Restore static direct connections Restore static direct connections which may be useful for services running RPC on controllers only and talking to agents via other means (ssh for example). Change-Id: Icbe45978cb4a8ba5db74e5593ea7be23cba3f44e --- .../dealer/zmq_dealer_publisher_base.py | 4 ++ .../dealer/zmq_dealer_publisher_direct.py | 48 ++++++++++++++++++- .../dealer/zmq_dealer_publisher_proxy.py | 1 - .../client/publishers/zmq_publisher_base.py | 3 +- .../_drivers/zmq_driver/client/zmq_client.py | 6 ++- .../zmq_driver/client/zmq_client_base.py | 12 ++++- .../zmq_driver/client/zmq_routing_table.py | 40 +++++++++------- .../zmq_driver/client/zmq_sockets_manager.py | 31 +++++++++++- .../server/consumers/zmq_dealer_consumer.py | 1 - .../_drivers/zmq_driver/zmq_options.py | 8 +++- .../_drivers/zmq_driver/zmq_socket.py | 5 +- oslo_messaging/tests/functional/utils.py | 4 ++ setup-test-env-zmq-direct-static.sh | 32 +++++++++++++ setup-test-env-zmq.sh | 2 + tox.ini | 3 ++ 15 files changed, 168 insertions(+), 32 deletions(-) create mode 100755 setup-test-env-zmq-direct-static.sh diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py index 1493aeb1e..c5456e702 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py @@ -64,3 +64,7 @@ class DealerPublisherBase(zmq_publisher_base.PublisherBase): reply.failure, request.allowed_remote_exmods) else: return reply.reply_body + + def cleanup(self): + super(DealerPublisherBase, self).cleanup() + self.sockets_manager.cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py index df0f22db7..ccecda631 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py @@ -12,19 +12,26 @@ # License for the specific language governing permissions and limitations # under the License. +import logging + from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ import zmq_dealer_publisher_base from oslo_messaging._drivers.zmq_driver.client import zmq_receivers from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table from oslo_messaging._drivers.zmq_driver.client import zmq_senders +from oslo_messaging._drivers.zmq_driver.client import zmq_sockets_manager +from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names + +LOG = logging.getLogger(__name__) + zmq = zmq_async.import_zmq() class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase): - """DEALER-publisher using direct connections. + """DEALER-publisher using direct dynamic connections. Publishing directly to remote services assumes the following: - All direct connections are dynamic - so they live per message, @@ -86,3 +93,42 @@ class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase): def cleanup(self): self.routing_table.cleanup() super(DealerPublisherDirect, self).cleanup() + + +class DealerPublisherDirectStatic(DealerPublisherDirect): + """DEALER-publisher using direct static connections. + + For some reason direct static connections may be also useful. + Assume a case when some agents are not connected with control services + over RPC (Ironic or Cinder+Ceph), and RPC is used only between controllers. + In this case number of RPC connections doesn't matter (very small) so we + can use static connections without fear and have all performance benefits + from it. + """ + + def __init__(self, conf, matchmaker): + super(DealerPublisherDirectStatic, self).__init__(conf, matchmaker) + self.fanout_sockets = zmq_sockets_manager.SocketsManager( + conf, matchmaker, zmq.DEALER) + + def acquire_connection(self, request): + if request.msg_type in zmq_names.MULTISEND_TYPES: + hosts = self.routing_table.get_fanout_hosts(request.target) + target_key = zmq_address.prefix_str( + request.target.topic, + zmq_names.socket_type_str(zmq.ROUTER)) + return self.fanout_sockets.get_cached_socket(target_key, hosts, + immediate=False) + else: + hosts = self.routing_table.get_all_round_robin_hosts( + request.target) + target_key = zmq_address.target_to_key( + request.target, zmq_names.socket_type_str(zmq.ROUTER)) + return self.sockets_manager.get_cached_socket(target_key, hosts) + + def _finally_unregister(self, socket, request): + self.receiver.untrack_request(request) + + def cleanup(self): + self.fanout_sockets.cleanup() + super(DealerPublisherDirectStatic, self).cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py index 63a3d4996..8b07fcbb9 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py @@ -76,7 +76,6 @@ class DealerPublisherProxy(zmq_dealer_publisher_base.DealerPublisherBase): def cleanup(self): self.connection_updater.stop() self.routing_table.cleanup() - self.socket.close() super(DealerPublisherProxy, self).cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py index 09bec6f42..90d296705 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py @@ -84,8 +84,7 @@ class PublisherBase(object): def _raise_timeout(request): raise oslo_messaging.MessagingTimeout( "Timeout %(tout)s seconds was reached for message %(msg_id)s" % - {"tout": request.timeout, "msg_id": request.message_id} - ) + {"tout": request.timeout, "msg_id": request.message_id}) def cleanup(self): """Cleanup publisher: stop receiving responses, close allocated diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py index 84ad3e786..6665eef7f 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py @@ -66,10 +66,14 @@ class ZmqClientDirect(zmq_client_base.ZmqClientBase): conf.oslo_messaging_zmq.use_router_proxy: raise WrongClientException() + publisher = self._create_publisher_direct_dynamic(conf, matchmaker) \ + if conf.oslo_messaging_zmq.use_dynamic_connections else \ + self._create_publisher_direct(conf, matchmaker) + super(ZmqClientDirect, self).__init__( conf, matchmaker, allowed_remote_exmods, publishers={ - "default": self._create_publisher_direct(conf, matchmaker) + "default": publisher } ) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py index 445530b48..261ded919 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. + from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ import zmq_dealer_publisher_direct from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ @@ -71,8 +72,15 @@ class ZmqClientBase(object): @staticmethod def _create_publisher_direct(conf, matchmaker): - publisher_direct = \ - zmq_dealer_publisher_direct.DealerPublisherDirect(conf, matchmaker) + publisher_cls = zmq_dealer_publisher_direct.DealerPublisherDirectStatic + publisher_direct = publisher_cls(conf, matchmaker) + publisher_manager_cls = zmq_publisher_manager.PublisherManagerStatic + return publisher_manager_cls(publisher_direct) + + @staticmethod + def _create_publisher_direct_dynamic(conf, matchmaker): + publisher_cls = zmq_dealer_publisher_direct.DealerPublisherDirect + publisher_direct = publisher_cls(conf, matchmaker) publisher_manager_cls = zmq_publisher_manager.PublisherManagerDynamic \ if conf.oslo_messaging_zmq.use_pub_sub else \ zmq_publisher_manager.PublisherManagerDynamicAsyncMultisend diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py index 346edf813..03979a5a9 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py @@ -42,30 +42,34 @@ class RoutingTableAdaptor(object): self._lock = threading.Lock() def get_round_robin_host(self, target): + target_key = self._fetch_round_robin_hosts_from_matchmaker(target) + rr_gen = self.round_robin_targets[target_key] + host = next(rr_gen) + LOG.debug("Host resolved for the current connection is %s" % host) + return host + + def get_all_round_robin_hosts(self, target): + target_key = self._fetch_round_robin_hosts_from_matchmaker(target) + return self.routing_table.get_hosts_fanout(target_key) + + def _fetch_round_robin_hosts_from_matchmaker(self, target): target_key = zmq_address.target_to_key( target, zmq_names.socket_type_str(self.listener_type)) LOG.debug("Processing target %s for round-robin." % target_key) if target_key not in self.round_robin_targets: - self._fetch_round_robin_hosts_from_matchmaker(target, target_key) - - rr_gen = self.round_robin_targets[target_key] - host = next(rr_gen) - LOG.debug("Host resolved for the current connection is %s" % host) - return host - - def _fetch_round_robin_hosts_from_matchmaker(self, target, target_key): - with self._lock: - if target_key not in self.round_robin_targets: - LOG.debug("Target %s is not in cache. Check matchmaker server." - % target_key) - hosts = self.matchmaker.get_hosts_retry( - target, zmq_names.socket_type_str(self.listener_type)) - LOG.debug("Received hosts %s" % hosts) - self.routing_table.update_hosts(target_key, hosts) - self.round_robin_targets[target_key] = \ - self.routing_table.get_hosts_round_robin(target_key) + with self._lock: + if target_key not in self.round_robin_targets: + LOG.debug("Target %s is not in cache. Check matchmaker " + "server." % target_key) + hosts = self.matchmaker.get_hosts_retry( + target, zmq_names.socket_type_str(self.listener_type)) + LOG.debug("Received hosts %s" % hosts) + self.routing_table.update_hosts(target_key, hosts) + self.round_robin_targets[target_key] = \ + self.routing_table.get_hosts_round_robin(target_key) + return target_key def get_fanout_hosts(self, target): target_key = zmq_address.prefix_str( diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py index 0cf6ec0b0..b94fc6fb2 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py @@ -12,11 +12,15 @@ # License for the specific language governing permissions and limitations # under the License. +import logging + from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_socket zmq = zmq_async.import_zmq() +LOG = logging.getLogger(__name__) + class SocketsManager(object): @@ -27,10 +31,25 @@ class SocketsManager(object): self.zmq_context = zmq.Context() self.socket_to_publishers = None self.socket_to_routers = None + self.sockets = {} def get_socket(self): - socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context, - self.socket_type, immediate=False) + return zmq_socket.ZmqSocket(self.conf, self.zmq_context, + self.socket_type, immediate=False) + + def get_cached_socket(self, target_key, hosts=None, immediate=True): + hosts = [] if hosts is None else hosts + socket = self.sockets.get(target_key, None) + if socket is None: + LOG.debug("CREATING NEW socket for target_key %s " % target_key) + socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context, + self.socket_type, + immediate=immediate) + self.sockets[target_key] = socket + for host in hosts: + socket.connect_to_host(host) + LOG.debug("Target key: %s socket:%s" % (target_key, + socket.handle.identity)) return socket def get_socket_to_publishers(self, identity=None): @@ -56,3 +75,11 @@ class SocketsManager(object): for be_router_address in routers: self.socket_to_routers.connect_to_host(be_router_address) return self.socket_to_routers + + def cleanup(self): + if self.socket_to_publishers: + self.socket_to_publishers.close() + if self.socket_to_routers: + self.socket_to_routers.close() + for socket in self.sockets.values(): + socket.close() diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py index 52fea538e..7b77a62c5 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py @@ -60,7 +60,6 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer): try: socket = self.sockets_manager.get_socket_to_routers( self._generate_identity()) - self.sockets.append(socket) self.host = socket.handle.identity self.poller.register(socket, self.receive_request) return socket diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_options.py b/oslo_messaging/_drivers/zmq_driver/zmq_options.py index 7fe7afabe..98dd65a40 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_options.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_options.py @@ -87,10 +87,16 @@ zmq_opts = [ help='Use PUB/SUB pattern for fanout methods. ' 'PUB/SUB always uses proxy.'), - cfg.BoolOpt('use_router_proxy', default=True, + cfg.BoolOpt('use_router_proxy', default=False, deprecated_group='DEFAULT', help='Use ROUTER remote proxy.'), + cfg.BoolOpt('use_dynamic_connections', default=False, + help='This option makes direct connections dynamic or static. ' + 'It makes sense only with use_router_proxy=False which ' + 'means to use direct connections for direct message ' + 'types (ignored otherwise).'), + cfg.PortOpt('rpc_zmq_min_port', default=49153, deprecated_group='DEFAULT', diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py index 6a6a4c38b..e352d8fa0 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py @@ -56,7 +56,7 @@ class ZmqSocket(object): # Put messages to only connected queues self.handle.setsockopt(zmq.IMMEDIATE, 1 if immediate else 0) - # Configure TCP KA + # Configure TCP keep alive keepalive = self.conf.oslo_messaging_zmq.zmq_tcp_keepalive if keepalive < 0: keepalive = -1 @@ -193,8 +193,7 @@ class ZmqSocket(object): {"stype": stype, "sid": sid, "address": address, "e": e}) raise rpc_common.RPCException( "Failed connecting %(stype)s-%(sid)s to %(address)s: %(e)s" % - {"stype": stype, "sid": sid, "address": address, "e": e} - ) + {"stype": stype, "sid": sid, "address": address, "e": e}) def connect_to_host(self, host): address = zmq_address.get_tcp_direct_address( diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py index 7956cb10e..ebdc72533 100644 --- a/oslo_messaging/tests/functional/utils.py +++ b/oslo_messaging/tests/functional/utils.py @@ -317,6 +317,10 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase): zmq_use_acks = os.environ.get('ZMQ_USE_ACKS') self.config(rpc_use_acks=zmq_use_acks, group='oslo_messaging_zmq') + zmq_use_dynamic_connections = \ + os.environ.get('ZMQ_USE_DYNAMIC_CONNECTIONS') + self.config(use_dynamic_connections=zmq_use_dynamic_connections, + group='oslo_messaging_zmq') class NotificationFixture(fixtures.Fixture): diff --git a/setup-test-env-zmq-direct-static.sh b/setup-test-env-zmq-direct-static.sh new file mode 100755 index 000000000..32dd229b0 --- /dev/null +++ b/setup-test-env-zmq-direct-static.sh @@ -0,0 +1,32 @@ +#!/bin/bash +set -e + +. tools/functions.sh + +DATADIR=$(mktemp -d /tmp/OSLOMSG-ZEROMQ.XXXXX) +trap "clean_exit $DATADIR" EXIT + +export ZMQ_MATCHMAKER=redis +export ZMQ_REDIS_PORT=65123 +export ZMQ_IPC_DIR=${DATADIR} +export ZMQ_USE_PUB_SUB=false +export ZMQ_USE_ROUTER_PROXY=false +export ZMQ_USE_DYNAMIC_CONNECTIONS=false +export ZMQ_USE_ACKS=false +export TRANSPORT_URL="zmq+${ZMQ_MATCHMAKER}://127.0.0.1:${ZMQ_REDIS_PORT}" + +cat > ${DATADIR}/zmq.conf < ${DATADIR}/zmq-proxy.log 2>&1 & + +$* diff --git a/setup-test-env-zmq.sh b/setup-test-env-zmq.sh index e6b219d5f..8fd7e11cf 100755 --- a/setup-test-env-zmq.sh +++ b/setup-test-env-zmq.sh @@ -12,6 +12,7 @@ export ZMQ_IPC_DIR=${DATADIR} export ZMQ_USE_PUB_SUB=false export ZMQ_USE_ROUTER_PROXY=false export ZMQ_USE_ACKS=false +export ZMQ_USE_DYNAMIC_CONNECTIONS=true export TRANSPORT_URL="zmq+${ZMQ_MATCHMAKER}://127.0.0.1:${ZMQ_REDIS_PORT}" cat > ${DATADIR}/zmq.conf <