Forklift the DiskFile interface into it's own module
* new module swift.obj.diskfile I parameterized two constants from obj.server into the DiskFile's __init__ * DATADIR -> obj_dir * DISALLOWED_HEADERS -> disallowed_metadata_keys I'm not sure if this is the right long term abstraction but for now it avoids circular imports. Change-Id: I3962202c07c4b2fbfc26f9776c8a5c96292ae199
This commit is contained in:
parent
167897bba5
commit
c9de9f2b8d
@ -18,6 +18,7 @@ import time
|
||||
|
||||
from eventlet import Timeout
|
||||
|
||||
from swift.obj import diskfile
|
||||
from swift.obj import server as object_server
|
||||
from swift.common.utils import get_logger, audit_location_generator, \
|
||||
ratelimit_sleep, config_true_value, dump_recon_cache, list_from_csv, json
|
||||
@ -159,13 +160,13 @@ class AuditorWorker(object):
|
||||
"""
|
||||
try:
|
||||
try:
|
||||
name = object_server.read_metadata(path)['name']
|
||||
name = diskfile.read_metadata(path)['name']
|
||||
except (Exception, Timeout) as exc:
|
||||
raise AuditException('Error when reading metadata: %s' % exc)
|
||||
_junk, account, container, obj = name.split('/', 3)
|
||||
df = object_server.DiskFile(self.devices, device, partition,
|
||||
account, container, obj, self.logger,
|
||||
keep_data_fp=True)
|
||||
df = diskfile.DiskFile(self.devices, device, partition,
|
||||
account, container, obj, self.logger,
|
||||
keep_data_fp=True)
|
||||
try:
|
||||
try:
|
||||
obj_size = df.get_data_file_size()
|
||||
@ -199,7 +200,7 @@ class AuditorWorker(object):
|
||||
self.logger.error(_('ERROR Object %(obj)s failed audit and will '
|
||||
'be quarantined: %(err)s'),
|
||||
{'obj': path, 'err': err})
|
||||
object_server.quarantine_renamer(
|
||||
diskfile.quarantine_renamer(
|
||||
os.path.join(self.devices, device), path)
|
||||
return
|
||||
except (Exception, Timeout):
|
||||
|
454
swift/obj/diskfile.py
Normal file
454
swift/obj/diskfile.py
Normal file
@ -0,0 +1,454 @@
|
||||
# Copyright (c) 2010-2012 OpenStack, LLC.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
""" Disk File Interface for Swift Object Server"""
|
||||
|
||||
from __future__ import with_statement
|
||||
import cPickle as pickle
|
||||
import errno
|
||||
import os
|
||||
import time
|
||||
import traceback
|
||||
from hashlib import md5
|
||||
from tempfile import mkstemp
|
||||
from contextlib import contextmanager
|
||||
|
||||
from xattr import getxattr, setxattr
|
||||
from eventlet import Timeout
|
||||
|
||||
from swift.common.utils import mkdirs, normalize_timestamp, \
|
||||
storage_directory, hash_path, renamer, fallocate, fsync, \
|
||||
fdatasync, drop_buffer_cache, ThreadPool
|
||||
from swift.common.exceptions import DiskFileError, \
|
||||
DiskFileNotExist, DiskFileCollision, DiskFileNoSpace
|
||||
from swift.obj.base import invalidate_hash, \
|
||||
quarantine_renamer
|
||||
from swift.common.swob import multi_range_iterator
|
||||
|
||||
|
||||
PICKLE_PROTOCOL = 2
|
||||
METADATA_KEY = 'user.swift.metadata'
|
||||
|
||||
|
||||
def read_metadata(fd):
|
||||
"""
|
||||
Helper function to read the pickled metadata from an object file.
|
||||
|
||||
:param fd: file descriptor to load the metadata from
|
||||
|
||||
:returns: dictionary of metadata
|
||||
"""
|
||||
metadata = ''
|
||||
key = 0
|
||||
try:
|
||||
while True:
|
||||
metadata += getxattr(fd, '%s%s' % (METADATA_KEY, (key or '')))
|
||||
key += 1
|
||||
except IOError:
|
||||
pass
|
||||
return pickle.loads(metadata)
|
||||
|
||||
|
||||
def write_metadata(fd, metadata):
|
||||
"""
|
||||
Helper function to write pickled metadata for an object file.
|
||||
|
||||
:param fd: file descriptor to write the metadata
|
||||
:param metadata: metadata to write
|
||||
"""
|
||||
metastr = pickle.dumps(metadata, PICKLE_PROTOCOL)
|
||||
key = 0
|
||||
while metastr:
|
||||
setxattr(fd, '%s%s' % (METADATA_KEY, key or ''), metastr[:254])
|
||||
metastr = metastr[254:]
|
||||
key += 1
|
||||
|
||||
|
||||
class DiskWriter(object):
|
||||
"""
|
||||
Encapsulation of the write context for servicing PUT REST API
|
||||
requests. Serves as the context manager object for DiskFile's writer()
|
||||
method.
|
||||
"""
|
||||
def __init__(self, disk_file, fd, tmppath, threadpool):
|
||||
self.disk_file = disk_file
|
||||
self.fd = fd
|
||||
self.tmppath = tmppath
|
||||
self.upload_size = 0
|
||||
self.last_sync = 0
|
||||
self.threadpool = threadpool
|
||||
|
||||
def write(self, chunk):
|
||||
"""
|
||||
Write a chunk of data into the temporary file.
|
||||
|
||||
:param chunk: the chunk of data to write as a string object
|
||||
"""
|
||||
|
||||
def _write_entire_chunk(chunk):
|
||||
while chunk:
|
||||
written = os.write(self.fd, chunk)
|
||||
self.upload_size += written
|
||||
chunk = chunk[written:]
|
||||
|
||||
self.threadpool.run_in_thread(_write_entire_chunk, chunk)
|
||||
|
||||
# For large files sync every 512MB (by default) written
|
||||
diff = self.upload_size - self.last_sync
|
||||
if diff >= self.disk_file.bytes_per_sync:
|
||||
self.threadpool.force_run_in_thread(fdatasync, self.fd)
|
||||
drop_buffer_cache(self.fd, self.last_sync, diff)
|
||||
self.last_sync = self.upload_size
|
||||
|
||||
def put(self, metadata, extension='.data'):
|
||||
"""
|
||||
Finalize writing the file on disk, and renames it from the temp file
|
||||
to the real location. This should be called after the data has been
|
||||
written to the temp file.
|
||||
|
||||
:param metadata: dictionary of metadata to be written
|
||||
:param extension: extension to be used when making the file
|
||||
"""
|
||||
assert self.tmppath is not None
|
||||
timestamp = normalize_timestamp(metadata['X-Timestamp'])
|
||||
metadata['name'] = self.disk_file.name
|
||||
|
||||
def finalize_put():
|
||||
# Write the metadata before calling fsync() so that both data and
|
||||
# metadata are flushed to disk.
|
||||
write_metadata(self.fd, metadata)
|
||||
# We call fsync() before calling drop_cache() to lower the amount
|
||||
# of redundant work the drop cache code will perform on the pages
|
||||
# (now that after fsync the pages will be all clean).
|
||||
fsync(self.fd)
|
||||
# From the Department of the Redundancy Department, make sure
|
||||
# we call drop_cache() after fsync() to avoid redundant work
|
||||
# (pages all clean).
|
||||
drop_buffer_cache(self.fd, 0, self.upload_size)
|
||||
invalidate_hash(os.path.dirname(self.disk_file.datadir))
|
||||
# After the rename completes, this object will be available for
|
||||
# other requests to reference.
|
||||
renamer(self.tmppath,
|
||||
os.path.join(self.disk_file.datadir,
|
||||
timestamp + extension))
|
||||
|
||||
self.threadpool.force_run_in_thread(finalize_put)
|
||||
self.disk_file.metadata = metadata
|
||||
|
||||
|
||||
class DiskFile(object):
|
||||
"""
|
||||
Manage object files on disk.
|
||||
|
||||
:param path: path to devices on the node
|
||||
:param device: device name
|
||||
:param partition: partition on the device the object lives in
|
||||
:param account: account name for the object
|
||||
:param container: container name for the object
|
||||
:param obj: object name for the object
|
||||
:param keep_data_fp: if True, don't close the fp, otherwise close it
|
||||
:param disk_chunk_size: size of chunks on file reads
|
||||
:param bytes_per_sync: number of bytes between fdatasync calls
|
||||
:param iter_hook: called when __iter__ returns a chunk
|
||||
:param threadpool: thread pool in which to do blocking operations
|
||||
|
||||
:raises DiskFileCollision: on md5 collision
|
||||
"""
|
||||
|
||||
def __init__(self, path, device, partition, account, container, obj,
|
||||
logger, keep_data_fp=False, disk_chunk_size=65536,
|
||||
bytes_per_sync=(512 * 1024 * 1024), iter_hook=None,
|
||||
threadpool=None, obj_dir='objects',
|
||||
disallowed_metadata_keys=None):
|
||||
self.disk_chunk_size = disk_chunk_size
|
||||
self.bytes_per_sync = bytes_per_sync
|
||||
self.iter_hook = iter_hook
|
||||
self.name = '/' + '/'.join((account, container, obj))
|
||||
name_hash = hash_path(account, container, obj)
|
||||
self.datadir = os.path.join(
|
||||
path, device, storage_directory(obj_dir, partition, name_hash))
|
||||
self.device_path = os.path.join(path, device)
|
||||
self.tmpdir = os.path.join(path, device, 'tmp')
|
||||
self.logger = logger
|
||||
self.disallowed_metadata_keys = disallowed_metadata_keys
|
||||
self.metadata = {}
|
||||
self.meta_file = None
|
||||
self.data_file = None
|
||||
self.fp = None
|
||||
self.iter_etag = None
|
||||
self.started_at_0 = False
|
||||
self.read_to_eof = False
|
||||
self.quarantined_dir = None
|
||||
self.keep_cache = False
|
||||
self.suppress_file_closing = False
|
||||
self.threadpool = threadpool or ThreadPool(nthreads=0)
|
||||
if not os.path.exists(self.datadir):
|
||||
return
|
||||
files = sorted(os.listdir(self.datadir), reverse=True)
|
||||
for afile in files:
|
||||
if afile.endswith('.ts'):
|
||||
self.data_file = self.meta_file = None
|
||||
self.metadata = {'deleted': True}
|
||||
return
|
||||
if afile.endswith('.meta') and not self.meta_file:
|
||||
self.meta_file = os.path.join(self.datadir, afile)
|
||||
if afile.endswith('.data') and not self.data_file:
|
||||
self.data_file = os.path.join(self.datadir, afile)
|
||||
break
|
||||
if not self.data_file:
|
||||
return
|
||||
self.fp = open(self.data_file, 'rb')
|
||||
self.metadata = read_metadata(self.fp)
|
||||
if not keep_data_fp:
|
||||
self.close(verify_file=False)
|
||||
if self.meta_file:
|
||||
with open(self.meta_file) as mfp:
|
||||
for key in self.metadata.keys():
|
||||
if key.lower() not in self.disallowed_metadata_keys:
|
||||
del self.metadata[key]
|
||||
self.metadata.update(read_metadata(mfp))
|
||||
if 'name' in self.metadata:
|
||||
if self.metadata['name'] != self.name:
|
||||
self.logger.error(_('Client path %(client)s does not match '
|
||||
'path stored in object metadata %(meta)s'),
|
||||
{'client': self.name,
|
||||
'meta': self.metadata['name']})
|
||||
raise DiskFileCollision('Client path does not match path '
|
||||
'stored in object metadata')
|
||||
|
||||
def __iter__(self):
|
||||
"""Returns an iterator over the data file."""
|
||||
try:
|
||||
dropped_cache = 0
|
||||
read = 0
|
||||
self.started_at_0 = False
|
||||
self.read_to_eof = False
|
||||
if self.fp.tell() == 0:
|
||||
self.started_at_0 = True
|
||||
self.iter_etag = md5()
|
||||
while True:
|
||||
chunk = self.threadpool.run_in_thread(
|
||||
self.fp.read, self.disk_chunk_size)
|
||||
if chunk:
|
||||
if self.iter_etag:
|
||||
self.iter_etag.update(chunk)
|
||||
read += len(chunk)
|
||||
if read - dropped_cache > (1024 * 1024):
|
||||
self._drop_cache(self.fp.fileno(), dropped_cache,
|
||||
read - dropped_cache)
|
||||
dropped_cache = read
|
||||
yield chunk
|
||||
if self.iter_hook:
|
||||
self.iter_hook()
|
||||
else:
|
||||
self.read_to_eof = True
|
||||
self._drop_cache(self.fp.fileno(), dropped_cache,
|
||||
read - dropped_cache)
|
||||
break
|
||||
finally:
|
||||
if not self.suppress_file_closing:
|
||||
self.close()
|
||||
|
||||
def app_iter_range(self, start, stop):
|
||||
"""Returns an iterator over the data file for range (start, stop)"""
|
||||
if start or start == 0:
|
||||
self.fp.seek(start)
|
||||
if stop is not None:
|
||||
length = stop - start
|
||||
else:
|
||||
length = None
|
||||
for chunk in self:
|
||||
if length is not None:
|
||||
length -= len(chunk)
|
||||
if length < 0:
|
||||
# Chop off the extra:
|
||||
yield chunk[:length]
|
||||
break
|
||||
yield chunk
|
||||
|
||||
def app_iter_ranges(self, ranges, content_type, boundary, size):
|
||||
"""Returns an iterator over the data file for a set of ranges"""
|
||||
if not ranges:
|
||||
yield ''
|
||||
else:
|
||||
try:
|
||||
self.suppress_file_closing = True
|
||||
for chunk in multi_range_iterator(
|
||||
ranges, content_type, boundary, size,
|
||||
self.app_iter_range):
|
||||
yield chunk
|
||||
finally:
|
||||
self.suppress_file_closing = False
|
||||
self.close()
|
||||
|
||||
def _handle_close_quarantine(self):
|
||||
"""Check if file needs to be quarantined"""
|
||||
try:
|
||||
self.get_data_file_size()
|
||||
except DiskFileError:
|
||||
self.quarantine()
|
||||
return
|
||||
except DiskFileNotExist:
|
||||
return
|
||||
|
||||
if self.iter_etag and self.started_at_0 and self.read_to_eof and \
|
||||
'ETag' in self.metadata and \
|
||||
self.iter_etag.hexdigest() != self.metadata.get('ETag'):
|
||||
self.quarantine()
|
||||
|
||||
def close(self, verify_file=True):
|
||||
"""
|
||||
Close the file. Will handle quarantining file if necessary.
|
||||
|
||||
:param verify_file: Defaults to True. If false, will not check
|
||||
file to see if it needs quarantining.
|
||||
"""
|
||||
if self.fp:
|
||||
try:
|
||||
if verify_file:
|
||||
self._handle_close_quarantine()
|
||||
except (Exception, Timeout), e:
|
||||
self.logger.error(_(
|
||||
'ERROR DiskFile %(data_file)s in '
|
||||
'%(data_dir)s close failure: %(exc)s : %(stack)'),
|
||||
{'exc': e, 'stack': ''.join(traceback.format_stack()),
|
||||
'data_file': self.data_file, 'data_dir': self.datadir})
|
||||
finally:
|
||||
self.fp.close()
|
||||
self.fp = None
|
||||
|
||||
def is_deleted(self):
|
||||
"""
|
||||
Check if the file is deleted.
|
||||
|
||||
:returns: True if the file doesn't exist or has been flagged as
|
||||
deleted.
|
||||
"""
|
||||
return not self.data_file or 'deleted' in self.metadata
|
||||
|
||||
def is_expired(self):
|
||||
"""
|
||||
Check if the file is expired.
|
||||
|
||||
:returns: True if the file has an X-Delete-At in the past
|
||||
"""
|
||||
return ('X-Delete-At' in self.metadata and
|
||||
int(self.metadata['X-Delete-At']) <= time.time())
|
||||
|
||||
@contextmanager
|
||||
def writer(self, size=None):
|
||||
"""
|
||||
Context manager to write a file. We create a temporary file first, and
|
||||
then return a DiskWriter object to encapsulate the state.
|
||||
|
||||
:param size: optional initial size of file to explicitly allocate on
|
||||
disk
|
||||
:raises DiskFileNoSpace: if a size is specified and allocation fails
|
||||
"""
|
||||
if not os.path.exists(self.tmpdir):
|
||||
mkdirs(self.tmpdir)
|
||||
fd, tmppath = mkstemp(dir=self.tmpdir)
|
||||
try:
|
||||
if size is not None and size > 0:
|
||||
try:
|
||||
fallocate(fd, size)
|
||||
except OSError:
|
||||
raise DiskFileNoSpace()
|
||||
yield DiskWriter(self, fd, tmppath, self.threadpool)
|
||||
finally:
|
||||
try:
|
||||
os.close(fd)
|
||||
except OSError:
|
||||
pass
|
||||
try:
|
||||
os.unlink(tmppath)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def put_metadata(self, metadata, tombstone=False):
|
||||
"""
|
||||
Short hand for putting metadata to .meta and .ts files.
|
||||
|
||||
:param metadata: dictionary of metadata to be written
|
||||
:param tombstone: whether or not we are writing a tombstone
|
||||
"""
|
||||
extension = '.ts' if tombstone else '.meta'
|
||||
with self.writer() as writer:
|
||||
writer.put(metadata, extension=extension)
|
||||
|
||||
def unlinkold(self, timestamp):
|
||||
"""
|
||||
Remove any older versions of the object file. Any file that has an
|
||||
older timestamp than timestamp will be deleted.
|
||||
|
||||
:param timestamp: timestamp to compare with each file
|
||||
"""
|
||||
timestamp = normalize_timestamp(timestamp)
|
||||
|
||||
def _unlinkold():
|
||||
for fname in os.listdir(self.datadir):
|
||||
if fname < timestamp:
|
||||
try:
|
||||
os.unlink(os.path.join(self.datadir, fname))
|
||||
except OSError, err: # pragma: no cover
|
||||
if err.errno != errno.ENOENT:
|
||||
raise
|
||||
self.threadpool.run_in_thread(_unlinkold)
|
||||
|
||||
def _drop_cache(self, fd, offset, length):
|
||||
"""Method for no-oping buffer cache drop method."""
|
||||
if not self.keep_cache:
|
||||
drop_buffer_cache(fd, offset, length)
|
||||
|
||||
def quarantine(self):
|
||||
"""
|
||||
In the case that a file is corrupted, move it to a quarantined
|
||||
area to allow replication to fix it.
|
||||
|
||||
:returns: if quarantine is successful, path to quarantined
|
||||
directory otherwise None
|
||||
"""
|
||||
if not (self.is_deleted() or self.quarantined_dir):
|
||||
self.quarantined_dir = self.threadpool.run_in_thread(
|
||||
quarantine_renamer, self.device_path, self.data_file)
|
||||
self.logger.increment('quarantines')
|
||||
return self.quarantined_dir
|
||||
|
||||
def get_data_file_size(self):
|
||||
"""
|
||||
Returns the os.path.getsize for the file. Raises an exception if this
|
||||
file does not match the Content-Length stored in the metadata. Or if
|
||||
self.data_file does not exist.
|
||||
|
||||
:returns: file size as an int
|
||||
:raises DiskFileError: on file size mismatch.
|
||||
:raises DiskFileNotExist: on file not existing (including deleted)
|
||||
"""
|
||||
try:
|
||||
file_size = 0
|
||||
if self.data_file:
|
||||
file_size = self.threadpool.run_in_thread(
|
||||
os.path.getsize, self.data_file)
|
||||
if 'Content-Length' in self.metadata:
|
||||
metadata_size = int(self.metadata['Content-Length'])
|
||||
if file_size != metadata_size:
|
||||
raise DiskFileError(
|
||||
'Content-Length of %s does not match file size '
|
||||
'of %s' % (metadata_size, file_size))
|
||||
return file_size
|
||||
except OSError, err:
|
||||
if err.errno != errno.ENOENT:
|
||||
raise
|
||||
raise DiskFileNotExist('Data File does not exist.')
|
@ -17,23 +17,18 @@
|
||||
|
||||
from __future__ import with_statement
|
||||
import cPickle as pickle
|
||||
import errno
|
||||
import os
|
||||
import time
|
||||
import traceback
|
||||
from collections import defaultdict
|
||||
from datetime import datetime
|
||||
from hashlib import md5
|
||||
from tempfile import mkstemp
|
||||
from urllib import unquote
|
||||
from contextlib import contextmanager
|
||||
|
||||
from xattr import getxattr, setxattr
|
||||
from eventlet import sleep, Timeout
|
||||
|
||||
from swift.common.utils import mkdirs, normalize_timestamp, public, \
|
||||
storage_directory, hash_path, renamer, fallocate, fsync, fdatasync, \
|
||||
split_path, drop_buffer_cache, get_logger, write_pickle, \
|
||||
hash_path, split_path, get_logger, write_pickle, \
|
||||
config_true_value, validate_device_partition, timing_stats, \
|
||||
ThreadPool, replication
|
||||
from swift.common.bufferedhttp import http_connect
|
||||
@ -41,436 +36,23 @@ from swift.common.constraints import check_object_creation, check_mount, \
|
||||
check_float, check_utf8
|
||||
from swift.common.exceptions import ConnectionTimeout, DiskFileError, \
|
||||
DiskFileNotExist, DiskFileCollision, DiskFileNoSpace
|
||||
from swift.obj.base import invalidate_hash, \
|
||||
quarantine_renamer, get_hashes
|
||||
from swift.obj.base import get_hashes
|
||||
from swift.common.http import is_success
|
||||
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
|
||||
HTTPInternalServerError, HTTPNoContent, HTTPNotFound, HTTPNotModified, \
|
||||
HTTPPreconditionFailed, HTTPRequestTimeout, HTTPUnprocessableEntity, \
|
||||
HTTPClientDisconnect, HTTPMethodNotAllowed, Request, Response, UTC, \
|
||||
HTTPInsufficientStorage, HTTPForbidden, multi_range_iterator, \
|
||||
HeaderKeyDict
|
||||
HTTPInsufficientStorage, HTTPForbidden, HeaderKeyDict
|
||||
from swift.obj.diskfile import DiskFile
|
||||
|
||||
|
||||
DATADIR = 'objects'
|
||||
ASYNCDIR = 'async_pending'
|
||||
PICKLE_PROTOCOL = 2
|
||||
METADATA_KEY = 'user.swift.metadata'
|
||||
MAX_OBJECT_NAME_LENGTH = 1024
|
||||
# keep these lower-case
|
||||
DISALLOWED_HEADERS = set('content-length content-type deleted etag'.split())
|
||||
|
||||
|
||||
def read_metadata(fd):
|
||||
"""
|
||||
Helper function to read the pickled metadata from an object file.
|
||||
|
||||
:param fd: file descriptor to load the metadata from
|
||||
|
||||
:returns: dictionary of metadata
|
||||
"""
|
||||
metadata = ''
|
||||
key = 0
|
||||
try:
|
||||
while True:
|
||||
metadata += getxattr(fd, '%s%s' % (METADATA_KEY, (key or '')))
|
||||
key += 1
|
||||
except IOError:
|
||||
pass
|
||||
return pickle.loads(metadata)
|
||||
|
||||
|
||||
def write_metadata(fd, metadata):
|
||||
"""
|
||||
Helper function to write pickled metadata for an object file.
|
||||
|
||||
:param fd: file descriptor to write the metadata
|
||||
:param metadata: metadata to write
|
||||
"""
|
||||
metastr = pickle.dumps(metadata, PICKLE_PROTOCOL)
|
||||
key = 0
|
||||
while metastr:
|
||||
setxattr(fd, '%s%s' % (METADATA_KEY, key or ''), metastr[:254])
|
||||
metastr = metastr[254:]
|
||||
key += 1
|
||||
|
||||
|
||||
class DiskWriter(object):
|
||||
"""
|
||||
Encapsulation of the write context for servicing PUT REST API
|
||||
requests. Serves as the context manager object for DiskFile's writer()
|
||||
method.
|
||||
"""
|
||||
def __init__(self, disk_file, fd, tmppath, threadpool):
|
||||
self.disk_file = disk_file
|
||||
self.fd = fd
|
||||
self.tmppath = tmppath
|
||||
self.upload_size = 0
|
||||
self.last_sync = 0
|
||||
self.threadpool = threadpool
|
||||
|
||||
def write(self, chunk):
|
||||
"""
|
||||
Write a chunk of data into the temporary file.
|
||||
|
||||
:param chunk: the chunk of data to write as a string object
|
||||
"""
|
||||
|
||||
def _write_entire_chunk(chunk):
|
||||
while chunk:
|
||||
written = os.write(self.fd, chunk)
|
||||
self.upload_size += written
|
||||
chunk = chunk[written:]
|
||||
|
||||
self.threadpool.run_in_thread(_write_entire_chunk, chunk)
|
||||
|
||||
# For large files sync every 512MB (by default) written
|
||||
diff = self.upload_size - self.last_sync
|
||||
if diff >= self.disk_file.bytes_per_sync:
|
||||
self.threadpool.force_run_in_thread(fdatasync, self.fd)
|
||||
drop_buffer_cache(self.fd, self.last_sync, diff)
|
||||
self.last_sync = self.upload_size
|
||||
|
||||
def put(self, metadata, extension='.data'):
|
||||
"""
|
||||
Finalize writing the file on disk, and renames it from the temp file
|
||||
to the real location. This should be called after the data has been
|
||||
written to the temp file.
|
||||
|
||||
:param metadata: dictionary of metadata to be written
|
||||
:param extension: extension to be used when making the file
|
||||
"""
|
||||
assert self.tmppath is not None
|
||||
timestamp = normalize_timestamp(metadata['X-Timestamp'])
|
||||
metadata['name'] = self.disk_file.name
|
||||
|
||||
def finalize_put():
|
||||
# Write the metadata before calling fsync() so that both data and
|
||||
# metadata are flushed to disk.
|
||||
write_metadata(self.fd, metadata)
|
||||
# We call fsync() before calling drop_cache() to lower the amount
|
||||
# of redundant work the drop cache code will perform on the pages
|
||||
# (now that after fsync the pages will be all clean).
|
||||
fsync(self.fd)
|
||||
# From the Department of the Redundancy Department, make sure
|
||||
# we call drop_cache() after fsync() to avoid redundant work
|
||||
# (pages all clean).
|
||||
drop_buffer_cache(self.fd, 0, self.upload_size)
|
||||
invalidate_hash(os.path.dirname(self.disk_file.datadir))
|
||||
# After the rename completes, this object will be available for
|
||||
# other requests to reference.
|
||||
renamer(self.tmppath,
|
||||
os.path.join(self.disk_file.datadir,
|
||||
timestamp + extension))
|
||||
|
||||
self.threadpool.force_run_in_thread(finalize_put)
|
||||
self.disk_file.metadata = metadata
|
||||
|
||||
|
||||
class DiskFile(object):
|
||||
"""
|
||||
Manage object files on disk.
|
||||
|
||||
:param path: path to devices on the node
|
||||
:param device: device name
|
||||
:param partition: partition on the device the object lives in
|
||||
:param account: account name for the object
|
||||
:param container: container name for the object
|
||||
:param obj: object name for the object
|
||||
:param keep_data_fp: if True, don't close the fp, otherwise close it
|
||||
:param disk_chunk_size: size of chunks on file reads
|
||||
:param bytes_per_sync: number of bytes between fdatasync calls
|
||||
:param iter_hook: called when __iter__ returns a chunk
|
||||
:param threadpool: thread pool in which to do blocking operations
|
||||
|
||||
:raises DiskFileCollision: on md5 collision
|
||||
"""
|
||||
|
||||
def __init__(self, path, device, partition, account, container, obj,
|
||||
logger, keep_data_fp=False, disk_chunk_size=65536,
|
||||
bytes_per_sync=(512 * 1024 * 1024), iter_hook=None,
|
||||
threadpool=None):
|
||||
self.disk_chunk_size = disk_chunk_size
|
||||
self.bytes_per_sync = bytes_per_sync
|
||||
self.iter_hook = iter_hook
|
||||
self.name = '/' + '/'.join((account, container, obj))
|
||||
name_hash = hash_path(account, container, obj)
|
||||
self.datadir = os.path.join(
|
||||
path, device, storage_directory(DATADIR, partition, name_hash))
|
||||
self.device_path = os.path.join(path, device)
|
||||
self.tmpdir = os.path.join(path, device, 'tmp')
|
||||
self.logger = logger
|
||||
self.metadata = {}
|
||||
self.meta_file = None
|
||||
self.data_file = None
|
||||
self.fp = None
|
||||
self.iter_etag = None
|
||||
self.started_at_0 = False
|
||||
self.read_to_eof = False
|
||||
self.quarantined_dir = None
|
||||
self.keep_cache = False
|
||||
self.suppress_file_closing = False
|
||||
self.threadpool = threadpool or ThreadPool(nthreads=0)
|
||||
if not os.path.exists(self.datadir):
|
||||
return
|
||||
files = sorted(os.listdir(self.datadir), reverse=True)
|
||||
for afile in files:
|
||||
if afile.endswith('.ts'):
|
||||
self.data_file = self.meta_file = None
|
||||
self.metadata = {'deleted': True}
|
||||
return
|
||||
if afile.endswith('.meta') and not self.meta_file:
|
||||
self.meta_file = os.path.join(self.datadir, afile)
|
||||
if afile.endswith('.data') and not self.data_file:
|
||||
self.data_file = os.path.join(self.datadir, afile)
|
||||
break
|
||||
if not self.data_file:
|
||||
return
|
||||
self.fp = open(self.data_file, 'rb')
|
||||
self.metadata = read_metadata(self.fp)
|
||||
if not keep_data_fp:
|
||||
self.close(verify_file=False)
|
||||
if self.meta_file:
|
||||
with open(self.meta_file) as mfp:
|
||||
for key in self.metadata.keys():
|
||||
if key.lower() not in DISALLOWED_HEADERS:
|
||||
del self.metadata[key]
|
||||
self.metadata.update(read_metadata(mfp))
|
||||
if 'name' in self.metadata:
|
||||
if self.metadata['name'] != self.name:
|
||||
self.logger.error(_('Client path %(client)s does not match '
|
||||
'path stored in object metadata %(meta)s'),
|
||||
{'client': self.name,
|
||||
'meta': self.metadata['name']})
|
||||
raise DiskFileCollision('Client path does not match path '
|
||||
'stored in object metadata')
|
||||
|
||||
def __iter__(self):
|
||||
"""Returns an iterator over the data file."""
|
||||
try:
|
||||
dropped_cache = 0
|
||||
read = 0
|
||||
self.started_at_0 = False
|
||||
self.read_to_eof = False
|
||||
if self.fp.tell() == 0:
|
||||
self.started_at_0 = True
|
||||
self.iter_etag = md5()
|
||||
while True:
|
||||
chunk = self.threadpool.run_in_thread(
|
||||
self.fp.read, self.disk_chunk_size)
|
||||
if chunk:
|
||||
if self.iter_etag:
|
||||
self.iter_etag.update(chunk)
|
||||
read += len(chunk)
|
||||
if read - dropped_cache > (1024 * 1024):
|
||||
self._drop_cache(self.fp.fileno(), dropped_cache,
|
||||
read - dropped_cache)
|
||||
dropped_cache = read
|
||||
yield chunk
|
||||
if self.iter_hook:
|
||||
self.iter_hook()
|
||||
else:
|
||||
self.read_to_eof = True
|
||||
self._drop_cache(self.fp.fileno(), dropped_cache,
|
||||
read - dropped_cache)
|
||||
break
|
||||
finally:
|
||||
if not self.suppress_file_closing:
|
||||
self.close()
|
||||
|
||||
def app_iter_range(self, start, stop):
|
||||
"""Returns an iterator over the data file for range (start, stop)"""
|
||||
if start or start == 0:
|
||||
self.fp.seek(start)
|
||||
if stop is not None:
|
||||
length = stop - start
|
||||
else:
|
||||
length = None
|
||||
for chunk in self:
|
||||
if length is not None:
|
||||
length -= len(chunk)
|
||||
if length < 0:
|
||||
# Chop off the extra:
|
||||
yield chunk[:length]
|
||||
break
|
||||
yield chunk
|
||||
|
||||
def app_iter_ranges(self, ranges, content_type, boundary, size):
|
||||
"""Returns an iterator over the data file for a set of ranges"""
|
||||
if not ranges:
|
||||
yield ''
|
||||
else:
|
||||
try:
|
||||
self.suppress_file_closing = True
|
||||
for chunk in multi_range_iterator(
|
||||
ranges, content_type, boundary, size,
|
||||
self.app_iter_range):
|
||||
yield chunk
|
||||
finally:
|
||||
self.suppress_file_closing = False
|
||||
self.close()
|
||||
|
||||
def _handle_close_quarantine(self):
|
||||
"""Check if file needs to be quarantined"""
|
||||
try:
|
||||
self.get_data_file_size()
|
||||
except DiskFileError:
|
||||
self.quarantine()
|
||||
return
|
||||
except DiskFileNotExist:
|
||||
return
|
||||
|
||||
if self.iter_etag and self.started_at_0 and self.read_to_eof and \
|
||||
'ETag' in self.metadata and \
|
||||
self.iter_etag.hexdigest() != self.metadata.get('ETag'):
|
||||
self.quarantine()
|
||||
|
||||
def close(self, verify_file=True):
|
||||
"""
|
||||
Close the file. Will handle quarantining file if necessary.
|
||||
|
||||
:param verify_file: Defaults to True. If false, will not check
|
||||
file to see if it needs quarantining.
|
||||
"""
|
||||
if self.fp:
|
||||
try:
|
||||
if verify_file:
|
||||
self._handle_close_quarantine()
|
||||
except (Exception, Timeout), e:
|
||||
self.logger.error(_(
|
||||
'ERROR DiskFile %(data_file)s in '
|
||||
'%(data_dir)s close failure: %(exc)s : %(stack)'),
|
||||
{'exc': e, 'stack': ''.join(traceback.format_stack()),
|
||||
'data_file': self.data_file, 'data_dir': self.datadir})
|
||||
finally:
|
||||
self.fp.close()
|
||||
self.fp = None
|
||||
|
||||
def is_deleted(self):
|
||||
"""
|
||||
Check if the file is deleted.
|
||||
|
||||
:returns: True if the file doesn't exist or has been flagged as
|
||||
deleted.
|
||||
"""
|
||||
return not self.data_file or 'deleted' in self.metadata
|
||||
|
||||
def is_expired(self):
|
||||
"""
|
||||
Check if the file is expired.
|
||||
|
||||
:returns: True if the file has an X-Delete-At in the past
|
||||
"""
|
||||
return ('X-Delete-At' in self.metadata and
|
||||
int(self.metadata['X-Delete-At']) <= time.time())
|
||||
|
||||
@contextmanager
|
||||
def writer(self, size=None):
|
||||
"""
|
||||
Context manager to write a file. We create a temporary file first, and
|
||||
then return a DiskWriter object to encapsulate the state.
|
||||
|
||||
:param size: optional initial size of file to explicitly allocate on
|
||||
disk
|
||||
:raises DiskFileNoSpace: if a size is specified and allocation fails
|
||||
"""
|
||||
if not os.path.exists(self.tmpdir):
|
||||
mkdirs(self.tmpdir)
|
||||
fd, tmppath = mkstemp(dir=self.tmpdir)
|
||||
try:
|
||||
if size is not None and size > 0:
|
||||
try:
|
||||
fallocate(fd, size)
|
||||
except OSError:
|
||||
raise DiskFileNoSpace()
|
||||
yield DiskWriter(self, fd, tmppath, self.threadpool)
|
||||
finally:
|
||||
try:
|
||||
os.close(fd)
|
||||
except OSError:
|
||||
pass
|
||||
try:
|
||||
os.unlink(tmppath)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def put_metadata(self, metadata, tombstone=False):
|
||||
"""
|
||||
Short hand for putting metadata to .meta and .ts files.
|
||||
|
||||
:param metadata: dictionary of metadata to be written
|
||||
:param tombstone: whether or not we are writing a tombstone
|
||||
"""
|
||||
extension = '.ts' if tombstone else '.meta'
|
||||
with self.writer() as writer:
|
||||
writer.put(metadata, extension=extension)
|
||||
|
||||
def unlinkold(self, timestamp):
|
||||
"""
|
||||
Remove any older versions of the object file. Any file that has an
|
||||
older timestamp than timestamp will be deleted.
|
||||
|
||||
:param timestamp: timestamp to compare with each file
|
||||
"""
|
||||
timestamp = normalize_timestamp(timestamp)
|
||||
|
||||
def _unlinkold():
|
||||
for fname in os.listdir(self.datadir):
|
||||
if fname < timestamp:
|
||||
try:
|
||||
os.unlink(os.path.join(self.datadir, fname))
|
||||
except OSError, err: # pragma: no cover
|
||||
if err.errno != errno.ENOENT:
|
||||
raise
|
||||
self.threadpool.run_in_thread(_unlinkold)
|
||||
|
||||
def _drop_cache(self, fd, offset, length):
|
||||
"""Method for no-oping buffer cache drop method."""
|
||||
if not self.keep_cache:
|
||||
drop_buffer_cache(fd, offset, length)
|
||||
|
||||
def quarantine(self):
|
||||
"""
|
||||
In the case that a file is corrupted, move it to a quarantined
|
||||
area to allow replication to fix it.
|
||||
|
||||
:returns: if quarantine is successful, path to quarantined
|
||||
directory otherwise None
|
||||
"""
|
||||
if not (self.is_deleted() or self.quarantined_dir):
|
||||
self.quarantined_dir = self.threadpool.run_in_thread(
|
||||
quarantine_renamer, self.device_path, self.data_file)
|
||||
self.logger.increment('quarantines')
|
||||
return self.quarantined_dir
|
||||
|
||||
def get_data_file_size(self):
|
||||
"""
|
||||
Returns the os.path.getsize for the file. Raises an exception if this
|
||||
file does not match the Content-Length stored in the metadata. Or if
|
||||
self.data_file does not exist.
|
||||
|
||||
:returns: file size as an int
|
||||
:raises DiskFileError: on file size mismatch.
|
||||
:raises DiskFileNotExist: on file not existing (including deleted)
|
||||
"""
|
||||
try:
|
||||
file_size = 0
|
||||
if self.data_file:
|
||||
file_size = self.threadpool.run_in_thread(
|
||||
os.path.getsize, self.data_file)
|
||||
if 'Content-Length' in self.metadata:
|
||||
metadata_size = int(self.metadata['Content-Length'])
|
||||
if file_size != metadata_size:
|
||||
raise DiskFileError(
|
||||
'Content-Length of %s does not match file size '
|
||||
'of %s' % (metadata_size, file_size))
|
||||
return file_size
|
||||
except OSError, err:
|
||||
if err.errno != errno.ENOENT:
|
||||
raise
|
||||
raise DiskFileNotExist('Data File does not exist.')
|
||||
|
||||
|
||||
class ObjectController(object):
|
||||
"""Implements the WSGI application for the Swift Object Server."""
|
||||
|
||||
@ -525,6 +107,8 @@ class ObjectController(object):
|
||||
kwargs.setdefault('disk_chunk_size', self.disk_chunk_size)
|
||||
kwargs.setdefault('logger', self.logger)
|
||||
kwargs.setdefault('threadpool', self.threadpools[device])
|
||||
kwargs.setdefault('obj_dir', DATADIR)
|
||||
kwargs.setdefault('disallowed_metadata_keys', DISALLOWED_HEADERS)
|
||||
return DiskFile(self.devices, device, partition, account,
|
||||
container, obj, **kwargs)
|
||||
|
||||
|
@ -23,7 +23,7 @@ from swiftclient import client
|
||||
|
||||
from swift.common import direct_client
|
||||
from swift.common.utils import hash_path, readconf
|
||||
from swift.obj.server import write_metadata, read_metadata
|
||||
from swift.obj.diskfile import write_metadata, read_metadata
|
||||
from test.probe.common import kill_servers, reset_environment
|
||||
|
||||
|
||||
|
@ -23,8 +23,8 @@ from hashlib import md5
|
||||
from tempfile import mkdtemp
|
||||
from test.unit import FakeLogger
|
||||
from swift.obj import auditor
|
||||
from swift.obj import server as object_server
|
||||
from swift.obj.server import DiskFile, write_metadata, DATADIR
|
||||
from swift.obj.diskfile import DiskFile, write_metadata
|
||||
from swift.obj.server import DATADIR
|
||||
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
|
||||
storage_directory
|
||||
from swift.obj.base import invalidate_hash
|
||||
@ -158,7 +158,7 @@ class TestAuditor(unittest.TestCase):
|
||||
'Content-Length': str(os.fstat(writer.fd).st_size),
|
||||
}
|
||||
writer.put(metadata)
|
||||
with mock.patch('swift.obj.server.DiskFile',
|
||||
with mock.patch('swift.obj.diskfile.DiskFile',
|
||||
lambda *_: 1 / 0):
|
||||
self.auditor.audit_all_objects()
|
||||
self.assertEquals(self.auditor.errors, pre_errors + 1)
|
||||
@ -338,16 +338,16 @@ class TestAuditor(unittest.TestCase):
|
||||
rat[0] = True
|
||||
DiskFile.close(self, verify_file=verify_file)
|
||||
self.setup_bad_zero_byte()
|
||||
was_df = object_server.DiskFile
|
||||
was_df = auditor.diskfile.DiskFile
|
||||
try:
|
||||
object_server.DiskFile = FakeFile
|
||||
auditor.diskfile.DiskFile = FakeFile
|
||||
self.auditor.run_once(zero_byte_fps=50)
|
||||
quarantine_path = os.path.join(self.devices,
|
||||
'sda', 'quarantined', 'objects')
|
||||
self.assertTrue(os.path.isdir(quarantine_path))
|
||||
self.assertTrue(rat[0])
|
||||
finally:
|
||||
object_server.DiskFile = was_df
|
||||
auditor.diskfile.DiskFile = was_df
|
||||
|
||||
def test_run_forever(self):
|
||||
|
||||
|
382
test/unit/obj/test_diskfile.py
Normal file
382
test/unit/obj/test_diskfile.py
Normal file
@ -0,0 +1,382 @@
|
||||
#-*- coding:utf-8 -*-
|
||||
# Copyright (c) 2010-2012 OpenStack, LLC.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
""" Tests for swift.obj.diskfile """
|
||||
|
||||
import cPickle as pickle
|
||||
import operator
|
||||
import os
|
||||
import mock
|
||||
import unittest
|
||||
import email
|
||||
from shutil import rmtree
|
||||
from StringIO import StringIO
|
||||
from time import gmtime, strftime, time
|
||||
from tempfile import mkdtemp
|
||||
from hashlib import md5
|
||||
|
||||
from eventlet import sleep, spawn, wsgi, listen, Timeout, tpool
|
||||
from test.unit import FakeLogger
|
||||
from test.unit import _setxattr as setxattr
|
||||
from test.unit import connect_tcp, readuntil2crlfs
|
||||
from swift.obj import diskfile
|
||||
from swift.common import utils
|
||||
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
|
||||
NullLogger, storage_directory, public, \
|
||||
replication
|
||||
from swift.common.exceptions import DiskFileNotExist
|
||||
from swift.common import constraints
|
||||
from eventlet import tpool
|
||||
from swift.common.swob import Request, HeaderKeyDict
|
||||
|
||||
|
||||
class TestDiskFile(unittest.TestCase):
|
||||
"""Test swift.obj.diskfile.DiskFile"""
|
||||
|
||||
def setUp(self):
|
||||
""" Set up for testing swift.obj.diskfile"""
|
||||
self.testdir = os.path.join(mkdtemp(), 'tmp_test_obj_server_DiskFile')
|
||||
mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))
|
||||
self._orig_tpool_exc = tpool.execute
|
||||
tpool.execute = lambda f, *args, **kwargs: f(*args, **kwargs)
|
||||
|
||||
def tearDown(self):
|
||||
""" Tear down for testing swift.obj.diskfile"""
|
||||
rmtree(os.path.dirname(self.testdir))
|
||||
tpool.execute = self._orig_tpool_exc
|
||||
|
||||
def _create_test_file(self, data, keep_data_fp=True):
|
||||
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
|
||||
FakeLogger())
|
||||
mkdirs(df.datadir)
|
||||
f = open(os.path.join(df.datadir,
|
||||
normalize_timestamp(time()) + '.data'), 'wb')
|
||||
f.write(data)
|
||||
setxattr(f.fileno(), diskfile.METADATA_KEY,
|
||||
pickle.dumps({}, diskfile.PICKLE_PROTOCOL))
|
||||
f.close()
|
||||
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
|
||||
FakeLogger(), keep_data_fp=keep_data_fp)
|
||||
return df
|
||||
|
||||
def test_disk_file_app_iter_corners(self):
|
||||
df = self._create_test_file('1234567890')
|
||||
self.assertEquals(''.join(df.app_iter_range(0, None)), '1234567890')
|
||||
|
||||
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
|
||||
FakeLogger(), keep_data_fp=True)
|
||||
self.assertEqual(''.join(df.app_iter_range(5, None)), '67890')
|
||||
|
||||
def test_disk_file_app_iter_ranges(self):
|
||||
df = self._create_test_file('012345678911234567892123456789')
|
||||
it = df.app_iter_ranges([(0, 10), (10, 20), (20, 30)], 'plain/text',
|
||||
'\r\n--someheader\r\n', 30)
|
||||
value = ''.join(it)
|
||||
self.assert_('0123456789' in value)
|
||||
self.assert_('1123456789' in value)
|
||||
self.assert_('2123456789' in value)
|
||||
|
||||
def test_disk_file_app_iter_ranges_edges(self):
|
||||
df = self._create_test_file('012345678911234567892123456789')
|
||||
it = df.app_iter_ranges([(3, 10), (0, 2)], 'application/whatever',
|
||||
'\r\n--someheader\r\n', 30)
|
||||
value = ''.join(it)
|
||||
self.assert_('3456789' in value)
|
||||
self.assert_('01' in value)
|
||||
|
||||
def test_disk_file_large_app_iter_ranges(self):
|
||||
"""
|
||||
This test case is to make sure that the disk file app_iter_ranges
|
||||
method all the paths being tested.
|
||||
"""
|
||||
long_str = '01234567890' * 65536
|
||||
target_strs = ['3456789', long_str[0:65590]]
|
||||
df = self._create_test_file(long_str)
|
||||
|
||||
it = df.app_iter_ranges([(3, 10), (0, 65590)], 'plain/text',
|
||||
'5e816ff8b8b8e9a5d355497e5d9e0301', 655360)
|
||||
|
||||
"""
|
||||
the produced string actually missing the MIME headers
|
||||
need to add these headers to make it as real MIME message.
|
||||
The body of the message is produced by method app_iter_ranges
|
||||
off of DiskFile object.
|
||||
"""
|
||||
header = ''.join(['Content-Type: multipart/byteranges;',
|
||||
'boundary=',
|
||||
'5e816ff8b8b8e9a5d355497e5d9e0301\r\n'])
|
||||
|
||||
value = header + ''.join(it)
|
||||
|
||||
parts = map(lambda p: p.get_payload(decode=True),
|
||||
email.message_from_string(value).walk())[1:3]
|
||||
self.assertEqual(parts, target_strs)
|
||||
|
||||
def test_disk_file_app_iter_ranges_empty(self):
|
||||
"""
|
||||
This test case tests when empty value passed into app_iter_ranges
|
||||
When ranges passed into the method is either empty array or None,
|
||||
this method will yield empty string
|
||||
"""
|
||||
df = self._create_test_file('012345678911234567892123456789')
|
||||
it = df.app_iter_ranges([], 'application/whatever',
|
||||
'\r\n--someheader\r\n', 100)
|
||||
self.assertEqual(''.join(it), '')
|
||||
|
||||
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
|
||||
FakeLogger(), keep_data_fp=True)
|
||||
it = df.app_iter_ranges(None, 'app/something',
|
||||
'\r\n--someheader\r\n', 150)
|
||||
self.assertEqual(''.join(it), '')
|
||||
|
||||
def test_disk_file_mkstemp_creates_dir(self):
|
||||
tmpdir = os.path.join(self.testdir, 'sda1', 'tmp')
|
||||
os.rmdir(tmpdir)
|
||||
with diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c',
|
||||
'o', FakeLogger()).writer() as writer:
|
||||
self.assert_(os.path.exists(tmpdir))
|
||||
|
||||
def test_iter_hook(self):
|
||||
hook_call_count = [0]
|
||||
def hook():
|
||||
hook_call_count[0] += 1
|
||||
|
||||
df = self._get_disk_file(fsize=65, csize=8, iter_hook=hook)
|
||||
for _ in df:
|
||||
pass
|
||||
|
||||
self.assertEquals(hook_call_count[0], 9)
|
||||
|
||||
def test_quarantine(self):
|
||||
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
|
||||
FakeLogger())
|
||||
mkdirs(df.datadir)
|
||||
f = open(os.path.join(df.datadir,
|
||||
normalize_timestamp(time()) + '.data'), 'wb')
|
||||
setxattr(f.fileno(), diskfile.METADATA_KEY,
|
||||
pickle.dumps({}, diskfile.PICKLE_PROTOCOL))
|
||||
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
|
||||
FakeLogger())
|
||||
df.quarantine()
|
||||
quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined',
|
||||
'objects', os.path.basename(os.path.dirname(
|
||||
df.data_file)))
|
||||
self.assert_(os.path.isdir(quar_dir))
|
||||
|
||||
def test_quarantine_same_file(self):
|
||||
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
|
||||
FakeLogger())
|
||||
mkdirs(df.datadir)
|
||||
f = open(os.path.join(df.datadir,
|
||||
normalize_timestamp(time()) + '.data'), 'wb')
|
||||
setxattr(f.fileno(), diskfile.METADATA_KEY,
|
||||
pickle.dumps({}, diskfile.PICKLE_PROTOCOL))
|
||||
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
|
||||
FakeLogger())
|
||||
new_dir = df.quarantine()
|
||||
quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined',
|
||||
'objects', os.path.basename(os.path.dirname(
|
||||
df.data_file)))
|
||||
self.assert_(os.path.isdir(quar_dir))
|
||||
self.assertEquals(quar_dir, new_dir)
|
||||
# have to remake the datadir and file
|
||||
mkdirs(df.datadir)
|
||||
f = open(os.path.join(df.datadir,
|
||||
normalize_timestamp(time()) + '.data'), 'wb')
|
||||
setxattr(f.fileno(), diskfile.METADATA_KEY,
|
||||
pickle.dumps({}, diskfile.PICKLE_PROTOCOL))
|
||||
|
||||
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
|
||||
FakeLogger(), keep_data_fp=True)
|
||||
double_uuid_path = df.quarantine()
|
||||
self.assert_(os.path.isdir(double_uuid_path))
|
||||
self.assert_('-' in os.path.basename(double_uuid_path))
|
||||
|
||||
def _get_disk_file(self, invalid_type=None, obj_name='o',
|
||||
fsize=1024, csize=8, mark_deleted=False, ts=None,
|
||||
iter_hook=None):
|
||||
'''returns a DiskFile'''
|
||||
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c',
|
||||
obj_name, FakeLogger())
|
||||
data = '0' * fsize
|
||||
etag = md5()
|
||||
if ts:
|
||||
timestamp = ts
|
||||
else:
|
||||
timestamp = str(normalize_timestamp(time()))
|
||||
with df.writer() as writer:
|
||||
writer.write(data)
|
||||
etag.update(data)
|
||||
etag = etag.hexdigest()
|
||||
metadata = {
|
||||
'ETag': etag,
|
||||
'X-Timestamp': timestamp,
|
||||
'Content-Length': str(os.fstat(writer.fd).st_size),
|
||||
}
|
||||
writer.put(metadata)
|
||||
if invalid_type == 'ETag':
|
||||
etag = md5()
|
||||
etag.update('1' + '0' * (fsize - 1))
|
||||
etag = etag.hexdigest()
|
||||
metadata['ETag'] = etag
|
||||
diskfile.write_metadata(writer.fd, metadata)
|
||||
if invalid_type == 'Content-Length':
|
||||
metadata['Content-Length'] = fsize - 1
|
||||
diskfile.write_metadata(writer.fd, metadata)
|
||||
|
||||
if mark_deleted:
|
||||
metadata = {
|
||||
'X-Timestamp': timestamp,
|
||||
'deleted': True
|
||||
}
|
||||
df.put_metadata(metadata, tombstone=True)
|
||||
|
||||
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c',
|
||||
obj_name, FakeLogger(),
|
||||
keep_data_fp=True, disk_chunk_size=csize,
|
||||
iter_hook=iter_hook)
|
||||
if invalid_type == 'Zero-Byte':
|
||||
os.remove(df.data_file)
|
||||
fp = open(df.data_file, 'w')
|
||||
fp.close()
|
||||
df.unit_test_len = fsize
|
||||
return df
|
||||
|
||||
def test_quarantine_valids(self):
|
||||
df = self._get_disk_file(obj_name='1')
|
||||
for chunk in df:
|
||||
pass
|
||||
self.assertFalse(df.quarantined_dir)
|
||||
|
||||
df = self._get_disk_file(obj_name='2', csize=1)
|
||||
for chunk in df:
|
||||
pass
|
||||
self.assertFalse(df.quarantined_dir)
|
||||
|
||||
df = self._get_disk_file(obj_name='3', csize=100000)
|
||||
for chunk in df:
|
||||
pass
|
||||
self.assertFalse(df.quarantined_dir)
|
||||
|
||||
def run_quarantine_invalids(self, invalid_type):
|
||||
df = self._get_disk_file(invalid_type=invalid_type, obj_name='1')
|
||||
for chunk in df:
|
||||
pass
|
||||
self.assertTrue(df.quarantined_dir)
|
||||
df = self._get_disk_file(invalid_type=invalid_type,
|
||||
obj_name='2', csize=1)
|
||||
for chunk in df:
|
||||
pass
|
||||
self.assertTrue(df.quarantined_dir)
|
||||
df = self._get_disk_file(invalid_type=invalid_type,
|
||||
obj_name='3', csize=100000)
|
||||
for chunk in df:
|
||||
pass
|
||||
self.assertTrue(df.quarantined_dir)
|
||||
df = self._get_disk_file(invalid_type=invalid_type, obj_name='4')
|
||||
self.assertFalse(df.quarantined_dir)
|
||||
df = self._get_disk_file(invalid_type=invalid_type, obj_name='5')
|
||||
for chunk in df.app_iter_range(0, df.unit_test_len):
|
||||
pass
|
||||
self.assertTrue(df.quarantined_dir)
|
||||
df = self._get_disk_file(invalid_type=invalid_type, obj_name='6')
|
||||
for chunk in df.app_iter_range(0, df.unit_test_len + 100):
|
||||
pass
|
||||
self.assertTrue(df.quarantined_dir)
|
||||
expected_quar = False
|
||||
# for the following, Content-Length/Zero-Byte errors will always result
|
||||
# in a quarantine, even if the whole file isn't check-summed
|
||||
if invalid_type in ('Zero-Byte', 'Content-Length'):
|
||||
expected_quar = True
|
||||
df = self._get_disk_file(invalid_type=invalid_type, obj_name='7')
|
||||
for chunk in df.app_iter_range(1, df.unit_test_len):
|
||||
pass
|
||||
self.assertEquals(bool(df.quarantined_dir), expected_quar)
|
||||
df = self._get_disk_file(invalid_type=invalid_type, obj_name='8')
|
||||
for chunk in df.app_iter_range(0, df.unit_test_len - 1):
|
||||
pass
|
||||
self.assertEquals(bool(df.quarantined_dir), expected_quar)
|
||||
df = self._get_disk_file(invalid_type=invalid_type, obj_name='8')
|
||||
for chunk in df.app_iter_range(1, df.unit_test_len + 1):
|
||||
pass
|
||||
self.assertEquals(bool(df.quarantined_dir), expected_quar)
|
||||
|
||||
def test_quarantine_invalids(self):
|
||||
self.run_quarantine_invalids('ETag')
|
||||
self.run_quarantine_invalids('Content-Length')
|
||||
self.run_quarantine_invalids('Zero-Byte')
|
||||
|
||||
def test_quarantine_deleted_files(self):
|
||||
df = self._get_disk_file(invalid_type='Content-Length')
|
||||
df.close()
|
||||
self.assertTrue(df.quarantined_dir)
|
||||
df = self._get_disk_file(invalid_type='Content-Length',
|
||||
mark_deleted=True)
|
||||
df.close()
|
||||
self.assertFalse(df.quarantined_dir)
|
||||
df = self._get_disk_file(invalid_type='Content-Length',
|
||||
mark_deleted=True)
|
||||
self.assertRaises(DiskFileNotExist, df.get_data_file_size)
|
||||
|
||||
def test_put_metadata(self):
|
||||
df = self._get_disk_file()
|
||||
ts = time()
|
||||
metadata = { 'X-Timestamp': ts, 'X-Object-Meta-test': 'data' }
|
||||
df.put_metadata(metadata)
|
||||
exp_name = '%s.meta' % str(normalize_timestamp(ts))
|
||||
dl = os.listdir(df.datadir)
|
||||
self.assertEquals(len(dl), 2)
|
||||
self.assertTrue(exp_name in set(dl))
|
||||
|
||||
def test_put_metadata_ts(self):
|
||||
df = self._get_disk_file()
|
||||
ts = time()
|
||||
metadata = { 'X-Timestamp': ts, 'X-Object-Meta-test': 'data' }
|
||||
df.put_metadata(metadata, tombstone=True)
|
||||
exp_name = '%s.ts' % str(normalize_timestamp(ts))
|
||||
dl = os.listdir(df.datadir)
|
||||
self.assertEquals(len(dl), 2)
|
||||
self.assertTrue(exp_name in set(dl))
|
||||
|
||||
def test_unlinkold(self):
|
||||
df1 = self._get_disk_file()
|
||||
future_time = str(normalize_timestamp(time() + 100))
|
||||
df2 = self._get_disk_file(ts=future_time)
|
||||
self.assertEquals(len(os.listdir(df1.datadir)), 2)
|
||||
df1.unlinkold(future_time)
|
||||
self.assertEquals(len(os.listdir(df1.datadir)), 1)
|
||||
self.assertEquals(os.listdir(df1.datadir)[0], "%s.data" % future_time)
|
||||
|
||||
def test_close_error(self):
|
||||
|
||||
def err():
|
||||
raise Exception("bad")
|
||||
|
||||
df = self._get_disk_file(fsize=1024 * 1024 * 2)
|
||||
df._handle_close_quarantine = err
|
||||
for chunk in df:
|
||||
pass
|
||||
# close is called at the end of the iterator
|
||||
self.assertEquals(df.fp, None)
|
||||
self.assertEquals(len(df.logger.log_dict['error']), 1)
|
||||
|
||||
def test_quarantine_twice(self):
|
||||
df = self._get_disk_file(invalid_type='Content-Length')
|
||||
self.assert_(os.path.isfile(df.data_file))
|
||||
quar_dir = df.quarantine()
|
||||
self.assertFalse(os.path.isfile(df.data_file))
|
||||
self.assert_(os.path.isdir(quar_dir))
|
||||
self.assertEquals(df.quarantine(), None)
|
@ -14,14 +14,13 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
""" Tests for swift.object_server """
|
||||
""" Tests for swift.obj.server """
|
||||
|
||||
import cPickle as pickle
|
||||
import operator
|
||||
import os
|
||||
import mock
|
||||
import unittest
|
||||
import email
|
||||
from shutil import rmtree
|
||||
from StringIO import StringIO
|
||||
from time import gmtime, strftime, time
|
||||
@ -30,361 +29,18 @@ from hashlib import md5
|
||||
|
||||
from eventlet import sleep, spawn, wsgi, listen, Timeout
|
||||
from test.unit import FakeLogger
|
||||
from test.unit import _setxattr as setxattr
|
||||
from test.unit import connect_tcp, readuntil2crlfs
|
||||
from swift.obj import server as object_server
|
||||
from swift.obj import diskfile
|
||||
from swift.common import utils
|
||||
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
|
||||
NullLogger, storage_directory, public, \
|
||||
replication
|
||||
from swift.common.exceptions import DiskFileNotExist
|
||||
from swift.common import constraints
|
||||
from eventlet import tpool
|
||||
from swift.common.swob import Request, HeaderKeyDict
|
||||
|
||||
|
||||
class TestDiskFile(unittest.TestCase):
|
||||
"""Test swift.obj.server.DiskFile"""
|
||||
|
||||
def setUp(self):
|
||||
""" Set up for testing swift.object_server.ObjectController """
|
||||
self.testdir = os.path.join(mkdtemp(), 'tmp_test_obj_server_DiskFile')
|
||||
mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))
|
||||
|
||||
self._real_tpool_execute = tpool.execute
|
||||
def fake_exe(meth, *args, **kwargs):
|
||||
return meth(*args, **kwargs)
|
||||
tpool.execute = fake_exe
|
||||
|
||||
def tearDown(self):
|
||||
""" Tear down for testing swift.object_server.ObjectController """
|
||||
rmtree(os.path.dirname(self.testdir))
|
||||
tpool.execute = self._real_tpool_execute
|
||||
|
||||
def _create_test_file(self, data, keep_data_fp=True):
|
||||
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
|
||||
FakeLogger())
|
||||
mkdirs(df.datadir)
|
||||
f = open(os.path.join(df.datadir,
|
||||
normalize_timestamp(time()) + '.data'), 'wb')
|
||||
f.write(data)
|
||||
setxattr(f.fileno(), object_server.METADATA_KEY,
|
||||
pickle.dumps({}, object_server.PICKLE_PROTOCOL))
|
||||
f.close()
|
||||
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
|
||||
FakeLogger(), keep_data_fp=keep_data_fp)
|
||||
return df
|
||||
|
||||
def test_disk_file_app_iter_corners(self):
|
||||
df = self._create_test_file('1234567890')
|
||||
self.assertEquals(''.join(df.app_iter_range(0, None)), '1234567890')
|
||||
|
||||
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
|
||||
FakeLogger(), keep_data_fp=True)
|
||||
self.assertEqual(''.join(df.app_iter_range(5, None)), '67890')
|
||||
|
||||
def test_disk_file_app_iter_ranges(self):
|
||||
df = self._create_test_file('012345678911234567892123456789')
|
||||
it = df.app_iter_ranges([(0, 10), (10, 20), (20, 30)], 'plain/text',
|
||||
'\r\n--someheader\r\n', 30)
|
||||
value = ''.join(it)
|
||||
self.assert_('0123456789' in value)
|
||||
self.assert_('1123456789' in value)
|
||||
self.assert_('2123456789' in value)
|
||||
|
||||
def test_disk_file_app_iter_ranges_edges(self):
|
||||
df = self._create_test_file('012345678911234567892123456789')
|
||||
it = df.app_iter_ranges([(3, 10), (0, 2)], 'application/whatever',
|
||||
'\r\n--someheader\r\n', 30)
|
||||
value = ''.join(it)
|
||||
self.assert_('3456789' in value)
|
||||
self.assert_('01' in value)
|
||||
|
||||
def test_disk_file_large_app_iter_ranges(self):
|
||||
"""
|
||||
This test case is to make sure that the disk file app_iter_ranges
|
||||
method all the paths being tested.
|
||||
"""
|
||||
long_str = '01234567890' * 65536
|
||||
target_strs = ['3456789', long_str[0:65590]]
|
||||
df = self._create_test_file(long_str)
|
||||
|
||||
it = df.app_iter_ranges([(3, 10), (0, 65590)], 'plain/text',
|
||||
'5e816ff8b8b8e9a5d355497e5d9e0301', 655360)
|
||||
|
||||
"""
|
||||
the produced string actually missing the MIME headers
|
||||
need to add these headers to make it as real MIME message.
|
||||
The body of the message is produced by method app_iter_ranges
|
||||
off of DiskFile object.
|
||||
"""
|
||||
header = ''.join(['Content-Type: multipart/byteranges;',
|
||||
'boundary=',
|
||||
'5e816ff8b8b8e9a5d355497e5d9e0301\r\n'])
|
||||
|
||||
value = header + ''.join(it)
|
||||
|
||||
parts = map(lambda p: p.get_payload(decode=True),
|
||||
email.message_from_string(value).walk())[1:3]
|
||||
self.assertEqual(parts, target_strs)
|
||||
|
||||
def test_disk_file_app_iter_ranges_empty(self):
|
||||
"""
|
||||
This test case tests when empty value passed into app_iter_ranges
|
||||
When ranges passed into the method is either empty array or None,
|
||||
this method will yield empty string
|
||||
"""
|
||||
df = self._create_test_file('012345678911234567892123456789')
|
||||
it = df.app_iter_ranges([], 'application/whatever',
|
||||
'\r\n--someheader\r\n', 100)
|
||||
self.assertEqual(''.join(it), '')
|
||||
|
||||
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
|
||||
FakeLogger(), keep_data_fp=True)
|
||||
it = df.app_iter_ranges(None, 'app/something',
|
||||
'\r\n--someheader\r\n', 150)
|
||||
self.assertEqual(''.join(it), '')
|
||||
|
||||
def test_disk_file_mkstemp_creates_dir(self):
|
||||
tmpdir = os.path.join(self.testdir, 'sda1', 'tmp')
|
||||
os.rmdir(tmpdir)
|
||||
with object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c',
|
||||
'o', FakeLogger()).writer() as writer:
|
||||
self.assert_(os.path.exists(tmpdir))
|
||||
|
||||
def test_iter_hook(self):
|
||||
hook_call_count = [0]
|
||||
def hook():
|
||||
hook_call_count[0] += 1
|
||||
|
||||
df = self._get_disk_file(fsize=65, csize=8, iter_hook=hook)
|
||||
for _ in df:
|
||||
pass
|
||||
|
||||
self.assertEquals(hook_call_count[0], 9)
|
||||
|
||||
def test_quarantine(self):
|
||||
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
|
||||
FakeLogger())
|
||||
mkdirs(df.datadir)
|
||||
f = open(os.path.join(df.datadir,
|
||||
normalize_timestamp(time()) + '.data'), 'wb')
|
||||
setxattr(f.fileno(), object_server.METADATA_KEY,
|
||||
pickle.dumps({}, object_server.PICKLE_PROTOCOL))
|
||||
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
|
||||
FakeLogger())
|
||||
df.quarantine()
|
||||
quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined',
|
||||
'objects', os.path.basename(os.path.dirname(
|
||||
df.data_file)))
|
||||
self.assert_(os.path.isdir(quar_dir))
|
||||
|
||||
def test_quarantine_same_file(self):
|
||||
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
|
||||
FakeLogger())
|
||||
mkdirs(df.datadir)
|
||||
f = open(os.path.join(df.datadir,
|
||||
normalize_timestamp(time()) + '.data'), 'wb')
|
||||
setxattr(f.fileno(), object_server.METADATA_KEY,
|
||||
pickle.dumps({}, object_server.PICKLE_PROTOCOL))
|
||||
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
|
||||
FakeLogger())
|
||||
new_dir = df.quarantine()
|
||||
quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined',
|
||||
'objects', os.path.basename(os.path.dirname(
|
||||
df.data_file)))
|
||||
self.assert_(os.path.isdir(quar_dir))
|
||||
self.assertEquals(quar_dir, new_dir)
|
||||
# have to remake the datadir and file
|
||||
mkdirs(df.datadir)
|
||||
f = open(os.path.join(df.datadir,
|
||||
normalize_timestamp(time()) + '.data'), 'wb')
|
||||
setxattr(f.fileno(), object_server.METADATA_KEY,
|
||||
pickle.dumps({}, object_server.PICKLE_PROTOCOL))
|
||||
|
||||
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
|
||||
FakeLogger(), keep_data_fp=True)
|
||||
double_uuid_path = df.quarantine()
|
||||
self.assert_(os.path.isdir(double_uuid_path))
|
||||
self.assert_('-' in os.path.basename(double_uuid_path))
|
||||
|
||||
def _get_disk_file(self, invalid_type=None, obj_name='o',
|
||||
fsize=1024, csize=8, mark_deleted=False, ts=None,
|
||||
iter_hook=None):
|
||||
'''returns a DiskFile'''
|
||||
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c',
|
||||
obj_name, FakeLogger())
|
||||
data = '0' * fsize
|
||||
etag = md5()
|
||||
if ts:
|
||||
timestamp = ts
|
||||
else:
|
||||
timestamp = str(normalize_timestamp(time()))
|
||||
with df.writer() as writer:
|
||||
writer.write(data)
|
||||
etag.update(data)
|
||||
etag = etag.hexdigest()
|
||||
metadata = {
|
||||
'ETag': etag,
|
||||
'X-Timestamp': timestamp,
|
||||
'Content-Length': str(os.fstat(writer.fd).st_size),
|
||||
}
|
||||
writer.put(metadata)
|
||||
if invalid_type == 'ETag':
|
||||
etag = md5()
|
||||
etag.update('1' + '0' * (fsize - 1))
|
||||
etag = etag.hexdigest()
|
||||
metadata['ETag'] = etag
|
||||
object_server.write_metadata(writer.fd, metadata)
|
||||
if invalid_type == 'Content-Length':
|
||||
metadata['Content-Length'] = fsize - 1
|
||||
object_server.write_metadata(writer.fd, metadata)
|
||||
|
||||
if mark_deleted:
|
||||
metadata = {
|
||||
'X-Timestamp': timestamp,
|
||||
'deleted': True
|
||||
}
|
||||
df.put_metadata(metadata, tombstone=True)
|
||||
|
||||
df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c',
|
||||
obj_name, FakeLogger(),
|
||||
keep_data_fp=True, disk_chunk_size=csize,
|
||||
iter_hook=iter_hook)
|
||||
if invalid_type == 'Zero-Byte':
|
||||
os.remove(df.data_file)
|
||||
fp = open(df.data_file, 'w')
|
||||
fp.close()
|
||||
df.unit_test_len = fsize
|
||||
return df
|
||||
|
||||
def test_quarantine_valids(self):
|
||||
df = self._get_disk_file(obj_name='1')
|
||||
for chunk in df:
|
||||
pass
|
||||
self.assertFalse(df.quarantined_dir)
|
||||
|
||||
df = self._get_disk_file(obj_name='2', csize=1)
|
||||
for chunk in df:
|
||||
pass
|
||||
self.assertFalse(df.quarantined_dir)
|
||||
|
||||
df = self._get_disk_file(obj_name='3', csize=100000)
|
||||
for chunk in df:
|
||||
pass
|
||||
self.assertFalse(df.quarantined_dir)
|
||||
|
||||
def run_quarantine_invalids(self, invalid_type):
|
||||
df = self._get_disk_file(invalid_type=invalid_type, obj_name='1')
|
||||
for chunk in df:
|
||||
pass
|
||||
self.assertTrue(df.quarantined_dir)
|
||||
df = self._get_disk_file(invalid_type=invalid_type,
|
||||
obj_name='2', csize=1)
|
||||
for chunk in df:
|
||||
pass
|
||||
self.assertTrue(df.quarantined_dir)
|
||||
df = self._get_disk_file(invalid_type=invalid_type,
|
||||
obj_name='3', csize=100000)
|
||||
for chunk in df:
|
||||
pass
|
||||
self.assertTrue(df.quarantined_dir)
|
||||
df = self._get_disk_file(invalid_type=invalid_type, obj_name='4')
|
||||
self.assertFalse(df.quarantined_dir)
|
||||
df = self._get_disk_file(invalid_type=invalid_type, obj_name='5')
|
||||
for chunk in df.app_iter_range(0, df.unit_test_len):
|
||||
pass
|
||||
self.assertTrue(df.quarantined_dir)
|
||||
df = self._get_disk_file(invalid_type=invalid_type, obj_name='6')
|
||||
for chunk in df.app_iter_range(0, df.unit_test_len + 100):
|
||||
pass
|
||||
self.assertTrue(df.quarantined_dir)
|
||||
expected_quar = False
|
||||
# for the following, Content-Length/Zero-Byte errors will always result
|
||||
# in a quarantine, even if the whole file isn't check-summed
|
||||
if invalid_type in ('Zero-Byte', 'Content-Length'):
|
||||
expected_quar = True
|
||||
df = self._get_disk_file(invalid_type=invalid_type, obj_name='7')
|
||||
for chunk in df.app_iter_range(1, df.unit_test_len):
|
||||
pass
|
||||
self.assertEquals(bool(df.quarantined_dir), expected_quar)
|
||||
df = self._get_disk_file(invalid_type=invalid_type, obj_name='8')
|
||||
for chunk in df.app_iter_range(0, df.unit_test_len - 1):
|
||||
pass
|
||||
self.assertEquals(bool(df.quarantined_dir), expected_quar)
|
||||
df = self._get_disk_file(invalid_type=invalid_type, obj_name='8')
|
||||
for chunk in df.app_iter_range(1, df.unit_test_len + 1):
|
||||
pass
|
||||
self.assertEquals(bool(df.quarantined_dir), expected_quar)
|
||||
|
||||
def test_quarantine_invalids(self):
|
||||
self.run_quarantine_invalids('ETag')
|
||||
self.run_quarantine_invalids('Content-Length')
|
||||
self.run_quarantine_invalids('Zero-Byte')
|
||||
|
||||
def test_quarantine_deleted_files(self):
|
||||
df = self._get_disk_file(invalid_type='Content-Length')
|
||||
df.close()
|
||||
self.assertTrue(df.quarantined_dir)
|
||||
df = self._get_disk_file(invalid_type='Content-Length',
|
||||
mark_deleted=True)
|
||||
df.close()
|
||||
self.assertFalse(df.quarantined_dir)
|
||||
df = self._get_disk_file(invalid_type='Content-Length',
|
||||
mark_deleted=True)
|
||||
self.assertRaises(DiskFileNotExist, df.get_data_file_size)
|
||||
|
||||
def test_put_metadata(self):
|
||||
df = self._get_disk_file()
|
||||
ts = time()
|
||||
metadata = { 'X-Timestamp': ts, 'X-Object-Meta-test': 'data' }
|
||||
df.put_metadata(metadata)
|
||||
exp_name = '%s.meta' % str(normalize_timestamp(ts))
|
||||
dl = os.listdir(df.datadir)
|
||||
self.assertEquals(len(dl), 2)
|
||||
self.assertTrue(exp_name in set(dl))
|
||||
|
||||
def test_put_metadata_ts(self):
|
||||
df = self._get_disk_file()
|
||||
ts = time()
|
||||
metadata = { 'X-Timestamp': ts, 'X-Object-Meta-test': 'data' }
|
||||
df.put_metadata(metadata, tombstone=True)
|
||||
exp_name = '%s.ts' % str(normalize_timestamp(ts))
|
||||
dl = os.listdir(df.datadir)
|
||||
self.assertEquals(len(dl), 2)
|
||||
self.assertTrue(exp_name in set(dl))
|
||||
|
||||
def test_unlinkold(self):
|
||||
df1 = self._get_disk_file()
|
||||
future_time = str(normalize_timestamp(time() + 100))
|
||||
df2 = self._get_disk_file(ts=future_time)
|
||||
self.assertEquals(len(os.listdir(df1.datadir)), 2)
|
||||
df1.unlinkold(future_time)
|
||||
self.assertEquals(len(os.listdir(df1.datadir)), 1)
|
||||
self.assertEquals(os.listdir(df1.datadir)[0], "%s.data" % future_time)
|
||||
|
||||
def test_close_error(self):
|
||||
|
||||
def err():
|
||||
raise Exception("bad")
|
||||
|
||||
df = self._get_disk_file(fsize=1024 * 1024 * 2)
|
||||
df._handle_close_quarantine = err
|
||||
for chunk in df:
|
||||
pass
|
||||
# close is called at the end of the iterator
|
||||
self.assertEquals(df.fp, None)
|
||||
self.assertEquals(len(df.logger.log_dict['error']), 1)
|
||||
|
||||
def test_quarantine_twice(self):
|
||||
df = self._get_disk_file(invalid_type='Content-Length')
|
||||
self.assert_(os.path.isfile(df.data_file))
|
||||
quar_dir = df.quarantine()
|
||||
self.assertFalse(os.path.isfile(df.data_file))
|
||||
self.assert_(os.path.isdir(quar_dir))
|
||||
self.assertEquals(df.quarantine(), None)
|
||||
|
||||
|
||||
class TestObjectController(unittest.TestCase):
|
||||
""" Test swift.obj.server.ObjectController """
|
||||
|
||||
@ -398,10 +54,13 @@ class TestObjectController(unittest.TestCase):
|
||||
conf = {'devices': self.testdir, 'mount_check': 'false'}
|
||||
self.object_controller = object_server.ObjectController(conf)
|
||||
self.object_controller.bytes_per_sync = 1
|
||||
self._orig_tpool_exc = tpool.execute
|
||||
tpool.execute = lambda f, *args, **kwargs: f(*args, **kwargs)
|
||||
|
||||
def tearDown(self):
|
||||
""" Tear down for testing swift.object_server.ObjectController """
|
||||
rmtree(os.path.dirname(self.testdir))
|
||||
tpool.execute = self._orig_tpool_exc
|
||||
|
||||
def test_REQUEST_SPECIAL_CHARS(self):
|
||||
obj = 'special昆%20/%'
|
||||
@ -660,15 +319,15 @@ class TestObjectController(unittest.TestCase):
|
||||
req.body = 'VERIFY'
|
||||
resp = self.object_controller.PUT(req)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
|
||||
file = diskfile.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
|
||||
FakeLogger(), keep_data_fp=True)
|
||||
|
||||
file_name = os.path.basename(file.data_file)
|
||||
with open(file.data_file) as fp:
|
||||
metadata = object_server.read_metadata(fp)
|
||||
metadata = diskfile.read_metadata(fp)
|
||||
os.unlink(file.data_file)
|
||||
with open(file.data_file, 'w') as fp:
|
||||
object_server.write_metadata(fp, metadata)
|
||||
diskfile.write_metadata(fp, metadata)
|
||||
|
||||
self.assertEquals(os.listdir(file.datadir)[0], file_name)
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
@ -742,7 +401,7 @@ class TestObjectController(unittest.TestCase):
|
||||
timestamp + '.data')
|
||||
self.assert_(os.path.isfile(objfile))
|
||||
self.assertEquals(open(objfile).read(), 'VERIFY')
|
||||
self.assertEquals(object_server.read_metadata(objfile),
|
||||
self.assertEquals(diskfile.read_metadata(objfile),
|
||||
{'X-Timestamp': timestamp,
|
||||
'Content-Length': '6',
|
||||
'ETag': '0b4c12d7e0a73840c1c4f148fda3b037',
|
||||
@ -772,7 +431,7 @@ class TestObjectController(unittest.TestCase):
|
||||
timestamp + '.data')
|
||||
self.assert_(os.path.isfile(objfile))
|
||||
self.assertEquals(open(objfile).read(), 'VERIFY TWO')
|
||||
self.assertEquals(object_server.read_metadata(objfile),
|
||||
self.assertEquals(diskfile.read_metadata(objfile),
|
||||
{'X-Timestamp': timestamp,
|
||||
'Content-Length': '10',
|
||||
'ETag': 'b381a4c5dab1eaa1eb9711fa647cd039',
|
||||
@ -814,7 +473,7 @@ class TestObjectController(unittest.TestCase):
|
||||
timestamp + '.data')
|
||||
self.assert_(os.path.isfile(objfile))
|
||||
self.assertEquals(open(objfile).read(), 'VERIFY THREE')
|
||||
self.assertEquals(object_server.read_metadata(objfile),
|
||||
self.assertEquals(diskfile.read_metadata(objfile),
|
||||
{'X-Timestamp': timestamp,
|
||||
'Content-Length': '12',
|
||||
'ETag': 'b114ab7b90d9ccac4bd5d99cc7ebb568',
|
||||
@ -964,15 +623,15 @@ class TestObjectController(unittest.TestCase):
|
||||
req.body = 'VERIFY'
|
||||
resp = self.object_controller.PUT(req)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
|
||||
file = diskfile.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
|
||||
FakeLogger(), keep_data_fp=True)
|
||||
|
||||
file_name = os.path.basename(file.data_file)
|
||||
with open(file.data_file) as fp:
|
||||
metadata = object_server.read_metadata(fp)
|
||||
metadata = diskfile.read_metadata(fp)
|
||||
os.unlink(file.data_file)
|
||||
with open(file.data_file, 'w') as fp:
|
||||
object_server.write_metadata(fp, metadata)
|
||||
diskfile.write_metadata(fp, metadata)
|
||||
|
||||
self.assertEquals(os.listdir(file.datadir)[0], file_name)
|
||||
req = Request.blank('/sda1/p/a/c/o')
|
||||
@ -1253,7 +912,7 @@ class TestObjectController(unittest.TestCase):
|
||||
req.body = 'VERIFY'
|
||||
resp = self.object_controller.PUT(req)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
|
||||
file = diskfile.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
|
||||
FakeLogger(), keep_data_fp=True)
|
||||
file_name = os.path.basename(file.data_file)
|
||||
etag = md5()
|
||||
@ -1261,7 +920,7 @@ class TestObjectController(unittest.TestCase):
|
||||
etag = etag.hexdigest()
|
||||
metadata = {'X-Timestamp': timestamp,
|
||||
'Content-Length': 6, 'ETag': etag}
|
||||
object_server.write_metadata(file.fp, metadata)
|
||||
diskfile.write_metadata(file.fp, metadata)
|
||||
self.assertEquals(os.listdir(file.datadir)[0], file_name)
|
||||
req = Request.blank('/sda1/p/a/c/o')
|
||||
resp = self.object_controller.GET(req)
|
||||
@ -1284,14 +943,14 @@ class TestObjectController(unittest.TestCase):
|
||||
req.body = 'VERIFY'
|
||||
resp = self.object_controller.PUT(req)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
|
||||
file = diskfile.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
|
||||
FakeLogger(), keep_data_fp=True)
|
||||
file_name = os.path.basename(file.data_file)
|
||||
with open(file.data_file) as fp:
|
||||
metadata = object_server.read_metadata(fp)
|
||||
metadata = diskfile.read_metadata(fp)
|
||||
os.unlink(file.data_file)
|
||||
with open(file.data_file, 'w') as fp:
|
||||
object_server.write_metadata(fp, metadata)
|
||||
diskfile.write_metadata(fp, metadata)
|
||||
|
||||
self.assertEquals(os.listdir(file.datadir)[0], file_name)
|
||||
req = Request.blank('/sda1/p/a/c/o')
|
||||
@ -1311,7 +970,7 @@ class TestObjectController(unittest.TestCase):
|
||||
req.body = 'VERIFY'
|
||||
resp = self.object_controller.PUT(req)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
|
||||
file = diskfile.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
|
||||
FakeLogger(), keep_data_fp=True)
|
||||
file_name = os.path.basename(file.data_file)
|
||||
etag = md5()
|
||||
@ -1319,7 +978,7 @@ class TestObjectController(unittest.TestCase):
|
||||
etag = etag.hexdigest()
|
||||
metadata = {'X-Timestamp': timestamp,
|
||||
'Content-Length': 6, 'ETag': etag}
|
||||
object_server.write_metadata(file.fp, metadata)
|
||||
diskfile.write_metadata(file.fp, metadata)
|
||||
self.assertEquals(os.listdir(file.datadir)[0], file_name)
|
||||
req = Request.blank('/sda1/p/a/c/o')
|
||||
req.range = 'bytes=0-4' # partial
|
||||
@ -1485,10 +1144,10 @@ class TestObjectController(unittest.TestCase):
|
||||
return False
|
||||
def my_storage_directory(*args):
|
||||
return self.testdir+'/collide'
|
||||
_storage_directory = object_server.storage_directory
|
||||
_storage_directory = diskfile.storage_directory
|
||||
_check = object_server.check_object_creation
|
||||
try:
|
||||
object_server.storage_directory = my_storage_directory
|
||||
diskfile.storage_directory = my_storage_directory
|
||||
object_server.check_object_creation = my_check
|
||||
inbuf = StringIO()
|
||||
errbuf = StringIO()
|
||||
@ -1537,7 +1196,7 @@ class TestObjectController(unittest.TestCase):
|
||||
self.assertEquals(outbuf.getvalue()[:4], '403 ')
|
||||
|
||||
finally:
|
||||
object_server.storage_directory = _storage_directory
|
||||
diskfile.storage_directory = _storage_directory
|
||||
object_server.check_object_creation = _check
|
||||
|
||||
def test_invalid_method_doesnt_exist(self):
|
||||
@ -1742,7 +1401,7 @@ class TestObjectController(unittest.TestCase):
|
||||
storage_directory(object_server.DATADIR, 'p', hash_path('a', 'c',
|
||||
'o')), timestamp + '.data')
|
||||
self.assert_(os.path.isfile(objfile))
|
||||
self.assertEquals(object_server.read_metadata(objfile),
|
||||
self.assertEquals(diskfile.read_metadata(objfile),
|
||||
{'X-Timestamp': timestamp,
|
||||
'Content-Length': '0', 'Content-Type': 'text/plain', 'name':
|
||||
'/a/c/o', 'X-Object-Manifest': 'c/o/', 'ETag':
|
||||
@ -2759,9 +2418,9 @@ class TestObjectController(unittest.TestCase):
|
||||
def fake_fallocate(fd, size):
|
||||
raise OSError(42, 'Unable to fallocate(%d)' % size)
|
||||
|
||||
orig_fallocate = object_server.fallocate
|
||||
orig_fallocate = diskfile.fallocate
|
||||
try:
|
||||
object_server.fallocate = fake_fallocate
|
||||
diskfile.fallocate = fake_fallocate
|
||||
timestamp = normalize_timestamp(time())
|
||||
body_reader = IgnoredBody()
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
@ -2775,7 +2434,7 @@ class TestObjectController(unittest.TestCase):
|
||||
self.assertEquals(resp.status_int, 507)
|
||||
self.assertFalse(body_reader.read_called)
|
||||
finally:
|
||||
object_server.fallocate = orig_fallocate
|
||||
diskfile.fallocate = orig_fallocate
|
||||
|
||||
def test_serv_reserv(self):
|
||||
"""
|
||||
|
Loading…
x
Reference in New Issue
Block a user