diff --git a/swift/common/internal_client.py b/swift/common/internal_client.py
index 2413d9ad6d..d9911326f1 100644
--- a/swift/common/internal_client.py
+++ b/swift/common/internal_client.py
@@ -769,6 +769,7 @@ class SimpleClient(object):
         req.get_method = lambda: method
         conn = urllib2.urlopen(req, timeout=timeout)
         body = conn.read()
+        info = conn.info()
         try:
             body_data = json.loads(body)
         except ValueError:
@@ -792,13 +793,13 @@ class SimpleClient(object):
                     url,
                     conn.getcode(),
                     sent_content_length,
-                    conn.info()['content-length'],
+                    info.get('content-length'),
                     trans_start,
                     trans_stop,
                     trans_stop - trans_start,
                     additional_info
                 )))
-        return [None, body_data]
+        return [info, body_data]
 
     def retry_request(self, method, **kwargs):
         retries = kwargs.pop('retries', self.retries)
@@ -837,6 +838,12 @@ class SimpleClient(object):
                                   contents=contents.read(), **kwargs)
 
 
+def head_object(url, **kwargs):
+    """For usage with container sync """
+    client = SimpleClient(url=url)
+    return client.retry_request('HEAD', **kwargs)
+
+
 def put_object(url, **kwargs):
     """For usage with container sync """
     client = SimpleClient(url=url)
diff --git a/swift/container/sync.py b/swift/container/sync.py
index 2ade27a5a1..c2fd863eec 100644
--- a/swift/container/sync.py
+++ b/swift/container/sync.py
@@ -29,7 +29,8 @@ from swift.container.backend import ContainerBroker
 from swift.container.sync_store import ContainerSyncStore
 from swift.common.container_sync_realms import ContainerSyncRealms
 from swift.common.internal_client import (
-    delete_object, put_object, InternalClient, UnexpectedResponse)
+    delete_object, put_object, head_object,
+    InternalClient, UnexpectedResponse)
 from swift.common.exceptions import ClientException
 from swift.common.ring import Ring
 from swift.common.ring.utils import is_local_device
@@ -396,10 +397,84 @@ class ContainerSync(Daemon):
             self.logger.exception(_('ERROR Syncing %s'),
                                   broker if broker else path)
 
