diff --git a/doc/source/overview_policies.rst b/doc/source/overview_policies.rst index 66dc5de212..12a74897c0 100644 --- a/doc/source/overview_policies.rst +++ b/doc/source/overview_policies.rst @@ -324,6 +324,12 @@ Each policy section contains the following options: policy types. - The default value is ``replication``. - When defining an EC policy use the value ``erasure_coding``. +* ``diskfile_module = <entry point>`` (optional) + - The option ``diskfile_module`` is used to load an alternate backend + object storage plug-in architecture. + - The default value is ``egg:swift#replication.fs`` or + ``egg:swift#erasure_coding.fs`` depending on the policy type. The scheme + and package name are optional and default to ``egg`` and ``swift``. The EC policy type has additional required options. See :ref:`using_ec_policy` for details. @@ -347,6 +353,7 @@ this example configuration.:: [storage-policy:1] name = silver policy_type = replication + diskfile_module = replication.fs deprecated = yes diff --git a/etc/swift.conf-sample b/etc/swift.conf-sample index 7b51f1a1d7..f28c020fb2 100644 --- a/etc/swift.conf-sample +++ b/etc/swift.conf-sample @@ -26,10 +26,15 @@ swift_hash_path_prefix = changeme # # A 'policy_type' argument is also supported but is not mandatory. Default # policy type 'replication' is used when 'policy_type' is unspecified. +# An optional 'diskfile_module' argument lets you specify an alternate backend +# object storage plug-in architecture. The default is +# "egg:swift#replication.fs", or "egg:swift#erasure_coding.fs", depending on +# the policy type.
[storage-policy:0] name = Policy-0 default = yes #policy_type = replication +#diskfile_module = egg:swift#replication.fs aliases = yellow, orange # the following section would declare a policy called 'silver', the number of @@ -49,6 +54,7 @@ aliases = yellow, orange #[storage-policy:1] #name = silver #policy_type = replication +#diskfile_module = egg:swift#replication.fs # The following declares a storage policy of type 'erasure_coding' which uses # Erasure Coding for data reliability. Please refer to Swift documentation for @@ -81,6 +87,7 @@ aliases = yellow, orange #name = deepfreeze10-4 #aliases = df10-4 #policy_type = erasure_coding +#diskfile_module = egg:swift#erasure_coding.fs #ec_type = liberasurecode_rs_vand #ec_num_data_fragments = 10 #ec_num_parity_fragments = 4 diff --git a/setup.cfg b/setup.cfg index 594368dc07..52385f3da9 100644 --- a/setup.cfg +++ b/setup.cfg @@ -123,6 +123,9 @@ paste.filter_factory = s3api = swift.common.middleware.s3api.s3api:filter_factory s3token = swift.common.middleware.s3api.s3token:filter_factory +swift.diskfile = + replication.fs = swift.obj.diskfile:DiskFileManager + erasure_coding.fs = swift.obj.diskfile:ECDiskFileManager [egg_info] tag_build = diff --git a/swift/common/storage_policy.py b/swift/common/storage_policy.py index baf224227e..51eca182a7 100644 --- a/swift/common/storage_policy.py +++ b/swift/common/storage_policy.py @@ -21,7 +21,7 @@ import six from six.moves.configparser import ConfigParser from swift.common.utils import ( config_true_value, quorum_size, whataremyips, list_from_csv, - config_positive_int_value, get_zero_indexed_base_string) + config_positive_int_value, get_zero_indexed_base_string, load_pkg_resource) from swift.common.ring import Ring, RingData from swift.common import utils from swift.common.exceptions import RingLoadError @@ -157,7 +157,8 @@ class BaseStoragePolicy(object): policy_type_to_policy_cls = {} def __init__(self, idx, name='', is_default=False, is_deprecated=False, - 
object_ring=None, aliases=''): + object_ring=None, aliases='', + diskfile_module='egg:swift#replication.fs'): # do not allow BaseStoragePolicy class to be instantiated directly if type(self) == BaseStoragePolicy: raise TypeError("Can't instantiate BaseStoragePolicy directly") @@ -187,6 +188,8 @@ class BaseStoragePolicy(object): self.ring_name = _get_policy_string('object', self.idx) self.object_ring = object_ring + self.diskfile_module = diskfile_module + @property def name(self): return self.alias_list[0] @@ -252,6 +255,7 @@ class BaseStoragePolicy(object): 'policy_type': 'policy_type', 'default': 'is_default', 'deprecated': 'is_deprecated', + 'diskfile_module': 'diskfile_module' } @classmethod @@ -285,6 +289,7 @@ class BaseStoragePolicy(object): if not self.is_deprecated: info.pop('deprecated') info.pop('policy_type') + info.pop('diskfile_module') return info def _validate_policy_name(self, name): @@ -376,6 +381,32 @@ class BaseStoragePolicy(object): """ raise NotImplementedError() + def get_diskfile_manager(self, *args, **kwargs): + """ + Return an instance of the diskfile manager class configured for this + storage policy. + + :param args: positional args to pass to the diskfile manager + constructor. + :param kwargs: keyword args to pass to the diskfile manager + constructor. + :return: A disk file manager instance. 
def load_pkg_resource(group, uri):
    """
    Load an entry point described by a ``[[scheme:]dist#]name`` URI.

    A bare ``name`` (no ``#``) is looked up in the ``swift`` distribution
    using the ``egg`` scheme. When the part before ``#`` has no ``:`` it is
    taken as the distribution name and the scheme defaults to ``egg``.

    :param group: the entry point group to search, e.g. ``swift.diskfile``.
    :param uri: the entry point reference, e.g.
                ``egg:swift#replication.fs``, ``swift#replication.fs`` or
                just ``replication.fs``.
    :raises TypeError: if the URI names a scheme other than ``egg``
                       (the scheme is compared case-insensitively).
    :returns: whatever ``pkg_resources.load_entry_point`` returns for the
              resolved distribution, group and entry point name.
    """
    location, hash_sep, name = uri.partition('#')
    if not hash_sep:
        # no '#': the whole string is the entry point name
        location, name = 'egg:swift', uri

    scheme, colon_sep, dist = location.partition(':')
    if colon_sep:
        scheme = scheme.lower()
    else:
        # no ':': the location is only a distribution name
        scheme, dist = 'egg', location

    if scheme != 'egg':
        raise TypeError('Unhandled URI scheme: %r' % scheme)
    return pkg_resources.load_entry_point(dist, group, name)
