diff --git a/swift/common/direct_client.py b/swift/common/direct_client.py
index 68334a3ed4..f9dad3384b 100644
--- a/swift/common/direct_client.py
+++ b/swift/common/direct_client.py
@@ -183,7 +183,7 @@ def direct_head_object(node, part, account, container, obj, conn_timeout=5,
 
 
 def direct_get_object(node, part, account, container, obj, conn_timeout=5,
-                      response_timeout=15, resp_chunk_size=None):
+                      response_timeout=15, resp_chunk_size=None, headers=None):
     """
     Get object directly from the object server.
 
@@ -195,13 +195,14 @@ def direct_get_object(node, part, account, container, obj, conn_timeout=5,
     :param conn_timeout: timeout in seconds for establishing the connection
     :param response_timeout: timeout in seconds for getting the response
     :param resp_chunk_size: if defined, chunk size of data to read.
+    :param headers: optional dict of headers to send with the request
     :returns: a tuple of (response headers, the object's contents) The response
               headers will be a dict and all header names will be lowercase.
     """
     path = '/%s/%s/%s' % (account, container, obj)
     with Timeout(conn_timeout):
         conn = http_connect(node['ip'], node['port'], node['device'], part,
-                'GET', path)
+                'GET', path, headers=(headers or {}))
     with Timeout(response_timeout):
         resp = conn.getresponse()
     if resp.status < 200 or resp.status >= 300:
diff --git a/swift/common/exceptions.py b/swift/common/exceptions.py
index 81f544bfae..30af79b722 100644
--- a/swift/common/exceptions.py
+++ b/swift/common/exceptions.py
@@ -30,6 +30,14 @@ class AuditException(Exception):
     pass
 
 
+class DiskFileError(Exception):
+    pass
+
+
+class DiskFileNotExist(Exception):
+    pass
+
+
 class AuthException(Exception):
     pass
 
diff --git a/swift/common/utils.py b/swift/common/utils.py
index c0b2994969..870350533f 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -567,7 +567,7 @@ def storage_directory(datadir, partition, hash):
     :param hash: Account, container or object hash
     :returns: Storage directory
     """
-    return os.path.join(datadir, partition, hash[-3:], hash)
+    return os.path.join(datadir, str(partition), hash[-3:], hash)
 
 
 def hash_path(account, container=None, object=None, raw_digest=False):
diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py
index 0a2b1884a5..567aa518b4 100644
--- a/swift/obj/auditor.py
+++ b/swift/obj/auditor.py
@@ -23,7 +23,8 @@ from swift.obj import server as object_server
 from swift.obj.replicator import invalidate_hash
 from swift.common.utils import get_logger, renamer, audit_location_generator, \
     ratelimit_sleep, TRUE_VALUES
-from swift.common.exceptions import AuditException
+from swift.common.exceptions import AuditException, DiskFileError, \
+    DiskFileNotExist
 from swift.common.daemon import Daemon
 
 SLEEP_BETWEEN_AUDITS = 30
@@ -119,40 +120,42 @@ class AuditorWorker(object):
             except Exception, exc:
                 raise AuditException('Error when reading metadata: %s' % exc)
             _junk, account, container, obj = name.split('/', 3)
-            df = object_server.DiskFile(self.devices, device,
-                                        partition, account,
-                                        container, obj,
+            df = object_server.DiskFile(self.devices, device, partition,
+                                        account, container, obj, self.logger,
                                         keep_data_fp=True)
             if df.data_file is None:
                 # file is deleted, we found the tombstone
                 return
-            obj_size = os.path.getsize(df.data_file)
-            if obj_size != int(df.metadata['Content-Length']):
-                raise AuditException('Content-Length of %s does not match '
-                    'file size of %s' % (int(df.metadata['Content-Length']),
-                                         os.path.getsize(df.data_file)))
+            try:
+                obj_size = df.get_data_file_size()
+            except DiskFileError, e:
+                raise AuditException(str(e))
+            except DiskFileNotExist:
+                return
             if self.zero_byte_only_at_fps and obj_size:
                 return
-            etag = md5()
             for chunk in df:
                 self.bytes_running_time = ratelimit_sleep(
                     self.bytes_running_time, self.max_bytes_per_second,
                     incr_by=len(chunk))
-                etag.update(chunk)
                 self.bytes_processed += len(chunk)
                 self.total_bytes_processed += len(chunk)
-            etag = etag.hexdigest()
-            if etag != df.metadata['ETag']:
-                raise AuditException("ETag of %s does not match file's md5 of "
-                    "%s" % (df.metadata['ETag'], etag))
+            df.close()
+            if df.quarantined_dir:
+                self.quarantines += 1
+                self.logger.error(
+                    _("ERROR Object %(path)s failed audit and will be "
+                      "quarantined: ETag and file's md5 do not match"),
+                    {'path': path})
         except AuditException, err:
             self.quarantines += 1
             self.logger.error(_('ERROR Object %(obj)s failed audit and will '
                 'be quarantined: %(err)s'), {'obj': path, 'err': err})
-            object_server.DiskFile.quarantine(
+            object_server.quarantine_renamer(
                 os.path.join(self.devices, device), path)
             return
         except Exception:
             self.errors += 1
             self.logger.exception(_('ERROR Trying to audit %s'), path)
             return
diff --git a/swift/obj/server.py b/swift/obj/server.py
index c6199f7545..6f8cdeaa01 100644
--- a/swift/obj/server.py
+++ b/swift/obj/server.py
@@ -42,7 +42,8 @@ from swift.common.utils import mkdirs, normalize_timestamp, \
 from swift.common.bufferedhttp import http_connect
 from swift.common.constraints import check_object_creation, check_mount, \
     check_float, check_utf8
-from swift.common.exceptions import ConnectionTimeout
+from swift.common.exceptions import ConnectionTimeout, DiskFileError, \
+    DiskFileNotExist
 from swift.obj.replicator import get_hashes, invalidate_hash, \
     recalculate_hashes
 
@@ -89,6 +90,32 @@ def write_metadata(fd, metadata):
         key += 1
 
 
+def quarantine_renamer(device_path, corrupted_file_path):
+    """
+    In the case that a file is corrupted, move it to a quarantined
+    area to allow replication to fix it.
+
+    :params device_path: The path to the device the corrupted file is on.
+    :params corrupted_file_path: The path to the file you want quarantined.
+
+    :returns: path (str) of directory the file was moved to
+    :raises OSError: re-raises non errno.EEXIST / errno.ENOTEMPTY
+                     exceptions from rename
+    """
+    from_dir = os.path.dirname(corrupted_file_path)
+    to_dir = os.path.join(device_path, 'quarantined',
+                          'objects', os.path.basename(from_dir))
+    invalidate_hash(os.path.dirname(from_dir))
+    try:
+        renamer(from_dir, to_dir)
+    except OSError, e:
+        if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
+            raise
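+        # the quarantine destination already exists; retry the rename with a
+        # unique uuid-suffixed directory name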
+        to_dir = "%s-%s" % (to_dir, uuid.uuid4().hex)
+        renamer(from_dir, to_dir)
+    return to_dir
+
+
 class DiskFile(object):
     """
     Manage object files on disk.
@@ -104,7 +131,8 @@ class DiskFile(object):
     """
 
     def __init__(self, path, device, partition, account, container, obj,
-                    keep_data_fp=False, disk_chunk_size=65536):
+                 logger, keep_data_fp=False, disk_chunk_size=65536):
         self.disk_chunk_size = disk_chunk_size
         self.name = '/' + '/'.join((account, container, obj))
         name_hash = hash_path(account, container, obj)
@@ -112,14 +140,20 @@ class DiskFile(object):
                     storage_directory(DATADIR, partition, name_hash))
         self.device_path = os.path.join(path, device)
         self.tmpdir = os.path.join(path, device, 'tmp')
+        self.logger = logger
         self.metadata = {}
         self.meta_file = None
         self.data_file = None
         self.fp = None
+        self.iter_etag = None
+        self.last_iter_pos = 0
+        self.read_to_eof = False
+        self.quarantined_dir = None
         self.keep_cache = False
         if not os.path.exists(self.datadir):
             return
         files = sorted(os.listdir(self.datadir), reverse=True)
         for file in files:
             if file.endswith('.ts'):
                 self.data_file = self.meta_file = None
@@ -135,7 +169,7 @@ class DiskFile(object):
         self.fp = open(self.data_file, 'rb')
         self.metadata = read_metadata(self.fp)
         if not keep_data_fp:
-            self.close()
+            self.close(verify_file=False)
         if self.meta_file:
             with open(self.meta_file) as mfp:
                 for key in self.metadata.keys():
@@ -149,9 +183,18 @@ class DiskFile(object):
         try:
             dropped_cache = 0
             read = 0
+            self.last_iter_pos = 0
+            self.iter_etag = md5()
             while True:
+                pre_read_pos = self.fp.tell()
                 chunk = self.fp.read(self.disk_chunk_size)
                 if chunk:
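+                    # fold the chunk into the running md5 only while reads
+                    # remain strictly sequential; a seek drops the etag below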
+                    if self.iter_etag and self.last_iter_pos == pre_read_pos:
+                        self.iter_etag.update(chunk)
+                        self.last_iter_pos += len(chunk)
+                    else:
+                        # file has not been read sequentially
+                        self.iter_etag = None
                     read += len(chunk)
                     if read - dropped_cache > (1024 * 1024):
                         self.drop_cache(self.fp.fileno(), dropped_cache,
@@ -159,6 +202,7 @@ class DiskFile(object):
                         dropped_cache = read
                     yield chunk
                 else:
+                    self.read_to_eof = True
                     self.drop_cache(self.fp.fileno(), dropped_cache,
                         read - dropped_cache)
                     break
@@ -182,11 +226,42 @@ class DiskFile(object):
                     break
             yield chunk
 
-    def close(self):
-        """Close the file."""
+    def _handle_close_quarantine(self):
+        """Check if file needs to be quarantined"""
+        obj_size = None
+        try:
+            obj_size = self.get_data_file_size()
+        except DiskFileError, e:
+            self.quarantine()
+            return
+        except DiskFileNotExist:
+            return
+
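+        # only quarantine on an ETag mismatch when the entire file was read
+        # sequentially to EOF, so the running md5 covers the whole object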
+        if (self.iter_etag and self.read_to_eof and self.metadata.get('ETag')
+            and obj_size == self.last_iter_pos and
+            self.iter_etag.hexdigest() != self.metadata['ETag']):
+                self.quarantine()
+
+    def close(self, verify_file=True):
+        """
+        Close the file.
+
+        :param verify_file: if True (the default), verify the file on close
+                            and quarantine it if it is found to be corrupted
+        """
         if self.fp:
-            self.fp.close()
-            self.fp = None
+            try:
+                if verify_file:
+                    self._handle_close_quarantine()
+            except Exception, e:
+                import traceback
+                self.logger.error(_('ERROR DiskFile %(data_file)s in '
+                     '%(data_dir)s close failure: %(exc)s : %(stack)s'),
+                     {'exc': e, 'stack': ''.join(traceback.format_stack()),
+                      'data_file': self.data_file, 'data_dir': self.datadir})
+            finally:
+                self.fp.close()
+                self.fp = None
 
     def is_deleted(self):
         """
@@ -257,30 +332,42 @@ class DiskFile(object):
         if not self.keep_cache:
             drop_buffer_cache(fd, offset, length)
 
-    @classmethod
-    def quarantine(cls, device_path, corrupted_file_path):
+    def quarantine(self):
         """
         In the case that a file is corrupted, move it to a quarantined
         area to allow replication to fix it.
-
-        :params device_path: The path to the device the corrupted file is on.
-        :params corrupted_file_path: The path to the file you want quarantined.
-
-        :returns: path (str) of directory the file was moved to
-        :raises OSError: re-raises non errno.EEXIST exceptions from rename
+
+        :returns: the path to the quarantined directory if the quarantine
+                  succeeded, otherwise None
+        """
+        if not (self.is_deleted() or self.quarantined_dir):
+            self.quarantined_dir = quarantine_renamer(self.device_path,
+                                                      self.data_file)
+            return self.quarantined_dir
+
+    def get_data_file_size(self):
+        """
+        Returns the os.path.getsize for the file.  Raises an exception if
+        the file size does not match the Content-Length stored in the
+        metadata, or if self.data_file does not exist.
+
+        :returns: file size as an int
+        :raises DiskFileError: on file size mismatch.
+        :raises DiskFileNotExist: on file not existing (including deleted)
         """
-        from_dir = os.path.dirname(corrupted_file_path)
-        to_dir = os.path.join(device_path, 'quarantined',
-                              'objects', os.path.basename(from_dir))
-        invalidate_hash(os.path.dirname(from_dir))
         try:
-            renamer(from_dir, to_dir)
-        except OSError, e:
-            if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
+            file_size = 0
+            if self.data_file:
+                file_size = int(os.path.getsize(self.data_file))
+                if 'Content-Length' in self.metadata:
+                    metadata_size = int(self.metadata['Content-Length'])
+                    if file_size != metadata_size:
+                        raise DiskFileError('Content-Length of %s does not '
+                          'match file size of %s' % (metadata_size, file_size))
+                return file_size
+        except OSError, err:
+            if err.errno != errno.ENOENT:
                 raise
-            to_dir = "%s-%s" % (to_dir, uuid.uuid4().hex)
-            renamer(from_dir, to_dir)
-        return to_dir
+        raise DiskFileNotExist('Data File does not exist.')
 
 
 class ObjectController(object):
@@ -370,13 +457,19 @@ class ObjectController(object):
         if self.mount_check and not check_mount(self.devices, device):
             return Response(status='507 %s is not mounted' % device)
         file = DiskFile(self.devices, device, partition, account, container,
-                        obj, disk_chunk_size=self.disk_chunk_size)
+                        obj, self.logger, disk_chunk_size=self.disk_chunk_size)
 
         if file.is_deleted():
             response_class = HTTPNotFound
         else:
             response_class = HTTPAccepted
 
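+        # verify the on-disk size against the stored Content-Length; a
+        # missing or mismatched .data file is quarantined and becomes a 404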
+        try:
+            file_size = file.get_data_file_size()
+        except (DiskFileError, DiskFileNotExist):
+            file.quarantine()
+            return HTTPNotFound(request=request)
+
         metadata = {'X-Timestamp': request.headers['x-timestamp']}
         metadata.update(val for val in request.headers.iteritems()
                 if val[0].lower().startswith('x-object-meta-'))
@@ -402,7 +495,7 @@ class ObjectController(object):
         if error_response:
             return error_response
         file = DiskFile(self.devices, device, partition, account, container,
-                        obj, disk_chunk_size=self.disk_chunk_size)
+                        obj, self.logger, disk_chunk_size=self.disk_chunk_size)
         upload_expiration = time.time() + self.max_upload_time
         etag = md5()
         upload_size = 0
@@ -470,12 +563,18 @@ class ObjectController(object):
         if self.mount_check and not check_mount(self.devices, device):
             return Response(status='507 %s is not mounted' % device)
         file = DiskFile(self.devices, device, partition, account, container,
-                obj, keep_data_fp=True, disk_chunk_size=self.disk_chunk_size)
+                        obj, self.logger, keep_data_fp=True,
+                        disk_chunk_size=self.disk_chunk_size)
         if file.is_deleted():
             if request.headers.get('if-match') == '*':
                 return HTTPPreconditionFailed(request=request)
             else:
                 return HTTPNotFound(request=request)
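+        # verify the on-disk size before serving; the verified size is also
+        # used below as the response Content-Length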
+        try:
+            file_size = file.get_data_file_size()
+        except (DiskFileError, DiskFileNotExist):
+            file.quarantine()
+            return HTTPNotFound(request=request)
         if request.headers.get('if-match') not in (None, '*') and \
                 file.metadata['ETag'] not in request.if_match:
             file.close()
@@ -515,7 +614,7 @@ class ObjectController(object):
                 response.headers[key] = value
         response.etag = file.metadata['ETag']
         response.last_modified = float(file.metadata['X-Timestamp'])
-        response.content_length = int(file.metadata['Content-Length'])
+        response.content_length = file_size
         if response.content_length < KEEP_CACHE_SIZE and \
                 'X-Auth-Token' not in request.headers and \
                 'X-Storage-Token' not in request.headers:
@@ -537,9 +636,14 @@ class ObjectController(object):
         if self.mount_check and not check_mount(self.devices, device):
             return Response(status='507 %s is not mounted' % device)
         file = DiskFile(self.devices, device, partition, account, container,
-                        obj, disk_chunk_size=self.disk_chunk_size)
+                        obj, self.logger, disk_chunk_size=self.disk_chunk_size)
         if file.is_deleted():
             return HTTPNotFound(request=request)
+        try:
+            file_size = file.get_data_file_size()
+        except (DiskFileError, DiskFileNotExist):
+            file.quarantine()
+            return HTTPNotFound(request=request)
         response = Response(content_type=file.metadata['Content-Type'],
                             request=request, conditional_response=True)
         for key, value in file.metadata.iteritems():
@@ -548,7 +652,7 @@ class ObjectController(object):
                 response.headers[key] = value
         response.etag = file.metadata['ETag']
         response.last_modified = float(file.metadata['X-Timestamp'])
-        response.content_length = int(file.metadata['Content-Length'])
+        response.content_length = file_size
         if 'Content-Encoding' in file.metadata:
             response.content_encoding = file.metadata['Content-Encoding']
         return response
@@ -569,7 +673,7 @@ class ObjectController(object):
             return Response(status='507 %s is not mounted' % device)
         response_class = HTTPNoContent
         file = DiskFile(self.devices, device, partition, account, container,
-                        obj, disk_chunk_size=self.disk_chunk_size)
+                        obj, self.logger, disk_chunk_size=self.disk_chunk_size)
         if file.is_deleted():
             response_class = HTTPNotFound
         metadata = {
diff --git a/test/probe/test_object_failures.py b/test/probe/test_object_failures.py
new file mode 100644
index 0000000000..d6dafbdfd1
--- /dev/null
+++ b/test/probe/test_object_failures.py
@@ -0,0 +1,173 @@
+#!/usr/bin/python -u
+# Copyright (c) 2010-2011 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import os
+from signal import SIGTERM
+from subprocess import call, Popen
+from time import sleep
+from uuid import uuid4
+
+from swift.common import client, direct_client
+from swift.common.utils import hash_path, readconf
+from swift.obj.server import write_metadata, read_metadata
+from test.probe.common import kill_pids, reset_environment
+from test.unit import FakeLogger
+
+
+class TestObjectFailures(unittest.TestCase):
+
+    def setUp(self):
+        self.pids, self.port2server, self.account_ring, self.container_ring, \
+            self.object_ring, self.url, self.token, self.account = \
+                reset_environment()
+
+    def tearDown(self):
+        kill_pids(self.pids)
+
+    def _get_data_file_path(self, obj_dir):
+        files = sorted(os.listdir(obj_dir), reverse=True)
+        for file in files:
+            return os.path.join(obj_dir, file)
+
+    def run_quarantine(self):
+        container = 'container-%s' % uuid4()
+        obj = 'object-%s' % uuid4()
+        client.put_container(self.url, self.token, container)
+        client.put_object(self.url, self.token, container, obj, 'VERIFY')
+        odata = client.get_object(self.url, self.token, container, obj)[-1]
+        self.assertEquals(odata, 'VERIFY')
+        opart, onodes = self.object_ring.get_nodes(
+            self.account, container, obj)
+        onode = onodes[0]
+        node_id = (onode['port'] - 6000) / 10
+        device = onode['device']
+        hash_str = hash_path(self.account, container, obj)
+        obj_server_conf = readconf('/etc/swift/object-server/%s.conf' %
+                                   node_id)
+        devices = obj_server_conf['app:object-server']['devices']
+        obj_dir = '%s/%s/objects/%s/%s/%s/' % (devices,
+                                               device, opart,
+                                               hash_str[-3:], hash_str)
+        data_file = self._get_data_file_path(obj_dir)
+
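+        # corrupt the stored ETag in the object's xattr metadata; the object
+        # server should quarantine the file once the GET below is fully read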
+        with open(data_file) as fp:
+            metadata = read_metadata(fp)
+        metadata['ETag'] = 'badetag'
+        with open(data_file) as fp:
+            write_metadata(fp, metadata)
+
+        odata = direct_client.direct_get_object(onode, opart,
+                                          self.account, container, obj)[-1]
+        self.assertEquals(odata, 'VERIFY')
+        try:
+            resp = direct_client.direct_get_object(onode, opart, self.account,
+                                                   container, obj)
+            raise "Did not quarantine object"
+        except client.ClientException, e:
+            self.assertEquals(e.http_status, 404)
+
+    def run_quarantine_range_etag(self):
+        container = 'container-range-%s' % uuid4()
+        obj = 'object-range-%s' % uuid4()
+        client.put_container(self.url, self.token, container)
+        client.put_object(self.url, self.token, container, obj, 'RANGE')
+        odata = client.get_object(self.url, self.token, container, obj)[-1]
+        self.assertEquals(odata, 'RANGE')
+        opart, onodes = self.object_ring.get_nodes(
+            self.account, container, obj)
+
+        onode = onodes[0]
+        node_id = (onode['port'] - 6000) / 10
+        device = onode['device']
+        hash_str = hash_path(self.account, container, obj)
+
+        obj_server_conf = readconf('/etc/swift/object-server/%s.conf' %
+                                   node_id)
+        devices = obj_server_conf['app:object-server']['devices']
+        obj_dir = '%s/%s/objects/%s/%s/%s/' % (devices,
+                                               device, opart,
+                                               hash_str[-3:], hash_str)
+        data_file = self._get_data_file_path(obj_dir)
+        with open(data_file) as fp:
+            metadata = read_metadata(fp)
+        metadata['ETag'] = 'badetag'
+        with open(data_file) as fp:
+            write_metadata(fp, metadata)
+        for header, result in [({'Range': 'bytes=0-2'}, 'RAN'),
+                               ({'Range': 'bytes=1-11'}, 'ANGE'),
+                               ({'Range': 'bytes=0-11'}, 'RANGE')]:
+            odata = direct_client.direct_get_object(onode, opart,
+                                              self.account, container, obj,
+                                              headers=header)[-1]
+
+            self.assertEquals(odata, result)
+
+        try:
+            resp = direct_client.direct_get_object(onode, opart, self.account,
+                                                   container, obj)
+            raise "Did not quarantine object"
+        except client.ClientException, e:
+            self.assertEquals(e.http_status, 404)
+
+    def run_quarantine_range_zero_byte(self):
+        container = 'container-zbyte-%s' % uuid4()
+        obj = 'object-zbyte-%s' % uuid4()
+        client.put_container(self.url, self.token, container)
+        client.put_object(self.url, self.token, container, obj, 'ZBYTE')
+        odata = client.get_object(self.url, self.token, container, obj)[-1]
+        self.assertEquals(odata, 'ZBYTE')
+        opart, onodes = self.object_ring.get_nodes(
+            self.account, container, obj)
+        onode = onodes[0]
+        node_id = (onode['port'] - 6000) / 10
+        device = onode['device']
+        hash_str = hash_path(self.account, container, obj)
+        obj_server_conf = readconf('/etc/swift/object-server/%s.conf' %
+                                   node_id)
+        devices = obj_server_conf['app:object-server']['devices']
+        obj_dir = '%s/%s/objects/%s/%s/%s/' % (devices,
+                                               device, opart,
+                                               hash_str[-3:], hash_str)
+        data_file = self._get_data_file_path(obj_dir)
+
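+        # replace the .data file with an empty one that still carries the
+        # original metadata, simulating a zero-byte corruption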
+        with open(data_file) as fp:
+            metadata = read_metadata(fp)
+        os.unlink(data_file)
+
+        with open(data_file, 'w') as fp:
+            write_metadata(fp, metadata)
+        try:
+            resp = direct_client.direct_get_object(onode, opart, self.account,
+                                                   container, obj,
+                                                   conn_timeout=1,
+                                                   response_timeout=1)
+            raise "Did not quarantine object"
+        except client.ClientException, e:
+            self.assertEquals(e.http_status, 404)
+
+    def test_runner(self):
+        self.run_quarantine()
+        self.run_quarantine_range_etag()
+        self.run_quarantine_range_zero_byte()
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/test/unit/__init__.py b/test/unit/__init__.py
index 005aabd3db..6b5b7b4d70 100644
--- a/test/unit/__init__.py
+++ b/test/unit/__init__.py
@@ -91,6 +91,22 @@ def temptree(files, contents=''):
         rmtree(tempdir)
 
 
+class FakeLogger(object):
+    # a thread-safe logger stub that records calls for test inspection
+
+    def __init__(self):
+        self.log_dict = dict(error=[], info=[], warning=[])
+
+    def error(self, *args, **kwargs):
+        self.log_dict['error'].append((args, kwargs))
+
+    def info(self, *args, **kwargs):
+        self.log_dict['info'].append((args, kwargs))
+
+    def warning(self, *args, **kwargs):
+        self.log_dict['warning'].append((args, kwargs))
+
+
 class MockTrue(object):
     """
     Instances of MockTrue evaluate like True
diff --git a/test/unit/common/middleware/test_ratelimit.py b/test/unit/common/middleware/test_ratelimit.py
index 4afefb0351..257033dc11 100644
--- a/test/unit/common/middleware/test_ratelimit.py
+++ b/test/unit/common/middleware/test_ratelimit.py
@@ -19,6 +19,7 @@ from contextlib import contextmanager
 from threading import Thread
 from webob import Request
 
+from test.unit import FakeLogger
 from swift.common.middleware import ratelimit
 from swift.proxy.server import get_container_memcache_key
 from swift.common.memcached import MemcacheConnectionError
@@ -96,19 +97,6 @@ class FakeApp(object):
         return ['204 No Content']
 
 
-class FakeLogger(object):
-    # a thread safe logger
-
-    def error(self, *args, **kwargs):
-        pass
-
-    def info(self, *args, **kwargs):
-        pass
-
-    def warning(self, *args, **kwargs):
-        pass
-
-
 def start_response(*args):
     pass
 
diff --git a/test/unit/obj/test_auditor.py b/test/unit/obj/test_auditor.py
index d50f6158d7..be895057d5 100644
--- a/test/unit/obj/test_auditor.py
+++ b/test/unit/obj/test_auditor.py
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# TODO: Tests
 from test import unit
 import unittest
 import tempfile
@@ -22,6 +21,7 @@ import time
 from shutil import rmtree
 from hashlib import md5
 from tempfile import mkdtemp
+from test.unit import FakeLogger
 from swift.obj import auditor
 from swift.obj import server as object_server
 from swift.obj.server import DiskFile, write_metadata, DATADIR
@@ -34,13 +34,11 @@ from swift.common.exceptions import AuditException
 class TestAuditor(unittest.TestCase):
 
     def setUp(self):
-        self.testdir = \
-            os.path.join(mkdtemp(), 'tmp_test_object_auditor')
+        self.testdir = os.path.join(mkdtemp(), 'tmp_test_object_auditor')
         self.devices = os.path.join(self.testdir, 'node')
+        self.logger = FakeLogger()
         rmtree(self.testdir, ignore_errors=1)
-        os.mkdir(self.testdir)
-        os.mkdir(self.devices)
-        os.mkdir(os.path.join(self.devices, 'sda'))
+        mkdirs(os.path.join(self.devices, 'sda'))
         self.objects = os.path.join(self.devices, 'sda', 'objects')
 
         os.mkdir(os.path.join(self.devices, 'sdb'))
@@ -55,6 +53,8 @@ class TestAuditor(unittest.TestCase):
         self.conf = dict(
             devices=self.devices,
             mount_check='false')
+        self.disk_file = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o',
+                                  self.logger)
 
     def tearDown(self):
         rmtree(os.path.dirname(self.testdir), ignore_errors=1)
@@ -62,11 +62,9 @@ class TestAuditor(unittest.TestCase):
 
     def test_object_audit_extra_data(self):
         self.auditor = auditor.AuditorWorker(self.conf)
-        cur_part = '0'
-        disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
         data = '0' * 1024
         etag = md5()
-        with disk_file.mkstemp() as (fd, tmppath):
+        with self.disk_file.mkstemp() as (fd, tmppath):
             os.write(fd, data)
             etag.update(data)
             etag = etag.hexdigest()
@@ -76,28 +74,26 @@ class TestAuditor(unittest.TestCase):
                 'X-Timestamp': timestamp,
                 'Content-Length': str(os.fstat(fd).st_size),
             }
-            disk_file.put(fd, tmppath, metadata)
+            self.disk_file.put(fd, tmppath, metadata)
             pre_quarantines = self.auditor.quarantines
 
             self.auditor.object_audit(
-                os.path.join(disk_file.datadir, timestamp + '.data'),
-                'sda', cur_part)
+                os.path.join(self.disk_file.datadir, timestamp + '.data'),
+                'sda', '0')
             self.assertEquals(self.auditor.quarantines, pre_quarantines)
 
             os.write(fd, 'extra_data')
             self.auditor.object_audit(
-                os.path.join(disk_file.datadir, timestamp + '.data'),
-                'sda', cur_part)
+                os.path.join(self.disk_file.datadir, timestamp + '.data'),
+                'sda', '0')
             self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
 
     def test_object_audit_diff_data(self):
         self.auditor = auditor.AuditorWorker(self.conf)
-        cur_part = '0'
-        disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
         data = '0' * 1024
         etag = md5()
         timestamp = str(normalize_timestamp(time.time()))
-        with disk_file.mkstemp() as (fd, tmppath):
+        with self.disk_file.mkstemp() as (fd, tmppath):
             os.write(fd, data)
             etag.update(data)
             etag = etag.hexdigest()
@@ -106,12 +102,15 @@ class TestAuditor(unittest.TestCase):
                 'X-Timestamp': timestamp,
                 'Content-Length': str(os.fstat(fd).st_size),
             }
-            disk_file.put(fd, tmppath, metadata)
+            self.disk_file.put(fd, tmppath, metadata)
             pre_quarantines = self.auditor.quarantines
+            # remake the DiskFile so it picks up the metadata just written
+            self.disk_file = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o',
+                                      self.logger)
 
             self.auditor.object_audit(
-                os.path.join(disk_file.datadir, timestamp + '.data'),
-                'sda', cur_part)
+                os.path.join(self.disk_file.datadir, timestamp + '.data'),
+                'sda', '0')
             self.assertEquals(self.auditor.quarantines, pre_quarantines)
             etag = md5()
             etag.update('1' + '0' * 1023)
@@ -120,25 +119,23 @@ class TestAuditor(unittest.TestCase):
             write_metadata(fd, metadata)
 
             self.auditor.object_audit(
-                os.path.join(disk_file.datadir, timestamp + '.data'),
-                'sda', cur_part)
+                os.path.join(self.disk_file.datadir, timestamp + '.data'),
+                'sda', '0')
             self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
 
     def test_object_audit_no_meta(self):
-        cur_part = '0'
-        disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
         timestamp = str(normalize_timestamp(time.time()))
-        path = os.path.join(disk_file.datadir, timestamp + '.data')
-        mkdirs(disk_file.datadir)
+        path = os.path.join(self.disk_file.datadir, timestamp + '.data')
+        mkdirs(self.disk_file.datadir)
         fp = open(path, 'w')
         fp.write('0' * 1024)
         fp.close()
-        invalidate_hash(os.path.dirname(disk_file.datadir))
+        invalidate_hash(os.path.dirname(self.disk_file.datadir))
         self.auditor = auditor.AuditorWorker(self.conf)
         pre_quarantines = self.auditor.quarantines
         self.auditor.object_audit(
-            os.path.join(disk_file.datadir, timestamp + '.data'),
-            'sda', cur_part)
+            os.path.join(self.disk_file.datadir, timestamp + '.data'),
+            'sda', '0')
         self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
 
     def test_object_audit_bad_args(self):
@@ -153,13 +150,11 @@ class TestAuditor(unittest.TestCase):
     def test_object_run_once_pass(self):
         self.auditor = auditor.AuditorWorker(self.conf)
         self.auditor.log_time = 0
-        cur_part = '0'
         timestamp = str(normalize_timestamp(time.time()))
         pre_quarantines = self.auditor.quarantines
-        disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
         data = '0' * 1024
         etag = md5()
-        with disk_file.mkstemp() as (fd, tmppath):
+        with self.disk_file.mkstemp() as (fd, tmppath):
             os.write(fd, data)
             etag.update(data)
             etag = etag.hexdigest()
@@ -168,20 +163,18 @@ class TestAuditor(unittest.TestCase):
                 'X-Timestamp': timestamp,
                 'Content-Length': str(os.fstat(fd).st_size),
             }
-            disk_file.put(fd, tmppath, metadata)
-            disk_file.close()
+            self.disk_file.put(fd, tmppath, metadata)
+            self.disk_file.close()
         self.auditor.audit_all_objects()
         self.assertEquals(self.auditor.quarantines, pre_quarantines)
 
     def test_object_run_once_no_sda(self):
         self.auditor = auditor.AuditorWorker(self.conf)
-        cur_part = '0'
         timestamp = str(normalize_timestamp(time.time()))
         pre_quarantines = self.auditor.quarantines
-        disk_file = DiskFile(self.devices, 'sdb', cur_part, 'a', 'c', 'o')
         data = '0' * 1024
         etag = md5()
-        with disk_file.mkstemp() as (fd, tmppath):
+        with self.disk_file.mkstemp() as (fd, tmppath):
             os.write(fd, data)
             etag.update(data)
             etag = etag.hexdigest()
@@ -190,21 +183,19 @@ class TestAuditor(unittest.TestCase):
                 'X-Timestamp': timestamp,
                 'Content-Length': str(os.fstat(fd).st_size),
             }
-            disk_file.put(fd, tmppath, metadata)
-            disk_file.close()
+            self.disk_file.put(fd, tmppath, metadata)
+            self.disk_file.close()
             os.write(fd, 'extra_data')
         self.auditor.audit_all_objects()
         self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
 
     def test_object_run_once_multi_devices(self):
         self.auditor = auditor.AuditorWorker(self.conf)
-        cur_part = '0'
         timestamp = str(normalize_timestamp(time.time()))
         pre_quarantines = self.auditor.quarantines
-        disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
         data = '0' * 10
         etag = md5()
-        with disk_file.mkstemp() as (fd, tmppath):
+        with self.disk_file.mkstemp() as (fd, tmppath):
             os.write(fd, data)
             etag.update(data)
             etag = etag.hexdigest()
@@ -213,13 +204,14 @@ class TestAuditor(unittest.TestCase):
                 'X-Timestamp': timestamp,
                 'Content-Length': str(os.fstat(fd).st_size),
             }
-            disk_file.put(fd, tmppath, metadata)
-            disk_file.close()
+            self.disk_file.put(fd, tmppath, metadata)
+            self.disk_file.close()
         self.auditor.audit_all_objects()
-        disk_file = DiskFile(self.devices, 'sdb', cur_part, 'a', 'c', 'ob')
+        self.disk_file = DiskFile(self.devices, 'sdb', '0', 'a', 'c',
+                                  'ob', self.logger)
         data = '1' * 10
         etag = md5()
-        with disk_file.mkstemp() as (fd, tmppath):
+        with self.disk_file.mkstemp() as (fd, tmppath):
             os.write(fd, data)
             etag.update(data)
             etag = etag.hexdigest()
@@ -228,8 +220,8 @@ class TestAuditor(unittest.TestCase):
                 'X-Timestamp': timestamp,
                 'Content-Length': str(os.fstat(fd).st_size),
             }
-            disk_file.put(fd, tmppath, metadata)
-            disk_file.close()
+            self.disk_file.put(fd, tmppath, metadata)
+            self.disk_file.close()
             os.write(fd, 'extra_data')
         self.auditor.audit_all_objects()
         self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
@@ -237,11 +229,9 @@ class TestAuditor(unittest.TestCase):
     def test_object_run_fast_track_non_zero(self):
         self.auditor = auditor.ObjectAuditor(self.conf)
         self.auditor.log_time = 0
-        cur_part = '0'
-        disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
         data = '0' * 1024
         etag = md5()
-        with disk_file.mkstemp() as (fd, tmppath):
+        with self.disk_file.mkstemp() as (fd, tmppath):
             os.write(fd, data)
             etag.update(data)
             etag = etag.hexdigest()
@@ -250,7 +240,7 @@ class TestAuditor(unittest.TestCase):
                 'X-Timestamp': str(normalize_timestamp(time.time())),
                 'Content-Length': str(os.fstat(fd).st_size),
             }
-            disk_file.put(fd, tmppath, metadata)
+            self.disk_file.put(fd, tmppath, metadata)
             etag = md5()
             etag.update('1' + '0' * 1023)
             etag = etag.hexdigest()
@@ -267,36 +257,33 @@ class TestAuditor(unittest.TestCase):
     def setup_bad_zero_byte(self, with_ts=False):
         self.auditor = auditor.ObjectAuditor(self.conf)
         self.auditor.log_time = 0
-        cur_part = '0'
         ts_file_path = ''
         if with_ts:
 
             name_hash = hash_path('a', 'c', 'o')
             dir_path = os.path.join(self.devices, 'sda',
-                               storage_directory(DATADIR, cur_part, name_hash))
+                               storage_directory(DATADIR, '0', name_hash))
             ts_file_path = os.path.join(dir_path, '99999.ts')
             if not os.path.exists(dir_path):
                 mkdirs(dir_path)
             fp = open(ts_file_path, 'w')
             fp.close()
 
-
-        disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
         etag = md5()
-        with disk_file.mkstemp() as (fd, tmppath):
+        with self.disk_file.mkstemp() as (fd, tmppath):
             etag = etag.hexdigest()
             metadata = {
                 'ETag': etag,
                 'X-Timestamp': str(normalize_timestamp(time.time())),
                 'Content-Length': 10,
             }
-            disk_file.put(fd, tmppath, metadata)
+            self.disk_file.put(fd, tmppath, metadata)
             etag = md5()
             etag = etag.hexdigest()
             metadata['ETag'] = etag
             write_metadata(fd, metadata)
-        if disk_file.data_file:
-            return disk_file.data_file
+        if self.disk_file.data_file:
+            return self.disk_file.data_file
         return ts_file_path
 
     def test_object_run_fast_track_all(self):
diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py
index 9558a2b0d8..018c2aa44f 100644
--- a/test/unit/obj/test_replicator.py
+++ b/test/unit/obj/test_replicator.py
@@ -26,8 +26,8 @@ import time
 import tempfile
 from contextlib import contextmanager
 from eventlet import tpool
-
 from eventlet.green import subprocess
+from test.unit import FakeLogger
 from swift.common import utils
 from swift.common.utils import hash_path, mkdirs, normalize_timestamp
 from swift.common import ring
@@ -164,7 +164,8 @@ class TestObjectReplicator(unittest.TestCase):
         was_connector = object_replicator.http_connect
         object_replicator.http_connect = mock_http_connect(200)
         cur_part = '0'
-        df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
+        df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o',
+                      FakeLogger())
         mkdirs(df.datadir)
         f = open(os.path.join(df.datadir,
                               normalize_timestamp(time.time()) + '.data'),
@@ -189,7 +190,7 @@ class TestObjectReplicator(unittest.TestCase):
         object_replicator.http_connect = was_connector
 
     def test_hash_suffix_one_file(self):
-        df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
+        df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())
         mkdirs(df.datadir)
         f = open(os.path.join(df.datadir,
                      normalize_timestamp(time.time() - 100) + '.ts'),
@@ -206,7 +207,7 @@ class TestObjectReplicator(unittest.TestCase):
         self.assertEquals(len(os.listdir(self.parts['0'])), 0)
 
     def test_hash_suffix_multi_file_one(self):
-        df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
+        df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())
         mkdirs(df.datadir)
         for tdiff in [1, 50, 100, 500]:
             for suff in ['.meta', '.data', '.ts']:
@@ -227,7 +228,7 @@ class TestObjectReplicator(unittest.TestCase):
         self.assertEquals(len(os.listdir(whole_hsh_path)), 1)
 
     def test_hash_suffix_multi_file_two(self):
-        df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
+        df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())
         mkdirs(df.datadir)
         for tdiff in [1, 50, 100, 500]:
             suffs = ['.meta', '.data']
@@ -257,7 +258,7 @@ class TestObjectReplicator(unittest.TestCase):
                 fdata = fp.read()
                 self.assertEquals(fdata, data)
 
-        df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
+        df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())
         mkdirs(df.datadir)
         ohash = hash_path('a', 'c', 'o')
         data_dir = ohash[-3:]
@@ -312,7 +313,7 @@ class TestObjectReplicator(unittest.TestCase):
                               os.path.join(self.objects, part))
 
     def test_delete_partition(self):
-        df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
+        df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())
         mkdirs(df.datadir)
         ohash = hash_path('a', 'c', 'o')
         data_dir = ohash[-3:]
@@ -330,7 +331,8 @@ class TestObjectReplicator(unittest.TestCase):
         # Write some files into '1' and run replicate- they should be moved
         # to the other partitoins and then node should get deleted.
         cur_part = '1'
-        df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
+        df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o',
+                      FakeLogger())
         mkdirs(df.datadir)
         f = open(os.path.join(df.datadir,
                               normalize_timestamp(time.time()) + '.data'),
diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py
index 7c9f097455..ee92559f87 100644
--- a/test/unit/obj/test_server.py
+++ b/test/unit/obj/test_server.py
@@ -18,22 +18,25 @@
 import cPickle as pickle
 import os
 import sys
+import shutil
 import unittest
 from nose import SkipTest
 from shutil import rmtree
 from StringIO import StringIO
 from time import gmtime, sleep, strftime, time
 from tempfile import mkdtemp
+from hashlib import md5
 
 from eventlet import sleep, spawn, wsgi, listen
 from webob import Request
+from test.unit import FakeLogger
 from test.unit import _getxattr as getxattr
 from test.unit import _setxattr as setxattr
-
 from test.unit import connect_tcp, readuntil2crlfs
 from swift.obj import server as object_server
 from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
                                NullLogger, storage_directory
+from swift.common.exceptions import DiskFileNotExist
 
 
 class TestDiskFile(unittest.TestCase):
@@ -49,7 +52,8 @@ class TestDiskFile(unittest.TestCase):
         rmtree(os.path.dirname(self.testdir))
 
     def test_disk_file_app_iter_corners(self):
-        df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
+        df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
+                                    FakeLogger())
         mkdirs(df.datadir)
         f = open(os.path.join(df.datadir,
                               normalize_timestamp(time()) + '.data'), 'wb')
@@ -58,7 +62,7 @@ class TestDiskFile(unittest.TestCase):
                  pickle.dumps({}, object_server.PICKLE_PROTOCOL))
         f.close()
         df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
-                                    keep_data_fp=True)
+                                    FakeLogger(), keep_data_fp=True)
         it = df.app_iter_range(0, None)
         sio = StringIO()
         for chunk in it:
@@ -66,7 +70,7 @@ class TestDiskFile(unittest.TestCase):
         self.assertEquals(sio.getvalue(), '1234567890')
 
         df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
-                                    keep_data_fp=True)
+                                    FakeLogger(), keep_data_fp=True)
         it = df.app_iter_range(5, None)
         sio = StringIO()
         for chunk in it:
@@ -77,47 +81,201 @@ class TestDiskFile(unittest.TestCase):
         tmpdir = os.path.join(self.testdir, 'sda1', 'tmp')
         os.rmdir(tmpdir)
         with object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c',
-                'o').mkstemp():
+                                    'o', FakeLogger()).mkstemp():
             self.assert_(os.path.exists(tmpdir))
 
     def test_quarantine(self):
-        df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
+        df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
+                                    FakeLogger())
         mkdirs(df.datadir)
         f = open(os.path.join(df.datadir,
                               normalize_timestamp(time()) + '.data'), 'wb')
         setxattr(f.fileno(), object_server.METADATA_KEY,
                  pickle.dumps({}, object_server.PICKLE_PROTOCOL))
-        df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
-        object_server.DiskFile.quarantine(df.device_path, df.data_file)
+        df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
+                                    FakeLogger())
+        df.quarantine()
         quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined',
                                 'objects', os.path.basename(os.path.dirname(
                                                             df.data_file)))
         self.assert_(os.path.isdir(quar_dir))
 
-    def test_quarantine_double(self):
-        df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
+    def test_quarantine_same_file(self):
+        df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
+                                    FakeLogger())
         mkdirs(df.datadir)
         f = open(os.path.join(df.datadir,
                               normalize_timestamp(time()) + '.data'), 'wb')
         setxattr(f.fileno(), object_server.METADATA_KEY,
                  pickle.dumps({}, object_server.PICKLE_PROTOCOL))
-        df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
-        new_dir = object_server.DiskFile.quarantine(df.device_path,
-                                                    df.data_file)
+        df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
+                                    FakeLogger())
+        new_dir = df.quarantine()
         quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined',
                                 'objects', os.path.basename(os.path.dirname(
                                                             df.data_file)))
         self.assert_(os.path.isdir(quar_dir))
         self.assertEquals(quar_dir, new_dir)
-        # have to remake the datadir
+        # have to remake the datadir and file
         mkdirs(df.datadir)
-        double_uuid_path = df.quarantine(df.device_path, df.data_file)
+        f = open(os.path.join(df.datadir,
+                              normalize_timestamp(time()) + '.data'), 'wb')
+        setxattr(f.fileno(), object_server.METADATA_KEY,
+                 pickle.dumps({}, object_server.PICKLE_PROTOCOL))
+
+        df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
+                                    FakeLogger(), keep_data_fp=True)
+        double_uuid_path = df.quarantine()
         self.assert_(os.path.isdir(double_uuid_path))
         self.assert_('-' in os.path.basename(double_uuid_path))
 
+    def _get_data_file(self, invalid_type=None, obj_name='o',
+                       fsize=1024, csize=8, extension='.data', ts=None):
+        '''returns a DiskFile, optionally corrupted per invalid_type'''
+        df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c',
+                                    obj_name, FakeLogger())
+        data = '0' * fsize
+        etag = md5()
+        if ts:
+            timestamp = ts
+        else:
+            timestamp = str(normalize_timestamp(time()))
+        with df.mkstemp() as (fd, tmppath):
+            os.write(fd, data)
+            etag.update(data)
+            etag = etag.hexdigest()
+            metadata = {
+                'ETag': etag,
+                'X-Timestamp': timestamp,
+                'Content-Length': str(os.fstat(fd).st_size),
+            }
+            df.put(fd, tmppath, metadata, extension=extension)
+            if invalid_type == 'ETag':
+                etag = md5()
+                etag.update('1' + '0' * (fsize-1))
+                etag = etag.hexdigest()
+                metadata['ETag'] = etag
+                object_server.write_metadata(fd, metadata)
+            if invalid_type == 'Content-Length':
+                metadata['Content-Length'] = fsize-1
+                object_server.write_metadata(fd, metadata)
+
+        df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c',
+                                    obj_name, FakeLogger(),
+                                    keep_data_fp=True, disk_chunk_size=csize)
+        if invalid_type == 'Zero-Byte':
+            os.remove(df.data_file)
+            fp = open(df.data_file, 'w')
+            fp.close()
+        df.unit_test_len = fsize
+        return df
+
+    def test_quarantine_valids(self):
+        df = self._get_data_file(obj_name='1')
+        for chunk in df:
+            pass
+        self.assertFalse(df.quarantined_dir)
+
+        df = self._get_data_file(obj_name='2', csize=1)
+        for chunk in df:
+            pass
+        self.assertFalse(df.quarantined_dir)
+
+        df = self._get_data_file(obj_name='3', csize=100000)
+        for chunk in df:
+            pass
+        self.assertFalse(df.quarantined_dir)
+
+    def run_quarantine_invalids(self, invalid_type):
+        df = self._get_data_file(invalid_type=invalid_type, obj_name='1')
+        for chunk in df:
+            pass
+        self.assertTrue(df.quarantined_dir)
+        df = self._get_data_file(invalid_type=invalid_type,
+                                 obj_name='2', csize=1)
+        for chunk in df:
+            pass
+        self.assertTrue(df.quarantined_dir)
+        df = self._get_data_file(invalid_type=invalid_type,
+                                 obj_name='3', csize=100000)
+        for chunk in df:
+            pass
+        self.assertTrue(df.quarantined_dir)
+        df = self._get_data_file(invalid_type=invalid_type, obj_name='4')
+        self.assertFalse(df.quarantined_dir)
+        df = self._get_data_file(invalid_type=invalid_type, obj_name='5')
+        for chunk in df.app_iter_range(0, df.unit_test_len):
+            pass
+        self.assertTrue(df.quarantined_dir)
+        df = self._get_data_file(invalid_type=invalid_type, obj_name='6')
+        for chunk in df.app_iter_range(0, df.unit_test_len + 100):
+            pass
+        self.assertTrue(df.quarantined_dir)
+        expected_quar = False
+        # for the following, Content-Length/Zero-Byte errors will always
+        # result in a quarantine, even if the whole file isn't check-summed
+        if invalid_type in ('Zero-Byte', 'Content-Length'):
+            expected_quar = True
+        df = self._get_data_file(invalid_type=invalid_type, obj_name='7')
+        for chunk in df.app_iter_range(1, df.unit_test_len):
+            pass
+        self.assertEquals(bool(df.quarantined_dir), expected_quar)
+        df = self._get_data_file(invalid_type=invalid_type, obj_name='8')
+        for chunk in df.app_iter_range(0, df.unit_test_len - 1):
+            pass
+        self.assertEquals(bool(df.quarantined_dir), expected_quar)
+        df = self._get_data_file(invalid_type=invalid_type, obj_name='8')
+        for chunk in df.app_iter_range(1, df.unit_test_len + 1):
+            pass
+        self.assertEquals(bool(df.quarantined_dir), expected_quar)
+
+    def test_quarantine_invalids(self):
+        self.run_quarantine_invalids('ETag')
+        self.run_quarantine_invalids('Content-Length')
+        self.run_quarantine_invalids('Zero-Byte')
+
+    def test_quarantine_deleted_files(self):
+        df = self._get_data_file(invalid_type='Content-Length',
+                                 extension='.data')
+        df.close()
+        self.assertTrue(df.quarantined_dir)
+
+        df = self._get_data_file(invalid_type='Content-Length',
+                                 extension='.ts')
+        df.close()
+        self.assertFalse(df.quarantined_dir)
+        df = self._get_data_file(invalid_type='Content-Length',
+                                 extension='.ts')
+        self.assertRaises(DiskFileNotExist, df.get_data_file_size)
+
     def test_unlinkold(self):
-        df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'ob')
-        
+        df1 = self._get_data_file()
+        future_time = str(normalize_timestamp(time()+100))
+        df2 = self._get_data_file(ts=future_time)
+        self.assertEquals(len(os.listdir(df1.datadir)), 2)
+        df1.unlinkold(future_time)
+        self.assertEquals(len(os.listdir(df1.datadir)), 1)
+        self.assertEquals(os.listdir(df1.datadir)[0], "%s.data" % future_time)
+
+    def test_close_error(self):
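+        # an error raised while handling the close-time quarantine check
+        # should be logged and must not prevent the file from being closed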
+        def err():
+            raise Exception("bad")
+        df = self._get_data_file(fsize=1024*1024*2)
+        df._handle_close_quarantine = err
+        for chunk in df:
+            pass
+        # close is called at the end of the iterator
+        self.assertEquals(df.fp, None)
+        self.assertEquals(len(df.logger.log_dict['error']), 1)
+
+    def test_quarantine_twice(self):
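+        # quarantining the same file a second time is a no-op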
+        df = self._get_data_file(invalid_type='Content-Length',
+                                 extension='.data')
+        self.assert_(os.path.isfile(df.data_file))
+        quar_dir = df.quarantine()
+        self.assertFalse(os.path.isfile(df.data_file))
+        self.assert_(os.path.isdir(quar_dir))
+        self.assertEquals(df.quarantine(), None)
 
 
 class TestObjectController(unittest.TestCase):
@@ -251,6 +409,37 @@ class TestObjectController(unittest.TestCase):
         finally:
             object_server.http_connect = old_http_connect
 
+    def test_POST_quarantine_zbyte(self):
+        """ Test swift.object_server.ObjectController.GET """
+        timestamp = normalize_timestamp(time())
+        req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
+                            headers={'X-Timestamp': timestamp,
+                                     'Content-Type': 'application/x-test'})
+        req.body = 'VERIFY'
+        resp = self.object_controller.PUT(req)
+        self.assertEquals(resp.status_int, 201)
+        file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
+                                      FakeLogger(), keep_data_fp=True)
+
+        file_name = os.path.basename(file.data_file)
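+        # simulate a truncated object: drop the body but keep the metadata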
+        with open(file.data_file) as fp:
+            metadata = object_server.read_metadata(fp)
+        os.unlink(file.data_file)
+        with open(file.data_file, 'w') as fp:
+            object_server.write_metadata(fp, metadata)
+
+        self.assertEquals(os.listdir(file.datadir)[0], file_name)
+        req = Request.blank('/sda1/p/a/c/o',
+                        headers={'X-Timestamp': normalize_timestamp(time())})
+        resp = self.object_controller.POST(req)
+        self.assertEquals(resp.status_int, 404)
+
+        quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', 'objects',
+                       os.path.basename(os.path.dirname(file.data_file)))
+        self.assertEquals(os.listdir(quar_dir)[0], file_name)
+
     def test_PUT_invalid_path(self):
         req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'})
         resp = self.object_controller.PUT(req)
@@ -520,6 +709,34 @@ class TestObjectController(unittest.TestCase):
         resp = self.object_controller.HEAD(req)
         self.assertEquals(resp.status_int, 404)
 
+    def test_HEAD_quarantine_zbyte(self):
+        """ Test swift.object_server.ObjectController.GET """
+        timestamp = normalize_timestamp(time())
+        req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
+                            headers={'X-Timestamp': timestamp,
+                                     'Content-Type': 'application/x-test'})
+        req.body = 'VERIFY'
+        resp = self.object_controller.PUT(req)
+        self.assertEquals(resp.status_int, 201)
+        file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
+                                      FakeLogger(), keep_data_fp=True)
+
+        file_name = os.path.basename(file.data_file)
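+        # as in test_POST_quarantine_zbyte: drop the body, keep the metadata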
+        with open(file.data_file) as fp:
+            metadata = object_server.read_metadata(fp)
+        os.unlink(file.data_file)
+        with open(file.data_file, 'w') as fp:
+            object_server.write_metadata(fp, metadata)
+
+        self.assertEquals(os.listdir(file.datadir)[0], file_name)
+        req = Request.blank('/sda1/p/a/c/o')
+        resp = self.object_controller.HEAD(req)
+        self.assertEquals(resp.status_int, 404)
+
+        quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', 'objects',
+                       os.path.basename(os.path.dirname(file.data_file)))
+        self.assertEquals(os.listdir(quar_dir)[0], file_name)
+
     def test_GET(self):
         """ Test swift.object_server.ObjectController.GET """
         req = Request.blank('/sda1/p/a/c')
@@ -781,6 +998,117 @@ class TestObjectController(unittest.TestCase):
         resp = self.object_controller.GET(req)
         self.assertEquals(resp.status_int, 200)
 
+    def test_GET_quarantine(self):
+        """ Test swift.object_server.ObjectController.GET """
+        timestamp = normalize_timestamp(time())
+        req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
+                            headers={'X-Timestamp': timestamp,
+                                     'Content-Type': 'application/x-test'})
+        req.body = 'VERIFY'
+        resp = self.object_controller.PUT(req)
+        self.assertEquals(resp.status_int, 201)
+        file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
+                                      FakeLogger(), keep_data_fp=True)
+        file_name = os.path.basename(file.data_file)
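+        # rewrite the metadata with an ETag computed from 'VERIF' so the
+        # checksum no longer matches the stored body 'VERIFY'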
+        etag = md5()
+        etag.update('VERIF')
+        etag = etag.hexdigest()
+        metadata = {'X-Timestamp': timestamp,
+                    'Content-Length': 6, 'ETag': etag}
+        object_server.write_metadata(file.fp, metadata)
+        self.assertEquals(os.listdir(file.datadir)[0], file_name)
+        req = Request.blank('/sda1/p/a/c/o')
+        resp = self.object_controller.GET(req)
+        quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', 'objects',
+                       os.path.basename(os.path.dirname(file.data_file)))
+        self.assertEquals(os.listdir(file.datadir)[0], file_name)
+        body = resp.body  # actually does quarantining
+        self.assertEquals(body, 'VERIFY')
+        self.assertEquals(os.listdir(quar_dir)[0], file_name)
+        req = Request.blank('/sda1/p/a/c/o')
+        resp = self.object_controller.GET(req)
+        self.assertEquals(resp.status_int, 404)
+
+    def test_GET_quarantine_zbyte(self):
+        """ Test swift.object_server.ObjectController.GET """
+        timestamp = normalize_timestamp(time())
+        req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
+                            headers={'X-Timestamp': timestamp,
+                                     'Content-Type': 'application/x-test'})
+        req.body = 'VERIFY'
+        resp = self.object_controller.PUT(req)
+        self.assertEquals(resp.status_int, 201)
+        file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
+                                      FakeLogger(), keep_data_fp=True)
+        file_name = os.path.basename(file.data_file)
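+        # zero out the body; the GET should 404 and quarantine the file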
+        with open(file.data_file) as fp:
+            metadata = object_server.read_metadata(fp)
+        os.unlink(file.data_file)
+        with open(file.data_file, 'w') as fp:
+            object_server.write_metadata(fp, metadata)
+
+        self.assertEquals(os.listdir(file.datadir)[0], file_name)
+        req = Request.blank('/sda1/p/a/c/o')
+        resp = self.object_controller.GET(req)
+        self.assertEquals(resp.status_int, 404)
+
+        quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', 'objects',
+                       os.path.basename(os.path.dirname(file.data_file)))
+        self.assertEquals(os.listdir(quar_dir)[0], file_name)
+
+    def test_GET_quarantine_range(self):
+        """ Test swift.object_server.ObjectController.GET """
+        timestamp = normalize_timestamp(time())
+        req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
+                            headers={'X-Timestamp': timestamp,
+                                     'Content-Type': 'application/x-test'})
+        req.body = 'VERIFY'
+        resp = self.object_controller.PUT(req)
+        self.assertEquals(resp.status_int, 201)
+        file = object_server.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
+                                      FakeLogger(), keep_data_fp=True)
+        file_name = os.path.basename(file.data_file)
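+        # same ETag mismatch as in test_GET_quarantine; only a GET that
+        # covers the whole object should notice it and quarantine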
+        etag = md5()
+        etag.update('VERIF')
+        etag = etag.hexdigest()
+        metadata = {'X-Timestamp': timestamp,
+                    'Content-Length': 6, 'ETag': etag}
+        object_server.write_metadata(file.fp, metadata)
+        self.assertEquals(os.listdir(file.datadir)[0], file_name)
+        req = Request.blank('/sda1/p/a/c/o')
+        req.range = 'bytes=0-4'  # partial
+        resp = self.object_controller.GET(req)
+        quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', 'objects',
+                            os.path.basename(os.path.dirname(file.data_file)))
+        body = resp.body
+        self.assertEquals(os.listdir(file.datadir)[0], file_name)
+        self.assertFalse(os.path.isdir(quar_dir))
+        req = Request.blank('/sda1/p/a/c/o')
+        resp = self.object_controller.GET(req)
+        self.assertEquals(resp.status_int, 200)
+
+        req = Request.blank('/sda1/p/a/c/o')
+        req.range = 'bytes=1-6'  # partial
+        resp = self.object_controller.GET(req)
+        quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', 'objects',
+                            os.path.basename(os.path.dirname(file.data_file)))
+        body = resp.body
+        self.assertEquals(os.listdir(file.datadir)[0], file_name)
+        self.assertFalse(os.path.isdir(quar_dir))
+
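+        # a range covering the entire object is check-summed like a full
+        # read, so the mismatch should be caught and the file quarantined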
+        req = Request.blank('/sda1/p/a/c/o')
+        req.range = 'bytes=0-14'  # full
+        resp = self.object_controller.GET(req)
+        quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined', 'objects',
+                       os.path.basename(os.path.dirname(file.data_file)))
+        self.assertEquals(os.listdir(file.datadir)[0], file_name)
+        body = resp.body
+        self.assertTrue(os.path.isdir(quar_dir))
+        req = Request.blank('/sda1/p/a/c/o')
+        resp = self.object_controller.GET(req)
+        self.assertEquals(resp.status_int, 404)
+
     def test_DELETE(self):
         """ Test swift.object_server.ObjectController.DELETE """
         req = Request.blank('/sda1/p/a/c',