Implement blueprint add-qpid-support.

This patch implements a new notification strategy that sends
notifications to a message queue via Qpid.

Change-Id: Ie2640139dcf735e428af63e9e87b3667a2b22eb5
This commit is contained in:
Russell Bryant 2012-02-08 12:24:07 -05:00
parent c125a3744f
commit 2fa1ed0da1
6 changed files with 378 additions and 3 deletions

View File

@ -642,7 +642,7 @@ a RabbitMQ queue. The configuration options are specified in the
Optional. Default: ``noop`` Optional. Default: ``noop``
Sets the strategy used for notifications. Options are ``logging``, Sets the strategy used for notifications. Options are ``logging``,
``rabbit`` and ``noop``. ``rabbit``, ``qpid`` and ``noop``.
For more information :doc:`Glance notifications <notifications>` For more information :doc:`Glance notifications <notifications>`
* ``rabbit_host`` * ``rabbit_host``
@ -714,6 +714,122 @@ Optional. Default: ``30``
Maximum seconds to wait before reconnecting on failures when using Maximum seconds to wait before reconnecting on failures when using
``rabbit`` strategy. ``rabbit`` strategy.
* ``qpid_notification_exchange``
Optional. Default: ``glance``
Message exchange to use when using the ``qpid`` notification strategy.
* ``qpid_notification_topic``
Optional. Default: ``glanice_notifications``
This is the topic prefix for notifications when using the ``qpid``
notification strategy. When a notification is sent at the ``info`` priority,
the topic will be ``glance_notifications.info``. The same idea applies for
the ``error`` and ``warn`` notification priorities. To receive all
notifications, you would set up a receiver with a topic of
``glance_notifications.*``.
* ``qpid_host``
Optional. Default: ``localhost``
This is the hostname or IP address of the Qpid broker that will be used
when Glance has been configured to use the ``qpid`` notification strategy.
* ``qpid_port``
Optional. Default: ``5672``
This is the port number to connect to on the Qpid broker, ``qpid_host``,
when using the ``qpid`` notification strategy.
* ``qpid_username``
Optional. Default: None
This is the username that Glance will use to authenticate with the Qpid
broker if using the ``qpid`` notification strategy.
* ``qpid_password``
Optional. Default: None
This is the username that Glance will use to authenticate with the Qpid
broker if using the ``qpid`` notification strategy.
* ``qpid_sasl_mechanisms``
Optional. Default: None
This is a space separated list of SASL mechanisms to use for authentication
with the Qpid broker if using the ``qpid`` notification strategy.
* ``qpid_reconnect_timeout``
Optional. Default: None
This option specifies a timeout in seconds for automatic reconnect attempts
to the Qpid broker if the ``qpid`` notification strategy is used. In general,
it is safe to leave all of the reconnect timing options not set. In that case,
the Qpid client's default behavior will be used, which is to attempt to
reconnect to the broker at exponential back-off intervals (in 1 second, then 2
seconds, then 4, 8, 16, etc).
* ``qpid_reconnect_limit``
Optional. Default: None
This option specifies a maximum number of reconnect attempts to the Qpid
broker if the ``qpid`` notification strategy is being used. Normally the
Qpid client will continue attempting to reconnect until successful.
* ``qpid_reconnect_interval_min``
Optional. Default: None
This option specifies the minimum number of seconds between reconnection
attempts if the ``qpid`` notification strategy is being used.
* ``qpid_reconnect_interval_max``
Optional. Default: None
This option specifies the maximum number of seconds between reconnection
attempts if the ``qpid`` notification strategy is being used.
* ``qpid_reconnect_interval``
This option specifies the exact number of seconds between reconnection
attempts if the ``qpid`` notification strategy is being used. Setting
this option is equivalent to setting ``qpid_reconnect_interval_max`` and
``qpid_reconnect_interval_min`` to the same value.
* ``qpid_heartbeat``
Optional. Default: ``5``
This option is used to specify the number of seconds between heartbeat messages
exchanged between the Qpid client and Qpid broker if the ``qpid`` notification
strategy is being used. Heartbeats are used to more quickly detect that a
connection has been lost.
* ``qpid_protocol``
Optional. Default: ``tcp``
This option is used to specify the transport protocol to use if using the
``qpid`` notification strategy. To enable SSL, set this option to ``ssl``.
* ``qpid_tcp_nodelay``
Optional. Default: ``True``
This option can be used to disable the TCP NODELAY option. It effectively
disables the Nagle algorithm for the connection to the Qpid broker. This
option only applies if the ``qpid`` notification strategy is used.
Configuring Access Policies Configuring Access Policies
--------------------------- ---------------------------

View File

@ -34,6 +34,11 @@ Strategies
This strategy sends notifications to a rabbitmq queue. This can then This strategy sends notifications to a rabbitmq queue. This can then
be processed by other services or applications. be processed by other services or applications.
* qpid
This strategy is similar to rabbit. It sends notifications to an AMQP
message queue via Qpid.
* noop * noop
This strategy produces no notifications. It is the default strategy. This strategy produces no notifications. It is the default strategy.

