Don't ssync data when only a durable is missing
If an EC diskfile is missing its .durable file (for example, due to a
partial PUT failure) then the ssync missing check will fail to open the
file and will consider it missing. This can result in possible
reconstruction of the fragment archive (for a sync job) and definite
transmission of the fragment archive (for both sync and revert jobs),
which is wasteful.

This patch makes the ssync receiver inspect the diskfile state after
attempting to open it. If fragments exist at the timestamp of the
sender's diskfile, but a .durable file is missing, then the receiver
commits the diskfile at the sender's timestamp. As a result, there is
no longer any need to send a fragment archive.

Change-Id: I4766864fcc0a3553976e8fd85bbb2fc782f04abd
parent f57b4c94bf
commit d456d9e934
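
In outline, the receiver's new decision can be sketched as the following
standalone Python (illustrative only, not the patched swift code:
frag_timestamps, durable_timestamp and commit_durable are stand-ins for
the ECDiskFile attributes and commit machinery the diff below actually
uses):

    def fragment_wanted(remote_ts_data, frag_timestamps,
                        durable_timestamp, commit_durable):
        # frag_timestamps: timestamps of fragments on disk for the
        # receiver's fragment index (a stand-in for df.fragments)
        if remote_ts_data in frag_timestamps:
            if (durable_timestamp is not None and
                    durable_timestamp >= remote_ts_data):
                return False  # fragment already durable; nothing to send
            try:
                # Only the .durable marker is missing, so create it
                # locally instead of asking the sender for a full update.
                commit_durable(remote_ts_data)
                return False
            except Exception:
                pass  # commit failed; fall back to wanting a full update
        return True  # fragment archive must be transmitted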
--- a/swift/obj/ssync_receiver.py
+++ b/swift/obj/ssync_receiver.py
@@ -66,7 +66,6 @@ def encode_wanted(remote, local):
     The decoder for this line is
     :py:func:`~swift.obj.ssync_sender.decode_wanted`
     """
-
     want = {}
     if 'ts_data' in local:
        # we have something, let's get just the right stuff
@@ -248,7 +247,7 @@ class Receiver(object):
             raise swob.HTTPInsufficientStorage(drive=self.device)
         self.fp = self.request.environ['wsgi.input']
 
-    def _check_local(self, object_hash):
+    def _check_local(self, remote, make_durable=True):
         """
         Parse local diskfile and return results of current
         representative for comparison to remote.
@@ -257,21 +256,42 @@ class Receiver(object):
         """
         try:
             df = self.diskfile_mgr.get_diskfile_from_hash(
-                self.device, self.partition, object_hash,
+                self.device, self.partition, remote['object_hash'],
                 self.policy, frag_index=self.frag_index)
         except exceptions.DiskFileNotExist:
             return {}
         try:
             df.open()
         except exceptions.DiskFileDeleted as err:
-            return {'ts_data': err.timestamp}
-        except exceptions.DiskFileError as err:
-            return {}
-        return {
-            'ts_data': df.data_timestamp,
-            'ts_meta': df.timestamp,
-            'ts_ctype': df.content_type_timestamp,
-        }
+            result = {'ts_data': err.timestamp}
+        except exceptions.DiskFileError:
+            result = {}
+        else:
+            result = {
+                'ts_data': df.data_timestamp,
+                'ts_meta': df.timestamp,
+                'ts_ctype': df.content_type_timestamp,
+            }
+        if (make_durable and df.fragments and
+                remote['ts_data'] in df.fragments and
+                self.frag_index in df.fragments[remote['ts_data']] and
+                (df.durable_timestamp is None or
+                 df.durable_timestamp < remote['ts_data'])):
+            # We have the frag, just missing a .durable, so try to create the
+            # .durable now. Try this just once to avoid looping if it fails.
+            try:
+                with df.create() as writer:
+                    writer.commit(remote['ts_data'])
+                return self._check_local(remote, make_durable=False)
+            except Exception:
+                # if commit fails then log exception and fall back to wanting
+                # a full update
+                self.app.logger.exception(
+                    '%s/%s/%s EXCEPTION in replication.Receiver while '
+                    'attempting commit of %s'
+                    % (self.request.remote_addr, self.device, self.partition,
+                       df._datadir))
+        return result
 
     def _check_missing(self, line):
         """
@@ -282,7 +302,7 @@ class Receiver(object):
         Anchor point for tests to mock legacy protocol changes.
         """
         remote = decode_missing(line)
-        local = self._check_local(remote['object_hash'])
+        local = self._check_local(remote)
         return encode_wanted(remote, local)
 
     def missing_check(self):
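
In protocol terms the change is confined to the receiver's half of the
missing_check exchange. A rough transcript (the hash and timestamp are
made-up; in the reply, 'd' means the receiver wants the data and 'm' the
metadata, the 'dm' form being exactly what the receiver tests below
assert):

    tx > :MISSING_CHECK: START
    tx > 9d41d8cd98f00b204e9800998ecf8427e 1437471221.02383
    tx > :MISSING_CHECK: END
    rx < :MISSING_CHECK: START
    rx < 9d41d8cd98f00b204e9800998ecf8427e dm
    rx < :MISSING_CHECK: END

With this patch, a receiver that already holds the fragment at the
offered timestamp but lacks its .durable commits the diskfile and leaves
the hash out of its reply, rather than answering 'dm'.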
--- a/test/unit/obj/common.py
+++ b/test/unit/obj/common.py
@@ -51,10 +51,11 @@ class BaseTest(unittest.TestCase):
     def tearDown(self):
         shutil.rmtree(self.tmpdir, ignore_errors=True)
 
-    def _make_open_diskfile(self, device='dev', partition='9',
-                            account='a', container='c', obj='o', body='test',
-                            extra_metadata=None, policy=None,
-                            frag_index=None, timestamp=None, df_mgr=None):
+    def _make_diskfile(self, device='dev', partition='9',
+                       account='a', container='c', obj='o', body='test',
+                       extra_metadata=None, policy=None,
+                       frag_index=None, timestamp=None, df_mgr=None,
+                       commit=True):
         policy = policy or POLICIES.legacy
         object_parts = account, container, obj
         timestamp = Timestamp(time.time()) if timestamp is None else timestamp
@@ -75,6 +76,16 @@ class BaseTest(unittest.TestCase):
             if extra_metadata:
                 metadata.update(extra_metadata)
             writer.put(metadata)
-            writer.commit(timestamp)
+            if commit:
+                writer.commit(timestamp)
         return df
 
+    def _make_open_diskfile(self, device='dev', partition='9',
+                            account='a', container='c', obj='o', body='test',
+                            extra_metadata=None, policy=None,
+                            frag_index=None, timestamp=None, df_mgr=None):
+        df = self._make_diskfile(device, partition, account, container, obj,
+                                 body, extra_metadata, policy, frag_index,
+                                 timestamp, df_mgr)
+        df.open()
+        return df
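
A hypothetical snippet showing how the new commit=False flag gets used
(rx_df_mgr, policy and ts are assumed fixtures from the surrounding test
context; this is a sketch rather than a copy of any test below, and it
assumes that opening a non-durable EC diskfile raises a DiskFileError
subclass, which is the premise of the commit message):

    from swift.common import exceptions

    # create an EC fragment archive but skip the commit, leaving the
    # .durable marker missing on disk
    df = self._make_diskfile(df_mgr=rx_df_mgr, policy=policy,
                             frag_index=0, timestamp=ts, commit=False)
    # without a .durable the diskfile cannot be opened, which is what
    # previously made ssync treat the object as missing
    self.assertRaises(exceptions.DiskFileError, df.open)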
--- a/test/unit/obj/test_ssync.py
+++ b/test/unit/obj/test_ssync.py
@@ -115,7 +115,7 @@ class TestBaseSsync(BaseTest):
         return wrapped_connect, trace
 
     def _create_ondisk_files(self, df_mgr, obj_name, policy, timestamp,
-                             frag_indexes=None):
+                             frag_indexes=None, commit=True):
         frag_indexes = [None] if frag_indexes is None else frag_indexes
         metadata = {'Content-Type': 'plain/text'}
         diskfiles = []
@@ -123,16 +123,18 @@ class TestBaseSsync(BaseTest):
             object_data = '/a/c/%s___%s' % (obj_name, frag_index)
             if frag_index is not None:
                 metadata['X-Object-Sysmeta-Ec-Frag-Index'] = str(frag_index)
-            df = self._make_open_diskfile(
+            df = self._make_diskfile(
                 device=self.device, partition=self.partition, account='a',
                 container='c', obj=obj_name, body=object_data,
                 extra_metadata=metadata, timestamp=timestamp, policy=policy,
-                frag_index=frag_index, df_mgr=df_mgr)
-            # sanity checks
-            listing = os.listdir(df._datadir)
-            self.assertTrue(listing)
-            for filename in listing:
-                self.assertTrue(filename.startswith(timestamp.internal))
+                frag_index=frag_index, df_mgr=df_mgr, commit=commit)
+            if commit:
+                df.open()
+                # sanity checks
+                listing = os.listdir(df._datadir)
+                self.assertTrue(listing)
+                for filename in listing:
+                    self.assertTrue(filename.startswith(timestamp.internal))
             diskfiles.append(df)
         return diskfiles
 
@@ -325,10 +327,12 @@ class TestSsyncEC(TestBaseSsync):
         t2 = next(self.ts_iter)
         tx_objs['o2'] = self._create_ondisk_files(
             tx_df_mgr, 'o2', policy, t2, (tx_node_index,))
-        # o3 only has handoff
+        # o3 only has handoff, rx has other frag index
         t3 = next(self.ts_iter)
         tx_objs['o3'] = self._create_ondisk_files(
             tx_df_mgr, 'o3', policy, t3, (rx_node_index,))
+        rx_objs['o3'] = self._create_ondisk_files(
+            rx_df_mgr, 'o3', policy, t3, (14,))
         # o4 primary and handoff fragment archives on tx, handoff in sync on rx
         t4 = next(self.ts_iter)
         tx_objs['o4'] = self._create_ondisk_files(
@@ -386,6 +390,111 @@ class TestSsyncEC(TestBaseSsync):
             tx_objs, policy, frag_index, rx_node_index)
         self._verify_tombstones(tx_tombstones, policy)
 
+    def test_handoff_fragment_only_missing_durable(self):
+        # test that a sync_revert type job does not PUT when the rx is only
+        # missing a durable file
+        policy = POLICIES.default
+        rx_node_index = frag_index = 0
+        tx_node_index = 1
+
+        # create sender side diskfiles...
+        tx_objs = {}
+        rx_objs = {}
+        tx_df_mgr = self.daemon._diskfile_router[policy]
+        rx_df_mgr = self.rx_controller._diskfile_router[policy]
+
+        expected_subreqs = defaultdict(list)
+
+        # o1 in sync on rx but rx missing .durable - no PUT required
+        t1a = next(self.ts_iter)  # older rx .data with .durable
+        t1b = next(self.ts_iter)  # rx .meta
+        t1c = next(self.ts_iter)  # tx .data with .durable, rx missing .durable
+        obj_name = 'o1'
+        tx_objs[obj_name] = self._create_ondisk_files(
+            tx_df_mgr, obj_name, policy, t1c, (tx_node_index, rx_node_index,))
+        rx_objs[obj_name] = self._create_ondisk_files(
+            rx_df_mgr, obj_name, policy, t1a, (rx_node_index,))
+        metadata = {'X-Timestamp': t1b.internal}
+        rx_objs[obj_name][0].write_metadata(metadata)
+        rx_objs[obj_name] = self._create_ondisk_files(
+            rx_df_mgr, obj_name, policy, t1c, (rx_node_index, 9), commit=False)
+
+        # o2 on rx has wrong frag_indexes and missing .durable - PUT required
+        t2 = next(self.ts_iter)
+        obj_name = 'o2'
+        tx_objs[obj_name] = self._create_ondisk_files(
+            tx_df_mgr, obj_name, policy, t2, (tx_node_index, rx_node_index,))
+        rx_objs[obj_name] = self._create_ondisk_files(
+            rx_df_mgr, obj_name, policy, t2, (13, 14), commit=False)
+        expected_subreqs['PUT'].append(obj_name)
+
+        # o3 on rx has frag at other time missing .durable - PUT required
+        t3 = next(self.ts_iter)
+        obj_name = 'o3'
+        tx_objs[obj_name] = self._create_ondisk_files(
+            tx_df_mgr, obj_name, policy, t3, (tx_node_index, rx_node_index,))
+        t3b = next(self.ts_iter)
+        rx_objs[obj_name] = self._create_ondisk_files(
+            rx_df_mgr, obj_name, policy, t3b, (rx_node_index,), commit=False)
+        expected_subreqs['PUT'].append(obj_name)
+
+        # o4 on rx has a newer tombstone and even newer frags - no PUT required
+        t4 = next(self.ts_iter)
+        obj_name = 'o4'
+        tx_objs[obj_name] = self._create_ondisk_files(
+            tx_df_mgr, obj_name, policy, t4, (tx_node_index, rx_node_index,))
+        rx_objs[obj_name] = self._create_ondisk_files(
+            rx_df_mgr, obj_name, policy, t4, (rx_node_index,))
+        t4b = next(self.ts_iter)
+        rx_objs[obj_name][0].delete(t4b)
+        t4c = next(self.ts_iter)
+        rx_objs[obj_name] = self._create_ondisk_files(
+            rx_df_mgr, obj_name, policy, t4c, (rx_node_index,), commit=False)
+
+        suffixes = set()
+        for diskfiles in tx_objs.values():
+            for df in diskfiles:
+                suffixes.add(os.path.basename(os.path.dirname(df._datadir)))
+
+        # create ssync sender instance...
+        job = {'device': self.device,
+               'partition': self.partition,
+               'policy': policy,
+               'frag_index': frag_index}
+        node = dict(self.rx_node)
+        node.update({'index': rx_node_index})
+        sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
+        # wrap connection from tx to rx to capture ssync messages...
+        sender.connect, trace = self.make_connect_wrapper(sender)
+
+        # run the sync protocol...
+        sender()
+
+        # verify protocol
+        results = self._analyze_trace(trace)
+        self.assertEqual(4, len(results['tx_missing']))
+        self.assertEqual(2, len(results['rx_missing']))
+        self.assertEqual(2, len(results['tx_updates']))
+        self.assertFalse(results['rx_updates'])
+        for subreq in results.get('tx_updates'):
+            obj = subreq['path'].split('/')[3]
+            method = subreq['method']
+            self.assertTrue(obj in expected_subreqs[method],
+                            'Unexpected %s subreq for object %s, expected %s'
+                            % (method, obj, expected_subreqs[method]))
+            expected_subreqs[method].remove(obj)
+            if method == 'PUT':
+                expected_body = '%s___%s' % (subreq['path'], rx_node_index)
+                self.assertEqual(expected_body, subreq['body'])
+        # verify all expected subreqs consumed
+        for _method, expected in expected_subreqs.items():
+            self.assertFalse(expected)
+
+        # verify on disk files...
+        tx_objs.pop('o4')  # o4 should not have been sync'd
+        self._verify_ondisk_files(
+            tx_objs, policy, frag_index, rx_node_index)
+
     def test_fragment_sync(self):
         # check that a sync_only type job does call reconstructor to build a
         # diskfile to send, and continues making progress despite an error
--- a/test/unit/obj/test_ssync_receiver.py
+++ b/test/unit/obj/test_ssync_receiver.py
@@ -665,6 +665,117 @@ class TestReceiver(unittest.TestCase):
         self.assertFalse(self.controller.logger.error.called)
         self.assertFalse(self.controller.logger.exception.called)
 
+    @patch_policies(with_ec_default=True)
+    def test_MISSING_CHECK_missing_durable(self):
+        self.controller.logger = mock.MagicMock()
+        self.controller._diskfile_router = diskfile.DiskFileRouter(
+            self.conf, self.controller.logger)
+
+        # make rx disk file but don't commit it, so .durable is missing
+        ts1 = next(make_timestamp_iter()).internal
+        object_dir = utils.storage_directory(
+            os.path.join(self.testdir, 'sda1',
+                         diskfile.get_data_dir(POLICIES[0])),
+            '1', self.hash1)
+        utils.mkdirs(object_dir)
+        fp = open(os.path.join(object_dir, ts1 + '#2.data'), 'w+')
+        fp.write('1')
+        fp.flush()
+        metadata1 = {
+            'name': self.name1,
+            'X-Timestamp': ts1,
+            'Content-Length': '1'}
+        diskfile.write_metadata(fp, metadata1)
+
+        # make a request - expect no data to be wanted
+        req = swob.Request.blank(
+            '/sda1/1',
+            environ={'REQUEST_METHOD': 'SSYNC',
+                     'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '0',
+                     'HTTP_X_BACKEND_SSYNC_FRAG_INDEX': '2'},
+            body=':MISSING_CHECK: START\r\n' +
+                 self.hash1 + ' ' + ts1 + '\r\n'
+                 ':MISSING_CHECK: END\r\n'
+                 ':UPDATES: START\r\n:UPDATES: END\r\n')
+        resp = req.get_response(self.controller)
+        self.assertEqual(
+            self.body_lines(resp.body),
+            [':MISSING_CHECK: START',
+             ':MISSING_CHECK: END',
+             ':UPDATES: START', ':UPDATES: END'])
+        self.assertEqual(resp.status_int, 200)
+        self.assertFalse(self.controller.logger.error.called)
+        self.assertFalse(self.controller.logger.exception.called)
+
+    @patch_policies(with_ec_default=True)
+    @mock.patch('swift.obj.diskfile.ECDiskFileWriter.commit')
+    def test_MISSING_CHECK_missing_durable_but_commit_fails(self, mock_commit):
+        self.controller.logger = mock.MagicMock()
+        self.controller._diskfile_router = diskfile.DiskFileRouter(
+            self.conf, self.controller.logger)
+
+        # make rx disk file but don't commit it, so .durable is missing
+        ts1 = next(make_timestamp_iter()).internal
+        object_dir = utils.storage_directory(
+            os.path.join(self.testdir, 'sda1',
+                         diskfile.get_data_dir(POLICIES[0])),
+            '1', self.hash1)
+        utils.mkdirs(object_dir)
+        fp = open(os.path.join(object_dir, ts1 + '#2.data'), 'w+')
+        fp.write('1')
+        fp.flush()
+        metadata1 = {
+            'name': self.name1,
+            'X-Timestamp': ts1,
+            'Content-Length': '1'}
+        diskfile.write_metadata(fp, metadata1)
+
+        # make a request with commit disabled - expect data to be wanted
+        req = swob.Request.blank(
+            '/sda1/1',
+            environ={'REQUEST_METHOD': 'SSYNC',
+                     'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '0',
+                     'HTTP_X_BACKEND_SSYNC_FRAG_INDEX': '2'},
+            body=':MISSING_CHECK: START\r\n' +
+                 self.hash1 + ' ' + ts1 + '\r\n'
+                 ':MISSING_CHECK: END\r\n'
+                 ':UPDATES: START\r\n:UPDATES: END\r\n')
+        resp = req.get_response(self.controller)
+        self.assertEqual(
+            self.body_lines(resp.body),
+            [':MISSING_CHECK: START',
+             self.hash1 + ' dm',
+             ':MISSING_CHECK: END',
+             ':UPDATES: START', ':UPDATES: END'])
+        self.assertEqual(resp.status_int, 200)
+        self.assertFalse(self.controller.logger.error.called)
+        self.assertFalse(self.controller.logger.exception.called)
+
+        # make a request with commit raising error - expect data to be wanted
+        mock_commit.side_effect = Exception
+        req = swob.Request.blank(
+            '/sda1/1',
+            environ={'REQUEST_METHOD': 'SSYNC',
+                     'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '0',
+                     'HTTP_X_BACKEND_SSYNC_FRAG_INDEX': '2'},
+            body=':MISSING_CHECK: START\r\n' +
+                 self.hash1 + ' ' + ts1 + '\r\n'
+                 ':MISSING_CHECK: END\r\n'
+                 ':UPDATES: START\r\n:UPDATES: END\r\n')
+        resp = req.get_response(self.controller)
+        self.assertEqual(
+            self.body_lines(resp.body),
+            [':MISSING_CHECK: START',
+             self.hash1 + ' dm',
+             ':MISSING_CHECK: END',
+             ':UPDATES: START', ':UPDATES: END'])
+        self.assertEqual(resp.status_int, 200)
+        self.assertFalse(self.controller.logger.error.called)
+        self.assertTrue(self.controller.logger.exception.called)
+        self.assertIn(
+            'EXCEPTION in replication.Receiver while attempting commit of',
+            self.controller.logger.exception.call_args[0][0])
+
     def test_MISSING_CHECK_storage_policy(self):
         # update router post policy patch
         self.controller._diskfile_router = diskfile.DiskFileRouter(
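
For reference, the receiver-side on-disk state these tests construct
looks roughly like this (suffix, hash and timestamp abbreviated; '#2' is
the fragment index):

    objects/1/<suffix>/<hash1>/1437471221.02383#2.data      <- fragment archive
    objects/1/<suffix>/<hash1>/1437471221.02383.durable     <- deliberately absent

The patched receiver finds the fragment at the sender's timestamp,
writes the missing .durable itself, and answers the missing check
without requesting the fragment archive.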