From 853853edcedec2b2d2ea6c46c812111d8a7895d0 Mon Sep 17 00:00:00 2001 From: Peter Portante Date: Tue, 3 Sep 2013 15:27:57 -0400 Subject: [PATCH] Push cooperative sleep call down into ThreadPool The PUT REST API has no idea how writes are performed, so when thread pools are in use, the sleep is not necessary, though it is still necessary when thread pools are not in use. Since the ThreadPool object knows when threads are actually in use, it can take care of being cooperative with the eventlet hub. In addition, we can hide the cooperative iterator hook, given that the only other consumer of this was the auditor, which does not need it any longer. The only consumer of the DiskFile class that wants the cooperative behavior is the REST API layer of the object server, which is also using thread pools. Change-Id: Ibc4ac672899f9a35fd68c85d7f56403c19b4f991 Signed-off-by: Peter Portante --- swift/common/utils.py | 10 +++++++--- swift/obj/diskfile.py | 12 +++--------- swift/obj/mem_diskfile.py | 13 +++---------- swift/obj/server.py | 4 +--- test/unit/obj/test_diskfile.py | 13 ------------- 5 files changed, 14 insertions(+), 38 deletions(-) diff --git a/swift/common/utils.py b/swift/common/utils.py index 1290b94d73..54bb42ed4f 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -2164,14 +2164,18 @@ class ThreadPool(object): Exceptions thrown will be reraised in the calling thread. - If the threadpool was initialized with nthreads=0, just calls - func(*args, **kwargs). + If the threadpool was initialized with nthreads=0, it invokes + func(*args, **kwargs) directly, followed by eventlet.sleep() to ensure + the eventlet hub has a chance to execute. It is more likely the hub + will be invoked when queuing operations to an external thread. :returns: result of calling func :raises: whatever func raises """ if self.nthreads <= 0: - return func(*args, **kwargs) + result = func(*args, **kwargs) + sleep() + return result ev = event.Event() self._run_queue.put((ev, func, args, kwargs), block=False) diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index f8c1d10037..ce30c77592 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -713,12 +713,11 @@ class DiskFileReader(object): :param device_path: on-disk device path, used when quarantining an obj :param logger: logger caller wants this object to use :param quarantine_hook: 1-arg callable called w/reason when quarantined - :param iter_hook: called when __iter__ returns a chunk :param keep_cache: should resulting reads be kept in the buffer cache """ def __init__(self, fp, data_file, obj_size, etag, threadpool, disk_chunk_size, keep_cache_size, device_path, logger, - quarantine_hook, iter_hook=None, keep_cache=False): + quarantine_hook, keep_cache=False): # Parameter tracking self._fp = fp self._data_file = data_file @@ -729,7 +728,6 @@ class DiskFileReader(object): self._device_path = device_path self._logger = logger self._quarantine_hook = quarantine_hook - self._iter_hook = iter_hook if keep_cache: # Caller suggests we keep this in cache, only do it if the # object's size is less than the maximum. @@ -767,8 +765,6 @@ class DiskFileReader(object): self._bytes_read - dropped_cache) dropped_cache = self._bytes_read yield chunk - if self._iter_hook: - self._iter_hook() else: self._read_to_eof = True self._drop_cache(self._fp.fileno(), dropped_cache, @@ -1259,7 +1255,7 @@ class DiskFile(object): with self.open(): return self.get_metadata() - def reader(self, iter_hook=None, keep_cache=False, + def reader(self, keep_cache=False, _quarantine_hook=lambda m: None): """ Return a :class:`swift.common.swob.Response` class compatible @@ -1269,7 +1265,6 @@ class DiskFile(object): For this implementation, the responsibility of closing the open file is passed to the :class:`swift.obj.diskfile.DiskFileReader` object. - :param iter_hook: called when __iter__ returns a chunk :param keep_cache: caller's preference for keeping data read in the OS buffer cache :param _quarantine_hook: 1-arg callable called when obj quarantined; @@ -1282,8 +1277,7 @@ class DiskFile(object): self._fp, self._data_file, int(self._metadata['Content-Length']), self._metadata['ETag'], self._threadpool, self._disk_chunk_size, self._mgr.keep_cache_size, self._device_path, self._logger, - quarantine_hook=_quarantine_hook, - iter_hook=iter_hook, keep_cache=keep_cache) + quarantine_hook=_quarantine_hook, keep_cache=keep_cache) # At this point the reader object is now responsible for closing # the file pointer. self._fp = None diff --git a/swift/obj/mem_diskfile.py b/swift/obj/mem_diskfile.py index da206dea72..b780ebbee6 100644 --- a/swift/obj/mem_diskfile.py +++ b/swift/obj/mem_diskfile.py @@ -112,14 +112,12 @@ class DiskFileReader(object): :param fp: open file object pointer reference :param obj_size: on-disk size of object in bytes :param etag: MD5 hash of object from metadata - :param iter_hook: called when __iter__ returns a chunk """ - def __init__(self, name, fp, obj_size, etag, iter_hook=None): + def __init__(self, name, fp, obj_size, etag): self._name = name self._fp = fp self._obj_size = obj_size self._etag = etag - self._iter_hook = iter_hook # self._iter_etag = None self._bytes_read = 0 @@ -144,8 +142,6 @@ class DiskFileReader(object): self._iter_etag.update(chunk) self._bytes_read += len(chunk) yield chunk - if self._iter_hook: - self._iter_hook() else: self._read_to_eof = True break @@ -234,7 +230,6 @@ class DiskFile(object): :param account: account name for the object :param container: container name for the object :param obj: object name for the object - :param iter_hook: called when __iter__ returns a chunk :param keep_cache: caller's preference for keeping data read in the cache """ @@ -348,19 +343,17 @@ class DiskFile(object): with self.open(): return self.get_metadata() - def reader(self, iter_hook=None, keep_cache=False): + def reader(self, keep_cache=False): """ Return a swift.common.swob.Response class compatible "app_iter" object. The responsibility of closing the open file is passed to the DiskFileReader object. - :param iter_hook: :param keep_cache: """ dr = DiskFileReader(self._name, self._fp, int(self._metadata['Content-Length']), - self._metadata['ETag'], - iter_hook=iter_hook) + self._metadata['ETag']) # At this point the reader object is now responsible for # the file pointer. self._fp = None diff --git a/swift/obj/server.py b/swift/obj/server.py index c528ae97ea..cfb7695dd4 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -407,7 +407,6 @@ class ObjectController(object): return HTTPRequestTimeout(request=request) etag.update(chunk) upload_size = writer.write(chunk) - sleep() elapsed_time += time.time() - start_time if upload_size: self.logger.transfer_rate( @@ -505,8 +504,7 @@ class ObjectController(object): ('X-Auth-Token' not in request.headers and 'X-Storage-Token' not in request.headers)) response = Response( - app_iter=disk_file.reader( - iter_hook=sleep, keep_cache=keep_cache), + app_iter=disk_file.reader(keep_cache=keep_cache), request=request, conditional_response=True) response.headers['Content-Type'] = metadata.get( 'Content-Type', 'application/octet-stream') diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py index e038636ed4..d1c3aa2fa2 100644 --- a/test/unit/obj/test_diskfile.py +++ b/test/unit/obj/test_diskfile.py @@ -790,19 +790,6 @@ class TestDiskFile(unittest.TestCase): df.unit_test_len = fsize return df - def test_iter_hook(self): - hook_call_count = [0] - - def hook(): - hook_call_count[0] += 1 - - df = self._get_open_disk_file(fsize=65, csize=8) - with df.open(): - for _ in df.reader(iter_hook=hook): - pass - - self.assertEquals(hook_call_count[0], 9) - def test_keep_cache(self): df = self._get_open_disk_file(fsize=65) with mock.patch("swift.obj.diskfile.drop_buffer_cache") as foo: