diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index a77b6d4c9..17bfe41e4 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -13,6 +13,7 @@ # under the License. import logging +import os import pprint import socket import threading @@ -23,6 +24,7 @@ from stevedore import driver from oslo_messaging._drivers import base from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver.client import zmq_client +from oslo_messaging._drivers.zmq_driver.client import zmq_client_light from oslo_messaging._drivers.zmq_driver.server import zmq_server from oslo_messaging._executors import impl_pooledexecutor # FIXME(markmc) @@ -78,8 +80,8 @@ zmq_opts = [ 'Poll raises timeout exception when timeout expired.'), cfg.BoolOpt('zmq_use_broker', - default=True, - help='Shows whether zmq-messaging uses broker or not.'), + default=False, + help='Configures zmq-messaging to use broker or not.'), cfg.PortOpt('rpc_zmq_min_port', default=49152, @@ -106,6 +108,7 @@ class LazyDriverItem(object): self.item_class = item_cls self.args = args self.kwargs = kwargs + self.process_id = os.getpid() def get(self): # NOTE(ozamiatin): Lazy initialization. @@ -114,11 +117,12 @@ class LazyDriverItem(object): # __init__, but 'fork' extensively used by services # breaks all things. - if self.item is not None: + if self.item is not None and os.getpid() == self.process_id: return self.item self._lock.acquire() - if self.item is None: + if self.item is None or os.getpid() != self.process_id: + self.process_id = os.getpid() self.item = self.item_class(*self.args, **self.kwargs) self._lock.release() return self.item @@ -175,12 +179,15 @@ class ZmqDriver(base.BaseDriver): self.notify_server = LazyDriverItem( zmq_server.ZmqServer, self, self.conf, self.matchmaker) + client_cls = zmq_client_light.ZmqClientLight \ + if conf.zmq_use_broker else zmq_client.ZmqClient + self.client = LazyDriverItem( - zmq_client.ZmqClient, self.conf, self.matchmaker, + client_cls, self.conf, self.matchmaker, self.allowed_remote_exmods) self.notifier = LazyDriverItem( - zmq_client.ZmqClient, self.conf, self.matchmaker, + client_cls, self.conf, self.matchmaker, self.allowed_remote_exmods) super(ZmqDriver, self).__init__(conf, url, default_exchange, diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py index dd6665a03..eb752bed7 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py @@ -15,8 +15,8 @@ import logging from oslo_messaging._drivers.zmq_driver.broker import zmq_base_proxy -from oslo_messaging._drivers.zmq_driver.client.publishers\ - import zmq_dealer_publisher +from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ + import zmq_dealer_publisher_proxy from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names @@ -39,8 +39,8 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy): LOG.info(_LI("Polling at universal proxy")) self.matchmaker = matchmaker - reply_receiver = zmq_dealer_publisher.ReplyReceiver(self.poller) - self.publisher = zmq_dealer_publisher.DealerPublisherProxy( + reply_receiver = zmq_dealer_publisher_proxy.ReplyReceiver(self.poller) + self.publisher = zmq_dealer_publisher_proxy.DealerPublisherProxy( conf, matchmaker, reply_receiver) def run(self): @@ -54,18 +54,18 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy): self._redirect_reply(message) def _redirect_in_request(self, request): - LOG.info(_LI("-> Redirecting request %s to TCP publisher") - % request) + LOG.debug("-> Redirecting request %s to TCP publisher" + % request) self.publisher.send_request(request) def _redirect_reply(self, reply): - LOG.info(_LI("Reply proxy %s") % reply) + LOG.debug("Reply proxy %s" % reply) if reply[zmq_names.IDX_REPLY_TYPE] == zmq_names.ACK_TYPE: - LOG.info(_LI("Acknowledge dropped %s") % reply) + LOG.debug("Acknowledge dropped %s" % reply) return - LOG.info(_LI("<- Redirecting reply to ROUTER: reply: %s") - % reply[zmq_names.IDX_REPLY_BODY:]) + LOG.debug("<- Redirecting reply to ROUTER: reply: %s" + % reply[zmq_names.IDX_REPLY_BODY:]) self.router_socket.send_multipart(reply[zmq_names.IDX_REPLY_BODY:]) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/__init__.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py new file mode 100644 index 000000000..0c4e7536d --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py @@ -0,0 +1,194 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import threading + +from concurrent import futures +import futurist + +import oslo_messaging +from oslo_messaging._drivers import common as rpc_common +from oslo_messaging._drivers.zmq_driver.client.publishers\ + import zmq_publisher_base +from oslo_messaging._drivers.zmq_driver import zmq_address +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names +from oslo_messaging._drivers.zmq_driver import zmq_socket + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +class DealerCallPublisher(zmq_publisher_base.PublisherBase): + """Thread-safe CALL publisher + + Used as faster and thread-safe publisher for CALL + instead of ReqPublisher. + """ + + def __init__(self, conf, matchmaker): + super(DealerCallPublisher, self).__init__(conf) + self.matchmaker = matchmaker + self.reply_waiter = ReplyWaiter(conf) + self.sender = RequestSender(conf, matchmaker, self.reply_waiter) \ + if not conf.zmq_use_broker else \ + RequestSenderLight(conf, matchmaker, self.reply_waiter) + + def send_request(self, request): + reply_future = self.sender.send_request(request) + try: + reply = reply_future.result(timeout=request.timeout) + except futures.TimeoutError: + raise oslo_messaging.MessagingTimeout( + "Timeout %s seconds was reached" % request.timeout) + finally: + self.reply_waiter.untrack_id(request.message_id) + + LOG.debug("Received reply %s" % reply) + if reply[zmq_names.FIELD_FAILURE]: + raise rpc_common.deserialize_remote_exception( + reply[zmq_names.FIELD_FAILURE], + request.allowed_remote_exmods) + else: + return reply[zmq_names.FIELD_REPLY] + + +class RequestSender(zmq_publisher_base.PublisherMultisend): + + def __init__(self, conf, matchmaker, reply_waiter): + super(RequestSender, self).__init__(conf, matchmaker, zmq.DEALER) + self.reply_waiter = reply_waiter + self.queue, self.empty_except = zmq_async.get_queue() + self.executor = zmq_async.get_executor(self.run_loop) + self.executor.execute() + + def send_request(self, request): + reply_future = futurist.Future() + self.reply_waiter.track_reply(reply_future, request.message_id) + self.queue.put(request) + return reply_future + + def _do_send_request(self, socket, request): + socket.send(b'', zmq.SNDMORE) + socket.send_pyobj(request) + + LOG.debug("Sending message_id %(message)s to a target %(target)s" + % {"message": request.message_id, + "target": request.target}) + + def _check_hosts_connections(self, target, listener_type): + if str(target) in self.outbound_sockets: + socket = self.outbound_sockets[str(target)] + else: + hosts = self.matchmaker.get_hosts( + target, listener_type) + socket = zmq_socket.ZmqSocket(self.zmq_context, self.socket_type) + self.outbound_sockets[str(target)] = socket + + for host in hosts: + self._connect_to_host(socket, host, target) + + return socket + + def run_loop(self): + try: + request = self.queue.get(timeout=self.conf.rpc_poll_timeout) + except self.empty_except: + return + + socket = self._check_hosts_connections( + request.target, zmq_names.socket_type_str(zmq.ROUTER)) + + self._do_send_request(socket, request) + self.reply_waiter.poll_socket(socket) + + +class RequestSenderLight(RequestSender): + """This class used with proxy. + + Simplified address matching because there is only + one proxy IPC address. + """ + + def __init__(self, conf, matchmaker, reply_waiter): + if not conf.zmq_use_broker: + raise rpc_common.RPCException("RequestSenderLight needs a proxy!") + + super(RequestSenderLight, self).__init__( + conf, matchmaker, reply_waiter) + + self.socket = None + + def _check_hosts_connections(self, target, listener_type): + if self.socket is None: + self.socket = zmq_socket.ZmqSocket(self.zmq_context, + self.socket_type) + self.outbound_sockets[str(target)] = self.socket + address = zmq_address.get_broker_address(self.conf) + self._connect_to_address(self.socket, address, target) + return self.socket + + def _do_send_request(self, socket, request): + LOG.debug("Sending %(type)s message_id %(message)s" + " to a target %(target)s" + % {"type": request.msg_type, + "message": request.message_id, + "target": request.target}) + + envelope = request.create_envelope() + + socket.send(b'', zmq.SNDMORE) + socket.send_pyobj(envelope, zmq.SNDMORE) + socket.send_pyobj(request) + + +class ReplyWaiter(object): + + def __init__(self, conf): + self.conf = conf + self.replies = {} + self.poller = zmq_async.get_poller() + self.executor = zmq_async.get_executor(self.run_loop) + self.executor.execute() + self._lock = threading.Lock() + + def track_reply(self, reply_future, message_id): + self._lock.acquire() + self.replies[message_id] = reply_future + self._lock.release() + + def untrack_id(self, message_id): + self._lock.acquire() + self.replies.pop(message_id) + self._lock.release() + + def poll_socket(self, socket): + + def _receive_method(socket): + empty = socket.recv() + assert empty == b'', "Empty expected!" + reply = socket.recv_pyobj() + LOG.debug("Received reply %s" % reply) + return reply + + self.poller.register(socket, recv_method=_receive_method) + + def run_loop(self): + reply, socket = self.poller.poll( + timeout=self.conf.rpc_poll_timeout) + if reply is not None: + call_future = self.replies[reply[zmq_names.FIELD_MSG_ID]] + call_future.set_result(reply) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py similarity index 58% rename from oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py rename to oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py index 922607c7e..22e09c3fe 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py @@ -18,7 +18,7 @@ from oslo_messaging._drivers.zmq_driver.client.publishers\ import zmq_publisher_base from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names -from oslo_messaging._i18n import _LI, _LW +from oslo_messaging._i18n import _LW LOG = logging.getLogger(__name__) @@ -34,7 +34,7 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend): self._check_request_pattern(request) - dealer_socket, hosts = self._check_hosts_connections( + dealer_socket = self._check_hosts_connections( request.target, zmq_names.socket_type_str(zmq.ROUTER)) if not dealer_socket.connections: @@ -61,15 +61,16 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend): socket.send(b'', zmq.SNDMORE) socket.send_pyobj(request) - LOG.info(_LI("Sending message_id %(message)s to a target %(target)s") - % {"message": request.message_id, - "target": request.target}) + LOG.debug("Sending message_id %(message)s to a target %(target)s" + % {"message": request.message_id, + "target": request.target}) def cleanup(self): super(DealerPublisher, self).cleanup() class DealerPublisherLight(zmq_publisher_base.PublisherBase): + """Used when publishing to proxy. """ def __init__(self, conf, address): super(DealerPublisherLight, self).__init__(conf) @@ -92,68 +93,6 @@ class DealerPublisherLight(zmq_publisher_base.PublisherBase): self.socket.close() -class DealerPublisherProxy(DealerPublisher): - - def __init__(self, conf, matchmaker, reply_receiver): - super(DealerPublisherProxy, self).__init__(conf, matchmaker) - self.reply_receiver = reply_receiver - - def send_request(self, multipart_message): - - envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE] - - LOG.info(_LI("Envelope: %s") % envelope) - - target = envelope[zmq_names.FIELD_TARGET] - dealer_socket, hosts = self._check_hosts_connections( - target, zmq_names.socket_type_str(zmq.ROUTER)) - - if not dealer_socket.connections: - # NOTE(ozamiatin): Here we can provide - # a queue for keeping messages to send them later - # when some listener appears. However such approach - # being more reliable will consume additional memory. - LOG.warning(_LW("Request %s was dropped because no connection") - % envelope[zmq_names.FIELD_MSG_TYPE]) - return - - self.reply_receiver.track_socket(dealer_socket.handle) - - LOG.info(_LI("Sending message %(message)s to a target %(target)s") - % {"message": envelope[zmq_names.FIELD_MSG_ID], - "target": envelope[zmq_names.FIELD_TARGET]}) - - if envelope[zmq_names.FIELD_MSG_TYPE] in zmq_names.MULTISEND_TYPES: - for _ in range(dealer_socket.connections_count()): - self._send_request(dealer_socket, multipart_message) - else: - self._send_request(dealer_socket, multipart_message) - - def _send_request(self, socket, multipart_message): - - socket.send(b'', zmq.SNDMORE) - socket.send_pyobj( - multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE], - zmq.SNDMORE) - socket.send(multipart_message[zmq_names.MULTIPART_IDX_BODY]) - - -class ReplyReceiver(object): - - def __init__(self, poller): - self.poller = poller - LOG.info(_LI("Reply waiter created in broker")) - - def _receive_reply(self, socket): - return socket.recv_multipart() - - def track_socket(self, socket): - self.poller.register(socket, self._receive_reply) - - def cleanup(self): - self.poller.close() - - class AcknowledgementReceiver(object): def __init__(self): @@ -172,8 +111,7 @@ class AcknowledgementReceiver(object): def poll_for_acknowledgements(self): ack_message, socket = self.poller.poll() - LOG.info(_LI("Message %s acknowledged") - % ack_message[zmq_names.FIELD_ID]) + LOG.debug("Message %s acknowledged" % ack_message[zmq_names.FIELD_ID]) def cleanup(self): self.thread.stop() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py new file mode 100644 index 000000000..c8ad98345 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py @@ -0,0 +1,87 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ + import zmq_dealer_publisher +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names +from oslo_messaging._i18n import _LI, _LW + +zmq = zmq_async.import_zmq() + +LOG = logging.getLogger(__name__) + + +class DealerPublisherProxy(zmq_dealer_publisher.DealerPublisher): + + def __init__(self, conf, matchmaker, reply_receiver): + super(DealerPublisherProxy, self).__init__(conf, matchmaker) + self.reply_receiver = reply_receiver + + def send_request(self, multipart_message): + + envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE] + + LOG.debug("Envelope: %s" % envelope) + + target = envelope[zmq_names.FIELD_TARGET] + dealer_socket = self._check_hosts_connections( + target, zmq_names.socket_type_str(zmq.ROUTER)) + + if not dealer_socket.connections: + # NOTE(ozamiatin): Here we can provide + # a queue for keeping messages to send them later + # when some listener appears. However such approach + # being more reliable will consume additional memory. + LOG.warning(_LW("Request %s was dropped because no connection") + % envelope[zmq_names.FIELD_MSG_TYPE]) + return + + self.reply_receiver.track_socket(dealer_socket.handle) + + LOG.debug("Sending message %(message)s to a target %(target)s" + % {"message": envelope[zmq_names.FIELD_MSG_ID], + "target": envelope[zmq_names.FIELD_TARGET]}) + + if envelope[zmq_names.FIELD_MSG_TYPE] in zmq_names.MULTISEND_TYPES: + for _ in range(dealer_socket.connections_count()): + self._send_request(dealer_socket, multipart_message) + else: + self._send_request(dealer_socket, multipart_message) + + def _send_request(self, socket, multipart_message): + + socket.send(b'', zmq.SNDMORE) + socket.send_pyobj( + multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE], + zmq.SNDMORE) + socket.send(multipart_message[zmq_names.MULTIPART_IDX_BODY]) + + +class ReplyReceiver(object): + + def __init__(self, poller): + self.poller = poller + LOG.info(_LI("Reply waiter created in broker")) + + def _receive_reply(self, socket): + return socket.recv_multipart() + + def track_socket(self, socket): + self.poller.register(socket, self._receive_reply) + + def cleanup(self): + self.poller.close() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py index 1cd3360eb..6ecfb2f77 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py @@ -18,7 +18,6 @@ from oslo_messaging._drivers.zmq_driver.client.publishers\ import zmq_publisher_base from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names -from oslo_messaging._i18n import _LI LOG = logging.getLogger(__name__) @@ -35,7 +34,7 @@ class PubPublisher(zmq_publisher_base.PublisherMultisend): if request.msg_type not in zmq_names.NOTIFY_TYPES: raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) - pub_socket, hosts = self._check_hosts_connections( + pub_socket = self._check_hosts_connections( request.target, zmq_names.socket_type_str(zmq.SUB)) self._send_request(pub_socket, request) @@ -43,6 +42,6 @@ class PubPublisher(zmq_publisher_base.PublisherMultisend): super(PubPublisher, self)._send_request(socket, request) - LOG.info(_LI("Publishing message %(message)s to a target %(target)s") - % {"message": request.message, - "target": request.target}) + LOG.debug("Publishing message %(message)s to a target %(target)s" + % {"message": request.message, + "target": request.target}) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py index 46e8ef535..6daf6db18 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py @@ -14,6 +14,7 @@ import abc import logging +import uuid import six @@ -89,12 +90,11 @@ class PublisherBase(object): :param request: Message data and destination container object :type request: zmq_request.Request """ - LOG.info(_LI("Sending %(type)s message_id %(message)s to a target" - "%(target)s host:%(host)s") - % {"type": request.msg_type, - "message": request.message_id, - "target": request.target, - "host": request.host}) + LOG.debug("Sending %(type)s message_id %(message)s to a target" + "%(target)s" + % {"type": request.msg_type, + "message": request.message_id, + "target": request.target}) socket.send_pyobj(request) def cleanup(self): @@ -124,28 +124,30 @@ class PublisherMultisend(PublisherBase): def _check_hosts_connections(self, target, listener_type): # TODO(ozamiatin): Place for significant optimization # Matchmaker cache should be implemented - hosts = self.matchmaker.get_hosts( - target, listener_type) if str(target) in self.outbound_sockets: socket = self.outbound_sockets[str(target)] else: + hosts = self.matchmaker.get_hosts(target, listener_type) socket = zmq_socket.ZmqSocket(self.zmq_context, self.socket_type) self.outbound_sockets[str(target)] = socket + for host in hosts: + self._connect_to_host(socket, host, target) - for host in hosts: - self._connect_to_host(socket, host, target) + return socket - return socket, hosts - - def _connect_to_host(self, socket, host, target): - address = zmq_address.get_tcp_direct_address(host) - LOG.info(address) + def _connect_to_address(self, socket, address, target): stype = zmq_names.socket_type_str(self.socket_type) try: LOG.info(_LI("Connecting %(stype)s to %(address)s for %(target)s") % {"stype": stype, "address": address, "target": target}) + + if six.PY3: + socket.setsockopt_string(zmq.IDENTITY, str(uuid.uuid1())) + else: + socket.handle.identity = str(uuid.uuid1()) + socket.connect(address) except zmq.ZMQError as e: errmsg = _LE("Failed connecting %(stype) to %(address)s: %(e)s")\ @@ -153,3 +155,7 @@ class PublisherMultisend(PublisherBase): LOG.error(_LE("Failed connecting %(stype) to %(address)s: %(e)s") % (stype, address, e)) raise rpc_common.RPCException(errmsg) + + def _connect_to_host(self, socket, host, target): + address = zmq_address.get_tcp_direct_address(host) + self._connect_to_address(socket, address, target) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py index 7fcb46961..3a38cfd43 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py @@ -18,7 +18,7 @@ from oslo_messaging._drivers.zmq_driver.client.publishers\ import zmq_publisher_base from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names -from oslo_messaging._i18n import _LI, _LW +from oslo_messaging._i18n import _LW LOG = logging.getLogger(__name__) @@ -35,7 +35,7 @@ class PushPublisher(zmq_publisher_base.PublisherMultisend): if request.msg_type == zmq_names.CALL_TYPE: raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) - push_socket, hosts = self._check_hosts_connections( + push_socket = self._check_hosts_connections( request.target, zmq_names.socket_type_str(zmq.PULL)) if not push_socket.connections: @@ -53,6 +53,6 @@ class PushPublisher(zmq_publisher_base.PublisherMultisend): super(PushPublisher, self)._send_request(socket, request) - LOG.info(_LI("Publishing message %(message)s to a target %(target)s") - % {"message": request.message, - "target": request.target}) + LOG.debug("Publishing message %(message)s to a target %(target)s" + % {"message": request.message, + "target": request.target}) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py index ace229ba5..78330f3a3 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py @@ -82,8 +82,11 @@ class ReqPublisher(zmq_publisher_base.PublisherBase): def _receive_reply(socket, request): def _receive_method(socket): - return socket.recv_pyobj() + reply = socket.recv_pyobj() + LOG.debug("Received reply %s" % reply) + return reply + LOG.debug("Start waiting reply") # NOTE(ozamiatin): Check for retry here (no retries now) with contextlib.closing(zmq_async.get_reply_poller()) as poller: poller.register(socket, recv_method=_receive_method) @@ -91,7 +94,7 @@ class ReqPublisher(zmq_publisher_base.PublisherBase): if reply is None: raise oslo_messaging.MessagingTimeout( "Timeout %s seconds was reached" % request.timeout) - LOG.info(_LI("Received reply %s") % reply) + LOG.debug("Received reply %s" % reply) if reply[zmq_names.FIELD_FAILURE]: raise rpc_common.deserialize_remote_exception( reply[zmq_names.FIELD_FAILURE], @@ -114,12 +117,12 @@ class ReqPublisherLight(ReqPublisher): def _send_request(self, socket, request): - LOG.info(_LI("Sending %(type)s message_id %(message)s" - " to a target %(target)s, host:%(host)s") - % {"type": request.msg_type, - "message": request.message_id, - "target": request.target, - "host": request.host}) + LOG.debug("Sending %(type)s message_id %(message)s" + " to a target %(target)s, host:%(host)s" + % {"type": request.msg_type, + "message": request.message_id, + "target": request.target, + "host": request.host}) envelope = request.create_envelope() diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py index 9fb38224d..c6e895863 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py @@ -12,70 +12,33 @@ # License for the specific language governing permissions and limitations # under the License. -import contextlib -from oslo_messaging._drivers.zmq_driver.client.publishers\ +from oslo_messaging._drivers import common as rpc_common +from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ + import zmq_dealer_call_publisher +from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ import zmq_dealer_publisher -from oslo_messaging._drivers.zmq_driver.client.publishers\ - import zmq_req_publisher -from oslo_messaging._drivers.zmq_driver.client import zmq_request -from oslo_messaging._drivers.zmq_driver import zmq_address +from oslo_messaging._drivers.zmq_driver.client import zmq_client_base from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names zmq = zmq_async.import_zmq() -class ZmqClient(object): +class ZmqClient(zmq_client_base.ZmqClientBase): def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None): - self.conf = conf - self.context = zmq.Context() - self.matchmaker = matchmaker - self.allowed_remote_exmods = allowed_remote_exmods or [] + if conf.zmq_use_broker: + raise rpc_common.RPCException("This client doesn't need proxy!") - self.dealer_publisher = None - if self.conf.zmq_use_broker: - self.dealer_publisher = zmq_dealer_publisher.DealerPublisherLight( - conf, zmq_address.get_broker_address(self.conf)) - self.req_publisher_cls = zmq_req_publisher.ReqPublisherLight - else: - self.dealer_publisher = zmq_dealer_publisher.DealerPublisher( - conf, matchmaker) - self.req_publisher_cls = zmq_req_publisher.ReqPublisher + super(ZmqClient, self).__init__( + conf, matchmaker, allowed_remote_exmods, + publishers={ + zmq_names.CALL_TYPE: + zmq_dealer_call_publisher.DealerCallPublisher( + conf, matchmaker), - def send_call(self, target, context, message, timeout=None, retry=None): - with contextlib.closing(zmq_request.CallRequest( - target, context=context, message=message, - timeout=timeout, retry=retry, - allowed_remote_exmods=self.allowed_remote_exmods)) as request: - with contextlib.closing(self.req_publisher_cls( - self.conf, self.matchmaker)) as req_publisher: - return req_publisher.send_request(request) - - def send_cast(self, target, context, message, timeout=None, retry=None): - with contextlib.closing(zmq_request.CastRequest( - target, context=context, message=message, - timeout=timeout, retry=retry)) as request: - self.dealer_publisher.send_request(request) - - def send_fanout(self, target, context, message, timeout=None, retry=None): - with contextlib.closing(zmq_request.FanoutRequest( - target, context=context, message=message, - timeout=timeout, retry=retry)) as request: - self.dealer_publisher.send_request(request) - - def send_notify(self, target, context, message, version, retry=None): - with contextlib.closing(zmq_request.NotificationRequest( - target, context, message, version=version, - retry=retry)) as request: - self.dealer_publisher.send_request(request) - - def send_notify_fanout(self, target, context, message, version, - retry=None): - with contextlib.closing(zmq_request.NotificationFanoutRequest( - target, context, message, version=version, - retry=retry)) as request: - self.dealer_publisher.send_request(request) - - def cleanup(self): - self.dealer_publisher.cleanup() + "default": zmq_dealer_publisher.DealerPublisher( + conf, matchmaker) + } + ) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py new file mode 100644 index 000000000..aa7cd12d1 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py @@ -0,0 +1,77 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import contextlib + +from oslo_messaging._drivers.zmq_driver.client import zmq_request +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names + +zmq = zmq_async.import_zmq() + + +class ZmqClientBase(object): + + def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None, + publishers=None): + self.conf = conf + self.context = zmq.Context() + self.matchmaker = matchmaker + self.allowed_remote_exmods = allowed_remote_exmods or [] + + self.publishers = publishers + self.call_publisher = publishers.get(zmq_names.CALL_TYPE) \ + or publishers["default"] + self.cast_publisher = publishers.get(zmq_names.CAST_TYPE) \ + or publishers["default"] + self.fanout_publisher = publishers.get(zmq_names.CAST_FANOUT_TYPE) \ + or publishers["default"] + self.notify_publisher = publishers.get(zmq_names.NOTIFY_TYPE) \ + or publishers["default"] + + def send_call(self, target, context, message, timeout=None, retry=None): + with contextlib.closing(zmq_request.CallRequest( + target, context=context, message=message, + timeout=timeout, retry=retry, + allowed_remote_exmods=self.allowed_remote_exmods)) as request: + return self.call_publisher.send_request(request) + + def send_cast(self, target, context, message, timeout=None, retry=None): + with contextlib.closing(zmq_request.CastRequest( + target, context=context, message=message, + timeout=timeout, retry=retry)) as request: + self.cast_publisher.send_request(request) + + def send_fanout(self, target, context, message, timeout=None, retry=None): + with contextlib.closing(zmq_request.FanoutRequest( + target, context=context, message=message, + timeout=timeout, retry=retry)) as request: + self.fanout_publisher.send_request(request) + + def send_notify(self, target, context, message, version, retry=None): + with contextlib.closing(zmq_request.NotificationRequest( + target, context, message, version=version, + retry=retry)) as request: + self.notify_publisher.send_request(request) + + def send_notify_fanout(self, target, context, message, version, + retry=None): + with contextlib.closing(zmq_request.NotificationFanoutRequest( + target, context, message, version=version, + retry=retry)) as request: + self.notify_publisher.send_request(request) + + def cleanup(self): + for publisher in self.publishers.values(): + publisher.cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client_light.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client_light.py new file mode 100644 index 000000000..873911f8d --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client_light.py @@ -0,0 +1,46 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from oslo_messaging._drivers import common as rpc_common +from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ + import zmq_dealer_call_publisher +from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ + import zmq_dealer_publisher +from oslo_messaging._drivers.zmq_driver.client import zmq_client_base +from oslo_messaging._drivers.zmq_driver import zmq_address +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names + +zmq = zmq_async.import_zmq() + + +class ZmqClientLight(zmq_client_base.ZmqClientBase): + + def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None): + if not conf.zmq_use_broker: + raise rpc_common.RPCException( + "This client needs proxy to be configured!") + + super(ZmqClientLight, self).__init__( + conf, matchmaker, allowed_remote_exmods, + publishers={ + zmq_names.CALL_TYPE: + zmq_dealer_call_publisher.DealerCallPublisher( + conf, matchmaker), + + "default": zmq_dealer_publisher.DealerPublisherLight( + conf, zmq_address.get_broker_address(self.conf)) + } + ) diff --git a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py index 518d32e49..8167715f1 100644 --- a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py +++ b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py @@ -38,6 +38,7 @@ class ThreadingPoller(zmq_poller.ZmqPoller): self.recv_methods = {} def register(self, socket, recv_method=None): + LOG.debug("Registering socket") if socket in self.recv_methods: return if recv_method is not None: @@ -46,6 +47,8 @@ class ThreadingPoller(zmq_poller.ZmqPoller): def poll(self, timeout=None): + LOG.debug("Entering poll method") + if timeout: timeout *= 1000 # zmq poller waits milliseconds diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py index f002c45af..b7532a74a 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py @@ -21,7 +21,7 @@ from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_socket -from oslo_messaging._i18n import _LE, _LI +from oslo_messaging._i18n import _LE LOG = logging.getLogger(__name__) @@ -44,10 +44,10 @@ class ConsumerBase(object): self.conf, self.context, socket_type) self.sockets.append(socket) self.poller.register(socket, self.receive_message) - LOG.info(_LI("Run %(stype)s consumer on %(addr)s:%(port)d"), - {"stype": zmq_names.socket_type_str(socket_type), - "addr": socket.bind_address, - "port": socket.port}) + LOG.debug("Run %(stype)s consumer on %(addr)s:%(port)d", + {"stype": zmq_names.socket_type_str(socket_type), + "addr": socket.bind_address, + "port": socket.port}) return socket except zmq.ZMQError as e: errmsg = _LE("Failed binding to port %(port)d: %(e)s")\ diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py index 98ef3a73c..81cf7fde0 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py @@ -56,9 +56,9 @@ class PullConsumer(zmq_consumer_base.SingleSocketConsumer): assert msg_type is not None, 'Bad format: msg type expected' context = socket.recv_pyobj() message = socket.recv_pyobj() - LOG.info(_LI("Received %(msg_type)s message %(msg)s") - % {"msg_type": msg_type, - "msg": str(message)}) + LOG.debug("Received %(msg_type)s message %(msg)s" + % {"msg_type": msg_type, + "msg": str(message)}) if msg_type in (zmq_names.CAST_TYPES + zmq_names.NOTIFY_TYPES): return PullIncomingMessage(self.server, context, message) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py index a501ef7a7..f5885c55a 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py @@ -21,7 +21,7 @@ from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names -from oslo_messaging._i18n import _LE, _LI +from oslo_messaging._i18n import _LE LOG = logging.getLogger(__name__) @@ -43,7 +43,7 @@ class RouterIncomingMessage(base.IncomingMessage): """Reply is not needed for non-call messages""" def acknowledge(self): - LOG.info("Not sending acknowledge for %s", self.msg_id) + LOG.debug("Not sending acknowledge for %s", self.msg_id) def requeue(self): """Requeue is not supported""" @@ -83,11 +83,11 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): def receive_message(self, socket): try: request, reply_id = self._receive_request(socket) - LOG.info(_LI("[%(host)s] Received %(type)s, %(id)s, %(target)s") - % {"host": self.host, - "type": request.msg_type, - "id": request.message_id, - "target": request.target}) + LOG.debug("[%(host)s] Received %(type)s, %(id)s, %(target)s" + % {"host": self.host, + "type": request.msg_type, + "id": request.message_id, + "target": request.target}) if request.msg_type == zmq_names.CALL_TYPE: return zmq_incoming_message.ZmqIncomingRequest( diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py index 5e932493f..e009d55c9 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py @@ -45,9 +45,10 @@ class ZmqIncomingRequest(base.IncomingMessage): zmq_names.FIELD_REPLY: reply, zmq_names.FIELD_FAILURE: failure, zmq_names.FIELD_LOG_FAILURE: log_failure, - zmq_names.FIELD_ID: self.request.proxy_reply_id} + zmq_names.FIELD_ID: self.request.proxy_reply_id, + zmq_names.FIELD_MSG_ID: self.request.message_id} - LOG.info("Replying %s REP", (str(self.request.message_id))) + LOG.debug("Replying %s", (str(self.request.message_id))) self.received = True self.reply_socket.send(self.reply_id, zmq.SNDMORE) diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_async.py b/oslo_messaging/_drivers/zmq_driver/zmq_async.py index 7a993a285..4de248f39 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_async.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_async.py @@ -80,3 +80,13 @@ def _raise_error_if_invalid_config_value(zmq_concurrency): if zmq_concurrency not in ZMQ_MODULES: errmsg = _('Invalid zmq_concurrency value: %s') raise ValueError(errmsg % zmq_concurrency) + + +def get_queue(zmq_concurrency='eventlet'): + _raise_error_if_invalid_config_value(zmq_concurrency) + if zmq_concurrency == 'eventlet' and _is_eventlet_zmq_available(): + import eventlet + return eventlet.queue.Queue(), eventlet.queue.Empty + else: + import six + return six.moves.queue.Queue(), six.moves.queue.Empty diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py index 8f79bd083..4119e5735 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py @@ -47,6 +47,9 @@ class ZmqSocket(object): def setsockopt(self, *args, **kwargs): self.handle.setsockopt(*args, **kwargs) + def setsockopt_string(self, *args, **kwargs): + self.handle.setsockopt_string(*args, **kwargs) + def send(self, *args, **kwargs): self.handle.send(*args, **kwargs)