diff --git a/doc/source/deployment_guide.rst b/doc/source/deployment_guide.rst index 68f8c9b5c8..8db4360a4d 100644 --- a/doc/source/deployment_guide.rst +++ b/doc/source/deployment_guide.rst @@ -229,7 +229,12 @@ Option Default Description log_name object-auditor Label used when logging log_facility LOG_LOCAL0 Syslog log facility log_level INFO Logging level -interval 1800 Minimum time for a pass to take +files_per_second 20 Maximum files audited per second. Should + be tuned according to individual system + specs. 0 is unlimited. +bytes_per_second 10000000 Maximum bytes audited per second. Should + be tuned according to individual system + specs. 0 is unlimited. ================== ============== ========================================== ------------------------------ diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample index d4523566bf..cc80c18c07 100644 --- a/etc/object-server.conf-sample +++ b/etc/object-server.conf-sample @@ -55,5 +55,5 @@ use = egg:swift#object [object-auditor] # log_name = object-auditor -# Will audit, at most, 1 object per device per interval -# interval = 1800 +# files_per_second = 20 +# bytes_per_second = 10000000 diff --git a/swift/common/utils.py b/swift/common/utils.py index 75f6409ba6..531905f407 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -726,8 +726,11 @@ def audit_location_generator(devices, datadir, mount_check=True, logger=None): on devices :param logger: a logger object ''' - for device in os.listdir(devices): - if mount_check and not\ + device_dir = os.listdir(devices) + # randomize devices in case of process restart before sweep completed + shuffle(device_dir) + for device in device_dir: + if mount_check and not \ os.path.ismount(os.path.join(devices, device)): if logger: logger.debug( diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py index 98dc077640..dbfee18280 100644 --- a/swift/obj/auditor.py +++ b/swift/obj/auditor.py @@ -35,14 +35,18 @@ class ObjectAuditor(Daemon): self.devices = conf.get('devices', '/srv/node') self.mount_check = conf.get('mount_check', 'true').lower() in \ ('true', 't', '1', 'on', 'yes', 'y') - self.max_files_per_second = float(conf.get('files_per_second', 0)) - self.max_bytes_per_second = float(conf.get('bytes_per_second', 0)) + self.max_files_per_second = float(conf.get('files_per_second', 20)) + self.max_bytes_per_second = float(conf.get('bytes_per_second', + 10000000)) self.files_running_time = 0 self.bytes_running_time = 0 self.bytes_processed = 0 + self.total_bytes_processed = 0 + self.total_files_processed = 0 self.passes = 0 self.quarantines = 0 self.errors = 0 + self.log_time = 3600 # once an hour def run_forever(self): """Run the object audit until stopped.""" @@ -62,7 +66,8 @@ class ObjectAuditor(Daemon): self.object_audit(path, device, partition) self.files_running_time = ratelimit_sleep( self.files_running_time, self.max_files_per_second) - if time.time() - reported >= 3600: # once an hour + self.total_files_processed += 1 + if time.time() - reported >= self.log_time: self.logger.info( 'Since %s: Locally: %d passed audit, %d quarantined, ' '%d errors files/sec: %.2f , bytes/sec: %.2f' % ( @@ -77,7 +82,11 @@ class ObjectAuditor(Daemon): self.bytes_processed = 0 elapsed = time.time() - begin self.logger.info( - 'Object audit "%s" mode completed: %.02fs' % (mode, elapsed)) + 'Object audit "%s" mode completed: %.02fs. ' + 'Total bytes/sec: %.2f , Total files/sec: %.2f ' % ( + mode, elapsed, + self.total_bytes_processed / elapsed, + self.total_files_processed / elapsed)) def object_audit(self, path, device, partition): """ @@ -114,6 +123,7 @@ class ObjectAuditor(Daemon): incr_by=len(chunk)) etag.update(chunk) self.bytes_processed += len(chunk) + self.total_bytes_processed += len(chunk) etag = etag.hexdigest() if etag != df.metadata['ETag']: raise AuditException("ETag of %s does not match file's md5 of " diff --git a/swift/obj/server.py b/swift/obj/server.py index a690f7fe2a..5b4fc1b06b 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -219,7 +219,7 @@ class DiskFile(object): :params fd: file descriptor of the temp file :param tmppath: path to the temporary file being used - :param metadata: dictionary of metada to be written + :param metadata: dictionary of metadata to be written :param extention: extension to be used when making the file """ metadata['name'] = self.name diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index b680a27793..d0768a5a0b 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -35,6 +35,7 @@ from swift.common import utils class MockOs(): + def __init__(self, pass_funcs=[], called_funcs=[], raise_funcs=[]): self.closed_fds = [] for func in pass_funcs: @@ -184,12 +185,12 @@ class TestUtils(unittest.TestCase): print 'test2' self.assertEquals(sio.getvalue(), 'STDOUT: test2\n') sys.stderr = lfo - print >>sys.stderr, 'test4' + print >> sys.stderr, 'test4' self.assertEquals(sio.getvalue(), 'STDOUT: test2\nSTDOUT: test4\n') sys.stdout = orig_stdout print 'test5' self.assertEquals(sio.getvalue(), 'STDOUT: test2\nSTDOUT: test4\n') - print >>sys.stderr, 'test6' + print >> sys.stderr, 'test6' self.assertEquals(sio.getvalue(), 'STDOUT: test2\nSTDOUT: test4\n' 'STDOUT: test6\n') sys.stderr = orig_stderr @@ -325,7 +326,7 @@ Error: unable to locate %s def test_hash_path(self): # Yes, these tests are deliberately very fragile. We want to make sure - # that if someones changes the results hash_path produces, they know it. + # that if someones changes the results hash_path produces, they know it self.assertEquals(utils.hash_path('a'), '1c84525acb02107ea475dcd3d09c2c58') self.assertEquals(utils.hash_path('a', 'c'), @@ -364,10 +365,12 @@ log_name = yarr''' result = utils.readconf('/tmp/test', 'section2').get('log_name') expected = 'yarr' self.assertEquals(result, expected) - result = utils.readconf('/tmp/test', 'section1', log_name='foo').get('log_name') + result = utils.readconf('/tmp/test', 'section1', + log_name='foo').get('log_name') expected = 'foo' self.assertEquals(result, expected) - result = utils.readconf('/tmp/test', 'section1', defaults={'bar': 'baz'}) + result = utils.readconf('/tmp/test', 'section1', + defaults={'bar': 'baz'}) expected = {'log_name': 'section1', 'foo': 'bar', 'bar': 'baz'} self.assertEquals(result, expected) os.unlink('/tmp/test') @@ -452,34 +455,35 @@ log_name = yarr''' start = time.time() for i in range(100): running_time = utils.ratelimit_sleep(running_time, 0) - self.assertTrue(abs((time.time() - start)* 1000) < 1) + self.assertTrue(abs((time.time() - start) * 100) < 1) running_time = 0 start = time.time() for i in range(50): running_time = utils.ratelimit_sleep(running_time, 200) - # make sure its accurate to 2/100 of a second - self.assertTrue(abs(25 - (time.time() - start)* 100) < 2) + # make sure its accurate to 10th of a second + self.assertTrue(abs(25 - (time.time() - start) * 100) < 10) def test_ratelimit_sleep_with_sleep(self): running_time = 0 start = time.time() for i in range(25): running_time = utils.ratelimit_sleep(running_time, 50) - time.sleep(1.0/75) - # make sure its accurate to 2/100 of a second - self.assertTrue(abs(50 - (time.time() - start)* 100) < 2) + time.sleep(1.0 / 75) + # make sure its accurate to 10th of a second + self.assertTrue(abs(50 - (time.time() - start) * 100) < 10) def test_ratelimit_sleep_with_incr(self): running_time = 0 start = time.time() - vals = [5,17,0,3,11,30,40,4,13,2,-1] * 2 # adds up to 250 (with no -1) + vals = [5, 17, 0, 3, 11, 30, + 40, 4, 13, 2, -1] * 2 # adds up to 250 (with no -1) total = 0 for i in vals: running_time = utils.ratelimit_sleep(running_time, 500, incr_by=i) total += i - self.assertTrue(abs(50 - (time.time() - start)* 100) < 2) + self.assertTrue(abs(50 - (time.time() - start) * 100) < 10) if __name__ == '__main__': diff --git a/test/unit/obj/test_auditor.py b/test/unit/obj/test_auditor.py index 90344420a7..e068af2114 100644 --- a/test/unit/obj/test_auditor.py +++ b/test/unit/obj/test_auditor.py @@ -23,9 +23,11 @@ from shutil import rmtree from hashlib import md5 from swift.obj import auditor from swift.obj.server import DiskFile, write_metadata -from swift.common.utils import hash_path, mkdirs, normalize_timestamp +from swift.common.utils import hash_path, mkdirs, normalize_timestamp, renamer +from swift.obj.replicator import invalidate_hash from swift.common.exceptions import AuditException + class TestAuditor(unittest.TestCase): def setUp(self): @@ -33,7 +35,7 @@ class TestAuditor(unittest.TestCase): self.path_to_test_xfs = os.environ.get('PATH_TO_TEST_XFS') if not self.path_to_test_xfs or \ not os.path.exists(self.path_to_test_xfs): - print >>sys.stderr, 'WARNING: PATH_TO_TEST_XFS not set or not ' \ + print >> sys.stderr, 'WARNING: PATH_TO_TEST_XFS not set or not ' \ 'pointing to a valid directory.\n' \ 'Please set PATH_TO_TEST_XFS to a directory on an XFS file ' \ 'system for testing.' @@ -60,8 +62,7 @@ class TestAuditor(unittest.TestCase): self.conf = dict( devices=self.devices, - mount_check='false', - timeout='300', stats_interval='1') + mount_check='false') def tearDown(self): rmtree(self.testdir, ignore_errors=1) @@ -132,9 +133,42 @@ class TestAuditor(unittest.TestCase): 'sda', cur_part) self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) + def test_object_audit_no_meta(self): + self.auditor = auditor.ObjectAuditor( + self.conf) + cur_part = '0' + disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o') + data = '0' * 1024 + etag = md5() + pre_quarantines = self.auditor.quarantines + with disk_file.mkstemp() as (fd, tmppath): + os.write(fd, data) + etag.update(data) + etag = etag.hexdigest() + timestamp = str(normalize_timestamp(time.time())) + os.fsync(fd) + invalidate_hash(os.path.dirname(disk_file.datadir)) + renamer(tmppath, os.path.join(disk_file.datadir, + timestamp + '.data')) + self.auditor.object_audit( + os.path.join(disk_file.datadir, timestamp + '.data'), + 'sda', cur_part) + self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) + + def test_object_audit_bad_args(self): + self.auditor = auditor.ObjectAuditor( + self.conf) + pre_errors = self.auditor.errors + self.auditor.object_audit(5, 'sda', '0') + self.assertEquals(self.auditor.errors, pre_errors + 1) + pre_errors = self.auditor.errors + self.auditor.object_audit('badpath', 'sda', '0') + self.assertEquals(self.auditor.errors, pre_errors) # just returns + def test_object_run_once_pass(self): self.auditor = auditor.ObjectAuditor( self.conf) + self.auditor.log_time = 0 cur_part = '0' timestamp = str(normalize_timestamp(time.time())) pre_quarantines = self.auditor.quarantines @@ -155,7 +189,7 @@ class TestAuditor(unittest.TestCase): self.auditor.run_once() self.assertEquals(self.auditor.quarantines, pre_quarantines) - def test_object_run_once_multi_devices(self): + def test_object_run_once_no_sda(self): self.auditor = auditor.ObjectAuditor( self.conf) cur_part = '0' @@ -179,6 +213,45 @@ class TestAuditor(unittest.TestCase): self.auditor.run_once() self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) + def test_object_run_once_multi_devices(self): + self.auditor = auditor.ObjectAuditor( + self.conf) + cur_part = '0' + timestamp = str(normalize_timestamp(time.time())) + pre_quarantines = self.auditor.quarantines + disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o') + data = '0' * 10 + etag = md5() + with disk_file.mkstemp() as (fd, tmppath): + os.write(fd, data) + etag.update(data) + etag = etag.hexdigest() + metadata = { + 'ETag': etag, + 'X-Timestamp': timestamp, + 'Content-Length': str(os.fstat(fd).st_size), + } + disk_file.put(fd, tmppath, metadata) + disk_file.close() + self.auditor.run_once() + disk_file = DiskFile(self.devices, 'sdb', cur_part, 'a', 'c', 'ob') + data = '1' * 10 + etag = md5() + with disk_file.mkstemp() as (fd, tmppath): + os.write(fd, data) + etag.update(data) + etag = etag.hexdigest() + metadata = { + 'ETag': etag, + 'X-Timestamp': timestamp, + 'Content-Length': str(os.fstat(fd).st_size), + } + disk_file.put(fd, tmppath, metadata) + disk_file.close() + os.write(fd, 'extra_data') + self.auditor.run_once() + self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) + if __name__ == '__main__': unittest.main()