Delete old tombstones
- Call invalidate_hash in auditor for reclaimable tombstones
- assert changed auditor behavior with a unit test
- driveby test: assert get_hashes behavior with a unit test

Co-Authored-By: Pete Zaitcev <zaitcev@redhat.com>
Co-Authored-By: Kota Tsuyuzaki <tsuyuzaki.kota@lab.ntt.co.jp>
Closes-Bug: #1301728
Change-Id: I3e99dc702d55a7424c6482969e03cb4afac854a4
This commit is contained in:
parent
561284e3d4
commit
81d4673674
@ -19,6 +19,7 @@ import sys
|
|||||||
import time
|
import time
|
||||||
import signal
|
import signal
|
||||||
import re
|
import re
|
||||||
|
from os.path import basename, dirname, join
|
||||||
from random import shuffle
|
from random import shuffle
|
||||||
from swift import gettext_ as _
|
from swift import gettext_ as _
|
||||||
from contextlib import closing
|
from contextlib import closing
|
||||||
@ -28,7 +29,8 @@ from swift.obj import diskfile, replicator
|
|||||||
from swift.common.utils import (
|
from swift.common.utils import (
|
||||||
get_logger, ratelimit_sleep, dump_recon_cache, list_from_csv, listdir,
|
get_logger, ratelimit_sleep, dump_recon_cache, list_from_csv, listdir,
|
||||||
unlink_paths_older_than, readconf, config_auto_int_value)
|
unlink_paths_older_than, readconf, config_auto_int_value)
|
||||||
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist
|
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist,\
|
||||||
|
DiskFileDeleted
|
||||||
from swift.common.daemon import Daemon
|
from swift.common.daemon import Daemon
|
||||||
from swift.common.storage_policy import POLICIES
|
from swift.common.storage_policy import POLICIES
|
||||||
|
|
||||||
@ -43,7 +45,6 @@ class AuditorWorker(object):
|
|||||||
self.conf = conf
|
self.conf = conf
|
||||||
self.logger = logger
|
self.logger = logger
|
||||||
self.devices = devices
|
self.devices = devices
|
||||||
self.diskfile_router = diskfile.DiskFileRouter(conf, self.logger)
|
|
||||||
self.max_files_per_second = float(conf.get('files_per_second', 20))
|
self.max_files_per_second = float(conf.get('files_per_second', 20))
|
||||||
self.max_bytes_per_second = float(conf.get('bytes_per_second',
|
self.max_bytes_per_second = float(conf.get('bytes_per_second',
|
||||||
10000000))
|
10000000))
|
||||||
@ -56,17 +57,25 @@ class AuditorWorker(object):
|
|||||||
except (KeyError, SystemExit):
|
except (KeyError, SystemExit):
|
||||||
# if we can't parse the real config (generally a KeyError on
|
# if we can't parse the real config (generally a KeyError on
|
||||||
# __file__, or SystemExit on no object-replicator section) we use
|
# __file__, or SystemExit on no object-replicator section) we use
|
||||||
# a very conservative default
|
# a very conservative default for rsync_timeout
|
||||||
default = 86400
|
default_rsync_timeout = 86400
|
||||||
else:
|
else:
|
||||||
replicator_rsync_timeout = int(replicator_config.get(
|
replicator_rsync_timeout = int(replicator_config.get(
|
||||||
'rsync_timeout', replicator.DEFAULT_RSYNC_TIMEOUT))
|
'rsync_timeout', replicator.DEFAULT_RSYNC_TIMEOUT))
|
||||||
# Here we can do some light math for ops and use the *replicator's*
|
# Here we can do some light math for ops and use the *replicator's*
|
||||||
# rsync_timeout (plus 15 mins to avoid deleting local tempfiles
|
# rsync_timeout (plus 15 mins to avoid deleting local tempfiles
|
||||||
# before the remote replicator kills its rsync)
|
# before the remote replicator kills its rsync)
|
||||||
default = replicator_rsync_timeout + 900
|
default_rsync_timeout = replicator_rsync_timeout + 900
|
||||||
|
# there's not really a good reason to assume the replicator
|
||||||
|
# section's reclaim_age is more appropriate than the reconstructor
|
||||||
|
# reclaim_age - but we're already parsing the config so we can set
|
||||||
|
# the default value in our config if it's not already set
|
||||||
|
if 'reclaim_age' in replicator_config:
|
||||||
|
conf.setdefault('reclaim_age',
|
||||||
|
replicator_config['reclaim_age'])
|
||||||
self.rsync_tempfile_timeout = config_auto_int_value(
|
self.rsync_tempfile_timeout = config_auto_int_value(
|
||||||
self.conf.get('rsync_tempfile_timeout'), default)
|
self.conf.get('rsync_tempfile_timeout'), default_rsync_timeout)
|
||||||
|
self.diskfile_router = diskfile.DiskFileRouter(conf, self.logger)
|
||||||
|
|
||||||
self.auditor_type = 'ALL'
|
self.auditor_type = 'ALL'
|
||||||
self.zero_byte_only_at_fps = zero_byte_only_at_fps
|
self.zero_byte_only_at_fps = zero_byte_only_at_fps
|
||||||
@ -251,19 +260,26 @@ class AuditorWorker(object):
|
|||||||
incr_by=chunk_len)
|
incr_by=chunk_len)
|
||||||
self.bytes_processed += chunk_len
|
self.bytes_processed += chunk_len
|
||||||
self.total_bytes_processed += chunk_len
|
self.total_bytes_processed += chunk_len
|
||||||
except DiskFileNotExist:
|
|
||||||
pass
|
|
||||||
except DiskFileQuarantined as err:
|
except DiskFileQuarantined as err:
|
||||||
self.quarantines += 1
|
self.quarantines += 1
|
||||||
self.logger.error(_('ERROR Object %(obj)s failed audit and was'
|
self.logger.error(_('ERROR Object %(obj)s failed audit and was'
|
||||||
' quarantined: %(err)s'),
|
' quarantined: %(err)s'),
|
||||||
{'obj': location, 'err': err})
|
{'obj': location, 'err': err})
|
||||||
|
except DiskFileDeleted:
|
||||||
|
# If there is a reclaimable tombstone, we'll invalidate the hash
|
||||||
|
# to trigger the replicator to rehash/cleanup this suffix
|
||||||
|
ts = df._ondisk_info['ts_info']['timestamp']
|
||||||
|
if (time.time() - float(ts)) > df.manager.reclaim_age:
|
||||||
|
df.manager.invalidate_hash(dirname(df._datadir))
|
||||||
|
except DiskFileNotExist:
|
||||||
|
pass
|
||||||
|
|
||||||
self.passes += 1
|
self.passes += 1
|
||||||
# _ondisk_info attr is initialized to None and filled in by open
|
# _ondisk_info attr is initialized to None and filled in by open
|
||||||
ondisk_info_dict = df._ondisk_info or {}
|
ondisk_info_dict = df._ondisk_info or {}
|
||||||
if 'unexpected' in ondisk_info_dict:
|
if 'unexpected' in ondisk_info_dict:
|
||||||
is_rsync_tempfile = lambda fpath: RE_RSYNC_TEMPFILE.match(
|
is_rsync_tempfile = lambda fpath: RE_RSYNC_TEMPFILE.match(
|
||||||
os.path.basename(fpath))
|
basename(fpath))
|
||||||
rsync_tempfile_paths = filter(is_rsync_tempfile,
|
rsync_tempfile_paths = filter(is_rsync_tempfile,
|
||||||
ondisk_info_dict['unexpected'])
|
ondisk_info_dict['unexpected'])
|
||||||
mtime = time.time() - self.rsync_tempfile_timeout
|
mtime = time.time() - self.rsync_tempfile_timeout
|
||||||
@ -282,7 +298,7 @@ class ObjectAuditor(Daemon):
|
|||||||
conf.get('zero_byte_files_per_second', 50))
|
conf.get('zero_byte_files_per_second', 50))
|
||||||
self.recon_cache_path = conf.get('recon_cache_path',
|
self.recon_cache_path = conf.get('recon_cache_path',
|
||||||
'/var/cache/swift')
|
'/var/cache/swift')
|
||||||
self.rcache = os.path.join(self.recon_cache_path, "object.recon")
|
self.rcache = join(self.recon_cache_path, "object.recon")
|
||||||
self.interval = int(conf.get('interval', 30))
|
self.interval = int(conf.get('interval', 30))
|
||||||
|
|
||||||
def _sleep(self):
|
def _sleep(self):
|
||||||
|
@ -14,6 +14,7 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from test import unit
|
from test import unit
|
||||||
|
import six.moves.cPickle as pickle
|
||||||
import unittest
|
import unittest
|
||||||
import mock
|
import mock
|
||||||
import os
|
import os
|
||||||
@ -23,13 +24,14 @@ from shutil import rmtree
|
|||||||
from hashlib import md5
|
from hashlib import md5
|
||||||
from tempfile import mkdtemp
|
from tempfile import mkdtemp
|
||||||
import textwrap
|
import textwrap
|
||||||
|
from os.path import dirname, basename, join
|
||||||
from test.unit import (FakeLogger, patch_policies, make_timestamp_iter,
|
from test.unit import (FakeLogger, patch_policies, make_timestamp_iter,
|
||||||
DEFAULT_TEST_EC_TYPE)
|
DEFAULT_TEST_EC_TYPE)
|
||||||
from swift.obj import auditor, replicator
|
from swift.obj import auditor, replicator
|
||||||
from swift.obj.diskfile import (
|
from swift.obj.diskfile import (
|
||||||
DiskFile, write_metadata, invalidate_hash, get_data_dir,
|
DiskFile, write_metadata, invalidate_hash, get_data_dir,
|
||||||
DiskFileManager, ECDiskFileManager, AuditLocation, clear_auditor_status,
|
DiskFileManager, ECDiskFileManager, AuditLocation, clear_auditor_status,
|
||||||
get_auditor_status)
|
get_auditor_status, HASH_FILE, HASH_INVALIDATIONS_FILE)
|
||||||
from swift.common.utils import (
|
from swift.common.utils import (
|
||||||
mkdirs, normalize_timestamp, Timestamp, readconf)
|
mkdirs, normalize_timestamp, Timestamp, readconf)
|
||||||
from swift.common.storage_policy import (
|
from swift.common.storage_policy import (
|
||||||
@ -328,7 +330,7 @@ class TestAuditor(unittest.TestCase):
|
|||||||
[object-auditor]
|
[object-auditor]
|
||||||
rsync_tempfile_timeout = auto
|
rsync_tempfile_timeout = auto
|
||||||
"""
|
"""
|
||||||
with open(os.path.join(self.testdir, 'objserver.conf'), 'w') as f:
|
with open(config_path, 'w') as f:
|
||||||
f.write(textwrap.dedent(stub_config))
|
f.write(textwrap.dedent(stub_config))
|
||||||
conf = readconf(config_path, 'object-auditor')
|
conf = readconf(config_path, 'object-auditor')
|
||||||
auditor_worker = auditor.AuditorWorker(conf, self.logger,
|
auditor_worker = auditor.AuditorWorker(conf, self.logger,
|
||||||
@ -346,7 +348,7 @@ class TestAuditor(unittest.TestCase):
|
|||||||
[object-auditor]
|
[object-auditor]
|
||||||
rsync_tempfile_timeout = auto
|
rsync_tempfile_timeout = auto
|
||||||
"""
|
"""
|
||||||
with open(os.path.join(self.testdir, 'objserver.conf'), 'w') as f:
|
with open(config_path, 'w') as f:
|
||||||
f.write(textwrap.dedent(stub_config))
|
f.write(textwrap.dedent(stub_config))
|
||||||
conf = readconf(config_path, 'object-auditor')
|
conf = readconf(config_path, 'object-auditor')
|
||||||
auditor_worker = auditor.AuditorWorker(conf, self.logger,
|
auditor_worker = auditor.AuditorWorker(conf, self.logger,
|
||||||
@ -746,6 +748,139 @@ class TestAuditor(unittest.TestCase):
|
|||||||
self.auditor.run_audit(**kwargs)
|
self.auditor.run_audit(**kwargs)
|
||||||
self.assertFalse(os.path.exists(self.disk_file._datadir))
|
self.assertFalse(os.path.exists(self.disk_file._datadir))
|
||||||
|
|
||||||
|
def test_with_tombstone_delete(self):
|
||||||
|
test_md5 = '098f6bcd4621d373cade4e832627b4f6'
|
||||||
|
|
||||||
|
def do_audit(self, timestamp, invalidate=False):
|
||||||
|
dir_path = self.disk_file._datadir
|
||||||
|
ts_file = os.path.join(dir_path, '%d.ts' % timestamp)
|
||||||
|
|
||||||
|
# Create a .ts file
|
||||||
|
if not os.path.exists(dir_path):
|
||||||
|
mkdirs(dir_path)
|
||||||
|
fp = open(ts_file, 'w')
|
||||||
|
write_metadata(fp, {'X-Timestamp': '%d' % timestamp})
|
||||||
|
fp.close()
|
||||||
|
# Create hashes.pkl
|
||||||
|
hash = dirname(dirname(ts_file)) # hash value of ts file
|
||||||
|
suffix = basename(hash)
|
||||||
|
hashes_pkl = join(os.path.dirname(hash), HASH_FILE)
|
||||||
|
with open(hashes_pkl, 'wb') as fp:
|
||||||
|
pickle.dump({suffix: test_md5}, fp, 0)
|
||||||
|
# Run auditor
|
||||||
|
kwargs = {'mode': 'once'}
|
||||||
|
self.auditor.run_audit(**kwargs)
|
||||||
|
# Check if hash invalid file exists
|
||||||
|
hash_invalid = join(dirname(hash), HASH_INVALIDATIONS_FILE)
|
||||||
|
hash_invalid_exists = os.path.exists(hash_invalid)
|
||||||
|
# If invalidate, fetch value from hashes.invalid
|
||||||
|
if invalidate:
|
||||||
|
with open(hash_invalid, 'rb') as fp:
|
||||||
|
hash_val = fp.read()
|
||||||
|
return hash_invalid_exists, hash_val, suffix
|
||||||
|
return hash_invalid_exists, ts_file
|
||||||
|
|
||||||
|
self.auditor = auditor.ObjectAuditor(self.conf)
|
||||||
|
self.auditor.log_time = 0
|
||||||
|
|
||||||
|
now = time.time()
|
||||||
|
|
||||||
|
# audit with a recent tombstone
|
||||||
|
hash_invalid_exists, ts_file = do_audit(self, now - 55)
|
||||||
|
self.assertFalse(hash_invalid_exists)
|
||||||
|
os.unlink(ts_file)
|
||||||
|
|
||||||
|
# audit with a tombstone that is beyond default reclaim_age
|
||||||
|
hash_invalid_exists, hash_val, suffix = do_audit(self, now - (604800),
|
||||||
|
True)
|
||||||
|
self.assertTrue(hash_invalid_exists)
|
||||||
|
self.assertEqual(hash_val.strip('\n'), suffix)
|
||||||
|
|
||||||
|
def test_auditor_reclaim_age(self):
|
||||||
|
# if we don't have access to the replicator config section we'll
|
||||||
|
# use diskfile's default
|
||||||
|
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
||||||
|
self.rcache, self.devices)
|
||||||
|
router = auditor_worker.diskfile_router
|
||||||
|
for policy in POLICIES:
|
||||||
|
self.assertEqual(router[policy].reclaim_age, 86400 * 7)
|
||||||
|
|
||||||
|
# if the reclaim_age option is set explicitly we use that
|
||||||
|
self.conf['reclaim_age'] = '1800'
|
||||||
|
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
||||||
|
self.rcache, self.devices)
|
||||||
|
router = auditor_worker.diskfile_router
|
||||||
|
for policy in POLICIES:
|
||||||
|
self.assertEqual(router[policy].reclaim_age, 1800)
|
||||||
|
|
||||||
|
# if we have a real config we can be a little smarter
|
||||||
|
config_path = os.path.join(self.testdir, 'objserver.conf')
|
||||||
|
|
||||||
|
# if there is no object-replicator section we still have to fall back
|
||||||
|
# to default because we can't parse the config for that section!
|
||||||
|
stub_config = """
|
||||||
|
[object-auditor]
|
||||||
|
"""
|
||||||
|
with open(config_path, 'w') as f:
|
||||||
|
f.write(textwrap.dedent(stub_config))
|
||||||
|
conf = readconf(config_path, 'object-auditor')
|
||||||
|
auditor_worker = auditor.AuditorWorker(conf, self.logger,
|
||||||
|
self.rcache, self.devices)
|
||||||
|
router = auditor_worker.diskfile_router
|
||||||
|
for policy in POLICIES:
|
||||||
|
self.assertEqual(router[policy].reclaim_age, 86400 * 7)
|
||||||
|
|
||||||
|
# verify reclaim_age is of auditor config value
|
||||||
|
stub_config = """
|
||||||
|
[object-replicator]
|
||||||
|
[object-auditor]
|
||||||
|
reclaim_age = 60
|
||||||
|
"""
|
||||||
|
with open(config_path, 'w') as f:
|
||||||
|
f.write(textwrap.dedent(stub_config))
|
||||||
|
conf = readconf(config_path, 'object-auditor')
|
||||||
|
auditor_worker = auditor.AuditorWorker(conf, self.logger,
|
||||||
|
self.rcache, self.devices)
|
||||||
|
router = auditor_worker.diskfile_router
|
||||||
|
for policy in POLICIES:
|
||||||
|
self.assertEqual(router[policy].reclaim_age, 60)
|
||||||
|
|
||||||
|
# verify reclaim_age falls back to replicator config value
|
||||||
|
# if there is no auditor config value
|
||||||
|
config_path = os.path.join(self.testdir, 'objserver.conf')
|
||||||
|
stub_config = """
|
||||||
|
[object-replicator]
|
||||||
|
reclaim_age = 60
|
||||||
|
[object-auditor]
|
||||||
|
"""
|
||||||
|
with open(config_path, 'w') as f:
|
||||||
|
f.write(textwrap.dedent(stub_config))
|
||||||
|
conf = readconf(config_path, 'object-auditor')
|
||||||
|
auditor_worker = auditor.AuditorWorker(conf, self.logger,
|
||||||
|
self.rcache, self.devices)
|
||||||
|
router = auditor_worker.diskfile_router
|
||||||
|
for policy in POLICIES:
|
||||||
|
self.assertEqual(router[policy].reclaim_age, 60)
|
||||||
|
|
||||||
|
# we'll prefer our own DEFAULT section to the replicator though
|
||||||
|
self.assertEqual(auditor_worker.rsync_tempfile_timeout,
|
||||||
|
replicator.DEFAULT_RSYNC_TIMEOUT + 900)
|
||||||
|
stub_config = """
|
||||||
|
[DEFAULT]
|
||||||
|
reclaim_age = 1209600
|
||||||
|
[object-replicator]
|
||||||
|
reclaim_age = 1800
|
||||||
|
[object-auditor]
|
||||||
|
"""
|
||||||
|
with open(config_path, 'w') as f:
|
||||||
|
f.write(textwrap.dedent(stub_config))
|
||||||
|
conf = readconf(config_path, 'object-auditor')
|
||||||
|
auditor_worker = auditor.AuditorWorker(conf, self.logger,
|
||||||
|
self.rcache, self.devices)
|
||||||
|
router = auditor_worker.diskfile_router
|
||||||
|
for policy in POLICIES:
|
||||||
|
self.assertEqual(router[policy].reclaim_age, 1209600)
|
||||||
|
|
||||||
def test_sleeper(self):
|
def test_sleeper(self):
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
'time.sleep', mock.MagicMock()) as mock_sleep:
|
'time.sleep', mock.MagicMock()) as mock_sleep:
|
||||||
|
@ -5052,6 +5052,64 @@ class TestSuffixHashes(unittest.TestCase):
|
|||||||
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
|
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
|
||||||
self.assertEqual(hashes, {})
|
self.assertEqual(hashes, {})
|
||||||
|
|
||||||
|
def test_hash_suffix_one_reclaim_tombstone_with_hash_pkl(self):
|
||||||
|
for policy in self.iter_policies():
|
||||||
|
df_mgr = self.df_router[policy]
|
||||||
|
df = df_mgr.get_diskfile(
|
||||||
|
'sda1', '0', 'a', 'c', 'o', policy=policy)
|
||||||
|
suffix_dir = os.path.dirname(df._datadir)
|
||||||
|
part_dir = os.path.dirname(suffix_dir)
|
||||||
|
hash_file = os.path.join(part_dir, diskfile.HASH_FILE)
|
||||||
|
|
||||||
|
# scale back reclaim age a bit
|
||||||
|
df_mgr.reclaim_age = 1000
|
||||||
|
# write a tombstone that's just a *little* older
|
||||||
|
old_time = time() - 1001
|
||||||
|
timestamp = Timestamp(old_time)
|
||||||
|
df.delete(timestamp.internal)
|
||||||
|
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
|
||||||
|
# sanity
|
||||||
|
self.assertEqual(hashes, {})
|
||||||
|
self.assertFalse(os.path.exists(df._datadir))
|
||||||
|
|
||||||
|
hash_timestamp = os.stat(hash_file).st_mtime
|
||||||
|
|
||||||
|
# if hash.pkl exists, that .ts file is not reclaimed
|
||||||
|
df = df_mgr.get_diskfile(
|
||||||
|
'sda1', '0', 'a', 'c', 'o', policy=policy)
|
||||||
|
df.delete(timestamp.internal)
|
||||||
|
|
||||||
|
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
|
||||||
|
# This was a cached value so the value looks empty
|
||||||
|
self.assertEqual(hashes, {})
|
||||||
|
# and the hash.pkl is not touched
|
||||||
|
self.assertEqual(hash_timestamp, os.stat(hash_file).st_mtime)
|
||||||
|
# and we still have tombstone entry
|
||||||
|
tombstone = '%s.ts' % timestamp.internal
|
||||||
|
self.assertTrue(os.path.exists(df._datadir))
|
||||||
|
self.assertIn(tombstone, os.listdir(df._datadir))
|
||||||
|
|
||||||
|
# However if we call invalidate_hash for the suffix dir,
|
||||||
|
# get_hashes can reclaim the tombstone
|
||||||
|
with mock.patch('swift.obj.diskfile.lock_path'):
|
||||||
|
df_mgr.invalidate_hash(suffix_dir)
|
||||||
|
|
||||||
|
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
|
||||||
|
|
||||||
|
self.assertEqual(hashes, {})
|
||||||
|
# If we have no other objects in the suffix, get_hashes
|
||||||
|
# doesn't reclaim anything
|
||||||
|
self.assertTrue(os.path.exists(df._datadir))
|
||||||
|
self.assertIn(tombstone, os.listdir(df._datadir))
|
||||||
|
self.assertEqual(hash_timestamp, os.stat(hash_file).st_mtime)
|
||||||
|
|
||||||
|
# *BUT* if suffix value is given to recalc, it can force a reclaim!
|
||||||
|
suffix = os.path.dirname(suffix_dir)
|
||||||
|
hashes = df_mgr.get_hashes('sda1', '0', [suffix], policy)
|
||||||
|
self.assertFalse(os.path.exists(df._datadir))
|
||||||
|
# hash.pkl was updated
|
||||||
|
self.assertGreater(os.stat(hash_file).st_mtime, hash_timestamp)
|
||||||
|
|
||||||
def test_hash_suffix_one_reclaim_and_one_valid_tombstone(self):
|
def test_hash_suffix_one_reclaim_and_one_valid_tombstone(self):
|
||||||
for policy in self.iter_policies():
|
for policy in self.iter_policies():
|
||||||
paths, suffix = find_paths_with_matching_suffixes(2, 1)
|
paths, suffix = find_paths_with_matching_suffixes(2, 1)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user