From 2d1c4381918757ff025848e3cba884d8ee1a5939 Mon Sep 17 00:00:00 2001 From: Romain LE DISEZ Date: Mon, 22 Oct 2018 11:55:25 +0200 Subject: [PATCH] SSYNC: stop sharing global available_map/send_map Change-Id: Iaba8abb81dec792ee92e3715ecc459b57755fcae --- swift/obj/ssync_sender.py | 38 +++++----- test/unit/obj/test_ssync_sender.py | 114 ++++++++++++----------------- 2 files changed, 68 insertions(+), 84 deletions(-) diff --git a/swift/obj/ssync_sender.py b/swift/obj/ssync_sender.py index 025a4c6363..c628c531d5 100644 --- a/swift/obj/ssync_sender.py +++ b/swift/obj/ssync_sender.py @@ -139,16 +139,9 @@ class Sender(object): self.node = node self.job = job self.suffixes = suffixes - # available_map has an entry for each object in given suffixes that - # is available to be sync'd; each entry is a hash => dict of timestamps - # of data file or tombstone file and/or meta file - self.available_map = {} # When remote_check_objs is given in job, ssync_sender trys only to # make sure those objects exist or not in remote. self.remote_check_objs = remote_check_objs - # send_map has an entry for each object that the receiver wants to - # be sync'ed; each entry maps an object hash => dict of wanted parts - self.send_map = {} def __call__(self): """ @@ -172,17 +165,25 @@ class Sender(object): # abort the replication attempt and log a simple error. All # other exceptions will be logged with a full stack trace. connection, response = self.connect() - self.missing_check(connection, response) + # available_map has an entry for each object in given suffixes + # that is available to be sync'd; + # each entry is a hash => dict of timestamps of data file or + # tombstone file and/or meta file + # send_map has an entry for each object that the receiver wants + # to be sync'ed; + # each entry maps an object hash => dict of wanted parts + available_map, send_map = self.missing_check(connection, + response) if self.remote_check_objs is None: - self.updates(connection, response) - can_delete_obj = self.available_map + self.updates(connection, response, send_map) + can_delete_obj = available_map else: # when we are initialized with remote_check_objs we don't # *send* any requested updates; instead we only collect # what's already in sync and safe for deletion - in_sync_hashes = (set(self.available_map.keys()) - - set(self.send_map.keys())) - can_delete_obj = dict((hash_, self.available_map[hash_]) + in_sync_hashes = (set(available_map.keys()) - + set(send_map.keys())) + can_delete_obj = dict((hash_, available_map[hash_]) for hash_ in in_sync_hashes) return True, can_delete_obj except (exceptions.MessageTimeout, @@ -264,6 +265,8 @@ class Sender(object): Full documentation of this can be found at :py:meth:`.Receiver.missing_check`. """ + available_map = {} + send_map = {} # First, send our list. with exceptions.MessageTimeout( self.daemon.node_timeout, 'missing_check start'): @@ -279,7 +282,7 @@ class Sender(object): objhash_timestamps[0] in self.remote_check_objs, hash_gen) for object_hash, timestamps in hash_gen: - self.available_map[object_hash] = timestamps + available_map[object_hash] = timestamps with exceptions.MessageTimeout( self.daemon.node_timeout, 'missing_check send line'): @@ -313,9 +316,10 @@ class Sender(object): break parts = line.split() if parts: - self.send_map[parts[0]] = decode_wanted(parts[1:]) + send_map[parts[0]] = decode_wanted(parts[1:]) + return available_map, send_map - def updates(self, connection, response): + def updates(self, connection, response, send_map): """ Handles the sender-side of the UPDATES step of an SSYNC request. @@ -328,7 +332,7 @@ class Sender(object): self.daemon.node_timeout, 'updates start'): msg = ':UPDATES: START\r\n' connection.send('%x\r\n%s\r\n' % (len(msg), msg)) - for object_hash, want in self.send_map.items(): + for object_hash, want in send_map.items(): object_hash = urllib.parse.unquote(object_hash) try: df = self.df_mgr.get_diskfile_from_hash( diff --git a/test/unit/obj/test_ssync_sender.py b/test/unit/obj/test_ssync_sender.py index d21b63b5fc..f2a2aa54af 100644 --- a/test/unit/obj/test_ssync_sender.py +++ b/test/unit/obj/test_ssync_sender.py @@ -169,7 +169,7 @@ class TestSender(BaseTest): self.sender.suffixes = ['abc'] self.sender.connect = mock.MagicMock(return_value=(connection, response)) - self.sender.missing_check = mock.MagicMock() + self.sender.missing_check = mock.MagicMock(return_value=({}, {})) self.sender.updates = mock.MagicMock() self.sender.disconnect = mock.MagicMock() success, candidates = self.sender() @@ -177,7 +177,7 @@ class TestSender(BaseTest): self.assertEqual(candidates, {}) self.sender.connect.assert_called_once_with() self.sender.missing_check.assert_called_once_with(connection, response) - self.sender.updates.assert_called_once_with(connection, response) + self.sender.updates.assert_called_once_with(connection, response, {}) self.sender.disconnect.assert_called_once_with(connection) def test_connect(self): @@ -352,12 +352,14 @@ class TestSender(BaseTest): expected_calls)) def test_call(self): - def patch_sender(sender): + def patch_sender(sender, available_map, send_map): connection = FakeConnection() response = FakeResponse() sender.connect = mock.MagicMock(return_value=(connection, response)) sender.missing_check = mock.MagicMock() + sender.missing_check = mock.MagicMock(return_value=(available_map, + send_map)) sender.updates = mock.MagicMock() sender.disconnect = mock.MagicMock() @@ -379,8 +381,7 @@ class TestSender(BaseTest): # no suffixes -> no work done sender = ssync_sender.Sender( self.daemon, node, job, [], remote_check_objs=None) - patch_sender(sender) - sender.available_map = available_map + patch_sender(sender, available_map, {}) success, candidates = sender() self.assertTrue(success) self.assertEqual({}, candidates) @@ -388,8 +389,7 @@ class TestSender(BaseTest): # all objs in sync sender = ssync_sender.Sender( self.daemon, node, job, ['ignored'], remote_check_objs=None) - patch_sender(sender) - sender.available_map = available_map + patch_sender(sender, available_map, {}) success, candidates = sender() self.assertTrue(success) self.assertEqual(available_map, candidates) @@ -399,9 +399,7 @@ class TestSender(BaseTest): sender = ssync_sender.Sender( self.daemon, node, job, ['ignored'], remote_check_objs=None) - patch_sender(sender) - sender.send_map = {wanted: []} - sender.available_map = available_map + patch_sender(sender, available_map, {wanted: []}) success, candidates = sender() self.assertTrue(success) self.assertEqual(available_map, candidates) @@ -413,9 +411,7 @@ class TestSender(BaseTest): sender = ssync_sender.Sender( self.daemon, node, job, ['ignored'], remote_check_objs=remote_check_objs) - patch_sender(sender) - sender.send_map = {wanted: []} - sender.available_map = available_map + patch_sender(sender, available_map, {wanted: []}) success, candidates = sender() self.assertTrue(success) expected_map = dict([('9d41d8cd98f00b204e9800998ecf0abc', @@ -736,13 +732,14 @@ class TestSender(BaseTest): ':MISSING_CHECK: START\r\n' ':MISSING_CHECK: END\r\n')) self.sender.df_mgr.yield_hashes = yield_hashes - self.sender.missing_check(connection, response) + available_map, send_map = self.sender.missing_check(connection, + response) self.assertEqual( ''.join(connection.sent), '17\r\n:MISSING_CHECK: START\r\n\r\n' '15\r\n:MISSING_CHECK: END\r\n\r\n') - self.assertEqual(self.sender.send_map, {}) - self.assertEqual(self.sender.available_map, {}) + self.assertEqual(send_map, {}) + self.assertEqual(available_map, {}) def test_missing_check_has_suffixes(self): def yield_hashes(device, partition, policy, suffixes=None, **kwargs): @@ -778,7 +775,8 @@ class TestSender(BaseTest): ':MISSING_CHECK: START\r\n' ':MISSING_CHECK: END\r\n')) self.sender.df_mgr.yield_hashes = yield_hashes - self.sender.missing_check(connection, response) + available_map, send_map = self.sender.missing_check(connection, + response) self.assertEqual( ''.join(connection.sent), '17\r\n:MISSING_CHECK: START\r\n\r\n' @@ -788,7 +786,7 @@ class TestSender(BaseTest): '3f\r\n9d41d8cd98f00b204e9800998ecf1def 1380144474.44444 ' 'm:186a0,t:4\r\n\r\n' '15\r\n:MISSING_CHECK: END\r\n\r\n') - self.assertEqual(self.sender.send_map, {}) + self.assertEqual(send_map, {}) candidates = [('9d41d8cd98f00b204e9800998ecf0abc', dict(ts_data=Timestamp(1380144470.00000))), ('9d41d8cd98f00b204e9800998ecf0def', @@ -798,7 +796,7 @@ class TestSender(BaseTest): dict(ts_data=Timestamp(1380144474.44444), ts_meta=Timestamp(1380144475.44444), ts_ctype=Timestamp(1380144474.44448)))] - self.assertEqual(self.sender.available_map, dict(candidates)) + self.assertEqual(available_map, dict(candidates)) def test_missing_check_far_end_disconnect(self): def yield_hashes(device, partition, policy, suffixes=None, **kwargs): @@ -833,9 +831,6 @@ class TestSender(BaseTest): '17\r\n:MISSING_CHECK: START\r\n\r\n' '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' '15\r\n:MISSING_CHECK: END\r\n\r\n') - self.assertEqual(self.sender.available_map, - dict([('9d41d8cd98f00b204e9800998ecf0abc', - dict(ts_data=Timestamp(1380144470.00000)))])) def test_missing_check_far_end_disconnect2(self): def yield_hashes(device, partition, policy, suffixes=None, **kwargs): @@ -871,9 +866,6 @@ class TestSender(BaseTest): '17\r\n:MISSING_CHECK: START\r\n\r\n' '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' '15\r\n:MISSING_CHECK: END\r\n\r\n') - self.assertEqual(self.sender.available_map, - dict([('9d41d8cd98f00b204e9800998ecf0abc', - {'ts_data': Timestamp(1380144470.00000)})])) def test_missing_check_far_end_unexpected(self): def yield_hashes(device, partition, policy, suffixes=None, **kwargs): @@ -908,9 +900,6 @@ class TestSender(BaseTest): '17\r\n:MISSING_CHECK: START\r\n\r\n' '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' '15\r\n:MISSING_CHECK: END\r\n\r\n') - self.assertEqual(self.sender.available_map, - dict([('9d41d8cd98f00b204e9800998ecf0abc', - {'ts_data': Timestamp(1380144470.00000)})])) def test_missing_check_send_map(self): def yield_hashes(device, partition, policy, suffixes=None, **kwargs): @@ -938,15 +927,15 @@ class TestSender(BaseTest): '0123abc dm\r\n' ':MISSING_CHECK: END\r\n')) self.sender.df_mgr.yield_hashes = yield_hashes - self.sender.missing_check(connection, response) + available_map, send_map = self.sender.missing_check(connection, + response) self.assertEqual( ''.join(connection.sent), '17\r\n:MISSING_CHECK: START\r\n\r\n' '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' '15\r\n:MISSING_CHECK: END\r\n\r\n') - self.assertEqual( - self.sender.send_map, {'0123abc': {'data': True, 'meta': True}}) - self.assertEqual(self.sender.available_map, + self.assertEqual(send_map, {'0123abc': {'data': True, 'meta': True}}) + self.assertEqual(available_map, dict([('9d41d8cd98f00b204e9800998ecf0abc', {'ts_data': Timestamp(1380144470.00000)})])) @@ -978,10 +967,10 @@ class TestSender(BaseTest): '0123abc d extra response parts\r\n' ':MISSING_CHECK: END\r\n')) self.sender.df_mgr.yield_hashes = yield_hashes - self.sender.missing_check(connection, response) - self.assertEqual(self.sender.send_map, - {'0123abc': {'data': True}}) - self.assertEqual(self.sender.available_map, + available_map, send_map = self.sender.missing_check(connection, + response) + self.assertEqual(send_map, {'0123abc': {'data': True}}) + self.assertEqual(available_map, dict([('9d41d8cd98f00b204e9800998ecf0abc', {'ts_data': Timestamp(1380144470.00000)})])) @@ -991,16 +980,15 @@ class TestSender(BaseTest): response = FakeResponse() self.sender.daemon.node_timeout = 0.01 self.assertRaises(exceptions.MessageTimeout, self.sender.updates, - connection, response) + connection, response, {}) def test_updates_empty_send_map(self): connection = FakeConnection() - self.sender.send_map = {} response = FakeResponse( chunk_body=( ':UPDATES: START\r\n' ':UPDATES: END\r\n')) - self.sender.updates(connection, response) + self.sender.updates(connection, response, {}) self.assertEqual( ''.join(connection.sent), '11\r\n:UPDATES: START\r\n\r\n' @@ -1008,7 +996,6 @@ class TestSender(BaseTest): def test_updates_unexpected_response_lines1(self): connection = FakeConnection() - self.sender.send_map = {} response = FakeResponse( chunk_body=( 'abc\r\n' @@ -1016,7 +1003,7 @@ class TestSender(BaseTest): ':UPDATES: END\r\n')) exc = None try: - self.sender.updates(connection, response) + self.sender.updates(connection, response, {}) except exceptions.ReplicationException as err: exc = err self.assertEqual(str(exc), "Unexpected response: 'abc'") @@ -1027,7 +1014,6 @@ class TestSender(BaseTest): def test_updates_unexpected_response_lines2(self): connection = FakeConnection() - self.sender.send_map = {} response = FakeResponse( chunk_body=( ':UPDATES: START\r\n' @@ -1035,7 +1021,7 @@ class TestSender(BaseTest): ':UPDATES: END\r\n')) exc = None try: - self.sender.updates(connection, response) + self.sender.updates(connection, response, {}) except exceptions.ReplicationException as err: exc = err self.assertEqual(str(exc), "Unexpected response: 'abc'") @@ -1060,14 +1046,14 @@ class TestSender(BaseTest): 'frag_index': 0, } self.sender.node = {} - self.sender.send_map = {object_hash: {'data': True}} + send_map = {object_hash: {'data': True}} self.sender.send_delete = mock.MagicMock() self.sender.send_put = mock.MagicMock() response = FakeResponse( chunk_body=( ':UPDATES: START\r\n' ':UPDATES: END\r\n')) - self.sender.updates(connection, response) + self.sender.updates(connection, response, send_map) self.sender.send_delete.assert_called_once_with( connection, '/a/c/o', delete_timestamp) self.assertEqual(self.sender.send_put.mock_calls, []) @@ -1094,12 +1080,12 @@ class TestSender(BaseTest): 'frag_index': 0, } self.sender.node = {} - self.sender.send_map = {object_hash: {'data': True}} + send_map = {object_hash: {'data': True}} response = FakeResponse( chunk_body=( ':UPDATES: START\r\n' ':UPDATES: END\r\n')) - self.sender.updates(connection, response) + self.sender.updates(connection, response, send_map) self.assertEqual( ''.join(connection.sent), '11\r\n:UPDATES: START\r\n\r\n' @@ -1134,7 +1120,7 @@ class TestSender(BaseTest): } self.sender.node = {} # receiver requested data only - self.sender.send_map = {object_hash: {'data': True}} + send_map = {object_hash: {'data': True}} self.sender.send_delete = mock.MagicMock() self.sender.send_put = mock.MagicMock() self.sender.send_post = mock.MagicMock() @@ -1142,7 +1128,7 @@ class TestSender(BaseTest): chunk_body=( ':UPDATES: START\r\n' ':UPDATES: END\r\n')) - self.sender.updates(connection, response) + self.sender.updates(connection, response, send_map) self.assertEqual(self.sender.send_delete.mock_calls, []) self.assertEqual(self.sender.send_post.mock_calls, []) self.assertEqual(1, len(self.sender.send_put.mock_calls)) @@ -1181,7 +1167,7 @@ class TestSender(BaseTest): } self.sender.node = {} # receiver requested only meta - self.sender.send_map = {object_hash: {'meta': True}} + send_map = {object_hash: {'meta': True}} self.sender.send_delete = mock.MagicMock() self.sender.send_put = mock.MagicMock() self.sender.send_post = mock.MagicMock() @@ -1189,7 +1175,7 @@ class TestSender(BaseTest): chunk_body=( ':UPDATES: START\r\n' ':UPDATES: END\r\n')) - self.sender.updates(connection, response) + self.sender.updates(connection, response, send_map) self.assertEqual(self.sender.send_delete.mock_calls, []) self.assertEqual(self.sender.send_put.mock_calls, []) self.assertEqual(1, len(self.sender.send_post.mock_calls)) @@ -1228,7 +1214,7 @@ class TestSender(BaseTest): } self.sender.node = {} # receiver requested data and meta - self.sender.send_map = {object_hash: {'meta': True, 'data': True}} + send_map = {object_hash: {'meta': True, 'data': True}} self.sender.send_delete = mock.MagicMock() self.sender.send_put = mock.MagicMock() self.sender.send_post = mock.MagicMock() @@ -1236,7 +1222,7 @@ class TestSender(BaseTest): chunk_body=( ':UPDATES: START\r\n' ':UPDATES: END\r\n')) - self.sender.updates(connection, response) + self.sender.updates(connection, response, send_map) self.assertEqual(self.sender.send_delete.mock_calls, []) self.assertEqual(1, len(self.sender.send_put.mock_calls)) self.assertEqual(1, len(self.sender.send_post.mock_calls)) @@ -1272,14 +1258,14 @@ class TestSender(BaseTest): 'policy': POLICIES[0], 'frag_index': 0} self.sender.node = {} - self.sender.send_map = {object_hash: {'data': True}} + send_map = {object_hash: {'data': True}} self.sender.send_delete = mock.MagicMock() self.sender.send_put = mock.MagicMock() response = FakeResponse( chunk_body=( ':UPDATES: START\r\n' ':UPDATES: END\r\n')) - self.sender.updates(connection, response) + self.sender.updates(connection, response, send_map) args, _kwargs = self.sender.send_put.call_args connection, path, df = args self.assertEqual(path, '/a/c/o') @@ -1291,7 +1277,6 @@ class TestSender(BaseTest): def test_updates_read_response_timeout_start(self): connection = FakeConnection() - self.sender.send_map = {} response = FakeResponse( chunk_body=( ':UPDATES: START\r\n' @@ -1305,15 +1290,14 @@ class TestSender(BaseTest): response.readline = delayed_readline self.sender.daemon.http_timeout = 0.01 self.assertRaises(exceptions.MessageTimeout, self.sender.updates, - connection, response) + connection, response, {}) def test_updates_read_response_disconnect_start(self): connection = FakeConnection() - self.sender.send_map = {} response = FakeResponse(chunk_body='\r\n') exc = None try: - self.sender.updates(connection, response) + self.sender.updates(connection, response, {}) except exceptions.ReplicationException as err: exc = err self.assertEqual(str(exc), 'Early disconnect') @@ -1324,7 +1308,6 @@ class TestSender(BaseTest): def test_updates_read_response_unexp_start(self): connection = FakeConnection() - self.sender.send_map = {} response = FakeResponse( chunk_body=( 'anything else\r\n' @@ -1332,7 +1315,7 @@ class TestSender(BaseTest): ':UPDATES: END\r\n')) exc = None try: - self.sender.updates(connection, response) + self.sender.updates(connection, response, {}) except exceptions.ReplicationException as err: exc = err self.assertEqual(str(exc), "Unexpected response: 'anything else'") @@ -1343,7 +1326,6 @@ class TestSender(BaseTest): def test_updates_read_response_timeout_end(self): connection = FakeConnection() - self.sender.send_map = {} response = FakeResponse( chunk_body=( ':UPDATES: START\r\n' @@ -1359,18 +1341,17 @@ class TestSender(BaseTest): response.readline = delayed_readline self.sender.daemon.http_timeout = 0.01 self.assertRaises(exceptions.MessageTimeout, self.sender.updates, - connection, response) + connection, response, {}) def test_updates_read_response_disconnect_end(self): connection = FakeConnection() - self.sender.send_map = {} response = FakeResponse( chunk_body=( ':UPDATES: START\r\n' '\r\n')) exc = None try: - self.sender.updates(connection, response) + self.sender.updates(connection, response, {}) except exceptions.ReplicationException as err: exc = err self.assertEqual(str(exc), 'Early disconnect') @@ -1381,7 +1362,6 @@ class TestSender(BaseTest): def test_updates_read_response_unexp_end(self): connection = FakeConnection() - self.sender.send_map = {} response = FakeResponse( chunk_body=( ':UPDATES: START\r\n' @@ -1389,7 +1369,7 @@ class TestSender(BaseTest): ':UPDATES: END\r\n')) exc = None try: - self.sender.updates(connection, response) + self.sender.updates(connection, response, {}) except exceptions.ReplicationException as err: exc = err self.assertEqual(str(exc), "Unexpected response: 'anything else'")