Have REPLICATE with suffixes just append to hashes.invalid
This only applies to post-sync REPLICATE calls, none of which actually look at the response anyway. Change-Id: I1de62140e7eb9a23152bb9fdb1fa0934e827bfda
This commit is contained in:
parent
6d988a4518
commit
5eaf15486e
swift/obj
test
@ -1509,13 +1509,15 @@ class BaseDiskFileManager(object):
|
|||||||
partition, account, container, obj,
|
partition, account, container, obj,
|
||||||
policy=policy, **kwargs)
|
policy=policy, **kwargs)
|
||||||
|
|
||||||
def get_hashes(self, device, partition, suffixes, policy):
|
def get_hashes(self, device, partition, suffixes, policy,
|
||||||
|
skip_rehash=False):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
:param device: name of target device
|
:param device: name of target device
|
||||||
:param partition: partition name
|
:param partition: partition name
|
||||||
:param suffixes: a list of suffix directories to be recalculated
|
:param suffixes: a list of suffix directories to be recalculated
|
||||||
:param policy: the StoragePolicy instance
|
:param policy: the StoragePolicy instance
|
||||||
|
:param skip_rehash: just mark the suffixes dirty; return None
|
||||||
:returns: a dictionary that maps suffix directories
|
:returns: a dictionary that maps suffix directories
|
||||||
"""
|
"""
|
||||||
dev_path = self.get_dev_path(device)
|
dev_path = self.get_dev_path(device)
|
||||||
@ -1524,8 +1526,14 @@ class BaseDiskFileManager(object):
|
|||||||
partition_path = get_part_path(dev_path, policy, partition)
|
partition_path = get_part_path(dev_path, policy, partition)
|
||||||
if not os.path.exists(partition_path):
|
if not os.path.exists(partition_path):
|
||||||
mkdirs(partition_path)
|
mkdirs(partition_path)
|
||||||
|
if skip_rehash:
|
||||||
|
for suffix in suffixes or []:
|
||||||
|
invalidate_hash(os.path.join(partition_path, suffix))
|
||||||
|
return None
|
||||||
|
else:
|
||||||
_junk, hashes = tpool.execute(
|
_junk, hashes = tpool.execute(
|
||||||
self._get_hashes, device, partition, policy, recalculate=suffixes)
|
self._get_hashes, device, partition, policy,
|
||||||
|
recalculate=suffixes)
|
||||||
return hashes
|
return hashes
|
||||||
|
|
||||||
def _listdir(self, path):
|
def _listdir(self, path):
|
||||||
|
@ -1298,7 +1298,8 @@ class ObjectController(BaseStorageServer):
|
|||||||
suffixes = suffix_parts.split('-') if suffix_parts else []
|
suffixes = suffix_parts.split('-') if suffix_parts else []
|
||||||
try:
|
try:
|
||||||
hashes = self._diskfile_router[policy].get_hashes(
|
hashes = self._diskfile_router[policy].get_hashes(
|
||||||
device, partition, suffixes, policy)
|
device, partition, suffixes, policy,
|
||||||
|
skip_rehash=bool(suffixes))
|
||||||
except DiskFileDeviceUnavailable:
|
except DiskFileDeviceUnavailable:
|
||||||
resp = HTTPInsufficientStorage(drive=device, request=request)
|
resp = HTTPInsufficientStorage(drive=device, request=request)
|
||||||
else:
|
else:
|
||||||
|
@ -149,6 +149,16 @@ class TestReplicatorFunctions(ReplProbeTest):
|
|||||||
|
|
||||||
for directory in test_node_dir_list:
|
for directory in test_node_dir_list:
|
||||||
self.assertIn(directory, new_dir_list[0])
|
self.assertIn(directory, new_dir_list[0])
|
||||||
|
|
||||||
|
# We want to make sure that replication is completely
|
||||||
|
# settled; any invalidated hashes should be rehashed so
|
||||||
|
# hashes.pkl is stable
|
||||||
|
for directory in os.listdir(
|
||||||
|
os.path.join(test_node, data_dir)):
|
||||||
|
hashes_invalid_path = os.path.join(
|
||||||
|
test_node, data_dir, directory, 'hashes.invalid')
|
||||||
|
self.assertEqual(os.stat(
|
||||||
|
hashes_invalid_path).st_size, 0)
|
||||||
break
|
break
|
||||||
except Exception:
|
except Exception:
|
||||||
if time.time() - begin > 60:
|
if time.time() - begin > 60:
|
||||||
|
@ -6931,6 +6931,14 @@ class TestSuffixHashes(unittest.TestCase):
|
|||||||
# so hashes should have the latest suffix hash...
|
# so hashes should have the latest suffix hash...
|
||||||
self.assertEqual(hashes[suffix], non_local['hash'])
|
self.assertEqual(hashes[suffix], non_local['hash'])
|
||||||
|
|
||||||
|
non_local['called'] = False
|
||||||
|
with mock.patch.object(df_mgr, '_hash_suffix', mock_hash_suffix):
|
||||||
|
df_mgr.get_hashes('sda1', '0', [suffix], policy,
|
||||||
|
skip_rehash=True)
|
||||||
|
self.assertFalse(non_local['called'])
|
||||||
|
with open(invalidations_file) as f:
|
||||||
|
self.assertEqual(suffix, f.read().strip('\n')) # sanity
|
||||||
|
|
||||||
def test_hash_invalidations_race_get_hashes_same_suffix_new(self):
|
def test_hash_invalidations_race_get_hashes_same_suffix_new(self):
|
||||||
self._check_hash_invalidations_race_get_hashes_same_suffix(False)
|
self._check_hash_invalidations_race_get_hashes_same_suffix(False)
|
||||||
|
|
||||||
|
@ -2011,7 +2011,8 @@ class TestObjectReplicator(unittest.TestCase):
|
|||||||
node['replication_port'], node['device'],
|
node['replication_port'], node['device'],
|
||||||
repl_job['partition'], 'REPLICATE',
|
repl_job['partition'], 'REPLICATE',
|
||||||
'', headers=self.headers))
|
'', headers=self.headers))
|
||||||
reqs.append(mock.call(node['replication_ip'],
|
reqs.append(mock.call(
|
||||||
|
node['replication_ip'],
|
||||||
node['replication_port'], node['device'],
|
node['replication_port'], node['device'],
|
||||||
repl_job['partition'], 'REPLICATE',
|
repl_job['partition'], 'REPLICATE',
|
||||||
'/a83', headers=self.headers))
|
'/a83', headers=self.headers))
|
||||||
|
@ -6960,7 +6960,7 @@ class TestObjectController(unittest.TestCase):
|
|||||||
try:
|
try:
|
||||||
diskfile.DiskFileManager._get_hashes = fake_get_hashes
|
diskfile.DiskFileManager._get_hashes = fake_get_hashes
|
||||||
tpool.execute = my_tpool_execute
|
tpool.execute = my_tpool_execute
|
||||||
req = Request.blank('/sda1/p/suff',
|
req = Request.blank('/sda1/p/',
|
||||||
environ={'REQUEST_METHOD': 'REPLICATE'},
|
environ={'REQUEST_METHOD': 'REPLICATE'},
|
||||||
headers={})
|
headers={})
|
||||||
resp = req.get_response(self.object_controller)
|
resp = req.get_response(self.object_controller)
|
||||||
@ -6984,7 +6984,7 @@ class TestObjectController(unittest.TestCase):
|
|||||||
try:
|
try:
|
||||||
diskfile.DiskFileManager._get_hashes = fake_get_hashes
|
diskfile.DiskFileManager._get_hashes = fake_get_hashes
|
||||||
tpool.execute = my_tpool_execute
|
tpool.execute = my_tpool_execute
|
||||||
req = Request.blank('/sda1/p/suff',
|
req = Request.blank('/sda1/p/',
|
||||||
environ={'REQUEST_METHOD': 'REPLICATE'},
|
environ={'REQUEST_METHOD': 'REPLICATE'},
|
||||||
headers={})
|
headers={})
|
||||||
with mock.patch('swift.obj.server.pickle.dumps') as fake_pickle:
|
with mock.patch('swift.obj.server.pickle.dumps') as fake_pickle:
|
||||||
@ -7012,7 +7012,7 @@ class TestObjectController(unittest.TestCase):
|
|||||||
try:
|
try:
|
||||||
diskfile.DiskFileManager._get_hashes = fake_get_hashes
|
diskfile.DiskFileManager._get_hashes = fake_get_hashes
|
||||||
tpool.execute = my_tpool_execute
|
tpool.execute = my_tpool_execute
|
||||||
req = Request.blank('/sda1/p/suff',
|
req = Request.blank('/sda1/p/',
|
||||||
environ={'REQUEST_METHOD': 'REPLICATE'},
|
environ={'REQUEST_METHOD': 'REPLICATE'},
|
||||||
headers={})
|
headers={})
|
||||||
self.assertRaises(Timeout, self.object_controller.REPLICATE, req)
|
self.assertRaises(Timeout, self.object_controller.REPLICATE, req)
|
||||||
@ -7059,15 +7059,28 @@ class TestObjectController(unittest.TestCase):
|
|||||||
# tombstone still exists
|
# tombstone still exists
|
||||||
self.assertTrue(os.path.exists(tombstone_file))
|
self.assertTrue(os.path.exists(tombstone_file))
|
||||||
|
|
||||||
# after reclaim REPLICATE will rehash
|
# after reclaim REPLICATE will mark invalid (but NOT rehash!)
|
||||||
replicate_request = Request.blank(
|
replicate_request = Request.blank(
|
||||||
'/sda1/0/%s' % suffix, method='REPLICATE',
|
'/sda1/0/%s' % suffix, method='REPLICATE',
|
||||||
headers={
|
headers={
|
||||||
'x-backend-storage-policy-index': int(policy),
|
'x-backend-storage-policy-index': int(policy),
|
||||||
})
|
})
|
||||||
the_future = time() + 200
|
with mock.patch('swift.obj.diskfile.time.time',
|
||||||
with mock.patch('swift.obj.diskfile.time.time') as mock_time:
|
return_value=time() + 200):
|
||||||
mock_time.return_value = the_future
|
resp = replicate_request.get_response(self.object_controller)
|
||||||
|
self.assertEqual(resp.status_int, 200)
|
||||||
|
self.assertEqual(None, pickle.loads(resp.body))
|
||||||
|
# no rehash means tombstone still exists...
|
||||||
|
self.assertTrue(os.path.exists(tombstone_file))
|
||||||
|
|
||||||
|
# but at some point (like the next pre-sync REPLICATE) it rehashes
|
||||||
|
replicate_request = Request.blank(
|
||||||
|
'/sda1/0/', method='REPLICATE',
|
||||||
|
headers={
|
||||||
|
'x-backend-storage-policy-index': int(policy),
|
||||||
|
})
|
||||||
|
with mock.patch('swift.obj.diskfile.time.time',
|
||||||
|
return_value=time() + 200):
|
||||||
resp = replicate_request.get_response(self.object_controller)
|
resp = replicate_request.get_response(self.object_controller)
|
||||||
self.assertEqual(resp.status_int, 200)
|
self.assertEqual(resp.status_int, 200)
|
||||||
self.assertEqual({}, pickle.loads(resp.body))
|
self.assertEqual({}, pickle.loads(resp.body))
|
||||||
@ -7076,8 +7089,8 @@ class TestObjectController(unittest.TestCase):
|
|||||||
|
|
||||||
# N.B. with a small reclaim age like this - if proxy clocks get far
|
# N.B. with a small reclaim age like this - if proxy clocks get far
|
||||||
# enough out of whack ...
|
# enough out of whack ...
|
||||||
with mock.patch('swift.obj.diskfile.time.time') as mock_time:
|
with mock.patch('swift.obj.diskfile.time.time',
|
||||||
mock_time.return_value = the_future
|
return_value=time() + 200):
|
||||||
resp = delete_request.get_response(self.object_controller)
|
resp = delete_request.get_response(self.object_controller)
|
||||||
# we won't even create the tombstone
|
# we won't even create the tombstone
|
||||||
self.assertFalse(os.path.exists(tombstone_file))
|
self.assertFalse(os.path.exists(tombstone_file))
|
||||||
|
Loading…
x
Reference in New Issue
Block a user