Fix SSYNC failing to replicate unexpired object

Fix a situation where SSYNC would fail to replicate a valid object because
the datafile contains an expired X-Delete-At information while a metafile
contains no X-Delete-At information. Example:
 - 1454619054.02968.data => contains X-Delete-At: 1454619654
 - 1454619056.04876.meta => does not contain X-Delete-At info

In this situation, the replicator tries to PUT the datafile, and then
to POST the metadata. Previously, if the receiver has the datafile but
current time is greater than the X-Delete-At, then it considers it to
be expired and requests no updates from the sender, so the metafile is
never synced. If the receiver does not have the datafile then it does
request updates from the sender, but the ssync PUT subrequest is
refused if the current time is greater than the X-Delete-At (the
object is expired). If the datafile is transfered, the ssync POST
subrequest fails because the object does not exist (expired).

This commit allows PUT and POST to works so that the object can be
replicated, by enabling the receiver object server to open expired
diskfiles when handling replication requests.

Closes-Bug: #1683689
Co-Authored-By: Alistair Coles <alistairncoles@gmail.com>
Change-Id: I919994ead2b20dbb6c5671c208823e8b7f513715
This commit is contained in:
Romain LE DISEZ 2017-04-14 17:21:22 +02:00 committed by Alistair Coles
parent bcd0eb70af
commit 38d35797df
8 changed files with 154 additions and 16 deletions

View File

@ -320,7 +320,8 @@ def check_delete_headers(request):
raise HTTPBadRequest(request=request, content_type='text/plain', raise HTTPBadRequest(request=request, content_type='text/plain',
body='Non-integer X-Delete-At') body='Non-integer X-Delete-At')
if x_delete_at < time.time(): if x_delete_at < time.time() and not utils.config_true_value(
request.headers.get('x-backend-replication', 'f')):
raise HTTPBadRequest(request=request, content_type='text/plain', raise HTTPBadRequest(request=request, content_type='text/plain',
body='X-Delete-At in past') body='X-Delete-At in past')
return request return request

View File

@ -1903,6 +1903,8 @@ class BaseDiskFile(object):
:param use_splice: if true, use zero-copy splice() to send data :param use_splice: if true, use zero-copy splice() to send data
:param pipe_size: size of pipe buffer used in zero-copy operations :param pipe_size: size of pipe buffer used in zero-copy operations
:param use_linkat: if True, use open() with linkat() to create obj file :param use_linkat: if True, use open() with linkat() to create obj file
:param open_expired: if True, open() will not raise a DiskFileExpired if
object is expired
""" """
reader_cls = None # must be set by subclasses reader_cls = None # must be set by subclasses
writer_cls = None # must be set by subclasses writer_cls = None # must be set by subclasses
@ -1910,7 +1912,7 @@ class BaseDiskFile(object):
def __init__(self, mgr, device_path, partition, def __init__(self, mgr, device_path, partition,
account=None, container=None, obj=None, _datadir=None, account=None, container=None, obj=None, _datadir=None,
policy=None, use_splice=False, pipe_size=None, policy=None, use_splice=False, pipe_size=None,
use_linkat=False, **kwargs): use_linkat=False, open_expired=False, **kwargs):
self._manager = mgr self._manager = mgr
self._device_path = device_path self._device_path = device_path
self._logger = mgr.logger self._logger = mgr.logger
@ -1919,6 +1921,7 @@ class BaseDiskFile(object):
self._use_splice = use_splice self._use_splice = use_splice
self._pipe_size = pipe_size self._pipe_size = pipe_size
self._use_linkat = use_linkat self._use_linkat = use_linkat
self._open_expired = open_expired
# This might look a lttle hacky i.e tracking number of newly created # This might look a lttle hacky i.e tracking number of newly created
# dirs to fsync only those many later. If there is a better way, # dirs to fsync only those many later. If there is a better way,
# please suggest. # please suggest.
@ -2217,7 +2220,7 @@ class BaseDiskFile(object):
data_file, "bad metadata x-delete-at value %s" % ( data_file, "bad metadata x-delete-at value %s" % (
self._metadata['X-Delete-At'])) self._metadata['X-Delete-At']))
else: else:
if x_delete_at <= time.time(): if x_delete_at <= time.time() and not self._open_expired:
raise DiskFileExpired(metadata=self._metadata) raise DiskFileExpired(metadata=self._metadata)
try: try:
metadata_size = int(self._metadata['Content-Length']) metadata_size = int(self._metadata['Content-Length'])

