diff --git a/test/unit/obj/test_ssync_sender.py b/test/unit/obj/test_ssync_sender.py
index 42bd610eb6..fa38b658b2 100644
--- a/test/unit/obj/test_ssync_sender.py
+++ b/test/unit/obj/test_ssync_sender.py
@@ -29,9 +29,8 @@ from swift.common import exceptions, utils
 from swift.common.storage_policy import POLICIES
 from swift.common.exceptions import DiskFileNotExist, DiskFileError, \
     DiskFileDeleted
-from swift.common.swob import Request
-from swift.common.utils import Timestamp, FileLikeIter
-from swift.obj import ssync_sender, diskfile, server, ssync_receiver
+from swift.common.utils import Timestamp
+from swift.obj import ssync_sender, diskfile, server
 from swift.obj.reconstructor import RebuildingECDiskFileStream

 from test.unit import debug_logger, patch_policies
@@ -1245,67 +1244,52 @@ class TestSender(BaseTestSender):
         self.assertTrue(self.sender.connection.closed)


-@patch_policies(with_ec_default=True)
-class TestSsync(BaseTestSender):
+class TestBaseSsync(BaseTestSender):
     """
-    Test interactions between sender and receiver. The basis for each test is
-    actual diskfile state on either side - the connection between sender and
-    receiver is faked. Assertions are made about the final state of the sender
-    and receiver diskfiles.
+    Provides a framework to test end to end interactions between sender and
+    receiver. The basis for each test is actual diskfile state on either side.
+    The connection between sender and receiver is wrapped to capture ssync
+    traffic for subsequent verification of the protocol. Assertions are made
+    about the final state of the sender and receiver diskfiles.
     """

-    def make_fake_ssync_connect(self, sender, rx_obj_controller, device,
-                                partition, policy):
-        trace = []
+    def make_connect_wrapper(self, sender):
+        """
+        Make a wrapper function for the ssync_sender.Sender.connect() method
+        that will in turn wrap the HTTPConnection.send() and the
+        Sender.readline() so that ssync protocol messages can be captured.
+        """
+        orig_connect = sender.connect
+        trace = dict(messages=[])

         def add_trace(type, msg):
             # record a protocol event for later analysis
             if msg.strip():
-                trace.append((type, msg.strip()))
+                trace['messages'].append((type, msg.strip()))

-        def start_response(status, headers, exc_info=None):
-            assert(status == '200 OK')
+        def make_send_wrapper(send):
+            def wrapped_send(msg):
+                _msg = msg.split('\r\n', 1)[1]
+                _msg = _msg.rsplit('\r\n', 1)[0]
+                add_trace('tx', _msg)
+                send(msg)
+            return wrapped_send

-        class FakeConnection:
-            def __init__(self, trace):
-                self.trace = trace
-                self.queue = []
-                self.src = FileLikeIter(self.queue)
+        def make_readline_wrapper(readline):
+            def wrapped_readline():
+                data = readline()
+                add_trace('rx', data)
+                bytes_read = trace.setdefault('readline_bytes', 0)
+                trace['readline_bytes'] = bytes_read + len(data)
+                return data
+            return wrapped_readline

-            def send(self, msg):
-                msg = msg.split('\r\n', 1)[1]
-                msg = msg.rsplit('\r\n', 1)[0]
-                add_trace('tx', msg)
-                self.queue.append(msg)
-
-            def close(self):
-                pass
-
-        def wrap_gen(gen):
-            # Strip response head and tail
-            while True:
-                try:
-                    msg = gen.next()
-                    if msg:
-                        add_trace('rx', msg)
-                        msg = '%x\r\n%s\r\n' % (len(msg), msg)
-                        yield msg
-                except StopIteration:
-                    break
-
-        def fake_connect():
-            sender.connection = FakeConnection(trace)
-            headers = {'Transfer-Encoding': 'chunked',
-                       'X-Backend-Storage-Policy-Index': str(int(policy))}
-            env = {'REQUEST_METHOD': 'SSYNC'}
-            path = '/%s/%s' % (device, partition)
-            req = Request.blank(path, environ=env, headers=headers)
-            req.environ['wsgi.input'] = sender.connection.src
-            resp = rx_obj_controller(req.environ, start_response)
-            wrapped_gen = wrap_gen(resp)
-            sender.response = FileLikeIter(wrapped_gen)
-            sender.response.fp = sender.response
-        return fake_connect
+        def wrapped_connect():
+            orig_connect()
+            sender.connection.send = make_send_wrapper(
+                sender.connection.send)
+            sender.readline = make_readline_wrapper(sender.readline)
+        return wrapped_connect, trace

     def setUp(self):
         self.device = 'dev'
