Merge "Make dict deletion idempotent in dump_recon_cache"

This commit is contained in:
Jenkins 2017-07-18 03:28:16 +00:00 committed by Gerrit Code Review
commit 3c11f6b8a8
3 changed files with 178 additions and 18 deletions

View File

@ -3064,18 +3064,27 @@ def human_readable(value):
def put_recon_cache_entry(cache_entry, key, item): def put_recon_cache_entry(cache_entry, key, item):
""" """
Function that will check if item is a dict, and if so put it under Update a recon cache entry item.
cache_entry[key]. We use nested recon cache entries when the object
auditor runs in parallel or else in 'once' mode with a specified If ``item`` is an empty dict then any existing ``key`` in ``cache_entry``
subset of devices. will be deleted. Similarly if ``item`` is a dict and any of its values are
empty dicts then the corrsponsing key will be deleted from the nested dict
in ``cache_entry``.
We use nested recon cache entries when the object auditor
runs in parallel or else in 'once' mode with a specified subset of devices.
:param cache_entry: a dict of existing cache entries
:param key: key for item to update
:param item: value for item to update
""" """
if isinstance(item, dict): if isinstance(item, dict):
if not item:
cache_entry.pop(key, None)
return
if key not in cache_entry or key in cache_entry and not \ if key not in cache_entry or key in cache_entry and not \
isinstance(cache_entry[key], dict): isinstance(cache_entry[key], dict):
cache_entry[key] = {} cache_entry[key] = {}
elif key in cache_entry and item == {}:
cache_entry.pop(key, None)
return
for k, v in item.items(): for k, v in item.items():
if v == {}: if v == {}:
cache_entry[key].pop(k, None) cache_entry[key].pop(k, None)

View File

@ -1361,21 +1361,53 @@ class TestUtils(unittest.TestCase):
testcache_file = os.path.join(testdir_base, 'cache.recon') testcache_file = os.path.join(testdir_base, 'cache.recon')
logger = utils.get_logger(None, 'server', log_route='server') logger = utils.get_logger(None, 'server', log_route='server')
try: try:
submit_dict = {'key1': {'value1': 1, 'value2': 2}} submit_dict = {'key0': 99,
'key1': {'value1': 1, 'value2': 2}}
utils.dump_recon_cache(submit_dict, testcache_file, logger) utils.dump_recon_cache(submit_dict, testcache_file, logger)
fd = open(testcache_file) with open(testcache_file) as fd:
file_dict = json.loads(fd.readline()) file_dict = json.loads(fd.readline())
fd.close()
self.assertEqual(submit_dict, file_dict) self.assertEqual(submit_dict, file_dict)
# Use a nested entry # Use a nested entry
submit_dict = {'key1': {'key2': {'value1': 1, 'value2': 2}}} submit_dict = {'key0': 101,
result_dict = {'key1': {'key2': {'value1': 1, 'value2': 2}, 'key1': {'key2': {'value1': 1, 'value2': 2}}}
expect_dict = {'key0': 101,
'key1': {'key2': {'value1': 1, 'value2': 2},
'value1': 1, 'value2': 2}} 'value1': 1, 'value2': 2}}
utils.dump_recon_cache(submit_dict, testcache_file, logger) utils.dump_recon_cache(submit_dict, testcache_file, logger)
fd = open(testcache_file) with open(testcache_file) as fd:
file_dict = json.loads(fd.readline()) file_dict = json.loads(fd.readline())
fd.close() self.assertEqual(expect_dict, file_dict)
self.assertEqual(result_dict, file_dict) # cached entries are sticky
submit_dict = {}
utils.dump_recon_cache(submit_dict, testcache_file, logger)
with open(testcache_file) as fd:
file_dict = json.loads(fd.readline())
self.assertEqual(expect_dict, file_dict)
# nested dicts can be erased...
submit_dict = {'key1': {'key2': {}}}
expect_dict = {'key0': 101,
'key1': {'value1': 1, 'value2': 2}}
utils.dump_recon_cache(submit_dict, testcache_file, logger)
with open(testcache_file) as fd:
file_dict = json.loads(fd.readline())
self.assertEqual(expect_dict, file_dict)
# ... and erasure is idempotent
utils.dump_recon_cache(submit_dict, testcache_file, logger)
with open(testcache_file) as fd:
file_dict = json.loads(fd.readline())
self.assertEqual(expect_dict, file_dict)
# top level dicts can be erased...
submit_dict = {'key1': {}}
expect_dict = {'key0': 101}
utils.dump_recon_cache(submit_dict, testcache_file, logger)
with open(testcache_file) as fd:
file_dict = json.loads(fd.readline())
self.assertEqual(expect_dict, file_dict)
# ... and erasure is idempotent
utils.dump_recon_cache(submit_dict, testcache_file, logger)
with open(testcache_file) as fd:
file_dict = json.loads(fd.readline())
self.assertEqual(expect_dict, file_dict)
finally: finally:
rmtree(testdir_base) rmtree(testdir_base)

View File

