diff --git a/etc/glance-api.conf b/etc/glance-api.conf index 9df88dc342..974e836319 100644 --- a/etc/glance-api.conf +++ b/etc/glance-api.conf @@ -210,7 +210,14 @@ registry_client_protocol = http # There are three methods of sending notifications, logging (via the # log_file directive), rabbit (via a rabbitmq queue), qpid (via a Qpid # message queue), or noop (no notifications sent, the default) -notifier_strategy = noop +# NOTE: THIS CONFIGURATION OPTION HAS BEEN DEPRECATED IN FAVOR OF `notification_driver` +# notifier_strategy = default + +# Driver or drivers to handle sending notifications +# notification_driver = noop + +# Default publisher_id for outgoing notifications. +# default_publisher_id = image.localhost # Configuration options if sending notifications via rabbitmq (these are # the defaults) diff --git a/glance/common/exception.py b/glance/common/exception.py index 0d32819084..752c791ce3 100644 --- a/glance/common/exception.py +++ b/glance/common/exception.py @@ -241,10 +241,6 @@ class StoreAddDisabled(GlanceException): "store is disabled.") -class InvalidNotifierStrategy(GlanceException): - message = _("'%(strategy)s' is not an available notifier strategy.") - - class MaxRedirectsExceeded(GlanceException): message = _("Maximum redirects (%(redirects)s) was exceeded.") diff --git a/glance/notifier/__init__.py b/glance/notifier.py similarity index 82% rename from glance/notifier/__init__.py rename to glance/notifier.py index 126a5c461d..7c68619e18 100644 --- a/glance/notifier/__init__.py +++ b/glance/notifier.py @@ -17,16 +17,14 @@ # under the License. -import socket -import uuid +import warnings from oslo.config import cfg +from oslo import messaging import webob from glance.common import exception -import glance.domain import glance.domain.proxy -from glance.openstack.common import importutils import glance.openstack.common.log as logging from glance.openstack.common import timeutils @@ -37,7 +35,10 @@ notifier_opts = [ 'notifications, logging (via the log_file directive), ' 'rabbit (via a rabbitmq queue), qpid (via a Qpid ' 'message queue), or noop (no notifications sent, the ' - 'default).')) + 'default). (DEPRECATED)')), + + cfg.StrOpt('default_publisher_id', default="image.localhost", + help='Default publisher_id for outgoing notifications'), ] CONF = cfg.CONF @@ -46,11 +47,11 @@ CONF.register_opts(notifier_opts) LOG = logging.getLogger(__name__) _STRATEGY_ALIASES = { - "logging": "glance.notifier.notify_log.LoggingStrategy", - "rabbit": "glance.notifier.notify_kombu.RabbitStrategy", - "qpid": "glance.notifier.notify_qpid.QpidStrategy", - "noop": "glance.notifier.notify_noop.NoopStrategy", - "default": "glance.notifier.notify_noop.NoopStrategy", + "logging": "log", + "rabbit": "messaging", + "qpid": "messaging", + "noop": "noop", + "default": "noop", } @@ -58,44 +59,42 @@ class Notifier(object): """Uses a notification strategy to send out messages about events.""" def __init__(self, strategy=None): - _strategy = CONF.notifier_strategy - try: - strategy = _STRATEGY_ALIASES[_strategy] - msg = _('Converted strategy alias %s to %s') - LOG.debug(msg % (_strategy, strategy)) - except KeyError: - strategy = _strategy - LOG.debug(_('No strategy alias found for %s') % strategy) - try: - strategy_class = importutils.import_class(strategy) - except ImportError: - raise exception.InvalidNotifierStrategy(strategy=strategy) - else: - self.strategy = strategy_class() + if CONF.notifier_strategy != 'default': + msg = _("notifier_strategy was deprecated in " + "favor of `notification_driver`") + warnings.warn(msg, DeprecationWarning) - @staticmethod - def generate_message(event_type, priority, payload): - return { - "message_id": str(uuid.uuid4()), - "publisher_id": socket.gethostname(), - "event_type": event_type, - "priority": priority, - "payload": payload, - "timestamp": str(timeutils.utcnow()), - } + # NOTE(flaper87): Use this to keep backwards + # compatibility. We'll try to get an oslo.messaging + # driver from the specified strategy. + _strategy = strategy or CONF.notifier_strategy + _driver = _STRATEGY_ALIASES.get(_strategy) + + # NOTE(flaper87): The next 3 lines help + # with the migration to oslo.messaging. + # Without them, gate tests won't know + # what driver should be loaded. + # Once this patch lands, devstack will be + # updated and then these lines will be removed. + url = None + if _strategy in ['rabbit', 'qpid']: + url = _strategy + '://' + + publisher_id = CONF.default_publisher_id + self._transport = messaging.get_transport(CONF, url) + self._notifier = messaging.Notifier(self._transport, + driver=_driver, + publisher_id=publisher_id) def warn(self, event_type, payload): - msg = self.generate_message(event_type, "WARN", payload) - self.strategy.warn(msg) + self._notifier.warn({}, event_type, payload) def info(self, event_type, payload): - msg = self.generate_message(event_type, "INFO", payload) - self.strategy.info(msg) + self._notifier.info({}, event_type, payload) def error(self, event_type, payload): - msg = self.generate_message(event_type, "ERROR", payload) - self.strategy.error(msg) + self._notifier.error({}, event_type, payload) def format_image_notification(image): @@ -156,11 +155,13 @@ class ImageRepoProxy(glance.domain.proxy.Repo): def save(self, image): super(ImageRepoProxy, self).save(image) - self.notifier.info('image.update', format_image_notification(image)) + self.notifier.info('image.update', + format_image_notification(image)) def add(self, image): super(ImageRepoProxy, self).add(image) - self.notifier.info('image.create', format_image_notification(image)) + self.notifier.info('image.create', + format_image_notification(image)) def remove(self, image): super(ImageRepoProxy, self).remove(image) @@ -207,7 +208,8 @@ class ImageProxy(glance.domain.proxy.Image): notify = self.notifier.info try: - notify('image.send', self._format_image_send(sent)) + notify('image.send', + self._format_image_send(sent)) except Exception as err: msg = (_("An error occurred during image.send" " notification: %(err)s") % {'err': err}) @@ -278,7 +280,8 @@ class TaskRepoProxy(glance.domain.proxy.Repo): item_proxy_kwargs=proxy_kwargs) def add(self, task): - self.notifier.info('task.create', format_task_notification(task)) + self.notifier.info('task.create', + format_task_notification(task)) return super(TaskRepoProxy, self).add(task) def remove(self, task): @@ -306,7 +309,8 @@ class TaskProxy(glance.domain.proxy.Task): super(TaskProxy, self).__init__(task) def run(self, executor): - self.notifier.info('task.run', format_task_notification(self.task)) + self.notifier.info('task.run', + format_task_notification(self.task)) return super(TaskProxy, self).run(executor) def begin_processing(self): diff --git a/glance/notifier/notify_kombu.py b/glance/notifier/notify_kombu.py deleted file mode 100644 index 4da88e2863..0000000000 --- a/glance/notifier/notify_kombu.py +++ /dev/null @@ -1,246 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2011, OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - - -import json -import time - -import kombu.connection -import kombu.entity -from oslo.config import cfg - -from glance.notifier import strategy -import glance.openstack.common.log as logging - -LOG = logging.getLogger(__name__) - -rabbit_opts = [ - cfg.StrOpt('rabbit_host', default='localhost', - help=_('The host name of the rabbitmq server')), - cfg.IntOpt('rabbit_port', default=5672, - help=_('The port on which the rabbitmq server is listening')), - cfg.BoolOpt('rabbit_use_ssl', default=False, - help=_('A boolean value indicating if the selected rabbitmq ' - 'server uses SSL.')), - cfg.StrOpt('rabbit_userid', default='guest', - help=_('The user ID for authentication with rabbitmq.')), - cfg.StrOpt('rabbit_password', default='guest', secret=True, - help=_('The password that will be used for authentication ' - 'with the rabbitmq server.')), - cfg.StrOpt('rabbit_virtual_host', default='/', - help=_('The virtual host used in the rabbitmq connection.')), - cfg.StrOpt('rabbit_notification_exchange', default='glance', - help=_('Exchange name to use for connection when using rabbit' - ' strategy.')), - cfg.StrOpt('rabbit_notification_topic', default='notifications', - help=_('Topic to use for connection when using rabbit ' - 'strategy.')), - cfg.IntOpt('rabbit_max_retries', default=0, - help=_('The maximum number of times to attempt to connect to ' - 'the AMQP server.')), - cfg.IntOpt('rabbit_retry_backoff', default=2, - help=_('This value multiplied by the number of connection ' - 'attempts gives the amount of time in seconds to sleep ' - 'between connection attempts to the AMQP server.')), - cfg.IntOpt('rabbit_retry_max_backoff', default=30, - help=_('The maximum amount of time to wait between connection ' - 'attempts. The delay time will be the smaller of this ' - 'value and the value of * ' - '.')), - cfg.BoolOpt('rabbit_durable_queues', default=False, - help='A boolean to determine if the queues used for messaging ' - 'should be retained after a restart.'), -] - -CONF = cfg.CONF -CONF.register_opts(rabbit_opts) - - -class KombuMaxRetriesReached(Exception): - pass - - -class RabbitStrategy(strategy.Strategy): - """A notifier that puts a message on a queue when called.""" - - def __init__(self): - """Initialize the rabbit notification strategy.""" - self.topic = CONF.rabbit_notification_topic - self.max_retries = CONF.rabbit_max_retries - # NOTE(comstud): When reading the config file, these values end - # up being strings, and we need them as ints. - self.retry_backoff = CONF.rabbit_retry_backoff - self.retry_max_backoff = CONF.rabbit_retry_max_backoff - - self.connection = None - self.retry_attempts = 0 - try: - self.reconnect() - except KombuMaxRetriesReached: - pass - - def _close(self): - """Close connection to rabbit.""" - try: - self.connection.close() - except self.connection_errors: - pass - self.connection = None - - def _connect(self): - """Connect to rabbit. Exceptions should be handled by the - caller. - """ - log_info = {} - log_info['hostname'] = CONF.rabbit_host - log_info['port'] = CONF.rabbit_port - if self.connection: - LOG.info(_("Reconnecting to AMQP server on " - "%(hostname)s:%(port)d") % log_info) - self._close() - else: - LOG.info(_("Connecting to AMQP server on " - "%(hostname)s:%(port)d") % log_info) - self.connection = kombu.connection.BrokerConnection( - hostname=CONF.rabbit_host, - port=CONF.rabbit_port, - userid=CONF.rabbit_userid, - password=CONF.rabbit_password, - virtual_host=CONF.rabbit_virtual_host, - ssl=CONF.rabbit_use_ssl) - self.connection_errors = self.connection.connection_errors - self.connection.connect() - self.channel = self.connection.channel() - self.exchange = kombu.entity.Exchange( - channel=self.channel, - type="topic", - durable=CONF.rabbit_durable_queues, - name=CONF.rabbit_notification_exchange) - - # NOTE(jerdfelt): Normally the consumer would create the queues, - # but we do this to ensure that messages don't get dropped if the - # consumer is started after we do - for priority in ["WARN", "INFO", "ERROR"]: - routing_key = "%s.%s" % (self.topic, priority.lower()) - queue = kombu.entity.Queue( - channel=self.channel, - exchange=self.exchange, - durable=CONF.rabbit_durable_queues, - name=routing_key, - routing_key=routing_key) - queue.declare() - LOG.info(_("Connected to AMQP server on " - "%(hostname)s:%(port)d") % log_info) - - def reconnect(self): - """Handles reconnecting and re-establishing queues.""" - while True: - self.retry_attempts += 1 - try: - self._connect() - return - except self.connection_errors as e: - pass - except Exception as e: - # NOTE(comstud): Unfortunately it's possible for amqplib - # to return an error not covered by its transport - # connection_errors in the case of a timeout waiting for - # a protocol response. (See paste link in LP888621 for - # nova.) So, we check all exceptions for 'timeout' in them - # and try to reconnect in this case. - if 'timeout' not in str(e): - raise - - log_info = {} - log_info['err_str'] = str(e) - log_info['max_retries'] = self.max_retries - log_info['hostname'] = CONF.rabbit_host - log_info['port'] = CONF.rabbit_port - - if self.retry_attempts >= self.max_retries: - LOG.exception(_('Unable to connect to AMQP server on ' - '%(hostname)s:%(port)d after %(max_retries)d ' - 'tries: %(err_str)s') % log_info) - if self.connection: - self._close() - raise KombuMaxRetriesReached - - sleep_time = self.retry_backoff * self.retry_attempts - if self.retry_max_backoff: - sleep_time = min(sleep_time, self.retry_max_backoff) - - log_info['sleep_time'] = sleep_time - LOG.exception(_('AMQP server on %(hostname)s:%(port)d is' - ' unreachable: %(err_str)s. Trying again in ' - '%(sleep_time)d seconds.') % log_info) - time.sleep(sleep_time) - - def log_failure(self, msg, priority): - """Fallback to logging when we can't send to rabbit.""" - message = _('Notification with priority %(priority)s failed: ' - 'msg=%(msg)s') - LOG.error(message % {'msg': msg, 'priority': priority}) - - def _send_message(self, msg, routing_key): - """Send a message. Caller needs to catch exceptions for retry.""" - msg = self.exchange.Message(json.dumps(msg), - content_type='application/json') - self.exchange.publish(msg, routing_key=routing_key) - - def _notify(self, msg, priority): - """Send a notification and retry if needed.""" - self.retry_attempts = 0 - - if not self.connection: - try: - self.reconnect() - except KombuMaxRetriesReached: - self.log_failure(msg, priority) - return - - routing_key = "%s.%s" % (self.topic, priority.lower()) - - while True: - try: - self._send_message(msg, routing_key) - return - except self.connection_errors as e: - pass - except Exception as e: - # NOTE(comstud): Unfortunately it's possible for amqplib - # to return an error not covered by its transport - # connection_errors in the case of a timeout waiting for - # a protocol response. (See paste link in LP888621 for - # nova.) So, we check all exceptions for 'timeout' in them - # and try to reconnect in this case. - if 'timeout' not in str(e): - raise - - LOG.exception(_("Unable to send notification: %s") % str(e)) - - try: - self.reconnect() - except KombuMaxRetriesReached: - break - self.log_failure(msg, priority) - - def warn(self, msg): - self._notify(msg, "WARN") - - def info(self, msg): - self._notify(msg, "INFO") - - def error(self, msg): - self._notify(msg, "ERROR") diff --git a/glance/notifier/notify_log.py b/glance/notifier/notify_log.py deleted file mode 100644 index 4ceec9e85b..0000000000 --- a/glance/notifier/notify_log.py +++ /dev/null @@ -1,34 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2011, OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from glance.notifier import strategy -import glance.openstack.common.log as logging - - -class LoggingStrategy(strategy.Strategy): - """A notifier that calls logging when called.""" - - def __init__(self): - self.logger = logging.getLogger(__name__) - - def warn(self, msg): - self.logger.warn(msg) - - def info(self, msg): - self.logger.info(msg) - - def error(self, msg): - self.logger.error(msg) diff --git a/glance/notifier/notify_noop.py b/glance/notifier/notify_noop.py deleted file mode 100644 index e579da0d0d..0000000000 --- a/glance/notifier/notify_noop.py +++ /dev/null @@ -1,31 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2011, OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - - -from glance.notifier import strategy - - -class NoopStrategy(strategy.Strategy): - """A notifier that does nothing when called.""" - - def warn(self, msg): - pass - - def info(self, msg): - pass - - def error(self, msg): - pass diff --git a/glance/notifier/notify_qpid.py b/glance/notifier/notify_qpid.py deleted file mode 100644 index 97f5b48a37..0000000000 --- a/glance/notifier/notify_qpid.py +++ /dev/null @@ -1,153 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2012, Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - - -import json - -from oslo.config import cfg -import qpid.messaging - -from glance.notifier import strategy -from glance.openstack.common import jsonutils -import glance.openstack.common.log as logging - -LOG = logging.getLogger(__name__) - -qpid_opts = [ - cfg.StrOpt('qpid_notification_exchange', - default='glance', - help='Qpid exchange for notifications'), - cfg.StrOpt('qpid_notification_topic', - default='notifications', - help='Qpid topic for notifications'), - cfg.StrOpt('qpid_hostname', - default='localhost', - help='Qpid broker hostname'), - cfg.StrOpt('qpid_port', - default='5672', - help='Qpid broker port'), - cfg.StrOpt('qpid_username', - default='', - help='Username for qpid connection'), - cfg.StrOpt('qpid_password', - default='', - help='Password for qpid connection', - secret=True), - cfg.StrOpt('qpid_sasl_mechanisms', - default='', - help='Space separated list of SASL mechanisms to use for auth'), - cfg.IntOpt('qpid_reconnect_timeout', - default=0, - help='Reconnection timeout in seconds'), - cfg.IntOpt('qpid_reconnect_limit', - default=0, - help='Max reconnections before giving up'), - cfg.IntOpt('qpid_reconnect_interval_min', - default=0, - help='Minimum seconds between reconnection attempts'), - cfg.IntOpt('qpid_reconnect_interval_max', - default=0, - help='Maximum seconds between reconnection attempts'), - cfg.IntOpt('qpid_reconnect_interval', - default=0, - help='Equivalent to setting max and min to the same value'), - cfg.IntOpt('qpid_heartbeat', - default=60, - help='Seconds between connection keepalive heartbeats'), - cfg.StrOpt('qpid_protocol', - default='tcp', - help="Transport to use, either 'tcp' or 'ssl'"), - cfg.BoolOpt('qpid_tcp_nodelay', - default=True, - help='Disable Nagle algorithm'), -] - -CONF = cfg.CONF -CONF.register_opts(qpid_opts) - - -class QpidStrategy(strategy.Strategy): - """A notifier that puts a message on a queue when called.""" - - def _open_connection(self): - """Initialize the Qpid notification strategy.""" - broker = CONF.qpid_hostname + ":" + CONF.qpid_port - connection = qpid.messaging.Connection(broker) - connection.username = CONF.qpid_username - connection.password = CONF.qpid_password - connection.sasl_mechanisms = CONF.qpid_sasl_mechanisms - # Hard code this option as enabled so that reconnect logic isn't needed - # in this file at all. - connection.reconnect = True - if CONF.qpid_reconnect_timeout: - connection.reconnect_timeout = CONF.qpid_reconnect_timeout - if CONF.qpid_reconnect_limit: - connection.reconnect_limit = CONF.qpid_reconnect_limit - if CONF.qpid_reconnect_interval_max: - connection.reconnect_interval_max = ( - CONF.qpid_reconnect_interval_max) - if CONF.qpid_reconnect_interval_min: - connection.reconnect_interval_min = ( - CONF.qpid_reconnect_interval_min) - if CONF.qpid_reconnect_interval: - connection.reconnect_interval = CONF.qpid_reconnect_interval - connection.heartbeat = CONF.qpid_heartbeat - connection.transport = CONF.qpid_protocol - connection.tcp_nodelay = CONF.qpid_tcp_nodelay - connection.open() - LOG.info(_('Connected to AMQP server on %s') % broker) - return connection - - def _send(self, priority, msg): - addr_opts = { - "create": "always", - "node": { - "type": "topic", - "x-declare": { - "durable": False, - # auto-delete isn't implemented for exchanges in qpid, - # but put in here anyway - "auto-delete": True, - }, - }, - } - topic = "%s.%s" % (CONF.qpid_notification_topic, priority) - address = "%s/%s ; %s" % (CONF.qpid_notification_exchange, topic, - json.dumps(addr_opts)) - connection = None - try: - connection = self._open_connection() - session = connection.session() - sender = session.sender(address) - primitive_msg = jsonutils.to_primitive(msg) - qpid_msg = qpid.messaging.Message(content=primitive_msg) - sender.send(qpid_msg) - except Exception: - details = dict(priority=priority, msg=msg) - LOG.exception(_('Notification error. Priority: %(priority)s ' - 'Message: %(msg)s') % details) - raise - finally: - if connection and connection.opened(): - connection.close() - - def warn(self, msg): - self._send('warn', msg) - - def info(self, msg): - self._send('info', msg) - - def error(self, msg): - self._send('error', msg) diff --git a/glance/notifier/strategy.py b/glance/notifier/strategy.py deleted file mode 100644 index 6ce6b8578d..0000000000 --- a/glance/notifier/strategy.py +++ /dev/null @@ -1,29 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2011, OpenStack Foundation -# Copyright 2012, Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - - -class Strategy(object): - """Base class for a notification strategy""" - - def warn(self, msg): - raise NotImplementedError() - - def info(self, msg): - raise NotImplementedError() - - def error(self, msg): - raise NotImplementedError() diff --git a/glance/openstack/common/notifier/__init__.py b/glance/openstack/common/notifier/__init__.py deleted file mode 100644 index 45c3b46ae9..0000000000 --- a/glance/openstack/common/notifier/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -# Copyright 2011 OpenStack Foundation. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. diff --git a/glance/openstack/common/notifier/api.py b/glance/openstack/common/notifier/api.py deleted file mode 100644 index 2590527fa5..0000000000 --- a/glance/openstack/common/notifier/api.py +++ /dev/null @@ -1,182 +0,0 @@ -# Copyright 2011 OpenStack Foundation. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import uuid - -from oslo.config import cfg - -from glance.openstack.common import context -from glance.openstack.common.gettextutils import _ -from glance.openstack.common import importutils -from glance.openstack.common import jsonutils -from glance.openstack.common import log as logging -from glance.openstack.common import timeutils - - -LOG = logging.getLogger(__name__) - -notifier_opts = [ - cfg.MultiStrOpt('notification_driver', - default=[], - help='Driver or drivers to handle sending notifications'), - cfg.StrOpt('default_notification_level', - default='INFO', - help='Default notification level for outgoing notifications'), - cfg.StrOpt('default_publisher_id', - default='$host', - help='Default publisher_id for outgoing notifications'), -] - -CONF = cfg.CONF -CONF.register_opts(notifier_opts) - -WARN = 'WARN' -INFO = 'INFO' -ERROR = 'ERROR' -CRITICAL = 'CRITICAL' -DEBUG = 'DEBUG' - -log_levels = (DEBUG, WARN, INFO, ERROR, CRITICAL) - - -class BadPriorityException(Exception): - pass - - -def notify_decorator(name, fn): - """ decorator for notify which is used from utils.monkey_patch() - - :param name: name of the function - :param function: - object of the function - :returns: function -- decorated function - - """ - def wrapped_func(*args, **kwarg): - body = {} - body['args'] = [] - body['kwarg'] = {} - for arg in args: - body['args'].append(arg) - for key in kwarg: - body['kwarg'][key] = kwarg[key] - - ctxt = context.get_context_from_function_and_args(fn, args, kwarg) - notify(ctxt, - CONF.default_publisher_id, - name, - CONF.default_notification_level, - body) - return fn(*args, **kwarg) - return wrapped_func - - -def publisher_id(service, host=None): - if not host: - host = CONF.host - return "%s.%s" % (service, host) - - -def notify(context, publisher_id, event_type, priority, payload): - """Sends a notification using the specified driver - - :param publisher_id: the source worker_type.host of the message - :param event_type: the literal type of event (ex. Instance Creation) - :param priority: patterned after the enumeration of Python logging - levels in the set (DEBUG, WARN, INFO, ERROR, CRITICAL) - :param payload: A python dictionary of attributes - - Outgoing message format includes the above parameters, and appends the - following: - - message_id - a UUID representing the id for this notification - - timestamp - the GMT timestamp the notification was sent at - - The composite message will be constructed as a dictionary of the above - attributes, which will then be sent via the transport mechanism defined - by the driver. - - Message example:: - - {'message_id': str(uuid.uuid4()), - 'publisher_id': 'compute.host1', - 'timestamp': timeutils.utcnow(), - 'priority': 'WARN', - 'event_type': 'compute.create_instance', - 'payload': {'instance_id': 12, ... }} - - """ - if priority not in log_levels: - raise BadPriorityException( - _('%s not in valid priorities') % priority) - - # Ensure everything is JSON serializable. - payload = jsonutils.to_primitive(payload, convert_instances=True) - - msg = dict(message_id=str(uuid.uuid4()), - publisher_id=publisher_id, - event_type=event_type, - priority=priority, - payload=payload, - timestamp=str(timeutils.utcnow())) - - for driver in _get_drivers(): - try: - driver.notify(context, msg) - except Exception as e: - LOG.exception(_("Problem '%(e)s' attempting to " - "send to notification system. " - "Payload=%(payload)s") - % dict(e=e, payload=payload)) - - -_drivers = None - - -def _get_drivers(): - """Instantiate, cache, and return drivers based on the CONF.""" - global _drivers - if _drivers is None: - _drivers = {} - for notification_driver in CONF.notification_driver: - add_driver(notification_driver) - - return _drivers.values() - - -def add_driver(notification_driver): - """Add a notification driver at runtime.""" - # Make sure the driver list is initialized. - _get_drivers() - if isinstance(notification_driver, basestring): - # Load and add - try: - driver = importutils.import_module(notification_driver) - _drivers[notification_driver] = driver - except ImportError: - LOG.exception(_("Failed to load notifier %s. " - "These notifications will not be sent.") % - notification_driver) - else: - # Driver is already loaded; just add the object. - _drivers[notification_driver] = notification_driver - - -def _reset_drivers(): - """Used by unit tests to reset the drivers.""" - global _drivers - _drivers = None diff --git a/glance/openstack/common/notifier/log_notifier.py b/glance/openstack/common/notifier/log_notifier.py deleted file mode 100644 index dc1d09ceed..0000000000 --- a/glance/openstack/common/notifier/log_notifier.py +++ /dev/null @@ -1,35 +0,0 @@ -# Copyright 2011 OpenStack Foundation. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo.config import cfg - -from glance.openstack.common import jsonutils -from glance.openstack.common import log as logging - - -CONF = cfg.CONF - - -def notify(_context, message): - """Notifies the recipient of the desired event given the model. - Log notifications using openstack's default logging system""" - - priority = message.get('priority', - CONF.default_notification_level) - priority = priority.lower() - logger = logging.getLogger( - 'glance.openstack.common.notification.%s' % - message['event_type']) - getattr(logger, priority)(jsonutils.dumps(message)) diff --git a/glance/openstack/common/notifier/no_op_notifier.py b/glance/openstack/common/notifier/no_op_notifier.py deleted file mode 100644 index bc7a56ca7a..0000000000 --- a/glance/openstack/common/notifier/no_op_notifier.py +++ /dev/null @@ -1,19 +0,0 @@ -# Copyright 2011 OpenStack Foundation. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - - -def notify(_context, message): - """Notifies the recipient of the desired event given the model""" - pass diff --git a/glance/openstack/common/notifier/rabbit_notifier.py b/glance/openstack/common/notifier/rabbit_notifier.py deleted file mode 100644 index 83b6cae149..0000000000 --- a/glance/openstack/common/notifier/rabbit_notifier.py +++ /dev/null @@ -1,29 +0,0 @@ -# Copyright 2012 Red Hat, Inc. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - - -from glance.openstack.common.gettextutils import _ -from glance.openstack.common import log as logging -from glance.openstack.common.notifier import rpc_notifier - -LOG = logging.getLogger(__name__) - - -def notify(context, message): - """Deprecated in Grizzly. Please use rpc_notifier instead.""" - - LOG.deprecated(_("The rabbit_notifier is now deprecated." - " Please use rpc_notifier instead.")) - rpc_notifier.notify(context, message) diff --git a/glance/openstack/common/notifier/rpc_notifier.py b/glance/openstack/common/notifier/rpc_notifier.py deleted file mode 100644 index 201d62ae2b..0000000000 --- a/glance/openstack/common/notifier/rpc_notifier.py +++ /dev/null @@ -1,46 +0,0 @@ -# Copyright 2011 OpenStack Foundation. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo.config import cfg - -from glance.openstack.common import context as req_context -from glance.openstack.common.gettextutils import _ -from glance.openstack.common import log as logging -from glance.openstack.common import rpc - -LOG = logging.getLogger(__name__) - -notification_topic_opt = cfg.ListOpt( - 'notification_topics', default=['notifications', ], - help='AMQP topic used for openstack notifications') - -CONF = cfg.CONF -CONF.register_opt(notification_topic_opt) - - -def notify(context, message): - """Sends a notification via RPC""" - if not context: - context = req_context.get_admin_context() - priority = message.get('priority', - CONF.default_notification_level) - priority = priority.lower() - for topic in CONF.notification_topics: - topic = '%s.%s' % (topic, priority) - try: - rpc.notify(context, topic, message) - except Exception: - LOG.exception(_("Could not send notification to %(topic)s. " - "Payload=%(message)s"), locals()) diff --git a/glance/openstack/common/notifier/rpc_notifier2.py b/glance/openstack/common/notifier/rpc_notifier2.py deleted file mode 100644 index 50475978af..0000000000 --- a/glance/openstack/common/notifier/rpc_notifier2.py +++ /dev/null @@ -1,52 +0,0 @@ -# Copyright 2011 OpenStack Foundation. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -'''messaging based notification driver, with message envelopes''' - -from oslo.config import cfg - -from glance.openstack.common import context as req_context -from glance.openstack.common.gettextutils import _ -from glance.openstack.common import log as logging -from glance.openstack.common import rpc - -LOG = logging.getLogger(__name__) - -notification_topic_opt = cfg.ListOpt( - 'topics', default=['notifications', ], - help='AMQP topic(s) used for openstack notifications') - -opt_group = cfg.OptGroup(name='rpc_notifier2', - title='Options for rpc_notifier2') - -CONF = cfg.CONF -CONF.register_group(opt_group) -CONF.register_opt(notification_topic_opt, opt_group) - - -def notify(context, message): - """Sends a notification via RPC""" - if not context: - context = req_context.get_admin_context() - priority = message.get('priority', - CONF.default_notification_level) - priority = priority.lower() - for topic in CONF.rpc_notifier2.topics: - topic = '%s.%s' % (topic, priority) - try: - rpc.notify(context, topic, message, envelope=True) - except Exception: - LOG.exception(_("Could not send notification to %(topic)s. " - "Payload=%(message)s"), locals()) diff --git a/glance/openstack/common/notifier/test_notifier.py b/glance/openstack/common/notifier/test_notifier.py deleted file mode 100644 index 96c1746bf4..0000000000 --- a/glance/openstack/common/notifier/test_notifier.py +++ /dev/null @@ -1,22 +0,0 @@ -# Copyright 2011 OpenStack Foundation. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - - -NOTIFICATIONS = [] - - -def notify(_context, message): - """Test notifier, stores notifications in memory for unittests.""" - NOTIFICATIONS.append(message) diff --git a/glance/tests/unit/test_notifier.py b/glance/tests/unit/test_notifier.py index 2a02a4a40b..a1b6b5a015 100644 --- a/glance/tests/unit/test_notifier.py +++ b/glance/tests/unit/test_notifier.py @@ -18,21 +18,12 @@ import datetime -import kombu.entity -import mock -import mox -import qpid -import qpid.messaging -import stubout -import time import webob from glance.common import exception import glance.context from glance import notifier -from glance.notifier import notify_kombu -from glance.openstack.common import importutils, timeutils -import glance.openstack.common.log as logging +from glance.openstack.common import timeutils import glance.tests.unit.utils as unit_test_utils from glance.tests import utils @@ -102,10 +93,9 @@ class TaskRepoStub(object): class TestNotifier(utils.BaseTestCase): - def test_invalid_strategy(self): - self.config(notifier_strategy="invalid_notifier") - self.assertRaises(exception.InvalidNotifierStrategy, - notifier.Notifier) + def test_load_rabbit(self): + nfier = notifier.Notifier('rabbit') + self.assertIsNotNone(nfier._transport) def test_custom_strategy(self): st = "glance.notifier.notify_noop.NoopStrategy" @@ -114,390 +104,6 @@ class TestNotifier(utils.BaseTestCase): notifier.Notifier() -class TestLoggingNotifier(utils.BaseTestCase): - """Test the logging notifier is selected and works properly.""" - - def setUp(self): - super(TestLoggingNotifier, self).setUp() - self.config(notifier_strategy="logging") - self.called = False - self.logger = logging.getLogger("glance.notifier.notify_log") - self.notifier = notifier.Notifier() - - def _called(self, msg): - self.called = msg - - def test_warn(self): - self.logger.warn = self._called - self.notifier.warn("test_event", "test_message") - if self.called is False: - self.fail("Did not call logging library correctly.") - - def test_info(self): - self.logger.info = self._called - self.notifier.info("test_event", "test_message") - if self.called is False: - self.fail("Did not call logging library correctly.") - - def test_erorr(self): - self.logger.error = self._called - self.notifier.error("test_event", "test_message") - if self.called is False: - self.fail("Did not call logging library correctly.") - - -class TestNoopNotifier(utils.BaseTestCase): - """Test that the noop notifier works...and does nothing?""" - - def setUp(self): - super(TestNoopNotifier, self).setUp() - self.config(notifier_strategy="noop") - self.notifier = notifier.Notifier() - - def test_warn(self): - self.notifier.warn("test_event", "test_message") - - def test_info(self): - self.notifier.info("test_event", "test_message") - - def test_error(self): - self.notifier.error("test_event", "test_message") - - -class TestRabbitNotifier(utils.BaseTestCase): - """Test AMQP/Rabbit notifier works.""" - - def setUp(self): - super(TestRabbitNotifier, self).setUp() - - def _fake_connect(rabbit_self): - rabbit_self.connection_errors = () - rabbit_self.connection = 'fake_connection' - return None - - self.notify_kombu = importutils.import_module("glance.notifier." - "notify_kombu") - self.notify_kombu.RabbitStrategy._send_message = self._send_message - self.notify_kombu.RabbitStrategy._connect = _fake_connect - self.called = False - self.config(notifier_strategy="rabbit", - rabbit_retry_backoff=0, - rabbit_notification_topic="fake_topic") - self.notifier = notifier.Notifier() - - def _send_message(self, message, routing_key): - self.called = { - "message": message, - "routing_key": routing_key, - } - - def test_warn(self): - self.notifier.warn("test_event", "test_message") - - if self.called is False: - self.fail("Did not call _send_message properly.") - - self.assertEqual("test_message", self.called["message"]["payload"]) - self.assertEqual("WARN", self.called["message"]["priority"]) - self.assertEqual("fake_topic.warn", self.called["routing_key"]) - - def test_info(self): - self.notifier.info("test_event", "test_message") - - if self.called is False: - self.fail("Did not call _send_message properly.") - - self.assertEqual("test_message", self.called["message"]["payload"]) - self.assertEqual("INFO", self.called["message"]["priority"]) - self.assertEqual("fake_topic.info", self.called["routing_key"]) - - def test_error(self): - self.notifier.error("test_event", "test_message") - - if self.called is False: - self.fail("Did not call _send_message properly.") - - self.assertEqual("test_message", self.called["message"]["payload"]) - self.assertEqual("ERROR", self.called["message"]["priority"]) - self.assertEqual("fake_topic.error", self.called["routing_key"]) - - def test_unknown_error_on_connect_raises(self): - class MyException(Exception): - pass - - def _connect(self): - self.connection_errors = () - raise MyException('meow') - - self.notify_kombu.RabbitStrategy._connect = _connect - self.assertRaises(MyException, notifier.Notifier) - - def test_timeout_on_connect_reconnects(self): - info = {'num_called': 0} - - def _connect(rabbit_self): - rabbit_self.connection_errors = () - info['num_called'] += 1 - if info['num_called'] == 1: - raise Exception('foo timeout foo') - rabbit_self.connection = 'fake_connection' - - self.notify_kombu.RabbitStrategy._connect = _connect - notifier_ = notifier.Notifier() - notifier_.error('test_event', 'test_message') - - if self.called is False: - self.fail("Did not call _send_message properly.") - - self.assertEqual("test_message", self.called["message"]["payload"]) - self.assertEqual("ERROR", self.called["message"]["priority"]) - self.assertEqual(info['num_called'], 2) - - def test_connection_error_on_connect_reconnects(self): - info = {'num_called': 0} - - class MyException(Exception): - pass - - def _connect(rabbit_self): - rabbit_self.connection_errors = (MyException, ) - info['num_called'] += 1 - if info['num_called'] == 1: - raise MyException('meow') - rabbit_self.connection = 'fake_connection' - - self.notify_kombu.RabbitStrategy._connect = _connect - notifier_ = notifier.Notifier() - notifier_.error('test_event', 'test_message') - - if self.called is False: - self.fail("Did not call _send_message properly.") - - self.assertEqual("test_message", self.called["message"]["payload"]) - self.assertEqual("ERROR", self.called["message"]["priority"]) - self.assertEqual(info['num_called'], 2) - - def test_unknown_error_on_send_message_raises(self): - class MyException(Exception): - pass - - def _send_message(rabbit_self, msg, routing_key): - raise MyException('meow') - - self.notify_kombu.RabbitStrategy._send_message = _send_message - notifier_ = notifier.Notifier() - self.assertRaises(MyException, notifier_.error, 'a', 'b') - - def test_timeout_on_send_message_reconnects(self): - info = {'send_called': 0, 'conn_called': 0} - - def _connect(rabbit_self): - info['conn_called'] += 1 - rabbit_self.connection_errors = () - rabbit_self.connection = 'fake_connection' - - def _send_message(rabbit_self, msg, routing_key): - info['send_called'] += 1 - if info['send_called'] == 1: - raise Exception('foo timeout foo') - self._send_message(msg, routing_key) - - self.notify_kombu.RabbitStrategy._connect = _connect - self.notify_kombu.RabbitStrategy._send_message = _send_message - notifier_ = notifier.Notifier() - notifier_.error('test_event', 'test_message') - - if self.called is False: - self.fail("Did not call _send_message properly.") - - self.assertEqual("test_message", self.called["message"]["payload"]) - self.assertEqual("ERROR", self.called["message"]["priority"]) - self.assertEqual(info['send_called'], 2) - self.assertEqual(info['conn_called'], 2) - - def test_connection_error_on_send_message_reconnects(self): - info = {'send_called': 0, 'conn_called': 0} - - class MyException(Exception): - pass - - def _connect(rabbit_self): - info['conn_called'] += 1 - rabbit_self.connection_errors = (MyException, ) - rabbit_self.connection = 'fake_connection' - - def _send_message(rabbit_self, msg, routing_key): - info['send_called'] += 1 - if info['send_called'] == 1: - raise MyException('meow') - self._send_message(msg, routing_key) - - self.notify_kombu.RabbitStrategy._connect = _connect - self.notify_kombu.RabbitStrategy._send_message = _send_message - notifier_ = notifier.Notifier() - notifier_.error('test_event', 'test_message') - - if self.called is False: - self.fail("Did not call _send_message properly.") - - self.assertEqual("test_message", self.called["message"]["payload"]) - self.assertEqual("ERROR", self.called["message"]["priority"]) - self.assertEqual(info['send_called'], 2) - self.assertEqual(info['conn_called'], 2) - - -class TestQpidNotifier(utils.BaseTestCase): - """Test Qpid notifier.""" - - def setUp(self): - super(TestQpidNotifier, self).setUp() - - self.mocker = mox.Mox() - - self.mock_connection = None - self.mock_session = None - self.mock_sender = None - self.mock_receiver = None - - self.orig_connection = qpid.messaging.Connection - self.orig_session = qpid.messaging.Session - self.orig_sender = qpid.messaging.Sender - self.orig_receiver = qpid.messaging.Receiver - qpid.messaging.Connection = lambda *_x, **_y: self.mock_connection - qpid.messaging.Session = lambda *_x, **_y: self.mock_session - qpid.messaging.Sender = lambda *_x, **_y: self.mock_sender - qpid.messaging.Receiver = lambda *_x, **_y: self.mock_receiver - - self.notify_qpid = importutils.import_module("glance.notifier." - "notify_qpid") - self.addCleanup(self.reset_qpid) - self.addCleanup(self.mocker.ResetAll) - - def reset_qpid(self): - - qpid.messaging.Connection = self.orig_connection - qpid.messaging.Session = self.orig_session - qpid.messaging.Sender = self.orig_sender - qpid.messaging.Receiver = self.orig_receiver - - def _test_notify(self, priority, exception=False, exception_send=False): - test_msg = {'a': 'b'} - - self.mock_connection = self.mocker.CreateMock(self.orig_connection) - self.mock_session = self.mocker.CreateMock(self.orig_session) - self.mock_sender = self.mocker.CreateMock(self.orig_sender) - - self.mock_connection.username = "" - if exception: - self.mock_connection.open().AndRaise( - Exception('Test open Exception')) - else: - self.mock_connection.open() - self.mock_connection.session().AndReturn(self.mock_session) - expected_address = ('glance/notifications.%s ; ' - '{"node": {"x-declare": {"auto-delete": true, ' - '"durable": false}, "type": "topic"}, ' - '"create": "always"}' % priority) - self.mock_session.sender(expected_address).AndReturn( - self.mock_sender) - if exception_send: - self.mock_sender.send(mox.IgnoreArg()).AndRaise( - Exception('Test send Exception')) - # NOTE(afazekas): the opened and close call is expected - # in this case, but not expected if the open fails - else: - self.mock_sender.send(mox.IgnoreArg()) - self.mock_connection.opened().AndReturn(True) - self.mock_connection.close() - - self.mocker.ReplayAll() - - self.config(notifier_strategy="qpid") - notifier = self.notify_qpid.QpidStrategy() - if priority == 'info': - if exception or exception_send: - self.assertRaises(Exception, notifier.info, test_msg) - else: - notifier.info(test_msg) - elif priority == 'warn': - if exception or exception_send: - self.assertRaises(Exception, notifier.warn, test_msg) - else: - notifier.warn(test_msg) - elif priority == 'error': - if exception or exception_send: - self.assertRaises(Exception, notifier.error, test_msg) - else: - notifier.error(test_msg) - - self.mocker.VerifyAll() - - def test_info(self): - self._test_notify('info') - - def test_warn(self): - self._test_notify('warn') - - def test_error(self): - self._test_notify('error') - - def test_exception_open_successful(self): - self._test_notify('info', exception=True) - - def test_info_fail(self): - self._test_notify('info', exception_send=True) - - def test_warn_fail(self): - self._test_notify('warn', exception_send=True) - - def test_error_fail(self): - self._test_notify('error', exception_send=True) - - -class TestRabbitContentType(utils.BaseTestCase): - """Test AMQP/Rabbit notifier works.""" - - def setUp(self): - super(TestRabbitContentType, self).setUp() - self.stubs = stubout.StubOutForTesting() - - def _fake_connect(rabbit_self): - rabbit_self.connection_errors = () - rabbit_self.connection = 'fake_connection' - rabbit_self.exchange = self._fake_exchange() - return None - - def dummy(*args, **kwargs): - pass - - self.stubs.Set(kombu.entity.Exchange, 'publish', dummy) - self.stubs.Set(notify_kombu.RabbitStrategy, '_connect', - _fake_connect) - self.called = False - self.config(notifier_strategy="rabbit", - rabbit_retry_backoff=0, - rabbit_notification_topic="fake_topic") - self.notifier = notifier.Notifier() - - def _fake_exchange(self): - class Dummy(object): - class Message(object): - def __init__(message_self, message, content_type): - self.called = { - 'message': message, - 'content_type': content_type - } - - @classmethod - def publish(*args, **kwargs): - pass - return Dummy - - def test_content_type_passed(self): - self.notifier.warn("test_event", "test_message") - self.assertEqual(self.called['content_type'], 'application/json') - - class TestImageNotifications(utils.BaseTestCase): """Test Image Notifications work""" @@ -764,121 +370,6 @@ class TestImageNotifications(utils.BaseTestCase): self.assertTrue('Failed' in output_log['payload']) -class RabbitStrategyTestCase(utils.BaseTestCase): - def setUp(self): - super(RabbitStrategyTestCase, self).setUp() - self.rabbit_strategy = notify_kombu.RabbitStrategy() - self.rabbit_strategy.retry_attempts = 0 - self.rabbit_strategy.max_retries = 2 - - def test_close(self): - self.rabbit_strategy.connection = kombu.connection.BrokerConnection() - self.rabbit_strategy.connection.close = mock.Mock() - self.rabbit_strategy._close() - self.assertEqual(self.rabbit_strategy.connection, None) - - def test_connect(self): - self.rabbit_strategy._close = mock.Mock() - connection = kombu.connection.BrokerConnection( - hostname='localhost', - port=5672, - userid='guest', - password='guest', - virtual_host='/', - ssl=False) - kombu.connection.BrokerConnection = mock.Mock() - kombu.connection.BrokerConnection.return_value = connection - connection.connect = mock.Mock() - connection.channel = mock.Mock() - connection.channel.return_value = 'fake_channel' - kombu.entity.Exchange = mock.Mock() - kombu.entity.Exchange.return_value = 'fake_exchange' - fake_queue = mock.Mock() - fake_queue.declare = mock.Mock() - kombu.entity.Queue = mock.Mock() - kombu.entity.Queue.return_value = fake_queue - - self.rabbit_strategy._connect() - kombu.connection.BrokerConnection.assert_called_with( - hostname='localhost', - port=5672, - userid='guest', - password='guest', - virtual_host='/', - ssl=False) - kombu.entity.Exchange.assert_called_with( - channel='fake_channel', - type='topic', - durable=False, - name='glance') - for routing_key in ['notifications.warn', 'notifications.info', - 'notifications.error']: - kombu.entity.Queue.assert_any_called( - channel='fake_channel', - exchange='fake_exchange', - durable=False, - name=routing_key, - routing_key=routing_key) - - def test_reconnect_sleep_time(self): - self.rabbit_strategy._connect = mock.Mock( - side_effect=Exception('timeout')) - time.sleep = mock.Mock() - try: - self.rabbit_strategy.reconnect() - except notify_kombu.KombuMaxRetriesReached: - pass - finally: - time.sleep.assert_called_once_with(2) - - def test_reconnect_sleep_time_2(self): - self.rabbit_strategy.retry_backoff = 40 - self.rabbit_strategy._connect = mock.Mock( - side_effect=Exception('timeout')) - time.sleep = mock.Mock() - try: - self.rabbit_strategy.reconnect() - except notify_kombu.KombuMaxRetriesReached: - pass - finally: - time.sleep.assert_called_once_with(30) - - def test_reconnect_sleep_time_no_retry_max_backoff(self): - self.rabbit_strategy.retry_max_backoff = None - self.rabbit_strategy.retry_backoff = 100 - self.rabbit_strategy._connect = mock.Mock( - side_effect=Exception('timeout')) - time.sleep = mock.Mock() - try: - self.rabbit_strategy.reconnect() - except notify_kombu.KombuMaxRetriesReached: - pass - finally: - time.sleep.assert_called_once_with(100) - - def test_notify_process_komby_max_retries_reached_error(self): - self.rabbit_strategy.connection = None - self.rabbit_strategy.reconnect = mock.Mock( - side_effect=notify_kombu.KombuMaxRetriesReached()) - self.rabbit_strategy.log_failure = mock.Mock() - - self.rabbit_strategy._notify('fake_msg', "WARN") - self.rabbit_strategy.log_failure.assert_called_with('fake_msg', "WARN") - - def test_notify_check_if_log_failure(self): - self.rabbit_strategy.connection = 'fake_connection' - self.rabbit_strategy._send_message = mock.Mock( - side_effect=Exception('timeout')) - self.rabbit_strategy.reconnect = mock.Mock( - side_effect=notify_kombu.KombuMaxRetriesReached()) - self.rabbit_strategy.log_failure = mock.Mock() - - self.rabbit_strategy._notify('fake_msg', "WARN") - self.rabbit_strategy._send_message. \ - assert_called_with('fake_msg', 'notifications.warn') - self.rabbit_strategy.log_failure.assert_called_with('fake_msg', "WARN") - - class TestTaskNotifications(utils.BaseTestCase): """Test Task Notifications work""" diff --git a/glance/tests/unit/utils.py b/glance/tests/unit/utils.py index 0c7b311d64..af4b2a70dc 100644 --- a/glance/tests/unit/utils.py +++ b/glance/tests/unit/utils.py @@ -182,26 +182,27 @@ class FakeNotifier(object): def __init__(self, *_args, **kwargs): self.log = [] - def warn(self, event_type, payload): + def _notify(self, event_type, payload, level): log = {} - log['notification_type'] = "WARN" + log['notification_type'] = level log['event_type'] = event_type log['payload'] = payload self.log.append(log) + def warn(self, event_type, payload): + self._notify(event_type, payload, 'WARN') + def info(self, event_type, payload): - log = {} - log['notification_type'] = "INFO" - log['event_type'] = event_type - log['payload'] = payload - self.log.append(log) + self._notify(event_type, payload, 'INFO') def error(self, event_type, payload): - log = {} - log['notification_type'] = "ERROR" - log['event_type'] = event_type - log['payload'] = payload - self.log.append(log) + self._notify(event_type, payload, 'ERROR') + + def debug(self, event_type, payload): + self._notify(event_type, payload, 'DEBUG') + + def critical(self, event_type, payload): + self._notify(event_type, payload, 'CRITICAL') def get_logs(self): return self.log diff --git a/openstack-common.conf b/openstack-common.conf index 850e50b9ae..9dedd372e5 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -12,7 +12,6 @@ module=jsonutils module=local module=lockutils module=log -module=notifier module=policy module=strutils module=test diff --git a/requirements.txt b/requirements.txt index 7c6572deca..9eb6a263f8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -37,3 +37,6 @@ pyOpenSSL # Required by openstack.common libraries six>=1.4.1 + +-f http://tarballs.openstack.org/oslo.messaging/oslo.messaging-1.2.0a11.tar.gz#egg=oslo.messaging-1.2.0a11 +oslo.messaging>=1.2.0a11