@@ -1325,19 +1309,24 @@ class TestSsync(BaseTestSender):
                 'replication_one_per_device': 'false',
                 'log_requests': 'false'}
         self.rx_controller = server.ObjectController(conf)
-        self.orig_ensure_flush = ssync_receiver.Receiver._ensure_flush
-        ssync_receiver.Receiver._ensure_flush = lambda *args: ''
         self.ts_iter = (Timestamp(t)
                         for t in itertools.count(int(time.time())))
+        self.rx_ip = '127.0.0.1'
+        sock = eventlet.listen((self.rx_ip, 0))
+        self.rx_server = eventlet.spawn(
+            eventlet.wsgi.server, sock, self.rx_controller, utils.NullLogger())
+        self.rx_port = sock.getsockname()[1]
+        self.rx_node = {'replication_ip': self.rx_ip,
+                        'replication_port': self.rx_port,
+                        'device': self.device}

     def tearDown(self):
-        if self.orig_ensure_flush:
-            ssync_receiver.Receiver._ensure_flush = self.orig_ensure_flush
+        self.rx_server.kill()
         shutil.rmtree(self.tmpdir, ignore_errors=True)

     def _create_ondisk_files(self, df_mgr, obj_name, policy, timestamp,
                              frag_indexes=None):
-        frag_indexes = [] if frag_indexes is None else frag_indexes
+        frag_indexes = [None] if frag_indexes is None else frag_indexes
         metadata = {'Content-Type': 'plain/text'}
         diskfiles = []
         for frag_index in frag_indexes:
@@ -1372,22 +1361,28 @@ class TestSsync(BaseTestSender):
             df.open()
         return df

-    def _verify_diskfile_sync(self, tx_df, rx_df, frag_index):
+    def _verify_diskfile_sync(self, tx_df, rx_df, frag_index, same_etag=False):
         # verify that diskfiles' metadata match
         # sanity check, they are not the same ondisk files!
         self.assertNotEqual(tx_df._datadir, rx_df._datadir)
         rx_metadata = dict(rx_df.get_metadata())
         for k, v in tx_df.get_metadata().iteritems():
-            self.assertEqual(v, rx_metadata.pop(k))
+            if k == 'X-Object-Sysmeta-Ec-Frag-Index':
+                # if tx_df had a frag_index then rx_df should also have one
+                self.assertTrue(k in rx_metadata)
+                self.assertEqual(frag_index, int(rx_metadata.pop(k)))
+            elif k == 'ETag' and not same_etag:
+                self.assertNotEqual(v, rx_metadata.pop(k, None))
+                continue
+            else:
+                self.assertEqual(v, rx_metadata.pop(k), k)
         # ugh, ssync duplicates ETag with Etag so have to clear it out here
         if 'Etag' in rx_metadata:
             rx_metadata.pop('Etag')
         self.assertFalse(rx_metadata)