View File

@ -516,7 +516,8 @@ class ObjectController(BaseStorageServer):
try: try:
disk_file = self.get_diskfile( disk_file = self.get_diskfile(
device, partition, account, container, obj, device, partition, account, container, obj,
policy=policy) policy=policy, open_expired=config_true_value(
request.headers.get('x-backend-replication', 'false')))
except DiskFileDeviceUnavailable: except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request) return HTTPInsufficientStorage(drive=device, request=request)
try: try:
@ -653,9 +654,6 @@ class ObjectController(BaseStorageServer):
if error_response: if error_response:
return error_response return error_response
new_delete_at = int(request.headers.get('X-Delete-At') or 0) new_delete_at = int(request.headers.get('X-Delete-At') or 0)
if new_delete_at and new_delete_at < time.time():
return HTTPBadRequest(body='X-Delete-At in past', request=request,
content_type='text/plain')
try: try:
fsize = request.message_length() fsize = request.message_length()
except ValueError as e: except ValueError as e:

View File

@ -257,7 +257,7 @@ class Receiver(object):
try: try:
df = self.diskfile_mgr.get_diskfile_from_hash( df = self.diskfile_mgr.get_diskfile_from_hash(
self.device, self.partition, remote['object_hash'], self.device, self.partition, remote['object_hash'],
self.policy, frag_index=self.frag_index) self.policy, frag_index=self.frag_index, open_expired=True)
except exceptions.DiskFileNotExist: except exceptions.DiskFileNotExist:
return {} return {}
try: try:

View File

@ -75,7 +75,7 @@ class BaseTest(unittest.TestCase):
account='a', container='c', obj='o', body='test', account='a', container='c', obj='o', body='test',
extra_metadata=None, policy=None, extra_metadata=None, policy=None,
frag_index=None, timestamp=None, df_mgr=None, frag_index=None, timestamp=None, df_mgr=None,
commit=True): commit=True, verify=True):
policy = policy or POLICIES.legacy policy = policy or POLICIES.legacy
object_parts = account, container, obj object_parts = account, container, obj
timestamp = Timestamp(time.time()) if timestamp is None else timestamp timestamp = Timestamp(time.time()) if timestamp is None else timestamp
@ -86,7 +86,7 @@ class BaseTest(unittest.TestCase):
frag_index=frag_index) frag_index=frag_index)
write_diskfile(df, timestamp, data=body, extra_metadata=extra_metadata, write_diskfile(df, timestamp, data=body, extra_metadata=extra_metadata,
commit=commit) commit=commit)
if commit: if commit and verify:
# when we write and commit stub data, sanity check it's readable # when we write and commit stub data, sanity check it's readable
# and not quarantined because of any validation check # and not quarantined because of any validation check
with df.open(): with df.open():

View File

