From 73cd49129f0ce2a799938b4fec9dcd847b0a77ad Mon Sep 17 00:00:00 2001 From: Oleksii Zamiatin Date: Fri, 19 Jun 2015 14:29:18 +0300 Subject: [PATCH] Initial commit for new zmq driver implementation - Minimal RPC (CALL + direct CAST) implementation - Has up and running oslo_messaging/tests/drivers/test_impl_zmq - Pep8 fixed. - Works over REQ/REP pipeline according to [1] - Has a beginning of eventlet/threading behavior differentiation Fanout and Notifier are not yet supported Devstack not yet fixed Functional tests not yet fixed ..[1] - https://review.openstack.org/#/c/171131/ Change-Id: I44cd48070bf7c7f46152fdf0e54664a7dee97de9 --- oslo_messaging/_drivers/impl_zmq.py | 1069 +---------------- .../_drivers/zmq_driver/__init__.py | 0 .../_drivers/zmq_driver/broker/__init__.py | 1 + .../zmq_driver/broker/zmq_base_proxy.py | 92 ++ .../_drivers/zmq_driver/broker/zmq_broker.py | 71 ++ .../zmq_driver/broker/zmq_call_proxy.py | 110 ++ .../zmq_driver/broker/zmq_cast_proxy.py | 79 ++ .../_drivers/zmq_driver/notifier/__init__.py | 1 + .../_drivers/zmq_driver/poller/__init__.py | 0 .../zmq_driver/poller/green_poller.py | 111 ++ .../zmq_driver/poller/threading_poller.py | 50 + .../_drivers/zmq_driver/rpc/__init__.py | 0 .../zmq_driver/rpc/client/__init__.py | 0 .../zmq_driver/rpc/client/zmq_call_request.py | 49 + .../zmq_driver/rpc/client/zmq_cast_dealer.py | 72 ++ .../rpc/client/zmq_cast_publisher.py | 40 + .../zmq_driver/rpc/client/zmq_client.py | 33 + .../zmq_driver/rpc/client/zmq_request.py | 76 ++ .../zmq_driver/rpc/server/__init__.py | 0 .../rpc/server/zmq_base_consumer.py | 35 + .../rpc/server/zmq_call_responder.py | 96 ++ .../zmq_driver/rpc/server/zmq_server.py | 49 + .../_drivers/zmq_driver/zmq_async.py | 59 + .../_drivers/zmq_driver/zmq_context.py | 33 + .../_drivers/zmq_driver/zmq_poller.py | 48 + .../_drivers/zmq_driver/zmq_serializer.py | 54 + .../_drivers/zmq_driver/zmq_topic.py | 61 + oslo_messaging/tests/drivers/test_impl_zmq.py | 467 ++----- tests/drivers/test_impl_zmq.py | 446 ++----- 29 files changed, 1490 insertions(+), 1712 deletions(-) create mode 100644 oslo_messaging/_drivers/zmq_driver/__init__.py create mode 100644 oslo_messaging/_drivers/zmq_driver/broker/__init__.py create mode 100644 oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py create mode 100644 oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py create mode 100644 oslo_messaging/_drivers/zmq_driver/broker/zmq_call_proxy.py create mode 100644 oslo_messaging/_drivers/zmq_driver/broker/zmq_cast_proxy.py create mode 100644 oslo_messaging/_drivers/zmq_driver/notifier/__init__.py create mode 100644 oslo_messaging/_drivers/zmq_driver/poller/__init__.py create mode 100644 oslo_messaging/_drivers/zmq_driver/poller/green_poller.py create mode 100644 oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py create mode 100644 oslo_messaging/_drivers/zmq_driver/rpc/__init__.py create mode 100644 oslo_messaging/_drivers/zmq_driver/rpc/client/__init__.py create mode 100644 oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py create mode 100644 oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py create mode 100644 oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_publisher.py create mode 100644 oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py create mode 100644 oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py create mode 100644 oslo_messaging/_drivers/zmq_driver/rpc/server/__init__.py create mode 100644 oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_base_consumer.py create mode 100644 oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py create mode 100644 oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py create mode 100644 oslo_messaging/_drivers/zmq_driver/zmq_async.py create mode 100644 oslo_messaging/_drivers/zmq_driver/zmq_context.py create mode 100644 oslo_messaging/_drivers/zmq_driver/zmq_poller.py create mode 100644 oslo_messaging/_drivers/zmq_driver/zmq_serializer.py create mode 100644 oslo_messaging/_drivers/zmq_driver/zmq_topic.py diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index f673b9c06..7357aa3e3 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -12,39 +12,20 @@ # License for the specific language governing permissions and limitations # under the License. -import collections import logging -import os import pprint -import re import socket -import sys -import threading -import types -import uuid -import eventlet -import greenlet from oslo_config import cfg -from oslo_serialization import jsonutils -from oslo_utils import excutils -from oslo_utils import importutils -import six -from six import moves -from stevedore import driver from oslo_messaging._drivers import base from oslo_messaging._drivers import common as rpc_common -from oslo_messaging._executors import base as executor_base # FIXME(markmc) -from oslo_messaging._i18n import _, _LE, _LW -from oslo_messaging._drivers import pool +from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_client +from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_server +from oslo_messaging._executors import base as executor_base -zmq = importutils.try_import('eventlet.green.zmq') - -# for convenience, are not modified. pformat = pprint.pformat -Timeout = eventlet.timeout.Timeout LOG = logging.getLogger(__name__) RPCException = rpc_common.RPCException @@ -62,6 +43,11 @@ zmq_opts = [ help='MatchMaker driver.', ), + cfg.BoolOpt('rpc_zmq_all_req_rep', + default=True, + deprecated_group='DEFAULT', + help='Use REQ/REP pattern for all methods CALL/CAST/FANOUT.'), + # The following port is unassigned by IANA as of 2012-05-21 cfg.IntOpt('rpc_zmq_port', default=9501, help='ZeroMQ receiver listening port.'), @@ -87,905 +73,6 @@ zmq_opts = [ 'Only supported by impl_zmq.'), ] -CONF = cfg.CONF - -matchmaker = None # memoized matchmaker object - - -def _serialize(data): - """Serialization wrapper. - - We prefer using JSON, but it cannot encode all types. - Error if a developer passes us bad data. - """ - try: - return jsonutils.dumps(data, ensure_ascii=True) - except TypeError: - with excutils.save_and_reraise_exception(): - LOG.error(_("JSON serialization failed.")) - - -def _deserialize(data): - """Deserialization wrapper.""" - LOG.debug("Deserializing: %r", data) - return jsonutils.loads(data) - - -class ZmqSocket(object): - """A tiny wrapper around ZeroMQ. - - Simplifies the send/recv protocol and connection management. - Can be used as a Context (supports the 'with' statement). - """ - - def __init__(self, addr, zmq_type, bind=True, subscribe=None, ctxt=None): - self.ctxt = ctxt or zmq.Context(CONF.rpc_zmq_contexts) - self.sock = self.ctxt.socket(zmq_type) - - # Enable IPv6-support in libzmq. - # When IPv6 is enabled, a socket will connect to, or accept - # connections from, both IPv4 and IPv6 hosts. - try: - self.sock.ipv6 = True - except AttributeError: - # NOTE(dhellmann): Sometimes the underlying library does - # not recognize the IPV6 option. There's nothing we can - # really do in that case, so ignore the error and keep - # trying to work. - pass - - self.addr = addr - self.type = zmq_type - self.subscriptions = [] - - # Support failures on sending/receiving on wrong socket type. - self.can_recv = zmq_type in (zmq.PULL, zmq.SUB) - self.can_send = zmq_type in (zmq.PUSH, zmq.PUB) - self.can_sub = zmq_type in (zmq.SUB, ) - - # Support list, str, & None for subscribe arg (cast to list) - do_sub = { - list: subscribe, - str: [subscribe], - type(None): [] - }[type(subscribe)] - - for f in do_sub: - self.subscribe(f) - - str_data = {'addr': addr, 'type': self.socket_s(), - 'subscribe': subscribe, 'bind': bind} - - LOG.debug("Connecting to %(addr)s with %(type)s", str_data) - LOG.debug("-> Subscribed to %(subscribe)s", str_data) - LOG.debug("-> bind: %(bind)s", str_data) - - try: - if bind: - self.sock.bind(addr) - else: - self.sock.connect(addr) - except Exception: - raise RPCException(_("Could not open socket.")) - - def socket_s(self): - """Get socket type as string.""" - t_enum = ('PUSH', 'PULL', 'PUB', 'SUB', 'REP', 'REQ', 'ROUTER', - 'DEALER') - return dict(map(lambda t: (getattr(zmq, t), t), t_enum))[self.type] - - def subscribe(self, msg_filter): - """Subscribe.""" - if not self.can_sub: - raise RPCException("Cannot subscribe on this socket.") - LOG.debug("Subscribing to %s", msg_filter) - - try: - arg = msg_filter - if six.PY3: - arg = arg.encode('utf-8') - self.sock.setsockopt(zmq.SUBSCRIBE, arg) - except Exception: - return - - self.subscriptions.append(msg_filter) - - def unsubscribe(self, msg_filter): - """Unsubscribe.""" - if msg_filter not in self.subscriptions: - return - arg = msg_filter - if six.PY3: - arg = arg.encode('utf-8') - self.sock.setsockopt(zmq.UNSUBSCRIBE, arg) - self.subscriptions.remove(msg_filter) - - @property - def closed(self): - return self.sock is None or self.sock.closed - - def close(self): - if self.sock is None or self.sock.closed: - return - - # We must unsubscribe, or we'll leak descriptors. - if self.subscriptions: - for f in self.subscriptions: - try: - self.sock.setsockopt(zmq.UNSUBSCRIBE, f) - except Exception: - pass - self.subscriptions = [] - - try: - # Default is to linger - self.sock.close() - self.ctxt.term() - except Exception: - # While this is a bad thing to happen, - # it would be much worse if some of the code calling this - # were to fail. For now, lets log, and later evaluate - # if we can safely raise here. - LOG.error("ZeroMQ socket could not be closed.") - self.sock = None - - def recv(self, **kwargs): - if not self.can_recv: - raise RPCException(_("You cannot recv on this socket.")) - return self.sock.recv_multipart(**kwargs) - - def send(self, data, **kwargs): - if not self.can_send: - raise RPCException(_("You cannot send on this socket.")) - self.sock.send_multipart(data, **kwargs) - - -class ZmqClient(object): - """Client for ZMQ sockets.""" - - def __init__(self, addr, ctxt=None): - self.address = addr - self.outq = ZmqSocket(addr, zmq.PUSH, bind=False, ctxt=ctxt) - - def cast(self, msg_id, topic, data, envelope): - msg_id = msg_id or '0' - - if six.PY3: - msg_id = msg_id.encode('utf-8') - - if not envelope: - data = _serialize(data) - if six.PY3: - data = data.encode('utf-8') - data = (msg_id, topic, b'cast', data) - self.outq.send([bytes(item) for item in data]) - return - - rpc_envelope = rpc_common.serialize_msg(data[1]) - zmq_msg = moves.reduce(lambda x, y: x + y, rpc_envelope.items()) - data = (msg_id, topic, b'impl_zmq_v2', data[0]) + zmq_msg - self.outq.send([bytes(item) for item in data]) - - def close(self): - self.outq.close() - - -class ZmqClientContext(object): - """This is essentially a wrapper around ZmqClient that supports 'with'. - It can also return a new ZmqClient, or one from a pool. - - The function will also catch when an instance of this class is to be - deleted. With that we can return ZmqClients to the pool on exceptions - and so forth without making the caller be responsible for catching them. - If possible the function makes sure to return a client to the pool. - - Based on amqp.ConnectionContext. - """ - - def __init__(self, address, connection_pool=None, pooled=False): - self.connection = None - self.connection_pool = connection_pool - self.pooled = pooled - if self.pooled and self.connection_pool is not None: - self.connection = self.connection_pool.get(address) - else: - self.connection = ZmqClient(address) - - def __enter__(self): - """When with ZmqClientContext() is used, return self.""" - return self - - def _done(self): - """If the client came from a pool, clean it up and put it back. - If it did not come from a pool, close it. - """ - if self.connection: - if self.pooled and self.connection_pool is not None: - # Reset the connection so it's ready for the next caller - # to grab from the pool - self.connection_pool.put(self.connection) - else: - try: - self.connection.close() - except Exception: - pass - self.connection = None - - def __exit__(self, exc_type, exc_value, tb): - """End of 'with' statement. We're done here.""" - self._done() - - def __del__(self): - """Caller is done with this client. Make sure we cleaned up.""" - self._done() - - def close(self): - """Caller is done with this client.""" - self._done() - - def __getattr__(self, key): - """Proxy all other calls to the ZmqClient instance.""" - if self.connection: - return getattr(self.connection, key) - else: - raise rpc_common.InvalidRPCConnectionReuse() - - -class RpcContext(rpc_common.CommonRpcContext): - """Context that supports replying to a rpc.call.""" - def __init__(self, **kwargs): - self.replies = [] - super(RpcContext, self).__init__(**kwargs) - - def deepcopy(self): - values = self.to_dict() - values['replies'] = self.replies - return self.__class__(**values) - - def reply(self, reply=None, failure=None, ending=False): - if ending: - return - self.replies.append(reply) - - @classmethod - def marshal(self, ctx): - if not isinstance(ctx, dict): - ctx_data = ctx.to_dict() - else: - ctx_data = ctx - return _serialize(ctx_data) - - @classmethod - def unmarshal(self, data): - return RpcContext.from_dict(_deserialize(data)) - - -class InternalContext(object): - """Used by ConsumerBase as a private context for - methods.""" - - def __init__(self, proxy): - self.proxy = proxy - self.msg_waiter = None - - def _get_response(self, ctx, proxy, topic, data): - """Process a curried message and cast the result to topic.""" - LOG.debug("Running func with context: %s", ctx.to_dict()) - data.setdefault('version', None) - data.setdefault('args', {}) - - try: - if not data.get("method"): - raise KeyError - result = proxy.dispatch(ctx, data) - return ConsumerBase.normalize_reply(result, ctx.replies) - except greenlet.GreenletExit: - # ignore these since they are just from shutdowns - pass - except rpc_common.ClientException as e: - LOG.debug("Expected exception during message handling (%s)", - e._exc_info[1]) - return {'exc': - rpc_common.serialize_remote_exception(e._exc_info, - log_failure=False)} - except Exception: - LOG.error(_("Exception during message handling")) - return {'exc': - rpc_common.serialize_remote_exception(sys.exc_info())} - - def reply(self, driver, ctx, proxy, - msg_id=None, context=None, topic=None, msg=None): - """Reply to a casted call.""" - # NOTE(ewindisch): context kwarg exists for Grizzly compat. - # this may be able to be removed earlier than - # 'I' if ConsumerBase.process were refactored. - if type(msg) is list: - payload = msg[-1] - else: - payload = msg - - response = ConsumerBase.normalize_reply( - self._get_response(ctx, proxy, topic, payload), - ctx.replies) - - LOG.debug("Sending reply") - _multi_send(driver, _cast, ctx, topic, { - 'method': '-process_reply', - 'args': { - 'msg_id': msg_id, # Include for Folsom compat. - 'response': response - } - }, _msg_id=msg_id, pooled=True) - - -class ConsumerBase(object): - """Base Consumer.""" - - def __init__(self, driver): - self.driver = driver - self.private_ctx = InternalContext(None) - - @classmethod - def normalize_reply(self, result, replies): - # TODO(ewindisch): re-evaluate and document this method. - if isinstance(result, types.GeneratorType): - return list(result) - elif replies: - return replies - else: - return [result] - - def process(self, proxy, ctx, data): - data.setdefault('version', None) - data.setdefault('args', {}) - - # Method starting with - are - # processed internally. (non-valid method name) - method = data.get('method') - # Internal method - # uses internal context for safety. - if method == '-reply': - self.private_ctx.reply(self.driver, ctx, proxy, **data['args']) - return - - proxy.dispatch(ctx, data) - - -class ZmqBaseReactor(ConsumerBase): - """A consumer class implementing a centralized casting broker (PULL-PUSH). - - Used for RoundRobin requests. - """ - - def __init__(self, conf, driver=None): - super(ZmqBaseReactor, self).__init__(driver) - - self.driver = driver - self.proxies = {} - self.threads = [] - self.sockets = [] - self.subscribe = {} - - self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size) - - def register(self, proxy, in_addr, zmq_type_in, - in_bind=True, subscribe=None): - - LOG.info(_("Registering reactor")) - - if zmq_type_in not in (zmq.PULL, zmq.SUB): - raise RPCException("Bad input socktype") - - # Items push in. - inq = ZmqSocket(in_addr, zmq_type_in, bind=in_bind, - subscribe=subscribe) - - self.proxies[inq] = proxy - self.sockets.append(inq) - - LOG.info(_("In reactor registered")) - - def consume_in_thread(self): - def _consume(sock): - LOG.info(_("Consuming socket")) - while not sock.closed: - self.consume(sock) - - for k in self.proxies.keys(): - self.threads.append( - self.pool.spawn(_consume, k) - ) - - def wait(self): - for t in self.threads: - t.wait() - - def close(self): - for t in self.threads: - t.kill() - - for s in self.sockets: - s.close() - - -class ZmqProxy(ZmqBaseReactor): - """A consumer class implementing a topic-based proxy. - - Forwards to IPC sockets. - """ - - def __init__(self, conf): - super(ZmqProxy, self).__init__(conf) - pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\')) - self.badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep))) - - self.topic_proxy = {} - - def consume(self, sock): - ipc_dir = CONF.rpc_zmq_ipc_dir - - data = sock.recv(copy=False) - topic = data[1].bytes - if six.PY3: - topic = topic.decode('utf-8') - - if topic.startswith('fanout~'): - sock_type = zmq.PUB - topic = topic.split('.', 1)[0] - elif topic.startswith('zmq_replies'): - sock_type = zmq.PUB - else: - sock_type = zmq.PUSH - - if topic not in self.topic_proxy: - def publisher(waiter): - LOG.info(_("Creating proxy for topic: %s"), topic) - - try: - # The topic is received over the network, - # don't trust this input. - if self.badchars.search(topic) is not None: - emsg = _("Topic contained dangerous characters.") - LOG.warn(emsg) - raise RPCException(emsg) - - out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" % - (ipc_dir, topic), - sock_type, bind=True) - except RPCException: - waiter.send_exception(*sys.exc_info()) - return - - self.topic_proxy[topic] = eventlet.queue.LightQueue( - CONF.rpc_zmq_topic_backlog) - self.sockets.append(out_sock) - - # It takes some time for a pub socket to open, - # before we can have any faith in doing a send() to it. - if sock_type == zmq.PUB: - eventlet.sleep(.5) - - waiter.send(True) - - while(True): - data = self.topic_proxy[topic].get() - out_sock.send(data, copy=False) - - wait_sock_creation = eventlet.event.Event() - eventlet.spawn(publisher, wait_sock_creation) - - try: - wait_sock_creation.wait() - except RPCException: - LOG.error(_("Topic socket file creation failed.")) - return - - try: - self.topic_proxy[topic].put_nowait(data) - except eventlet.queue.Full: - LOG.error(_("Local per-topic backlog buffer full for topic " - "%s. Dropping message."), topic) - - def consume_in_thread(self): - """Runs the ZmqProxy service.""" - ipc_dir = CONF.rpc_zmq_ipc_dir - consume_in = "tcp://%s:%s" % \ - (CONF.rpc_zmq_bind_address, - CONF.rpc_zmq_port) - consumption_proxy = InternalContext(None) - - try: - os.makedirs(ipc_dir) - except os.error: - if not os.path.isdir(ipc_dir): - with excutils.save_and_reraise_exception(): - LOG.error(_("Required IPC directory does not exist at" - " %s"), ipc_dir) - try: - self.register(consumption_proxy, - consume_in, - zmq.PULL) - except zmq.ZMQError: - if os.access(ipc_dir, os.X_OK): - with excutils.save_and_reraise_exception(): - LOG.error(_("Permission denied to IPC directory at" - " %s"), ipc_dir) - with excutils.save_and_reraise_exception(): - LOG.error(_("Could not create ZeroMQ receiver daemon. " - "Socket may already be in use.")) - - super(ZmqProxy, self).consume_in_thread() - - -def unflatten_envelope(packenv): - """Unflattens the RPC envelope. - - Takes a list and returns a dictionary. - i.e. [1,2,3,4] => {1: 2, 3: 4} - """ - i = iter(packenv) - h = {} - try: - while True: - k = six.next(i) - h[k] = six.next(i) - except StopIteration: - return h - - -class ZmqReactor(ZmqBaseReactor): - """A consumer class implementing a consumer for messages. - - Can also be used as a 1:1 proxy - """ - - def __init__(self, conf, driver): - super(ZmqReactor, self).__init__(conf, driver) - - def consume(self, sock): - # TODO(ewindisch): use zero-copy (i.e. references, not copying) - data = sock.recv() - LOG.debug("CONSUMER RECEIVED DATA: %s", data) - - proxy = self.proxies[sock] - - if data[2] == b'cast': # Legacy protocol - packenv = data[3] - - ctx, msg = _deserialize(packenv) - request = rpc_common.deserialize_msg(msg) - ctx = RpcContext.unmarshal(ctx) - elif data[2] == b'impl_zmq_v2': - packenv = data[4:] - - msg = unflatten_envelope(packenv) - request = rpc_common.deserialize_msg(msg) - - # Unmarshal only after verifying the message. - ctx = RpcContext.unmarshal(data[3]) - else: - LOG.error(_("ZMQ Envelope version unsupported or unknown.")) - return - - self.pool.spawn_n(self.process, proxy, ctx, request) - - -class Connection(rpc_common.Connection): - """Manages connections and threads.""" - - def __init__(self, conf, driver): - self.topics = [] - self.reactor = ZmqReactor(conf, driver) - - def create_consumer(self, topic, proxy, fanout=False): - # Register with matchmaker. - _get_matchmaker().register(topic, CONF.rpc_zmq_host) - - # Subscription scenarios - if fanout: - sock_type = zmq.SUB - subscribe = ('', fanout)[type(fanout) == str] - topic = 'fanout~' + topic.split('.', 1)[0] - else: - sock_type = zmq.PULL - subscribe = None - topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host)) - - if topic in self.topics: - LOG.info(_("Skipping topic registration. Already registered.")) - return - - # Receive messages from (local) proxy - inaddr = "ipc://%s/zmq_topic_%s" % \ - (CONF.rpc_zmq_ipc_dir, topic) - - LOG.debug("Consumer is a zmq.%s", - ['PULL', 'SUB'][sock_type == zmq.SUB]) - - self.reactor.register(proxy, inaddr, sock_type, - subscribe=subscribe, in_bind=False) - self.topics.append(topic) - - def close(self): - mm = _get_matchmaker() - mm.stop_heartbeat() - for topic in self.topics: - try: - mm.unregister(topic, CONF.rpc_zmq_host) - except Exception as err: - LOG.error(_LE('Unable to unregister topic %(topic)s' - ' from matchmaker: %(err)s') % - {'topic': topic, 'err': err}) - - self.reactor.close() - self.topics = [] - - def wait(self): - self.reactor.wait() - - def consume_in_thread(self): - _get_matchmaker().start_heartbeat() - self.reactor.consume_in_thread() - - -def _cast(driver, addr, context, topic, msg, timeout=None, envelope=False, - _msg_id=None, allowed_remote_exmods=None, pooled=False): - allowed_remote_exmods = allowed_remote_exmods or [] - timeout_cast = timeout or CONF.rpc_cast_timeout - payload = [RpcContext.marshal(context), msg] - if six.PY3: - topic = topic.encode('utf-8') - - with Timeout(timeout_cast, exception=rpc_common.Timeout): - with driver.get_connection(addr, pooled) as conn: - try: - # assumes cast can't return an exception - conn.cast(_msg_id, topic, payload, envelope) - except zmq.ZMQError: - raise RPCException("Cast failed. ZMQ Socket Exception") - - -def _call(driver, addr, context, topic, msg, timeout=None, - envelope=False, allowed_remote_exmods=None, pooled=False): - allowed_remote_exmods = allowed_remote_exmods or [] - # timeout_response is how long we wait for a response - timeout = timeout or CONF.rpc_response_timeout - - # The msg_id is used to track replies. - msg_id = uuid.uuid4().hex - - # Replies always come into the reply service. - reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host - - LOG.debug("Creating payload") - # Curry the original request into a reply method. - mcontext = RpcContext.marshal(context) - payload = { - 'method': '-reply', - 'args': { - 'msg_id': msg_id, - 'topic': reply_topic, - # TODO(ewindisch): safe to remove mcontext in I. - 'msg': [mcontext, msg] - } - } - - LOG.debug("Creating queue socket for reply waiter") - - # Messages arriving async. - # TODO(ewindisch): have reply consumer with dynamic subscription mgmt - with Timeout(timeout, exception=rpc_common.Timeout): - try: - msg_waiter = ZmqSocket( - "ipc://%s/zmq_topic_zmq_replies.%s" % - (CONF.rpc_zmq_ipc_dir, - CONF.rpc_zmq_host), - zmq.SUB, subscribe=msg_id, bind=False - ) - - LOG.debug("Sending cast: %s", topic) - _cast(driver, addr, context, topic, payload, envelope=envelope, - pooled=pooled) - - LOG.debug("Cast sent; Waiting reply") - # Blocks until receives reply - msg = msg_waiter.recv() - if msg is None: - raise rpc_common.Timeout() - LOG.debug("Received message: %s", msg) - LOG.debug("Unpacking response") - - if msg[2] == b'cast': # Legacy version - raw_msg = _deserialize(msg[-1])[-1] - elif msg[2] == b'impl_zmq_v2': - rpc_envelope = unflatten_envelope(msg[4:]) - raw_msg = rpc_common.deserialize_msg(rpc_envelope) - else: - raise rpc_common.UnsupportedRpcEnvelopeVersion( - _("Unsupported or unknown ZMQ envelope returned.")) - - responses = raw_msg['args']['response'] - # ZMQError trumps the Timeout error. - except zmq.ZMQError: - raise RPCException("ZMQ Socket Error") - except (IndexError, KeyError): - raise RPCException(_("RPC Message Invalid.")) - finally: - if 'msg_waiter' in vars(): - msg_waiter.close() - - # It seems we don't need to do all of the following, - # but perhaps it would be useful for multicall? - # One effect of this is that we're checking all - # responses for Exceptions. - for resp in responses: - if isinstance(resp, dict) and 'exc' in resp: - raise rpc_common.deserialize_remote_exception( - resp['exc'], allowed_remote_exmods) - - return responses[-1] - - -def _multi_send(driver, method, context, topic, msg, timeout=None, - envelope=False, _msg_id=None, allowed_remote_exmods=None, - pooled=False): - """Wraps the sending of messages. - - Dispatches to the matchmaker and sends message to all relevant hosts. - """ - allowed_remote_exmods = allowed_remote_exmods or [] - conf = CONF - LOG.debug(' '.join(map(pformat, (topic, msg)))) - - queues = _get_matchmaker().queues(topic) - LOG.debug("Sending message(s) to: %s", queues) - - # Don't stack if we have no matchmaker results - if not queues: - warn_log = _LW("No matchmaker results. Not sending.") - - if method.__name__ == '_cast': - LOG.warn(warn_log) - return - - # While not strictly a timeout, callers know how to handle - # this exception and a timeout isn't too big a lie. - raise rpc_common.Timeout(warn_log) - - # This supports brokerless fanout (addresses > 1) - return_val = None - for queue in queues: - _topic, ip_addr = queue - _addr = "tcp://%s:%s" % (ip_addr, conf.rpc_zmq_port) - - if method.__name__ == '_cast': - eventlet.spawn_n(method, driver, _addr, context, - _topic, msg, timeout, envelope, _msg_id, - None, pooled) - else: - return_val = method(driver, _addr, context, _topic, msg, timeout, - envelope, allowed_remote_exmods, pooled) - - return return_val - - -def _get_matchmaker(*args, **kwargs): - global matchmaker - mm_name = CONF.rpc_zmq_matchmaker - - # Back compatibility for old class names - mm_mapping = { - 'oslo_messaging._drivers.matchmaker_redis.MatchMakerRedis': 'redis', - 'oslo_messaging._drivers.matchmaker_ring.MatchMakerRing': 'ring', - 'oslo_messaging._drivers.matchmaker.MatchMakerLocalhost': 'local', - 'oslo.messaging._drivers.matchmaker_redis.MatchMakerRedis': 'redis', - 'oslo.messaging._drivers.matchmaker_ring.MatchMakerRing': 'ring', - 'oslo.messaging._drivers.matchmaker.MatchMakerLocalhost': 'local'} - if mm_name in mm_mapping: - LOG.warn(_LW('rpc_zmq_matchmaker = %(old_val)s is deprecated. ' - 'It is suggested to change the value to %(new_val)s.'), - {'old_val': mm_name, 'new_val': mm_mapping[mm_name]}) - mm_name = mm_mapping[mm_name] - - if not matchmaker: - mgr = driver.DriverManager('oslo.messaging.zmq.matchmaker', - mm_name) - matchmaker = mgr.driver(*args, **kwargs) - return matchmaker - - -class ZmqIncomingMessage(base.IncomingMessage): - - ReceivedReply = collections.namedtuple( - 'ReceivedReply', ['reply', 'failure', 'log_failure']) - - def __init__(self, listener, ctxt, message): - super(ZmqIncomingMessage, self).__init__(listener, ctxt, message) - self.condition = threading.Condition() - self.received = None - - def reply(self, reply=None, failure=None, log_failure=True): - self.received = self.ReceivedReply(reply, failure, log_failure) - with self.condition: - self.condition.notify() - - def requeue(self): - LOG.debug("WARNING: requeue not supported") - - -class ZmqListener(base.Listener): - - def __init__(self, driver): - super(ZmqListener, self).__init__(driver) - self.incoming_queue = moves.queue.Queue() - - def dispatch(self, ctxt, message): - incoming = ZmqIncomingMessage(self, - ctxt.to_dict(), - message) - - self.incoming_queue.put(incoming) - - with incoming.condition: - incoming.condition.wait() - - assert incoming.received - - if incoming.received.failure: - raise incoming.received.failure - else: - return incoming.received.reply - - def poll(self, timeout=None): - try: - return self.incoming_queue.get(timeout=timeout) - except six.moves.queue.Empty: - # timeout - return None - - -class ZmqClientPool(pool.Pool): - """Class that implements a pool of Zmq Clients for a single endpoint""" - def __init__(self, conf, address, connection_cls, ctxt): - self.connection_cls = connection_cls - self.ctxt = ctxt - self.address = address - super(ZmqClientPool, self).__init__(conf.rpc_conn_pool_size) - - def create(self): - LOG.debug('Pool creating new ZMQ connection for %s' % self.address) - return self.connection_cls(self.address, self.ctxt) - - def empty(self): - for item in self.iter_free(): - item.close() - - -class ZmqClientPoolManager(object): - """Class that manages pools of clients for Zmq endpoints""" - - def __init__(self, conf, ctxt=None): - self._pools = {} - self._lock = threading.Lock() - self.conf = conf - self.ctxt = ctxt - - def get(self, address): - if address not in self._pools: - with self._lock: - if address not in self._pools: - self._pools[address] = ZmqClientPool(self.conf, - address, - ZmqClient, - self.ctxt) - return self._pools[address].get() - - def put(self, item): - self._pools[item.address].put(item) - - def empty(self): - for p in self._pools: - self._pools[p].empty() - class ZmqDriver(base.BaseDriver): """ZeroMQ Driver @@ -994,142 +81,38 @@ class ZmqDriver(base.BaseDriver): """ - # FIXME(markmc): allow this driver to be used without eventlet - def __init__(self, conf, url, default_exchange=None, allowed_remote_exmods=None): - if not zmq: - raise ImportError("Failed to import eventlet.green.zmq") conf.register_opts(zmq_opts) conf.register_opts(executor_base._pool_opts) - conf.register_opts(base.base_opts) - + self.conf = conf + self.server = None + self.client = None + self.matchmaker = None super(ZmqDriver, self).__init__(conf, url, default_exchange, allowed_remote_exmods) - # FIXME(markmc): handle default_exchange - - # FIXME(markmc): handle transport URL - if self._url.hosts: - raise NotImplementedError('The ZeroMQ driver does not yet support ' - 'transport URLs') - - # FIXME(markmc): use self.conf everywhere - if self.conf is not CONF: - raise NotImplementedError('The ZeroMQ driver currently only works ' - 'with oslo.config.cfg.CONF') - - self.listeners = [] - - # NOTE(jamespage): Create pool manager on first use to deal with - # os.fork calls in openstack daemons. - self._pool = None - self._pid = None - self._lock = threading.Lock() - - def _configure_pool_manager(func): - """Causes a new pool manager to be created when the messaging service - is first used by the current process. This is important as all - connections in the pools manager by the pool manager will share the - same ZMQ context, which must not be shared across OS processes. - """ - def wrap(self, *args, **kws): - with self._lock: - old_pid = self._pid - self._pid = os.getpid() - - if old_pid != self._pid: - # Create fresh pool manager for the current process - # along with a new ZMQ context. - self._pool = ZmqClientPoolManager( - self.conf, - zmq.Context(self.conf.rpc_zmq_contexts) - ) - return func(self, *args, **kws) - return wrap - - def _send(self, target, ctxt, message, - wait_for_reply=None, timeout=None, envelope=False): - - if wait_for_reply: - method = _call - else: - method = _cast - - topic = target.topic - if target.fanout: - # NOTE(ewindisch): fanout~ is used because it avoid splitting on - # and acts as a non-subtle hint to the matchmaker and ZmqProxy. - topic = 'fanout~' + topic - elif target.server: - topic = '%s.%s' % (topic, target.server) - - reply = _multi_send(self, method, ctxt, topic, message, - envelope=envelope, - allowed_remote_exmods=self._allowed_remote_exmods, - pooled=True) - - if wait_for_reply: - return reply[-1] - - @_configure_pool_manager def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, retry=None): - # NOTE(sileht): retry is not implemented because this driver never - # retry anything - return self._send(target, ctxt, message, wait_for_reply, timeout) + if self.client is None: + self.client = zmq_client.ZmqClient(self.conf, self.matchmaker) + if wait_for_reply: + return self.client.call(target, ctxt, message, timeout, retry) + else: + self.client.cast(target, ctxt, message, timeout, retry) + return None - @_configure_pool_manager def send_notification(self, target, ctxt, message, version, retry=None): - # NOTE(ewindisch): dot-priority in rpc notifier does not - # work with our assumptions. - # NOTE(sileht): retry is not implemented because this driver never - # retry anything - target = target(topic=target.topic.replace('.', '-')) - return self._send(target, ctxt, message, envelope=(version == 2.0)) + return None - @_configure_pool_manager def listen(self, target): - conn = Connection(self.conf, self) + if self.server is None: + self.server = zmq_server.ZmqServer(self.conf, self.matchmaker) + self.server.listen(target) + return self.server - listener = ZmqListener(self) - - conn.create_consumer(target.topic, listener) - conn.create_consumer('%s.%s' % (target.topic, target.server), - listener) - conn.create_consumer(target.topic, listener, fanout=True) - - conn.consume_in_thread() - self.listeners.append(conn) - - return listener - - @_configure_pool_manager def listen_for_notifications(self, targets_and_priorities, pool): - # NOTE(sileht): this listener implementation is limited - # because zeromq doesn't support: - # * requeing message - # * pool - conn = Connection(self.conf, self) - - listener = ZmqListener(self) - for target, priority in targets_and_priorities: - # NOTE(ewindisch): dot-priority in rpc notifier does not - # work with our assumptions. - # NOTE(sileht): create_consumer doesn't support target.exchange - conn.create_consumer('%s-%s' % (target.topic, priority), - listener) - conn.consume_in_thread() - self.listeners.append(conn) - - return listener + return None def cleanup(self): - for c in self.listeners: - c.close() - self.listeners = [] - if self._pool: - self._pool.empty() - - def get_connection(self, address, pooled=False): - return ZmqClientContext(address, self._pool, pooled) + pass diff --git a/oslo_messaging/_drivers/zmq_driver/__init__.py b/oslo_messaging/_drivers/zmq_driver/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/oslo_messaging/_drivers/zmq_driver/broker/__init__.py b/oslo_messaging/_drivers/zmq_driver/broker/__init__.py new file mode 100644 index 000000000..8af3e63a7 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/broker/__init__.py @@ -0,0 +1 @@ +__author__ = 'ozamiatin' diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py new file mode 100644 index 000000000..9e11a08a5 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py @@ -0,0 +1,92 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc + +import six + +from oslo_messaging._drivers.zmq_driver import zmq_async + + +@six.add_metaclass(abc.ABCMeta) +class BaseProxy(object): + + def __init__(self, conf, context): + super(BaseProxy, self).__init__() + self.conf = conf + self.context = context + self.executor = zmq_async.get_executor(self.run) + + @abc.abstractmethod + def run(self): + "Main execution point of the proxy" + + def start(self): + self.executor.execute() + + def stop(self): + self.executor.stop() + + def wait(self): + self.executor.wait() + + +@six.add_metaclass(abc.ABCMeta) +class BaseTcpFrontend(object): + + def __init__(self, conf, poller, context): + self.conf = conf + self.poller = poller + self.context = context + + def receive_incoming(self): + message, socket = self.poller.poll(1) + return message + + +@six.add_metaclass(abc.ABCMeta) +class BaseBackendMatcher(object): + + def __init__(self, conf, poller, context): + self.conf = conf + self.context = context + self.backends = {} + self.poller = poller + + def redirect_to_backend(self, message): + backend, topic = self._match_backend(message) + self._send_message(backend, message, topic) + + def _match_backend(self, message): + topic = self._get_topic(message) + ipc_address = self._get_ipc_address(topic) + if ipc_address not in self.backends: + self._create_backend(ipc_address) + return self.backend, topic + + @abc.abstractmethod + def _get_topic(self, message): + "Extract topic from message" + + @abc.abstractmethod + def _get_ipc_address(self, topic): + "Get ipc backend address from topic" + + @abc.abstractmethod + def _send_message(self, backend, message, topic): + "Backend specific sending logic" + + @abc.abstractmethod + def _create_backend(self, ipc_address): + "Backend specific socket opening logic" diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py new file mode 100644 index 000000000..a0d3f4fe2 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py @@ -0,0 +1,71 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import os + +from oslo_utils import excutils + +from oslo_messaging._drivers.zmq_driver.broker.zmq_call_proxy import CallProxy +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._i18n import _LE, _LI + + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +class ZmqBroker(object): + """Local messaging IPC broker (nodes are still peers). + + The main purpose is to have one TCP connection + (one TCP port assigned for ZMQ messaging) per node. + There could be a number of services running on a node. + Without such broker a number of opened TCP ports used for + messaging become unpredictable for the engine. + + All messages are coming to TCP ROUTER socket and then + distributed between their targets by topic via IPC. + """ + + def __init__(self, conf): + super(ZmqBroker, self).__init__() + self.conf = conf + self.context = zmq.Context() + self.proxies = [CallProxy(conf, self.context)] + self._create_ipc_dirs() + + def _create_ipc_dirs(self): + ipc_dir = self.conf.rpc_zmq_ipc_dir + try: + os.makedirs("%s/fanout" % ipc_dir) + except os.error: + if not os.path.isdir(ipc_dir): + with excutils.save_and_reraise_exception(): + LOG.error(_LE("Required IPC directory does not exist at" + " %s"), ipc_dir) + + def start(self): + for proxy in self.proxies: + proxy.start() + + def wait(self): + for proxy in self.proxies: + proxy.wait() + + def close(self): + LOG.info(_LI("Broker shutting down ...")) + for proxy in self.proxies: + proxy.stop() diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_call_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_call_proxy.py new file mode 100644 index 000000000..f4471b532 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_call_proxy.py @@ -0,0 +1,110 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from oslo_messaging._drivers.common import RPCException +import oslo_messaging._drivers.zmq_driver.broker.zmq_base_proxy as base_proxy +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_serializer +from oslo_messaging._drivers.zmq_driver import zmq_topic +from oslo_messaging._i18n import _LE, _LI + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +class CallProxy(base_proxy.BaseProxy): + + def __init__(self, conf, context): + super(CallProxy, self).__init__(conf, context) + self.tcp_frontend = FrontendTcpRouter(self.conf, context) + self.backend_matcher = CallBackendMatcher(self.conf, context) + LOG.info(_LI("Starting call proxy thread")) + + def run(self): + message = self.tcp_frontend.receive_incoming() + if message is not None: + self.backend_matcher.redirect_to_backend(message) + + reply, socket = self.backend_matcher.receive_outgoing_reply() + if reply is not None: + self.tcp_frontend.redirect_outgoing_reply(reply) + + +class CallBackendMatcher(base_proxy.BaseBackendMatcher): + + def __init__(self, conf, context): + super(CallBackendMatcher, self).__init__(conf, + zmq_async.get_poller(), + context) + self.backend = self.context.socket(zmq.DEALER) + self.poller.register(self.backend) + + def receive_outgoing_reply(self): + reply_message = self.poller.poll(1) + return reply_message + + def _get_topic(self, message): + topic, server = zmq_serializer.get_topic_from_call_message(message) + return zmq_topic.Topic(self.conf, topic, server) + + def _get_ipc_address(self, topic): + return zmq_topic.get_ipc_address_call(self.conf, topic) + + def _send_message(self, backend, message, topic): + # Empty needed for awaiting REP socket to work properly + # (DEALER-REP usage specific) + backend.send(b'', zmq.SNDMORE) + backend.send_multipart(message) + + def _create_backend(self, ipc_address): + self.backend.connect(ipc_address) + self.backends[str(ipc_address)] = True + + +class FrontendTcpRouter(base_proxy.BaseTcpFrontend): + + def __init__(self, conf, context): + super(FrontendTcpRouter, self).__init__(conf, + zmq_async.get_poller(), + context) + + try: + self.frontend = self.context.socket(zmq.ROUTER) + bind_address = zmq_topic.get_tcp_bind_address(conf.rpc_zmq_port) + LOG.info(_LI("Binding to TCP ROUTER %s") % bind_address) + self.frontend.bind(bind_address) + self.poller.register(self.frontend) + except zmq.ZMQError: + errmsg = _LE("Could not create ZeroMQ receiver daemon. " + "Socket may already be in use.") + LOG.error(errmsg) + raise RPCException(errmsg) + + @staticmethod + def _reduce_empty(reply): + reply.pop(0) + return reply + + def redirect_outgoing_reply(self, reply): + self._reduce_empty(reply) + try: + self.frontend.send_multipart(reply) + LOG.info(_LI("Redirecting reply to client %s") % reply) + except zmq.ZMQError: + errmsg = _LE("Failed redirecting reply to client %s") % reply + LOG.error(errmsg) + raise RPCException(errmsg) diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_cast_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_cast_proxy.py new file mode 100644 index 000000000..8eef8befc --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_cast_proxy.py @@ -0,0 +1,79 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +import oslo_messaging._drivers.zmq_driver.broker.zmq_base_proxy as base_proxy +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_serializer +from oslo_messaging._drivers.zmq_driver import zmq_topic +from oslo_messaging._i18n import _LI + +zmq = zmq_async.import_zmq() + +LOG = logging.getLogger(__name__) + + +class CastProxy(base_proxy.BaseProxy): + + def __init__(self, conf, context): + super(CastProxy, self).__init__(conf, context) + self.tcp_frontend = FrontendTcpPull(self.conf, context) + self.backend_matcher = CastPushBackendMatcher(self.conf, context) + LOG.info(_LI("Starting cast proxy thread")) + + def run(self): + message = self.tcp_frontend.receive_incoming() + if message is not None: + self.backend_matcher.redirect_to_backend(message) + + +class FrontendTcpPull(base_proxy.BaseTcpFrontend): + + def __init__(self, conf, context): + super(FrontendTcpPull, self).__init__(conf, zmq_async.get_poller(), + context) + self.frontend = self.context.socket(zmq.PULL) + address = zmq_topic.get_tcp_bind_address(conf.rpc_zmq_fanout_port) + LOG.info(_LI("Binding to TCP PULL %s") % address) + self.frontend.bind(address) + self.poller.register(self.frontend) + + def _receive_message(self): + message = self.poller.poll() + return message + + +class CastPushBackendMatcher(base_proxy.BaseBackendMatcher): + + def __init__(self, conf, context): + super(CastPushBackendMatcher, self).__init__(conf, + zmq_async.get_poller(), + context) + self.backend = self.context.socket(zmq.PUSH) + + def _get_topic(self, message): + topic, server = zmq_serializer.get_topic_from_cast_message(message) + return zmq_topic.Topic(self.conf, topic, server) + + def _get_ipc_address(self, topic): + return zmq_topic.get_ipc_address_cast(self.conf, topic) + + def _send_message(self, backend, message, topic): + backend.send_multipart(message) + + def _create_backend(self, ipc_address): + LOG.debug("[Cast Proxy] Creating PUSH backend %s", ipc_address) + self.backend.connect(ipc_address) + self.backends[str(ipc_address)] = True diff --git a/oslo_messaging/_drivers/zmq_driver/notifier/__init__.py b/oslo_messaging/_drivers/zmq_driver/notifier/__init__.py new file mode 100644 index 000000000..8af3e63a7 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/notifier/__init__.py @@ -0,0 +1 @@ +__author__ = 'ozamiatin' diff --git a/oslo_messaging/_drivers/zmq_driver/poller/__init__.py b/oslo_messaging/_drivers/zmq_driver/poller/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py new file mode 100644 index 000000000..b2c26c8a8 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py @@ -0,0 +1,111 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import threading + +import eventlet +import six + +from oslo_messaging._drivers import common as rpc_common +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_poller + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +class GreenPoller(zmq_poller.ZmqPoller): + + def __init__(self): + self.incoming_queue = six.moves.queue.Queue() + self.green_pool = eventlet.GreenPool() + self.sockets = [] + + def register(self, socket, recv_method=None): + self.sockets.append(socket) + return self.green_pool.spawn(self._socket_receive, socket, + recv_method) + + def _socket_receive(self, socket, recv_method=None): + while True: + if recv_method: + incoming = recv_method(socket) + else: + incoming = socket.recv_multipart() + self.incoming_queue.put((incoming, socket)) + eventlet.sleep() + + def poll(self, timeout=None): + incoming = None + try: + with eventlet.Timeout(timeout, exception=rpc_common.Timeout): + while incoming is None: + try: + incoming = self.incoming_queue.get_nowait() + except six.moves.queue.Empty: + eventlet.sleep() + except rpc_common.Timeout: + return None, None + return incoming[0], incoming[1] + + +class HoldReplyPoller(GreenPoller): + + def __init__(self): + super(HoldReplyPoller, self).__init__() + self.event_by_socket = {} + + def register(self, socket, recv_method=None): + super(HoldReplyPoller, self).register(socket, recv_method) + self.event_by_socket[socket] = threading.Event() + + def resume_polling(self, socket): + pause = self.event_by_socket[socket] + pause.set() + + def _socket_receive(self, socket, recv_method=None): + pause = self.event_by_socket[socket] + while True: + pause.clear() + if recv_method: + incoming = recv_method(socket) + else: + incoming = socket.recv_multipart() + self.incoming_queue.put((incoming, socket)) + pause.wait() + + +class GreenExecutor(zmq_poller.Executor): + + def __init__(self, method): + self._method = method + super(GreenExecutor, self).__init__(None) + + def _loop(self): + while True: + self._method() + eventlet.sleep() + + def execute(self): + self.thread = eventlet.spawn(self._loop) + + def wait(self): + if self.thread is not None: + self.thread.wait() + + def stop(self): + if self.thread is not None: + self.thread.kill() diff --git a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py new file mode 100644 index 000000000..e4317c487 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py @@ -0,0 +1,50 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import threading + +import zmq + +from oslo_messaging._drivers.zmq_driver import zmq_poller + +LOG = logging.getLogger(__name__) + + +class ThreadingPoller(zmq_poller.ZmqPoller): + + def __init__(self): + self.poller = zmq.Poller() + + def register(self, socket): + self.poller.register(socket, zmq.POLLOUT) + + def poll(self, timeout=None): + socks = dict(self.poller.poll(timeout)) + for socket in socks: + incoming = socket.recv() + return incoming + + +class ThreadingExecutor(zmq_poller.Executor): + + def __init__(self, method): + thread = threading.Thread(target=method) + super(ThreadingExecutor, self).__init__(thread) + + def execute(self): + self.thread.start() + + def wait(self): + self.thread.join() diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/__init__.py b/oslo_messaging/_drivers/zmq_driver/rpc/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/__init__.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py new file mode 100644 index 000000000..fb20efd4a --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py @@ -0,0 +1,49 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from oslo_messaging._drivers.zmq_driver.rpc.client.zmq_request import Request +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_topic +from oslo_messaging._i18n import _LE, _LI + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +class CallRequest(Request): + + def __init__(self, conf, target, context, message, timeout=None, + retry=None): + try: + self.zmq_context = zmq.Context() + socket = self.zmq_context.socket(zmq.REQ) + + super(CallRequest, self).__init__(conf, target, context, + message, socket, timeout, retry) + + self.connect_address = zmq_topic.get_tcp_address_call(conf, + self.topic) + LOG.info(_LI("Connecting REQ to %s") % self.connect_address) + self.socket.connect(self.connect_address) + except zmq.ZMQError as e: + LOG.error(_LE("Error connecting to socket: %s") % str(e)) + + def receive_reply(self): + # NOTE(ozamiatin): Check for retry here (no retries now) + self.socket.setsockopt(zmq.RCVTIMEO, self.timeout) + reply = self.socket.recv_json() + return reply[u'reply'] diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py new file mode 100644 index 000000000..40fddd97b --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py @@ -0,0 +1,72 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_cast_publisher +from oslo_messaging._drivers.zmq_driver.rpc.client.zmq_request import Request +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_topic +from oslo_messaging._i18n import _LE, _LI + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +class CastRequest(Request): + + def __init__(self, conf, target, context, + message, socket, address, timeout=None, retry=None): + self.connect_address = address + super(CastRequest, self).__init__(conf, target, context, message, + socket, timeout, retry) + + def __call__(self, *args, **kwargs): + self.send_request() + + def send_request(self): + self.socket.send(b'', zmq.SNDMORE) + super(CastRequest, self).send_request() + + def receive_reply(self): + # Ignore reply for CAST + pass + + +class DealerCastPublisher(zmq_cast_publisher.CastPublisherBase): + + def __init__(self, conf, matchmaker): + super(DealerCastPublisher, self).__init__(conf) + self.matchmaker = matchmaker + + def cast(self, target, context, + message, timeout=None, retry=None): + topic = zmq_topic.Topic.from_target(self.conf, target) + connect_address = zmq_topic.get_tcp_address_call(self.conf, topic) + dealer_socket = self._create_socket(connect_address) + request = CastRequest(self.conf, target, context, message, + dealer_socket, connect_address, timeout, retry) + request.send_request() + + def _create_socket(self, address): + if address in self.outbound_sockets: + return self.outbound_sockets[address] + try: + dealer_socket = self.zmq_context.socket(zmq.DEALER) + LOG.info(_LI("Connecting DEALER to %s") % address) + dealer_socket.connect(address) + except zmq.ZMQError: + LOG.error(_LE("Failed connecting DEALER to %s") % address) + return dealer_socket diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_publisher.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_publisher.py new file mode 100644 index 000000000..098454524 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_publisher.py @@ -0,0 +1,40 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import logging + +import six + +from oslo_messaging._drivers.zmq_driver import zmq_async + + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +@six.add_metaclass(abc.ABCMeta) +class CastPublisherBase(object): + + def __init__(self, conf): + self.conf = conf + self.zmq_context = zmq.Context() + self.outbound_sockets = {} + super(CastPublisherBase, self).__init__() + + @abc.abstractmethod + def cast(self, target, context, + message, timeout=None, retry=None): + "Send CAST to target" diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py new file mode 100644 index 000000000..a4eed4953 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py @@ -0,0 +1,33 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_call_request +from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_cast_dealer + + +class ZmqClient(object): + + def __init__(self, conf, matchmaker=None): + self.conf = conf + self.cast_publisher = zmq_cast_dealer.DealerCastPublisher(conf, + matchmaker) + + def call(self, target, context, message, timeout=None, retry=None): + request = zmq_call_request.CallRequest(self.conf, target, context, + message, timeout, retry) + return request() + + def cast(self, target, context, message, timeout=None, retry=None): + self.cast_publisher.cast(target, context, message, timeout, retry) diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py new file mode 100644 index 000000000..2bfe755bf --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py @@ -0,0 +1,76 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +from abc import abstractmethod +import logging +import uuid + +import six + +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_topic +from oslo_messaging._i18n import _LE + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +@six.add_metaclass(abc.ABCMeta) +class Request(object): + + def __init__(self, conf, target, context, message, + socket, timeout=None, retry=None): + + if message['method'] is None: + errmsg = _LE("No method specified for RPC call") + LOG.error(errmsg) + raise KeyError(errmsg) + + self.msg_id = uuid.uuid4().hex + self.target = target + self.context = context + self.message = message + self.timeout = self._to_milliseconds(conf, timeout) + self.retry = retry + self.reply = None + self.socket = socket + self.topic = zmq_topic.Topic.from_target(conf, target) + + @staticmethod + def _to_milliseconds(conf, timeout): + return timeout * 1000 if timeout else conf.rpc_response_timeout * 1000 + + @property + def is_replied(self): + return self.reply is not None + + @property + def is_timed_out(self): + return False + + def send_request(self): + self.socket.send_string(str(self.topic), zmq.SNDMORE) + self.socket.send_string(self.msg_id, zmq.SNDMORE) + self.socket.send_json(self.context, zmq.SNDMORE) + self.socket.send_json(self.message) + + def __call__(self): + self.send_request() + return self.receive_reply() + + @abstractmethod + def receive_reply(self): + "Receive reply from server side" diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/__init__.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_base_consumer.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_base_consumer.py new file mode 100644 index 000000000..2eeb55f22 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_base_consumer.py @@ -0,0 +1,35 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +class ConsumerBase(object): + + def __init__(self, listener, conf, zmq_poller, context): + self.listener = listener + self.conf = conf + self.poller = zmq_poller + self.context = context + self.sockets_per_topic = {} + + def poll(self, timeout=None): + pass + + def stop(self): + pass + + def cleanup(self): + pass + + def listen(self, target): + pass diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py new file mode 100644 index 000000000..959ffd70d --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py @@ -0,0 +1,96 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import logging + +from oslo_messaging._drivers import base +from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_base_consumer +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_topic as topic_utils +from oslo_messaging._i18n import _LE + + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +class ZmqIncomingRequest(base.IncomingMessage): + + def __init__(self, listener, context, message, socket, rep_id, poller): + super(ZmqIncomingRequest, self).__init__(listener, context, message) + self.reply_socket = socket + self.reply_id = rep_id + self.received = None + self.poller = poller + + def reply(self, reply=None, failure=None, log_failure=True): + message_reply = {u'reply': reply, + u'failure': failure, + u'log_failure': log_failure} + LOG.debug("Replying %s REP", (str(message_reply))) + self.received = True + self.reply_socket.send(self.reply_id, zmq.SNDMORE) + self.reply_socket.send(b'', zmq.SNDMORE) + self.reply_socket.send_json(message_reply) + self.poller.resume_polling(self.reply_socket) + + def acknowledge(self): + pass + + def requeue(self): + pass + + +class CallResponder(zmq_base_consumer.ConsumerBase): + + def __init__(self, listener, conf, poller, context): + super(CallResponder, self).__init__(listener, conf, poller, context) + + def poll(self, timeout=None): + try: + incoming, socket = self.poller.poll(timeout) + reply_id, context, message = incoming + LOG.debug("[Server] REP Received message %s" % str(message)) + incoming = ZmqIncomingRequest(self.listener, + context, + message, socket, + reply_id, + self.poller) + return incoming + + except zmq.ZMQError as e: + LOG.error(_LE("Receiving message failed ... {}"), e) + + def listen(self, target): + + def _receive_message(socket): + reply_id = socket.recv() + empty = socket.recv() + assert empty == b'', 'Bad format: empty separator expected' + topic = socket.recv_string() + assert topic is not None, 'Bad format: topic string expected' + msg_id = socket.recv_string() + assert msg_id is not None, 'Bad format: message ID expected' + context = socket.recv_json() + message = socket.recv_json() + return (reply_id, context, message) + + topic = topic_utils.Topic.from_target(self.conf, target) + ipc_rep_address = topic_utils.get_ipc_address_call(self.conf, topic) + rep_socket = self.context.socket(zmq.REP) + rep_socket.bind(ipc_rep_address) + self.sockets_per_topic[str(topic)] = rep_socket + self.poller.register(rep_socket, _receive_message) diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py new file mode 100644 index 000000000..e6f67ab95 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py @@ -0,0 +1,49 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from oslo_messaging._drivers import base +from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_call_responder +from oslo_messaging._drivers.zmq_driver import zmq_async + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +class ZmqServer(base.Listener): + + def __init__(self, conf, matchmaker=None): + LOG.info("[Server] __init__") + self.conf = conf + self.context = zmq.Context() + poller = zmq_async.get_reply_poller() + self.call_responder = zmq_call_responder.CallResponder(self, conf, + poller, + self.context) + + def poll(self, timeout=None): + incoming = self.call_responder.poll(timeout) + return incoming + + def stop(self): + LOG.info("[Server] Stop") + + def cleanup(self): + pass + + def listen(self, target): + LOG.info("[Server] Listen to Target %s" % target) + self.call_responder.listen(target) diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_async.py b/oslo_messaging/_drivers/zmq_driver/zmq_async.py new file mode 100644 index 000000000..3694d0f5a --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/zmq_async.py @@ -0,0 +1,59 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from oslo_utils import importutils + +from oslo_messaging._i18n import _LE + +LOG = logging.getLogger(__name__) + +green_zmq = importutils.try_import('eventlet.green.zmq') + + +def import_zmq(): + imported_zmq = green_zmq or importutils.try_import('zmq') + if imported_zmq is None: + errmsg = _LE("ZeroMQ not found!") + LOG.error(errmsg) + raise ImportError(errmsg) + return imported_zmq + + +def get_poller(): + if green_zmq: + from oslo_messaging._drivers.zmq_driver.poller import green_poller + return green_poller.GreenPoller() + else: + from oslo_messaging._drivers.zmq_driver.poller import threading_poller + return threading_poller.ThreadingPoller() + + +def get_reply_poller(): + if green_zmq: + from oslo_messaging._drivers.zmq_driver.poller import green_poller + return green_poller.HoldReplyPoller() + else: + from oslo_messaging._drivers.zmq_driver.poller import threading_poller + return threading_poller.ThreadingPoller() + + +def get_executor(method): + if green_zmq is not None: + from oslo_messaging._drivers.zmq_driver.poller import green_poller + return green_poller.GreenExecutor(method) + else: + from oslo_messaging._drivers.zmq_driver.poller import threading_poller + return threading_poller.ThreadingExecutor() diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_context.py b/oslo_messaging/_drivers/zmq_driver/zmq_context.py new file mode 100644 index 000000000..f986e41db --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/zmq_context.py @@ -0,0 +1,33 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from oslo_messaging._drivers import common as rpc_common + + +class RpcContext(rpc_common.CommonRpcContext): + """Context that supports replying to a rpc.call.""" + def __init__(self, **kwargs): + self.replies = [] + super(RpcContext, self).__init__(**kwargs) + + def deepcopy(self): + values = self.to_dict() + values['replies'] = self.replies + return self.__class__(**values) + + def reply(self, reply=None, failure=None, ending=False): + if ending: + return + self.replies.append(reply) diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_poller.py b/oslo_messaging/_drivers/zmq_driver/zmq_poller.py new file mode 100644 index 000000000..dcd51ad7b --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/zmq_poller.py @@ -0,0 +1,48 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc + +import six + + +@six.add_metaclass(abc.ABCMeta) +class ZmqPoller(object): + + @abc.abstractmethod + def register(self, socket, recv_method=None): + 'Register socket to poll' + + @abc.abstractmethod + def poll(self, timeout=None): + 'Poll for messages' + + +@six.add_metaclass(abc.ABCMeta) +class Executor(object): + + def __init__(self, thread): + self.thread = thread + + @abc.abstractmethod + def execute(self): + 'Run execution' + + @abc.abstractmethod + def stop(self): + 'Stop execution' + + @abc.abstractmethod + def wait(self): + 'Wait until pass' diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_serializer.py b/oslo_messaging/_drivers/zmq_driver/zmq_serializer.py new file mode 100644 index 000000000..0f0733ae9 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/zmq_serializer.py @@ -0,0 +1,54 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import os +import re + +import six + +from oslo_messaging._drivers import common as rpc_common +from oslo_messaging._i18n import _LE, _LW + +LOG = logging.getLogger(__name__) + +MESSAGE_CALL_TOPIC_POSITION = 2 + + +def _get_topic_from_msg(message, position): + pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\')) + badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep))) + topic = message[position] + topic_items = None + + if six.PY3: + topic = topic.decode('utf-8') + + try: + # The topic is received over the network, + # don't trust this input. + if badchars.search(topic) is not None: + emsg = _LW("Topic contained dangerous characters") + LOG.warn(emsg) + raise rpc_common.RPCException(emsg) + topic_items = topic.split('.', 1) + except Exception as e: + errmsg = _LE("Failed topic string parsing, %s") % str(e) + LOG.error(errmsg) + rpc_common.RPCException(errmsg) + return topic_items[0], topic_items[1] + + +def get_topic_from_call_message(message): + return _get_topic_from_msg(message, MESSAGE_CALL_TOPIC_POSITION) diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_topic.py b/oslo_messaging/_drivers/zmq_driver/zmq_topic.py new file mode 100644 index 000000000..c338b69c5 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/zmq_topic.py @@ -0,0 +1,61 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +def get_ipc_address_call(conf, topic): + return "ipc://%s/%s" % (conf.rpc_zmq_ipc_dir, str(topic)) + + +def get_tcp_bind_address(port): + return "tcp://*:%s" % port + + +def get_tcp_address_call(conf, topic): + return "tcp://%s:%s" % (topic.server, conf.rpc_zmq_port) + + +def get_ipc_address_cast(conf, topic): + return "ipc://%s/fanout/%s" % (conf.rpc_zmq_ipc_dir, str(topic)) + + +class Topic(object): + + def __init__(self, conf, topic, server=None, fanout=False): + + if server is None: + self.server = conf.rpc_zmq_host + else: + self.server = server + + self._topic = topic + self.fanout = fanout + + @staticmethod + def _extract_cinder_server(server): + return server.split('@', 1)[0] + + @staticmethod + def from_target(conf, target): + if target.server is not None: + return Topic(conf, target.topic, target.server, + fanout=target.fanout) + else: + return Topic(conf, target.topic, fanout=target.fanout) + + @property + def topic(self): + return self._topic if self._topic else "" + + def __str__(self, *args, **kwargs): + return "%s.%s" % (self.topic, self.server) diff --git a/oslo_messaging/tests/drivers/test_impl_zmq.py b/oslo_messaging/tests/drivers/test_impl_zmq.py index 85e5dd377..a6eef2f7c 100644 --- a/oslo_messaging/tests/drivers/test_impl_zmq.py +++ b/oslo_messaging/tests/drivers/test_impl_zmq.py @@ -1,5 +1,4 @@ -# Copyright 2014 Canonical, Ltd. -# All Rights Reserved. +# Copyright 2015 Mirantis, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -15,28 +14,52 @@ import logging import socket +import threading import fixtures -from oslo_utils import importutils import testtools -try: - import zmq -except ImportError: - zmq = None - import oslo_messaging -from oslo_messaging._drivers import common as rpc_common +from oslo_messaging._drivers import impl_zmq +from oslo_messaging._drivers.zmq_driver.broker.zmq_broker import ZmqBroker +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._i18n import _ from oslo_messaging.tests import utils as test_utils -from six.moves import mock - -# eventlet is not yet py3 compatible, so skip if not installed -eventlet = importutils.try_import('eventlet') - -impl_zmq = importutils.try_import('oslo_messaging._drivers.impl_zmq') LOG = logging.getLogger(__name__) +zmq = zmq_async.import_zmq() + + +class TestRPCServerListener(object): + + def __init__(self, driver): + self.driver = driver + self.target = None + self.listener = None + self.executor = zmq_async.get_executor(self._run) + self._stop = threading.Event() + self._received = threading.Event() + self.message = None + + def listen(self, target): + self.target = target + self.listener = self.driver.listen(self.target) + self.executor.execute() + + def _run(self): + try: + message = self.listener.poll() + if message is not None: + self._received.set() + self.message = message + message.reply(reply=True) + except Exception: + LOG.exception(_("Unexpected exception occurred.")) + + def stop(self): + self.executor.stop() + def get_unused_port(): """Returns an unused port on localhost.""" @@ -70,10 +93,11 @@ class ZmqBaseTestCase(test_utils.BaseTestCase): # Start RPC LOG.info("Running internal zmq receiver.") - self.reactor = impl_zmq.ZmqProxy(self.conf) - self.reactor.consume_in_thread() + self.broker = ZmqBroker(self.conf) + self.broker.start() + + self.listener = TestRPCServerListener(self.driver) - self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1') self.addCleanup(stopRpc(self.__dict__)) @@ -94,380 +118,127 @@ class stopRpc(object): self.attrs = attrs def __call__(self): - if self.attrs['reactor']: - self.attrs['reactor'].close() + if self.attrs['broker']: + self.attrs['broker'].close() if self.attrs['driver']: self.attrs['driver'].cleanup() + if self.attrs['listener']: + self.attrs['listener'].stop() class TestZmqBasics(ZmqBaseTestCase): - def test_start_stop_listener(self): - target = oslo_messaging.Target(topic='testtopic') - listener = self.driver.listen(target) - result = listener.poll(0.01) - self.assertEqual(result, None) - def test_send_receive_raises(self): """Call() without method.""" target = oslo_messaging.Target(topic='testtopic') - self.driver.listen(target) + self.listener.listen(target) self.assertRaises( KeyError, self.driver.send, target, {}, {'tx_id': 1}, wait_for_reply=True) - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqIncomingMessage') - def test_send_receive_topic(self, mock_msg): - """Call() with method.""" - mock_msg.return_value = msg = mock.MagicMock() - msg.received = received = mock.MagicMock() - received.failure = False - received.reply = True - msg.condition = condition = mock.MagicMock() - condition.wait.return_value = True + def test_send_receive_topic(self): + """Call() with topic.""" target = oslo_messaging.Target(topic='testtopic') - self.driver.listen(target) + self.listener.listen(target) result = self.driver.send( target, {}, {'method': 'hello-world', 'tx_id': 1}, wait_for_reply=True) - self.assertEqual(result, True) + self.assertIsNotNone(result) - @mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True) - def test_send_receive_fanout(self, mock_call): + def test_send_noreply(self): + """Cast() with topic.""" + + target = oslo_messaging.Target(topic='testtopic', server="127.0.0.1") + self.listener.listen(target) + result = self.driver.send( + target, {}, + {'method': 'hello-world', 'tx_id': 1}, + wait_for_reply=False) + + self.listener._received.wait() + + self.assertIsNone(result) + self.assertEqual(True, self.listener._received.isSet()) + method = self.listener.message.message[u'method'] + self.assertEqual(u'hello-world', method) + + @testtools.skip("Not implemented feature") + def test_send_fanout(self): target = oslo_messaging.Target(topic='testtopic', fanout=True) self.driver.listen(target) - mock_call.__name__ = '_call' - mock_call.return_value = [True] - result = self.driver.send( target, {}, {'method': 'hello-world', 'tx_id': 1}, - wait_for_reply=True) + wait_for_reply=False) - self.assertEqual(result, True) - mock_call.assert_called_once_with( - self.driver, - 'tcp://127.0.0.1:%s' % self.conf['rpc_zmq_port'], - {}, 'fanout~testtopic.127.0.0.1', - {'tx_id': 1, 'method': 'hello-world'}, - None, False, [], True) + self.assertIsNone(result) + self.assertEqual(True, self.listener._received.isSet()) + msg_pattern = "{'method': 'hello-world', 'tx_id': 1}" + self.assertEqual(msg_pattern, self.listener.message) - @mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True) - def test_send_receive_direct(self, mock_call): - # Also verifies fix for bug http://pad.lv/1301723 - target = oslo_messaging.Target(topic='testtopic', server='localhost') - self.driver.listen(target) + def test_send_receive_direct(self): + """Call() without topic.""" - mock_call.__name__ = '_call' - mock_call.return_value = [True] - - result = self.driver.send( - target, {}, - {'method': 'hello-world', 'tx_id': 1}, - wait_for_reply=True) - - self.assertEqual(result, True) - mock_call.assert_called_once_with( - self.driver, - 'tcp://localhost:%s' % self.conf['rpc_zmq_port'], - {}, 'testtopic.localhost', - {'tx_id': 1, 'method': 'hello-world'}, - None, False, [], True) + target = oslo_messaging.Target(server='127.0.0.1') + self.listener.listen(target) + message = {'method': 'hello-world', 'tx_id': 1} + context = {} + result = self.driver.send(target, context, message, + wait_for_reply=True) + self.assertTrue(result) -class TestZmqSocket(test_utils.BaseTestCase): +class TestPoller(test_utils.BaseTestCase): - @testtools.skipIf(zmq is None, "zmq not available") def setUp(self): - super(TestZmqSocket, self).setUp() - self.messaging_conf.transport_driver = 'zmq' - # Get driver - transport = oslo_messaging.get_transport(self.conf) - self.driver = transport._driver + super(TestPoller, self).setUp() + self.poller = zmq_async.get_poller() + self.ctx = zmq.Context() + self.ADDR_REQ = "ipc://request1" - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe') - @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context') - def test_zmqsocket_init_type_pull(self, mock_context, mock_subscribe): - mock_ctxt = mock.Mock() - mock_context.return_value = mock_ctxt - mock_sock = mock.Mock() - mock_ctxt.socket = mock.Mock(return_value=mock_sock) - mock_sock.connect = mock.Mock() - mock_sock.bind = mock.Mock() - addr = '127.0.0.1' + def test_poll_blocking(self): - sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PULL, bind=False, - subscribe=None) - self.assertTrue(sock.can_recv) - self.assertFalse(sock.can_send) - self.assertFalse(sock.can_sub) - self.assertTrue(mock_sock.connect.called) - self.assertFalse(mock_sock.bind.called) + rep = self.ctx.socket(zmq.REP) + rep.bind(self.ADDR_REQ) - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe') - @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context') - def test_zmqsocket_init_type_sub(self, mock_context, mock_subscribe): - mock_ctxt = mock.Mock() - mock_context.return_value = mock_ctxt - mock_sock = mock.Mock() - mock_ctxt.socket = mock.Mock(return_value=mock_sock) - mock_sock.connect = mock.Mock() - mock_sock.bind = mock.Mock() - addr = '127.0.0.1' + reply_poller = zmq_async.get_reply_poller() + reply_poller.register(rep) - sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.SUB, bind=False, - subscribe=None) - self.assertTrue(sock.can_recv) - self.assertFalse(sock.can_send) - self.assertTrue(sock.can_sub) - self.assertTrue(mock_sock.connect.called) - self.assertFalse(mock_sock.bind.called) + def listener(): + incoming, socket = reply_poller.poll() + self.assertEqual(b'Hello', incoming[0]) + socket.send_string('Reply') + reply_poller.resume_polling(socket) - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe') - @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context') - def test_zmqsocket_init_type_push(self, mock_context, mock_subscribe): - mock_ctxt = mock.Mock() - mock_context.return_value = mock_ctxt - mock_sock = mock.Mock() - mock_ctxt.socket = mock.Mock(return_value=mock_sock) - mock_sock.connect = mock.Mock() - mock_sock.bind = mock.Mock() - addr = '127.0.0.1' + executor = zmq_async.get_executor(listener) + executor.execute() - sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUSH, bind=False, - subscribe=None) - self.assertFalse(sock.can_recv) - self.assertTrue(sock.can_send) - self.assertFalse(sock.can_sub) - self.assertTrue(mock_sock.connect.called) - self.assertFalse(mock_sock.bind.called) + req1 = self.ctx.socket(zmq.REQ) + req1.connect(self.ADDR_REQ) - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe') - @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context') - def test_zmqsocket_init_type_pub(self, mock_context, mock_subscribe): - mock_ctxt = mock.Mock() - mock_context.return_value = mock_ctxt - mock_sock = mock.Mock() - mock_ctxt.socket = mock.Mock(return_value=mock_sock) - mock_sock.connect = mock.Mock() - mock_sock.bind = mock.Mock() - addr = '127.0.0.1' + req2 = self.ctx.socket(zmq.REQ) + req2.connect(self.ADDR_REQ) - sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUB, bind=False, - subscribe=None) - self.assertFalse(sock.can_recv) - self.assertTrue(sock.can_send) - self.assertFalse(sock.can_sub) - self.assertTrue(mock_sock.connect.called) - self.assertFalse(mock_sock.bind.called) + req1.send_string('Hello') + req2.send_string('Hello') + reply = req1.recv_string() + self.assertEqual('Reply', reply) -class TestZmqIncomingMessage(test_utils.BaseTestCase): + reply = req2.recv_string() + self.assertEqual('Reply', reply) - @testtools.skipIf(zmq is None, "zmq not available") - def setUp(self): - super(TestZmqIncomingMessage, self).setUp() - self.messaging_conf.transport_driver = 'zmq' - # Get driver - transport = oslo_messaging.get_transport(self.conf) - self.driver = transport._driver + def test_poll_timeout(self): + rep = self.ctx.socket(zmq.REP) + rep.bind(self.ADDR_REQ) - def test_zmqincomingmessage(self): - msg = impl_zmq.ZmqIncomingMessage(mock.Mock(), None, 'msg.foo') - msg.reply("abc") - self.assertIsInstance( - msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply) - self.assertIsInstance( - msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply) - self.assertEqual(msg.received.reply, "abc") - msg.requeue() + reply_poller = zmq_async.get_reply_poller() + reply_poller.register(rep) - -class TestZmqConnection(ZmqBaseTestCase): - - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True) - def test_zmqconnection_create_consumer(self, mock_reactor): - - mock_reactor.register = mock.Mock() - conn = impl_zmq.Connection(self.driver.conf, self.driver) - topic = 'topic.foo' - context = mock.Mock() - inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' % - (self.internal_ipc_dir)) - # No Fanout - conn.create_consumer(topic, context) - conn.reactor.register.assert_called_with(context, inaddr, - impl_zmq.zmq.PULL, - subscribe=None, in_bind=False) - - # Reset for next bunch of checks - conn.reactor.register.reset_mock() - - # Fanout - inaddr = ('ipc://%s/zmq_topic_fanout~topic' % - (self.internal_ipc_dir)) - conn.create_consumer(topic, context, fanout='subscriber.foo') - conn.reactor.register.assert_called_with(context, inaddr, - impl_zmq.zmq.SUB, - subscribe='subscriber.foo', - in_bind=False) - - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True) - def test_zmqconnection_create_consumer_topic_exists(self, mock_reactor): - mock_reactor.register = mock.Mock() - conn = impl_zmq.Connection(self.driver.conf, self.driver) - topic = 'topic.foo' - context = mock.Mock() - inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' % - (self.internal_ipc_dir)) - - conn.create_consumer(topic, context) - conn.reactor.register.assert_called_with( - context, inaddr, impl_zmq.zmq.PULL, subscribe=None, in_bind=False) - conn.reactor.register.reset_mock() - # Call again with same topic - conn.create_consumer(topic, context) - self.assertFalse(conn.reactor.register.called) - - @mock.patch('oslo_messaging._drivers.impl_zmq._get_matchmaker', - autospec=True) - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True) - def test_zmqconnection_close(self, mock_reactor, mock_getmatchmaker): - conn = impl_zmq.Connection(self.driver.conf, self.driver) - conn.reactor.close = mock.Mock() - mock_getmatchmaker.return_value.stop_heartbeat = mock.Mock() - conn.close() - self.assertTrue(mock_getmatchmaker.return_value.stop_heartbeat.called) - self.assertTrue(conn.reactor.close.called) - - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True) - def test_zmqconnection_wait(self, mock_reactor): - conn = impl_zmq.Connection(self.driver, self.driver) - conn.reactor.wait = mock.Mock() - conn.wait() - self.assertTrue(conn.reactor.wait.called) - - @mock.patch('oslo_messaging._drivers.impl_zmq._get_matchmaker', - autospec=True) - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True) - def test_zmqconnection_consume_in_thread(self, mock_reactor, - mock_getmatchmaker): - mock_getmatchmaker.return_value.start_heartbeat = mock.Mock() - conn = impl_zmq.Connection(self.driver, self.driver) - conn.reactor.consume_in_thread = mock.Mock() - conn.consume_in_thread() - self.assertTrue(mock_getmatchmaker.return_value.start_heartbeat.called) - self.assertTrue(conn.reactor.consume_in_thread.called) - - -class TestZmqListener(ZmqBaseTestCase): - - def test_zmqlistener_no_msg(self): - listener = impl_zmq.ZmqListener(self.driver) - # Timeout = 0 should return straight away since the queue is empty - listener.poll(timeout=0) - - def test_zmqlistener_w_msg(self): - listener = impl_zmq.ZmqListener(self.driver) - kwargs = {'a': 1, 'b': 2} - m = mock.Mock() - ctxt = mock.Mock(autospec=impl_zmq.RpcContext) - message = {'namespace': 'name.space', 'method': m.fake_method, - 'args': kwargs} - eventlet.spawn_n(listener.dispatch, ctxt, message) - resp = listener.poll(timeout=10) - msg = {'method': m.fake_method, 'namespace': 'name.space', - 'args': kwargs} - self.assertEqual(resp.message, msg) - - -class TestZmqDriver(ZmqBaseTestCase): - - @mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True) - @mock.patch('oslo_messaging._drivers.matchmaker.MatchMakerBase.queues', - autospec=True) - def test_zmqdriver_multi_send_cast_with_no_queues(self, - mock_queues, - mock_cast): - context = mock.Mock(autospec=impl_zmq.RpcContext) - topic = 'testtopic' - msg = 'jeronimo' - - with mock.patch.object(impl_zmq.LOG, 'warn') as flog: - mock_queues.return_value = None - impl_zmq._multi_send(self.driver, mock_cast, - context, topic, msg) - self.assertEqual(1, flog.call_count) - args, kwargs = flog.call_args - self.assertIn('No matchmaker results', args[0]) - - @mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True) - @mock.patch('oslo_messaging._drivers.matchmaker.MatchMakerBase.queues', - autospec=True) - def test_zmqdriver_multi_send_call_with_no_queues(self, - mock_queues, - mock_call): - context = mock.Mock(autospec=impl_zmq.RpcContext) - topic = 'testtopic' - msg = 'jeronimo' - - mock_queues.return_value = None - self.assertRaises(rpc_common.Timeout, - impl_zmq._multi_send, self.driver, - mock_call, context, topic, msg) - - @mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True) - @mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True) - def test_zmqdriver_send(self, mock_multi_send, mock_cast): - context = mock.Mock(autospec=impl_zmq.RpcContext) - topic = 'testtopic' - msg = 'jeronimo' - self.driver.send(oslo_messaging.Target(topic=topic), context, msg, - False, 0, False) - mock_multi_send.assert_called_with(self.driver, mock_cast, context, - topic, msg, - allowed_remote_exmods=[], - envelope=False, pooled=True) - - @mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True) - @mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True) - def test_zmqdriver_send_notification(self, mock_multi_send, mock_cast): - context = mock.Mock(autospec=impl_zmq.RpcContext) - topic = 'testtopic.foo' - topic_reformat = 'testtopic-foo' - msg = 'jeronimo' - self.driver.send_notification(oslo_messaging.Target(topic=topic), - context, msg, False, False) - mock_multi_send.assert_called_with(self.driver, mock_cast, context, - topic_reformat, msg, - allowed_remote_exmods=[], - envelope=False, pooled=True) - - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True) - @mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True) - def test_zmqdriver_listen(self, mock_connection, mock_listener): - mock_listener.return_value = listener = mock.Mock() - mock_connection.return_value = conn = mock.Mock() - conn.create_consumer = mock.Mock() - conn.consume_in_thread = mock.Mock() - topic = 'testtopic.foo' - self.driver.listen(oslo_messaging.Target(topic=topic)) - conn.create_consumer.assert_called_with(topic, listener, fanout=True) - - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True) - @mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True) - def test_zmqdriver_listen_for_notification(self, mock_connection, - mock_listener): - mock_listener.return_value = listener = mock.Mock() - mock_connection.return_value = conn = mock.Mock() - conn.create_consumer = mock.Mock() - conn.consume_in_thread = mock.Mock() - topic = 'testtopic.foo' - data = [(oslo_messaging.Target(topic=topic), 0)] - # NOTE(jamespage): Pooling not supported, just pass None for now. - self.driver.listen_for_notifications(data, None) - conn.create_consumer.assert_called_with("%s-%s" % (topic, 0), listener) + incoming, socket = reply_poller.poll(1) + self.assertIsNone(incoming) + self.assertIsNone(socket) diff --git a/tests/drivers/test_impl_zmq.py b/tests/drivers/test_impl_zmq.py index ddc6753ea..a6eef2f7c 100644 --- a/tests/drivers/test_impl_zmq.py +++ b/tests/drivers/test_impl_zmq.py @@ -1,5 +1,4 @@ -# Copyright 2014 Canonical, Ltd. -# All Rights Reserved. +# Copyright 2015 Mirantis, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -15,28 +14,52 @@ import logging import socket +import threading import fixtures import testtools -from six.moves import mock - -try: - import zmq -except ImportError: - zmq = None - -from oslo import messaging -from oslo.utils import importutils +import oslo_messaging +from oslo_messaging._drivers import impl_zmq +from oslo_messaging._drivers.zmq_driver.broker.zmq_broker import ZmqBroker +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._i18n import _ from oslo_messaging.tests import utils as test_utils -# eventlet is not yet py3 compatible, so skip if not installed -eventlet = importutils.try_import('eventlet') - -impl_zmq = importutils.try_import('oslo_messaging._drivers.impl_zmq') - LOG = logging.getLogger(__name__) +zmq = zmq_async.import_zmq() + + +class TestRPCServerListener(object): + + def __init__(self, driver): + self.driver = driver + self.target = None + self.listener = None + self.executor = zmq_async.get_executor(self._run) + self._stop = threading.Event() + self._received = threading.Event() + self.message = None + + def listen(self, target): + self.target = target + self.listener = self.driver.listen(self.target) + self.executor.execute() + + def _run(self): + try: + message = self.listener.poll() + if message is not None: + self._received.set() + self.message = message + message.reply(reply=True) + except Exception: + LOG.exception(_("Unexpected exception occurred.")) + + def stop(self): + self.executor.stop() + def get_unused_port(): """Returns an unused port on localhost.""" @@ -56,7 +79,7 @@ class ZmqBaseTestCase(test_utils.BaseTestCase): super(ZmqBaseTestCase, self).setUp() self.messaging_conf.transport_driver = 'zmq' # Get driver - transport = messaging.get_transport(self.conf) + transport = oslo_messaging.get_transport(self.conf) self.driver = transport._driver # Set config values @@ -70,10 +93,11 @@ class ZmqBaseTestCase(test_utils.BaseTestCase): # Start RPC LOG.info("Running internal zmq receiver.") - self.reactor = impl_zmq.ZmqProxy(self.conf) - self.reactor.consume_in_thread() + self.broker = ZmqBroker(self.conf) + self.broker.start() + + self.listener = TestRPCServerListener(self.driver) - self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1') self.addCleanup(stopRpc(self.__dict__)) @@ -85,7 +109,7 @@ class TestConfZmqDriverLoad(test_utils.BaseTestCase): self.messaging_conf.transport_driver = 'zmq' def test_driver_load(self): - transport = messaging.get_transport(self.conf) + transport = oslo_messaging.get_transport(self.conf) self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver) @@ -94,347 +118,127 @@ class stopRpc(object): self.attrs = attrs def __call__(self): - if self.attrs['reactor']: - self.attrs['reactor'].close() + if self.attrs['broker']: + self.attrs['broker'].close() if self.attrs['driver']: self.attrs['driver'].cleanup() + if self.attrs['listener']: + self.attrs['listener'].stop() class TestZmqBasics(ZmqBaseTestCase): - def test_start_stop_listener(self): - target = messaging.Target(topic='testtopic') - listener = self.driver.listen(target) - result = listener.poll(0.01) - self.assertEqual(result, None) - def test_send_receive_raises(self): """Call() without method.""" - target = messaging.Target(topic='testtopic') - self.driver.listen(target) + target = oslo_messaging.Target(topic='testtopic') + self.listener.listen(target) self.assertRaises( KeyError, self.driver.send, target, {}, {'tx_id': 1}, wait_for_reply=True) - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqIncomingMessage') - def test_send_receive_topic(self, mock_msg): - """Call() with method.""" - mock_msg.return_value = msg = mock.MagicMock() - msg.received = received = mock.MagicMock() - received.failure = False - received.reply = True - msg.condition = condition = mock.MagicMock() - condition.wait.return_value = True + def test_send_receive_topic(self): + """Call() with topic.""" - target = messaging.Target(topic='testtopic') - self.driver.listen(target) + target = oslo_messaging.Target(topic='testtopic') + self.listener.listen(target) result = self.driver.send( target, {}, {'method': 'hello-world', 'tx_id': 1}, wait_for_reply=True) - self.assertEqual(result, True) + self.assertIsNotNone(result) - @mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True) - def test_send_receive_fanout(self, mock_call): - target = messaging.Target(topic='testtopic', fanout=True) + def test_send_noreply(self): + """Cast() with topic.""" + + target = oslo_messaging.Target(topic='testtopic', server="127.0.0.1") + self.listener.listen(target) + result = self.driver.send( + target, {}, + {'method': 'hello-world', 'tx_id': 1}, + wait_for_reply=False) + + self.listener._received.wait() + + self.assertIsNone(result) + self.assertEqual(True, self.listener._received.isSet()) + method = self.listener.message.message[u'method'] + self.assertEqual(u'hello-world', method) + + @testtools.skip("Not implemented feature") + def test_send_fanout(self): + target = oslo_messaging.Target(topic='testtopic', fanout=True) self.driver.listen(target) - mock_call.__name__ = '_call' - mock_call.return_value = [True] - result = self.driver.send( target, {}, {'method': 'hello-world', 'tx_id': 1}, - wait_for_reply=True) + wait_for_reply=False) - self.assertEqual(result, True) - mock_call.assert_called_once_with( - self.driver, - 'tcp://127.0.0.1:%s' % self.conf['rpc_zmq_port'], - {}, 'fanout~testtopic.127.0.0.1', - {'tx_id': 1, 'method': 'hello-world'}, - None, False, [], True) + self.assertIsNone(result) + self.assertEqual(True, self.listener._received.isSet()) + msg_pattern = "{'method': 'hello-world', 'tx_id': 1}" + self.assertEqual(msg_pattern, self.listener.message) - @mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True) - def test_send_receive_direct(self, mock_call): - # Also verifies fix for bug http://pad.lv/1301723 - target = messaging.Target(topic='testtopic', server='localhost') - self.driver.listen(target) + def test_send_receive_direct(self): + """Call() without topic.""" - mock_call.__name__ = '_call' - mock_call.return_value = [True] - - result = self.driver.send( - target, {}, - {'method': 'hello-world', 'tx_id': 1}, - wait_for_reply=True) - - self.assertEqual(result, True) - mock_call.assert_called_once_with( - self.driver, - 'tcp://localhost:%s' % self.conf['rpc_zmq_port'], - {}, 'testtopic.localhost', - {'tx_id': 1, 'method': 'hello-world'}, - None, False, [], True) + target = oslo_messaging.Target(server='127.0.0.1') + self.listener.listen(target) + message = {'method': 'hello-world', 'tx_id': 1} + context = {} + result = self.driver.send(target, context, message, + wait_for_reply=True) + self.assertTrue(result) -class TestZmqSocket(test_utils.BaseTestCase): +class TestPoller(test_utils.BaseTestCase): - @testtools.skipIf(zmq is None, "zmq not available") def setUp(self): - super(TestZmqSocket, self).setUp() - self.messaging_conf.transport_driver = 'zmq' - # Get driver - transport = messaging.get_transport(self.conf) - self.driver = transport._driver + super(TestPoller, self).setUp() + self.poller = zmq_async.get_poller() + self.ctx = zmq.Context() + self.ADDR_REQ = "ipc://request1" - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe') - @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context') - def test_zmqsocket_init_type_pull(self, mock_context, mock_subscribe): - mock_ctxt = mock.Mock() - mock_context.return_value = mock_ctxt - mock_sock = mock.Mock() - mock_ctxt.socket = mock.Mock(return_value=mock_sock) - mock_sock.connect = mock.Mock() - mock_sock.bind = mock.Mock() - addr = '127.0.0.1' + def test_poll_blocking(self): - sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PULL, bind=False, - subscribe=None) - self.assertTrue(sock.can_recv) - self.assertFalse(sock.can_send) - self.assertFalse(sock.can_sub) - self.assertTrue(mock_sock.connect.called) - self.assertFalse(mock_sock.bind.called) + rep = self.ctx.socket(zmq.REP) + rep.bind(self.ADDR_REQ) - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe') - @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context') - def test_zmqsocket_init_type_sub(self, mock_context, mock_subscribe): - mock_ctxt = mock.Mock() - mock_context.return_value = mock_ctxt - mock_sock = mock.Mock() - mock_ctxt.socket = mock.Mock(return_value=mock_sock) - mock_sock.connect = mock.Mock() - mock_sock.bind = mock.Mock() - addr = '127.0.0.1' + reply_poller = zmq_async.get_reply_poller() + reply_poller.register(rep) - sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.SUB, bind=False, - subscribe=None) - self.assertTrue(sock.can_recv) - self.assertFalse(sock.can_send) - self.assertTrue(sock.can_sub) - self.assertTrue(mock_sock.connect.called) - self.assertFalse(mock_sock.bind.called) + def listener(): + incoming, socket = reply_poller.poll() + self.assertEqual(b'Hello', incoming[0]) + socket.send_string('Reply') + reply_poller.resume_polling(socket) - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe') - @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context') - def test_zmqsocket_init_type_push(self, mock_context, mock_subscribe): - mock_ctxt = mock.Mock() - mock_context.return_value = mock_ctxt - mock_sock = mock.Mock() - mock_ctxt.socket = mock.Mock(return_value=mock_sock) - mock_sock.connect = mock.Mock() - mock_sock.bind = mock.Mock() - addr = '127.0.0.1' + executor = zmq_async.get_executor(listener) + executor.execute() - sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUSH, bind=False, - subscribe=None) - self.assertFalse(sock.can_recv) - self.assertTrue(sock.can_send) - self.assertFalse(sock.can_sub) - self.assertTrue(mock_sock.connect.called) - self.assertFalse(mock_sock.bind.called) + req1 = self.ctx.socket(zmq.REQ) + req1.connect(self.ADDR_REQ) - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe') - @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context') - def test_zmqsocket_init_type_pub(self, mock_context, mock_subscribe): - mock_ctxt = mock.Mock() - mock_context.return_value = mock_ctxt - mock_sock = mock.Mock() - mock_ctxt.socket = mock.Mock(return_value=mock_sock) - mock_sock.connect = mock.Mock() - mock_sock.bind = mock.Mock() - addr = '127.0.0.1' + req2 = self.ctx.socket(zmq.REQ) + req2.connect(self.ADDR_REQ) - sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUB, bind=False, - subscribe=None) - self.assertFalse(sock.can_recv) - self.assertTrue(sock.can_send) - self.assertFalse(sock.can_sub) - self.assertTrue(mock_sock.connect.called) - self.assertFalse(mock_sock.bind.called) + req1.send_string('Hello') + req2.send_string('Hello') + reply = req1.recv_string() + self.assertEqual('Reply', reply) -class TestZmqIncomingMessage(test_utils.BaseTestCase): + reply = req2.recv_string() + self.assertEqual('Reply', reply) - @testtools.skipIf(zmq is None, "zmq not available") - def setUp(self): - super(TestZmqIncomingMessage, self).setUp() - self.messaging_conf.transport_driver = 'zmq' - # Get driver - transport = messaging.get_transport(self.conf) - self.driver = transport._driver + def test_poll_timeout(self): + rep = self.ctx.socket(zmq.REP) + rep.bind(self.ADDR_REQ) - def test_zmqincomingmessage(self): - msg = impl_zmq.ZmqIncomingMessage(mock.Mock(), None, 'msg.foo') - msg.reply("abc") - self.assertIsInstance( - msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply) - self.assertIsInstance( - msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply) - self.assertEqual(msg.received.reply, "abc") - msg.requeue() + reply_poller = zmq_async.get_reply_poller() + reply_poller.register(rep) - -class TestZmqConnection(ZmqBaseTestCase): - - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True) - def test_zmqconnection_create_consumer(self, mock_reactor): - - mock_reactor.register = mock.Mock() - conn = impl_zmq.Connection(self.driver.conf, self.driver) - topic = 'topic.foo' - context = mock.Mock() - inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' % - (self.internal_ipc_dir)) - # No Fanout - conn.create_consumer(topic, context) - conn.reactor.register.assert_called_with(context, inaddr, - impl_zmq.zmq.PULL, - subscribe=None, in_bind=False) - - # Reset for next bunch of checks - conn.reactor.register.reset_mock() - - # Fanout - inaddr = ('ipc://%s/zmq_topic_fanout~topic' % - (self.internal_ipc_dir)) - conn.create_consumer(topic, context, fanout='subscriber.foo') - conn.reactor.register.assert_called_with(context, inaddr, - impl_zmq.zmq.SUB, - subscribe='subscriber.foo', - in_bind=False) - - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True) - def test_zmqconnection_create_consumer_topic_exists(self, mock_reactor): - mock_reactor.register = mock.Mock() - conn = impl_zmq.Connection(self.driver.conf, self.driver) - topic = 'topic.foo' - context = mock.Mock() - inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' % - (self.internal_ipc_dir)) - - conn.create_consumer(topic, context) - conn.reactor.register.assert_called_with( - context, inaddr, impl_zmq.zmq.PULL, subscribe=None, in_bind=False) - conn.reactor.register.reset_mock() - # Call again with same topic - conn.create_consumer(topic, context) - self.assertFalse(conn.reactor.register.called) - - @mock.patch('oslo_messaging._drivers.impl_zmq._get_matchmaker', - autospec=True) - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True) - def test_zmqconnection_close(self, mock_reactor, mock_getmatchmaker): - conn = impl_zmq.Connection(self.driver.conf, self.driver) - conn.reactor.close = mock.Mock() - mock_getmatchmaker.return_value.stop_heartbeat = mock.Mock() - conn.close() - self.assertTrue(mock_getmatchmaker.return_value.stop_heartbeat.called) - self.assertTrue(conn.reactor.close.called) - - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True) - def test_zmqconnection_wait(self, mock_reactor): - conn = impl_zmq.Connection(self.driver.conf, self.driver) - conn.reactor.wait = mock.Mock() - conn.wait() - self.assertTrue(conn.reactor.wait.called) - - @mock.patch('oslo_messaging._drivers.impl_zmq._get_matchmaker', - autospec=True) - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True) - def test_zmqconnection_consume_in_thread(self, mock_reactor, - mock_getmatchmaker): - mock_getmatchmaker.return_value.start_heartbeat = mock.Mock() - conn = impl_zmq.Connection(self.driver.conf, self.driver) - conn.reactor.consume_in_thread = mock.Mock() - conn.consume_in_thread() - self.assertTrue(mock_getmatchmaker.return_value.start_heartbeat.called) - self.assertTrue(conn.reactor.consume_in_thread.called) - - -class TestZmqListener(ZmqBaseTestCase): - - def test_zmqlistener_no_msg(self): - listener = impl_zmq.ZmqListener(self.driver) - # Timeout = 0 should return straight away since the queue is empty - listener.poll(timeout=0) - - def test_zmqlistener_w_msg(self): - listener = impl_zmq.ZmqListener(self.driver) - kwargs = {'a': 1, 'b': 2} - m = mock.Mock() - ctxt = mock.Mock(autospec=impl_zmq.RpcContext) - message = {'namespace': 'name.space', 'method': m.fake_method, - 'args': kwargs} - eventlet.spawn_n(listener.dispatch, ctxt, message) - resp = listener.poll(timeout=10) - msg = {'method': m.fake_method, 'namespace': 'name.space', - 'args': kwargs} - self.assertEqual(resp.message, msg) - - -class TestZmqDriver(ZmqBaseTestCase): - - @mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True) - @mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True) - def test_zmqdriver_send(self, mock_multi_send, mock_cast): - context = mock.Mock(autospec=impl_zmq.RpcContext) - topic = 'testtopic' - msg = 'jeronimo' - self.driver.send(messaging.Target(topic=topic), context, msg, - False, 0, False) - mock_multi_send.assert_called_with(self.driver, mock_cast, context, - topic, msg, - allowed_remote_exmods=[], - envelope=False, pooled=True) - - @mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True) - @mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True) - def test_zmqdriver_send_notification(self, mock_multi_send, mock_cast): - context = mock.Mock(autospec=impl_zmq.RpcContext) - topic = 'testtopic.foo' - topic_reformat = 'testtopic-foo' - msg = 'jeronimo' - self.driver.send_notification(messaging.Target(topic=topic), context, - msg, False, False) - mock_multi_send.assert_called_with(self.driver, mock_cast, context, - topic_reformat, msg, - allowed_remote_exmods=[], - envelope=False, pooled=True) - - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True) - @mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True) - def test_zmqdriver_listen(self, mock_connection, mock_listener): - mock_listener.return_value = listener = mock.Mock() - mock_connection.return_value = conn = mock.Mock() - conn.create_consumer = mock.Mock() - conn.consume_in_thread = mock.Mock() - topic = 'testtopic.foo' - self.driver.listen(messaging.Target(topic=topic)) - conn.create_consumer.assert_called_with(topic, listener, fanout=True) - - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True) - @mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True) - def test_zmqdriver_listen_for_notification(self, mock_connection, - mock_listener): - mock_listener.return_value = listener = mock.Mock() - mock_connection.return_value = conn = mock.Mock() - conn.create_consumer = mock.Mock() - conn.consume_in_thread = mock.Mock() - topic = 'testtopic.foo' - data = [(messaging.Target(topic=topic), 0)] - # NOTE(jamespage): Pooling not supported, just pass None for now. - self.driver.listen_for_notifications(data, None) - conn.create_consumer.assert_called_with("%s-%s" % (topic, 0), listener) + incoming, socket = reply_poller.poll(1) + self.assertIsNone(incoming) + self.assertIsNone(socket)