diff --git a/doc/source/deployment_guide.rst b/doc/source/deployment_guide.rst index 8e2564e8a8..ed744f7f23 100644 --- a/doc/source/deployment_guide.rst +++ b/doc/source/deployment_guide.rst @@ -414,6 +414,19 @@ stats_interval 3600 Interval in seconds between logging replication statistics reclaim_age 604800 Time elapsed in seconds before an object can be reclaimed +handoffs_first false If set to True, partitions that are + not supposed to be on the node will be + replicated first. The default setting + should not be changed, except for + extreme situations. +handoff_delete auto By default handoff partitions will be + removed when it has successfully + replicated to all the cannonical nodes. + If set to an integer n, it will remove + the partition if it is successfully + replicated to n nodes. The default + setting should not be changed, except + for extremem situations. ================== ================= ======================================= [object-updater] diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index 61c3ddc74d..e7639d35e7 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -31,7 +31,7 @@ from swift.common.ring import Ring from swift.common.utils import whataremyips, unlink_older_than, \ compute_eta, get_logger, dump_recon_cache, \ rsync_ip, mkdirs, config_true_value, list_from_csv, get_hub, \ - tpool_reraise + tpool_reraise, config_auto_int_value from swift.common.bufferedhttp import http_connect from swift.common.daemon import Daemon from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE @@ -83,6 +83,10 @@ class ObjectReplicator(Daemon): 'user-agent': 'obj-replicator %s' % os.getpid()} self.rsync_error_log_line_length = \ int(conf.get('rsync_error_log_line_length', 0)) + self.handoffs_first = config_true_value(conf.get('handoffs_first', + False)) + self.handoff_delete = config_auto_int_value( + conf.get('handoff_delete', 'auto'), 0) def _rsync(self, args): """ @@ -212,8 +216,15 @@ class ObjectReplicator(Daemon): '/' + '-'.join(suffixes), headers=self.headers) conn.getresponse().read() responses.append(success) - if not suffixes or (len(responses) == - len(job['nodes']) and all(responses)): + if self.handoff_delete: + # delete handoff if we have had handoff_delete successes + delete_handoff = len([resp for resp in responses if resp]) >= \ + self.handoff_delete + else: + # delete handoff if all syncs were successful + delete_handoff = len(responses) == len(job['nodes']) and \ + all(responses) + if not suffixes or delete_handoff: self.logger.info(_("Removing partition: %s"), job['path']) tpool.execute(shutil.rmtree, job['path'], ignore_errors=True) except (Exception, Timeout): @@ -412,6 +423,9 @@ class ObjectReplicator(Daemon): except (ValueError, OSError): continue random.shuffle(jobs) + if self.handoffs_first: + # Move the handoff parts to the front of the list + jobs.sort(key=lambda job: not job['delete']) self.job_count = len(jobs) return jobs diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py index 7d56c47dc7..c8805494f5 100644 --- a/test/unit/obj/test_replicator.py +++ b/test/unit/obj/test_replicator.py @@ -236,7 +236,7 @@ class TestObjectReplicator(unittest.TestCase): for job in jobs: jobs_by_part[job['partition']] = job self.assertEquals(len(jobs_to_delete), 1) - self.assertTrue('1', jobs_to_delete[0]['partition']) + self.assertEquals('1', jobs_to_delete[0]['partition']) self.assertEquals( [node['id'] for node in jobs_by_part['0']['nodes']], [1, 2]) self.assertEquals( @@ -251,6 +251,12 @@ class TestObjectReplicator(unittest.TestCase): self.assertEquals(jobs_by_part[part]['path'], os.path.join(self.objects, part)) + def test_collect_jobs_handoffs_first(self): + self.replicator.handoffs_first = True + jobs = self.replicator.collect_jobs() + self.assertTrue(jobs[0]['delete']) + self.assertEquals('1', jobs[0]['partition']) + def test_collect_jobs_removes_zbf(self): """ After running xfs_repair, a partition directory could become a @@ -292,13 +298,137 @@ class TestObjectReplicator(unittest.TestCase): with mock.patch('swift.obj.replicator.http_connect', mock_http_connect(200)): df = diskfile.DiskFile(self.devices, - 'sda', '0', 'a', 'c', 'o', FakeLogger()) + 'sda', '1', 'a', 'c', 'o', FakeLogger()) mkdirs(df.datadir) + print df.datadir + f = open(os.path.join(df.datadir, + normalize_timestamp(time.time()) + '.data'), + 'wb') + f.write('1234567890') + f.close() + ohash = hash_path('a', 'c', 'o') + data_dir = ohash[-3:] + whole_path_from = os.path.join(self.objects, '1', data_dir) part_path = os.path.join(self.objects, '1') self.assertTrue(os.access(part_path, os.F_OK)) - self.replicator.replicate() + nodes = [node for node in + self.ring.get_part_nodes(1) + if node['ip'] not in _ips()] + process_arg_checker = [] + for node in nodes: + rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], 1) + process_arg_checker.append( + (0, '', ['rsync', whole_path_from, rsync_mod])) + with _mock_process(process_arg_checker): + self.replicator.replicate() self.assertFalse(os.access(part_path, os.F_OK)) + def test_delete_partition_with_failures(self): + with mock.patch('swift.obj.replicator.http_connect', + mock_http_connect(200)): + df = diskfile.DiskFile(self.devices, + 'sda', '1', 'a', 'c', 'o', FakeLogger()) + mkdirs(df.datadir) + print df.datadir + f = open(os.path.join(df.datadir, + normalize_timestamp(time.time()) + '.data'), + 'wb') + f.write('1234567890') + f.close() + ohash = hash_path('a', 'c', 'o') + data_dir = ohash[-3:] + whole_path_from = os.path.join(self.objects, '1', data_dir) + part_path = os.path.join(self.objects, '1') + self.assertTrue(os.access(part_path, os.F_OK)) + nodes = [node for node in + self.ring.get_part_nodes(1) + if node['ip'] not in _ips()] + process_arg_checker = [] + for i, node in enumerate(nodes): + rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], 1) + if i == 0: + # force one of the rsync calls to fail + ret_code = 1 + else: + ret_code = 0 + process_arg_checker.append( + (ret_code, '', ['rsync', whole_path_from, rsync_mod])) + with _mock_process(process_arg_checker): + self.replicator.replicate() + # The path should still exist + self.assertTrue(os.access(part_path, os.F_OK)) + + def test_delete_partition_with_handoff_delete(self): + with mock.patch('swift.obj.replicator.http_connect', + mock_http_connect(200)): + self.replicator.handoff_delete = 2 + df = diskfile.DiskFile(self.devices, + 'sda', '1', 'a', 'c', 'o', FakeLogger()) + mkdirs(df.datadir) + print df.datadir + f = open(os.path.join(df.datadir, + normalize_timestamp(time.time()) + '.data'), + 'wb') + f.write('1234567890') + f.close() + ohash = hash_path('a', 'c', 'o') + data_dir = ohash[-3:] + whole_path_from = os.path.join(self.objects, '1', data_dir) + part_path = os.path.join(self.objects, '1') + self.assertTrue(os.access(part_path, os.F_OK)) + nodes = [node for node in + self.ring.get_part_nodes(1) + if node['ip'] not in _ips()] + process_arg_checker = [] + for i, node in enumerate(nodes): + rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], 1) + if i == 0: + # force one of the rsync calls to fail + ret_code = 1 + else: + ret_code = 0 + process_arg_checker.append( + (ret_code, '', ['rsync', whole_path_from, rsync_mod])) + with _mock_process(process_arg_checker): + self.replicator.replicate() + self.assertFalse(os.access(part_path, os.F_OK)) + + def test_delete_partition_with_handoff_delete_failures(self): + with mock.patch('swift.obj.replicator.http_connect', + mock_http_connect(200)): + self.replicator.handoff_delete = 2 + df = diskfile.DiskFile(self.devices, + 'sda', '1', 'a', 'c', 'o', FakeLogger()) + mkdirs(df.datadir) + print df.datadir + f = open(os.path.join(df.datadir, + normalize_timestamp(time.time()) + '.data'), + 'wb') + f.write('1234567890') + f.close() + ohash = hash_path('a', 'c', 'o') + data_dir = ohash[-3:] + whole_path_from = os.path.join(self.objects, '1', data_dir) + part_path = os.path.join(self.objects, '1') + self.assertTrue(os.access(part_path, os.F_OK)) + nodes = [node for node in + self.ring.get_part_nodes(1) + if node['ip'] not in _ips()] + process_arg_checker = [] + for i, node in enumerate(nodes): + rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], 1) + if i in (0, 1): + # force two of the rsync calls to fail + ret_code = 1 + else: + ret_code = 0 + process_arg_checker.append( + (ret_code, '', ['rsync', whole_path_from, rsync_mod])) + with _mock_process(process_arg_checker): + self.replicator.replicate() + # The file should still exist + self.assertTrue(os.access(part_path, os.F_OK)) + def test_delete_partition_override_params(self): df = diskfile.DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())