Merge "Configure diskfile per storage policy"

This commit is contained in:
Zuul 2018-09-27 00:19:32 +00:00 committed by Gerrit Code Review
commit 5cc4a72c76
10 changed files with 278 additions and 49 deletions

View File

@ -324,6 +324,12 @@ Each policy section contains the following options:
policy types.
- The default value is ``replication``.
- When defining an EC policy use the value ``erasure_coding``.
* ``diskfile_module = <entry point>`` (optional)
- The option ``diskfile_module`` is used to load an alternate backend
object storage plug-in architecture.
- The default value is ``egg:swift#replication.fs`` or
``egg:swift#erasure_coding.fs`` depending on the policy type. The scheme
and package name are optionals and default to ``egg`` and ``swift``.
The EC policy type has additional required options. See
:ref:`using_ec_policy` for details.
@ -347,6 +353,7 @@ this example configuration.::
[storage-policy:1]
name = silver
policy_type = replication
diskfile_module = replication.fs
deprecated = yes

View File

@ -26,10 +26,15 @@ swift_hash_path_prefix = changeme
#
# A 'policy_type' argument is also supported but is not mandatory. Default
# policy type 'replication' is used when 'policy_type' is unspecified.
# A 'diskfile_module' optional argument lets you specify an alternate backend
# object storage plug-in architecture. The default is
# "egg:swift#replication.fs", or "egg:swift#erasure_coding.fs", depending on
# the policy type.
[storage-policy:0]
name = Policy-0
default = yes
#policy_type = replication
#diskfile_module = egg:swift#replication.fs
aliases = yellow, orange
# the following section would declare a policy called 'silver', the number of
@ -49,6 +54,7 @@ aliases = yellow, orange
#[storage-policy:1]
#name = silver
#policy_type = replication
#diskfile_module = egg:swift#replication.fs
# The following declares a storage policy of type 'erasure_coding' which uses
# Erasure Coding for data reliability. Please refer to Swift documentation for
@ -81,6 +87,7 @@ aliases = yellow, orange
#name = deepfreeze10-4
#aliases = df10-4
#policy_type = erasure_coding
#diskfile_module = egg:swift#erasure_coding.fs
#ec_type = liberasurecode_rs_vand
#ec_num_data_fragments = 10
#ec_num_parity_fragments = 4

View File

@ -123,6 +123,9 @@ paste.filter_factory =
s3api = swift.common.middleware.s3api.s3api:filter_factory
s3token = swift.common.middleware.s3api.s3token:filter_factory
swift.diskfile =
replication.fs = swift.obj.diskfile:DiskFileManager
erasure_coding.fs = swift.obj.diskfile:ECDiskFileManager
[egg_info]
tag_build =

View File

