Merge "[zmq] Restore static direct connections"
This commit is contained in:
commit
d265b49ac3
@ -64,3 +64,7 @@ class DealerPublisherBase(zmq_publisher_base.PublisherBase):
|
||||
reply.failure, request.allowed_remote_exmods)
|
||||
else:
|
||||
return reply.reply_body
|
||||
|
||||
def cleanup(self):
|
||||
super(DealerPublisherBase, self).cleanup()
|
||||
self.sockets_manager.cleanup()
|
||||
|
@ -12,19 +12,26 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
||||
import zmq_dealer_publisher_base
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_senders
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_sockets_manager
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase):
|
||||
"""DEALER-publisher using direct connections.
|
||||
"""DEALER-publisher using direct dynamic connections.
|
||||
|
||||
Publishing directly to remote services assumes the following:
|
||||
- All direct connections are dynamic - so they live per message,
|
||||
@ -86,3 +93,42 @@ class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase):
|
||||
def cleanup(self):
|
||||
self.routing_table.cleanup()
|
||||
super(DealerPublisherDirect, self).cleanup()
|
||||
|
||||
|
||||
class DealerPublisherDirectStatic(DealerPublisherDirect):
|
||||
"""DEALER-publisher using direct static connections.
|
||||
|
||||
For some reason direct static connections may be also useful.
|
||||
Assume a case when some agents are not connected with control services
|
||||
over RPC (Ironic or Cinder+Ceph), and RPC is used only between controllers.
|
||||
In this case number of RPC connections doesn't matter (very small) so we
|
||||
can use static connections without fear and have all performance benefits
|
||||
from it.
|
||||
"""
|
||||
|
||||
def __init__(self, conf, matchmaker):
|
||||
super(DealerPublisherDirectStatic, self).__init__(conf, matchmaker)
|
||||
self.fanout_sockets = zmq_sockets_manager.SocketsManager(
|
||||
conf, matchmaker, zmq.DEALER)
|
||||
|
||||
def acquire_connection(self, request):
|
||||
if request.msg_type in zmq_names.MULTISEND_TYPES:
|
||||
hosts = self.routing_table.get_fanout_hosts(request.target)
|
||||
target_key = zmq_address.prefix_str(
|
||||
request.target.topic,
|
||||
zmq_names.socket_type_str(zmq.ROUTER))
|
||||
return self.fanout_sockets.get_cached_socket(target_key, hosts,
|
||||
immediate=False)
|
||||
else:
|
||||
hosts = self.routing_table.get_all_round_robin_hosts(
|
||||
request.target)
|
||||
target_key = zmq_address.target_to_key(
|
||||
request.target, zmq_names.socket_type_str(zmq.ROUTER))
|
||||
return self.sockets_manager.get_cached_socket(target_key, hosts)
|
||||
|
||||
def _finally_unregister(self, socket, request):
|
||||
self.receiver.untrack_request(request)
|
||||
|
||||
def cleanup(self):
|
||||
self.fanout_sockets.cleanup()
|
||||
super(DealerPublisherDirectStatic, self).cleanup()
|
||||
|
@ -76,7 +76,6 @@ class DealerPublisherProxy(zmq_dealer_publisher_base.DealerPublisherBase):
|
||||
def cleanup(self):
|
||||
self.connection_updater.stop()
|
||||
self.routing_table.cleanup()
|
||||
self.socket.close()
|
||||
super(DealerPublisherProxy, self).cleanup()
|
||||
|
||||
|
||||
|
@ -84,8 +84,7 @@ class PublisherBase(object):
|
||||
def _raise_timeout(request):
|
||||
raise oslo_messaging.MessagingTimeout(
|
||||
"Timeout %(tout)s seconds was reached for message %(msg_id)s" %
|
||||
{"tout": request.timeout, "msg_id": request.message_id}
|
||||
)
|
||||
{"tout": request.timeout, "msg_id": request.message_id})
|
||||
|
||||
def cleanup(self):
|
||||
"""Cleanup publisher: stop receiving responses, close allocated
|
||||
|
@ -66,10 +66,14 @@ class ZmqClientDirect(zmq_client_base.ZmqClientBase):
|
||||
conf.oslo_messaging_zmq.use_router_proxy:
|
||||
raise WrongClientException()
|
||||
|
||||
publisher = self._create_publisher_direct_dynamic(conf, matchmaker) \
|
||||
if conf.oslo_messaging_zmq.use_dynamic_connections else \
|
||||
self._create_publisher_direct(conf, matchmaker)
|
||||
|
||||
super(ZmqClientDirect, self).__init__(
|
||||
conf, matchmaker, allowed_remote_exmods,
|
||||
publishers={
|
||||
"default": self._create_publisher_direct(conf, matchmaker)
|
||||
"default": publisher
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -12,6 +12,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
||||
import zmq_dealer_publisher_direct
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
||||
@ -71,8 +72,15 @@ class ZmqClientBase(object):
|
||||
|
||||
@staticmethod
|
||||
def _create_publisher_direct(conf, matchmaker):
|
||||
publisher_direct = \
|
||||
zmq_dealer_publisher_direct.DealerPublisherDirect(conf, matchmaker)
|
||||
publisher_cls = zmq_dealer_publisher_direct.DealerPublisherDirectStatic
|
||||
publisher_direct = publisher_cls(conf, matchmaker)
|
||||
publisher_manager_cls = zmq_publisher_manager.PublisherManagerStatic
|
||||
return publisher_manager_cls(publisher_direct)
|
||||
|
||||
@staticmethod
|
||||
def _create_publisher_direct_dynamic(conf, matchmaker):
|
||||
publisher_cls = zmq_dealer_publisher_direct.DealerPublisherDirect
|
||||
publisher_direct = publisher_cls(conf, matchmaker)
|
||||
publisher_manager_cls = zmq_publisher_manager.PublisherManagerDynamic \
|
||||
if conf.oslo_messaging_zmq.use_pub_sub else \
|
||||
zmq_publisher_manager.PublisherManagerDynamicAsyncMultisend
|
||||
|
@ -42,30 +42,34 @@ class RoutingTableAdaptor(object):
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def get_round_robin_host(self, target):
|
||||
target_key = self._fetch_round_robin_hosts_from_matchmaker(target)
|
||||
rr_gen = self.round_robin_targets[target_key]
|
||||
host = next(rr_gen)
|
||||
LOG.debug("Host resolved for the current connection is %s" % host)
|
||||
return host
|
||||
|
||||
def get_all_round_robin_hosts(self, target):
|
||||
target_key = self._fetch_round_robin_hosts_from_matchmaker(target)
|
||||
return self.routing_table.get_hosts_fanout(target_key)
|
||||
|
||||
def _fetch_round_robin_hosts_from_matchmaker(self, target):
|
||||
target_key = zmq_address.target_to_key(
|
||||
target, zmq_names.socket_type_str(self.listener_type))
|
||||
|
||||
LOG.debug("Processing target %s for round-robin." % target_key)
|
||||
|
||||
if target_key not in self.round_robin_targets:
|
||||
self._fetch_round_robin_hosts_from_matchmaker(target, target_key)
|
||||
|
||||
rr_gen = self.round_robin_targets[target_key]
|
||||
host = next(rr_gen)
|
||||
LOG.debug("Host resolved for the current connection is %s" % host)
|
||||
return host
|
||||
|
||||
def _fetch_round_robin_hosts_from_matchmaker(self, target, target_key):
|
||||
with self._lock:
|
||||
if target_key not in self.round_robin_targets:
|
||||
LOG.debug("Target %s is not in cache. Check matchmaker server."
|
||||
% target_key)
|
||||
hosts = self.matchmaker.get_hosts_retry(
|
||||
target, zmq_names.socket_type_str(self.listener_type))
|
||||
LOG.debug("Received hosts %s" % hosts)
|
||||
self.routing_table.update_hosts(target_key, hosts)
|
||||
self.round_robin_targets[target_key] = \
|
||||
self.routing_table.get_hosts_round_robin(target_key)
|
||||
with self._lock:
|
||||
if target_key not in self.round_robin_targets:
|
||||
LOG.debug("Target %s is not in cache. Check matchmaker "
|
||||
"server." % target_key)
|
||||
hosts = self.matchmaker.get_hosts_retry(
|
||||
target, zmq_names.socket_type_str(self.listener_type))
|
||||
LOG.debug("Received hosts %s" % hosts)
|
||||
self.routing_table.update_hosts(target_key, hosts)
|
||||
self.round_robin_targets[target_key] = \
|
||||
self.routing_table.get_hosts_round_robin(target_key)
|
||||
return target_key
|
||||
|
||||
def get_fanout_hosts(self, target):
|
||||
target_key = zmq_address.prefix_str(
|
||||
|
@ -12,11 +12,15 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_socket
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SocketsManager(object):
|
||||
|
||||
@ -27,10 +31,25 @@ class SocketsManager(object):
|
||||
self.zmq_context = zmq.Context()
|
||||
self.socket_to_publishers = None
|
||||
self.socket_to_routers = None
|
||||
self.sockets = {}
|
||||
|
||||
def get_socket(self):
|
||||
socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context,
|
||||
self.socket_type, immediate=False)
|
||||
return zmq_socket.ZmqSocket(self.conf, self.zmq_context,
|
||||
self.socket_type, immediate=False)
|
||||
|
||||
def get_cached_socket(self, target_key, hosts=None, immediate=True):
|
||||
hosts = [] if hosts is None else hosts
|
||||
socket = self.sockets.get(target_key, None)
|
||||
if socket is None:
|
||||
LOG.debug("CREATING NEW socket for target_key %s " % target_key)
|
||||
socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context,
|
||||
self.socket_type,
|
||||
immediate=immediate)
|
||||
self.sockets[target_key] = socket
|
||||
for host in hosts:
|
||||
socket.connect_to_host(host)
|
||||
LOG.debug("Target key: %s socket:%s" % (target_key,
|
||||
socket.handle.identity))
|
||||
return socket
|
||||
|
||||
def get_socket_to_publishers(self, identity=None):
|
||||
@ -56,3 +75,11 @@ class SocketsManager(object):
|
||||
for be_router_address in routers:
|
||||
self.socket_to_routers.connect_to_host(be_router_address)
|
||||
return self.socket_to_routers
|
||||
|
||||
def cleanup(self):
|
||||
if self.socket_to_publishers:
|
||||
self.socket_to_publishers.close()
|
||||
if self.socket_to_routers:
|
||||
self.socket_to_routers.close()
|
||||
for socket in self.sockets.values():
|
||||
socket.close()
|
||||
|
@ -60,7 +60,6 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer):
|
||||
try:
|
||||
socket = self.sockets_manager.get_socket_to_routers(
|
||||
self._generate_identity())
|
||||
self.sockets.append(socket)
|
||||
self.host = socket.handle.identity
|
||||
self.poller.register(socket, self.receive_request)
|
||||
return socket
|
||||
|
@ -87,10 +87,16 @@ zmq_opts = [
|
||||
help='Use PUB/SUB pattern for fanout methods. '
|
||||
'PUB/SUB always uses proxy.'),
|
||||
|
||||
cfg.BoolOpt('use_router_proxy', default=True,
|
||||
cfg.BoolOpt('use_router_proxy', default=False,
|
||||
deprecated_group='DEFAULT',
|
||||
help='Use ROUTER remote proxy.'),
|
||||
|
||||
cfg.BoolOpt('use_dynamic_connections', default=False,
|
||||
help='This option makes direct connections dynamic or static. '
|
||||
'It makes sense only with use_router_proxy=False which '
|
||||
'means to use direct connections for direct message '
|
||||
'types (ignored otherwise).'),
|
||||
|
||||
cfg.PortOpt('rpc_zmq_min_port',
|
||||
default=49153,
|
||||
deprecated_group='DEFAULT',
|
||||
|
@ -56,7 +56,7 @@ class ZmqSocket(object):
|
||||
# Put messages to only connected queues
|
||||
self.handle.setsockopt(zmq.IMMEDIATE, 1 if immediate else 0)
|
||||
|
||||
# Configure TCP KA
|
||||
# Configure TCP keep alive
|
||||
keepalive = self.conf.oslo_messaging_zmq.zmq_tcp_keepalive
|
||||
if keepalive < 0:
|
||||
keepalive = -1
|
||||
@ -193,8 +193,7 @@ class ZmqSocket(object):
|
||||
{"stype": stype, "sid": sid, "address": address, "e": e})
|
||||
raise rpc_common.RPCException(
|
||||
"Failed connecting %(stype)s-%(sid)s to %(address)s: %(e)s" %
|
||||
{"stype": stype, "sid": sid, "address": address, "e": e}
|
||||
)
|
||||
{"stype": stype, "sid": sid, "address": address, "e": e})
|
||||
|
||||
def connect_to_host(self, host):
|
||||
address = zmq_address.get_tcp_direct_address(
|
||||
|
@ -317,6 +317,10 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase):
|
||||
zmq_use_acks = os.environ.get('ZMQ_USE_ACKS')
|
||||
self.config(rpc_use_acks=zmq_use_acks,
|
||||
group='oslo_messaging_zmq')
|
||||
zmq_use_dynamic_connections = \
|
||||
os.environ.get('ZMQ_USE_DYNAMIC_CONNECTIONS')
|
||||
self.config(use_dynamic_connections=zmq_use_dynamic_connections,
|
||||
group='oslo_messaging_zmq')
|
||||
|
||||
|
||||
class NotificationFixture(fixtures.Fixture):
|
||||
|
32
setup-test-env-zmq-direct-static.sh
Executable file
32
setup-test-env-zmq-direct-static.sh
Executable file
@ -0,0 +1,32 @@
|
||||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
. tools/functions.sh
|
||||
|
||||
DATADIR=$(mktemp -d /tmp/OSLOMSG-ZEROMQ.XXXXX)
|
||||
trap "clean_exit $DATADIR" EXIT
|
||||
|
||||
export ZMQ_MATCHMAKER=redis
|
||||
export ZMQ_REDIS_PORT=65123
|
||||
export ZMQ_IPC_DIR=${DATADIR}
|
||||
export ZMQ_USE_PUB_SUB=false
|
||||
export ZMQ_USE_ROUTER_PROXY=false
|
||||
export ZMQ_USE_DYNAMIC_CONNECTIONS=false
|
||||
export ZMQ_USE_ACKS=false
|
||||
export TRANSPORT_URL="zmq+${ZMQ_MATCHMAKER}://127.0.0.1:${ZMQ_REDIS_PORT}"
|
||||
|
||||
cat > ${DATADIR}/zmq.conf <<EOF
|
||||
[DEFAULT]
|
||||
transport_url=${TRANSPORT_URL}
|
||||
[oslo_messaging_zmq]
|
||||
rpc_zmq_ipc_dir=${ZMQ_IPC_DIR}
|
||||
use_pub_sub=${ZMQ_USE_PUB_SUB}
|
||||
use_router_proxy=${ZMQ_USE_ROUTER_PROXY}
|
||||
use_dynamic_connections=${ZMQ_USE_DYNAMIC_CONNECTIONS}
|
||||
EOF
|
||||
|
||||
redis-server --port $ZMQ_REDIS_PORT &
|
||||
|
||||
oslo-messaging-zmq-proxy --debug --url ${TRANSPORT_URL} --config-file ${DATADIR}/zmq.conf > ${DATADIR}/zmq-proxy.log 2>&1 &
|
||||
|
||||
$*
|
@ -12,6 +12,7 @@ export ZMQ_IPC_DIR=${DATADIR}
|
||||
export ZMQ_USE_PUB_SUB=false
|
||||
export ZMQ_USE_ROUTER_PROXY=false
|
||||
export ZMQ_USE_ACKS=false
|
||||
export ZMQ_USE_DYNAMIC_CONNECTIONS=true
|
||||
export TRANSPORT_URL="zmq+${ZMQ_MATCHMAKER}://127.0.0.1:${ZMQ_REDIS_PORT}"
|
||||
|
||||
cat > ${DATADIR}/zmq.conf <<EOF
|
||||
@ -21,6 +22,7 @@ transport_url=${TRANSPORT_URL}
|
||||
rpc_zmq_ipc_dir=${ZMQ_IPC_DIR}
|
||||
use_pub_sub=${ZMQ_USE_PUB_SUB}
|
||||
use_router_proxy=${ZMQ_USE_ROUTER_PROXY}
|
||||
use_dynamic_connections=${ZMQ_USE_DYNAMIC_CONNECTIONS}
|
||||
EOF
|
||||
|
||||
redis-server --port $ZMQ_REDIS_PORT &
|
||||
|
3
tox.ini
3
tox.ini
@ -96,6 +96,9 @@ commands = {toxinidir}/setup-test-env-zmq.sh python setup.py testr --slowest --t
|
||||
basepython = python3.4
|
||||
commands = {toxinidir}/setup-test-env-zmq.sh python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
|
||||
|
||||
[testenv:py27-func-zeromq-direct-static]
|
||||
commands = {toxinidir}/setup-test-env-zmq-direct-static.sh python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
|
||||
|
||||
[testenv:py27-func-zeromq-proxy]
|
||||
commands = {toxinidir}/setup-test-env-zmq-proxy.sh python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user