Merge "Per device replication_lock"
This commit is contained in:
commit
5989849512
@ -408,6 +408,19 @@ threads_per_disk 0 Size of the per-disk thread pool
|
||||
replication_concurrency 4 Set to restrict the number of
|
||||
concurrent incoming REPLICATION
|
||||
requests; set to 0 for unlimited
|
||||
replication_one_per_device True Restricts incoming REPLICATION
|
||||
requests to one per device,
|
||||
replication_currency above
|
||||
allowing. This can help control
|
||||
I/O to each device, but you may
|
||||
wish to set this to False to
|
||||
allow multiple REPLICATION
|
||||
requests (up to the above
|
||||
replication_concurrency setting)
|
||||
per device.
|
||||
replication_lock_timeout 15 Number of seconds to wait for an
|
||||
existing replication device lock
|
||||
before giving up.
|
||||
replication_failure_threshold 100 The number of subrequest failures
|
||||
before the
|
||||
replication_failure_ratio is
|
||||
|
@ -105,6 +105,16 @@ use = egg:swift#object
|
||||
# Note that REPLICATION is currently an ssync only item
|
||||
# replication_concurrency = 4
|
||||
#
|
||||
# Restricts incoming REPLICATION requests to one per device,
|
||||
# replication_currency above allowing. This can help control I/O to each
|
||||
# device, but you may wish to set this to False to allow multiple REPLICATION
|
||||
# requests (up to the above replication_concurrency setting) per device.
|
||||
# replication_one_per_device = True
|
||||
#
|
||||
# Number of seconds to wait for an existing replication device lock before
|
||||
# giving up.
|
||||
# replication_lock_timeout = 15
|
||||
#
|
||||
# These next two settings control when the REPLICATION subrequest handler will
|
||||
# abort an incoming REPLICATION attempt. An abort will occur if there are at
|
||||
# least threshold number of failures and the value of failures / successes
|
||||
|
@ -122,3 +122,7 @@ class SegmentError(SwiftException):
|
||||
|
||||
class ReplicationException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class ReplicationLockTimeout(LockTimeout):
|
||||
pass
|
||||
|
@ -1179,7 +1179,7 @@ def hash_path(account, container=None, object=None, raw_digest=False):
|
||||
|
||||
|
||||
@contextmanager
|
||||
def lock_path(directory, timeout=10):
|
||||
def lock_path(directory, timeout=10, timeout_class=LockTimeout):
|
||||
"""
|
||||
Context manager that acquires a lock on a directory. This will block until
|
||||
the lock can be acquired, or the timeout time has expired (whichever occurs
|
||||
@ -1191,12 +1191,16 @@ def lock_path(directory, timeout=10):
|
||||
|
||||
:param directory: directory to be locked
|
||||
:param timeout: timeout (in seconds)
|
||||
:param timeout_class: The class of the exception to raise if the
|
||||
lock cannot be granted within the timeout. Will be
|
||||
constructed as timeout_class(timeout, lockpath). Default:
|
||||
LockTimeout
|
||||
"""
|
||||
mkdirs(directory)
|
||||
lockpath = '%s/.lock' % directory
|
||||
fd = os.open(lockpath, os.O_WRONLY | os.O_CREAT)
|
||||
try:
|
||||
with LockTimeout(timeout, lockpath):
|
||||
with timeout_class(timeout, lockpath):
|
||||
while True:
|
||||
try:
|
||||
fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
|
@ -55,7 +55,8 @@ from swift.common.utils import mkdirs, normalize_timestamp, \
|
||||
config_true_value, listdir, split_path, ismount
|
||||
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \
|
||||
DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \
|
||||
DiskFileDeleted, DiskFileError, DiskFileNotOpen, PathNotDir
|
||||
DiskFileDeleted, DiskFileError, DiskFileNotOpen, PathNotDir, \
|
||||
ReplicationLockTimeout
|
||||
from swift.common.swob import multi_range_iterator
|
||||
|
||||
|
||||
@ -381,6 +382,10 @@ class DiskFileManager(object):
|
||||
self.bytes_per_sync = int(conf.get('mb_per_sync', 512)) * 1024 * 1024
|
||||
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
|
||||
self.reclaim_age = int(conf.get('reclaim_age', ONE_WEEK))
|
||||
self.replication_one_per_device = config_true_value(
|
||||
conf.get('replication_one_per_device', 'true'))
|
||||
self.replication_lock_timeout = int(conf.get(
|
||||
'replication_lock_timeout', 15))
|
||||
threads_per_disk = int(conf.get('threads_per_disk', '0'))
|
||||
self.threadpools = defaultdict(
|
||||
lambda: ThreadPool(nthreads=threads_per_disk))
|
||||
@ -412,6 +417,25 @@ class DiskFileManager(object):
|
||||
dev_path = os.path.join(self.devices, device)
|
||||
return dev_path
|
||||
|
||||
@contextmanager
|
||||
def replication_lock(self, device):
|
||||
"""
|
||||
A context manager that will lock on the device given, if
|
||||
configured to do so.
|
||||
|
||||
:raises ReplicationLockTimeout: If the lock on the device
|
||||
cannot be granted within the configured timeout.
|
||||
"""
|
||||
if self.replication_one_per_device:
|
||||
dev_path = self.get_dev_path(device)
|
||||
with lock_path(
|
||||
dev_path,
|
||||
timeout=self.replication_lock_timeout,
|
||||
timeout_class=ReplicationLockTimeout):
|
||||
yield True
|
||||
else:
|
||||
yield True
|
||||
|
||||
def pickle_async_update(self, device, account, container, obj, data,
|
||||
timestamp):
|
||||
device_path = self.construct_dev_path(device)
|
||||
|
@ -95,13 +95,20 @@ class Receiver(object):
|
||||
if not self.app.replication_semaphore.acquire(False):
|
||||
raise swob.HTTPServiceUnavailable()
|
||||
try:
|
||||
for data in self.missing_check():
|
||||
yield data
|
||||
for data in self.updates():
|
||||
yield data
|
||||
with self.app._diskfile_mgr.replication_lock(self.device):
|
||||
for data in self.missing_check():
|
||||
yield data
|
||||
for data in self.updates():
|
||||
yield data
|
||||
finally:
|
||||
if self.app.replication_semaphore:
|
||||
self.app.replication_semaphore.release()
|
||||
except exceptions.ReplicationLockTimeout as err:
|
||||
self.app.logger.debug(
|
||||
'%s/%s/%s REPLICATION LOCK TIMEOUT: %s' % (
|
||||
self.request.remote_addr, self.device, self.partition,
|
||||
err))
|
||||
yield ':ERROR: %d %r\n' % (0, str(err))
|
||||
except exceptions.MessageTimeout as err:
|
||||
self.app.logger.error(
|
||||
'%s/%s/%s TIMEOUT in replication.Receiver: %s' % (
|
||||
|
@ -25,6 +25,13 @@ class TestExceptions(unittest.TestCase):
|
||||
self.assertEqual(str(exceptions.ReplicationException()), '')
|
||||
self.assertEqual(str(exceptions.ReplicationException('test')), 'test')
|
||||
|
||||
def test_replication_lock_timeout(self):
|
||||
exc = exceptions.ReplicationLockTimeout(15, 'test')
|
||||
try:
|
||||
self.assertTrue(isinstance(exc, exceptions.MessageTimeout))
|
||||
finally:
|
||||
exc.cancel()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -48,7 +48,8 @@ from netifaces import AF_INET6
|
||||
from mock import MagicMock, patch
|
||||
|
||||
from swift.common.exceptions import (Timeout, MessageTimeout,
|
||||
ConnectionTimeout, LockTimeout)
|
||||
ConnectionTimeout, LockTimeout,
|
||||
ReplicationLockTimeout)
|
||||
from swift.common import utils
|
||||
from swift.common.swob import Response
|
||||
from test.unit import FakeLogger
|
||||
@ -137,6 +138,55 @@ class TestUtils(unittest.TestCase):
|
||||
utils.HASH_PATH_SUFFIX = 'endcap'
|
||||
utils.HASH_PATH_PREFIX = 'startcap'
|
||||
|
||||
def test_lock_path(self):
|
||||
tmpdir = mkdtemp()
|
||||
try:
|
||||
with utils.lock_path(tmpdir, 0.1):
|
||||
exc = None
|
||||
success = False
|
||||
try:
|
||||
with utils.lock_path(tmpdir, 0.1):
|
||||
success = True
|
||||
except LockTimeout as err:
|
||||
exc = err
|
||||
self.assertTrue(exc is not None)
|
||||
self.assertTrue(not success)
|
||||
finally:
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
def test_lock_path_class(self):
|
||||
tmpdir = mkdtemp()
|
||||
try:
|
||||
with utils.lock_path(tmpdir, 0.1, ReplicationLockTimeout):
|
||||
exc = None
|
||||
exc2 = None
|
||||
success = False
|
||||
try:
|
||||
with utils.lock_path(tmpdir, 0.1, ReplicationLockTimeout):
|
||||
success = True
|
||||
except ReplicationLockTimeout as err:
|
||||
exc = err
|
||||
except LockTimeout as err:
|
||||
exc2 = err
|
||||
self.assertTrue(exc is not None)
|
||||
self.assertTrue(exc2 is None)
|
||||
self.assertTrue(not success)
|
||||
exc = None
|
||||
exc2 = None
|
||||
success = False
|
||||
try:
|
||||
with utils.lock_path(tmpdir, 0.1):
|
||||
success = True
|
||||
except ReplicationLockTimeout as err:
|
||||
exc = err
|
||||
except LockTimeout as err:
|
||||
exc2 = err
|
||||
self.assertTrue(exc is None)
|
||||
self.assertTrue(exc2 is not None)
|
||||
self.assertTrue(not success)
|
||||
finally:
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
def test_normalize_timestamp(self):
|
||||
# Test swift.common.utils.normalize_timestamp
|
||||
self.assertEquals(utils.normalize_timestamp('1253327593.48174'),
|
||||
|
@ -39,7 +39,8 @@ from swift.common import utils
|
||||
from swift.common.utils import hash_path, mkdirs, normalize_timestamp
|
||||
from swift.common import ring
|
||||
from swift.common.exceptions import DiskFileNotExist, DiskFileQuarantined, \
|
||||
DiskFileDeviceUnavailable, DiskFileDeleted, DiskFileNotOpen, DiskFileError
|
||||
DiskFileDeviceUnavailable, DiskFileDeleted, DiskFileNotOpen, \
|
||||
DiskFileError, ReplicationLockTimeout
|
||||
|
||||
|
||||
def _create_test_ring(path):
|
||||
@ -466,6 +467,72 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
|
||||
self.assertRaises(OSError, list_locations, tmpdir)
|
||||
|
||||
|
||||
class TestDiskFileManager(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.testdir = os.path.join(mkdtemp(), 'tmp_test_obj_server_DiskFile')
|
||||
mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))
|
||||
mkdirs(os.path.join(self.testdir, 'sda2', 'tmp'))
|
||||
self._orig_tpool_exc = tpool.execute
|
||||
tpool.execute = lambda f, *args, **kwargs: f(*args, **kwargs)
|
||||
self.conf = dict(devices=self.testdir, mount_check='false',
|
||||
keep_cache_size=2 * 1024)
|
||||
self.df_mgr = diskfile.DiskFileManager(self.conf, FakeLogger())
|
||||
|
||||
def test_replication_lock_on(self):
|
||||
# Double check settings
|
||||
self.df_mgr.replication_one_per_device = True
|
||||
self.df_mgr.replication_lock_timeout = 0.1
|
||||
dev_path = os.path.join(self.testdir, 'sda1')
|
||||
with self.df_mgr.replication_lock(dev_path):
|
||||
lock_exc = None
|
||||
exc = None
|
||||
try:
|
||||
with self.df_mgr.replication_lock(dev_path):
|
||||
raise Exception(
|
||||
'%r was not replication locked!' % dev_path)
|
||||
except ReplicationLockTimeout as err:
|
||||
lock_exc = err
|
||||
except Exception as err:
|
||||
exc = err
|
||||
self.assertTrue(lock_exc is not None)
|
||||
self.assertTrue(exc is None)
|
||||
|
||||
def test_replication_lock_off(self):
|
||||
# Double check settings
|
||||
self.df_mgr.replication_one_per_device = False
|
||||
self.df_mgr.replication_lock_timeout = 0.1
|
||||
dev_path = os.path.join(self.testdir, 'sda1')
|
||||
with self.df_mgr.replication_lock(dev_path):
|
||||
lock_exc = None
|
||||
exc = None
|
||||
try:
|
||||
with self.df_mgr.replication_lock(dev_path):
|
||||
raise Exception(
|
||||
'%r was not replication locked!' % dev_path)
|
||||
except ReplicationLockTimeout as err:
|
||||
lock_exc = err
|
||||
except Exception as err:
|
||||
exc = err
|
||||
self.assertTrue(lock_exc is None)
|
||||
self.assertTrue(exc is not None)
|
||||
|
||||
def test_replication_lock_another_device_fine(self):
|
||||
# Double check settings
|
||||
self.df_mgr.replication_one_per_device = True
|
||||
self.df_mgr.replication_lock_timeout = 0.1
|
||||
dev_path = os.path.join(self.testdir, 'sda1')
|
||||
dev_path2 = os.path.join(self.testdir, 'sda2')
|
||||
with self.df_mgr.replication_lock(dev_path):
|
||||
lock_exc = None
|
||||
try:
|
||||
with self.df_mgr.replication_lock(dev_path2):
|
||||
pass
|
||||
except ReplicationLockTimeout as err:
|
||||
lock_exc = err
|
||||
self.assertTrue(lock_exc is None)
|
||||
|
||||
|
||||
class TestDiskFile(unittest.TestCase):
|
||||
"""Test swift.obj.diskfile.DiskFile"""
|
||||
|
||||
|
@ -24,6 +24,7 @@ import eventlet
|
||||
import mock
|
||||
|
||||
from swift.common import constraints
|
||||
from swift.common import exceptions
|
||||
from swift.common import swob
|
||||
from swift.common import utils
|
||||
from swift.obj import diskfile
|
||||
@ -45,7 +46,10 @@ class TestReceiver(unittest.TestCase):
|
||||
self.testdir = os.path.join(
|
||||
tempfile.mkdtemp(), 'tmp_test_ssync_receiver')
|
||||
utils.mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))
|
||||
conf = {'devices': self.testdir, 'mount_check': 'false'}
|
||||
conf = {
|
||||
'devices': self.testdir,
|
||||
'mount_check': 'false',
|
||||
'replication_one_per_device': 'false'}
|
||||
self.controller = server.ObjectController(conf)
|
||||
self.controller.bytes_per_sync = 1
|
||||
|
||||
@ -104,6 +108,46 @@ class TestReceiver(unittest.TestCase):
|
||||
self.assertFalse(self.controller.logger.error.called)
|
||||
self.assertFalse(self.controller.logger.exception.called)
|
||||
|
||||
def test_REPLICATION_calls_replication_lock(self):
|
||||
with mock.patch.object(
|
||||
self.controller._diskfile_mgr, 'replication_lock') as \
|
||||
mocked_replication_lock:
|
||||
req = swob.Request.blank(
|
||||
'/sda1/1',
|
||||
environ={'REQUEST_METHOD': 'REPLICATION'},
|
||||
body=':MISSING_CHECK: START\r\n'
|
||||
':MISSING_CHECK: END\r\n'
|
||||
':UPDATES: START\r\n:UPDATES: END\r\n')
|
||||
resp = self.controller.REPLICATION(req)
|
||||
self.assertEqual(
|
||||
self.body_lines(resp.body),
|
||||
[':MISSING_CHECK: START', ':MISSING_CHECK: END',
|
||||
':UPDATES: START', ':UPDATES: END'])
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
mocked_replication_lock.assert_called_once_with('sda1')
|
||||
|
||||
def test_REPLICATION_replication_lock_fail(self):
|
||||
def _mock(path):
|
||||
with exceptions.ReplicationLockTimeout(0.01, '/somewhere/' + path):
|
||||
eventlet.sleep(0.05)
|
||||
with mock.patch.object(
|
||||
self.controller._diskfile_mgr, 'replication_lock', _mock):
|
||||
self.controller._diskfile_mgr
|
||||
self.controller.logger = mock.MagicMock()
|
||||
req = swob.Request.blank(
|
||||
'/sda1/1',
|
||||
environ={'REQUEST_METHOD': 'REPLICATION'},
|
||||
body=':MISSING_CHECK: START\r\n'
|
||||
':MISSING_CHECK: END\r\n'
|
||||
':UPDATES: START\r\n:UPDATES: END\r\n')
|
||||
resp = self.controller.REPLICATION(req)
|
||||
self.assertEqual(
|
||||
self.body_lines(resp.body),
|
||||
[":ERROR: 0 '0.01 seconds: /somewhere/sda1'"])
|
||||
self.controller.logger.debug.assert_called_once_with(
|
||||
'None/sda1/1 REPLICATION LOCK TIMEOUT: 0.01 seconds: '
|
||||
'/somewhere/sda1')
|
||||
|
||||
def test_REPLICATION_initial_path(self):
|
||||
with mock.patch.object(
|
||||
self.controller, 'replication_semaphore') as \
|
||||
|
Loading…
Reference in New Issue
Block a user