@ -21,7 +21,7 @@ import six
from six.moves.configparser import ConfigParser
from swift.common.utils import (
config_true_value, quorum_size, whataremyips, list_from_csv,
config_positive_int_value, get_zero_indexed_base_string)
config_positive_int_value, get_zero_indexed_base_string, load_pkg_resource)
from swift.common.ring import Ring, RingData
from swift.common import utils
from swift.common.exceptions import RingLoadError
@ -157,7 +157,8 @@ class BaseStoragePolicy(object):
policy_type_to_policy_cls = {}
def __init__(self, idx, name='', is_default=False, is_deprecated=False,
object_ring=None, aliases=''):
object_ring=None, aliases='',
diskfile_module='egg:swift#replication.fs'):
# do not allow BaseStoragePolicy class to be instantiated directly
if type(self) == BaseStoragePolicy:
raise TypeError("Can't instantiate BaseStoragePolicy directly")
@ -187,6 +188,8 @@ class BaseStoragePolicy(object):
self.ring_name = _get_policy_string('object', self.idx)
self.object_ring = object_ring
self.diskfile_module = diskfile_module
@property
def name(self):
return self.alias_list[0]
@ -252,6 +255,7 @@ class BaseStoragePolicy(object):
'policy_type': 'policy_type',
'default': 'is_default',
'deprecated': 'is_deprecated',
'diskfile_module': 'diskfile_module'
}
@classmethod
@ -285,6 +289,7 @@ class BaseStoragePolicy(object):
if not self.is_deprecated:
info.pop('deprecated')
info.pop('policy_type')
info.pop('diskfile_module')
return info
def _validate_policy_name(self, name):
@ -376,6 +381,32 @@ class BaseStoragePolicy(object):
"""
raise NotImplementedError()
def get_diskfile_manager(self, *args, **kwargs):
"""
Return an instance of the diskfile manager class configured for this
storage policy.
:param args: positional args to pass to the diskfile manager
constructor.
:param kwargs: keyword args to pass to the diskfile manager
constructor.
:return: A disk file manager instance.
"""
try:
dfm_cls = load_pkg_resource('swift.diskfile', self.diskfile_module)
except ImportError as err:
raise PolicyError(
'Unable to load diskfile_module %s for policy %s: %s' %
(self.diskfile_module, self.name, err))
try:
dfm_cls.check_policy(self)
except ValueError as err:
raise PolicyError(
'Invalid diskfile_module %s for policy %s:%s (%s)' %
(self.diskfile_module, int(self), self.name, self.policy_type))
return dfm_cls(*args, **kwargs)
@BaseStoragePolicy.register(REPL_POLICY)
class StoragePolicy(BaseStoragePolicy):
@ -411,13 +442,15 @@ class ECStoragePolicy(BaseStoragePolicy):
def __init__(self, idx, name='', aliases='', is_default=False,
is_deprecated=False, object_ring=None,
diskfile_module='egg:swift#erasure_coding.fs',
ec_segment_size=DEFAULT_EC_OBJECT_SEGMENT_SIZE,
ec_type=None, ec_ndata=None, ec_nparity=None,
ec_duplication_factor=1):
super(ECStoragePolicy, self).__init__(
idx=idx, name=name, aliases=aliases, is_default=is_default,
is_deprecated=is_deprecated, object_ring=object_ring)
is_deprecated=is_deprecated, object_ring=object_ring,
diskfile_module=diskfile_module)
# Validate erasure_coding policy specific members
# ec_type is one of the EC implementations supported by PyEClib

View File

@ -59,6 +59,7 @@ import eventlet.debug
import eventlet.greenthread
import eventlet.patcher
import eventlet.semaphore
import pkg_resources
from eventlet import GreenPool, sleep, Timeout
from eventlet.green import socket, threading
from eventlet.hubs import trampoline
@ -5246,6 +5247,25 @@ def replace_partition_in_path(path, part_power):
return os.sep.join(path_components)
def load_pkg_resource(group, uri):
if '#' in uri:
uri, name = uri.split('#', 1)
else:
name = uri
uri = 'egg:swift'
if ':' in uri:
scheme, dist = uri.split(':', 1)
scheme = scheme.lower()
else:
scheme = 'egg'
dist = uri
if scheme != 'egg':
raise TypeError('Unhandled URI scheme: %r' % scheme)
return pkg_resources.load_entry_point(dist, group, name)
class PipeMutex(object):
"""
Mutex using a pipe. Works across both greenlets and real threads, even

View File

