[zmq] PUB-SUB pipeline

In this change PUB-SUB pipeline was added
and used for CAST-Fanout pattern.

Added 2 new options:

* use_pub_sub - configures the driver to use
PUB/SUB if True, fallbacks to DEALER/ROUTER implementation
otherwise.

PUB/SUB implementation is assumed to be always used with proxy.

* direct_over_proxy - specifies to use proxy with direct patterns
(CALL,CAST). This option is in replace of zmq_use_broker.
Latter is meaningless in context of PUB/SUB architecture,
so renamed.

Change-Id: I7c02d4d62632293941bc0d9d947e5362a5317db6
Closes-Bug: #1515185
This commit is contained in:
Oleksii Zamiatin 2015-11-24 18:59:22 +02:00
parent a4f83d97b9
commit 1a03d7b447
25 changed files with 704 additions and 413 deletions

View File

@ -14,7 +14,6 @@
import logging
import os
import pprint
import socket
import threading
@ -24,14 +23,12 @@ from stevedore import driver
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client import zmq_client
from oslo_messaging._drivers.zmq_driver.client import zmq_client_light
from oslo_messaging._drivers.zmq_driver.server import zmq_server
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._executors import impl_pooledexecutor # FIXME(markmc)
from oslo_messaging._executors import impl_pooledexecutor
from oslo_messaging._i18n import _LE
pformat = pprint.pformat
LOG = logging.getLogger(__name__)
RPCException = rpc_common.RPCException
@ -42,16 +39,8 @@ zmq_opts = [
'The "host" option should point or resolve to this '
'address.'),
# The module.Class to use for matchmaking.
cfg.StrOpt(
'rpc_zmq_matchmaker',
default='redis',
help='MatchMaker driver.',
),
cfg.BoolOpt('rpc_zmq_all_req_rep',
default=True,
help='Use REQ/REP pattern for all methods CALL/CAST/FANOUT.'),
cfg.StrOpt('rpc_zmq_matchmaker', default='redis',
help='MatchMaker driver.'),
cfg.StrOpt('rpc_zmq_concurrency', default='eventlet',
help='Type of concurrency used. Either "native" or "eventlet"'),
@ -71,19 +60,21 @@ zmq_opts = [
help='Name of this node. Must be a valid hostname, FQDN, or '
'IP address. Must match "host" option, if running Nova.'),
cfg.IntOpt('rpc_cast_timeout',
default=30,
cfg.IntOpt('rpc_cast_timeout', default=30,
help='Seconds to wait before a cast expires (TTL). '
'Only supported by impl_zmq.'),
cfg.IntOpt('rpc_poll_timeout',
default=1,
cfg.IntOpt('rpc_poll_timeout', default=1,
help='The default number of seconds that poll should wait. '
'Poll raises timeout exception when timeout expired.'),
cfg.BoolOpt('zmq_use_broker',
default=False,
help='Configures zmq-messaging to use broker or not.'),
cfg.BoolOpt('direct_over_proxy', default=True,
help='Configures zmq-messaging to use proxy with '
'non PUB/SUB patterns.'),
cfg.BoolOpt('use_pub_sub', default=True,
help='Use PUB/SUB pattern for fanout methods. '
'PUB/SUB always uses proxy.'),
cfg.PortOpt('rpc_zmq_min_port',
default=49152,
@ -185,15 +176,12 @@ class ZmqDriver(base.BaseDriver):
self.notify_server = LazyDriverItem(
zmq_server.ZmqServer, self, self.conf, self.matchmaker)
client_cls = zmq_client_light.ZmqClientLight \
if conf.zmq_use_broker else zmq_client.ZmqClient
self.client = LazyDriverItem(
client_cls, self.conf, self.matchmaker,
zmq_client.ZmqClient, self.conf, self.matchmaker,
self.allowed_remote_exmods)
self.notifier = LazyDriverItem(
client_cls, self.conf, self.matchmaker,
zmq_client.ZmqClient, self.conf, self.matchmaker,
self.allowed_remote_exmods)
super(ZmqDriver, self).__init__(conf, url, default_exchange,

View File

@ -17,6 +17,8 @@ import logging
from oslo_messaging._drivers.zmq_driver.broker import zmq_base_proxy
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher_proxy
from oslo_messaging._drivers.zmq_driver.client.publishers \
import zmq_pub_publisher
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
@ -42,6 +44,8 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy):
reply_receiver = zmq_dealer_publisher_proxy.ReplyReceiver(self.poller)
self.publisher = zmq_dealer_publisher_proxy.DealerPublisherProxy(
conf, matchmaker, reply_receiver)
self.pub_publisher = zmq_pub_publisher.PubPublisherProxy(
conf, matchmaker)
def run(self):
message, socket = self.poller.poll(self.conf.rpc_poll_timeout)
@ -53,10 +57,16 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy):
else:
self._redirect_reply(message)
def _redirect_in_request(self, request):
def _redirect_in_request(self, multipart_message):
LOG.debug("-> Redirecting request %s to TCP publisher"
% request)
self.publisher.send_request(request)
% multipart_message)
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
if self.conf.use_pub_sub and \
envelope[zmq_names.FIELD_MSG_TYPE] \
== zmq_names.CAST_FANOUT_TYPE:
self.pub_publisher.send_request(multipart_message)
else:
self.publisher.send_request(multipart_message)
def _redirect_reply(self, reply):
LOG.debug("Reply proxy %s" % reply)

View File

@ -26,6 +26,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging._i18n import _LW
LOG = logging.getLogger(__name__)
@ -44,7 +45,7 @@ class DealerCallPublisher(zmq_publisher_base.PublisherBase):
self.matchmaker = matchmaker
self.reply_waiter = ReplyWaiter(conf)
self.sender = RequestSender(conf, matchmaker, self.reply_waiter) \
if not conf.zmq_use_broker else \
if not conf.direct_over_proxy else \
RequestSenderLight(conf, matchmaker, self.reply_waiter)
def send_request(self, request):
@ -124,7 +125,7 @@ class RequestSenderLight(RequestSender):
"""
def __init__(self, conf, matchmaker, reply_waiter):
if not conf.zmq_use_broker:
if not conf.direct_over_proxy:
raise rpc_common.RPCException("RequestSenderLight needs a proxy!")
super(RequestSenderLight, self).__init__(
@ -190,5 +191,9 @@ class ReplyWaiter(object):
reply, socket = self.poller.poll(
timeout=self.conf.rpc_poll_timeout)
if reply is not None:
call_future = self.replies[reply[zmq_names.FIELD_MSG_ID]]
call_future.set_result(reply)
reply_id = reply[zmq_names.FIELD_MSG_ID]
call_future = self.replies.get(reply_id)
if call_future:
call_future.set_result(reply)
else:
LOG.warning(_LW("Received timed out reply: %s") % reply_id)

View File

@ -75,6 +75,7 @@ class DealerPublisherLight(zmq_publisher_base.PublisherBase):
def __init__(self, conf, address):
super(DealerPublisherLight, self).__init__(conf)
self.socket = self.zmq_context.socket(zmq.DEALER)
self.address = address
self.socket.connect(address)
def send_request(self, request):
@ -88,6 +89,12 @@ class DealerPublisherLight(zmq_publisher_base.PublisherBase):
self.socket.send_pyobj(envelope, zmq.SNDMORE)
self.socket.send_pyobj(request)
LOG.debug("->[proxy:%(addr)s] Sending message_id %(message)s to "
"a target %(target)s"
% {"message": request.message_id,
"target": request.target,
"addr": self.address})
def cleanup(self):
self.socket.setsockopt(zmq.LINGER, 0)
self.socket.close()

View File

@ -16,32 +16,108 @@ import logging
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging._i18n import _LI
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class PubPublisher(zmq_publisher_base.PublisherMultisend):
class PubPublisherProxy(zmq_publisher_base.PublisherBase):
"""PUB/SUB based request publisher
The publisher intended to be used for Fanout and Notify
multi-sending patterns.
It differs from direct publishers like DEALER or PUSH based
in a way it treats matchmaker. Here all publishers register
in the matchmaker. Subscribers (server-side) take the list
of publishers and connect to all of them but subscribe
only to a specific topic-filtering tag generated from the
Target object.
"""
def __init__(self, conf, matchmaker):
super(PubPublisher, self).__init__(conf, matchmaker, zmq.PUB)
super(PubPublisherProxy, self).__init__(conf)
self.matchmaker = matchmaker
def send_request(self, request):
self.socket = zmq_socket.ZmqRandomPortSocket(
self.conf, self.zmq_context, zmq.PUB)
if request.msg_type not in zmq_names.NOTIFY_TYPES:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
self.host = zmq_address.combine_address(self.conf.rpc_zmq_host,
self.socket.port)
pub_socket = self._check_hosts_connections(
request.target, zmq_names.socket_type_str(zmq.SUB))
self._send_request(pub_socket, request)
self.sync_channel = SyncChannel(conf, matchmaker, self.zmq_context)
def _send_request(self, socket, request):
LOG.info(_LI("[PUB:%(pub)s, PULL:%(pull)s] Run PUB publisher") %
{"pub": self.host,
"pull": self.sync_channel.sync_host})
super(PubPublisher, self)._send_request(socket, request)
self.matchmaker.register_publisher(
(self.host, self.sync_channel.sync_host))
LOG.debug("Publishing message %(message)s to a target %(target)s"
% {"message": request.message,
"target": request.target})
def send_request(self, multipart_message):
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
msg_type = envelope[zmq_names.FIELD_MSG_TYPE]
target = envelope[zmq_names.FIELD_TARGET]
message_id = envelope[zmq_names.FIELD_MSG_ID]
if msg_type not in zmq_names.MULTISEND_TYPES:
raise zmq_publisher_base.UnsupportedSendPattern(msg_type)
topic_filter = zmq_address.target_to_subscribe_filter(target)
self.socket.send(topic_filter, zmq.SNDMORE)
self.socket.send(multipart_message[zmq_names.MULTIPART_IDX_BODY])
LOG.debug("Publishing message [%(topic)s] %(message_id)s to "
"a target %(target)s "
% {"message_id": message_id,
"target": target,
"topic": topic_filter})
def cleanup(self):
self.matchmaker.unregister_publisher(
(self.host, self.sync_channel.sync_host))
self.socket.setsockopt(zmq.LINGER, 0)
self.socket.close()
class SyncChannel(object):
"""Subscribers synchronization channel
As far as PUB/SUB is one directed way pattern we need some
backwards channel to have a possibility of subscribers
to talk back to publisher.
May be used for heartbeats or some kind of acknowledgments etc.
"""
def __init__(self, conf, matchmaker, context):
self.conf = conf
self.matchmaker = matchmaker
self.context = context
self._ready = None
# NOTE(ozamiatin): May be used for heartbeats when we
# implement them
self.sync_socket = zmq_socket.ZmqRandomPortSocket(
self.conf, self.context, zmq.PULL)
self.poller = zmq_async.get_poller()
self.poller.register(self.sync_socket)
self.sync_host = zmq_address.combine_address(self.conf.rpc_zmq_host,
self.sync_socket.port)
def is_ready(self):
LOG.debug("[%s] Waiting for ready from first subscriber" %
self.sync_host)
if self._ready is None:
self._ready = self.poller.poll()
LOG.debug("[%s] Received ready from first subscriber" %
self.sync_host)
return self._ready is not None

View File

@ -132,7 +132,6 @@ class PublisherMultisend(PublisherBase):
self.outbound_sockets[str(target)] = socket
for host in hosts:
self._connect_to_host(socket, host, target)
return socket
def _connect_to_address(self, socket, address, target):

View File

@ -1,130 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import logging
import uuid
import six
import oslo_messaging
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LE, _LI
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class ReqPublisher(zmq_publisher_base.PublisherBase):
def __init__(self, conf, matchmaker):
super(ReqPublisher, self).__init__(conf)
self.matchmaker = matchmaker
def send_request(self, request):
if request.msg_type != zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
socket, connect_address = self._connect_to_host(request.target,
request.timeout)
request.host = connect_address
self._send_request(socket, request)
return self._receive_reply(socket, request)
def _resolve_host_address(self, target, timeout=0):
host = self.matchmaker.get_single_host(
target, zmq_names.socket_type_str(zmq.ROUTER), timeout)
return zmq_address.get_tcp_direct_address(host)
def _connect_to_host(self, target, timeout=0):
try:
self.zmq_context = zmq.Context()
socket = self.zmq_context.socket(zmq.REQ)
if six.PY3:
socket.setsockopt_string(zmq.IDENTITY, str(uuid.uuid1()))
else:
socket.identity = str(uuid.uuid1())
connect_address = self._resolve_host_address(target, timeout)
LOG.info(_LI("Connecting REQ to %s") % connect_address)
socket.connect(connect_address)
self.outbound_sockets[str(target)] = socket
return socket, connect_address
except zmq.ZMQError as e:
errmsg = _LE("Error connecting to socket: %s") % str(e)
LOG.error(_LE("Error connecting to socket: %s") % str(e))
raise rpc_common.RPCException(errmsg)
@staticmethod
def _receive_reply(socket, request):
def _receive_method(socket):
reply = socket.recv_pyobj()
LOG.debug("Received reply %s" % reply)
return reply
LOG.debug("Start waiting reply")
# NOTE(ozamiatin): Check for retry here (no retries now)
with contextlib.closing(zmq_async.get_reply_poller()) as poller:
poller.register(socket, recv_method=_receive_method)
reply, socket = poller.poll(timeout=request.timeout)
if reply is None:
raise oslo_messaging.MessagingTimeout(
"Timeout %s seconds was reached" % request.timeout)
LOG.debug("Received reply %s" % reply)
if reply[zmq_names.FIELD_FAILURE]:
raise rpc_common.deserialize_remote_exception(
reply[zmq_names.FIELD_FAILURE],
request.allowed_remote_exmods)
else:
return reply[zmq_names.FIELD_REPLY]
def close(self):
# For contextlib compatibility
self.cleanup()
class ReqPublisherLight(ReqPublisher):
def __init__(self, conf, matchmaker):
super(ReqPublisherLight, self).__init__(conf, matchmaker)
def _resolve_host_address(self, target, timeout=0):
return zmq_address.get_broker_address(self.conf)
def _send_request(self, socket, request):
LOG.debug("Sending %(type)s message_id %(message)s"
" to a target %(target)s, host:%(host)s"
% {"type": request.msg_type,
"message": request.message_id,
"target": request.target,
"host": request.host})
envelope = request.create_envelope()
socket.send_pyobj(envelope, zmq.SNDMORE)
socket.send_pyobj(request)

View File

@ -13,12 +13,12 @@
# under the License.
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_call_publisher
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher
from oslo_messaging._drivers.zmq_driver.client import zmq_client_base
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
@ -28,8 +28,11 @@ zmq = zmq_async.import_zmq()
class ZmqClient(zmq_client_base.ZmqClientBase):
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
if conf.zmq_use_broker:
raise rpc_common.RPCException("This client doesn't need proxy!")
default_publisher = zmq_dealer_publisher.DealerPublisher(
conf, matchmaker) if not conf.direct_over_proxy else \
zmq_dealer_publisher.DealerPublisherLight(
conf, zmq_address.get_broker_address(conf))
super(ZmqClient, self).__init__(
conf, matchmaker, allowed_remote_exmods,
@ -38,7 +41,14 @@ class ZmqClient(zmq_client_base.ZmqClientBase):
zmq_dealer_call_publisher.DealerCallPublisher(
conf, matchmaker),
"default": zmq_dealer_publisher.DealerPublisher(
conf, matchmaker)
# Here use DealerPublisherLight for sending request to proxy
# which finally uses PubPublisher to send fanout in case of
# 'use_pub_sub' option configured.
zmq_names.CAST_FANOUT_TYPE:
zmq_dealer_publisher.DealerPublisherLight(
conf, zmq_address.get_broker_address(conf))
if conf.use_pub_sub else default_publisher,
"default": default_publisher
}
)

View File

@ -73,5 +73,8 @@ class ZmqClientBase(object):
self.notify_publisher.send_request(request)
def cleanup(self):
cleaned = set()
for publisher in self.publishers.values():
publisher.cleanup()
if publisher not in cleaned:
publisher.cleanup()
cleaned.add(publisher)

View File

@ -1,46 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_call_publisher
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher
from oslo_messaging._drivers.zmq_driver.client import zmq_client_base
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
zmq = zmq_async.import_zmq()
class ZmqClientLight(zmq_client_base.ZmqClientBase):
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
if not conf.zmq_use_broker:
raise rpc_common.RPCException(
"This client needs proxy to be configured!")
super(ZmqClientLight, self).__init__(
conf, matchmaker, allowed_remote_exmods,
publishers={
zmq_names.CALL_TYPE:
zmq_dealer_call_publisher.DealerCallPublisher(
conf, matchmaker),
"default": zmq_dealer_publisher.DealerPublisherLight(
conf, zmq_address.get_broker_address(self.conf))
}
)

View File

@ -61,7 +61,12 @@ class Request(object):
self.target = target
self.context = context
self.message = message
self.retry = retry
if not isinstance(retry, int) and retry is not None:
raise ValueError(
"retry must be an integer, not {0}".format(type(retry)))
self.message_id = str(uuid.uuid1())
self.proxy_reply_id = None
@ -90,6 +95,11 @@ class RpcRequest(Request):
self.timeout = kwargs.pop("timeout")
assert self.timeout is not None, "Timeout should be specified!"
if not isinstance(self.timeout, int) and self.timeout is not None:
raise ValueError(
"timeout must be an integer, not {0}"
.format(type(self.timeout)))
super(RpcRequest, self).__init__(*args, **kwargs)
def create_envelope(self):

View File

@ -14,14 +14,11 @@
import abc
import collections
import logging
import random
import retrying
import six
import oslo_messaging
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._i18n import _LI, _LW
LOG = logging.getLogger(__name__)
@ -32,9 +29,51 @@ class MatchMakerBase(object):
def __init__(self, conf, *args, **kwargs):
super(MatchMakerBase, self).__init__(*args, **kwargs)
self.conf = conf
@abc.abstractmethod
def register_publisher(self, hostname):
"""Register publisher on nameserver.
This works for PUB-SUB only
:param hostname: host for the topic in "host:port" format
host for back-chatter in "host:port" format
:type hostname: tuple
"""
@abc.abstractmethod
def unregister_publisher(self, hostname):
"""Unregister publisher on nameserver.
This works for PUB-SUB only
:param hostname: host for the topic in "host:port" format
host for back-chatter in "host:port" format
:type hostname: tuple
"""
def get_publishers_retrying(self):
"""Retry until at least one publisher appears"""
def retry_if_empty(publishers):
return not publishers
_retry = retrying.retry(retry_on_result=retry_if_empty)
@_retry
def _get_publishers():
return self.get_publishers()
return _get_publishers()
@abc.abstractmethod
def get_publishers(self):
"""Get all publisher-hosts from nameserver.
:returns: a list of tuples of strings "hostname:port" hosts
"""
@abc.abstractmethod
def register(self, target, hostname, listener_type):
"""Register target on nameserver.
@ -68,71 +107,6 @@ class MatchMakerBase(object):
:returns: a list of "hostname:port" hosts
"""
def get_single_host(self, target, listener_type, timeout=None, retry=0):
"""Get a single host by target.
:param target: the target for messages
:type target: Target
:param timeout: matchmaker query timeout
:type timeout: integer
:param retry: the number of retries to do
None or -1 means retry forever
0 means do not retry
N means retry N times
:type retry: integer
:returns: a "hostname:port" host
"""
if not isinstance(timeout, int) and timeout is not None:
raise ValueError(
"timeout must be integer, not {0}".format(type(timeout)))
if not isinstance(retry, int) and retry is not None:
raise ValueError(
"retry must be integer, not {0}".format(type(retry)))
if timeout is None or timeout < 0:
full_timeout = 0
retry_timeout = 0
else:
retry_timeout = timeout * 1000
if retry is None or retry < 0:
full_timeout = None
else:
full_timeout = retry * retry_timeout
_retry = retrying.retry(stop_max_delay=full_timeout,
wait_fixed=retry_timeout)
@_retry
def _get_single_host():
hosts = self.get_hosts(target, listener_type)
try:
if not hosts:
err_msg = "No hosts were found for target %s." % target
LOG.error(err_msg)
raise oslo_messaging.InvalidTarget(err_msg, target)
if len(hosts) == 1:
host = hosts[0]
LOG.info(_LI(
"A single host %(host)s found for target %(target)s.")
% {"host": host, "target": target})
else:
host = random.choice(hosts)
LOG.warning(_LW(
"Multiple hosts %(hosts)s were found for target "
" %(target)s. Using the random one - %(host)s.")
% {"hosts": hosts, "target": target, "host": host})
return host
except oslo_messaging.InvalidTarget as ex:
if timeout:
raise oslo_messaging.MessagingTimeout()
else:
raise ex
return _get_single_host()
class DummyMatchMaker(MatchMakerBase):
@ -140,6 +114,18 @@ class DummyMatchMaker(MatchMakerBase):
super(DummyMatchMaker, self).__init__(conf, *args, **kwargs)
self._cache = collections.defaultdict(list)
self._publishers = set()
def register_publisher(self, hostname):
if hostname not in self._publishers:
self._publishers.add(hostname)
def unregister_publisher(self, hostname):
if hostname in self._publishers:
self._publishers.remove(hostname)
def get_publishers(self):
return list(self._publishers)
def register(self, target, hostname, listener_type):
key = zmq_address.target_to_key(target, listener_type)

View File

@ -36,6 +36,8 @@ matchmaker_redis_opts = [
help='Password for Redis server (optional).'),
]
_PUBLISHERS_KEY = "PUBLISHERS"
class RedisMatchMaker(base.MatchMakerBase):
@ -49,6 +51,22 @@ class RedisMatchMaker(base.MatchMakerBase):
password=self.conf.matchmaker_redis.password,
)
def register_publisher(self, hostname):
host_str = ",".join(hostname)
if host_str not in self._get_hosts_by_key(_PUBLISHERS_KEY):
self._redis.lpush(_PUBLISHERS_KEY, host_str)
def unregister_publisher(self, hostname):
host_str = ",".join(hostname)
self._redis.lrem(_PUBLISHERS_KEY, 0, host_str)
def get_publishers(self):
hosts = []
hosts.extend([tuple(host_str.split(","))
for host_str in
self._get_hosts_by_key(_PUBLISHERS_KEY)])
return hosts
def _get_hosts_by_key(self, key):
return self._redis.lrange(key, 0, -1)

View File

@ -96,9 +96,10 @@ class GreenExecutor(zmq_poller.Executor):
def __init__(self, method):
self._method = method
super(GreenExecutor, self).__init__(None)
self._done = threading.Event()
def _loop(self):
while True:
while not self._done.is_set():
self._method()
eventlet.sleep()
@ -112,3 +113,6 @@ class GreenExecutor(zmq_poller.Executor):
def stop(self):
if self.thread is not None:
self.thread.kill()
def done(self):
self._done.set()

View File

@ -47,8 +47,6 @@ class ThreadingPoller(zmq_poller.ZmqPoller):
def poll(self, timeout=None):
LOG.debug("Entering poll method")
if timeout:
timeout *= 1000 # zmq poller waits milliseconds
@ -94,3 +92,6 @@ class ThreadingExecutor(zmq_poller.Executor):
def wait(self):
self.thread.join()
def done(self):
self._stop.set()

View File

@ -38,24 +38,6 @@ class ConsumerBase(object):
self.sockets = []
self.context = zmq.Context()
def subscribe_socket(self, socket_type):
try:
socket = zmq_socket.ZmqRandomPortSocket(
self.conf, self.context, socket_type)
self.sockets.append(socket)
self.poller.register(socket, self.receive_message)
LOG.debug("Run %(stype)s consumer on %(addr)s:%(port)d",
{"stype": zmq_names.socket_type_str(socket_type),
"addr": socket.bind_address,
"port": socket.port})
return socket
except zmq.ZMQError as e:
errmsg = _LE("Failed binding to port %(port)d: %(e)s")\
% (self.port, e)
LOG.error(_LE("Failed binding to port %(port)d: %(e)s")
% (self.port, e))
raise rpc_common.RPCException(errmsg)
@abc.abstractmethod
def listen(self, target):
"""Associate new sockets with targets here"""
@ -78,6 +60,24 @@ class SingleSocketConsumer(ConsumerBase):
super(SingleSocketConsumer, self).__init__(conf, poller, server)
self.socket = self.subscribe_socket(socket_type)
def subscribe_socket(self, socket_type):
try:
socket = zmq_socket.ZmqRandomPortSocket(
self.conf, self.context, socket_type)
self.sockets.append(socket)
self.poller.register(socket, self.receive_message)
LOG.debug("Run %(stype)s consumer on %(addr)s:%(port)d",
{"stype": zmq_names.socket_type_str(socket_type),
"addr": socket.bind_address,
"port": socket.port})
return socket
except zmq.ZMQError as e:
errmsg = _LE("Failed binding to port %(port)d: %(e)s")\
% (self.port, e)
LOG.error(_LE("Failed binding to port %(port)d: %(e)s")
% (self.port, e))
raise rpc_common.RPCException(errmsg)
@property
def address(self):
return self.socket.bind_address

View File

@ -0,0 +1,158 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import threading
import uuid
import six
from oslo_messaging._drivers import base
from oslo_messaging._drivers.zmq_driver.server.consumers\
import zmq_consumer_base
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class SubIncomingMessage(base.IncomingMessage):
def __init__(self, listener, request, socket, poller):
super(SubIncomingMessage, self).__init__(
listener, request.context, request.message)
self.socket = socket
self.msg_id = request.message_id
poller.resume_polling(socket)
def reply(self, reply=None, failure=None, log_failure=True):
"""Reply is not needed for non-call messages"""
def acknowledge(self):
LOG.debug("Not sending acknowledge for %s", self.msg_id)
def requeue(self):
"""Requeue is not supported"""
class SubConsumer(zmq_consumer_base.ConsumerBase):
def __init__(self, conf, poller, server):
super(SubConsumer, self).__init__(conf, poller, server)
self.matchmaker = server.matchmaker
self.subscriptions = set()
self.targets = []
self._socket_lock = threading.Lock()
self.socket = zmq_socket.ZmqSocket(self.context, zmq.SUB)
self.sockets.append(self.socket)
self.id = uuid.uuid4()
self.publishers_poller = MatchmakerPoller(
self.matchmaker, on_result=self.on_publishers)
def _subscribe_on_target(self, target):
topic_filter = zmq_address.target_to_subscribe_filter(target)
if target.topic:
self.socket.setsockopt(zmq.SUBSCRIBE, six.b(target.topic))
self.subscriptions.add(six.b(target.topic))
if target.server:
self.socket.setsockopt(zmq.SUBSCRIBE, six.b(target.server))
self.subscriptions.add(six.b(target.server))
if target.topic and target.server:
self.socket.setsockopt(zmq.SUBSCRIBE, topic_filter)
self.subscriptions.add(topic_filter)
LOG.debug("[%(host)s] Subscribing to topic %(filter)s"
% {"host": self.id,
"filter": topic_filter})
def on_publishers(self, publishers):
with self._socket_lock:
for host, sync in publishers:
self.socket.connect(zmq_address.get_tcp_direct_address(host))
self.poller.register(self.socket, self.receive_message)
LOG.debug("[%s] SUB consumer connected to publishers %s"
% (self.id, publishers))
def listen(self, target):
LOG.debug("Listen to target %s" % target)
with self._socket_lock:
self._subscribe_on_target(target)
def _receive_request(self, socket):
topic_filter = socket.recv()
LOG.debug("[%s] Received %s topic" % (self.id, topic_filter))
assert topic_filter in self.subscriptions
request = socket.recv_pyobj()
return request
def receive_message(self, socket):
try:
request = self._receive_request(socket)
if not request:
return None
LOG.debug("Received %(type)s, %(id)s, %(target)s"
% {"type": request.msg_type,
"id": request.message_id,
"target": request.target})
if request.msg_type not in zmq_names.MULTISEND_TYPES:
LOG.error(_LE("Unknown message type: %s") % request.msg_type)
else:
return SubIncomingMessage(self.server, request, socket,
self.poller)
except zmq.ZMQError as e:
LOG.error(_LE("Receiving message failed: %s") % str(e))
class MatchmakerPoller(object):
"""This entity performs periodical async polling
to the matchmaker if no hosts were registered for
specified target before.
"""
def __init__(self, matchmaker, on_result):
self.matchmaker = matchmaker
self.executor = zmq_async.get_executor(
method=self._poll_for_publishers)
self.on_result = on_result
self.executor.execute()
def _poll_for_publishers(self):
publishers = self.matchmaker.get_publishers_retrying()
if publishers:
self.on_result(publishers)
self.executor.done()
class BackChatter(object):
def __init__(self, context):
self.socket = zmq_socket.ZmqSocket(context, zmq.PUSH)
def connect(self, address):
self.socket.connect(address)
def send_ready(self):
for i in range(self.socket.connections_count()):
self.socket.send(zmq_names.ACK_TYPE)
def close(self):
self.socket.setsockopt(zmq.LINGER, 5)
self.socket.close()

View File

@ -18,6 +18,8 @@ import logging
from oslo_messaging._drivers import base
from oslo_messaging._drivers.zmq_driver.server.consumers\
import zmq_router_consumer
from oslo_messaging._drivers.zmq_driver.server.consumers\
import zmq_sub_consumer
from oslo_messaging._drivers.zmq_driver import zmq_async
LOG = logging.getLogger(__name__)
@ -31,14 +33,17 @@ class ZmqServer(base.Listener):
super(ZmqServer, self).__init__(driver)
self.matchmaker = matchmaker
self.poller = zmq_async.get_poller()
if conf.zmq_use_broker:
self.rpc_consumer = zmq_router_consumer.RouterConsumerBroker(
conf, self.poller, self)
else:
self.rpc_consumer = zmq_router_consumer.RouterConsumer(
self.rpc_consumer = zmq_router_consumer.RouterConsumerBroker(
conf, self.poller, self) if conf.direct_over_proxy else \
zmq_router_consumer.RouterConsumer(
conf, self.poller, self)
self.notify_consumer = self.rpc_consumer
self.sub_consumer = zmq_sub_consumer.SubConsumer(
conf, self.poller, self) if conf.use_pub_sub else None
self.consumers = [self.rpc_consumer]
if self.sub_consumer:
self.consumers.append(self.sub_consumer)
@base.batch_poll_helper
def poll(self, timeout=None):
@ -59,6 +64,9 @@ class ZmqServer(base.Listener):
consumer = self.rpc_consumer
consumer.listen(target)
if self.sub_consumer:
self.sub_consumer.listen(target)
def listen_notification(self, targets_and_priorities):
consumer = self.notify_consumer

View File

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import six
def combine_address(host, port):
return "%s:%s" % (host, port)
@ -46,3 +48,14 @@ def target_to_key(target, listener_type):
return prefix(target.topic)
if target.server:
return prefix(target.server)
def target_to_subscribe_filter(target):
if target.topic and target.server:
attributes = ['topic', 'server']
key = "/".join(getattr(target, attr) for attr in attributes)
return six.b(key)
if target.topic:
return six.b(target.topic)
if target.server:
return six.b(target.server)

View File

@ -70,6 +70,11 @@ def get_executor(method, zmq_concurrency='eventlet'):
return threading_poller.ThreadingExecutor(method)
def get_proc_executor(method):
from oslo_messaging._drivers.zmq_driver import zmq_poller
return zmq_poller.MutliprocessingExecutor(method)
def _is_eventlet_zmq_available():
return importutils.try_import('eventlet.green.zmq')

View File

@ -13,6 +13,7 @@
# under the License.
import abc
import multiprocessing
import six
@ -104,3 +105,31 @@ class Executor(object):
@abc.abstractmethod
def wait(self):
"""Wait until pass"""
@abc.abstractmethod
def done(self):
"""More soft way to stop rather than killing thread"""
class MutliprocessingExecutor(Executor):
def __init__(self, method):
process = multiprocessing.Process(target=self._loop)
self._method = method
super(MutliprocessingExecutor, self).__init__(process)
def _loop(self):
while not self._stop.is_set():
self._method()
def execute(self):
self.thread.start()
def stop(self):
self._stop.set()
def wait(self):
self.thread.join()
def done(self):
self._stop.set()

View File

@ -66,8 +66,6 @@ class TestImplMatchmaker(test_utils.BaseTestCase):
self.assertEqual(self.test_matcher.get_hosts(self.target, "test"),
[self.host1])
self.assertEqual(self.test_matcher.get_single_host(self.target, "test"),
self.host1)
def test_register_two_hosts(self):
self.test_matcher.register(self.target, self.host1, "test")
@ -75,8 +73,6 @@ class TestImplMatchmaker(test_utils.BaseTestCase):
self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"),
[self.host1, self.host2])
self.assertIn(self.test_matcher.get_single_host(self.target, "test"),
[self.host1, self.host2])
def test_register_unsibscribe(self):
self.test_matcher.register(self.target, self.host1, "test")
@ -86,8 +82,6 @@ class TestImplMatchmaker(test_utils.BaseTestCase):
self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"),
[self.host1])
self.assertNotIn(self.test_matcher.get_single_host(self.target, "test"),
[self.host2])
def test_register_two_same_hosts(self):
self.test_matcher.register(self.target, self.host1, "test")
@ -95,14 +89,7 @@ class TestImplMatchmaker(test_utils.BaseTestCase):
self.assertEqual(self.test_matcher.get_hosts(self.target, "test"),
[self.host1])
self.assertEqual(self.test_matcher.get_single_host(self.target, "test"),
self.host1)
def test_get_hosts_wrong_topic(self):
target = oslo_messaging.Target(topic="no_such_topic")
self.assertEqual(self.test_matcher.get_hosts(target, "test"), [])
def test_get_single_host_wrong_topic(self):
target = oslo_messaging.Target(topic="no_such_topic")
self.assertRaises(oslo_messaging.InvalidTarget,
self.test_matcher.get_single_host, target, "test")

View File

@ -13,7 +13,6 @@
# under the License.
import logging
import threading
import fixtures
import testtools
@ -22,77 +21,16 @@ import oslo_messaging
from oslo_messaging._drivers import impl_zmq
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging._i18n import _
from oslo_messaging.tests import utils as test_utils
from oslo_messaging.tests.drivers.zmq import zmq_common
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class TestServerListener(object):
def __init__(self, driver):
self.driver = driver
self.listener = None
self.executor = zmq_async.get_executor(self._run)
self._stop = threading.Event()
self._received = threading.Event()
self.message = None
def listen(self, target):
self.listener = self.driver.listen(target)
self.executor.execute()
def listen_notifications(self, targets_and_priorities):
self.listener = self.driver.listen_for_notifications(
targets_and_priorities, {})
self.executor.execute()
def _run(self):
try:
message = self.listener.poll()
if message:
message = message[0]
message.acknowledge()
self._received.set()
self.message = message
message.reply(reply=True)
except Exception:
LOG.exception(_("Unexpected exception occurred."))
def stop(self):
self.executor.stop()
class ZmqBaseTestCase(test_utils.BaseTestCase):
"""Base test case for all ZMQ tests """
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(ZmqBaseTestCase, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Set config values
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
'rpc_zmq_host': '127.0.0.1',
'rpc_response_timeout': 5,
'rpc_zmq_ipc_dir': self.internal_ipc_dir,
'zmq_use_broker': False,
'rpc_zmq_matchmaker': 'dummy'}
self.config(**kwargs)
# Get driver
transport = oslo_messaging.get_transport(self.conf)
self.driver = transport._driver
self.listener = TestServerListener(self.driver)
self.addCleanup(stopRpc(self.__dict__))
class ZmqTestPortsRange(ZmqBaseTestCase):
class ZmqTestPortsRange(zmq_common.ZmqBaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
@ -131,18 +69,7 @@ class TestConfZmqDriverLoad(test_utils.BaseTestCase):
self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver)
class stopRpc(object):
def __init__(self, attrs):
self.attrs = attrs
def __call__(self):
if self.attrs['driver']:
self.attrs['driver'].cleanup()
if self.attrs['listener']:
self.attrs['listener'].stop()
class TestZmqBasics(ZmqBaseTestCase):
class TestZmqBasics(zmq_common.ZmqBaseTestCase):
def test_send_receive_raises(self):
"""Call() without method."""
@ -183,6 +110,7 @@ class TestZmqBasics(ZmqBaseTestCase):
def test_send_fanout(self):
target = oslo_messaging.Target(topic='testtopic', fanout=True)
self.listener.listen(target)
result = self.driver.send(

View File

@ -0,0 +1,120 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import pickle
import time
import contextlib
import fixtures
import testtools
import oslo_messaging
from oslo_messaging._drivers import impl_zmq
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging._drivers.zmq_driver.client import zmq_request
from oslo_messaging._drivers.zmq_driver.client.publishers \
import zmq_pub_publisher
from oslo_messaging.tests import utils as test_utils
from oslo_messaging.tests.drivers.zmq import zmq_common
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class TestPubSub(zmq_common.ZmqBaseTestCase):
LISTENERS_COUNT = 3
def setUp(self):
super(TestPubSub, self).setUp()
kwargs = {'use_pub_sub': True}
self.config(**kwargs)
self.publisher = zmq_pub_publisher.PubPublisherProxy(
self.conf, self.driver.matchmaker)
self.listeners = []
for i in range(self.LISTENERS_COUNT):
self.listeners.append(zmq_common.TestServerListener(self.driver))
def _send_request(self, target):
# Needed only in test env to get listener a chance to connect
# before request fires
time.sleep(1)
with contextlib.closing(zmq_request.FanoutRequest(
target, context={}, message={'method': 'hello-world'},
timeout=0, retry=None)) as request:
self.publisher.send_request([request.create_envelope(),
pickle.dumps(request)])
def _check_listener(self, listener):
listener._received.wait(timeout=5)
self.assertEqual(True, listener._received.isSet())
method = listener.message.message[u'method']
self.assertEqual(u'hello-world', method)
def _check_listener_negative(self, listener):
listener._received.wait(timeout=1)
self.assertEqual(False, listener._received.isSet())
def test_single_listener(self):
target = oslo_messaging.Target(topic='testtopic', fanout=True)
self.listener.listen(target)
self._send_request(target)
self._check_listener(self.listener)
def test_all_listeners(self):
target = oslo_messaging.Target(topic='testtopic', fanout=True)
for listener in self.listeners:
listener.listen(target)
self._send_request(target)
for listener in self.listeners:
self._check_listener(listener)
def test_filtered(self):
target = oslo_messaging.Target(topic='testtopic', fanout=True)
target_wrong = oslo_messaging.Target(topic='wrong', fanout=True)
self.listeners[0].listen(target)
self.listeners[1].listen(target)
self.listeners[2].listen(target_wrong)
self._send_request(target)
self._check_listener(self.listeners[0])
self._check_listener(self.listeners[1])
self._check_listener_negative(self.listeners[2])
def test_topic_part_matching(self):
target = oslo_messaging.Target(topic='testtopic', server='server')
target_part = oslo_messaging.Target(topic='testtopic', fanout=True)
self.listeners[0].listen(target)
self.listeners[1].listen(target)
self._send_request(target_part)
self._check_listener(self.listeners[0])
self._check_listener(self.listeners[1])

View File

@ -0,0 +1,102 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import threading
import fixtures
import testtools
import oslo_messaging
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._i18n import _
from oslo_messaging.tests import utils as test_utils
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class TestServerListener(object):
def __init__(self, driver):
self.driver = driver
self.listener = None
self.executor = zmq_async.get_executor(self._run)
self._stop = threading.Event()
self._received = threading.Event()
self.message = None
def listen(self, target):
self.listener = self.driver.listen(target)
self.executor.execute()
def listen_notifications(self, targets_and_priorities):
self.listener = self.driver.listen_for_notifications(
targets_and_priorities, {})
self.executor.execute()
def _run(self):
try:
messages = self.listener.poll()
if messages:
message = messages[0]
message.acknowledge()
self._received.set()
self.message = message
message.reply(reply=True)
except Exception:
LOG.exception(_("Unexpected exception occurred."))
def stop(self):
self.executor.stop()
class ZmqBaseTestCase(test_utils.BaseTestCase):
"""Base test case for all ZMQ tests """
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(ZmqBaseTestCase, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Set config values
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
'rpc_zmq_host': '127.0.0.1',
'rpc_response_timeout': 5,
'rpc_zmq_ipc_dir': self.internal_ipc_dir,
'use_pub_sub': False,
'direct_over_proxy': False,
'rpc_zmq_matchmaker': 'dummy'}
self.config(**kwargs)
# Get driver
transport = oslo_messaging.get_transport(self.conf)
self.driver = transport._driver
self.listener = TestServerListener(self.driver)
self.addCleanup(StopRpc(self.__dict__))
class StopRpc(object):
def __init__(self, attrs):
self.attrs = attrs
def __call__(self):
if self.attrs['driver']:
self.attrs['driver'].cleanup()
if self.attrs['listener']:
self.attrs['listener'].stop()