Merge "Remove empty part dirs during ssync replication"
This commit is contained in:
commit
3de21d945b
@ -101,6 +101,14 @@ MIN_TIME_UPDATE_AUDITOR_STATUS = 60
|
||||
RE_RSYNC_TEMPFILE = re.compile(r'^\..*\.([a-zA-Z0-9_]){6}$')
|
||||
|
||||
|
||||
def _unlink_if_present(filename):
|
||||
try:
|
||||
os.unlink(filename)
|
||||
except OSError as err:
|
||||
if err.errno != errno.ENOENT:
|
||||
raise
|
||||
|
||||
|
||||
def _get_filename(fd):
|
||||
"""
|
||||
Helper function to get to file name from a file descriptor or filename.
|
||||
@ -1023,7 +1031,17 @@ class BaseDiskFileManager(object):
|
||||
def is_reclaimable(timestamp):
|
||||
return (time.time() - float(timestamp)) > self.reclaim_age
|
||||
|
||||
files = listdir(hsh_path)
|
||||
try:
|
||||
files = os.listdir(hsh_path)
|
||||
except OSError as err:
|
||||
if err.errno == errno.ENOENT:
|
||||
results = self.get_ondisk_files(
|
||||
[], hsh_path, verify=False, **kwargs)
|
||||
results['files'] = []
|
||||
return results
|
||||
else:
|
||||
raise
|
||||
|
||||
files.sort(reverse=True)
|
||||
results = self.get_ondisk_files(
|
||||
files, hsh_path, verify=False, **kwargs)
|
||||
@ -1039,6 +1057,11 @@ class BaseDiskFileManager(object):
|
||||
remove_file(join(hsh_path, file_info['filename']))
|
||||
files.remove(file_info['filename'])
|
||||
results['files'] = files
|
||||
if not files: # everything got unlinked
|
||||
try:
|
||||
os.rmdir(hsh_path)
|
||||
except OSError:
|
||||
pass
|
||||
return results
|
||||
|
||||
def _update_suffix_hashes(self, hashes, ondisk_info):
|
||||
@ -1081,10 +1104,6 @@ class BaseDiskFileManager(object):
|
||||
continue
|
||||
raise
|
||||
if not ondisk_info['files']:
|
||||
try:
|
||||
os.rmdir(hsh_path)
|
||||
except OSError:
|
||||
pass
|
||||
continue
|
||||
|
||||
# ondisk_info has info dicts containing timestamps for those
|
||||
@ -1486,25 +1505,37 @@ class BaseDiskFileManager(object):
|
||||
dev_path = self.get_dev_path(device)
|
||||
if not dev_path:
|
||||
raise DiskFileDeviceUnavailable()
|
||||
|
||||
partition_path = get_part_path(dev_path, policy, partition)
|
||||
if suffixes is None:
|
||||
suffixes = self.yield_suffixes(device, partition, policy)
|
||||
considering_all_suffixes = True
|
||||
else:
|
||||
partition_path = get_part_path(dev_path, policy, partition)
|
||||
suffixes = (
|
||||
(os.path.join(partition_path, suffix), suffix)
|
||||
for suffix in suffixes)
|
||||
considering_all_suffixes = False
|
||||
|
||||
key_preference = (
|
||||
('ts_meta', 'meta_info', 'timestamp'),
|
||||
('ts_data', 'data_info', 'timestamp'),
|
||||
('ts_data', 'ts_info', 'timestamp'),
|
||||
('ts_ctype', 'ctype_info', 'ctype_timestamp'),
|
||||
)
|
||||
|
||||
# We delete as many empty directories as we can.
|
||||
# cleanup_ondisk_files() takes care of the hash dirs, and we take
|
||||
# care of the suffix dirs and possibly even the partition dir.
|
||||
have_nonempty_suffix = False
|
||||
for suffix_path, suffix in suffixes:
|
||||
have_nonempty_hashdir = False
|
||||
for object_hash in self._listdir(suffix_path):
|
||||
object_path = os.path.join(suffix_path, object_hash)
|
||||
try:
|
||||
results = self.cleanup_ondisk_files(
|
||||
object_path, **kwargs)
|
||||
if results['files']:
|
||||
have_nonempty_hashdir = True
|
||||
timestamps = {}
|
||||
for ts_key, info_key, info_ts_key in key_preference:
|
||||
if info_key not in results:
|
||||
@ -1524,6 +1555,34 @@ class BaseDiskFileManager(object):
|
||||
'Invalid diskfile filename in %r (%s)' % (
|
||||
object_path, err))
|
||||
|
||||
if have_nonempty_hashdir:
|
||||
have_nonempty_suffix = True
|
||||
else:
|
||||
try:
|
||||
os.rmdir(suffix_path)
|
||||
except OSError:
|
||||
# cleanup_ondisk_files tries to remove empty hash dirs,
|
||||
# but if it fails, so will we. An empty directory
|
||||
# structure won't cause errors (just slowdown), so we
|
||||
# ignore the exception.
|
||||
pass
|
||||
if considering_all_suffixes and not have_nonempty_suffix:
|
||||
# There's nothing of interest in the partition, so delete it
|
||||
try:
|
||||
# Remove hashes.pkl *then* hashes.invalid; otherwise, if we
|
||||
# remove hashes.invalid but leave hashes.pkl, that makes it
|
||||
# look as though the invalidations in hashes.invalid never
|
||||
# occurred.
|
||||
_unlink_if_present(os.path.join(partition_path, HASH_FILE))
|
||||
_unlink_if_present(os.path.join(partition_path,
|
||||
HASH_INVALIDATIONS_FILE))
|
||||
# This lock is only held by people dealing with the hashes
|
||||
# or the hash invalidations, and we've just removed those.
|
||||
_unlink_if_present(os.path.join(partition_path, ".lock"))
|
||||
os.rmdir(partition_path)
|
||||
except OSError as err:
|
||||
self.logger.debug("Error cleaning up empty partition: %s", err)
|
||||
|
||||
|
||||
class BaseDiskFileWriter(object):
|
||||
"""
|
||||
|
@ -817,11 +817,15 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
# don't reclaim anything
|
||||
mock_time.time.return_value = 0.0
|
||||
class_under_test.cleanup_ondisk_files(hashdir)
|
||||
after_cleanup = set(os.listdir(hashdir))
|
||||
errmsg = "expected %r, got %r for test %r" % (
|
||||
sorted(expected_after_cleanup), sorted(after_cleanup), test
|
||||
)
|
||||
self.assertEqual(expected_after_cleanup, after_cleanup, errmsg)
|
||||
|
||||
if expected_after_cleanup:
|
||||
after_cleanup = set(os.listdir(hashdir))
|
||||
errmsg = "expected %r, got %r for test %r" % (
|
||||
sorted(expected_after_cleanup), sorted(after_cleanup), test
|
||||
)
|
||||
self.assertEqual(expected_after_cleanup, after_cleanup, errmsg)
|
||||
else:
|
||||
self.assertFalse(os.path.exists(hashdir))
|
||||
|
||||
def _test_yield_hashes_cleanup(self, scenarios, policy):
|
||||
# opportunistic test to check that yield_hashes cleans up dir using
|
||||
@ -849,11 +853,15 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
'ignored', '0', policy, suffixes=['abc']):
|
||||
# return values are tested in test_yield_hashes_*
|
||||
pass
|
||||
after_cleanup = set(os.listdir(hashdir))
|
||||
errmsg = "expected %r, got %r for test %r" % (
|
||||
sorted(expected_after_cleanup), sorted(after_cleanup), test
|
||||
)
|
||||
self.assertEqual(expected_after_cleanup, after_cleanup, errmsg)
|
||||
|
||||
if expected_after_cleanup:
|
||||
after_cleanup = set(os.listdir(hashdir))
|
||||
errmsg = "expected %r, got %r for test %r" % (
|
||||
sorted(expected_after_cleanup), sorted(after_cleanup), test
|
||||
)
|
||||
self.assertEqual(expected_after_cleanup, after_cleanup, errmsg)
|
||||
else:
|
||||
self.assertFalse(os.path.exists(hashdir))
|
||||
|
||||
def test_get_ondisk_files_with_empty_dir(self):
|
||||
files = []
|
||||
@ -1316,6 +1324,106 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
self.assertEqual(list(self.df_mgr.yield_hashes(
|
||||
self.existing_device, '9', POLICIES[0])), [])
|
||||
|
||||
def test_yield_hashes_cleans_up_everything(self):
    # An unfocused yield_hashes() over a partition that holds nothing
    # but expired tombstones should remove the tombstones, their hash
    # dirs, their suffix dirs and finally the partition dir itself.
    clock = [1525354555.657585]
    part_path = os.path.join(
        self.testdir, self.existing_device, 'objects', '0')

    with mock.patch('time.time', lambda: clock[0]):
        # Make a couple of (soon-to-be-)expired tombstones, one second
        # apart, remembering where each one lands on disk
        suffix_paths = []
        hash_paths = []
        tombstone_paths = []
        for obj_name, offset, ts_filename in (
                ('o1', 0, "1525354555.65758.ts"),
                ('o2', 1, "1525354556.65758.ts")):
            df = self.df_mgr.get_diskfile(
                self.existing_device, 0, 'a', 'c', obj_name, POLICIES[0])
            df.delete(Timestamp(clock[0] + offset))
            obj_hash = utils.hash_path('a', 'c', obj_name)
            suffix_path = os.path.join(part_path, obj_hash[-3:])
            hash_path = os.path.join(suffix_path, obj_hash)
            suffix_paths.append(suffix_path)
            hash_paths.append(hash_path)
            tombstone_paths.append(os.path.join(hash_path, ts_filename))

        # sanity checks
        for tombstone in tombstone_paths:
            self.assertTrue(os.path.exists(tombstone))

        # Expire the tombstones
        clock[0] += 2 * self.df_mgr.reclaim_age

        found = list(self.df_mgr.yield_hashes(
            self.existing_device, '0', POLICIES[0]))
        self.assertEqual(found, [])

        # The tombstones are gone
        for tombstone in tombstone_paths:
            self.assertFalse(os.path.exists(tombstone))

        # The empty hash dirs are gone
        for hash_path in hash_paths:
            self.assertFalse(os.path.exists(hash_path))

        # The empty suffix dirs are gone
        for suffix_path in suffix_paths:
            self.assertFalse(os.path.exists(suffix_path))

        # The empty partition dir is gone
        self.assertFalse(os.path.exists(part_path))
