Skip already checked partitions when auditing objects after a restart

The object auditor will save a short status file on each device, containing a
list of remaining partitions for auditing. If the auditor is restarted, it will
only audit partitions not yet checked. If all partitions on the current device
have been checked, it will simply skip this device. Once all partitions on all
disks are successfully audited, all status files are removed.

Closes-Bug: #1183656

Change-Id: Icf1d920d0942ce48f1d3d374ea4d63dbc29ea464
This commit is contained in:
Christian Schwede 2016-02-15 19:17:01 +00:00 committed by Thiago da Silva
parent 6ef66378c9
commit fd86d5a95d
4 changed files with 117 additions and 9 deletions

View File

@ -95,7 +95,9 @@ class AuditorWorker(object):
# can find all diskfile locations regardless of policy -- so for now
# just use Policy-0's manager.
all_locs = (self.diskfile_router[POLICIES[0]]
.object_audit_location_generator(device_dirs=device_dirs))
.object_audit_location_generator(
device_dirs=device_dirs,
auditor_type=self.auditor_type))
for location in all_locs:
loop_time = time.time()
self.failsafe_object_audit(location)
@ -156,6 +158,9 @@ class AuditorWorker(object):
self.logger.info(
_('Object audit stats: %s') % json.dumps(self.stats_buckets))
# Unset remaining partitions to not skip them in the next run
diskfile.clear_auditor_status(self.devices, self.auditor_type)
def record_stats(self, obj_size):
"""
Based on config's object_size_stats will keep track of how many objects

View File

@ -33,6 +33,7 @@ are also not considered part of the backend API.
import six.moves.cPickle as pickle
import errno
import fcntl
import json
import os
import time
import uuid
@ -263,7 +264,7 @@ class AuditLocation(object):
def object_audit_location_generator(devices, mount_check=True, logger=None,
device_dirs=None):
device_dirs=None, auditor_type="ALL"):
"""
Given a devices path (e.g. "/srv/node"), yield an AuditLocation for all
objects stored under that directory if device_dirs isn't set. If
@ -277,7 +278,8 @@ def object_audit_location_generator(devices, mount_check=True, logger=None,
:param mount_check: flag to check if a mount check should be performed
on devices
:param logger: a logger object
:device_dirs: a list of directories under devices to traverse
:param device_dirs: a list of directories under devices to traverse
:param auditor_type: either ALL or ZBF
"""
if not device_dirs:
device_dirs = listdir(devices)
@ -307,8 +309,12 @@ def object_audit_location_generator(devices, mount_check=True, logger=None,
'to a valid policy (%s)') % (dir_, e))
continue
datadir_path = os.path.join(devices, device, dir_)
partitions = listdir(datadir_path)
for partition in partitions:
partitions = get_auditor_status(datadir_path, logger, auditor_type)
for pos, partition in enumerate(partitions):
update_auditor_status(datadir_path, logger,
partitions[pos:], auditor_type)
part_path = os.path.join(datadir_path, partition)
try:
suffixes = listdir(part_path)
@ -329,6 +335,51 @@ def object_audit_location_generator(devices, mount_check=True, logger=None,
yield AuditLocation(hsh_path, device, partition,
policy)
update_auditor_status(datadir_path, logger, [], auditor_type)
def get_auditor_status(datadir_path, logger, auditor_type):
auditor_status = os.path.join(
datadir_path, "auditor_status_%s.json" % auditor_type)
status = {}
try:
with open(auditor_status) as statusfile:
status = statusfile.read()
except (OSError, IOError) as e:
if e.errno != errno.ENOENT and logger:
logger.warning(_('Cannot read %s (%s)') % (auditor_status, e))
return listdir(datadir_path)
try:
status = json.loads(status)
except ValueError as e:
logger.warning(_('Loading JSON from %s failed (%s)') % (
auditor_status, e))
return listdir(datadir_path)
return status['partitions']
def update_auditor_status(datadir_path, logger, partitions, auditor_type):
status = json.dumps({'partitions': partitions})
auditor_status = os.path.join(
datadir_path, "auditor_status_%s.json" % auditor_type)
try:
with open(auditor_status, "wb") as statusfile:
statusfile.write(status)
except (OSError, IOError) as e:
if logger:
logger.warning(_('Cannot write %s (%s)') % (auditor_status, e))
def clear_auditor_status(devices, auditor_type="ALL"):
for device in os.listdir(devices):
for dir_ in os.listdir(os.path.join(devices, device)):
if not dir_.startswith("objects"):
continue
datadir_path = os.path.join(devices, device, dir_)
auditor_status = os.path.join(
datadir_path, "auditor_status_%s.json" % auditor_type)
remove_file(auditor_status)
def strip_self(f):
"""
@ -897,14 +948,17 @@ class BaseDiskFileManager(object):
policy=policy, use_splice=self.use_splice,
pipe_size=self.pipe_size, **kwargs)
def object_audit_location_generator(self, device_dirs=None):
def object_audit_location_generator(self, device_dirs=None,
auditor_type="ALL"):
"""
Yield an AuditLocation for all objects stored under device_dirs.
:param device_dirs: directory of target device
:param auditor_type: either ALL or ZBF
"""
return object_audit_location_generator(self.devices, self.mount_check,
self.logger, device_dirs)
self.logger, device_dirs,
auditor_type)
def get_diskfile_from_audit_location(self, audit_location):
"""

View File

@ -26,7 +26,8 @@ from test.unit import FakeLogger, patch_policies, make_timestamp_iter, \
DEFAULT_TEST_EC_TYPE
from swift.obj import auditor
from swift.obj.diskfile import DiskFile, write_metadata, invalidate_hash, \
get_data_dir, DiskFileManager, ECDiskFileManager, AuditLocation
get_data_dir, DiskFileManager, ECDiskFileManager, AuditLocation, \
clear_auditor_status, get_auditor_status
from swift.common.utils import mkdirs, normalize_timestamp, Timestamp
from swift.common.storage_policy import ECStoragePolicy, StoragePolicy, \
POLICIES
@ -460,6 +461,7 @@ class TestAuditor(unittest.TestCase):
self.auditor.run_audit(**kwargs)
self.assertFalse(os.path.isdir(quarantine_path))
del(kwargs['zero_byte_fps'])
clear_auditor_status(self.devices)
self.auditor.run_audit(**kwargs)
self.assertTrue(os.path.isdir(quarantine_path))
@ -495,10 +497,20 @@ class TestAuditor(unittest.TestCase):
self.setup_bad_zero_byte()
kwargs = {'mode': 'once'}
kwargs['zero_byte_fps'] = 50
self.auditor.run_audit(**kwargs)
called_args = [0]
def mock_get_auditor_status(path, logger, audit_type):
called_args[0] = audit_type
return get_auditor_status(path, logger, audit_type)
with mock.patch('swift.obj.diskfile.get_auditor_status',
mock_get_auditor_status):
self.auditor.run_audit(**kwargs)
quarantine_path = os.path.join(self.devices,
'sda', 'quarantined', 'objects')
self.assertTrue(os.path.isdir(quarantine_path))
self.assertEqual('ZBF', called_args[0])
def test_object_run_fast_track_zero_check_closed(self):
rat = [False]

View File

@ -359,6 +359,9 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
]
self.assertEqual(locations, expected)
# Reset status file for next run
diskfile.clear_auditor_status(tmpdir)
# now without a logger
locations = [(loc.path, loc.device, loc.partition, loc.policy)
for loc in diskfile.object_audit_location_generator(
@ -433,6 +436,40 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
with mock.patch('os.listdir', splode_if_endswith("b54")):
self.assertRaises(OSError, list_locations, tmpdir)
def test_auditor_status(self):
with temptree([]) as tmpdir:
os.makedirs(os.path.join(tmpdir, "sdf", "objects", "1", "a", "b"))
os.makedirs(os.path.join(tmpdir, "sdf", "objects", "2", "a", "b"))
# Auditor starts, there are two partitions to check
gen = diskfile.object_audit_location_generator(tmpdir, False)
gen.next()
gen.next()
# Auditor stopped for some reason without raising StopIterator in
# the generator and restarts There is now only one remaining
# partition to check
gen = diskfile.object_audit_location_generator(tmpdir, False)
gen.next()
# There are no more remaining partitions
self.assertRaises(StopIteration, gen.next)
# There are no partitions to check if the auditor restarts another
# time and the status files have not been cleared
gen = diskfile.object_audit_location_generator(tmpdir, False)
self.assertRaises(StopIteration, gen.next)
# Reset status file
diskfile.clear_auditor_status(tmpdir)
# If the auditor restarts another time, we expect to
# check two partitions again, because the remaining
# partitions were empty and a new listdir was executed
gen = diskfile.object_audit_location_generator(tmpdir, False)
gen.next()
gen.next()
class TestDiskFileRouter(unittest.TestCase):