Skip already checked partitions when auditing objects after a restart
The object auditor will save a short status file on each device, containing a list of remaining partitions for auditing. If the auditor is restarted, it will only audit partitions not yet checked. If all partitions on the current device have been checked, it will simply skip this device. Once all partitions on all disks are successfully audited, all status files are removed. Closes-Bug: #1183656 Change-Id: Icf1d920d0942ce48f1d3d374ea4d63dbc29ea464
This commit is contained in:
parent
6ef66378c9
commit
fd86d5a95d
@ -95,7 +95,9 @@ class AuditorWorker(object):
|
||||
# can find all diskfile locations regardless of policy -- so for now
|
||||
# just use Policy-0's manager.
|
||||
all_locs = (self.diskfile_router[POLICIES[0]]
|
||||
.object_audit_location_generator(device_dirs=device_dirs))
|
||||
.object_audit_location_generator(
|
||||
device_dirs=device_dirs,
|
||||
auditor_type=self.auditor_type))
|
||||
for location in all_locs:
|
||||
loop_time = time.time()
|
||||
self.failsafe_object_audit(location)
|
||||
@ -156,6 +158,9 @@ class AuditorWorker(object):
|
||||
self.logger.info(
|
||||
_('Object audit stats: %s') % json.dumps(self.stats_buckets))
|
||||
|
||||
# Unset remaining partitions to not skip them in the next run
|
||||
diskfile.clear_auditor_status(self.devices, self.auditor_type)
|
||||
|
||||
def record_stats(self, obj_size):
|
||||
"""
|
||||
Based on config's object_size_stats will keep track of how many objects
|
||||
|
@ -33,6 +33,7 @@ are also not considered part of the backend API.
|
||||
import six.moves.cPickle as pickle
|
||||
import errno
|
||||
import fcntl
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
import uuid
|
||||
@ -263,7 +264,7 @@ class AuditLocation(object):
|
||||
|
||||
|
||||
def object_audit_location_generator(devices, mount_check=True, logger=None,
|
||||
device_dirs=None):
|
||||
device_dirs=None, auditor_type="ALL"):
|
||||
"""
|
||||
Given a devices path (e.g. "/srv/node"), yield an AuditLocation for all
|
||||
objects stored under that directory if device_dirs isn't set. If
|
||||
@ -277,7 +278,8 @@ def object_audit_location_generator(devices, mount_check=True, logger=None,
|
||||
:param mount_check: flag to check if a mount check should be performed
|
||||
on devices
|
||||
:param logger: a logger object
|
||||
:device_dirs: a list of directories under devices to traverse
|
||||
:param device_dirs: a list of directories under devices to traverse
|
||||
:param auditor_type: either ALL or ZBF
|
||||
"""
|
||||
if not device_dirs:
|
||||
device_dirs = listdir(devices)
|
||||
@ -307,8 +309,12 @@ def object_audit_location_generator(devices, mount_check=True, logger=None,
|
||||
'to a valid policy (%s)') % (dir_, e))
|
||||
continue
|
||||
datadir_path = os.path.join(devices, device, dir_)
|
||||
partitions = listdir(datadir_path)
|
||||
for partition in partitions:
|
||||
|
||||
partitions = get_auditor_status(datadir_path, logger, auditor_type)
|
||||
|
||||
for pos, partition in enumerate(partitions):
|
||||
update_auditor_status(datadir_path, logger,
|
||||
partitions[pos:], auditor_type)
|
||||
part_path = os.path.join(datadir_path, partition)
|
||||
try:
|
||||
suffixes = listdir(part_path)
|
||||
@ -329,6 +335,51 @@ def object_audit_location_generator(devices, mount_check=True, logger=None,
|
||||
yield AuditLocation(hsh_path, device, partition,
|
||||
policy)
|
||||
|
||||
update_auditor_status(datadir_path, logger, [], auditor_type)
|
||||
|
||||
|
||||
def get_auditor_status(datadir_path, logger, auditor_type):
    """
    Return the partitions that still have to be audited in a datadir.

    The list is read from the JSON status file
    ``auditor_status_<auditor_type>.json`` inside ``datadir_path``. Whenever
    that file cannot be used (it is missing, unreadable, not valid JSON, or
    does not contain a ``partitions`` entry) the result of
    ``listdir(datadir_path)`` is returned instead, so that every partition
    gets audited.

    :param datadir_path: path of the object data directory on one device
    :param logger: a logger object, or None to suppress warnings
    :param auditor_type: either ALL or ZBF
    :returns: a list of partition names
    """
    auditor_status = os.path.join(
        datadir_path, "auditor_status_%s.json" % auditor_type)
    try:
        with open(auditor_status) as statusfile:
            status = statusfile.read()
    except (OSError, IOError) as e:
        # A missing status file simply means there is no saved progress;
        # only other read errors are worth a warning.
        if e.errno != errno.ENOENT and logger:
            logger.warning(_('Cannot read %s (%s)') % (auditor_status, e))
        return listdir(datadir_path)
    try:
        return json.loads(status)['partitions']
    except (ValueError, KeyError, TypeError) as e:
        # ValueError: not JSON at all; KeyError/TypeError: valid JSON that
        # is not a dict with a 'partitions' entry. Either way the saved
        # progress is unusable, so start over with a fresh listing.
        if logger:
            logger.warning(_('Loading JSON from %s failed (%s)') % (
                auditor_status, e))
        return listdir(datadir_path)
|
||||
|
||||
|
||||
def update_auditor_status(datadir_path, logger, partitions, auditor_type):
    """
    Save the list of partitions that still have to be audited.

    The list is stored as JSON in ``auditor_status_<auditor_type>.json``
    inside ``datadir_path``. Failing to write the file is not fatal: a
    warning is logged (if a logger was given) and the function returns.

    :param datadir_path: path of the object data directory on one device
    :param logger: a logger object, or None to suppress warnings
    :param partitions: list of partition names not yet audited
    :param auditor_type: either ALL or ZBF
    """
    # The file is opened in binary mode, so the payload has to be bytes;
    # json.dumps() returns text on Python 3.
    status = json.dumps({'partitions': partitions}).encode('utf-8')
    auditor_status = os.path.join(
        datadir_path, "auditor_status_%s.json" % auditor_type)
    try:
        with open(auditor_status, "wb") as statusfile:
            statusfile.write(status)
    except (OSError, IOError) as e:
        if logger:
            logger.warning(_('Cannot write %s (%s)') % (auditor_status, e))
|
||||
|
||||
|
||||
def clear_auditor_status(devices, auditor_type="ALL"):
    """
    Remove the auditor status files of one auditor type from all devices.

    Every entry below ``devices`` is treated as a device; in each of its
    sub-directories whose name starts with "objects" the file
    ``auditor_status_<auditor_type>.json`` is passed to ``remove_file``.

    :param devices: path of the devices root (e.g. "/srv/node")
    :param auditor_type: either ALL or ZBF
    :raises OSError: if ``devices`` itself cannot be listed, or a device
                     cannot be listed for a reason other than not being a
                     directory or having disappeared
    """
    status_name = "auditor_status_%s.json" % auditor_type
    for device in os.listdir(devices):
        device_path = os.path.join(devices, device)
        try:
            datadirs = os.listdir(device_path)
        except OSError as e:
            # A stray file in the devices root, or a device that vanished
            # in the meantime, must not stop the remaining devices from
            # being cleared.
            if e.errno not in (errno.ENOTDIR, errno.ENOENT):
                raise
            continue
        for dir_ in datadirs:
            if not dir_.startswith("objects"):
                continue
            # NOTE(review): remove_file is defined elsewhere; presumably it
            # tolerates a status file that does not exist -- confirm.
            remove_file(os.path.join(device_path, dir_, status_name))
|
||||
|
||||
|
||||
def strip_self(f):
|
||||
"""
|
||||
@ -897,14 +948,17 @@ class BaseDiskFileManager(object):
|
||||
policy=policy, use_splice=self.use_splice,
|
||||
pipe_size=self.pipe_size, **kwargs)
|
||||
|
||||
def object_audit_location_generator(self, device_dirs=None):
|
||||
def object_audit_location_generator(self, device_dirs=None,
|
||||
auditor_type="ALL"):
|
||||
"""
|
||||
Yield an AuditLocation for all objects stored under device_dirs.
|
||||
|
||||
:param device_dirs: directory of target device
|
||||
:param auditor_type: either ALL or ZBF
|
||||
"""
|
||||
return object_audit_location_generator(self.devices, self.mount_check,
|
||||
self.logger, device_dirs)
|
||||
self.logger, device_dirs,
|
||||
auditor_type)
|
||||
|
||||
def get_diskfile_from_audit_location(self, audit_location):
|
||||
"""
|
||||
|
@ -26,7 +26,8 @@ from test.unit import FakeLogger, patch_policies, make_timestamp_iter, \
|
||||
DEFAULT_TEST_EC_TYPE
|
||||
from swift.obj import auditor
|
||||
from swift.obj.diskfile import DiskFile, write_metadata, invalidate_hash, \
|
||||
get_data_dir, DiskFileManager, ECDiskFileManager, AuditLocation
|
||||
get_data_dir, DiskFileManager, ECDiskFileManager, AuditLocation, \
|
||||
clear_auditor_status, get_auditor_status
|
||||
from swift.common.utils import mkdirs, normalize_timestamp, Timestamp
|
||||
from swift.common.storage_policy import ECStoragePolicy, StoragePolicy, \
|
||||
POLICIES
|
||||
@ -460,6 +461,7 @@ class TestAuditor(unittest.TestCase):
|
||||
self.auditor.run_audit(**kwargs)
|
||||
self.assertFalse(os.path.isdir(quarantine_path))
|
||||
del(kwargs['zero_byte_fps'])
|
||||
clear_auditor_status(self.devices)
|
||||
self.auditor.run_audit(**kwargs)
|
||||
self.assertTrue(os.path.isdir(quarantine_path))
|
||||
|
||||
@ -495,10 +497,20 @@ class TestAuditor(unittest.TestCase):
|
||||
self.setup_bad_zero_byte()
|
||||
kwargs = {'mode': 'once'}
|
||||
kwargs['zero_byte_fps'] = 50
|
||||
self.auditor.run_audit(**kwargs)
|
||||
|
||||
called_args = [0]
|
||||
|
||||
def mock_get_auditor_status(path, logger, audit_type):
|
||||
called_args[0] = audit_type
|
||||
return get_auditor_status(path, logger, audit_type)
|
||||
|
||||
with mock.patch('swift.obj.diskfile.get_auditor_status',
|
||||
mock_get_auditor_status):
|
||||
self.auditor.run_audit(**kwargs)
|
||||
quarantine_path = os.path.join(self.devices,
|
||||
'sda', 'quarantined', 'objects')
|
||||
self.assertTrue(os.path.isdir(quarantine_path))
|
||||
self.assertEqual('ZBF', called_args[0])
|
||||
|
||||
def test_object_run_fast_track_zero_check_closed(self):
|
||||
rat = [False]
|
||||
|
@ -359,6 +359,9 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
|
||||
]
|
||||
self.assertEqual(locations, expected)
|
||||
|
||||
# Reset status file for next run
|
||||
diskfile.clear_auditor_status(tmpdir)
|
||||
|
||||
# now without a logger
|
||||
locations = [(loc.path, loc.device, loc.partition, loc.policy)
|
||||
for loc in diskfile.object_audit_location_generator(
|
||||
@ -433,6 +436,40 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
|
||||
with mock.patch('os.listdir', splode_if_endswith("b54")):
|
||||
self.assertRaises(OSError, list_locations, tmpdir)
|
||||
|
||||
def test_auditor_status(self):
    # Walk the audit location generator through several simulated
    # auditor restarts and check how many locations it still yields.
    with temptree([]) as tmpdir:
        os.makedirs(os.path.join(tmpdir, "sdf", "objects", "1", "a", "b"))
        os.makedirs(os.path.join(tmpdir, "sdf", "objects", "2", "a", "b"))

        # Auditor starts, there are two partitions to check
        gen = diskfile.object_audit_location_generator(tmpdir, False)
        next(gen)
        next(gen)

        # The auditor stopped for some reason before the generator
        # raised StopIteration, and restarts. There is now only one
        # remaining partition to check
        gen = diskfile.object_audit_location_generator(tmpdir, False)
        next(gen)

        # There are no more remaining partitions
        self.assertRaises(StopIteration, next, gen)

        # There are no partitions to check if the auditor restarts another
        # time and the status files have not been cleared
        gen = diskfile.object_audit_location_generator(tmpdir, False)
        self.assertRaises(StopIteration, next, gen)

        # Reset status file
        diskfile.clear_auditor_status(tmpdir)

        # If the auditor restarts another time, we expect to
        # check two partitions again, because the status files were
        # removed and a new listdir was executed
        gen = diskfile.object_audit_location_generator(tmpdir, False)
        next(gen)
        next(gen)
|
||||
|
||||
|
||||
class TestDiskFileRouter(unittest.TestCase):
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user