@ -602,29 +602,12 @@ def strip_self(f):
class DiskFileRouter(object):
policy_type_to_manager_cls = {}
@classmethod
def register(cls, policy_type):
"""
Decorator for Storage Policy implementations to register
their DiskFile implementation.
"""
def register_wrapper(diskfile_cls):
if policy_type in cls.policy_type_to_manager_cls:
raise PolicyError(
'%r is already registered for the policy_type %r' % (
cls.policy_type_to_manager_cls[policy_type],
policy_type))
cls.policy_type_to_manager_cls[policy_type] = diskfile_cls
return diskfile_cls
return register_wrapper
def __init__(self, *args, **kwargs):
self.policy_to_manager = {}
for policy in POLICIES:
manager_cls = self.policy_type_to_manager_cls[policy.policy_type]
self.policy_to_manager[int(policy)] = manager_cls(*args, **kwargs)
# create diskfile managers now to provoke any errors
self.policy_to_manager[int(policy)] = \
policy.get_diskfile_manager(*args, **kwargs)
def __getitem__(self, policy):
return self.policy_to_manager[int(policy)]
@ -654,6 +637,7 @@ class BaseDiskFileManager(object):
"""
diskfile_cls = None # must be set by subclasses
policy = None # must be set by subclasses
invalidate_hash = strip_self(invalidate_hash)
consolidate_hashes = strip_self(consolidate_hashes)
@ -722,6 +706,11 @@ class BaseDiskFileManager(object):
self.pipe_size = min(max_pipe_size, self.disk_chunk_size)
self.use_linkat = o_tmpfile_supported()
@classmethod
def check_policy(cls, policy):
if policy.policy_type != cls.policy:
raise ValueError('Invalid policy_type: %s' % policy.policy_type)
def make_on_disk_filename(self, timestamp, ext=None,
ctype_timestamp=None, *a, **kw):
"""
@ -2845,9 +2834,9 @@ class DiskFile(BaseDiskFile):
return self._ondisk_info
@DiskFileRouter.register(REPL_POLICY)
class DiskFileManager(BaseDiskFileManager):
diskfile_cls = DiskFile
policy = REPL_POLICY
def _process_ondisk_files(self, exts, results, **kwargs):
"""
@ -3203,9 +3192,9 @@ class ECDiskFile(BaseDiskFile):
self.manager.invalidate_hash(dirname(self._datadir))
@DiskFileRouter.register(EC_POLICY)
class ECDiskFileManager(BaseDiskFileManager):
diskfile_cls = ECDiskFile
policy = EC_POLICY
def validate_fragment_index(self, frag_index):
"""

View File

@ -22,11 +22,12 @@ from swift.cli import relinker
from swift.common import exceptions, ring, utils
from swift.common import storage_policy
from swift.common.storage_policy import (
StoragePolicy, StoragePolicyCollection, POLICIES)
StoragePolicy, StoragePolicyCollection, POLICIES, ECStoragePolicy)
from swift.obj.diskfile import write_metadata
from test.unit import FakeLogger, skip_if_no_xattrs
from test.unit import FakeLogger, skip_if_no_xattrs, DEFAULT_TEST_EC_TYPE, \
patch_policies
class TestRelinker(unittest.TestCase):
@ -148,16 +149,20 @@ class TestRelinker(unittest.TestCase):
['Error cleaning up %s: %s' % (self.objname,
repr(exceptions.DiskFileNotExist()))])
@patch_policies(
[ECStoragePolicy(
0, name='platin', is_default=True, ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=4, ec_nparity=2)])
def test_cleanup_non_durable_fragment(self):
self._common_test_cleanup()
# Actually all fragments are non-durable and raise and DiskFileNotExist
# in EC in this test. However, if the counterpart exists in the new
# location, this is ok - it will be fixed by the reconstructor later on
storage_policy._POLICIES[0].policy_type = 'erasure_coding'
# Switch the policy type so that actually all fragments are non-durable
# and raise a DiskFileNotExist in EC in this test. However, if the
# counterpart exists in the new location, this is ok - it will be fixed
# by the reconstructor later on
self.assertEqual(
0, relinker.cleanup(self.testdir, self.devices, True, self.logger))
0, relinker.cleanup(self.testdir, self.devices, True,
self.logger))
self.assertEqual(self.logger.get_lines_for_level('warning'), [])
def test_cleanup_quarantined(self):

View File

