Per device replication_lock
New replication_one_per_device (True by default) that restricts incoming REPLICATION requests to one per device, replication_currency allowing. Also has replication_lock_timeout (15 by default) to control how long a request will wait to obtain a replication device lock before giving up. This should be very useful in that you can be assured any concurrent REPLICATION requests are each writing to distinct devices. If you have 100 devices on a server, you can set replication_concurrency to 100 and be confident that, even if 100 replication requests were executing concurrently, they'd each be writing to separate devices. Before, all 100 could end up writing to the same device, bringing it to a horrible crawl. NOTE: This is only for ssync replication. The current default rsync replication still has the potentially horrible behavior. Change-Id: I36e99a3d7e100699c76db6d3a4846514537ff685
This commit is contained in:
parent
c850580566
commit
c859ebf5ce
@ -408,6 +408,19 @@ threads_per_disk 0 Size of the per-disk thread pool
|
||||
replication_concurrency 4 Set to restrict the number of
|
||||
concurrent incoming REPLICATION
|
||||
requests; set to 0 for unlimited
|
||||
replication_one_per_device True Restricts incoming REPLICATION
|
||||
requests to one per device,
|
||||
replication_currency above
|
||||
allowing. This can help control
|
||||
I/O to each device, but you may
|
||||
wish to set this to False to
|
||||
allow multiple REPLICATION
|
||||
requests (up to the above
|
||||
replication_concurrency setting)
|
||||
per device.
|
||||
replication_lock_timeout 15 Number of seconds to wait for an
|
||||
existing replication device lock
|
||||
before giving up.
|
||||
replication_failure_threshold 100 The number of subrequest failures
|
||||
before the
|
||||
replication_failure_ratio is
|
||||
|
@ -105,6 +105,16 @@ use = egg:swift#object
|
||||
# Note that REPLICATION is currently an ssync only item
|
||||
# replication_concurrency = 4
|
||||
#
|
||||
# Restricts incoming REPLICATION requests to one per device,
|
||||
# replication_currency above allowing. This can help control I/O to each
|
||||
# device, but you may wish to set this to False to allow multiple REPLICATION
|
||||
# requests (up to the above replication_concurrency setting) per device.
|
||||
# replication_one_per_device = True
|
||||
#
|
||||
# Number of seconds to wait for an existing replication device lock before
|
||||
# giving up.
|
||||
# replication_lock_timeout = 15
|
||||
#
|
||||
# These next two settings control when the REPLICATION subrequest handler will
|
||||
# abort an incoming REPLICATION attempt. An abort will occur if there are at
|
||||
# least threshold number of failures and the value of failures / successes
|
||||
|
@ -122,3 +122,7 @@ class SegmentError(SwiftException):
|
||||
|
||||
class ReplicationException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class ReplicationLockTimeout(LockTimeout):
|
||||
pass
|
||||
|
@ -1179,7 +1179,7 @@ def hash_path(account, container=None, object=None, raw_digest=False):
|
||||
|
||||
|
||||
@contextmanager
|
||||
def lock_path(directory, timeout=10):
|
||||
def lock_path(directory, timeout=10, timeout_class=LockTimeout):
|
||||
"""
|
||||
Context manager that acquires a lock on a directory. This will block until
|
||||
the lock can be acquired, or the timeout time has expired (whichever occurs
|
||||
@ -1191,12 +1191,16 @@ def lock_path(directory, timeout=10):
|
||||
|
||||
:param directory: directory to be locked
|
||||
:param timeout: timeout (in seconds)
|
||||
:param timeout_class: The class of the exception to raise if the
|
||||
lock cannot be granted within the timeout. Will be
|
||||
constructed as timeout_class(timeout, lockpath). Default:
|
||||
LockTimeout
|
||||
"""
|
||||
mkdirs(directory)
|
||||
lockpath = '%s/.lock' % directory
|
||||
fd = os.open(lockpath, os.O_WRONLY | os.O_CREAT)
|
||||
try:
|
||||
with LockTimeout(timeout, lockpath):
|
||||
with timeout_class(timeout, lockpath):
|
||||
while True:
|
||||
try:
|
||||
fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
|
@ -56,7 +56,8 @@ from swift.common.utils import mkdirs, normalize_timestamp, \
|
||||
config_true_value, listdir, split_path, ismount
|
||||
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \
|
||||
DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \
|
||||
DiskFileDeleted, DiskFileError, DiskFileNotOpen, PathNotDir
|
||||
DiskFileDeleted, DiskFileError, DiskFileNotOpen, PathNotDir, \
|
||||
ReplicationLockTimeout
|
||||
from swift.common.swob import multi_range_iterator
|
||||
|
||||
|
||||
@ -382,6 +383,10 @@ class DiskFileManager(object):
|
||||
self.bytes_per_sync = int(conf.get('mb_per_sync', 512)) * 1024 * 1024
|
||||
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
|
||||
self.reclaim_age = int(conf.get('reclaim_age', ONE_WEEK))
|
||||
self.replication_one_per_device = config_true_value(
|
||||
conf.get('replication_one_per_device', 'true'))
|
||||
self.replication_lock_timeout = int(conf.get(
|
||||
'replication_lock_timeout', 15))
|
||||
threads_per_disk = int(conf.get('threads_per_disk', '0'))
|
||||
self.threadpools = defaultdict(
|
||||
lambda: ThreadPool(nthreads=threads_per_disk))
|
||||
@ -413,6 +418,25 @@ class DiskFileManager(object):
|
||||
dev_path = os.path.join(self.devices, device)
|
||||
return dev_path
|
||||
|
||||
@contextmanager
|
||||
def replication_lock(self, device):
|
||||
"""
|
||||
A context manager that will lock on the device given, if
|
||||
configured to do so.
|
||||
|
||||
:raises ReplicationLockTimeout: If the lock on the device
|
||||
cannot be granted within the configured timeout.
|
||||
"""
|
||||
if self.replication_one_per_device:
|
||||
dev_path = self.get_dev_path(device)
|
||||
with lock_path(
|
||||
dev_path,
|
||||
timeout=self.replication_lock_timeout,
|
||||
timeout_class=ReplicationLockTimeout):
|
||||
yield True
|
||||
else:
|
||||
yield True
|
||||
|
||||
def pickle_async_update(self, device, account, container, obj, data,
|
||||
timestamp):
|
||||
device_path = self.construct_dev_path(device)
|
||||
|
@ -97,6 +97,7 @@ class Receiver(object):
|
||||
if not self.app.replication_semaphore.acquire(False):
|
||||
raise swob.HTTPServiceUnavailable()
|
||||
try:
|
||||
with self.app._diskfile_mgr.replication_lock(self.device):
|
||||
for data in self.missing_check():
|
||||
yield data
|
||||
for data in self.updates():
|
||||
@ -104,6 +105,12 @@ class Receiver(object):
|
||||
finally:
|
||||
if self.app.replication_semaphore:
|
||||
self.app.replication_semaphore.release()
|
||||
except exceptions.ReplicationLockTimeout as err:
|
||||
self.app.logger.debug(
|
||||
'%s/%s/%s REPLICATION LOCK TIMEOUT: %s' % (
|
||||
self.request.remote_addr, self.device, self.partition,
|
||||
err))
|
||||
yield ':ERROR: %d %r\n' % (0, str(err))
|
||||
except exceptions.MessageTimeout as err:
|
||||
self.app.logger.error(
|
||||
'%s/%s/%s TIMEOUT in replication.Receiver: %s' % (
|
||||
|
@ -25,6 +25,13 @@ class TestExceptions(unittest.TestCase):
|
||||
self.assertEqual(str(exceptions.ReplicationException()), '')
|
||||
self.assertEqual(str(exceptions.ReplicationException('test')), 'test')
|
||||
|
||||
def test_replication_lock_timeout(self):
|
||||
exc = exceptions.ReplicationLockTimeout(15, 'test')
|
||||
try:
|
||||
self.assertTrue(isinstance(exc, exceptions.MessageTimeout))
|
||||
finally:
|
||||
exc.cancel()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -49,7 +49,8 @@ from netifaces import AF_INET6
|
||||
from mock import MagicMock, patch
|
||||
|
||||
from swift.common.exceptions import (Timeout, MessageTimeout,
|
||||
ConnectionTimeout, LockTimeout)
|
||||
ConnectionTimeout, LockTimeout,
|
||||
ReplicationLockTimeout)
|
||||
from swift.common import utils
|
||||
from swift.common.swob import Response
|
||||
from test.unit import FakeLogger
|
||||
@ -138,6 +139,55 @@ class TestUtils(unittest.TestCase):
|
||||
utils.HASH_PATH_SUFFIX = 'endcap'
|
||||
utils.HASH_PATH_PREFIX = 'startcap'
|
||||
|
||||
def test_lock_path(self):
|
||||
tmpdir = mkdtemp()
|
||||
try:
|
||||
with utils.lock_path(tmpdir, 0.1):
|
||||
exc = None
|
||||
success = False
|
||||
try:
|
||||
with utils.lock_path(tmpdir, 0.1):
|
||||
success = True
|
||||
except LockTimeout as err:
|
||||
exc = err
|
||||
self.assertTrue(exc is not None)
|
||||
self.assertTrue(not success)
|
||||
finally:
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
def test_lock_path_class(self):
|
||||
tmpdir = mkdtemp()
|
||||
try:
|
||||
with utils.lock_path(tmpdir, 0.1, ReplicationLockTimeout):
|
||||
exc = None
|
||||
exc2 = None
|
||||
success = False
|
||||
try:
|
||||
with utils.lock_path(tmpdir, 0.1, ReplicationLockTimeout):
|
||||
success = True
|
||||
except ReplicationLockTimeout as err:
|
||||
exc = err
|
||||
except LockTimeout as err:
|
||||
exc2 = err
|
||||
self.assertTrue(exc is not None)
|
||||
self.assertTrue(exc2 is None)
|
||||
self.assertTrue(not success)
|
||||
exc = None
|
||||
exc2 = None
|
||||
success = False
|
||||
try:
|
||||
with utils.lock_path(tmpdir, 0.1):
|
||||
success = True
|
||||
except ReplicationLockTimeout as err:
|
||||
exc = err
|
||||
except LockTimeout as err:
|
||||
exc2 = err
|
||||
self.assertTrue(exc is None)
|
||||
self.assertTrue(exc2 is not None)
|
||||
self.assertTrue(not success)
|
||||
finally:
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
def test_normalize_timestamp(self):
|
||||
# Test swift.common.utils.normalize_timestamp
|
||||
self.assertEquals(utils.normalize_timestamp('1253327593.48174'),
|
||||
|
@ -41,7 +41,8 @@ from swift.common import utils
|
||||
from swift.common.utils import hash_path, mkdirs, normalize_timestamp
|
||||
from swift.common import ring
|
||||
from swift.common.exceptions import DiskFileNotExist, DiskFileQuarantined, \
|
||||
DiskFileDeviceUnavailable, DiskFileDeleted, DiskFileNotOpen, DiskFileError
|
||||
DiskFileDeviceUnavailable, DiskFileDeleted, DiskFileNotOpen, \
|
||||
DiskFileError, ReplicationLockTimeout
|
||||
|
||||
|
||||
def _create_test_ring(path):
|
||||
@ -468,6 +469,72 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
|
||||
self.assertRaises(OSError, list_locations, tmpdir)
|
||||
|
||||
|
||||
class TestDiskFileManager(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.testdir = os.path.join(mkdtemp(), 'tmp_test_obj_server_DiskFile')
|
||||
mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))
|
||||
mkdirs(os.path.join(self.testdir, 'sda2', 'tmp'))
|
||||
self._orig_tpool_exc = tpool.execute
|
||||
tpool.execute = lambda f, *args, **kwargs: f(*args, **kwargs)
|
||||
self.conf = dict(devices=self.testdir, mount_check='false',
|
||||
keep_cache_size=2 * 1024)
|
||||
self.df_mgr = diskfile.DiskFileManager(self.conf, FakeLogger())
|
||||
|
||||
def test_replication_lock_on(self):
|
||||
# Double check settings
|
||||
self.df_mgr.replication_one_per_device = True
|
||||
self.df_mgr.replication_lock_timeout = 0.1
|
||||
dev_path = os.path.join(self.testdir, 'sda1')
|
||||
with self.df_mgr.replication_lock(dev_path):
|
||||
lock_exc = None
|
||||
exc = None
|
||||
try:
|
||||
with self.df_mgr.replication_lock(dev_path):
|
||||
raise Exception(
|
||||
'%r was not replication locked!' % dev_path)
|
||||
except ReplicationLockTimeout as err:
|
||||
lock_exc = err
|
||||
except Exception as err:
|
||||
exc = err
|
||||
self.assertTrue(lock_exc is not None)
|
||||
self.assertTrue(exc is None)
|
||||
|
||||
def test_replication_lock_off(self):
|
||||
# Double check settings
|
||||
self.df_mgr.replication_one_per_device = False
|
||||
self.df_mgr.replication_lock_timeout = 0.1
|
||||
dev_path = os.path.join(self.testdir, 'sda1')
|
||||
with self.df_mgr.replication_lock(dev_path):
|
||||
lock_exc = None
|
||||
exc = None
|
||||
try:
|
||||
with self.df_mgr.replication_lock(dev_path):
|
||||
raise Exception(
|
||||
'%r was not replication locked!' % dev_path)
|
||||
except ReplicationLockTimeout as err:
|
||||
lock_exc = err
|
||||
except Exception as err:
|
||||
exc = err
|
||||
self.assertTrue(lock_exc is None)
|
||||
self.assertTrue(exc is not None)
|
||||
|
||||
def test_replication_lock_another_device_fine(self):
|
||||
# Double check settings
|
||||
self.df_mgr.replication_one_per_device = True
|
||||
self.df_mgr.replication_lock_timeout = 0.1
|
||||
dev_path = os.path.join(self.testdir, 'sda1')
|
||||
dev_path2 = os.path.join(self.testdir, 'sda2')
|
||||
with self.df_mgr.replication_lock(dev_path):
|
||||
lock_exc = None
|
||||
try:
|
||||
with self.df_mgr.replication_lock(dev_path2):
|
||||
pass
|
||||
except ReplicationLockTimeout as err:
|
||||
lock_exc = err
|
||||
self.assertTrue(lock_exc is None)
|
||||
|
||||
|
||||
class TestDiskFile(unittest.TestCase):
|
||||
"""Test swift.obj.diskfile.DiskFile"""
|
||||
|
||||
|
@ -24,6 +24,7 @@ import eventlet
|
||||
import mock
|
||||
|
||||
from swift.common import constraints
|
||||
from swift.common import exceptions
|
||||
from swift.common import swob
|
||||
from swift.common import utils
|
||||
from swift.obj import diskfile
|
||||
@ -45,7 +46,10 @@ class TestReceiver(unittest.TestCase):
|
||||
self.testdir = os.path.join(
|
||||
tempfile.mkdtemp(), 'tmp_test_ssync_receiver')
|
||||
utils.mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))
|
||||
conf = {'devices': self.testdir, 'mount_check': 'false'}
|
||||
conf = {
|
||||
'devices': self.testdir,
|
||||
'mount_check': 'false',
|
||||
'replication_one_per_device': 'false'}
|
||||
self.controller = server.ObjectController(conf)
|
||||
self.controller.bytes_per_sync = 1
|
||||
|
||||
@ -104,6 +108,46 @@ class TestReceiver(unittest.TestCase):
|
||||
self.assertFalse(self.controller.logger.error.called)
|
||||
self.assertFalse(self.controller.logger.exception.called)
|
||||
|
||||
def test_REPLICATION_calls_replication_lock(self):
|
||||
with mock.patch.object(
|
||||
self.controller._diskfile_mgr, 'replication_lock') as \
|
||||
mocked_replication_lock:
|
||||
req = swob.Request.blank(
|
||||
'/sda1/1',
|
||||
environ={'REQUEST_METHOD': 'REPLICATION'},
|
||||
body=':MISSING_CHECK: START\r\n'
|
||||
':MISSING_CHECK: END\r\n'
|
||||
':UPDATES: START\r\n:UPDATES: END\r\n')
|
||||
resp = self.controller.REPLICATION(req)
|
||||
self.assertEqual(
|
||||
self.body_lines(resp.body),
|
||||
[':MISSING_CHECK: START', ':MISSING_CHECK: END',
|
||||
':UPDATES: START', ':UPDATES: END'])
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
mocked_replication_lock.assert_called_once_with('sda1')
|
||||
|
||||
def test_REPLICATION_replication_lock_fail(self):
|
||||
def _mock(path):
|
||||
with exceptions.ReplicationLockTimeout(0.01, '/somewhere/' + path):
|
||||
eventlet.sleep(0.05)
|
||||
with mock.patch.object(
|
||||
self.controller._diskfile_mgr, 'replication_lock', _mock):
|
||||
self.controller._diskfile_mgr
|
||||
self.controller.logger = mock.MagicMock()
|
||||
req = swob.Request.blank(
|
||||
'/sda1/1',
|
||||
environ={'REQUEST_METHOD': 'REPLICATION'},
|
||||
body=':MISSING_CHECK: START\r\n'
|
||||
':MISSING_CHECK: END\r\n'
|
||||
':UPDATES: START\r\n:UPDATES: END\r\n')
|
||||
resp = self.controller.REPLICATION(req)
|
||||
self.assertEqual(
|
||||
self.body_lines(resp.body),
|
||||
[":ERROR: 0 '0.01 seconds: /somewhere/sda1'"])
|
||||
self.controller.logger.debug.assert_called_once_with(
|
||||
'None/sda1/1 REPLICATION LOCK TIMEOUT: 0.01 seconds: '
|
||||
'/somewhere/sda1')
|
||||
|
||||
def test_REPLICATION_initial_path(self):
|
||||
with mock.patch.object(
|
||||
self.controller, 'replication_semaphore') as \
|
||||
|
Loading…
Reference in New Issue
Block a user