%s' % policy.policy_type) + def make_on_disk_filename(self, timestamp, ext=None, ctype_timestamp=None, *a, **kw): """ @@ -2845,9 +2834,9 @@ class DiskFile(BaseDiskFile): return self._ondisk_info -@DiskFileRouter.register(REPL_POLICY) class DiskFileManager(BaseDiskFileManager): diskfile_cls = DiskFile + policy = REPL_POLICY def _process_ondisk_files(self, exts, results, **kwargs): """ @@ -3203,9 +3192,9 @@ class ECDiskFile(BaseDiskFile): self.manager.invalidate_hash(dirname(self._datadir)) -@DiskFileRouter.register(EC_POLICY) class ECDiskFileManager(BaseDiskFileManager): diskfile_cls = ECDiskFile + policy = EC_POLICY def validate_fragment_index(self, frag_index): """ diff --git a/test/unit/cli/test_relinker.py b/test/unit/cli/test_relinker.py index 35c6721bc1..8daddb13ff 100644 --- a/test/unit/cli/test_relinker.py +++ b/test/unit/cli/test_relinker.py @@ -22,11 +22,12 @@ from swift.cli import relinker from swift.common import exceptions, ring, utils from swift.common import storage_policy from swift.common.storage_policy import ( - StoragePolicy, StoragePolicyCollection, POLICIES) + StoragePolicy, StoragePolicyCollection, POLICIES, ECStoragePolicy) from swift.obj.diskfile import write_metadata -from test.unit import FakeLogger, skip_if_no_xattrs +from test.unit import FakeLogger, skip_if_no_xattrs, DEFAULT_TEST_EC_TYPE, \ + patch_policies class TestRelinker(unittest.TestCase): @@ -148,16 +149,20 @@ class TestRelinker(unittest.TestCase): ['Error cleaning up %s: %s' % (self.objname, repr(exceptions.DiskFileNotExist()))]) + @patch_policies( + [ECStoragePolicy( + 0, name='platin', is_default=True, ec_type=DEFAULT_TEST_EC_TYPE, + ec_ndata=4, ec_nparity=2)]) def test_cleanup_non_durable_fragment(self): self._common_test_cleanup() - # Actually all fragments are non-durable and raise and DiskFileNotExist - # in EC in this test. 
However, if the counterpart exists in the new - # location, this is ok - it will be fixed by the reconstructor later on - storage_policy._POLICIES[0].policy_type = 'erasure_coding' - + # Switch the policy type so that actually all fragments are non-durable + # and raise a DiskFileNotExist in EC in this test. However, if the + # counterpart exists in the new location, this is ok - it will be fixed + # by the reconstructor later on self.assertEqual( - 0, relinker.cleanup(self.testdir, self.devices, True, self.logger)) + 0, relinker.cleanup(self.testdir, self.devices, True, + self.logger)) self.assertEqual(self.logger.get_lines_for_level('warning'), []) def test_cleanup_quarantined(self): diff --git a/test/unit/common/test_storage_policy.py b/test/unit/common/test_storage_policy.py index e82305aaf4..693a956d86 100644 --- a/test/unit/common/test_storage_policy.py +++ b/test/unit/common/test_storage_policy.py @@ -22,7 +22,8 @@ from functools import partial from six.moves.configparser import ConfigParser from tempfile import NamedTemporaryFile -from test.unit import patch_policies, FakeRing, temptree, DEFAULT_TEST_EC_TYPE +from test.unit import ( + patch_policies, FakeRing, temptree, DEFAULT_TEST_EC_TYPE, FakeLogger) import swift.common.storage_policy from swift.common.storage_policy import ( StoragePolicyCollection, POLICIES, PolicyError, parse_storage_policies, @@ -1348,6 +1349,7 @@ class TestStoragePolicies(unittest.TestCase): 'aliases': 'zero', 'default': True, 'deprecated': False, + 'diskfile_module': 'egg:swift#replication.fs', 'policy_type': REPL_POLICY }, (0, False): { @@ -1361,6 +1363,7 @@ class TestStoragePolicies(unittest.TestCase): 'aliases': 'one, tahi, uno', 'default': False, 'deprecated': True, + 'diskfile_module': 'egg:swift#replication.fs', 'policy_type': REPL_POLICY }, (1, False): { @@ -1374,6 +1377,7 @@ class TestStoragePolicies(unittest.TestCase): 'aliases': 'ten', 'default': False, 'deprecated': False, + 'diskfile_module': 
'egg:swift#erasure_coding.fs', 'policy_type': EC_POLICY, 'ec_type': DEFAULT_TEST_EC_TYPE, 'ec_num_data_fragments': 10, @@ -1391,6 +1395,7 @@ class TestStoragePolicies(unittest.TestCase): 'aliases': 'done', 'default': False, 'deprecated': True, + 'diskfile_module': 'egg:swift#erasure_coding.fs', 'policy_type': EC_POLICY, 'ec_type': DEFAULT_TEST_EC_TYPE, 'ec_num_data_fragments': 10, @@ -1409,6 +1414,7 @@ class TestStoragePolicies(unittest.TestCase): 'aliases': 'twelve', 'default': False, 'deprecated': False, + 'diskfile_module': 'egg:swift#erasure_coding.fs', 'policy_type': EC_POLICY, 'ec_type': DEFAULT_TEST_EC_TYPE, 'ec_num_data_fragments': 10, @@ -1451,6 +1457,84 @@ class TestStoragePolicies(unittest.TestCase): # pyeclib_driver.get_segment_info is called only once self.assertEqual(1, fake.call_count) + def test_get_diskfile_manager(self): + # verify unique diskfile manager instances are returned + policy = StoragePolicy(0, name='zero', is_default=True, + diskfile_module='replication.fs') + + dfm = policy.get_diskfile_manager({'devices': 'sdb1'}, FakeLogger()) + self.assertEqual('sdb1', dfm.devices) + dfm = policy.get_diskfile_manager({'devices': 'sdb2'}, FakeLogger()) + self.assertEqual('sdb2', dfm.devices) + dfm2 = policy.get_diskfile_manager({'devices': 'sdb2'}, FakeLogger()) + self.assertEqual('sdb2', dfm2.devices) + self.assertIsNot(dfm, dfm2) + + def test_get_diskfile_manager_custom_diskfile(self): + calls = [] + is_policy_ok = True + + class DFM(object): + def __init__(self, *args, **kwargs): + calls.append((args, kwargs)) + + @classmethod + def check_policy(cls, policy): + if not is_policy_ok: + raise ValueError("I am not ok") + + policy = StoragePolicy(0, name='zero', is_default=True, + diskfile_module='thin_air.fs') + with mock.patch( + 'swift.common.storage_policy.load_pkg_resource', + side_effect=lambda *a, **kw: DFM) as mock_load_pkg_resource: + dfm = policy.get_diskfile_manager('arg', kwarg='kwarg') + self.assertIsInstance(dfm, DFM) + 
mock_load_pkg_resource.assert_called_with( + 'swift.diskfile', 'thin_air.fs') + self.assertEqual([(('arg',), {'kwarg': 'kwarg'})], calls) + + calls = [] + is_policy_ok = False + + with mock.patch( + 'swift.common.storage_policy.load_pkg_resource', + side_effect=lambda *a, **kw: DFM) as mock_load_pkg_resource: + with self.assertRaises(PolicyError) as cm: + policy.get_diskfile_manager('arg', kwarg='kwarg') + mock_load_pkg_resource.assert_called_with( + 'swift.diskfile', 'thin_air.fs') + self.assertIn('Invalid diskfile_module thin_air.fs', str(cm.exception)) + + def test_get_diskfile_manager_invalid_policy_config(self): + bad_policy = StoragePolicy(0, name='zero', is_default=True, + diskfile_module='erasure_coding.fs') + + with self.assertRaises(PolicyError) as cm: + bad_policy.get_diskfile_manager() + self.assertIn('Invalid diskfile_module erasure_coding.fs', + str(cm.exception)) + + bad_policy = ECStoragePolicy(0, name='one', is_default=True, + ec_type=DEFAULT_TEST_EC_TYPE, + ec_ndata=10, ec_nparity=4, + diskfile_module='replication.fs') + + with self.assertRaises(PolicyError) as cm: + bad_policy.get_diskfile_manager() + + self.assertIn('Invalid diskfile_module replication.fs', + str(cm.exception)) + + bad_policy = StoragePolicy(0, name='zero', is_default=True, + diskfile_module='thin_air.fs') + + with self.assertRaises(PolicyError) as cm: + bad_policy.get_diskfile_manager() + + self.assertIn('Unable to load diskfile_module thin_air.fs', + str(cm.exception)) + if __name__ == '__main__': unittest.main() diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 54d675a73f..79d2bfccc4 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -4131,6 +4131,29 @@ cluster_dfw1 = http://dfw1.host/v1/ 'X-Backend-Redirect-Timestamp': '-1'}) self.assertIn('Invalid timestamp', str(exc)) + @mock.patch('pkg_resources.load_entry_point') + def test_load_pkg_resource(self, mock_driver): + tests = { + ('swift.diskfile', 
'egg:swift#replication.fs'): + ('swift', 'swift.diskfile', 'replication.fs'), + ('swift.diskfile', 'egg:swift#erasure_coding.fs'): + ('swift', 'swift.diskfile', 'erasure_coding.fs'), + ('swift.section', 'egg:swift#thing.other'): + ('swift', 'swift.section', 'thing.other'), + ('swift.section', 'swift#thing.other'): + ('swift', 'swift.section', 'thing.other'), + ('swift.section', 'thing.other'): + ('swift', 'swift.section', 'thing.other'), + } + for args, expected in tests.items(): + utils.load_pkg_resource(*args) + mock_driver.assert_called_with(*expected) + + with self.assertRaises(TypeError) as cm: + args = ('swift.diskfile', 'nog:swift#replication.fs') + utils.load_pkg_resource(*args) + self.assertEqual("Unhandled URI scheme: 'nog'", str(cm.exception)) + class ResellerConfReader(unittest.TestCase): diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py index 7d029946a4..12840bce04 100644 --- a/test/unit/obj/test_diskfile.py +++ b/test/unit/obj/test_diskfile.py @@ -57,8 +57,8 @@ from swift.common.exceptions import DiskFileNotExist, DiskFileQuarantined, \ DiskFileError, ReplicationLockTimeout, DiskFileCollision, \ DiskFileExpired, SwiftException, DiskFileNoSpace, DiskFileXattrNotSupported from swift.common.storage_policy import ( - POLICIES, get_policy_string, StoragePolicy, ECStoragePolicy, - BaseStoragePolicy, REPL_POLICY, EC_POLICY) + POLICIES, get_policy_string, StoragePolicy, ECStoragePolicy, REPL_POLICY, + EC_POLICY, PolicyError) from test.unit.obj.common import write_diskfile @@ -644,21 +644,55 @@ class TestObjectAuditLocationGenerator(unittest.TestCase): class TestDiskFileRouter(unittest.TestCase): - def test_register(self): - with mock.patch.dict( - diskfile.DiskFileRouter.policy_type_to_manager_cls, {}): - @diskfile.DiskFileRouter.register('test-policy') - class TestDiskFileManager(diskfile.DiskFileManager): - pass + @patch_policies(test_policies) + def test_policy(self): + conf = {} + logger = debug_logger('test-' + 
self.__class__.__name__) + df_router = diskfile.DiskFileRouter(conf, logger) + manager_0 = df_router[POLICIES[0]] + self.assertTrue(isinstance(manager_0, diskfile.DiskFileManager)) + manager_1 = df_router[POLICIES[1]] + self.assertTrue(isinstance(manager_1, diskfile.ECDiskFileManager)) - @BaseStoragePolicy.register('test-policy') - class TestStoragePolicy(BaseStoragePolicy): - pass + # The DiskFileRouter should not have to load the policy again + with mock.patch('swift.common.storage_policy.BaseStoragePolicy.' + + 'get_diskfile_manager') as mock_load: + manager_3 = df_router[POLICIES[0]] + mock_load.assert_not_called() + self.assertIs(manager_3, manager_0) + self.assertTrue(isinstance(manager_3, diskfile.DiskFileManager)) - with patch_policies([TestStoragePolicy(0, 'test')]): - router = diskfile.DiskFileRouter({}, debug_logger('test')) - manager = router[POLICIES.default] - self.assertTrue(isinstance(manager, TestDiskFileManager)) + def test_invalid_policy_config(self): + # verify that invalid policy diskfile configs are detected when the + # DiskfileRouter is created + bad_policy = StoragePolicy(0, name='zero', is_default=True, + diskfile_module='erasure_coding.fs') + + with patch_policies([bad_policy]): + with self.assertRaises(PolicyError) as cm: + diskfile.DiskFileRouter({}, debug_logger()) + self.assertIn('Invalid diskfile_module erasure_coding.fs', + str(cm.exception)) + + bad_policy = ECStoragePolicy(0, name='one', is_default=True, + ec_type=DEFAULT_TEST_EC_TYPE, + ec_ndata=10, ec_nparity=4, + diskfile_module='replication.fs') + + with patch_policies([bad_policy]): + with self.assertRaises(PolicyError) as cm: + diskfile.DiskFileRouter({}, debug_logger()) + self.assertIn('Invalid diskfile_module replication.fs', + str(cm.exception)) + + bad_policy = StoragePolicy(0, name='zero', is_default=True, + diskfile_module='thin_air.fs') + + with patch_policies([bad_policy]): + with self.assertRaises(PolicyError) as cm: + diskfile.DiskFileRouter({}, debug_logger()) + 
self.assertIn('Unable to load diskfile_module thin_air.fs', + str(cm.exception)) class BaseDiskFileTestMixin(object): @@ -1798,6 +1832,18 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase): except AssertionError: pass + def test_check_policy(self): + mock_policy = mock.MagicMock() + mock_policy.policy_type = REPL_POLICY + # sanity, DiskFileManager is ok with REPL_POLICY + diskfile.DiskFileManager.check_policy(mock_policy) + # DiskFileManager raises ValueError with EC_POLICY + mock_policy.policy_type = EC_POLICY + with self.assertRaises(ValueError) as cm: + diskfile.DiskFileManager.check_policy(mock_policy) + self.assertEqual('Invalid policy_type: %s' % EC_POLICY, + str(cm.exception)) + @patch_policies(with_ec_default=True) class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): @@ -3069,6 +3115,18 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): def test_get_diskfile_from_hash_frag_index_filter_legacy_durable(self): self._test_get_diskfile_from_hash_frag_index_filter(True) + def test_check_policy(self): + mock_policy = mock.MagicMock() + mock_policy.policy_type = EC_POLICY + # sanity, ECDiskFileManager is ok with EC_POLICY + diskfile.ECDiskFileManager.check_policy(mock_policy) + # ECDiskFileManager raises ValueError with REPL_POLICY + mock_policy.policy_type = REPL_POLICY + with self.assertRaises(ValueError) as cm: + diskfile.ECDiskFileManager.check_policy(mock_policy) + self.assertEqual('Invalid policy_type: %s' % REPL_POLICY, + str(cm.exception)) + class DiskFileMixin(BaseDiskFileTestMixin):