@ -22,7 +22,8 @@ from functools import partial
from six.moves.configparser import ConfigParser
from tempfile import NamedTemporaryFile
from test.unit import patch_policies, FakeRing, temptree, DEFAULT_TEST_EC_TYPE
from test.unit import (
patch_policies, FakeRing, temptree, DEFAULT_TEST_EC_TYPE, FakeLogger)
import swift.common.storage_policy
from swift.common.storage_policy import (
StoragePolicyCollection, POLICIES, PolicyError, parse_storage_policies,
@ -1348,6 +1349,7 @@ class TestStoragePolicies(unittest.TestCase):
'aliases': 'zero',
'default': True,
'deprecated': False,
'diskfile_module': 'egg:swift#replication.fs',
'policy_type': REPL_POLICY
},
(0, False): {
@ -1361,6 +1363,7 @@ class TestStoragePolicies(unittest.TestCase):
'aliases': 'one, tahi, uno',
'default': False,
'deprecated': True,
'diskfile_module': 'egg:swift#replication.fs',
'policy_type': REPL_POLICY
},
(1, False): {
@ -1374,6 +1377,7 @@ class TestStoragePolicies(unittest.TestCase):
'aliases': 'ten',
'default': False,
'deprecated': False,
'diskfile_module': 'egg:swift#erasure_coding.fs',
'policy_type': EC_POLICY,
'ec_type': DEFAULT_TEST_EC_TYPE,
'ec_num_data_fragments': 10,
@ -1391,6 +1395,7 @@ class TestStoragePolicies(unittest.TestCase):
'aliases': 'done',
'default': False,
'deprecated': True,
'diskfile_module': 'egg:swift#erasure_coding.fs',
'policy_type': EC_POLICY,
'ec_type': DEFAULT_TEST_EC_TYPE,
'ec_num_data_fragments': 10,
@ -1409,6 +1414,7 @@ class TestStoragePolicies(unittest.TestCase):
'aliases': 'twelve',
'default': False,
'deprecated': False,
'diskfile_module': 'egg:swift#erasure_coding.fs',
'policy_type': EC_POLICY,
'ec_type': DEFAULT_TEST_EC_TYPE,
'ec_num_data_fragments': 10,
@ -1451,6 +1457,84 @@ class TestStoragePolicies(unittest.TestCase):
# pyeclib_driver.get_segment_info is called only once
self.assertEqual(1, fake.call_count)
def test_get_diskfile_manager(self):
# verify unique diskfile manager instances are returned
policy = StoragePolicy(0, name='zero', is_default=True,
diskfile_module='replication.fs')
dfm = policy.get_diskfile_manager({'devices': 'sdb1'}, FakeLogger())
self.assertEqual('sdb1', dfm.devices)
dfm = policy.get_diskfile_manager({'devices': 'sdb2'}, FakeLogger())
self.assertEqual('sdb2', dfm.devices)
dfm2 = policy.get_diskfile_manager({'devices': 'sdb2'}, FakeLogger())
self.assertEqual('sdb2', dfm2.devices)
self.assertIsNot(dfm, dfm2)
def test_get_diskfile_manager_custom_diskfile(self):
calls = []
is_policy_ok = True
class DFM(object):
def __init__(self, *args, **kwargs):
calls.append((args, kwargs))
@classmethod
def check_policy(cls, policy):
if not is_policy_ok:
raise ValueError("I am not ok")
policy = StoragePolicy(0, name='zero', is_default=True,
diskfile_module='thin_air.fs')
with mock.patch(
'swift.common.storage_policy.load_pkg_resource',
side_effect=lambda *a, **kw: DFM) as mock_load_pkg_resource:
dfm = policy.get_diskfile_manager('arg', kwarg='kwarg')
self.assertIsInstance(dfm, DFM)
mock_load_pkg_resource.assert_called_with(
'swift.diskfile', 'thin_air.fs')
self.assertEqual([(('arg',), {'kwarg': 'kwarg'})], calls)
calls = []
is_policy_ok = False
with mock.patch(
'swift.common.storage_policy.load_pkg_resource',
side_effect=lambda *a, **kw: DFM) as mock_load_pkg_resource:
with self.assertRaises(PolicyError) as cm:
policy.get_diskfile_manager('arg', kwarg='kwarg')
mock_load_pkg_resource.assert_called_with(
'swift.diskfile', 'thin_air.fs')
self.assertIn('Invalid diskfile_module thin_air.fs', str(cm.exception))
def test_get_diskfile_manager_invalid_policy_config(self):
bad_policy = StoragePolicy(0, name='zero', is_default=True,
diskfile_module='erasure_coding.fs')
with self.assertRaises(PolicyError) as cm:
bad_policy.get_diskfile_manager()
self.assertIn('Invalid diskfile_module erasure_coding.fs',
str(cm.exception))
bad_policy = ECStoragePolicy(0, name='one', is_default=True,
ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=10, ec_nparity=4,
diskfile_module='replication.fs')
with self.assertRaises(PolicyError) as cm:
bad_policy.get_diskfile_manager()
self.assertIn('Invalid diskfile_module replication.fs',
str(cm.exception))
bad_policy = StoragePolicy(0, name='zero', is_default=True,
diskfile_module='thin_air.fs')
with self.assertRaises(PolicyError) as cm:
bad_policy.get_diskfile_manager()
self.assertIn('Unable to load diskfile_module thin_air.fs',
str(cm.exception))
if __name__ == '__main__':
unittest.main()

View File

@ -4131,6 +4131,29 @@ cluster_dfw1 = http://dfw1.host/v1/
'X-Backend-Redirect-Timestamp': '-1'})
self.assertIn('Invalid timestamp', str(exc))
@mock.patch('pkg_resources.load_entry_point')
def test_load_pkg_resource(self, mock_driver):
tests = {
('swift.diskfile', 'egg:swift#replication.fs'):
('swift', 'swift.diskfile', 'replication.fs'),
('swift.diskfile', 'egg:swift#erasure_coding.fs'):
('swift', 'swift.diskfile', 'erasure_coding.fs'),
('swift.section', 'egg:swift#thing.other'):
('swift', 'swift.section', 'thing.other'),
('swift.section', 'swift#thing.other'):
('swift', 'swift.section', 'thing.other'),
('swift.section', 'thing.other'):
('swift', 'swift.section', 'thing.other'),
}
for args, expected in tests.items():
utils.load_pkg_resource(*args)
mock_driver.assert_called_with(*expected)
with self.assertRaises(TypeError) as cm:
args = ('swift.diskfile', 'nog:swift#replication.fs')
utils.load_pkg_resource(*args)
self.assertEqual("Unhandled URI scheme: 'nog'", str(cm.exception))
class ResellerConfReader(unittest.TestCase):

