adding defaults, docs, and unit tests
This commit is contained in:
parent
7bd0184bfe
commit
8dee94fd7c
@ -229,7 +229,12 @@ Option Default Description
|
|||||||
log_name object-auditor Label used when logging
|
log_name object-auditor Label used when logging
|
||||||
log_facility LOG_LOCAL0 Syslog log facility
|
log_facility LOG_LOCAL0 Syslog log facility
|
||||||
log_level INFO Logging level
|
log_level INFO Logging level
|
||||||
interval 1800 Minimum time for a pass to take
|
files_per_second 20 Maximum files audited per second. Should
|
||||||
|
be tuned according to individual system
|
||||||
|
specs. 0 is unlimited.
|
||||||
|
bytes_per_second 10000000 Maximum bytes audited per second. Should
|
||||||
|
be tuned according to individual system
|
||||||
|
specs. 0 is unlimited.
|
||||||
================== ============== ==========================================
|
================== ============== ==========================================
|
||||||
|
|
||||||
------------------------------
|
------------------------------
|
||||||
|
@ -55,5 +55,5 @@ use = egg:swift#object
|
|||||||
|
|
||||||
[object-auditor]
|
[object-auditor]
|
||||||
# log_name = object-auditor
|
# log_name = object-auditor
|
||||||
# Will audit, at most, 1 object per device per interval
|
# files_per_second = 20
|
||||||
# interval = 1800
|
# bytes_per_second = 10000000
|
||||||
|
@ -726,8 +726,11 @@ def audit_location_generator(devices, datadir, mount_check=True, logger=None):
|
|||||||
on devices
|
on devices
|
||||||
:param logger: a logger object
|
:param logger: a logger object
|
||||||
'''
|
'''
|
||||||
for device in os.listdir(devices):
|
device_dir = os.listdir(devices)
|
||||||
if mount_check and not\
|
# randomize devices in case of process restart before sweep completed
|
||||||
|
shuffle(device_dir)
|
||||||
|
for device in device_dir:
|
||||||
|
if mount_check and not \
|
||||||
os.path.ismount(os.path.join(devices, device)):
|
os.path.ismount(os.path.join(devices, device)):
|
||||||
if logger:
|
if logger:
|
||||||
logger.debug(
|
logger.debug(
|
||||||
|
@ -35,14 +35,18 @@ class ObjectAuditor(Daemon):
|
|||||||
self.devices = conf.get('devices', '/srv/node')
|
self.devices = conf.get('devices', '/srv/node')
|
||||||
self.mount_check = conf.get('mount_check', 'true').lower() in \
|
self.mount_check = conf.get('mount_check', 'true').lower() in \
|
||||||
('true', 't', '1', 'on', 'yes', 'y')
|
('true', 't', '1', 'on', 'yes', 'y')
|
||||||
self.max_files_per_second = float(conf.get('files_per_second', 0))
|
self.max_files_per_second = float(conf.get('files_per_second', 20))
|
||||||
self.max_bytes_per_second = float(conf.get('bytes_per_second', 0))
|
self.max_bytes_per_second = float(conf.get('bytes_per_second',
|
||||||
|
10000000))
|
||||||
self.files_running_time = 0
|
self.files_running_time = 0
|
||||||
self.bytes_running_time = 0
|
self.bytes_running_time = 0
|
||||||
self.bytes_processed = 0
|
self.bytes_processed = 0
|
||||||
|
self.total_bytes_processed = 0
|
||||||
|
self.total_files_processed = 0
|
||||||
self.passes = 0
|
self.passes = 0
|
||||||
self.quarantines = 0
|
self.quarantines = 0
|
||||||
self.errors = 0
|
self.errors = 0
|
||||||
|
self.log_time = 3600 # once an hour
|
||||||
|
|
||||||
def run_forever(self):
|
def run_forever(self):
|
||||||
"""Run the object audit until stopped."""
|
"""Run the object audit until stopped."""
|
||||||
@ -62,7 +66,8 @@ class ObjectAuditor(Daemon):
|
|||||||
self.object_audit(path, device, partition)
|
self.object_audit(path, device, partition)
|
||||||
self.files_running_time = ratelimit_sleep(
|
self.files_running_time = ratelimit_sleep(
|
||||||
self.files_running_time, self.max_files_per_second)
|
self.files_running_time, self.max_files_per_second)
|
||||||
if time.time() - reported >= 3600: # once an hour
|
self.total_files_processed += 1
|
||||||
|
if time.time() - reported >= self.log_time:
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
'Since %s: Locally: %d passed audit, %d quarantined, '
|
'Since %s: Locally: %d passed audit, %d quarantined, '
|
||||||
'%d errors files/sec: %.2f , bytes/sec: %.2f' % (
|
'%d errors files/sec: %.2f , bytes/sec: %.2f' % (
|
||||||
@ -77,7 +82,11 @@ class ObjectAuditor(Daemon):
|
|||||||
self.bytes_processed = 0
|
self.bytes_processed = 0
|
||||||
elapsed = time.time() - begin
|
elapsed = time.time() - begin
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
'Object audit "%s" mode completed: %.02fs' % (mode, elapsed))
|
'Object audit "%s" mode completed: %.02fs. '
|
||||||
|
'Total bytes/sec: %.2f , Total files/sec: %.2f ' % (
|
||||||
|
mode, elapsed,
|
||||||
|
self.total_bytes_processed / elapsed,
|
||||||
|
self.total_files_processed / elapsed))
|
||||||
|
|
||||||
def object_audit(self, path, device, partition):
|
def object_audit(self, path, device, partition):
|
||||||
"""
|
"""
|
||||||
@ -114,6 +123,7 @@ class ObjectAuditor(Daemon):
|
|||||||
incr_by=len(chunk))
|
incr_by=len(chunk))
|
||||||
etag.update(chunk)
|
etag.update(chunk)
|
||||||
self.bytes_processed += len(chunk)
|
self.bytes_processed += len(chunk)
|
||||||
|
self.total_bytes_processed += len(chunk)
|
||||||
etag = etag.hexdigest()
|
etag = etag.hexdigest()
|
||||||
if etag != df.metadata['ETag']:
|
if etag != df.metadata['ETag']:
|
||||||
raise AuditException("ETag of %s does not match file's md5 of "
|
raise AuditException("ETag of %s does not match file's md5 of "
|
||||||
|
@ -219,7 +219,7 @@ class DiskFile(object):
|
|||||||
|
|
||||||
:params fd: file descriptor of the temp file
|
:params fd: file descriptor of the temp file
|
||||||
:param tmppath: path to the temporary file being used
|
:param tmppath: path to the temporary file being used
|
||||||
:param metadata: dictionary of metada to be written
|
:param metadata: dictionary of metadata to be written
|
||||||
:param extention: extension to be used when making the file
|
:param extention: extension to be used when making the file
|
||||||
"""
|
"""
|
||||||
metadata['name'] = self.name
|
metadata['name'] = self.name
|
||||||
|
@ -35,6 +35,7 @@ from swift.common import utils
|
|||||||
|
|
||||||
|
|
||||||
class MockOs():
|
class MockOs():
|
||||||
|
|
||||||
def __init__(self, pass_funcs=[], called_funcs=[], raise_funcs=[]):
|
def __init__(self, pass_funcs=[], called_funcs=[], raise_funcs=[]):
|
||||||
self.closed_fds = []
|
self.closed_fds = []
|
||||||
for func in pass_funcs:
|
for func in pass_funcs:
|
||||||
@ -184,12 +185,12 @@ class TestUtils(unittest.TestCase):
|
|||||||
print 'test2'
|
print 'test2'
|
||||||
self.assertEquals(sio.getvalue(), 'STDOUT: test2\n')
|
self.assertEquals(sio.getvalue(), 'STDOUT: test2\n')
|
||||||
sys.stderr = lfo
|
sys.stderr = lfo
|
||||||
print >>sys.stderr, 'test4'
|
print >> sys.stderr, 'test4'
|
||||||
self.assertEquals(sio.getvalue(), 'STDOUT: test2\nSTDOUT: test4\n')
|
self.assertEquals(sio.getvalue(), 'STDOUT: test2\nSTDOUT: test4\n')
|
||||||
sys.stdout = orig_stdout
|
sys.stdout = orig_stdout
|
||||||
print 'test5'
|
print 'test5'
|
||||||
self.assertEquals(sio.getvalue(), 'STDOUT: test2\nSTDOUT: test4\n')
|
self.assertEquals(sio.getvalue(), 'STDOUT: test2\nSTDOUT: test4\n')
|
||||||
print >>sys.stderr, 'test6'
|
print >> sys.stderr, 'test6'
|
||||||
self.assertEquals(sio.getvalue(), 'STDOUT: test2\nSTDOUT: test4\n'
|
self.assertEquals(sio.getvalue(), 'STDOUT: test2\nSTDOUT: test4\n'
|
||||||
'STDOUT: test6\n')
|
'STDOUT: test6\n')
|
||||||
sys.stderr = orig_stderr
|
sys.stderr = orig_stderr
|
||||||
@ -325,7 +326,7 @@ Error: unable to locate %s
|
|||||||
|
|
||||||
def test_hash_path(self):
|
def test_hash_path(self):
|
||||||
# Yes, these tests are deliberately very fragile. We want to make sure
|
# Yes, these tests are deliberately very fragile. We want to make sure
|
||||||
# that if someones changes the results hash_path produces, they know it.
|
# that if someones changes the results hash_path produces, they know it
|
||||||
self.assertEquals(utils.hash_path('a'),
|
self.assertEquals(utils.hash_path('a'),
|
||||||
'1c84525acb02107ea475dcd3d09c2c58')
|
'1c84525acb02107ea475dcd3d09c2c58')
|
||||||
self.assertEquals(utils.hash_path('a', 'c'),
|
self.assertEquals(utils.hash_path('a', 'c'),
|
||||||
@ -364,10 +365,12 @@ log_name = yarr'''
|
|||||||
result = utils.readconf('/tmp/test', 'section2').get('log_name')
|
result = utils.readconf('/tmp/test', 'section2').get('log_name')
|
||||||
expected = 'yarr'
|
expected = 'yarr'
|
||||||
self.assertEquals(result, expected)
|
self.assertEquals(result, expected)
|
||||||
result = utils.readconf('/tmp/test', 'section1', log_name='foo').get('log_name')
|
result = utils.readconf('/tmp/test', 'section1',
|
||||||
|
log_name='foo').get('log_name')
|
||||||
expected = 'foo'
|
expected = 'foo'
|
||||||
self.assertEquals(result, expected)
|
self.assertEquals(result, expected)
|
||||||
result = utils.readconf('/tmp/test', 'section1', defaults={'bar': 'baz'})
|
result = utils.readconf('/tmp/test', 'section1',
|
||||||
|
defaults={'bar': 'baz'})
|
||||||
expected = {'log_name': 'section1', 'foo': 'bar', 'bar': 'baz'}
|
expected = {'log_name': 'section1', 'foo': 'bar', 'bar': 'baz'}
|
||||||
self.assertEquals(result, expected)
|
self.assertEquals(result, expected)
|
||||||
os.unlink('/tmp/test')
|
os.unlink('/tmp/test')
|
||||||
@ -452,34 +455,35 @@ log_name = yarr'''
|
|||||||
start = time.time()
|
start = time.time()
|
||||||
for i in range(100):
|
for i in range(100):
|
||||||
running_time = utils.ratelimit_sleep(running_time, 0)
|
running_time = utils.ratelimit_sleep(running_time, 0)
|
||||||
self.assertTrue(abs((time.time() - start)* 1000) < 1)
|
self.assertTrue(abs((time.time() - start) * 100) < 1)
|
||||||
|
|
||||||
running_time = 0
|
running_time = 0
|
||||||
start = time.time()
|
start = time.time()
|
||||||
for i in range(50):
|
for i in range(50):
|
||||||
running_time = utils.ratelimit_sleep(running_time, 200)
|
running_time = utils.ratelimit_sleep(running_time, 200)
|
||||||
# make sure its accurate to 2/100 of a second
|
# make sure its accurate to 10th of a second
|
||||||
self.assertTrue(abs(25 - (time.time() - start)* 100) < 2)
|
self.assertTrue(abs(25 - (time.time() - start) * 100) < 10)
|
||||||
|
|
||||||
def test_ratelimit_sleep_with_sleep(self):
|
def test_ratelimit_sleep_with_sleep(self):
|
||||||
running_time = 0
|
running_time = 0
|
||||||
start = time.time()
|
start = time.time()
|
||||||
for i in range(25):
|
for i in range(25):
|
||||||
running_time = utils.ratelimit_sleep(running_time, 50)
|
running_time = utils.ratelimit_sleep(running_time, 50)
|
||||||
time.sleep(1.0/75)
|
time.sleep(1.0 / 75)
|
||||||
# make sure its accurate to 2/100 of a second
|
# make sure its accurate to 10th of a second
|
||||||
self.assertTrue(abs(50 - (time.time() - start)* 100) < 2)
|
self.assertTrue(abs(50 - (time.time() - start) * 100) < 10)
|
||||||
|
|
||||||
def test_ratelimit_sleep_with_incr(self):
|
def test_ratelimit_sleep_with_incr(self):
|
||||||
running_time = 0
|
running_time = 0
|
||||||
start = time.time()
|
start = time.time()
|
||||||
vals = [5,17,0,3,11,30,40,4,13,2,-1] * 2 # adds up to 250 (with no -1)
|
vals = [5, 17, 0, 3, 11, 30,
|
||||||
|
40, 4, 13, 2, -1] * 2 # adds up to 250 (with no -1)
|
||||||
total = 0
|
total = 0
|
||||||
for i in vals:
|
for i in vals:
|
||||||
running_time = utils.ratelimit_sleep(running_time,
|
running_time = utils.ratelimit_sleep(running_time,
|
||||||
500, incr_by=i)
|
500, incr_by=i)
|
||||||
total += i
|
total += i
|
||||||
self.assertTrue(abs(50 - (time.time() - start)* 100) < 2)
|
self.assertTrue(abs(50 - (time.time() - start) * 100) < 10)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
@ -23,9 +23,11 @@ from shutil import rmtree
|
|||||||
from hashlib import md5
|
from hashlib import md5
|
||||||
from swift.obj import auditor
|
from swift.obj import auditor
|
||||||
from swift.obj.server import DiskFile, write_metadata
|
from swift.obj.server import DiskFile, write_metadata
|
||||||
from swift.common.utils import hash_path, mkdirs, normalize_timestamp
|
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, renamer
|
||||||
|
from swift.obj.replicator import invalidate_hash
|
||||||
from swift.common.exceptions import AuditException
|
from swift.common.exceptions import AuditException
|
||||||
|
|
||||||
|
|
||||||
class TestAuditor(unittest.TestCase):
|
class TestAuditor(unittest.TestCase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
@ -33,7 +35,7 @@ class TestAuditor(unittest.TestCase):
|
|||||||
self.path_to_test_xfs = os.environ.get('PATH_TO_TEST_XFS')
|
self.path_to_test_xfs = os.environ.get('PATH_TO_TEST_XFS')
|
||||||
if not self.path_to_test_xfs or \
|
if not self.path_to_test_xfs or \
|
||||||
not os.path.exists(self.path_to_test_xfs):
|
not os.path.exists(self.path_to_test_xfs):
|
||||||
print >>sys.stderr, 'WARNING: PATH_TO_TEST_XFS not set or not ' \
|
print >> sys.stderr, 'WARNING: PATH_TO_TEST_XFS not set or not ' \
|
||||||
'pointing to a valid directory.\n' \
|
'pointing to a valid directory.\n' \
|
||||||
'Please set PATH_TO_TEST_XFS to a directory on an XFS file ' \
|
'Please set PATH_TO_TEST_XFS to a directory on an XFS file ' \
|
||||||
'system for testing.'
|
'system for testing.'
|
||||||
@ -60,8 +62,7 @@ class TestAuditor(unittest.TestCase):
|
|||||||
|
|
||||||
self.conf = dict(
|
self.conf = dict(
|
||||||
devices=self.devices,
|
devices=self.devices,
|
||||||
mount_check='false',
|
mount_check='false')
|
||||||
timeout='300', stats_interval='1')
|
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
rmtree(self.testdir, ignore_errors=1)
|
rmtree(self.testdir, ignore_errors=1)
|
||||||
@ -132,9 +133,42 @@ class TestAuditor(unittest.TestCase):
|
|||||||
'sda', cur_part)
|
'sda', cur_part)
|
||||||
self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
|
self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
|
||||||
|
|
||||||
|
def test_object_audit_no_meta(self):
|
||||||
|
self.auditor = auditor.ObjectAuditor(
|
||||||
|
self.conf)
|
||||||
|
cur_part = '0'
|
||||||
|
disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
|
||||||
|
data = '0' * 1024
|
||||||
|
etag = md5()
|
||||||
|
pre_quarantines = self.auditor.quarantines
|
||||||
|
with disk_file.mkstemp() as (fd, tmppath):
|
||||||
|
os.write(fd, data)
|
||||||
|
etag.update(data)
|
||||||
|
etag = etag.hexdigest()
|
||||||
|
timestamp = str(normalize_timestamp(time.time()))
|
||||||
|
os.fsync(fd)
|
||||||
|
invalidate_hash(os.path.dirname(disk_file.datadir))
|
||||||
|
renamer(tmppath, os.path.join(disk_file.datadir,
|
||||||
|
timestamp + '.data'))
|
||||||
|
self.auditor.object_audit(
|
||||||
|
os.path.join(disk_file.datadir, timestamp + '.data'),
|
||||||
|
'sda', cur_part)
|
||||||
|
self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
|
||||||
|
|
||||||
|
def test_object_audit_bad_args(self):
|
||||||
|
self.auditor = auditor.ObjectAuditor(
|
||||||
|
self.conf)
|
||||||
|
pre_errors = self.auditor.errors
|
||||||
|
self.auditor.object_audit(5, 'sda', '0')
|
||||||
|
self.assertEquals(self.auditor.errors, pre_errors + 1)
|
||||||
|
pre_errors = self.auditor.errors
|
||||||
|
self.auditor.object_audit('badpath', 'sda', '0')
|
||||||
|
self.assertEquals(self.auditor.errors, pre_errors) # just returns
|
||||||
|
|
||||||
def test_object_run_once_pass(self):
|
def test_object_run_once_pass(self):
|
||||||
self.auditor = auditor.ObjectAuditor(
|
self.auditor = auditor.ObjectAuditor(
|
||||||
self.conf)
|
self.conf)
|
||||||
|
self.auditor.log_time = 0
|
||||||
cur_part = '0'
|
cur_part = '0'
|
||||||
timestamp = str(normalize_timestamp(time.time()))
|
timestamp = str(normalize_timestamp(time.time()))
|
||||||
pre_quarantines = self.auditor.quarantines
|
pre_quarantines = self.auditor.quarantines
|
||||||
@ -155,7 +189,7 @@ class TestAuditor(unittest.TestCase):
|
|||||||
self.auditor.run_once()
|
self.auditor.run_once()
|
||||||
self.assertEquals(self.auditor.quarantines, pre_quarantines)
|
self.assertEquals(self.auditor.quarantines, pre_quarantines)
|
||||||
|
|
||||||
def test_object_run_once_multi_devices(self):
|
def test_object_run_once_no_sda(self):
|
||||||
self.auditor = auditor.ObjectAuditor(
|
self.auditor = auditor.ObjectAuditor(
|
||||||
self.conf)
|
self.conf)
|
||||||
cur_part = '0'
|
cur_part = '0'
|
||||||
@ -179,6 +213,45 @@ class TestAuditor(unittest.TestCase):
|
|||||||
self.auditor.run_once()
|
self.auditor.run_once()
|
||||||
self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
|
self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
|
||||||
|
|
||||||
|
def test_object_run_once_multi_devices(self):
|
||||||
|
self.auditor = auditor.ObjectAuditor(
|
||||||
|
self.conf)
|
||||||
|
cur_part = '0'
|
||||||
|
timestamp = str(normalize_timestamp(time.time()))
|
||||||
|
pre_quarantines = self.auditor.quarantines
|
||||||
|
disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
|
||||||
|
data = '0' * 10
|
||||||
|
etag = md5()
|
||||||
|
with disk_file.mkstemp() as (fd, tmppath):
|
||||||
|
os.write(fd, data)
|
||||||
|
etag.update(data)
|
||||||
|
etag = etag.hexdigest()
|
||||||
|
metadata = {
|
||||||
|
'ETag': etag,
|
||||||
|
'X-Timestamp': timestamp,
|
||||||
|
'Content-Length': str(os.fstat(fd).st_size),
|
||||||
|
}
|
||||||
|
disk_file.put(fd, tmppath, metadata)
|
||||||
|
disk_file.close()
|
||||||
|
self.auditor.run_once()
|
||||||
|
disk_file = DiskFile(self.devices, 'sdb', cur_part, 'a', 'c', 'ob')
|
||||||
|
data = '1' * 10
|
||||||
|
etag = md5()
|
||||||
|
with disk_file.mkstemp() as (fd, tmppath):
|
||||||
|
os.write(fd, data)
|
||||||
|
etag.update(data)
|
||||||
|
etag = etag.hexdigest()
|
||||||
|
metadata = {
|
||||||
|
'ETag': etag,
|
||||||
|
'X-Timestamp': timestamp,
|
||||||
|
'Content-Length': str(os.fstat(fd).st_size),
|
||||||
|
}
|
||||||
|
disk_file.put(fd, tmppath, metadata)
|
||||||
|
disk_file.close()
|
||||||
|
os.write(fd, 'extra_data')
|
||||||
|
self.auditor.run_once()
|
||||||
|
self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user