Merge "make swift fsync"
This commit is contained in:
commit
9e006183f8
@ -64,7 +64,6 @@ logging._levelNames[NOTICE] = 'NOTICE'
|
|||||||
SysLogHandler.priority_map['NOTICE'] = 'notice'
|
SysLogHandler.priority_map['NOTICE'] = 'notice'
|
||||||
|
|
||||||
# These are lazily pulled from libc elsewhere
|
# These are lazily pulled from libc elsewhere
|
||||||
_sys_fsync = None
|
|
||||||
_sys_fallocate = None
|
_sys_fallocate = None
|
||||||
_posix_fadvise = None
|
_posix_fadvise = None
|
||||||
|
|
||||||
@ -231,46 +230,31 @@ def fallocate(fd, size):
|
|||||||
raise OSError(err, 'Unable to fallocate(%s)' % size)
|
raise OSError(err, 'Unable to fallocate(%s)' % size)
|
||||||
|
|
||||||
|
|
||||||
class FsyncWrapper(object):
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
if hasattr(os, 'fdatasync'):
|
|
||||||
self.func_name = 'fdatasync'
|
|
||||||
self.fsync = os.fdatasync
|
|
||||||
self.fcntl_flag = None
|
|
||||||
elif hasattr(fcntl, 'F_FULLFSYNC'):
|
|
||||||
self.func_name = 'fcntl'
|
|
||||||
self.fsync = fcntl.fcntl
|
|
||||||
self.fcntl_flag = fcntl.F_FULLFSYNC
|
|
||||||
else:
|
|
||||||
self.func_name = 'fsync'
|
|
||||||
self.fsync = os.fsync
|
|
||||||
self.fcntl_flag = None
|
|
||||||
|
|
||||||
def __call__(self, fd):
|
|
||||||
args = {
|
|
||||||
'fdatasync': (fd, ),
|
|
||||||
'fsync': (fd, ),
|
|
||||||
'fcntl': (fd, self.fcntl_flag)
|
|
||||||
}
|
|
||||||
return self.fsync(*args[self.func_name])
|
|
||||||
|
|
||||||
|
|
||||||
def fsync(fd):
|
def fsync(fd):
|
||||||
"""
|
"""
|
||||||
Write buffered changes to disk.
|
Sync modified file data and metadata to disk.
|
||||||
|
|
||||||
:param fd: file descriptor
|
:param fd: file descriptor
|
||||||
"""
|
"""
|
||||||
|
if hasattr(fcntl, 'F_FULLSYNC'):
|
||||||
|
try:
|
||||||
|
fcntl.fcntl(fd, fcntl.F_FULLSYNC)
|
||||||
|
except IOError as e:
|
||||||
|
raise OSError(e.errno, 'Unable to F_FULLSYNC(%s)' % fd)
|
||||||
|
else:
|
||||||
|
os.fsync(fd)
|
||||||
|
|
||||||
global _sys_fsync
|
|
||||||
if _sys_fsync is None:
|
|
||||||
_sys_fsync = FsyncWrapper()
|
|
||||||
|
|
||||||
ret = _sys_fsync(fd)
|
def fdatasync(fd):
|
||||||
err = ctypes.get_errno()
|
"""
|
||||||
if ret and err != 0:
|
Sync modified file data to disk.
|
||||||
raise OSError(err, 'Unable to fsync(%s)' % fd)
|
|
||||||
|
:param fd: file descriptor
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
os.fdatasync(fd)
|
||||||
|
except AttributeError:
|
||||||
|
fsync(fd)
|
||||||
|
|
||||||
|
|
||||||
def drop_buffer_cache(fd, offset, length):
|
def drop_buffer_cache(fd, offset, length):
|
||||||
|
@ -31,7 +31,7 @@ from xattr import getxattr, setxattr
|
|||||||
from eventlet import sleep, Timeout, tpool
|
from eventlet import sleep, Timeout, tpool
|
||||||
|
|
||||||
from swift.common.utils import mkdirs, normalize_timestamp, public, \
|
from swift.common.utils import mkdirs, normalize_timestamp, public, \
|
||||||
storage_directory, hash_path, renamer, fallocate, fsync, \
|
storage_directory, hash_path, renamer, fallocate, fsync, fdatasync, \
|
||||||
split_path, drop_buffer_cache, get_logger, write_pickle, \
|
split_path, drop_buffer_cache, get_logger, write_pickle, \
|
||||||
config_true_value, validate_device_partition, timing_stats
|
config_true_value, validate_device_partition, timing_stats
|
||||||
from swift.common.bufferedhttp import http_connect
|
from swift.common.bufferedhttp import http_connect
|
||||||
@ -659,7 +659,7 @@ class ObjectController(object):
|
|||||||
chunk = chunk[written:]
|
chunk = chunk[written:]
|
||||||
# For large files sync every 512MB (by default) written
|
# For large files sync every 512MB (by default) written
|
||||||
if upload_size - last_sync >= self.bytes_per_sync:
|
if upload_size - last_sync >= self.bytes_per_sync:
|
||||||
tpool.execute(fsync, fd)
|
tpool.execute(fdatasync, fd)
|
||||||
drop_buffer_cache(fd, last_sync, upload_size - last_sync)
|
drop_buffer_cache(fd, last_sync, upload_size - last_sync)
|
||||||
last_sync = upload_size
|
last_sync = upload_size
|
||||||
sleep()
|
sleep()
|
||||||
|
@ -38,6 +38,7 @@ from tempfile import TemporaryFile, NamedTemporaryFile
|
|||||||
from logging import handlers as logging_handlers
|
from logging import handlers as logging_handlers
|
||||||
|
|
||||||
from eventlet import sleep
|
from eventlet import sleep
|
||||||
|
from mock import patch
|
||||||
|
|
||||||
from swift.common.exceptions import (Timeout, MessageTimeout,
|
from swift.common.exceptions import (Timeout, MessageTimeout,
|
||||||
ConnectionTimeout)
|
ConnectionTimeout)
|
||||||
@ -1545,6 +1546,57 @@ class TestStatsdLoggingDelegation(unittest.TestCase):
|
|||||||
finally:
|
finally:
|
||||||
logger.thread_locals = orig_thread_locals
|
logger.thread_locals = orig_thread_locals
|
||||||
|
|
||||||
|
def test_no_fdatasync(self):
|
||||||
|
called = []
|
||||||
|
class NoFdatasync:
|
||||||
|
pass
|
||||||
|
def fsync(fd):
|
||||||
|
called.append(fd)
|
||||||
|
with patch('swift.common.utils.os', NoFdatasync()):
|
||||||
|
with patch('swift.common.utils.fsync', fsync):
|
||||||
|
utils.fdatasync(12345)
|
||||||
|
self.assertEquals(called, [12345])
|
||||||
|
|
||||||
|
def test_yes_fdatasync(self):
|
||||||
|
called = []
|
||||||
|
class YesFdatasync:
|
||||||
|
def fdatasync(self, fd):
|
||||||
|
called.append(fd)
|
||||||
|
with patch('swift.common.utils.os', YesFdatasync()):
|
||||||
|
utils.fdatasync(12345)
|
||||||
|
self.assertEquals(called, [12345])
|
||||||
|
|
||||||
|
def test_fsync_bad_fullsync(self):
|
||||||
|
called = []
|
||||||
|
class FCNTL:
|
||||||
|
F_FULLSYNC = 123
|
||||||
|
def fcntl(self, fd, op):
|
||||||
|
raise IOError(18)
|
||||||
|
with patch('swift.common.utils.fcntl', FCNTL()):
|
||||||
|
self.assertRaises(OSError, lambda: utils.fsync(12345))
|
||||||
|
|
||||||
|
def test_fsync_f_fullsync(self):
|
||||||
|
called = []
|
||||||
|
class FCNTL:
|
||||||
|
F_FULLSYNC = 123
|
||||||
|
def fcntl(self, fd, op):
|
||||||
|
called[:] = [fd, op]
|
||||||
|
return 0
|
||||||
|
with patch('swift.common.utils.fcntl', FCNTL()):
|
||||||
|
utils.fsync(12345)
|
||||||
|
self.assertEquals(called, [12345, 123])
|
||||||
|
|
||||||
|
def test_fsync_no_fullsync(self):
|
||||||
|
called = []
|
||||||
|
class FCNTL:
|
||||||
|
pass
|
||||||
|
def fsync(fd):
|
||||||
|
called.append(fd)
|
||||||
|
with patch('swift.common.utils.fcntl', FCNTL()):
|
||||||
|
with patch('os.fsync', fsync):
|
||||||
|
utils.fsync(12345)
|
||||||
|
self.assertEquals(called, [12345])
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user