Remove empty part dirs during ssync replication
When we're pushing data to a remote node using ssync, we end up walking the entire partition's directory tree. We were already removing reclaimable (i.e. old) tombstones and non-durable EC data files plus their containing hash dirs, but we were leaving the suffix dirs around for future removal, and we weren't cleaning up partition dirs at all. Now we remove as much of the directory structure as we can, even up to the partition dir, as soon as we observe that it's empty. Change-Id: I2849a757519a30684646f3a6f4467c21e9281707 Closes-Bug: 1706321
This commit is contained in:
parent
3313392462
commit
a19548b3e6
@ -101,6 +101,14 @@ MIN_TIME_UPDATE_AUDITOR_STATUS = 60
|
||||
RE_RSYNC_TEMPFILE = re.compile(r'^\..*\.([a-zA-Z0-9_]){6}$')
|
||||
|
||||
|
||||
def _unlink_if_present(filename):
|
||||
try:
|
||||
os.unlink(filename)
|
||||
except OSError as err:
|
||||
if err.errno != errno.ENOENT:
|
||||
raise
|
||||
|
||||
|
||||
def _get_filename(fd):
|
||||
"""
|
||||
Helper function to get to file name from a file descriptor or filename.
|
||||
@ -1023,7 +1031,17 @@ class BaseDiskFileManager(object):
|
||||
def is_reclaimable(timestamp):
|
||||
return (time.time() - float(timestamp)) > self.reclaim_age
|
||||
|
||||
files = listdir(hsh_path)
|
||||
try:
|
||||
files = os.listdir(hsh_path)
|
||||
except OSError as err:
|
||||
if err.errno == errno.ENOENT:
|
||||
results = self.get_ondisk_files(
|
||||
[], hsh_path, verify=False, **kwargs)
|
||||
results['files'] = []
|
||||
return results
|
||||
else:
|
||||
raise
|
||||
|
||||
files.sort(reverse=True)
|
||||
results = self.get_ondisk_files(
|
||||
files, hsh_path, verify=False, **kwargs)
|
||||
@ -1039,6 +1057,11 @@ class BaseDiskFileManager(object):
|
||||
remove_file(join(hsh_path, file_info['filename']))
|
||||
files.remove(file_info['filename'])
|
||||
results['files'] = files
|
||||
if not files: # everything got unlinked
|
||||
try:
|
||||
os.rmdir(hsh_path)
|
||||
except OSError:
|
||||
pass
|
||||
return results
|
||||
|
||||
def _update_suffix_hashes(self, hashes, ondisk_info):
|
||||
@ -1081,10 +1104,6 @@ class BaseDiskFileManager(object):
|
||||
continue
|
||||
raise
|
||||
if not ondisk_info['files']:
|
||||
try:
|
||||
os.rmdir(hsh_path)
|
||||
except OSError:
|
||||
pass
|
||||
continue
|
||||
|
||||
# ondisk_info has info dicts containing timestamps for those
|
||||
@ -1486,25 +1505,37 @@ class BaseDiskFileManager(object):
|
||||
dev_path = self.get_dev_path(device)
|
||||
if not dev_path:
|
||||
raise DiskFileDeviceUnavailable()
|
||||
|
||||
partition_path = get_part_path(dev_path, policy, partition)
|
||||
if suffixes is None:
|
||||
suffixes = self.yield_suffixes(device, partition, policy)
|
||||
considering_all_suffixes = True
|
||||
else:
|
||||
partition_path = get_part_path(dev_path, policy, partition)
|
||||
suffixes = (
|
||||
(os.path.join(partition_path, suffix), suffix)
|
||||
for suffix in suffixes)
|
||||
considering_all_suffixes = False
|
||||
|
||||
key_preference = (
|
||||
('ts_meta', 'meta_info', 'timestamp'),
|
||||
('ts_data', 'data_info', 'timestamp'),
|
||||
('ts_data', 'ts_info', 'timestamp'),
|
||||
('ts_ctype', 'ctype_info', 'ctype_timestamp'),
|
||||
)
|
||||
|
||||
# We delete as many empty directories as we can.
|
||||
# cleanup_ondisk_files() takes care of the hash dirs, and we take
|
||||
# care of the suffix dirs and possibly even the partition dir.
|
||||
have_nonempty_suffix = False
|
||||
for suffix_path, suffix in suffixes:
|
||||
have_nonempty_hashdir = False
|
||||
for object_hash in self._listdir(suffix_path):
|
||||
object_path = os.path.join(suffix_path, object_hash)
|
||||
try:
|
||||
results = self.cleanup_ondisk_files(
|
||||
object_path, **kwargs)
|
||||
if results['files']:
|
||||
have_nonempty_hashdir = True
|
||||
timestamps = {}
|
||||
for ts_key, info_key, info_ts_key in key_preference:
|
||||
if info_key not in results:
|
||||
@ -1524,6 +1555,34 @@ class BaseDiskFileManager(object):
|
||||
'Invalid diskfile filename in %r (%s)' % (
|
||||
object_path, err))
|
||||
|
||||
if have_nonempty_hashdir:
|
||||
have_nonempty_suffix = True
|
||||
else:
|
||||
try:
|
||||
os.rmdir(suffix_path)
|
||||
except OSError:
|
||||
# cleanup_ondisk_files tries to remove empty hash dirs,
|
||||
# but if it fails, so will we. An empty directory
|
||||
# structure won't cause errors (just slowdown), so we
|
||||
# ignore the exception.
|
||||
pass
|
||||
if considering_all_suffixes and not have_nonempty_suffix:
|
||||
# There's nothing of interest in the partition, so delete it
|
||||
try:
|
||||
# Remove hashes.pkl *then* hashes.invalid; otherwise, if we
|
||||
# remove hashes.invalid but leave hashes.pkl, that makes it
|
||||
# look as though the invalidations in hashes.invalid never
|
||||
# occurred.
|
||||
_unlink_if_present(os.path.join(partition_path, HASH_FILE))
|
||||
_unlink_if_present(os.path.join(partition_path,
|
||||
HASH_INVALIDATIONS_FILE))
|
||||
# This lock is only held by people dealing with the hashes
|
||||
# or the hash invalidations, and we've just removed those.
|
||||
_unlink_if_present(os.path.join(partition_path, ".lock"))
|
||||
os.rmdir(partition_path)
|
||||
except OSError as err:
|
||||
self.logger.debug("Error cleaning up empty partition: %s", err)
|
||||
|
||||
|
||||
class BaseDiskFileWriter(object):
|
||||
"""
|
||||
|
@ -817,11 +817,15 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
# don't reclaim anything
|
||||
mock_time.time.return_value = 0.0
|
||||
class_under_test.cleanup_ondisk_files(hashdir)
|
||||
after_cleanup = set(os.listdir(hashdir))
|
||||
errmsg = "expected %r, got %r for test %r" % (
|
||||
sorted(expected_after_cleanup), sorted(after_cleanup), test
|
||||
)
|
||||
self.assertEqual(expected_after_cleanup, after_cleanup, errmsg)
|
||||
|
||||
if expected_after_cleanup:
|
||||
after_cleanup = set(os.listdir(hashdir))
|
||||
errmsg = "expected %r, got %r for test %r" % (
|
||||
sorted(expected_after_cleanup), sorted(after_cleanup), test
|
||||
)
|
||||
self.assertEqual(expected_after_cleanup, after_cleanup, errmsg)
|
||||
else:
|
||||
self.assertFalse(os.path.exists(hashdir))
|
||||
|
||||
def _test_yield_hashes_cleanup(self, scenarios, policy):
|
||||
# opportunistic test to check that yield_hashes cleans up dir using
|
||||
@ -849,11 +853,15 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
'ignored', '0', policy, suffixes=['abc']):
|
||||
# return values are tested in test_yield_hashes_*
|
||||
pass
|
||||
after_cleanup = set(os.listdir(hashdir))
|
||||
errmsg = "expected %r, got %r for test %r" % (
|
||||
sorted(expected_after_cleanup), sorted(after_cleanup), test
|
||||
)
|
||||
self.assertEqual(expected_after_cleanup, after_cleanup, errmsg)
|
||||
|
||||
if expected_after_cleanup:
|
||||
after_cleanup = set(os.listdir(hashdir))
|
||||
errmsg = "expected %r, got %r for test %r" % (
|
||||
sorted(expected_after_cleanup), sorted(after_cleanup), test
|
||||
)
|
||||
self.assertEqual(expected_after_cleanup, after_cleanup, errmsg)
|
||||
else:
|
||||
self.assertFalse(os.path.exists(hashdir))
|
||||
|
||||
def test_get_ondisk_files_with_empty_dir(self):
|
||||
files = []
|
||||
@ -1316,6 +1324,106 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
self.assertEqual(list(self.df_mgr.yield_hashes(
|
||||
self.existing_device, '9', POLICIES[0])), [])
|
||||
|
||||
def test_yield_hashes_cleans_up_everything(self):
|
||||
the_time = [1525354555.657585]
|
||||
|
||||
def mock_time():
|
||||
return the_time[0]
|
||||
|
||||
with mock.patch('time.time', mock_time):
|
||||
# Make a couple of (soon-to-be-)expired tombstones
|
||||
df1 = self.df_mgr.get_diskfile(
|
||||
self.existing_device, 0, 'a', 'c', 'o1', POLICIES[0])
|
||||
df1.delete(Timestamp(the_time[0]))
|
||||
df1_hash = utils.hash_path('a', 'c', 'o1')
|
||||
df1_suffix = df1_hash[-3:]
|
||||
|
||||
df2 = self.df_mgr.get_diskfile(
|
||||
self.existing_device, 0, 'a', 'c', 'o2', POLICIES[0])
|
||||
df2.delete(Timestamp(the_time[0] + 1))
|
||||
df2_hash = utils.hash_path('a', 'c', 'o2')
|
||||
df2_suffix = df2_hash[-3:]
|
||||
|
||||
# sanity checks
|
||||
self.assertTrue(os.path.exists(os.path.join(
|
||||
self.testdir, self.existing_device, 'objects', '0',
|
||||
df1_suffix, df1_hash,
|
||||
"1525354555.65758.ts")))
|
||||
self.assertTrue(os.path.exists(os.path.join(
|
||||
self.testdir, self.existing_device, 'objects', '0',
|
||||
df2_suffix, df2_hash,
|
||||
"1525354556.65758.ts")))
|
||||
|
||||
# Expire the tombstones
|
||||
the_time[0] += 2 * self.df_mgr.reclaim_age
|
||||
|
||||
hashes = list(self.df_mgr.yield_hashes(
|
||||
self.existing_device, '0', POLICIES[0]))
|
||||
self.assertEqual(hashes, [])
|
||||
|
||||
# The tombstones are gone
|
||||
self.assertFalse(os.path.exists(os.path.join(
|
||||
self.testdir, self.existing_device, 'objects', '0',
|
||||
df1_suffix, df1_hash,
|
||||
"1525354555.65758.ts")))
|
||||
self.assertFalse(os.path.exists(os.path.join(
|
||||
self.testdir, self.existing_device, 'objects', '0',
|
||||
df2_suffix, df2_hash,
|
||||
"1525354556.65758.ts")))
|
||||
|
||||
# The empty hash dirs are gone
|
||||
self.assertFalse(os.path.exists(os.path.join(
|
||||
self.testdir, self.existing_device, 'objects', '0',
|
||||
df1_suffix, df1_hash)))
|
||||
self.assertFalse(os.path.exists(os.path.join(
|
||||
self.testdir, self.existing_device, 'objects', '0',
|
||||
df2_suffix, df2_hash)))
|
||||
|
||||
# The empty suffix dirs are gone
|
||||
self.assertFalse(os.path.exists(os.path.join(
|
||||
self.testdir, self.existing_device, 'objects', '0',
|
||||
df1_suffix)))
|
||||
self.assertFalse(os.path.exists(os.path.join(
|
||||
self.testdir, self.existing_device, 'objects', '0',
|
||||
df2_suffix)))
|
||||
|
||||
# The empty partition dir is gone
|
||||
self.assertFalse(os.path.exists(os.path.join(
|
||||
self.testdir, self.existing_device, 'objects', '0')))
|
||||
|
||||
def test_focused_yield_hashes_does_not_clean_up(self):
|
||||
the_time = [1525354555.657585]
|
||||
|
||||
def mock_time():
|
||||
return the_time[0]
|
||||
|
||||
with mock.patch('time.time', mock_time):
|
||||
df = self.df_mgr.get_diskfile(
|
||||
self.existing_device, 0, 'a', 'c', 'o', POLICIES[0])
|
||||
df.delete(Timestamp(the_time[0]))
|
||||
df_hash = utils.hash_path('a', 'c', 'o')
|
||||
df_suffix = df_hash[-3:]
|
||||
|
||||
# sanity check
|
||||
self.assertTrue(os.path.exists(os.path.join(
|
||||
self.testdir, self.existing_device, 'objects', '0',
|
||||
df_suffix, df_hash,
|
||||
"1525354555.65758.ts")))
|
||||
|
||||
# Expire the tombstone
|
||||
the_time[0] += 2 * self.df_mgr.reclaim_age
|
||||
|
||||
hashes = list(self.df_mgr.yield_hashes(
|
||||
self.existing_device, '0', POLICIES[0],
|
||||
suffixes=[df_suffix]))
|
||||
self.assertEqual(hashes, [])
|
||||
|
||||
# The partition dir is still there. Since we didn't visit all the
|
||||
# suffix dirs, we didn't learn whether or not the partition dir was
|
||||
# empty.
|
||||
self.assertTrue(os.path.exists(os.path.join(
|
||||
self.testdir, self.existing_device, 'objects', '0')))
|
||||
|
||||
def test_yield_hashes_empty_suffixes(self):
|
||||
def _listdir(path):
|
||||
return []
|
||||
@ -1346,7 +1454,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
(hash_, timestamps)
|
||||
for hash_, timestamps in expected.items()]
|
||||
with mock.patch('os.listdir', _listdir), \
|
||||
mock.patch('os.unlink'):
|
||||
mock.patch('os.unlink'), \
|
||||
mock.patch('os.rmdir'):
|
||||
df_mgr = self.df_router[policy]
|
||||
hash_items = list(df_mgr.yield_hashes(
|
||||
device, part, policy, **kwargs))
|
||||
@ -3819,18 +3928,24 @@ class DiskFileMixin(BaseDiskFileTestMixin):
|
||||
self.fail("Expected exception OSError")
|
||||
|
||||
def test_create_close_oserror(self):
|
||||
df = self.df_mgr.get_diskfile(self.existing_device, '0', 'abc', '123',
|
||||
'xyz', policy=POLICIES.legacy)
|
||||
with mock.patch("swift.obj.diskfile.os.close",
|
||||
mock.MagicMock(side_effect=OSError(
|
||||
errno.EACCES, os.strerror(errno.EACCES)))):
|
||||
try:
|
||||
with df.create(size=200):
|
||||
# This is a horrible hack so you can run this test in isolation.
|
||||
# Some of the ctypes machinery calls os.close(), and that runs afoul
|
||||
# of our mock.
|
||||
with mock.patch.object(utils, '_sys_fallocate', None):
|
||||
utils.disable_fallocate()
|
||||
|
||||
df = self.df_mgr.get_diskfile(self.existing_device, '0', 'abc',
|
||||
'123', 'xyz', policy=POLICIES.legacy)
|
||||
with mock.patch("swift.obj.diskfile.os.close",
|
||||
mock.MagicMock(side_effect=OSError(
|
||||
errno.EACCES, os.strerror(errno.EACCES)))):
|
||||
try:
|
||||
with df.create(size=200):
|
||||
pass
|
||||
except Exception as err:
|
||||
self.fail("Unexpected exception raised: %r" % err)
|
||||
else:
|
||||
pass
|
||||
except Exception as err:
|
||||
self.fail("Unexpected exception raised: %r" % err)
|
||||
else:
|
||||
pass
|
||||
|
||||
def test_write_metadata(self):
|
||||
df, df_data = self._create_test_file('1234567890')
|
||||
@ -6000,6 +6115,7 @@ class TestSuffixHashes(unittest.TestCase):
|
||||
def check_cleanup_ondisk_files(self, policy, input_files, output_files):
|
||||
orig_unlink = os.unlink
|
||||
file_list = list(input_files)
|
||||
rmdirs = []
|
||||
|
||||
def mock_listdir(path):
|
||||
return list(file_list)
|
||||
@ -6016,7 +6132,8 @@ class TestSuffixHashes(unittest.TestCase):
|
||||
file_list.remove(os.path.basename(path))
|
||||
|
||||
df_mgr = self.df_router[policy]
|
||||
with unit_mock({'os.listdir': mock_listdir, 'os.unlink': mock_unlink}):
|
||||
with unit_mock({'os.listdir': mock_listdir, 'os.unlink': mock_unlink,
|
||||
'os.rmdir': rmdirs.append}):
|
||||
if isinstance(output_files, Exception):
|
||||
path = os.path.join(self.testdir, 'does-not-matter')
|
||||
self.assertRaises(output_files.__class__,
|
||||
@ -6024,6 +6141,10 @@ class TestSuffixHashes(unittest.TestCase):
|
||||
return
|
||||
files = df_mgr.cleanup_ondisk_files('/whatever')['files']
|
||||
self.assertEqual(files, output_files)
|
||||
if files:
|
||||
self.assertEqual(rmdirs, [])
|
||||
else:
|
||||
self.assertEqual(rmdirs, ['/whatever'])
|
||||
|
||||
# cleanup_ondisk_files tests - behaviors
|
||||
|
||||
|
@ -1175,21 +1175,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
||||
'sda1', '2', self.policy)
|
||||
for path, hash_, ts in hash_gen:
|
||||
self.fail('found %s with %s in %s' % (hash_, ts, path))
|
||||
# but the partition directory and hashes pkl still exist
|
||||
self.assertTrue(os.access(part_path, os.F_OK))
|
||||
hashes_path = os.path.join(self.objects_1, '2', diskfile.HASH_FILE)
|
||||
self.assertTrue(os.access(hashes_path, os.F_OK))
|
||||
|
||||
# ... but on next pass
|
||||
ssync_calls = []
|
||||
with mocked_http_conn() as request_log:
|
||||
with mock.patch('swift.obj.reconstructor.ssync_sender',
|
||||
self._make_fake_ssync(ssync_calls)):
|
||||
self.reconstructor.reconstruct(override_partitions=[2])
|
||||
# reconstruct won't generate any replicate or ssync_calls
|
||||
self.assertFalse(request_log.requests)
|
||||
self.assertFalse(ssync_calls)
|
||||
# and the partition will get removed!
|
||||
# even the partition directory is gone
|
||||
self.assertFalse(os.access(part_path, os.F_OK))
|
||||
|
||||
def test_process_job_all_success(self):
|
||||
|
@ -6671,12 +6671,7 @@ class TestObjectController(unittest.TestCase):
|
||||
resp = delete_request.get_response(self.object_controller)
|
||||
# we won't even create the tombstone
|
||||
self.assertFalse(os.path.exists(tombstone_file))
|
||||
# hashdir sticks around tho
|
||||
self.assertTrue(os.path.exists(objfile._datadir))
|
||||
# REPLICATE will clean it all up
|
||||
resp = replicate_request.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual({}, pickle.loads(resp.body))
|
||||
# hashdir's empty, so it gets cleaned up
|
||||
self.assertFalse(os.path.exists(objfile._datadir))
|
||||
|
||||
def test_SSYNC_can_be_called(self):
|
||||
|
Loading…
x
Reference in New Issue
Block a user