diff --git a/swift/obj/server.py b/swift/obj/server.py index 0f19dd3a97..4b716c0c12 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -94,6 +94,67 @@ def write_metadata(fd, metadata): key += 1 +class DiskWriter(object): + """ + Encapsulation of the write context for servicing PUT REST API + requests. Serves as the context manager object for DiskFile's writer() + method. + """ + def __init__(self, disk_file, fd, tmppath): + self.disk_file = disk_file + self.fd = fd + self.tmppath = tmppath + self.upload_size = 0 + self.last_sync = 0 + + def write(self, chunk): + """ + Write a chunk of data into the temporary file. + + :param chunk: the chunk of data to write as a string object + """ + while chunk: + written = os.write(self.fd, chunk) + self.upload_size += written + chunk = chunk[written:] + # For large files sync every 512MB (by default) written + diff = self.upload_size - self.last_sync + if diff >= self.disk_file.bytes_per_sync: + tpool.execute(fdatasync, self.fd) + drop_buffer_cache(self.fd, self.last_sync, diff) + self.last_sync = self.upload_size + + def put(self, metadata, extension='.data'): + """ + Finalize writing the file on disk, and renames it from the temp file + to the real location. This should be called after the data has been + written to the temp file. + + :param metadata: dictionary of metadata to be written + :param extension: extension to be used when making the file + """ + assert self.tmppath is not None + timestamp = normalize_timestamp(metadata['X-Timestamp']) + metadata['name'] = self.disk_file.name + # Write the metadata before calling fsync() so that both data and + # metadata are flushed to disk. + write_metadata(self.fd, metadata) + # We call fsync() before calling drop_cache() to lower the amount of + # redundant work the drop cache code will perform on the pages (now + # that after fsync the pages will be all clean). + tpool.execute(fsync, self.fd) + # From the Department of the Redundancy Department, make sure we + # call drop_cache() after fsync() to avoid redundant work (pages + # all clean). + drop_buffer_cache(self.fd, 0, self.upload_size) + invalidate_hash(os.path.dirname(self.disk_file.datadir)) + # After the rename completes, this object will be available for other + # requests to reference. + renamer(self.tmppath, + os.path.join(self.disk_file.datadir, timestamp + extension)) + self.disk_file.metadata = metadata + + class DiskFile(object): """ Manage object files on disk. @@ -106,14 +167,16 @@ class DiskFile(object): :param obj: object name for the object :param keep_data_fp: if True, don't close the fp, otherwise close it :param disk_chunk_size: size of chunks on file reads + :param bytes_per_sync: number of bytes between fdatasync calls :param iter_hook: called when __iter__ returns a chunk :raises DiskFileCollision: on md5 collision """ def __init__(self, path, device, partition, account, container, obj, logger, keep_data_fp=False, disk_chunk_size=65536, - iter_hook=None): + bytes_per_sync=(512 * 1024 * 1024), iter_hook=None): self.disk_chunk_size = disk_chunk_size + self.bytes_per_sync = bytes_per_sync self.iter_hook = iter_hook self.name = '/' + '/'.join((account, container, obj)) name_hash = hash_path(account, container, obj) @@ -121,7 +184,6 @@ class DiskFile(object): path, device, storage_directory(DATADIR, partition, name_hash)) self.device_path = os.path.join(path, device) self.tmpdir = os.path.join(path, device, 'tmp') - self.tmppath = None self.logger = logger self.metadata = {} self.meta_file = None @@ -184,16 +246,16 @@ class DiskFile(object): self.iter_etag.update(chunk) read += len(chunk) if read - dropped_cache > (1024 * 1024): - self.drop_cache(self.fp.fileno(), dropped_cache, - read - dropped_cache) + self._drop_cache(self.fp.fileno(), dropped_cache, + read - dropped_cache) dropped_cache = read yield chunk if self.iter_hook: self.iter_hook() else: self.read_to_eof = True - self.drop_cache(self.fp.fileno(), dropped_cache, - read - dropped_cache) + self._drop_cache(self.fp.fileno(), dropped_cache, + read - dropped_cache) break finally: if not self.suppress_file_closing: @@ -286,66 +348,35 @@ class DiskFile(object): int(self.metadata['X-Delete-At']) <= time.time()) @contextmanager - def mkstemp(self, size=None): + def writer(self, size=None): """ - Contextmanager to make a temporary file. + Context manager to write a file. We create a temporary file first, and + then return a DiskWriter object to encapsulate the state. - :param size: optional initial size of file to allocate on disk - :raises DiskFileNoSpace: if a size is specified and fallocate fails + :param size: optional initial size of file to explicitly allocate on + disk + :raises DiskFileNoSpace: if a size is specified and allocation fails """ if not os.path.exists(self.tmpdir): mkdirs(self.tmpdir) - fd, self.tmppath = mkstemp(dir=self.tmpdir) + fd, tmppath = mkstemp(dir=self.tmpdir) try: if size is not None and size > 0: try: fallocate(fd, size) except OSError: raise DiskFileNoSpace() - yield fd + yield DiskWriter(self, fd, tmppath) finally: try: os.close(fd) except OSError: pass - tmppath, self.tmppath = self.tmppath, None try: os.unlink(tmppath) except OSError: pass - def put(self, fd, fsize, metadata, extension='.data'): - """ - Finalize writing the file on disk, and renames it from the temp file to - the real location. This should be called after the data has been - written to the temp file. - - :param fd: file descriptor of the temp file - :param fsize: final on-disk size of the created file - :param metadata: dictionary of metadata to be written - :param extension: extension to be used when making the file - """ - assert self.tmppath is not None - metadata['name'] = self.name - timestamp = normalize_timestamp(metadata['X-Timestamp']) - # Write the metadata before calling fsync() so that both data and - # metadata are flushed to disk. - write_metadata(fd, metadata) - # We call fsync() before calling drop_cache() to lower the amount of - # redundant work the drop cache code will perform on the pages (now - # that after fsync the pages will be all clean). - tpool.execute(fsync, fd) - # From the Department of the Redundancy Department, make sure we - # call drop_cache() after fsync() to avoid redundant work (pages - # all clean). - self.drop_cache(fd, 0, fsize) - invalidate_hash(os.path.dirname(self.datadir)) - # After the rename completes, this object will be available for other - # requests to reference. - renamer(self.tmppath, - os.path.join(self.datadir, timestamp + extension)) - self.metadata = metadata - def put_metadata(self, metadata, tombstone=False): """ Short hand for putting metadata to .meta and .ts files. @@ -354,8 +385,8 @@ class DiskFile(object): :param tombstone: whether or not we are writing a tombstone """ extension = '.ts' if tombstone else '.meta' - with self.mkstemp() as fd: - self.put(fd, 0, metadata, extension=extension) + with self.writer() as writer: + writer.put(metadata, extension=extension) def unlinkold(self, timestamp): """ @@ -373,7 +404,7 @@ class DiskFile(object): if err.errno != errno.ENOENT: raise - def drop_cache(self, fd, offset, length): + def _drop_cache(self, fd, offset, length): """Method for no-oping buffer cache drop method.""" if not self.keep_cache: drop_buffer_cache(fd, offset, length) @@ -636,8 +667,8 @@ class ObjectController(object): return HTTPInsufficientStorage(drive=device, request=request) disk_file = DiskFile(self.devices, device, partition, account, container, obj, self.logger, - disk_chunk_size=self.disk_chunk_size) - + disk_chunk_size=self.disk_chunk_size, + bytes_per_sync=self.bytes_per_sync) if disk_file.is_deleted() or disk_file.is_expired(): return HTTPNotFound(request=request) try: @@ -689,42 +720,33 @@ class ObjectController(object): return HTTPInsufficientStorage(drive=device, request=request) disk_file = DiskFile(self.devices, device, partition, account, container, obj, self.logger, - disk_chunk_size=self.disk_chunk_size) + disk_chunk_size=self.disk_chunk_size, + bytes_per_sync=self.bytes_per_sync) + old_delete_at = int(disk_file.metadata.get('X-Delete-At') or 0) orig_timestamp = disk_file.metadata.get('X-Timestamp') upload_expiration = time.time() + self.max_upload_time etag = md5() fsize = request.headers.get('content-length', None) if fsize is not None: fsize = int(fsize) - upload_size = 0 - last_sync = 0 elapsed_time = 0 try: - with disk_file.mkstemp(size=fsize) as fd: + with disk_file.writer(size=fsize) as writer: reader = request.environ['wsgi.input'].read for chunk in iter(lambda: reader(self.network_chunk_size), ''): start_time = time.time() - upload_size += len(chunk) - if time.time() > upload_expiration: + if start_time > upload_expiration: self.logger.increment('PUT.timeouts') return HTTPRequestTimeout(request=request) etag.update(chunk) - while chunk: - written = os.write(fd, chunk) - chunk = chunk[written:] - # For large files sync every 512MB (by default) written - if upload_size - last_sync >= self.bytes_per_sync: - tpool.execute(fdatasync, fd) - drop_buffer_cache(fd, last_sync, - upload_size - last_sync) - last_sync = upload_size + writer.write(chunk) sleep() elapsed_time += time.time() - start_time - + upload_size = writer.upload_size if upload_size: self.logger.transfer_rate( - 'PUT.' + device + '.timing', elapsed_time, upload_size) - + 'PUT.' + device + '.timing', elapsed_time, + upload_size) if fsize is not None and fsize != upload_size: return HTTPClientDisconnect(request=request) etag = etag.hexdigest() @@ -744,20 +766,19 @@ class ObjectController(object): if header_key in request.headers: header_caps = header_key.title() metadata[header_caps] = request.headers[header_key] - old_delete_at = int(disk_file.metadata.get('X-Delete-At') or 0) - if old_delete_at != new_delete_at: - if new_delete_at: - self.delete_at_update( - 'PUT', new_delete_at, account, container, obj, - request, device) - if old_delete_at: - self.delete_at_update( - 'DELETE', old_delete_at, account, container, obj, - request, device) - disk_file.put(fd, upload_size, metadata) + writer.put(metadata) except DiskFileNoSpace: return HTTPInsufficientStorage(drive=device, request=request) disk_file.unlinkold(metadata['X-Timestamp']) + if old_delete_at != new_delete_at: + if new_delete_at: + self.delete_at_update( + 'PUT', new_delete_at, account, container, obj, + request, device) + if old_delete_at: + self.delete_at_update( + 'DELETE', old_delete_at, account, container, obj, + request, device) if not orig_timestamp or \ orig_timestamp < request.headers['x-timestamp']: self.container_update( @@ -787,6 +808,7 @@ class ObjectController(object): disk_file = DiskFile(self.devices, device, partition, account, container, obj, self.logger, keep_data_fp=True, disk_chunk_size=self.disk_chunk_size, + bytes_per_sync=self.bytes_per_sync, iter_hook=sleep) if disk_file.is_deleted() or disk_file.is_expired(): if request.headers.get('if-match') == '*': @@ -868,7 +890,8 @@ class ObjectController(object): return HTTPInsufficientStorage(drive=device, request=request) disk_file = DiskFile(self.devices, device, partition, account, container, obj, self.logger, - disk_chunk_size=self.disk_chunk_size) + disk_chunk_size=self.disk_chunk_size, + bytes_per_sync=self.bytes_per_sync) if disk_file.is_deleted() or disk_file.is_expired(): return HTTPNotFound(request=request) try: @@ -912,7 +935,8 @@ class ObjectController(object): response_class = HTTPNoContent disk_file = DiskFile(self.devices, device, partition, account, container, obj, self.logger, - disk_chunk_size=self.disk_chunk_size) + disk_chunk_size=self.disk_chunk_size, + bytes_per_sync=self.bytes_per_sync) if 'x-if-delete-at' in request.headers and \ int(request.headers['x-if-delete-at']) != \ int(disk_file.metadata.get('X-Delete-At') or 0): diff --git a/test/unit/obj/test_auditor.py b/test/unit/obj/test_auditor.py index 1a9277377b..6d5d598b78 100644 --- a/test/unit/obj/test_auditor.py +++ b/test/unit/obj/test_auditor.py @@ -62,17 +62,17 @@ class TestAuditor(unittest.TestCase): self.auditor = auditor.AuditorWorker(self.conf, self.logger) data = '0' * 1024 etag = md5() - with self.disk_file.mkstemp() as fd: - os.write(fd, data) + with self.disk_file.writer() as writer: + writer.write(data) etag.update(data) etag = etag.hexdigest() timestamp = str(normalize_timestamp(time.time())) metadata = { 'ETag': etag, 'X-Timestamp': timestamp, - 'Content-Length': str(os.fstat(fd).st_size), + 'Content-Length': str(os.fstat(writer.fd).st_size), } - self.disk_file.put(fd, 1024, metadata) + writer.put(metadata) pre_quarantines = self.auditor.quarantines self.auditor.object_audit( @@ -80,7 +80,7 @@ class TestAuditor(unittest.TestCase): 'sda', '0') self.assertEquals(self.auditor.quarantines, pre_quarantines) - os.write(fd, 'extra_data') + os.write(writer.fd, 'extra_data') self.auditor.object_audit( os.path.join(self.disk_file.datadir, timestamp + '.data'), 'sda', '0') @@ -91,35 +91,39 @@ class TestAuditor(unittest.TestCase): data = '0' * 1024 etag = md5() timestamp = str(normalize_timestamp(time.time())) - with self.disk_file.mkstemp() as fd: - os.write(fd, data) + with self.disk_file.writer() as writer: + writer.write(data) etag.update(data) etag = etag.hexdigest() metadata = { 'ETag': etag, 'X-Timestamp': timestamp, - 'Content-Length': str(os.fstat(fd).st_size), + 'Content-Length': str(os.fstat(writer.fd).st_size), } - self.disk_file.put(fd, 1024, metadata) + writer.put(metadata) pre_quarantines = self.auditor.quarantines - # remake so it will have metadata - self.disk_file = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', - self.logger) - self.auditor.object_audit( - os.path.join(self.disk_file.datadir, timestamp + '.data'), - 'sda', '0') - self.assertEquals(self.auditor.quarantines, pre_quarantines) - etag = md5() - etag.update('1' + '0' * 1023) - etag = etag.hexdigest() - metadata['ETag'] = etag - write_metadata(fd, metadata) + # remake so it will have metadata + self.disk_file = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', + self.logger) - self.auditor.object_audit( - os.path.join(self.disk_file.datadir, timestamp + '.data'), - 'sda', '0') - self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) + self.auditor.object_audit( + os.path.join(self.disk_file.datadir, timestamp + '.data'), + 'sda', '0') + self.assertEquals(self.auditor.quarantines, pre_quarantines) + etag = md5() + etag.update('1' + '0' * 1023) + etag = etag.hexdigest() + metadata['ETag'] = etag + + with self.disk_file.writer() as writer: + writer.write(data) + writer.put(metadata) + + self.auditor.object_audit( + os.path.join(self.disk_file.datadir, timestamp + '.data'), + 'sda', '0') + self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) def test_object_audit_no_meta(self): timestamp = str(normalize_timestamp(time.time())) @@ -152,17 +156,16 @@ class TestAuditor(unittest.TestCase): pre_quarantines = self.auditor.quarantines data = '0' * 1024 etag = md5() - with self.disk_file.mkstemp() as fd: - os.write(fd, data) + with self.disk_file.writer() as writer: + writer.write(data) etag.update(data) etag = etag.hexdigest() metadata = { 'ETag': etag, 'X-Timestamp': timestamp, - 'Content-Length': str(os.fstat(fd).st_size), + 'Content-Length': str(os.fstat(writer.fd).st_size), } - self.disk_file.put(fd, 1024, metadata) - self.disk_file.close() + writer.put(metadata) self.auditor.audit_all_objects() self.assertEquals(self.auditor.quarantines, pre_quarantines) @@ -172,18 +175,17 @@ class TestAuditor(unittest.TestCase): pre_quarantines = self.auditor.quarantines data = '0' * 1024 etag = md5() - with self.disk_file.mkstemp() as fd: - os.write(fd, data) + with self.disk_file.writer() as writer: + writer.write(data) etag.update(data) etag = etag.hexdigest() metadata = { 'ETag': etag, 'X-Timestamp': timestamp, - 'Content-Length': str(os.fstat(fd).st_size), + 'Content-Length': str(os.fstat(writer.fd).st_size), } - self.disk_file.put(fd, 1024, metadata) - self.disk_file.close() - os.write(fd, 'extra_data') + writer.put(metadata) + os.write(writer.fd, 'extra_data') self.auditor.audit_all_objects() self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) @@ -193,34 +195,32 @@ class TestAuditor(unittest.TestCase): pre_quarantines = self.auditor.quarantines data = '0' * 10 etag = md5() - with self.disk_file.mkstemp() as fd: - os.write(fd, data) + with self.disk_file.writer() as writer: + writer.write(data) etag.update(data) etag = etag.hexdigest() metadata = { 'ETag': etag, 'X-Timestamp': timestamp, - 'Content-Length': str(os.fstat(fd).st_size), + 'Content-Length': str(os.fstat(writer.fd).st_size), } - self.disk_file.put(fd, 10, metadata) - self.disk_file.close() + writer.put(metadata) self.auditor.audit_all_objects() self.disk_file = DiskFile(self.devices, 'sdb', '0', 'a', 'c', 'ob', self.logger) data = '1' * 10 etag = md5() - with self.disk_file.mkstemp() as fd: - os.write(fd, data) + with self.disk_file.writer() as writer: + writer.write(data) etag.update(data) etag = etag.hexdigest() metadata = { 'ETag': etag, 'X-Timestamp': timestamp, - 'Content-Length': str(os.fstat(fd).st_size), + 'Content-Length': str(os.fstat(writer.fd).st_size), } - self.disk_file.put(fd, 10, metadata) - self.disk_file.close() - os.write(fd, 'extra_data') + writer.put(metadata) + os.write(writer.fd, 'extra_data') self.auditor.audit_all_objects() self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) @@ -229,21 +229,21 @@ class TestAuditor(unittest.TestCase): self.auditor.log_time = 0 data = '0' * 1024 etag = md5() - with self.disk_file.mkstemp() as fd: - os.write(fd, data) + with self.disk_file.writer() as writer: + writer.write(data) etag.update(data) etag = etag.hexdigest() metadata = { 'ETag': etag, 'X-Timestamp': str(normalize_timestamp(time.time())), - 'Content-Length': str(os.fstat(fd).st_size), + 'Content-Length': str(os.fstat(writer.fd).st_size), } - self.disk_file.put(fd, 1024, metadata) + writer.put(metadata) etag = md5() etag.update('1' + '0' * 1023) etag = etag.hexdigest() metadata['ETag'] = etag - write_metadata(fd, metadata) + write_metadata(writer.fd, metadata) quarantine_path = os.path.join(self.devices, 'sda', 'quarantined', 'objects') @@ -268,18 +268,18 @@ class TestAuditor(unittest.TestCase): fp.close() etag = md5() - with self.disk_file.mkstemp() as fd: + with self.disk_file.writer() as writer: etag = etag.hexdigest() metadata = { 'ETag': etag, 'X-Timestamp': str(normalize_timestamp(time.time())), 'Content-Length': 10, } - self.disk_file.put(fd, 10, metadata) + writer.put(metadata) etag = md5() etag = etag.hexdigest() metadata['ETag'] = etag - write_metadata(fd, metadata) + write_metadata(writer.fd, metadata) if self.disk_file.data_file: return self.disk_file.data_file return ts_file_path diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index e35f55e5eb..34410c1d49 100755 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -145,7 +145,7 @@ class TestDiskFile(unittest.TestCase): tmpdir = os.path.join(self.testdir, 'sda1', 'tmp') os.rmdir(tmpdir) with object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', - 'o', FakeLogger()).mkstemp(): + 'o', FakeLogger()).writer() as writer: self.assert_(os.path.exists(tmpdir)) def test_iter_hook(self): @@ -153,7 +153,7 @@ class TestDiskFile(unittest.TestCase): def hook(): hook_call_count[0] += 1 - df = self._get_data_file(fsize=65, csize=8, iter_hook=hook) + df = self._get_disk_file(fsize=65, csize=8, iter_hook=hook) for _ in df: pass @@ -204,7 +204,7 @@ class TestDiskFile(unittest.TestCase): self.assert_(os.path.isdir(double_uuid_path)) self.assert_('-' in os.path.basename(double_uuid_path)) - def _get_data_file(self, invalid_type=None, obj_name='o', + def _get_disk_file(self, invalid_type=None, obj_name='o', fsize=1024, csize=8, extension='.data', ts=None, iter_hook=None): '''returns a DiskFile''' @@ -216,25 +216,25 @@ class TestDiskFile(unittest.TestCase): timestamp = ts else: timestamp = str(normalize_timestamp(time())) - with df.mkstemp() as fd: - os.write(fd, data) + with df.writer() as writer: + writer.write(data) etag.update(data) etag = etag.hexdigest() metadata = { 'ETag': etag, 'X-Timestamp': timestamp, - 'Content-Length': str(os.fstat(fd).st_size), + 'Content-Length': str(os.fstat(writer.fd).st_size), } - df.put(fd, fsize, metadata, extension=extension) + writer.put(metadata, extension=extension) if invalid_type == 'ETag': etag = md5() etag.update('1' + '0' * (fsize - 1)) etag = etag.hexdigest() metadata['ETag'] = etag - object_server.write_metadata(fd, metadata) + object_server.write_metadata(writer.fd, metadata) if invalid_type == 'Content-Length': metadata['Content-Length'] = fsize - 1 - object_server.write_metadata(fd, metadata) + object_server.write_metadata(writer.fd, metadata) df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', obj_name, FakeLogger(), @@ -248,43 +248,43 @@ class TestDiskFile(unittest.TestCase): return df def test_quarantine_valids(self): - df = self._get_data_file(obj_name='1') + df = self._get_disk_file(obj_name='1') for chunk in df: pass self.assertFalse(df.quarantined_dir) - df = self._get_data_file(obj_name='2', csize=1) + df = self._get_disk_file(obj_name='2', csize=1) for chunk in df: pass self.assertFalse(df.quarantined_dir) - df = self._get_data_file(obj_name='3', csize=100000) + df = self._get_disk_file(obj_name='3', csize=100000) for chunk in df: pass self.assertFalse(df.quarantined_dir) def run_quarantine_invalids(self, invalid_type): - df = self._get_data_file(invalid_type=invalid_type, obj_name='1') + df = self._get_disk_file(invalid_type=invalid_type, obj_name='1') for chunk in df: pass self.assertTrue(df.quarantined_dir) - df = self._get_data_file(invalid_type=invalid_type, + df = self._get_disk_file(invalid_type=invalid_type, obj_name='2', csize=1) for chunk in df: pass self.assertTrue(df.quarantined_dir) - df = self._get_data_file(invalid_type=invalid_type, + df = self._get_disk_file(invalid_type=invalid_type, obj_name='3', csize=100000) for chunk in df: pass self.assertTrue(df.quarantined_dir) - df = self._get_data_file(invalid_type=invalid_type, obj_name='4') + df = self._get_disk_file(invalid_type=invalid_type, obj_name='4') self.assertFalse(df.quarantined_dir) - df = self._get_data_file(invalid_type=invalid_type, obj_name='5') + df = self._get_disk_file(invalid_type=invalid_type, obj_name='5') for chunk in df.app_iter_range(0, df.unit_test_len): pass self.assertTrue(df.quarantined_dir) - df = self._get_data_file(invalid_type=invalid_type, obj_name='6') + df = self._get_disk_file(invalid_type=invalid_type, obj_name='6') for chunk in df.app_iter_range(0, df.unit_test_len + 100): pass self.assertTrue(df.quarantined_dir) @@ -293,15 +293,15 @@ class TestDiskFile(unittest.TestCase): # in a quarantine, even if the whole file isn't check-summed if invalid_type in ('Zero-Byte', 'Content-Length'): expected_quar = True - df = self._get_data_file(invalid_type=invalid_type, obj_name='7') + df = self._get_disk_file(invalid_type=invalid_type, obj_name='7') for chunk in df.app_iter_range(1, df.unit_test_len): pass self.assertEquals(bool(df.quarantined_dir), expected_quar) - df = self._get_data_file(invalid_type=invalid_type, obj_name='8') + df = self._get_disk_file(invalid_type=invalid_type, obj_name='8') for chunk in df.app_iter_range(0, df.unit_test_len - 1): pass self.assertEquals(bool(df.quarantined_dir), expected_quar) - df = self._get_data_file(invalid_type=invalid_type, obj_name='8') + df = self._get_disk_file(invalid_type=invalid_type, obj_name='8') for chunk in df.app_iter_range(1, df.unit_test_len + 1): pass self.assertEquals(bool(df.quarantined_dir), expected_quar) @@ -312,20 +312,20 @@ class TestDiskFile(unittest.TestCase): self.run_quarantine_invalids('Zero-Byte') def test_quarantine_deleted_files(self): - df = self._get_data_file(invalid_type='Content-Length', + df = self._get_disk_file(invalid_type='Content-Length', extension='.data') df.close() self.assertTrue(df.quarantined_dir) - df = self._get_data_file(invalid_type='Content-Length', + df = self._get_disk_file(invalid_type='Content-Length', extension='.ts') df.close() self.assertFalse(df.quarantined_dir) - df = self._get_data_file(invalid_type='Content-Length', + df = self._get_disk_file(invalid_type='Content-Length', extension='.ts') self.assertRaises(DiskFileNotExist, df.get_data_file_size) def test_put_metadata(self): - df = self._get_data_file() + df = self._get_disk_file() ts = time() metadata = { 'X-Timestamp': ts, 'X-Object-Meta-test': 'data' } df.put_metadata(metadata) @@ -335,7 +335,7 @@ class TestDiskFile(unittest.TestCase): self.assertTrue(exp_name in set(dl)) def test_put_metadata_ts(self): - df = self._get_data_file() + df = self._get_disk_file() ts = time() metadata = { 'X-Timestamp': ts, 'X-Object-Meta-test': 'data' } df.put_metadata(metadata, tombstone=True) @@ -345,9 +345,9 @@ class TestDiskFile(unittest.TestCase): self.assertTrue(exp_name in set(dl)) def test_unlinkold(self): - df1 = self._get_data_file() + df1 = self._get_disk_file() future_time = str(normalize_timestamp(time() + 100)) - df2 = self._get_data_file(ts=future_time) + df2 = self._get_disk_file(ts=future_time) self.assertEquals(len(os.listdir(df1.datadir)), 2) df1.unlinkold(future_time) self.assertEquals(len(os.listdir(df1.datadir)), 1) @@ -358,7 +358,7 @@ class TestDiskFile(unittest.TestCase): def err(): raise Exception("bad") - df = self._get_data_file(fsize=1024 * 1024 * 2) + df = self._get_disk_file(fsize=1024 * 1024 * 2) df._handle_close_quarantine = err for chunk in df: pass @@ -367,7 +367,7 @@ class TestDiskFile(unittest.TestCase): self.assertEquals(len(df.logger.log_dict['error']), 1) def test_quarantine_twice(self): - df = self._get_data_file(invalid_type='Content-Length', + df = self._get_disk_file(invalid_type='Content-Length', extension='.data') self.assert_(os.path.isfile(df.data_file)) quar_dir = df.quarantine()