Merge "Change object_audit_location_generator() to yield for a single policy."
This commit is contained in:
commit
9aca9ad780
@ -27,7 +27,7 @@ from eventlet import Timeout
|
||||
from swift.obj import diskfile, replicator
|
||||
from swift.common.utils import (
|
||||
get_logger, ratelimit_sleep, dump_recon_cache, list_from_csv, listdir,
|
||||
unlink_paths_older_than, readconf, config_auto_int_value)
|
||||
unlink_paths_older_than, readconf, config_auto_int_value, round_robin_iter)
|
||||
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist,\
|
||||
DiskFileDeleted, DiskFileExpired
|
||||
from swift.common.daemon import Daemon
|
||||
@ -120,18 +120,17 @@ class AuditorWorker(object):
|
||||
total_quarantines = 0
|
||||
total_errors = 0
|
||||
time_auditing = 0
|
||||
# TODO: we should move audit-location generation to the storage policy,
|
||||
# as we may (conceivably) have a different filesystem layout for each.
|
||||
# We'd still need to generate the policies to audit from the actual
|
||||
# directories found on-disk, and have appropriate error reporting if we
|
||||
# find a directory that doesn't correspond to any known policy. This
|
||||
# will require a sizable refactor, but currently all diskfile managers
|
||||
# can find all diskfile locations regardless of policy -- so for now
|
||||
# just use Policy-0's manager.
|
||||
all_locs = (self.diskfile_router[POLICIES[0]]
|
||||
|
||||
# get AuditLocations for each policy
|
||||
loc_generators = []
|
||||
for policy in POLICIES:
|
||||
loc_generators.append(
|
||||
self.diskfile_router[policy]
|
||||
.object_audit_location_generator(
|
||||
device_dirs=device_dirs,
|
||||
policy, device_dirs=device_dirs,
|
||||
auditor_type=self.auditor_type))
|
||||
|
||||
all_locs = round_robin_iter(loc_generators)
|
||||
for location in all_locs:
|
||||
loop_time = time.time()
|
||||
self.failsafe_object_audit(location)
|
||||
@ -192,8 +191,11 @@ class AuditorWorker(object):
|
||||
self.logger.info(
|
||||
_('Object audit stats: %s') % json.dumps(self.stats_buckets))
|
||||
|
||||
# Unset remaining partitions to not skip them in the next run
|
||||
diskfile.clear_auditor_status(self.devices, self.auditor_type)
|
||||
for policy in POLICIES:
|
||||
# Unset remaining partitions to not skip them in the next run
|
||||
self.diskfile_router[policy].clear_auditor_status(
|
||||
policy,
|
||||
self.auditor_type)
|
||||
|
||||
def record_stats(self, obj_size):
|
||||
"""
|
||||
|
@ -453,18 +453,20 @@ class AuditLocation(object):
|
||||
return str(self.path)
|
||||
|
||||
|
||||
def object_audit_location_generator(devices, mount_check=True, logger=None,
|
||||
device_dirs=None, auditor_type="ALL"):
|
||||
def object_audit_location_generator(devices, datadir, mount_check=True,
|
||||
logger=None, device_dirs=None,
|
||||
auditor_type="ALL"):
|
||||
"""
|
||||
Given a devices path (e.g. "/srv/node"), yield an AuditLocation for all
|
||||
objects stored under that directory if device_dirs isn't set. If
|
||||
device_dirs is set, only yield AuditLocation for the objects under the
|
||||
entries in device_dirs. The AuditLocation only knows the path to the hash
|
||||
directory, not to the .data file therein (if any). This is to avoid a
|
||||
double listdir(hash_dir); the DiskFile object will always do one, so
|
||||
we don't.
|
||||
objects stored under that directory for the given datadir (policy),
|
||||
if device_dirs isn't set. If device_dirs is set, only yield AuditLocation
|
||||
for the objects under the entries in device_dirs. The AuditLocation only
|
||||
knows the path to the hash directory, not to the .data file therein
|
||||
(if any). This is to avoid a double listdir(hash_dir); the DiskFile object
|
||||
will always do one, so we don't.
|
||||
|
||||
:param devices: parent directory of the devices to be audited
|
||||
:param datadir: objects directory
|
||||
:param mount_check: flag to check if a mount check should be performed
|
||||
on devices
|
||||
:param logger: a logger object
|
||||
@ -480,6 +482,7 @@ def object_audit_location_generator(devices, mount_check=True, logger=None,
|
||||
# randomize devices in case of process restart before sweep completed
|
||||
shuffle(device_dirs)
|
||||
|
||||
base, policy = split_policy_string(datadir)
|
||||
for device in device_dirs:
|
||||
if not check_drive(devices, device, mount_check):
|
||||
if logger:
|
||||
@ -487,55 +490,37 @@ def object_audit_location_generator(devices, mount_check=True, logger=None,
|
||||
'Skipping %s as it is not %s', device,
|
||||
'mounted' if mount_check else 'a dir')
|
||||
continue
|
||||
# loop through object dirs for all policies
|
||||
device_dir = os.path.join(devices, device)
|
||||
try:
|
||||
dirs = os.listdir(device_dir)
|
||||
except OSError as e:
|
||||
if logger:
|
||||
logger.debug(
|
||||
_('Skipping %(dir)s: %(err)s') % {'dir': device_dir,
|
||||
'err': e.strerror})
|
||||
|
||||
datadir_path = os.path.join(devices, device, datadir)
|
||||
if not os.path.exists(datadir_path):
|
||||
continue
|
||||
for dir_ in dirs:
|
||||
if not dir_.startswith(DATADIR_BASE):
|
||||
continue
|
||||
|
||||
partitions = get_auditor_status(datadir_path, logger, auditor_type)
|
||||
|
||||
for pos, partition in enumerate(partitions):
|
||||
update_auditor_status(datadir_path, logger,
|
||||
partitions[pos:], auditor_type)
|
||||
part_path = os.path.join(datadir_path, partition)
|
||||
try:
|
||||
base, policy = split_policy_string(dir_)
|
||||
except PolicyError as e:
|
||||
if logger:
|
||||
logger.warning(_('Directory %(directory)r does not map '
|
||||
'to a valid policy (%(error)s)') % {
|
||||
'directory': dir_, 'error': e})
|
||||
suffixes = listdir(part_path)
|
||||
except OSError as e:
|
||||
if e.errno != errno.ENOTDIR:
|
||||
raise
|
||||
continue
|
||||
datadir_path = os.path.join(devices, device, dir_)
|
||||
|
||||
partitions = get_auditor_status(datadir_path, logger, auditor_type)
|
||||
|
||||
for pos, partition in enumerate(partitions):
|
||||
update_auditor_status(datadir_path, logger,
|
||||
partitions[pos:], auditor_type)
|
||||
part_path = os.path.join(datadir_path, partition)
|
||||
for asuffix in suffixes:
|
||||
suff_path = os.path.join(part_path, asuffix)
|
||||
try:
|
||||
suffixes = listdir(part_path)
|
||||
hashes = listdir(suff_path)
|
||||
except OSError as e:
|
||||
if e.errno != errno.ENOTDIR:
|
||||
raise
|
||||
continue
|
||||
for asuffix in suffixes:
|
||||
suff_path = os.path.join(part_path, asuffix)
|
||||
try:
|
||||
hashes = listdir(suff_path)
|
||||
except OSError as e:
|
||||
if e.errno != errno.ENOTDIR:
|
||||
raise
|
||||
continue
|
||||
for hsh in hashes:
|
||||
hsh_path = os.path.join(suff_path, hsh)
|
||||
yield AuditLocation(hsh_path, device, partition,
|
||||
policy)
|
||||
for hsh in hashes:
|
||||
hsh_path = os.path.join(suff_path, hsh)
|
||||
yield AuditLocation(hsh_path, device, partition,
|
||||
policy)
|
||||
|
||||
update_auditor_status(datadir_path, logger, [], auditor_type)
|
||||
update_auditor_status(datadir_path, logger, [], auditor_type)
|
||||
|
||||
|
||||
def get_auditor_status(datadir_path, logger, auditor_type):
|
||||
@ -589,15 +574,13 @@ def update_auditor_status(datadir_path, logger, partitions, auditor_type):
|
||||
{'auditor_status': auditor_status, 'err': e})
|
||||
|
||||
|
||||
def clear_auditor_status(devices, auditor_type="ALL"):
|
||||
for device in os.listdir(devices):
|
||||
for dir_ in os.listdir(os.path.join(devices, device)):
|
||||
if not dir_.startswith("objects"):
|
||||
continue
|
||||
datadir_path = os.path.join(devices, device, dir_)
|
||||
auditor_status = os.path.join(
|
||||
datadir_path, "auditor_status_%s.json" % auditor_type)
|
||||
remove_file(auditor_status)
|
||||
def clear_auditor_status(devices, datadir, auditor_type="ALL"):
|
||||
device_dirs = listdir(devices)
|
||||
for device in device_dirs:
|
||||
datadir_path = os.path.join(devices, device, datadir)
|
||||
auditor_status = os.path.join(
|
||||
datadir_path, "auditor_status_%s.json" % auditor_type)
|
||||
remove_file(auditor_status)
|
||||
|
||||
|
||||
def strip_self(f):
|
||||
@ -1340,15 +1323,22 @@ class BaseDiskFileManager(object):
|
||||
pipe_size=self.pipe_size,
|
||||
use_linkat=self.use_linkat, **kwargs)
|
||||
|
||||
def object_audit_location_generator(self, device_dirs=None,
|
||||
def clear_auditor_status(self, policy, auditor_type="ALL"):
|
||||
datadir = get_data_dir(policy)
|
||||
clear_auditor_status(self.devices, datadir, auditor_type)
|
||||
|
||||
def object_audit_location_generator(self, policy, device_dirs=None,
|
||||
auditor_type="ALL"):
|
||||
"""
|
||||
Yield an AuditLocation for all objects stored under device_dirs.
|
||||
|
||||
:param policy: the StoragePolicy instance
|
||||
:param device_dirs: directory of target device
|
||||
:param auditor_type: either ALL or ZBF
|
||||
"""
|
||||
return object_audit_location_generator(self.devices, self.mount_check,
|
||||
datadir = get_data_dir(policy)
|
||||
return object_audit_location_generator(self.devices, datadir,
|
||||
self.mount_check,
|
||||
self.logger, device_dirs,
|
||||
auditor_type)
|
||||
|
||||
|
@ -852,7 +852,7 @@ class TestAuditor(unittest.TestCase):
|
||||
self.auditor.run_audit(**kwargs)
|
||||
self.assertFalse(os.path.isdir(quarantine_path))
|
||||
del(kwargs['zero_byte_fps'])
|
||||
clear_auditor_status(self.devices)
|
||||
clear_auditor_status(self.devices, 'objects')
|
||||
self.auditor.run_audit(**kwargs)
|
||||
self.assertTrue(os.path.isdir(quarantine_path))
|
||||
|
||||
|
@ -375,17 +375,6 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
|
||||
"fcd938702024c25fef6c32fef05298eb"))
|
||||
os.makedirs(os.path.join(tmpdir, "sdp", "objects-1", "9970", "ca5",
|
||||
"4a943bc72c2e647c4675923d58cf4ca5"))
|
||||
os.makedirs(os.path.join(tmpdir, "sdq", "objects-2", "9971", "8eb",
|
||||
"fcd938702024c25fef6c32fef05298eb"))
|
||||
os.makedirs(os.path.join(tmpdir, "sdq", "objects-99", "9972",
|
||||
"8eb",
|
||||
"fcd938702024c25fef6c32fef05298eb"))
|
||||
# the bad
|
||||
os.makedirs(os.path.join(tmpdir, "sdq", "objects-", "1135",
|
||||
"6c3",
|
||||
"fcd938702024c25fef6c32fef05298eb"))
|
||||
os.makedirs(os.path.join(tmpdir, "sdq", "objects-fud", "foo"))
|
||||
os.makedirs(os.path.join(tmpdir, "sdq", "objects-+1", "foo"))
|
||||
|
||||
self._make_file(os.path.join(tmpdir, "sdp", "objects", "1519",
|
||||
"fed"))
|
||||
@ -404,26 +393,18 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
|
||||
"4f9eee668b66c6f0250bfa3c7ab9e51e"))
|
||||
|
||||
logger = debug_logger()
|
||||
locations = [(loc.path, loc.device, loc.partition, loc.policy)
|
||||
for loc in diskfile.object_audit_location_generator(
|
||||
devices=tmpdir, mount_check=False,
|
||||
logger=logger)]
|
||||
locations.sort()
|
||||
loc_generators = []
|
||||
datadirs = ["objects", "objects-1"]
|
||||
for datadir in datadirs:
|
||||
loc_generators.append(
|
||||
diskfile.object_audit_location_generator(
|
||||
devices=tmpdir, datadir=datadir, mount_check=False,
|
||||
logger=logger))
|
||||
|
||||
# expect some warnings about those bad dirs
|
||||
warnings = logger.get_lines_for_level('warning')
|
||||
self.assertEqual(set(warnings), set([
|
||||
("Directory 'objects-' does not map to a valid policy "
|
||||
"(Unknown policy, for index '')"),
|
||||
("Directory 'objects-2' does not map to a valid policy "
|
||||
"(Unknown policy, for index '2')"),
|
||||
("Directory 'objects-99' does not map to a valid policy "
|
||||
"(Unknown policy, for index '99')"),
|
||||
("Directory 'objects-fud' does not map to a valid policy "
|
||||
"(Unknown policy, for index 'fud')"),
|
||||
("Directory 'objects-+1' does not map to a valid policy "
|
||||
"(Unknown policy, for index '+1')"),
|
||||
]))
|
||||
all_locs = itertools.chain(*loc_generators)
|
||||
locations = [(loc.path, loc.device, loc.partition, loc.policy) for
|
||||
loc in all_locs]
|
||||
locations.sort()
|
||||
|
||||
expected = \
|
||||
[(os.path.join(tmpdir, "sdp", "objects-1", "9970", "ca5",
|
||||
@ -448,12 +429,19 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
|
||||
self.assertEqual(locations, expected)
|
||||
|
||||
# Reset status file for next run
|
||||
diskfile.clear_auditor_status(tmpdir)
|
||||
for datadir in datadirs:
|
||||
diskfile.clear_auditor_status(tmpdir, datadir)
|
||||
|
||||
# now without a logger
|
||||
locations = [(loc.path, loc.device, loc.partition, loc.policy)
|
||||
for loc in diskfile.object_audit_location_generator(
|
||||
devices=tmpdir, mount_check=False)]
|
||||
for datadir in datadirs:
|
||||
loc_generators.append(
|
||||
diskfile.object_audit_location_generator(
|
||||
devices=tmpdir, datadir=datadir, mount_check=False,
|
||||
logger=logger))
|
||||
|
||||
all_locs = itertools.chain(*loc_generators)
|
||||
locations = [(loc.path, loc.device, loc.partition, loc.policy) for
|
||||
loc in all_locs]
|
||||
locations.sort()
|
||||
self.assertEqual(locations, expected)
|
||||
|
||||
@ -470,7 +458,7 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
|
||||
locations = [
|
||||
(loc.path, loc.device, loc.partition, loc.policy)
|
||||
for loc in diskfile.object_audit_location_generator(
|
||||
devices=tmpdir, mount_check=True)]
|
||||
devices=tmpdir, datadir="objects", mount_check=True)]
|
||||
locations.sort()
|
||||
|
||||
self.assertEqual(
|
||||
@ -485,7 +473,8 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
|
||||
locations = [
|
||||
(loc.path, loc.device, loc.partition, loc.policy)
|
||||
for loc in diskfile.object_audit_location_generator(
|
||||
devices=tmpdir, mount_check=True, logger=logger)]
|
||||
devices=tmpdir, datadir="objects", mount_check=True,
|
||||
logger=logger)]
|
||||
debug_lines = logger.get_lines_for_level('debug')
|
||||
self.assertEqual([
|
||||
'Skipping sdq as it is not mounted',
|
||||
@ -502,7 +491,7 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
|
||||
locations = [
|
||||
(loc.path, loc.device, loc.partition, loc.policy)
|
||||
for loc in diskfile.object_audit_location_generator(
|
||||
devices=tmpdir, mount_check=False)]
|
||||
devices=tmpdir, datadir="objects", mount_check=False)]
|
||||
|
||||
self.assertEqual(
|
||||
locations,
|
||||
@ -516,30 +505,22 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
|
||||
locations = [
|
||||
(loc.path, loc.device, loc.partition, loc.policy)
|
||||
for loc in diskfile.object_audit_location_generator(
|
||||
devices=tmpdir, mount_check=False, logger=logger)]
|
||||
devices=tmpdir, datadir="objects", mount_check=False,
|
||||
logger=logger)]
|
||||
debug_lines = logger.get_lines_for_level('debug')
|
||||
self.assertEqual([
|
||||
'Skipping garbage as it is not a dir',
|
||||
], debug_lines)
|
||||
logger.clear()
|
||||
with mock_check_drive(isdir=True):
|
||||
locations = [
|
||||
(loc.path, loc.device, loc.partition, loc.policy)
|
||||
for loc in diskfile.object_audit_location_generator(
|
||||
devices=tmpdir, mount_check=False, logger=logger)]
|
||||
debug_lines = logger.get_lines_for_level('debug')
|
||||
self.assertEqual([
|
||||
'Skipping %s: Not a directory' % os.path.join(
|
||||
tmpdir, "garbage"),
|
||||
], debug_lines)
|
||||
logger.clear()
|
||||
|
||||
with mock_check_drive() as mocks:
|
||||
mocks['ismount'].side_effect = lambda path: (
|
||||
False if path.endswith('garbage') else True)
|
||||
locations = [
|
||||
(loc.path, loc.device, loc.partition, loc.policy)
|
||||
for loc in diskfile.object_audit_location_generator(
|
||||
devices=tmpdir, mount_check=True, logger=logger)]
|
||||
devices=tmpdir, datadir="objects", mount_check=True,
|
||||
logger=logger)]
|
||||
debug_lines = logger.get_lines_for_level('debug')
|
||||
self.assertEqual([
|
||||
'Skipping garbage as it is not mounted',
|
||||
@ -550,10 +531,10 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
|
||||
# so that errors get logged and a human can see what's going wrong;
|
||||
# only normal FS corruption should be skipped over silently.
|
||||
|
||||
def list_locations(dirname):
|
||||
def list_locations(dirname, datadir):
|
||||
return [(loc.path, loc.device, loc.partition, loc.policy)
|
||||
for loc in diskfile.object_audit_location_generator(
|
||||
devices=dirname, mount_check=False)]
|
||||
devices=dirname, datadir=datadir, mount_check=False)]
|
||||
|
||||
real_listdir = os.listdir
|
||||
|
||||
@ -570,30 +551,34 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
|
||||
"2607", "b54",
|
||||
"fe450ec990a88cc4b252b181bab04b54"))
|
||||
with mock.patch('os.listdir', splode_if_endswith("sdf/objects")):
|
||||
self.assertRaises(OSError, list_locations, tmpdir)
|
||||
self.assertRaises(OSError, list_locations, tmpdir, "objects")
|
||||
with mock.patch('os.listdir', splode_if_endswith("2607")):
|
||||
self.assertRaises(OSError, list_locations, tmpdir)
|
||||
self.assertRaises(OSError, list_locations, tmpdir, "objects")
|
||||
with mock.patch('os.listdir', splode_if_endswith("b54")):
|
||||
self.assertRaises(OSError, list_locations, tmpdir)
|
||||
self.assertRaises(OSError, list_locations, tmpdir, "objects")
|
||||
|
||||
def test_auditor_status(self):
|
||||
with temptree([]) as tmpdir:
|
||||
os.makedirs(os.path.join(tmpdir, "sdf", "objects", "1", "a", "b"))
|
||||
os.makedirs(os.path.join(tmpdir, "sdf", "objects", "2", "a", "b"))
|
||||
datadir = "objects"
|
||||
|
||||
# Pretend that some time passed between each partition
|
||||
with mock.patch('os.stat') as mock_stat, \
|
||||
mock_check_drive(isdir=True):
|
||||
mock_stat.return_value.st_mtime = time() - 60
|
||||
# Auditor starts, there are two partitions to check
|
||||
gen = diskfile.object_audit_location_generator(tmpdir, False)
|
||||
gen = diskfile.object_audit_location_generator(tmpdir,
|
||||
datadir,
|
||||
False)
|
||||
gen.next()
|
||||
gen.next()
|
||||
|
||||
# Auditor stopped for some reason without raising StopIterator in
|
||||
# the generator and restarts There is now only one remaining
|
||||
# partition to check
|
||||
gen = diskfile.object_audit_location_generator(tmpdir, False)
|
||||
gen = diskfile.object_audit_location_generator(tmpdir, datadir,
|
||||
False)
|
||||
with mock_check_drive(isdir=True):
|
||||
gen.next()
|
||||
|
||||
@ -602,17 +587,19 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
|
||||
|
||||
# There are no partitions to check if the auditor restarts another
|
||||
# time and the status files have not been cleared
|
||||
gen = diskfile.object_audit_location_generator(tmpdir, False)
|
||||
gen = diskfile.object_audit_location_generator(tmpdir, datadir,
|
||||
False)
|
||||
with mock_check_drive(isdir=True):
|
||||
self.assertRaises(StopIteration, gen.next)
|
||||
|
||||
# Reset status file
|
||||
diskfile.clear_auditor_status(tmpdir)
|
||||
diskfile.clear_auditor_status(tmpdir, datadir)
|
||||
|
||||
# If the auditor restarts another time, we expect to
|
||||
# check two partitions again, because the remaining
|
||||
# partitions were empty and a new listdir was executed
|
||||
gen = diskfile.object_audit_location_generator(tmpdir, False)
|
||||
gen = diskfile.object_audit_location_generator(tmpdir, datadir,
|
||||
False)
|
||||
with mock_check_drive(isdir=True):
|
||||
gen.next()
|
||||
gen.next()
|
||||
@ -985,7 +972,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
self.df_mgr.logger.increment.assert_called_with('async_pendings')
|
||||
|
||||
def test_object_audit_location_generator(self):
|
||||
locations = list(self.df_mgr.object_audit_location_generator())
|
||||
locations = list(
|
||||
self.df_mgr.object_audit_location_generator(POLICIES[0]))
|
||||
self.assertEqual(locations, [])
|
||||
|
||||
def test_replication_one_per_device_deprecation(self):
|
||||
|
Loading…
Reference in New Issue
Block a user