 72c501454e
			
		
	
	72c501454e
	
	
	
		
			
			The mock module has been added to Python 3.3 as unittest.mock. The third party mock module doesn't seem to be maintained anymore: the last commit was in April 2013. unittest.mock is more recent, has fewer bugs and is compatible with Python 3.4. There are bugs on Python 3 in the third party mock module, examples: * https://code.google.com/p/mock/issues/detail?id=225 * https://code.google.com/p/mock/issues/detail?id=234 Oslo Messaging hits these issues when running tests on Python 3.4. Import oslotest to set up six.moves for mock. Change-Id: Ic160101695cea67eb9bbbfcaddb8d3dac64e6804
		
			
				
	
	
		
			541 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			541 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| 
 | |
| # Copyright 2013 Red Hat, Inc.
 | |
| #
 | |
| #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 | |
| #    not use this file except in compliance with the License. You may obtain
 | |
| #    a copy of the License at
 | |
| #
 | |
| #         http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| #    Unless required by applicable law or agreed to in writing, software
 | |
| #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | |
| #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | |
| #    License for the specific language governing permissions and limitations
 | |
| #    under the License.
 | |
| 
 | |
| import datetime
 | |
| import logging
 | |
| import sys
 | |
| import uuid
 | |
| 
 | |
| import fixtures
 | |
| from oslo_serialization import jsonutils
 | |
| from oslo_utils import timeutils
 | |
| from stevedore import dispatch
 | |
| from stevedore import extension
 | |
| import testscenarios
 | |
| import yaml
 | |
| 
 | |
| from oslo import messaging
 | |
| from oslo.messaging.notify import notifier as msg_notifier
 | |
| from oslo.messaging import serializer as msg_serializer
 | |
| from oslo_messaging.notify import _impl_log
 | |
| from oslo_messaging.notify import _impl_messaging
 | |
| from oslo_messaging.notify import _impl_test
 | |
| from oslo_messaging.tests import utils as test_utils
 | |
| from six.moves import mock
 | |
| 
 | |
# Module-level ``load_tests`` hook, delegated to testscenarios.
# NOTE(review): presumably this expands each test class's ``scenarios``
# list into individual test cases -- confirm against testscenarios docs.
load_tests = testscenarios.load_tests_apply_scenarios
 | |
| 
 | |
| 
 | |
| class _FakeTransport(object):
 | |
| 
 | |
|     def __init__(self, conf):
 | |
|         self.conf = conf
 | |
| 
 | |
|     def _send_notification(self, target, ctxt, message, version, retry=None):
 | |
|         pass
 | |
| 
 | |
| 
 | |
class _ReRaiseLoggedExceptionsFixture(fixtures.Fixture):
    """Capture exceptions passed to the logger and raise them at cleanup.

    Notification send errors are only logged by the notifier, so to make
    test failures debuggable every logged exception is remembered and then
    re-raised when the fixture is cleaned up.
    """

    class FakeLogger(object):
        """Logger replacement that records the exception being handled."""

        def __init__(self):
            self.exceptions = []

        def exception(self, msg, *args, **kwargs):
            # The message and arguments are ignored; only the exception
            # currently being handled is stored.
            _, current, _ = sys.exc_info()
            self.exceptions.append(current)

    def setUp(self):
        """Install a fresh ``FakeLogger`` and register the re-raise cleanup."""
        super(_ReRaiseLoggedExceptionsFixture, self).setUp()

        self.logger = self.FakeLogger()

        def raise_first_logged():
            # Raising inside the loop means only the first recorded
            # exception ever propagates.
            for recorded in self.logger.exceptions:
                raise recorded

        self.addCleanup(raise_first_logged)
 | |
| 
 | |
| 
 | |
