diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py index de1199ea52..b9b1643709 100644 --- a/swift/obj/auditor.py +++ b/swift/obj/auditor.py @@ -95,7 +95,9 @@ class AuditorWorker(object): # can find all diskfile locations regardless of policy -- so for now # just use Policy-0's manager. all_locs = (self.diskfile_router[POLICIES[0]] - .object_audit_location_generator(device_dirs=device_dirs)) + .object_audit_location_generator( + device_dirs=device_dirs, + auditor_type=self.auditor_type)) for location in all_locs: loop_time = time.time() self.failsafe_object_audit(location) @@ -156,6 +158,9 @@ class AuditorWorker(object): self.logger.info( _('Object audit stats: %s') % json.dumps(self.stats_buckets)) + # Unset remaining partitions to not skip them in the next run + diskfile.clear_auditor_status(self.devices, self.auditor_type) + def record_stats(self, obj_size): """ Based on config's object_size_stats will keep track of how many objects diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index 0928a5b688..4e4d3a40c1 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -33,6 +33,7 @@ are also not considered part of the backend API. import six.moves.cPickle as pickle import errno import fcntl +import json import os import time import uuid @@ -263,7 +264,7 @@ class AuditLocation(object): def object_audit_location_generator(devices, mount_check=True, logger=None, - device_dirs=None): + device_dirs=None, auditor_type="ALL"): """ Given a devices path (e.g. "/srv/node"), yield an AuditLocation for all objects stored under that directory if device_dirs isn't set. If @@ -277,7 +278,8 @@ def object_audit_location_generator(devices, mount_check=True, logger=None, :param mount_check: flag to check if a mount check should be performed on devices :param logger: a logger object - :device_dirs: a list of directories under devices to traverse + :param device_dirs: a list of directories under devices to traverse + :param auditor_type: either ALL or ZBF """ if not device_dirs: device_dirs = listdir(devices) @@ -307,8 +309,12 @@ def object_audit_location_generator(devices, mount_check=True, logger=None, 'to a valid policy (%s)') % (dir_, e)) continue datadir_path = os.path.join(devices, device, dir_) - partitions = listdir(datadir_path) - for partition in partitions: + + partitions = get_auditor_status(datadir_path, logger, auditor_type) + + for pos, partition in enumerate(partitions): + update_auditor_status(datadir_path, logger, + partitions[pos:], auditor_type) part_path = os.path.join(datadir_path, partition) try: suffixes = listdir(part_path) @@ -329,6 +335,51 @@ def object_audit_location_generator(devices, mount_check=True, logger=None, yield AuditLocation(hsh_path, device, partition, policy) + update_auditor_status(datadir_path, logger, [], auditor_type) + + +def get_auditor_status(datadir_path, logger, auditor_type): + auditor_status = os.path.join( + datadir_path, "auditor_status_%s.json" % auditor_type) + status = {} + try: + with open(auditor_status) as statusfile: + status = statusfile.read() + except (OSError, IOError) as e: + if e.errno != errno.ENOENT and logger: + logger.warning(_('Cannot read %s (%s)') % (auditor_status, e)) + return listdir(datadir_path) + try: + status = json.loads(status) + except ValueError as e: + logger.warning(_('Loading JSON from %s failed (%s)') % ( + auditor_status, e)) + return listdir(datadir_path) + return status['partitions'] + + +def update_auditor_status(datadir_path, logger, partitions, auditor_type): + status = json.dumps({'partitions': partitions}) + auditor_status = os.path.join( + datadir_path, "auditor_status_%s.json" % auditor_type) + try: + with open(auditor_status, "wb") as statusfile: + statusfile.write(status) + except (OSError, IOError) as e: + if logger: + logger.warning(_('Cannot write %s (%s)') % (auditor_status, e)) + + +def clear_auditor_status(devices, auditor_type="ALL"): + for device in os.listdir(devices): + for dir_ in os.listdir(os.path.join(devices, device)): + if not dir_.startswith("objects"): + continue + datadir_path = os.path.join(devices, device, dir_) + auditor_status = os.path.join( + datadir_path, "auditor_status_%s.json" % auditor_type) + remove_file(auditor_status) + def strip_self(f): """ @@ -897,14 +948,17 @@ class BaseDiskFileManager(object): policy=policy, use_splice=self.use_splice, pipe_size=self.pipe_size, **kwargs) - def object_audit_location_generator(self, device_dirs=None): + def object_audit_location_generator(self, device_dirs=None, + auditor_type="ALL"): """ Yield an AuditLocation for all objects stored under device_dirs. :param device_dirs: directory of target device + :param auditor_type: either ALL or ZBF """ return object_audit_location_generator(self.devices, self.mount_check, - self.logger, device_dirs) + self.logger, device_dirs, + auditor_type) def get_diskfile_from_audit_location(self, audit_location): """ diff --git a/test/unit/obj/test_auditor.py b/test/unit/obj/test_auditor.py index b5db2e55e9..bebff1f41d 100644 --- a/test/unit/obj/test_auditor.py +++ b/test/unit/obj/test_auditor.py @@ -26,7 +26,8 @@ from test.unit import FakeLogger, patch_policies, make_timestamp_iter, \ DEFAULT_TEST_EC_TYPE from swift.obj import auditor from swift.obj.diskfile import DiskFile, write_metadata, invalidate_hash, \ - get_data_dir, DiskFileManager, ECDiskFileManager, AuditLocation + get_data_dir, DiskFileManager, ECDiskFileManager, AuditLocation, \ + clear_auditor_status, get_auditor_status from swift.common.utils import mkdirs, normalize_timestamp, Timestamp from swift.common.storage_policy import ECStoragePolicy, StoragePolicy, \ POLICIES @@ -460,6 +461,7 @@ class TestAuditor(unittest.TestCase): self.auditor.run_audit(**kwargs) self.assertFalse(os.path.isdir(quarantine_path)) del(kwargs['zero_byte_fps']) + clear_auditor_status(self.devices) self.auditor.run_audit(**kwargs) self.assertTrue(os.path.isdir(quarantine_path)) @@ -495,10 +497,20 @@ class TestAuditor(unittest.TestCase): self.setup_bad_zero_byte() kwargs = {'mode': 'once'} kwargs['zero_byte_fps'] = 50 - self.auditor.run_audit(**kwargs) + + called_args = [0] + + def mock_get_auditor_status(path, logger, audit_type): + called_args[0] = audit_type + return get_auditor_status(path, logger, audit_type) + + with mock.patch('swift.obj.diskfile.get_auditor_status', + mock_get_auditor_status): + self.auditor.run_audit(**kwargs) quarantine_path = os.path.join(self.devices, 'sda', 'quarantined', 'objects') self.assertTrue(os.path.isdir(quarantine_path)) + self.assertEqual('ZBF', called_args[0]) def test_object_run_fast_track_zero_check_closed(self): rat = [False] diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py index 534882bec3..18530678a2 100644 --- a/test/unit/obj/test_diskfile.py +++ b/test/unit/obj/test_diskfile.py @@ -359,6 +359,9 @@ class TestObjectAuditLocationGenerator(unittest.TestCase): ] self.assertEqual(locations, expected) + # Reset status file for next run + diskfile.clear_auditor_status(tmpdir) + # now without a logger locations = [(loc.path, loc.device, loc.partition, loc.policy) for loc in diskfile.object_audit_location_generator( @@ -433,6 +436,40 @@ class TestObjectAuditLocationGenerator(unittest.TestCase): with mock.patch('os.listdir', splode_if_endswith("b54")): self.assertRaises(OSError, list_locations, tmpdir) + def test_auditor_status(self): + with temptree([]) as tmpdir: + os.makedirs(os.path.join(tmpdir, "sdf", "objects", "1", "a", "b")) + os.makedirs(os.path.join(tmpdir, "sdf", "objects", "2", "a", "b")) + + # Auditor starts, there are two partitions to check + gen = diskfile.object_audit_location_generator(tmpdir, False) + gen.next() + gen.next() + + # Auditor stopped for some reason without raising StopIterator in + # the generator and restarts There is now only one remaining + # partition to check + gen = diskfile.object_audit_location_generator(tmpdir, False) + gen.next() + + # There are no more remaining partitions + self.assertRaises(StopIteration, gen.next) + + # There are no partitions to check if the auditor restarts another + # time and the status files have not been cleared + gen = diskfile.object_audit_location_generator(tmpdir, False) + self.assertRaises(StopIteration, gen.next) + + # Reset status file + diskfile.clear_auditor_status(tmpdir) + + # If the auditor restarts another time, we expect to + # check two partitions again, because the remaining + # partitions were empty and a new listdir was executed + gen = diskfile.object_audit_location_generator(tmpdir, False) + gen.next() + gen.next() + class TestDiskFileRouter(unittest.TestCase):