Hide the file descriptor and disk write methodology for PUTs

Towards moving the DiskFile class into place as the API definition for
pluggable DiskFile backends, we hide the file descriptor and the
method of writing data to disks. The mkstemp() method has been renamed
to writer(), and no longer returns an fd, but a new object that
encapsulates the state tracked for writes. This new object is then
used directly to perform the reminder of the write operations and
application of required semantics.

Change-Id: Ib37ed37b34a2ce6b442d69f83ca011c918114434
Signed-off-by: Peter Portante <peter.portante@redhat.com>
This commit is contained in:
Peter Portante 2013-04-18 20:42:36 -04:00 committed by Gerrit Code Review
parent 6b4cba8371
commit d0a27f477b
3 changed files with 190 additions and 166 deletions

View File

@ -94,6 +94,67 @@ def write_metadata(fd, metadata):
key += 1 key += 1
class DiskWriter(object):
"""
Encapsulation of the write context for servicing PUT REST API
requests. Serves as the context manager object for DiskFile's writer()
method.
"""
def __init__(self, disk_file, fd, tmppath):
self.disk_file = disk_file
self.fd = fd
self.tmppath = tmppath
self.upload_size = 0
self.last_sync = 0
def write(self, chunk):
"""
Write a chunk of data into the temporary file.
:param chunk: the chunk of data to write as a string object
"""
while chunk:
written = os.write(self.fd, chunk)
self.upload_size += written
chunk = chunk[written:]
# For large files sync every 512MB (by default) written
diff = self.upload_size - self.last_sync
if diff >= self.disk_file.bytes_per_sync:
tpool.execute(fdatasync, self.fd)
drop_buffer_cache(self.fd, self.last_sync, diff)
self.last_sync = self.upload_size
def put(self, metadata, extension='.data'):
"""
Finalize writing the file on disk, and renames it from the temp file
to the real location. This should be called after the data has been
written to the temp file.
:param metadata: dictionary of metadata to be written
:param extension: extension to be used when making the file
"""
assert self.tmppath is not None
timestamp = normalize_timestamp(metadata['X-Timestamp'])
metadata['name'] = self.disk_file.name
# Write the metadata before calling fsync() so that both data and
# metadata are flushed to disk.
write_metadata(self.fd, metadata)
# We call fsync() before calling drop_cache() to lower the amount of
# redundant work the drop cache code will perform on the pages (now
# that after fsync the pages will be all clean).
tpool.execute(fsync, self.fd)
# From the Department of the Redundancy Department, make sure we
# call drop_cache() after fsync() to avoid redundant work (pages
# all clean).
drop_buffer_cache(self.fd, 0, self.upload_size)
invalidate_hash(os.path.dirname(self.disk_file.datadir))
# After the rename completes, this object will be available for other
# requests to reference.
renamer(self.tmppath,
os.path.join(self.disk_file.datadir, timestamp + extension))
self.disk_file.metadata = metadata
class DiskFile(object): class DiskFile(object):
""" """
Manage object files on disk. Manage object files on disk.
@ -106,14 +167,16 @@ class DiskFile(object):
:param obj: object name for the object :param obj: object name for the object
:param keep_data_fp: if True, don't close the fp, otherwise close it :param keep_data_fp: if True, don't close the fp, otherwise close it
:param disk_chunk_size: size of chunks on file reads :param disk_chunk_size: size of chunks on file reads
:param bytes_per_sync: number of bytes between fdatasync calls
:param iter_hook: called when __iter__ returns a chunk :param iter_hook: called when __iter__ returns a chunk
:raises DiskFileCollision: on md5 collision :raises DiskFileCollision: on md5 collision
""" """
def __init__(self, path, device, partition, account, container, obj, def __init__(self, path, device, partition, account, container, obj,
logger, keep_data_fp=False, disk_chunk_size=65536, logger, keep_data_fp=False, disk_chunk_size=65536,
iter_hook=None): bytes_per_sync=(512 * 1024 * 1024), iter_hook=None):
self.disk_chunk_size = disk_chunk_size self.disk_chunk_size = disk_chunk_size
self.bytes_per_sync = bytes_per_sync
self.iter_hook = iter_hook self.iter_hook = iter_hook
self.name = '/' + '/'.join((account, container, obj)) self.name = '/' + '/'.join((account, container, obj))
name_hash = hash_path(account, container, obj) name_hash = hash_path(account, container, obj)
@ -121,7 +184,6 @@ class DiskFile(object):
path, device, storage_directory(DATADIR, partition, name_hash)) path, device, storage_directory(DATADIR, partition, name_hash))
self.device_path = os.path.join(path, device) self.device_path = os.path.join(path, device)
self.tmpdir = os.path.join(path, device, 'tmp') self.tmpdir = os.path.join(path, device, 'tmp')
self.tmppath = None
self.logger = logger self.logger = logger
self.metadata = {} self.metadata = {}
self.meta_file = None self.meta_file = None
@ -184,7 +246,7 @@ class DiskFile(object):
self.iter_etag.update(chunk) self.iter_etag.update(chunk)
read += len(chunk) read += len(chunk)
if read - dropped_cache > (1024 * 1024): if read - dropped_cache > (1024 * 1024):
self.drop_cache(self.fp.fileno(), dropped_cache, self._drop_cache(self.fp.fileno(), dropped_cache,
read - dropped_cache) read - dropped_cache)
dropped_cache = read dropped_cache = read
yield chunk yield chunk
@ -192,7 +254,7 @@ class DiskFile(object):
self.iter_hook() self.iter_hook()
else: else:
self.read_to_eof = True self.read_to_eof = True
self.drop_cache(self.fp.fileno(), dropped_cache, self._drop_cache(self.fp.fileno(), dropped_cache,
read - dropped_cache) read - dropped_cache)
break break
finally: finally:
@ -286,66 +348,35 @@ class DiskFile(object):
int(self.metadata['X-Delete-At']) <= time.time()) int(self.metadata['X-Delete-At']) <= time.time())
@contextmanager @contextmanager
def mkstemp(self, size=None): def writer(self, size=None):
""" """
Contextmanager to make a temporary file. Context manager to write a file. We create a temporary file first, and
then return a DiskWriter object to encapsulate the state.
:param size: optional initial size of file to allocate on disk :param size: optional initial size of file to explicitly allocate on
:raises DiskFileNoSpace: if a size is specified and fallocate fails disk
:raises DiskFileNoSpace: if a size is specified and allocation fails
""" """
if not os.path.exists(self.tmpdir): if not os.path.exists(self.tmpdir):
mkdirs(self.tmpdir) mkdirs(self.tmpdir)
fd, self.tmppath = mkstemp(dir=self.tmpdir) fd, tmppath = mkstemp(dir=self.tmpdir)
try: try:
if size is not None and size > 0: if size is not None and size > 0:
try: try:
fallocate(fd, size) fallocate(fd, size)
except OSError: except OSError:
raise DiskFileNoSpace() raise DiskFileNoSpace()
yield fd yield DiskWriter(self, fd, tmppath)
finally: finally:
try: try:
os.close(fd) os.close(fd)
except OSError: except OSError:
pass pass
tmppath, self.tmppath = self.tmppath, None
try: try:
os.unlink(tmppath) os.unlink(tmppath)
except OSError: except OSError:
pass pass
def put(self, fd, fsize, metadata, extension='.data'):
"""
Finalize writing the file on disk, and renames it from the temp file to
the real location. This should be called after the data has been
written to the temp file.
:param fd: file descriptor of the temp file
:param fsize: final on-disk size of the created file
:param metadata: dictionary of metadata to be written
:param extension: extension to be used when making the file
"""
assert self.tmppath is not None
metadata['name'] = self.name
timestamp = normalize_timestamp(metadata['X-Timestamp'])
# Write the metadata before calling fsync() so that both data and
# metadata are flushed to disk.
write_metadata(fd, metadata)
# We call fsync() before calling drop_cache() to lower the amount of
# redundant work the drop cache code will perform on the pages (now
# that after fsync the pages will be all clean).
tpool.execute(fsync, fd)
# From the Department of the Redundancy Department, make sure we
# call drop_cache() after fsync() to avoid redundant work (pages
# all clean).
self.drop_cache(fd, 0, fsize)
invalidate_hash(os.path.dirname(self.datadir))
# After the rename completes, this object will be available for other
# requests to reference.
renamer(self.tmppath,
os.path.join(self.datadir, timestamp + extension))
self.metadata = metadata
def put_metadata(self, metadata, tombstone=False): def put_metadata(self, metadata, tombstone=False):
""" """
Short hand for putting metadata to .meta and .ts files. Short hand for putting metadata to .meta and .ts files.
@ -354,8 +385,8 @@ class DiskFile(object):
:param tombstone: whether or not we are writing a tombstone :param tombstone: whether or not we are writing a tombstone
""" """
extension = '.ts' if tombstone else '.meta' extension = '.ts' if tombstone else '.meta'
with self.mkstemp() as fd: with self.writer() as writer:
self.put(fd, 0, metadata, extension=extension) writer.put(metadata, extension=extension)
def unlinkold(self, timestamp): def unlinkold(self, timestamp):
""" """
@ -373,7 +404,7 @@ class DiskFile(object):
if err.errno != errno.ENOENT: if err.errno != errno.ENOENT:
raise raise
def drop_cache(self, fd, offset, length): def _drop_cache(self, fd, offset, length):
"""Method for no-oping buffer cache drop method.""" """Method for no-oping buffer cache drop method."""
if not self.keep_cache: if not self.keep_cache:
drop_buffer_cache(fd, offset, length) drop_buffer_cache(fd, offset, length)
@ -636,8 +667,8 @@ class ObjectController(object):
return HTTPInsufficientStorage(drive=device, request=request) return HTTPInsufficientStorage(drive=device, request=request)
disk_file = DiskFile(self.devices, device, partition, account, disk_file = DiskFile(self.devices, device, partition, account,
container, obj, self.logger, container, obj, self.logger,
disk_chunk_size=self.disk_chunk_size) disk_chunk_size=self.disk_chunk_size,
bytes_per_sync=self.bytes_per_sync)
if disk_file.is_deleted() or disk_file.is_expired(): if disk_file.is_deleted() or disk_file.is_expired():
return HTTPNotFound(request=request) return HTTPNotFound(request=request)
try: try:
@ -689,42 +720,33 @@ class ObjectController(object):
return HTTPInsufficientStorage(drive=device, request=request) return HTTPInsufficientStorage(drive=device, request=request)
disk_file = DiskFile(self.devices, device, partition, account, disk_file = DiskFile(self.devices, device, partition, account,
container, obj, self.logger, container, obj, self.logger,
disk_chunk_size=self.disk_chunk_size) disk_chunk_size=self.disk_chunk_size,
bytes_per_sync=self.bytes_per_sync)
old_delete_at = int(disk_file.metadata.get('X-Delete-At') or 0)
orig_timestamp = disk_file.metadata.get('X-Timestamp') orig_timestamp = disk_file.metadata.get('X-Timestamp')
upload_expiration = time.time() + self.max_upload_time upload_expiration = time.time() + self.max_upload_time
etag = md5() etag = md5()
fsize = request.headers.get('content-length', None) fsize = request.headers.get('content-length', None)
if fsize is not None: if fsize is not None:
fsize = int(fsize) fsize = int(fsize)
upload_size = 0
last_sync = 0
elapsed_time = 0 elapsed_time = 0
try: try:
with disk_file.mkstemp(size=fsize) as fd: with disk_file.writer(size=fsize) as writer:
reader = request.environ['wsgi.input'].read reader = request.environ['wsgi.input'].read
for chunk in iter(lambda: reader(self.network_chunk_size), ''): for chunk in iter(lambda: reader(self.network_chunk_size), ''):
start_time = time.time() start_time = time.time()
upload_size += len(chunk) if start_time > upload_expiration:
if time.time() > upload_expiration:
self.logger.increment('PUT.timeouts') self.logger.increment('PUT.timeouts')
return HTTPRequestTimeout(request=request) return HTTPRequestTimeout(request=request)
etag.update(chunk) etag.update(chunk)
while chunk: writer.write(chunk)
written = os.write(fd, chunk)
chunk = chunk[written:]
# For large files sync every 512MB (by default) written
if upload_size - last_sync >= self.bytes_per_sync:
tpool.execute(fdatasync, fd)
drop_buffer_cache(fd, last_sync,
upload_size - last_sync)
last_sync = upload_size
sleep() sleep()
elapsed_time += time.time() - start_time elapsed_time += time.time() - start_time
upload_size = writer.upload_size
if upload_size: if upload_size:
self.logger.transfer_rate( self.logger.transfer_rate(
'PUT.' + device + '.timing', elapsed_time, upload_size) 'PUT.' + device + '.timing', elapsed_time,
upload_size)
if fsize is not None and fsize != upload_size: if fsize is not None and fsize != upload_size:
return HTTPClientDisconnect(request=request) return HTTPClientDisconnect(request=request)
etag = etag.hexdigest() etag = etag.hexdigest()
@ -744,7 +766,10 @@ class ObjectController(object):
if header_key in request.headers: if header_key in request.headers:
header_caps = header_key.title() header_caps = header_key.title()
metadata[header_caps] = request.headers[header_key] metadata[header_caps] = request.headers[header_key]
old_delete_at = int(disk_file.metadata.get('X-Delete-At') or 0) writer.put(metadata)
except DiskFileNoSpace:
return HTTPInsufficientStorage(drive=device, request=request)
disk_file.unlinkold(metadata['X-Timestamp'])
if old_delete_at != new_delete_at: if old_delete_at != new_delete_at:
if new_delete_at: if new_delete_at:
self.delete_at_update( self.delete_at_update(
@ -754,10 +779,6 @@ class ObjectController(object):
self.delete_at_update( self.delete_at_update(
'DELETE', old_delete_at, account, container, obj, 'DELETE', old_delete_at, account, container, obj,
request, device) request, device)
disk_file.put(fd, upload_size, metadata)
except DiskFileNoSpace:
return HTTPInsufficientStorage(drive=device, request=request)
disk_file.unlinkold(metadata['X-Timestamp'])
if not orig_timestamp or \ if not orig_timestamp or \
orig_timestamp < request.headers['x-timestamp']: orig_timestamp < request.headers['x-timestamp']:
self.container_update( self.container_update(
@ -787,6 +808,7 @@ class ObjectController(object):
disk_file = DiskFile(self.devices, device, partition, account, disk_file = DiskFile(self.devices, device, partition, account,
container, obj, self.logger, keep_data_fp=True, container, obj, self.logger, keep_data_fp=True,
disk_chunk_size=self.disk_chunk_size, disk_chunk_size=self.disk_chunk_size,
bytes_per_sync=self.bytes_per_sync,
iter_hook=sleep) iter_hook=sleep)
if disk_file.is_deleted() or disk_file.is_expired(): if disk_file.is_deleted() or disk_file.is_expired():
if request.headers.get('if-match') == '*': if request.headers.get('if-match') == '*':
@ -868,7 +890,8 @@ class ObjectController(object):
return HTTPInsufficientStorage(drive=device, request=request) return HTTPInsufficientStorage(drive=device, request=request)
disk_file = DiskFile(self.devices, device, partition, account, disk_file = DiskFile(self.devices, device, partition, account,
container, obj, self.logger, container, obj, self.logger,
disk_chunk_size=self.disk_chunk_size) disk_chunk_size=self.disk_chunk_size,
bytes_per_sync=self.bytes_per_sync)
if disk_file.is_deleted() or disk_file.is_expired(): if disk_file.is_deleted() or disk_file.is_expired():
return HTTPNotFound(request=request) return HTTPNotFound(request=request)
try: try:
@ -912,7 +935,8 @@ class ObjectController(object):
response_class = HTTPNoContent response_class = HTTPNoContent
disk_file = DiskFile(self.devices, device, partition, account, disk_file = DiskFile(self.devices, device, partition, account,
container, obj, self.logger, container, obj, self.logger,
disk_chunk_size=self.disk_chunk_size) disk_chunk_size=self.disk_chunk_size,
bytes_per_sync=self.bytes_per_sync)
if 'x-if-delete-at' in request.headers and \ if 'x-if-delete-at' in request.headers and \
int(request.headers['x-if-delete-at']) != \ int(request.headers['x-if-delete-at']) != \
int(disk_file.metadata.get('X-Delete-At') or 0): int(disk_file.metadata.get('X-Delete-At') or 0):

View File

@ -62,17 +62,17 @@ class TestAuditor(unittest.TestCase):
self.auditor = auditor.AuditorWorker(self.conf, self.logger) self.auditor = auditor.AuditorWorker(self.conf, self.logger)
data = '0' * 1024 data = '0' * 1024
etag = md5() etag = md5()
with self.disk_file.mkstemp() as fd: with self.disk_file.writer() as writer:
os.write(fd, data) writer.write(data)
etag.update(data) etag.update(data)
etag = etag.hexdigest() etag = etag.hexdigest()
timestamp = str(normalize_timestamp(time.time())) timestamp = str(normalize_timestamp(time.time()))
metadata = { metadata = {
'ETag': etag, 'ETag': etag,
'X-Timestamp': timestamp, 'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(fd).st_size), 'Content-Length': str(os.fstat(writer.fd).st_size),
} }
self.disk_file.put(fd, 1024, metadata) writer.put(metadata)
pre_quarantines = self.auditor.quarantines pre_quarantines = self.auditor.quarantines
self.auditor.object_audit( self.auditor.object_audit(
@ -80,7 +80,7 @@ class TestAuditor(unittest.TestCase):
'sda', '0') 'sda', '0')
self.assertEquals(self.auditor.quarantines, pre_quarantines) self.assertEquals(self.auditor.quarantines, pre_quarantines)
os.write(fd, 'extra_data') os.write(writer.fd, 'extra_data')
self.auditor.object_audit( self.auditor.object_audit(
os.path.join(self.disk_file.datadir, timestamp + '.data'), os.path.join(self.disk_file.datadir, timestamp + '.data'),
'sda', '0') 'sda', '0')
@ -91,17 +91,18 @@ class TestAuditor(unittest.TestCase):
data = '0' * 1024 data = '0' * 1024
etag = md5() etag = md5()
timestamp = str(normalize_timestamp(time.time())) timestamp = str(normalize_timestamp(time.time()))
with self.disk_file.mkstemp() as fd: with self.disk_file.writer() as writer:
os.write(fd, data) writer.write(data)
etag.update(data) etag.update(data)
etag = etag.hexdigest() etag = etag.hexdigest()
metadata = { metadata = {
'ETag': etag, 'ETag': etag,
'X-Timestamp': timestamp, 'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(fd).st_size), 'Content-Length': str(os.fstat(writer.fd).st_size),
} }
self.disk_file.put(fd, 1024, metadata) writer.put(metadata)
pre_quarantines = self.auditor.quarantines pre_quarantines = self.auditor.quarantines
# remake so it will have metadata # remake so it will have metadata
self.disk_file = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', self.disk_file = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o',
self.logger) self.logger)
@ -114,7 +115,10 @@ class TestAuditor(unittest.TestCase):
etag.update('1' + '0' * 1023) etag.update('1' + '0' * 1023)
etag = etag.hexdigest() etag = etag.hexdigest()
metadata['ETag'] = etag metadata['ETag'] = etag
write_metadata(fd, metadata)
with self.disk_file.writer() as writer:
writer.write(data)
writer.put(metadata)
self.auditor.object_audit( self.auditor.object_audit(
os.path.join(self.disk_file.datadir, timestamp + '.data'), os.path.join(self.disk_file.datadir, timestamp + '.data'),
@ -152,17 +156,16 @@ class TestAuditor(unittest.TestCase):
pre_quarantines = self.auditor.quarantines pre_quarantines = self.auditor.quarantines
data = '0' * 1024 data = '0' * 1024
etag = md5() etag = md5()
with self.disk_file.mkstemp() as fd: with self.disk_file.writer() as writer:
os.write(fd, data) writer.write(data)
etag.update(data) etag.update(data)
etag = etag.hexdigest() etag = etag.hexdigest()
metadata = { metadata = {
'ETag': etag, 'ETag': etag,
'X-Timestamp': timestamp, 'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(fd).st_size), 'Content-Length': str(os.fstat(writer.fd).st_size),
} }
self.disk_file.put(fd, 1024, metadata) writer.put(metadata)
self.disk_file.close()
self.auditor.audit_all_objects() self.auditor.audit_all_objects()
self.assertEquals(self.auditor.quarantines, pre_quarantines) self.assertEquals(self.auditor.quarantines, pre_quarantines)
@ -172,18 +175,17 @@ class TestAuditor(unittest.TestCase):
pre_quarantines = self.auditor.quarantines pre_quarantines = self.auditor.quarantines
data = '0' * 1024 data = '0' * 1024
etag = md5() etag = md5()
with self.disk_file.mkstemp() as fd: with self.disk_file.writer() as writer:
os.write(fd, data) writer.write(data)
etag.update(data) etag.update(data)
etag = etag.hexdigest() etag = etag.hexdigest()
metadata = { metadata = {
'ETag': etag, 'ETag': etag,
'X-Timestamp': timestamp, 'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(fd).st_size), 'Content-Length': str(os.fstat(writer.fd).st_size),
} }
self.disk_file.put(fd, 1024, metadata) writer.put(metadata)
self.disk_file.close() os.write(writer.fd, 'extra_data')
os.write(fd, 'extra_data')
self.auditor.audit_all_objects() self.auditor.audit_all_objects()
self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
@ -193,34 +195,32 @@ class TestAuditor(unittest.TestCase):
pre_quarantines = self.auditor.quarantines pre_quarantines = self.auditor.quarantines
data = '0' * 10 data = '0' * 10
etag = md5() etag = md5()
with self.disk_file.mkstemp() as fd: with self.disk_file.writer() as writer:
os.write(fd, data) writer.write(data)
etag.update(data) etag.update(data)
etag = etag.hexdigest() etag = etag.hexdigest()
metadata = { metadata = {
'ETag': etag, 'ETag': etag,
'X-Timestamp': timestamp, 'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(fd).st_size), 'Content-Length': str(os.fstat(writer.fd).st_size),
} }
self.disk_file.put(fd, 10, metadata) writer.put(metadata)
self.disk_file.close()
self.auditor.audit_all_objects() self.auditor.audit_all_objects()
self.disk_file = DiskFile(self.devices, 'sdb', '0', 'a', 'c', self.disk_file = DiskFile(self.devices, 'sdb', '0', 'a', 'c',
'ob', self.logger) 'ob', self.logger)
data = '1' * 10 data = '1' * 10
etag = md5() etag = md5()
with self.disk_file.mkstemp() as fd: with self.disk_file.writer() as writer:
os.write(fd, data) writer.write(data)
etag.update(data) etag.update(data)
etag = etag.hexdigest() etag = etag.hexdigest()
metadata = { metadata = {
'ETag': etag, 'ETag': etag,
'X-Timestamp': timestamp, 'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(fd).st_size), 'Content-Length': str(os.fstat(writer.fd).st_size),
} }
self.disk_file.put(fd, 10, metadata) writer.put(metadata)
self.disk_file.close() os.write(writer.fd, 'extra_data')
os.write(fd, 'extra_data')
self.auditor.audit_all_objects() self.auditor.audit_all_objects()
self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
@ -229,21 +229,21 @@ class TestAuditor(unittest.TestCase):
self.auditor.log_time = 0 self.auditor.log_time = 0
data = '0' * 1024 data = '0' * 1024
etag = md5() etag = md5()
with self.disk_file.mkstemp() as fd: with self.disk_file.writer() as writer:
os.write(fd, data) writer.write(data)
etag.update(data) etag.update(data)
etag = etag.hexdigest() etag = etag.hexdigest()
metadata = { metadata = {
'ETag': etag, 'ETag': etag,
'X-Timestamp': str(normalize_timestamp(time.time())), 'X-Timestamp': str(normalize_timestamp(time.time())),
'Content-Length': str(os.fstat(fd).st_size), 'Content-Length': str(os.fstat(writer.fd).st_size),
} }
self.disk_file.put(fd, 1024, metadata) writer.put(metadata)
etag = md5() etag = md5()
etag.update('1' + '0' * 1023) etag.update('1' + '0' * 1023)
etag = etag.hexdigest() etag = etag.hexdigest()
metadata['ETag'] = etag metadata['ETag'] = etag
write_metadata(fd, metadata) write_metadata(writer.fd, metadata)
quarantine_path = os.path.join(self.devices, quarantine_path = os.path.join(self.devices,
'sda', 'quarantined', 'objects') 'sda', 'quarantined', 'objects')
@ -268,18 +268,18 @@ class TestAuditor(unittest.TestCase):
fp.close() fp.close()
etag = md5() etag = md5()
with self.disk_file.mkstemp() as fd: with self.disk_file.writer() as writer:
etag = etag.hexdigest() etag = etag.hexdigest()
metadata = { metadata = {
'ETag': etag, 'ETag': etag,
'X-Timestamp': str(normalize_timestamp(time.time())), 'X-Timestamp': str(normalize_timestamp(time.time())),
'Content-Length': 10, 'Content-Length': 10,
} }
self.disk_file.put(fd, 10, metadata) writer.put(metadata)
etag = md5() etag = md5()
etag = etag.hexdigest() etag = etag.hexdigest()
metadata['ETag'] = etag metadata['ETag'] = etag
write_metadata(fd, metadata) write_metadata(writer.fd, metadata)
if self.disk_file.data_file: if self.disk_file.data_file:
return self.disk_file.data_file return self.disk_file.data_file
return ts_file_path return ts_file_path

View File

@ -145,7 +145,7 @@ class TestDiskFile(unittest.TestCase):
tmpdir = os.path.join(self.testdir, 'sda1', 'tmp') tmpdir = os.path.join(self.testdir, 'sda1', 'tmp')
os.rmdir(tmpdir) os.rmdir(tmpdir)
with object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', with object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c',
'o', FakeLogger()).mkstemp(): 'o', FakeLogger()).writer() as writer:
self.assert_(os.path.exists(tmpdir)) self.assert_(os.path.exists(tmpdir))
def test_iter_hook(self): def test_iter_hook(self):
@ -153,7 +153,7 @@ class TestDiskFile(unittest.TestCase):
def hook(): def hook():
hook_call_count[0] += 1 hook_call_count[0] += 1
df = self._get_data_file(fsize=65, csize=8, iter_hook=hook) df = self._get_disk_file(fsize=65, csize=8, iter_hook=hook)
for _ in df: for _ in df:
pass pass
@ -204,7 +204,7 @@ class TestDiskFile(unittest.TestCase):
self.assert_(os.path.isdir(double_uuid_path)) self.assert_(os.path.isdir(double_uuid_path))
self.assert_('-' in os.path.basename(double_uuid_path)) self.assert_('-' in os.path.basename(double_uuid_path))
def _get_data_file(self, invalid_type=None, obj_name='o', def _get_disk_file(self, invalid_type=None, obj_name='o',
fsize=1024, csize=8, extension='.data', ts=None, fsize=1024, csize=8, extension='.data', ts=None,
iter_hook=None): iter_hook=None):
'''returns a DiskFile''' '''returns a DiskFile'''
@ -216,25 +216,25 @@ class TestDiskFile(unittest.TestCase):
timestamp = ts timestamp = ts
else: else:
timestamp = str(normalize_timestamp(time())) timestamp = str(normalize_timestamp(time()))
with df.mkstemp() as fd: with df.writer() as writer:
os.write(fd, data) writer.write(data)
etag.update(data) etag.update(data)
etag = etag.hexdigest() etag = etag.hexdigest()
metadata = { metadata = {
'ETag': etag, 'ETag': etag,
'X-Timestamp': timestamp, 'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(fd).st_size), 'Content-Length': str(os.fstat(writer.fd).st_size),
} }
df.put(fd, fsize, metadata, extension=extension) writer.put(metadata, extension=extension)
if invalid_type == 'ETag': if invalid_type == 'ETag':
etag = md5() etag = md5()
etag.update('1' + '0' * (fsize - 1)) etag.update('1' + '0' * (fsize - 1))
etag = etag.hexdigest() etag = etag.hexdigest()
metadata['ETag'] = etag metadata['ETag'] = etag
object_server.write_metadata(fd, metadata) object_server.write_metadata(writer.fd, metadata)
if invalid_type == 'Content-Length': if invalid_type == 'Content-Length':
metadata['Content-Length'] = fsize - 1 metadata['Content-Length'] = fsize - 1
object_server.write_metadata(fd, metadata) object_server.write_metadata(writer.fd, metadata)
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c',
obj_name, FakeLogger(), obj_name, FakeLogger(),
@ -248,43 +248,43 @@ class TestDiskFile(unittest.TestCase):
return df return df
def test_quarantine_valids(self): def test_quarantine_valids(self):
df = self._get_data_file(obj_name='1') df = self._get_disk_file(obj_name='1')
for chunk in df: for chunk in df:
pass pass
self.assertFalse(df.quarantined_dir) self.assertFalse(df.quarantined_dir)
df = self._get_data_file(obj_name='2', csize=1) df = self._get_disk_file(obj_name='2', csize=1)
for chunk in df: for chunk in df:
pass pass
self.assertFalse(df.quarantined_dir) self.assertFalse(df.quarantined_dir)
df = self._get_data_file(obj_name='3', csize=100000) df = self._get_disk_file(obj_name='3', csize=100000)
for chunk in df: for chunk in df:
pass pass
self.assertFalse(df.quarantined_dir) self.assertFalse(df.quarantined_dir)
def run_quarantine_invalids(self, invalid_type): def run_quarantine_invalids(self, invalid_type):
df = self._get_data_file(invalid_type=invalid_type, obj_name='1') df = self._get_disk_file(invalid_type=invalid_type, obj_name='1')
for chunk in df: for chunk in df:
pass pass
self.assertTrue(df.quarantined_dir) self.assertTrue(df.quarantined_dir)
df = self._get_data_file(invalid_type=invalid_type, df = self._get_disk_file(invalid_type=invalid_type,
obj_name='2', csize=1) obj_name='2', csize=1)
for chunk in df: for chunk in df:
pass pass
self.assertTrue(df.quarantined_dir) self.assertTrue(df.quarantined_dir)
df = self._get_data_file(invalid_type=invalid_type, df = self._get_disk_file(invalid_type=invalid_type,
obj_name='3', csize=100000) obj_name='3', csize=100000)
for chunk in df: for chunk in df:
pass pass
self.assertTrue(df.quarantined_dir) self.assertTrue(df.quarantined_dir)
df = self._get_data_file(invalid_type=invalid_type, obj_name='4') df = self._get_disk_file(invalid_type=invalid_type, obj_name='4')
self.assertFalse(df.quarantined_dir) self.assertFalse(df.quarantined_dir)
df = self._get_data_file(invalid_type=invalid_type, obj_name='5') df = self._get_disk_file(invalid_type=invalid_type, obj_name='5')
for chunk in df.app_iter_range(0, df.unit_test_len): for chunk in df.app_iter_range(0, df.unit_test_len):
pass pass
self.assertTrue(df.quarantined_dir) self.assertTrue(df.quarantined_dir)
df = self._get_data_file(invalid_type=invalid_type, obj_name='6') df = self._get_disk_file(invalid_type=invalid_type, obj_name='6')
for chunk in df.app_iter_range(0, df.unit_test_len + 100): for chunk in df.app_iter_range(0, df.unit_test_len + 100):
pass pass
self.assertTrue(df.quarantined_dir) self.assertTrue(df.quarantined_dir)
@ -293,15 +293,15 @@ class TestDiskFile(unittest.TestCase):
# in a quarantine, even if the whole file isn't check-summed # in a quarantine, even if the whole file isn't check-summed
if invalid_type in ('Zero-Byte', 'Content-Length'): if invalid_type in ('Zero-Byte', 'Content-Length'):
expected_quar = True expected_quar = True
df = self._get_data_file(invalid_type=invalid_type, obj_name='7') df = self._get_disk_file(invalid_type=invalid_type, obj_name='7')
for chunk in df.app_iter_range(1, df.unit_test_len): for chunk in df.app_iter_range(1, df.unit_test_len):
pass pass
self.assertEquals(bool(df.quarantined_dir), expected_quar) self.assertEquals(bool(df.quarantined_dir), expected_quar)
df = self._get_data_file(invalid_type=invalid_type, obj_name='8') df = self._get_disk_file(invalid_type=invalid_type, obj_name='8')
for chunk in df.app_iter_range(0, df.unit_test_len - 1): for chunk in df.app_iter_range(0, df.unit_test_len - 1):
pass pass
self.assertEquals(bool(df.quarantined_dir), expected_quar) self.assertEquals(bool(df.quarantined_dir), expected_quar)
df = self._get_data_file(invalid_type=invalid_type, obj_name='8') df = self._get_disk_file(invalid_type=invalid_type, obj_name='8')
for chunk in df.app_iter_range(1, df.unit_test_len + 1): for chunk in df.app_iter_range(1, df.unit_test_len + 1):
pass pass
self.assertEquals(bool(df.quarantined_dir), expected_quar) self.assertEquals(bool(df.quarantined_dir), expected_quar)
@ -312,20 +312,20 @@ class TestDiskFile(unittest.TestCase):
self.run_quarantine_invalids('Zero-Byte') self.run_quarantine_invalids('Zero-Byte')
def test_quarantine_deleted_files(self): def test_quarantine_deleted_files(self):
df = self._get_data_file(invalid_type='Content-Length', df = self._get_disk_file(invalid_type='Content-Length',
extension='.data') extension='.data')
df.close() df.close()
self.assertTrue(df.quarantined_dir) self.assertTrue(df.quarantined_dir)
df = self._get_data_file(invalid_type='Content-Length', df = self._get_disk_file(invalid_type='Content-Length',
extension='.ts') extension='.ts')
df.close() df.close()
self.assertFalse(df.quarantined_dir) self.assertFalse(df.quarantined_dir)
df = self._get_data_file(invalid_type='Content-Length', df = self._get_disk_file(invalid_type='Content-Length',
extension='.ts') extension='.ts')
self.assertRaises(DiskFileNotExist, df.get_data_file_size) self.assertRaises(DiskFileNotExist, df.get_data_file_size)
def test_put_metadata(self): def test_put_metadata(self):
df = self._get_data_file() df = self._get_disk_file()
ts = time() ts = time()
metadata = { 'X-Timestamp': ts, 'X-Object-Meta-test': 'data' } metadata = { 'X-Timestamp': ts, 'X-Object-Meta-test': 'data' }
df.put_metadata(metadata) df.put_metadata(metadata)
@ -335,7 +335,7 @@ class TestDiskFile(unittest.TestCase):
self.assertTrue(exp_name in set(dl)) self.assertTrue(exp_name in set(dl))
def test_put_metadata_ts(self): def test_put_metadata_ts(self):
df = self._get_data_file() df = self._get_disk_file()
ts = time() ts = time()
metadata = { 'X-Timestamp': ts, 'X-Object-Meta-test': 'data' } metadata = { 'X-Timestamp': ts, 'X-Object-Meta-test': 'data' }
df.put_metadata(metadata, tombstone=True) df.put_metadata(metadata, tombstone=True)
@ -345,9 +345,9 @@ class TestDiskFile(unittest.TestCase):
self.assertTrue(exp_name in set(dl)) self.assertTrue(exp_name in set(dl))
def test_unlinkold(self): def test_unlinkold(self):
df1 = self._get_data_file() df1 = self._get_disk_file()
future_time = str(normalize_timestamp(time() + 100)) future_time = str(normalize_timestamp(time() + 100))
df2 = self._get_data_file(ts=future_time) df2 = self._get_disk_file(ts=future_time)
self.assertEquals(len(os.listdir(df1.datadir)), 2) self.assertEquals(len(os.listdir(df1.datadir)), 2)
df1.unlinkold(future_time) df1.unlinkold(future_time)
self.assertEquals(len(os.listdir(df1.datadir)), 1) self.assertEquals(len(os.listdir(df1.datadir)), 1)
@ -358,7 +358,7 @@ class TestDiskFile(unittest.TestCase):
def err(): def err():
raise Exception("bad") raise Exception("bad")
df = self._get_data_file(fsize=1024 * 1024 * 2) df = self._get_disk_file(fsize=1024 * 1024 * 2)
df._handle_close_quarantine = err df._handle_close_quarantine = err
for chunk in df: for chunk in df:
pass pass
@ -367,7 +367,7 @@ class TestDiskFile(unittest.TestCase):
self.assertEquals(len(df.logger.log_dict['error']), 1) self.assertEquals(len(df.logger.log_dict['error']), 1)
def test_quarantine_twice(self): def test_quarantine_twice(self):
df = self._get_data_file(invalid_type='Content-Length', df = self._get_disk_file(invalid_type='Content-Length',
extension='.data') extension='.data')
self.assert_(os.path.isfile(df.data_file)) self.assert_(os.path.isfile(df.data_file))
quar_dir = df.quarantine() quar_dir = df.quarantine()