+    def _update_sync_to_headers(self, name, sync_to, user_key,
+                                realm, realm_key, method, headers):
+        """
+        Updates container sync headers
+
+        :param name: The name of the object
+        :param sync_to: The URL to the remote container.
+        :param user_key: The X-Container-Sync-Key to use when sending requests
+                         to the other container.
+        :param realm: The realm from self.realms_conf, if there is one.
+            If None, fallback to using the older allowed_sync_hosts
+            way of syncing.
+        :param realm_key: The realm key from self.realms_conf, if there
+            is one. If None, fallback to using the older
+            allowed_sync_hosts way of syncing.
+        :param method: HTTP method to create sig with
+        :param headers: headers to update with container sync headers
+        """
+        if realm and realm_key:
+            nonce = uuid.uuid4().hex
+            path = urlparse(sync_to).path + '/' + quote(name)
+            sig = self.realms_conf.get_sig(method, path,
+                                           headers.get('x-timestamp', 0),
+                                           nonce, realm_key,
+                                           user_key)
+            headers['x-container-sync-auth'] = '%s %s %s' % (realm,
+                                                             nonce,
+                                                             sig)
+        else:
+            headers['x-container-sync-key'] = user_key
+
+    def _object_in_remote_container(self, name, sync_to, user_key,
+                                    realm, realm_key, timestamp):
+        """
+        Performs head object on remote to eliminate extra remote put and
+        local get object calls
+
+        :param name: The name of the object in the updated row in the local
+                     database triggering the sync update.
+        :param sync_to: The URL to the remote container.
+        :param user_key: The X-Container-Sync-Key to use when sending requests
+                         to the other container.
+        :param realm: The realm from self.realms_conf, if there is one.
+            If None, fallback to using the older allowed_sync_hosts
+            way of syncing.
+        :param realm_key: The realm key from self.realms_conf, if there is
+            one. If None, fallback to the older allowed_sync_hosts way.
+        :param timestamp: last modified date of local object
+        :returns: True if the remote copy is at least as new as the local one
+        :raises ClientException: if the HEAD fails with anything but a 404
+        """
+        headers = {'x-timestamp': timestamp.internal}
+        self._update_sync_to_headers(name, sync_to, user_key, realm,
+                                     realm_key, 'HEAD', headers)
+        try:
+            metadata, _body = head_object(sync_to, name=name,
+                                          headers=headers,
+                                          proxy=self.select_http_proxy(),
+                                          logger=self.logger,
+                                          retries=0)
+            remote_ts = Timestamp(metadata.get('x-timestamp', 0))
+            self.logger.debug("remote obj timestamp %s local obj %s",
+                              remote_ts.internal, timestamp.internal)
+            if timestamp <= remote_ts:
+                return True
+            # Remote copy is older than the local one: it must be updated
+            return False
+        except ClientException as http_err:
+            # Object not in remote
+            if http_err.http_status == 404:
+                return False
+            raise
+
     def container_sync_row(self, row, sync_to, user_key, broker, info,
                            realm, realm_key):
         """
         Sends the update the row indicates to the sync_to container.
+        Update can be either delete or put.
 
         :param row: The updated row in the local database triggering the sync
                     update.
@@ -427,17 +502,9 @@ class ContainerSync(Daemon):
                 # timestamp of the source tombstone
                 try:
                     headers = {'x-timestamp': ts_data.internal}
-                    if realm and realm_key:
-                        nonce = uuid.uuid4().hex
-                        path = urlparse(sync_to).path + '/' + quote(
-                            row['name'])
-                        sig = self.realms_conf.get_sig(
-                            'DELETE', path, headers['x-timestamp'], nonce,
-                            realm_key, user_key)
-                        headers['x-container-sync-auth'] = '%s %s %s' % (
-                            realm, nonce, sig)
-                    else:
-                        headers['x-container-sync-key'] = user_key
+                    self._update_sync_to_headers(row['name'], sync_to,
+                                                 user_key, realm, realm_key,
+                                                 'DELETE', headers)
                     delete_object(sync_to, name=row['name'], headers=headers,
                                   proxy=self.select_http_proxy(),
                                   logger=self.logger,
@@ -451,6 +518,10 @@ class ContainerSync(Daemon):
             else:
                 # when sync'ing a live object, use ts_meta - this is the time
                 # at which the source object was last modified by a PUT or POST
+                if self._object_in_remote_container(row['name'],
+                                                    sync_to, user_key, realm,
+                                                    realm_key, ts_meta):
+                    return True
                 exc = None
                 # look up for the newest one
                 headers_out = {'X-Newest': True,
@@ -485,16 +556,8 @@ class ContainerSync(Daemon):
                 if 'content-type' in headers:
                     headers['content-type'] = clean_content_type(
                         headers['content-type'])
-                if realm and realm_key:
-                    nonce = uuid.uuid4().hex
-                    path = urlparse(sync_to).path + '/' + quote(row['name'])
-                    sig = self.realms_conf.get_sig(
-                        'PUT', path, headers['x-timestamp'], nonce, realm_key,
-                        user_key)
-                    headers['x-container-sync-auth'] = '%s %s %s' % (
-                        realm, nonce, sig)
-                else:
-                    headers['x-container-sync-key'] = user_key
+                self._update_sync_to_headers(row['name'], sync_to, user_key,
+                                             realm, realm_key, 'PUT', headers)
                 put_object(sync_to, name=row['name'], headers=headers,
                            contents=FileLikeIter(body),
                            proxy=self.select_http_proxy(), logger=self.logger,
diff --git a/test/probe/test_container_sync.py b/test/probe/test_container_sync.py
index 7282cfd50a..ea98e32383 100644
--- a/test/probe/test_container_sync.py
+++ b/test/probe/test_container_sync.py
@@ -266,6 +266,26 @@ class TestContainerSync(ReplProbeTest):
                              % item) for item in mismatched_headers])
             self.fail(msg)
 
+    def test_sync_newer_remote(self):
+        source_container, dest_container = self._setup_synced_containers()
+
+        # upload to source
+        object_name = 'object-%s' % uuid.uuid4()
+        client.put_object(self.url, self.token, source_container, object_name,
+                          'old-source-body')
+
+        # upload to dest with same name
+        client.put_object(self.url, self.token, dest_container, object_name,
+                          'new-test-body')
+
+        # cycle container-sync
+        Manager(['container-sync']).once()
+
+        # verify that the remote object did not change
+        resp_headers, body = client.get_object(self.url, self.token,
+                                               dest_container, object_name)
+        self.assertEqual(body, 'new-test-body')
+
 
 if __name__ == "__main__":
     unittest.main()
diff --git a/test/unit/common/test_internal_client.py b/test/unit/common/test_internal_client.py
index 834206e55b..68dbc3e18d 100644
--- a/test/unit/common/test_internal_client.py
+++ b/test/unit/common/test_internal_client.py
@@ -343,6 +343,9 @@ class TestInternalClient(unittest.TestCase):
             def read(self):
                 return json.dumps(body)
 
+            def info(self):
+                return {}
+
         for timeout in (0.0, 42.0, None):
             mocked_func = 'swift.common.internal_client.urllib2.urlopen'
             with mock.patch(mocked_func) as mock_urlopen:
@@ -1181,76 +1184,84 @@ class TestGetAuth(unittest.TestCase):
             'http://127.0.0.1', 'user', 'key', auth_version=2.0)
 
 
-mock_time_value = 1401224049.98
-
-
-def mock_time():
-    global mock_time_value
-    mock_time_value += 1
-    return mock_time_value
-
-
 class TestSimpleClient(unittest.TestCase):
 
+    def _test_get_head(self, request, urlopen, method):
+
+        mock_time_value = [1401224049.98]
+
+        def mock_time():
+            # one-item list: py2 closures cannot rebind outer names
+            mock_time_value[0] += 1
+            return mock_time_value[0]
+
+        with mock.patch('swift.common.internal_client.time', mock_time):
+            # basic request, only url as kwarg
+            request.return_value.get_type.return_value = "http"
+            urlopen.return_value.read.return_value = ''
+            urlopen.return_value.getcode.return_value = 200
+            urlopen.return_value.info.return_value = {'content-length': '345'}
+            sc = internal_client.SimpleClient(url='http://127.0.0.1')
+            logger = FakeLogger()
+            retval = sc.retry_request(
+                method, headers={'content-length': '123'}, logger=logger)
+            self.assertEqual(urlopen.call_count, 1)
+            request.assert_called_with('http://127.0.0.1?format=json',
+                                       headers={'content-length': '123'},
+                                       data=None)
+            self.assertEqual([{'content-length': '345'}, None], retval)
+            self.assertEqual(method, request.return_value.get_method())
+            self.assertEqual(logger.log_dict['debug'], [(
+                ('-> 2014-05-27T20:54:11 ' + method +
+                 ' http://127.0.0.1%3Fformat%3Djson 200 '
+                 '123 345 1401224050.98 1401224051.98 1.0 -',), {})])
+
+            # Check if JSON is decoded
+            urlopen.return_value.read.return_value = '{}'
+            retval = sc.retry_request(method)
+            self.assertEqual([{'content-length': '345'}, {}], retval)
+
+            # same as above, now with token
+            sc = internal_client.SimpleClient(url='http://127.0.0.1',
+                                              token='token')
+            retval = sc.retry_request(method)
+            request.assert_called_with('http://127.0.0.1?format=json',
+                                       headers={'X-Auth-Token': 'token'},
+                                       data=None)
+            self.assertEqual([{'content-length': '345'}, {}], retval)
+
+            # same as above, now with prefix
+            sc = internal_client.SimpleClient(url='http://127.0.0.1',
+                                              token='token')
+            retval = sc.retry_request(method, prefix="pre_")
+            request.assert_called_with(
+                'http://127.0.0.1?format=json&prefix=pre_',
+                headers={'X-Auth-Token': 'token'}, data=None)
+            self.assertEqual([{'content-length': '345'}, {}], retval)
+
+            # same as above, now with container name
+            retval = sc.retry_request(method, container='cont')
+            request.assert_called_with('http://127.0.0.1/cont?format=json',
+                                       headers={'X-Auth-Token': 'token'},
+                                       data=None)
+            self.assertEqual([{'content-length': '345'}, {}], retval)
+
+            # same as above, now with object name
+            retval = sc.retry_request(method, container='cont', name='obj')
+            request.assert_called_with('http://127.0.0.1/cont/obj',
+                                       headers={'X-Auth-Token': 'token'},
+                                       data=None)
+            self.assertEqual([{'content-length': '345'}, {}], retval)
+
     @mock.patch('eventlet.green.urllib2.urlopen')
     @mock.patch('eventlet.green.urllib2.Request')
-    @mock.patch('swift.common.internal_client.time', mock_time)
     def test_get(self, request, urlopen):
-        # basic GET request, only url as kwarg
-        request.return_value.get_type.return_value = "http"
-        urlopen.return_value.read.return_value = ''
-        urlopen.return_value.getcode.return_value = 200
-        urlopen.return_value.info.return_value = {'content-length': '345'}
-        sc = internal_client.SimpleClient(url='http://127.0.0.1')
-        logger = FakeLogger()
-        retval = sc.retry_request(
-            'GET', headers={'content-length': '123'}, logger=logger)
-        self.assertEqual(urlopen.call_count, 1)
-        request.assert_called_with('http://127.0.0.1?format=json',
-                                   headers={'content-length': '123'},
-                                   data=None)
-        self.assertEqual([None, None], retval)
-        self.assertEqual('GET', request.return_value.get_method())
-        self.assertEqual(logger.log_dict['debug'], [(
-            ('-> 2014-05-27T20:54:11 GET http://127.0.0.1%3Fformat%3Djson 200 '
-             '123 345 1401224050.98 1401224051.98 1.0 -',), {})])
+        self._test_get_head(request, urlopen, 'GET')
 
-        # Check if JSON is decoded
-        urlopen.return_value.read.return_value = '{}'
-        retval = sc.retry_request('GET')
-        self.assertEqual([None, {}], retval)
-
-        # same as above, now with token
-        sc = internal_client.SimpleClient(url='http://127.0.0.1',
-                                          token='token')
-        retval = sc.retry_request('GET')
-        request.assert_called_with('http://127.0.0.1?format=json',
-                                   headers={'X-Auth-Token': 'token'},
-                                   data=None)
-        self.assertEqual([None, {}], retval)
-
-        # same as above, now with prefix
-        sc = internal_client.SimpleClient(url='http://127.0.0.1',
-                                          token='token')
-        retval = sc.retry_request('GET', prefix="pre_")
-        request.assert_called_with('http://127.0.0.1?format=json&prefix=pre_',
-                                   headers={'X-Auth-Token': 'token'},
-                                   data=None)
-        self.assertEqual([None, {}], retval)
-
-        # same as above, now with container name
-        retval = sc.retry_request('GET', container='cont')
-        request.assert_called_with('http://127.0.0.1/cont?format=json',
-                                   headers={'X-Auth-Token': 'token'},
-                                   data=None)
-        self.assertEqual([None, {}], retval)
-
-        # same as above, now with object name
-        retval = sc.retry_request('GET', container='cont', name='obj')
-        request.assert_called_with('http://127.0.0.1/cont/obj',
-                                   headers={'X-Auth-Token': 'token'},
-                                   data=None)
-        self.assertEqual([None, {}], retval)
+    @mock.patch('eventlet.green.urllib2.urlopen')
+    @mock.patch('eventlet.green.urllib2.Request')
+    def test_head(self, request, urlopen):
+        self._test_get_head(request, urlopen, 'HEAD')
 
     @mock.patch('eventlet.green.urllib2.urlopen')
     @mock.patch('eventlet.green.urllib2.Request')
@@ -1272,6 +1283,7 @@ class TestSimpleClient(unittest.TestCase):
         request.return_value.get_type.return_value = "http"
         mock_resp = mock.MagicMock()
         mock_resp.read.return_value = ''
+        mock_resp.info.return_value = {}
         urlopen.side_effect = [urllib2.URLError(''), mock_resp]
         sc = internal_client.SimpleClient(url='http://127.0.0.1', retries=1,
                                           token='token')
@@ -1283,13 +1295,14 @@ class TestSimpleClient(unittest.TestCase):
         self.assertEqual(urlopen.call_count, 2)
         request.assert_called_with('http://127.0.0.1?format=json', data=None,
                                    headers={'X-Auth-Token': 'token'})
-        self.assertEqual([None, None], retval)
+        self.assertEqual([{}, None], retval)
         self.assertEqual(sc.attempts, 2)
 
     @mock.patch('eventlet.green.urllib2.urlopen')
     def test_get_with_retries_param(self, mock_urlopen):
         mock_response = mock.MagicMock()
         mock_response.read.return_value = ''
+        mock_response.info.return_value = {}
         mock_urlopen.side_effect = internal_client.httplib.BadStatusLine('')
         c = internal_client.SimpleClient(url='http://127.0.0.1', token='token')
         self.assertEqual(c.retries, 5)
@@ -1315,7 +1328,7 @@ class TestSimpleClient(unittest.TestCase):
             retval = c.retry_request('GET', retries=1)
         self.assertEqual(mock_sleep.call_count, 1)
         self.assertEqual(mock_urlopen.call_count, 2)
-        self.assertEqual([None, None], retval)
+        self.assertEqual([{}, None], retval)
 
     @mock.patch('eventlet.green.urllib2.urlopen')
     def test_request_with_retries_with_HTTPError(self, mock_urlopen):
@@ -1380,9 +1393,13 @@ class TestSimpleClient(unittest.TestCase):
         url = 'https://127.0.0.1:1/a'
 
         class FakeConn(object):
+
             def read(self):
                 return 'irrelevant'
 
+            def info(self):
+                return {}
+
         mocked = 'swift.common.internal_client.urllib2.urlopen'
 
         # module level methods
diff --git a/test/unit/container/test_sync.py b/test/unit/container/test_sync.py
index 9cb56d2d05..ef4a4f5a82 100644
--- a/test/unit/container/test_sync.py
+++ b/test/unit/container/test_sync.py
@@ -855,6 +855,8 @@ class TestContainerSync(unittest.TestCase):
     def _test_container_sync_row_put(self, realm, realm_key):
         orig_uuid = sync.uuid
         orig_put_object = sync.put_object
+        orig_head_object = sync.head_object
+
         try:
             class FakeUUID(object):
                 class uuid4(object):
@@ -891,6 +893,7 @@ class TestContainerSync(unittest.TestCase):
 
             sync.put_object = fake_put_object
             expected_put_count = 0
+            expected_failure_count = 0
 
             with mock.patch('swift.container.sync.InternalClient'):
                 cs = sync.ContainerSync({}, container_ring=FakeRing(),
@@ -911,6 +914,14 @@ class TestContainerSync(unittest.TestCase):
             # Success as everything says it worked.
             # simulate a row with data at 1.1 and later ctype, meta times
             created_at = ts_data.internal + '+1388+1388'  # last modified = 1.2
+
+            def fake_object_in_rcontainer(name, sync_to, user_key,
+                                          realm, realm_key, timestamp):
+                return False
+
+            orig_object_in_rcontainer = cs._object_in_remote_container
+            cs._object_in_remote_container = fake_object_in_rcontainer
+
             self.assertTrue(cs.container_sync_row(
                 {'deleted': False,
                  'name': 'object',
@@ -935,6 +946,7 @@ class TestContainerSync(unittest.TestCase):
                         iter('contents'))
 
             cs.swift.get_object = fake_get_object
+
             # Success as everything says it worked, also checks 'date' and
             # 'last-modified' headers are removed and that 'etag' header is
             # stripped of double quotes.
@@ -980,6 +992,7 @@ class TestContainerSync(unittest.TestCase):
                 {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
                 realm, realm_key))
             self.assertEqual(cs.container_puts, expected_put_count)
+            expected_failure_count += 1
             self.assertEqual(len(exc), 1)
             self.assertEqual(str(exc[-1]), 'test exception')
 
@@ -1003,6 +1016,7 @@ class TestContainerSync(unittest.TestCase):
                 {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
                 realm, realm_key))
             self.assertEqual(cs.container_puts, expected_put_count)
+            expected_failure_count += 1
             self.assertEqual(len(exc), 1)
             self.assertEqual(str(exc[-1]), 'test client exception')
 
@@ -1029,6 +1043,8 @@ class TestContainerSync(unittest.TestCase):
                 {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
                 realm, realm_key))
             self.assertEqual(cs.container_puts, expected_put_count)
+            expected_failure_count += 1
+            self.assertEqual(cs.container_failures, expected_failure_count)
             self.assertLogMessage('info', 'Unauth')
 
             def fake_put_object(*args, **kwargs):
@@ -1044,6 +1060,8 @@ class TestContainerSync(unittest.TestCase):
                 {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
                 realm, realm_key))
             self.assertEqual(cs.container_puts, expected_put_count)
+            expected_failure_count += 1
+            self.assertEqual(cs.container_failures, expected_failure_count)
             self.assertLogMessage('info', 'Not found', 1)
 
             def fake_put_object(*args, **kwargs):
@@ -1059,10 +1077,121 @@ class TestContainerSync(unittest.TestCase):
                 {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
                 realm, realm_key))
             self.assertEqual(cs.container_puts, expected_put_count)
+            expected_failure_count += 1
+            self.assertEqual(cs.container_failures, expected_failure_count)
             self.assertLogMessage('error', 'ERROR Syncing')
+
+            # Test the following cases:
+            # remote has the same date and a put doesn't take place
+            # remote has more up to date copy and a put doesn't take place
+            # head_object returns ClientException(404) and a put takes place
+            # head_object returns other ClientException put doesn't take place
+            # and we get failure
+            # head_object returns other Exception put does not take place
+            # and we get failure
+            # remote returns old copy and a put takes place
+            test_row = {'deleted': False,
+                        'name': 'object',
+                        'created_at': timestamp.internal,
+                        'etag': '1111'}
+            test_info = {'account': 'a',
+                         'container': 'c',
+                         'storage_policy_index': 0}
+
+            actual_puts = []
+
+            def fake_put_object(*args, **kwargs):
+                actual_puts.append((args, kwargs))
+
+            def fake_head_object(*args, **kwargs):
+                return ({'x-timestamp': '1.2'}, '')
+
+            sync.put_object = fake_put_object
+            sync.head_object = fake_head_object
+            cs._object_in_remote_container = orig_object_in_rcontainer
+            self.assertTrue(cs.container_sync_row(
+                test_row, 'http://sync/to/path',
+                'key', FakeContainerBroker('broker'),
+                test_info,
+                realm, realm_key))
+            # No additional put has taken place
+            self.assertEqual(len(actual_puts), 0)
+            # No additional errors
+            self.assertEqual(cs.container_failures, expected_failure_count)
+
+            def fake_head_object(*args, **kwargs):
+                return ({'x-timestamp': '1.3'}, '')
+
+            sync.head_object = fake_head_object
+            self.assertTrue(cs.container_sync_row(
+                test_row, 'http://sync/to/path',
+                'key', FakeContainerBroker('broker'),
+                test_info,
+                realm, realm_key))
+            # No additional put has taken place
+            self.assertEqual(len(actual_puts), 0)
+            # No additional errors
+            self.assertEqual(cs.container_failures, expected_failure_count)
+
+            actual_puts = []
+
+            def fake_head_object(*args, **kwargs):
+                raise ClientException('test client exception', http_status=404)
+
+            sync.head_object = fake_head_object
+            self.assertTrue(cs.container_sync_row(
+                test_row, 'http://sync/to/path',
+                'key', FakeContainerBroker('broker'),
+                test_info, realm, realm_key))
+            # Additional put has taken place
+            self.assertEqual(len(actual_puts), 1)
+            # No additional errors
+            self.assertEqual(cs.container_failures, expected_failure_count)
+
+            def fake_head_object(*args, **kwargs):
+                raise ClientException('test client exception', http_status=401)
+
+            sync.head_object = fake_head_object
+            self.assertFalse(cs.container_sync_row(
+                test_row, 'http://sync/to/path',
+                'key', FakeContainerBroker('broker'),
+                test_info, realm, realm_key))
+            # No additional put has taken place, failures increased
+            self.assertEqual(len(actual_puts), 1)
+            expected_failure_count += 1
+            self.assertEqual(cs.container_failures, expected_failure_count)
+
+            def fake_head_object(*args, **kwargs):
+                raise Exception()
+
+            sync.head_object = fake_head_object
+            self.assertFalse(cs.container_sync_row(
+                test_row,
+                'http://sync/to/path',
+                'key', FakeContainerBroker('broker'),
+                test_info, realm, realm_key))
+            # No additional put has taken place, failures increased
+            self.assertEqual(len(actual_puts), 1)
+            expected_failure_count += 1
+            self.assertEqual(cs.container_failures, expected_failure_count)
+
+            def fake_head_object(*args, **kwargs):
+                return ({'x-timestamp': '1.1'}, '')
+
+            sync.head_object = fake_head_object
+            self.assertTrue(cs.container_sync_row(
+                test_row, 'http://sync/to/path',
+                'key', FakeContainerBroker('broker'),
+                test_info, realm, realm_key))
+            # Additional put has taken place
+            self.assertEqual(len(actual_puts), 2)
+            # No additional errors
+            self.assertEqual(cs.container_failures, expected_failure_count)
+
         finally:
             sync.uuid = orig_uuid
             sync.put_object = orig_put_object
+            sync.head_object = orig_head_object
 
     def test_select_http_proxy_None(self):
