diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py index 9dc76bad23..55ec296ea1 100644 --- a/swift/obj/auditor.py +++ b/swift/obj/auditor.py @@ -18,6 +18,7 @@ import time from eventlet import Timeout +from swift.obj import diskfile from swift.obj import server as object_server from swift.common.utils import get_logger, audit_location_generator, \ ratelimit_sleep, config_true_value, dump_recon_cache, list_from_csv, json @@ -159,13 +160,13 @@ class AuditorWorker(object): """ try: try: - name = object_server.read_metadata(path)['name'] + name = diskfile.read_metadata(path)['name'] except (Exception, Timeout) as exc: raise AuditException('Error when reading metadata: %s' % exc) _junk, account, container, obj = name.split('/', 3) - df = object_server.DiskFile(self.devices, device, partition, - account, container, obj, self.logger, - keep_data_fp=True) + df = diskfile.DiskFile(self.devices, device, partition, + account, container, obj, self.logger, + keep_data_fp=True) try: try: obj_size = df.get_data_file_size() @@ -199,7 +200,7 @@ class AuditorWorker(object): self.logger.error(_('ERROR Object %(obj)s failed audit and will ' 'be quarantined: %(err)s'), {'obj': path, 'err': err}) - object_server.quarantine_renamer( + diskfile.quarantine_renamer( os.path.join(self.devices, device), path) return except (Exception, Timeout): diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py new file mode 100644 index 0000000000..cdfb4c50c2 --- /dev/null +++ b/swift/obj/diskfile.py @@ -0,0 +1,454 @@ +# Copyright (c) 2010-2012 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" Disk File Interface for Swift Object Server""" + +from __future__ import with_statement +import cPickle as pickle +import errno +import os +import time +import traceback +from hashlib import md5 +from tempfile import mkstemp +from contextlib import contextmanager + +from xattr import getxattr, setxattr +from eventlet import Timeout + +from swift.common.utils import mkdirs, normalize_timestamp, \ + storage_directory, hash_path, renamer, fallocate, fsync, \ + fdatasync, drop_buffer_cache, ThreadPool +from swift.common.exceptions import DiskFileError, \ + DiskFileNotExist, DiskFileCollision, DiskFileNoSpace +from swift.obj.base import invalidate_hash, \ + quarantine_renamer +from swift.common.swob import multi_range_iterator + + +PICKLE_PROTOCOL = 2 +METADATA_KEY = 'user.swift.metadata' + + +def read_metadata(fd): + """ + Helper function to read the pickled metadata from an object file. + + :param fd: file descriptor to load the metadata from + + :returns: dictionary of metadata + """ + metadata = '' + key = 0 + try: + while True: + metadata += getxattr(fd, '%s%s' % (METADATA_KEY, (key or ''))) + key += 1 + except IOError: + pass + return pickle.loads(metadata) + + +def write_metadata(fd, metadata): + """ + Helper function to write pickled metadata for an object file. 
+ + :param fd: file descriptor to write the metadata + :param metadata: metadata to write + """ + metastr = pickle.dumps(metadata, PICKLE_PROTOCOL) + key = 0 + while metastr: + setxattr(fd, '%s%s' % (METADATA_KEY, key or ''), metastr[:254]) + metastr = metastr[254:] + key += 1 + + +class DiskWriter(object): + """ + Encapsulation of the write context for servicing PUT REST API + requests. Serves as the context manager object for DiskFile's writer() + method. + """ + def __init__(self, disk_file, fd, tmppath, threadpool): + self.disk_file = disk_file + self.fd = fd + self.tmppath = tmppath + self.upload_size = 0 + self.last_sync = 0 + self.threadpool = threadpool + + def write(self, chunk): + """ + Write a chunk of data into the temporary file. + + :param chunk: the chunk of data to write as a string object + """ + + def _write_entire_chunk(chunk): + while chunk: + written = os.write(self.fd, chunk) + self.upload_size += written + chunk = chunk[written:] + + self.threadpool.run_in_thread(_write_entire_chunk, chunk) + + # For large files sync every 512MB (by default) written + diff = self.upload_size - self.last_sync + if diff >= self.disk_file.bytes_per_sync: + self.threadpool.force_run_in_thread(fdatasync, self.fd) + drop_buffer_cache(self.fd, self.last_sync, diff) + self.last_sync = self.upload_size + + def put(self, metadata, extension='.data'): + """ + Finalize writing the file on disk, and renames it from the temp file + to the real location. This should be called after the data has been + written to the temp file. + + :param metadata: dictionary of metadata to be written + :param extension: extension to be used when making the file + """ + assert self.tmppath is not None + timestamp = normalize_timestamp(metadata['X-Timestamp']) + metadata['name'] = self.disk_file.name + + def finalize_put(): + # Write the metadata before calling fsync() so that both data and + # metadata are flushed to disk. + write_metadata(self.fd, metadata) + # We call fsync() before calling drop_cache() to lower the amount + # of redundant work the drop cache code will perform on the pages + # (now that after fsync the pages will be all clean). + fsync(self.fd) + # From the Department of the Redundancy Department, make sure + # we call drop_cache() after fsync() to avoid redundant work + # (pages all clean). + drop_buffer_cache(self.fd, 0, self.upload_size) + invalidate_hash(os.path.dirname(self.disk_file.datadir)) + # After the rename completes, this object will be available for + # other requests to reference. + renamer(self.tmppath, + os.path.join(self.disk_file.datadir, + timestamp + extension)) + + self.threadpool.force_run_in_thread(finalize_put) + self.disk_file.metadata = metadata + + +class DiskFile(object): + """ + Manage object files on disk. 
+ + :param path: path to devices on the node + :param device: device name + :param partition: partition on the device the object lives in + :param account: account name for the object + :param container: container name for the object + :param obj: object name for the object + :param keep_data_fp: if True, don't close the fp, otherwise close it + :param disk_chunk_size: size of chunks on file reads + :param bytes_per_sync: number of bytes between fdatasync calls + :param iter_hook: called when __iter__ returns a chunk + :param threadpool: thread pool in which to do blocking operations + + :raises DiskFileCollision: on md5 collision + """ + + def __init__(self, path, device, partition, account, container, obj, + logger, keep_data_fp=False, disk_chunk_size=65536, + bytes_per_sync=(512 * 1024 * 1024), iter_hook=None, + threadpool=None, obj_dir='objects', + disallowed_metadata_keys=None): + self.disk_chunk_size = disk_chunk_size + self.bytes_per_sync = bytes_per_sync + self.iter_hook = iter_hook + self.name = '/' + '/'.join((account, container, obj)) + name_hash = hash_path(account, container, obj) + self.datadir = os.path.join( + path, device, storage_directory(obj_dir, partition, name_hash)) + self.device_path = os.path.join(path, device) + self.tmpdir = os.path.join(path, device, 'tmp') + self.logger = logger + self.disallowed_metadata_keys = disallowed_metadata_keys + self.metadata = {} + self.meta_file = None + self.data_file = None + self.fp = None + self.iter_etag = None + self.started_at_0 = False + self.read_to_eof = False + self.quarantined_dir = None + self.keep_cache = False + self.suppress_file_closing = False + self.threadpool = threadpool or ThreadPool(nthreads=0) + if not os.path.exists(self.datadir): + return + files = sorted(os.listdir(self.datadir), reverse=True) + for afile in files: + if afile.endswith('.ts'): + self.data_file = self.meta_file = None + self.metadata = {'deleted': True} + return + if afile.endswith('.meta') and not self.meta_file: + self.meta_file = os.path.join(self.datadir, afile) + if afile.endswith('.data') and not self.data_file: + self.data_file = os.path.join(self.datadir, afile) + break + if not self.data_file: + return + self.fp = open(self.data_file, 'rb') + self.metadata = read_metadata(self.fp) + if not keep_data_fp: + self.close(verify_file=False) + if self.meta_file: + with open(self.meta_file) as mfp: + for key in self.metadata.keys(): + if key.lower() not in self.disallowed_metadata_keys: + del self.metadata[key] + self.metadata.update(read_metadata(mfp)) + if 'name' in self.metadata: + if self.metadata['name'] != self.name: + self.logger.error(_('Client path %(client)s does not match ' + 'path stored in object metadata %(meta)s'), + {'client': self.name, + 'meta': self.metadata['name']}) + raise DiskFileCollision('Client path does not match path ' + 'stored in object metadata') + + def __iter__(self): + """Returns an iterator over the data file.""" + try: + dropped_cache = 0 + read = 0 + self.started_at_0 = False + self.read_to_eof = False + if self.fp.tell() == 0: + self.started_at_0 = True + self.iter_etag = md5() + while True: + chunk = self.threadpool.run_in_thread( + self.fp.read, self.disk_chunk_size) + if chunk: + if self.iter_etag: + self.iter_etag.update(chunk) + read += len(chunk) + if read - dropped_cache > (1024 * 1024): + self._drop_cache(self.fp.fileno(), dropped_cache, + read - dropped_cache) + dropped_cache = read + yield chunk + if self.iter_hook: + self.iter_hook() + else: + self.read_to_eof = True + 
self._drop_cache(self.fp.fileno(), dropped_cache, + read - dropped_cache) + break + finally: + if not self.suppress_file_closing: + self.close() + + def app_iter_range(self, start, stop): + """Returns an iterator over the data file for range (start, stop)""" + if start or start == 0: + self.fp.seek(start) + if stop is not None: + length = stop - start + else: + length = None + for chunk in self: + if length is not None: + length -= len(chunk) + if length < 0: + # Chop off the extra: + yield chunk[:length] + break + yield chunk + + def app_iter_ranges(self, ranges, content_type, boundary, size): + """Returns an iterator over the data file for a set of ranges""" + if not ranges: + yield '' + else: + try: + self.suppress_file_closing = True + for chunk in multi_range_iterator( + ranges, content_type, boundary, size, + self.app_iter_range): + yield chunk + finally: + self.suppress_file_closing = False + self.close() + + def _handle_close_quarantine(self): + """Check if file needs to be quarantined""" + try: + self.get_data_file_size() + except DiskFileError: + self.quarantine() + return + except DiskFileNotExist: + return + + if self.iter_etag and self.started_at_0 and self.read_to_eof and \ + 'ETag' in self.metadata and \ + self.iter_etag.hexdigest() != self.metadata.get('ETag'): + self.quarantine() + + def close(self, verify_file=True): + """ + Close the file. Will handle quarantining file if necessary. + + :param verify_file: Defaults to True. If false, will not check + file to see if it needs quarantining. + """ + if self.fp: + try: + if verify_file: + self._handle_close_quarantine() + except (Exception, Timeout), e: + self.logger.error(_( + 'ERROR DiskFile %(data_file)s in ' + '%(data_dir)s close failure: %(exc)s : %(stack)'), + {'exc': e, 'stack': ''.join(traceback.format_stack()), + 'data_file': self.data_file, 'data_dir': self.datadir}) + finally: + self.fp.close() + self.fp = None + + def is_deleted(self): + """ + Check if the file is deleted. + + :returns: True if the file doesn't exist or has been flagged as + deleted. + """ + return not self.data_file or 'deleted' in self.metadata + + def is_expired(self): + """ + Check if the file is expired. + + :returns: True if the file has an X-Delete-At in the past + """ + return ('X-Delete-At' in self.metadata and + int(self.metadata['X-Delete-At']) <= time.time()) + + @contextmanager + def writer(self, size=None): + """ + Context manager to write a file. We create a temporary file first, and + then return a DiskWriter object to encapsulate the state. + + :param size: optional initial size of file to explicitly allocate on + disk + :raises DiskFileNoSpace: if a size is specified and allocation fails + """ + if not os.path.exists(self.tmpdir): + mkdirs(self.tmpdir) + fd, tmppath = mkstemp(dir=self.tmpdir) + try: + if size is not None and size > 0: + try: + fallocate(fd, size) + except OSError: + raise DiskFileNoSpace() + yield DiskWriter(self, fd, tmppath, self.threadpool) + finally: + try: + os.close(fd) + except OSError: + pass + try: + os.unlink(tmppath) + except OSError: + pass + + def put_metadata(self, metadata, tombstone=False): + """ + Short hand for putting metadata to .meta and .ts files. + + :param metadata: dictionary of metadata to be written + :param tombstone: whether or not we are writing a tombstone + """ + extension = '.ts' if tombstone else '.meta' + with self.writer() as writer: + writer.put(metadata, extension=extension) + + def unlinkold(self, timestamp): + """ + Remove any older versions of the object file. 
Any file that has an + older timestamp than timestamp will be deleted. + + :param timestamp: timestamp to compare with each file + """ + timestamp = normalize_timestamp(timestamp) + + def _unlinkold(): + for fname in os.listdir(self.datadir): + if fname < timestamp: + try: + os.unlink(os.path.join(self.datadir, fname)) + except OSError, err: # pragma: no cover + if err.errno != errno.ENOENT: + raise + self.threadpool.run_in_thread(_unlinkold) + + def _drop_cache(self, fd, offset, length): + """Method for no-oping buffer cache drop method.""" + if not self.keep_cache: + drop_buffer_cache(fd, offset, length) + + def quarantine(self): + """ + In the case that a file is corrupted, move it to a quarantined + area to allow replication to fix it. + + :returns: if quarantine is successful, path to quarantined + directory otherwise None + """ + if not (self.is_deleted() or self.quarantined_dir): + self.quarantined_dir = self.threadpool.run_in_thread( + quarantine_renamer, self.device_path, self.data_file) + self.logger.increment('quarantines') + return self.quarantined_dir + + def get_data_file_size(self): + """ + Returns the os.path.getsize for the file. Raises an exception if this + file does not match the Content-Length stored in the metadata. Or if + self.data_file does not exist. + + :returns: file size as an int + :raises DiskFileError: on file size mismatch. + :raises DiskFileNotExist: on file not existing (including deleted) + """ + try: + file_size = 0 + if self.data_file: + file_size = self.threadpool.run_in_thread( + os.path.getsize, self.data_file) + if 'Content-Length' in self.metadata: + metadata_size = int(self.metadata['Content-Length']) + if file_size != metadata_size: + raise DiskFileError( + 'Content-Length of %s does not match file size ' + 'of %s' % (metadata_size, file_size)) + return file_size + except OSError, err: + if err.errno != errno.ENOENT: + raise + raise DiskFileNotExist('Data File does not exist.') diff --git a/swift/obj/server.py b/swift/obj/server.py index 63fa5cc21a..65389d8f69 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -17,23 +17,18 @@ from __future__ import with_statement import cPickle as pickle -import errno import os import time import traceback from collections import defaultdict from datetime import datetime from hashlib import md5 -from tempfile import mkstemp from urllib import unquote -from contextlib import contextmanager -from xattr import getxattr, setxattr from eventlet import sleep, Timeout from swift.common.utils import mkdirs, normalize_timestamp, public, \ - storage_directory, hash_path, renamer, fallocate, fsync, fdatasync, \ - split_path, drop_buffer_cache, get_logger, write_pickle, \ + hash_path, split_path, get_logger, write_pickle, \ config_true_value, validate_device_partition, timing_stats, \ ThreadPool, replication from swift.common.bufferedhttp import http_connect @@ -41,436 +36,23 @@ from swift.common.constraints import check_object_creation, check_mount, \ check_float, check_utf8 from swift.common.exceptions import ConnectionTimeout, DiskFileError, \ DiskFileNotExist, DiskFileCollision, DiskFileNoSpace -from swift.obj.base import invalidate_hash, \ - quarantine_renamer, get_hashes +from swift.obj.base import get_hashes from swift.common.http import is_success from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \ HTTPInternalServerError, HTTPNoContent, HTTPNotFound, HTTPNotModified, \ HTTPPreconditionFailed, HTTPRequestTimeout, HTTPUnprocessableEntity, \ HTTPClientDisconnect, 
HTTPMethodNotAllowed, Request, Response, UTC, \ - HTTPInsufficientStorage, HTTPForbidden, multi_range_iterator, \ - HeaderKeyDict + HTTPInsufficientStorage, HTTPForbidden, HeaderKeyDict +from swift.obj.diskfile import DiskFile DATADIR = 'objects' ASYNCDIR = 'async_pending' -PICKLE_PROTOCOL = 2 -METADATA_KEY = 'user.swift.metadata' MAX_OBJECT_NAME_LENGTH = 1024 # keep these lower-case DISALLOWED_HEADERS = set('content-length content-type deleted etag'.split()) -def read_metadata(fd): - """ - Helper function to read the pickled metadata from an object file. - - :param fd: file descriptor to load the metadata from - - :returns: dictionary of metadata - """ - metadata = '' - key = 0 - try: - while True: - metadata += getxattr(fd, '%s%s' % (METADATA_KEY, (key or ''))) - key += 1 - except IOError: - pass - return pickle.loads(metadata) - - -def write_metadata(fd, metadata): - """ - Helper function to write pickled metadata for an object file. - - :param fd: file descriptor to write the metadata - :param metadata: metadata to write - """ - metastr = pickle.dumps(metadata, PICKLE_PROTOCOL) - key = 0 - while metastr: - setxattr(fd, '%s%s' % (METADATA_KEY, key or ''), metastr[:254]) - metastr = metastr[254:] - key += 1 - - -class DiskWriter(object): - """ - Encapsulation of the write context for servicing PUT REST API - requests. Serves as the context manager object for DiskFile's writer() - method. - """ - def __init__(self, disk_file, fd, tmppath, threadpool): - self.disk_file = disk_file - self.fd = fd - self.tmppath = tmppath - self.upload_size = 0 - self.last_sync = 0 - self.threadpool = threadpool - - def write(self, chunk): - """ - Write a chunk of data into the temporary file. - - :param chunk: the chunk of data to write as a string object - """ - - def _write_entire_chunk(chunk): - while chunk: - written = os.write(self.fd, chunk) - self.upload_size += written - chunk = chunk[written:] - - self.threadpool.run_in_thread(_write_entire_chunk, chunk) - - # For large files sync every 512MB (by default) written - diff = self.upload_size - self.last_sync - if diff >= self.disk_file.bytes_per_sync: - self.threadpool.force_run_in_thread(fdatasync, self.fd) - drop_buffer_cache(self.fd, self.last_sync, diff) - self.last_sync = self.upload_size - - def put(self, metadata, extension='.data'): - """ - Finalize writing the file on disk, and renames it from the temp file - to the real location. This should be called after the data has been - written to the temp file. - - :param metadata: dictionary of metadata to be written - :param extension: extension to be used when making the file - """ - assert self.tmppath is not None - timestamp = normalize_timestamp(metadata['X-Timestamp']) - metadata['name'] = self.disk_file.name - - def finalize_put(): - # Write the metadata before calling fsync() so that both data and - # metadata are flushed to disk. - write_metadata(self.fd, metadata) - # We call fsync() before calling drop_cache() to lower the amount - # of redundant work the drop cache code will perform on the pages - # (now that after fsync the pages will be all clean). - fsync(self.fd) - # From the Department of the Redundancy Department, make sure - # we call drop_cache() after fsync() to avoid redundant work - # (pages all clean). - drop_buffer_cache(self.fd, 0, self.upload_size) - invalidate_hash(os.path.dirname(self.disk_file.datadir)) - # After the rename completes, this object will be available for - # other requests to reference. 
- renamer(self.tmppath, - os.path.join(self.disk_file.datadir, - timestamp + extension)) - - self.threadpool.force_run_in_thread(finalize_put) - self.disk_file.metadata = metadata - - -class DiskFile(object): - """ - Manage object files on disk. - - :param path: path to devices on the node - :param device: device name - :param partition: partition on the device the object lives in - :param account: account name for the object - :param container: container name for the object - :param obj: object name for the object - :param keep_data_fp: if True, don't close the fp, otherwise close it - :param disk_chunk_size: size of chunks on file reads - :param bytes_per_sync: number of bytes between fdatasync calls - :param iter_hook: called when __iter__ returns a chunk - :param threadpool: thread pool in which to do blocking operations - - :raises DiskFileCollision: on md5 collision - """ - - def __init__(self, path, device, partition, account, container, obj, - logger, keep_data_fp=False, disk_chunk_size=65536, - bytes_per_sync=(512 * 1024 * 1024), iter_hook=None, - threadpool=None): - self.disk_chunk_size = disk_chunk_size - self.bytes_per_sync = bytes_per_sync - self.iter_hook = iter_hook - self.name = '/' + '/'.join((account, container, obj)) - name_hash = hash_path(account, container, obj) - self.datadir = os.path.join( - path, device, storage_directory(DATADIR, partition, name_hash)) - self.device_path = os.path.join(path, device) - self.tmpdir = os.path.join(path, device, 'tmp') - self.logger = logger - self.metadata = {} - self.meta_file = None - self.data_file = None - self.fp = None - self.iter_etag = None - self.started_at_0 = False - self.read_to_eof = False - self.quarantined_dir = None - self.keep_cache = False - self.suppress_file_closing = False - self.threadpool = threadpool or ThreadPool(nthreads=0) - if not os.path.exists(self.datadir): - return - files = sorted(os.listdir(self.datadir), reverse=True) - for afile in files: - if afile.endswith('.ts'): - self.data_file = self.meta_file = None - self.metadata = {'deleted': True} - return - if afile.endswith('.meta') and not self.meta_file: - self.meta_file = os.path.join(self.datadir, afile) - if afile.endswith('.data') and not self.data_file: - self.data_file = os.path.join(self.datadir, afile) - break - if not self.data_file: - return - self.fp = open(self.data_file, 'rb') - self.metadata = read_metadata(self.fp) - if not keep_data_fp: - self.close(verify_file=False) - if self.meta_file: - with open(self.meta_file) as mfp: - for key in self.metadata.keys(): - if key.lower() not in DISALLOWED_HEADERS: - del self.metadata[key] - self.metadata.update(read_metadata(mfp)) - if 'name' in self.metadata: - if self.metadata['name'] != self.name: - self.logger.error(_('Client path %(client)s does not match ' - 'path stored in object metadata %(meta)s'), - {'client': self.name, - 'meta': self.metadata['name']}) - raise DiskFileCollision('Client path does not match path ' - 'stored in object metadata') - - def __iter__(self): - """Returns an iterator over the data file.""" - try: - dropped_cache = 0 - read = 0 - self.started_at_0 = False - self.read_to_eof = False - if self.fp.tell() == 0: - self.started_at_0 = True - self.iter_etag = md5() - while True: - chunk = self.threadpool.run_in_thread( - self.fp.read, self.disk_chunk_size) - if chunk: - if self.iter_etag: - self.iter_etag.update(chunk) - read += len(chunk) - if read - dropped_cache > (1024 * 1024): - self._drop_cache(self.fp.fileno(), dropped_cache, - read - dropped_cache) - 
dropped_cache = read - yield chunk - if self.iter_hook: - self.iter_hook() - else: - self.read_to_eof = True - self._drop_cache(self.fp.fileno(), dropped_cache, - read - dropped_cache) - break - finally: - if not self.suppress_file_closing: - self.close() - - def app_iter_range(self, start, stop): - """Returns an iterator over the data file for range (start, stop)""" - if start or start == 0: - self.fp.seek(start) - if stop is not None: - length = stop - start - else: - length = None - for chunk in self: - if length is not None: - length -= len(chunk) - if length < 0: - # Chop off the extra: - yield chunk[:length] - break - yield chunk - - def app_iter_ranges(self, ranges, content_type, boundary, size): - """Returns an iterator over the data file for a set of ranges""" - if not ranges: - yield '' - else: - try: - self.suppress_file_closing = True - for chunk in multi_range_iterator( - ranges, content_type, boundary, size, - self.app_iter_range): - yield chunk - finally: - self.suppress_file_closing = False - self.close() - - def _handle_close_quarantine(self): - """Check if file needs to be quarantined""" - try: - self.get_data_file_size() - except DiskFileError: - self.quarantine() - return - except DiskFileNotExist: - return - - if self.iter_etag and self.started_at_0 and self.read_to_eof and \ - 'ETag' in self.metadata and \ - self.iter_etag.hexdigest() != self.metadata.get('ETag'): - self.quarantine() - - def close(self, verify_file=True): - """ - Close the file. Will handle quarantining file if necessary. - - :param verify_file: Defaults to True. If false, will not check - file to see if it needs quarantining. - """ - if self.fp: - try: - if verify_file: - self._handle_close_quarantine() - except (Exception, Timeout), e: - self.logger.error(_( - 'ERROR DiskFile %(data_file)s in ' - '%(data_dir)s close failure: %(exc)s : %(stack)'), - {'exc': e, 'stack': ''.join(traceback.format_stack()), - 'data_file': self.data_file, 'data_dir': self.datadir}) - finally: - self.fp.close() - self.fp = None - - def is_deleted(self): - """ - Check if the file is deleted. - - :returns: True if the file doesn't exist or has been flagged as - deleted. - """ - return not self.data_file or 'deleted' in self.metadata - - def is_expired(self): - """ - Check if the file is expired. - - :returns: True if the file has an X-Delete-At in the past - """ - return ('X-Delete-At' in self.metadata and - int(self.metadata['X-Delete-At']) <= time.time()) - - @contextmanager - def writer(self, size=None): - """ - Context manager to write a file. We create a temporary file first, and - then return a DiskWriter object to encapsulate the state. - - :param size: optional initial size of file to explicitly allocate on - disk - :raises DiskFileNoSpace: if a size is specified and allocation fails - """ - if not os.path.exists(self.tmpdir): - mkdirs(self.tmpdir) - fd, tmppath = mkstemp(dir=self.tmpdir) - try: - if size is not None and size > 0: - try: - fallocate(fd, size) - except OSError: - raise DiskFileNoSpace() - yield DiskWriter(self, fd, tmppath, self.threadpool) - finally: - try: - os.close(fd) - except OSError: - pass - try: - os.unlink(tmppath) - except OSError: - pass - - def put_metadata(self, metadata, tombstone=False): - """ - Short hand for putting metadata to .meta and .ts files. 
- - :param metadata: dictionary of metadata to be written - :param tombstone: whether or not we are writing a tombstone - """ - extension = '.ts' if tombstone else '.meta' - with self.writer() as writer: - writer.put(metadata, extension=extension) - - def unlinkold(self, timestamp): - """ - Remove any older versions of the object file. Any file that has an - older timestamp than timestamp will be deleted. - - :param timestamp: timestamp to compare with each file - """ - timestamp = normalize_timestamp(timestamp) - - def _unlinkold(): - for fname in os.listdir(self.datadir): - if fname < timestamp: - try: - os.unlink(os.path.join(self.datadir, fname)) - except OSError, err: # pragma: no cover - if err.errno != errno.ENOENT: - raise - self.threadpool.run_in_thread(_unlinkold) - - def _drop_cache(self, fd, offset, length): - """Method for no-oping buffer cache drop method.""" - if not self.keep_cache: - drop_buffer_cache(fd, offset, length) - - def quarantine(self): - """ - In the case that a file is corrupted, move it to a quarantined - area to allow replication to fix it. - - :returns: if quarantine is successful, path to quarantined - directory otherwise None - """ - if not (self.is_deleted() or self.quarantined_dir): - self.quarantined_dir = self.threadpool.run_in_thread( - quarantine_renamer, self.device_path, self.data_file) - self.logger.increment('quarantines') - return self.quarantined_dir - - def get_data_file_size(self): - """ - Returns the os.path.getsize for the file. Raises an exception if this - file does not match the Content-Length stored in the metadata. Or if - self.data_file does not exist. - - :returns: file size as an int - :raises DiskFileError: on file size mismatch. - :raises DiskFileNotExist: on file not existing (including deleted) - """ - try: - file_size = 0 - if self.data_file: - file_size = self.threadpool.run_in_thread( - os.path.getsize, self.data_file) - if 'Content-Length' in self.metadata: - metadata_size = int(self.metadata['Content-Length']) - if file_size != metadata_size: - raise DiskFileError( - 'Content-Length of %s does not match file size ' - 'of %s' % (metadata_size, file_size)) - return file_size - except OSError, err: - if err.errno != errno.ENOENT: - raise - raise DiskFileNotExist('Data File does not exist.') - - class ObjectController(object): """Implements the WSGI application for the Swift Object Server.""" @@ -525,6 +107,8 @@ class ObjectController(object): kwargs.setdefault('disk_chunk_size', self.disk_chunk_size) kwargs.setdefault('logger', self.logger) kwargs.setdefault('threadpool', self.threadpools[device]) + kwargs.setdefault('obj_dir', DATADIR) + kwargs.setdefault('disallowed_metadata_keys', DISALLOWED_HEADERS) return DiskFile(self.devices, device, partition, account, container, obj, **kwargs) diff --git a/test/probe/test_object_failures.py b/test/probe/test_object_failures.py index 1ca2d04c12..7b5e7ed175 100755 --- a/test/probe/test_object_failures.py +++ b/test/probe/test_object_failures.py @@ -23,7 +23,7 @@ from swiftclient import client from swift.common import direct_client from swift.common.utils import hash_path, readconf -from swift.obj.server import write_metadata, read_metadata +from swift.obj.diskfile import write_metadata, read_metadata from test.probe.common import kill_servers, reset_environment diff --git a/test/unit/obj/test_auditor.py b/test/unit/obj/test_auditor.py index 762c2ded51..f5534bf3b9 100644 --- a/test/unit/obj/test_auditor.py +++ b/test/unit/obj/test_auditor.py @@ -23,8 +23,8 @@ from hashlib import md5 from 
tempfile import mkdtemp from test.unit import FakeLogger from swift.obj import auditor -from swift.obj import server as object_server -from swift.obj.server import DiskFile, write_metadata, DATADIR +from swift.obj.diskfile import DiskFile, write_metadata +from swift.obj.server import DATADIR from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \ storage_directory from swift.obj.base import invalidate_hash @@ -158,7 +158,7 @@ class TestAuditor(unittest.TestCase): 'Content-Length': str(os.fstat(writer.fd).st_size), } writer.put(metadata) - with mock.patch('swift.obj.server.DiskFile', + with mock.patch('swift.obj.diskfile.DiskFile', lambda *_: 1 / 0): self.auditor.audit_all_objects() self.assertEquals(self.auditor.errors, pre_errors + 1) @@ -338,16 +338,16 @@ class TestAuditor(unittest.TestCase): rat[0] = True DiskFile.close(self, verify_file=verify_file) self.setup_bad_zero_byte() - was_df = object_server.DiskFile + was_df = auditor.diskfile.DiskFile try: - object_server.DiskFile = FakeFile + auditor.diskfile.DiskFile = FakeFile self.auditor.run_once(zero_byte_fps=50) quarantine_path = os.path.join(self.devices, 'sda', 'quarantined', 'objects') self.assertTrue(os.path.isdir(quarantine_path)) self.assertTrue(rat[0]) finally: - object_server.DiskFile = was_df + auditor.diskfile.DiskFile = was_df def test_run_forever(self): diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py new file mode 100644 index 0000000000..2ad353adc4 --- /dev/null +++ b/test/unit/obj/test_diskfile.py @@ -0,0 +1,382 @@ +#-*- coding:utf-8 -*- +# Copyright (c) 2010-2012 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" Tests for swift.obj.diskfile """ + +import cPickle as pickle +import operator +import os +import mock +import unittest +import email +from shutil import rmtree +from StringIO import StringIO +from time import gmtime, strftime, time +from tempfile import mkdtemp +from hashlib import md5 + +from eventlet import sleep, spawn, wsgi, listen, Timeout, tpool +from test.unit import FakeLogger +from test.unit import _setxattr as setxattr +from test.unit import connect_tcp, readuntil2crlfs +from swift.obj import diskfile +from swift.common import utils +from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \ + NullLogger, storage_directory, public, \ + replication +from swift.common.exceptions import DiskFileNotExist +from swift.common import constraints +from eventlet import tpool +from swift.common.swob import Request, HeaderKeyDict + + +class TestDiskFile(unittest.TestCase): + """Test swift.obj.diskfile.DiskFile""" + + def setUp(self): + """ Set up for testing swift.obj.diskfile""" + self.testdir = os.path.join(mkdtemp(), 'tmp_test_obj_server_DiskFile') + mkdirs(os.path.join(self.testdir, 'sda1', 'tmp')) + self._orig_tpool_exc = tpool.execute + tpool.execute = lambda f, *args, **kwargs: f(*args, **kwargs) + + def tearDown(self): + """ Tear down for testing swift.obj.diskfile""" + rmtree(os.path.dirname(self.testdir)) + tpool.execute = self._orig_tpool_exc + + def _create_test_file(self, data, keep_data_fp=True): + df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', + FakeLogger()) + mkdirs(df.datadir) + f = open(os.path.join(df.datadir, + normalize_timestamp(time()) + '.data'), 'wb') + f.write(data) + setxattr(f.fileno(), diskfile.METADATA_KEY, + pickle.dumps({}, diskfile.PICKLE_PROTOCOL)) + f.close() + df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', + FakeLogger(), keep_data_fp=keep_data_fp) + return df + + def test_disk_file_app_iter_corners(self): + df = self._create_test_file('1234567890') + self.assertEquals(''.join(df.app_iter_range(0, None)), '1234567890') + + df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', + FakeLogger(), keep_data_fp=True) + self.assertEqual(''.join(df.app_iter_range(5, None)), '67890') + + def test_disk_file_app_iter_ranges(self): + df = self._create_test_file('012345678911234567892123456789') + it = df.app_iter_ranges([(0, 10), (10, 20), (20, 30)], 'plain/text', + '\r\n--someheader\r\n', 30) + value = ''.join(it) + self.assert_('0123456789' in value) + self.assert_('1123456789' in value) + self.assert_('2123456789' in value) + + def test_disk_file_app_iter_ranges_edges(self): + df = self._create_test_file('012345678911234567892123456789') + it = df.app_iter_ranges([(3, 10), (0, 2)], 'application/whatever', + '\r\n--someheader\r\n', 30) + value = ''.join(it) + self.assert_('3456789' in value) + self.assert_('01' in value) + + def test_disk_file_large_app_iter_ranges(self): + """ + This test case is to make sure that the disk file app_iter_ranges + method all the paths being tested. + """ + long_str = '01234567890' * 65536 + target_strs = ['3456789', long_str[0:65590]] + df = self._create_test_file(long_str) + + it = df.app_iter_ranges([(3, 10), (0, 65590)], 'plain/text', + '5e816ff8b8b8e9a5d355497e5d9e0301', 655360) + + """ + the produced string actually missing the MIME headers + need to add these headers to make it as real MIME message. + The body of the message is produced by method app_iter_ranges + off of DiskFile object. 
+ """ + header = ''.join(['Content-Type: multipart/byteranges;', + 'boundary=', + '5e816ff8b8b8e9a5d355497e5d9e0301\r\n']) + + value = header + ''.join(it) + + parts = map(lambda p: p.get_payload(decode=True), + email.message_from_string(value).walk())[1:3] + self.assertEqual(parts, target_strs) + + def test_disk_file_app_iter_ranges_empty(self): + """ + This test case tests when empty value passed into app_iter_ranges + When ranges passed into the method is either empty array or None, + this method will yield empty string + """ + df = self._create_test_file('012345678911234567892123456789') + it = df.app_iter_ranges([], 'application/whatever', + '\r\n--someheader\r\n', 100) + self.assertEqual(''.join(it), '') + + df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', + FakeLogger(), keep_data_fp=True) + it = df.app_iter_ranges(None, 'app/something', + '\r\n--someheader\r\n', 150) + self.assertEqual(''.join(it), '') + + def test_disk_file_mkstemp_creates_dir(self): + tmpdir = os.path.join(self.testdir, 'sda1', 'tmp') + os.rmdir(tmpdir) + with diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', + 'o', FakeLogger()).writer() as writer: + self.assert_(os.path.exists(tmpdir)) + + def test_iter_hook(self): + hook_call_count = [0] + def hook(): + hook_call_count[0] += 1 + + df = self._get_disk_file(fsize=65, csize=8, iter_hook=hook) + for _ in df: + pass + + self.assertEquals(hook_call_count[0], 9) + + def test_quarantine(self): + df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', + FakeLogger()) + mkdirs(df.datadir) + f = open(os.path.join(df.datadir, + normalize_timestamp(time()) + '.data'), 'wb') + setxattr(f.fileno(), diskfile.METADATA_KEY, + pickle.dumps({}, diskfile.PICKLE_PROTOCOL)) + df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', + FakeLogger()) + df.quarantine() + quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', + 'objects', os.path.basename(os.path.dirname( + df.data_file))) + self.assert_(os.path.isdir(quar_dir)) + + def test_quarantine_same_file(self): + df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', + FakeLogger()) + mkdirs(df.datadir) + f = open(os.path.join(df.datadir, + normalize_timestamp(time()) + '.data'), 'wb') + setxattr(f.fileno(), diskfile.METADATA_KEY, + pickle.dumps({}, diskfile.PICKLE_PROTOCOL)) + df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', + FakeLogger()) + new_dir = df.quarantine() + quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', + 'objects', os.path.basename(os.path.dirname( + df.data_file))) + self.assert_(os.path.isdir(quar_dir)) + self.assertEquals(quar_dir, new_dir) + # have to remake the datadir and file + mkdirs(df.datadir) + f = open(os.path.join(df.datadir, + normalize_timestamp(time()) + '.data'), 'wb') + setxattr(f.fileno(), diskfile.METADATA_KEY, + pickle.dumps({}, diskfile.PICKLE_PROTOCOL)) + + df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', + FakeLogger(), keep_data_fp=True) + double_uuid_path = df.quarantine() + self.assert_(os.path.isdir(double_uuid_path)) + self.assert_('-' in os.path.basename(double_uuid_path)) + + def _get_disk_file(self, invalid_type=None, obj_name='o', + fsize=1024, csize=8, mark_deleted=False, ts=None, + iter_hook=None): + '''returns a DiskFile''' + df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', + obj_name, FakeLogger()) + data = '0' * fsize + etag = md5() + if ts: + timestamp = ts + else: + timestamp = str(normalize_timestamp(time())) + with df.writer() as writer: + writer.write(data) + 
etag.update(data) + etag = etag.hexdigest() + metadata = { + 'ETag': etag, + 'X-Timestamp': timestamp, + 'Content-Length': str(os.fstat(writer.fd).st_size), + } + writer.put(metadata) + if invalid_type == 'ETag': + etag = md5() + etag.update('1' + '0' * (fsize - 1)) + etag = etag.hexdigest() + metadata['ETag'] = etag + diskfile.write_metadata(writer.fd, metadata) + if invalid_type == 'Content-Length': + metadata['Content-Length'] = fsize - 1 + diskfile.write_metadata(writer.fd, metadata) + + if mark_deleted: + metadata = { + 'X-Timestamp': timestamp, + 'deleted': True + } + df.put_metadata(metadata, tombstone=True) + + df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', + obj_name, FakeLogger(), + keep_data_fp=True, disk_chunk_size=csize, + iter_hook=iter_hook) + if invalid_type == 'Zero-Byte': + os.remove(df.data_file) + fp = open(df.data_file, 'w') + fp.close() + df.unit_test_len = fsize + return df + + def test_quarantine_valids(self): + df = self._get_disk_file(obj_name='1') + for chunk in df: + pass + self.assertFalse(df.quarantined_dir) + + df = self._get_disk_file(obj_name='2', csize=1) + for chunk in df: + pass + self.assertFalse(df.quarantined_dir) + + df = self._get_disk_file(obj_name='3', csize=100000) + for chunk in df: + pass + self.assertFalse(df.quarantined_dir) + + def run_quarantine_invalids(self, invalid_type): + df = self._get_disk_file(invalid_type=invalid_type, obj_name='1') + for chunk in df: + pass + self.assertTrue(df.quarantined_dir) + df = self._get_disk_file(invalid_type=invalid_type, + obj_name='2', csize=1) + for chunk in df: + pass + self.assertTrue(df.quarantined_dir) + df = self._get_disk_file(invalid_type=invalid_type, + obj_name='3', csize=100000) + for chunk in df: + pass + self.assertTrue(df.quarantined_dir) + df = self._get_disk_file(invalid_type=invalid_type, obj_name='4') + self.assertFalse(df.quarantined_dir) + df = self._get_disk_file(invalid_type=invalid_type, obj_name='5') + for chunk in df.app_iter_range(0, df.unit_test_len): + pass + self.assertTrue(df.quarantined_dir) + df = self._get_disk_file(invalid_type=invalid_type, obj_name='6') + for chunk in df.app_iter_range(0, df.unit_test_len + 100): + pass + self.assertTrue(df.quarantined_dir) + expected_quar = False + # for the following, Content-Length/Zero-Byte errors will always result + # in a quarantine, even if the whole file isn't check-summed + if invalid_type in ('Zero-Byte', 'Content-Length'): + expected_quar = True + df = self._get_disk_file(invalid_type=invalid_type, obj_name='7') + for chunk in df.app_iter_range(1, df.unit_test_len): + pass + self.assertEquals(bool(df.quarantined_dir), expected_quar) + df = self._get_disk_file(invalid_type=invalid_type, obj_name='8') + for chunk in df.app_iter_range(0, df.unit_test_len - 1): + pass + self.assertEquals(bool(df.quarantined_dir), expected_quar) + df = self._get_disk_file(invalid_type=invalid_type, obj_name='8') + for chunk in df.app_iter_range(1, df.unit_test_len + 1): + pass + self.assertEquals(bool(df.quarantined_dir), expected_quar) + + def test_quarantine_invalids(self): + self.run_quarantine_invalids('ETag') + self.run_quarantine_invalids('Content-Length') + self.run_quarantine_invalids('Zero-Byte') + + def test_quarantine_deleted_files(self): + df = self._get_disk_file(invalid_type='Content-Length') + df.close() + self.assertTrue(df.quarantined_dir) + df = self._get_disk_file(invalid_type='Content-Length', + mark_deleted=True) + df.close() + self.assertFalse(df.quarantined_dir) + df = 
self._get_disk_file(invalid_type='Content-Length', + mark_deleted=True) + self.assertRaises(DiskFileNotExist, df.get_data_file_size) + + def test_put_metadata(self): + df = self._get_disk_file() + ts = time() + metadata = { 'X-Timestamp': ts, 'X-Object-Meta-test': 'data' } + df.put_metadata(metadata) + exp_name = '%s.meta' % str(normalize_timestamp(ts)) + dl = os.listdir(df.datadir) + self.assertEquals(len(dl), 2) + self.assertTrue(exp_name in set(dl)) + + def test_put_metadata_ts(self): + df = self._get_disk_file() + ts = time() + metadata = { 'X-Timestamp': ts, 'X-Object-Meta-test': 'data' } + df.put_metadata(metadata, tombstone=True) + exp_name = '%s.ts' % str(normalize_timestamp(ts)) + dl = os.listdir(df.datadir) + self.assertEquals(len(dl), 2) + self.assertTrue(exp_name in set(dl)) + + def test_unlinkold(self): + df1 = self._get_disk_file() + future_time = str(normalize_timestamp(time() + 100)) + df2 = self._get_disk_file(ts=future_time) + self.assertEquals(len(os.listdir(df1.datadir)), 2) + df1.unlinkold(future_time) + self.assertEquals(len(os.listdir(df1.datadir)), 1) + self.assertEquals(os.listdir(df1.datadir)[0], "%s.data" % future_time) + + def test_close_error(self): + + def err(): + raise Exception("bad") + + df = self._get_disk_file(fsize=1024 * 1024 * 2) + df._handle_close_quarantine = err + for chunk in df: + pass + # close is called at the end of the iterator + self.assertEquals(df.fp, None) + self.assertEquals(len(df.logger.log_dict['error']), 1) + + def test_quarantine_twice(self): + df = self._get_disk_file(invalid_type='Content-Length') + self.assert_(os.path.isfile(df.data_file)) + quar_dir = df.quarantine() + self.assertFalse(os.path.isfile(df.data_file)) + self.assert_(os.path.isdir(quar_dir)) + self.assertEquals(df.quarantine(), None) diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index 425c9df428..77985de46b 100755 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -14,14 +14,13 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-""" Tests for swift.object_server """ +""" Tests for swift.obj.server """ import cPickle as pickle import operator import os import mock import unittest -import email from shutil import rmtree from StringIO import StringIO from time import gmtime, strftime, time @@ -30,361 +29,18 @@ from hashlib import md5 from eventlet import sleep, spawn, wsgi, listen, Timeout from test.unit import FakeLogger -from test.unit import _setxattr as setxattr from test.unit import connect_tcp, readuntil2crlfs from swift.obj import server as object_server +from swift.obj import diskfile from swift.common import utils from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \ NullLogger, storage_directory, public, \ replication -from swift.common.exceptions import DiskFileNotExist from swift.common import constraints from eventlet import tpool from swift.common.swob import Request, HeaderKeyDict -class TestDiskFile(unittest.TestCase): - """Test swift.obj.server.DiskFile""" - - def setUp(self): - """ Set up for testing swift.object_server.ObjectController """ - self.testdir = os.path.join(mkdtemp(), 'tmp_test_obj_server_DiskFile') - mkdirs(os.path.join(self.testdir, 'sda1', 'tmp')) - - self._real_tpool_execute = tpool.execute - def fake_exe(meth, *args, **kwargs): - return meth(*args, **kwargs) - tpool.execute = fake_exe - - def tearDown(self): - """ Tear down for testing swift.object_server.ObjectController """ - rmtree(os.path.dirname(self.testdir)) - tpool.execute = self._real_tpool_execute - - def _create_test_file(self, data, keep_data_fp=True): - df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', - FakeLogger()) - mkdirs(df.datadir) - f = open(os.path.join(df.datadir, - normalize_timestamp(time()) + '.data'), 'wb') - f.write(data) - setxattr(f.fileno(), object_server.METADATA_KEY, - pickle.dumps({}, object_server.PICKLE_PROTOCOL)) - f.close() - df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', - FakeLogger(), keep_data_fp=keep_data_fp) - return df - - def test_disk_file_app_iter_corners(self): - df = self._create_test_file('1234567890') - self.assertEquals(''.join(df.app_iter_range(0, None)), '1234567890') - - df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', - FakeLogger(), keep_data_fp=True) - self.assertEqual(''.join(df.app_iter_range(5, None)), '67890') - - def test_disk_file_app_iter_ranges(self): - df = self._create_test_file('012345678911234567892123456789') - it = df.app_iter_ranges([(0, 10), (10, 20), (20, 30)], 'plain/text', - '\r\n--someheader\r\n', 30) - value = ''.join(it) - self.assert_('0123456789' in value) - self.assert_('1123456789' in value) - self.assert_('2123456789' in value) - - def test_disk_file_app_iter_ranges_edges(self): - df = self._create_test_file('012345678911234567892123456789') - it = df.app_iter_ranges([(3, 10), (0, 2)], 'application/whatever', - '\r\n--someheader\r\n', 30) - value = ''.join(it) - self.assert_('3456789' in value) - self.assert_('01' in value) - - def test_disk_file_large_app_iter_ranges(self): - """ - This test case is to make sure that the disk file app_iter_ranges - method all the paths being tested. - """ - long_str = '01234567890' * 65536 - target_strs = ['3456789', long_str[0:65590]] - df = self._create_test_file(long_str) - - it = df.app_iter_ranges([(3, 10), (0, 65590)], 'plain/text', - '5e816ff8b8b8e9a5d355497e5d9e0301', 655360) - - """ - the produced string actually missing the MIME headers - need to add these headers to make it as real MIME message. 
- The body of the message is produced by method app_iter_ranges - off of DiskFile object. - """ - header = ''.join(['Content-Type: multipart/byteranges;', - 'boundary=', - '5e816ff8b8b8e9a5d355497e5d9e0301\r\n']) - - value = header + ''.join(it) - - parts = map(lambda p: p.get_payload(decode=True), - email.message_from_string(value).walk())[1:3] - self.assertEqual(parts, target_strs) - - def test_disk_file_app_iter_ranges_empty(self): - """ - This test case tests when empty value passed into app_iter_ranges - When ranges passed into the method is either empty array or None, - this method will yield empty string - """ - df = self._create_test_file('012345678911234567892123456789') - it = df.app_iter_ranges([], 'application/whatever', - '\r\n--someheader\r\n', 100) - self.assertEqual(''.join(it), '') - - df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', - FakeLogger(), keep_data_fp=True) - it = df.app_iter_ranges(None, 'app/something', - '\r\n--someheader\r\n', 150) - self.assertEqual(''.join(it), '') - - def test_disk_file_mkstemp_creates_dir(self): - tmpdir = os.path.join(self.testdir, 'sda1', 'tmp') - os.rmdir(tmpdir) - with object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', - 'o', FakeLogger()).writer() as writer: - self.assert_(os.path.exists(tmpdir)) - - def test_iter_hook(self): - hook_call_count = [0] - def hook(): - hook_call_count[0] += 1 - - df = self._get_disk_file(fsize=65, csize=8, iter_hook=hook) - for _ in df: - pass - - self.assertEquals(hook_call_count[0], 9) - - def test_quarantine(self): - df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', - FakeLogger()) - mkdirs(df.datadir) - f = open(os.path.join(df.datadir, - normalize_timestamp(time()) + '.data'), 'wb') - setxattr(f.fileno(), object_server.METADATA_KEY, - pickle.dumps({}, object_server.PICKLE_PROTOCOL)) - df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', - FakeLogger()) - df.quarantine() - quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', - 'objects', os.path.basename(os.path.dirname( - df.data_file))) - self.assert_(os.path.isdir(quar_dir)) - - def test_quarantine_same_file(self): - df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', - FakeLogger()) - mkdirs(df.datadir) - f = open(os.path.join(df.datadir, - normalize_timestamp(time()) + '.data'), 'wb') - setxattr(f.fileno(), object_server.METADATA_KEY, - pickle.dumps({}, object_server.PICKLE_PROTOCOL)) - df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', - FakeLogger()) - new_dir = df.quarantine() - quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', - 'objects', os.path.basename(os.path.dirname( - df.data_file))) - self.assert_(os.path.isdir(quar_dir)) - self.assertEquals(quar_dir, new_dir) - # have to remake the datadir and file - mkdirs(df.datadir) - f = open(os.path.join(df.datadir, - normalize_timestamp(time()) + '.data'), 'wb') - setxattr(f.fileno(), object_server.METADATA_KEY, - pickle.dumps({}, object_server.PICKLE_PROTOCOL)) - - df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', - FakeLogger(), keep_data_fp=True) - double_uuid_path = df.quarantine() - self.assert_(os.path.isdir(double_uuid_path)) - self.assert_('-' in os.path.basename(double_uuid_path)) - - def _get_disk_file(self, invalid_type=None, obj_name='o', - fsize=1024, csize=8, mark_deleted=False, ts=None, - iter_hook=None): - '''returns a DiskFile''' - df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', - obj_name, FakeLogger()) - data = '0' * 
fsize
-        etag = md5()
-        if ts:
-            timestamp = ts
-        else:
-            timestamp = str(normalize_timestamp(time()))
-        with df.writer() as writer:
-            writer.write(data)
-            etag.update(data)
-            etag = etag.hexdigest()
-            metadata = {
-                'ETag': etag,
-                'X-Timestamp': timestamp,
-                'Content-Length': str(os.fstat(writer.fd).st_size),
-            }
-            writer.put(metadata)
-            if invalid_type == 'ETag':
-                etag = md5()
-                etag.update('1' + '0' * (fsize - 1))
-                etag = etag.hexdigest()
-                metadata['ETag'] = etag
-                object_server.write_metadata(writer.fd, metadata)
-            if invalid_type == 'Content-Length':
-                metadata['Content-Length'] = fsize - 1
-                object_server.write_metadata(writer.fd, metadata)
-
-        if mark_deleted:
-            metadata = {
-                'X-Timestamp': timestamp,
-                'deleted': True
-            }
-            df.put_metadata(metadata, tombstone=True)
-
-        df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c',
-                                    obj_name, FakeLogger(),
-                                    keep_data_fp=True, disk_chunk_size=csize,
-                                    iter_hook=iter_hook)
-        if invalid_type == 'Zero-Byte':
-            os.remove(df.data_file)
-            fp = open(df.data_file, 'w')
-            fp.close()
-        df.unit_test_len = fsize
-        return df
-
-    def test_quarantine_valids(self):
-        df = self._get_disk_file(obj_name='1')
-        for chunk in df:
-            pass
-        self.assertFalse(df.quarantined_dir)
-
-        df = self._get_disk_file(obj_name='2', csize=1)
-        for chunk in df:
-            pass
-        self.assertFalse(df.quarantined_dir)
-
-        df = self._get_disk_file(obj_name='3', csize=100000)
-        for chunk in df:
-            pass
-        self.assertFalse(df.quarantined_dir)
-
-    def run_quarantine_invalids(self, invalid_type):
-        df = self._get_disk_file(invalid_type=invalid_type, obj_name='1')
-        for chunk in df:
-            pass
-        self.assertTrue(df.quarantined_dir)
-        df = self._get_disk_file(invalid_type=invalid_type,
-                                 obj_name='2', csize=1)
-        for chunk in df:
-            pass
-        self.assertTrue(df.quarantined_dir)
-        df = self._get_disk_file(invalid_type=invalid_type,
-                                 obj_name='3', csize=100000)
-        for chunk in df:
-            pass
-        self.assertTrue(df.quarantined_dir)
-        df = self._get_disk_file(invalid_type=invalid_type, obj_name='4')
-        self.assertFalse(df.quarantined_dir)
-        df = self._get_disk_file(invalid_type=invalid_type, obj_name='5')
-        for chunk in df.app_iter_range(0, df.unit_test_len):
-            pass
-        self.assertTrue(df.quarantined_dir)
-        df = self._get_disk_file(invalid_type=invalid_type, obj_name='6')
-        for chunk in df.app_iter_range(0, df.unit_test_len + 100):
-            pass
-        self.assertTrue(df.quarantined_dir)
-        expected_quar = False
-        # for the following, Content-Length/Zero-Byte errors will always result
-        # in a quarantine, even if the whole file isn't check-summed
-        if invalid_type in ('Zero-Byte', 'Content-Length'):
-            expected_quar = True
-        df = self._get_disk_file(invalid_type=invalid_type, obj_name='7')
-        for chunk in df.app_iter_range(1, df.unit_test_len):
-            pass
-        self.assertEquals(bool(df.quarantined_dir), expected_quar)
-        df = self._get_disk_file(invalid_type=invalid_type, obj_name='8')
-        for chunk in df.app_iter_range(0, df.unit_test_len - 1):
-            pass
-        self.assertEquals(bool(df.quarantined_dir), expected_quar)
-        df = self._get_disk_file(invalid_type=invalid_type, obj_name='8')
-        for chunk in df.app_iter_range(1, df.unit_test_len + 1):
-            pass
-        self.assertEquals(bool(df.quarantined_dir), expected_quar)
-
-    def test_quarantine_invalids(self):
-        self.run_quarantine_invalids('ETag')
-        self.run_quarantine_invalids('Content-Length')
-        self.run_quarantine_invalids('Zero-Byte')
-
-    def test_quarantine_deleted_files(self):
-        df = self._get_disk_file(invalid_type='Content-Length')
-        df.close()
-        self.assertTrue(df.quarantined_dir)
-        df = self._get_disk_file(invalid_type='Content-Length',
-                                 mark_deleted=True)
-        df.close()
-        self.assertFalse(df.quarantined_dir)
-        df = self._get_disk_file(invalid_type='Content-Length',
-                                 mark_deleted=True)
-        self.assertRaises(DiskFileNotExist, df.get_data_file_size)
-
-    def test_put_metadata(self):
-        df = self._get_disk_file()
-        ts = time()
-        metadata = { 'X-Timestamp': ts, 'X-Object-Meta-test': 'data' }
-        df.put_metadata(metadata)
-        exp_name = '%s.meta' % str(normalize_timestamp(ts))
-        dl = os.listdir(df.datadir)
-        self.assertEquals(len(dl), 2)
-        self.assertTrue(exp_name in set(dl))
-
-    def test_put_metadata_ts(self):
-        df = self._get_disk_file()
-        ts = time()
-        metadata = { 'X-Timestamp': ts, 'X-Object-Meta-test': 'data' }
-        df.put_metadata(metadata, tombstone=True)
-        exp_name = '%s.ts' % str(normalize_timestamp(ts))
-        dl = os.listdir(df.datadir)
-        self.assertEquals(len(dl), 2)
-        self.assertTrue(exp_name in set(dl))
-
-    def test_unlinkold(self):
-        df1 = self._get_disk_file()
-        future_time = str(normalize_timestamp(time() + 100))
-        df2 = self._get_disk_file(ts=future_time)
-        self.assertEquals(len(os.listdir(df1.datadir)), 2)
-        df1.unlinkold(future_time)
-        self.assertEquals(len(os.listdir(df1.datadir)), 1)
-        self.assertEquals(os.listdir(df1.datadir)[0], "%s.data" % future_time)
-
-    def test_close_error(self):
-
-        def err():
-            raise Exception("bad")
-
-        df = self._get_disk_file(fsize=1024 * 1024 * 2)
-        df._handle_close_quarantine = err
-        for chunk in df:
-            pass
-        # close is called at the end of the iterator
-        self.assertEquals(df.fp, None)
-        self.assertEquals(len(df.logger.log_dict['error']), 1)
-
-    def test_quarantine_twice(self):
-        df = self._get_disk_file(invalid_type='Content-Length')
-        self.assert_(os.path.isfile(df.data_file))
-        quar_dir = df.quarantine()
-        self.assertFalse(os.path.isfile(df.data_file))
-        self.assert_(os.path.isdir(quar_dir))
-        self.assertEquals(df.quarantine(), None)
-
-
 class TestObjectController(unittest.TestCase):
     """ Test swift.obj.server.ObjectController """
 
@@ -398,10 +54,13 @@ class TestObjectController(unittest.TestCase):
         conf = {'devices': self.testdir, 'mount_check': 'false'}
         self.object_controller = object_server.ObjectController(conf)
         self.object_controller.bytes_per_sync = 1
+        self._orig_tpool_exc = tpool.execute
+        tpool.execute = lambda f, *args, **kwargs: f(*args, **kwargs)
 
     def tearDown(self):
         """ Tear down for testing swift.object_server.ObjectController """
         rmtree(os.path.dirname(self.testdir))
+        tpool.execute = self._orig_tpool_exc
 
     def test_REQUEST_SPECIAL_CHARS(self):
         obj = 'special昆%20/%'
@@ -660,15 +319,15 @@ class TestObjectController(unittest.TestCase):
         req.body = 'VERIFY'
         resp = self.object_controller.PUT(req)
         self.assertEquals(resp.status_int, 201)
-        file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
+        file = diskfile.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
                                       FakeLogger(), keep_data_fp=True)
         file_name = os.path.basename(file.data_file)
         with open(file.data_file) as fp:
-            metadata = object_server.read_metadata(fp)
+            metadata = diskfile.read_metadata(fp)
         os.unlink(file.data_file)
         with open(file.data_file, 'w') as fp:
-            object_server.write_metadata(fp, metadata)
+            diskfile.write_metadata(fp, metadata)
         self.assertEquals(os.listdir(file.datadir)[0], file_name)
 
         req = Request.blank('/sda1/p/a/c/o',
@@ -742,7 +401,7 @@ class TestObjectController(unittest.TestCase):
                                timestamp + '.data')
         self.assert_(os.path.isfile(objfile))
         self.assertEquals(open(objfile).read(), 'VERIFY')
-        self.assertEquals(object_server.read_metadata(objfile),
+        self.assertEquals(diskfile.read_metadata(objfile),
                           {'X-Timestamp': timestamp,
                            'Content-Length': '6',
                            'ETag': '0b4c12d7e0a73840c1c4f148fda3b037',
@@ -772,7 +431,7 @@ class TestObjectController(unittest.TestCase):
                                timestamp + '.data')
         self.assert_(os.path.isfile(objfile))
         self.assertEquals(open(objfile).read(), 'VERIFY TWO')
-        self.assertEquals(object_server.read_metadata(objfile),
+        self.assertEquals(diskfile.read_metadata(objfile),
                           {'X-Timestamp': timestamp,
                            'Content-Length': '10',
                            'ETag': 'b381a4c5dab1eaa1eb9711fa647cd039',
@@ -814,7 +473,7 @@ class TestObjectController(unittest.TestCase):
                                timestamp + '.data')
         self.assert_(os.path.isfile(objfile))
         self.assertEquals(open(objfile).read(), 'VERIFY THREE')
-        self.assertEquals(object_server.read_metadata(objfile),
+        self.assertEquals(diskfile.read_metadata(objfile),
                           {'X-Timestamp': timestamp,
                            'Content-Length': '12',
                            'ETag': 'b114ab7b90d9ccac4bd5d99cc7ebb568',
@@ -964,15 +623,15 @@ class TestObjectController(unittest.TestCase):
         req.body = 'VERIFY'
         resp = self.object_controller.PUT(req)
         self.assertEquals(resp.status_int, 201)
-        file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
+        file = diskfile.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
                                       FakeLogger(), keep_data_fp=True)
         file_name = os.path.basename(file.data_file)
         with open(file.data_file) as fp:
-            metadata = object_server.read_metadata(fp)
+            metadata = diskfile.read_metadata(fp)
         os.unlink(file.data_file)
         with open(file.data_file, 'w') as fp:
-            object_server.write_metadata(fp, metadata)
+            diskfile.write_metadata(fp, metadata)
         self.assertEquals(os.listdir(file.datadir)[0], file_name)
 
         req = Request.blank('/sda1/p/a/c/o')
@@ -1253,7 +912,7 @@ class TestObjectController(unittest.TestCase):
         req.body = 'VERIFY'
         resp = self.object_controller.PUT(req)
         self.assertEquals(resp.status_int, 201)
-        file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
+        file = diskfile.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
                                       FakeLogger(), keep_data_fp=True)
         file_name = os.path.basename(file.data_file)
         etag = md5()
@@ -1261,7 +920,7 @@ class TestObjectController(unittest.TestCase):
         etag = etag.hexdigest()
         metadata = {'X-Timestamp': timestamp,
                     'Content-Length': 6, 'ETag': etag}
-        object_server.write_metadata(file.fp, metadata)
+        diskfile.write_metadata(file.fp, metadata)
         self.assertEquals(os.listdir(file.datadir)[0], file_name)
         req = Request.blank('/sda1/p/a/c/o')
         resp = self.object_controller.GET(req)
@@ -1284,14 +943,14 @@ class TestObjectController(unittest.TestCase):
         req.body = 'VERIFY'
        resp = self.object_controller.PUT(req)
         self.assertEquals(resp.status_int, 201)
-        file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
+        file = diskfile.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
                                       FakeLogger(), keep_data_fp=True)
         file_name = os.path.basename(file.data_file)
         with open(file.data_file) as fp:
-            metadata = object_server.read_metadata(fp)
+            metadata = diskfile.read_metadata(fp)
         os.unlink(file.data_file)
         with open(file.data_file, 'w') as fp:
-            object_server.write_metadata(fp, metadata)
+            diskfile.write_metadata(fp, metadata)
         self.assertEquals(os.listdir(file.datadir)[0], file_name)
 
         req = Request.blank('/sda1/p/a/c/o')
@@ -1311,7 +970,7 @@ class TestObjectController(unittest.TestCase):
         req.body = 'VERIFY'
         resp = self.object_controller.PUT(req)
         self.assertEquals(resp.status_int, 201)
-        file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
+        file = diskfile.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
                                       FakeLogger(), keep_data_fp=True)
         file_name = os.path.basename(file.data_file)
         etag = md5()
@@ -1319,7 +978,7 @@ class TestObjectController(unittest.TestCase):
         etag = etag.hexdigest()
         metadata = {'X-Timestamp': timestamp,
                     'Content-Length': 6, 'ETag': etag}
-        object_server.write_metadata(file.fp, metadata)
+        diskfile.write_metadata(file.fp, metadata)
         self.assertEquals(os.listdir(file.datadir)[0], file_name)
         req = Request.blank('/sda1/p/a/c/o')
         req.range = 'bytes=0-4' # partial
@@ -1485,10 +1144,10 @@ class TestObjectController(unittest.TestCase):
             return False
         def my_storage_directory(*args):
             return self.testdir+'/collide'
-        _storage_directory = object_server.storage_directory
+        _storage_directory = diskfile.storage_directory
         _check = object_server.check_object_creation
         try:
-            object_server.storage_directory = my_storage_directory
+            diskfile.storage_directory = my_storage_directory
             object_server.check_object_creation = my_check
             inbuf = StringIO()
             errbuf = StringIO()
@@ -1537,7 +1196,7 @@ class TestObjectController(unittest.TestCase):
             self.assertEquals(outbuf.getvalue()[:4], '403 ')
 
         finally:
-            object_server.storage_directory = _storage_directory
+            diskfile.storage_directory = _storage_directory
             object_server.check_object_creation = _check
 
     def test_invalid_method_doesnt_exist(self):
@@ -1742,7 +1401,7 @@ class TestObjectController(unittest.TestCase):
                                storage_directory(object_server.DATADIR, 'p',
                                hash_path('a', 'c', 'o')), timestamp + '.data')
         self.assert_(os.path.isfile(objfile))
-        self.assertEquals(object_server.read_metadata(objfile),
+        self.assertEquals(diskfile.read_metadata(objfile),
                           {'X-Timestamp': timestamp,
                            'Content-Length': '0', 'Content-Type': 'text/plain',
                            'name': '/a/c/o', 'X-Object-Manifest': 'c/o/', 'ETag':
@@ -2759,9 +2418,9 @@ class TestObjectController(unittest.TestCase):
         def fake_fallocate(fd, size):
             raise OSError(42, 'Unable to fallocate(%d)' % size)
 
-        orig_fallocate = object_server.fallocate
+        orig_fallocate = diskfile.fallocate
         try:
-            object_server.fallocate = fake_fallocate
+            diskfile.fallocate = fake_fallocate
             timestamp = normalize_timestamp(time())
             body_reader = IgnoredBody()
             req = Request.blank('/sda1/p/a/c/o',
@@ -2775,7 +2434,7 @@ class TestObjectController(unittest.TestCase):
             self.assertEquals(resp.status_int, 507)
             self.assertFalse(body_reader.read_called)
         finally:
-            object_server.fallocate = orig_fallocate
+            diskfile.fallocate = orig_fallocate
 
     def test_serv_reserv(self):
         """