diff --git a/manila/tests/test_utils.py b/manila/tests/test_utils.py index c5e1ad33c7..f1b39d4dff 100644 --- a/manila/tests/test_utils.py +++ b/manila/tests/test_utils.py @@ -16,10 +16,7 @@ import datetime import errno -import os -import os.path import socket -import tempfile import time import uuid @@ -28,7 +25,6 @@ import mock from oslo_config import cfg from oslo_utils import timeutils import paramiko -from six.moves import builtins import manila from manila.common import constants @@ -41,221 +37,8 @@ from manila import utils CONF = cfg.CONF -class GetFromPathTestCase(test.TestCase): - def test_tolerates_nones(self): - f = utils.get_from_path - - input = [] - self.assertEqual([], f(input, "a")) - self.assertEqual([], f(input, "a/b")) - self.assertEqual([], f(input, "a/b/c")) - - input = [None] - self.assertEqual([], f(input, "a")) - self.assertEqual([], f(input, "a/b")) - self.assertEqual([], f(input, "a/b/c")) - - input = [{'a': None}] - self.assertEqual([], f(input, "a")) - self.assertEqual([], f(input, "a/b")) - self.assertEqual([], f(input, "a/b/c")) - - input = [{'a': {'b': None}}] - self.assertEqual([{'b': None}], f(input, "a")) - self.assertEqual([], f(input, "a/b")) - self.assertEqual([], f(input, "a/b/c")) - - input = [{'a': {'b': {'c': None}}}] - self.assertEqual([{'b': {'c': None}}], f(input, "a")) - self.assertEqual([{'c': None}], f(input, "a/b")) - self.assertEqual([], f(input, "a/b/c")) - - input = [{'a': {'b': {'c': None}}}, {'a': None}] - self.assertEqual([{'b': {'c': None}}], f(input, "a")) - self.assertEqual([{'c': None}], f(input, "a/b")) - self.assertEqual([], f(input, "a/b/c")) - - input = [{'a': {'b': {'c': None}}}, {'a': {'b': None}}] - self.assertEqual([{'b': {'c': None}}, {'b': None}], f(input, "a")) - self.assertEqual([{'c': None}], f(input, "a/b")) - self.assertEqual([], f(input, "a/b/c")) - - def test_does_select(self): - f = utils.get_from_path - - input = [{'a': 'a_1'}] - self.assertEqual(['a_1'], f(input, "a")) - self.assertEqual([], f(input, "a/b")) - self.assertEqual([], f(input, "a/b/c")) - - input = [{'a': {'b': 'b_1'}}] - self.assertEqual([{'b': 'b_1'}], f(input, "a")) - self.assertEqual(['b_1'], f(input, "a/b")) - self.assertEqual([], f(input, "a/b/c")) - - input = [{'a': {'b': {'c': 'c_1'}}}] - self.assertEqual([{'b': {'c': 'c_1'}}], f(input, "a")) - self.assertEqual([{'c': 'c_1'}], f(input, "a/b")) - self.assertEqual(['c_1'], f(input, "a/b/c")) - - input = [{'a': {'b': {'c': 'c_1'}}}, {'a': None}] - self.assertEqual([{'b': {'c': 'c_1'}}], f(input, "a")) - self.assertEqual([{'c': 'c_1'}], f(input, "a/b")) - self.assertEqual(['c_1'], f(input, "a/b/c")) - - input = [{'a': {'b': {'c': 'c_1'}}}, - {'a': {'b': None}}] - self.assertEqual([{'b': {'c': 'c_1'}}, {'b': None}], f(input, "a")) - self.assertEqual([{'c': 'c_1'}], f(input, "a/b")) - self.assertEqual(['c_1'], f(input, "a/b/c")) - - input = [{'a': {'b': {'c': 'c_1'}}}, - {'a': {'b': {'c': 'c_2'}}}] - self.assertEqual([{'b': {'c': 'c_1'}}, {'b': {'c': 'c_2'}}], - f(input, "a")) - self.assertEqual([{'c': 'c_1'}, {'c': 'c_2'}], f(input, "a/b")) - self.assertEqual(['c_1', 'c_2'], f(input, "a/b/c")) - - self.assertEqual([], f(input, "a/b/c/d")) - self.assertEqual([], f(input, "c/a/b/d")) - self.assertEqual([], f(input, "i/r/t")) - - def test_flattens_lists(self): - f = utils.get_from_path - - input = [{'a': [1, 2, 3]}] - self.assertEqual([1, 2, 3], f(input, "a")) - self.assertEqual([], f(input, "a/b")) - self.assertEqual([], f(input, "a/b/c")) - - input = [{'a': {'b': [1, 2, 3]}}] - self.assertEqual([{'b': [1, 2, 3]}], f(input, "a")) - self.assertEqual([1, 2, 3], f(input, "a/b")) - self.assertEqual([], f(input, "a/b/c")) - - input = [{'a': {'b': [1, 2, 3]}}, {'a': {'b': [4, 5, 6]}}] - self.assertEqual([1, 2, 3, 4, 5, 6], f(input, "a/b")) - self.assertEqual([], f(input, "a/b/c")) - - input = [{'a': [{'b': [1, 2, 3]}, {'b': [4, 5, 6]}]}] - self.assertEqual([1, 2, 3, 4, 5, 6], f(input, "a/b")) - self.assertEqual([], f(input, "a/b/c")) - - input = [{'a': [1, 2, {'b': 'b_1'}]}] - self.assertEqual([1, 2, {'b': 'b_1'}], f(input, "a")) - self.assertEqual(['b_1'], f(input, "a/b")) - - def test_bad_xpath(self): - f = utils.get_from_path - - self.assertRaises(exception.Error, f, [], None) - self.assertRaises(exception.Error, f, [], "") - self.assertRaises(exception.Error, f, [], "/") - self.assertRaises(exception.Error, f, [], "/a") - self.assertRaises(exception.Error, f, [], "/a/") - self.assertRaises(exception.Error, f, [], "//") - self.assertRaises(exception.Error, f, [], "//a") - self.assertRaises(exception.Error, f, [], "a//a") - self.assertRaises(exception.Error, f, [], "a//a/") - self.assertRaises(exception.Error, f, [], "a/a/") - - def test_real_failure1(self): - # Real world failure case... - # We weren't coping when the input was a Dictionary instead of a List - # This led to test_accepts_dictionaries - f = utils.get_from_path - - inst = {'fixed_ip': {'floating_ips': [{'address': '1.2.3.4'}], - 'address': '192.168.0.3'}, - 'hostname': ''} - - private_ips = f(inst, 'fixed_ip/address') - public_ips = f(inst, 'fixed_ip/floating_ips/address') - self.assertEqual(['192.168.0.3'], private_ips) - self.assertEqual(['1.2.3.4'], public_ips) - - def test_accepts_dictionaries(self): - f = utils.get_from_path - - input = {'a': [1, 2, 3]} - self.assertEqual([1, 2, 3], f(input, "a")) - self.assertEqual([], f(input, "a/b")) - self.assertEqual([], f(input, "a/b/c")) - - input = {'a': {'b': [1, 2, 3]}} - self.assertEqual([{'b': [1, 2, 3]}], f(input, "a")) - self.assertEqual([1, 2, 3], f(input, "a/b")) - self.assertEqual([], f(input, "a/b/c")) - - input = {'a': [{'b': [1, 2, 3]}, {'b': [4, 5, 6]}]} - self.assertEqual([1, 2, 3, 4, 5, 6], f(input, "a/b")) - self.assertEqual([], f(input, "a/b/c")) - - input = {'a': [1, 2, {'b': 'b_1'}]} - self.assertEqual([1, 2, {'b': 'b_1'}], f(input, "a")) - self.assertEqual(['b_1'], f(input, "a/b")) - - @ddt.ddt class GenericUtilsTestCase(test.TestCase): - def test_read_cached_file(self): - cache_data = {"data": 1123, "mtime": 1} - with mock.patch.object(os.path, "getmtime", mock.Mock(return_value=1)): - data = utils.read_cached_file("/this/is/a/fake", cache_data) - self.assertEqual(cache_data["data"], data) - os.path.getmtime.assert_called_once_with("/this/is/a/fake") - - def test_read_modified_cached_file(self): - with mock.patch.object(os.path, "getmtime", mock.Mock(return_value=2)): - fake_contents = "lorem ipsum" - fake_file = mock.Mock() - fake_file.read = mock.Mock(return_value=fake_contents) - fake_context_manager = mock.Mock() - fake_context_manager.__enter__ = mock.Mock(return_value=fake_file) - fake_context_manager.__exit__ = mock.Mock() - with mock.patch.object( - builtins, 'open', - mock.Mock(return_value=fake_context_manager)): - cache_data = {"data": 1123, "mtime": 1} - self.reload_called = False - - def test_reload(reloaded_data): - self.assertEqual(fake_contents, reloaded_data) - self.reload_called = True - - data = utils.read_cached_file("/this/is/a/fake", - cache_data, - reload_func=test_reload) - self.assertEqual(fake_contents, data) - self.assertTrue(self.reload_called) - fake_file.read.assert_called_once_with() - fake_context_manager.__enter__.assert_any_call() - builtins.open.assert_called_once_with("/this/is/a/fake") - os.path.getmtime.assert_called_once_with("/this/is/a/fake") - - def test_read_file_as_root(self): - def fake_execute(*args, **kwargs): - if args[1] == 'bad': - raise exception.ProcessExecutionError - return 'fakecontents', None - - self.mock_object(utils, 'execute', fake_execute) - contents = utils.read_file_as_root('good') - self.assertEqual('fakecontents', contents) - self.assertRaises(exception.FileNotFound, - utils.read_file_as_root, 'bad') - - def test_temporary_chown(self): - def fake_execute(*args, **kwargs): - if args[0] == 'chown': - fake_execute.uid = args[1] - self.mock_object(utils, 'execute', fake_execute) - - with tempfile.NamedTemporaryFile() as f: - with utils.temporary_chown(f.name, owner_uid=2): - self.assertEqual(2, fake_execute.uid) - self.assertEqual(fake_execute.uid, os.getuid()) - def test_service_is_up(self): fts_func = datetime.datetime.fromtimestamp fake_now = 1000 diff --git a/manila/utils.py b/manila/utils.py index a3dd2f0c07..ea21ef21af 100644 --- a/manila/utils.py +++ b/manila/utils.py @@ -218,57 +218,6 @@ class LazyPluggable(object): return getattr(backend, key) -def get_from_path(items, path): - """Returns a list of items matching the specified path. - - Takes an XPath-like expression e.g. prop1/prop2/prop3, and for each item - in items, looks up items[prop1][prop2][prop3]. Like XPath, if any of the - intermediate results are lists it will treat each list item individually. - A 'None' in items or any child expressions will be ignored, this function - will not throw because of None (anywhere) in items. The returned list - will contain no None values. - - """ - if path is None: - raise exception.Error('Invalid mini_xpath') - - (first_token, sep, remainder) = path.partition('/') - - if first_token == '': - raise exception.Error('Invalid mini_xpath') - - results = [] - - if items is None: - return results - - if not isinstance(items, list): - # Wrap single objects in a list - items = [items] - - for item in items: - if item is None: - continue - get_method = getattr(item, 'get', None) - if get_method is None: - continue - child = get_method(first_token) - if child is None: - continue - if isinstance(child, list): - # Flatten intermediate lists - for x in child: - results.append(x) - else: - results.append(child) - - if not sep: - # No more tokens - return results - else: - return get_from_path(results, remainder) - - def is_eventlet_bug105(): """Check if eventlet support IPv6 addresses. @@ -341,26 +290,6 @@ def monkey_patch(): decorator("%s.%s" % (module, key), func)) -def read_cached_file(filename, cache_info, reload_func=None): - """Read from a file if it has been modified. - - :param cache_info: dictionary to hold opaque cache. - :param reload_func: optional function to be called with data when - file is reloaded due to a modification. - - :returns: data from file - - """ - mtime = os.path.getmtime(filename) - if not cache_info or mtime != cache_info.get('mtime'): - with open(filename) as fap: - cache_info['data'] = fap.read() - cache_info['mtime'] = mtime - if reload_func: - reload_func(cache_info['data']) - return cache_info['data'] - - def file_open(*args, **kwargs): """Open file @@ -391,35 +320,6 @@ def validate_service_host(context, host): return service -def read_file_as_root(file_path): - """Secure helper to read file as root.""" - try: - out, _err = execute('cat', file_path, run_as_root=True) - return out - except exception.ProcessExecutionError: - raise exception.FileNotFound(file_path=file_path) - - -@contextlib.contextmanager -def temporary_chown(path, owner_uid=None): - """Temporarily chown a path. - - :params owner_uid: UID of temporary owner (defaults to current user) - """ - if owner_uid is None: - owner_uid = os.getuid() - - orig_uid = os.stat(path).st_uid - - if orig_uid != owner_uid: - execute('chown', owner_uid, path, run_as_root=True) - try: - yield - finally: - if orig_uid != owner_uid: - execute('chown', orig_uid, path, run_as_root=True) - - @contextlib.contextmanager def tempdir(**kwargs): tmpdir = tempfile.mkdtemp(**kwargs)