Object-server: add periodic greenthread yielding during file write
Currently, when object-server serves PUT request and DiskFile writer write file chunks to disk, there is no explicit eventlet sleep called. When network outpace the slow disk IO, it's possible one large and slow PUT request could cause eventlet hub not to schedule any other green threads for a long period of time. To improve this, this patch enable the configurable yield parameter 'cooperative_period' into object server controller write path. Related-Change: I80b04bad0601b6cd6caef35498f89d4ba70a4fd4 Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com> Change-Id: I1c0aba9830433f093d024b4c39cd3a3b2f0d69f1
This commit is contained in:
parent
7662cde704
commit
ea1d84c1d7
@ -148,10 +148,11 @@ use = egg:swift#object
|
|||||||
# 'keep_cache_private' is false.
|
# 'keep_cache_private' is false.
|
||||||
# keep_cache_slo_manifest = false
|
# keep_cache_slo_manifest = false
|
||||||
#
|
#
|
||||||
# cooperative_period defines how frequent object server GET request will
|
# cooperative_period defines how frequent object server GET/PUT request will
|
||||||
# perform the cooperative yielding during iterating the disk chunks. For
|
# perform the cooperative yielding during iterating the disk chunks. For
|
||||||
# example, value of '5' will insert one sleep() after every 5 disk_chunk_size
|
# example, value of '5' will insert one sleep() after every 5 disk_chunk_size
|
||||||
# chunk reads. A value of '0' (the default) will turn off cooperative yielding.
|
# chunk reads/writes. A value of '0' (the default) will turn off cooperative
|
||||||
|
# yielding.
|
||||||
# cooperative_period = 0
|
# cooperative_period = 0
|
||||||
#
|
#
|
||||||
# on PUTs, sync data every n MB
|
# on PUTs, sync data every n MB
|
||||||
|
@ -35,7 +35,7 @@ from swift.common.utils import public, get_logger, \
|
|||||||
get_expirer_container, parse_mime_headers, \
|
get_expirer_container, parse_mime_headers, \
|
||||||
iter_multipart_mime_documents, extract_swift_bytes, safe_json_loads, \
|
iter_multipart_mime_documents, extract_swift_bytes, safe_json_loads, \
|
||||||
config_auto_int_value, split_path, get_redirect_data, \
|
config_auto_int_value, split_path, get_redirect_data, \
|
||||||
normalize_timestamp, md5, parse_options
|
normalize_timestamp, md5, parse_options, CooperativeIterator
|
||||||
from swift.common.bufferedhttp import http_connect
|
from swift.common.bufferedhttp import http_connect
|
||||||
from swift.common.constraints import check_object_creation, \
|
from swift.common.constraints import check_object_creation, \
|
||||||
valid_timestamp, check_utf8, AUTO_CREATE_ACCOUNT_PREFIX
|
valid_timestamp, check_utf8, AUTO_CREATE_ACCOUNT_PREFIX
|
||||||
@ -908,13 +908,20 @@ class ObjectController(BaseStorageServer):
|
|||||||
elapsed_time = 0
|
elapsed_time = 0
|
||||||
upload_expiration = time.time() + self.max_upload_time
|
upload_expiration = time.time() + self.max_upload_time
|
||||||
timeout_reader = self._make_timeout_reader(obj_input)
|
timeout_reader = self._make_timeout_reader(obj_input)
|
||||||
for chunk in iter(timeout_reader, b''):
|
|
||||||
|
# Wrap the chunks in CooperativeIterator with specified period
|
||||||
|
cooperative_reader = CooperativeIterator(
|
||||||
|
iter(timeout_reader, b''), period=self.cooperative_period
|
||||||
|
)
|
||||||
|
|
||||||
|
for chunk in cooperative_reader:
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
if start_time > upload_expiration:
|
if start_time > upload_expiration:
|
||||||
self.logger.increment('PUT.timeouts')
|
self.logger.increment('PUT.timeouts')
|
||||||
raise HTTPRequestTimeout(request=request)
|
raise HTTPRequestTimeout(request=request)
|
||||||
writer.write(chunk)
|
writer.write(chunk)
|
||||||
elapsed_time += time.time() - start_time
|
elapsed_time += time.time() - start_time
|
||||||
|
|
||||||
upload_size, etag = writer.chunks_finished()
|
upload_size, etag = writer.chunks_finished()
|
||||||
if fsize is not None and fsize != upload_size:
|
if fsize is not None and fsize != upload_size:
|
||||||
raise HTTPClientDisconnect(request=request)
|
raise HTTPClientDisconnect(request=request)
|
||||||
|
@ -4786,6 +4786,88 @@ class TestObjectController(BaseTestCase):
|
|||||||
self.assertEqual(b' bytes', resp.body)
|
self.assertEqual(b' bytes', resp.body)
|
||||||
self.assertFalse(mock_sleep.called)
|
self.assertFalse(mock_sleep.called)
|
||||||
|
|
||||||
|
def test_PUT_cooperative_period_config(self):
|
||||||
|
# Test DiskFile writer actually sleeps when writing chunks. When
|
||||||
|
# cooperative_period is 1, disk writer sleeps once.
|
||||||
|
conf = {'devices': self.testdir, 'mount_check': 'false',
|
||||||
|
'container_update_timeout': 0.0,
|
||||||
|
'cooperative_period': '1'}
|
||||||
|
obj_controller = object_server.ObjectController(
|
||||||
|
conf, logger=self.logger)
|
||||||
|
|
||||||
|
timestamp = normalize_timestamp(time())
|
||||||
|
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||||
|
headers={'X-Timestamp': timestamp,
|
||||||
|
'Content-Type': 'application/x-test'})
|
||||||
|
req.body = b'7 bytes'
|
||||||
|
with mock.patch('swift.common.utils.sleep') as mock_sleep:
|
||||||
|
resp = req.get_response(obj_controller)
|
||||||
|
self.assertEqual(resp.status_int, 201)
|
||||||
|
self.assertEqual(1, mock_sleep.call_count)
|
||||||
|
|
||||||
|
# Test DiskFile writer actually sleeps when writing chunks. And verify
|
||||||
|
# number of sleeps when 'disk_chunk_size' is set.
|
||||||
|
conf['cooperative_period'] = '2'
|
||||||
|
conf['network_chunk_size'] = 2
|
||||||
|
obj_controller = object_server.ObjectController(
|
||||||
|
conf, logger=self.logger)
|
||||||
|
|
||||||
|
timestamp = normalize_timestamp(time())
|
||||||
|
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||||
|
headers={'X-Timestamp': timestamp,
|
||||||
|
'Content-Type': 'application/x-test'})
|
||||||
|
req.body = b'7 bytes'
|
||||||
|
with mock.patch('swift.common.utils.sleep') as mock_sleep:
|
||||||
|
resp = req.get_response(obj_controller)
|
||||||
|
self.assertEqual(resp.status_int, 201)
|
||||||
|
self.assertEqual(2, mock_sleep.call_count)
|
||||||
|
|
||||||
|
conf['cooperative_period'] = '2'
|
||||||
|
conf['network_chunk_size'] = 3
|
||||||
|
obj_controller = object_server.ObjectController(
|
||||||
|
conf, logger=self.logger)
|
||||||
|
|
||||||
|
timestamp = normalize_timestamp(time())
|
||||||
|
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||||
|
headers={'X-Timestamp': timestamp,
|
||||||
|
'Content-Type': 'application/x-test'})
|
||||||
|
req.body = b'7 bytes'
|
||||||
|
with mock.patch('swift.common.utils.sleep') as mock_sleep:
|
||||||
|
resp = req.get_response(obj_controller)
|
||||||
|
self.assertEqual(resp.status_int, 201)
|
||||||
|
self.assertEqual(1, mock_sleep.call_count)
|
||||||
|
|
||||||
|
# Test DiskFile reader won't sleep with cooperative_period set as 0.
|
||||||
|
conf['cooperative_period'] = '0'
|
||||||
|
obj_controller = object_server.ObjectController(
|
||||||
|
conf, logger=self.logger)
|
||||||
|
|
||||||
|
timestamp = normalize_timestamp(time())
|
||||||
|
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||||
|
headers={'X-Timestamp': timestamp,
|
||||||
|
'Content-Type': 'application/x-test'})
|
||||||
|
req.body = b'7 bytes'
|
||||||
|
with mock.patch('swift.common.utils.sleep') as mock_sleep:
|
||||||
|
resp = req.get_response(obj_controller)
|
||||||
|
self.assertEqual(resp.status_int, 201)
|
||||||
|
self.assertFalse(mock_sleep.called)
|
||||||
|
|
||||||
|
# Test DiskFile reader won't sleep with default cooperative_period
|
||||||
|
# which is also 0.
|
||||||
|
conf.pop('cooperative_period')
|
||||||
|
obj_controller = object_server.ObjectController(
|
||||||
|
conf, logger=self.logger)
|
||||||
|
|
||||||
|
timestamp = normalize_timestamp(time())
|
||||||
|
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||||
|
headers={'X-Timestamp': timestamp,
|
||||||
|
'Content-Type': 'application/x-test'})
|
||||||
|
req.body = b'7 bytes'
|
||||||
|
with mock.patch('swift.common.utils.sleep') as mock_sleep:
|
||||||
|
resp = req.get_response(obj_controller)
|
||||||
|
self.assertEqual(resp.status_int, 201)
|
||||||
|
self.assertFalse(mock_sleep.called)
|
||||||
|
|
||||||
@mock.patch("time.time", mock_time)
|
@mock.patch("time.time", mock_time)
|
||||||
def test_DELETE(self):
|
def test_DELETE(self):
|
||||||
# Test swift.obj.server.ObjectController.DELETE
|
# Test swift.obj.server.ObjectController.DELETE
|
||||||
|
Loading…
Reference in New Issue
Block a user