@ -2847,22 +2847,23 @@ class DiskFileMixin(BaseDiskFileTestMixin):
pickle.dumps(metadata, diskfile.PICKLE_PROTOCOL)) pickle.dumps(metadata, diskfile.PICKLE_PROTOCOL))
def _simple_get_diskfile(self, partition='0', account='a', container='c', def _simple_get_diskfile(self, partition='0', account='a', container='c',
obj='o', policy=None, frag_index=None): obj='o', policy=None, frag_index=None, **kwargs):
policy = policy or POLICIES.default policy = policy or POLICIES.default
df_mgr = self.df_router[policy] df_mgr = self.df_router[policy]
if policy.policy_type == EC_POLICY and frag_index is None: if policy.policy_type == EC_POLICY and frag_index is None:
frag_index = 2 frag_index = 2
return df_mgr.get_diskfile(self.existing_device, partition, return df_mgr.get_diskfile(self.existing_device, partition,
account, container, obj, account, container, obj,
policy=policy, frag_index=frag_index) policy=policy, frag_index=frag_index,
**kwargs)
def _create_test_file(self, data, timestamp=None, metadata=None, def _create_test_file(self, data, timestamp=None, metadata=None,
account='a', container='c', obj='o'): account='a', container='c', obj='o', **kwargs):
if metadata is None: if metadata is None:
metadata = {} metadata = {}
metadata.setdefault('name', '/%s/%s/%s' % (account, container, obj)) metadata.setdefault('name', '/%s/%s/%s' % (account, container, obj))
df = self._simple_get_diskfile(account=account, container=container, df = self._simple_get_diskfile(account=account, container=container,
obj=obj) obj=obj, **kwargs)
if timestamp is None: if timestamp is None:
timestamp = time() timestamp = time()
timestamp = Timestamp(timestamp) timestamp = Timestamp(timestamp)
@ -2932,6 +2933,16 @@ class DiskFileMixin(BaseDiskFileTestMixin):
self._create_test_file, self._create_test_file,
'1234567890', metadata={'X-Delete-At': '0'}) '1234567890', metadata={'X-Delete-At': '0'})
try:
self._create_test_file('1234567890', open_expired=True,
metadata={'X-Delete-At': '0',
'X-Object-Meta-Foo': 'bar'})
df = self._simple_get_diskfile(open_expired=True)
md = df.read_metadata()
self.assertEqual(md['X-Object-Meta-Foo'], 'bar')
except SwiftException as err:
self.fail("Unexpected swift exception raised: %r" % err)
def test_open_not_expired(self): def test_open_not_expired(self):
try: try:
self._create_test_file( self._create_test_file(

View File

@ -24,7 +24,7 @@ import itertools
from six.moves import urllib from six.moves import urllib
from swift.common.exceptions import DiskFileNotExist, DiskFileError, \ from swift.common.exceptions import DiskFileNotExist, DiskFileError, \
DiskFileDeleted DiskFileDeleted, DiskFileExpired
from swift.common import utils from swift.common import utils
from swift.common.storage_policy import POLICIES, EC_POLICY from swift.common.storage_policy import POLICIES, EC_POLICY
from swift.common.utils import Timestamp from swift.common.utils import Timestamp
@ -164,7 +164,9 @@ class TestBaseSsync(BaseTest):
self.assertNotEqual(v, rx_metadata.pop(k, None)) self.assertNotEqual(v, rx_metadata.pop(k, None))
continue continue
else: else:
self.assertEqual(v, rx_metadata.pop(k), k) actual = rx_metadata.pop(k)
self.assertEqual(v, actual, 'Expected %r but got %r for %s' %
(v, actual, k))
self.assertFalse(rx_metadata) self.assertFalse(rx_metadata)
expected_body = self._get_object_data(tx_df._name, expected_body = self._get_object_data(tx_df._name,
frag_index=frag_index) frag_index=frag_index)
@ -1343,6 +1345,98 @@ class TestSsyncReplication(TestBaseSsync):
self.assertEqual(metadata['X-Object-Meta-Test'], oname) self.assertEqual(metadata['X-Object-Meta-Test'], oname)
self.assertEqual(metadata['X-Object-Sysmeta-Test'], 'sys_' + oname) self.assertEqual(metadata['X-Object-Sysmeta-Test'], 'sys_' + oname)
def _check_no_longer_expired_object(self, obj_name, df, policy):
# verify that objects with x-delete-at metadata that are not expired
# can be sync'd
rx_node_index = 0
def do_ssync():
# create ssync sender instance...
suffixes = [os.path.basename(os.path.dirname(df._datadir))]
job = {'device': self.device,
'partition': self.partition,
'policy': policy}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
# run the sync protocol...
return sender()
with self.assertRaises(DiskFileExpired):
df.open() # sanity check - expired
t1_meta = next(self.ts_iter)
df.write_metadata({'X-Timestamp': t1_meta.internal}) # no x-delete-at
df.open() # sanity check - no longer expired
success, in_sync_objs = do_ssync()
self.assertEqual(1, len(in_sync_objs))
self.assertTrue(success)
self._verify_ondisk_files({obj_name: [df]}, policy)
# update object metadata with x-delete-at in distant future
t2_meta = next(self.ts_iter)
df.write_metadata({'X-Timestamp': t2_meta.internal,
'X-Delete-At': str(int(t2_meta) + 10000)})
df.open() # sanity check - not expired
success, in_sync_objs = do_ssync()
self.assertEqual(1, len(in_sync_objs))
self.assertTrue(success)
self._verify_ondisk_files({obj_name: [df]}, policy)
# update object metadata with x-delete-at in not so distant future to
# check that we can update rx with older x-delete-at than it's current
t3_meta = next(self.ts_iter)
df.write_metadata({'X-Timestamp': t3_meta.internal,
'X-Delete-At': str(int(t2_meta) + 5000)})
df.open() # sanity check - not expired
success, in_sync_objs = do_ssync()
self.assertEqual(1, len(in_sync_objs))
self.assertTrue(success)
self._verify_ondisk_files({obj_name: [df]}, policy)
def test_no_longer_expired_object_syncs(self):
policy = POLICIES.default
# simulate o1 that was PUT with x-delete-at that is now expired but
# later had a POST that had no x-delete-at: object should not expire.
tx_df_mgr = self.daemon._df_router[policy]
t1 = next(self.ts_iter)
obj_name = 'o1'
metadata = {'X-Delete-At': '0', 'Content-Type': 'plain/text'}
df = self._make_diskfile(
obj=obj_name, body=self._get_object_data('/a/c/%s' % obj_name),
extra_metadata=metadata, timestamp=t1, policy=policy,
df_mgr=tx_df_mgr, verify=False)
self._check_no_longer_expired_object(obj_name, df, policy)
def test_no_longer_expired_object_syncs_meta(self):
policy = POLICIES.default
# simulate o1 that was PUT with x-delete-at that is now expired but
# later had a POST that had no x-delete-at: object should not expire.
tx_df_mgr = self.daemon._df_router[policy]
rx_df_mgr = self.rx_controller._diskfile_router[policy]
t1 = next(self.ts_iter)
obj_name = 'o1'
metadata = {'X-Delete-At': '0', 'Content-Type': 'plain/text'}
df = self._make_diskfile(
obj=obj_name, body=self._get_object_data('/a/c/%s' % obj_name),
extra_metadata=metadata, timestamp=t1, policy=policy,
df_mgr=tx_df_mgr, verify=False)
# rx got the .data file but is missing the .meta
rx_df = self._make_diskfile(
obj=obj_name, body=self._get_object_data('/a/c/%s' % obj_name),
extra_metadata=metadata, timestamp=t1, policy=policy,
df_mgr=rx_df_mgr, verify=False)
with self.assertRaises(DiskFileExpired):
rx_df.open() # sanity check - expired
self._check_no_longer_expired_object(obj_name, df, policy)
def test_meta_file_not_synced_to_legacy_receiver(self): def test_meta_file_not_synced_to_legacy_receiver(self):
# verify that the sender does sync a data file to a legacy receiver, # verify that the sender does sync a data file to a legacy receiver,
# but does not PUT meta file content to a legacy receiver # but does not PUT meta file content to a legacy receiver

View File

@ -35,6 +35,7 @@ from swift.obj.reconstructor import ObjectReconstructor
from test import unit from test import unit
from test.unit import debug_logger, patch_policies, make_timestamp_iter from test.unit import debug_logger, patch_policies, make_timestamp_iter
from test.unit.obj.common import write_diskfile
@unit.patch_policies() @unit.patch_policies()
@ -665,6 +666,36 @@ class TestReceiver(unittest.TestCase):
self.assertFalse(self.controller.logger.error.called) self.assertFalse(self.controller.logger.error.called)
self.assertFalse(self.controller.logger.exception.called) self.assertFalse(self.controller.logger.exception.called)
def test_MISSING_CHECK_missing_meta_expired_data(self):
# verify that even when rx disk file has expired x-delete-at, it will
# still be opened and checked for missing meta
self.controller.logger = mock.MagicMock()
ts1 = next(make_timestamp_iter())
df = self.controller.get_diskfile(
'sda1', '1', self.account1, self.container1, self.object1,
POLICIES[0])
write_diskfile(df, ts1, extra_metadata={'X-Delete-At': 0})
# make a request - expect newer metadata to be wanted
req = swob.Request.blank(
'/sda1/1',
environ={'REQUEST_METHOD': 'SSYNC',
'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '0'},
body=':MISSING_CHECK: START\r\n' +
self.hash1 + ' ' + ts1.internal + ' m:30d40\r\n'
':MISSING_CHECK: END\r\n'
':UPDATES: START\r\n:UPDATES: END\r\n')
resp = req.get_response(self.controller)
self.assertEqual(
self.body_lines(resp.body),
[':MISSING_CHECK: START',
'c2519f265f9633e74f9b2fe3b9bec27d m',
':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
self.assertEqual(resp.status_int, 200)
self.assertFalse(self.controller.logger.error.called)
self.assertFalse(self.controller.logger.exception.called)
@patch_policies(with_ec_default=True) @patch_policies(with_ec_default=True)
def test_MISSING_CHECK_missing_durable(self): def test_MISSING_CHECK_missing_durable(self):
self.controller.logger = mock.MagicMock() self.controller.logger = mock.MagicMock()