[zmq] Reduce proxy for direct messaging

Since the change I643a111fca8bac32f41ced232d54ff2a2ebcbf77
we don't need proxy for direct types because any message
woud be sent before server listening to the target
appears on name server registry, so DEALER wouldn't block.

Change-Id: I3c0f3e6930a4092cac5a6e18529d98e6d6e65f32
This commit is contained in:
Oleksii Zamiatin 2016-01-25 15:00:38 +02:00
parent a053593c13
commit 08dd23d1d4
13 changed files with 18 additions and 196 deletions

View File

@ -138,28 +138,24 @@ stored in Redis is that the key is a base topic and the corresponding values are
hostname arrays to be sent to. hostname arrays to be sent to.
Proxy to avoid blocking (optional) Proxy for fanout publishing
---------------------------------- ---------------------------
Each machine running OpenStack services, or sending RPC messages, may run the Each machine running OpenStack services, or sending RPC messages, should run
'oslo-messaging-zmq-broker' daemon. This is needed to avoid blocking the 'oslo-messaging-zmq-broker' daemon.
if a listener (server) appears after the sender (client).
Fanout-based patterns like CAST+Fanout and notifications always use proxy Fanout-based patterns like CAST+Fanout and notifications always use proxy
as they act over PUB/SUB, 'use_pub_sub' - defaults to True. If not using as they act over PUB/SUB, 'use_pub_sub' - defaults to True. If not using
PUB/SUB (use_pub_sub = False) then fanout will be emulated over direct PUB/SUB (use_pub_sub = False) then fanout will be emulated over direct
DEALER/ROUTER unicast which is possible but less efficient and therefore DEALER/ROUTER unicast which is possible but less efficient and therefore
is not recommended. is not recommended. In a case of direct DEALER/ROUTER unicast proxy is not
needed.
Running direct RPC methods like CALL and CAST over a proxy is controlled by This option can be set in [DEFAULT] section.
the option 'direct_over_proxy' which is True by default.
These options can be set in [DEFAULT] section.
For example:: For example::
use_pub_sub = True use_pub_sub = True
direct_over_proxy = False
In case of using the broker all publishers (clients) talk to servers over In case of using the broker all publishers (clients) talk to servers over

View File

@ -72,11 +72,7 @@ zmq_opts = [
help='Expiration timeout in seconds of a name service record ' help='Expiration timeout in seconds of a name service record '
'about existing target ( < 0 means no timeout).'), 'about existing target ( < 0 means no timeout).'),
cfg.BoolOpt('direct_over_proxy', default=False, cfg.BoolOpt('use_pub_sub', default=False,
help='Configures zmq-messaging to use proxy with '
'non PUB/SUB patterns.'),
cfg.BoolOpt('use_pub_sub', default=True,
help='Use PUB/SUB pattern for fanout methods. ' help='Use PUB/SUB pattern for fanout methods. '
'PUB/SUB always uses proxy.'), 'PUB/SUB always uses proxy.'),

View File