|
||||
|
||||
def test_focused_yield_hashes_does_not_clean_up(self):
    # When yield_hashes() is given an explicit list of suffixes, the
    # partition dir must be left in place afterwards.
    clock = [1525354555.657585]
    part_path = os.path.join(
        self.testdir, self.existing_device, 'objects', '0')

    with mock.patch('time.time', lambda: clock[0]):
        tombstoned = self.df_mgr.get_diskfile(
            self.existing_device, 0, 'a', 'c', 'o', POLICIES[0])
        tombstoned.delete(Timestamp(clock[0]))
        obj_hash = utils.hash_path('a', 'c', 'o')
        obj_suffix = obj_hash[-3:]

        # sanity check
        tombstone = os.path.join(
            part_path, obj_suffix, obj_hash, "1525354555.65758.ts")
        self.assertTrue(os.path.exists(tombstone))

        # Expire the tombstone
        clock[0] += 2 * self.df_mgr.reclaim_age

        found = list(self.df_mgr.yield_hashes(
            self.existing_device, '0', POLICIES[0],
            suffixes=[obj_suffix]))
        self.assertEqual(found, [])

        # The partition dir is still there. Since we didn't visit all the
        # suffix dirs, we didn't learn whether or not the partition dir
        # was empty.
        self.assertTrue(os.path.exists(part_path))
