diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index d29eb36f93..f7211763c0 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -4097,127 +4097,6 @@ class TestReplicatedObjectController( finally: time.time = orig_time - @unpatch_policies - def test_ec_client_disconnect(self): - prolis = _test_sockets[0] - - # create connection - sock = connect_tcp(('localhost', prolis.getsockname()[1])) - fd = sock.makefile() - - # create container - fd.write('PUT /v1/a/ec-discon HTTP/1.1\r\n' - 'Host: localhost\r\n' - 'Content-Length: 0\r\n' - 'X-Storage-Token: t\r\n' - 'X-Storage-Policy: ec\r\n' - '\r\n') - fd.flush() - headers = readuntil2crlfs(fd) - exp = 'HTTP/1.1 2' - self.assertEqual(headers[:len(exp)], exp) - - # create object - obj = 'a' * 4 * 64 * 2 ** 10 - fd.write('PUT /v1/a/ec-discon/test HTTP/1.1\r\n' - 'Host: localhost\r\n' - 'Content-Length: %d\r\n' - 'X-Storage-Token: t\r\n' - 'Content-Type: donuts\r\n' - '\r\n%s' % (len(obj), obj)) - fd.flush() - headers = readuntil2crlfs(fd) - exp = 'HTTP/1.1 201' - self.assertEqual(headers[:len(exp)], exp) - - class WrappedTimeout(ChunkWriteTimeout): - def __enter__(self): - timeouts[self] = traceback.extract_stack() - return super(WrappedTimeout, self).__enter__() - - def __exit__(self, typ, value, tb): - timeouts[self] = None - return super(WrappedTimeout, self).__exit__(typ, value, tb) - - timeouts = {} - with mock.patch('swift.proxy.controllers.base.ChunkWriteTimeout', - WrappedTimeout): - with mock.patch.object(_test_servers[0], 'client_timeout', new=5): - # get object - fd.write('GET /v1/a/ec-discon/test HTTP/1.1\r\n' - 'Host: localhost\r\n' - 'Connection: close\r\n' - 'X-Storage-Token: t\r\n' - '\r\n') - fd.flush() - headers = readuntil2crlfs(fd) - exp = 'HTTP/1.1 200' - self.assertEqual(headers[:len(exp)], exp) - - # read most of the object, and disconnect - fd.read(10) - sock.fd._sock.close() - self._sleep_enough( - lambda: - _test_servers[0].logger.get_lines_for_level('warning')) - - # check for disconnect message! - expected = ['Client disconnected on read'] * 2 - self.assertEqual( - _test_servers[0].logger.get_lines_for_level('warning'), - expected) - # check that no coro was left waiting to write - self.assertTrue(timeouts) # sanity - WrappedTimeout did get called - missing_exits = [tb for tb in timeouts.values() if tb is not None] - self.assertFalse( - missing_exits, 'Failed to exit all ChunkWriteTimeouts.\n' + - ''.join(['No exit from ChunkWriteTimeout entered at:\n' + - ''.join(traceback.format_list(tb)[:-1]) - for tb in missing_exits])) - # and check that the ChunkWriteTimeouts did not raise Exceptions - self.assertFalse(_test_servers[0].logger.get_lines_for_level('error')) - - @unpatch_policies - def test_ec_client_put_disconnect(self): - prolis = _test_sockets[0] - - # create connection - sock = connect_tcp(('localhost', prolis.getsockname()[1])) - fd = sock.makefile() - - # create container - fd.write('PUT /v1/a/ec-discon HTTP/1.1\r\n' - 'Host: localhost\r\n' - 'Content-Length: 0\r\n' - 'X-Storage-Token: t\r\n' - 'X-Storage-Policy: ec\r\n' - '\r\n') - fd.flush() - headers = readuntil2crlfs(fd) - exp = 'HTTP/1.1 2' - self.assertEqual(headers[:len(exp)], exp) - - # create object - obj = 'a' * 4 * 64 * 2 ** 10 - fd.write('PUT /v1/a/ec-discon/test HTTP/1.1\r\n' - 'Host: localhost\r\n' - 'Content-Length: %d\r\n' - 'X-Storage-Token: t\r\n' - 'Content-Type: donuts\r\n' - '\r\n%s' % (len(obj), obj[:-10])) - fd.flush() - fd.close() - sock.close() - # sleep to trampoline enough - condition = \ - lambda: _test_servers[0].logger.get_lines_for_level('warning') - self._sleep_enough(condition) - expected = ['Client disconnected without sending enough data'] - warns = _test_servers[0].logger.get_lines_for_level('warning') - self.assertEqual(expected, warns) - errors = _test_servers[0].logger.get_lines_for_level('error') - self.assertEqual([], errors) - @unpatch_policies def test_leak_1(self): _request_instances = weakref.WeakKeyDictionary() @@ -5615,6 +5494,125 @@ class BaseTestECObjectController(BaseTestObjectController): os.rename(self.ec_policy.object_ring.serialized_path + '.bak', self.ec_policy.object_ring.serialized_path) + def test_ec_client_disconnect(self): + prolis = _test_sockets[0] + + # create connection + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + + # create container + fd.write('PUT /v1/a/%s-discon HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Content-Length: 0\r\n' + 'X-Storage-Token: t\r\n' + 'X-Storage-Policy: %s\r\n' + '\r\n' % (self.ec_policy.name, self.ec_policy.name)) + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 2' + self.assertEqual(headers[:len(exp)], exp) + + # create object + obj = 'a' * 4 * 64 * 2 ** 10 + fd.write('PUT /v1/a/%s-discon/test HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Content-Length: %d\r\n' + 'X-Storage-Token: t\r\n' + 'Content-Type: donuts\r\n' + '\r\n%s' % (self.ec_policy.name, len(obj), obj)) + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 201' + self.assertEqual(headers[:len(exp)], exp) + + class WrappedTimeout(ChunkWriteTimeout): + def __enter__(self): + timeouts[self] = traceback.extract_stack() + return super(WrappedTimeout, self).__enter__() + + def __exit__(self, typ, value, tb): + timeouts[self] = None + return super(WrappedTimeout, self).__exit__(typ, value, tb) + + timeouts = {} + with mock.patch('swift.proxy.controllers.base.ChunkWriteTimeout', + WrappedTimeout): + with mock.patch.object(_test_servers[0], 'client_timeout', new=5): + # get object + fd.write('GET /v1/a/%s-discon/test HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Connection: close\r\n' + 'X-Storage-Token: t\r\n' + '\r\n' % self.ec_policy.name) + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 200' + self.assertEqual(headers[:len(exp)], exp) + + # read most of the object, and disconnect + fd.read(10) + sock.fd._sock.close() + self._sleep_enough( + lambda: + _test_servers[0].logger.get_lines_for_level('warning')) + + # check for disconnect message! + expected = ['Client disconnected on read'] * 2 + self.assertEqual( + _test_servers[0].logger.get_lines_for_level('warning'), + expected) + # check that no coro was left waiting to write + self.assertTrue(timeouts) # sanity - WrappedTimeout did get called + missing_exits = [tb for tb in timeouts.values() if tb is not None] + self.assertFalse( + missing_exits, 'Failed to exit all ChunkWriteTimeouts.\n' + + ''.join(['No exit from ChunkWriteTimeout entered at:\n' + + ''.join(traceback.format_list(tb)[:-1]) + for tb in missing_exits])) + # and check that the ChunkWriteTimeouts did not raise Exceptions + self.assertFalse(_test_servers[0].logger.get_lines_for_level('error')) + + def test_ec_client_put_disconnect(self): + prolis = _test_sockets[0] + + # create connection + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + + # create container + fd.write('PUT /v1/a/%s-discon HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Content-Length: 0\r\n' + 'X-Storage-Token: t\r\n' + 'X-Storage-Policy: %s\r\n' + '\r\n' % (self.ec_policy.name, self.ec_policy.name)) + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 2' + self.assertEqual(headers[:len(exp)], exp) + + # create object + obj = 'a' * 4 * 64 * 2 ** 10 + fd.write('PUT /v1/a/%s-discon/test HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Content-Length: %d\r\n' + 'X-Storage-Token: t\r\n' + 'Content-Type: donuts\r\n' + '\r\n%s' % (self.ec_policy.name, len(obj), obj[:-10])) + fd.flush() + fd.close() + sock.close() + # sleep to trampoline enough + condition = \ + lambda: _test_servers[0].logger.get_lines_for_level('warning') + self._sleep_enough(condition) + expected = ['Client disconnected without sending enough data'] + warns = _test_servers[0].logger.get_lines_for_level('warning') + self.assertEqual(expected, warns) + errors = _test_servers[0].logger.get_lines_for_level('error') + self.assertEqual([], errors) + class TestECObjectController(BaseTestECObjectController, unittest.TestCase): def setUp(self):