diff --git a/etc/routing_notifier.yaml.sample b/etc/routing_notifier.yaml.sample new file mode 100644 index 000000000..26fb7310a --- /dev/null +++ b/etc/routing_notifier.yaml.sample @@ -0,0 +1,29 @@ +# Setting a priority AND an event means both have to be satisfied. +# +# However, defining different sets for the same driver allows you +# to do OR operations. +# +# See how this logic is modeled below: +# +# if (priority in info, warn or error) or +# (event == compute.scheduler.run_instance) +# send to messaging driver ... +# +# if priority == 'poll' and +# event == 'bandwidth.*' +# send to poll driver + +group_1: + messaging: + accepted_priorities: ['info', 'warn', 'error'] + + poll: + accepted_priorities: ['poll'] + accepted_events: ['bandwidth.*'] + + log: + accepted_events: ['compute.instance.exists'] + +group_2: + messaging:⋅ + accepted_events: ['compute.scheduler.run_instance.*'] diff --git a/oslo/messaging/notify/_impl_routing.py b/oslo/messaging/notify/_impl_routing.py new file mode 100644 index 000000000..1e8368fa3 --- /dev/null +++ b/oslo/messaging/notify/_impl_routing.py @@ -0,0 +1,136 @@ +# Copyright 2014 Rackspace Hosting +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import fnmatch +import logging + +from oslo.config import cfg +import six +from stevedore import dispatch +import yaml + +from oslo.messaging.notify import notifier +from oslo.messaging.openstack.common.gettextutils import _ # noqa + + +LOG = logging.getLogger(__name__) + +router_config = cfg.StrOpt('routing_notifier_config', default='', + help='RoutingNotifier configuration file location') + +CONF = cfg.CONF +CONF.register_opt(router_config) + + +class RoutingDriver(notifier._Driver): + NOTIFIER_PLUGIN_NAMESPACE = 'oslo.messaging.notify.drivers' + + plugin_manager = None + routing_groups = None # The routing groups from the config file. + used_drivers = None # Used driver names, extracted from config file. + + def _should_load_plugin(self, ext, *args, **kwargs): + # Hack to keep stevedore from circular importing since these + # endpoints are used for different purposes. + if ext.name == 'routing': + return False + return ext.name in self.used_drivers + + def _get_notifier_config_file(self, filename): + """Broken out for testing.""" + return file(filename, 'r') + + def _load_notifiers(self): + """One-time load of notifier config file.""" + self.routing_groups = {} + self.used_drivers = set() + filename = CONF.routing_notifier_config + if not filename: + return + + # Infer which drivers are used from the config file. + self.routing_groups = yaml.load( + self._get_notifier_config_file(filename)) + if not self.routing_groups: + self.routing_groups = {} # In case we got None from load() + return + + for group in self.routing_groups.values(): + self.used_drivers.update(group.keys()) + + LOG.debug(_('loading notifiers from %(namespace)s') % + {'namespace': self.NOTIFIER_PLUGIN_NAMESPACE}) + self.plugin_manager = dispatch.DispatchExtensionManager( + namespace=self.NOTIFIER_PLUGIN_NAMESPACE, + check_func=self._should_load_plugin, + invoke_on_load=True, + invoke_args=None) + if not list(self.plugin_manager): + LOG.warning(_("Failed to load any notifiers " + "for %(namespace)s") % + {'namespace': self.NOTIFIER_PLUGIN_NAMESPACE}) + + def _get_drivers_for_message(self, group, event_type, priority): + """Which drivers should be called for this event_type + or priority. + """ + accepted_drivers = set() + + for driver, rules in six.iteritems(group): + checks = [] + for key, patterns in six.iteritems(rules): + if key == 'accepted_events': + c = [fnmatch.fnmatch(event_type, p) + for p in patterns] + checks.append(any(c)) + if key == 'accepted_priorities': + c = [fnmatch.fnmatch(priority, p.lower()) + for p in patterns] + checks.append(any(c)) + if all(checks): + accepted_drivers.add(driver) + + return list(accepted_drivers) + + def _filter_func(self, ext, context, message, accepted_drivers): + """True/False if the driver should be called for this message. + """ + # context is unused here, but passed in by map() + return ext.name in accepted_drivers + + def _call_notify(self, ext, context, message, accepted_drivers): + """Emit the notification. + """ + # accepted_drivers is passed in as a result of the map() function + LOG.info(_("Routing '%(event)s' notification to '%(driver)s' driver") % + {'event': message.get('event_type'), 'driver': ext.name}) + ext.obj.notify(context, message) + + def notify(self, context, message): + if not self.plugin_manager: + self._load_notifiers() + + # Fail if these aren't present ... + event_type = message['event_type'] + priority = message['priority'].lower() + + accepted_drivers = set() + for group in self.routing_groups.values(): + accepted_drivers.update(self._get_drivers_for_message(group, + event_type, + priority)) + + self.plugin_manager.map(self._filter_func, self._call_notify, context, + message, list(accepted_drivers)) diff --git a/requirements.txt b/requirements.txt index 5fe5fdfd6..1bdf72f0f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,7 +9,11 @@ six>=1.4.1 # FIXME(markmc): remove this when the drivers no longer # import eventlet + eventlet>=0.13.0 # used by openstack/common/gettextutils.py Babel>=1.3 + +# for the routing notifier +PyYAML>=3.1.0 diff --git a/setup.cfg b/setup.cfg index a1e6715a5..ed81aab68 100644 --- a/setup.cfg +++ b/setup.cfg @@ -48,6 +48,7 @@ oslo.messaging.notify.drivers = log = oslo.messaging.notify._impl_log:LogDriver test = oslo.messaging.notify._impl_test:TestDriver noop = oslo.messaging.notify._impl_noop:NoOpDriver + routing = oslo.messaging.notify._impl_routing:RoutingDriver [build_sphinx] source-dir = doc/source diff --git a/tests/test_notifier.py b/tests/test_notifier.py index 0f8b3cf1c..14174c7cb 100644 --- a/tests/test_notifier.py +++ b/tests/test_notifier.py @@ -20,11 +20,15 @@ import uuid import fixtures import mock +from stevedore import extension +from stevedore.tests import manager as test_manager import testscenarios +import yaml from oslo import messaging from oslo.messaging.notify import _impl_log from oslo.messaging.notify import _impl_messaging +from oslo.messaging.notify import _impl_routing as routing from oslo.messaging.notify import _impl_test from oslo.messaging.notify import notifier as msg_notifier from oslo.messaging.openstack.common import jsonutils @@ -188,7 +192,7 @@ class TestMessagingNotifier(test_utils.BaseTestCase): target = messaging.Target(topic='%s.%s' % (topic, self.priority)) transport._send_notification(target, self.ctxt, message, - **send_kwargs) + **send_kwargs).InAnyOrder() self.mox.ReplayAll() @@ -302,3 +306,183 @@ class TestLogNotifier(test_utils.BaseTestCase): msg = {'event_type': 'foo'} driver.notify(None, msg, "sample") + + +class TestRoutingNotifier(test_utils.BaseTestCase): + def setUp(self): + super(TestRoutingNotifier, self).setUp() + self.router = routing.RoutingDriver(None, None, None) + + def _fake_extension_manager(self, ext): + return test_manager.TestExtensionManager( + [extension.Extension('test', None, None, ext), ]) + + def _empty_extension_manager(self): + return test_manager.TestExtensionManager([]) + + def test_should_load_plugin(self): + self.router.used_drivers = set(["zoo", "blah"]) + ext = mock.MagicMock() + ext.name = "foo" + self.assertFalse(self.router._should_load_plugin(ext)) + ext.name = "zoo" + self.assertTrue(self.router._should_load_plugin(ext)) + + def test_load_notifiers_no_config(self): + # default routing_notifier_config="" + self.router._load_notifiers() + self.assertEqual(self.router.routing_groups, {}) + self.assertEqual(0, len(self.router.used_drivers)) + + def test_load_notifiers_no_extensions(self): + self.config(routing_notifier_config="routing_notifier.yaml") + routing_config = r"" + config_file = mock.MagicMock() + config_file.return_value = routing_config + + with mock.patch.object(self.router, '_get_notifier_config_file', + config_file): + with mock.patch('stevedore.dispatch.DispatchExtensionManager', + return_value=self._empty_extension_manager()): + with mock.patch('oslo.messaging.notify.' + '_impl_routing.LOG') as mylog: + self.router._load_notifiers() + self.assertFalse(mylog.debug.called) + self.assertEqual(self.router.routing_groups, {}) + + def test_load_notifiers_config(self): + self.config(routing_notifier_config="routing_notifier.yaml") + routing_config = r""" +group_1: + rpc : foo +group_2: + rpc : blah + """ + + config_file = mock.MagicMock() + config_file.return_value = routing_config + + with mock.patch.object(self.router, '_get_notifier_config_file', + config_file): + with mock.patch('stevedore.dispatch.DispatchExtensionManager', + return_value=self._fake_extension_manager( + mock.MagicMock())): + self.router._load_notifiers() + groups = self.router.routing_groups.keys() + groups.sort() + self.assertEqual(['group_1', 'group_2'], groups) + + def test_get_drivers_for_message_accepted_events(self): + config = r""" +group_1: + rpc: + accepted_events: + - foo.* + - blah.zoo.* + - zip + """ + groups = yaml.load(config) + group = groups['group_1'] + + # No matching event ... + self.assertEqual([], + self.router._get_drivers_for_message( + group, "unknown", None)) + + # Child of foo ... + self.assertEqual(['rpc'], + self.router._get_drivers_for_message( + group, "foo.1", None)) + + # Foo itself ... + self.assertEqual([], + self.router._get_drivers_for_message( + group, "foo", None)) + + # Child of blah.zoo + self.assertEqual(['rpc'], + self.router._get_drivers_for_message( + group, "blah.zoo.zing", None)) + + def test_get_drivers_for_message_accepted_priorities(self): + config = r""" +group_1: + rpc: + accepted_priorities: + - info + - error + """ + groups = yaml.load(config) + group = groups['group_1'] + + # No matching priority + self.assertEqual([], + self.router._get_drivers_for_message( + group, None, "unknown")) + + # Info ... + self.assertEqual(['rpc'], + self.router._get_drivers_for_message( + group, None, "info")) + + # Error (to make sure the list is getting processed) ... + self.assertEqual(['rpc'], + self.router._get_drivers_for_message( + group, None, "error")) + + def test_get_drivers_for_message_both(self): + config = r""" +group_1: + rpc: + accepted_priorities: + - info + accepted_events: + - foo.* + driver_1: + accepted_priorities: + - info + driver_2: + accepted_events: + - foo.* + """ + groups = yaml.load(config) + group = groups['group_1'] + + # Valid event, but no matching priority + self.assertEqual(['driver_2'], + self.router._get_drivers_for_message( + group, 'foo.blah', "unknown")) + + # Valid priority, but no matching event + self.assertEqual(['driver_1'], + self.router._get_drivers_for_message( + group, 'unknown', "info")) + + # Happy day ... + x = self.router._get_drivers_for_message(group, 'foo.blah', "info") + x.sort() + self.assertEqual(['driver_1', 'driver_2', 'rpc'], x) + + def test_filter_func(self): + ext = mock.MagicMock() + ext.name = "rpc" + + # Good ... + self.assertTrue(self.router._filter_func(ext, {}, {}, + ['foo', 'rpc'])) + + # Bad + self.assertFalse(self.router._filter_func(ext, {}, {}, ['foo'])) + + def test_notify(self): + self.router.routing_groups = {'group_1': None, 'group_2': None} + message = {'event_type': 'my_event', 'priority': 'my_priority'} + + drivers_mock = mock.MagicMock() + drivers_mock.side_effect = [['rpc'], ['foo']] + + with mock.patch.object(self.router, 'plugin_manager') as pm: + with mock.patch.object(self.router, '_get_drivers_for_message', + drivers_mock): + self.router.notify({}, message) + self.assertEqual(pm.map.call_args[0][4], ['rpc', 'foo'])