Merge "Push cooperative sleep call down into ThreadPool"

This commit is contained in:
Jenkins 2013-11-28 00:16:21 +00:00 committed by Gerrit Code Review
commit 669d22bf6b
5 changed files with 14 additions and 38 deletions

View File

@ -2257,14 +2257,18 @@ class ThreadPool(object):
Exceptions thrown will be reraised in the calling thread. Exceptions thrown will be reraised in the calling thread.
If the threadpool was initialized with nthreads=0, just calls If the threadpool was initialized with nthreads=0, it invokes
func(*args, **kwargs). func(*args, **kwargs) directly, followed by eventlet.sleep() to ensure
the eventlet hub has a chance to execute. It is more likely the hub
will be invoked when queuing operations to an external thread.
:returns: result of calling func :returns: result of calling func
:raises: whatever func raises :raises: whatever func raises
""" """
if self.nthreads <= 0: if self.nthreads <= 0:
return func(*args, **kwargs) result = func(*args, **kwargs)
sleep()
return result
ev = event.Event() ev = event.Event()
self._run_queue.put((ev, func, args, kwargs), block=False) self._run_queue.put((ev, func, args, kwargs), block=False)

View File

@ -713,12 +713,11 @@ class DiskFileReader(object):
:param device_path: on-disk device path, used when quarantining an obj :param device_path: on-disk device path, used when quarantining an obj
:param logger: logger caller wants this object to use :param logger: logger caller wants this object to use
:param quarantine_hook: 1-arg callable called w/reason when quarantined :param quarantine_hook: 1-arg callable called w/reason when quarantined
:param iter_hook: called when __iter__ returns a chunk
:param keep_cache: should resulting reads be kept in the buffer cache :param keep_cache: should resulting reads be kept in the buffer cache
""" """
def __init__(self, fp, data_file, obj_size, etag, threadpool, def __init__(self, fp, data_file, obj_size, etag, threadpool,
disk_chunk_size, keep_cache_size, device_path, logger, disk_chunk_size, keep_cache_size, device_path, logger,
quarantine_hook, iter_hook=None, keep_cache=False): quarantine_hook, keep_cache=False):
# Parameter tracking # Parameter tracking
self._fp = fp self._fp = fp
self._data_file = data_file self._data_file = data_file
@ -729,7 +728,6 @@ class DiskFileReader(object):
self._device_path = device_path self._device_path = device_path
self._logger = logger self._logger = logger
self._quarantine_hook = quarantine_hook self._quarantine_hook = quarantine_hook
self._iter_hook = iter_hook
if keep_cache: if keep_cache:
# Caller suggests we keep this in cache, only do it if the # Caller suggests we keep this in cache, only do it if the
# object's size is less than the maximum. # object's size is less than the maximum.
@ -767,8 +765,6 @@ class DiskFileReader(object):
self._bytes_read - dropped_cache) self._bytes_read - dropped_cache)
dropped_cache = self._bytes_read dropped_cache = self._bytes_read
yield chunk yield chunk
if self._iter_hook:
self._iter_hook()
else: else:
self._read_to_eof = True self._read_to_eof = True
self._drop_cache(self._fp.fileno(), dropped_cache, self._drop_cache(self._fp.fileno(), dropped_cache,
@ -1259,7 +1255,7 @@ class DiskFile(object):
with self.open(): with self.open():
return self.get_metadata() return self.get_metadata()
def reader(self, iter_hook=None, keep_cache=False, def reader(self, keep_cache=False,
_quarantine_hook=lambda m: None): _quarantine_hook=lambda m: None):
""" """
Return a :class:`swift.common.swob.Response` class compatible Return a :class:`swift.common.swob.Response` class compatible
@ -1269,7 +1265,6 @@ class DiskFile(object):
For this implementation, the responsibility of closing the open file For this implementation, the responsibility of closing the open file
is passed to the :class:`swift.obj.diskfile.DiskFileReader` object. is passed to the :class:`swift.obj.diskfile.DiskFileReader` object.
:param iter_hook: called when __iter__ returns a chunk
:param keep_cache: caller's preference for keeping data read in the :param keep_cache: caller's preference for keeping data read in the
OS buffer cache OS buffer cache
:param _quarantine_hook: 1-arg callable called when obj quarantined; :param _quarantine_hook: 1-arg callable called when obj quarantined;
@ -1282,8 +1277,7 @@ class DiskFile(object):
self._fp, self._data_file, int(self._metadata['Content-Length']), self._fp, self._data_file, int(self._metadata['Content-Length']),
self._metadata['ETag'], self._threadpool, self._disk_chunk_size, self._metadata['ETag'], self._threadpool, self._disk_chunk_size,
self._mgr.keep_cache_size, self._device_path, self._logger, self._mgr.keep_cache_size, self._device_path, self._logger,
quarantine_hook=_quarantine_hook, quarantine_hook=_quarantine_hook, keep_cache=keep_cache)
iter_hook=iter_hook, keep_cache=keep_cache)
# At this point the reader object is now responsible for closing # At this point the reader object is now responsible for closing
# the file pointer. # the file pointer.
self._fp = None self._fp = None

