diff --git a/doc/source/deployment_guide.rst b/doc/source/deployment_guide.rst index 90470671cc..31b5eed8cb 100644 --- a/doc/source/deployment_guide.rst +++ b/doc/source/deployment_guide.rst @@ -408,6 +408,19 @@ threads_per_disk 0 Size of the per-disk thread pool replication_concurrency 4 Set to restrict the number of concurrent incoming REPLICATION requests; set to 0 for unlimited +replication_one_per_device True Restricts incoming REPLICATION + requests to one per device, + replication_currency above + allowing. This can help control + I/O to each device, but you may + wish to set this to False to + allow multiple REPLICATION + requests (up to the above + replication_concurrency setting) + per device. +replication_lock_timeout 15 Number of seconds to wait for an + existing replication device lock + before giving up. replication_failure_threshold 100 The number of subrequest failures before the replication_failure_ratio is diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample index 015b816a10..ee06929a4e 100644 --- a/etc/object-server.conf-sample +++ b/etc/object-server.conf-sample @@ -105,6 +105,16 @@ use = egg:swift#object # Note that REPLICATION is currently an ssync only item # replication_concurrency = 4 # +# Restricts incoming REPLICATION requests to one per device, +# replication_currency above allowing. This can help control I/O to each +# device, but you may wish to set this to False to allow multiple REPLICATION +# requests (up to the above replication_concurrency setting) per device. +# replication_one_per_device = True +# +# Number of seconds to wait for an existing replication device lock before +# giving up. +# replication_lock_timeout = 15 +# # These next two settings control when the REPLICATION subrequest handler will # abort an incoming REPLICATION attempt. An abort will occur if there are at # least threshold number of failures and the value of failures / successes diff --git a/swift/common/exceptions.py b/swift/common/exceptions.py index 985088c7fa..5943f895d4 100644 --- a/swift/common/exceptions.py +++ b/swift/common/exceptions.py @@ -122,3 +122,7 @@ class SegmentError(SwiftException): class ReplicationException(Exception): pass + + +class ReplicationLockTimeout(LockTimeout): + pass diff --git a/swift/common/utils.py b/swift/common/utils.py index 6456e418d3..1290b94d73 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -1179,7 +1179,7 @@ def hash_path(account, container=None, object=None, raw_digest=False): @contextmanager -def lock_path(directory, timeout=10): +def lock_path(directory, timeout=10, timeout_class=LockTimeout): """ Context manager that acquires a lock on a directory. This will block until the lock can be acquired, or the timeout time has expired (whichever occurs @@ -1191,12 +1191,16 @@ def lock_path(directory, timeout=10): :param directory: directory to be locked :param timeout: timeout (in seconds) + :param timeout_class: The class of the exception to raise if the + lock cannot be granted within the timeout. Will be + constructed as timeout_class(timeout, lockpath). Default: + LockTimeout """ mkdirs(directory) lockpath = '%s/.lock' % directory fd = os.open(lockpath, os.O_WRONLY | os.O_CREAT) try: - with LockTimeout(timeout, lockpath): + with timeout_class(timeout, lockpath): while True: try: fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index 09c7d2f529..53465c25fe 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -56,7 +56,8 @@ from swift.common.utils import mkdirs, normalize_timestamp, \ config_true_value, listdir, split_path, ismount from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \ DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \ - DiskFileDeleted, DiskFileError, DiskFileNotOpen, PathNotDir + DiskFileDeleted, DiskFileError, DiskFileNotOpen, PathNotDir, \ + ReplicationLockTimeout from swift.common.swob import multi_range_iterator @@ -382,6 +383,10 @@ class DiskFileManager(object): self.bytes_per_sync = int(conf.get('mb_per_sync', 512)) * 1024 * 1024 self.mount_check = config_true_value(conf.get('mount_check', 'true')) self.reclaim_age = int(conf.get('reclaim_age', ONE_WEEK)) + self.replication_one_per_device = config_true_value( + conf.get('replication_one_per_device', 'true')) + self.replication_lock_timeout = int(conf.get( + 'replication_lock_timeout', 15)) threads_per_disk = int(conf.get('threads_per_disk', '0')) self.threadpools = defaultdict( lambda: ThreadPool(nthreads=threads_per_disk)) @@ -413,6 +418,25 @@ class DiskFileManager(object): dev_path = os.path.join(self.devices, device) return dev_path + @contextmanager + def replication_lock(self, device): + """ + A context manager that will lock on the device given, if + configured to do so. + + :raises ReplicationLockTimeout: If the lock on the device + cannot be granted within the configured timeout. + """ + if self.replication_one_per_device: + dev_path = self.get_dev_path(device) + with lock_path( + dev_path, + timeout=self.replication_lock_timeout, + timeout_class=ReplicationLockTimeout): + yield True + else: + yield True + def pickle_async_update(self, device, account, container, obj, data, timestamp): device_path = self.construct_dev_path(device) diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py index 88b555188a..a3bc84cbba 100644 --- a/swift/obj/ssync_receiver.py +++ b/swift/obj/ssync_receiver.py @@ -97,13 +97,20 @@ class Receiver(object): if not self.app.replication_semaphore.acquire(False): raise swob.HTTPServiceUnavailable() try: - for data in self.missing_check(): - yield data - for data in self.updates(): - yield data + with self.app._diskfile_mgr.replication_lock(self.device): + for data in self.missing_check(): + yield data + for data in self.updates(): + yield data finally: if self.app.replication_semaphore: self.app.replication_semaphore.release() + except exceptions.ReplicationLockTimeout as err: + self.app.logger.debug( + '%s/%s/%s REPLICATION LOCK TIMEOUT: %s' % ( + self.request.remote_addr, self.device, self.partition, + err)) + yield ':ERROR: %d %r\n' % (0, str(err)) except exceptions.MessageTimeout as err: self.app.logger.error( '%s/%s/%s TIMEOUT in replication.Receiver: %s' % ( diff --git a/test/unit/common/test_exceptions.py b/test/unit/common/test_exceptions.py index 83dd25acbd..04adfe2bdd 100644 --- a/test/unit/common/test_exceptions.py +++ b/test/unit/common/test_exceptions.py @@ -25,6 +25,13 @@ class TestExceptions(unittest.TestCase): self.assertEqual(str(exceptions.ReplicationException()), '') self.assertEqual(str(exceptions.ReplicationException('test')), 'test') + def test_replication_lock_timeout(self): + exc = exceptions.ReplicationLockTimeout(15, 'test') + try: + self.assertTrue(isinstance(exc, exceptions.MessageTimeout)) + finally: + exc.cancel() + if __name__ == '__main__': unittest.main() diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 2595597a6f..baf839f5ed 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -49,7 +49,8 @@ from netifaces import AF_INET6 from mock import MagicMock, patch from swift.common.exceptions import (Timeout, MessageTimeout, - ConnectionTimeout, LockTimeout) + ConnectionTimeout, LockTimeout, + ReplicationLockTimeout) from swift.common import utils from swift.common.swob import Response from test.unit import FakeLogger @@ -138,6 +139,55 @@ class TestUtils(unittest.TestCase): utils.HASH_PATH_SUFFIX = 'endcap' utils.HASH_PATH_PREFIX = 'startcap' + def test_lock_path(self): + tmpdir = mkdtemp() + try: + with utils.lock_path(tmpdir, 0.1): + exc = None + success = False + try: + with utils.lock_path(tmpdir, 0.1): + success = True + except LockTimeout as err: + exc = err + self.assertTrue(exc is not None) + self.assertTrue(not success) + finally: + shutil.rmtree(tmpdir) + + def test_lock_path_class(self): + tmpdir = mkdtemp() + try: + with utils.lock_path(tmpdir, 0.1, ReplicationLockTimeout): + exc = None + exc2 = None + success = False + try: + with utils.lock_path(tmpdir, 0.1, ReplicationLockTimeout): + success = True + except ReplicationLockTimeout as err: + exc = err + except LockTimeout as err: + exc2 = err + self.assertTrue(exc is not None) + self.assertTrue(exc2 is None) + self.assertTrue(not success) + exc = None + exc2 = None + success = False + try: + with utils.lock_path(tmpdir, 0.1): + success = True + except ReplicationLockTimeout as err: + exc = err + except LockTimeout as err: + exc2 = err + self.assertTrue(exc is None) + self.assertTrue(exc2 is not None) + self.assertTrue(not success) + finally: + shutil.rmtree(tmpdir) + def test_normalize_timestamp(self): # Test swift.common.utils.normalize_timestamp self.assertEquals(utils.normalize_timestamp('1253327593.48174'), diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py index cf99530b58..08f5723c0e 100644 --- a/test/unit/obj/test_diskfile.py +++ b/test/unit/obj/test_diskfile.py @@ -41,7 +41,8 @@ from swift.common import utils from swift.common.utils import hash_path, mkdirs, normalize_timestamp from swift.common import ring from swift.common.exceptions import DiskFileNotExist, DiskFileQuarantined, \ - DiskFileDeviceUnavailable, DiskFileDeleted, DiskFileNotOpen, DiskFileError + DiskFileDeviceUnavailable, DiskFileDeleted, DiskFileNotOpen, \ + DiskFileError, ReplicationLockTimeout def _create_test_ring(path): @@ -468,6 +469,72 @@ class TestObjectAuditLocationGenerator(unittest.TestCase): self.assertRaises(OSError, list_locations, tmpdir) +class TestDiskFileManager(unittest.TestCase): + + def setUp(self): + self.testdir = os.path.join(mkdtemp(), 'tmp_test_obj_server_DiskFile') + mkdirs(os.path.join(self.testdir, 'sda1', 'tmp')) + mkdirs(os.path.join(self.testdir, 'sda2', 'tmp')) + self._orig_tpool_exc = tpool.execute + tpool.execute = lambda f, *args, **kwargs: f(*args, **kwargs) + self.conf = dict(devices=self.testdir, mount_check='false', + keep_cache_size=2 * 1024) + self.df_mgr = diskfile.DiskFileManager(self.conf, FakeLogger()) + + def test_replication_lock_on(self): + # Double check settings + self.df_mgr.replication_one_per_device = True + self.df_mgr.replication_lock_timeout = 0.1 + dev_path = os.path.join(self.testdir, 'sda1') + with self.df_mgr.replication_lock(dev_path): + lock_exc = None + exc = None + try: + with self.df_mgr.replication_lock(dev_path): + raise Exception( + '%r was not replication locked!' % dev_path) + except ReplicationLockTimeout as err: + lock_exc = err + except Exception as err: + exc = err + self.assertTrue(lock_exc is not None) + self.assertTrue(exc is None) + + def test_replication_lock_off(self): + # Double check settings + self.df_mgr.replication_one_per_device = False + self.df_mgr.replication_lock_timeout = 0.1 + dev_path = os.path.join(self.testdir, 'sda1') + with self.df_mgr.replication_lock(dev_path): + lock_exc = None + exc = None + try: + with self.df_mgr.replication_lock(dev_path): + raise Exception( + '%r was not replication locked!' % dev_path) + except ReplicationLockTimeout as err: + lock_exc = err + except Exception as err: + exc = err + self.assertTrue(lock_exc is None) + self.assertTrue(exc is not None) + + def test_replication_lock_another_device_fine(self): + # Double check settings + self.df_mgr.replication_one_per_device = True + self.df_mgr.replication_lock_timeout = 0.1 + dev_path = os.path.join(self.testdir, 'sda1') + dev_path2 = os.path.join(self.testdir, 'sda2') + with self.df_mgr.replication_lock(dev_path): + lock_exc = None + try: + with self.df_mgr.replication_lock(dev_path2): + pass + except ReplicationLockTimeout as err: + lock_exc = err + self.assertTrue(lock_exc is None) + + class TestDiskFile(unittest.TestCase): """Test swift.obj.diskfile.DiskFile""" diff --git a/test/unit/obj/test_ssync_receiver.py b/test/unit/obj/test_ssync_receiver.py index e62a12b944..babc2ce936 100644 --- a/test/unit/obj/test_ssync_receiver.py +++ b/test/unit/obj/test_ssync_receiver.py @@ -24,6 +24,7 @@ import eventlet import mock from swift.common import constraints +from swift.common import exceptions from swift.common import swob from swift.common import utils from swift.obj import diskfile @@ -45,7 +46,10 @@ class TestReceiver(unittest.TestCase): self.testdir = os.path.join( tempfile.mkdtemp(), 'tmp_test_ssync_receiver') utils.mkdirs(os.path.join(self.testdir, 'sda1', 'tmp')) - conf = {'devices': self.testdir, 'mount_check': 'false'} + conf = { + 'devices': self.testdir, + 'mount_check': 'false', + 'replication_one_per_device': 'false'} self.controller = server.ObjectController(conf) self.controller.bytes_per_sync = 1 @@ -104,6 +108,46 @@ class TestReceiver(unittest.TestCase): self.assertFalse(self.controller.logger.error.called) self.assertFalse(self.controller.logger.exception.called) + def test_REPLICATION_calls_replication_lock(self): + with mock.patch.object( + self.controller._diskfile_mgr, 'replication_lock') as \ + mocked_replication_lock: + req = swob.Request.blank( + '/sda1/1', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n' + ':MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n:UPDATES: END\r\n') + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ':UPDATES: START', ':UPDATES: END']) + self.assertEqual(resp.status_int, 200) + mocked_replication_lock.assert_called_once_with('sda1') + + def test_REPLICATION_replication_lock_fail(self): + def _mock(path): + with exceptions.ReplicationLockTimeout(0.01, '/somewhere/' + path): + eventlet.sleep(0.05) + with mock.patch.object( + self.controller._diskfile_mgr, 'replication_lock', _mock): + self.controller._diskfile_mgr + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/sda1/1', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n' + ':MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n:UPDATES: END\r\n') + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [":ERROR: 0 '0.01 seconds: /somewhere/sda1'"]) + self.controller.logger.debug.assert_called_once_with( + 'None/sda1/1 REPLICATION LOCK TIMEOUT: 0.01 seconds: ' + '/somewhere/sda1') + def test_REPLICATION_initial_path(self): with mock.patch.object( self.controller, 'replication_semaphore') as \