class TestMessagingNotifier(test_utils.BaseTestCase):
    """Scenario-driven test of notifications sent via the messaging drivers.

    The scenario lists below are multiplied together by
    ``generate_scenarios`` so that ``test_notifier`` runs once per
    combination of their attributes.
    """

    # Toggles for the 'messaging' driver (expects version 1.0 sends).
    _v1 = [
        ('v1', {'v1': True}),
        ('not_v1', {'v1': False}),
    ]

    # Toggles for the 'messagingv2' driver (expects version 2.0 sends).
    _v2 = [
        ('v2', {'v2': True}),
        ('not_v2', {'v2': False}),
    ]

    # Where the publisher id comes from: constructor, prepare(), or both.
    _publisher_id = [
        ('ctor_pub_id', {'ctor_pub_id': 'test',
                         'expected_pub_id': 'test'}),
        ('prep_pub_id', {'prep_pub_id': 'test.localhost',
                         'expected_pub_id': 'test.localhost'}),
        ('override', {'ctor_pub_id': 'test',
                      'prep_pub_id': 'test.localhost',
                      'expected_pub_id': 'test.localhost'}),
    ]

    _topics = [
        ('no_topics', {'topics': []}),
        ('single_topic', {'topics': ['notifications']}),
        ('multiple_topic2', {'topics': ['foo', 'bar']}),
    ]

    # Each priority is also the name of the notifier method that is called.
    _priority = [
        ('audit', {'priority': 'audit'}),
        ('debug', {'priority': 'debug'}),
        ('info', {'priority': 'info'}),
        ('warn', {'priority': 'warn'}),
        ('error', {'priority': 'error'}),
        ('sample', {'priority': 'sample'}),
        ('critical', {'priority': 'critical'}),
    ]

    _payload = [
        ('payload', {'payload': {'foo': 'bar'}}),
    ]

    _context = [
        ('ctxt', {'ctxt': {'user': 'bob'}}),
    ]

    # 'unconfigured' deliberately sets no ``retry`` attribute at all.
    _retry = [
        ('unconfigured', {}),
        ('None', {'retry': None}),
        ('0', {'retry': 0}),
        ('5', {'retry': 5}),
    ]

    @classmethod
    def generate_scenarios(cls):
        """Set ``cls.scenarios`` to the product of every scenario list."""
        dimensions = (
            cls._v1,
            cls._v2,
            cls._publisher_id,
            cls._topics,
            cls._priority,
            cls._payload,
            cls._context,
            cls._retry,
        )
        cls.scenarios = testscenarios.multiply_scenarios(*dimensions)

    def setUp(self):
        """Swap the notifier loggers for one that re-raises at cleanup."""
        super(TestMessagingNotifier, self).setUp()

        fixture = self.useFixture(_ReRaiseLoggedExceptionsFixture())
        self.logger = fixture.logger
        self.stubs.Set(_impl_messaging, 'LOG', self.logger)
        self.stubs.Set(msg_notifier, '_LOG', self.logger)

    @mock.patch('oslo_utils.timeutils.utcnow')
    def test_notifier(self, mock_utcnow):
        """Expect one send per enabled driver version and configured topic."""
        # (enabled?, driver name, version expected on the send)
        variants = [
            (self.v1, 'messaging', 1.0),
            (self.v2, 'messagingv2', 2.0),
        ]
        drivers = [name for enabled, name, _ in variants if enabled]

        self.config(notification_driver=drivers,
                    notification_topics=self.topics)

        transport = _FakeTransport(self.conf)

        ctor_kwds = {}
        if hasattr(self, 'ctor_pub_id'):
            ctor_kwds['publisher_id'] = self.ctor_pub_id
        notifier = messaging.Notifier(transport, **ctor_kwds)

        # Only attributes the scenario actually defines are passed on to
        # prepare(); prepare() is skipped entirely when there are none.
        prepare_kwds = {}
        for attr, keyword in (('retry', 'retry'),
                              ('prep_pub_id', 'publisher_id')):
            if hasattr(self, attr):
                prepare_kwds[keyword] = getattr(self, attr)
        if prepare_kwds:
            notifier = notifier.prepare(**prepare_kwds)

        self.mox.StubOutWithMock(transport, '_send_notification')

        # Take a real UUID before uuid4 is stubbed, then make the stub
        # hand that same value back.
        message_id = uuid.uuid4()
        self.mox.StubOutWithMock(uuid, 'uuid4')
        uuid.uuid4().AndReturn(message_id)

        mock_utcnow.return_value = datetime.datetime.utcnow()

        message = {
            'message_id': str(message_id),
            'publisher_id': self.expected_pub_id,
            'event_type': 'test.notify',
            'priority': self.priority.upper(),
            'payload': self.payload,
            'timestamp': str(timeutils.utcnow()),
        }

        # A scenario without a ``retry`` attribute expects retry=None.
        expected_retry = getattr(self, 'retry', None)

        for enabled, _, version in variants:
            if not enabled:
                continue
            for topic in self.topics:
                target = messaging.Target(
                    topic='%s.%s' % (topic, self.priority))
                transport._send_notification(
                    target, self.ctxt, message,
                    version=version, retry=expected_retry).InAnyOrder()

        self.mox.ReplayAll()

        notify = getattr(notifier, self.priority)
        notify(self.ctxt, 'test.notify', self.payload)


TestMessagingNotifier.generate_scenarios()
 | |
| 
 | |
| 
 | |
class TestSerializer(test_utils.BaseTestCase):
    """Checks that the notifier runs context and payload through a serializer."""

    def setUp(self):
        """Register ``_impl_test.reset`` as a cleanup."""
        super(TestSerializer, self).setUp()
        self.addCleanup(_impl_test.reset)

    @mock.patch('oslo_utils.timeutils.utcnow')
    def test_serializer(self, mock_utcnow):
        """The 'test' driver records the serialized context and payload."""
        serializer = msg_serializer.NoOpSerializer()
        notifier = messaging.Notifier(_FakeTransport(self.conf),
                                      'test.localhost',
                                      driver='test',
                                      topic='test',
                                      serializer=serializer)

        # Take a real UUID before uuid4 is stubbed, then make the stub
        # hand that same value back.
        fixed_id = uuid.uuid4()
        self.mox.StubOutWithMock(uuid, 'uuid4')
        uuid.uuid4().AndReturn(fixed_id)

        mock_utcnow.return_value = datetime.datetime.utcnow()

        raw_ctxt = dict(user='bob')
        serialized_ctxt = dict(user='alice')

        self.mox.StubOutWithMock(serializer, 'serialize_context')
        self.mox.StubOutWithMock(serializer, 'serialize_entity')
        serializer.serialize_context(raw_ctxt).AndReturn(serialized_ctxt)
        serializer.serialize_entity(raw_ctxt, 'bar').AndReturn('sbar')

        self.mox.ReplayAll()

        notifier.info(dict(user='bob'), 'test.notify', 'bar')

        expected_message = {
            'message_id': str(fixed_id),
            'publisher_id': 'test.localhost',
            'event_type': 'test.notify',
            'priority': 'INFO',
            'payload': 'sbar',
            'timestamp': str(timeutils.utcnow()),
        }
        expected_record = (dict(user='alice'), expected_message, 'INFO', None)

        self.assertEqual([expected_record], _impl_test.NOTIFICATIONS)
 | |
| 
 | |
| 
 | |
class TestLogNotifier(test_utils.BaseTestCase):
    """Tests for the 'log' notification driver."""

    @mock.patch('oslo_utils.timeutils.utcnow')
    def test_notifier(self, mock_utcnow):
        """An info notification is logged as JSON on a per-event logger."""
        self.config(notification_driver=['log'])

        notifier = messaging.Notifier(_FakeTransport(self.conf),
                                      'test.localhost')

        # Take a real UUID before uuid4 is stubbed, then make the stub
        # hand that same value back.
        fixed_id = uuid.uuid4()
        self.mox.StubOutWithMock(uuid, 'uuid4')
        uuid.uuid4().AndReturn(fixed_id)

        mock_utcnow.return_value = datetime.datetime.utcnow()

        # Key order is kept as-is: the expectation below compares the
        # serialized JSON string.
        expected_message = {
            'message_id': str(fixed_id),
            'publisher_id': 'test.localhost',
            'event_type': 'test.notify',
            'priority': 'INFO',
            'payload': 'bar',
            'timestamp': str(timeutils.utcnow()),
        }

        fake_logger = self.mox.CreateMockAnything()

        self.mox.StubOutWithMock(logging, 'getLogger')
        logger_name = 'oslo.messaging.notification.test.notify'
        logging.getLogger(logger_name).AndReturn(fake_logger)

        fake_logger.info(jsonutils.dumps(expected_message))

        self.mox.ReplayAll()

        notifier.info({}, 'test.notify', 'bar')

    def test_sample_priority(self):
        """Ensure the log driver drops sample-level notifications."""
        driver = _impl_log.LogDriver(None, None, None)

        logger_name = 'oslo.messaging.notification.foo'

        # The real logger must be fetched before getLogger is stubbed out.
        fake_logger = self.mox.CreateMock(logging.getLogger(logger_name))
        fake_logger.sample = None
        self.mox.StubOutWithMock(logging, 'getLogger')
        logging.getLogger(logger_name).AndReturn(fake_logger)

        self.mox.ReplayAll()

        driver.notify(None, {'event_type': 'foo'}, "sample", None)
 | |
