Clean up and extend end-to-end ssync tests

Extends the existing end-to-end ssync tests with
a test using a replication policy.

Also includes some cleanup and improvements to the test framework, e.g. rather
than faking the connection between sender and receiver, a real
connection is used and wrapped to capture traffic for verification.

Change-Id: Id71d2eb3fb8fa15c016ef151aacf95f97196a902
This commit is contained in:
Alistair Coles 2015-05-01 13:02:29 +01:00
parent 31263ce315
commit 98b725fec6

@@ -29,9 +29,8 @@ from swift.common import exceptions, utils
from swift.common.storage_policy import POLICIES from swift.common.storage_policy import POLICIES
from swift.common.exceptions import DiskFileNotExist, DiskFileError, \ from swift.common.exceptions import DiskFileNotExist, DiskFileError, \
DiskFileDeleted DiskFileDeleted
from swift.common.swob import Request from swift.common.utils import Timestamp
from swift.common.utils import Timestamp, FileLikeIter from swift.obj import ssync_sender, diskfile, server
from swift.obj import ssync_sender, diskfile, server, ssync_receiver
from swift.obj.reconstructor import RebuildingECDiskFileStream from swift.obj.reconstructor import RebuildingECDiskFileStream
from test.unit import debug_logger, patch_policies from test.unit import debug_logger, patch_policies
@@ -1245,67 +1244,52 @@ class TestSender(BaseTestSender):
self.assertTrue(self.sender.connection.closed) self.assertTrue(self.sender.connection.closed)
@patch_policies(with_ec_default=True) class TestBaseSsync(BaseTestSender):
class TestSsync(BaseTestSender):
""" """
Test interactions between sender and receiver. The basis for each test is Provides a framework to test end to end interactions between sender and
actual diskfile state on either side - the connection between sender and receiver. The basis for each test is actual diskfile state on either side.
receiver is faked. Assertions are made about the final state of the sender The connection between sender and receiver is wrapped to capture ssync
and receiver diskfiles. traffic for subsequent verification of the protocol. Assertions are made
about the final state of the sender and receiver diskfiles.
""" """
def make_fake_ssync_connect(self, sender, rx_obj_controller, device, def make_connect_wrapper(self, sender):
partition, policy): """
trace = [] Make a wrapper function for the ssync_sender.Sender.connect() method
that will in turn wrap the HTTConnection.send() and the
Sender.readline() so that ssync protocol messages can be captured.
"""
orig_connect = sender.connect
trace = dict(messages=[])
def add_trace(type, msg): def add_trace(type, msg):
# record a protocol event for later analysis # record a protocol event for later analysis
if msg.strip(): if msg.strip():
trace.append((type, msg.strip())) trace['messages'].append((type, msg.strip()))
def start_response(status, headers, exc_info=None): def make_send_wrapper(send):
assert(status == '200 OK') def wrapped_send(msg):
_msg = msg.split('\r\n', 1)[1]
_msg = _msg.rsplit('\r\n', 1)[0]
add_trace('tx', _msg)
send(msg)
return wrapped_send
class FakeConnection: def make_readline_wrapper(readline):
def __init__(self, trace): def wrapped_readline():
self.trace = trace data = readline()
self.queue = [] add_trace('rx', data)
self.src = FileLikeIter(self.queue) bytes_read = trace.setdefault('readline_bytes', 0)
trace['readline_bytes'] = bytes_read + len(data)
return data
return wrapped_readline
def send(self, msg): def wrapped_connect():
msg = msg.split('\r\n', 1)[1] orig_connect()
msg = msg.rsplit('\r\n', 1)[0] sender.connection.send = make_send_wrapper(
add_trace('tx', msg) sender.connection.send)
self.queue.append(msg) sender.readline = make_readline_wrapper(sender.readline)
return wrapped_connect, trace
def close(self):
pass
def wrap_gen(gen):
# Strip response head and tail
while True:
try:
msg = gen.next()
if msg:
add_trace('rx', msg)
msg = '%x\r\n%s\r\n' % (len(msg), msg)
yield msg
except StopIteration:
break
def fake_connect():
sender.connection = FakeConnection(trace)
headers = {'Transfer-Encoding': 'chunked',
'X-Backend-Storage-Policy-Index': str(int(policy))}
env = {'REQUEST_METHOD': 'SSYNC'}
path = '/%s/%s' % (device, partition)
req = Request.blank(path, environ=env, headers=headers)
req.environ['wsgi.input'] = sender.connection.src
resp = rx_obj_controller(req.environ, start_response)
wrapped_gen = wrap_gen(resp)
sender.response = FileLikeIter(wrapped_gen)
sender.response.fp = sender.response
return fake_connect
def setUp(self): def setUp(self):
self.device = 'dev' self.device = 'dev'
@@ -1325,19 +1309,24 @@ class TestSsync(BaseTestSender):
'replication_one_per_device': 'false', 'replication_one_per_device': 'false',
'log_requests': 'false'} 'log_requests': 'false'}
self.rx_controller = server.ObjectController(conf) self.rx_controller = server.ObjectController(conf)
self.orig_ensure_flush = ssync_receiver.Receiver._ensure_flush
ssync_receiver.Receiver._ensure_flush = lambda *args: ''
self.ts_iter = (Timestamp(t) self.ts_iter = (Timestamp(t)
for t in itertools.count(int(time.time()))) for t in itertools.count(int(time.time())))
self.rx_ip = '127.0.0.1'
sock = eventlet.listen((self.rx_ip, 0))
self.rx_server = eventlet.spawn(
eventlet.wsgi.server, sock, self.rx_controller, utils.NullLogger())
self.rx_port = sock.getsockname()[1]
self.rx_node = {'replication_ip': self.rx_ip,
'replication_port': self.rx_port,
'device': self.device}
def tearDown(self): def tearDown(self):
if self.orig_ensure_flush: self.rx_server.kill()
ssync_receiver.Receiver._ensure_flush = self.orig_ensure_flush
shutil.rmtree(self.tmpdir, ignore_errors=True) shutil.rmtree(self.tmpdir, ignore_errors=True)
def _create_ondisk_files(self, df_mgr, obj_name, policy, timestamp, def _create_ondisk_files(self, df_mgr, obj_name, policy, timestamp,
frag_indexes=None): frag_indexes=None):
frag_indexes = [] if frag_indexes is None else frag_indexes frag_indexes = [None] if frag_indexes is None else frag_indexes
metadata = {'Content-Type': 'plain/text'} metadata = {'Content-Type': 'plain/text'}
diskfiles = [] diskfiles = []
for frag_index in frag_indexes: for frag_index in frag_indexes:
@@ -1372,22 +1361,28 @@ class TestSsync(BaseTestSender):
df.open() df.open()
return df return df
def _verify_diskfile_sync(self, tx_df, rx_df, frag_index): def _verify_diskfile_sync(self, tx_df, rx_df, frag_index, same_etag=False):
# verify that diskfiles' metadata match # verify that diskfiles' metadata match
# sanity check, they are not the same ondisk files! # sanity check, they are not the same ondisk files!
self.assertNotEqual(tx_df._datadir, rx_df._datadir) self.assertNotEqual(tx_df._datadir, rx_df._datadir)
rx_metadata = dict(rx_df.get_metadata()) rx_metadata = dict(rx_df.get_metadata())
for k, v in tx_df.get_metadata().iteritems(): for k, v in tx_df.get_metadata().iteritems():
self.assertEqual(v, rx_metadata.pop(k)) if k == 'X-Object-Sysmeta-Ec-Frag-Index':
# if tx_df had a frag_index then rx_df should also have one
self.assertTrue(k in rx_metadata)
self.assertEqual(frag_index, int(rx_metadata.pop(k)))
elif k == 'ETag' and not same_etag:
self.assertNotEqual(v, rx_metadata.pop(k, None))
continue
else:
self.assertEqual(v, rx_metadata.pop(k), k)
# ugh, ssync duplicates ETag with Etag so have to clear it out here # ugh, ssync duplicates ETag with Etag so have to clear it out here
if 'Etag' in rx_metadata: if 'Etag' in rx_metadata:
rx_metadata.pop('Etag') rx_metadata.pop('Etag')
self.assertFalse(rx_metadata) self.assertFalse(rx_metadata)
if frag_index: expected_body = '%s___%s' % (tx_df._name, frag_index)
rx_metadata = rx_df.get_metadata() actual_body = ''.join([chunk for chunk in rx_df.reader()])
fi_key = 'X-Object-Sysmeta-Ec-Frag-Index' self.assertEqual(expected_body, actual_body)
self.assertTrue(fi_key in rx_metadata)
self.assertEqual(frag_index, int(rx_metadata[fi_key]))
def _analyze_trace(self, trace): def _analyze_trace(self, trace):
""" """
@@ -1445,7 +1440,7 @@ class TestSsync(BaseTestSender):
phases = ('tx_missing', 'rx_missing', 'tx_updates', 'rx_updates') phases = ('tx_missing', 'rx_missing', 'tx_updates', 'rx_updates')
results = dict((k, []) for k in phases) results = dict((k, []) for k in phases)
handler = unexpected handler = unexpected
lines = list(trace) lines = list(trace.get('messages', []))
lines.reverse() lines.reverse()
while lines: while lines:
line = lines.pop() line = lines.pop()
@@ -1471,27 +1466,35 @@ class TestSsync(BaseTestSender):
'Message outside of a phase: %s' % results.get(None)) 'Message outside of a phase: %s' % results.get(None))
return results return results
def _verify_ondisk_files(self, tx_objs, policy, rx_node_index): def _verify_ondisk_files(self, tx_objs, policy, tx_frag_index=None,
# verify tx and rx files that should be in sync rx_frag_index=None):
"""
Verify tx and rx files that should be in sync.
:param tx_objs: sender diskfiles
:param policy: storage policy instance
:param tx_frag_index: the fragment index of tx diskfiles that should
have been used as a source for sync'ing
:param rx_frag_index: the fragment index of expected rx diskfiles
"""
for o_name, diskfiles in tx_objs.iteritems(): for o_name, diskfiles in tx_objs.iteritems():
for tx_df in diskfiles: for tx_df in diskfiles:
frag_index = tx_df._frag_index if tx_frag_index is None or tx_df._frag_index == tx_frag_index:
if frag_index == rx_node_index: # this diskfile should have been sync'd,
# this frag_index should have been sync'd,
# check rx file is ok # check rx file is ok
rx_df = self._open_rx_diskfile(o_name, policy, frag_index) rx_df = self._open_rx_diskfile(
self._verify_diskfile_sync(tx_df, rx_df, frag_index) o_name, policy, rx_frag_index)
expected_body = '/a/c/%s___%s' % (o_name, rx_node_index) # for EC revert job or replication etags should match
actual_body = ''.join([chunk for chunk in rx_df.reader()]) match_etag = (tx_frag_index == rx_frag_index)
self.assertEqual(expected_body, actual_body) self._verify_diskfile_sync(
tx_df, rx_df, rx_frag_index, match_etag)
else: else:
# this frag_index should not have been sync'd, # this diskfile should not have been sync'd,
# check no rx file, # check no rx file,
self.assertRaises(DiskFileNotExist, self.assertRaises(DiskFileNotExist, self._open_rx_diskfile,
self._open_rx_diskfile, o_name, policy,
o_name, policy, frag_index=frag_index) frag_index=tx_df._frag_index)
# check tx file still intact - ssync does not do any cleanup! # check tx file still intact - ssync does not do any cleanup!
self._open_tx_diskfile(o_name, policy, frag_index) tx_df.open()
def _verify_tombstones(self, tx_objs, policy): def _verify_tombstones(self, tx_objs, policy):
# verify tx and rx tombstones that should be in sync # verify tx and rx tombstones that should be in sync
@@ -1509,13 +1512,17 @@ class TestSsync(BaseTestSender):
rx_delete_time = exc.timestamp rx_delete_time = exc.timestamp
self.assertEqual(tx_delete_time, rx_delete_time) self.assertEqual(tx_delete_time, rx_delete_time)
@patch_policies(with_ec_default=True)
class TestSsyncEC(TestBaseSsync):
def test_handoff_fragment_revert(self): def test_handoff_fragment_revert(self):
# test that a sync_revert type job does send the correct frag archives # test that a sync_revert type job does send the correct frag archives
# to the receiver, and that those frag archives are then removed from # to the receiver
# local node.
policy = POLICIES.default policy = POLICIES.default
rx_node_index = 0 rx_node_index = 0
tx_node_index = 1 tx_node_index = 1
# for a revert job we iterate over frag index that belongs on
# remote node
frag_index = rx_node_index frag_index = rx_node_index
# create sender side diskfiles... # create sender side diskfiles...
@@ -1557,20 +1564,18 @@ class TestSsync(BaseTestSender):
job = {'device': self.device, job = {'device': self.device,
'partition': self.partition, 'partition': self.partition,
'policy': policy, 'policy': policy,
'frag_index': frag_index, 'frag_index': frag_index}
'purge': True} node = dict(self.rx_node)
node = {'index': rx_node_index} node.update({'index': rx_node_index})
self.sender = ssync_sender.Sender(self.daemon, node, job, suffixes) sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# fake connection from tx to rx... # wrap connection from tx to rx to capture ssync messages...
self.sender.connect = self.make_fake_ssync_connect( sender.connect, trace = self.make_connect_wrapper(sender)
self.sender, self.rx_controller, self.device, self.partition,
policy)
# run the sync protocol... # run the sync protocol...
self.sender() sender()
# verify protocol # verify protocol
results = self._analyze_trace(self.sender.connection.trace) results = self._analyze_trace(trace)
# sender has handoff frags for o1, o3 and o4 and ts for o5 # sender has handoff frags for o1, o3 and o4 and ts for o5
self.assertEqual(4, len(results['tx_missing'])) self.assertEqual(4, len(results['tx_missing']))
# receiver is missing frags for o1, o3 and ts for o5 # receiver is missing frags for o1, o3 and ts for o5
@@ -1591,7 +1596,8 @@ class TestSsync(BaseTestSender):
self.assertEqual(['/a/c/o1', '/a/c/o3', '/a/c/o5'], sorted(sync_paths)) self.assertEqual(['/a/c/o1', '/a/c/o3', '/a/c/o5'], sorted(sync_paths))
# verify on disk files... # verify on disk files...
self._verify_ondisk_files(tx_objs, policy, rx_node_index) self._verify_ondisk_files(
tx_objs, policy, frag_index, rx_node_index)
self._verify_tombstones(tx_tombstones, policy) self._verify_tombstones(tx_tombstones, policy)
def test_fragment_sync(self): def test_fragment_sync(self):
@@ -1656,19 +1662,17 @@ class TestSsync(BaseTestSender):
'policy': policy, 'policy': policy,
'frag_index': frag_index, 'frag_index': frag_index,
'sync_diskfile_builder': fake_reconstruct_fa} 'sync_diskfile_builder': fake_reconstruct_fa}
node = {'index': rx_node_index} node = dict(self.rx_node)
self.sender = ssync_sender.Sender(self.daemon, node, job, suffixes) node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# fake connection from tx to rx... # wrap connection from tx to rx to capture ssync messages...
self.sender.connect = self.make_fake_ssync_connect( sender.connect, trace = self.make_connect_wrapper(sender)
self.sender, self.rx_controller, self.device, self.partition,
policy)
# run the sync protocol... # run the sync protocol...
self.sender() sender()
# verify protocol # verify protocol
results = self._analyze_trace(self.sender.connection.trace) results = self._analyze_trace(trace)
# sender has primary for o1, o2 and o3, o4 and ts for o5 # sender has primary for o1, o2 and o3, o4 and ts for o5
self.assertEqual(5, len(results['tx_missing'])) self.assertEqual(5, len(results['tx_missing']))
# receiver is missing o1, o2 and o3 and ts for o5 # receiver is missing o1, o2 and o3 and ts for o5
@@ -1702,9 +1706,135 @@ class TestSsync(BaseTestSender):
# verify on disk files... # verify on disk files...
self.assertEqual(sorted(expect_sync_paths), sorted(actual_sync_paths)) self.assertEqual(sorted(expect_sync_paths), sorted(actual_sync_paths))
self._verify_ondisk_files(tx_objs, policy, rx_node_index) self._verify_ondisk_files(
tx_objs, policy, frag_index, rx_node_index)
self._verify_tombstones(tx_tombstones, policy) self._verify_tombstones(tx_tombstones, policy)
@patch_policies
class TestSsyncReplication(TestBaseSsync):
def test_sync(self):
policy = POLICIES.default
rx_node_index = 0
# create sender side diskfiles...
tx_objs = {}
rx_objs = {}
tx_tombstones = {}
rx_tombstones = {}
tx_df_mgr = self.daemon._diskfile_router[policy]
rx_df_mgr = self.rx_controller._diskfile_router[policy]
# o1 and o2 are on tx only
t1 = self.ts_iter.next()
tx_objs['o1'] = self._create_ondisk_files(tx_df_mgr, 'o1', policy, t1)
t2 = self.ts_iter.next()
tx_objs['o2'] = self._create_ondisk_files(tx_df_mgr, 'o2', policy, t2)
# o3 is on tx and older copy on rx
t3a = self.ts_iter.next()
rx_objs['o3'] = self._create_ondisk_files(tx_df_mgr, 'o3', policy, t3a)
t3b = self.ts_iter.next()
tx_objs['o3'] = self._create_ondisk_files(tx_df_mgr, 'o3', policy, t3b)
# o4 in sync on rx and tx
t4 = self.ts_iter.next()
tx_objs['o4'] = self._create_ondisk_files(tx_df_mgr, 'o4', policy, t4)
rx_objs['o4'] = self._create_ondisk_files(rx_df_mgr, 'o4', policy, t4)
# o5 is a tombstone, missing on receiver
t5 = self.ts_iter.next()
tx_tombstones['o5'] = self._create_ondisk_files(
tx_df_mgr, 'o5', policy, t5)
tx_tombstones['o5'][0].delete(t5)
# o6 is a tombstone, in sync on tx and rx
t6 = self.ts_iter.next()
tx_tombstones['o6'] = self._create_ondisk_files(
tx_df_mgr, 'o6', policy, t6)
tx_tombstones['o6'][0].delete(t6)
rx_tombstones['o6'] = self._create_ondisk_files(
rx_df_mgr, 'o6', policy, t6)
rx_tombstones['o6'][0].delete(t6)
# o7 is a tombstone on tx, older data on rx
t7a = self.ts_iter.next()
rx_objs['o7'] = self._create_ondisk_files(rx_df_mgr, 'o7', policy, t7a)
t7b = self.ts_iter.next()
tx_tombstones['o7'] = self._create_ondisk_files(
tx_df_mgr, 'o7', policy, t7b)
tx_tombstones['o7'][0].delete(t7b)
suffixes = set()
for diskfiles in (tx_objs.values() + tx_tombstones.values()):
for df in diskfiles:
suffixes.add(os.path.basename(os.path.dirname(df._datadir)))
# create ssync sender instance...
job = {'device': self.device,
'partition': self.partition,
'policy': policy}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
# run the sync protocol...
success, in_sync_objs = sender()
self.assertEqual(7, len(in_sync_objs))
self.assertTrue(success)
# verify protocol
results = self._analyze_trace(trace)
self.assertEqual(7, len(results['tx_missing']))
self.assertEqual(5, len(results['rx_missing']))
self.assertEqual(5, len(results['tx_updates']))
self.assertFalse(results['rx_updates'])
sync_paths = []
for subreq in results.get('tx_updates'):
if subreq.get('method') == 'PUT':
self.assertTrue(
subreq['path'] in ('/a/c/o1', '/a/c/o2', '/a/c/o3'))
expected_body = '%s___None' % subreq['path']
self.assertEqual(expected_body, subreq['body'])
elif subreq.get('method') == 'DELETE':
self.assertTrue(subreq['path'] in ('/a/c/o5', '/a/c/o7'))
sync_paths.append(subreq.get('path'))
self.assertEqual(
['/a/c/o1', '/a/c/o2', '/a/c/o3', '/a/c/o5', '/a/c/o7'],
sorted(sync_paths))
# verify on disk files...
self._verify_ondisk_files(tx_objs, policy)
self._verify_tombstones(tx_tombstones, policy)
def test_nothing_to_sync(self):
job = {'device': self.device,
'partition': self.partition,
'policy': POLICIES.default}
node = {'replication_ip': self.rx_ip,
'replication_port': self.rx_port,
'device': self.device,
'index': 0}
sender = ssync_sender.Sender(self.daemon, node, job, ['abc'])
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
result, in_sync_objs = sender()
self.assertTrue(result)
self.assertFalse(in_sync_objs)
results = self._analyze_trace(trace)
self.assertFalse(results['tx_missing'])
self.assertFalse(results['rx_missing'])
self.assertFalse(results['tx_updates'])
self.assertFalse(results['rx_updates'])
# Minimal receiver response as read by sender:
# 2 * 4098 <-- _ensure_flush() twice
# + 23 <-- :MISSING CHECK START\r\n
# + 2 <-- \r\n (minimal missing check response)
# + 21 <-- :MISSING CHECK END\r\n
# + 17 <-- :UPDATES START\r\n
# + 15 <-- :UPDATES END\r\n
# TOTAL = 8274
self.assertEqual(8274, trace.get('readline_bytes'))
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()