Fix starvation in object server with fast clients.
When an object server was handling concurrent GET or POST requests from very fast clients, it would starve other connected clients. The greenthreads responsible for servicing the fast clients would hog the processor and only rarely yield to another greenthread. The reason this happens for GET requests is found in eventlet.greenio.GreenSocket, in the send() method. When you call .send(data) on a GreenSocket, it immediately calls .send(data) on its underlying real socket (socket._socketobject). If the real socket accepts all the data, then GreenSocket.send() returns without yielding to another greenthread. Only if the real socket failed to accept all the data (either .send(data) < len(data) or by raising EWOULDBLOCK) does the GreenSocket yield control. Under most workloads, this isn't a problem. The TCP connection to client X can only consume data so quickly, and therefore the greenthread serving client X will frequently encounter a full socket buffer and yield control, so no clients starve. However, when there's a lot of contention for a single object from a large number of fast clients (e.g. on a LAN connected w/10Gb Ethernet), then one winds up in a situation where reading from the disk is slower than writing to the network, and so full socket buffers become rare, and therefore so do context switches. The end result is that many clients time out waiting for data. The situation for PUT requests is analogous; GreenSocket.recv() seldom encounters EWOULDBLOCK, so greenthreads seldom yield. This patch calls eventlet.sleep() to yield control after each chunk, preventing any one greenthread's IO from blocking the hub for very long. This code has the flaw that it will greenthread-switch twice when a send() or recv() does block, but since there isn't a way to find out if a switch occurred or not, there's no way to avoid it. Since greenlet switches are quite fast (faster than system calls, which the object server does a lot of), this shouldn't have a significant performance impact. Change-Id: I8549adfb4a198739b80979236c27b76df607eebf
This commit is contained in:
parent
7a9c2d6ea5
commit
783f16035a
@ -104,12 +104,15 @@ class DiskFile(object):
|
|||||||
:param container: container name for the object
|
:param container: container name for the object
|
||||||
:param obj: object name for the object
|
:param obj: object name for the object
|
||||||
:param keep_data_fp: if True, don't close the fp, otherwise close it
|
:param keep_data_fp: if True, don't close the fp, otherwise close it
|
||||||
:param disk_chunk_Size: size of chunks on file reads
|
:param disk_chunk_size: size of chunks on file reads
|
||||||
|
:param iter_hook: called when __iter__ returns a chunk
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, path, device, partition, account, container, obj,
|
def __init__(self, path, device, partition, account, container, obj,
|
||||||
logger, keep_data_fp=False, disk_chunk_size=65536):
|
logger, keep_data_fp=False, disk_chunk_size=65536,
|
||||||
|
iter_hook=None):
|
||||||
self.disk_chunk_size = disk_chunk_size
|
self.disk_chunk_size = disk_chunk_size
|
||||||
|
self.iter_hook = iter_hook
|
||||||
self.name = '/' + '/'.join((account, container, obj))
|
self.name = '/' + '/'.join((account, container, obj))
|
||||||
name_hash = hash_path(account, container, obj)
|
name_hash = hash_path(account, container, obj)
|
||||||
self.datadir = os.path.join(path, device,
|
self.datadir = os.path.join(path, device,
|
||||||
@ -173,6 +176,8 @@ class DiskFile(object):
|
|||||||
read - dropped_cache)
|
read - dropped_cache)
|
||||||
dropped_cache = read
|
dropped_cache = read
|
||||||
yield chunk
|
yield chunk
|
||||||
|
if self.iter_hook:
|
||||||
|
self.iter_hook()
|
||||||
else:
|
else:
|
||||||
self.read_to_eof = True
|
self.read_to_eof = True
|
||||||
self.drop_cache(self.fp.fileno(), dropped_cache,
|
self.drop_cache(self.fp.fileno(), dropped_cache,
|
||||||
@ -590,6 +595,7 @@ class ObjectController(object):
|
|||||||
tpool.execute(os.fdatasync, fd)
|
tpool.execute(os.fdatasync, fd)
|
||||||
drop_buffer_cache(fd, last_sync, upload_size - last_sync)
|
drop_buffer_cache(fd, last_sync, upload_size - last_sync)
|
||||||
last_sync = upload_size
|
last_sync = upload_size
|
||||||
|
sleep()
|
||||||
|
|
||||||
if 'content-length' in request.headers and \
|
if 'content-length' in request.headers and \
|
||||||
int(request.headers['content-length']) != upload_size:
|
int(request.headers['content-length']) != upload_size:
|
||||||
@ -650,7 +656,8 @@ class ObjectController(object):
|
|||||||
return HTTPInsufficientStorage(drive=device, request=request)
|
return HTTPInsufficientStorage(drive=device, request=request)
|
||||||
file = DiskFile(self.devices, device, partition, account, container,
|
file = DiskFile(self.devices, device, partition, account, container,
|
||||||
obj, self.logger, keep_data_fp=True,
|
obj, self.logger, keep_data_fp=True,
|
||||||
disk_chunk_size=self.disk_chunk_size)
|
disk_chunk_size=self.disk_chunk_size,
|
||||||
|
iter_hook=sleep)
|
||||||
if file.is_deleted() or ('X-Delete-At' in file.metadata and
|
if file.is_deleted() or ('X-Delete-At' in file.metadata and
|
||||||
int(file.metadata['X-Delete-At']) <= time.time()):
|
int(file.metadata['X-Delete-At']) <= time.time()):
|
||||||
if request.headers.get('if-match') == '*':
|
if request.headers.get('if-match') == '*':
|
||||||
|
@ -90,6 +90,18 @@ class TestDiskFile(unittest.TestCase):
|
|||||||
'o', FakeLogger()).mkstemp():
|
'o', FakeLogger()).mkstemp():
|
||||||
self.assert_(os.path.exists(tmpdir))
|
self.assert_(os.path.exists(tmpdir))
|
||||||
|
|
||||||
|
def test_iter_hook(self):
|
||||||
|
hook_call_count = [0]
|
||||||
|
def hook():
|
||||||
|
hook_call_count[0] += 1
|
||||||
|
|
||||||
|
df = self._get_data_file(fsize=65, csize=8, iter_hook=hook)
|
||||||
|
print repr(df.__dict__)
|
||||||
|
for _ in df:
|
||||||
|
pass
|
||||||
|
|
||||||
|
self.assertEquals(hook_call_count[0], 9)
|
||||||
|
|
||||||
def test_quarantine(self):
|
def test_quarantine(self):
|
||||||
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
|
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
|
||||||
FakeLogger())
|
FakeLogger())
|
||||||
@ -136,7 +148,8 @@ class TestDiskFile(unittest.TestCase):
|
|||||||
self.assert_('-' in os.path.basename(double_uuid_path))
|
self.assert_('-' in os.path.basename(double_uuid_path))
|
||||||
|
|
||||||
def _get_data_file(self, invalid_type=None, obj_name='o',
|
def _get_data_file(self, invalid_type=None, obj_name='o',
|
||||||
fsize=1024, csize=8, extension='.data', ts=None):
|
fsize=1024, csize=8, extension='.data', ts=None,
|
||||||
|
iter_hook=None):
|
||||||
'''returns a DiskFile'''
|
'''returns a DiskFile'''
|
||||||
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c',
|
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c',
|
||||||
obj_name, FakeLogger())
|
obj_name, FakeLogger())
|
||||||
@ -168,7 +181,8 @@ class TestDiskFile(unittest.TestCase):
|
|||||||
|
|
||||||
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c',
|
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c',
|
||||||
obj_name, FakeLogger(),
|
obj_name, FakeLogger(),
|
||||||
keep_data_fp=True, disk_chunk_size=csize)
|
keep_data_fp=True, disk_chunk_size=csize,
|
||||||
|
iter_hook=iter_hook)
|
||||||
if invalid_type == 'Zero-Byte':
|
if invalid_type == 'Zero-Byte':
|
||||||
os.remove(df.data_file)
|
os.remove(df.data_file)
|
||||||
fp = open(df.data_file, 'w')
|
fp = open(df.data_file, 'w')
|
||||||
|
Loading…
Reference in New Issue
Block a user