From 125d18e0ffac572749da58fad1591d5099b5cda7 Mon Sep 17 00:00:00 2001 From: OSHRITF Date: Wed, 20 Jan 2016 15:55:30 +0200 Subject: [PATCH] Container-Sync to perform HEAD before PUT object on remote This change adds a remote HEAD object request before each call to sync_row. Currently, container-sync-row attempts to replicate the object (using PUT) regardless of the existance of the object on the remote side, thus causing each object to be transferred on the wire several times (depending on the replication factor) An alternative to HEAD is to do a conditional PUT (using, 100-continue). However, this change is more involved and requires upgrade of both the client and server side clusters to work. In the Tokyo design summit it was decided to start with the HEAD approach. Change-Id: I60d982dd2cc79a0f13b0924507cd03d7f9c9d70b Closes-Bug: #1277223 --- swift/common/internal_client.py | 11 +- swift/container/sync.py | 107 ++++++++++++---- test/probe/test_container_sync.py | 20 +++ test/unit/common/test_internal_client.py | 149 +++++++++++++---------- test/unit/container/test_sync.py | 129 ++++++++++++++++++++ 5 files changed, 326 insertions(+), 90 deletions(-) diff --git a/swift/common/internal_client.py b/swift/common/internal_client.py index 2413d9ad6d..d9911326f1 100644 --- a/swift/common/internal_client.py +++ b/swift/common/internal_client.py @@ -769,6 +769,7 @@ class SimpleClient(object): req.get_method = lambda: method conn = urllib2.urlopen(req, timeout=timeout) body = conn.read() + info = conn.info() try: body_data = json.loads(body) except ValueError: @@ -792,13 +793,13 @@ class SimpleClient(object): url, conn.getcode(), sent_content_length, - conn.info()['content-length'], + info['content-length'], trans_start, trans_stop, trans_stop - trans_start, additional_info ))) - return [None, body_data] + return [info, body_data] def retry_request(self, method, **kwargs): retries = kwargs.pop('retries', self.retries) @@ -837,6 +838,12 @@ class SimpleClient(object): contents=contents.read(), **kwargs) +def head_object(url, **kwargs): + """For usage with container sync """ + client = SimpleClient(url=url) + return client.retry_request('HEAD', **kwargs) + + def put_object(url, **kwargs): """For usage with container sync """ client = SimpleClient(url=url) diff --git a/swift/container/sync.py b/swift/container/sync.py index 2ade27a5a1..c2fd863eec 100644 --- a/swift/container/sync.py +++ b/swift/container/sync.py @@ -29,7 +29,8 @@ from swift.container.backend import ContainerBroker from swift.container.sync_store import ContainerSyncStore from swift.common.container_sync_realms import ContainerSyncRealms from swift.common.internal_client import ( - delete_object, put_object, InternalClient, UnexpectedResponse) + delete_object, put_object, head_object, + InternalClient, UnexpectedResponse) from swift.common.exceptions import ClientException from swift.common.ring import Ring from swift.common.ring.utils import is_local_device @@ -396,10 +397,84 @@ class ContainerSync(Daemon): self.logger.exception(_('ERROR Syncing %s'), broker if broker else path) + def _update_sync_to_headers(self, name, sync_to, user_key, + realm, realm_key, method, headers): + """ + Updates container sync headers + + :param name: The name of the object + :param sync_to: The URL to the remote container. + :param user_key: The X-Container-Sync-Key to use when sending requests + to the other container. + :param realm: The realm from self.realms_conf, if there is one. + If None, fallback to using the older allowed_sync_hosts + way of syncing. + :param realm_key: The realm key from self.realms_conf, if there + is one. If None, fallback to using the older + allowed_sync_hosts way of syncing. + :param method: HTTP method to create sig with + :param headers: headers to update with container sync headers + """ + if realm and realm_key: + nonce = uuid.uuid4().hex + path = urlparse(sync_to).path + '/' + quote(name) + sig = self.realms_conf.get_sig(method, path, + headers.get('x-timestamp', 0), + nonce, realm_key, + user_key) + headers['x-container-sync-auth'] = '%s %s %s' % (realm, + nonce, + sig) + else: + headers['x-container-sync-key'] = user_key + + def _object_in_remote_container(self, name, sync_to, user_key, + realm, realm_key, timestamp): + """ + Performs head object on remote to eliminate extra remote put and + local get object calls + + :param name: The name of the object in the updated row in the local + database triggering the sync update. + :param sync_to: The URL to the remote container. + :param user_key: The X-Container-Sync-Key to use when sending requests + to the other container. + :param realm: The realm from self.realms_conf, if there is one. + If None, fallback to using the older allowed_sync_hosts + way of syncing. + :param realm_key: The realm key from self.realms_conf, if there + is one. If None, fallback to using the older + allowed_sync_hosts way of syncing. + :param timestamp: last modified date of local object + :returns: True if object already exists in remote + """ + headers = {'x-timestamp': timestamp.internal} + self._update_sync_to_headers(name, sync_to, user_key, realm, + realm_key, 'HEAD', headers) + try: + metadata, _ = head_object(sync_to, name=name, + headers=headers, + proxy=self.select_http_proxy(), + logger=self.logger, + retries=0) + remote_ts = Timestamp(metadata.get('x-timestamp', 0)) + self.logger.debug("remote obj timestamp %s local obj %s" % + (timestamp.internal, remote_ts.internal)) + if timestamp <= remote_ts: + return True + # Object in remote should be updated + return False + except ClientException as http_err: + # Object not in remote + if http_err.http_status == 404: + return False + raise http_err + def container_sync_row(self, row, sync_to, user_key, broker, info, realm, realm_key): """ Sends the update the row indicates to the sync_to container. + Update can be either delete or put. :param row: The updated row in the local database triggering the sync update. @@ -427,17 +502,9 @@ class ContainerSync(Daemon): # timestamp of the source tombstone try: headers = {'x-timestamp': ts_data.internal} - if realm and realm_key: - nonce = uuid.uuid4().hex - path = urlparse(sync_to).path + '/' + quote( - row['name']) - sig = self.realms_conf.get_sig( - 'DELETE', path, headers['x-timestamp'], nonce, - realm_key, user_key) - headers['x-container-sync-auth'] = '%s %s %s' % ( - realm, nonce, sig) - else: - headers['x-container-sync-key'] = user_key + self._update_sync_to_headers(row['name'], sync_to, + user_key, realm, realm_key, + 'DELETE', headers) delete_object(sync_to, name=row['name'], headers=headers, proxy=self.select_http_proxy(), logger=self.logger, @@ -451,6 +518,10 @@ class ContainerSync(Daemon): else: # when sync'ing a live object, use ts_meta - this is the time # at which the source object was last modified by a PUT or POST + if self._object_in_remote_container(row['name'], + sync_to, user_key, realm, + realm_key, ts_meta): + return True exc = None # look up for the newest one headers_out = {'X-Newest': True, @@ -485,16 +556,8 @@ class ContainerSync(Daemon): if 'content-type' in headers: headers['content-type'] = clean_content_type( headers['content-type']) - if realm and realm_key: - nonce = uuid.uuid4().hex - path = urlparse(sync_to).path + '/' + quote(row['name']) - sig = self.realms_conf.get_sig( - 'PUT', path, headers['x-timestamp'], nonce, realm_key, - user_key) - headers['x-container-sync-auth'] = '%s %s %s' % ( - realm, nonce, sig) - else: - headers['x-container-sync-key'] = user_key + self._update_sync_to_headers(row['name'], sync_to, user_key, + realm, realm_key, 'PUT', headers) put_object(sync_to, name=row['name'], headers=headers, contents=FileLikeIter(body), proxy=self.select_http_proxy(), logger=self.logger, diff --git a/test/probe/test_container_sync.py b/test/probe/test_container_sync.py index 7282cfd50a..ea98e32383 100644 --- a/test/probe/test_container_sync.py +++ b/test/probe/test_container_sync.py @@ -266,6 +266,26 @@ class TestContainerSync(ReplProbeTest): % item) for item in mismatched_headers]) self.fail(msg) + def test_sync_newer_remote(self): + source_container, dest_container = self._setup_synced_containers() + + # upload to source + object_name = 'object-%s' % uuid.uuid4() + client.put_object(self.url, self.token, source_container, object_name, + 'old-source-body') + + # upload to dest with same name + client.put_object(self.url, self.token, dest_container, object_name, + 'new-test-body') + + # cycle container-sync + Manager(['container-sync']).once() + + # verify that the remote object did not change + resp_headers, body = client.get_object(self.url, self.token, + dest_container, object_name) + self.assertEqual(body, 'new-test-body') + if __name__ == "__main__": unittest.main() diff --git a/test/unit/common/test_internal_client.py b/test/unit/common/test_internal_client.py index 834206e55b..68dbc3e18d 100644 --- a/test/unit/common/test_internal_client.py +++ b/test/unit/common/test_internal_client.py @@ -343,6 +343,9 @@ class TestInternalClient(unittest.TestCase): def read(self): return json.dumps(body) + def info(self): + return {} + for timeout in (0.0, 42.0, None): mocked_func = 'swift.common.internal_client.urllib2.urlopen' with mock.patch(mocked_func) as mock_urlopen: @@ -1181,76 +1184,84 @@ class TestGetAuth(unittest.TestCase): 'http://127.0.0.1', 'user', 'key', auth_version=2.0) -mock_time_value = 1401224049.98 - - -def mock_time(): - global mock_time_value - mock_time_value += 1 - return mock_time_value - - class TestSimpleClient(unittest.TestCase): + def _test_get_head(self, request, urlopen, method): + + mock_time_value = [1401224049.98] + + def mock_time(): + # global mock_time_value + mock_time_value[0] += 1 + return mock_time_value[0] + + with mock.patch('swift.common.internal_client.time', mock_time): + # basic request, only url as kwarg + request.return_value.get_type.return_value = "http" + urlopen.return_value.read.return_value = '' + urlopen.return_value.getcode.return_value = 200 + urlopen.return_value.info.return_value = {'content-length': '345'} + sc = internal_client.SimpleClient(url='http://127.0.0.1') + logger = FakeLogger() + retval = sc.retry_request( + method, headers={'content-length': '123'}, logger=logger) + self.assertEqual(urlopen.call_count, 1) + request.assert_called_with('http://127.0.0.1?format=json', + headers={'content-length': '123'}, + data=None) + self.assertEqual([{'content-length': '345'}, None], retval) + self.assertEqual(method, request.return_value.get_method()) + self.assertEqual(logger.log_dict['debug'], [( + ('-> 2014-05-27T20:54:11 ' + method + + ' http://127.0.0.1%3Fformat%3Djson 200 ' + '123 345 1401224050.98 1401224051.98 1.0 -',), {})]) + + # Check if JSON is decoded + urlopen.return_value.read.return_value = '{}' + retval = sc.retry_request(method) + self.assertEqual([{'content-length': '345'}, {}], retval) + + # same as above, now with token + sc = internal_client.SimpleClient(url='http://127.0.0.1', + token='token') + retval = sc.retry_request(method) + request.assert_called_with('http://127.0.0.1?format=json', + headers={'X-Auth-Token': 'token'}, + data=None) + self.assertEqual([{'content-length': '345'}, {}], retval) + + # same as above, now with prefix + sc = internal_client.SimpleClient(url='http://127.0.0.1', + token='token') + retval = sc.retry_request(method, prefix="pre_") + request.assert_called_with( + 'http://127.0.0.1?format=json&prefix=pre_', + headers={'X-Auth-Token': 'token'}, data=None) + self.assertEqual([{'content-length': '345'}, {}], retval) + + # same as above, now with container name + retval = sc.retry_request(method, container='cont') + request.assert_called_with('http://127.0.0.1/cont?format=json', + headers={'X-Auth-Token': 'token'}, + data=None) + self.assertEqual([{'content-length': '345'}, {}], retval) + + # same as above, now with object name + retval = sc.retry_request(method, container='cont', name='obj') + request.assert_called_with('http://127.0.0.1/cont/obj', + headers={'X-Auth-Token': 'token'}, + data=None) + self.assertEqual([{'content-length': '345'}, {}], retval) + @mock.patch('eventlet.green.urllib2.urlopen') @mock.patch('eventlet.green.urllib2.Request') - @mock.patch('swift.common.internal_client.time', mock_time) def test_get(self, request, urlopen): - # basic GET request, only url as kwarg - request.return_value.get_type.return_value = "http" - urlopen.return_value.read.return_value = '' - urlopen.return_value.getcode.return_value = 200 - urlopen.return_value.info.return_value = {'content-length': '345'} - sc = internal_client.SimpleClient(url='http://127.0.0.1') - logger = FakeLogger() - retval = sc.retry_request( - 'GET', headers={'content-length': '123'}, logger=logger) - self.assertEqual(urlopen.call_count, 1) - request.assert_called_with('http://127.0.0.1?format=json', - headers={'content-length': '123'}, - data=None) - self.assertEqual([None, None], retval) - self.assertEqual('GET', request.return_value.get_method()) - self.assertEqual(logger.log_dict['debug'], [( - ('-> 2014-05-27T20:54:11 GET http://127.0.0.1%3Fformat%3Djson 200 ' - '123 345 1401224050.98 1401224051.98 1.0 -',), {})]) + self._test_get_head(request, urlopen, 'GET') - # Check if JSON is decoded - urlopen.return_value.read.return_value = '{}' - retval = sc.retry_request('GET') - self.assertEqual([None, {}], retval) - - # same as above, now with token - sc = internal_client.SimpleClient(url='http://127.0.0.1', - token='token') - retval = sc.retry_request('GET') - request.assert_called_with('http://127.0.0.1?format=json', - headers={'X-Auth-Token': 'token'}, - data=None) - self.assertEqual([None, {}], retval) - - # same as above, now with prefix - sc = internal_client.SimpleClient(url='http://127.0.0.1', - token='token') - retval = sc.retry_request('GET', prefix="pre_") - request.assert_called_with('http://127.0.0.1?format=json&prefix=pre_', - headers={'X-Auth-Token': 'token'}, - data=None) - self.assertEqual([None, {}], retval) - - # same as above, now with container name - retval = sc.retry_request('GET', container='cont') - request.assert_called_with('http://127.0.0.1/cont?format=json', - headers={'X-Auth-Token': 'token'}, - data=None) - self.assertEqual([None, {}], retval) - - # same as above, now with object name - retval = sc.retry_request('GET', container='cont', name='obj') - request.assert_called_with('http://127.0.0.1/cont/obj', - headers={'X-Auth-Token': 'token'}, - data=None) - self.assertEqual([None, {}], retval) + @mock.patch('eventlet.green.urllib2.urlopen') + @mock.patch('eventlet.green.urllib2.Request') + def test_head(self, request, urlopen): + self._test_get_head(request, urlopen, 'HEAD') @mock.patch('eventlet.green.urllib2.urlopen') @mock.patch('eventlet.green.urllib2.Request') @@ -1272,6 +1283,7 @@ class TestSimpleClient(unittest.TestCase): request.return_value.get_type.return_value = "http" mock_resp = mock.MagicMock() mock_resp.read.return_value = '' + mock_resp.info.return_value = {} urlopen.side_effect = [urllib2.URLError(''), mock_resp] sc = internal_client.SimpleClient(url='http://127.0.0.1', retries=1, token='token') @@ -1283,13 +1295,14 @@ class TestSimpleClient(unittest.TestCase): self.assertEqual(urlopen.call_count, 2) request.assert_called_with('http://127.0.0.1?format=json', data=None, headers={'X-Auth-Token': 'token'}) - self.assertEqual([None, None], retval) + self.assertEqual([{}, None], retval) self.assertEqual(sc.attempts, 2) @mock.patch('eventlet.green.urllib2.urlopen') def test_get_with_retries_param(self, mock_urlopen): mock_response = mock.MagicMock() mock_response.read.return_value = '' + mock_response.info.return_value = {} mock_urlopen.side_effect = internal_client.httplib.BadStatusLine('') c = internal_client.SimpleClient(url='http://127.0.0.1', token='token') self.assertEqual(c.retries, 5) @@ -1315,7 +1328,7 @@ class TestSimpleClient(unittest.TestCase): retval = c.retry_request('GET', retries=1) self.assertEqual(mock_sleep.call_count, 1) self.assertEqual(mock_urlopen.call_count, 2) - self.assertEqual([None, None], retval) + self.assertEqual([{}, None], retval) @mock.patch('eventlet.green.urllib2.urlopen') def test_request_with_retries_with_HTTPError(self, mock_urlopen): @@ -1380,9 +1393,13 @@ class TestSimpleClient(unittest.TestCase): url = 'https://127.0.0.1:1/a' class FakeConn(object): + def read(self): return 'irrelevant' + def info(self): + return {} + mocked = 'swift.common.internal_client.urllib2.urlopen' # module level methods diff --git a/test/unit/container/test_sync.py b/test/unit/container/test_sync.py index 9cb56d2d05..ef4a4f5a82 100644 --- a/test/unit/container/test_sync.py +++ b/test/unit/container/test_sync.py @@ -855,6 +855,8 @@ class TestContainerSync(unittest.TestCase): def _test_container_sync_row_put(self, realm, realm_key): orig_uuid = sync.uuid orig_put_object = sync.put_object + orig_head_object = sync.head_object + try: class FakeUUID(object): class uuid4(object): @@ -891,6 +893,7 @@ class TestContainerSync(unittest.TestCase): sync.put_object = fake_put_object expected_put_count = 0 + excepted_failure_count = 0 with mock.patch('swift.container.sync.InternalClient'): cs = sync.ContainerSync({}, container_ring=FakeRing(), @@ -911,6 +914,14 @@ class TestContainerSync(unittest.TestCase): # Success as everything says it worked. # simulate a row with data at 1.1 and later ctype, meta times created_at = ts_data.internal + '+1388+1388' # last modified = 1.2 + + def fake_object_in_rcontainer(row, sync_to, user_key, + broker, realm, realm_key): + return False + + orig_object_in_rcontainer = cs._object_in_remote_container + cs._object_in_remote_container = fake_object_in_rcontainer + self.assertTrue(cs.container_sync_row( {'deleted': False, 'name': 'object', @@ -935,6 +946,7 @@ class TestContainerSync(unittest.TestCase): iter('contents')) cs.swift.get_object = fake_get_object + # Success as everything says it worked, also checks 'date' and # 'last-modified' headers are removed and that 'etag' header is # stripped of double quotes. @@ -980,6 +992,7 @@ class TestContainerSync(unittest.TestCase): {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, realm, realm_key)) self.assertEqual(cs.container_puts, expected_put_count) + excepted_failure_count += 1 self.assertEqual(len(exc), 1) self.assertEqual(str(exc[-1]), 'test exception') @@ -1003,6 +1016,7 @@ class TestContainerSync(unittest.TestCase): {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, realm, realm_key)) self.assertEqual(cs.container_puts, expected_put_count) + excepted_failure_count += 1 self.assertEqual(len(exc), 1) self.assertEqual(str(exc[-1]), 'test client exception') @@ -1029,6 +1043,8 @@ class TestContainerSync(unittest.TestCase): {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, realm, realm_key)) self.assertEqual(cs.container_puts, expected_put_count) + excepted_failure_count += 1 + self.assertEqual(cs.container_failures, excepted_failure_count) self.assertLogMessage('info', 'Unauth') def fake_put_object(*args, **kwargs): @@ -1044,6 +1060,8 @@ class TestContainerSync(unittest.TestCase): {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, realm, realm_key)) self.assertEqual(cs.container_puts, expected_put_count) + excepted_failure_count += 1 + self.assertEqual(cs.container_failures, excepted_failure_count) self.assertLogMessage('info', 'Not found', 1) def fake_put_object(*args, **kwargs): @@ -1059,10 +1077,121 @@ class TestContainerSync(unittest.TestCase): {'account': 'a', 'container': 'c', 'storage_policy_index': 0}, realm, realm_key)) self.assertEqual(cs.container_puts, expected_put_count) + excepted_failure_count += 1 + self.assertEqual(cs.container_failures, excepted_failure_count) self.assertLogMessage('error', 'ERROR Syncing') + + # Test the following cases: + # remote has the same date and a put doesn't take place + # remote has more up to date copy and a put doesn't take place + # head_object returns ClientException(404) and a put takes place + # head_object returns other ClientException put doesn't take place + # and we get failure + # head_object returns other Exception put does not take place + # and we get failure + # remote returns old copy and a put takes place + test_row = {'deleted': False, + 'name': 'object', + 'created_at': timestamp.internal, + 'etag': '1111'} + test_info = {'account': 'a', + 'container': 'c', + 'storage_policy_index': 0} + + actual_puts = [] + + def fake_put_object(*args, **kwargs): + actual_puts.append((args, kwargs)) + + def fake_head_object(*args, **kwargs): + return ({'x-timestamp': '1.2'}, '') + + sync.put_object = fake_put_object + sync.head_object = fake_head_object + cs._object_in_remote_container = orig_object_in_rcontainer + self.assertTrue(cs.container_sync_row( + test_row, 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), + test_info, + realm, realm_key)) + # No additional put has taken place + self.assertEqual(len(actual_puts), 0) + # No additional errors + self.assertEqual(cs.container_failures, excepted_failure_count) + + def fake_head_object(*args, **kwargs): + return ({'x-timestamp': '1.3'}, '') + + sync.head_object = fake_head_object + self.assertTrue(cs.container_sync_row( + test_row, 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), + test_info, + realm, realm_key)) + # No additional put has taken place + self.assertEqual(len(actual_puts), 0) + # No additional errors + self.assertEqual(cs.container_failures, excepted_failure_count) + + actual_puts = [] + + def fake_head_object(*args, **kwargs): + raise ClientException('test client exception', http_status=404) + + sync.head_object = fake_head_object + self.assertTrue(cs.container_sync_row( + test_row, 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), + test_info, realm, realm_key)) + # Additional put has taken place + self.assertEqual(len(actual_puts), 1) + # No additional errors + self.assertEqual(cs.container_failures, excepted_failure_count) + + def fake_head_object(*args, **kwargs): + raise ClientException('test client exception', http_status=401) + + sync.head_object = fake_head_object + self.assertFalse(cs.container_sync_row( + test_row, 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), + test_info, realm, realm_key)) + # No additional put has taken place, failures increased + self.assertEqual(len(actual_puts), 1) + excepted_failure_count += 1 + self.assertEqual(cs.container_failures, excepted_failure_count) + + def fake_head_object(*args, **kwargs): + raise Exception() + + sync.head_object = fake_head_object + self.assertFalse(cs.container_sync_row( + test_row, + 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), + test_info, realm, realm_key)) + # No additional put has taken place, failures increased + self.assertEqual(len(actual_puts), 1) + excepted_failure_count += 1 + self.assertEqual(cs.container_failures, excepted_failure_count) + + def fake_head_object(*args, **kwargs): + return ({'x-timestamp': '1.1'}, '') + + sync.head_object = fake_head_object + self.assertTrue(cs.container_sync_row( + test_row, 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), + test_info, realm, realm_key)) + # Additional put has taken place + self.assertEqual(len(actual_puts), 2) + # No additional errors + self.assertEqual(cs.container_failures, excepted_failure_count) + finally: sync.uuid = orig_uuid sync.put_object = orig_put_object + sync.head_object = orig_head_object def test_select_http_proxy_None(self):