Merge "Fix reconstructor stats mssage."

This commit is contained in:
Jenkins 2015-07-28 00:22:32 +00:00 committed by Gerrit Code Review
commit f05eddeece
2 changed files with 112 additions and 17 deletions

View File

@ -337,22 +337,34 @@ class ObjectReconstructor(Daemon):
"""
Logs various stats for the currently running reconstruction pass.
"""
if self.reconstruction_count:
if (self.device_count and self.part_count and
self.reconstruction_device_count):
elapsed = (time.time() - self.start) or 0.000001
rate = self.reconstruction_count / elapsed
rate = self.reconstruction_part_count / elapsed
total_part_count = (self.part_count *
self.device_count /
self.reconstruction_device_count)
self.logger.info(
_("%(reconstructed)d/%(total)d (%(percentage).2f%%)"
" partitions reconstructed in %(time).2fs (%(rate).2f/sec, "
"%(remaining)s remaining)"),
{'reconstructed': self.reconstruction_count,
'total': self.job_count,
" partitions of %(device)d/%(dtotal)d "
"(%(dpercentage).2f%%) devices"
" reconstructed in %(time).2fs "
"(%(rate).2f/sec, %(remaining)s remaining)"),
{'reconstructed': self.reconstruction_part_count,
'total': self.part_count,
'percentage':
self.reconstruction_count * 100.0 / self.job_count,
self.reconstruction_part_count * 100.0 / self.part_count,
'device': self.reconstruction_device_count,
'dtotal': self.device_count,
'dpercentage':
self.reconstruction_device_count * 100.0 / self.device_count,
'time': time.time() - self.start, 'rate': rate,
'remaining': '%d%s' % compute_eta(self.start,
self.reconstruction_count,
self.job_count)})
if self.suffix_count:
'remaining': '%d%s' %
compute_eta(self.start,
self.reconstruction_part_count,
total_part_count)})
if self.suffix_count and self.partition_times:
self.logger.info(
_("%(checked)d suffixes checked - "
"%(hashed).2f%% hashed, %(synced).2f%% synced"),
@ -781,16 +793,22 @@ class ObjectReconstructor(Daemon):
self._diskfile_mgr = self._df_router[policy]
self.load_object_ring(policy)
data_dir = get_data_dir(policy)
local_devices = itertools.ifilter(
local_devices = list(itertools.ifilter(
lambda dev: dev and is_local_device(
ips, self.port,
dev['replication_ip'], dev['replication_port']),
policy.object_ring.devs)
policy.object_ring.devs))
if override_devices:
self.device_count = len(override_devices)
else:
self.device_count = len(local_devices)
for local_dev in local_devices:
if override_devices and (local_dev['device'] not in
override_devices):
continue
self.reconstruction_device_count += 1
dev_path = self._df_router[policy].get_dev_path(
local_dev['device'])
if not dev_path:
@ -814,6 +832,8 @@ class ObjectReconstructor(Daemon):
self.logger.exception(
'Unable to list partitions in %r' % obj_path)
continue
self.part_count += len(partitions)
for partition in partitions:
part_path = join(obj_path, partition)
if not (partition.isdigit() and
@ -821,6 +841,7 @@ class ObjectReconstructor(Daemon):
self.logger.warning(
'Unexpected entity in data dir: %r' % part_path)
remove_file(part_path)
self.reconstruction_part_count += 1
continue
partition = int(partition)
if override_partitions and (partition not in
@ -833,6 +854,7 @@ class ObjectReconstructor(Daemon):
'part_path': part_path,
}
yield part_info
self.reconstruction_part_count += 1
def build_reconstruction_jobs(self, part_info):
"""
@ -850,10 +872,14 @@ class ObjectReconstructor(Daemon):
def _reset_stats(self):
self.start = time.time()
self.job_count = 0
self.part_count = 0
self.device_count = 0
self.suffix_count = 0
self.suffix_sync = 0
self.suffix_hash = 0
self.reconstruction_count = 0
self.reconstruction_part_count = 0
self.reconstruction_device_count = 0
self.last_reconstruction_count = -1
def delete_partition(self, path):

View File

@ -24,7 +24,7 @@ import shutil
import re
import random
import struct
from eventlet import Timeout
from eventlet import Timeout, sleep
from contextlib import closing, nested, contextmanager
from gzip import GzipFile
@ -599,10 +599,74 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
self.assertFalse(jobs) # that should be all of them
check_jobs(part_num)
def test_run_once(self):
with mocked_http_conn(*[200] * 12, body=pickle.dumps({})):
def _run_once(self, http_count, extra_devices, override_devices=None):
ring_devs = list(self.policy.object_ring.devs)
for device, parts in extra_devices.items():
device_path = os.path.join(self.devices, device)
os.mkdir(device_path)
for part in range(parts):
os.makedirs(os.path.join(device_path, 'objects-1', str(part)))
# we update the ring to make is_local happy
devs = [dict(d) for d in ring_devs]
for d in devs:
d['device'] = device
self.policy.object_ring.devs.extend(devs)
self.reconstructor.stats_interval = 0
self.process_job = lambda j: sleep(0)
with mocked_http_conn(*[200] * http_count, body=pickle.dumps({})):
with mock_ssync_sender():
self.reconstructor.run_once()
self.reconstructor.run_once(devices=override_devices)
def test_run_once(self):
# sda1: 3 is done in setup
extra_devices = {
'sdb1': 4,
'sdc1': 1,
'sdd1': 0,
}
self._run_once(18, extra_devices)
stats_lines = set()
for line in self.logger.get_lines_for_level('info'):
if 'devices reconstructed in' not in line:
continue
stat_line = line.split('of', 1)[0].strip()
stats_lines.add(stat_line)
acceptable = set([
'0/3 (0.00%) partitions',
'8/8 (100.00%) partitions',
])
matched = stats_lines & acceptable
self.assertEqual(matched, acceptable,
'missing some expected acceptable:\n%s' % (
'\n'.join(sorted(acceptable - matched))))
self.assertEqual(self.reconstructor.reconstruction_device_count, 4)
self.assertEqual(self.reconstructor.reconstruction_part_count, 8)
self.assertEqual(self.reconstructor.part_count, 8)
def test_run_once_override_devices(self):
# sda1: 3 is done in setup
extra_devices = {
'sdb1': 4,
'sdc1': 1,
'sdd1': 0,
}
self._run_once(2, extra_devices, 'sdc1')
stats_lines = set()
for line in self.logger.get_lines_for_level('info'):
if 'devices reconstructed in' not in line:
continue
stat_line = line.split('of', 1)[0].strip()
stats_lines.add(stat_line)
acceptable = set([
'1/1 (100.00%) partitions',
])
matched = stats_lines & acceptable
self.assertEqual(matched, acceptable,
'missing some expected acceptable:\n%s' % (
'\n'.join(sorted(acceptable - matched))))
self.assertEqual(self.reconstructor.reconstruction_device_count, 1)
self.assertEqual(self.reconstructor.reconstruction_part_count, 1)
self.assertEqual(self.reconstructor.part_count, 1)
def test_get_response(self):
part = self.part_nums[0]
@ -621,6 +685,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
def test_reconstructor_skips_bogus_partition_dirs(self):
# A directory in the wrong place shouldn't crash the reconstructor
self.reconstructor._reset_stats()
rmtree(self.objects_1)
os.mkdir(self.objects_1)
@ -699,6 +764,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
self.assertEqual(expected_partners, sorted(got_partners))
def test_collect_parts(self):
self.reconstructor._reset_stats()
parts = []
for part_info in self.reconstructor.collect_parts():
parts.append(part_info['partition'])
@ -709,6 +775,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
def blowup_mkdirs(path):
raise OSError('Ow!')
self.reconstructor._reset_stats()
with mock.patch.object(object_reconstructor, 'mkdirs', blowup_mkdirs):
rmtree(self.objects_1, ignore_errors=1)
parts = []
@ -734,6 +801,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
# since our collect_parts job is a generator, that yields directly
# into build_jobs and then spawns it's safe to do the remove_files
# without making reconstructor startup slow
self.reconstructor._reset_stats()
for part_info in self.reconstructor.collect_parts():
self.assertNotEqual(pol_1_part_1_path, part_info['part_path'])
self.assertFalse(os.path.exists(pol_1_part_1_path))
@ -1033,6 +1101,7 @@ class TestObjectReconstructor(unittest.TestCase):
self.reconstructor.job_count = 1
def tearDown(self):
self.reconstructor._reset_stats()
self.reconstructor.stats_line()
shutil.rmtree(self.testdir)