SSYNC: stop sharing global available_map/send_map

Change-Id: Iaba8abb81dec792ee92e3715ecc459b57755fcae
This commit is contained in:
Romain LE DISEZ 2018-10-22 11:55:25 +02:00
parent 6b94cf204a
commit 2d1c438191
2 changed files with 68 additions and 84 deletions

View File

@ -139,16 +139,9 @@ class Sender(object):
self.node = node
self.job = job
self.suffixes = suffixes
# available_map has an entry for each object in given suffixes that
# is available to be sync'd; each entry is a hash => dict of timestamps
# of data file or tombstone file and/or meta file
self.available_map = {}
# When remote_check_objs is given in job, ssync_sender trys only to
# make sure those objects exist or not in remote.
self.remote_check_objs = remote_check_objs
# send_map has an entry for each object that the receiver wants to
# be sync'ed; each entry maps an object hash => dict of wanted parts
self.send_map = {}
def __call__(self):
"""
@ -172,17 +165,25 @@ class Sender(object):
# abort the replication attempt and log a simple error. All
# other exceptions will be logged with a full stack trace.
connection, response = self.connect()
self.missing_check(connection, response)
# available_map has an entry for each object in given suffixes
# that is available to be sync'd;
# each entry is a hash => dict of timestamps of data file or
# tombstone file and/or meta file
# send_map has an entry for each object that the receiver wants
# to be sync'ed;
# each entry maps an object hash => dict of wanted parts
available_map, send_map = self.missing_check(connection,
response)
if self.remote_check_objs is None:
self.updates(connection, response)
can_delete_obj = self.available_map
self.updates(connection, response, send_map)
can_delete_obj = available_map
else:
# when we are initialized with remote_check_objs we don't
# *send* any requested updates; instead we only collect
# what's already in sync and safe for deletion
in_sync_hashes = (set(self.available_map.keys()) -
set(self.send_map.keys()))
can_delete_obj = dict((hash_, self.available_map[hash_])
in_sync_hashes = (set(available_map.keys()) -
set(send_map.keys()))
can_delete_obj = dict((hash_, available_map[hash_])
for hash_ in in_sync_hashes)
return True, can_delete_obj
except (exceptions.MessageTimeout,
@ -264,6 +265,8 @@ class Sender(object):
Full documentation of this can be found at
:py:meth:`.Receiver.missing_check`.
"""
available_map = {}
send_map = {}
# First, send our list.
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'missing_check start'):
@ -279,7 +282,7 @@ class Sender(object):
objhash_timestamps[0] in
self.remote_check_objs, hash_gen)
for object_hash, timestamps in hash_gen:
self.available_map[object_hash] = timestamps
available_map[object_hash] = timestamps
with exceptions.MessageTimeout(
self.daemon.node_timeout,
'missing_check send line'):
@ -313,9 +316,10 @@ class Sender(object):
break
parts = line.split()
if parts:
self.send_map[parts[0]] = decode_wanted(parts[1:])
send_map[parts[0]] = decode_wanted(parts[1:])
return available_map, send_map
def updates(self, connection, response):
def updates(self, connection, response, send_map):
"""
Handles the sender-side of the UPDATES step of an SSYNC
request.
@ -328,7 +332,7 @@ class Sender(object):
self.daemon.node_timeout, 'updates start'):
msg = ':UPDATES: START\r\n'
connection.send('%x\r\n%s\r\n' % (len(msg), msg))
for object_hash, want in self.send_map.items():
for object_hash, want in send_map.items():
object_hash = urllib.parse.unquote(object_hash)
try:
df = self.df_mgr.get_diskfile_from_hash(

View File

@ -169,7 +169,7 @@ class TestSender(BaseTest):
self.sender.suffixes = ['abc']
self.sender.connect = mock.MagicMock(return_value=(connection,
response))
self.sender.missing_check = mock.MagicMock()
self.sender.missing_check = mock.MagicMock(return_value=({}, {}))
self.sender.updates = mock.MagicMock()
self.sender.disconnect = mock.MagicMock()
success, candidates = self.sender()
@ -177,7 +177,7 @@ class TestSender(BaseTest):
self.assertEqual(candidates, {})
self.sender.connect.assert_called_once_with()
self.sender.missing_check.assert_called_once_with(connection, response)
self.sender.updates.assert_called_once_with(connection, response)
self.sender.updates.assert_called_once_with(connection, response, {})
self.sender.disconnect.assert_called_once_with(connection)
def test_connect(self):
@ -352,12 +352,14 @@ class TestSender(BaseTest):
expected_calls))
def test_call(self):
def patch_sender(sender):
def patch_sender(sender, available_map, send_map):
connection = FakeConnection()
response = FakeResponse()
sender.connect = mock.MagicMock(return_value=(connection,
response))
sender.missing_check = mock.MagicMock()
sender.missing_check = mock.MagicMock(return_value=(available_map,
send_map))
sender.updates = mock.MagicMock()
sender.disconnect = mock.MagicMock()
@ -379,8 +381,7 @@ class TestSender(BaseTest):
# no suffixes -> no work done
sender = ssync_sender.Sender(
self.daemon, node, job, [], remote_check_objs=None)
patch_sender(sender)
sender.available_map = available_map
patch_sender(sender, available_map, {})
success, candidates = sender()
self.assertTrue(success)
self.assertEqual({}, candidates)
@ -388,8 +389,7 @@ class TestSender(BaseTest):
# all objs in sync
sender = ssync_sender.Sender(
self.daemon, node, job, ['ignored'], remote_check_objs=None)
patch_sender(sender)
sender.available_map = available_map
patch_sender(sender, available_map, {})
success, candidates = sender()
self.assertTrue(success)
self.assertEqual(available_map, candidates)
@ -399,9 +399,7 @@ class TestSender(BaseTest):
sender = ssync_sender.Sender(
self.daemon, node, job, ['ignored'],
remote_check_objs=None)
patch_sender(sender)
sender.send_map = {wanted: []}
sender.available_map = available_map
patch_sender(sender, available_map, {wanted: []})
success, candidates = sender()
self.assertTrue(success)
self.assertEqual(available_map, candidates)
@ -413,9 +411,7 @@ class TestSender(BaseTest):
sender = ssync_sender.Sender(
self.daemon, node, job, ['ignored'],
remote_check_objs=remote_check_objs)
patch_sender(sender)
sender.send_map = {wanted: []}
sender.available_map = available_map
patch_sender(sender, available_map, {wanted: []})
success, candidates = sender()
self.assertTrue(success)
expected_map = dict([('9d41d8cd98f00b204e9800998ecf0abc',
@ -736,13 +732,14 @@ class TestSender(BaseTest):
':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.missing_check(connection, response)
available_map, send_map = self.sender.missing_check(connection,
response)
self.assertEqual(
''.join(connection.sent),
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.send_map, {})
self.assertEqual(self.sender.available_map, {})
self.assertEqual(send_map, {})
self.assertEqual(available_map, {})
def test_missing_check_has_suffixes(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@ -778,7 +775,8 @@ class TestSender(BaseTest):
':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.missing_check(connection, response)
available_map, send_map = self.sender.missing_check(connection,
response)
self.assertEqual(
''.join(connection.sent),
'17\r\n:MISSING_CHECK: START\r\n\r\n'
@ -788,7 +786,7 @@ class TestSender(BaseTest):
'3f\r\n9d41d8cd98f00b204e9800998ecf1def 1380144474.44444 '
'm:186a0,t:4\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.send_map, {})
self.assertEqual(send_map, {})
candidates = [('9d41d8cd98f00b204e9800998ecf0abc',
dict(ts_data=Timestamp(1380144470.00000))),
('9d41d8cd98f00b204e9800998ecf0def',
@ -798,7 +796,7 @@ class TestSender(BaseTest):
dict(ts_data=Timestamp(1380144474.44444),
ts_meta=Timestamp(1380144475.44444),
ts_ctype=Timestamp(1380144474.44448)))]
self.assertEqual(self.sender.available_map, dict(candidates))
self.assertEqual(available_map, dict(candidates))
def test_missing_check_far_end_disconnect(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@ -833,9 +831,6 @@ class TestSender(BaseTest):
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.available_map,
dict([('9d41d8cd98f00b204e9800998ecf0abc',
dict(ts_data=Timestamp(1380144470.00000)))]))
def test_missing_check_far_end_disconnect2(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@ -871,9 +866,6 @@ class TestSender(BaseTest):
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.available_map,
dict([('9d41d8cd98f00b204e9800998ecf0abc',
{'ts_data': Timestamp(1380144470.00000)})]))
def test_missing_check_far_end_unexpected(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@ -908,9 +900,6 @@ class TestSender(BaseTest):
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.available_map,
dict([('9d41d8cd98f00b204e9800998ecf0abc',
{'ts_data': Timestamp(1380144470.00000)})]))
def test_missing_check_send_map(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@ -938,15 +927,15 @@ class TestSender(BaseTest):
'0123abc dm\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.missing_check(connection, response)
available_map, send_map = self.sender.missing_check(connection,
response)
self.assertEqual(
''.join(connection.sent),
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(
self.sender.send_map, {'0123abc': {'data': True, 'meta': True}})
self.assertEqual(self.sender.available_map,
self.assertEqual(send_map, {'0123abc': {'data': True, 'meta': True}})
self.assertEqual(available_map,
dict([('9d41d8cd98f00b204e9800998ecf0abc',
{'ts_data': Timestamp(1380144470.00000)})]))
@ -978,10 +967,10 @@ class TestSender(BaseTest):
'0123abc d extra response parts\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.missing_check(connection, response)
self.assertEqual(self.sender.send_map,
{'0123abc': {'data': True}})
self.assertEqual(self.sender.available_map,
available_map, send_map = self.sender.missing_check(connection,
response)
self.assertEqual(send_map, {'0123abc': {'data': True}})
self.assertEqual(available_map,
dict([('9d41d8cd98f00b204e9800998ecf0abc',
{'ts_data': Timestamp(1380144470.00000)})]))
@ -991,16 +980,15 @@ class TestSender(BaseTest):
response = FakeResponse()
self.sender.daemon.node_timeout = 0.01
self.assertRaises(exceptions.MessageTimeout, self.sender.updates,
connection, response)
connection, response, {})
def test_updates_empty_send_map(self):
connection = FakeConnection()
self.sender.send_map = {}
response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates(connection, response)
self.sender.updates(connection, response, {})
self.assertEqual(
''.join(connection.sent),
'11\r\n:UPDATES: START\r\n\r\n'
@ -1008,7 +996,6 @@ class TestSender(BaseTest):
def test_updates_unexpected_response_lines1(self):
connection = FakeConnection()
self.sender.send_map = {}
response = FakeResponse(
chunk_body=(
'abc\r\n'
@ -1016,7 +1003,7 @@ class TestSender(BaseTest):
':UPDATES: END\r\n'))
exc = None
try:
self.sender.updates(connection, response)
self.sender.updates(connection, response, {})
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), "Unexpected response: 'abc'")
@ -1027,7 +1014,6 @@ class TestSender(BaseTest):
def test_updates_unexpected_response_lines2(self):
connection = FakeConnection()
self.sender.send_map = {}
response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
@ -1035,7 +1021,7 @@ class TestSender(BaseTest):
':UPDATES: END\r\n'))
exc = None
try:
self.sender.updates(connection, response)
self.sender.updates(connection, response, {})
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), "Unexpected response: 'abc'")
@ -1060,14 +1046,14 @@ class TestSender(BaseTest):
'frag_index': 0,
}
self.sender.node = {}
self.sender.send_map = {object_hash: {'data': True}}
send_map = {object_hash: {'data': True}}
self.sender.send_delete = mock.MagicMock()
self.sender.send_put = mock.MagicMock()
response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates(connection, response)
self.sender.updates(connection, response, send_map)
self.sender.send_delete.assert_called_once_with(
connection, '/a/c/o', delete_timestamp)
self.assertEqual(self.sender.send_put.mock_calls, [])
@ -1094,12 +1080,12 @@ class TestSender(BaseTest):
'frag_index': 0,
}
self.sender.node = {}
self.sender.send_map = {object_hash: {'data': True}}
send_map = {object_hash: {'data': True}}
response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates(connection, response)
self.sender.updates(connection, response, send_map)
self.assertEqual(
''.join(connection.sent),
'11\r\n:UPDATES: START\r\n\r\n'
@ -1134,7 +1120,7 @@ class TestSender(BaseTest):
}
self.sender.node = {}
# receiver requested data only
self.sender.send_map = {object_hash: {'data': True}}
send_map = {object_hash: {'data': True}}
self.sender.send_delete = mock.MagicMock()
self.sender.send_put = mock.MagicMock()
self.sender.send_post = mock.MagicMock()
@ -1142,7 +1128,7 @@ class TestSender(BaseTest):
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates(connection, response)
self.sender.updates(connection, response, send_map)
self.assertEqual(self.sender.send_delete.mock_calls, [])
self.assertEqual(self.sender.send_post.mock_calls, [])
self.assertEqual(1, len(self.sender.send_put.mock_calls))
@ -1181,7 +1167,7 @@ class TestSender(BaseTest):
}
self.sender.node = {}
# receiver requested only meta
self.sender.send_map = {object_hash: {'meta': True}}
send_map = {object_hash: {'meta': True}}
self.sender.send_delete = mock.MagicMock()
self.sender.send_put = mock.MagicMock()
self.sender.send_post = mock.MagicMock()
@ -1189,7 +1175,7 @@ class TestSender(BaseTest):
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates(connection, response)
self.sender.updates(connection, response, send_map)
self.assertEqual(self.sender.send_delete.mock_calls, [])
self.assertEqual(self.sender.send_put.mock_calls, [])
self.assertEqual(1, len(self.sender.send_post.mock_calls))
@ -1228,7 +1214,7 @@ class TestSender(BaseTest):
}
self.sender.node = {}
# receiver requested data and meta
self.sender.send_map = {object_hash: {'meta': True, 'data': True}}
send_map = {object_hash: {'meta': True, 'data': True}}
self.sender.send_delete = mock.MagicMock()
self.sender.send_put = mock.MagicMock()
self.sender.send_post = mock.MagicMock()
@ -1236,7 +1222,7 @@ class TestSender(BaseTest):
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates(connection, response)
self.sender.updates(connection, response, send_map)
self.assertEqual(self.sender.send_delete.mock_calls, [])
self.assertEqual(1, len(self.sender.send_put.mock_calls))
self.assertEqual(1, len(self.sender.send_post.mock_calls))
@ -1272,14 +1258,14 @@ class TestSender(BaseTest):
'policy': POLICIES[0],
'frag_index': 0}
self.sender.node = {}
self.sender.send_map = {object_hash: {'data': True}}
send_map = {object_hash: {'data': True}}
self.sender.send_delete = mock.MagicMock()
self.sender.send_put = mock.MagicMock()
response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates(connection, response)
self.sender.updates(connection, response, send_map)
args, _kwargs = self.sender.send_put.call_args
connection, path, df = args
self.assertEqual(path, '/a/c/o')
@ -1291,7 +1277,6 @@ class TestSender(BaseTest):
def test_updates_read_response_timeout_start(self):
connection = FakeConnection()
self.sender.send_map = {}
response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
@ -1305,15 +1290,14 @@ class TestSender(BaseTest):
response.readline = delayed_readline
self.sender.daemon.http_timeout = 0.01
self.assertRaises(exceptions.MessageTimeout, self.sender.updates,
connection, response)
connection, response, {})
def test_updates_read_response_disconnect_start(self):
connection = FakeConnection()
self.sender.send_map = {}
response = FakeResponse(chunk_body='\r\n')
exc = None
try:
self.sender.updates(connection, response)
self.sender.updates(connection, response, {})
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), 'Early disconnect')
@ -1324,7 +1308,6 @@ class TestSender(BaseTest):
def test_updates_read_response_unexp_start(self):
connection = FakeConnection()
self.sender.send_map = {}
response = FakeResponse(
chunk_body=(
'anything else\r\n'
@ -1332,7 +1315,7 @@ class TestSender(BaseTest):
':UPDATES: END\r\n'))
exc = None
try:
self.sender.updates(connection, response)
self.sender.updates(connection, response, {})
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), "Unexpected response: 'anything else'")
@ -1343,7 +1326,6 @@ class TestSender(BaseTest):
def test_updates_read_response_timeout_end(self):
connection = FakeConnection()
self.sender.send_map = {}
response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
@ -1359,18 +1341,17 @@ class TestSender(BaseTest):
response.readline = delayed_readline
self.sender.daemon.http_timeout = 0.01
self.assertRaises(exceptions.MessageTimeout, self.sender.updates,
connection, response)
connection, response, {})
def test_updates_read_response_disconnect_end(self):
connection = FakeConnection()
self.sender.send_map = {}
response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
'\r\n'))
exc = None
try:
self.sender.updates(connection, response)
self.sender.updates(connection, response, {})
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), 'Early disconnect')
@ -1381,7 +1362,6 @@ class TestSender(BaseTest):
def test_updates_read_response_unexp_end(self):
connection = FakeConnection()
self.sender.send_map = {}
response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
@ -1389,7 +1369,7 @@ class TestSender(BaseTest):
':UPDATES: END\r\n'))
exc = None
try:
self.sender.updates(connection, response)
self.sender.updates(connection, response, {})
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), "Unexpected response: 'anything else'")