diff --git a/openstack/common/messaging/_drivers/base.py b/openstack/common/messaging/_drivers/base.py index eff1a984f..75e91fe72 100644 --- a/openstack/common/messaging/_drivers/base.py +++ b/openstack/common/messaging/_drivers/base.py @@ -61,7 +61,8 @@ class BaseDriver(object): self._default_exchange = default_exchange @abc.abstractmethod - def send(self, target, ctxt, message, wait_for_reply=None, timeout=None): + def send(self, target, ctxt, message, + wait_for_reply=None, timeout=None, envelope=False): """Send a message to the given target.""" return None diff --git a/openstack/common/messaging/_drivers/impl_fake.py b/openstack/common/messaging/_drivers/impl_fake.py index f43bc6362..92ebc02e3 100644 --- a/openstack/common/messaging/_drivers/impl_fake.py +++ b/openstack/common/messaging/_drivers/impl_fake.py @@ -95,7 +95,8 @@ class FakeDriver(base.BaseDriver): """ json.dumps(message) - def send(self, target, ctxt, message, wait_for_reply=None, timeout=None): + def send(self, target, ctxt, message, + wait_for_reply=None, timeout=None, envelope=False): if not target.topic: raise InvalidTarget(_('A topic is required to send'), target) diff --git a/openstack/common/messaging/notes.txt b/openstack/common/messaging/notes.txt index 05c37ea6c..3ed51f217 100644 --- a/openstack/common/messaging/notes.txt +++ b/openstack/common/messaging/notes.txt @@ -34,6 +34,12 @@ TODO: - I'm not sure listener.done() is really needed - can't we ack the message before returning it from poll() ? + - envelope=True/False really sucks - it's a transport driver specific + flag and we're only using it to communicate whether to use the new + or older on-the-wire notification message format. Maybe we should + have a high-level "notification message format version" which each + transport driver can map to an on-the-wire format. Meh. + Things I don't like: - CallContext - we already abuse the term "context" enough diff --git a/openstack/common/messaging/notify/_impl_log.py b/openstack/common/messaging/notify/_impl_log.py new file mode 100644 index 000000000..bbd69794d --- /dev/null +++ b/openstack/common/messaging/notify/_impl_log.py @@ -0,0 +1,30 @@ + +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from openstack.common import jsonutils +from openstack.common import log as logging +from openstack.common.messaging.notify import notifier + + +class LogDriver(notifier._Driver): + + LOGGER_BASE = 'openstack.common.notification' + + def notify(self, context, message, priority): + logger = logging.getLogger('%s.%s' % (self.LOGGER_BASE, + message['event_type'])) + getattr(logger, priority)(jsonutils.dumps(message)) diff --git a/openstack/common/messaging/notify/_impl_messaging.py b/openstack/common/messaging/notify/_impl_messaging.py new file mode 100644 index 000000000..2704d48c3 --- /dev/null +++ b/openstack/common/messaging/notify/_impl_messaging.py @@ -0,0 +1,49 @@ + +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from openstack.common.gettextutils import _ +from openstack.common import log as logging +from openstack.common.messaging +from openstack.common.messaging.notify import notifier + +LOG = logging.getLogger(__name__) + + +class MessagingDriver(notifier._Driver): + + def __init__(self, conf, topics=None, transport=None, envelope=False): + if transport is None: + transport = messaging.get_transport(conf) + super(MessagingDriver, self).__init__(conf, topics, transport) + self.envelope = envelope + + def notify(self, context, message, priority): + for topic in self.topics: + target = messaging.Target(topic='%s.%s' % (topic, priority)) + try: + self.transport._send(target, context, message, + envelope=self.envelope) + except Exception: + LOG.exception(_("Could not send notification to %(topic)s. " + "Payload=%(message)s"), + dict(topic=topic, message=message)) + + +class MessagingV2Driver(MessagingDriver): + + def __init__(self, conf, **kwargs): + super(MessagingDriver, self).__init__(conf, envelope=True, **kwargs) diff --git a/openstack/common/messaging/notify/_impl_noop.py b/openstack/common/messaging/notify/_impl_noop.py new file mode 100644 index 000000000..c055a558e --- /dev/null +++ b/openstack/common/messaging/notify/_impl_noop.py @@ -0,0 +1,24 @@ + +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from openstack.common.messaging.notify import notifier + + +class NoOpDriver(notifier._Driver): + + def notify(self, context, message, priority): + pass diff --git a/openstack/common/messaging/notify/_impl_test.py b/openstack/common/messaging/notify/_impl_test.py new file mode 100644 index 000000000..010789668 --- /dev/null +++ b/openstack/common/messaging/notify/_impl_test.py @@ -0,0 +1,28 @@ + +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from openstack.common.messaging.notify import notifier + + +class TestDriver(notifier._Driver): + + def __init__(self, conf, **kwargs): + super(TestDriver, self).__init__(conf, **kwargs) + self.notifications = [] + + def notify(self, context, message, priority): + self.notifications.append(message) diff --git a/openstack/common/messaging/notify/notifier.py b/openstack/common/messaging/notify/notifier.py new file mode 100644 index 000000000..5968a0034 --- /dev/null +++ b/openstack/common/messaging/notify/notifier.py @@ -0,0 +1,143 @@ + +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc + +from oslo.config import cfg +from stevedore import driver + +from openstack.common.gettextutils import _ +from openstack.common import timeutils +from openstack.common import uuidutils + +_notifier_opts = [ + cfg.MultiStrOpt('notification_driver', + default=[], + help='Driver or drivers to handle sending notifications'), + cfg.ListOpt('notification_topics', + default=['notifications', ], + deprecated_name='topics', + deprecated_group='rpc_notifier2', + help='AMQP topic used for openstack notifications'), +] + + +def _driver(module, name): + return 'openstack.common.messaging.notify._impl_%s:%s' % (module, name) + +_MESSAGING_V2_DRIVER = _driver('messaging', 'MessagingV2Driver') +_MESSAGING_DRIVER = _driver('messaging', 'MessagingDriver') +_LOG_DRIVER = _driver('log', 'LogDriver') +_TEST_DRIVER = _driver('test', 'TestDriver') +_NOOP_DRIVER = _driver('noop', 'NoOpDriver') + +NOTIFIER_DRIVERS = [ + 'messagingv2 = ' + _MESSAGING_V2_DRIVER, + 'messaging = ' + _MESSAGING_DRIVER, + 'log = ' + _LOG_DRIVER, + 'test = ' + _TEST_DRIVER, + 'noop = ' + _NOOP_DRIVER, + + # For backwards compat + 'openstack.common.notify.rpc2_notifier = ' + _MESSAGING_V2_DRIVER, + 'openstack.common.notify.rpc_notifier = ' + _MESSAGING_DRIVER, + 'openstack.common.notify.log_notifier = ' + _LOG_DRIVER, + 'openstack.common.notify.test_notifier = ' + _TEST_DRIVER, + 'openstack.common.notify.no_op_notifier = ' + _NOOP_DRIVER, +] + +NAMESPACE = 'openstack.common.notify.drivers' + + +class _Driver(object): + + __metaclass__ = abc.ABCMeta + + def __init__(self, conf, topics=None, transport=None): + self.conf = conf + self.topics = topics + self.transport = transport + + @abc.abstractmethod + def notify(self, context, msg, priority): + pass + + +class Notifier(object): + + def __init__(self, conf, publisher_id, + driver=None, topic=None, transport=None): + self.conf = conf + self.conf.register_opts(_notifier_opts) + + self.publisher_id = publisher_id + + self._driver_names = ([driver] if driver is not None + else conf.notification_driver) + self._drivers = None + self._topics = ([topic] if topic is not None + else conf.notification_topics) + self._transport = transport + + def _get_drivers(self): + if self._drivers is not None: + return self._drivers + + self._drivers = [] + + kwargs = dict(topics=self._topics, transport=self._transport) + + for driver in self._driver_names: + mgr = driver.DriverManager(NAMESPACE, + driver, + invoke_on_load=True, + invoke_args=[self.conf], + invoke_kwds=kwargs) + self._drivers.append(driver) + + return self._drivers + + def _notify(self, context, event_type, payload, priority): + msg = dict(message_id=uuidutils.generate_uuid(), + publisher_id=self.publisher_id, + event_type=event_type, + priority=priority, + payload=payload, + timestamp=str(timeutils.utcnow())) + + for driver in self._get_drivers(): + try: + driver.notify(context, msg, priority) + except Exception as e: + LOG.exception(_("Problem '%(e)s' attempting to send to " + "notification system. Payload=%(payload)s") + % dict(e=e, payload=payload)) + + def debug(self, context, event_type, payload): + self._notify(context, event_type, payload, 'DEBUG') + + def info(self, context, event_type, payload): + self._notify(context, event_type, payload, 'INFO') + + def warn(self, context, event_type, payload): + self._notify(context, event_type, payload, 'WARN') + + def error(self, context, event_type, payload): + self._notify(context, event_type, payload, 'ERROR') + + def critical(self, context, event_type, payload): + self._notify(context, event_type, payload, 'CRITICAL') diff --git a/openstack/common/messaging/transport.py b/openstack/common/messaging/transport.py index 93f9f97d3..36c77a8e9 100644 --- a/openstack/common/messaging/transport.py +++ b/openstack/common/messaging/transport.py @@ -52,10 +52,12 @@ class Transport(object): self.conf = driver.conf self._driver = driver - def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None): + def _send(self, target, ctxt, message, + wait_for_reply=None, timeout=None, envelope=False): return self._driver.send(target, ctxt, message, wait_for_reply=wait_for_reply, - timeout=timeout) + timeout=timeout, + envelope=envelope) def _listen(self, target): return self._driver.listen(target)