diff --git a/doc/source/configuring.rst b/doc/source/configuring.rst index 99b96aa439..1c9b011ba5 100644 --- a/doc/source/configuring.rst +++ b/doc/source/configuring.rst @@ -642,7 +642,7 @@ a RabbitMQ queue. The configuration options are specified in the Optional. Default: ``noop`` Sets the strategy used for notifications. Options are ``logging``, -``rabbit`` and ``noop``. +``rabbit``, ``qpid`` and ``noop``. For more information :doc:`Glance notifications ` * ``rabbit_host`` @@ -714,6 +714,122 @@ Optional. Default: ``30`` Maximum seconds to wait before reconnecting on failures when using ``rabbit`` strategy. +* ``qpid_notification_exchange`` + +Optional. Default: ``glance`` + +Message exchange to use when using the ``qpid`` notification strategy. + +* ``qpid_notification_topic`` + +Optional. Default: ``glanice_notifications`` + +This is the topic prefix for notifications when using the ``qpid`` +notification strategy. When a notification is sent at the ``info`` priority, +the topic will be ``glance_notifications.info``. The same idea applies for +the ``error`` and ``warn`` notification priorities. To receive all +notifications, you would set up a receiver with a topic of +``glance_notifications.*``. + +* ``qpid_host`` + +Optional. Default: ``localhost`` + +This is the hostname or IP address of the Qpid broker that will be used +when Glance has been configured to use the ``qpid`` notification strategy. + +* ``qpid_port`` + +Optional. Default: ``5672`` + +This is the port number to connect to on the Qpid broker, ``qpid_host``, +when using the ``qpid`` notification strategy. + +* ``qpid_username`` + +Optional. Default: None + +This is the username that Glance will use to authenticate with the Qpid +broker if using the ``qpid`` notification strategy. + +* ``qpid_password`` + +Optional. Default: None + +This is the username that Glance will use to authenticate with the Qpid +broker if using the ``qpid`` notification strategy. + +* ``qpid_sasl_mechanisms`` + +Optional. Default: None + +This is a space separated list of SASL mechanisms to use for authentication +with the Qpid broker if using the ``qpid`` notification strategy. + +* ``qpid_reconnect_timeout`` + +Optional. Default: None + +This option specifies a timeout in seconds for automatic reconnect attempts +to the Qpid broker if the ``qpid`` notification strategy is used. In general, +it is safe to leave all of the reconnect timing options not set. In that case, +the Qpid client's default behavior will be used, which is to attempt to +reconnect to the broker at exponential back-off intervals (in 1 second, then 2 +seconds, then 4, 8, 16, etc). + +* ``qpid_reconnect_limit`` + +Optional. Default: None + +This option specifies a maximum number of reconnect attempts to the Qpid +broker if the ``qpid`` notification strategy is being used. Normally the +Qpid client will continue attempting to reconnect until successful. + +* ``qpid_reconnect_interval_min`` + +Optional. Default: None + +This option specifies the minimum number of seconds between reconnection +attempts if the ``qpid`` notification strategy is being used. + +* ``qpid_reconnect_interval_max`` + +Optional. Default: None + +This option specifies the maximum number of seconds between reconnection +attempts if the ``qpid`` notification strategy is being used. + +* ``qpid_reconnect_interval`` + +This option specifies the exact number of seconds between reconnection +attempts if the ``qpid`` notification strategy is being used. Setting +this option is equivalent to setting ``qpid_reconnect_interval_max`` and +``qpid_reconnect_interval_min`` to the same value. + +* ``qpid_heartbeat`` + +Optional. Default: ``5`` + +This option is used to specify the number of seconds between heartbeat messages +exchanged between the Qpid client and Qpid broker if the ``qpid`` notification +strategy is being used. Heartbeats are used to more quickly detect that a +connection has been lost. + +* ``qpid_protocol`` + +Optional. Default: ``tcp`` + +This option is used to specify the transport protocol to use if using the +``qpid`` notification strategy. To enable SSL, set this option to ``ssl``. + +* ``qpid_tcp_nodelay`` + +Optional. Default: ``True`` + +This option can be used to disable the TCP NODELAY option. It effectively +disables the Nagle algorithm for the connection to the Qpid broker. This +option only applies if the ``qpid`` notification strategy is used. + Configuring Access Policies --------------------------- diff --git a/doc/source/notifications.rst b/doc/source/notifications.rst index beab3f0136..0e6d44b494 100644 --- a/doc/source/notifications.rst +++ b/doc/source/notifications.rst @@ -34,6 +34,11 @@ Strategies This strategy sends notifications to a rabbitmq queue. This can then be processed by other services or applications. +* qpid + + This strategy is similar to rabbit. It sends notifications to an AMQP + message queue via Qpid. + * noop This strategy produces no notifications. It is the default strategy. diff --git a/etc/glance-api.conf b/etc/glance-api.conf index a339fe96d3..f516788893 100644 --- a/etc/glance-api.conf +++ b/etc/glance-api.conf @@ -85,8 +85,8 @@ registry_client_protocol = http # Notifications can be sent when images are create, updated or deleted. # There are three methods of sending notifications, logging (via the -# log_file directive), rabbit (via a rabbitmq queue) or noop (no -# notifications sent, the default) +# log_file directive), rabbit (via a rabbitmq queue), qpid (via a Qpid +# message queue), or noop (no notifications sent, the default) notifier_strategy = noop # Configuration options if sending notifications via rabbitmq (these are @@ -100,6 +100,25 @@ rabbit_virtual_host = / rabbit_notification_exchange = glance rabbit_notification_topic = glance_notifications +# Configuration options if sending notifications via Qpid (these are +# the defaults) +qpid_notification_exchange = glance +qpid_notification_topic = glance_notifications +qpid_host = localhost +qpid_port = 5672 +qpid_username = +qpid_password = +qpid_sasl_mechanisms = +qpid_reconnect_timeout = 0 +qpid_reconnect_limit = 0 +qpid_reconnect_interval_min = 0 +qpid_reconnect_interval_max = 0 +qpid_reconnect_interval = 0 +qpid_heartbeat = 5 +# Set to 'ssl' to enable SSL +qpid_protocol = tcp +qpid_tcp_nodelay = True + # ============ Filesystem Store Options ======================== # Directory that the Filesystem backend store diff --git a/glance/notifier/__init__.py b/glance/notifier/__init__.py index 707452ccd8..777fde23c3 100644 --- a/glance/notifier/__init__.py +++ b/glance/notifier/__init__.py @@ -28,6 +28,7 @@ from glance.common import utils _STRATEGIES = { "logging": "glance.notifier.notify_log.LoggingStrategy", "rabbit": "glance.notifier.notify_kombu.RabbitStrategy", + "qpid" : "glance.notifier.notify_qpid.QpidStrategy", "noop": "glance.notifier.notify_noop.NoopStrategy", "default": "glance.notifier.notify_noop.NoopStrategy", } diff --git a/glance/notifier/notify_qpid.py b/glance/notifier/notify_qpid.py new file mode 100644 index 0000000000..e34db8f445 --- /dev/null +++ b/glance/notifier/notify_qpid.py @@ -0,0 +1,144 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012, Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import json +import logging + +import qpid.messaging + +from glance.common import cfg +from glance.notifier import strategy + + +logger = logging.getLogger('glance.notifier.notify_qpid') + + +qpid_opts = [ + cfg.StrOpt('qpid_notification_exchange', + default='glance', + help='Qpid exchange for notifications'), + cfg.StrOpt('qpid_notification_topic', + default='glance_notifications', + help='Qpid topic for notifications'), + cfg.StrOpt('qpid_hostname', + default='localhost', + help='Qpid broker hostname'), + cfg.StrOpt('qpid_port', + default='5672', + help='Qpid broker port'), + cfg.StrOpt('qpid_username', + default='', + help='Username for qpid connection'), + cfg.StrOpt('qpid_password', + default='', + help='Password for qpid connection'), + cfg.StrOpt('qpid_sasl_mechanisms', + default='', + help='Space separated list of SASL mechanisms to use for auth'), + cfg.IntOpt('qpid_reconnect_timeout', + default=0, + help='Reconnection timeout in seconds'), + cfg.IntOpt('qpid_reconnect_limit', + default=0, + help='Max reconnections before giving up'), + cfg.IntOpt('qpid_reconnect_interval_min', + default=0, + help='Minimum seconds between reconnection attempts'), + cfg.IntOpt('qpid_reconnect_interval_max', + default=0, + help='Maximum seconds between reconnection attempts'), + cfg.IntOpt('qpid_reconnect_interval', + default=0, + help='Equivalent to setting max and min to the same value'), + cfg.IntOpt('qpid_heartbeat', + default=5, + help='Seconds between connection keepalive heartbeats'), + cfg.StrOpt('qpid_protocol', + default='tcp', + help="Transport to use, either 'tcp' or 'ssl'"), + cfg.BoolOpt('qpid_tcp_nodelay', + default=True, + help='Disable Nagle algorithm'), + ] + + +class QpidStrategy(strategy.Strategy): + """A notifier that puts a message on a queue when called.""" + + def __init__(self, conf): + """Initialize the Qpid notification strategy.""" + self.conf = conf + self.conf.register_opts(qpid_opts) + + self.broker = self.conf.qpid_hostname + ":" + self.conf.qpid_port + self.connection = qpid.messaging.Connection(self.broker) + self.connection.username = self.conf.qpid_username + self.connection.password = self.conf.qpid_password + self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms + # Hard code this option as enabled so that reconnect logic isn't needed + # in this file at all. + self.connection.reconnect = True + if self.conf.qpid_reconnect_timeout: + self.connection.reconnect_timeout = ( + self.conf.qpid_reconnect_timeout) + if self.conf.qpid_reconnect_limit: + self.connection.reconnect_limit = self.conf.qpid_reconnect_limit + if self.conf.qpid_reconnect_interval_max: + self.connection.reconnect_interval_max = ( + self.conf.qpid_reconnect_interval_max) + if self.conf.qpid_reconnect_interval_min: + self.connection.reconnect_interval_min = ( + self.conf.qpid_reconnect_interval_min) + if self.conf.qpid_reconnect_interval: + self.connection.reconnect_interval = ( + self.conf.qpid_reconnect_interval) + self.connection.hearbeat = self.conf.qpid_heartbeat + self.connection.protocol = self.conf.qpid_protocol + self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay + self.connection.open() + self.session = self.connection.session() + logger.info(_('Connected to AMQP server on %s') % self.broker) + + self.sender_info = self._sender("info") + self.sender_warn = self._sender("warn") + self.sender_error = self._sender("error") + + def _sender(self, priority): + addr_opts = { + "create": "always", + "node": { + "type": "topic", + "x-declare": { + "durable": False, + # auto-delete isn't implemented for exchanges in qpid, + # but put in here anyway + "auto-delete": True, + }, + }, + } + topic = "%s.%s" % (self.conf.qpid_notification_topic, priority) + address = "%s/%s ; %s" % (self.conf.qpid_notification_exchange, topic, + json.dumps(addr_opts)) + return self.session.sender(address) + + def warn(self, msg): + self.sender_warn.send(msg) + + def info(self, msg): + self.sender_info.send(msg) + + def error(self, msg): + self.sender_error.send(msg) diff --git a/glance/tests/unit/test_notifier.py b/glance/tests/unit/test_notifier.py index 8769abc1c3..7be571c2a8 100644 --- a/glance/tests/unit/test_notifier.py +++ b/glance/tests/unit/test_notifier.py @@ -18,6 +18,13 @@ import logging import unittest +import mox +try: + import qpid + import qpid.messaging +except ImportError: + qpid = None + from glance.common import exception from glance.common import utils as common_utils from glance import notifier @@ -261,3 +268,86 @@ class TestRabbitNotifier(unittest.TestCase): self.assertEquals("ERROR", self.called["message"]["priority"]) self.assertEquals(info['send_called'], 2) self.assertEquals(info['conn_called'], 2) + + +class TestQpidNotifier(unittest.TestCase): + """Test Qpid notifier.""" + + def setUp(self): + if not qpid: + return + + self.mocker = mox.Mox() + + self.mock_connection = None + self.mock_session = None + self.mock_sender = None + self.mock_receiver = None + + self.orig_connection = qpid.messaging.Connection + self.orig_session = qpid.messaging.Session + self.orig_sender = qpid.messaging.Sender + self.orig_receiver = qpid.messaging.Receiver + qpid.messaging.Connection = lambda *_x, **_y: self.mock_connection + qpid.messaging.Session = lambda *_x, **_y: self.mock_session + qpid.messaging.Sender = lambda *_x, **_y: self.mock_sender + qpid.messaging.Receiver = lambda *_x, **_y: self.mock_receiver + + self.notify_qpid = common_utils.import_object( + "glance.notifier.notify_qpid") + + super(TestQpidNotifier, self).setUp() + + def tearDown(self): + if not qpid: + return + + qpid.messaging.Connection = self.orig_connection + qpid.messaging.Session = self.orig_session + qpid.messaging.Sender = self.orig_sender + qpid.messaging.Receiver = self.orig_receiver + + self.mocker.ResetAll() + + super(TestQpidNotifier, self).tearDown() + + def _test_notify(self, priority): + self.mock_connection = self.mocker.CreateMock(self.orig_connection) + self.mock_session = self.mocker.CreateMock(self.orig_session) + self.mock_sender = self.mocker.CreateMock(self.orig_sender) + + self.mock_connection.username = "" + self.mock_connection.open() + self.mock_connection.session().AndReturn(self.mock_session) + for p in ["info", "warn", "error"]: + expected_address = ('glance/glance_notifications.%s ; {"node": ' + '{"x-declare": {"auto-delete": true, "durable": false}, ' + '"type": "topic"}, "create": "always"}' % p) + self.mock_session.sender(expected_address).AndReturn( + self.mock_sender) + self.mock_sender.send('stuff') + + self.mocker.ReplayAll() + + conf = utils.TestConfigOpts({"notifier_strategy": "qpid"}) + notifier = self.notify_qpid.QpidStrategy(conf) + if priority == "info": + notifier.info("stuff") + elif priority == "warn": + notifier.warn("stuff") + elif priority == "error": + notifier.error("stuff") + + self.mocker.VerifyAll() + + @utils.skip_if(qpid is None, "qpid not installed") + def test_info(self): + self._test_notify('info') + + @utils.skip_if(qpid is None, "qpid not installed") + def test_warn(self): + self._test_notify('warn') + + @utils.skip_if(qpid is None, "qpid not installed") + def test_error(self): + self._test_notify('error')