From c859ebf5ce1161e0fc2ca5258b8d3f45e29fc9ea Mon Sep 17 00:00:00 2001 From: gholt Date: Sat, 9 Nov 2013 03:18:11 +0000 Subject: [PATCH] Per device replication_lock New replication_one_per_device (True by default) that restricts incoming REPLICATION requests to one per device, replication_currency allowing. Also has replication_lock_timeout (15 by default) to control how long a request will wait to obtain a replication device lock before giving up. This should be very useful in that you can be assured any concurrent REPLICATION requests are each writing to distinct devices. If you have 100 devices on a server, you can set replication_concurrency to 100 and be confident that, even if 100 replication requests were executing concurrently, they'd each be writing to separate devices. Before, all 100 could end up writing to the same device, bringing it to a horrible crawl. NOTE: This is only for ssync replication. The current default rsync replication still has the potentially horrible behavior. Change-Id: I36e99a3d7e100699c76db6d3a4846514537ff685 --- doc/source/deployment_guide.rst | 13 ++++++ etc/object-server.conf-sample | 10 ++++ swift/common/exceptions.py | 4 ++ swift/common/utils.py | 8 +++- swift/obj/diskfile.py | 26 ++++++++++- swift/obj/ssync_receiver.py | 15 ++++-- test/unit/common/test_exceptions.py | 7 +++ test/unit/common/test_utils.py | 52 ++++++++++++++++++++- test/unit/obj/test_diskfile.py | 69 +++++++++++++++++++++++++++- test/unit/obj/test_ssync_receiver.py | 46 ++++++++++++++++++- 10 files changed, 240 insertions(+), 10 deletions(-) diff --git a/doc/source/deployment_guide.rst b/doc/source/deployment_guide.rst index 90470671cc..31b5eed8cb 100644 --- a/doc/source/deployment_guide.rst +++ b/doc/source/deployment_guide.rst @@ -408,6 +408,19 @@ threads_per_disk 0 Size of the per-disk thread pool replication_concurrency 4 Set to restrict the number of concurrent incoming REPLICATION requests; set to 0 for unlimited +replication_one_per_device True Restricts incoming REPLICATION + requests to one per device, + replication_currency above + allowing. This can help control + I/O to each device, but you may + wish to set this to False to + allow multiple REPLICATION + requests (up to the above + replication_concurrency setting) + per device. +replication_lock_timeout 15 Number of seconds to wait for an + existing replication device lock + before giving up. replication_failure_threshold 100 The number of subrequest failures before the replication_failure_ratio is diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample index 015b816a10..ee06929a4e 100644 --- a/etc/object-server.conf-sample +++ b/etc/object-server.conf-sample @@ -105,6 +105,16 @@ use = egg:swift#object # Note that REPLICATION is currently an ssync only item # replication_concurrency = 4 # +# Restricts incoming REPLICATION requests to one per device, +# replication_currency above allowing. This can help control I/O to each +# device, but you may wish to set this to False to allow multiple REPLICATION +# requests (up to the above replication_concurrency setting) per device. +# replication_one_per_device = True +# +# Number of seconds to wait for an existing replication device lock before +# giving up. +# replication_lock_timeout = 15 +# # These next two settings control when the REPLICATION subrequest handler will # abort an incoming REPLICATION attempt. An abort will occur if there are at # least threshold number of failures and the value of failures / successes diff --git a/swift/common/exceptions.py b/swift/common/exceptions.py index 985088c7fa..5943f895d4 100644 --- a/swift/common/exceptions.py +++ b/swift/common/exceptions.py @@ -122,3 +122,7 @@ class SegmentError(SwiftException): class ReplicationException(Exception): pass + + +class ReplicationLockTimeout(LockTimeout): + pass diff --git a/swift/common/utils.py b/swift/common/utils.py index 6456e418d3..1290b94d73 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -1179,7 +1179,7 @@ def hash_path(account, container=None, object=None, raw_digest=False): @contextmanager -def lock_path(directory, timeout=10): +def lock_path(directory, timeout=10, timeout_class=LockTimeout): """ Context manager that acquires a lock on a directory. This will block until the lock can be acquired, or the timeout time has expired (whichever occurs @@ -1191,12 +1191,16 @@ def lock_path(directory, timeout=10): :param directory: directory to be locked :param timeout: timeout (in seconds) + :param timeout_class: The class of the exception to raise if the + lock cannot be granted within the timeout. Will be + constructed as timeout_class(timeout, lockpath). Default: + LockTimeout """ mkdirs(directory) lockpath = '%s/.lock' % directory fd = os.open(lockpath, os.O_WRONLY | os.O_CREAT) try: - with LockTimeout(timeout, lockpath): + with timeout_class(timeout, lockpath): while True: try: fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index 09c7d2f529..53465c25fe 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -56,7 +56,8 @@ from swift.common.utils import mkdirs, normalize_timestamp, \ config_true_value, listdir, split_path, ismount from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \ DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \ - DiskFileDeleted, DiskFileError, DiskFileNotOpen, PathNotDir + DiskFileDeleted, DiskFileError, DiskFileNotOpen, PathNotDir, \ + ReplicationLockTimeout from swift.common.swob import multi_range_iterator @@ -382,6 +383,10 @@ class DiskFileManager(object): self.bytes_per_sync = int(conf.get('mb_per_sync', 512)) * 1024 * 1024 self.mount_check = config_true_value(conf.get('mount_check', 'true')) self.reclaim_age = int(conf.get('reclaim_age', ONE_WEEK)) + self.replication_one_per_device = config_true_value( + conf.get('replication_one_per_device', 'true')) + self.replication_lock_timeout = int(conf.get( + 'replication_lock_timeout', 15)) threads_per_disk = int(conf.get('threads_per_disk', '0')) self.threadpools = defaultdict( lambda: ThreadPool(nthreads=threads_per_disk)) @@ -413,6 +418,25 @@ class DiskFileManager(object): dev_path = os.path.join(self.devices, device) return dev_path + @contextmanager + def replication_lock(self, device): + """ + A context manager that will lock on the device given, if + configured to do so. + + :raises ReplicationLockTimeout: If the lock on the device + cannot be granted within the configured timeout. + """ + if self.replication_one_per_device: + dev_path = self.get_dev_path(device) + with lock_path( + dev_path, + timeout=self.replication_lock_timeout, + timeout_class=ReplicationLockTimeout): + yield True + else: + yield True + def pickle_async_update(self, device, account, container, obj, data, timestamp): device_path = self.construct_dev_path(device) diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py index 88b555188a..a3bc84cbba 100644 --- a/swift/obj/ssync_receiver.py +++ b/swift/obj/ssync_receiver.py @@ -97,13 +97,20 @@ class Receiver(object): if not self.app.replication_semaphore.acquire(False): raise swob.HTTPServiceUnavailable() try: - for data in self.missing_check(): - yield data - for data in self.updates(): - yield data + with self.app._diskfile_mgr.replication_lock(self.device): + for data in self.missing_check(): + yield data + for data in self.updates(): + yield data finally: if self.app.replication_semaphore: self.app.replication_semaphore.release() + except exceptions.ReplicationLockTimeout as err: + self.app.logger.debug( + '%s/%s/%s REPLICATION LOCK TIMEOUT: %s' % ( + self.request.remote_addr, self.device, self.partition, + err)) + yield ':ERROR: %d %r\n' % (0, str(err)) except exceptions.MessageTimeout as err: self.app.logger.error( '%s/%s/%s TIMEOUT in replication.Receiver: %s' % ( diff --git a/test/unit/common/test_exceptions.py b/test/unit/common/test_exceptions.py index 83dd25acbd..04adfe2bdd 100644 --- a/test/unit/common/test_exceptions.py +++ b/test/unit/common/test_exceptions.py @@ -25,6 +25,13 @@ class TestExceptions(unittest.TestCase): self.assertEqual(str(exceptions.ReplicationException()), '') self.assertEqual(str(exceptions.ReplicationException('test')), 'test') + def test_replication_lock_timeout(self): + exc = exceptions.ReplicationLockTimeout(15, 'test') + try: + self.assertTrue(isinstance(exc, exceptions.MessageTimeout)) + finally: + exc.cancel() + if __name__ == '__main__': unittest.main() diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 2595597a6f..baf839f5ed 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -49,7 +49,8 @@ from netifaces import AF_INET6 from mock import MagicMock, patch from swift.common.exceptions import (Timeout, MessageTimeout, - ConnectionTimeout, LockTimeout) + ConnectionTimeout, LockTimeout, + ReplicationLockTimeout) from swift.common import utils from swift.common.swob import Response from test.unit import FakeLogger @@ -138,6 +139,55 @@ class TestUtils(unittest.TestCase): utils.HASH_PATH_SUFFIX = 'endcap' utils.HASH_PATH_PREFIX = 'startcap' + def test_lock_path(self): + tmpdir = mkdtemp() + try: + with utils.lock_path(tmpdir, 0.1): + exc = None + success = False + try: + with utils.lock_path(tmpdir, 0.1): + success = True + except LockTimeout as err: + exc = err + self.assertTrue(exc is not None) + self.assertTrue(not success) + finally: + shutil.rmtree(tmpdir) + + def test_lock_path_class(self): + tmpdir = mkdtemp() + try: + with utils.lock_path(tmpdir, 0.1, ReplicationLockTimeout): + exc = None + exc2 = None + success = False + try: + with utils.lock_path(tmpdir, 0.1, ReplicationLockTimeout): + success = True + except ReplicationLockTimeout as err: + exc = err + except LockTimeout as err: + exc2 = err + self.assertTrue(exc is not None) + self.assertTrue(exc2 is None) + self.assertTrue(not success) + exc = None + exc2 = None + success = False + try: + with utils.lock_path(tmpdir, 0.1): + success = True + except ReplicationLockTimeout as err: + exc = err + except LockTimeout as err: + exc2 = err + self.assertTrue(exc is None) + self.assertTrue(exc2 is not None) + self.assertTrue(not success) + finally: + shutil.rmtree(tmpdir) + def test_normalize_timestamp(self): # Test swift.common.utils.normalize_timestamp self.assertEquals(utils.normalize_timestamp('1253327593.48174'), diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py index cf99530b58..08f5723c0e 100644 --- a/test/unit/obj/test_diskfile.py +++ b/test/unit/obj/test_diskfile.py @@ -41,7 +41,8 @@ from swift.common import utils from swift.common.utils import hash_path, mkdirs, normalize_timestamp from swift.common import ring from swift.common.exceptions import DiskFileNotExist, DiskFileQuarantined, \ - DiskFileDeviceUnavailable, DiskFileDeleted, DiskFileNotOpen, DiskFileError + DiskFileDeviceUnavailable, DiskFileDeleted, DiskFileNotOpen, \ + DiskFileError, ReplicationLockTimeout def _create_test_ring(path): @@ -468,6 +469,72 @@ class TestObjectAuditLocationGenerator(unittest.TestCase): self.assertRaises(OSError, list_locations, tmpdir) +class TestDiskFileManager(unittest.TestCase): + + def setUp(self): + self.testdir = os.path.join(mkdtemp(), 'tmp_test_obj_server_DiskFile') + mkdirs(os.path.join(self.testdir, 'sda1', 'tmp')) + mkdirs(os.path.join(self.testdir, 'sda2', 'tmp')) + self._orig_tpool_exc = tpool.execute + tpool.execute = lambda f, *args, **kwargs: f(*args, **kwargs) + self.conf = dict(devices=self.testdir, mount_check='false', + keep_cache_size=2 * 1024) + self.df_mgr = diskfile.DiskFileManager(self.conf, FakeLogger()) + + def test_replication_lock_on(self): + # Double check settings + self.df_mgr.replication_one_per_device = True + self.df_mgr.replication_lock_timeout = 0.1 + dev_path = os.path.join(self.testdir, 'sda1') + with self.df_mgr.replication_lock(dev_path): + lock_exc = None + exc = None + try: + with self.df_mgr.replication_lock(dev_path): + raise Exception( + '%r was not replication locked!' % dev_path) + except ReplicationLockTimeout as err: + lock_exc = err + except Exception as err: + exc = err + self.assertTrue(lock_exc is not None) + self.assertTrue(exc is None) + + def test_replication_lock_off(self): + # Double check settings + self.df_mgr.replication_one_per_device = False + self.df_mgr.replication_lock_timeout = 0.1 + dev_path = os.path.join(self.testdir, 'sda1') + with self.df_mgr.replication_lock(dev_path): + lock_exc = None + exc = None + try: + with self.df_mgr.replication_lock(dev_path): + raise Exception( + '%r was not replication locked!' % dev_path) + except ReplicationLockTimeout as err: + lock_exc = err + except Exception as err: + exc = err + self.assertTrue(lock_exc is None) + self.assertTrue(exc is not None) + + def test_replication_lock_another_device_fine(self): + # Double check settings + self.df_mgr.replication_one_per_device = True + self.df_mgr.replication_lock_timeout = 0.1 + dev_path = os.path.join(self.testdir, 'sda1') + dev_path2 = os.path.join(self.testdir, 'sda2') + with self.df_mgr.replication_lock(dev_path): + lock_exc = None + try: + with self.df_mgr.replication_lock(dev_path2): + pass + except ReplicationLockTimeout as err: + lock_exc = err + self.assertTrue(lock_exc is None) + + class TestDiskFile(unittest.TestCase): """Test swift.obj.diskfile.DiskFile""" diff --git a/test/unit/obj/test_ssync_receiver.py b/test/unit/obj/test_ssync_receiver.py index e62a12b944..babc2ce936 100644 --- a/test/unit/obj/test_ssync_receiver.py +++ b/test/unit/obj/test_ssync_receiver.py @@ -24,6 +24,7 @@ import eventlet import mock from swift.common import constraints +from swift.common import exceptions from swift.common import swob from swift.common import utils from swift.obj import diskfile @@ -45,7 +46,10 @@ class TestReceiver(unittest.TestCase): self.testdir = os.path.join( tempfile.mkdtemp(), 'tmp_test_ssync_receiver') utils.mkdirs(os.path.join(self.testdir, 'sda1', 'tmp')) - conf = {'devices': self.testdir, 'mount_check': 'false'} + conf = { + 'devices': self.testdir, + 'mount_check': 'false', + 'replication_one_per_device': 'false'} self.controller = server.ObjectController(conf) self.controller.bytes_per_sync = 1 @@ -104,6 +108,46 @@ class TestReceiver(unittest.TestCase): self.assertFalse(self.controller.logger.error.called) self.assertFalse(self.controller.logger.exception.called) + def test_REPLICATION_calls_replication_lock(self): + with mock.patch.object( + self.controller._diskfile_mgr, 'replication_lock') as \ + mocked_replication_lock: + req = swob.Request.blank( + '/sda1/1', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n' + ':MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n:UPDATES: END\r\n') + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ':UPDATES: START', ':UPDATES: END']) + self.assertEqual(resp.status_int, 200) + mocked_replication_lock.assert_called_once_with('sda1') + + def test_REPLICATION_replication_lock_fail(self): + def _mock(path): + with exceptions.ReplicationLockTimeout(0.01, '/somewhere/' + path): + eventlet.sleep(0.05) + with mock.patch.object( + self.controller._diskfile_mgr, 'replication_lock', _mock): + self.controller._diskfile_mgr + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/sda1/1', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n' + ':MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n:UPDATES: END\r\n') + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [":ERROR: 0 '0.01 seconds: /somewhere/sda1'"]) + self.controller.logger.debug.assert_called_once_with( + 'None/sda1/1 REPLICATION LOCK TIMEOUT: 0.01 seconds: ' + '/somewhere/sda1') + def test_REPLICATION_initial_path(self): with mock.patch.object( self.controller, 'replication_semaphore') as \