Auditor will clean up stale rsync tempfiles

DiskFile already fills in the _ondisk_info attribute when it tries to open
a diskfile - even if the DiskFile's fileset is not valid or deleted.
During this process the rsync tempfiles would be discovered and logged,
but no-one would attempt to clean them up - even if they were really old.

Instead of logging and ignoring unexpected files when validate a DiskFile
fileset we'll add unexpected files to the unexpected key in the
_ondisk_info attribute.

With a little bit of re-organization in the auditor's object_audit method
to get things into a single return path we can add an unconditional check
for unexpected files and remove those that are "old enough".

Since the replicator will kill any rsync processes that are running longer
than the configured rsync_timeout we know that any rsync tempfiles older
than this can be deleted.

Split unlink_older_than in common.utils into two functions to allow an
explicit list of previously discovered paths to be passed in to avoid an
extra listdir.  Since the getmtime handling already ignores OSError
there's less concern of race condition where a previous discovered
unexpected file is reaped by rsync while we're attempting to clean it up.

Update some doc on the new config option.

Closes-Bug: #1554005

Change-Id: Id67681cb77f605e3491b8afcb9c69d769e154283
This commit is contained in:
Clay Gerrard 2016-03-15 17:09:21 -07:00 committed by Alistair Coles
parent d9a4f18b49
commit 1d03803a85
9 changed files with 344 additions and 37 deletions

View File

@ -499,6 +499,9 @@ and ensure that swift has read/write. The default is /var/cache/swift.
Takes a comma separated list of ints. If set, the object auditor will
increment a counter for every object whose size is <= to the given break
points and report the result after a full scan.
.IP \fBrsync_tempfile_timeout\fR
Time elapsed in seconds before rsync tempfiles will be unlinked. Config value of "auto"
will try to use object-replicator's rsync_timeout + 900 or fall-back to 86400 (1 day).
.RE

View File

@ -738,6 +738,11 @@ concurrency 1 The number of parallel processes
zero_byte_files_per_second 50
object_size_stats
recon_cache_path /var/cache/swift Path to recon cache
rsync_tempfile_timeout auto Time elapsed in seconds before rsync
tempfiles will be unlinked. Config value
of "auto" try to use object-replicator's
rsync_timeout + 900 or fallback to 86400
(1 day).
=========================== =================== ==========================================
------------------------------

View File

@ -306,6 +306,13 @@ use = egg:swift#recon
# points and report the result after a full scan.
# object_size_stats =
# The auditor will cleanup old rsync tempfiles after they are "old
# enough" to delete. You can configure the time elapsed in seconds
# before rsync tempfiles will be unlinked, or the default value of
# "auto" try to use object-replicator's rsync_timeout + 900 and fallback
# to 86400 (1 day).
# rsync_tempfile_timeout = auto
# Note: Put it at the beginning of the pipleline to profile all middleware. But
# it is safer to put this after healthcheck.
[filter:xprofile]

View File

@ -2122,10 +2122,21 @@ def unlink_older_than(path, mtime):
Remove any file in a given path that that was last modified before mtime.
:param path: path to remove file from
:mtime: timestamp of oldest file to keep
:param mtime: timestamp of oldest file to keep
"""
for fname in listdir(path):
fpath = os.path.join(path, fname)
filepaths = map(functools.partial(os.path.join, path), listdir(path))
return unlink_paths_older_than(filepaths, mtime)
def unlink_paths_older_than(filepaths, mtime):
"""
Remove any files from the given list that that were
last modified before mtime.
:param filepaths: a list of strings, the full paths of files to check
:param mtime: timestamp of oldest file to keep
"""
for fpath in filepaths:
try:
if os.path.getmtime(fpath) < mtime:
os.unlink(fpath)

View File

@ -18,18 +18,23 @@ import os
import sys
import time
import signal
import re
from random import shuffle
from swift import gettext_ as _
from contextlib import closing
from eventlet import Timeout
from swift.obj import diskfile
from swift.common.utils import get_logger, ratelimit_sleep, dump_recon_cache, \
list_from_csv, listdir
from swift.obj import diskfile, replicator
from swift.common.utils import (
get_logger, ratelimit_sleep, dump_recon_cache, list_from_csv, listdir,
unlink_paths_older_than, readconf, config_auto_int_value)
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist
from swift.common.daemon import Daemon
from swift.common.storage_policy import POLICIES
# This matches rsync tempfiles, like ".<timestamp>.data.Xy095a"
RE_RSYNC_TEMPFILE = re.compile(r'^\..*\.([a-zA-Z0-9_]){6}$')
class AuditorWorker(object):
"""Walk through file system to audit objects"""
@ -42,6 +47,27 @@ class AuditorWorker(object):
self.max_files_per_second = float(conf.get('files_per_second', 20))
self.max_bytes_per_second = float(conf.get('bytes_per_second',
10000000))
try:
# ideally unless ops overrides the rsync_tempfile_timeout in the
# auditor section we can base our behavior on whatever they
# configure for their replicator
replicator_config = readconf(self.conf['__file__'],
'object-replicator')
except (KeyError, SystemExit):
# if we can't parse the real config (generally a KeyError on
# __file__, or SystemExit on no object-replicator section) we use
# a very conservative default
default = 86400
else:
replicator_rsync_timeout = int(replicator_config.get(
'rsync_timeout', replicator.DEFAULT_RSYNC_TIMEOUT))
# Here we can do some light math for ops and use the *replicator's*
# rsync_timeout (plus 15 mins to avoid deleting local tempfiles
# before the remote replicator kills it's rsync)
default = replicator_rsync_timeout + 900
self.rsync_tempfile_timeout = config_auto_int_value(
self.conf.get('rsync_tempfile_timeout'), default)
self.auditor_type = 'ALL'
self.zero_byte_only_at_fps = zero_byte_only_at_fps
if self.zero_byte_only_at_fps:
@ -200,34 +226,46 @@ class AuditorWorker(object):
raise DiskFileQuarantined(msg)
diskfile_mgr = self.diskfile_router[location.policy]
# this method doesn't normally raise errors, even if the audit
# location does not exist; if this raises an unexpected error it
# will get logged in failsafe
df = diskfile_mgr.get_diskfile_from_audit_location(location)
reader = None
try:
df = diskfile_mgr.get_diskfile_from_audit_location(location)
with df.open():
metadata = df.get_metadata()
obj_size = int(metadata['Content-Length'])
if self.stats_sizes:
self.record_stats(obj_size)
if self.zero_byte_only_at_fps and obj_size:
self.passes += 1
return
reader = df.reader(_quarantine_hook=raise_dfq)
with closing(reader):
for chunk in reader:
chunk_len = len(chunk)
self.bytes_running_time = ratelimit_sleep(
self.bytes_running_time,
self.max_bytes_per_second,
incr_by=chunk_len)
self.bytes_processed += chunk_len
self.total_bytes_processed += chunk_len
if obj_size and not self.zero_byte_only_at_fps:
reader = df.reader(_quarantine_hook=raise_dfq)
if reader:
with closing(reader):
for chunk in reader:
chunk_len = len(chunk)
self.bytes_running_time = ratelimit_sleep(
self.bytes_running_time,
self.max_bytes_per_second,
incr_by=chunk_len)
self.bytes_processed += chunk_len
self.total_bytes_processed += chunk_len
except DiskFileNotExist:
return
pass
except DiskFileQuarantined as err:
self.quarantines += 1
self.logger.error(_('ERROR Object %(obj)s failed audit and was'
' quarantined: %(err)s'),
{'obj': location, 'err': err})
self.passes += 1
# _ondisk_info attr is initialized to None and filled in by open
ondisk_info_dict = df._ondisk_info or {}
if 'unexpected' in ondisk_info_dict:
is_rsync_tempfile = lambda fpath: RE_RSYNC_TEMPFILE.match(
os.path.basename(fpath))
rsync_tempfile_paths = filter(is_rsync_tempfile,
ondisk_info_dict['unexpected'])
mtime = time.time() - self.rsync_tempfile_timeout
unlink_paths_older_than(rsync_tempfile_paths, mtime)
class ObjectAuditor(Daemon):

View File

@ -741,7 +741,10 @@ class BaseDiskFileManager(object):
# dicts for the files having that extension. The file_info dicts are of
# the form returned by parse_on_disk_filename, with the filename added.
# Each list is sorted in reverse timestamp order.
#
# the results dict is used to collect results of file filtering
results = {}
# The exts dict will be modified during subsequent processing as files
# are removed to be discarded or ignored.
exts = defaultdict(list)
@ -752,16 +755,15 @@ class BaseDiskFileManager(object):
file_info['filename'] = afile
exts[file_info['ext']].append(file_info)
except DiskFileError as e:
self.logger.warning('Unexpected file %s: %s' %
(os.path.join(datadir or '', afile), e))
file_path = os.path.join(datadir or '', afile)
self.logger.warning('Unexpected file %s: %s',
file_path, e)
results.setdefault('unexpected', []).append(file_path)
for ext in exts:
# For each extension sort files into reverse chronological order.
exts[ext] = sorted(
exts[ext], key=lambda info: info['timestamp'], reverse=True)
# the results dict is used to collect results of file filtering
results = {}
if exts.get('.ts'):
# non-tombstones older than or equal to latest tombstone are
# obsolete

View File

@ -41,6 +41,7 @@ from swift.obj import ssync_sender
from swift.obj.diskfile import DiskFileManager, get_data_dir, get_tmp_dir
from swift.common.storage_policy import POLICIES, REPL_POLICY
DEFAULT_RSYNC_TIMEOUT = 900
hubs.use_hub(get_hub())
@ -76,7 +77,8 @@ class ObjectReplicator(Daemon):
self.partition_times = []
self.interval = int(conf.get('interval') or
conf.get('run_pause') or 30)
self.rsync_timeout = int(conf.get('rsync_timeout', 900))
self.rsync_timeout = int(conf.get('rsync_timeout',
DEFAULT_RSYNC_TIMEOUT))
self.rsync_io_timeout = conf.get('rsync_io_timeout', '30')
self.rsync_bwlimit = conf.get('rsync_bwlimit', '0')
self.rsync_compress = config_true_value(

View File

@ -18,6 +18,7 @@ from __future__ import print_function
from test.unit import temptree
import ctypes
import contextlib
import errno
import eventlet
import eventlet.event
@ -3422,6 +3423,86 @@ class ResellerConfReader(unittest.TestCase):
self.assertEqual('pre2_group', options['PRE2_'].get('require_group'))
class TestUnlinkOlder(unittest.TestCase):
def setUp(self):
self.tempdir = mkdtemp()
self.mtime = {}
def tearDown(self):
rmtree(self.tempdir, ignore_errors=True)
def touch(self, fpath, mtime=None):
self.mtime[fpath] = mtime or time.time()
open(fpath, 'w')
@contextlib.contextmanager
def high_resolution_getmtime(self):
orig_getmtime = os.path.getmtime
def mock_getmtime(fpath):
mtime = self.mtime.get(fpath)
if mtime is None:
mtime = orig_getmtime(fpath)
return mtime
with mock.patch('os.path.getmtime', mock_getmtime):
yield
def test_unlink_older_than_path_not_exists(self):
path = os.path.join(self.tempdir, 'does-not-exist')
# just make sure it doesn't blow up
utils.unlink_older_than(path, time.time())
def test_unlink_older_than_file(self):
path = os.path.join(self.tempdir, 'some-file')
self.touch(path)
with self.assertRaises(OSError) as ctx:
utils.unlink_older_than(path, time.time())
self.assertEqual(ctx.exception.errno, errno.ENOTDIR)
def test_unlink_older_than_now(self):
self.touch(os.path.join(self.tempdir, 'test'))
with self.high_resolution_getmtime():
utils.unlink_older_than(self.tempdir, time.time())
self.assertEqual([], os.listdir(self.tempdir))
def test_unlink_not_old_enough(self):
start = time.time()
self.touch(os.path.join(self.tempdir, 'test'))
with self.high_resolution_getmtime():
utils.unlink_older_than(self.tempdir, start)
self.assertEqual(['test'], os.listdir(self.tempdir))
def test_unlink_mixed(self):
self.touch(os.path.join(self.tempdir, 'first'))
cutoff = time.time()
self.touch(os.path.join(self.tempdir, 'second'))
with self.high_resolution_getmtime():
utils.unlink_older_than(self.tempdir, cutoff)
self.assertEqual(['second'], os.listdir(self.tempdir))
def test_unlink_paths(self):
paths = []
for item in ('first', 'second', 'third'):
path = os.path.join(self.tempdir, item)
self.touch(path)
paths.append(path)
# don't unlink everyone
with self.high_resolution_getmtime():
utils.unlink_paths_older_than(paths[:2], time.time())
self.assertEqual(['third'], os.listdir(self.tempdir))
def test_unlink_empty_paths(self):
# just make sure it doesn't blow up
utils.unlink_paths_older_than([], time.time())
def test_unlink_not_exists_paths(self):
path = os.path.join(self.tempdir, 'does-not-exist')
# just make sure it doesn't blow up
utils.unlink_paths_older_than([path], time.time())
class TestSwiftInfo(unittest.TestCase):
def tearDown(self):

View File

@ -22,15 +22,18 @@ import string
from shutil import rmtree
from hashlib import md5
from tempfile import mkdtemp
from test.unit import FakeLogger, patch_policies, make_timestamp_iter, \
DEFAULT_TEST_EC_TYPE
from swift.obj import auditor
from swift.obj.diskfile import DiskFile, write_metadata, invalidate_hash, \
get_data_dir, DiskFileManager, ECDiskFileManager, AuditLocation, \
clear_auditor_status, get_auditor_status
from swift.common.utils import mkdirs, normalize_timestamp, Timestamp
from swift.common.storage_policy import ECStoragePolicy, StoragePolicy, \
POLICIES
import textwrap
from test.unit import (FakeLogger, patch_policies, make_timestamp_iter,
DEFAULT_TEST_EC_TYPE)
from swift.obj import auditor, replicator
from swift.obj.diskfile import (
DiskFile, write_metadata, invalidate_hash, get_data_dir,
DiskFileManager, ECDiskFileManager, AuditLocation, clear_auditor_status,
get_auditor_status)
from swift.common.utils import (
mkdirs, normalize_timestamp, Timestamp, readconf)
from swift.common.storage_policy import (
ECStoragePolicy, StoragePolicy, POLICIES)
_mocked_policies = [
@ -275,6 +278,161 @@ class TestAuditor(unittest.TestCase):
policy=POLICIES.legacy))
self.assertEqual(auditor_worker.errors, 1)
def test_audit_location_gets_quarantined(self):
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
location = AuditLocation(self.disk_file._datadir, 'sda', '0',
policy=self.disk_file.policy)
# instead of a datadir, we'll make a file!
mkdirs(os.path.dirname(self.disk_file._datadir))
open(self.disk_file._datadir, 'w')
# after we turn the crank ...
auditor_worker.object_audit(location)
# ... it should get quarantined
self.assertFalse(os.path.exists(self.disk_file._datadir))
self.assertEqual(1, auditor_worker.quarantines)
def test_rsync_tempfile_timeout_auto_option(self):
# if we don't have access to the replicator config section we'll use
# our default
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
self.assertEqual(auditor_worker.rsync_tempfile_timeout, 86400)
# if the rsync_tempfile_timeout option is set explicitly we use that
self.conf['rsync_tempfile_timeout'] = '1800'
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
self.assertEqual(auditor_worker.rsync_tempfile_timeout, 1800)
# if we have a real config we can be a little smarter
config_path = os.path.join(self.testdir, 'objserver.conf')
stub_config = """
[object-auditor]
rsync_tempfile_timeout = auto
"""
with open(config_path, 'w') as f:
f.write(textwrap.dedent(stub_config))
# the Daemon loader will hand the object-auditor config to the
# auditor who will build the workers from it
conf = readconf(config_path, 'object-auditor')
auditor_worker = auditor.AuditorWorker(conf, self.logger,
self.rcache, self.devices)
# if there is no object-replicator section we still have to fall back
# to default because we can't parse the config for that section!
self.assertEqual(auditor_worker.rsync_tempfile_timeout, 86400)
stub_config = """
[object-replicator]
[object-auditor]
rsync_tempfile_timeout = auto
"""
with open(os.path.join(self.testdir, 'objserver.conf'), 'w') as f:
f.write(textwrap.dedent(stub_config))
conf = readconf(config_path, 'object-auditor')
auditor_worker = auditor.AuditorWorker(conf, self.logger,
self.rcache, self.devices)
# if the object-replicator section will parse but does not override
# the default rsync_timeout we assume the default rsync_timeout value
# and add 15mins
self.assertEqual(auditor_worker.rsync_tempfile_timeout,
replicator.DEFAULT_RSYNC_TIMEOUT + 900)
stub_config = """
[DEFAULT]
reclaim_age = 1209600
[object-replicator]
rsync_timeout = 3600
[object-auditor]
rsync_tempfile_timeout = auto
"""
with open(os.path.join(self.testdir, 'objserver.conf'), 'w') as f:
f.write(textwrap.dedent(stub_config))
conf = readconf(config_path, 'object-auditor')
auditor_worker = auditor.AuditorWorker(conf, self.logger,
self.rcache, self.devices)
# if there is an object-replicator section with a rsync_timeout
# configured we'll use that value (3600) + 900
self.assertEqual(auditor_worker.rsync_tempfile_timeout, 3600 + 900)
def test_inprogress_rsync_tempfiles_get_cleaned_up(self):
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
location = AuditLocation(self.disk_file._datadir, 'sda', '0',
policy=self.disk_file.policy)
data = 'VERIFY'
etag = md5()
timestamp = str(normalize_timestamp(time.time()))
with self.disk_file.create() as writer:
writer.write(data)
etag.update(data)
metadata = {
'ETag': etag.hexdigest(),
'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(writer._fd).st_size),
}
writer.put(metadata)
writer.commit(Timestamp(timestamp))
datafilename = None
datadir_files = os.listdir(self.disk_file._datadir)
for filename in datadir_files:
if filename.endswith('.data'):
datafilename = filename
break
else:
self.fail('Did not find .data file in %r: %r' %
(self.disk_file._datadir, datadir_files))
rsynctempfile_path = os.path.join(self.disk_file._datadir,
'.%s.9ILVBL' % datafilename)
open(rsynctempfile_path, 'w')
# sanity check we have an extra file
rsync_files = os.listdir(self.disk_file._datadir)
self.assertEqual(len(datadir_files) + 1, len(rsync_files))
# and after we turn the crank ...
auditor_worker.object_audit(location)
# ... we've still got the rsync file
self.assertEqual(rsync_files, os.listdir(self.disk_file._datadir))
# and we'll keep it - depending on the rsync_tempfile_timeout
self.assertEqual(auditor_worker.rsync_tempfile_timeout, 86400)
self.conf['rsync_tempfile_timeout'] = '3600'
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
self.assertEqual(auditor_worker.rsync_tempfile_timeout, 3600)
now = time.time() + 1900
with mock.patch('swift.obj.auditor.time.time',
return_value=now):
auditor_worker.object_audit(location)
self.assertEqual(rsync_files, os.listdir(self.disk_file._datadir))
# but *tomorrow* when we run
tomorrow = time.time() + 86400
with mock.patch('swift.obj.auditor.time.time',
return_value=tomorrow):
auditor_worker.object_audit(location)
# ... we'll totally clean that stuff up!
self.assertEqual(datadir_files, os.listdir(self.disk_file._datadir))
# but if we have some random crazy file in there
random_crazy_file_path = os.path.join(self.disk_file._datadir,
'.random.crazy.file')
open(random_crazy_file_path, 'w')
tomorrow = time.time() + 86400
with mock.patch('swift.obj.auditor.time.time',
return_value=tomorrow):
auditor_worker.object_audit(location)
# that's someone elses problem
self.assertIn(os.path.basename(random_crazy_file_path),
os.listdir(self.disk_file._datadir))
def test_generic_exception_handling(self):
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)