Migrate to oslo.messaging
Glance currently uses a custom notifier and it has been maintaining it for a long time. In a hope of reducing duplicated code and improving cross-project contributions, this patch replaces the old notifier with the one, recently developed, in oslo.messaging. The oslo.messaging project is a port of the old oslo-rpc code to a standalone, more stable and improved project. It brings all the benefits that oslo-rpc would've brought as well as an easier way to integrate with other projects. This patch also: - Reduces the code shipped along with Glance since all the code copied from oslo-incubator related to the notifier is not needed anymore. - Improves the stability of existing, broker based, notifications. - Brings HA support. - Keeps backward compatibility by translating the old `notifier_strategy` into oslo.messaging drivers. Changes to the code: - It is now necessary to pass the request context to the notification call. - Notifier package is no longer necessary. A notifier module was added instead. - New, notifier related, configurations were added. - A lot of code was removed Since there's still not an official release, requirements.txt points to the latest tarball created. A release for oslo.messaging is planned for Icehouse. docImpact Implements bp oslo-messaging Change-Id: I8cd84772bc5867e06b2a50ed7e15b9e86f0b94ad
This commit is contained in:
parent
2f97e120ec
commit
90d6ef8130
@ -210,7 +210,14 @@ registry_client_protocol = http
|
||||
# There are three methods of sending notifications, logging (via the
|
||||
# log_file directive), rabbit (via a rabbitmq queue), qpid (via a Qpid
|
||||
# message queue), or noop (no notifications sent, the default)
|
||||
notifier_strategy = noop
|
||||
# NOTE: THIS CONFIGURATION OPTION HAS BEEN DEPRECATED IN FAVOR OF `notification_driver`
|
||||
# notifier_strategy = default
|
||||
|
||||
# Driver or drivers to handle sending notifications
|
||||
# notification_driver = noop
|
||||
|
||||
# Default publisher_id for outgoing notifications.
|
||||
# default_publisher_id = image.localhost
|
||||
|
||||
# Configuration options if sending notifications via rabbitmq (these are
|
||||
# the defaults)
|
||||
|
@ -241,10 +241,6 @@ class StoreAddDisabled(GlanceException):
|
||||
"store is disabled.")
|
||||
|
||||
|
||||
class InvalidNotifierStrategy(GlanceException):
|
||||
message = _("'%(strategy)s' is not an available notifier strategy.")
|
||||
|
||||
|
||||
class MaxRedirectsExceeded(GlanceException):
|
||||
message = _("Maximum redirects (%(redirects)s) was exceeded.")
|
||||
|
||||
|
@ -17,16 +17,14 @@
|
||||
# under the License.
|
||||
|
||||
|
||||
import socket
|
||||
import uuid
|
||||
import warnings
|
||||
|
||||
from oslo.config import cfg
|
||||
from oslo import messaging
|
||||
import webob
|
||||
|
||||
from glance.common import exception
|
||||
import glance.domain
|
||||
import glance.domain.proxy
|
||||
from glance.openstack.common import importutils
|
||||
import glance.openstack.common.log as logging
|
||||
from glance.openstack.common import timeutils
|
||||
|
||||
@ -37,7 +35,10 @@ notifier_opts = [
|
||||
'notifications, logging (via the log_file directive), '
|
||||
'rabbit (via a rabbitmq queue), qpid (via a Qpid '
|
||||
'message queue), or noop (no notifications sent, the '
|
||||
'default).'))
|
||||
'default). (DEPRECATED)')),
|
||||
|
||||
cfg.StrOpt('default_publisher_id', default="image.localhost",
|
||||
help='Default publisher_id for outgoing notifications'),
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
@ -46,11 +47,11 @@ CONF.register_opts(notifier_opts)
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
_STRATEGY_ALIASES = {
|
||||
"logging": "glance.notifier.notify_log.LoggingStrategy",
|
||||
"rabbit": "glance.notifier.notify_kombu.RabbitStrategy",
|
||||
"qpid": "glance.notifier.notify_qpid.QpidStrategy",
|
||||
"noop": "glance.notifier.notify_noop.NoopStrategy",
|
||||
"default": "glance.notifier.notify_noop.NoopStrategy",
|
||||
"logging": "log",
|
||||
"rabbit": "messaging",
|
||||
"qpid": "messaging",
|
||||
"noop": "noop",
|
||||
"default": "noop",
|
||||
}
|
||||
|
||||
|
||||
@ -58,44 +59,42 @@ class Notifier(object):
|
||||
"""Uses a notification strategy to send out messages about events."""
|
||||
|
||||
def __init__(self, strategy=None):
|
||||
_strategy = CONF.notifier_strategy
|
||||
try:
|
||||
strategy = _STRATEGY_ALIASES[_strategy]
|
||||
msg = _('Converted strategy alias %s to %s')
|
||||
LOG.debug(msg % (_strategy, strategy))
|
||||
except KeyError:
|
||||
strategy = _strategy
|
||||
LOG.debug(_('No strategy alias found for %s') % strategy)
|
||||
|
||||
try:
|
||||
strategy_class = importutils.import_class(strategy)
|
||||
except ImportError:
|
||||
raise exception.InvalidNotifierStrategy(strategy=strategy)
|
||||
else:
|
||||
self.strategy = strategy_class()
|
||||
if CONF.notifier_strategy != 'default':
|
||||
msg = _("notifier_strategy was deprecated in "
|
||||
"favor of `notification_driver`")
|
||||
warnings.warn(msg, DeprecationWarning)
|
||||
|
||||
@staticmethod
|
||||
def generate_message(event_type, priority, payload):
|
||||
return {
|
||||
"message_id": str(uuid.uuid4()),
|
||||
"publisher_id": socket.gethostname(),
|
||||
"event_type": event_type,
|
||||
"priority": priority,
|
||||
"payload": payload,
|
||||
"timestamp": str(timeutils.utcnow()),
|
||||
}
|
||||
# NOTE(flaper87): Use this to keep backwards
|
||||
# compatibility. We'll try to get an oslo.messaging
|
||||
# driver from the specified strategy.
|
||||
_strategy = strategy or CONF.notifier_strategy
|
||||
_driver = _STRATEGY_ALIASES.get(_strategy)
|
||||
|
||||
# NOTE(flaper87): The next 3 lines help
|
||||
# with the migration to oslo.messaging.
|
||||
# Without them, gate tests won't know
|
||||
# what driver should be loaded.
|
||||
# Once this patch lands, devstack will be
|
||||
# updated and then these lines will be removed.
|
||||
url = None
|
||||
if _strategy in ['rabbit', 'qpid']:
|
||||
url = _strategy + '://'
|
||||
|
||||
publisher_id = CONF.default_publisher_id
|
||||
self._transport = messaging.get_transport(CONF, url)
|
||||
self._notifier = messaging.Notifier(self._transport,
|
||||
driver=_driver,
|
||||
publisher_id=publisher_id)
|
||||
|
||||
def warn(self, event_type, payload):
|
||||
msg = self.generate_message(event_type, "WARN", payload)
|
||||
self.strategy.warn(msg)
|
||||
self._notifier.warn({}, event_type, payload)
|
||||
|
||||
def info(self, event_type, payload):
|
||||
msg = self.generate_message(event_type, "INFO", payload)
|
||||
self.strategy.info(msg)
|
||||
self._notifier.info({}, event_type, payload)
|
||||
|
||||
def error(self, event_type, payload):
|
||||
msg = self.generate_message(event_type, "ERROR", payload)
|
||||
self.strategy.error(msg)
|
||||
self._notifier.error({}, event_type, payload)
|
||||
|
||||
|
||||
def format_image_notification(image):
|
||||
@ -156,11 +155,13 @@ class ImageRepoProxy(glance.domain.proxy.Repo):
|
||||
|
||||
def save(self, image):
|
||||
super(ImageRepoProxy, self).save(image)
|
||||
self.notifier.info('image.update', format_image_notification(image))
|
||||
self.notifier.info('image.update',
|
||||
format_image_notification(image))
|
||||
|
||||
def add(self, image):
|
||||
super(ImageRepoProxy, self).add(image)
|
||||
self.notifier.info('image.create', format_image_notification(image))
|
||||
self.notifier.info('image.create',
|
||||
format_image_notification(image))
|
||||
|
||||
def remove(self, image):
|
||||
super(ImageRepoProxy, self).remove(image)
|
||||
@ -207,7 +208,8 @@ class ImageProxy(glance.domain.proxy.Image):
|
||||
notify = self.notifier.info
|
||||
|
||||
try:
|
||||
notify('image.send', self._format_image_send(sent))
|
||||
notify('image.send',
|
||||
self._format_image_send(sent))
|
||||
except Exception as err:
|
||||
msg = (_("An error occurred during image.send"
|
||||
" notification: %(err)s") % {'err': err})
|
||||
@ -278,7 +280,8 @@ class TaskRepoProxy(glance.domain.proxy.Repo):
|
||||
item_proxy_kwargs=proxy_kwargs)
|
||||
|
||||
def add(self, task):
|
||||
self.notifier.info('task.create', format_task_notification(task))
|
||||
self.notifier.info('task.create',
|
||||
format_task_notification(task))
|
||||
return super(TaskRepoProxy, self).add(task)
|
||||
|
||||
def remove(self, task):
|
||||
@ -306,7 +309,8 @@ class TaskProxy(glance.domain.proxy.Task):
|
||||
super(TaskProxy, self).__init__(task)
|
||||
|
||||
def run(self, executor):
|
||||
self.notifier.info('task.run', format_task_notification(self.task))
|
||||
self.notifier.info('task.run',
|
||||
format_task_notification(self.task))
|
||||
return super(TaskProxy, self).run(executor)
|
||||
|
||||
def begin_processing(self):
|
@ -1,246 +0,0 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011, OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
import json
|
||||
import time
|
||||
|
||||
import kombu.connection
|
||||
import kombu.entity
|
||||
from oslo.config import cfg
|
||||
|
||||
from glance.notifier import strategy
|
||||
import glance.openstack.common.log as logging
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
rabbit_opts = [
|
||||
cfg.StrOpt('rabbit_host', default='localhost',
|
||||
help=_('The host name of the rabbitmq server')),
|
||||
cfg.IntOpt('rabbit_port', default=5672,
|
||||
help=_('The port on which the rabbitmq server is listening')),
|
||||
cfg.BoolOpt('rabbit_use_ssl', default=False,
|
||||
help=_('A boolean value indicating if the selected rabbitmq '
|
||||
'server uses SSL.')),
|
||||
cfg.StrOpt('rabbit_userid', default='guest',
|
||||
help=_('The user ID for authentication with rabbitmq.')),
|
||||
cfg.StrOpt('rabbit_password', default='guest', secret=True,
|
||||
help=_('The password that will be used for authentication '
|
||||
'with the rabbitmq server.')),
|
||||
cfg.StrOpt('rabbit_virtual_host', default='/',
|
||||
help=_('The virtual host used in the rabbitmq connection.')),
|
||||
cfg.StrOpt('rabbit_notification_exchange', default='glance',
|
||||
help=_('Exchange name to use for connection when using rabbit'
|
||||
' strategy.')),
|
||||
cfg.StrOpt('rabbit_notification_topic', default='notifications',
|
||||
help=_('Topic to use for connection when using rabbit '
|
||||
'strategy.')),
|
||||
cfg.IntOpt('rabbit_max_retries', default=0,
|
||||
help=_('The maximum number of times to attempt to connect to '
|
||||
'the AMQP server.')),
|
||||
cfg.IntOpt('rabbit_retry_backoff', default=2,
|
||||
help=_('This value multiplied by the number of connection '
|
||||
'attempts gives the amount of time in seconds to sleep '
|
||||
'between connection attempts to the AMQP server.')),
|
||||
cfg.IntOpt('rabbit_retry_max_backoff', default=30,
|
||||
help=_('The maximum amount of time to wait between connection '
|
||||
'attempts. The delay time will be the smaller of this '
|
||||
'value and the value of <rabbit_retry_backoff> * '
|
||||
'<the number of failed connection attempts so far>.')),
|
||||
cfg.BoolOpt('rabbit_durable_queues', default=False,
|
||||
help='A boolean to determine if the queues used for messaging '
|
||||
'should be retained after a restart.'),
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(rabbit_opts)
|
||||
|
||||
|
||||
class KombuMaxRetriesReached(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class RabbitStrategy(strategy.Strategy):
|
||||
"""A notifier that puts a message on a queue when called."""
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize the rabbit notification strategy."""
|
||||
self.topic = CONF.rabbit_notification_topic
|
||||
self.max_retries = CONF.rabbit_max_retries
|
||||
# NOTE(comstud): When reading the config file, these values end
|
||||
# up being strings, and we need them as ints.
|
||||
self.retry_backoff = CONF.rabbit_retry_backoff
|
||||
self.retry_max_backoff = CONF.rabbit_retry_max_backoff
|
||||
|
||||
self.connection = None
|
||||
self.retry_attempts = 0
|
||||
try:
|
||||
self.reconnect()
|
||||
except KombuMaxRetriesReached:
|
||||
pass
|
||||
|
||||
def _close(self):
|
||||
"""Close connection to rabbit."""
|
||||
try:
|
||||
self.connection.close()
|
||||
except self.connection_errors:
|
||||
pass
|
||||
self.connection = None
|
||||
|
||||
def _connect(self):
|
||||
"""Connect to rabbit. Exceptions should be handled by the
|
||||
caller.
|
||||
"""
|
||||
log_info = {}
|
||||
log_info['hostname'] = CONF.rabbit_host
|
||||
log_info['port'] = CONF.rabbit_port
|
||||
if self.connection:
|
||||
LOG.info(_("Reconnecting to AMQP server on "
|
||||
"%(hostname)s:%(port)d") % log_info)
|
||||
self._close()
|
||||
else:
|
||||
LOG.info(_("Connecting to AMQP server on "
|
||||
"%(hostname)s:%(port)d") % log_info)
|
||||
self.connection = kombu.connection.BrokerConnection(
|
||||
hostname=CONF.rabbit_host,
|
||||
port=CONF.rabbit_port,
|
||||
userid=CONF.rabbit_userid,
|
||||
password=CONF.rabbit_password,
|
||||
virtual_host=CONF.rabbit_virtual_host,
|
||||
ssl=CONF.rabbit_use_ssl)
|
||||
self.connection_errors = self.connection.connection_errors
|
||||
self.connection.connect()
|
||||
self.channel = self.connection.channel()
|
||||
self.exchange = kombu.entity.Exchange(
|
||||
channel=self.channel,
|
||||
type="topic",
|
||||
durable=CONF.rabbit_durable_queues,
|
||||
name=CONF.rabbit_notification_exchange)
|
||||
|
||||
# NOTE(jerdfelt): Normally the consumer would create the queues,
|
||||
# but we do this to ensure that messages don't get dropped if the
|
||||
# consumer is started after we do
|
||||
for priority in ["WARN", "INFO", "ERROR"]:
|
||||
routing_key = "%s.%s" % (self.topic, priority.lower())
|
||||
queue = kombu.entity.Queue(
|
||||
channel=self.channel,
|
||||
exchange=self.exchange,
|
||||
durable=CONF.rabbit_durable_queues,
|
||||
name=routing_key,
|
||||
routing_key=routing_key)
|
||||
queue.declare()
|
||||
LOG.info(_("Connected to AMQP server on "
|
||||
"%(hostname)s:%(port)d") % log_info)
|
||||
|
||||
def reconnect(self):
|
||||
"""Handles reconnecting and re-establishing queues."""
|
||||
while True:
|
||||
self.retry_attempts += 1
|
||||
try:
|
||||
self._connect()
|
||||
return
|
||||
except self.connection_errors as e:
|
||||
pass
|
||||
except Exception as e:
|
||||
# NOTE(comstud): Unfortunately it's possible for amqplib
|
||||
# to return an error not covered by its transport
|
||||
# connection_errors in the case of a timeout waiting for
|
||||
# a protocol response. (See paste link in LP888621 for
|
||||
# nova.) So, we check all exceptions for 'timeout' in them
|
||||
# and try to reconnect in this case.
|
||||
if 'timeout' not in str(e):
|
||||
raise
|
||||
|
||||
log_info = {}
|
||||
log_info['err_str'] = str(e)
|
||||
log_info['max_retries'] = self.max_retries
|
||||
log_info['hostname'] = CONF.rabbit_host
|
||||
log_info['port'] = CONF.rabbit_port
|
||||
|
||||
if self.retry_attempts >= self.max_retries:
|
||||
LOG.exception(_('Unable to connect to AMQP server on '
|
||||
'%(hostname)s:%(port)d after %(max_retries)d '
|
||||
'tries: %(err_str)s') % log_info)
|
||||
if self.connection:
|
||||
self._close()
|
||||
raise KombuMaxRetriesReached
|
||||
|
||||
sleep_time = self.retry_backoff * self.retry_attempts
|
||||
if self.retry_max_backoff:
|
||||
sleep_time = min(sleep_time, self.retry_max_backoff)
|
||||
|
||||
log_info['sleep_time'] = sleep_time
|
||||
LOG.exception(_('AMQP server on %(hostname)s:%(port)d is'
|
||||
' unreachable: %(err_str)s. Trying again in '
|
||||
'%(sleep_time)d seconds.') % log_info)
|
||||
time.sleep(sleep_time)
|
||||
|
||||
def log_failure(self, msg, priority):
|
||||
"""Fallback to logging when we can't send to rabbit."""
|
||||
message = _('Notification with priority %(priority)s failed: '
|
||||
'msg=%(msg)s')
|
||||
LOG.error(message % {'msg': msg, 'priority': priority})
|
||||
|
||||
def _send_message(self, msg, routing_key):
|
||||
"""Send a message. Caller needs to catch exceptions for retry."""
|
||||
msg = self.exchange.Message(json.dumps(msg),
|
||||
content_type='application/json')
|
||||
self.exchange.publish(msg, routing_key=routing_key)
|
||||
|
||||
def _notify(self, msg, priority):
|
||||
"""Send a notification and retry if needed."""
|
||||
self.retry_attempts = 0
|
||||
|
||||
if not self.connection:
|
||||
try:
|
||||
self.reconnect()
|
||||
except KombuMaxRetriesReached:
|
||||
self.log_failure(msg, priority)
|
||||
return
|
||||
|
||||
routing_key = "%s.%s" % (self.topic, priority.lower())
|
||||
|
||||
while True:
|
||||
try:
|
||||
self._send_message(msg, routing_key)
|
||||
return
|
||||
except self.connection_errors as e:
|
||||
pass
|
||||
except Exception as e:
|
||||
# NOTE(comstud): Unfortunately it's possible for amqplib
|
||||
# to return an error not covered by its transport
|
||||
# connection_errors in the case of a timeout waiting for
|
||||
# a protocol response. (See paste link in LP888621 for
|
||||
# nova.) So, we check all exceptions for 'timeout' in them
|
||||
# and try to reconnect in this case.
|
||||
if 'timeout' not in str(e):
|
||||
raise
|
||||
|
||||
LOG.exception(_("Unable to send notification: %s") % str(e))
|
||||
|
||||
try:
|
||||
self.reconnect()
|
||||
except KombuMaxRetriesReached:
|
||||
break
|
||||
self.log_failure(msg, priority)
|
||||
|
||||
def warn(self, msg):
|
||||
self._notify(msg, "WARN")
|
||||
|
||||
def info(self, msg):
|
||||
self._notify(msg, "INFO")
|
||||
|
||||
def error(self, msg):
|
||||
self._notify(msg, "ERROR")
|
@ -1,34 +0,0 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011, OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from glance.notifier import strategy
|
||||
import glance.openstack.common.log as logging
|
||||
|
||||
|
||||
class LoggingStrategy(strategy.Strategy):
|
||||
"""A notifier that calls logging when called."""
|
||||
|
||||
def __init__(self):
|
||||
self.logger = logging.getLogger(__name__)
|
||||
|
||||
def warn(self, msg):
|
||||
self.logger.warn(msg)
|
||||
|
||||
def info(self, msg):
|
||||
self.logger.info(msg)
|
||||
|
||||
def error(self, msg):
|
||||
self.logger.error(msg)
|
@ -1,31 +0,0 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011, OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
from glance.notifier import strategy
|
||||
|
||||
|
||||
class NoopStrategy(strategy.Strategy):
|
||||
"""A notifier that does nothing when called."""
|
||||
|
||||
def warn(self, msg):
|
||||
pass
|
||||
|
||||
def info(self, msg):
|
||||
pass
|
||||
|
||||
def error(self, msg):
|
||||
pass
|
@ -1,153 +0,0 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2012, Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
import json
|
||||
|
||||
from oslo.config import cfg
|
||||
import qpid.messaging
|
||||
|
||||
from glance.notifier import strategy
|
||||
from glance.openstack.common import jsonutils
|
||||
import glance.openstack.common.log as logging
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
qpid_opts = [
|
||||
cfg.StrOpt('qpid_notification_exchange',
|
||||
default='glance',
|
||||
help='Qpid exchange for notifications'),
|
||||
cfg.StrOpt('qpid_notification_topic',
|
||||
default='notifications',
|
||||
help='Qpid topic for notifications'),
|
||||
cfg.StrOpt('qpid_hostname',
|
||||
default='localhost',
|
||||
help='Qpid broker hostname'),
|
||||
cfg.StrOpt('qpid_port',
|
||||
default='5672',
|
||||
help='Qpid broker port'),
|
||||
cfg.StrOpt('qpid_username',
|
||||
default='',
|
||||
help='Username for qpid connection'),
|
||||
cfg.StrOpt('qpid_password',
|
||||
default='',
|
||||
help='Password for qpid connection',
|
||||
secret=True),
|
||||
cfg.StrOpt('qpid_sasl_mechanisms',
|
||||
default='',
|
||||
help='Space separated list of SASL mechanisms to use for auth'),
|
||||
cfg.IntOpt('qpid_reconnect_timeout',
|
||||
default=0,
|
||||
help='Reconnection timeout in seconds'),
|
||||
cfg.IntOpt('qpid_reconnect_limit',
|
||||
default=0,
|
||||
help='Max reconnections before giving up'),
|
||||
cfg.IntOpt('qpid_reconnect_interval_min',
|
||||
default=0,
|
||||
help='Minimum seconds between reconnection attempts'),
|
||||
cfg.IntOpt('qpid_reconnect_interval_max',
|
||||
default=0,
|
||||
help='Maximum seconds between reconnection attempts'),
|
||||
cfg.IntOpt('qpid_reconnect_interval',
|
||||
default=0,
|
||||
help='Equivalent to setting max and min to the same value'),
|
||||
cfg.IntOpt('qpid_heartbeat',
|
||||
default=60,
|
||||
help='Seconds between connection keepalive heartbeats'),
|
||||
cfg.StrOpt('qpid_protocol',
|
||||
default='tcp',
|
||||
help="Transport to use, either 'tcp' or 'ssl'"),
|
||||
cfg.BoolOpt('qpid_tcp_nodelay',
|
||||
default=True,
|
||||
help='Disable Nagle algorithm'),
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(qpid_opts)
|
||||
|
||||
|
||||
class QpidStrategy(strategy.Strategy):
|
||||
"""A notifier that puts a message on a queue when called."""
|
||||
|
||||
def _open_connection(self):
|
||||
"""Initialize the Qpid notification strategy."""
|
||||
broker = CONF.qpid_hostname + ":" + CONF.qpid_port
|
||||
connection = qpid.messaging.Connection(broker)
|
||||
connection.username = CONF.qpid_username
|
||||
connection.password = CONF.qpid_password
|
||||
connection.sasl_mechanisms = CONF.qpid_sasl_mechanisms
|
||||
# Hard code this option as enabled so that reconnect logic isn't needed
|
||||
# in this file at all.
|
||||
connection.reconnect = True
|
||||
if CONF.qpid_reconnect_timeout:
|
||||
connection.reconnect_timeout = CONF.qpid_reconnect_timeout
|
||||
if CONF.qpid_reconnect_limit:
|
||||
connection.reconnect_limit = CONF.qpid_reconnect_limit
|
||||
if CONF.qpid_reconnect_interval_max:
|
||||
connection.reconnect_interval_max = (
|
||||
CONF.qpid_reconnect_interval_max)
|
||||
if CONF.qpid_reconnect_interval_min:
|
||||
connection.reconnect_interval_min = (
|
||||
CONF.qpid_reconnect_interval_min)
|
||||
if CONF.qpid_reconnect_interval:
|
||||
connection.reconnect_interval = CONF.qpid_reconnect_interval
|
||||
connection.heartbeat = CONF.qpid_heartbeat
|
||||
connection.transport = CONF.qpid_protocol
|
||||
connection.tcp_nodelay = CONF.qpid_tcp_nodelay
|
||||
connection.open()
|
||||
LOG.info(_('Connected to AMQP server on %s') % broker)
|
||||
return connection
|
||||
|
||||
def _send(self, priority, msg):
|
||||
addr_opts = {
|
||||
"create": "always",
|
||||
"node": {
|
||||
"type": "topic",
|
||||
"x-declare": {
|
||||
"durable": False,
|
||||
# auto-delete isn't implemented for exchanges in qpid,
|
||||
# but put in here anyway
|
||||
"auto-delete": True,
|
||||
},
|
||||
},
|
||||
}
|
||||
topic = "%s.%s" % (CONF.qpid_notification_topic, priority)
|
||||
address = "%s/%s ; %s" % (CONF.qpid_notification_exchange, topic,
|
||||
json.dumps(addr_opts))
|
||||
connection = None
|
||||
try:
|
||||
connection = self._open_connection()
|
||||
session = connection.session()
|
||||
sender = session.sender(address)
|
||||
primitive_msg = jsonutils.to_primitive(msg)
|
||||
qpid_msg = qpid.messaging.Message(content=primitive_msg)
|
||||
sender.send(qpid_msg)
|
||||
except Exception:
|
||||
details = dict(priority=priority, msg=msg)
|
||||
LOG.exception(_('Notification error. Priority: %(priority)s '
|
||||
'Message: %(msg)s') % details)
|
||||
raise
|
||||
finally:
|
||||
if connection and connection.opened():
|
||||
connection.close()
|
||||
|
||||
def warn(self, msg):
|
||||
self._send('warn', msg)
|
||||
|
||||
def info(self, msg):
|
||||
self._send('info', msg)
|
||||
|
||||
def error(self, msg):
|
||||
self._send('error', msg)
|
@ -1,29 +0,0 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011, OpenStack Foundation
|
||||
# Copyright 2012, Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
class Strategy(object):
|
||||
"""Base class for a notification strategy"""
|
||||
|
||||
def warn(self, msg):
|
||||
raise NotImplementedError()
|
||||
|
||||
def info(self, msg):
|
||||
raise NotImplementedError()
|
||||
|
||||
def error(self, msg):
|
||||
raise NotImplementedError()
|
@ -1,14 +0,0 @@
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
@ -1,182 +0,0 @@
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import uuid
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from glance.openstack.common import context
|
||||
from glance.openstack.common.gettextutils import _
|
||||
from glance.openstack.common import importutils
|
||||
from glance.openstack.common import jsonutils
|
||||
from glance.openstack.common import log as logging
|
||||
from glance.openstack.common import timeutils
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
notifier_opts = [
|
||||
cfg.MultiStrOpt('notification_driver',
|
||||
default=[],
|
||||
help='Driver or drivers to handle sending notifications'),
|
||||
cfg.StrOpt('default_notification_level',
|
||||
default='INFO',
|
||||
help='Default notification level for outgoing notifications'),
|
||||
cfg.StrOpt('default_publisher_id',
|
||||
default='$host',
|
||||
help='Default publisher_id for outgoing notifications'),
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(notifier_opts)
|
||||
|
||||
WARN = 'WARN'
|
||||
INFO = 'INFO'
|
||||
ERROR = 'ERROR'
|
||||
CRITICAL = 'CRITICAL'
|
||||
DEBUG = 'DEBUG'
|
||||
|
||||
log_levels = (DEBUG, WARN, INFO, ERROR, CRITICAL)
|
||||
|
||||
|
||||
class BadPriorityException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def notify_decorator(name, fn):
|
||||
""" decorator for notify which is used from utils.monkey_patch()
|
||||
|
||||
:param name: name of the function
|
||||
:param function: - object of the function
|
||||
:returns: function -- decorated function
|
||||
|
||||
"""
|
||||
def wrapped_func(*args, **kwarg):
|
||||
body = {}
|
||||
body['args'] = []
|
||||
body['kwarg'] = {}
|
||||
for arg in args:
|
||||
body['args'].append(arg)
|
||||
for key in kwarg:
|
||||
body['kwarg'][key] = kwarg[key]
|
||||
|
||||
ctxt = context.get_context_from_function_and_args(fn, args, kwarg)
|
||||
notify(ctxt,
|
||||
CONF.default_publisher_id,
|
||||
name,
|
||||
CONF.default_notification_level,
|
||||
body)
|
||||
return fn(*args, **kwarg)
|
||||
return wrapped_func
|
||||
|
||||
|
||||
def publisher_id(service, host=None):
|
||||
if not host:
|
||||
host = CONF.host
|
||||
return "%s.%s" % (service, host)
|
||||
|
||||
|
||||
def notify(context, publisher_id, event_type, priority, payload):
|
||||
"""Sends a notification using the specified driver
|
||||
|
||||
:param publisher_id: the source worker_type.host of the message
|
||||
:param event_type: the literal type of event (ex. Instance Creation)
|
||||
:param priority: patterned after the enumeration of Python logging
|
||||
levels in the set (DEBUG, WARN, INFO, ERROR, CRITICAL)
|
||||
:param payload: A python dictionary of attributes
|
||||
|
||||
Outgoing message format includes the above parameters, and appends the
|
||||
following:
|
||||
|
||||
message_id
|
||||
a UUID representing the id for this notification
|
||||
|
||||
timestamp
|
||||
the GMT timestamp the notification was sent at
|
||||
|
||||
The composite message will be constructed as a dictionary of the above
|
||||
attributes, which will then be sent via the transport mechanism defined
|
||||
by the driver.
|
||||
|
||||
Message example::
|
||||
|
||||
{'message_id': str(uuid.uuid4()),
|
||||
'publisher_id': 'compute.host1',
|
||||
'timestamp': timeutils.utcnow(),
|
||||
'priority': 'WARN',
|
||||
'event_type': 'compute.create_instance',
|
||||
'payload': {'instance_id': 12, ... }}
|
||||
|
||||
"""
|
||||
if priority not in log_levels:
|
||||
raise BadPriorityException(
|
||||
_('%s not in valid priorities') % priority)
|
||||
|
||||
# Ensure everything is JSON serializable.
|
||||
payload = jsonutils.to_primitive(payload, convert_instances=True)
|
||||
|
||||
msg = dict(message_id=str(uuid.uuid4()),
|
||||
publisher_id=publisher_id,
|
||||
event_type=event_type,
|
||||
priority=priority,
|
||||
payload=payload,
|
||||
timestamp=str(timeutils.utcnow()))
|
||||
|
||||
for driver in _get_drivers():
|
||||
try:
|
||||
driver.notify(context, msg)
|
||||
except Exception as e:
|
||||
LOG.exception(_("Problem '%(e)s' attempting to "
|
||||
"send to notification system. "
|
||||
"Payload=%(payload)s")
|
||||
% dict(e=e, payload=payload))
|
||||
|
||||
|
||||
_drivers = None
|
||||
|
||||
|
||||
def _get_drivers():
|
||||
"""Instantiate, cache, and return drivers based on the CONF."""
|
||||
global _drivers
|
||||
if _drivers is None:
|
||||
_drivers = {}
|
||||
for notification_driver in CONF.notification_driver:
|
||||
add_driver(notification_driver)
|
||||
|
||||
return _drivers.values()
|
||||
|
||||
|
||||
def add_driver(notification_driver):
|
||||
"""Add a notification driver at runtime."""
|
||||
# Make sure the driver list is initialized.
|
||||
_get_drivers()
|
||||
if isinstance(notification_driver, basestring):
|
||||
# Load and add
|
||||
try:
|
||||
driver = importutils.import_module(notification_driver)
|
||||
_drivers[notification_driver] = driver
|
||||
except ImportError:
|
||||
LOG.exception(_("Failed to load notifier %s. "
|
||||
"These notifications will not be sent.") %
|
||||
notification_driver)
|
||||
else:
|
||||
# Driver is already loaded; just add the object.
|
||||
_drivers[notification_driver] = notification_driver
|
||||
|
||||
|
||||
def _reset_drivers():
|
||||
"""Used by unit tests to reset the drivers."""
|
||||
global _drivers
|
||||
_drivers = None
|
@ -1,35 +0,0 @@
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from glance.openstack.common import jsonutils
|
||||
from glance.openstack.common import log as logging
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
def notify(_context, message):
|
||||
"""Notifies the recipient of the desired event given the model.
|
||||
Log notifications using openstack's default logging system"""
|
||||
|
||||
priority = message.get('priority',
|
||||
CONF.default_notification_level)
|
||||
priority = priority.lower()
|
||||
logger = logging.getLogger(
|
||||
'glance.openstack.common.notification.%s' %
|
||||
message['event_type'])
|
||||
getattr(logger, priority)(jsonutils.dumps(message))
|
@ -1,19 +0,0 @@
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
def notify(_context, message):
|
||||
"""Notifies the recipient of the desired event given the model"""
|
||||
pass
|
@ -1,29 +0,0 @@
|
||||
# Copyright 2012 Red Hat, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
from glance.openstack.common.gettextutils import _
|
||||
from glance.openstack.common import log as logging
|
||||
from glance.openstack.common.notifier import rpc_notifier
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def notify(context, message):
|
||||
"""Deprecated in Grizzly. Please use rpc_notifier instead."""
|
||||
|
||||
LOG.deprecated(_("The rabbit_notifier is now deprecated."
|
||||
" Please use rpc_notifier instead."))
|
||||
rpc_notifier.notify(context, message)
|
@ -1,46 +0,0 @@
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from glance.openstack.common import context as req_context
|
||||
from glance.openstack.common.gettextutils import _
|
||||
from glance.openstack.common import log as logging
|
||||
from glance.openstack.common import rpc
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
notification_topic_opt = cfg.ListOpt(
|
||||
'notification_topics', default=['notifications', ],
|
||||
help='AMQP topic used for openstack notifications')
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opt(notification_topic_opt)
|
||||
|
||||
|
||||
def notify(context, message):
|
||||
"""Sends a notification via RPC"""
|
||||
if not context:
|
||||
context = req_context.get_admin_context()
|
||||
priority = message.get('priority',
|
||||
CONF.default_notification_level)
|
||||
priority = priority.lower()
|
||||
for topic in CONF.notification_topics:
|
||||
topic = '%s.%s' % (topic, priority)
|
||||
try:
|
||||
rpc.notify(context, topic, message)
|
||||
except Exception:
|
||||
LOG.exception(_("Could not send notification to %(topic)s. "
|
||||
"Payload=%(message)s"), locals())
|
@ -1,52 +0,0 @@
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
'''messaging based notification driver, with message envelopes'''
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from glance.openstack.common import context as req_context
|
||||
from glance.openstack.common.gettextutils import _
|
||||
from glance.openstack.common import log as logging
|
||||
from glance.openstack.common import rpc
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
notification_topic_opt = cfg.ListOpt(
|
||||
'topics', default=['notifications', ],
|
||||
help='AMQP topic(s) used for openstack notifications')
|
||||
|
||||
opt_group = cfg.OptGroup(name='rpc_notifier2',
|
||||
title='Options for rpc_notifier2')
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_group(opt_group)
|
||||
CONF.register_opt(notification_topic_opt, opt_group)
|
||||
|
||||
|
||||
def notify(context, message):
|
||||
"""Sends a notification via RPC"""
|
||||
if not context:
|
||||
context = req_context.get_admin_context()
|
||||
priority = message.get('priority',
|
||||
CONF.default_notification_level)
|
||||
priority = priority.lower()
|
||||
for topic in CONF.rpc_notifier2.topics:
|
||||
topic = '%s.%s' % (topic, priority)
|
||||
try:
|
||||
rpc.notify(context, topic, message, envelope=True)
|
||||
except Exception:
|
||||
LOG.exception(_("Could not send notification to %(topic)s. "
|
||||
"Payload=%(message)s"), locals())
|
@ -1,22 +0,0 @@
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
NOTIFICATIONS = []
|
||||
|
||||
|
||||
def notify(_context, message):
|
||||
"""Test notifier, stores notifications in memory for unittests."""
|
||||
NOTIFICATIONS.append(message)
|
@ -18,21 +18,12 @@
|
||||
|
||||
import datetime
|
||||
|
||||
import kombu.entity
|
||||
import mock
|
||||
import mox
|
||||
import qpid
|
||||
import qpid.messaging
|
||||
import stubout
|
||||
import time
|
||||
import webob
|
||||
|
||||
from glance.common import exception
|
||||
import glance.context
|
||||
from glance import notifier
|
||||
from glance.notifier import notify_kombu
|
||||
from glance.openstack.common import importutils, timeutils
|
||||
import glance.openstack.common.log as logging
|
||||
from glance.openstack.common import timeutils
|
||||
import glance.tests.unit.utils as unit_test_utils
|
||||
from glance.tests import utils
|
||||
|
||||
@ -102,10 +93,9 @@ class TaskRepoStub(object):
|
||||
|
||||
class TestNotifier(utils.BaseTestCase):
|
||||
|
||||
def test_invalid_strategy(self):
|
||||
self.config(notifier_strategy="invalid_notifier")
|
||||
self.assertRaises(exception.InvalidNotifierStrategy,
|
||||
notifier.Notifier)
|
||||
def test_load_rabbit(self):
|
||||
nfier = notifier.Notifier('rabbit')
|
||||
self.assertIsNotNone(nfier._transport)
|
||||
|
||||
def test_custom_strategy(self):
|
||||
st = "glance.notifier.notify_noop.NoopStrategy"
|
||||
@ -114,390 +104,6 @@ class TestNotifier(utils.BaseTestCase):
|
||||
notifier.Notifier()
|
||||
|
||||
|
||||
class TestLoggingNotifier(utils.BaseTestCase):
|
||||
"""Test the logging notifier is selected and works properly."""
|
||||
|
||||
def setUp(self):
|
||||
super(TestLoggingNotifier, self).setUp()
|
||||
self.config(notifier_strategy="logging")
|
||||
self.called = False
|
||||
self.logger = logging.getLogger("glance.notifier.notify_log")
|
||||
self.notifier = notifier.Notifier()
|
||||
|
||||
def _called(self, msg):
|
||||
self.called = msg
|
||||
|
||||
def test_warn(self):
|
||||
self.logger.warn = self._called
|
||||
self.notifier.warn("test_event", "test_message")
|
||||
if self.called is False:
|
||||
self.fail("Did not call logging library correctly.")
|
||||
|
||||
def test_info(self):
|
||||
self.logger.info = self._called
|
||||
self.notifier.info("test_event", "test_message")
|
||||
if self.called is False:
|
||||
self.fail("Did not call logging library correctly.")
|
||||
|
||||
def test_erorr(self):
|
||||
self.logger.error = self._called
|
||||
self.notifier.error("test_event", "test_message")
|
||||
if self.called is False:
|
||||
self.fail("Did not call logging library correctly.")
|
||||
|
||||
|
||||
class TestNoopNotifier(utils.BaseTestCase):
|
||||
"""Test that the noop notifier works...and does nothing?"""
|
||||
|
||||
def setUp(self):
|
||||
super(TestNoopNotifier, self).setUp()
|
||||
self.config(notifier_strategy="noop")
|
||||
self.notifier = notifier.Notifier()
|
||||
|
||||
def test_warn(self):
|
||||
self.notifier.warn("test_event", "test_message")
|
||||
|
||||
def test_info(self):
|
||||
self.notifier.info("test_event", "test_message")
|
||||
|
||||
def test_error(self):
|
||||
self.notifier.error("test_event", "test_message")
|
||||
|
||||
|
||||
class TestRabbitNotifier(utils.BaseTestCase):
|
||||
"""Test AMQP/Rabbit notifier works."""
|
||||
|
||||
def setUp(self):
|
||||
super(TestRabbitNotifier, self).setUp()
|
||||
|
||||
def _fake_connect(rabbit_self):
|
||||
rabbit_self.connection_errors = ()
|
||||
rabbit_self.connection = 'fake_connection'
|
||||
return None
|
||||
|
||||
self.notify_kombu = importutils.import_module("glance.notifier."
|
||||
"notify_kombu")
|
||||
self.notify_kombu.RabbitStrategy._send_message = self._send_message
|
||||
self.notify_kombu.RabbitStrategy._connect = _fake_connect
|
||||
self.called = False
|
||||
self.config(notifier_strategy="rabbit",
|
||||
rabbit_retry_backoff=0,
|
||||
rabbit_notification_topic="fake_topic")
|
||||
self.notifier = notifier.Notifier()
|
||||
|
||||
def _send_message(self, message, routing_key):
|
||||
self.called = {
|
||||
"message": message,
|
||||
"routing_key": routing_key,
|
||||
}
|
||||
|
||||
def test_warn(self):
|
||||
self.notifier.warn("test_event", "test_message")
|
||||
|
||||
if self.called is False:
|
||||
self.fail("Did not call _send_message properly.")
|
||||
|
||||
self.assertEqual("test_message", self.called["message"]["payload"])
|
||||
self.assertEqual("WARN", self.called["message"]["priority"])
|
||||
self.assertEqual("fake_topic.warn", self.called["routing_key"])
|
||||
|
||||
def test_info(self):
|
||||
self.notifier.info("test_event", "test_message")
|
||||
|
||||
if self.called is False:
|
||||
self.fail("Did not call _send_message properly.")
|
||||
|
||||
self.assertEqual("test_message", self.called["message"]["payload"])
|
||||
self.assertEqual("INFO", self.called["message"]["priority"])
|
||||
self.assertEqual("fake_topic.info", self.called["routing_key"])
|
||||
|
||||
def test_error(self):
|
||||
self.notifier.error("test_event", "test_message")
|
||||
|
||||
if self.called is False:
|
||||
self.fail("Did not call _send_message properly.")
|
||||
|
||||
self.assertEqual("test_message", self.called["message"]["payload"])
|
||||
self.assertEqual("ERROR", self.called["message"]["priority"])
|
||||
self.assertEqual("fake_topic.error", self.called["routing_key"])
|
||||
|
||||
def test_unknown_error_on_connect_raises(self):
|
||||
class MyException(Exception):
|
||||
pass
|
||||
|
||||
def _connect(self):
|
||||
self.connection_errors = ()
|
||||
raise MyException('meow')
|
||||
|
||||
self.notify_kombu.RabbitStrategy._connect = _connect
|
||||
self.assertRaises(MyException, notifier.Notifier)
|
||||
|
||||
def test_timeout_on_connect_reconnects(self):
|
||||
info = {'num_called': 0}
|
||||
|
||||
def _connect(rabbit_self):
|
||||
rabbit_self.connection_errors = ()
|
||||
info['num_called'] += 1
|
||||
if info['num_called'] == 1:
|
||||
raise Exception('foo timeout foo')
|
||||
rabbit_self.connection = 'fake_connection'
|
||||
|
||||
self.notify_kombu.RabbitStrategy._connect = _connect
|
||||
notifier_ = notifier.Notifier()
|
||||
notifier_.error('test_event', 'test_message')
|
||||
|
||||
if self.called is False:
|
||||
self.fail("Did not call _send_message properly.")
|
||||
|
||||
self.assertEqual("test_message", self.called["message"]["payload"])
|
||||
self.assertEqual("ERROR", self.called["message"]["priority"])
|
||||
self.assertEqual(info['num_called'], 2)
|
||||
|
||||
def test_connection_error_on_connect_reconnects(self):
|
||||
info = {'num_called': 0}
|
||||
|
||||
class MyException(Exception):
|
||||
pass
|
||||
|
||||
def _connect(rabbit_self):
|
||||
rabbit_self.connection_errors = (MyException, )
|
||||
info['num_called'] += 1
|
||||
if info['num_called'] == 1:
|
||||
raise MyException('meow')
|
||||
rabbit_self.connection = 'fake_connection'
|
||||
|
||||
self.notify_kombu.RabbitStrategy._connect = _connect
|
||||
notifier_ = notifier.Notifier()
|
||||
notifier_.error('test_event', 'test_message')
|
||||
|
||||
if self.called is False:
|
||||
self.fail("Did not call _send_message properly.")
|
||||
|
||||
self.assertEqual("test_message", self.called["message"]["payload"])
|
||||
self.assertEqual("ERROR", self.called["message"]["priority"])
|
||||
self.assertEqual(info['num_called'], 2)
|
||||
|
||||
def test_unknown_error_on_send_message_raises(self):
|
||||
class MyException(Exception):
|
||||
pass
|
||||
|
||||
def _send_message(rabbit_self, msg, routing_key):
|
||||
raise MyException('meow')
|
||||
|
||||
self.notify_kombu.RabbitStrategy._send_message = _send_message
|
||||
notifier_ = notifier.Notifier()
|
||||
self.assertRaises(MyException, notifier_.error, 'a', 'b')
|
||||
|
||||
def test_timeout_on_send_message_reconnects(self):
|
||||
info = {'send_called': 0, 'conn_called': 0}
|
||||
|
||||
def _connect(rabbit_self):
|
||||
info['conn_called'] += 1
|
||||
rabbit_self.connection_errors = ()
|
||||
rabbit_self.connection = 'fake_connection'
|
||||
|
||||
def _send_message(rabbit_self, msg, routing_key):
|
||||
info['send_called'] += 1
|
||||
if info['send_called'] == 1:
|
||||
raise Exception('foo timeout foo')
|
||||
self._send_message(msg, routing_key)
|
||||
|
||||
self.notify_kombu.RabbitStrategy._connect = _connect
|
||||
self.notify_kombu.RabbitStrategy._send_message = _send_message
|
||||
notifier_ = notifier.Notifier()
|
||||
notifier_.error('test_event', 'test_message')
|
||||
|
||||
if self.called is False:
|
||||
self.fail("Did not call _send_message properly.")
|
||||
|
||||
self.assertEqual("test_message", self.called["message"]["payload"])
|
||||
self.assertEqual("ERROR", self.called["message"]["priority"])
|
||||
self.assertEqual(info['send_called'], 2)
|
||||
self.assertEqual(info['conn_called'], 2)
|
||||
|
||||
def test_connection_error_on_send_message_reconnects(self):
|
||||
info = {'send_called': 0, 'conn_called': 0}
|
||||
|
||||
class MyException(Exception):
|
||||
pass
|
||||
|
||||
def _connect(rabbit_self):
|
||||
info['conn_called'] += 1
|
||||
rabbit_self.connection_errors = (MyException, )
|
||||
rabbit_self.connection = 'fake_connection'
|
||||
|
||||
def _send_message(rabbit_self, msg, routing_key):
|
||||
info['send_called'] += 1
|
||||
if info['send_called'] == 1:
|
||||
raise MyException('meow')
|
||||
self._send_message(msg, routing_key)
|
||||
|
||||
self.notify_kombu.RabbitStrategy._connect = _connect
|
||||
self.notify_kombu.RabbitStrategy._send_message = _send_message
|
||||
notifier_ = notifier.Notifier()
|
||||
notifier_.error('test_event', 'test_message')
|
||||
|
||||
if self.called is False:
|
||||
self.fail("Did not call _send_message properly.")
|
||||
|
||||
self.assertEqual("test_message", self.called["message"]["payload"])
|
||||
self.assertEqual("ERROR", self.called["message"]["priority"])
|
||||
self.assertEqual(info['send_called'], 2)
|
||||
self.assertEqual(info['conn_called'], 2)
|
||||
|
||||
|
||||
class TestQpidNotifier(utils.BaseTestCase):
|
||||
"""Test Qpid notifier."""
|
||||
|
||||
def setUp(self):
|
||||
super(TestQpidNotifier, self).setUp()
|
||||
|
||||
self.mocker = mox.Mox()
|
||||
|
||||
self.mock_connection = None
|
||||
self.mock_session = None
|
||||
self.mock_sender = None
|
||||
self.mock_receiver = None
|
||||
|
||||
self.orig_connection = qpid.messaging.Connection
|
||||
self.orig_session = qpid.messaging.Session
|
||||
self.orig_sender = qpid.messaging.Sender
|
||||
self.orig_receiver = qpid.messaging.Receiver
|
||||
qpid.messaging.Connection = lambda *_x, **_y: self.mock_connection
|
||||
qpid.messaging.Session = lambda *_x, **_y: self.mock_session
|
||||
qpid.messaging.Sender = lambda *_x, **_y: self.mock_sender
|
||||
qpid.messaging.Receiver = lambda *_x, **_y: self.mock_receiver
|
||||
|
||||
self.notify_qpid = importutils.import_module("glance.notifier."
|
||||
"notify_qpid")
|
||||
self.addCleanup(self.reset_qpid)
|
||||
self.addCleanup(self.mocker.ResetAll)
|
||||
|
||||
def reset_qpid(self):
|
||||
|
||||
qpid.messaging.Connection = self.orig_connection
|
||||
qpid.messaging.Session = self.orig_session
|
||||
qpid.messaging.Sender = self.orig_sender
|
||||
qpid.messaging.Receiver = self.orig_receiver
|
||||
|
||||
def _test_notify(self, priority, exception=False, exception_send=False):
|
||||
test_msg = {'a': 'b'}
|
||||
|
||||
self.mock_connection = self.mocker.CreateMock(self.orig_connection)
|
||||
self.mock_session = self.mocker.CreateMock(self.orig_session)
|
||||
self.mock_sender = self.mocker.CreateMock(self.orig_sender)
|
||||
|
||||
self.mock_connection.username = ""
|
||||
if exception:
|
||||
self.mock_connection.open().AndRaise(
|
||||
Exception('Test open Exception'))
|
||||
else:
|
||||
self.mock_connection.open()
|
||||
self.mock_connection.session().AndReturn(self.mock_session)
|
||||
expected_address = ('glance/notifications.%s ; '
|
||||
'{"node": {"x-declare": {"auto-delete": true, '
|
||||
'"durable": false}, "type": "topic"}, '
|
||||
'"create": "always"}' % priority)
|
||||
self.mock_session.sender(expected_address).AndReturn(
|
||||
self.mock_sender)
|
||||
if exception_send:
|
||||
self.mock_sender.send(mox.IgnoreArg()).AndRaise(
|
||||
Exception('Test send Exception'))
|
||||
# NOTE(afazekas): the opened and close call is expected
|
||||
# in this case, but not expected if the open fails
|
||||
else:
|
||||
self.mock_sender.send(mox.IgnoreArg())
|
||||
self.mock_connection.opened().AndReturn(True)
|
||||
self.mock_connection.close()
|
||||
|
||||
self.mocker.ReplayAll()
|
||||
|
||||
self.config(notifier_strategy="qpid")
|
||||
notifier = self.notify_qpid.QpidStrategy()
|
||||
if priority == 'info':
|
||||
if exception or exception_send:
|
||||
self.assertRaises(Exception, notifier.info, test_msg)
|
||||
else:
|
||||
notifier.info(test_msg)
|
||||
elif priority == 'warn':
|
||||
if exception or exception_send:
|
||||
self.assertRaises(Exception, notifier.warn, test_msg)
|
||||
else:
|
||||
notifier.warn(test_msg)
|
||||
elif priority == 'error':
|
||||
if exception or exception_send:
|
||||
self.assertRaises(Exception, notifier.error, test_msg)
|
||||
else:
|
||||
notifier.error(test_msg)
|
||||
|
||||
self.mocker.VerifyAll()
|
||||
|
||||
def test_info(self):
|
||||
self._test_notify('info')
|
||||
|
||||
def test_warn(self):
|
||||
self._test_notify('warn')
|
||||
|
||||
def test_error(self):
|
||||
self._test_notify('error')
|
||||
|
||||
def test_exception_open_successful(self):
|
||||
self._test_notify('info', exception=True)
|
||||
|
||||
def test_info_fail(self):
|
||||
self._test_notify('info', exception_send=True)
|
||||
|
||||
def test_warn_fail(self):
|
||||
self._test_notify('warn', exception_send=True)
|
||||
|
||||
def test_error_fail(self):
|
||||
self._test_notify('error', exception_send=True)
|
||||
|
||||
|
||||
class TestRabbitContentType(utils.BaseTestCase):
|
||||
"""Test AMQP/Rabbit notifier works."""
|
||||
|
||||
def setUp(self):
|
||||
super(TestRabbitContentType, self).setUp()
|
||||
self.stubs = stubout.StubOutForTesting()
|
||||
|
||||
def _fake_connect(rabbit_self):
|
||||
rabbit_self.connection_errors = ()
|
||||
rabbit_self.connection = 'fake_connection'
|
||||
rabbit_self.exchange = self._fake_exchange()
|
||||
return None
|
||||
|
||||
def dummy(*args, **kwargs):
|
||||
pass
|
||||
|
||||
self.stubs.Set(kombu.entity.Exchange, 'publish', dummy)
|
||||
self.stubs.Set(notify_kombu.RabbitStrategy, '_connect',
|
||||
_fake_connect)
|
||||
self.called = False
|
||||
self.config(notifier_strategy="rabbit",
|
||||
rabbit_retry_backoff=0,
|
||||
rabbit_notification_topic="fake_topic")
|
||||
self.notifier = notifier.Notifier()
|
||||
|
||||
def _fake_exchange(self):
|
||||
class Dummy(object):
|
||||
class Message(object):
|
||||
def __init__(message_self, message, content_type):
|
||||
self.called = {
|
||||
'message': message,
|
||||
'content_type': content_type
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def publish(*args, **kwargs):
|
||||
pass
|
||||
return Dummy
|
||||
|
||||
def test_content_type_passed(self):
|
||||
self.notifier.warn("test_event", "test_message")
|
||||
self.assertEqual(self.called['content_type'], 'application/json')
|
||||
|
||||
|
||||
class TestImageNotifications(utils.BaseTestCase):
|
||||
"""Test Image Notifications work"""
|
||||
|
||||
@ -764,121 +370,6 @@ class TestImageNotifications(utils.BaseTestCase):
|
||||
self.assertTrue('Failed' in output_log['payload'])
|
||||
|
||||
|
||||
class RabbitStrategyTestCase(utils.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(RabbitStrategyTestCase, self).setUp()
|
||||
self.rabbit_strategy = notify_kombu.RabbitStrategy()
|
||||
self.rabbit_strategy.retry_attempts = 0
|
||||
self.rabbit_strategy.max_retries = 2
|
||||
|
||||
def test_close(self):
|
||||
self.rabbit_strategy.connection = kombu.connection.BrokerConnection()
|
||||
self.rabbit_strategy.connection.close = mock.Mock()
|
||||
self.rabbit_strategy._close()
|
||||
self.assertEqual(self.rabbit_strategy.connection, None)
|
||||
|
||||
def test_connect(self):
|
||||
self.rabbit_strategy._close = mock.Mock()
|
||||
connection = kombu.connection.BrokerConnection(
|
||||
hostname='localhost',
|
||||
port=5672,
|
||||
userid='guest',
|
||||
password='guest',
|
||||
virtual_host='/',
|
||||
ssl=False)
|
||||
kombu.connection.BrokerConnection = mock.Mock()
|
||||
kombu.connection.BrokerConnection.return_value = connection
|
||||
connection.connect = mock.Mock()
|
||||
connection.channel = mock.Mock()
|
||||
connection.channel.return_value = 'fake_channel'
|
||||
kombu.entity.Exchange = mock.Mock()
|
||||
kombu.entity.Exchange.return_value = 'fake_exchange'
|
||||
fake_queue = mock.Mock()
|
||||
fake_queue.declare = mock.Mock()
|
||||
kombu.entity.Queue = mock.Mock()
|
||||
kombu.entity.Queue.return_value = fake_queue
|
||||
|
||||
self.rabbit_strategy._connect()
|
||||
kombu.connection.BrokerConnection.assert_called_with(
|
||||
hostname='localhost',
|
||||
port=5672,
|
||||
userid='guest',
|
||||
password='guest',
|
||||
virtual_host='/',
|
||||
ssl=False)
|
||||
kombu.entity.Exchange.assert_called_with(
|
||||
channel='fake_channel',
|
||||
type='topic',
|
||||
durable=False,
|
||||
name='glance')
|
||||
for routing_key in ['notifications.warn', 'notifications.info',
|
||||
'notifications.error']:
|
||||
kombu.entity.Queue.assert_any_called(
|
||||
channel='fake_channel',
|
||||
exchange='fake_exchange',
|
||||
durable=False,
|
||||
name=routing_key,
|
||||
routing_key=routing_key)
|
||||
|
||||
def test_reconnect_sleep_time(self):
|
||||
self.rabbit_strategy._connect = mock.Mock(
|
||||
side_effect=Exception('timeout'))
|
||||
time.sleep = mock.Mock()
|
||||
try:
|
||||
self.rabbit_strategy.reconnect()
|
||||
except notify_kombu.KombuMaxRetriesReached:
|
||||
pass
|
||||
finally:
|
||||
time.sleep.assert_called_once_with(2)
|
||||
|
||||
def test_reconnect_sleep_time_2(self):
|
||||
self.rabbit_strategy.retry_backoff = 40
|
||||
self.rabbit_strategy._connect = mock.Mock(
|
||||
side_effect=Exception('timeout'))
|
||||
time.sleep = mock.Mock()
|
||||
try:
|
||||
self.rabbit_strategy.reconnect()
|
||||
except notify_kombu.KombuMaxRetriesReached:
|
||||
pass
|
||||
finally:
|
||||
time.sleep.assert_called_once_with(30)
|
||||
|
||||
def test_reconnect_sleep_time_no_retry_max_backoff(self):
|
||||
self.rabbit_strategy.retry_max_backoff = None
|
||||
self.rabbit_strategy.retry_backoff = 100
|
||||
self.rabbit_strategy._connect = mock.Mock(
|
||||
side_effect=Exception('timeout'))
|
||||
time.sleep = mock.Mock()
|
||||
try:
|
||||
self.rabbit_strategy.reconnect()
|
||||
except notify_kombu.KombuMaxRetriesReached:
|
||||
pass
|
||||
finally:
|
||||
time.sleep.assert_called_once_with(100)
|
||||
|
||||
def test_notify_process_komby_max_retries_reached_error(self):
|
||||
self.rabbit_strategy.connection = None
|
||||
self.rabbit_strategy.reconnect = mock.Mock(
|
||||
side_effect=notify_kombu.KombuMaxRetriesReached())
|
||||
self.rabbit_strategy.log_failure = mock.Mock()
|
||||
|
||||
self.rabbit_strategy._notify('fake_msg', "WARN")
|
||||
self.rabbit_strategy.log_failure.assert_called_with('fake_msg', "WARN")
|
||||
|
||||
def test_notify_check_if_log_failure(self):
|
||||
self.rabbit_strategy.connection = 'fake_connection'
|
||||
self.rabbit_strategy._send_message = mock.Mock(
|
||||
side_effect=Exception('timeout'))
|
||||
self.rabbit_strategy.reconnect = mock.Mock(
|
||||
side_effect=notify_kombu.KombuMaxRetriesReached())
|
||||
self.rabbit_strategy.log_failure = mock.Mock()
|
||||
|
||||
self.rabbit_strategy._notify('fake_msg', "WARN")
|
||||
self.rabbit_strategy._send_message. \
|
||||
assert_called_with('fake_msg', 'notifications.warn')
|
||||
self.rabbit_strategy.log_failure.assert_called_with('fake_msg', "WARN")
|
||||
|
||||
|
||||
class TestTaskNotifications(utils.BaseTestCase):
|
||||
"""Test Task Notifications work"""
|
||||
|
||||
|
@ -182,26 +182,27 @@ class FakeNotifier(object):
|
||||
def __init__(self, *_args, **kwargs):
|
||||
self.log = []
|
||||
|
||||
def warn(self, event_type, payload):
|
||||
def _notify(self, event_type, payload, level):
|
||||
log = {}
|
||||
log['notification_type'] = "WARN"
|
||||
log['notification_type'] = level
|
||||
log['event_type'] = event_type
|
||||
log['payload'] = payload
|
||||
self.log.append(log)
|
||||
|
||||
def warn(self, event_type, payload):
|
||||
self._notify(event_type, payload, 'WARN')
|
||||
|
||||
def info(self, event_type, payload):
|
||||
log = {}
|
||||
log['notification_type'] = "INFO"
|
||||
log['event_type'] = event_type
|
||||
log['payload'] = payload
|
||||
self.log.append(log)
|
||||
self._notify(event_type, payload, 'INFO')
|
||||
|
||||
def error(self, event_type, payload):
|
||||
log = {}
|
||||
log['notification_type'] = "ERROR"
|
||||
log['event_type'] = event_type
|
||||
log['payload'] = payload
|
||||
self.log.append(log)
|
||||
self._notify(event_type, payload, 'ERROR')
|
||||
|
||||
def debug(self, event_type, payload):
|
||||
self._notify(event_type, payload, 'DEBUG')
|
||||
|
||||
def critical(self, event_type, payload):
|
||||
self._notify(event_type, payload, 'CRITICAL')
|
||||
|
||||
def get_logs(self):
|
||||
return self.log
|
||||
|
@ -12,7 +12,6 @@ module=jsonutils
|
||||
module=local
|
||||
module=lockutils
|
||||
module=log
|
||||
module=notifier
|
||||
module=policy
|
||||
module=strutils
|
||||
module=test
|
||||
|
@ -37,3 +37,6 @@ pyOpenSSL
|
||||
|
||||
# Required by openstack.common libraries
|
||||
six>=1.4.1
|
||||
|
||||
-f http://tarballs.openstack.org/oslo.messaging/oslo.messaging-1.2.0a11.tar.gz#egg=oslo.messaging-1.2.0a11
|
||||
oslo.messaging>=1.2.0a11
|
||||
|
Loading…
x
Reference in New Issue
Block a user