@ -15,14 +15,12 @@
import logging import logging
from oslo_messaging._drivers.zmq_driver.broker import zmq_base_proxy from oslo_messaging._drivers.zmq_driver.broker import zmq_base_proxy
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher_proxy
from oslo_messaging._drivers.zmq_driver.client.publishers \ from oslo_messaging._drivers.zmq_driver.client.publishers \
import zmq_pub_publisher import zmq_pub_publisher
from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LI from oslo_messaging._i18n import _LE, _LI
zmq = zmq_async.import_zmq(zmq_concurrency='native') zmq = zmq_async.import_zmq(zmq_concurrency='native')
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -41,9 +39,6 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy):
LOG.info(_LI("Polling at universal proxy")) LOG.info(_LI("Polling at universal proxy"))
self.matchmaker = matchmaker self.matchmaker = matchmaker
reply_receiver = zmq_dealer_publisher_proxy.ReplyReceiver(self.poller)
self.direct_publisher = zmq_dealer_publisher_proxy \
.DealerPublisherProxy(conf, matchmaker, reply_receiver)
self.pub_publisher = zmq_pub_publisher.PubPublisherProxy( self.pub_publisher = zmq_pub_publisher.PubPublisherProxy(
conf, matchmaker) conf, matchmaker)
@ -54,8 +49,6 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy):
if socket == self.router_socket: if socket == self.router_socket:
self._redirect_in_request(message) self._redirect_in_request(message)
else:
self._redirect_reply(message)
def _redirect_in_request(self, multipart_message): def _redirect_in_request(self, multipart_message):
LOG.debug("-> Redirecting request %s to TCP publisher", LOG.debug("-> Redirecting request %s to TCP publisher",
@ -65,19 +58,6 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy):
envelope[zmq_names.FIELD_MSG_TYPE] \ envelope[zmq_names.FIELD_MSG_TYPE] \
in zmq_names.MULTISEND_TYPES: in zmq_names.MULTISEND_TYPES:
self.pub_publisher.send_request(multipart_message) self.pub_publisher.send_request(multipart_message)
else:
self.direct_publisher.send_request(multipart_message)
def _redirect_reply(self, reply):
LOG.debug("Reply proxy %s", reply)
if reply[zmq_names.IDX_REPLY_TYPE] == zmq_names.ACK_TYPE:
LOG.debug("Acknowledge dropped %s", reply)
return
LOG.debug("<- Redirecting reply to ROUTER: reply: %s",
reply[zmq_names.IDX_REPLY_BODY:])
self.router_socket.send_multipart(reply[zmq_names.IDX_REPLY_BODY:])
def _receive_in_request(self, socket): def _receive_in_request(self, socket):
reply_id = socket.recv() reply_id = socket.recv()
@ -85,8 +65,9 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy):
empty = socket.recv() empty = socket.recv()
assert empty == b'', "Empty delimiter expected" assert empty == b'', "Empty delimiter expected"
envelope = socket.recv_pyobj() envelope = socket.recv_pyobj()
if envelope[zmq_names.FIELD_MSG_TYPE] == zmq_names.CALL_TYPE: if envelope[zmq_names.FIELD_MSG_TYPE] not in zmq_names.MULTISEND_TYPES:
envelope[zmq_names.FIELD_REPLY_ID] = reply_id LOG.error(_LE("Message type %s is not supported by proxy"),
envelope[zmq_names.FIELD_MSG_TYPE])
payload = socket.recv_multipart() payload = socket.recv_multipart()
payload.insert(0, envelope) payload.insert(0, envelope)
return payload return payload

View File

@ -43,9 +43,7 @@ class DealerCallPublisher(object):
self.conf = conf self.conf = conf
self.matchmaker = matchmaker self.matchmaker = matchmaker
self.reply_waiter = ReplyWaiter(conf) self.reply_waiter = ReplyWaiter(conf)
self.sender = RequestSender(conf, matchmaker, self.reply_waiter) \ self.sender = RequestSender(conf, matchmaker, self.reply_waiter)
if not conf.direct_over_proxy else \
RequestSenderLight(conf, matchmaker, self.reply_waiter)
def send_request(self, request): def send_request(self, request):
reply_future = self.sender.send_request(request) reply_future = self.sender.send_request(request)
@ -113,39 +111,6 @@ class RequestSender(zmq_publisher_base.PublisherBase):
super(RequestSender, self).cleanup() super(RequestSender, self).cleanup()
class RequestSenderLight(RequestSender):
"""This class used with proxy.
Simplified address matching because there is only
one proxy IPC address.
"""
def __init__(self, conf, matchmaker, reply_waiter):
if not conf.direct_over_proxy:
raise rpc_common.RPCException("RequestSenderLight needs a proxy!")
super(RequestSenderLight, self).__init__(
conf, matchmaker, reply_waiter)
self.socket = None
def _connect_socket(self, target):
return self.outbound_sockets.get_socket_to_broker(target)
def _do_send_request(self, socket, request):
LOG.debug("Sending %(type)s message_id %(message)s"
" to a target %(target)s",
{"type": request.msg_type,
"message": request.message_id,
"target": request.target})
envelope = request.create_envelope()
socket.send(b'', zmq.SNDMORE)
socket.send_pyobj(envelope, zmq.SNDMORE)
socket.send_pyobj(request)
class ReplyWaiter(object): class ReplyWaiter(object):
def __init__(self, conf): def __init__(self, conf):

View File