|
||||
|
||||
def test_yield_hashes_empty_suffixes(self):
|
||||
def _listdir(path):
|
||||
return []
|
||||
@ -1346,7 +1454,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
(hash_, timestamps)
|
||||
for hash_, timestamps in expected.items()]
|
||||
with mock.patch('os.listdir', _listdir), \
|
||||
mock.patch('os.unlink'):
|
||||
mock.patch('os.unlink'), \
|
||||
mock.patch('os.rmdir'):
|
||||
df_mgr = self.df_router[policy]
|
||||
hash_items = list(df_mgr.yield_hashes(
|
||||
device, part, policy, **kwargs))
|
||||
@ -3819,18 +3928,24 @@ class DiskFileMixin(BaseDiskFileTestMixin):
|
||||
self.fail("Expected exception OSError")
|
||||
|
||||
def test_create_close_oserror(self):
|
||||
df = self.df_mgr.get_diskfile(self.existing_device, '0', 'abc', '123',
|
||||
'xyz', policy=POLICIES.legacy)
|
||||
with mock.patch("swift.obj.diskfile.os.close",
|
||||
mock.MagicMock(side_effect=OSError(
|
||||
errno.EACCES, os.strerror(errno.EACCES)))):
|
||||
try:
|
||||
with df.create(size=200):
|
||||
# This is a horrible hack so you can run this test in isolation.
|
||||
# Some of the ctypes machinery calls os.close(), and that runs afoul
|
||||
# of our mock.
|
||||
with mock.patch.object(utils, '_sys_fallocate', None):
|
||||
utils.disable_fallocate()
|
||||
|
||||
df = self.df_mgr.get_diskfile(self.existing_device, '0', 'abc',
|
||||
'123', 'xyz', policy=POLICIES.legacy)
|
||||
with mock.patch("swift.obj.diskfile.os.close",
|
||||
mock.MagicMock(side_effect=OSError(
|
||||
errno.EACCES, os.strerror(errno.EACCES)))):
|
||||
try:
|
||||
with df.create(size=200):
|
||||
pass
|
||||
except Exception as err:
|
||||
self.fail("Unexpected exception raised: %r" % err)
|
||||
else:
|
||||
pass
|
||||
except Exception as err:
|
||||
self.fail("Unexpected exception raised: %r" % err)
|
||||
else:
|
||||
pass
|
||||
|
||||
def test_write_metadata(self):
|
||||
df, df_data = self._create_test_file('1234567890')
|
||||
@ -6000,6 +6115,7 @@ class TestSuffixHashes(unittest.TestCase):
|
||||
def check_cleanup_ondisk_files(self, policy, input_files, output_files):
|
||||
orig_unlink = os.unlink
|
||||
file_list = list(input_files)
|
||||
rmdirs = []
|
||||
|
||||
def mock_listdir(path):
|
||||
return list(file_list)
|
||||
@ -6016,7 +6132,8 @@ class TestSuffixHashes(unittest.TestCase):
|
||||
file_list.remove(os.path.basename(path))
|
||||
|
||||
df_mgr = self.df_router[policy]
|
||||
with unit_mock({'os.listdir': mock_listdir, 'os.unlink': mock_unlink}):
|
||||
with unit_mock({'os.listdir': mock_listdir, 'os.unlink': mock_unlink,
|
||||
'os.rmdir': rmdirs.append}):
|
||||
if isinstance(output_files, Exception):
|
||||
path = os.path.join(self.testdir, 'does-not-matter')
|
||||
self.assertRaises(output_files.__class__,
|
||||
@ -6024,6 +6141,10 @@ class TestSuffixHashes(unittest.TestCase):
|
||||
return
|
||||
files = df_mgr.cleanup_ondisk_files('/whatever')['files']
|
||||
self.assertEqual(files, output_files)
|
||||
if files:
|
||||
self.assertEqual(rmdirs, [])
|
||||
else:
|
||||
self.assertEqual(rmdirs, ['/whatever'])
|
||||
|
||||
# cleanup_ondisk_files tests - behaviors
|
||||
|
||||
|
@ -1175,21 +1175,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
||||
'sda1', '2', self.policy)
|
||||
for path, hash_, ts in hash_gen:
|
||||
self.fail('found %s with %s in %s' % (hash_, ts, path))
|
||||
# but the partition directory and hashes pkl still exist
|
||||
self.assertTrue(os.access(part_path, os.F_OK))
|
||||
hashes_path = os.path.join(self.objects_1, '2', diskfile.HASH_FILE)
|
||||
self.assertTrue(os.access(hashes_path, os.F_OK))
|
||||
|
||||
# ... but on next pass
|
||||
ssync_calls = []
|
||||
with mocked_http_conn() as request_log:
|
||||
with mock.patch('swift.obj.reconstructor.ssync_sender',
|
||||
self._make_fake_ssync(ssync_calls)):
|
||||
self.reconstructor.reconstruct(override_partitions=[2])
|
||||
# reconstruct won't generate any replicate or ssync_calls
|
||||
self.assertFalse(request_log.requests)
|
||||
self.assertFalse(ssync_calls)
|
||||
# and the partition will get removed!
|
||||
# even the partition directory is gone
|
||||
self.assertFalse(os.access(part_path, os.F_OK))
|
||||
|
||||
def test_process_job_all_success(self):
|
||||
|
@ -6872,12 +6872,7 @@ class TestObjectController(unittest.TestCase):
|
||||
resp = delete_request.get_response(self.object_controller)
|
||||
# we won't even create the tombstone
|
||||
self.assertFalse(os.path.exists(tombstone_file))
|
||||
# hashdir sticks around tho
|
||||
self.assertTrue(os.path.exists(objfile._datadir))
|
||||
# REPLICATE will clean it all up
|
||||
resp = replicate_request.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual({}, pickle.loads(resp.body))
|
||||
# hashdir's empty, so it gets cleaned up
|
||||
self.assertFalse(os.path.exists(objfile._datadir))
|
||||
|
||||
def test_SSYNC_can_be_called(self):
|
||||
|
Loading…
x
Reference in New Issue
Block a user