diff --git a/doc/source/zmq_driver.rst b/doc/source/zmq_driver.rst index 0ab7903a1..9f9a74d23 100644 --- a/doc/source/zmq_driver.rst +++ b/doc/source/zmq_driver.rst @@ -138,28 +138,24 @@ stored in Redis is that the key is a base topic and the corresponding values are hostname arrays to be sent to. -Proxy to avoid blocking (optional) ----------------------------------- +Proxy for fanout publishing +--------------------------- -Each machine running OpenStack services, or sending RPC messages, may run the -'oslo-messaging-zmq-broker' daemon. This is needed to avoid blocking -if a listener (server) appears after the sender (client). +Each machine running OpenStack services, or sending RPC messages, should run +the 'oslo-messaging-zmq-broker' daemon. Fanout-based patterns like CAST+Fanout and notifications always use proxy as they act over PUB/SUB, 'use_pub_sub' - defaults to True. If not using PUB/SUB (use_pub_sub = False) then fanout will be emulated over direct DEALER/ROUTER unicast which is possible but less efficient and therefore -is not recommended. +is not recommended. In a case of direct DEALER/ROUTER unicast proxy is not +needed. -Running direct RPC methods like CALL and CAST over a proxy is controlled by -the option 'direct_over_proxy' which is True by default. - -These options can be set in [DEFAULT] section. +This option can be set in [DEFAULT] section. For example:: use_pub_sub = True - direct_over_proxy = False In case of using the broker all publishers (clients) talk to servers over diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index 5eb3fdc35..3d02f68c8 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -72,11 +72,7 @@ zmq_opts = [ help='Expiration timeout in seconds of a name service record ' 'about existing target ( < 0 means no timeout).'), - cfg.BoolOpt('direct_over_proxy', default=False, - help='Configures zmq-messaging to use proxy with ' - 'non PUB/SUB patterns.'), - - cfg.BoolOpt('use_pub_sub', default=True, + cfg.BoolOpt('use_pub_sub', default=False, help='Use PUB/SUB pattern for fanout methods. ' 'PUB/SUB always uses proxy.'), diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py index 1d5729c80..b5e71c13d 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py @@ -15,14 +15,12 @@ import logging from oslo_messaging._drivers.zmq_driver.broker import zmq_base_proxy -from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ - import zmq_dealer_publisher_proxy from oslo_messaging._drivers.zmq_driver.client.publishers \ import zmq_pub_publisher from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names -from oslo_messaging._i18n import _LI +from oslo_messaging._i18n import _LE, _LI zmq = zmq_async.import_zmq(zmq_concurrency='native') LOG = logging.getLogger(__name__) @@ -41,9 +39,6 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy): LOG.info(_LI("Polling at universal proxy")) self.matchmaker = matchmaker - reply_receiver = zmq_dealer_publisher_proxy.ReplyReceiver(self.poller) - self.direct_publisher = zmq_dealer_publisher_proxy \ - .DealerPublisherProxy(conf, matchmaker, reply_receiver) self.pub_publisher = zmq_pub_publisher.PubPublisherProxy( conf, matchmaker) @@ -54,8 +49,6 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy): if socket == self.router_socket: self._redirect_in_request(message) - else: - self._redirect_reply(message) def _redirect_in_request(self, multipart_message): LOG.debug("-> Redirecting request %s to TCP publisher", @@ -65,19 +58,6 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy): envelope[zmq_names.FIELD_MSG_TYPE] \ in zmq_names.MULTISEND_TYPES: self.pub_publisher.send_request(multipart_message) - else: - self.direct_publisher.send_request(multipart_message) - - def _redirect_reply(self, reply): - LOG.debug("Reply proxy %s", reply) - if reply[zmq_names.IDX_REPLY_TYPE] == zmq_names.ACK_TYPE: - LOG.debug("Acknowledge dropped %s", reply) - return - - LOG.debug("<- Redirecting reply to ROUTER: reply: %s", - reply[zmq_names.IDX_REPLY_BODY:]) - - self.router_socket.send_multipart(reply[zmq_names.IDX_REPLY_BODY:]) def _receive_in_request(self, socket): reply_id = socket.recv() @@ -85,8 +65,9 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy): empty = socket.recv() assert empty == b'', "Empty delimiter expected" envelope = socket.recv_pyobj() - if envelope[zmq_names.FIELD_MSG_TYPE] == zmq_names.CALL_TYPE: - envelope[zmq_names.FIELD_REPLY_ID] = reply_id + if envelope[zmq_names.FIELD_MSG_TYPE] not in zmq_names.MULTISEND_TYPES: + LOG.error(_LE("Message type %s is not supported by proxy"), + envelope[zmq_names.FIELD_MSG_TYPE]) payload = socket.recv_multipart() payload.insert(0, envelope) return payload diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py index c1115588e..dfccc70a0 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py @@ -43,9 +43,7 @@ class DealerCallPublisher(object): self.conf = conf self.matchmaker = matchmaker self.reply_waiter = ReplyWaiter(conf) - self.sender = RequestSender(conf, matchmaker, self.reply_waiter) \ - if not conf.direct_over_proxy else \ - RequestSenderLight(conf, matchmaker, self.reply_waiter) + self.sender = RequestSender(conf, matchmaker, self.reply_waiter) def send_request(self, request): reply_future = self.sender.send_request(request) @@ -113,39 +111,6 @@ class RequestSender(zmq_publisher_base.PublisherBase): super(RequestSender, self).cleanup() -class RequestSenderLight(RequestSender): - """This class used with proxy. - - Simplified address matching because there is only - one proxy IPC address. - """ - - def __init__(self, conf, matchmaker, reply_waiter): - if not conf.direct_over_proxy: - raise rpc_common.RPCException("RequestSenderLight needs a proxy!") - - super(RequestSenderLight, self).__init__( - conf, matchmaker, reply_waiter) - - self.socket = None - - def _connect_socket(self, target): - return self.outbound_sockets.get_socket_to_broker(target) - - def _do_send_request(self, socket, request): - LOG.debug("Sending %(type)s message_id %(message)s" - " to a target %(target)s", - {"type": request.msg_type, - "message": request.message_id, - "target": request.target}) - - envelope = request.create_envelope() - - socket.send(b'', zmq.SNDMORE) - socket.send_pyobj(envelope, zmq.SNDMORE) - socket.send_pyobj(request) - - class ReplyWaiter(object): def __init__(self, conf): diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py deleted file mode 100644 index f233d099b..000000000 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py +++ /dev/null @@ -1,87 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging - -from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ - import zmq_dealer_publisher -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_names -from oslo_messaging._i18n import _LI, _LW - -zmq = zmq_async.import_zmq() - -LOG = logging.getLogger(__name__) - - -class DealerPublisherProxy(zmq_dealer_publisher.DealerPublisher): - - def __init__(self, conf, matchmaker, reply_receiver): - super(DealerPublisherProxy, self).__init__(conf, matchmaker) - self.reply_receiver = reply_receiver - - def send_request(self, multipart_message): - - envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE] - - LOG.debug("Envelope: %s", envelope) - - target = envelope[zmq_names.FIELD_TARGET] - dealer_socket = self._check_hosts_connections( - target, zmq_names.socket_type_str(zmq.ROUTER)) - - if not dealer_socket.connections: - # NOTE(ozamiatin): Here we can provide - # a queue for keeping messages to send them later - # when some listener appears. However such approach - # being more reliable will consume additional memory. - LOG.warning(_LW("Request %s was dropped because no connection"), - envelope[zmq_names.FIELD_MSG_TYPE]) - return - - self.reply_receiver.track_socket(dealer_socket.handle) - - LOG.debug("Sending message %(message)s to a target %(target)s" - % {"message": envelope[zmq_names.FIELD_MSG_ID], - "target": envelope[zmq_names.FIELD_TARGET]}) - - if envelope[zmq_names.FIELD_MSG_TYPE] in zmq_names.MULTISEND_TYPES: - for _ in range(dealer_socket.connections_count()): - self._send_request(dealer_socket, multipart_message) - else: - self._send_request(dealer_socket, multipart_message) - - def _send_request(self, socket, multipart_message): - - socket.send(b'', zmq.SNDMORE) - socket.send_pyobj( - multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE], - zmq.SNDMORE) - socket.send(multipart_message[zmq_names.MULTIPART_IDX_BODY]) - - -class ReplyReceiver(object): - - def __init__(self, poller): - self.poller = poller - LOG.info(_LI("Reply waiter created in broker")) - - def _receive_reply(self, socket): - return socket.recv_multipart() - - def track_socket(self, socket): - self.poller.register(socket, self._receive_reply) - - def cleanup(self): - self.poller.close() diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py index ffe484557..c6bc67993 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py @@ -30,9 +30,7 @@ class ZmqClient(zmq_client_base.ZmqClientBase): def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None): default_publisher = zmq_dealer_publisher.DealerPublisher( - conf, matchmaker) if not conf.direct_over_proxy else \ - zmq_dealer_publisher.DealerPublisherLight( - conf, zmq_address.get_broker_address(conf)) + conf, matchmaker) fanout_publisher = zmq_dealer_publisher.DealerPublisherLight( conf, zmq_address.get_broker_address(conf)) \ diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py index 05edefffd..2646451bf 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py @@ -68,7 +68,6 @@ class Request(object): "retry must be an integer, not {0}".format(type(retry))) self.message_id = str(uuid.uuid1()) - self.proxy_reply_id = None def create_envelope(self): return {'msg_type': self.msg_type, diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py index c317b9929..94617b3d7 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py @@ -100,23 +100,6 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): LOG.error(_LE("Receiving message failed: %s"), str(e)) -class RouterConsumerBroker(RouterConsumer): - - def __init__(self, conf, poller, server): - super(RouterConsumerBroker, self).__init__(conf, poller, server) - - def _receive_request(self, socket): - reply_id = socket.recv() - empty = socket.recv() - assert empty == b'', 'Bad format: empty delimiter expected' - envelope = socket.recv_pyobj() - request = socket.recv_pyobj() - - if zmq_names.FIELD_REPLY_ID in envelope: - request.proxy_reply_id = envelope[zmq_names.FIELD_REPLY_ID] - return request, reply_id - - class TargetsManager(object): def __init__(self, conf, matchmaker, host): diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py index e009d55c9..f1db740c0 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py @@ -45,7 +45,6 @@ class ZmqIncomingRequest(base.IncomingMessage): zmq_names.FIELD_REPLY: reply, zmq_names.FIELD_FAILURE: failure, zmq_names.FIELD_LOG_FAILURE: log_failure, - zmq_names.FIELD_ID: self.request.proxy_reply_id, zmq_names.FIELD_MSG_ID: self.request.message_id} LOG.debug("Replying %s", (str(self.request.message_id))) @@ -53,10 +52,6 @@ class ZmqIncomingRequest(base.IncomingMessage): self.received = True self.reply_socket.send(self.reply_id, zmq.SNDMORE) self.reply_socket.send(b'', zmq.SNDMORE) - if self.request.proxy_reply_id: - self.reply_socket.send_string(zmq_names.REPLY_TYPE, zmq.SNDMORE) - self.reply_socket.send(self.request.proxy_reply_id, zmq.SNDMORE) - self.reply_socket.send(b'', zmq.SNDMORE) self.reply_socket.send_pyobj(message_reply) self.poller.resume_polling(self.reply_socket) diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py index 62c12c7ff..8a95a1f6e 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py @@ -34,10 +34,8 @@ class ZmqServer(base.Listener): super(ZmqServer, self).__init__(driver) self.matchmaker = matchmaker self.poller = zmq_async.get_poller() - self.router_consumer = zmq_router_consumer.RouterConsumerBroker( - conf, self.poller, self) if conf.direct_over_proxy else \ - zmq_router_consumer.RouterConsumer( - conf, self.poller, self) + self.router_consumer = zmq_router_consumer.RouterConsumer( + conf, self.poller, self) self.sub_consumer = zmq_sub_consumer.SubConsumer( conf, self.poller, self) if conf.use_pub_sub else None self.notify_consumer = self.sub_consumer if conf.use_pub_sub \ diff --git a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py index 5f86679b3..5b957e625 100644 --- a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py +++ b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py @@ -145,7 +145,7 @@ class TestZmqBasics(zmq_common.ZmqBaseTestCase): message = {'method': 'hello-world', 'tx_id': 1} context = {} - target.topic = target.topic + '.info' + target.topic += '.info' self.driver.send_notification(target, context, message, '3.0') self.listener._received.wait(5) self.assertTrue(self.listener._received.isSet()) diff --git a/oslo_messaging/tests/drivers/zmq/zmq_common.py b/oslo_messaging/tests/drivers/zmq/zmq_common.py index f5c6bafac..21b56e6a9 100644 --- a/oslo_messaging/tests/drivers/zmq/zmq_common.py +++ b/oslo_messaging/tests/drivers/zmq/zmq_common.py @@ -78,7 +78,6 @@ class ZmqBaseTestCase(test_utils.BaseTestCase): 'rpc_response_timeout': 5, 'rpc_zmq_ipc_dir': self.internal_ipc_dir, 'use_pub_sub': False, - 'direct_over_proxy': False, 'rpc_zmq_matchmaker': 'dummy'} self.config(**kwargs) diff --git a/oslo_messaging/tests/functional/zmq/test_startup.py b/oslo_messaging/tests/functional/zmq/test_startup.py index c3258131f..6f114ae4a 100644 --- a/oslo_messaging/tests/functional/zmq/test_startup.py +++ b/oslo_messaging/tests/functional/zmq/test_startup.py @@ -31,8 +31,7 @@ class StartupOrderTestCase(multiproc_utils.MutliprocTestCase): self.conf.project = "test_project" kwargs = {'rpc_response_timeout': 30, - 'use_pub_sub': False, - 'direct_over_proxy': False} + 'use_pub_sub': False} self.config(**kwargs) log_path = self.conf.rpc_zmq_ipc_dir + "/" + str(os.getpid()) + ".log"