View File

@ -57,8 +57,8 @@ from swift.common.exceptions import DiskFileNotExist, DiskFileQuarantined, \
DiskFileError, ReplicationLockTimeout, DiskFileCollision, \
DiskFileExpired, SwiftException, DiskFileNoSpace, DiskFileXattrNotSupported
from swift.common.storage_policy import (
POLICIES, get_policy_string, StoragePolicy, ECStoragePolicy,
BaseStoragePolicy, REPL_POLICY, EC_POLICY)
POLICIES, get_policy_string, StoragePolicy, ECStoragePolicy, REPL_POLICY,
EC_POLICY, PolicyError)
from test.unit.obj.common import write_diskfile
@ -644,21 +644,55 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
class TestDiskFileRouter(unittest.TestCase):
def test_register(self):
with mock.patch.dict(
diskfile.DiskFileRouter.policy_type_to_manager_cls, {}):
@diskfile.DiskFileRouter.register('test-policy')
class TestDiskFileManager(diskfile.DiskFileManager):
pass
@patch_policies(test_policies)
def test_policy(self):
conf = {}
logger = debug_logger('test-' + self.__class__.__name__)
df_router = diskfile.DiskFileRouter(conf, logger)
manager_0 = df_router[POLICIES[0]]
self.assertTrue(isinstance(manager_0, diskfile.DiskFileManager))
manager_1 = df_router[POLICIES[1]]
self.assertTrue(isinstance(manager_1, diskfile.ECDiskFileManager))
@BaseStoragePolicy.register('test-policy')
class TestStoragePolicy(BaseStoragePolicy):
pass
# The DiskFileRouter should not have to load the policy again
with mock.patch('swift.common.storage_policy.BaseStoragePolicy.' +
'get_diskfile_manager') as mock_load:
manager_3 = df_router[POLICIES[0]]
mock_load.assert_not_called()
self.assertIs(manager_3, manager_0)
self.assertTrue(isinstance(manager_3, diskfile.DiskFileManager))
with patch_policies([TestStoragePolicy(0, 'test')]):
router = diskfile.DiskFileRouter({}, debug_logger('test'))
manager = router[POLICIES.default]
self.assertTrue(isinstance(manager, TestDiskFileManager))
def test_invalid_policy_config(self):
# verify that invalid policy diskfile configs are detected when the
# DiskfileRouter is created
bad_policy = StoragePolicy(0, name='zero', is_default=True,
diskfile_module='erasure_coding.fs')
with patch_policies([bad_policy]):
with self.assertRaises(PolicyError) as cm:
diskfile.DiskFileRouter({}, debug_logger())
self.assertIn('Invalid diskfile_module erasure_coding.fs',
str(cm.exception))
bad_policy = ECStoragePolicy(0, name='one', is_default=True,
ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=10, ec_nparity=4,
diskfile_module='replication.fs')
with patch_policies([bad_policy]):
with self.assertRaises(PolicyError) as cm:
diskfile.DiskFileRouter({}, debug_logger())
self.assertIn('Invalid diskfile_module replication.fs',
str(cm.exception))
bad_policy = StoragePolicy(0, name='zero', is_default=True,
diskfile_module='thin_air.fs')
with patch_policies([bad_policy]):
with self.assertRaises(PolicyError) as cm:
diskfile.DiskFileRouter({}, debug_logger())
self.assertIn('Unable to load diskfile_module thin_air.fs',
str(cm.exception))
class BaseDiskFileTestMixin(object):
@ -1798,6 +1832,18 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
except AssertionError:
pass
def test_check_policy(self):
mock_policy = mock.MagicMock()
mock_policy.policy_type = REPL_POLICY
# sanity, DiskFileManager is ok with REPL_POLICY
diskfile.DiskFileManager.check_policy(mock_policy)
# DiskFileManager raises ValueError with EC_POLICY
mock_policy.policy_type = EC_POLICY
with self.assertRaises(ValueError) as cm:
diskfile.DiskFileManager.check_policy(mock_policy)
self.assertEqual('Invalid policy_type: %s' % EC_POLICY,
str(cm.exception))
@patch_policies(with_ec_default=True)
class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
@ -3069,6 +3115,18 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
def test_get_diskfile_from_hash_frag_index_filter_legacy_durable(self):
self._test_get_diskfile_from_hash_frag_index_filter(True)
def test_check_policy(self):
mock_policy = mock.MagicMock()
mock_policy.policy_type = EC_POLICY
# sanity, ECDiskFileManager is ok with EC_POLICY
diskfile.ECDiskFileManager.check_policy(mock_policy)
# ECDiskFileManager raises ValueError with REPL_POLICY
mock_policy.policy_type = REPL_POLICY
with self.assertRaises(ValueError) as cm:
diskfile.ECDiskFileManager.check_policy(mock_policy)
self.assertEqual('Invalid policy_type: %s' % REPL_POLICY,
str(cm.exception))
class DiskFileMixin(BaseDiskFileTestMixin):