@ -12,6 +12,7 @@
# implied. # implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import json
from test import unit from test import unit
import unittest import unittest
@ -616,6 +617,77 @@ class TestAuditor(unittest.TestCase):
self.assertGreater(len(log_lines), 0) self.assertGreater(len(log_lines), 0)
self.assertTrue(log_lines[0].index('ZBF - sda')) self.assertTrue(log_lines[0].index('ZBF - sda'))
def test_object_run_recon_cache(self):
ts = Timestamp(time.time())
data = 'test_data'
with self.disk_file.create() as writer:
writer.write(data)
metadata = {
'ETag': md5(data).hexdigest(),
'X-Timestamp': ts.normal,
'Content-Length': str(os.fstat(writer._fd).st_size),
}
writer.put(metadata)
writer.commit(ts)
# all devices
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
auditor_worker.audit_all_objects()
with open(self.rcache) as fd:
actual_rcache = json.load(fd)
expected = {'object_auditor_stats_ALL':
{'passes': 1, 'errors': 0, 'audit_time': mock.ANY,
'start_time': mock.ANY, 'quarantined': 0,
'bytes_processed': 9}}
with open(self.rcache) as fd:
actual_rcache = json.load(fd)
self.assertEqual(expected, actual_rcache)
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices,
zero_byte_only_at_fps=50)
auditor_worker.audit_all_objects()
self.assertEqual(expected, actual_rcache)
with open(self.rcache) as fd:
actual_rcache = json.load(fd)
expected.update({
'object_auditor_stats_ZBF':
{'passes': 1, 'errors': 0, 'audit_time': mock.ANY,
'start_time': mock.ANY, 'quarantined': 0,
'bytes_processed': 0}})
self.assertEqual(expected, actual_rcache)
# specific devices
os.unlink(self.rcache)
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
auditor_worker.audit_all_objects(device_dirs=['sda'])
with open(self.rcache) as fd:
actual_rcache = json.load(fd)
expected = {'object_auditor_stats_ALL':
{'sda': {'passes': 1, 'errors': 0, 'audit_time': mock.ANY,
'start_time': mock.ANY, 'quarantined': 0,
'bytes_processed': 9}}}
with open(self.rcache) as fd:
actual_rcache = json.load(fd)
self.assertEqual(expected, actual_rcache)
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices,
zero_byte_only_at_fps=50)
auditor_worker.audit_all_objects(device_dirs=['sda'])
self.assertEqual(expected, actual_rcache)
with open(self.rcache) as fd:
actual_rcache = json.load(fd)
expected.update({
'object_auditor_stats_ZBF':
{'sda': {'passes': 1, 'errors': 0, 'audit_time': mock.ANY,
'start_time': mock.ANY, 'quarantined': 0,
'bytes_processed': 0}}})
self.assertEqual(expected, actual_rcache)
def test_object_run_once_no_sda(self): def test_object_run_once_no_sda(self):
auditor_worker = auditor.AuditorWorker(self.conf, self.logger, auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices) self.rcache, self.devices)
@ -1237,6 +1309,53 @@ class TestAuditor(unittest.TestCase):
self.assertEqual(sorted(forked_pids), [2, 1001]) self.assertEqual(sorted(forked_pids), [2, 1001])
def test_run_audit_once_zbfps(self):
my_auditor = auditor.ObjectAuditor(dict(devices=self.devices,
mount_check='false',
zero_byte_files_per_second=89,
concurrency=1,
recon_cache_path=self.testdir))
with mock.patch.object(my_auditor, '_sleep', lambda *a: None):
my_auditor.run_once(zero_byte_fps=50)
with open(self.rcache) as fd:
# there's no objects to audit so expect no stats; this assertion
# may change if https://bugs.launchpad.net/swift/+bug/1704858 is
# fixed
self.assertEqual({}, json.load(fd))
# check recon cache stays clean after a second run
with mock.patch.object(my_auditor, '_sleep', lambda *a: None):
my_auditor.run_once(zero_byte_fps=50)
with open(self.rcache) as fd:
self.assertEqual({}, json.load(fd))
ts = Timestamp(time.time())
with self.disk_file.create() as writer:
metadata = {
'ETag': md5('').hexdigest(),
'X-Timestamp': ts.normal,
'Content-Length': str(os.fstat(writer._fd).st_size),
}
writer.put(metadata)
writer.commit(ts)
# check recon cache stays clean after a second run
with mock.patch.object(my_auditor, '_sleep', lambda *a: None):
my_auditor.run_once(zero_byte_fps=50)
with open(self.rcache) as fd:
self.assertEqual({
'object_auditor_stats_ZBF': {
'audit_time': 0,
'bytes_processed': 0,
'errors': 0,
'passes': 1,
'quarantined': 0,
'start_time': mock.ANY}},
json.load(fd))
def test_run_parallel_audit_once(self): def test_run_parallel_audit_once(self):
my_auditor = auditor.ObjectAuditor( my_auditor = auditor.ObjectAuditor(
dict(devices=self.devices, mount_check='false', dict(devices=self.devices, mount_check='false',