Merge "ssync: Round-trip offsets in meta/ctype Timestamps"
This commit is contained in:
commit
4b6f54d063
@ -54,13 +54,17 @@ def decode_missing(line):
|
||||
for item in [subpart for subpart in subparts if ':' in subpart]:
|
||||
k, v = item.split(':')
|
||||
if k == 'm':
|
||||
v, _, o = v.partition('__')
|
||||
# ignore ts_data offset when calculating ts_meta
|
||||
result['ts_meta'] = Timestamp(ts_data.normal,
|
||||
delta=int(v, 16))
|
||||
delta=int(v, 16),
|
||||
offset=int(o or '0', 16))
|
||||
elif k == 't':
|
||||
v, _, o = v.partition('__')
|
||||
# ignore ts_data offset when calculating ts_ctype
|
||||
result['ts_ctype'] = Timestamp(ts_data.normal,
|
||||
delta=int(v, 16))
|
||||
result['ts_ctype'] = Timestamp(Timestamp(ts_data).normal,
|
||||
delta=int(v, 16),
|
||||
offset=int(o or '0', 16))
|
||||
elif k == 'durable':
|
||||
result['durable'] = utils.config_true_value(v)
|
||||
return result
|
||||
|
@ -42,9 +42,13 @@ def encode_missing(object_hash, ts_data, ts_meta=None, ts_ctype=None,
|
||||
if ts_meta and ts_meta != ts_data:
|
||||
delta = ts_meta.raw - ts_data.raw
|
||||
extra_parts.append('m:%x' % delta)
|
||||
if ts_meta.offset:
|
||||
extra_parts[-1] += '__%x' % ts_meta.offset
|
||||
if ts_ctype and ts_ctype != ts_data:
|
||||
delta = ts_ctype.raw - ts_data.raw
|
||||
extra_parts.append('t:%x' % delta)
|
||||
if ts_ctype.offset:
|
||||
extra_parts[-1] += '__%x' % ts_ctype.offset
|
||||
if 'durable' in kwargs and kwargs['durable'] is False:
|
||||
# only send durable in the less common case that it is False
|
||||
extra_parts.append('durable:%s' % kwargs['durable'])
|
||||
|
@ -1437,19 +1437,21 @@ class TestSsyncReplication(TestBaseSsync):
|
||||
rx_objs['o2'] = self._create_ondisk_files(rx_df_mgr, 'o2', policy, t2)
|
||||
expected_subreqs['POST'].append('o2')
|
||||
|
||||
# o3 is on tx with meta, rx has newer data but no meta
|
||||
# o3 is on tx with meta, rx has newer data but no meta,
|
||||
# meta timestamp has an offset
|
||||
t3a = next(self.ts_iter)
|
||||
tx_objs['o3'] = self._create_ondisk_files(tx_df_mgr, 'o3', policy, t3a)
|
||||
t3b = next(self.ts_iter)
|
||||
rx_objs['o3'] = self._create_ondisk_files(rx_df_mgr, 'o3', policy, t3b)
|
||||
t3_meta = next(self.ts_iter)
|
||||
t3_meta = utils.Timestamp(t3_meta, offset=2)
|
||||
metadata = {'X-Timestamp': t3_meta.internal,
|
||||
'X-Object-Meta-Test': 'o3',
|
||||
'X-Object-Sysmeta-Test': 'sys_o3'}
|
||||
tx_objs['o3'][0].write_metadata(metadata)
|
||||
expected_subreqs['POST'].append('o3')
|
||||
|
||||
# o4 is on tx with meta, rx has older data and up to date meta
|
||||
# o4 is on tx with meta, rx has older data and up to date meta,
|
||||
t4a = next(self.ts_iter)
|
||||
rx_objs['o4'] = self._create_ondisk_files(rx_df_mgr, 'o4', policy, t4a)
|
||||
t4b = next(self.ts_iter)
|
||||
@ -1499,6 +1501,25 @@ class TestSsyncReplication(TestBaseSsync):
|
||||
tx_objs['o7'][0].write_metadata(metadata)
|
||||
rx_tombstones['o7'][0].delete(next(self.ts_iter))
|
||||
|
||||
# o8 is on tx with meta, rx has in sync data but meta with different
|
||||
# offset
|
||||
t8 = next(self.ts_iter)
|
||||
rx_objs['o8'] = self._create_ondisk_files(rx_df_mgr, 'o8', policy, t8)
|
||||
tx_objs['o8'] = self._create_ondisk_files(tx_df_mgr, 'o8', policy, t8)
|
||||
t8_meta = next(self.ts_iter)
|
||||
t8_meta_offset = utils.Timestamp(t8_meta, offset=4)
|
||||
metadata = {'X-Timestamp': t8_meta_offset.internal,
|
||||
'X-Object-Meta-Test': 'o8',
|
||||
'X-Object-Sysmeta-Test': 'sys_o8'}
|
||||
tx_objs['o8'][0].write_metadata(metadata)
|
||||
# different ts_meta offset on rx
|
||||
t8_meta_offset = utils.Timestamp(t8_meta, offset=3)
|
||||
metadata = {'X-Timestamp': t8_meta_offset.internal,
|
||||
'X-Object-Meta-Test': 'o8',
|
||||
'X-Object-Sysmeta-Test': 'sys_o8'}
|
||||
rx_objs['o8'][0].write_metadata(metadata)
|
||||
expected_subreqs['POST'].append('o8')
|
||||
|
||||
suffixes = set()
|
||||
for diskfiles in list(tx_objs.values()) + list(tx_tombstones.values()):
|
||||
for df in diskfiles:
|
||||
@ -1516,13 +1537,13 @@ class TestSsyncReplication(TestBaseSsync):
|
||||
# run the sync protocol...
|
||||
success, in_sync_objs = sender()
|
||||
|
||||
self.assertEqual(7, len(in_sync_objs))
|
||||
self.assertEqual(8, len(in_sync_objs))
|
||||
self.assertTrue(success)
|
||||
|
||||
# verify protocol
|
||||
results = self._analyze_trace(trace)
|
||||
self.assertEqual(7, len(results['tx_missing']))
|
||||
self.assertEqual(5, len(results['rx_missing']))
|
||||
self.assertEqual(8, len(results['tx_missing']))
|
||||
self.assertEqual(6, len(results['rx_missing']))
|
||||
for subreq in results.get('tx_updates'):
|
||||
obj = subreq['path'].split('/')[3]
|
||||
method = subreq['method']
|
||||
|
@ -2524,6 +2524,24 @@ class TestModuleMethods(unittest.TestCase):
|
||||
self.assertEqual(
|
||||
expected, ssync_receiver.decode_missing(msg.encode('ascii')))
|
||||
|
||||
# timestamps have offsets
|
||||
t_data_offset = utils.Timestamp(t_data, offset=99)
|
||||
t_meta_offset = utils.Timestamp(t_meta, offset=1)
|
||||
t_ctype_offset = utils.Timestamp(t_ctype, offset=2)
|
||||
expected = dict(object_hash=object_hash,
|
||||
ts_data=t_data_offset,
|
||||
ts_meta=t_meta_offset,
|
||||
ts_ctype=t_ctype_offset,
|
||||
durable=True)
|
||||
expected = ('%s %s_0000000000000063 m:%x__1,t:%x__2'
|
||||
% (object_hash, t_data.internal, d_meta_data,
|
||||
d_ctype_data))
|
||||
self.assertEqual(
|
||||
expected.encode('ascii'),
|
||||
ssync_sender.encode_missing(
|
||||
object_hash, t_data_offset, t_meta_offset, t_ctype_offset,
|
||||
durable=True))
|
||||
|
||||
# hex content type delta may be zero
|
||||
msg = '%s %s t:0,m:%x' % (object_hash, t_data.internal, d_meta_data)
|
||||
expected = dict(object_hash=object_hash,
|
||||
|
@ -2040,6 +2040,19 @@ class TestModuleMethods(unittest.TestCase):
|
||||
ssync_sender.encode_missing(object_hash, t_data, t_meta, t_type,
|
||||
durable=True))
|
||||
|
||||
# timestamps have offsets
|
||||
t_data_offset = utils.Timestamp(t_data, offset=99)
|
||||
t_meta_offset = utils.Timestamp(t_meta, offset=1)
|
||||
t_type_offset = utils.Timestamp(t_type, offset=2)
|
||||
expected = ('%s %s m:%x__1,t:%x__2'
|
||||
% (object_hash, t_data_offset.internal, d_meta_data,
|
||||
d_type_data))
|
||||
self.assertEqual(
|
||||
expected.encode('ascii'),
|
||||
ssync_sender.encode_missing(
|
||||
object_hash, t_data_offset, t_meta_offset, t_type_offset,
|
||||
durable=True))
|
||||
|
||||
# test encode and decode functions invert
|
||||
expected = {'object_hash': object_hash, 'ts_meta': t_meta,
|
||||
'ts_data': t_data, 'ts_ctype': t_type, 'durable': False}
|
||||
@ -2062,6 +2075,29 @@ class TestModuleMethods(unittest.TestCase):
|
||||
actual = ssync_receiver.decode_missing(msg)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
t_meta_offset = utils.Timestamp(t_data, offset=2)
|
||||
expected = {'object_hash': object_hash, 'ts_meta': t_meta_offset,
|
||||
'ts_data': t_data, 'ts_ctype': t_type,
|
||||
'durable': False}
|
||||
msg = ssync_sender.encode_missing(**expected)
|
||||
actual = ssync_receiver.decode_missing(msg)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
t_type_offset = utils.Timestamp(t_type, offset=3)
|
||||
expected = {'object_hash': object_hash, 'ts_meta': t_meta,
|
||||
'ts_data': t_data, 'ts_ctype': t_type_offset,
|
||||
'durable': False}
|
||||
msg = ssync_sender.encode_missing(**expected)
|
||||
actual = ssync_receiver.decode_missing(msg)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
expected = {'object_hash': object_hash, 'ts_meta': t_meta_offset,
|
||||
'ts_data': t_data_offset, 'ts_ctype': t_type_offset,
|
||||
'durable': False}
|
||||
msg = ssync_sender.encode_missing(**expected)
|
||||
actual = ssync_receiver.decode_missing(msg)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_decode_wanted(self):
|
||||
parts = ['d']
|
||||
expected = {'data': True}
|
||||
|
Loading…
x
Reference in New Issue
Block a user