diff --git a/swift/common/constraints.py b/swift/common/constraints.py index 005c7875f9..2db9f05ca4 100644 --- a/swift/common/constraints.py +++ b/swift/common/constraints.py @@ -320,7 +320,8 @@ def check_delete_headers(request): raise HTTPBadRequest(request=request, content_type='text/plain', body='Non-integer X-Delete-At') - if x_delete_at < time.time(): + if x_delete_at < time.time() and not utils.config_true_value( + request.headers.get('x-backend-replication', 'f')): raise HTTPBadRequest(request=request, content_type='text/plain', body='X-Delete-At in past') return request diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index 769a954d92..b26dd74df9 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -1920,6 +1920,8 @@ class BaseDiskFile(object): :param use_splice: if true, use zero-copy splice() to send data :param pipe_size: size of pipe buffer used in zero-copy operations :param use_linkat: if True, use open() with linkat() to create obj file + :param open_expired: if True, open() will not raise a DiskFileExpired if + object is expired """ reader_cls = None # must be set by subclasses writer_cls = None # must be set by subclasses @@ -1927,7 +1929,7 @@ class BaseDiskFile(object): def __init__(self, mgr, device_path, partition, account=None, container=None, obj=None, _datadir=None, policy=None, use_splice=False, pipe_size=None, - use_linkat=False, **kwargs): + use_linkat=False, open_expired=False, **kwargs): self._manager = mgr self._device_path = device_path self._logger = mgr.logger @@ -1936,6 +1938,7 @@ class BaseDiskFile(object): self._use_splice = use_splice self._pipe_size = pipe_size self._use_linkat = use_linkat + self._open_expired = open_expired # This might look a lttle hacky i.e tracking number of newly created # dirs to fsync only those many later. If there is a better way, # please suggest. @@ -2229,7 +2232,7 @@ class BaseDiskFile(object): data_file, "bad metadata x-delete-at value %s" % ( self._metadata['X-Delete-At'])) else: - if x_delete_at <= time.time(): + if x_delete_at <= time.time() and not self._open_expired: raise DiskFileExpired(metadata=self._metadata) try: metadata_size = int(self._metadata['Content-Length']) diff --git a/swift/obj/server.py b/swift/obj/server.py index b6d911656a..1efa3997c1 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -516,7 +516,8 @@ class ObjectController(BaseStorageServer): try: disk_file = self.get_diskfile( device, partition, account, container, obj, - policy=policy) + policy=policy, open_expired=config_true_value( + request.headers.get('x-backend-replication', 'false'))) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) try: @@ -653,9 +654,6 @@ class ObjectController(BaseStorageServer): if error_response: return error_response new_delete_at = int(request.headers.get('X-Delete-At') or 0) - if new_delete_at and new_delete_at < time.time(): - return HTTPBadRequest(body='X-Delete-At in past', request=request, - content_type='text/plain') try: fsize = request.message_length() except ValueError as e: diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py index e28d5f2882..99a356ea27 100644 --- a/swift/obj/ssync_receiver.py +++ b/swift/obj/ssync_receiver.py @@ -255,7 +255,7 @@ class Receiver(object): try: df = self.diskfile_mgr.get_diskfile_from_hash( self.device, self.partition, remote['object_hash'], - self.policy, frag_index=self.frag_index) + self.policy, frag_index=self.frag_index, open_expired=True) except exceptions.DiskFileNotExist: return {} try: diff --git a/test/unit/obj/common.py b/test/unit/obj/common.py index 3fc3dc2be7..b18ee40d7e 100644 --- a/test/unit/obj/common.py +++ b/test/unit/obj/common.py @@ -75,7 +75,7 @@ class BaseTest(unittest.TestCase): account='a', container='c', obj='o', body='test', extra_metadata=None, policy=None, frag_index=None, timestamp=None, df_mgr=None, - commit=True): + commit=True, verify=True): policy = policy or POLICIES.legacy object_parts = account, container, obj timestamp = Timestamp(time.time()) if timestamp is None else timestamp @@ -86,7 +86,7 @@ class BaseTest(unittest.TestCase): frag_index=frag_index) write_diskfile(df, timestamp, data=body, extra_metadata=extra_metadata, commit=commit) - if commit: + if commit and verify: # when we write and commit stub data, sanity check it's readable # and not quarantined because of any validation check with df.open(): diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py index 08d45e8387..3490ab94b6 100644 --- a/test/unit/obj/test_diskfile.py +++ b/test/unit/obj/test_diskfile.py @@ -2899,22 +2899,23 @@ class DiskFileMixin(BaseDiskFileTestMixin): pickle.dumps(metadata, diskfile.PICKLE_PROTOCOL)) def _simple_get_diskfile(self, partition='0', account='a', container='c', - obj='o', policy=None, frag_index=None): + obj='o', policy=None, frag_index=None, **kwargs): policy = policy or POLICIES.default df_mgr = self.df_router[policy] if policy.policy_type == EC_POLICY and frag_index is None: frag_index = 2 return df_mgr.get_diskfile(self.existing_device, partition, account, container, obj, - policy=policy, frag_index=frag_index) + policy=policy, frag_index=frag_index, + **kwargs) def _create_test_file(self, data, timestamp=None, metadata=None, - account='a', container='c', obj='o'): + account='a', container='c', obj='o', **kwargs): if metadata is None: metadata = {} metadata.setdefault('name', '/%s/%s/%s' % (account, container, obj)) df = self._simple_get_diskfile(account=account, container=container, - obj=obj) + obj=obj, **kwargs) if timestamp is None: timestamp = time() timestamp = Timestamp(timestamp) @@ -2984,6 +2985,16 @@ class DiskFileMixin(BaseDiskFileTestMixin): self._create_test_file, '1234567890', metadata={'X-Delete-At': '0'}) + try: + self._create_test_file('1234567890', open_expired=True, + metadata={'X-Delete-At': '0', + 'X-Object-Meta-Foo': 'bar'}) + df = self._simple_get_diskfile(open_expired=True) + md = df.read_metadata() + self.assertEqual(md['X-Object-Meta-Foo'], 'bar') + except SwiftException as err: + self.fail("Unexpected swift exception raised: %r" % err) + def test_open_not_expired(self): try: self._create_test_file( diff --git a/test/unit/obj/test_ssync.py b/test/unit/obj/test_ssync.py index 7150ad294f..abfb71467f 100644 --- a/test/unit/obj/test_ssync.py +++ b/test/unit/obj/test_ssync.py @@ -24,7 +24,7 @@ import itertools from six.moves import urllib from swift.common.exceptions import DiskFileNotExist, DiskFileError, \ - DiskFileDeleted + DiskFileDeleted, DiskFileExpired from swift.common import utils from swift.common.storage_policy import POLICIES, EC_POLICY from swift.common.utils import Timestamp @@ -165,7 +165,9 @@ class TestBaseSsync(BaseTest): self.assertNotEqual(v, rx_metadata.pop(k, None)) continue else: - self.assertEqual(v, rx_metadata.pop(k), k) + actual = rx_metadata.pop(k) + self.assertEqual(v, actual, 'Expected %r but got %r for %s' % + (v, actual, k)) self.assertFalse(rx_metadata) expected_body = self._get_object_data(tx_df._name, frag_index=frag_index) @@ -1344,6 +1346,98 @@ class TestSsyncReplication(TestBaseSsync): self.assertEqual(metadata['X-Object-Meta-Test'], oname) self.assertEqual(metadata['X-Object-Sysmeta-Test'], 'sys_' + oname) + def _check_no_longer_expired_object(self, obj_name, df, policy): + # verify that objects with x-delete-at metadata that are not expired + # can be sync'd + rx_node_index = 0 + + def do_ssync(): + # create ssync sender instance... + suffixes = [os.path.basename(os.path.dirname(df._datadir))] + job = {'device': self.device, + 'partition': self.partition, + 'policy': policy} + node = dict(self.rx_node) + node.update({'index': rx_node_index}) + sender = ssync_sender.Sender(self.daemon, node, job, suffixes) + # wrap connection from tx to rx to capture ssync messages... + sender.connect, trace = self.make_connect_wrapper(sender) + + # run the sync protocol... + return sender() + + with self.assertRaises(DiskFileExpired): + df.open() # sanity check - expired + t1_meta = next(self.ts_iter) + df.write_metadata({'X-Timestamp': t1_meta.internal}) # no x-delete-at + df.open() # sanity check - no longer expired + + success, in_sync_objs = do_ssync() + self.assertEqual(1, len(in_sync_objs)) + self.assertTrue(success) + self._verify_ondisk_files({obj_name: [df]}, policy) + + # update object metadata with x-delete-at in distant future + t2_meta = next(self.ts_iter) + df.write_metadata({'X-Timestamp': t2_meta.internal, + 'X-Delete-At': str(int(t2_meta) + 10000)}) + df.open() # sanity check - not expired + + success, in_sync_objs = do_ssync() + self.assertEqual(1, len(in_sync_objs)) + self.assertTrue(success) + self._verify_ondisk_files({obj_name: [df]}, policy) + + # update object metadata with x-delete-at in not so distant future to + # check that we can update rx with older x-delete-at than it's current + t3_meta = next(self.ts_iter) + df.write_metadata({'X-Timestamp': t3_meta.internal, + 'X-Delete-At': str(int(t2_meta) + 5000)}) + df.open() # sanity check - not expired + + success, in_sync_objs = do_ssync() + self.assertEqual(1, len(in_sync_objs)) + self.assertTrue(success) + self._verify_ondisk_files({obj_name: [df]}, policy) + + def test_no_longer_expired_object_syncs(self): + policy = POLICIES.default + # simulate o1 that was PUT with x-delete-at that is now expired but + # later had a POST that had no x-delete-at: object should not expire. + tx_df_mgr = self.daemon._df_router[policy] + t1 = next(self.ts_iter) + obj_name = 'o1' + metadata = {'X-Delete-At': '0', 'Content-Type': 'plain/text'} + df = self._make_diskfile( + obj=obj_name, body=self._get_object_data('/a/c/%s' % obj_name), + extra_metadata=metadata, timestamp=t1, policy=policy, + df_mgr=tx_df_mgr, verify=False) + + self._check_no_longer_expired_object(obj_name, df, policy) + + def test_no_longer_expired_object_syncs_meta(self): + policy = POLICIES.default + # simulate o1 that was PUT with x-delete-at that is now expired but + # later had a POST that had no x-delete-at: object should not expire. + tx_df_mgr = self.daemon._df_router[policy] + rx_df_mgr = self.rx_controller._diskfile_router[policy] + t1 = next(self.ts_iter) + obj_name = 'o1' + metadata = {'X-Delete-At': '0', 'Content-Type': 'plain/text'} + df = self._make_diskfile( + obj=obj_name, body=self._get_object_data('/a/c/%s' % obj_name), + extra_metadata=metadata, timestamp=t1, policy=policy, + df_mgr=tx_df_mgr, verify=False) + # rx got the .data file but is missing the .meta + rx_df = self._make_diskfile( + obj=obj_name, body=self._get_object_data('/a/c/%s' % obj_name), + extra_metadata=metadata, timestamp=t1, policy=policy, + df_mgr=rx_df_mgr, verify=False) + with self.assertRaises(DiskFileExpired): + rx_df.open() # sanity check - expired + + self._check_no_longer_expired_object(obj_name, df, policy) + def test_meta_file_not_synced_to_legacy_receiver(self): # verify that the sender does sync a data file to a legacy receiver, # but does not PUT meta file content to a legacy receiver diff --git a/test/unit/obj/test_ssync_receiver.py b/test/unit/obj/test_ssync_receiver.py index f3b5c32af1..c21d8b3588 100644 --- a/test/unit/obj/test_ssync_receiver.py +++ b/test/unit/obj/test_ssync_receiver.py @@ -35,6 +35,7 @@ from swift.obj.reconstructor import ObjectReconstructor from test import listen_zero, unit from test.unit import debug_logger, patch_policies, make_timestamp_iter +from test.unit.obj.common import write_diskfile @unit.patch_policies() @@ -665,6 +666,36 @@ class TestReceiver(unittest.TestCase): self.assertFalse(self.controller.logger.error.called) self.assertFalse(self.controller.logger.exception.called) + def test_MISSING_CHECK_missing_meta_expired_data(self): + # verify that even when rx disk file has expired x-delete-at, it will + # still be opened and checked for missing meta + self.controller.logger = mock.MagicMock() + ts1 = next(make_timestamp_iter()) + df = self.controller.get_diskfile( + 'sda1', '1', self.account1, self.container1, self.object1, + POLICIES[0]) + write_diskfile(df, ts1, extra_metadata={'X-Delete-At': 0}) + + # make a request - expect newer metadata to be wanted + req = swob.Request.blank( + '/sda1/1', + environ={'REQUEST_METHOD': 'SSYNC', + 'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '0'}, + body=':MISSING_CHECK: START\r\n' + + self.hash1 + ' ' + ts1.internal + ' m:30d40\r\n' + ':MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n:UPDATES: END\r\n') + resp = req.get_response(self.controller) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', + 'c2519f265f9633e74f9b2fe3b9bec27d m', + ':MISSING_CHECK: END', + ':UPDATES: START', ':UPDATES: END']) + self.assertEqual(resp.status_int, 200) + self.assertFalse(self.controller.logger.error.called) + self.assertFalse(self.controller.logger.exception.called) + @patch_policies(with_ec_default=True) def test_MISSING_CHECK_missing_durable(self): self.controller.logger = mock.MagicMock()