First cut at the notifier API

See:

  https://wiki.openstack.org/wiki/Oslo/Messaging#Emitting_Notifications
This commit is contained in:
Mark McLoughlin 2013-05-17 18:48:55 +01:00
parent 48a1cfae8a
commit e8429af763
9 changed files with 288 additions and 4 deletions

@ -61,7 +61,8 @@ class BaseDriver(object):
self._default_exchange = default_exchange
@abc.abstractmethod
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None):
def send(self, target, ctxt, message,
wait_for_reply=None, timeout=None, envelope=False):
"""Send a message to the given target."""
return None

@ -95,7 +95,8 @@ class FakeDriver(base.BaseDriver):
"""
json.dumps(message)
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None):
def send(self, target, ctxt, message,
wait_for_reply=None, timeout=None, envelope=False):
if not target.topic:
raise InvalidTarget(_('A topic is required to send'), target)

@ -34,6 +34,12 @@ TODO:
- I'm not sure listener.done() is really needed - can't we ack the
message before returning it from poll() ?
- envelope=True/False really sucks - it's a transport driver specific
flag and we're only using it to communicate whether to use the new
or older on-the-wire notification message format. Maybe we should
have a high-level "notification message format version" which each
transport driver can map to an on-the-wire format. Meh.
Things I don't like:
- CallContext - we already abuse the term "context" enough

@ -0,0 +1,30 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.common import jsonutils
from openstack.common import log as logging
from openstack.common.messaging.notify import notifier
class LogDriver(notifier._Driver):
LOGGER_BASE = 'openstack.common.notification'
def notify(self, context, message, priority):
logger = logging.getLogger('%s.%s' % (self.LOGGER_BASE,
message['event_type']))
getattr(logger, priority)(jsonutils.dumps(message))

@ -0,0 +1,49 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.common.gettextutils import _
from openstack.common import log as logging
from openstack.common.messaging
from openstack.common.messaging.notify import notifier
LOG = logging.getLogger(__name__)
class MessagingDriver(notifier._Driver):
def __init__(self, conf, topics=None, transport=None, envelope=False):
if transport is None:
transport = messaging.get_transport(conf)
super(MessagingDriver, self).__init__(conf, topics, transport)
self.envelope = envelope
def notify(self, context, message, priority):
for topic in self.topics:
target = messaging.Target(topic='%s.%s' % (topic, priority))
try:
self.transport._send(target, context, message,
envelope=self.envelope)
except Exception:
LOG.exception(_("Could not send notification to %(topic)s. "
"Payload=%(message)s"),
dict(topic=topic, message=message))
class MessagingV2Driver(MessagingDriver):
def __init__(self, conf, **kwargs):
super(MessagingDriver, self).__init__(conf, envelope=True, **kwargs)

@ -0,0 +1,24 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.common.messaging.notify import notifier
class NoOpDriver(notifier._Driver):
def notify(self, context, message, priority):
pass

@ -0,0 +1,28 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.common.messaging.notify import notifier
class TestDriver(notifier._Driver):
def __init__(self, conf, **kwargs):
super(TestDriver, self).__init__(conf, **kwargs)
self.notifications = []
def notify(self, context, message, priority):
self.notifications.append(message)

@ -0,0 +1,143 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from oslo.config import cfg
from stevedore import driver
from openstack.common.gettextutils import _
from openstack.common import timeutils
from openstack.common import uuidutils
_notifier_opts = [
cfg.MultiStrOpt('notification_driver',
default=[],
help='Driver or drivers to handle sending notifications'),
cfg.ListOpt('notification_topics',
default=['notifications', ],
deprecated_name='topics',
deprecated_group='rpc_notifier2',
help='AMQP topic used for openstack notifications'),
]
def _driver(module, name):
return 'openstack.common.messaging.notify._impl_%s:%s' % (module, name)
_MESSAGING_V2_DRIVER = _driver('messaging', 'MessagingV2Driver')
_MESSAGING_DRIVER = _driver('messaging', 'MessagingDriver')
_LOG_DRIVER = _driver('log', 'LogDriver')
_TEST_DRIVER = _driver('test', 'TestDriver')
_NOOP_DRIVER = _driver('noop', 'NoOpDriver')
NOTIFIER_DRIVERS = [
'messagingv2 = ' + _MESSAGING_V2_DRIVER,
'messaging = ' + _MESSAGING_DRIVER,
'log = ' + _LOG_DRIVER,
'test = ' + _TEST_DRIVER,
'noop = ' + _NOOP_DRIVER,
# For backwards compat
'openstack.common.notify.rpc2_notifier = ' + _MESSAGING_V2_DRIVER,
'openstack.common.notify.rpc_notifier = ' + _MESSAGING_DRIVER,
'openstack.common.notify.log_notifier = ' + _LOG_DRIVER,
'openstack.common.notify.test_notifier = ' + _TEST_DRIVER,
'openstack.common.notify.no_op_notifier = ' + _NOOP_DRIVER,
]
NAMESPACE = 'openstack.common.notify.drivers'
class _Driver(object):
__metaclass__ = abc.ABCMeta
def __init__(self, conf, topics=None, transport=None):
self.conf = conf
self.topics = topics
self.transport = transport
@abc.abstractmethod
def notify(self, context, msg, priority):
pass
class Notifier(object):
def __init__(self, conf, publisher_id,
driver=None, topic=None, transport=None):
self.conf = conf
self.conf.register_opts(_notifier_opts)
self.publisher_id = publisher_id
self._driver_names = ([driver] if driver is not None
else conf.notification_driver)
self._drivers = None
self._topics = ([topic] if topic is not None
else conf.notification_topics)
self._transport = transport
def _get_drivers(self):
if self._drivers is not None:
return self._drivers
self._drivers = []
kwargs = dict(topics=self._topics, transport=self._transport)
for driver in self._driver_names:
mgr = driver.DriverManager(NAMESPACE,
driver,
invoke_on_load=True,
invoke_args=[self.conf],
invoke_kwds=kwargs)
self._drivers.append(driver)
return self._drivers
def _notify(self, context, event_type, payload, priority):
msg = dict(message_id=uuidutils.generate_uuid(),
publisher_id=self.publisher_id,
event_type=event_type,
priority=priority,
payload=payload,
timestamp=str(timeutils.utcnow()))
for driver in self._get_drivers():
try:
driver.notify(context, msg, priority)
except Exception as e:
LOG.exception(_("Problem '%(e)s' attempting to send to "
"notification system. Payload=%(payload)s")
% dict(e=e, payload=payload))
def debug(self, context, event_type, payload):
self._notify(context, event_type, payload, 'DEBUG')
def info(self, context, event_type, payload):
self._notify(context, event_type, payload, 'INFO')
def warn(self, context, event_type, payload):
self._notify(context, event_type, payload, 'WARN')
def error(self, context, event_type, payload):
self._notify(context, event_type, payload, 'ERROR')
def critical(self, context, event_type, payload):
self._notify(context, event_type, payload, 'CRITICAL')

@ -52,10 +52,12 @@ class Transport(object):
self.conf = driver.conf
self._driver = driver
def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None):
def _send(self, target, ctxt, message,
wait_for_reply=None, timeout=None, envelope=False):
return self._driver.send(target, ctxt, message,
wait_for_reply=wait_for_reply,
timeout=timeout)
timeout=timeout,
envelope=envelope)
def _listen(self, target):
return self._driver.listen(target)