Merge "Cleanup and extend end to end ssync tests"
This commit is contained in:
@ -29,9 +29,8 @@ from swift.common import exceptions, utils
from swift.common.storage_policy import POLICIES
from swift.common.exceptions import DiskFileNotExist, DiskFileError, \
from swift.common.swob import Request
from swift.common.utils import Timestamp, FileLikeIter
from swift.obj import ssync_sender, diskfile, server, ssync_receiver
from swift.common.utils import Timestamp
from swift.obj import ssync_sender, diskfile, server
from swift.obj.reconstructor import RebuildingECDiskFileStream
from test.unit import debug_logger, patch_policies
@ -1245,67 +1244,52 @@ class TestSender(BaseTestSender):
class TestSsync(BaseTestSender):
class TestBaseSsync(BaseTestSender):
Test interactions between sender and receiver. The basis for each test is
actual diskfile state on either side - the connection between sender and
receiver is faked. Assertions are made about the final state of the sender
and receiver diskfiles.
Provides a framework to test end to end interactions between sender and
receiver. The basis for each test is actual diskfile state on either side.
The connection between sender and receiver is wrapped to capture ssync
traffic for subsequent verification of the protocol. Assertions are made
about the final state of the sender and receiver diskfiles.
def make_fake_ssync_connect(self, sender, rx_obj_controller, device,
partition, policy):
trace = []
def make_connect_wrapper(self, sender):
Make a wrapper function for the ssync_sender.Sender.connect() method
that will in turn wrap the HTTConnection.send() and the
Sender.readline() so that ssync protocol messages can be captured.
orig_connect = sender.connect
trace = dict(messages=[])
def add_trace(type, msg):
# record a protocol event for later analysis
if msg.strip():
trace.append((type, msg.strip()))
trace['messages'].append((type, msg.strip()))
def start_response(status, headers, exc_info=None):
assert(status == '200 OK')
def make_send_wrapper(send):
def wrapped_send(msg):
_msg = msg.split('\r\n', 1)[1]
_msg = _msg.rsplit('\r\n', 1)[0]
add_trace('tx', _msg)
return wrapped_send
class FakeConnection:
def __init__(self, trace):
self.trace = trace
self.queue = []
self.src = FileLikeIter(self.queue)
def make_readline_wrapper(readline):
def wrapped_readline():
data = readline()
add_trace('rx', data)
bytes_read = trace.setdefault('readline_bytes', 0)
trace['readline_bytes'] = bytes_read + len(data)
return data
return wrapped_readline
def send(self, msg):
msg = msg.split('\r\n', 1)[1]
msg = msg.rsplit('\r\n', 1)[0]
add_trace('tx', msg)
def close(self):
def wrap_gen(gen):
# Strip response head and tail
while True:
msg =
if msg:
add_trace('rx', msg)
msg = '%x\r\n%s\r\n' % (len(msg), msg)
yield msg
except StopIteration:
def fake_connect():
sender.connection = FakeConnection(trace)
headers = {'Transfer-Encoding': 'chunked',
'X-Backend-Storage-Policy-Index': str(int(policy))}
path = '/%s/%s' % (device, partition)
req = Request.blank(path, environ=env, headers=headers)
req.environ['wsgi.input'] = sender.connection.src
resp = rx_obj_controller(req.environ, start_response)
wrapped_gen = wrap_gen(resp)
sender.response = FileLikeIter(wrapped_gen)
sender.response.fp = sender.response
return fake_connect
def wrapped_connect():
sender.connection.send = make_send_wrapper(
sender.readline = make_readline_wrapper(sender.readline)
return wrapped_connect, trace
def setUp(self):
self.device = 'dev'
@ -1325,19 +1309,24 @@ class TestSsync(BaseTestSender):
'replication_one_per_device': 'false',
'log_requests': 'false'}
self.rx_controller = server.ObjectController(conf)
self.orig_ensure_flush = ssync_receiver.Receiver._ensure_flush
ssync_receiver.Receiver._ensure_flush = lambda *args: ''
self.ts_iter = (Timestamp(t)
for t in itertools.count(int(time.time())))
self.rx_ip = ''
sock = eventlet.listen((self.rx_ip, 0))
self.rx_server = eventlet.spawn(
eventlet.wsgi.server, sock, self.rx_controller, utils.NullLogger())
self.rx_port = sock.getsockname()[1]
self.rx_node = {'replication_ip': self.rx_ip,
'replication_port': self.rx_port,
'device': self.device}
def tearDown(self):
if self.orig_ensure_flush:
ssync_receiver.Receiver._ensure_flush = self.orig_ensure_flush
shutil.rmtree(self.tmpdir, ignore_errors=True)
def _create_ondisk_files(self, df_mgr, obj_name, policy, timestamp,
frag_indexes = [] if frag_indexes is None else frag_indexes
frag_indexes = [None] if frag_indexes is None else frag_indexes
metadata = {'Content-Type': 'plain/text'}
diskfiles = []
for frag_index in frag_indexes:
@ -1372,22 +1361,28 @@ class TestSsync(BaseTestSender):
return df
def _verify_diskfile_sync(self, tx_df, rx_df, frag_index):
def _verify_diskfile_sync(self, tx_df, rx_df, frag_index, same_etag=False):
# verify that diskfiles' metadata match
# sanity check, they are not the same ondisk files!
self.assertNotEqual(tx_df._datadir, rx_df._datadir)
rx_metadata = dict(rx_df.get_metadata())
for k, v in tx_df.get_metadata().iteritems():
self.assertEqual(v, rx_metadata.pop(k))
if k == 'X-Object-Sysmeta-Ec-Frag-Index':
# if tx_df had a frag_index then rx_df should also have one
self.assertTrue(k in rx_metadata)
self.assertEqual(frag_index, int(rx_metadata.pop(k)))
elif k == 'ETag' and not same_etag:
self.assertNotEqual(v, rx_metadata.pop(k, None))
self.assertEqual(v, rx_metadata.pop(k), k)
# ugh, ssync duplicates ETag with Etag so have to clear it out here
if 'Etag' in rx_metadata:
if frag_index:
rx_metadata = rx_df.get_metadata()
fi_key = 'X-Object-Sysmeta-Ec-Frag-Index'
self.assertTrue(fi_key in rx_metadata)
self.assertEqual(frag_index, int(rx_metadata[fi_key]))
expected_body = '%s___%s' % (tx_df._name, frag_index)
actual_body = ''.join([chunk for chunk in rx_df.reader()])
self.assertEqual(expected_body, actual_body)
def _analyze_trace(self, trace):
@ -1445,7 +1440,7 @@ class TestSsync(BaseTestSender):
phases = ('tx_missing', 'rx_missing', 'tx_updates', 'rx_updates')
results = dict((k, []) for k in phases)
handler = unexpected
lines = list(trace)
lines = list(trace.get('messages', []))
while lines:
line = lines.pop()
@ -1471,27 +1466,35 @@ class TestSsync(BaseTestSender):
'Message outside of a phase: %s' % results.get(None))
return results
def _verify_ondisk_files(self, tx_objs, policy, rx_node_index):
# verify tx and rx files that should be in sync
def _verify_ondisk_files(self, tx_objs, policy, tx_frag_index=None,
Verify tx and rx files that should be in sync.
:param tx_objs: sender diskfiles
:param policy: storage policy instance
:param tx_frag_index: the fragment index of tx diskfiles that should
have been used as a source for sync'ing
:param rx_frag_index: the fragment index of expected rx diskfiles
for o_name, diskfiles in tx_objs.iteritems():
for tx_df in diskfiles:
frag_index = tx_df._frag_index
if frag_index == rx_node_index:
# this frag_index should have been sync'd,
if tx_frag_index is None or tx_df._frag_index == tx_frag_index:
# this diskfile should have been sync'd,
# check rx file is ok
rx_df = self._open_rx_diskfile(o_name, policy, frag_index)
self._verify_diskfile_sync(tx_df, rx_df, frag_index)
expected_body = '/a/c/%s___%s' % (o_name, rx_node_index)
actual_body = ''.join([chunk for chunk in rx_df.reader()])
self.assertEqual(expected_body, actual_body)
rx_df = self._open_rx_diskfile(
o_name, policy, rx_frag_index)
# for EC revert job or replication etags should match
match_etag = (tx_frag_index == rx_frag_index)
tx_df, rx_df, rx_frag_index, match_etag)
# this frag_index should not have been sync'd,
# this diskfile should not have been sync'd,
# check no rx file,
o_name, policy, frag_index=frag_index)
self.assertRaises(DiskFileNotExist, self._open_rx_diskfile,
o_name, policy,
# check tx file still intact - ssync does not do any cleanup!
self._open_tx_diskfile(o_name, policy, frag_index)
def _verify_tombstones(self, tx_objs, policy):
# verify tx and rx tombstones that should be in sync
@ -1509,13 +1512,17 @@ class TestSsync(BaseTestSender):
rx_delete_time = exc.timestamp
self.assertEqual(tx_delete_time, rx_delete_time)
class TestSsyncEC(TestBaseSsync):
def test_handoff_fragment_revert(self):
# test that a sync_revert type job does send the correct frag archives
# to the receiver, and that those frag archives are then removed from
# local node.
# to the receiver
policy = POLICIES.default
rx_node_index = 0
tx_node_index = 1
# for a revert job we iterate over frag index that belongs on
# remote node
frag_index = rx_node_index
# create sender side diskfiles...
@ -1557,20 +1564,18 @@ class TestSsync(BaseTestSender):
job = {'device': self.device,
'partition': self.partition,
'policy': policy,
'frag_index': frag_index,
'purge': True}
node = {'index': rx_node_index}
self.sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# fake connection from tx to rx...
self.sender.connect = self.make_fake_ssync_connect(
self.sender, self.rx_controller, self.device, self.partition,
'frag_index': frag_index}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
# run the sync protocol...
# verify protocol
results = self._analyze_trace(self.sender.connection.trace)
results = self._analyze_trace(trace)
# sender has handoff frags for o1, o3 and o4 and ts for o5
self.assertEqual(4, len(results['tx_missing']))
# receiver is missing frags for o1, o3 and ts for o5
@ -1591,7 +1596,8 @@ class TestSsync(BaseTestSender):
self.assertEqual(['/a/c/o1', '/a/c/o3', '/a/c/o5'], sorted(sync_paths))
# verify on disk files...
self._verify_ondisk_files(tx_objs, policy, rx_node_index)
tx_objs, policy, frag_index, rx_node_index)
self._verify_tombstones(tx_tombstones, policy)
def test_fragment_sync(self):
@ -1656,19 +1662,17 @@ class TestSsync(BaseTestSender):
'policy': policy,
'frag_index': frag_index,
'sync_diskfile_builder': fake_reconstruct_fa}
node = {'index': rx_node_index}
self.sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# fake connection from tx to rx...
self.sender.connect = self.make_fake_ssync_connect(
self.sender, self.rx_controller, self.device, self.partition,
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
# run the sync protocol...
# verify protocol
results = self._analyze_trace(self.sender.connection.trace)
results = self._analyze_trace(trace)
# sender has primary for o1, o2 and o3, o4 and ts for o5
self.assertEqual(5, len(results['tx_missing']))
# receiver is missing o1, o2 and o3 and ts for o5
@ -1702,9 +1706,135 @@ class TestSsync(BaseTestSender):
# verify on disk files...
self.assertEqual(sorted(expect_sync_paths), sorted(actual_sync_paths))
self._verify_ondisk_files(tx_objs, policy, rx_node_index)
tx_objs, policy, frag_index, rx_node_index)
self._verify_tombstones(tx_tombstones, policy)
class TestSsyncReplication(TestBaseSsync):
def test_sync(self):
policy = POLICIES.default
rx_node_index = 0
# create sender side diskfiles...
tx_objs = {}
rx_objs = {}
tx_tombstones = {}
rx_tombstones = {}
tx_df_mgr = self.daemon._diskfile_router[policy]
rx_df_mgr = self.rx_controller._diskfile_router[policy]
# o1 and o2 are on tx only
t1 =
tx_objs['o1'] = self._create_ondisk_files(tx_df_mgr, 'o1', policy, t1)
t2 =
tx_objs['o2'] = self._create_ondisk_files(tx_df_mgr, 'o2', policy, t2)
# o3 is on tx and older copy on rx
t3a =
rx_objs['o3'] = self._create_ondisk_files(tx_df_mgr, 'o3', policy, t3a)
t3b =
tx_objs['o3'] = self._create_ondisk_files(tx_df_mgr, 'o3', policy, t3b)
# o4 in sync on rx and tx
t4 =
tx_objs['o4'] = self._create_ondisk_files(tx_df_mgr, 'o4', policy, t4)
rx_objs['o4'] = self._create_ondisk_files(rx_df_mgr, 'o4', policy, t4)
# o5 is a tombstone, missing on receiver
t5 =
tx_tombstones['o5'] = self._create_ondisk_files(
tx_df_mgr, 'o5', policy, t5)
# o6 is a tombstone, in sync on tx and rx
t6 =
tx_tombstones['o6'] = self._create_ondisk_files(
tx_df_mgr, 'o6', policy, t6)
rx_tombstones['o6'] = self._create_ondisk_files(
rx_df_mgr, 'o6', policy, t6)
# o7 is a tombstone on tx, older data on rx
t7a =
rx_objs['o7'] = self._create_ondisk_files(rx_df_mgr, 'o7', policy, t7a)
t7b =
tx_tombstones['o7'] = self._create_ondisk_files(
tx_df_mgr, 'o7', policy, t7b)
suffixes = set()
for diskfiles in (tx_objs.values() + tx_tombstones.values()):
for df in diskfiles:
# create ssync sender instance...
job = {'device': self.device,
'partition': self.partition,
'policy': policy}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
# run the sync protocol...
success, in_sync_objs = sender()
self.assertEqual(7, len(in_sync_objs))
# verify protocol
results = self._analyze_trace(trace)
self.assertEqual(7, len(results['tx_missing']))
self.assertEqual(5, len(results['rx_missing']))
self.assertEqual(5, len(results['tx_updates']))
sync_paths = []
for subreq in results.get('tx_updates'):
if subreq.get('method') == 'PUT':
subreq['path'] in ('/a/c/o1', '/a/c/o2', '/a/c/o3'))
expected_body = '%s___None' % subreq['path']
self.assertEqual(expected_body, subreq['body'])
elif subreq.get('method') == 'DELETE':
self.assertTrue(subreq['path'] in ('/a/c/o5', '/a/c/o7'))
['/a/c/o1', '/a/c/o2', '/a/c/o3', '/a/c/o5', '/a/c/o7'],
# verify on disk files...
self._verify_ondisk_files(tx_objs, policy)
self._verify_tombstones(tx_tombstones, policy)
def test_nothing_to_sync(self):
job = {'device': self.device,
'partition': self.partition,
'policy': POLICIES.default}
node = {'replication_ip': self.rx_ip,
'replication_port': self.rx_port,
'device': self.device,
'index': 0}
sender = ssync_sender.Sender(self.daemon, node, job, ['abc'])
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
result, in_sync_objs = sender()
results = self._analyze_trace(trace)
# Minimal receiver response as read by sender:
# 2 * 4098 <-- _ensure_flush() twice
# + 23 <-- :MISSING CHECK START\r\n
# + 2 <-- \r\n (minimal missing check response)
# + 21 <-- :MISSING CHECK END\r\n
# + 17 <-- :UPDATES START\r\n
# + 15 <-- :UPDATES END\r\n
# TOTAL = 8274
self.assertEqual(8274, trace.get('readline_bytes'))
if __name__ == '__main__':
Reference in New Issue
Block a user