Merge "Remove qpidd's driver from the tree"
This commit is contained in:
commit
5840ab3340
@ -25,6 +25,4 @@ different 3rd party libraries that don't ensure that. In certain
|
|||||||
cases, with some drivers, it does work:
|
cases, with some drivers, it does work:
|
||||||
|
|
||||||
* rabbit: works only if no connection have already been established.
|
* rabbit: works only if no connection have already been established.
|
||||||
* qpid: doesn't work (The qpid library has a global state that uses
|
|
||||||
file descriptors that can't be reset)
|
|
||||||
* amqp1: works
|
* amqp1: works
|
||||||
|
@ -45,7 +45,7 @@ Juno release, as almost all the core projects in OpenStack have switched to
|
|||||||
oslo_messaging, ZeroMQ can be the only RPC driver across the OpenStack cluster.
|
oslo_messaging, ZeroMQ can be the only RPC driver across the OpenStack cluster.
|
||||||
This document provides deployment information for this driver in oslo_messaging.
|
This document provides deployment information for this driver in oslo_messaging.
|
||||||
|
|
||||||
Other than AMQP-based drivers, like RabbitMQ or Qpid, ZeroMQ doesn't have
|
Other than AMQP-based drivers, like RabbitMQ, ZeroMQ doesn't have
|
||||||
any central brokers in oslo.messaging, instead, each host (running OpenStack
|
any central brokers in oslo.messaging, instead, each host (running OpenStack
|
||||||
services) is both ZeroMQ client and server. As a result, each host needs to
|
services) is both ZeroMQ client and server. As a result, each host needs to
|
||||||
listen to a certain TCP port for incoming connections and directly connect
|
listen to a certain TCP port for incoming connections and directly connect
|
||||||
@ -172,7 +172,6 @@ The parameters for the script oslo-messaging-zmq-receiver should be::
|
|||||||
|
|
||||||
You can specify ZeroMQ options in /etc/oslo/zeromq.conf if necessary.
|
You can specify ZeroMQ options in /etc/oslo/zeromq.conf if necessary.
|
||||||
|
|
||||||
|
|
||||||
Listening Address (optional)
|
Listening Address (optional)
|
||||||
----------------------------
|
----------------------------
|
||||||
|
|
||||||
@ -204,7 +203,7 @@ DevStack Support
|
|||||||
|
|
||||||
ZeroMQ driver has been supported by DevStack. The configuration is as follows::
|
ZeroMQ driver has been supported by DevStack. The configuration is as follows::
|
||||||
|
|
||||||
ENABLED_SERVICES+=,-rabbit,-qpid,zeromq
|
ENABLED_SERVICES+=,-rabbit,zeromq
|
||||||
ZEROMQ_MATCHMAKER=redis
|
ZEROMQ_MATCHMAKER=redis
|
||||||
|
|
||||||
In local.conf [localrc] section need to enable zmq plugin which lives in
|
In local.conf [localrc] section need to enable zmq plugin which lives in
|
||||||
|
@ -19,7 +19,7 @@
|
|||||||
Shared code between AMQP based openstack.common.rpc implementations.
|
Shared code between AMQP based openstack.common.rpc implementations.
|
||||||
|
|
||||||
The code in this module is shared between the rpc implementations based on
|
The code in this module is shared between the rpc implementations based on
|
||||||
AMQP. Specifically, this includes impl_kombu and impl_qpid. impl_carrot also
|
AMQP. Specifically, this includes impl_kombu. impl_carrot also
|
||||||
uses AMQP, but is deprecated and predates this code.
|
uses AMQP, but is deprecated and predates this code.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@ -66,7 +66,7 @@ amqp_opts = [
|
|||||||
UNIQUE_ID = '_unique_id'
|
UNIQUE_ID = '_unique_id'
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
# NOTE(sileht): Even if rabbit/qpid have only one Connection class,
|
# NOTE(sileht): Even if rabbit has only one Connection class,
|
||||||
# this connection can be used for two purposes:
|
# this connection can be used for two purposes:
|
||||||
# * wait and receive amqp messages (only do read stuffs on the socket)
|
# * wait and receive amqp messages (only do read stuffs on the socket)
|
||||||
# * send messages to the broker (only do write stuffs on the socket)
|
# * send messages to the broker (only do write stuffs on the socket)
|
||||||
|
@ -1,800 +0,0 @@
|
|||||||
# Copyright 2011 OpenStack Foundation
|
|
||||||
# Copyright 2011 - 2012, Red Hat, Inc.
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
|
|
||||||
import functools
|
|
||||||
import itertools
|
|
||||||
import logging
|
|
||||||
import os
|
|
||||||
import random
|
|
||||||
import time
|
|
||||||
import warnings
|
|
||||||
|
|
||||||
from oslo_config import cfg
|
|
||||||
from oslo_serialization import jsonutils
|
|
||||||
from oslo_utils import importutils
|
|
||||||
from oslo_utils import netutils
|
|
||||||
import six
|
|
||||||
|
|
||||||
from oslo_messaging._drivers import amqp as rpc_amqp
|
|
||||||
from oslo_messaging._drivers import amqpdriver
|
|
||||||
from oslo_messaging._drivers import base
|
|
||||||
from oslo_messaging._drivers import common as rpc_common
|
|
||||||
from oslo_messaging._i18n import _
|
|
||||||
from oslo_messaging._i18n import _LE
|
|
||||||
from oslo_messaging._i18n import _LI
|
|
||||||
from oslo_messaging import exceptions
|
|
||||||
|
|
||||||
qpid_codec = importutils.try_import("qpid.codec010")
|
|
||||||
qpid_messaging = importutils.try_import("qpid.messaging")
|
|
||||||
qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
qpid_opts = [
|
|
||||||
cfg.StrOpt('qpid_hostname',
|
|
||||||
default='localhost',
|
|
||||||
deprecated_group='DEFAULT',
|
|
||||||
help='Qpid broker hostname.'),
|
|
||||||
cfg.IntOpt('qpid_port',
|
|
||||||
default=5672,
|
|
||||||
deprecated_group='DEFAULT',
|
|
||||||
help='Qpid broker port.'),
|
|
||||||
cfg.ListOpt('qpid_hosts',
|
|
||||||
default=['$qpid_hostname:$qpid_port'],
|
|
||||||
deprecated_group='DEFAULT',
|
|
||||||
help='Qpid HA cluster host:port pairs.'),
|
|
||||||
cfg.StrOpt('qpid_username',
|
|
||||||
default='',
|
|
||||||
deprecated_group='DEFAULT',
|
|
||||||
help='Username for Qpid connection.'),
|
|
||||||
cfg.StrOpt('qpid_password',
|
|
||||||
default='',
|
|
||||||
deprecated_group='DEFAULT',
|
|
||||||
help='Password for Qpid connection.',
|
|
||||||
secret=True),
|
|
||||||
cfg.StrOpt('qpid_sasl_mechanisms',
|
|
||||||
default='',
|
|
||||||
deprecated_group='DEFAULT',
|
|
||||||
help='Space separated list of SASL mechanisms to use for '
|
|
||||||
'auth.'),
|
|
||||||
cfg.IntOpt('qpid_heartbeat',
|
|
||||||
default=60,
|
|
||||||
deprecated_group='DEFAULT',
|
|
||||||
help='Seconds between connection keepalive heartbeats.'),
|
|
||||||
cfg.StrOpt('qpid_protocol',
|
|
||||||
default='tcp',
|
|
||||||
deprecated_group='DEFAULT',
|
|
||||||
help="Transport to use, either 'tcp' or 'ssl'."),
|
|
||||||
cfg.BoolOpt('qpid_tcp_nodelay',
|
|
||||||
default=True,
|
|
||||||
deprecated_group='DEFAULT',
|
|
||||||
help='Whether to disable the Nagle algorithm.'),
|
|
||||||
cfg.IntOpt('qpid_receiver_capacity',
|
|
||||||
default=1,
|
|
||||||
deprecated_group='DEFAULT',
|
|
||||||
help='The number of prefetched messages held by receiver.'),
|
|
||||||
# NOTE(russellb) If any additional versions are added (beyond 1 and 2),
|
|
||||||
# this file could probably use some additional refactoring so that the
|
|
||||||
# differences between each version are split into different classes.
|
|
||||||
cfg.IntOpt('qpid_topology_version',
|
|
||||||
default=1,
|
|
||||||
deprecated_group='DEFAULT',
|
|
||||||
help="The qpid topology version to use. Version 1 is what "
|
|
||||||
"was originally used by impl_qpid. Version 2 includes "
|
|
||||||
"some backwards-incompatible changes that allow broker "
|
|
||||||
"federation to work. Users should update to version 2 "
|
|
||||||
"when they are able to take everything down, as it "
|
|
||||||
"requires a clean break."),
|
|
||||||
]
|
|
||||||
|
|
||||||
JSON_CONTENT_TYPE = 'application/json; charset=utf8'
|
|
||||||
|
|
||||||
|
|
||||||
def raise_invalid_topology_version(conf):
|
|
||||||
msg = (_("Invalid value for qpid_topology_version: %d") %
|
|
||||||
conf.qpid_topology_version)
|
|
||||||
LOG.error(msg)
|
|
||||||
raise Exception(msg)
|
|
||||||
|
|
||||||
|
|
||||||
class QpidMessage(dict):
|
|
||||||
def __init__(self, session, raw_message):
|
|
||||||
super(QpidMessage, self).__init__(
|
|
||||||
rpc_common.deserialize_msg(raw_message.content))
|
|
||||||
self._raw_message = raw_message
|
|
||||||
self._session = session
|
|
||||||
|
|
||||||
def acknowledge(self):
|
|
||||||
self._session.acknowledge(self._raw_message)
|
|
||||||
|
|
||||||
def requeue(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class ConsumerBase(object):
|
|
||||||
"""Consumer base class."""
|
|
||||||
|
|
||||||
def __init__(self, conf, session, callback, node_name, node_opts,
|
|
||||||
link_name, link_opts):
|
|
||||||
"""Declare a queue on an amqp session.
|
|
||||||
|
|
||||||
'session' is the amqp session to use
|
|
||||||
'callback' is the callback to call when messages are received
|
|
||||||
'node_name' is the first part of the Qpid address string, before ';'
|
|
||||||
'node_opts' will be applied to the "x-declare" section of "node"
|
|
||||||
in the address string.
|
|
||||||
'link_name' goes into the "name" field of the "link" in the address
|
|
||||||
string
|
|
||||||
'link_opts' will be applied to the "x-declare" section of "link"
|
|
||||||
in the address string.
|
|
||||||
"""
|
|
||||||
self.callback = callback
|
|
||||||
self.receiver = None
|
|
||||||
self.rcv_capacity = conf.qpid_receiver_capacity
|
|
||||||
self.session = None
|
|
||||||
|
|
||||||
if conf.qpid_topology_version == 1:
|
|
||||||
addr_opts = {
|
|
||||||
"create": "always",
|
|
||||||
"node": {
|
|
||||||
"type": "topic",
|
|
||||||
"x-declare": {
|
|
||||||
"durable": True,
|
|
||||||
"auto-delete": True,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
"link": {
|
|
||||||
"durable": True,
|
|
||||||
"x-declare": {
|
|
||||||
"durable": False,
|
|
||||||
"auto-delete": True,
|
|
||||||
"exclusive": False,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
addr_opts["node"]["x-declare"].update(node_opts)
|
|
||||||
elif conf.qpid_topology_version == 2:
|
|
||||||
addr_opts = {
|
|
||||||
"link": {
|
|
||||||
"x-declare": {
|
|
||||||
"auto-delete": True,
|
|
||||||
"exclusive": False,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
else:
|
|
||||||
raise_invalid_topology_version(conf)
|
|
||||||
|
|
||||||
addr_opts["link"]["x-declare"].update(link_opts)
|
|
||||||
if link_name:
|
|
||||||
addr_opts["link"]["name"] = link_name
|
|
||||||
|
|
||||||
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
|
|
||||||
|
|
||||||
self.connect(session)
|
|
||||||
|
|
||||||
def connect(self, session):
|
|
||||||
"""Declare the receiver on connect."""
|
|
||||||
self._declare_receiver(session)
|
|
||||||
|
|
||||||
def reconnect(self, session):
|
|
||||||
"""Re-declare the receiver after a Qpid reconnect."""
|
|
||||||
self._declare_receiver(session)
|
|
||||||
|
|
||||||
def _declare_receiver(self, session):
|
|
||||||
self.session = session
|
|
||||||
self.receiver = session.receiver(self.address)
|
|
||||||
self.receiver.capacity = self.rcv_capacity
|
|
||||||
|
|
||||||
def _unpack_json_msg(self, msg):
|
|
||||||
"""Load the JSON data in msg if msg.content_type indicates that it
|
|
||||||
is necessary. Put the loaded data back into msg.content and
|
|
||||||
update msg.content_type appropriately.
|
|
||||||
|
|
||||||
A Qpid Message containing a dict will have a content_type of
|
|
||||||
'amqp/map', whereas one containing a string that needs to be converted
|
|
||||||
back from JSON will have a content_type of JSON_CONTENT_TYPE.
|
|
||||||
|
|
||||||
:param msg: a Qpid Message object
|
|
||||||
:returns: None
|
|
||||||
"""
|
|
||||||
if msg.content_type == JSON_CONTENT_TYPE:
|
|
||||||
msg.content = jsonutils.loads(msg.content)
|
|
||||||
msg.content_type = 'amqp/map'
|
|
||||||
|
|
||||||
def consume(self):
|
|
||||||
"""Fetch the message and pass it to the callback object."""
|
|
||||||
message = self.receiver.fetch()
|
|
||||||
try:
|
|
||||||
self._unpack_json_msg(message)
|
|
||||||
self.callback(QpidMessage(self.session, message))
|
|
||||||
except Exception:
|
|
||||||
LOG.exception(_LE("Failed to process message... skipping it."))
|
|
||||||
self.session.acknowledge(message)
|
|
||||||
|
|
||||||
def get_receiver(self):
|
|
||||||
return self.receiver
|
|
||||||
|
|
||||||
def get_node_name(self):
|
|
||||||
return self.address.split(';')[0]
|
|
||||||
|
|
||||||
|
|
||||||
class DirectConsumer(ConsumerBase):
|
|
||||||
"""Queue/consumer class for 'direct'."""
|
|
||||||
|
|
||||||
def __init__(self, conf, session, msg_id, callback):
|
|
||||||
"""Init a 'direct' queue.
|
|
||||||
|
|
||||||
'session' is the amqp session to use
|
|
||||||
'msg_id' is the msg_id to listen on
|
|
||||||
'callback' is the callback to call when messages are received
|
|
||||||
"""
|
|
||||||
|
|
||||||
link_opts = {
|
|
||||||
"exclusive": True,
|
|
||||||
"durable": conf.amqp_durable_queues,
|
|
||||||
}
|
|
||||||
|
|
||||||
if conf.qpid_topology_version == 1:
|
|
||||||
node_name = "%s/%s" % (msg_id, msg_id)
|
|
||||||
node_opts = {"type": "direct"}
|
|
||||||
link_name = msg_id
|
|
||||||
elif conf.qpid_topology_version == 2:
|
|
||||||
node_name = "amq.direct/%s" % msg_id
|
|
||||||
node_opts = {}
|
|
||||||
link_name = msg_id
|
|
||||||
else:
|
|
||||||
raise_invalid_topology_version(conf)
|
|
||||||
|
|
||||||
super(DirectConsumer, self).__init__(conf, session, callback,
|
|
||||||
node_name, node_opts, link_name,
|
|
||||||
link_opts)
|
|
||||||
|
|
||||||
|
|
||||||
class TopicConsumer(ConsumerBase):
|
|
||||||
"""Consumer class for 'topic'."""
|
|
||||||
|
|
||||||
def __init__(self, conf, session, topic, callback, exchange_name,
|
|
||||||
name=None):
|
|
||||||
"""Init a 'topic' queue.
|
|
||||||
|
|
||||||
:param session: the amqp session to use
|
|
||||||
:param topic: is the topic to listen on
|
|
||||||
:paramtype topic: str
|
|
||||||
:param callback: the callback to call when messages are received
|
|
||||||
:param name: optional queue name, defaults to topic
|
|
||||||
"""
|
|
||||||
|
|
||||||
link_opts = {
|
|
||||||
"auto-delete": conf.amqp_auto_delete,
|
|
||||||
"durable": conf.amqp_durable_queues,
|
|
||||||
}
|
|
||||||
|
|
||||||
if conf.qpid_topology_version == 1:
|
|
||||||
node_name = "%s/%s" % (exchange_name, topic)
|
|
||||||
elif conf.qpid_topology_version == 2:
|
|
||||||
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
|
|
||||||
else:
|
|
||||||
raise_invalid_topology_version(conf)
|
|
||||||
|
|
||||||
super(TopicConsumer, self).__init__(conf, session, callback, node_name,
|
|
||||||
{}, name or topic, link_opts)
|
|
||||||
|
|
||||||
|
|
||||||
class FanoutConsumer(ConsumerBase):
|
|
||||||
"""Consumer class for 'fanout'."""
|
|
||||||
|
|
||||||
def __init__(self, conf, session, topic, callback):
|
|
||||||
"""Init a 'fanout' queue.
|
|
||||||
|
|
||||||
'session' is the amqp session to use
|
|
||||||
'topic' is the topic to listen on
|
|
||||||
'callback' is the callback to call when messages are received
|
|
||||||
"""
|
|
||||||
self.conf = conf
|
|
||||||
|
|
||||||
link_opts = {"exclusive": True}
|
|
||||||
|
|
||||||
if conf.qpid_topology_version == 1:
|
|
||||||
node_name = "%s_fanout" % topic
|
|
||||||
node_opts = {"durable": False, "type": "fanout"}
|
|
||||||
elif conf.qpid_topology_version == 2:
|
|
||||||
node_name = "amq.topic/fanout/%s" % topic
|
|
||||||
node_opts = {}
|
|
||||||
else:
|
|
||||||
raise_invalid_topology_version(conf)
|
|
||||||
|
|
||||||
super(FanoutConsumer, self).__init__(conf, session, callback,
|
|
||||||
node_name, node_opts, None,
|
|
||||||
link_opts)
|
|
||||||
|
|
||||||
|
|
||||||
class Publisher(object):
|
|
||||||
"""Base Publisher class."""
|
|
||||||
|
|
||||||
def __init__(self, conf, session, node_name, node_opts=None):
|
|
||||||
"""Init the Publisher class with the exchange_name, routing_key,
|
|
||||||
and other options
|
|
||||||
"""
|
|
||||||
self.sender = None
|
|
||||||
self.session = session
|
|
||||||
|
|
||||||
if conf.qpid_topology_version == 1:
|
|
||||||
addr_opts = {
|
|
||||||
"create": "always",
|
|
||||||
"node": {
|
|
||||||
"type": "topic",
|
|
||||||
"x-declare": {
|
|
||||||
"durable": False,
|
|
||||||
# auto-delete isn't implemented for exchanges in qpid,
|
|
||||||
# but put in here anyway
|
|
||||||
"auto-delete": True,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
if node_opts:
|
|
||||||
addr_opts["node"]["x-declare"].update(node_opts)
|
|
||||||
|
|
||||||
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
|
|
||||||
elif conf.qpid_topology_version == 2:
|
|
||||||
self.address = node_name
|
|
||||||
else:
|
|
||||||
raise_invalid_topology_version(conf)
|
|
||||||
|
|
||||||
self.reconnect(session)
|
|
||||||
|
|
||||||
def reconnect(self, session):
|
|
||||||
"""Re-establish the Sender after a reconnection."""
|
|
||||||
self.sender = session.sender(self.address)
|
|
||||||
|
|
||||||
def _pack_json_msg(self, msg):
|
|
||||||
"""Qpid cannot serialize dicts containing strings longer than 65535
|
|
||||||
characters. This function dumps the message content to a JSON
|
|
||||||
string, which Qpid is able to handle.
|
|
||||||
|
|
||||||
:param msg: May be either a Qpid Message object or a bare dict.
|
|
||||||
:returns: A Qpid Message with its content field JSON encoded.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
msg.content = jsonutils.dumps(msg.content)
|
|
||||||
except AttributeError:
|
|
||||||
# Need to have a Qpid message so we can set the content_type.
|
|
||||||
msg = qpid_messaging.Message(jsonutils.dumps(msg))
|
|
||||||
msg.content_type = JSON_CONTENT_TYPE
|
|
||||||
return msg
|
|
||||||
|
|
||||||
def send(self, msg):
|
|
||||||
"""Send a message."""
|
|
||||||
try:
|
|
||||||
# Check if Qpid can encode the message
|
|
||||||
check_msg = msg
|
|
||||||
if not hasattr(check_msg, 'content_type'):
|
|
||||||
check_msg = qpid_messaging.Message(msg)
|
|
||||||
content_type = check_msg.content_type
|
|
||||||
enc, dec = qpid_messaging.message.get_codec(content_type)
|
|
||||||
enc(check_msg.content)
|
|
||||||
except qpid_codec.CodecException:
|
|
||||||
# This means the message couldn't be serialized as a dict.
|
|
||||||
msg = self._pack_json_msg(msg)
|
|
||||||
self.sender.send(msg)
|
|
||||||
|
|
||||||
|
|
||||||
class DirectPublisher(Publisher):
|
|
||||||
"""Publisher class for 'direct'."""
|
|
||||||
def __init__(self, conf, session, topic):
|
|
||||||
"""Init a 'direct' publisher."""
|
|
||||||
|
|
||||||
if conf.qpid_topology_version == 1:
|
|
||||||
node_name = "%s/%s" % (topic, topic)
|
|
||||||
node_opts = {"type": "direct"}
|
|
||||||
elif conf.qpid_topology_version == 2:
|
|
||||||
node_name = "amq.direct/%s" % topic
|
|
||||||
node_opts = {}
|
|
||||||
else:
|
|
||||||
raise_invalid_topology_version(conf)
|
|
||||||
|
|
||||||
super(DirectPublisher, self).__init__(conf, session, node_name,
|
|
||||||
node_opts)
|
|
||||||
|
|
||||||
|
|
||||||
class TopicPublisher(Publisher):
|
|
||||||
"""Publisher class for 'topic'."""
|
|
||||||
def __init__(self, conf, session, exchange_name, topic):
|
|
||||||
"""Init a 'topic' publisher.
|
|
||||||
"""
|
|
||||||
if conf.qpid_topology_version == 1:
|
|
||||||
node_name = "%s/%s" % (exchange_name, topic)
|
|
||||||
elif conf.qpid_topology_version == 2:
|
|
||||||
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
|
|
||||||
else:
|
|
||||||
raise_invalid_topology_version(conf)
|
|
||||||
|
|
||||||
super(TopicPublisher, self).__init__(conf, session, node_name)
|
|
||||||
|
|
||||||
|
|
||||||
class FanoutPublisher(Publisher):
|
|
||||||
"""Publisher class for 'fanout'."""
|
|
||||||
def __init__(self, conf, session, topic):
|
|
||||||
"""Init a 'fanout' publisher.
|
|
||||||
"""
|
|
||||||
|
|
||||||
if conf.qpid_topology_version == 1:
|
|
||||||
node_name = "%s_fanout" % topic
|
|
||||||
node_opts = {"type": "fanout"}
|
|
||||||
elif conf.qpid_topology_version == 2:
|
|
||||||
node_name = "amq.topic/fanout/%s" % topic
|
|
||||||
node_opts = {}
|
|
||||||
else:
|
|
||||||
raise_invalid_topology_version(conf)
|
|
||||||
|
|
||||||
super(FanoutPublisher, self).__init__(conf, session, node_name,
|
|
||||||
node_opts)
|
|
||||||
|
|
||||||
|
|
||||||
class NotifyPublisher(Publisher):
|
|
||||||
"""Publisher class for notifications."""
|
|
||||||
def __init__(self, conf, session, exchange_name, topic):
|
|
||||||
"""Init a 'topic' publisher.
|
|
||||||
"""
|
|
||||||
node_opts = {"durable": True}
|
|
||||||
|
|
||||||
if conf.qpid_topology_version == 1:
|
|
||||||
node_name = "%s/%s" % (exchange_name, topic)
|
|
||||||
elif conf.qpid_topology_version == 2:
|
|
||||||
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
|
|
||||||
else:
|
|
||||||
raise_invalid_topology_version(conf)
|
|
||||||
|
|
||||||
super(NotifyPublisher, self).__init__(conf, session, node_name,
|
|
||||||
node_opts)
|
|
||||||
|
|
||||||
|
|
||||||
class Connection(object):
|
|
||||||
"""Connection object."""
|
|
||||||
|
|
||||||
pools = {}
|
|
||||||
|
|
||||||
def __init__(self, conf, url, purpose):
|
|
||||||
if not qpid_messaging:
|
|
||||||
raise ImportError("Failed to import qpid.messaging")
|
|
||||||
|
|
||||||
self.connection = None
|
|
||||||
self.session = None
|
|
||||||
self.consumers = {}
|
|
||||||
self.conf = conf
|
|
||||||
self.driver_conf = conf.oslo_messaging_qpid
|
|
||||||
|
|
||||||
self._consume_loop_stopped = False
|
|
||||||
|
|
||||||
self.brokers_params = []
|
|
||||||
if url.hosts:
|
|
||||||
for host in url.hosts:
|
|
||||||
params = {
|
|
||||||
'username': host.username or '',
|
|
||||||
'password': host.password or '',
|
|
||||||
}
|
|
||||||
if host.port is not None:
|
|
||||||
params['host'] = '%s:%d' % (host.hostname, host.port)
|
|
||||||
else:
|
|
||||||
params['host'] = host.hostname
|
|
||||||
self.brokers_params.append(params)
|
|
||||||
else:
|
|
||||||
# Old configuration format
|
|
||||||
for adr in self.driver_conf.qpid_hosts:
|
|
||||||
hostname, port = netutils.parse_host_port(
|
|
||||||
adr, default_port=5672)
|
|
||||||
|
|
||||||
if ':' in hostname:
|
|
||||||
hostname = '[' + hostname + ']'
|
|
||||||
|
|
||||||
params = {
|
|
||||||
'host': '%s:%d' % (hostname, port),
|
|
||||||
'username': self.driver_conf.qpid_username,
|
|
||||||
'password': self.driver_conf.qpid_password,
|
|
||||||
}
|
|
||||||
self.brokers_params.append(params)
|
|
||||||
|
|
||||||
random.shuffle(self.brokers_params)
|
|
||||||
self.brokers = itertools.cycle(self.brokers_params)
|
|
||||||
|
|
||||||
self._initial_pid = os.getpid()
|
|
||||||
self.reconnect()
|
|
||||||
|
|
||||||
def _connect(self, broker):
|
|
||||||
# Create the connection - this does not open the connection
|
|
||||||
self.connection = qpid_messaging.Connection(broker['host'])
|
|
||||||
|
|
||||||
# Check if flags are set and if so set them for the connection
|
|
||||||
# before we call open
|
|
||||||
self.connection.username = broker['username']
|
|
||||||
self.connection.password = broker['password']
|
|
||||||
|
|
||||||
self.connection.sasl_mechanisms = self.driver_conf.qpid_sasl_mechanisms
|
|
||||||
# Reconnection is done by self.reconnect()
|
|
||||||
self.connection.reconnect = False
|
|
||||||
self.connection.heartbeat = self.driver_conf.qpid_heartbeat
|
|
||||||
self.connection.transport = self.driver_conf.qpid_protocol
|
|
||||||
self.connection.tcp_nodelay = self.driver_conf.qpid_tcp_nodelay
|
|
||||||
self.connection.open()
|
|
||||||
|
|
||||||
def _register_consumer(self, consumer):
|
|
||||||
self.consumers[six.text_type(consumer.get_receiver())] = consumer
|
|
||||||
|
|
||||||
def _lookup_consumer(self, receiver):
|
|
||||||
return self.consumers[six.text_type(receiver)]
|
|
||||||
|
|
||||||
def _disconnect(self):
|
|
||||||
# Close the session if necessary
|
|
||||||
if self.connection is not None and self.connection.opened():
|
|
||||||
try:
|
|
||||||
self.connection.close()
|
|
||||||
except qpid_exceptions.MessagingError:
|
|
||||||
pass
|
|
||||||
self.connection = None
|
|
||||||
|
|
||||||
def reconnect(self, retry=None):
|
|
||||||
"""Handles reconnecting and re-establishing sessions and queues.
|
|
||||||
Will retry up to retry number of times.
|
|
||||||
retry = None or -1 means to retry forever
|
|
||||||
retry = 0 means no retry
|
|
||||||
retry = N means N retries
|
|
||||||
"""
|
|
||||||
delay = 1
|
|
||||||
attempt = 0
|
|
||||||
loop_forever = False
|
|
||||||
if retry is None or retry < 0:
|
|
||||||
loop_forever = True
|
|
||||||
|
|
||||||
while True:
|
|
||||||
self._disconnect()
|
|
||||||
|
|
||||||
attempt += 1
|
|
||||||
broker = six.next(self.brokers)
|
|
||||||
try:
|
|
||||||
self._connect(broker)
|
|
||||||
except qpid_exceptions.MessagingError as e:
|
|
||||||
msg_dict = dict(e=e,
|
|
||||||
delay=delay,
|
|
||||||
retry=retry,
|
|
||||||
broker=broker)
|
|
||||||
if not loop_forever and attempt > retry:
|
|
||||||
msg = _('Unable to connect to AMQP server on '
|
|
||||||
'%(broker)s after %(retry)d '
|
|
||||||
'tries: %(e)s') % msg_dict
|
|
||||||
LOG.error(msg)
|
|
||||||
raise exceptions.MessageDeliveryFailure(msg)
|
|
||||||
else:
|
|
||||||
msg = _LE("Unable to connect to AMQP server on "
|
|
||||||
"%(broker)s: %(e)s. Sleeping %(delay)s seconds")
|
|
||||||
LOG.error(msg, msg_dict)
|
|
||||||
time.sleep(delay)
|
|
||||||
delay = min(delay + 1, 5)
|
|
||||||
else:
|
|
||||||
LOG.info(_LI('Connected to AMQP server on %s'), broker['host'])
|
|
||||||
break
|
|
||||||
|
|
||||||
self.session = self.connection.session()
|
|
||||||
|
|
||||||
if self.consumers:
|
|
||||||
consumers = self.consumers
|
|
||||||
self.consumers = {}
|
|
||||||
|
|
||||||
for consumer in six.itervalues(consumers):
|
|
||||||
consumer.reconnect(self.session)
|
|
||||||
self._register_consumer(consumer)
|
|
||||||
|
|
||||||
LOG.debug("Re-established AMQP queues")
|
|
||||||
|
|
||||||
def ensure(self, error_callback, method, retry=None):
|
|
||||||
|
|
||||||
current_pid = os.getpid()
|
|
||||||
if self._initial_pid != current_pid:
|
|
||||||
# NOTE(sileht):
|
|
||||||
# to get the same level of fork support that rabbit driver have
|
|
||||||
# (ie: allow fork before the first connection established)
|
|
||||||
# we could use the kombu workaround:
|
|
||||||
# https://github.com/celery/kombu/blob/master/kombu/transport/
|
|
||||||
# qpid_patches.py#L67
|
|
||||||
LOG.warn("Process forked! "
|
|
||||||
"This can result in unpredictable behavior. "
|
|
||||||
"See: http://docs.openstack.org/developer/"
|
|
||||||
"oslo_messaging/transport.html")
|
|
||||||
self._initial_pid = current_pid
|
|
||||||
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
return method()
|
|
||||||
except (qpid_exceptions.Empty,
|
|
||||||
qpid_exceptions.MessagingError) as e:
|
|
||||||
if error_callback:
|
|
||||||
error_callback(e)
|
|
||||||
self.reconnect(retry=retry)
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
"""Close/release this connection."""
|
|
||||||
try:
|
|
||||||
self.connection.close()
|
|
||||||
except Exception:
|
|
||||||
# NOTE(dripton) Logging exceptions that happen during cleanup just
|
|
||||||
# causes confusion; there's really nothing useful we can do with
|
|
||||||
# them.
|
|
||||||
pass
|
|
||||||
self.connection = None
|
|
||||||
|
|
||||||
def reset(self):
|
|
||||||
"""Reset a connection so it can be used again."""
|
|
||||||
self.session.close()
|
|
||||||
self.session = self.connection.session()
|
|
||||||
self.consumers = {}
|
|
||||||
|
|
||||||
def declare_consumer(self, consumer_cls, topic, callback):
|
|
||||||
"""Create a Consumer using the class that was passed in and
|
|
||||||
add it to our list of consumers
|
|
||||||
"""
|
|
||||||
def _connect_error(exc):
|
|
||||||
log_info = {'topic': topic, 'err_str': exc}
|
|
||||||
LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': "
|
|
||||||
"%(err_str)s"), log_info)
|
|
||||||
|
|
||||||
def _declare_consumer():
|
|
||||||
consumer = consumer_cls(self.driver_conf, self.session, topic,
|
|
||||||
callback)
|
|
||||||
self._register_consumer(consumer)
|
|
||||||
return consumer
|
|
||||||
|
|
||||||
return self.ensure(_connect_error, _declare_consumer)
|
|
||||||
|
|
||||||
def consume(self, timeout=None):
|
|
||||||
"""Consume from all queues/consumers."""
|
|
||||||
|
|
||||||
timer = rpc_common.DecayingTimer(duration=timeout)
|
|
||||||
timer.start()
|
|
||||||
|
|
||||||
def _raise_timeout(exc):
|
|
||||||
LOG.debug('Timed out waiting for RPC response: %s', exc)
|
|
||||||
raise rpc_common.Timeout()
|
|
||||||
|
|
||||||
def _error_callback(exc):
|
|
||||||
timer.check_return(_raise_timeout, exc)
|
|
||||||
LOG.exception(_LE('Failed to consume message from queue: %s'), exc)
|
|
||||||
|
|
||||||
def _consume():
|
|
||||||
# NOTE(sileht):
|
|
||||||
# maximum value chosen according the best practice from kombu:
|
|
||||||
# http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
|
|
||||||
poll_timeout = 1 if timeout is None else min(timeout, 1)
|
|
||||||
|
|
||||||
while True:
|
|
||||||
if self._consume_loop_stopped:
|
|
||||||
self._consume_loop_stopped = False
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
nxt_receiver = self.session.next_receiver(
|
|
||||||
timeout=poll_timeout)
|
|
||||||
except qpid_exceptions.Empty as exc:
|
|
||||||
poll_timeout = timer.check_return(_raise_timeout, exc,
|
|
||||||
maximum=1)
|
|
||||||
else:
|
|
||||||
break
|
|
||||||
|
|
||||||
try:
|
|
||||||
self._lookup_consumer(nxt_receiver).consume()
|
|
||||||
except Exception:
|
|
||||||
LOG.exception(_LE("Error processing message. "
|
|
||||||
"Skipping it."))
|
|
||||||
|
|
||||||
self.ensure(_error_callback, _consume)
|
|
||||||
|
|
||||||
def publisher_send(self, cls, topic, msg, retry=None, **kwargs):
|
|
||||||
"""Send to a publisher based on the publisher class."""
|
|
||||||
|
|
||||||
def _connect_error(exc):
|
|
||||||
log_info = {'topic': topic, 'err_str': exc}
|
|
||||||
LOG.exception(_LE("Failed to publish message to topic "
|
|
||||||
"'%(topic)s': %(err_str)s"), log_info)
|
|
||||||
|
|
||||||
def _publisher_send():
|
|
||||||
publisher = cls(self.driver_conf, self.session, topic=topic,
|
|
||||||
**kwargs)
|
|
||||||
publisher.send(msg)
|
|
||||||
|
|
||||||
return self.ensure(_connect_error, _publisher_send, retry=retry)
|
|
||||||
|
|
||||||
def declare_direct_consumer(self, topic, callback):
|
|
||||||
"""Create a 'direct' queue.
|
|
||||||
In nova's use, this is generally a msg_id queue used for
|
|
||||||
responses for call/multicall
|
|
||||||
"""
|
|
||||||
self.declare_consumer(DirectConsumer, topic, callback)
|
|
||||||
|
|
||||||
def declare_topic_consumer(self, exchange_name, topic, callback=None,
|
|
||||||
queue_name=None):
|
|
||||||
"""Create a 'topic' consumer."""
|
|
||||||
self.declare_consumer(functools.partial(TopicConsumer,
|
|
||||||
name=queue_name,
|
|
||||||
exchange_name=exchange_name,
|
|
||||||
),
|
|
||||||
topic, callback)
|
|
||||||
|
|
||||||
def declare_fanout_consumer(self, topic, callback):
|
|
||||||
"""Create a 'fanout' consumer."""
|
|
||||||
self.declare_consumer(FanoutConsumer, topic, callback)
|
|
||||||
|
|
||||||
def direct_send(self, msg_id, msg):
|
|
||||||
"""Send a 'direct' message."""
|
|
||||||
self.publisher_send(DirectPublisher, topic=msg_id, msg=msg)
|
|
||||||
|
|
||||||
def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
|
|
||||||
"""Send a 'topic' message."""
|
|
||||||
#
|
|
||||||
# We want to create a message with attributes, for example a TTL. We
|
|
||||||
# don't really need to keep 'msg' in its JSON format any longer
|
|
||||||
# so let's create an actual Qpid message here and get some
|
|
||||||
# value-add on the go.
|
|
||||||
#
|
|
||||||
# WARNING: Request timeout happens to be in the same units as
|
|
||||||
# Qpid's TTL (seconds). If this changes in the future, then this
|
|
||||||
# will need to be altered accordingly.
|
|
||||||
#
|
|
||||||
qpid_message = qpid_messaging.Message(content=msg, ttl=timeout)
|
|
||||||
self.publisher_send(TopicPublisher, topic=topic, msg=qpid_message,
|
|
||||||
exchange_name=exchange_name, retry=retry)
|
|
||||||
|
|
||||||
def fanout_send(self, topic, msg, retry=None):
|
|
||||||
"""Send a 'fanout' message."""
|
|
||||||
self.publisher_send(FanoutPublisher, topic=topic, msg=msg, retry=retry)
|
|
||||||
|
|
||||||
def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs):
|
|
||||||
"""Send a notify message on a topic."""
|
|
||||||
self.publisher_send(NotifyPublisher, topic=topic, msg=msg,
|
|
||||||
exchange_name=exchange_name, retry=retry)
|
|
||||||
|
|
||||||
def stop_consuming(self):
|
|
||||||
self._consume_loop_stopped = True
|
|
||||||
|
|
||||||
|
|
||||||
class QpidDriver(amqpdriver.AMQPDriverBase):
|
|
||||||
"""qpidd Driver
|
|
||||||
|
|
||||||
.. deprecated:: 1.16 (Liberty)
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, conf, url,
|
|
||||||
default_exchange=None, allowed_remote_exmods=None):
|
|
||||||
|
|
||||||
warnings.warn(_('The Qpid driver has been deprecated. '
|
|
||||||
'The driver is planned to be removed during the '
|
|
||||||
'`Mitaka` development cycle.'),
|
|
||||||
DeprecationWarning, stacklevel=2)
|
|
||||||
|
|
||||||
opt_group = cfg.OptGroup(name='oslo_messaging_qpid',
|
|
||||||
title='QPID driver options')
|
|
||||||
conf.register_group(opt_group)
|
|
||||||
conf.register_opts(qpid_opts, group=opt_group)
|
|
||||||
conf.register_opts(rpc_amqp.amqp_opts, group=opt_group)
|
|
||||||
conf.register_opts(base.base_opts, group=opt_group)
|
|
||||||
|
|
||||||
connection_pool = rpc_amqp.ConnectionPool(
|
|
||||||
conf, conf.oslo_messaging_qpid.rpc_conn_pool_size,
|
|
||||||
url, Connection)
|
|
||||||
|
|
||||||
super(QpidDriver, self).__init__(
|
|
||||||
conf, url,
|
|
||||||
connection_pool,
|
|
||||||
default_exchange,
|
|
||||||
allowed_remote_exmods,
|
|
||||||
conf.oslo_messaging_qpid.send_single_reply,
|
|
||||||
)
|
|
@ -50,9 +50,6 @@ class ConfFixture(fixtures.Fixture):
|
|||||||
_import_opts(self.conf,
|
_import_opts(self.conf,
|
||||||
'oslo_messaging._drivers.amqp', 'amqp_opts',
|
'oslo_messaging._drivers.amqp', 'amqp_opts',
|
||||||
'oslo_messaging_rabbit')
|
'oslo_messaging_rabbit')
|
||||||
_import_opts(self.conf,
|
|
||||||
'oslo_messaging._drivers.impl_qpid', 'qpid_opts',
|
|
||||||
'oslo_messaging_qpid')
|
|
||||||
_import_opts(self.conf,
|
_import_opts(self.conf,
|
||||||
'oslo_messaging._drivers.amqp', 'amqp_opts',
|
'oslo_messaging._drivers.amqp', 'amqp_opts',
|
||||||
'oslo_messaging_qpid')
|
'oslo_messaging_qpid')
|
||||||
@ -77,7 +74,7 @@ class ConfFixture(fixtures.Fixture):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def transport_driver(self):
|
def transport_driver(self):
|
||||||
"""The transport driver - for example 'rabbit', 'qpid' or 'fake'."""
|
"""The transport driver - for example 'rabbit', 'amqp' or 'fake'."""
|
||||||
return self.conf.rpc_backend
|
return self.conf.rpc_backend
|
||||||
|
|
||||||
@transport_driver.setter
|
@transport_driver.setter
|
||||||
|
@ -33,7 +33,7 @@ class LoggingNotificationHandler(logging.Handler):
|
|||||||
[handler_notifier]
|
[handler_notifier]
|
||||||
class=oslo_messaging.LoggingNotificationHandler
|
class=oslo_messaging.LoggingNotificationHandler
|
||||||
level=ERROR
|
level=ERROR
|
||||||
args=('qpid:///')
|
args=('rabbit:///')
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
@ -22,7 +22,6 @@ import itertools
|
|||||||
|
|
||||||
from oslo_messaging._drivers import amqp
|
from oslo_messaging._drivers import amqp
|
||||||
from oslo_messaging._drivers import base as drivers_base
|
from oslo_messaging._drivers import base as drivers_base
|
||||||
from oslo_messaging._drivers import impl_qpid
|
|
||||||
from oslo_messaging._drivers import impl_rabbit
|
from oslo_messaging._drivers import impl_rabbit
|
||||||
from oslo_messaging._drivers import impl_zmq
|
from oslo_messaging._drivers import impl_zmq
|
||||||
from oslo_messaging._drivers.protocols.amqp import opts as amqp_opts
|
from oslo_messaging._drivers.protocols.amqp import opts as amqp_opts
|
||||||
@ -48,8 +47,6 @@ _opts = [
|
|||||||
('oslo_messaging_amqp', amqp_opts.amqp1_opts),
|
('oslo_messaging_amqp', amqp_opts.amqp1_opts),
|
||||||
('oslo_messaging_rabbit', list(itertools.chain(amqp.amqp_opts,
|
('oslo_messaging_rabbit', list(itertools.chain(amqp.amqp_opts,
|
||||||
impl_rabbit.rabbit_opts))),
|
impl_rabbit.rabbit_opts))),
|
||||||
('oslo_messaging_qpid', list(itertools.chain(amqp.amqp_opts,
|
|
||||||
impl_qpid.qpid_opts)))
|
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@ -1,850 +0,0 @@
|
|||||||
# Copyright (C) 2014 eNovance SAS <licensing@enovance.com>
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
|
|
||||||
import operator
|
|
||||||
import random
|
|
||||||
import threading
|
|
||||||
import time
|
|
||||||
|
|
||||||
try:
|
|
||||||
import qpid
|
|
||||||
except ImportError:
|
|
||||||
qpid = None
|
|
||||||
from six.moves import _thread
|
|
||||||
import testscenarios
|
|
||||||
import testtools
|
|
||||||
|
|
||||||
import oslo_messaging
|
|
||||||
from oslo_messaging._drivers import amqp
|
|
||||||
from oslo_messaging._drivers import impl_qpid as qpid_driver
|
|
||||||
from oslo_messaging.tests import utils as test_utils
|
|
||||||
from six.moves import mock
|
|
||||||
|
|
||||||
|
|
||||||
load_tests = testscenarios.load_tests_apply_scenarios
|
|
||||||
|
|
||||||
QPID_BROKER = 'localhost:5672'
|
|
||||||
|
|
||||||
|
|
||||||
class TestQpidDriverLoad(test_utils.BaseTestCase):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super(TestQpidDriverLoad, self).setUp()
|
|
||||||
self.messaging_conf.transport_driver = 'qpid'
|
|
||||||
|
|
||||||
def test_driver_load(self):
|
|
||||||
transport = oslo_messaging.get_transport(self.conf)
|
|
||||||
self.assertIsInstance(transport._driver, qpid_driver.QpidDriver)
|
|
||||||
|
|
||||||
|
|
||||||
def _is_qpidd_service_running():
|
|
||||||
|
|
||||||
"""this function checks if the qpid service is running or not."""
|
|
||||||
|
|
||||||
qpid_running = True
|
|
||||||
try:
|
|
||||||
broker = QPID_BROKER
|
|
||||||
connection = qpid.messaging.Connection(broker)
|
|
||||||
connection.open()
|
|
||||||
except Exception:
|
|
||||||
# qpid service is not running.
|
|
||||||
qpid_running = False
|
|
||||||
else:
|
|
||||||
connection.close()
|
|
||||||
|
|
||||||
return qpid_running
|
|
||||||
|
|
||||||
|
|
||||||
class _QpidBaseTestCase(test_utils.BaseTestCase):
|
|
||||||
|
|
||||||
@testtools.skipIf(qpid is None, "qpid not available")
|
|
||||||
def setUp(self):
|
|
||||||
super(_QpidBaseTestCase, self).setUp()
|
|
||||||
self.messaging_conf.transport_driver = 'qpid'
|
|
||||||
self.fake_qpid = not _is_qpidd_service_running()
|
|
||||||
|
|
||||||
if self.fake_qpid:
|
|
||||||
self.session_receive = get_fake_qpid_session()
|
|
||||||
self.session_send = get_fake_qpid_session()
|
|
||||||
else:
|
|
||||||
self.broker = QPID_BROKER
|
|
||||||
# create connection from the qpid.messaging
|
|
||||||
# connection for the Consumer.
|
|
||||||
self.con_receive = qpid.messaging.Connection(self.broker)
|
|
||||||
self.con_receive.open()
|
|
||||||
# session to receive the messages
|
|
||||||
self.session_receive = self.con_receive.session()
|
|
||||||
|
|
||||||
# connection for sending the message
|
|
||||||
self.con_send = qpid.messaging.Connection(self.broker)
|
|
||||||
self.con_send.open()
|
|
||||||
# session to send the messages
|
|
||||||
self.session_send = self.con_send.session()
|
|
||||||
|
|
||||||
# list to store the expected messages and
|
|
||||||
# the actual received messages
|
|
||||||
self._expected = []
|
|
||||||
self._messages = []
|
|
||||||
self.initialized = True
|
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
super(_QpidBaseTestCase, self).tearDown()
|
|
||||||
|
|
||||||
if self.initialized:
|
|
||||||
if self.fake_qpid:
|
|
||||||
_fake_session.flush_exchanges()
|
|
||||||
else:
|
|
||||||
self.con_receive.close()
|
|
||||||
self.con_send.close()
|
|
||||||
|
|
||||||
|
|
||||||
class TestQpidTransportURL(_QpidBaseTestCase):
|
|
||||||
|
|
||||||
scenarios = [
|
|
||||||
('none', dict(url=None,
|
|
||||||
expected=[dict(host='localhost:5672',
|
|
||||||
username='',
|
|
||||||
password='')])),
|
|
||||||
('empty',
|
|
||||||
dict(url='qpid:///',
|
|
||||||
expected=[dict(host='localhost:5672',
|
|
||||||
username='',
|
|
||||||
password='')])),
|
|
||||||
('localhost',
|
|
||||||
dict(url='qpid://localhost/',
|
|
||||||
expected=[dict(host='localhost',
|
|
||||||
username='',
|
|
||||||
password='')])),
|
|
||||||
('no_creds',
|
|
||||||
dict(url='qpid://host/',
|
|
||||||
expected=[dict(host='host',
|
|
||||||
username='',
|
|
||||||
password='')])),
|
|
||||||
('no_port',
|
|
||||||
dict(url='qpid://user:password@host/',
|
|
||||||
expected=[dict(host='host',
|
|
||||||
username='user',
|
|
||||||
password='password')])),
|
|
||||||
('full_url',
|
|
||||||
dict(url='qpid://user:password@host:10/',
|
|
||||||
expected=[dict(host='host:10',
|
|
||||||
username='user',
|
|
||||||
password='password')])),
|
|
||||||
('full_two_url',
|
|
||||||
dict(url='qpid://user:password@host:10,'
|
|
||||||
'user2:password2@host2:12/',
|
|
||||||
expected=[dict(host='host:10',
|
|
||||||
username='user',
|
|
||||||
password='password'),
|
|
||||||
dict(host='host2:12',
|
|
||||||
username='user2',
|
|
||||||
password='password2')
|
|
||||||
]
|
|
||||||
)),
|
|
||||||
|
|
||||||
]
|
|
||||||
|
|
||||||
@mock.patch.object(qpid_driver.Connection, 'reconnect')
|
|
||||||
def test_transport_url(self, *args):
|
|
||||||
transport = oslo_messaging.get_transport(self.conf, self.url)
|
|
||||||
self.addCleanup(transport.cleanup)
|
|
||||||
driver = transport._driver
|
|
||||||
|
|
||||||
brokers_params = driver._get_connection().brokers_params
|
|
||||||
self.assertEqual(sorted(self.expected,
|
|
||||||
key=operator.itemgetter('host')),
|
|
||||||
sorted(brokers_params,
|
|
||||||
key=operator.itemgetter('host')))
|
|
||||||
|
|
||||||
|
|
||||||
class TestQpidInvalidTopologyVersion(_QpidBaseTestCase):
|
|
||||||
"""Unit test cases to test invalid qpid topology version."""
|
|
||||||
|
|
||||||
scenarios = [
|
|
||||||
('direct', dict(consumer_cls=qpid_driver.DirectConsumer,
|
|
||||||
consumer_kwargs={},
|
|
||||||
publisher_cls=qpid_driver.DirectPublisher,
|
|
||||||
publisher_kwargs={})),
|
|
||||||
('topic', dict(consumer_cls=qpid_driver.TopicConsumer,
|
|
||||||
consumer_kwargs={'exchange_name': 'openstack'},
|
|
||||||
publisher_cls=qpid_driver.TopicPublisher,
|
|
||||||
publisher_kwargs={'exchange_name': 'openstack'})),
|
|
||||||
('fanout', dict(consumer_cls=qpid_driver.FanoutConsumer,
|
|
||||||
consumer_kwargs={},
|
|
||||||
publisher_cls=qpid_driver.FanoutPublisher,
|
|
||||||
publisher_kwargs={})),
|
|
||||||
]
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super(TestQpidInvalidTopologyVersion, self).setUp()
|
|
||||||
self.config(qpid_topology_version=-1,
|
|
||||||
group='oslo_messaging_qpid')
|
|
||||||
|
|
||||||
def test_invalid_topology_version(self):
|
|
||||||
def consumer_callback(msg):
|
|
||||||
pass
|
|
||||||
|
|
||||||
msgid_or_topic = 'test'
|
|
||||||
|
|
||||||
# not using self.assertRaises because
|
|
||||||
# 1. qpid driver raises Exception(msg) for invalid topology version
|
|
||||||
# 2. flake8 - H202 assertRaises Exception too broad
|
|
||||||
exception_msg = ("Invalid value for qpid_topology_version: %d" %
|
|
||||||
self.conf.oslo_messaging_qpid.qpid_topology_version)
|
|
||||||
recvd_exc_msg = ''
|
|
||||||
|
|
||||||
try:
|
|
||||||
self.consumer_cls(self.conf.oslo_messaging_qpid,
|
|
||||||
self.session_receive,
|
|
||||||
msgid_or_topic,
|
|
||||||
consumer_callback,
|
|
||||||
**self.consumer_kwargs)
|
|
||||||
except Exception as e:
|
|
||||||
recvd_exc_msg = e.message
|
|
||||||
|
|
||||||
self.assertEqual(exception_msg, recvd_exc_msg)
|
|
||||||
|
|
||||||
recvd_exc_msg = ''
|
|
||||||
try:
|
|
||||||
self.publisher_cls(self.conf.oslo_messaging_qpid,
|
|
||||||
self.session_send,
|
|
||||||
topic=msgid_or_topic,
|
|
||||||
**self.publisher_kwargs)
|
|
||||||
except Exception as e:
|
|
||||||
recvd_exc_msg = e.message
|
|
||||||
|
|
||||||
self.assertEqual(exception_msg, recvd_exc_msg)
|
|
||||||
|
|
||||||
|
|
||||||
class TestQpidDirectConsumerPublisher(_QpidBaseTestCase):
|
|
||||||
"""Unit test cases to test DirectConsumer and Direct Publisher."""
|
|
||||||
|
|
||||||
_n_qpid_topology = [
|
|
||||||
('v1', dict(qpid_topology=1)),
|
|
||||||
('v2', dict(qpid_topology=2)),
|
|
||||||
]
|
|
||||||
|
|
||||||
_n_msgs = [
|
|
||||||
('single', dict(no_msgs=1)),
|
|
||||||
('multiple', dict(no_msgs=10)),
|
|
||||||
]
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def generate_scenarios(cls):
|
|
||||||
cls.scenarios = testscenarios.multiply_scenarios(cls._n_qpid_topology,
|
|
||||||
cls._n_msgs)
|
|
||||||
|
|
||||||
def consumer_callback(self, msg):
|
|
||||||
# This function will be called by the DirectConsumer
|
|
||||||
# when any message is received.
|
|
||||||
# Append the received message into the messages list
|
|
||||||
# so that the received messages can be validated
|
|
||||||
# with the expected messages
|
|
||||||
if isinstance(msg, dict):
|
|
||||||
self._messages.append(msg['content'])
|
|
||||||
else:
|
|
||||||
self._messages.append(msg)
|
|
||||||
|
|
||||||
def test_qpid_direct_consumer_producer(self):
|
|
||||||
self.msgid = str(random.randint(1, 100))
|
|
||||||
|
|
||||||
# create a DirectConsumer and DirectPublisher class objects
|
|
||||||
self.dir_cons = qpid_driver.DirectConsumer(
|
|
||||||
self.conf.oslo_messaging_qpid,
|
|
||||||
self.session_receive,
|
|
||||||
self.msgid,
|
|
||||||
self.consumer_callback)
|
|
||||||
self.dir_pub = qpid_driver.DirectPublisher(
|
|
||||||
self.conf.oslo_messaging_qpid,
|
|
||||||
self.session_send,
|
|
||||||
self.msgid)
|
|
||||||
|
|
||||||
def try_send_msg(no_msgs):
|
|
||||||
for i in range(no_msgs):
|
|
||||||
self._expected.append(str(i))
|
|
||||||
snd_msg = {'content_type': 'text/plain', 'content': str(i)}
|
|
||||||
self.dir_pub.send(snd_msg)
|
|
||||||
|
|
||||||
def try_receive_msg(no_msgs):
|
|
||||||
for i in range(no_msgs):
|
|
||||||
self.dir_cons.consume()
|
|
||||||
|
|
||||||
thread1 = threading.Thread(target=try_receive_msg,
|
|
||||||
args=(self.no_msgs,))
|
|
||||||
thread2 = threading.Thread(target=try_send_msg,
|
|
||||||
args=(self.no_msgs,))
|
|
||||||
|
|
||||||
thread1.start()
|
|
||||||
thread2.start()
|
|
||||||
thread1.join()
|
|
||||||
thread2.join()
|
|
||||||
|
|
||||||
self.assertEqual(self.no_msgs, len(self._messages))
|
|
||||||
self.assertEqual(self._expected, self._messages)
|
|
||||||
|
|
||||||
|
|
||||||
TestQpidDirectConsumerPublisher.generate_scenarios()
|
|
||||||
|
|
||||||
|
|
||||||
class TestQpidTopicAndFanout(_QpidBaseTestCase):
|
|
||||||
"""Unit Test cases to test TopicConsumer and
|
|
||||||
TopicPublisher classes of the qpid driver
|
|
||||||
and FanoutConsumer and FanoutPublisher classes
|
|
||||||
of the qpid driver
|
|
||||||
"""
|
|
||||||
|
|
||||||
_n_qpid_topology = [
|
|
||||||
('v1', dict(qpid_topology=1)),
|
|
||||||
('v2', dict(qpid_topology=2)),
|
|
||||||
]
|
|
||||||
|
|
||||||
_n_msgs = [
|
|
||||||
('single', dict(no_msgs=1)),
|
|
||||||
('multiple', dict(no_msgs=10)),
|
|
||||||
]
|
|
||||||
|
|
||||||
_n_senders = [
|
|
||||||
('single', dict(no_senders=1)),
|
|
||||||
('multiple', dict(no_senders=10)),
|
|
||||||
]
|
|
||||||
|
|
||||||
_n_receivers = [
|
|
||||||
('single', dict(no_receivers=1)),
|
|
||||||
]
|
|
||||||
_exchange_class = [
|
|
||||||
('topic', dict(consumer_cls=qpid_driver.TopicConsumer,
|
|
||||||
consumer_kwargs={'exchange_name': 'openstack'},
|
|
||||||
publisher_cls=qpid_driver.TopicPublisher,
|
|
||||||
publisher_kwargs={'exchange_name': 'openstack'},
|
|
||||||
topic='topictest.test',
|
|
||||||
receive_topic='topictest.test')),
|
|
||||||
('fanout', dict(consumer_cls=qpid_driver.FanoutConsumer,
|
|
||||||
consumer_kwargs={},
|
|
||||||
publisher_cls=qpid_driver.FanoutPublisher,
|
|
||||||
publisher_kwargs={},
|
|
||||||
topic='fanouttest',
|
|
||||||
receive_topic='fanouttest')),
|
|
||||||
]
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def generate_scenarios(cls):
|
|
||||||
cls.scenarios = testscenarios.multiply_scenarios(cls._n_qpid_topology,
|
|
||||||
cls._n_msgs,
|
|
||||||
cls._n_senders,
|
|
||||||
cls._n_receivers,
|
|
||||||
cls._exchange_class)
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super(TestQpidTopicAndFanout, self).setUp()
|
|
||||||
|
|
||||||
# to store the expected messages and the
|
|
||||||
# actual received messages
|
|
||||||
#
|
|
||||||
# NOTE(dhellmann): These are dicts, where the base class uses
|
|
||||||
# lists.
|
|
||||||
self._expected = {}
|
|
||||||
self._messages = {}
|
|
||||||
|
|
||||||
self._senders = []
|
|
||||||
self._receivers = []
|
|
||||||
|
|
||||||
self._sender_threads = []
|
|
||||||
self._receiver_threads = []
|
|
||||||
|
|
||||||
def consumer_callback(self, msg):
|
|
||||||
"""callback function called by the ConsumerBase class of
|
|
||||||
qpid driver.
|
|
||||||
Message will be received in the format x-y
|
|
||||||
where x is the sender id and y is the msg number of the sender
|
|
||||||
extract the sender id 'x' and store the msg 'x-y' with 'x' as
|
|
||||||
the key
|
|
||||||
"""
|
|
||||||
|
|
||||||
if isinstance(msg, dict):
|
|
||||||
msgcontent = msg['content']
|
|
||||||
else:
|
|
||||||
msgcontent = msg
|
|
||||||
|
|
||||||
splitmsg = msgcontent.split('-')
|
|
||||||
key = _thread.get_ident()
|
|
||||||
|
|
||||||
if key not in self._messages:
|
|
||||||
self._messages[key] = dict()
|
|
||||||
|
|
||||||
tdict = self._messages[key]
|
|
||||||
|
|
||||||
if splitmsg[0] not in tdict:
|
|
||||||
tdict[splitmsg[0]] = []
|
|
||||||
|
|
||||||
tdict[splitmsg[0]].append(msgcontent)
|
|
||||||
|
|
||||||
def _try_send_msg(self, sender_id, no_msgs):
|
|
||||||
for i in range(no_msgs):
|
|
||||||
sendmsg = '%s-%s' % (str(sender_id), str(i))
|
|
||||||
key = str(sender_id)
|
|
||||||
# Store the message in the self._expected for each sender.
|
|
||||||
# This will be used later to
|
|
||||||
# validate the test by comparing it with the
|
|
||||||
# received messages by all the receivers
|
|
||||||
if key not in self._expected:
|
|
||||||
self._expected[key] = []
|
|
||||||
self._expected[key].append(sendmsg)
|
|
||||||
send_dict = {'content_type': 'text/plain', 'content': sendmsg}
|
|
||||||
self._senders[sender_id].send(send_dict)
|
|
||||||
|
|
||||||
def _try_receive_msg(self, receiver_id, no_msgs):
|
|
||||||
for i in range(self.no_senders * no_msgs):
|
|
||||||
no_of_attempts = 0
|
|
||||||
|
|
||||||
# ConsumerBase.consume blocks indefinitely until a message
|
|
||||||
# is received.
|
|
||||||
# So qpid_receiver.available() is called before calling
|
|
||||||
# ConsumerBase.consume() so that we are not
|
|
||||||
# blocked indefinitely
|
|
||||||
qpid_receiver = self._receivers[receiver_id].get_receiver()
|
|
||||||
while no_of_attempts < 50:
|
|
||||||
if qpid_receiver.available() > 0:
|
|
||||||
self._receivers[receiver_id].consume()
|
|
||||||
break
|
|
||||||
no_of_attempts += 1
|
|
||||||
time.sleep(0.05)
|
|
||||||
|
|
||||||
def test_qpid_topic_and_fanout(self):
|
|
||||||
for receiver_id in range(self.no_receivers):
|
|
||||||
consumer = self.consumer_cls(self.conf.oslo_messaging_qpid,
|
|
||||||
self.session_receive,
|
|
||||||
self.receive_topic,
|
|
||||||
self.consumer_callback,
|
|
||||||
**self.consumer_kwargs)
|
|
||||||
self._receivers.append(consumer)
|
|
||||||
|
|
||||||
# create receivers threads
|
|
||||||
thread = threading.Thread(target=self._try_receive_msg,
|
|
||||||
args=(receiver_id, self.no_msgs,))
|
|
||||||
self._receiver_threads.append(thread)
|
|
||||||
|
|
||||||
for sender_id in range(self.no_senders):
|
|
||||||
publisher = self.publisher_cls(self.conf.oslo_messaging_qpid,
|
|
||||||
self.session_send,
|
|
||||||
topic=self.topic,
|
|
||||||
**self.publisher_kwargs)
|
|
||||||
self._senders.append(publisher)
|
|
||||||
|
|
||||||
# create sender threads
|
|
||||||
thread = threading.Thread(target=self._try_send_msg,
|
|
||||||
args=(sender_id, self.no_msgs,))
|
|
||||||
self._sender_threads.append(thread)
|
|
||||||
|
|
||||||
for thread in self._receiver_threads:
|
|
||||||
thread.start()
|
|
||||||
|
|
||||||
for thread in self._sender_threads:
|
|
||||||
thread.start()
|
|
||||||
|
|
||||||
for thread in self._receiver_threads:
|
|
||||||
thread.join()
|
|
||||||
|
|
||||||
for thread in self._sender_threads:
|
|
||||||
thread.join()
|
|
||||||
|
|
||||||
# Each receiver should receive all the messages sent by
|
|
||||||
# the sender(s).
|
|
||||||
# So, Iterate through each of the receiver items in
|
|
||||||
# self._messages and compare with the expected messages
|
|
||||||
# messages.
|
|
||||||
|
|
||||||
self.assertEqual(self.no_senders, len(self._expected))
|
|
||||||
self.assertEqual(self.no_receivers, len(self._messages))
|
|
||||||
|
|
||||||
for key, messages in self._messages.iteritems():
|
|
||||||
self.assertEqual(self._expected, messages)
|
|
||||||
|
|
||||||
TestQpidTopicAndFanout.generate_scenarios()
|
|
||||||
|
|
||||||
|
|
||||||
class AddressNodeMatcher(object):
|
|
||||||
def __init__(self, node):
|
|
||||||
self.node = node
|
|
||||||
|
|
||||||
def __eq__(self, address):
|
|
||||||
return address.split(';')[0].strip() == self.node
|
|
||||||
|
|
||||||
|
|
||||||
class TestDriverInterface(_QpidBaseTestCase):
|
|
||||||
"""Unit Test cases to test the amqpdriver with qpid
|
|
||||||
"""
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super(TestDriverInterface, self).setUp()
|
|
||||||
self.config(qpid_topology_version=2,
|
|
||||||
group='oslo_messaging_qpid')
|
|
||||||
transport = oslo_messaging.get_transport(self.conf)
|
|
||||||
self.driver = transport._driver
|
|
||||||
|
|
||||||
original_get_connection = self.driver._get_connection
|
|
||||||
p = mock.patch.object(self.driver, '_get_connection',
|
|
||||||
side_effect=lambda pooled=True:
|
|
||||||
original_get_connection(False))
|
|
||||||
p.start()
|
|
||||||
self.addCleanup(p.stop)
|
|
||||||
|
|
||||||
def test_listen_and_direct_send(self):
|
|
||||||
target = oslo_messaging.Target(exchange="exchange_test",
|
|
||||||
topic="topic_test",
|
|
||||||
server="server_test")
|
|
||||||
|
|
||||||
with mock.patch('qpid.messaging.Connection') as conn_cls:
|
|
||||||
conn = conn_cls.return_value
|
|
||||||
session = conn.session.return_value
|
|
||||||
session.receiver.side_effect = [mock.Mock(), mock.Mock(),
|
|
||||||
mock.Mock()]
|
|
||||||
|
|
||||||
listener = self.driver.listen(target)
|
|
||||||
listener.conn.direct_send("msg_id", {})
|
|
||||||
|
|
||||||
self.assertEqual(3, len(listener.conn.consumers))
|
|
||||||
|
|
||||||
expected_calls = [
|
|
||||||
mock.call(AddressNodeMatcher(
|
|
||||||
'amq.topic/topic/exchange_test/topic_test')),
|
|
||||||
mock.call(AddressNodeMatcher(
|
|
||||||
'amq.topic/topic/exchange_test/topic_test.server_test')),
|
|
||||||
mock.call(AddressNodeMatcher('amq.topic/fanout/topic_test')),
|
|
||||||
]
|
|
||||||
session.receiver.assert_has_calls(expected_calls)
|
|
||||||
session.sender.assert_called_with(
|
|
||||||
AddressNodeMatcher("amq.direct/msg_id"))
|
|
||||||
|
|
||||||
def test_send(self):
|
|
||||||
target = oslo_messaging.Target(exchange="exchange_test",
|
|
||||||
topic="topic_test",
|
|
||||||
server="server_test")
|
|
||||||
with mock.patch('qpid.messaging.Connection') as conn_cls:
|
|
||||||
conn = conn_cls.return_value
|
|
||||||
session = conn.session.return_value
|
|
||||||
|
|
||||||
self.driver.send(target, {}, {})
|
|
||||||
session.sender.assert_called_with(AddressNodeMatcher(
|
|
||||||
"amq.topic/topic/exchange_test/topic_test.server_test"))
|
|
||||||
|
|
||||||
def test_send_notification(self):
|
|
||||||
target = oslo_messaging.Target(exchange="exchange_test",
|
|
||||||
topic="topic_test.info")
|
|
||||||
with mock.patch('qpid.messaging.Connection') as conn_cls:
|
|
||||||
conn = conn_cls.return_value
|
|
||||||
session = conn.session.return_value
|
|
||||||
|
|
||||||
self.driver.send_notification(target, {}, {}, "2.0")
|
|
||||||
session.sender.assert_called_with(AddressNodeMatcher(
|
|
||||||
"amq.topic/topic/exchange_test/topic_test.info"))
|
|
||||||
|
|
||||||
|
|
||||||
class TestQpidReconnectOrder(test_utils.BaseTestCase):
|
|
||||||
"""Unit Test cases to test reconnection
|
|
||||||
"""
|
|
||||||
|
|
||||||
@testtools.skipIf(qpid is None, "qpid not available")
|
|
||||||
def test_reconnect_order(self):
|
|
||||||
brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
|
|
||||||
brokers_count = len(brokers)
|
|
||||||
|
|
||||||
self.config(qpid_hosts=brokers,
|
|
||||||
group='oslo_messaging_qpid')
|
|
||||||
|
|
||||||
with mock.patch('qpid.messaging.Connection') as conn_mock:
|
|
||||||
# starting from the first broker in the list
|
|
||||||
url = oslo_messaging.TransportURL.parse(self.conf, None)
|
|
||||||
connection = qpid_driver.Connection(self.conf, url,
|
|
||||||
amqp.PURPOSE_SEND)
|
|
||||||
|
|
||||||
# reconnect will advance to the next broker, one broker per
|
|
||||||
# attempt, and then wrap to the start of the list once the end is
|
|
||||||
# reached
|
|
||||||
for _ in range(brokers_count):
|
|
||||||
connection.reconnect()
|
|
||||||
|
|
||||||
expected = []
|
|
||||||
for broker in brokers:
|
|
||||||
expected.extend([mock.call("%s:5672" % broker),
|
|
||||||
mock.call().open(),
|
|
||||||
mock.call().session(),
|
|
||||||
mock.call().opened(),
|
|
||||||
mock.call().opened().__nonzero__(),
|
|
||||||
mock.call().close()])
|
|
||||||
|
|
||||||
conn_mock.assert_has_calls(expected, any_order=True)
|
|
||||||
|
|
||||||
|
|
||||||
def synchronized(func):
|
|
||||||
func.__lock__ = threading.Lock()
|
|
||||||
|
|
||||||
def synced_func(*args, **kws):
|
|
||||||
with func.__lock__:
|
|
||||||
return func(*args, **kws)
|
|
||||||
|
|
||||||
return synced_func
|
|
||||||
|
|
||||||
|
|
||||||
class FakeQpidMsgManager(object):
|
|
||||||
def __init__(self):
|
|
||||||
self._exchanges = {}
|
|
||||||
|
|
||||||
@synchronized
|
|
||||||
def add_exchange(self, exchange):
|
|
||||||
if exchange not in self._exchanges:
|
|
||||||
self._exchanges[exchange] = {'msgs': [], 'consumers': {}}
|
|
||||||
|
|
||||||
@synchronized
|
|
||||||
def add_exchange_consumer(self, exchange, consumer_id):
|
|
||||||
exchange_info = self._exchanges[exchange]
|
|
||||||
cons_dict = exchange_info['consumers']
|
|
||||||
cons_dict[consumer_id] = 0
|
|
||||||
|
|
||||||
@synchronized
|
|
||||||
def add_exchange_msg(self, exchange, msg):
|
|
||||||
exchange_info = self._exchanges[exchange]
|
|
||||||
exchange_info['msgs'].append(msg)
|
|
||||||
|
|
||||||
def get_exchange_msg(self, exchange, index):
|
|
||||||
exchange_info = self._exchanges[exchange]
|
|
||||||
return exchange_info['msgs'][index]
|
|
||||||
|
|
||||||
def get_no_exch_msgs(self, exchange):
|
|
||||||
exchange_info = self._exchanges[exchange]
|
|
||||||
return len(exchange_info['msgs'])
|
|
||||||
|
|
||||||
def get_exch_cons_index(self, exchange, consumer_id):
|
|
||||||
exchange_info = self._exchanges[exchange]
|
|
||||||
cons_dict = exchange_info['consumers']
|
|
||||||
return cons_dict[consumer_id]
|
|
||||||
|
|
||||||
@synchronized
|
|
||||||
def inc_consumer_index(self, exchange, consumer_id):
|
|
||||||
exchange_info = self._exchanges[exchange]
|
|
||||||
cons_dict = exchange_info['consumers']
|
|
||||||
cons_dict[consumer_id] += 1
|
|
||||||
|
|
||||||
_fake_qpid_msg_manager = FakeQpidMsgManager()
|
|
||||||
|
|
||||||
|
|
||||||
class FakeQpidSessionSender(object):
|
|
||||||
def __init__(self, session, id, target, options):
|
|
||||||
self.session = session
|
|
||||||
self.id = id
|
|
||||||
self.target = target
|
|
||||||
self.options = options
|
|
||||||
|
|
||||||
@synchronized
|
|
||||||
def send(self, object, sync=True, timeout=None):
|
|
||||||
_fake_qpid_msg_manager.add_exchange_msg(self.target, object)
|
|
||||||
|
|
||||||
def close(self, timeout=None):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class FakeQpidSessionReceiver(object):
|
|
||||||
|
|
||||||
def __init__(self, session, id, source, options):
|
|
||||||
self.session = session
|
|
||||||
self.id = id
|
|
||||||
self.source = source
|
|
||||||
self.options = options
|
|
||||||
|
|
||||||
@synchronized
|
|
||||||
def fetch(self, timeout=None):
|
|
||||||
if timeout is None:
|
|
||||||
# if timeout is not given, take a default time out
|
|
||||||
# of 30 seconds to avoid indefinite loop
|
|
||||||
_timeout = 30
|
|
||||||
else:
|
|
||||||
_timeout = timeout
|
|
||||||
|
|
||||||
deadline = time.time() + _timeout
|
|
||||||
while time.time() <= deadline:
|
|
||||||
index = _fake_qpid_msg_manager.get_exch_cons_index(self.source,
|
|
||||||
self.id)
|
|
||||||
try:
|
|
||||||
msg = _fake_qpid_msg_manager.get_exchange_msg(self.source,
|
|
||||||
index)
|
|
||||||
except IndexError:
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
_fake_qpid_msg_manager.inc_consumer_index(self.source,
|
|
||||||
self.id)
|
|
||||||
return qpid.messaging.Message(msg)
|
|
||||||
time.sleep(0.050)
|
|
||||||
|
|
||||||
if timeout is None:
|
|
||||||
raise Exception('timed out waiting for reply')
|
|
||||||
|
|
||||||
def close(self, timeout=None):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@synchronized
|
|
||||||
def available(self):
|
|
||||||
no_msgs = _fake_qpid_msg_manager.get_no_exch_msgs(self.source)
|
|
||||||
index = _fake_qpid_msg_manager.get_exch_cons_index(self.source,
|
|
||||||
self.id)
|
|
||||||
if no_msgs == 0 or index >= no_msgs:
|
|
||||||
return 0
|
|
||||||
else:
|
|
||||||
return no_msgs - index
|
|
||||||
|
|
||||||
|
|
||||||
class FakeQpidSession(object):
|
|
||||||
|
|
||||||
def __init__(self, connection=None, name=None, transactional=None):
|
|
||||||
self.connection = connection
|
|
||||||
self.name = name
|
|
||||||
self.transactional = transactional
|
|
||||||
self._receivers = {}
|
|
||||||
self.conf = None
|
|
||||||
self.url = None
|
|
||||||
self._senders = {}
|
|
||||||
self._sender_id = 0
|
|
||||||
self._receiver_id = 0
|
|
||||||
|
|
||||||
@synchronized
|
|
||||||
def sender(self, target, **options):
|
|
||||||
exchange_key = self._extract_exchange_key(target)
|
|
||||||
_fake_qpid_msg_manager.add_exchange(exchange_key)
|
|
||||||
|
|
||||||
sendobj = FakeQpidSessionSender(self, self._sender_id,
|
|
||||||
exchange_key, options)
|
|
||||||
self._senders[self._sender_id] = sendobj
|
|
||||||
self._sender_id = self._sender_id + 1
|
|
||||||
return sendobj
|
|
||||||
|
|
||||||
@synchronized
|
|
||||||
def receiver(self, source, **options):
|
|
||||||
exchange_key = self._extract_exchange_key(source)
|
|
||||||
_fake_qpid_msg_manager.add_exchange(exchange_key)
|
|
||||||
recvobj = FakeQpidSessionReceiver(self, self._receiver_id,
|
|
||||||
exchange_key, options)
|
|
||||||
self._receivers[self._receiver_id] = recvobj
|
|
||||||
_fake_qpid_msg_manager.add_exchange_consumer(exchange_key,
|
|
||||||
self._receiver_id)
|
|
||||||
self._receiver_id += 1
|
|
||||||
return recvobj
|
|
||||||
|
|
||||||
def acknowledge(self, message=None, disposition=None, sync=True):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@synchronized
|
|
||||||
def flush_exchanges(self):
|
|
||||||
_fake_qpid_msg_manager._exchanges = {}
|
|
||||||
|
|
||||||
def _extract_exchange_key(self, exchange_msg):
|
|
||||||
"""This function extracts a unique key for the exchange.
|
|
||||||
This key is used in the dictionary as a 'key' for
|
|
||||||
this exchange.
|
|
||||||
Eg. if the exchange_msg (for qpid topology version 1)
|
|
||||||
is 33/33 ; {"node": {"x-declare": {"auto-delete": true, ....
|
|
||||||
then 33 is returned as the key.
|
|
||||||
Eg 2. For topology v2, if the
|
|
||||||
exchange_msg is - amq.direct/44 ; {"link": {"x-dec.......
|
|
||||||
then 44 is returned
|
|
||||||
"""
|
|
||||||
# first check for ';'
|
|
||||||
semicolon_split = exchange_msg.split(';')
|
|
||||||
|
|
||||||
# split the first item of semicolon_split with '/'
|
|
||||||
slash_split = semicolon_split[0].split('/')
|
|
||||||
# return the last element of the list as the key
|
|
||||||
key = slash_split[-1]
|
|
||||||
return key.strip()
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
_fake_session = FakeQpidSession()
|
|
||||||
|
|
||||||
|
|
||||||
def get_fake_qpid_session():
|
|
||||||
return _fake_session
|
|
||||||
|
|
||||||
|
|
||||||
class QPidHATestCase(test_utils.BaseTestCase):
|
|
||||||
|
|
||||||
@testtools.skipIf(qpid is None, "qpid not available")
|
|
||||||
def setUp(self):
|
|
||||||
super(QPidHATestCase, self).setUp()
|
|
||||||
self.brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
|
|
||||||
|
|
||||||
self.config(qpid_hosts=self.brokers,
|
|
||||||
qpid_username=None,
|
|
||||||
qpid_password=None,
|
|
||||||
group='oslo_messaging_qpid')
|
|
||||||
|
|
||||||
hostname_sets = set()
|
|
||||||
self.info = {'attempt': 0,
|
|
||||||
'fail': False}
|
|
||||||
|
|
||||||
def _connect(myself, broker):
|
|
||||||
# do as little work that is enough to pass connection attempt
|
|
||||||
myself.connection = mock.Mock()
|
|
||||||
hostname = broker['host']
|
|
||||||
self.assertNotIn(hostname, hostname_sets)
|
|
||||||
hostname_sets.add(hostname)
|
|
||||||
|
|
||||||
self.info['attempt'] += 1
|
|
||||||
if self.info['fail']:
|
|
||||||
raise qpid.messaging.exceptions.ConnectionError
|
|
||||||
|
|
||||||
# just make sure connection instantiation does not fail with an
|
|
||||||
# exception
|
|
||||||
self.stubs.Set(qpid_driver.Connection, '_connect', _connect)
|
|
||||||
|
|
||||||
# starting from the first broker in the list
|
|
||||||
url = oslo_messaging.TransportURL.parse(self.conf, None)
|
|
||||||
self.connection = qpid_driver.Connection(self.conf, url,
|
|
||||||
amqp.PURPOSE_SEND)
|
|
||||||
self.addCleanup(self.connection.close)
|
|
||||||
|
|
||||||
self.info.update({'attempt': 0,
|
|
||||||
'fail': True})
|
|
||||||
hostname_sets.clear()
|
|
||||||
|
|
||||||
def test_reconnect_order(self):
|
|
||||||
self.assertRaises(oslo_messaging.MessageDeliveryFailure,
|
|
||||||
self.connection.reconnect,
|
|
||||||
retry=len(self.brokers) - 1)
|
|
||||||
self.assertEqual(len(self.brokers), self.info['attempt'])
|
|
||||||
|
|
||||||
def test_ensure_four_retries(self):
|
|
||||||
mock_callback = mock.Mock(
|
|
||||||
side_effect=qpid.messaging.exceptions.ConnectionError)
|
|
||||||
self.assertRaises(oslo_messaging.MessageDeliveryFailure,
|
|
||||||
self.connection.ensure, None, mock_callback,
|
|
||||||
retry=4)
|
|
||||||
self.assertEqual(5, self.info['attempt'])
|
|
||||||
self.assertEqual(1, mock_callback.call_count)
|
|
||||||
|
|
||||||
def test_ensure_one_retry(self):
|
|
||||||
mock_callback = mock.Mock(
|
|
||||||
side_effect=qpid.messaging.exceptions.ConnectionError)
|
|
||||||
self.assertRaises(oslo_messaging.MessageDeliveryFailure,
|
|
||||||
self.connection.ensure, None, mock_callback,
|
|
||||||
retry=1)
|
|
||||||
self.assertEqual(2, self.info['attempt'])
|
|
||||||
self.assertEqual(1, mock_callback.call_count)
|
|
||||||
|
|
||||||
def test_ensure_no_retry(self):
|
|
||||||
mock_callback = mock.Mock(
|
|
||||||
side_effect=qpid.messaging.exceptions.ConnectionError)
|
|
||||||
self.assertRaises(oslo_messaging.MessageDeliveryFailure,
|
|
||||||
self.connection.ensure, None, mock_callback,
|
|
||||||
retry=0)
|
|
||||||
self.assertEqual(1, self.info['attempt'])
|
|
||||||
self.assertEqual(1, mock_callback.call_count)
|
|
@ -46,10 +46,6 @@ case $RPC_BACKEND in
|
|||||||
sudo apt-get update -y
|
sudo apt-get update -y
|
||||||
sudo apt-get install -y redis-server python-redis
|
sudo apt-get install -y redis-server python-redis
|
||||||
;;
|
;;
|
||||||
qpid)
|
|
||||||
sudo apt-get update -y
|
|
||||||
sudo apt-get install -y qpidd sasl2-bin
|
|
||||||
;;
|
|
||||||
amqp1)
|
amqp1)
|
||||||
sudo yum install -y qpid-cpp-server qpid-proton-c-devel python-qpid-proton cyrus-sasl-lib cyrus-sasl-plain
|
sudo yum install -y qpid-cpp-server qpid-proton-c-devel python-qpid-proton cyrus-sasl-lib cyrus-sasl-plain
|
||||||
;;
|
;;
|
||||||
|
@ -32,14 +32,13 @@ class OptsTestCase(test_utils.BaseTestCase):
|
|||||||
super(OptsTestCase, self).setUp()
|
super(OptsTestCase, self).setUp()
|
||||||
|
|
||||||
def _test_list_opts(self, result):
|
def _test_list_opts(self, result):
|
||||||
self.assertEqual(5, len(result))
|
self.assertEqual(4, len(result))
|
||||||
|
|
||||||
groups = [g for (g, l) in result]
|
groups = [g for (g, l) in result]
|
||||||
self.assertIn(None, groups)
|
self.assertIn(None, groups)
|
||||||
self.assertIn('matchmaker_redis', groups)
|
self.assertIn('matchmaker_redis', groups)
|
||||||
self.assertIn('oslo_messaging_amqp', groups)
|
self.assertIn('oslo_messaging_amqp', groups)
|
||||||
self.assertIn('oslo_messaging_rabbit', groups)
|
self.assertIn('oslo_messaging_rabbit', groups)
|
||||||
self.assertIn('oslo_messaging_qpid', groups)
|
|
||||||
|
|
||||||
opt_names = [o.name for (g, l) in result for o in l]
|
opt_names = [o.name for (g, l) in result for o in l]
|
||||||
self.assertIn('rpc_backend', opt_names)
|
self.assertIn('rpc_backend', opt_names)
|
||||||
|
@ -43,7 +43,7 @@ _transport_opts = [
|
|||||||
cfg.StrOpt('rpc_backend',
|
cfg.StrOpt('rpc_backend',
|
||||||
default='rabbit',
|
default='rabbit',
|
||||||
help='The messaging driver to use, defaults to rabbit. Other '
|
help='The messaging driver to use, defaults to rabbit. Other '
|
||||||
'drivers include qpid and zmq.'),
|
'drivers include amqp and zmq.'),
|
||||||
cfg.StrOpt('control_exchange',
|
cfg.StrOpt('control_exchange',
|
||||||
default='openstack',
|
default='openstack',
|
||||||
help='The default exchange under which topics are scoped. May '
|
help='The default exchange under which topics are scoped. May '
|
||||||
@ -232,7 +232,7 @@ class TransportURL(object):
|
|||||||
|
|
||||||
:param conf: a ConfigOpts instance
|
:param conf: a ConfigOpts instance
|
||||||
:type conf: oslo.config.cfg.ConfigOpts
|
:type conf: oslo.config.cfg.ConfigOpts
|
||||||
:param transport: a transport name for example 'rabbit' or 'qpid'
|
:param transport: a transport name for example 'rabbit'
|
||||||
:type transport: str
|
:type transport: str
|
||||||
:param virtual_host: a virtual host path for example '/'
|
:param virtual_host: a virtual host path for example '/'
|
||||||
:type virtual_host: str
|
:type virtual_host: str
|
||||||
|
@ -26,7 +26,6 @@ console_scripts =
|
|||||||
|
|
||||||
oslo.messaging.drivers =
|
oslo.messaging.drivers =
|
||||||
rabbit = oslo_messaging._drivers.impl_rabbit:RabbitDriver
|
rabbit = oslo_messaging._drivers.impl_rabbit:RabbitDriver
|
||||||
qpid = oslo_messaging._drivers.impl_qpid:QpidDriver
|
|
||||||
zmq = oslo_messaging._drivers.impl_zmq:ZmqDriver
|
zmq = oslo_messaging._drivers.impl_zmq:ZmqDriver
|
||||||
amqp = oslo_messaging._drivers.protocols.amqp.driver:ProtonDriver
|
amqp = oslo_messaging._drivers.protocols.amqp.driver:ProtonDriver
|
||||||
|
|
||||||
|
@ -15,9 +15,6 @@ testscenarios>=0.4
|
|||||||
testtools>=1.4.0
|
testtools>=1.4.0
|
||||||
oslotest>=1.10.0 # Apache-2.0
|
oslotest>=1.10.0 # Apache-2.0
|
||||||
|
|
||||||
# for test_qpid
|
|
||||||
qpid-python;python_version=='2.7'
|
|
||||||
|
|
||||||
# for test_matchmaker_redis
|
# for test_matchmaker_redis
|
||||||
redis>=2.10.0
|
redis>=2.10.0
|
||||||
|
|
||||||
|
4
tox.ini
4
tox.ini
@ -23,10 +23,6 @@ commands = {posargs}
|
|||||||
[testenv:docs]
|
[testenv:docs]
|
||||||
commands = python setup.py build_sphinx
|
commands = python setup.py build_sphinx
|
||||||
|
|
||||||
[testenv:py27-func-qpid]
|
|
||||||
setenv = TRANSPORT_URL=qpid://stackqpid:secretqpid@127.0.0.1:65123//
|
|
||||||
commands = {toxinidir}/setup-test-env-qpid.sh 0-10 python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
|
|
||||||
|
|
||||||
[testenv:py27-func-rabbit]
|
[testenv:py27-func-rabbit]
|
||||||
commands = {toxinidir}/setup-test-env-rabbit.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
|
commands = {toxinidir}/setup-test-env-rabbit.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user