From fd86d5a95d73714365c07cb36bfd1404306142a7 Mon Sep 17 00:00:00 2001 From: Christian Schwede Date: Mon, 15 Feb 2016 19:17:01 +0000 Subject: [PATCH] Skip already checked partitions when auditing objects after a restart The object auditor will save a short status file on each device, containing a list of remaining partitions for auditing. If the auditor is restarted, it will only audit partitions not yet checked. If all partitions on the current device have been checked, it will simply skip this device. Once all partitions on all disks are successfully audited, all status files are removed. Closes-Bug: #1183656 Change-Id: Icf1d920d0942ce48f1d3d374ea4d63dbc29ea464 --- swift/obj/auditor.py | 7 +++- swift/obj/diskfile.py | 66 ++++++++++++++++++++++++++++++---- test/unit/obj/test_auditor.py | 16 +++++++-- test/unit/obj/test_diskfile.py | 37 +++++++++++++++++++ 4 files changed, 117 insertions(+), 9 deletions(-) diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py index de1199ea52..b9b1643709 100644 --- a/swift/obj/auditor.py +++ b/swift/obj/auditor.py @@ -95,7 +95,9 @@ class AuditorWorker(object): # can find all diskfile locations regardless of policy -- so for now # just use Policy-0's manager. all_locs = (self.diskfile_router[POLICIES[0]] - .object_audit_location_generator(device_dirs=device_dirs)) + .object_audit_location_generator( + device_dirs=device_dirs, + auditor_type=self.auditor_type)) for location in all_locs: loop_time = time.time() self.failsafe_object_audit(location) @@ -156,6 +158,9 @@ class AuditorWorker(object): self.logger.info( _('Object audit stats: %s') % json.dumps(self.stats_buckets)) + # Unset remaining partitions to not skip them in the next run + diskfile.clear_auditor_status(self.devices, self.auditor_type) + def record_stats(self, obj_size): """ Based on config's object_size_stats will keep track of how many objects diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index 0928a5b688..4e4d3a40c1 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -33,6 +33,7 @@ are also not considered part of the backend API. import six.moves.cPickle as pickle import errno import fcntl +import json import os import time import uuid @@ -263,7 +264,7 @@ class AuditLocation(object): def object_audit_location_generator(devices, mount_check=True, logger=None, - device_dirs=None): + device_dirs=None, auditor_type="ALL"): """ Given a devices path (e.g. "/srv/node"), yield an AuditLocation for all objects stored under that directory if device_dirs isn't set. If @@ -277,7 +278,8 @@ def object_audit_location_generator(devices, mount_check=True, logger=None, :param mount_check: flag to check if a mount check should be performed on devices :param logger: a logger object - :device_dirs: a list of directories under devices to traverse + :param device_dirs: a list of directories under devices to traverse + :param auditor_type: either ALL or ZBF """ if not device_dirs: device_dirs = listdir(devices) @@ -307,8 +309,12 @@ def object_audit_location_generator(devices, mount_check=True, logger=None, 'to a valid policy (%s)') % (dir_, e)) continue datadir_path = os.path.join(devices, device, dir_) - partitions = listdir(datadir_path) - for partition in partitions: + + partitions = get_auditor_status(datadir_path, logger, auditor_type) + + for pos, partition in enumerate(partitions): + update_auditor_status(datadir_path, logger, + partitions[pos:], auditor_type) part_path = os.path.join(datadir_path, partition) try: suffixes = listdir(part_path) @@ -329,6 +335,51 @@ def object_audit_location_generator(devices, mount_check=True, logger=None, yield AuditLocation(hsh_path, device, partition, policy) + update_auditor_status(datadir_path, logger, [], auditor_type) + + +def get_auditor_status(datadir_path, logger, auditor_type): + auditor_status = os.path.join( + datadir_path, "auditor_status_%s.json" % auditor_type) + status = {} + try: + with open(auditor_status) as statusfile: + status = statusfile.read() + except (OSError, IOError) as e: + if e.errno != errno.ENOENT and logger: + logger.warning(_('Cannot read %s (%s)') % (auditor_status, e)) + return listdir(datadir_path) + try: + status = json.loads(status) + except ValueError as e: + logger.warning(_('Loading JSON from %s failed (%s)') % ( + auditor_status, e)) + return listdir(datadir_path) + return status['partitions'] + + +def update_auditor_status(datadir_path, logger, partitions, auditor_type): + status = json.dumps({'partitions': partitions}) + auditor_status = os.path.join( + datadir_path, "auditor_status_%s.json" % auditor_type) + try: + with open(auditor_status, "wb") as statusfile: + statusfile.write(status) + except (OSError, IOError) as e: + if logger: + logger.warning(_('Cannot write %s (%s)') % (auditor_status, e)) + + +def clear_auditor_status(devices, auditor_type="ALL"): + for device in os.listdir(devices): + for dir_ in os.listdir(os.path.join(devices, device)): + if not dir_.startswith("objects"): + continue + datadir_path = os.path.join(devices, device, dir_) + auditor_status = os.path.join( + datadir_path, "auditor_status_%s.json" % auditor_type) + remove_file(auditor_status) + def strip_self(f): """ @@ -897,14 +948,17 @@ class BaseDiskFileManager(object): policy=policy, use_splice=self.use_splice, pipe_size=self.pipe_size, **kwargs) - def object_audit_location_generator(self, device_dirs=None): + def object_audit_location_generator(self, device_dirs=None, + auditor_type="ALL"): """ Yield an AuditLocation for all objects stored under device_dirs. :param device_dirs: directory of target device + :param auditor_type: either ALL or ZBF """ return object_audit_location_generator(self.devices, self.mount_check, - self.logger, device_dirs) + self.logger, device_dirs, + auditor_type) def get_diskfile_from_audit_location(self, audit_location): """ diff --git a/test/unit/obj/test_auditor.py b/test/unit/obj/test_auditor.py index b5db2e55e9..bebff1f41d 100644 --- a/test/unit/obj/test_auditor.py +++ b/test/unit/obj/test_auditor.py @@ -26,7 +26,8 @@ from test.unit import FakeLogger, patch_policies, make_timestamp_iter, \ DEFAULT_TEST_EC_TYPE from swift.obj import auditor from swift.obj.diskfile import DiskFile, write_metadata, invalidate_hash, \ - get_data_dir, DiskFileManager, ECDiskFileManager, AuditLocation + get_data_dir, DiskFileManager, ECDiskFileManager, AuditLocation, \ + clear_auditor_status, get_auditor_status from swift.common.utils import mkdirs, normalize_timestamp, Timestamp from swift.common.storage_policy import ECStoragePolicy, StoragePolicy, \ POLICIES @@ -460,6 +461,7 @@ class TestAuditor(unittest.TestCase): self.auditor.run_audit(**kwargs) self.assertFalse(os.path.isdir(quarantine_path)) del(kwargs['zero_byte_fps']) + clear_auditor_status(self.devices) self.auditor.run_audit(**kwargs) self.assertTrue(os.path.isdir(quarantine_path)) @@ -495,10 +497,20 @@ class TestAuditor(unittest.TestCase): self.setup_bad_zero_byte() kwargs = {'mode': 'once'} kwargs['zero_byte_fps'] = 50 - self.auditor.run_audit(**kwargs) + + called_args = [0] + + def mock_get_auditor_status(path, logger, audit_type): + called_args[0] = audit_type + return get_auditor_status(path, logger, audit_type) + + with mock.patch('swift.obj.diskfile.get_auditor_status', + mock_get_auditor_status): + self.auditor.run_audit(**kwargs) quarantine_path = os.path.join(self.devices, 'sda', 'quarantined', 'objects') self.assertTrue(os.path.isdir(quarantine_path)) + self.assertEqual('ZBF', called_args[0]) def test_object_run_fast_track_zero_check_closed(self): rat = [False] diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py index 534882bec3..18530678a2 100644 --- a/test/unit/obj/test_diskfile.py +++ b/test/unit/obj/test_diskfile.py @@ -359,6 +359,9 @@ class TestObjectAuditLocationGenerator(unittest.TestCase): ] self.assertEqual(locations, expected) + # Reset status file for next run + diskfile.clear_auditor_status(tmpdir) + # now without a logger locations = [(loc.path, loc.device, loc.partition, loc.policy) for loc in diskfile.object_audit_location_generator( @@ -433,6 +436,40 @@ class TestObjectAuditLocationGenerator(unittest.TestCase): with mock.patch('os.listdir', splode_if_endswith("b54")): self.assertRaises(OSError, list_locations, tmpdir) + def test_auditor_status(self): + with temptree([]) as tmpdir: + os.makedirs(os.path.join(tmpdir, "sdf", "objects", "1", "a", "b")) + os.makedirs(os.path.join(tmpdir, "sdf", "objects", "2", "a", "b")) + + # Auditor starts, there are two partitions to check + gen = diskfile.object_audit_location_generator(tmpdir, False) + gen.next() + gen.next() + + # Auditor stopped for some reason without raising StopIterator in + # the generator and restarts There is now only one remaining + # partition to check + gen = diskfile.object_audit_location_generator(tmpdir, False) + gen.next() + + # There are no more remaining partitions + self.assertRaises(StopIteration, gen.next) + + # There are no partitions to check if the auditor restarts another + # time and the status files have not been cleared + gen = diskfile.object_audit_location_generator(tmpdir, False) + self.assertRaises(StopIteration, gen.next) + + # Reset status file + diskfile.clear_auditor_status(tmpdir) + + # If the auditor restarts another time, we expect to + # check two partitions again, because the remaining + # partitions were empty and a new listdir was executed + gen = diskfile.object_audit_location_generator(tmpdir, False) + gen.next() + gen.next() + class TestDiskFileRouter(unittest.TestCase):