-        if frag_index:
-            rx_metadata = rx_df.get_metadata()
-            fi_key = 'X-Object-Sysmeta-Ec-Frag-Index'
-            self.assertTrue(fi_key in rx_metadata)
-            self.assertEqual(frag_index, int(rx_metadata[fi_key]))
+        expected_body = '%s___%s' % (tx_df._name, frag_index)
+        actual_body = ''.join([chunk for chunk in rx_df.reader()])
+        self.assertEqual(expected_body, actual_body)

     def _analyze_trace(self, trace):
         """
@@ -1445,7 +1440,7 @@ class TestSsync(BaseTestSender):
         phases = ('tx_missing', 'rx_missing', 'tx_updates', 'rx_updates')
         results = dict((k, []) for k in phases)
         handler = unexpected
-        lines = list(trace)
+        lines = list(trace.get('messages', []))
         lines.reverse()
         while lines:
             line = lines.pop()
@@ -1471,27 +1466,35 @@ class TestSsync(BaseTestSender):
                         'Message outside of a phase: %s' % results.get(None))
         return results

-    def _verify_ondisk_files(self, tx_objs, policy, rx_node_index):
-        # verify tx and rx files that should be in sync
+    def _verify_ondisk_files(self, tx_objs, policy, tx_frag_index=None,
+                             rx_frag_index=None):
+        """
+        Verify tx and rx files that should be in sync.
+        :param tx_objs: sender diskfiles
+        :param policy: storage policy instance
+        :param tx_frag_index: the fragment index of tx diskfiles that should
+                              have been used as a source for sync'ing
+        :param rx_frag_index: the fragment index of expected rx diskfiles
+        """
         for o_name, diskfiles in tx_objs.iteritems():
             for tx_df in diskfiles:
-                frag_index = tx_df._frag_index
-                if frag_index == rx_node_index:
-                    # this frag_index should have been sync'd,
+                if tx_frag_index is None or tx_df._frag_index == tx_frag_index:
+                    # this diskfile should have been sync'd,
                     # check rx file is ok
-                    rx_df = self._open_rx_diskfile(o_name, policy, frag_index)
-                    self._verify_diskfile_sync(tx_df, rx_df, frag_index)
-                    expected_body = '/a/c/%s___%s' % (o_name, rx_node_index)
-                    actual_body = ''.join([chunk for chunk in rx_df.reader()])
-                    self.assertEqual(expected_body, actual_body)
+                    rx_df = self._open_rx_diskfile(
+                        o_name, policy, rx_frag_index)
+                    # for EC revert job or replication etags should match
+                    match_etag = (tx_frag_index == rx_frag_index)
+                    self._verify_diskfile_sync(
+                        tx_df, rx_df, rx_frag_index, match_etag)
                 else:
-                    # this frag_index should not have been sync'd,
+                    # this diskfile should not have been sync'd,
                     # check no rx file,
-                    self.assertRaises(DiskFileNotExist,
-                                      self._open_rx_diskfile,
-                                      o_name, policy, frag_index=frag_index)
+                    self.assertRaises(DiskFileNotExist, self._open_rx_diskfile,
+                                      o_name, policy,
+                                      frag_index=tx_df._frag_index)
                 # check tx file still intact - ssync does not do any cleanup!
-                self._open_tx_diskfile(o_name, policy, frag_index)
+                tx_df.open()

     def _verify_tombstones(self, tx_objs, policy):
         # verify tx and rx tombstones that should be in sync
@@ -1509,13 +1512,17 @@ class TestSsync(BaseTestSender):
                     rx_delete_time = exc.timestamp
                 self.assertEqual(tx_delete_time, rx_delete_time)

+
+@patch_policies(with_ec_default=True)
+class TestSsyncEC(TestBaseSsync):
     def test_handoff_fragment_revert(self):
         # test that a sync_revert type job does send the correct frag archives
-        # to the receiver, and that those frag archives are then removed from
-        # local node.
+        # to the receiver
         policy = POLICIES.default
         rx_node_index = 0
         tx_node_index = 1
+        # for a revert job we iterate over frag index that belongs on
+        # remote node
         frag_index = rx_node_index

         # create sender side diskfiles...
@@ -1557,20 +1564,18 @@ class TestSsync(BaseTestSender):
         job = {'device': self.device,
                'partition': self.partition,
                'policy': policy,
-               'frag_index': frag_index,
-               'purge': True}
-        node = {'index': rx_node_index}
-        self.sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
-        # fake connection from tx to rx...
-        self.sender.connect = self.make_fake_ssync_connect(
-            self.sender, self.rx_controller, self.device, self.partition,
-            policy)
+               'frag_index': frag_index}
+        node = dict(self.rx_node)
+        node.update({'index': rx_node_index})
+        sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
+        # wrap connection from tx to rx to capture ssync messages...
+        sender.connect, trace = self.make_connect_wrapper(sender)

         # run the sync protocol...
-        self.sender()
+        sender()

         # verify protocol
-        results = self._analyze_trace(self.sender.connection.trace)
+        results = self._analyze_trace(trace)
         # sender has handoff frags for o1, o3 and o4 and ts for o5
         self.assertEqual(4, len(results['tx_missing']))
         # receiver is missing frags for o1, o3 and ts for o5
@@ -1591,7 +1596,8 @@ class TestSsync(BaseTestSender):
         self.assertEqual(['/a/c/o1', '/a/c/o3', '/a/c/o5'], sorted(sync_paths))

         # verify on disk files...
-        self._verify_ondisk_files(tx_objs, policy, rx_node_index)
+        self._verify_ondisk_files(
+            tx_objs, policy, frag_index, rx_node_index)
         self._verify_tombstones(tx_tombstones, policy)

     def test_fragment_sync(self):
@@ -1656,19 +1662,17 @@ class TestSsync(BaseTestSender):
                'policy': policy,
                'frag_index': frag_index,
                'sync_diskfile_builder': fake_reconstruct_fa}
-        node = {'index': rx_node_index}
-        self.sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
-
-        # fake connection from tx to rx...
-        self.sender.connect = self.make_fake_ssync_connect(
-            self.sender, self.rx_controller, self.device, self.partition,
-            policy)
+        node = dict(self.rx_node)
+        node.update({'index': rx_node_index})
+        sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
+        # wrap connection from tx to rx to capture ssync messages...
+        sender.connect, trace = self.make_connect_wrapper(sender)

         # run the sync protocol...
-        self.sender()
+        sender()

         # verify protocol
-        results = self._analyze_trace(self.sender.connection.trace)
+        results = self._analyze_trace(trace)
         # sender has primary for o1, o2 and o3, o4 and ts for o5
         self.assertEqual(5, len(results['tx_missing']))
         # receiver is missing o1, o2 and o3 and ts for o5
@@ -1702,9 +1706,135 @@ class TestSsync(BaseTestSender):

         # verify on disk files...
         self.assertEqual(sorted(expect_sync_paths), sorted(actual_sync_paths))
-        self._verify_ondisk_files(tx_objs, policy, rx_node_index)
+        self._verify_ondisk_files(
+            tx_objs, policy, frag_index, rx_node_index)
         self._verify_tombstones(tx_tombstones, policy)


+@patch_policies
+class TestSsyncReplication(TestBaseSsync):
+    def test_sync(self):
+        policy = POLICIES.default
+        rx_node_index = 0
+
+        # create sender side diskfiles...
+        tx_objs = {}
+        rx_objs = {}
+        tx_tombstones = {}
+        rx_tombstones = {}
+        tx_df_mgr = self.daemon._diskfile_router[policy]
+        rx_df_mgr = self.rx_controller._diskfile_router[policy]
+        # o1 and o2 are on tx only
+        t1 = self.ts_iter.next()
+        tx_objs['o1'] = self._create_ondisk_files(tx_df_mgr, 'o1', policy, t1)
+        t2 = self.ts_iter.next()
+        tx_objs['o2'] = self._create_ondisk_files(tx_df_mgr, 'o2', policy, t2)
+        # o3 is on tx and older copy on rx
+        t3a = self.ts_iter.next()
+        rx_objs['o3'] = self._create_ondisk_files(rx_df_mgr, 'o3', policy, t3a)
+        t3b = self.ts_iter.next()
+        tx_objs['o3'] = self._create_ondisk_files(tx_df_mgr, 'o3', policy, t3b)
+        # o4 in sync on rx and tx
+        t4 = self.ts_iter.next()
+        tx_objs['o4'] = self._create_ondisk_files(tx_df_mgr, 'o4', policy, t4)
+        rx_objs['o4'] = self._create_ondisk_files(rx_df_mgr, 'o4', policy, t4)
+        # o5 is a tombstone, missing on receiver
+        t5 = self.ts_iter.next()
+        tx_tombstones['o5'] = self._create_ondisk_files(
+            tx_df_mgr, 'o5', policy, t5)
+        tx_tombstones['o5'][0].delete(t5)
+        # o6 is a tombstone, in sync on tx and rx
+        t6 = self.ts_iter.next()
+        tx_tombstones['o6'] = self._create_ondisk_files(
+            tx_df_mgr, 'o6', policy, t6)
+        tx_tombstones['o6'][0].delete(t6)
+        rx_tombstones['o6'] = self._create_ondisk_files(
+            rx_df_mgr, 'o6', policy, t6)
+        rx_tombstones['o6'][0].delete(t6)
+        # o7 is a tombstone on tx, older data on rx
+        t7a = self.ts_iter.next()
+        rx_objs['o7'] = self._create_ondisk_files(rx_df_mgr, 'o7', policy, t7a)
+        t7b = self.ts_iter.next()
+        tx_tombstones['o7'] = self._create_ondisk_files(
+            tx_df_mgr, 'o7', policy, t7b)
+        tx_tombstones['o7'][0].delete(t7b)
+
+        suffixes = set()
+        for diskfiles in (tx_objs.values() + tx_tombstones.values()):
+            for df in diskfiles:
+                suffixes.add(os.path.basename(os.path.dirname(df._datadir)))
+
+        # create ssync sender instance...
+        job = {'device': self.device,
+               'partition': self.partition,
+               'policy': policy}
+        node = dict(self.rx_node)
+        node.update({'index': rx_node_index})
+        sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
+        # wrap connection from tx to rx to capture ssync messages...
+        sender.connect, trace = self.make_connect_wrapper(sender)
+
+        # run the sync protocol...
+        success, in_sync_objs = sender()
+
+        self.assertEqual(7, len(in_sync_objs))
+        self.assertTrue(success)
+
+        # verify protocol
+        results = self._analyze_trace(trace)
+        self.assertEqual(7, len(results['tx_missing']))
+        self.assertEqual(5, len(results['rx_missing']))
+        self.assertEqual(5, len(results['tx_updates']))
+        self.assertFalse(results['rx_updates'])
+        sync_paths = []
+        for subreq in results.get('tx_updates'):
+            if subreq.get('method') == 'PUT':
+                self.assertTrue(
+                    subreq['path'] in ('/a/c/o1', '/a/c/o2', '/a/c/o3'))
+                expected_body = '%s___None' % subreq['path']
+                self.assertEqual(expected_body, subreq['body'])
+            elif subreq.get('method') == 'DELETE':
+                self.assertTrue(subreq['path'] in ('/a/c/o5', '/a/c/o7'))
+            sync_paths.append(subreq.get('path'))
+        self.assertEqual(
+            ['/a/c/o1', '/a/c/o2', '/a/c/o3', '/a/c/o5', '/a/c/o7'],
+            sorted(sync_paths))
+
+        # verify on disk files...
+        self._verify_ondisk_files(tx_objs, policy)
+        self._verify_tombstones(tx_tombstones, policy)
+
+    def test_nothing_to_sync(self):
+        job = {'device': self.device,
+               'partition': self.partition,
+               'policy': POLICIES.default}
+        node = {'replication_ip': self.rx_ip,
+                'replication_port': self.rx_port,
+                'device': self.device,
+                'index': 0}
+        sender = ssync_sender.Sender(self.daemon, node, job, ['abc'])
+        # wrap connection from tx to rx to capture ssync messages...
+        sender.connect, trace = self.make_connect_wrapper(sender)
+
+        result, in_sync_objs = sender()
+
+        self.assertTrue(result)
+        self.assertFalse(in_sync_objs)
+        results = self._analyze_trace(trace)
+        self.assertFalse(results['tx_missing'])
+        self.assertFalse(results['rx_missing'])
+        self.assertFalse(results['tx_updates'])
+        self.assertFalse(results['rx_updates'])
+        # Minimal receiver response as read by sender:
+        #   2 * 4098 <-- _ensure_flush() twice
+        #       + 23 <-- :MISSING_CHECK: START\r\n
+        #        + 2 <-- \r\n (minimal missing check response)
+        #       + 21 <-- :MISSING_CHECK: END\r\n
+        #       + 17 <-- :UPDATES: START\r\n
+        #       + 15 <-- :UPDATES: END\r\n
+        # TOTAL = 8274
+        self.assertEqual(8274, trace.get('readline_bytes'))
+
+
 if __name__ == '__main__':
     unittest.main()