View File

@ -85,8 +85,8 @@ registry_client_protocol = http
# Notifications can be sent when images are create, updated or deleted. # Notifications can be sent when images are create, updated or deleted.
# There are three methods of sending notifications, logging (via the # There are three methods of sending notifications, logging (via the
# log_file directive), rabbit (via a rabbitmq queue) or noop (no # log_file directive), rabbit (via a rabbitmq queue), qpid (via a Qpid
# notifications sent, the default) # message queue), or noop (no notifications sent, the default)
notifier_strategy = noop notifier_strategy = noop
# Configuration options if sending notifications via rabbitmq (these are # Configuration options if sending notifications via rabbitmq (these are
@ -100,6 +100,25 @@ rabbit_virtual_host = /
rabbit_notification_exchange = glance rabbit_notification_exchange = glance
rabbit_notification_topic = glance_notifications rabbit_notification_topic = glance_notifications
# Configuration options if sending notifications via Qpid (these are
# the defaults)
qpid_notification_exchange = glance
qpid_notification_topic = glance_notifications
qpid_host = localhost
qpid_port = 5672
qpid_username =
qpid_password =
qpid_sasl_mechanisms =
qpid_reconnect_timeout = 0
qpid_reconnect_limit = 0
qpid_reconnect_interval_min = 0
qpid_reconnect_interval_max = 0
qpid_reconnect_interval = 0
qpid_heartbeat = 5
# Set to 'ssl' to enable SSL
qpid_protocol = tcp
qpid_tcp_nodelay = True
# ============ Filesystem Store Options ======================== # ============ Filesystem Store Options ========================
# Directory that the Filesystem backend store # Directory that the Filesystem backend store

View File

@ -28,6 +28,7 @@ from glance.common import utils
_STRATEGIES = { _STRATEGIES = {
"logging": "glance.notifier.notify_log.LoggingStrategy", "logging": "glance.notifier.notify_log.LoggingStrategy",
"rabbit": "glance.notifier.notify_kombu.RabbitStrategy", "rabbit": "glance.notifier.notify_kombu.RabbitStrategy",
"qpid" : "glance.notifier.notify_qpid.QpidStrategy",
"noop": "glance.notifier.notify_noop.NoopStrategy", "noop": "glance.notifier.notify_noop.NoopStrategy",
"default": "glance.notifier.notify_noop.NoopStrategy", "default": "glance.notifier.notify_noop.NoopStrategy",
} }

View File

@ -0,0 +1,144 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import qpid.messaging
from glance.common import cfg
from glance.notifier import strategy
logger = logging.getLogger('glance.notifier.notify_qpid')
qpid_opts = [
cfg.StrOpt('qpid_notification_exchange',
default='glance',
help='Qpid exchange for notifications'),
cfg.StrOpt('qpid_notification_topic',
default='glance_notifications',
help='Qpid topic for notifications'),
cfg.StrOpt('qpid_hostname',
default='localhost',
help='Qpid broker hostname'),
cfg.StrOpt('qpid_port',
default='5672',
help='Qpid broker port'),
cfg.StrOpt('qpid_username',
default='',
help='Username for qpid connection'),
cfg.StrOpt('qpid_password',
default='',
help='Password for qpid connection'),
cfg.StrOpt('qpid_sasl_mechanisms',
default='',
help='Space separated list of SASL mechanisms to use for auth'),
cfg.IntOpt('qpid_reconnect_timeout',
default=0,
help='Reconnection timeout in seconds'),
cfg.IntOpt('qpid_reconnect_limit',
default=0,
help='Max reconnections before giving up'),
cfg.IntOpt('qpid_reconnect_interval_min',
default=0,
help='Minimum seconds between reconnection attempts'),
cfg.IntOpt('qpid_reconnect_interval_max',
default=0,
help='Maximum seconds between reconnection attempts'),
cfg.IntOpt('qpid_reconnect_interval',
default=0,
help='Equivalent to setting max and min to the same value'),
cfg.IntOpt('qpid_heartbeat',
default=5,
help='Seconds between connection keepalive heartbeats'),
cfg.StrOpt('qpid_protocol',
default='tcp',
help="Transport to use, either 'tcp' or 'ssl'"),
cfg.BoolOpt('qpid_tcp_nodelay',
default=True,
help='Disable Nagle algorithm'),
]
class QpidStrategy(strategy.Strategy):
"""A notifier that puts a message on a queue when called."""
def __init__(self, conf):
"""Initialize the Qpid notification strategy."""
self.conf = conf
self.conf.register_opts(qpid_opts)
self.broker = self.conf.qpid_hostname + ":" + self.conf.qpid_port
self.connection = qpid.messaging.Connection(self.broker)
self.connection.username = self.conf.qpid_username
self.connection.password = self.conf.qpid_password
self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
# Hard code this option as enabled so that reconnect logic isn't needed
# in this file at all.
self.connection.reconnect = True
if self.conf.qpid_reconnect_timeout:
self.connection.reconnect_timeout = (
self.conf.qpid_reconnect_timeout)
if self.conf.qpid_reconnect_limit:
self.connection.reconnect_limit = self.conf.qpid_reconnect_limit
if self.conf.qpid_reconnect_interval_max:
self.connection.reconnect_interval_max = (
self.conf.qpid_reconnect_interval_max)
if self.conf.qpid_reconnect_interval_min:
self.connection.reconnect_interval_min = (
self.conf.qpid_reconnect_interval_min)
if self.conf.qpid_reconnect_interval:
self.connection.reconnect_interval = (
self.conf.qpid_reconnect_interval)
self.connection.hearbeat = self.conf.qpid_heartbeat
self.connection.protocol = self.conf.qpid_protocol
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
self.connection.open()
self.session = self.connection.session()
logger.info(_('Connected to AMQP server on %s') % self.broker)
self.sender_info = self._sender("info")
self.sender_warn = self._sender("warn")
self.sender_error = self._sender("error")
def _sender(self, priority):
addr_opts = {
"create": "always",
"node": {
"type": "topic",
"x-declare": {
"durable": False,
# auto-delete isn't implemented for exchanges in qpid,
# but put in here anyway
"auto-delete": True,
},
},
}
topic = "%s.%s" % (self.conf.qpid_notification_topic, priority)
address = "%s/%s ; %s" % (self.conf.qpid_notification_exchange, topic,
json.dumps(addr_opts))
return self.session.sender(address)
def warn(self, msg):
self.sender_warn.send(msg)
def info(self, msg):
self.sender_info.send(msg)
def error(self, msg):
self.sender_error.send(msg)

