Merge "Add handoffs_first and handoff_delete to obj-repl"
This commit is contained in:
commit
5650bdb27a
@ -414,6 +414,19 @@ stats_interval 3600 Interval in seconds between logging
|
||||
replication statistics
|
||||
reclaim_age 604800 Time elapsed in seconds before an
|
||||
object can be reclaimed
|
||||
handoffs_first false If set to True, partitions that are
|
||||
not supposed to be on the node will be
|
||||
replicated first. The default setting
|
||||
should not be changed, except for
|
||||
extreme situations.
|
||||
handoff_delete auto By default handoff partitions will be
|
||||
removed when it has successfully
|
||||
replicated to all the cannonical nodes.
|
||||
If set to an integer n, it will remove
|
||||
the partition if it is successfully
|
||||
replicated to n nodes. The default
|
||||
setting should not be changed, except
|
||||
for extremem situations.
|
||||
================== ================= =======================================
|
||||
|
||||
[object-updater]
|
||||
|
@ -31,7 +31,7 @@ from swift.common.ring import Ring
|
||||
from swift.common.utils import whataremyips, unlink_older_than, \
|
||||
compute_eta, get_logger, dump_recon_cache, \
|
||||
rsync_ip, mkdirs, config_true_value, list_from_csv, get_hub, \
|
||||
tpool_reraise
|
||||
tpool_reraise, config_auto_int_value
|
||||
from swift.common.bufferedhttp import http_connect
|
||||
from swift.common.daemon import Daemon
|
||||
from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
|
||||
@ -83,6 +83,10 @@ class ObjectReplicator(Daemon):
|
||||
'user-agent': 'obj-replicator %s' % os.getpid()}
|
||||
self.rsync_error_log_line_length = \
|
||||
int(conf.get('rsync_error_log_line_length', 0))
|
||||
self.handoffs_first = config_true_value(conf.get('handoffs_first',
|
||||
False))
|
||||
self.handoff_delete = config_auto_int_value(
|
||||
conf.get('handoff_delete', 'auto'), 0)
|
||||
|
||||
def _rsync(self, args):
|
||||
"""
|
||||
@ -212,8 +216,15 @@ class ObjectReplicator(Daemon):
|
||||
'/' + '-'.join(suffixes), headers=self.headers)
|
||||
conn.getresponse().read()
|
||||
responses.append(success)
|
||||
if not suffixes or (len(responses) ==
|
||||
len(job['nodes']) and all(responses)):
|
||||
if self.handoff_delete:
|
||||
# delete handoff if we have had handoff_delete successes
|
||||
delete_handoff = len([resp for resp in responses if resp]) >= \
|
||||
self.handoff_delete
|
||||
else:
|
||||
# delete handoff if all syncs were successful
|
||||
delete_handoff = len(responses) == len(job['nodes']) and \
|
||||
all(responses)
|
||||
if not suffixes or delete_handoff:
|
||||
self.logger.info(_("Removing partition: %s"), job['path'])
|
||||
tpool.execute(shutil.rmtree, job['path'], ignore_errors=True)
|
||||
except (Exception, Timeout):
|
||||
@ -412,6 +423,9 @@ class ObjectReplicator(Daemon):
|
||||
except (ValueError, OSError):
|
||||
continue
|
||||
random.shuffle(jobs)
|
||||
if self.handoffs_first:
|
||||
# Move the handoff parts to the front of the list
|
||||
jobs.sort(key=lambda job: not job['delete'])
|
||||
self.job_count = len(jobs)
|
||||
return jobs
|
||||
|
||||
|
@ -236,7 +236,7 @@ class TestObjectReplicator(unittest.TestCase):
|
||||
for job in jobs:
|
||||
jobs_by_part[job['partition']] = job
|
||||
self.assertEquals(len(jobs_to_delete), 1)
|
||||
self.assertTrue('1', jobs_to_delete[0]['partition'])
|
||||
self.assertEquals('1', jobs_to_delete[0]['partition'])
|
||||
self.assertEquals(
|
||||
[node['id'] for node in jobs_by_part['0']['nodes']], [1, 2])
|
||||
self.assertEquals(
|
||||
@ -251,6 +251,12 @@ class TestObjectReplicator(unittest.TestCase):
|
||||
self.assertEquals(jobs_by_part[part]['path'],
|
||||
os.path.join(self.objects, part))
|
||||
|
||||
def test_collect_jobs_handoffs_first(self):
|
||||
self.replicator.handoffs_first = True
|
||||
jobs = self.replicator.collect_jobs()
|
||||
self.assertTrue(jobs[0]['delete'])
|
||||
self.assertEquals('1', jobs[0]['partition'])
|
||||
|
||||
def test_collect_jobs_removes_zbf(self):
|
||||
"""
|
||||
After running xfs_repair, a partition directory could become a
|
||||
@ -292,13 +298,137 @@ class TestObjectReplicator(unittest.TestCase):
|
||||
with mock.patch('swift.obj.replicator.http_connect',
|
||||
mock_http_connect(200)):
|
||||
df = diskfile.DiskFile(self.devices,
|
||||
'sda', '0', 'a', 'c', 'o', FakeLogger())
|
||||
'sda', '1', 'a', 'c', 'o', FakeLogger())
|
||||
mkdirs(df.datadir)
|
||||
print df.datadir
|
||||
f = open(os.path.join(df.datadir,
|
||||
normalize_timestamp(time.time()) + '.data'),
|
||||
'wb')
|
||||
f.write('1234567890')
|
||||
f.close()
|
||||
ohash = hash_path('a', 'c', 'o')
|
||||
data_dir = ohash[-3:]
|
||||
whole_path_from = os.path.join(self.objects, '1', data_dir)
|
||||
part_path = os.path.join(self.objects, '1')
|
||||
self.assertTrue(os.access(part_path, os.F_OK))
|
||||
self.replicator.replicate()
|
||||
nodes = [node for node in
|
||||
self.ring.get_part_nodes(1)
|
||||
if node['ip'] not in _ips()]
|
||||
process_arg_checker = []
|
||||
for node in nodes:
|
||||
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], 1)
|
||||
process_arg_checker.append(
|
||||
(0, '', ['rsync', whole_path_from, rsync_mod]))
|
||||
with _mock_process(process_arg_checker):
|
||||
self.replicator.replicate()
|
||||
self.assertFalse(os.access(part_path, os.F_OK))
|
||||
|
||||
def test_delete_partition_with_failures(self):
|
||||
with mock.patch('swift.obj.replicator.http_connect',
|
||||
mock_http_connect(200)):
|
||||
df = diskfile.DiskFile(self.devices,
|
||||
'sda', '1', 'a', 'c', 'o', FakeLogger())
|
||||
mkdirs(df.datadir)
|
||||
print df.datadir
|
||||
f = open(os.path.join(df.datadir,
|
||||
normalize_timestamp(time.time()) + '.data'),
|
||||
'wb')
|
||||
f.write('1234567890')
|
||||
f.close()
|
||||
ohash = hash_path('a', 'c', 'o')
|
||||
data_dir = ohash[-3:]
|
||||
whole_path_from = os.path.join(self.objects, '1', data_dir)
|
||||
part_path = os.path.join(self.objects, '1')
|
||||
self.assertTrue(os.access(part_path, os.F_OK))
|
||||
nodes = [node for node in
|
||||
self.ring.get_part_nodes(1)
|
||||
if node['ip'] not in _ips()]
|
||||
process_arg_checker = []
|
||||
for i, node in enumerate(nodes):
|
||||
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], 1)
|
||||
if i == 0:
|
||||
# force one of the rsync calls to fail
|
||||
ret_code = 1
|
||||
else:
|
||||
ret_code = 0
|
||||
process_arg_checker.append(
|
||||
(ret_code, '', ['rsync', whole_path_from, rsync_mod]))
|
||||
with _mock_process(process_arg_checker):
|
||||
self.replicator.replicate()
|
||||
# The path should still exist
|
||||
self.assertTrue(os.access(part_path, os.F_OK))
|
||||
|
||||
def test_delete_partition_with_handoff_delete(self):
|
||||
with mock.patch('swift.obj.replicator.http_connect',
|
||||
mock_http_connect(200)):
|
||||
self.replicator.handoff_delete = 2
|
||||
df = diskfile.DiskFile(self.devices,
|
||||
'sda', '1', 'a', 'c', 'o', FakeLogger())
|
||||
mkdirs(df.datadir)
|
||||
print df.datadir
|
||||
f = open(os.path.join(df.datadir,
|
||||
normalize_timestamp(time.time()) + '.data'),
|
||||
'wb')
|
||||
f.write('1234567890')
|
||||
f.close()
|
||||
ohash = hash_path('a', 'c', 'o')
|
||||
data_dir = ohash[-3:]
|
||||
whole_path_from = os.path.join(self.objects, '1', data_dir)
|
||||
part_path = os.path.join(self.objects, '1')
|
||||
self.assertTrue(os.access(part_path, os.F_OK))
|
||||
nodes = [node for node in
|
||||
self.ring.get_part_nodes(1)
|
||||
if node['ip'] not in _ips()]
|
||||
process_arg_checker = []
|
||||
for i, node in enumerate(nodes):
|
||||
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], 1)
|
||||
if i == 0:
|
||||
# force one of the rsync calls to fail
|
||||
ret_code = 1
|
||||
else:
|
||||
ret_code = 0
|
||||
process_arg_checker.append(
|
||||
(ret_code, '', ['rsync', whole_path_from, rsync_mod]))
|
||||
with _mock_process(process_arg_checker):
|
||||
self.replicator.replicate()
|
||||
self.assertFalse(os.access(part_path, os.F_OK))
|
||||
|
||||
def test_delete_partition_with_handoff_delete_failures(self):
|
||||
with mock.patch('swift.obj.replicator.http_connect',
|
||||
mock_http_connect(200)):
|
||||
self.replicator.handoff_delete = 2
|
||||
df = diskfile.DiskFile(self.devices,
|
||||
'sda', '1', 'a', 'c', 'o', FakeLogger())
|
||||
mkdirs(df.datadir)
|
||||
print df.datadir
|
||||
f = open(os.path.join(df.datadir,
|
||||
normalize_timestamp(time.time()) + '.data'),
|
||||
'wb')
|
||||
f.write('1234567890')
|
||||
f.close()
|
||||
ohash = hash_path('a', 'c', 'o')
|
||||
data_dir = ohash[-3:]
|
||||
whole_path_from = os.path.join(self.objects, '1', data_dir)
|
||||
part_path = os.path.join(self.objects, '1')
|
||||
self.assertTrue(os.access(part_path, os.F_OK))
|
||||
nodes = [node for node in
|
||||
self.ring.get_part_nodes(1)
|
||||
if node['ip'] not in _ips()]
|
||||
process_arg_checker = []
|
||||
for i, node in enumerate(nodes):
|
||||
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], 1)
|
||||
if i in (0, 1):
|
||||
# force two of the rsync calls to fail
|
||||
ret_code = 1
|
||||
else:
|
||||
ret_code = 0
|
||||
process_arg_checker.append(
|
||||
(ret_code, '', ['rsync', whole_path_from, rsync_mod]))
|
||||
with _mock_process(process_arg_checker):
|
||||
self.replicator.replicate()
|
||||
# The file should still exist
|
||||
self.assertTrue(os.access(part_path, os.F_OK))
|
||||
|
||||
def test_delete_partition_override_params(self):
|
||||
df = diskfile.DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o',
|
||||
FakeLogger())
|
||||
|
Loading…
x
Reference in New Issue
Block a user