diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index 6f6d6bda81..e73afe98c3 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -337,22 +337,34 @@ class ObjectReconstructor(Daemon): """ Logs various stats for the currently running reconstruction pass. """ - if self.reconstruction_count: + if (self.device_count and self.part_count and + self.reconstruction_device_count): elapsed = (time.time() - self.start) or 0.000001 - rate = self.reconstruction_count / elapsed + rate = self.reconstruction_part_count / elapsed + total_part_count = (self.part_count * + self.device_count / + self.reconstruction_device_count) self.logger.info( _("%(reconstructed)d/%(total)d (%(percentage).2f%%)" - " partitions reconstructed in %(time).2fs (%(rate).2f/sec, " - "%(remaining)s remaining)"), - {'reconstructed': self.reconstruction_count, - 'total': self.job_count, + " partitions of %(device)d/%(dtotal)d " + "(%(dpercentage).2f%%) devices" + " reconstructed in %(time).2fs " + "(%(rate).2f/sec, %(remaining)s remaining)"), + {'reconstructed': self.reconstruction_part_count, + 'total': self.part_count, 'percentage': - self.reconstruction_count * 100.0 / self.job_count, + self.reconstruction_part_count * 100.0 / self.part_count, + 'device': self.reconstruction_device_count, + 'dtotal': self.device_count, + 'dpercentage': + self.reconstruction_device_count * 100.0 / self.device_count, 'time': time.time() - self.start, 'rate': rate, - 'remaining': '%d%s' % compute_eta(self.start, - self.reconstruction_count, - self.job_count)}) - if self.suffix_count: + 'remaining': '%d%s' % + compute_eta(self.start, + self.reconstruction_part_count, + total_part_count)}) + + if self.suffix_count and self.partition_times: self.logger.info( _("%(checked)d suffixes checked - " "%(hashed).2f%% hashed, %(synced).2f%% synced"), @@ -781,16 +793,22 @@ class ObjectReconstructor(Daemon): self._diskfile_mgr = self._df_router[policy] self.load_object_ring(policy) data_dir = get_data_dir(policy) - local_devices = itertools.ifilter( + local_devices = list(itertools.ifilter( lambda dev: dev and is_local_device( ips, self.port, dev['replication_ip'], dev['replication_port']), - policy.object_ring.devs) + policy.object_ring.devs)) + + if override_devices: + self.device_count = len(override_devices) + else: + self.device_count = len(local_devices) for local_dev in local_devices: if override_devices and (local_dev['device'] not in override_devices): continue + self.reconstruction_device_count += 1 dev_path = self._df_router[policy].get_dev_path( local_dev['device']) if not dev_path: @@ -814,6 +832,8 @@ class ObjectReconstructor(Daemon): self.logger.exception( 'Unable to list partitions in %r' % obj_path) continue + + self.part_count += len(partitions) for partition in partitions: part_path = join(obj_path, partition) if not (partition.isdigit() and @@ -821,6 +841,7 @@ class ObjectReconstructor(Daemon): self.logger.warning( 'Unexpected entity in data dir: %r' % part_path) remove_file(part_path) + self.reconstruction_part_count += 1 continue partition = int(partition) if override_partitions and (partition not in @@ -833,6 +854,7 @@ class ObjectReconstructor(Daemon): 'part_path': part_path, } yield part_info + self.reconstruction_part_count += 1 def build_reconstruction_jobs(self, part_info): """ @@ -850,10 +872,14 @@ class ObjectReconstructor(Daemon): def _reset_stats(self): self.start = time.time() self.job_count = 0 + self.part_count = 0 + self.device_count = 0 self.suffix_count = 0 self.suffix_sync = 0 self.suffix_hash = 0 self.reconstruction_count = 0 + self.reconstruction_part_count = 0 + self.reconstruction_device_count = 0 self.last_reconstruction_count = -1 def delete_partition(self, path): diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index f753de00ce..d5c9fe9746 100755 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -24,7 +24,7 @@ import shutil import re import random import struct -from eventlet import Timeout +from eventlet import Timeout, sleep from contextlib import closing, nested, contextmanager from gzip import GzipFile @@ -599,10 +599,74 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self.assertFalse(jobs) # that should be all of them check_jobs(part_num) - def test_run_once(self): - with mocked_http_conn(*[200] * 12, body=pickle.dumps({})): + def _run_once(self, http_count, extra_devices, override_devices=None): + ring_devs = list(self.policy.object_ring.devs) + for device, parts in extra_devices.items(): + device_path = os.path.join(self.devices, device) + os.mkdir(device_path) + for part in range(parts): + os.makedirs(os.path.join(device_path, 'objects-1', str(part))) + # we update the ring to make is_local happy + devs = [dict(d) for d in ring_devs] + for d in devs: + d['device'] = device + self.policy.object_ring.devs.extend(devs) + self.reconstructor.stats_interval = 0 + self.process_job = lambda j: sleep(0) + with mocked_http_conn(*[200] * http_count, body=pickle.dumps({})): with mock_ssync_sender(): - self.reconstructor.run_once() + self.reconstructor.run_once(devices=override_devices) + + def test_run_once(self): + # sda1: 3 is done in setup + extra_devices = { + 'sdb1': 4, + 'sdc1': 1, + 'sdd1': 0, + } + self._run_once(18, extra_devices) + stats_lines = set() + for line in self.logger.get_lines_for_level('info'): + if 'devices reconstructed in' not in line: + continue + stat_line = line.split('of', 1)[0].strip() + stats_lines.add(stat_line) + acceptable = set([ + '0/3 (0.00%) partitions', + '8/8 (100.00%) partitions', + ]) + matched = stats_lines & acceptable + self.assertEqual(matched, acceptable, + 'missing some expected acceptable:\n%s' % ( + '\n'.join(sorted(acceptable - matched)))) + self.assertEqual(self.reconstructor.reconstruction_device_count, 4) + self.assertEqual(self.reconstructor.reconstruction_part_count, 8) + self.assertEqual(self.reconstructor.part_count, 8) + + def test_run_once_override_devices(self): + # sda1: 3 is done in setup + extra_devices = { + 'sdb1': 4, + 'sdc1': 1, + 'sdd1': 0, + } + self._run_once(2, extra_devices, 'sdc1') + stats_lines = set() + for line in self.logger.get_lines_for_level('info'): + if 'devices reconstructed in' not in line: + continue + stat_line = line.split('of', 1)[0].strip() + stats_lines.add(stat_line) + acceptable = set([ + '1/1 (100.00%) partitions', + ]) + matched = stats_lines & acceptable + self.assertEqual(matched, acceptable, + 'missing some expected acceptable:\n%s' % ( + '\n'.join(sorted(acceptable - matched)))) + self.assertEqual(self.reconstructor.reconstruction_device_count, 1) + self.assertEqual(self.reconstructor.reconstruction_part_count, 1) + self.assertEqual(self.reconstructor.part_count, 1) def test_get_response(self): part = self.part_nums[0] @@ -621,6 +685,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): def test_reconstructor_skips_bogus_partition_dirs(self): # A directory in the wrong place shouldn't crash the reconstructor + self.reconstructor._reset_stats() rmtree(self.objects_1) os.mkdir(self.objects_1) @@ -699,6 +764,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self.assertEqual(expected_partners, sorted(got_partners)) def test_collect_parts(self): + self.reconstructor._reset_stats() parts = [] for part_info in self.reconstructor.collect_parts(): parts.append(part_info['partition']) @@ -709,6 +775,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): def blowup_mkdirs(path): raise OSError('Ow!') + self.reconstructor._reset_stats() with mock.patch.object(object_reconstructor, 'mkdirs', blowup_mkdirs): rmtree(self.objects_1, ignore_errors=1) parts = [] @@ -734,6 +801,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): # since our collect_parts job is a generator, that yields directly # into build_jobs and then spawns it's safe to do the remove_files # without making reconstructor startup slow + self.reconstructor._reset_stats() for part_info in self.reconstructor.collect_parts(): self.assertNotEqual(pol_1_part_1_path, part_info['part_path']) self.assertFalse(os.path.exists(pol_1_part_1_path)) @@ -1033,6 +1101,7 @@ class TestObjectReconstructor(unittest.TestCase): self.reconstructor.job_count = 1 def tearDown(self): + self.reconstructor._reset_stats() self.reconstructor.stats_line() shutil.rmtree(self.testdir)