| 
 | |
| 
 | |
class TestRoutingNotifier(test_utils.BaseTestCase):
    """Tests for the 'routing' notification driver.

    ``setUp`` builds a notifier configured with only the routing driver and
    keeps a handle on that driver as ``self.router``.
    """

    def setUp(self):
        super(TestRoutingNotifier, self).setUp()
        self.config(notification_driver=['routing'])

        transport = _FakeTransport(self.conf)
        self.notifier = messaging.Notifier(transport)
        self.router = self.notifier._driver_mgr['routing'].obj

    def _fake_extension_manager(self, ext):
        """Return an ExtensionManager holding one extension named 'test'."""
        return extension.ExtensionManager.make_test_instance(
            [extension.Extension('test', None, None, ext), ])

    def _empty_extension_manager(self):
        """Return an ExtensionManager with no extensions."""
        return extension.ExtensionManager.make_test_instance([])

    def test_should_load_plugin(self):
        """Only extensions whose name is in ``used_drivers`` are loaded."""
        self.router.used_drivers = {"zoo", "blah"}
        ext = mock.MagicMock()
        ext.name = "foo"
        self.assertFalse(self.router._should_load_plugin(ext))
        ext.name = "zoo"
        self.assertTrue(self.router._should_load_plugin(ext))

    def test_load_notifiers_no_config(self):
        """With no config file there are no groups and no used drivers."""
        # default routing_notifier_config=""
        self.router._load_notifiers()
        self.assertEqual({}, self.router.routing_groups)
        self.assertEqual(0, len(self.router.used_drivers))

    def test_load_notifiers_no_extensions(self):
        """An empty config plus no extensions yields no groups, no debug log."""
        self.config(routing_notifier_config="routing_notifier.yaml")
        routing_config = r""
        config_file = mock.MagicMock()
        config_file.return_value = routing_config

        with mock.patch.object(self.router, '_get_notifier_config_file',
                               config_file):
            with mock.patch('stevedore.dispatch.DispatchExtensionManager',
                            return_value=self._empty_extension_manager()):
                with mock.patch('oslo_messaging.notify.'
                                '_impl_routing.LOG') as mylog:
                    self.router._load_notifiers()
                    self.assertFalse(mylog.debug.called)
        self.assertEqual({}, self.router.routing_groups)

    def test_load_notifiers_config(self):
        """Every top-level key of the config becomes a routing group."""
        self.config(routing_notifier_config="routing_notifier.yaml")
        routing_config = r"""
group_1:
   rpc : foo
group_2:
   rpc : blah
        """

        config_file = mock.MagicMock()
        config_file.return_value = routing_config

        with mock.patch.object(self.router, '_get_notifier_config_file',
                               config_file):
            with mock.patch('stevedore.dispatch.DispatchExtensionManager',
                            return_value=self._fake_extension_manager(
                                mock.MagicMock())):
                self.router._load_notifiers()
                groups = sorted(self.router.routing_groups)
                self.assertEqual(['group_1', 'group_2'], groups)

    def test_get_drivers_for_message_accepted_events(self):
        """Drivers are selected by matching the event type patterns."""
        config = r"""
group_1:
   rpc:
       accepted_events:
          - foo.*
          - blah.zoo.*
          - zip
        """
        # safe_load: plain data only, and no explicit Loader argument is
        # needed (bare yaml.load() is deprecated/unsupported in newer PyYAML).
        groups = yaml.safe_load(config)
        group = groups['group_1']

        # No matching event ...
        self.assertEqual([],
                         self.router._get_drivers_for_message(
                             group, "unknown", "info"))

        # Child of foo ...
        self.assertEqual(['rpc'],
                         self.router._get_drivers_for_message(
                             group, "foo.1", "info"))

        # Foo itself ...
        self.assertEqual([],
                         self.router._get_drivers_for_message(
                             group, "foo", "info"))

        # Child of blah.zoo
        self.assertEqual(['rpc'],
                         self.router._get_drivers_for_message(
                             group, "blah.zoo.zing", "info"))

    def test_get_drivers_for_message_accepted_priorities(self):
        """Drivers are selected by matching the notification priority."""
        config = r"""
group_1:
   rpc:
       accepted_priorities:
          - info
          - error
        """
        # safe_load: see test_get_drivers_for_message_accepted_events.
        groups = yaml.safe_load(config)
        group = groups['group_1']

        # No matching priority
        self.assertEqual([],
                         self.router._get_drivers_for_message(
                             group, None, "unknown"))

        # Info ...
        self.assertEqual(['rpc'],
                         self.router._get_drivers_for_message(
                             group, None, "info"))

        # Error (to make sure the list is getting processed) ...
        self.assertEqual(['rpc'],
                         self.router._get_drivers_for_message(
                             group, None, "error"))

    def test_get_drivers_for_message_both(self):
        """Event and priority filters are evaluated per driver."""
        config = r"""
group_1:
   rpc:
       accepted_priorities:
          - info
       accepted_events:
          - foo.*
   driver_1:
       accepted_priorities:
          - info
   driver_2:
      accepted_events:
          - foo.*
        """
        # safe_load: plain data only, no explicit Loader argument required.
        groups = yaml.safe_load(config)
        group = groups['group_1']

        # Valid event, but no matching priority
        self.assertEqual(['driver_2'],
                         self.router._get_drivers_for_message(
                             group, 'foo.blah', "unknown"))

        # Valid priority, but no matching event
        self.assertEqual(['driver_1'],
                         self.router._get_drivers_for_message(
                             group, 'unknown', "info"))

        # Happy day ...
        matched = sorted(
            self.router._get_drivers_for_message(group, 'foo.blah', "info"))
        self.assertEqual(['driver_1', 'driver_2', 'rpc'], matched)

    def test_filter_func(self):
        """An extension passes the filter only if its name is accepted."""
        ext = mock.MagicMock()
        ext.name = "rpc"

        # Good ...
        self.assertTrue(self.router._filter_func(ext, {}, {}, 'info',
                                                 None, ['foo', 'rpc']))

        # Bad
        self.assertFalse(self.router._filter_func(ext, {}, {}, 'info',
                                                  None, ['foo']))

    def test_notify(self):
        """Drivers from all routing groups are combined for one map() call."""
        self.router.routing_groups = {'group_1': None, 'group_2': None}
        drivers_mock = mock.MagicMock()
        # One return value per routing group.
        drivers_mock.side_effect = [['rpc'], ['foo']]

        with mock.patch.object(self.router, 'plugin_manager') as pm:
            with mock.patch.object(self.router, '_get_drivers_for_message',
                                   drivers_mock):
                self.notifier.info({}, 'my_event', {})
                # Positional argument 6 of map() carries the driver names.
                self.assertEqual(sorted(['rpc', 'foo']),
                                 sorted(pm.map.call_args[0][6]))

    def test_notify_filtered(self):
        """Only drivers whose filters match the notification are notified."""
        self.config(routing_notifier_config="routing_notifier.yaml")
        routing_config = r"""
group_1:
    rpc:
        accepted_events:
          - my_event
    rpc2:
        accepted_priorities:
          - info
    bar:
        accepted_events:
            - nothing
        """
        config_file = mock.MagicMock()
        config_file.return_value = routing_config

        rpc_driver = mock.Mock()
        rpc2_driver = mock.Mock()
        bar_driver = mock.Mock()

        pm = dispatch.DispatchExtensionManager.make_test_instance(
            [extension.Extension('rpc', None, None, rpc_driver),
             extension.Extension('rpc2', None, None, rpc2_driver),
             extension.Extension('bar', None, None, bar_driver)],
        )

        with mock.patch.object(self.router, '_get_notifier_config_file',
                               config_file):
            with mock.patch('stevedore.dispatch.DispatchExtensionManager',
                            return_value=pm):
                self.notifier.info({}, 'my_event', {})
                # Drivers are invoked through notify() (as asserted for rpc
                # and rpc2 below), so that is the call that must be absent
                # on the filtered-out driver. Checking ``info`` on a Mock
                # would always pass and verify nothing.
                self.assertFalse(bar_driver.notify.called)
                rpc_driver.notify.assert_called_once_with(
                    {}, mock.ANY, 'INFO', None)
                rpc2_driver.notify.assert_called_once_with(
                    {}, mock.ANY, 'INFO', None)
 |