@ -1,87 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LI, _LW
zmq = zmq_async.import_zmq()
LOG = logging.getLogger(__name__)
class DealerPublisherProxy(zmq_dealer_publisher.DealerPublisher):
def __init__(self, conf, matchmaker, reply_receiver):
super(DealerPublisherProxy, self).__init__(conf, matchmaker)
self.reply_receiver = reply_receiver
def send_request(self, multipart_message):
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
LOG.debug("Envelope: %s", envelope)
target = envelope[zmq_names.FIELD_TARGET]
dealer_socket = self._check_hosts_connections(
target, zmq_names.socket_type_str(zmq.ROUTER))
if not dealer_socket.connections:
# NOTE(ozamiatin): Here we can provide
# a queue for keeping messages to send them later
# when some listener appears. However such approach
# being more reliable will consume additional memory.
LOG.warning(_LW("Request %s was dropped because no connection"),
envelope[zmq_names.FIELD_MSG_TYPE])
return
self.reply_receiver.track_socket(dealer_socket.handle)
LOG.debug("Sending message %(message)s to a target %(target)s"
% {"message": envelope[zmq_names.FIELD_MSG_ID],
"target": envelope[zmq_names.FIELD_TARGET]})
if envelope[zmq_names.FIELD_MSG_TYPE] in zmq_names.MULTISEND_TYPES:
for _ in range(dealer_socket.connections_count()):
self._send_request(dealer_socket, multipart_message)
else:
self._send_request(dealer_socket, multipart_message)
def _send_request(self, socket, multipart_message):
socket.send(b'', zmq.SNDMORE)
socket.send_pyobj(
multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE],
zmq.SNDMORE)
socket.send(multipart_message[zmq_names.MULTIPART_IDX_BODY])
class ReplyReceiver(object):
def __init__(self, poller):
self.poller = poller
LOG.info(_LI("Reply waiter created in broker"))
def _receive_reply(self, socket):
return socket.recv_multipart()
def track_socket(self, socket):
self.poller.register(socket, self._receive_reply)
def cleanup(self):
self.poller.close()

View File

@ -30,9 +30,7 @@ class ZmqClient(zmq_client_base.ZmqClientBase):
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None): def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
default_publisher = zmq_dealer_publisher.DealerPublisher( default_publisher = zmq_dealer_publisher.DealerPublisher(
conf, matchmaker) if not conf.direct_over_proxy else \ conf, matchmaker)
zmq_dealer_publisher.DealerPublisherLight(
conf, zmq_address.get_broker_address(conf))
fanout_publisher = zmq_dealer_publisher.DealerPublisherLight( fanout_publisher = zmq_dealer_publisher.DealerPublisherLight(
conf, zmq_address.get_broker_address(conf)) \ conf, zmq_address.get_broker_address(conf)) \

View File