View File

@ -18,6 +18,13 @@
import logging import logging
import unittest import unittest
import mox
try:
import qpid
import qpid.messaging
except ImportError:
qpid = None
from glance.common import exception from glance.common import exception
from glance.common import utils as common_utils from glance.common import utils as common_utils
from glance import notifier from glance import notifier
@ -261,3 +268,86 @@ class TestRabbitNotifier(unittest.TestCase):
self.assertEquals("ERROR", self.called["message"]["priority"]) self.assertEquals("ERROR", self.called["message"]["priority"])
self.assertEquals(info['send_called'], 2) self.assertEquals(info['send_called'], 2)
self.assertEquals(info['conn_called'], 2) self.assertEquals(info['conn_called'], 2)
class TestQpidNotifier(unittest.TestCase):
"""Test Qpid notifier."""
def setUp(self):
if not qpid:
return
self.mocker = mox.Mox()
self.mock_connection = None
self.mock_session = None
self.mock_sender = None
self.mock_receiver = None
self.orig_connection = qpid.messaging.Connection
self.orig_session = qpid.messaging.Session
self.orig_sender = qpid.messaging.Sender
self.orig_receiver = qpid.messaging.Receiver
qpid.messaging.Connection = lambda *_x, **_y: self.mock_connection
qpid.messaging.Session = lambda *_x, **_y: self.mock_session
qpid.messaging.Sender = lambda *_x, **_y: self.mock_sender
qpid.messaging.Receiver = lambda *_x, **_y: self.mock_receiver
self.notify_qpid = common_utils.import_object(
"glance.notifier.notify_qpid")
super(TestQpidNotifier, self).setUp()
def tearDown(self):
if not qpid:
return
qpid.messaging.Connection = self.orig_connection
qpid.messaging.Session = self.orig_session
qpid.messaging.Sender = self.orig_sender
qpid.messaging.Receiver = self.orig_receiver
self.mocker.ResetAll()
super(TestQpidNotifier, self).tearDown()
def _test_notify(self, priority):
self.mock_connection = self.mocker.CreateMock(self.orig_connection)
self.mock_session = self.mocker.CreateMock(self.orig_session)
self.mock_sender = self.mocker.CreateMock(self.orig_sender)
self.mock_connection.username = ""
self.mock_connection.open()
self.mock_connection.session().AndReturn(self.mock_session)
for p in ["info", "warn", "error"]:
expected_address = ('glance/glance_notifications.%s ; {"node": '
'{"x-declare": {"auto-delete": true, "durable": false}, '
'"type": "topic"}, "create": "always"}' % p)
self.mock_session.sender(expected_address).AndReturn(
self.mock_sender)
self.mock_sender.send('stuff')
self.mocker.ReplayAll()
conf = utils.TestConfigOpts({"notifier_strategy": "qpid"})
notifier = self.notify_qpid.QpidStrategy(conf)
if priority == "info":
notifier.info("stuff")
elif priority == "warn":
notifier.warn("stuff")
elif priority == "error":
notifier.error("stuff")
self.mocker.VerifyAll()
@utils.skip_if(qpid is None, "qpid not installed")
def test_info(self):
self._test_notify('info')
@utils.skip_if(qpid is None, "qpid not installed")
def test_warn(self):
self._test_notify('warn')
@utils.skip_if(qpid is None, "qpid not installed")
def test_error(self):
self._test_notify('error')