diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index f8bf1378e..c4530bb8b 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -14,7 +14,6 @@ import logging import os -import pprint import socket import threading @@ -24,14 +23,12 @@ from stevedore import driver from oslo_messaging._drivers import base from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver.client import zmq_client -from oslo_messaging._drivers.zmq_driver.client import zmq_client_light from oslo_messaging._drivers.zmq_driver.server import zmq_server from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._executors import impl_pooledexecutor # FIXME(markmc) +from oslo_messaging._executors import impl_pooledexecutor from oslo_messaging._i18n import _LE -pformat = pprint.pformat LOG = logging.getLogger(__name__) RPCException = rpc_common.RPCException @@ -42,16 +39,8 @@ zmq_opts = [ 'The "host" option should point or resolve to this ' 'address.'), - # The module.Class to use for matchmaking. - cfg.StrOpt( - 'rpc_zmq_matchmaker', - default='redis', - help='MatchMaker driver.', - ), - - cfg.BoolOpt('rpc_zmq_all_req_rep', - default=True, - help='Use REQ/REP pattern for all methods CALL/CAST/FANOUT.'), + cfg.StrOpt('rpc_zmq_matchmaker', default='redis', + help='MatchMaker driver.'), cfg.StrOpt('rpc_zmq_concurrency', default='eventlet', help='Type of concurrency used. Either "native" or "eventlet"'), @@ -71,19 +60,21 @@ zmq_opts = [ help='Name of this node. Must be a valid hostname, FQDN, or ' 'IP address. Must match "host" option, if running Nova.'), - cfg.IntOpt('rpc_cast_timeout', - default=30, + cfg.IntOpt('rpc_cast_timeout', default=30, help='Seconds to wait before a cast expires (TTL). ' 'Only supported by impl_zmq.'), - cfg.IntOpt('rpc_poll_timeout', - default=1, + cfg.IntOpt('rpc_poll_timeout', default=1, help='The default number of seconds that poll should wait. ' 'Poll raises timeout exception when timeout expired.'), - cfg.BoolOpt('zmq_use_broker', - default=False, - help='Configures zmq-messaging to use broker or not.'), + cfg.BoolOpt('direct_over_proxy', default=True, + help='Configures zmq-messaging to use proxy with ' + 'non PUB/SUB patterns.'), + + cfg.BoolOpt('use_pub_sub', default=True, + help='Use PUB/SUB pattern for fanout methods. ' + 'PUB/SUB always uses proxy.'), cfg.PortOpt('rpc_zmq_min_port', default=49152, @@ -185,15 +176,12 @@ class ZmqDriver(base.BaseDriver): self.notify_server = LazyDriverItem( zmq_server.ZmqServer, self, self.conf, self.matchmaker) - client_cls = zmq_client_light.ZmqClientLight \ - if conf.zmq_use_broker else zmq_client.ZmqClient - self.client = LazyDriverItem( - client_cls, self.conf, self.matchmaker, + zmq_client.ZmqClient, self.conf, self.matchmaker, self.allowed_remote_exmods) self.notifier = LazyDriverItem( - client_cls, self.conf, self.matchmaker, + zmq_client.ZmqClient, self.conf, self.matchmaker, self.allowed_remote_exmods) super(ZmqDriver, self).__init__(conf, url, default_exchange, diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py index eb752bed7..ffe913884 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py @@ -17,6 +17,8 @@ import logging from oslo_messaging._drivers.zmq_driver.broker import zmq_base_proxy from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ import zmq_dealer_publisher_proxy +from oslo_messaging._drivers.zmq_driver.client.publishers \ + import zmq_pub_publisher from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names @@ -42,6 +44,8 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy): reply_receiver = zmq_dealer_publisher_proxy.ReplyReceiver(self.poller) self.publisher = zmq_dealer_publisher_proxy.DealerPublisherProxy( conf, matchmaker, reply_receiver) + self.pub_publisher = zmq_pub_publisher.PubPublisherProxy( + conf, matchmaker) def run(self): message, socket = self.poller.poll(self.conf.rpc_poll_timeout) @@ -53,10 +57,16 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy): else: self._redirect_reply(message) - def _redirect_in_request(self, request): + def _redirect_in_request(self, multipart_message): LOG.debug("-> Redirecting request %s to TCP publisher" - % request) - self.publisher.send_request(request) + % multipart_message) + envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE] + if self.conf.use_pub_sub and \ + envelope[zmq_names.FIELD_MSG_TYPE] \ + == zmq_names.CAST_FANOUT_TYPE: + self.pub_publisher.send_request(multipart_message) + else: + self.publisher.send_request(multipart_message) def _redirect_reply(self, reply): LOG.debug("Reply proxy %s" % reply) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py index 0c4e7536d..eaba22bdb 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py @@ -26,6 +26,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_socket +from oslo_messaging._i18n import _LW LOG = logging.getLogger(__name__) @@ -44,7 +45,7 @@ class DealerCallPublisher(zmq_publisher_base.PublisherBase): self.matchmaker = matchmaker self.reply_waiter = ReplyWaiter(conf) self.sender = RequestSender(conf, matchmaker, self.reply_waiter) \ - if not conf.zmq_use_broker else \ + if not conf.direct_over_proxy else \ RequestSenderLight(conf, matchmaker, self.reply_waiter) def send_request(self, request): @@ -124,7 +125,7 @@ class RequestSenderLight(RequestSender): """ def __init__(self, conf, matchmaker, reply_waiter): - if not conf.zmq_use_broker: + if not conf.direct_over_proxy: raise rpc_common.RPCException("RequestSenderLight needs a proxy!") super(RequestSenderLight, self).__init__( @@ -190,5 +191,9 @@ class ReplyWaiter(object): reply, socket = self.poller.poll( timeout=self.conf.rpc_poll_timeout) if reply is not None: - call_future = self.replies[reply[zmq_names.FIELD_MSG_ID]] - call_future.set_result(reply) + reply_id = reply[zmq_names.FIELD_MSG_ID] + call_future = self.replies.get(reply_id) + if call_future: + call_future.set_result(reply) + else: + LOG.warning(_LW("Received timed out reply: %s") % reply_id) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py index 22e09c3fe..cf8358eb9 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py @@ -75,6 +75,7 @@ class DealerPublisherLight(zmq_publisher_base.PublisherBase): def __init__(self, conf, address): super(DealerPublisherLight, self).__init__(conf) self.socket = self.zmq_context.socket(zmq.DEALER) + self.address = address self.socket.connect(address) def send_request(self, request): @@ -88,6 +89,12 @@ class DealerPublisherLight(zmq_publisher_base.PublisherBase): self.socket.send_pyobj(envelope, zmq.SNDMORE) self.socket.send_pyobj(request) + LOG.debug("->[proxy:%(addr)s] Sending message_id %(message)s to " + "a target %(target)s" + % {"message": request.message_id, + "target": request.target, + "addr": self.address}) + def cleanup(self): self.socket.setsockopt(zmq.LINGER, 0) self.socket.close() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py index 6ecfb2f77..0a5a58ebe 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py @@ -16,32 +16,108 @@ import logging from oslo_messaging._drivers.zmq_driver.client.publishers\ import zmq_publisher_base +from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names +from oslo_messaging._drivers.zmq_driver import zmq_socket +from oslo_messaging._i18n import _LI LOG = logging.getLogger(__name__) zmq = zmq_async.import_zmq() -class PubPublisher(zmq_publisher_base.PublisherMultisend): +class PubPublisherProxy(zmq_publisher_base.PublisherBase): + """PUB/SUB based request publisher + + The publisher intended to be used for Fanout and Notify + multi-sending patterns. + + It differs from direct publishers like DEALER or PUSH based + in a way it treats matchmaker. Here all publishers register + in the matchmaker. Subscribers (server-side) take the list + of publishers and connect to all of them but subscribe + only to a specific topic-filtering tag generated from the + Target object. + """ def __init__(self, conf, matchmaker): - super(PubPublisher, self).__init__(conf, matchmaker, zmq.PUB) + super(PubPublisherProxy, self).__init__(conf) + self.matchmaker = matchmaker - def send_request(self, request): + self.socket = zmq_socket.ZmqRandomPortSocket( + self.conf, self.zmq_context, zmq.PUB) - if request.msg_type not in zmq_names.NOTIFY_TYPES: - raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) + self.host = zmq_address.combine_address(self.conf.rpc_zmq_host, + self.socket.port) - pub_socket = self._check_hosts_connections( - request.target, zmq_names.socket_type_str(zmq.SUB)) - self._send_request(pub_socket, request) + self.sync_channel = SyncChannel(conf, matchmaker, self.zmq_context) - def _send_request(self, socket, request): + LOG.info(_LI("[PUB:%(pub)s, PULL:%(pull)s] Run PUB publisher") % + {"pub": self.host, + "pull": self.sync_channel.sync_host}) - super(PubPublisher, self)._send_request(socket, request) + self.matchmaker.register_publisher( + (self.host, self.sync_channel.sync_host)) - LOG.debug("Publishing message %(message)s to a target %(target)s" - % {"message": request.message, - "target": request.target}) + def send_request(self, multipart_message): + + envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE] + msg_type = envelope[zmq_names.FIELD_MSG_TYPE] + target = envelope[zmq_names.FIELD_TARGET] + message_id = envelope[zmq_names.FIELD_MSG_ID] + if msg_type not in zmq_names.MULTISEND_TYPES: + raise zmq_publisher_base.UnsupportedSendPattern(msg_type) + + topic_filter = zmq_address.target_to_subscribe_filter(target) + + self.socket.send(topic_filter, zmq.SNDMORE) + self.socket.send(multipart_message[zmq_names.MULTIPART_IDX_BODY]) + + LOG.debug("Publishing message [%(topic)s] %(message_id)s to " + "a target %(target)s " + % {"message_id": message_id, + "target": target, + "topic": topic_filter}) + + def cleanup(self): + self.matchmaker.unregister_publisher( + (self.host, self.sync_channel.sync_host)) + self.socket.setsockopt(zmq.LINGER, 0) + self.socket.close() + + +class SyncChannel(object): + """Subscribers synchronization channel + + As far as PUB/SUB is one directed way pattern we need some + backwards channel to have a possibility of subscribers + to talk back to publisher. + + May be used for heartbeats or some kind of acknowledgments etc. + """ + + def __init__(self, conf, matchmaker, context): + self.conf = conf + self.matchmaker = matchmaker + self.context = context + self._ready = None + + # NOTE(ozamiatin): May be used for heartbeats when we + # implement them + self.sync_socket = zmq_socket.ZmqRandomPortSocket( + self.conf, self.context, zmq.PULL) + self.poller = zmq_async.get_poller() + self.poller.register(self.sync_socket) + + self.sync_host = zmq_address.combine_address(self.conf.rpc_zmq_host, + self.sync_socket.port) + + def is_ready(self): + LOG.debug("[%s] Waiting for ready from first subscriber" % + self.sync_host) + if self._ready is None: + self._ready = self.poller.poll() + LOG.debug("[%s] Received ready from first subscriber" % + self.sync_host) + return self._ready is not None diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py index e2f898550..cc2011e07 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py @@ -132,7 +132,6 @@ class PublisherMultisend(PublisherBase): self.outbound_sockets[str(target)] = socket for host in hosts: self._connect_to_host(socket, host, target) - return socket def _connect_to_address(self, socket, address, target): diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py deleted file mode 100644 index 78330f3a3..000000000 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py +++ /dev/null @@ -1,130 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import contextlib -import logging -import uuid - -import six - -import oslo_messaging -from oslo_messaging._drivers import common as rpc_common -from oslo_messaging._drivers.zmq_driver.client.publishers\ - import zmq_publisher_base -from oslo_messaging._drivers.zmq_driver import zmq_address -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_names -from oslo_messaging._i18n import _LE, _LI - -LOG = logging.getLogger(__name__) - -zmq = zmq_async.import_zmq() - - -class ReqPublisher(zmq_publisher_base.PublisherBase): - - def __init__(self, conf, matchmaker): - super(ReqPublisher, self).__init__(conf) - self.matchmaker = matchmaker - - def send_request(self, request): - - if request.msg_type != zmq_names.CALL_TYPE: - raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) - - socket, connect_address = self._connect_to_host(request.target, - request.timeout) - request.host = connect_address - self._send_request(socket, request) - return self._receive_reply(socket, request) - - def _resolve_host_address(self, target, timeout=0): - host = self.matchmaker.get_single_host( - target, zmq_names.socket_type_str(zmq.ROUTER), timeout) - return zmq_address.get_tcp_direct_address(host) - - def _connect_to_host(self, target, timeout=0): - - try: - self.zmq_context = zmq.Context() - socket = self.zmq_context.socket(zmq.REQ) - - if six.PY3: - socket.setsockopt_string(zmq.IDENTITY, str(uuid.uuid1())) - else: - socket.identity = str(uuid.uuid1()) - - connect_address = self._resolve_host_address(target, timeout) - - LOG.info(_LI("Connecting REQ to %s") % connect_address) - - socket.connect(connect_address) - self.outbound_sockets[str(target)] = socket - return socket, connect_address - - except zmq.ZMQError as e: - errmsg = _LE("Error connecting to socket: %s") % str(e) - LOG.error(_LE("Error connecting to socket: %s") % str(e)) - raise rpc_common.RPCException(errmsg) - - @staticmethod - def _receive_reply(socket, request): - - def _receive_method(socket): - reply = socket.recv_pyobj() - LOG.debug("Received reply %s" % reply) - return reply - - LOG.debug("Start waiting reply") - # NOTE(ozamiatin): Check for retry here (no retries now) - with contextlib.closing(zmq_async.get_reply_poller()) as poller: - poller.register(socket, recv_method=_receive_method) - reply, socket = poller.poll(timeout=request.timeout) - if reply is None: - raise oslo_messaging.MessagingTimeout( - "Timeout %s seconds was reached" % request.timeout) - LOG.debug("Received reply %s" % reply) - if reply[zmq_names.FIELD_FAILURE]: - raise rpc_common.deserialize_remote_exception( - reply[zmq_names.FIELD_FAILURE], - request.allowed_remote_exmods) - else: - return reply[zmq_names.FIELD_REPLY] - - def close(self): - # For contextlib compatibility - self.cleanup() - - -class ReqPublisherLight(ReqPublisher): - - def __init__(self, conf, matchmaker): - super(ReqPublisherLight, self).__init__(conf, matchmaker) - - def _resolve_host_address(self, target, timeout=0): - return zmq_address.get_broker_address(self.conf) - - def _send_request(self, socket, request): - - LOG.debug("Sending %(type)s message_id %(message)s" - " to a target %(target)s, host:%(host)s" - % {"type": request.msg_type, - "message": request.message_id, - "target": request.target, - "host": request.host}) - - envelope = request.create_envelope() - - socket.send_pyobj(envelope, zmq.SNDMORE) - socket.send_pyobj(request) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py index c6e895863..fa8ccdc3c 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py @@ -13,12 +13,12 @@ # under the License. -from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ import zmq_dealer_call_publisher from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ import zmq_dealer_publisher from oslo_messaging._drivers.zmq_driver.client import zmq_client_base +from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names @@ -28,8 +28,11 @@ zmq = zmq_async.import_zmq() class ZmqClient(zmq_client_base.ZmqClientBase): def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None): - if conf.zmq_use_broker: - raise rpc_common.RPCException("This client doesn't need proxy!") + + default_publisher = zmq_dealer_publisher.DealerPublisher( + conf, matchmaker) if not conf.direct_over_proxy else \ + zmq_dealer_publisher.DealerPublisherLight( + conf, zmq_address.get_broker_address(conf)) super(ZmqClient, self).__init__( conf, matchmaker, allowed_remote_exmods, @@ -38,7 +41,14 @@ class ZmqClient(zmq_client_base.ZmqClientBase): zmq_dealer_call_publisher.DealerCallPublisher( conf, matchmaker), - "default": zmq_dealer_publisher.DealerPublisher( - conf, matchmaker) + # Here use DealerPublisherLight for sending request to proxy + # which finally uses PubPublisher to send fanout in case of + # 'use_pub_sub' option configured. + zmq_names.CAST_FANOUT_TYPE: + zmq_dealer_publisher.DealerPublisherLight( + conf, zmq_address.get_broker_address(conf)) + if conf.use_pub_sub else default_publisher, + + "default": default_publisher } ) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py index aa7cd12d1..e4d7adb67 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py @@ -73,5 +73,8 @@ class ZmqClientBase(object): self.notify_publisher.send_request(request) def cleanup(self): + cleaned = set() for publisher in self.publishers.values(): - publisher.cleanup() + if publisher not in cleaned: + publisher.cleanup() + cleaned.add(publisher) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client_light.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client_light.py deleted file mode 100644 index 873911f8d..000000000 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client_light.py +++ /dev/null @@ -1,46 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - - -from oslo_messaging._drivers import common as rpc_common -from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ - import zmq_dealer_call_publisher -from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ - import zmq_dealer_publisher -from oslo_messaging._drivers.zmq_driver.client import zmq_client_base -from oslo_messaging._drivers.zmq_driver import zmq_address -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_names - -zmq = zmq_async.import_zmq() - - -class ZmqClientLight(zmq_client_base.ZmqClientBase): - - def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None): - if not conf.zmq_use_broker: - raise rpc_common.RPCException( - "This client needs proxy to be configured!") - - super(ZmqClientLight, self).__init__( - conf, matchmaker, allowed_remote_exmods, - publishers={ - zmq_names.CALL_TYPE: - zmq_dealer_call_publisher.DealerCallPublisher( - conf, matchmaker), - - "default": zmq_dealer_publisher.DealerPublisherLight( - conf, zmq_address.get_broker_address(self.conf)) - } - ) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py index 455b7ba5a..ae7ebc919 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py @@ -61,7 +61,12 @@ class Request(object): self.target = target self.context = context self.message = message + self.retry = retry + if not isinstance(retry, int) and retry is not None: + raise ValueError( + "retry must be an integer, not {0}".format(type(retry))) + self.message_id = str(uuid.uuid1()) self.proxy_reply_id = None @@ -90,6 +95,11 @@ class RpcRequest(Request): self.timeout = kwargs.pop("timeout") assert self.timeout is not None, "Timeout should be specified!" + if not isinstance(self.timeout, int) and self.timeout is not None: + raise ValueError( + "timeout must be an integer, not {0}" + .format(type(self.timeout))) + super(RpcRequest, self).__init__(*args, **kwargs) def create_envelope(self): diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py index 7b9b69d79..6bbb6454c 100644 --- a/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py +++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py @@ -14,14 +14,11 @@ import abc import collections import logging -import random import retrying import six -import oslo_messaging from oslo_messaging._drivers.zmq_driver import zmq_address -from oslo_messaging._i18n import _LI, _LW LOG = logging.getLogger(__name__) @@ -32,9 +29,51 @@ class MatchMakerBase(object): def __init__(self, conf, *args, **kwargs): super(MatchMakerBase, self).__init__(*args, **kwargs) - self.conf = conf + @abc.abstractmethod + def register_publisher(self, hostname): + """Register publisher on nameserver. + + This works for PUB-SUB only + + :param hostname: host for the topic in "host:port" format + host for back-chatter in "host:port" format + :type hostname: tuple + """ + + @abc.abstractmethod + def unregister_publisher(self, hostname): + """Unregister publisher on nameserver. + + This works for PUB-SUB only + + :param hostname: host for the topic in "host:port" format + host for back-chatter in "host:port" format + :type hostname: tuple + """ + + def get_publishers_retrying(self): + """Retry until at least one publisher appears""" + + def retry_if_empty(publishers): + return not publishers + + _retry = retrying.retry(retry_on_result=retry_if_empty) + + @_retry + def _get_publishers(): + return self.get_publishers() + + return _get_publishers() + + @abc.abstractmethod + def get_publishers(self): + """Get all publisher-hosts from nameserver. + + :returns: a list of tuples of strings "hostname:port" hosts + """ + @abc.abstractmethod def register(self, target, hostname, listener_type): """Register target on nameserver. @@ -68,71 +107,6 @@ class MatchMakerBase(object): :returns: a list of "hostname:port" hosts """ - def get_single_host(self, target, listener_type, timeout=None, retry=0): - """Get a single host by target. - - :param target: the target for messages - :type target: Target - :param timeout: matchmaker query timeout - :type timeout: integer - :param retry: the number of retries to do - None or -1 means retry forever - 0 means do not retry - N means retry N times - :type retry: integer - :returns: a "hostname:port" host - """ - - if not isinstance(timeout, int) and timeout is not None: - raise ValueError( - "timeout must be integer, not {0}".format(type(timeout))) - if not isinstance(retry, int) and retry is not None: - raise ValueError( - "retry must be integer, not {0}".format(type(retry))) - - if timeout is None or timeout < 0: - full_timeout = 0 - retry_timeout = 0 - else: - retry_timeout = timeout * 1000 - - if retry is None or retry < 0: - full_timeout = None - else: - full_timeout = retry * retry_timeout - - _retry = retrying.retry(stop_max_delay=full_timeout, - wait_fixed=retry_timeout) - - @_retry - def _get_single_host(): - hosts = self.get_hosts(target, listener_type) - try: - if not hosts: - err_msg = "No hosts were found for target %s." % target - LOG.error(err_msg) - raise oslo_messaging.InvalidTarget(err_msg, target) - - if len(hosts) == 1: - host = hosts[0] - LOG.info(_LI( - "A single host %(host)s found for target %(target)s.") - % {"host": host, "target": target}) - else: - host = random.choice(hosts) - LOG.warning(_LW( - "Multiple hosts %(hosts)s were found for target " - " %(target)s. Using the random one - %(host)s.") - % {"hosts": hosts, "target": target, "host": host}) - return host - except oslo_messaging.InvalidTarget as ex: - if timeout: - raise oslo_messaging.MessagingTimeout() - else: - raise ex - - return _get_single_host() - class DummyMatchMaker(MatchMakerBase): @@ -140,6 +114,18 @@ class DummyMatchMaker(MatchMakerBase): super(DummyMatchMaker, self).__init__(conf, *args, **kwargs) self._cache = collections.defaultdict(list) + self._publishers = set() + + def register_publisher(self, hostname): + if hostname not in self._publishers: + self._publishers.add(hostname) + + def unregister_publisher(self, hostname): + if hostname in self._publishers: + self._publishers.remove(hostname) + + def get_publishers(self): + return list(self._publishers) def register(self, target, hostname, listener_type): key = zmq_address.target_to_key(target, listener_type) diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py index 3bbcf321a..1c12934d8 100644 --- a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py +++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py @@ -36,6 +36,8 @@ matchmaker_redis_opts = [ help='Password for Redis server (optional).'), ] +_PUBLISHERS_KEY = "PUBLISHERS" + class RedisMatchMaker(base.MatchMakerBase): @@ -49,6 +51,22 @@ class RedisMatchMaker(base.MatchMakerBase): password=self.conf.matchmaker_redis.password, ) + def register_publisher(self, hostname): + host_str = ",".join(hostname) + if host_str not in self._get_hosts_by_key(_PUBLISHERS_KEY): + self._redis.lpush(_PUBLISHERS_KEY, host_str) + + def unregister_publisher(self, hostname): + host_str = ",".join(hostname) + self._redis.lrem(_PUBLISHERS_KEY, 0, host_str) + + def get_publishers(self): + hosts = [] + hosts.extend([tuple(host_str.split(",")) + for host_str in + self._get_hosts_by_key(_PUBLISHERS_KEY)]) + return hosts + def _get_hosts_by_key(self, key): return self._redis.lrange(key, 0, -1) diff --git a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py index 58f8d8af1..7ba3b80f3 100644 --- a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py +++ b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py @@ -96,9 +96,10 @@ class GreenExecutor(zmq_poller.Executor): def __init__(self, method): self._method = method super(GreenExecutor, self).__init__(None) + self._done = threading.Event() def _loop(self): - while True: + while not self._done.is_set(): self._method() eventlet.sleep() @@ -112,3 +113,6 @@ class GreenExecutor(zmq_poller.Executor): def stop(self): if self.thread is not None: self.thread.kill() + + def done(self): + self._done.set() diff --git a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py index 8167715f1..aa0c73464 100644 --- a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py +++ b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py @@ -47,8 +47,6 @@ class ThreadingPoller(zmq_poller.ZmqPoller): def poll(self, timeout=None): - LOG.debug("Entering poll method") - if timeout: timeout *= 1000 # zmq poller waits milliseconds @@ -94,3 +92,6 @@ class ThreadingExecutor(zmq_poller.Executor): def wait(self): self.thread.join() + + def done(self): + self._stop.set() diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py index b7532a74a..2145c96fc 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py @@ -38,24 +38,6 @@ class ConsumerBase(object): self.sockets = [] self.context = zmq.Context() - def subscribe_socket(self, socket_type): - try: - socket = zmq_socket.ZmqRandomPortSocket( - self.conf, self.context, socket_type) - self.sockets.append(socket) - self.poller.register(socket, self.receive_message) - LOG.debug("Run %(stype)s consumer on %(addr)s:%(port)d", - {"stype": zmq_names.socket_type_str(socket_type), - "addr": socket.bind_address, - "port": socket.port}) - return socket - except zmq.ZMQError as e: - errmsg = _LE("Failed binding to port %(port)d: %(e)s")\ - % (self.port, e) - LOG.error(_LE("Failed binding to port %(port)d: %(e)s") - % (self.port, e)) - raise rpc_common.RPCException(errmsg) - @abc.abstractmethod def listen(self, target): """Associate new sockets with targets here""" @@ -78,6 +60,24 @@ class SingleSocketConsumer(ConsumerBase): super(SingleSocketConsumer, self).__init__(conf, poller, server) self.socket = self.subscribe_socket(socket_type) + def subscribe_socket(self, socket_type): + try: + socket = zmq_socket.ZmqRandomPortSocket( + self.conf, self.context, socket_type) + self.sockets.append(socket) + self.poller.register(socket, self.receive_message) + LOG.debug("Run %(stype)s consumer on %(addr)s:%(port)d", + {"stype": zmq_names.socket_type_str(socket_type), + "addr": socket.bind_address, + "port": socket.port}) + return socket + except zmq.ZMQError as e: + errmsg = _LE("Failed binding to port %(port)d: %(e)s")\ + % (self.port, e) + LOG.error(_LE("Failed binding to port %(port)d: %(e)s") + % (self.port, e)) + raise rpc_common.RPCException(errmsg) + @property def address(self): return self.socket.bind_address diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py new file mode 100644 index 000000000..d51032182 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py @@ -0,0 +1,158 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import threading +import uuid + +import six + +from oslo_messaging._drivers import base +from oslo_messaging._drivers.zmq_driver.server.consumers\ + import zmq_consumer_base +from oslo_messaging._drivers.zmq_driver import zmq_address +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names +from oslo_messaging._drivers.zmq_driver import zmq_socket +from oslo_messaging._i18n import _LE + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +class SubIncomingMessage(base.IncomingMessage): + + def __init__(self, listener, request, socket, poller): + super(SubIncomingMessage, self).__init__( + listener, request.context, request.message) + self.socket = socket + self.msg_id = request.message_id + poller.resume_polling(socket) + + def reply(self, reply=None, failure=None, log_failure=True): + """Reply is not needed for non-call messages""" + + def acknowledge(self): + LOG.debug("Not sending acknowledge for %s", self.msg_id) + + def requeue(self): + """Requeue is not supported""" + + +class SubConsumer(zmq_consumer_base.ConsumerBase): + + def __init__(self, conf, poller, server): + super(SubConsumer, self).__init__(conf, poller, server) + self.matchmaker = server.matchmaker + self.subscriptions = set() + self.targets = [] + self._socket_lock = threading.Lock() + self.socket = zmq_socket.ZmqSocket(self.context, zmq.SUB) + self.sockets.append(self.socket) + self.id = uuid.uuid4() + self.publishers_poller = MatchmakerPoller( + self.matchmaker, on_result=self.on_publishers) + + def _subscribe_on_target(self, target): + topic_filter = zmq_address.target_to_subscribe_filter(target) + if target.topic: + self.socket.setsockopt(zmq.SUBSCRIBE, six.b(target.topic)) + self.subscriptions.add(six.b(target.topic)) + if target.server: + self.socket.setsockopt(zmq.SUBSCRIBE, six.b(target.server)) + self.subscriptions.add(six.b(target.server)) + if target.topic and target.server: + self.socket.setsockopt(zmq.SUBSCRIBE, topic_filter) + self.subscriptions.add(topic_filter) + + LOG.debug("[%(host)s] Subscribing to topic %(filter)s" + % {"host": self.id, + "filter": topic_filter}) + + def on_publishers(self, publishers): + with self._socket_lock: + for host, sync in publishers: + self.socket.connect(zmq_address.get_tcp_direct_address(host)) + + self.poller.register(self.socket, self.receive_message) + LOG.debug("[%s] SUB consumer connected to publishers %s" + % (self.id, publishers)) + + def listen(self, target): + LOG.debug("Listen to target %s" % target) + with self._socket_lock: + self._subscribe_on_target(target) + + def _receive_request(self, socket): + topic_filter = socket.recv() + LOG.debug("[%s] Received %s topic" % (self.id, topic_filter)) + assert topic_filter in self.subscriptions + request = socket.recv_pyobj() + return request + + def receive_message(self, socket): + try: + request = self._receive_request(socket) + if not request: + return None + LOG.debug("Received %(type)s, %(id)s, %(target)s" + % {"type": request.msg_type, + "id": request.message_id, + "target": request.target}) + + if request.msg_type not in zmq_names.MULTISEND_TYPES: + LOG.error(_LE("Unknown message type: %s") % request.msg_type) + else: + return SubIncomingMessage(self.server, request, socket, + self.poller) + except zmq.ZMQError as e: + LOG.error(_LE("Receiving message failed: %s") % str(e)) + + +class MatchmakerPoller(object): + """This entity performs periodical async polling + to the matchmaker if no hosts were registered for + specified target before. + """ + + def __init__(self, matchmaker, on_result): + self.matchmaker = matchmaker + self.executor = zmq_async.get_executor( + method=self._poll_for_publishers) + self.on_result = on_result + self.executor.execute() + + def _poll_for_publishers(self): + publishers = self.matchmaker.get_publishers_retrying() + if publishers: + self.on_result(publishers) + self.executor.done() + + +class BackChatter(object): + + def __init__(self, context): + self.socket = zmq_socket.ZmqSocket(context, zmq.PUSH) + + def connect(self, address): + self.socket.connect(address) + + def send_ready(self): + for i in range(self.socket.connections_count()): + self.socket.send(zmq_names.ACK_TYPE) + + def close(self): + self.socket.setsockopt(zmq.LINGER, 5) + self.socket.close() diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py index c37aef047..0dfec71f5 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py @@ -18,6 +18,8 @@ import logging from oslo_messaging._drivers import base from oslo_messaging._drivers.zmq_driver.server.consumers\ import zmq_router_consumer +from oslo_messaging._drivers.zmq_driver.server.consumers\ + import zmq_sub_consumer from oslo_messaging._drivers.zmq_driver import zmq_async LOG = logging.getLogger(__name__) @@ -31,14 +33,17 @@ class ZmqServer(base.Listener): super(ZmqServer, self).__init__(driver) self.matchmaker = matchmaker self.poller = zmq_async.get_poller() - if conf.zmq_use_broker: - self.rpc_consumer = zmq_router_consumer.RouterConsumerBroker( - conf, self.poller, self) - else: - self.rpc_consumer = zmq_router_consumer.RouterConsumer( + self.rpc_consumer = zmq_router_consumer.RouterConsumerBroker( + conf, self.poller, self) if conf.direct_over_proxy else \ + zmq_router_consumer.RouterConsumer( conf, self.poller, self) self.notify_consumer = self.rpc_consumer + self.sub_consumer = zmq_sub_consumer.SubConsumer( + conf, self.poller, self) if conf.use_pub_sub else None + self.consumers = [self.rpc_consumer] + if self.sub_consumer: + self.consumers.append(self.sub_consumer) @base.batch_poll_helper def poll(self, timeout=None): @@ -59,6 +64,9 @@ class ZmqServer(base.Listener): consumer = self.rpc_consumer consumer.listen(target) + if self.sub_consumer: + self.sub_consumer.listen(target) + def listen_notification(self, targets_and_priorities): consumer = self.notify_consumer diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_address.py b/oslo_messaging/_drivers/zmq_driver/zmq_address.py index 397bd1074..87e1f9d4e 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_address.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_address.py @@ -12,6 +12,8 @@ # License for the specific language governing permissions and limitations # under the License. +import six + def combine_address(host, port): return "%s:%s" % (host, port) @@ -46,3 +48,14 @@ def target_to_key(target, listener_type): return prefix(target.topic) if target.server: return prefix(target.server) + + +def target_to_subscribe_filter(target): + if target.topic and target.server: + attributes = ['topic', 'server'] + key = "/".join(getattr(target, attr) for attr in attributes) + return six.b(key) + if target.topic: + return six.b(target.topic) + if target.server: + return six.b(target.server) diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_async.py b/oslo_messaging/_drivers/zmq_driver/zmq_async.py index 093544118..d0e184345 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_async.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_async.py @@ -70,6 +70,11 @@ def get_executor(method, zmq_concurrency='eventlet'): return threading_poller.ThreadingExecutor(method) +def get_proc_executor(method): + from oslo_messaging._drivers.zmq_driver import zmq_poller + return zmq_poller.MutliprocessingExecutor(method) + + def _is_eventlet_zmq_available(): return importutils.try_import('eventlet.green.zmq') diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_poller.py b/oslo_messaging/_drivers/zmq_driver/zmq_poller.py index a62ea8a6f..cfe249574 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_poller.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_poller.py @@ -13,6 +13,7 @@ # under the License. import abc +import multiprocessing import six @@ -104,3 +105,31 @@ class Executor(object): @abc.abstractmethod def wait(self): """Wait until pass""" + + @abc.abstractmethod + def done(self): + """More soft way to stop rather than killing thread""" + + +class MutliprocessingExecutor(Executor): + + def __init__(self, method): + process = multiprocessing.Process(target=self._loop) + self._method = method + super(MutliprocessingExecutor, self).__init__(process) + + def _loop(self): + while not self._stop.is_set(): + self._method() + + def execute(self): + self.thread.start() + + def stop(self): + self._stop.set() + + def wait(self): + self.thread.join() + + def done(self): + self._stop.set() diff --git a/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py b/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py index 5751e5ba3..ac58b205e 100644 --- a/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py +++ b/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py @@ -66,8 +66,6 @@ class TestImplMatchmaker(test_utils.BaseTestCase): self.assertEqual(self.test_matcher.get_hosts(self.target, "test"), [self.host1]) - self.assertEqual(self.test_matcher.get_single_host(self.target, "test"), - self.host1) def test_register_two_hosts(self): self.test_matcher.register(self.target, self.host1, "test") @@ -75,8 +73,6 @@ class TestImplMatchmaker(test_utils.BaseTestCase): self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"), [self.host1, self.host2]) - self.assertIn(self.test_matcher.get_single_host(self.target, "test"), - [self.host1, self.host2]) def test_register_unsibscribe(self): self.test_matcher.register(self.target, self.host1, "test") @@ -86,8 +82,6 @@ class TestImplMatchmaker(test_utils.BaseTestCase): self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"), [self.host1]) - self.assertNotIn(self.test_matcher.get_single_host(self.target, "test"), - [self.host2]) def test_register_two_same_hosts(self): self.test_matcher.register(self.target, self.host1, "test") @@ -95,14 +89,7 @@ class TestImplMatchmaker(test_utils.BaseTestCase): self.assertEqual(self.test_matcher.get_hosts(self.target, "test"), [self.host1]) - self.assertEqual(self.test_matcher.get_single_host(self.target, "test"), - self.host1) def test_get_hosts_wrong_topic(self): target = oslo_messaging.Target(topic="no_such_topic") self.assertEqual(self.test_matcher.get_hosts(target, "test"), []) - - def test_get_single_host_wrong_topic(self): - target = oslo_messaging.Target(topic="no_such_topic") - self.assertRaises(oslo_messaging.InvalidTarget, - self.test_matcher.get_single_host, target, "test") diff --git a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py index 1d710d3a9..0957bfe88 100644 --- a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py +++ b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py @@ -13,7 +13,6 @@ # under the License. import logging -import threading import fixtures import testtools @@ -22,77 +21,16 @@ import oslo_messaging from oslo_messaging._drivers import impl_zmq from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_socket -from oslo_messaging._i18n import _ from oslo_messaging.tests import utils as test_utils +from oslo_messaging.tests.drivers.zmq import zmq_common LOG = logging.getLogger(__name__) zmq = zmq_async.import_zmq() -class TestServerListener(object): - def __init__(self, driver): - self.driver = driver - self.listener = None - self.executor = zmq_async.get_executor(self._run) - self._stop = threading.Event() - self._received = threading.Event() - self.message = None - - def listen(self, target): - self.listener = self.driver.listen(target) - self.executor.execute() - - def listen_notifications(self, targets_and_priorities): - self.listener = self.driver.listen_for_notifications( - targets_and_priorities, {}) - self.executor.execute() - - def _run(self): - try: - message = self.listener.poll() - if message: - message = message[0] - message.acknowledge() - self._received.set() - self.message = message - message.reply(reply=True) - except Exception: - LOG.exception(_("Unexpected exception occurred.")) - - def stop(self): - self.executor.stop() - - -class ZmqBaseTestCase(test_utils.BaseTestCase): - """Base test case for all ZMQ tests """ - - @testtools.skipIf(zmq is None, "zmq not available") - def setUp(self): - super(ZmqBaseTestCase, self).setUp() - self.messaging_conf.transport_driver = 'zmq' - - # Set config values - self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path - kwargs = {'rpc_zmq_bind_address': '127.0.0.1', - 'rpc_zmq_host': '127.0.0.1', - 'rpc_response_timeout': 5, - 'rpc_zmq_ipc_dir': self.internal_ipc_dir, - 'zmq_use_broker': False, - 'rpc_zmq_matchmaker': 'dummy'} - self.config(**kwargs) - - # Get driver - transport = oslo_messaging.get_transport(self.conf) - self.driver = transport._driver - - self.listener = TestServerListener(self.driver) - - self.addCleanup(stopRpc(self.__dict__)) - - -class ZmqTestPortsRange(ZmqBaseTestCase): +class ZmqTestPortsRange(zmq_common.ZmqBaseTestCase): @testtools.skipIf(zmq is None, "zmq not available") def setUp(self): @@ -131,18 +69,7 @@ class TestConfZmqDriverLoad(test_utils.BaseTestCase): self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver) -class stopRpc(object): - def __init__(self, attrs): - self.attrs = attrs - - def __call__(self): - if self.attrs['driver']: - self.attrs['driver'].cleanup() - if self.attrs['listener']: - self.attrs['listener'].stop() - - -class TestZmqBasics(ZmqBaseTestCase): +class TestZmqBasics(zmq_common.ZmqBaseTestCase): def test_send_receive_raises(self): """Call() without method.""" @@ -183,6 +110,7 @@ class TestZmqBasics(ZmqBaseTestCase): def test_send_fanout(self): target = oslo_messaging.Target(topic='testtopic', fanout=True) + self.listener.listen(target) result = self.driver.send( diff --git a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py new file mode 100644 index 000000000..01619d209 --- /dev/null +++ b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py @@ -0,0 +1,120 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import pickle +import time + +import contextlib +import fixtures +import testtools + + +import oslo_messaging +from oslo_messaging._drivers import impl_zmq +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_socket +from oslo_messaging._drivers.zmq_driver.client import zmq_request +from oslo_messaging._drivers.zmq_driver.client.publishers \ + import zmq_pub_publisher +from oslo_messaging.tests import utils as test_utils +from oslo_messaging.tests.drivers.zmq import zmq_common + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +class TestPubSub(zmq_common.ZmqBaseTestCase): + + LISTENERS_COUNT = 3 + + def setUp(self): + super(TestPubSub, self).setUp() + + kwargs = {'use_pub_sub': True} + self.config(**kwargs) + + self.publisher = zmq_pub_publisher.PubPublisherProxy( + self.conf, self.driver.matchmaker) + + self.listeners = [] + for i in range(self.LISTENERS_COUNT): + self.listeners.append(zmq_common.TestServerListener(self.driver)) + + def _send_request(self, target): + # Needed only in test env to get listener a chance to connect + # before request fires + time.sleep(1) + with contextlib.closing(zmq_request.FanoutRequest( + target, context={}, message={'method': 'hello-world'}, + timeout=0, retry=None)) as request: + self.publisher.send_request([request.create_envelope(), + pickle.dumps(request)]) + + def _check_listener(self, listener): + listener._received.wait(timeout=5) + self.assertEqual(True, listener._received.isSet()) + method = listener.message.message[u'method'] + self.assertEqual(u'hello-world', method) + + def _check_listener_negative(self, listener): + listener._received.wait(timeout=1) + self.assertEqual(False, listener._received.isSet()) + + def test_single_listener(self): + target = oslo_messaging.Target(topic='testtopic', fanout=True) + self.listener.listen(target) + + self._send_request(target) + + self._check_listener(self.listener) + + def test_all_listeners(self): + target = oslo_messaging.Target(topic='testtopic', fanout=True) + + for listener in self.listeners: + listener.listen(target) + + self._send_request(target) + + for listener in self.listeners: + self._check_listener(listener) + + def test_filtered(self): + target = oslo_messaging.Target(topic='testtopic', fanout=True) + target_wrong = oslo_messaging.Target(topic='wrong', fanout=True) + + self.listeners[0].listen(target) + self.listeners[1].listen(target) + self.listeners[2].listen(target_wrong) + + self._send_request(target) + + self._check_listener(self.listeners[0]) + self._check_listener(self.listeners[1]) + self._check_listener_negative(self.listeners[2]) + + def test_topic_part_matching(self): + target = oslo_messaging.Target(topic='testtopic', server='server') + target_part = oslo_messaging.Target(topic='testtopic', fanout=True) + + self.listeners[0].listen(target) + self.listeners[1].listen(target) + + self._send_request(target_part) + + self._check_listener(self.listeners[0]) + self._check_listener(self.listeners[1]) + diff --git a/oslo_messaging/tests/drivers/zmq/zmq_common.py b/oslo_messaging/tests/drivers/zmq/zmq_common.py new file mode 100644 index 000000000..f5c6bafac --- /dev/null +++ b/oslo_messaging/tests/drivers/zmq/zmq_common.py @@ -0,0 +1,102 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import threading + +import fixtures +import testtools + +import oslo_messaging +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._i18n import _ +from oslo_messaging.tests import utils as test_utils + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +class TestServerListener(object): + + def __init__(self, driver): + self.driver = driver + self.listener = None + self.executor = zmq_async.get_executor(self._run) + self._stop = threading.Event() + self._received = threading.Event() + self.message = None + + def listen(self, target): + self.listener = self.driver.listen(target) + self.executor.execute() + + def listen_notifications(self, targets_and_priorities): + self.listener = self.driver.listen_for_notifications( + targets_and_priorities, {}) + self.executor.execute() + + def _run(self): + try: + messages = self.listener.poll() + if messages: + message = messages[0] + message.acknowledge() + self._received.set() + self.message = message + message.reply(reply=True) + except Exception: + LOG.exception(_("Unexpected exception occurred.")) + + def stop(self): + self.executor.stop() + + +class ZmqBaseTestCase(test_utils.BaseTestCase): + """Base test case for all ZMQ tests """ + + @testtools.skipIf(zmq is None, "zmq not available") + def setUp(self): + super(ZmqBaseTestCase, self).setUp() + self.messaging_conf.transport_driver = 'zmq' + + # Set config values + self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path + kwargs = {'rpc_zmq_bind_address': '127.0.0.1', + 'rpc_zmq_host': '127.0.0.1', + 'rpc_response_timeout': 5, + 'rpc_zmq_ipc_dir': self.internal_ipc_dir, + 'use_pub_sub': False, + 'direct_over_proxy': False, + 'rpc_zmq_matchmaker': 'dummy'} + self.config(**kwargs) + + # Get driver + transport = oslo_messaging.get_transport(self.conf) + self.driver = transport._driver + + self.listener = TestServerListener(self.driver) + + self.addCleanup(StopRpc(self.__dict__)) + + +class StopRpc(object): + def __init__(self, attrs): + self.attrs = attrs + + def __call__(self): + if self.attrs['driver']: + self.attrs['driver'].cleanup() + if self.attrs['listener']: + self.attrs['listener'].stop()