@ -68,7 +68,6 @@ class Request(object):
"retry must be an integer, not {0}".format(type(retry))) "retry must be an integer, not {0}".format(type(retry)))
self.message_id = str(uuid.uuid1()) self.message_id = str(uuid.uuid1())
self.proxy_reply_id = None
def create_envelope(self): def create_envelope(self):
return {'msg_type': self.msg_type, return {'msg_type': self.msg_type,

View File

@ -100,23 +100,6 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
LOG.error(_LE("Receiving message failed: %s"), str(e)) LOG.error(_LE("Receiving message failed: %s"), str(e))
class RouterConsumerBroker(RouterConsumer):
def __init__(self, conf, poller, server):
super(RouterConsumerBroker, self).__init__(conf, poller, server)
def _receive_request(self, socket):
reply_id = socket.recv()
empty = socket.recv()
assert empty == b'', 'Bad format: empty delimiter expected'
envelope = socket.recv_pyobj()
request = socket.recv_pyobj()
if zmq_names.FIELD_REPLY_ID in envelope:
request.proxy_reply_id = envelope[zmq_names.FIELD_REPLY_ID]
return request, reply_id
class TargetsManager(object): class TargetsManager(object):
def __init__(self, conf, matchmaker, host): def __init__(self, conf, matchmaker, host):

View File

@ -45,7 +45,6 @@ class ZmqIncomingRequest(base.IncomingMessage):
zmq_names.FIELD_REPLY: reply, zmq_names.FIELD_REPLY: reply,
zmq_names.FIELD_FAILURE: failure, zmq_names.FIELD_FAILURE: failure,
zmq_names.FIELD_LOG_FAILURE: log_failure, zmq_names.FIELD_LOG_FAILURE: log_failure,
zmq_names.FIELD_ID: self.request.proxy_reply_id,
zmq_names.FIELD_MSG_ID: self.request.message_id} zmq_names.FIELD_MSG_ID: self.request.message_id}
LOG.debug("Replying %s", (str(self.request.message_id))) LOG.debug("Replying %s", (str(self.request.message_id)))
@ -53,10 +52,6 @@ class ZmqIncomingRequest(base.IncomingMessage):
self.received = True self.received = True
self.reply_socket.send(self.reply_id, zmq.SNDMORE) self.reply_socket.send(self.reply_id, zmq.SNDMORE)
self.reply_socket.send(b'', zmq.SNDMORE) self.reply_socket.send(b'', zmq.SNDMORE)
if self.request.proxy_reply_id:
self.reply_socket.send_string(zmq_names.REPLY_TYPE, zmq.SNDMORE)
self.reply_socket.send(self.request.proxy_reply_id, zmq.SNDMORE)
self.reply_socket.send(b'', zmq.SNDMORE)
self.reply_socket.send_pyobj(message_reply) self.reply_socket.send_pyobj(message_reply)
self.poller.resume_polling(self.reply_socket) self.poller.resume_polling(self.reply_socket)

View File

@ -34,9 +34,7 @@ class ZmqServer(base.Listener):
super(ZmqServer, self).__init__(driver) super(ZmqServer, self).__init__(driver)
self.matchmaker = matchmaker self.matchmaker = matchmaker
self.poller = zmq_async.get_poller() self.poller = zmq_async.get_poller()
self.router_consumer = zmq_router_consumer.RouterConsumerBroker( self.router_consumer = zmq_router_consumer.RouterConsumer(
conf, self.poller, self) if conf.direct_over_proxy else \
zmq_router_consumer.RouterConsumer(
conf, self.poller, self) conf, self.poller, self)
self.sub_consumer = zmq_sub_consumer.SubConsumer( self.sub_consumer = zmq_sub_consumer.SubConsumer(
conf, self.poller, self) if conf.use_pub_sub else None conf, self.poller, self) if conf.use_pub_sub else None

View File

@ -145,7 +145,7 @@ class TestZmqBasics(zmq_common.ZmqBaseTestCase):
message = {'method': 'hello-world', 'tx_id': 1} message = {'method': 'hello-world', 'tx_id': 1}
context = {} context = {}
target.topic = target.topic + '.info' target.topic += '.info'
self.driver.send_notification(target, context, message, '3.0') self.driver.send_notification(target, context, message, '3.0')
self.listener._received.wait(5) self.listener._received.wait(5)
self.assertTrue(self.listener._received.isSet()) self.assertTrue(self.listener._received.isSet())

View File

@ -78,7 +78,6 @@ class ZmqBaseTestCase(test_utils.BaseTestCase):
'rpc_response_timeout': 5, 'rpc_response_timeout': 5,
'rpc_zmq_ipc_dir': self.internal_ipc_dir, 'rpc_zmq_ipc_dir': self.internal_ipc_dir,
'use_pub_sub': False, 'use_pub_sub': False,
'direct_over_proxy': False,
'rpc_zmq_matchmaker': 'dummy'} 'rpc_zmq_matchmaker': 'dummy'}
self.config(**kwargs) self.config(**kwargs)

View File

@ -31,8 +31,7 @@ class StartupOrderTestCase(multiproc_utils.MutliprocTestCase):
self.conf.project = "test_project" self.conf.project = "test_project"
kwargs = {'rpc_response_timeout': 30, kwargs = {'rpc_response_timeout': 30,
'use_pub_sub': False, 'use_pub_sub': False}
'direct_over_proxy': False}
self.config(**kwargs) self.config(**kwargs)
log_path = self.conf.rpc_zmq_ipc_dir + "/" + str(os.getpid()) + ".log" log_path = self.conf.rpc_zmq_ipc_dir + "/" + str(os.getpid()) + ".log"