Merge "Delete old tombstones"
This commit is contained in:
commit
af98608c14
@ -19,6 +19,7 @@ import sys
|
||||
import time
|
||||
import signal
|
||||
import re
|
||||
from os.path import basename, dirname, join
|
||||
from random import shuffle
|
||||
from swift import gettext_ as _
|
||||
from contextlib import closing
|
||||
@ -28,7 +29,8 @@ from swift.obj import diskfile, replicator
|
||||
from swift.common.utils import (
|
||||
get_logger, ratelimit_sleep, dump_recon_cache, list_from_csv, listdir,
|
||||
unlink_paths_older_than, readconf, config_auto_int_value)
|
||||
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist
|
||||
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist,\
|
||||
DiskFileDeleted
|
||||
from swift.common.daemon import Daemon
|
||||
from swift.common.storage_policy import POLICIES
|
||||
|
||||
@ -43,7 +45,6 @@ class AuditorWorker(object):
|
||||
self.conf = conf
|
||||
self.logger = logger
|
||||
self.devices = devices
|
||||
self.diskfile_router = diskfile.DiskFileRouter(conf, self.logger)
|
||||
self.max_files_per_second = float(conf.get('files_per_second', 20))
|
||||
self.max_bytes_per_second = float(conf.get('bytes_per_second',
|
||||
10000000))
|
||||
@ -56,17 +57,25 @@ class AuditorWorker(object):
|
||||
except (KeyError, SystemExit):
|
||||
# if we can't parse the real config (generally a KeyError on
|
||||
# __file__, or SystemExit on no object-replicator section) we use
|
||||
# a very conservative default
|
||||
default = 86400
|
||||
# a very conservative default for rsync_timeout
|
||||
default_rsync_timeout = 86400
|
||||
else:
|
||||
replicator_rsync_timeout = int(replicator_config.get(
|
||||
'rsync_timeout', replicator.DEFAULT_RSYNC_TIMEOUT))
|
||||
# Here we can do some light math for ops and use the *replicator's*
|
||||
# rsync_timeout (plus 15 mins to avoid deleting local tempfiles
|
||||
# before the remote replicator kills it's rsync)
|
||||
default = replicator_rsync_timeout + 900
|
||||
default_rsync_timeout = replicator_rsync_timeout + 900
|
||||
# there's not really a good reason to assume the replicator
|
||||
# section's reclaim_age is more appropriate than the reconstructor
|
||||
# reclaim_age - but we're already parsing the config so we can set
|
||||
# the default value in our config if it's not already set
|
||||
if 'reclaim_age' in replicator_config:
|
||||
conf.setdefault('reclaim_age',
|
||||
replicator_config['reclaim_age'])
|
||||
self.rsync_tempfile_timeout = config_auto_int_value(
|
||||
self.conf.get('rsync_tempfile_timeout'), default)
|
||||
self.conf.get('rsync_tempfile_timeout'), default_rsync_timeout)
|
||||
self.diskfile_router = diskfile.DiskFileRouter(conf, self.logger)
|
||||
|
||||
self.auditor_type = 'ALL'
|
||||
self.zero_byte_only_at_fps = zero_byte_only_at_fps
|
||||
@ -251,19 +260,26 @@ class AuditorWorker(object):
|
||||
incr_by=chunk_len)
|
||||
self.bytes_processed += chunk_len
|
||||
self.total_bytes_processed += chunk_len
|
||||
except DiskFileNotExist:
|
||||
pass
|
||||
except DiskFileQuarantined as err:
|
||||
self.quarantines += 1
|
||||
self.logger.error(_('ERROR Object %(obj)s failed audit and was'
|
||||
' quarantined: %(err)s'),
|
||||
{'obj': location, 'err': err})
|
||||
except DiskFileDeleted:
|
||||
# If there is a reclaimable tombstone, we'll invalidate the hash
|
||||
# to trigger the replciator to rehash/cleanup this suffix
|
||||
ts = df._ondisk_info['ts_info']['timestamp']
|
||||
if (time.time() - float(ts)) > df.manager.reclaim_age:
|
||||
df.manager.invalidate_hash(dirname(df._datadir))
|
||||
except DiskFileNotExist:
|
||||
pass
|
||||
|
||||
self.passes += 1
|
||||
# _ondisk_info attr is initialized to None and filled in by open
|
||||
ondisk_info_dict = df._ondisk_info or {}
|
||||
if 'unexpected' in ondisk_info_dict:
|
||||
is_rsync_tempfile = lambda fpath: RE_RSYNC_TEMPFILE.match(
|
||||
os.path.basename(fpath))
|
||||
basename(fpath))
|
||||
rsync_tempfile_paths = filter(is_rsync_tempfile,
|
||||
ondisk_info_dict['unexpected'])
|
||||
mtime = time.time() - self.rsync_tempfile_timeout
|
||||
@ -282,7 +298,7 @@ class ObjectAuditor(Daemon):
|
||||
conf.get('zero_byte_files_per_second', 50))
|
||||
self.recon_cache_path = conf.get('recon_cache_path',
|
||||
'/var/cache/swift')
|
||||
self.rcache = os.path.join(self.recon_cache_path, "object.recon")
|
||||
self.rcache = join(self.recon_cache_path, "object.recon")
|
||||
self.interval = int(conf.get('interval', 30))
|
||||
|
||||
def _sleep(self):
|
||||
|
@ -14,6 +14,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
from test import unit
|
||||
import six.moves.cPickle as pickle
|
||||
import unittest
|
||||
import mock
|
||||
import os
|
||||
@ -23,13 +24,14 @@ from shutil import rmtree
|
||||
from hashlib import md5
|
||||
from tempfile import mkdtemp
|
||||
import textwrap
|
||||
from os.path import dirname, basename, join
|
||||
from test.unit import (FakeLogger, patch_policies, make_timestamp_iter,
|
||||
DEFAULT_TEST_EC_TYPE)
|
||||
from swift.obj import auditor, replicator
|
||||
from swift.obj.diskfile import (
|
||||
DiskFile, write_metadata, invalidate_hash, get_data_dir,
|
||||
DiskFileManager, ECDiskFileManager, AuditLocation, clear_auditor_status,
|
||||
get_auditor_status)
|
||||
get_auditor_status, HASH_FILE, HASH_INVALIDATIONS_FILE)
|
||||
from swift.common.utils import (
|
||||
mkdirs, normalize_timestamp, Timestamp, readconf)
|
||||
from swift.common.storage_policy import (
|
||||
@ -326,7 +328,7 @@ class TestAuditor(unittest.TestCase):
|
||||
[object-auditor]
|
||||
rsync_tempfile_timeout = auto
|
||||
"""
|
||||
with open(os.path.join(self.testdir, 'objserver.conf'), 'w') as f:
|
||||
with open(config_path, 'w') as f:
|
||||
f.write(textwrap.dedent(stub_config))
|
||||
conf = readconf(config_path, 'object-auditor')
|
||||
auditor_worker = auditor.AuditorWorker(conf, self.logger,
|
||||
@ -344,7 +346,7 @@ class TestAuditor(unittest.TestCase):
|
||||
[object-auditor]
|
||||
rsync_tempfile_timeout = auto
|
||||
"""
|
||||
with open(os.path.join(self.testdir, 'objserver.conf'), 'w') as f:
|
||||
with open(config_path, 'w') as f:
|
||||
f.write(textwrap.dedent(stub_config))
|
||||
conf = readconf(config_path, 'object-auditor')
|
||||
auditor_worker = auditor.AuditorWorker(conf, self.logger,
|
||||
@ -744,6 +746,139 @@ class TestAuditor(unittest.TestCase):
|
||||
self.auditor.run_audit(**kwargs)
|
||||
self.assertFalse(os.path.exists(self.disk_file._datadir))
|
||||
|
||||
def test_with_tombstone_delete(self):
|
||||
test_md5 = '098f6bcd4621d373cade4e832627b4f6'
|
||||
|
||||
def do_audit(self, timestamp, invalidate=False):
|
||||
dir_path = self.disk_file._datadir
|
||||
ts_file = os.path.join(dir_path, '%d.ts' % timestamp)
|
||||
|
||||
# Create a .ts file
|
||||
if not os.path.exists(dir_path):
|
||||
mkdirs(dir_path)
|
||||
fp = open(ts_file, 'w')
|
||||
write_metadata(fp, {'X-Timestamp': '%d' % timestamp})
|
||||
fp.close()
|
||||
# Create hashes.pkl
|
||||
hash = dirname(dirname(ts_file)) # hash value of ts file
|
||||
suffix = basename(hash)
|
||||
hashes_pkl = join(os.path.dirname(hash), HASH_FILE)
|
||||
with open(hashes_pkl, 'wb') as fp:
|
||||
pickle.dump({suffix: test_md5}, fp, 0)
|
||||
# Run auditor
|
||||
kwargs = {'mode': 'once'}
|
||||
self.auditor.run_audit(**kwargs)
|
||||
# Check if hash invalid file exists
|
||||
hash_invalid = join(dirname(hash), HASH_INVALIDATIONS_FILE)
|
||||
hash_invalid_exists = os.path.exists(hash_invalid)
|
||||
# If invalidate, fetch value from hashes.invalid
|
||||
if invalidate:
|
||||
with open(hash_invalid, 'rb') as fp:
|
||||
hash_val = fp.read()
|
||||
return hash_invalid_exists, hash_val, suffix
|
||||
return hash_invalid_exists, ts_file
|
||||
|
||||
self.auditor = auditor.ObjectAuditor(self.conf)
|
||||
self.auditor.log_time = 0
|
||||
|
||||
now = time.time()
|
||||
|
||||
# audit with a recent tombstone
|
||||
hash_invalid_exists, ts_file = do_audit(self, now - 55)
|
||||
self.assertFalse(hash_invalid_exists)
|
||||
os.unlink(ts_file)
|
||||
|
||||
# audit with a tombstone that is beyond default reclaim_age
|
||||
hash_invalid_exists, hash_val, suffix = do_audit(self, now - (604800),
|
||||
True)
|
||||
self.assertTrue(hash_invalid_exists)
|
||||
self.assertEqual(hash_val.strip('\n'), suffix)
|
||||
|
||||
def test_auditor_reclaim_age(self):
|
||||
# if we don't have access to the replicator config section we'll
|
||||
# diskfile's default
|
||||
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
||||
self.rcache, self.devices)
|
||||
router = auditor_worker.diskfile_router
|
||||
for policy in POLICIES:
|
||||
self.assertEqual(router[policy].reclaim_age, 86400 * 7)
|
||||
|
||||
# if the reclaim_age option is set explicitly we use that
|
||||
self.conf['reclaim_age'] = '1800'
|
||||
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
||||
self.rcache, self.devices)
|
||||
router = auditor_worker.diskfile_router
|
||||
for policy in POLICIES:
|
||||
self.assertEqual(router[policy].reclaim_age, 1800)
|
||||
|
||||
# if we have a real config we can be a little smarter
|
||||
config_path = os.path.join(self.testdir, 'objserver.conf')
|
||||
|
||||
# if there is no object-replicator section we still have to fall back
|
||||
# to default because we can't parse the config for that section!
|
||||
stub_config = """
|
||||
[object-auditor]
|
||||
"""
|
||||
with open(config_path, 'w') as f:
|
||||
f.write(textwrap.dedent(stub_config))
|
||||
conf = readconf(config_path, 'object-auditor')
|
||||
auditor_worker = auditor.AuditorWorker(conf, self.logger,
|
||||
self.rcache, self.devices)
|
||||
router = auditor_worker.diskfile_router
|
||||
for policy in POLICIES:
|
||||
self.assertEqual(router[policy].reclaim_age, 86400 * 7)
|
||||
|
||||
# verify reclaim_age is of auditor config value
|
||||
stub_config = """
|
||||
[object-replicator]
|
||||
[object-auditor]
|
||||
reclaim_age = 60
|
||||
"""
|
||||
with open(config_path, 'w') as f:
|
||||
f.write(textwrap.dedent(stub_config))
|
||||
conf = readconf(config_path, 'object-auditor')
|
||||
auditor_worker = auditor.AuditorWorker(conf, self.logger,
|
||||
self.rcache, self.devices)
|
||||
router = auditor_worker.diskfile_router
|
||||
for policy in POLICIES:
|
||||
self.assertEqual(router[policy].reclaim_age, 60)
|
||||
|
||||
# verify reclaim_age falls back to replicator config value
|
||||
# if there is no auditor config value
|
||||
config_path = os.path.join(self.testdir, 'objserver.conf')
|
||||
stub_config = """
|
||||
[object-replicator]
|
||||
reclaim_age = 60
|
||||
[object-auditor]
|
||||
"""
|
||||
with open(config_path, 'w') as f:
|
||||
f.write(textwrap.dedent(stub_config))
|
||||
conf = readconf(config_path, 'object-auditor')
|
||||
auditor_worker = auditor.AuditorWorker(conf, self.logger,
|
||||
self.rcache, self.devices)
|
||||
router = auditor_worker.diskfile_router
|
||||
for policy in POLICIES:
|
||||
self.assertEqual(router[policy].reclaim_age, 60)
|
||||
|
||||
# we'll prefer our own DEFAULT section to the replicator though
|
||||
self.assertEqual(auditor_worker.rsync_tempfile_timeout,
|
||||
replicator.DEFAULT_RSYNC_TIMEOUT + 900)
|
||||
stub_config = """
|
||||
[DEFAULT]
|
||||
reclaim_age = 1209600
|
||||
[object-replicator]
|
||||
reclaim_age = 1800
|
||||
[object-auditor]
|
||||
"""
|
||||
with open(config_path, 'w') as f:
|
||||
f.write(textwrap.dedent(stub_config))
|
||||
conf = readconf(config_path, 'object-auditor')
|
||||
auditor_worker = auditor.AuditorWorker(conf, self.logger,
|
||||
self.rcache, self.devices)
|
||||
router = auditor_worker.diskfile_router
|
||||
for policy in POLICIES:
|
||||
self.assertEqual(router[policy].reclaim_age, 1209600)
|
||||
|
||||
def test_sleeper(self):
|
||||
with mock.patch(
|
||||
'time.sleep', mock.MagicMock()) as mock_sleep:
|
||||
|
@ -5426,6 +5426,64 @@ class TestSuffixHashes(unittest.TestCase):
|
||||
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
|
||||
self.assertEqual(hashes, {})
|
||||
|
||||
def test_hash_suffix_one_reclaim_tombstone_with_hash_pkl(self):
|
||||
for policy in self.iter_policies():
|
||||
df_mgr = self.df_router[policy]
|
||||
df = df_mgr.get_diskfile(
|
||||
'sda1', '0', 'a', 'c', 'o', policy=policy)
|
||||
suffix_dir = os.path.dirname(df._datadir)
|
||||
part_dir = os.path.dirname(suffix_dir)
|
||||
hash_file = os.path.join(part_dir, diskfile.HASH_FILE)
|
||||
|
||||
# scale back reclaim age a bit
|
||||
df_mgr.reclaim_age = 1000
|
||||
# write a tombstone that's just a *little* older
|
||||
old_time = time() - 1001
|
||||
timestamp = Timestamp(old_time)
|
||||
df.delete(timestamp.internal)
|
||||
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
|
||||
# sanity
|
||||
self.assertEqual(hashes, {})
|
||||
self.assertFalse(os.path.exists(df._datadir))
|
||||
|
||||
hash_timestamp = os.stat(hash_file).st_mtime
|
||||
|
||||
# if hash.pkl exists, that .ts file is not reclaimed
|
||||
df = df_mgr.get_diskfile(
|
||||
'sda1', '0', 'a', 'c', 'o', policy=policy)
|
||||
df.delete(timestamp.internal)
|
||||
|
||||
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
|
||||
# This was a cached value so the value looks empty
|
||||
self.assertEqual(hashes, {})
|
||||
# and the hash.pkl is not touched
|
||||
self.assertEqual(hash_timestamp, os.stat(hash_file).st_mtime)
|
||||
# and we still have tombstone entry
|
||||
tombstone = '%s.ts' % timestamp.internal
|
||||
self.assertTrue(os.path.exists(df._datadir))
|
||||
self.assertIn(tombstone, os.listdir(df._datadir))
|
||||
|
||||
# However if we call invalidate_hash for the suffix dir,
|
||||
# get_hashes can reclaim the tombstone
|
||||
with mock.patch('swift.obj.diskfile.lock_path'):
|
||||
df_mgr.invalidate_hash(suffix_dir)
|
||||
|
||||
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
|
||||
|
||||
self.assertEqual(hashes, {})
|
||||
# If we have no other objects in the suffix, get_hashes
|
||||
# doesn't reclaim anything
|
||||
self.assertTrue(os.path.exists(df._datadir))
|
||||
self.assertIn(tombstone, os.listdir(df._datadir))
|
||||
self.assertEqual(hash_timestamp, os.stat(hash_file).st_mtime)
|
||||
|
||||
# *BUT* if suffix value is given to recalc, it can force to recaim!
|
||||
suffix = os.path.dirname(suffix_dir)
|
||||
hashes = df_mgr.get_hashes('sda1', '0', [suffix], policy)
|
||||
self.assertFalse(os.path.exists(df._datadir))
|
||||
# hash.pkl was updated
|
||||
self.assertGreater(os.stat(hash_file).st_mtime, hash_timestamp)
|
||||
|
||||
def test_hash_suffix_one_reclaim_and_one_valid_tombstone(self):
|
||||
for policy in self.iter_policies():
|
||||
paths, suffix = find_paths_with_matching_suffixes(2, 1)
|
||||
|
Loading…
Reference in New Issue
Block a user