View File

@ -112,14 +112,12 @@ class DiskFileReader(object):
:param fp: open file object pointer reference :param fp: open file object pointer reference
:param obj_size: on-disk size of object in bytes :param obj_size: on-disk size of object in bytes
:param etag: MD5 hash of object from metadata :param etag: MD5 hash of object from metadata
:param iter_hook: called when __iter__ returns a chunk
""" """
def __init__(self, name, fp, obj_size, etag, iter_hook=None): def __init__(self, name, fp, obj_size, etag):
self._name = name self._name = name
self._fp = fp self._fp = fp
self._obj_size = obj_size self._obj_size = obj_size
self._etag = etag self._etag = etag
self._iter_hook = iter_hook
# #
self._iter_etag = None self._iter_etag = None
self._bytes_read = 0 self._bytes_read = 0
@ -144,8 +142,6 @@ class DiskFileReader(object):
self._iter_etag.update(chunk) self._iter_etag.update(chunk)
self._bytes_read += len(chunk) self._bytes_read += len(chunk)
yield chunk yield chunk
if self._iter_hook:
self._iter_hook()
else: else:
self._read_to_eof = True self._read_to_eof = True
break break
@ -234,7 +230,6 @@ class DiskFile(object):
:param account: account name for the object :param account: account name for the object
:param container: container name for the object :param container: container name for the object
:param obj: object name for the object :param obj: object name for the object
:param iter_hook: called when __iter__ returns a chunk
:param keep_cache: caller's preference for keeping data read in the cache :param keep_cache: caller's preference for keeping data read in the cache
""" """
@ -348,19 +343,17 @@ class DiskFile(object):
with self.open(): with self.open():
return self.get_metadata() return self.get_metadata()
def reader(self, iter_hook=None, keep_cache=False): def reader(self, keep_cache=False):
""" """
Return a swift.common.swob.Response class compatible "app_iter" Return a swift.common.swob.Response class compatible "app_iter"
object. The responsibility of closing the open file is passed to the object. The responsibility of closing the open file is passed to the
DiskFileReader object. DiskFileReader object.
:param iter_hook:
:param keep_cache: :param keep_cache:
""" """
dr = DiskFileReader(self._name, self._fp, dr = DiskFileReader(self._name, self._fp,
int(self._metadata['Content-Length']), int(self._metadata['Content-Length']),
self._metadata['ETag'], self._metadata['ETag'])
iter_hook=iter_hook)
# At this point the reader object is now responsible for # At this point the reader object is now responsible for
# the file pointer. # the file pointer.
self._fp = None self._fp = None

View File

@ -407,7 +407,6 @@ class ObjectController(object):
return HTTPRequestTimeout(request=request) return HTTPRequestTimeout(request=request)
etag.update(chunk) etag.update(chunk)
upload_size = writer.write(chunk) upload_size = writer.write(chunk)
sleep()
elapsed_time += time.time() - start_time elapsed_time += time.time() - start_time
if upload_size: if upload_size:
self.logger.transfer_rate( self.logger.transfer_rate(
@ -505,8 +504,7 @@ class ObjectController(object):
('X-Auth-Token' not in request.headers and ('X-Auth-Token' not in request.headers and
'X-Storage-Token' not in request.headers)) 'X-Storage-Token' not in request.headers))
response = Response( response = Response(
app_iter=disk_file.reader( app_iter=disk_file.reader(keep_cache=keep_cache),
iter_hook=sleep, keep_cache=keep_cache),
request=request, conditional_response=True) request=request, conditional_response=True)
response.headers['Content-Type'] = metadata.get( response.headers['Content-Type'] = metadata.get(
'Content-Type', 'application/octet-stream') 'Content-Type', 'application/octet-stream')

View File

@ -790,19 +790,6 @@ class TestDiskFile(unittest.TestCase):
df.unit_test_len = fsize df.unit_test_len = fsize
return df return df
def test_iter_hook(self):
hook_call_count = [0]
def hook():
hook_call_count[0] += 1
df = self._get_open_disk_file(fsize=65, csize=8)
with df.open():
for _ in df.reader(iter_hook=hook):
pass
self.assertEquals(hook_call_count[0], 9)
def test_keep_cache(self): def test_keep_cache(self):
df = self._get_open_disk_file(fsize=65) df = self._get_open_disk_file(fsize=65)
with mock.patch("swift.obj.diskfile.drop_buffer_cache") as foo: with mock.patch("swift.obj.diskfile.drop_buffer_cache") as foo: