Replace replication_one_per_device by custom count
This commit replaces boolean replication_one_per_device by an integer replication_concurrency_per_device. The new configuration parameter is passed to utils.lock_path() which now accept as an argument a limit for the number of locks that can be acquired for a specific path. Instead of trying to lock path/.lock, utils.lock_path() now tries to lock files path/.lock-X, where X is in the range (0, N), N being the limit for the number of locks allowed for the path. The default value of limit is set to 1. Change-Id: I3c3193344c7a57a8a4fc7932d1b10e702efd3572
This commit is contained in:
parent
49dd146068
commit
e199192cae
@ -225,11 +225,12 @@ should not specify any value for "replication_server".
|
||||
Set to restrict the number of concurrent incoming SSYNC requests
|
||||
Set to 0 for unlimited (the default is 4). Note that SSYNC requests are only used
|
||||
by the object reconstructor or the object replicator when configured to use ssync.
|
||||
.IP "\fBreplication_one_per_device\fR"
|
||||
Restricts incoming SSYNC requests to one per device,
|
||||
replication_currency above allowing. This can help control I/O to each
|
||||
device, but you may wish to set this to False to allow multiple SSYNC
|
||||
requests (up to the above replication_concurrency setting) per device. The default is true.
|
||||
.IP "\fBreplication_concurrency_per_device\fR"
|
||||
Set to restrict the number of concurrent incoming SSYNC requests per device;
|
||||
set to 0 for unlimited requests per devices. This can help control I/O to each
|
||||
device. This does not override replication_concurrency described above, so you
|
||||
may need to adjust both parameters depending on your hardware or network
|
||||
capacity. Defaults to 1.
|
||||
.IP "\fBreplication_lock_timeout\fR"
|
||||
Number of seconds to wait for an existing replication device lock before
|
||||
giving up. The default is 15.
|
||||
|
@ -563,9 +563,9 @@ ionice_priority None I/O scheduling priority of server
|
||||
[object-server]
|
||||
***************
|
||||
|
||||
============================= ====================== ===============================================
|
||||
================================== ====================== ===============================================
|
||||
Option Default Description
|
||||
----------------------------- ---------------------- -----------------------------------------------
|
||||
---------------------------------- ---------------------- -----------------------------------------------
|
||||
use paste.deploy entry point for the
|
||||
object server. For most cases,
|
||||
this should be
|
||||
@ -611,16 +611,16 @@ replication_server Configure parameter for cr
|
||||
replication_concurrency 4 Set to restrict the number of
|
||||
concurrent incoming SSYNC
|
||||
requests; set to 0 for unlimited
|
||||
replication_one_per_device True Restricts incoming SSYNC
|
||||
requests to one per device,
|
||||
replication_currency above
|
||||
allowing. This can help control
|
||||
I/O to each device, but you may
|
||||
wish to set this to False to
|
||||
allow multiple SSYNC
|
||||
requests (up to the above
|
||||
replication_concurrency setting)
|
||||
per device.
|
||||
replication_concurrency_per_device 1 Set to restrict the number of
|
||||
concurrent incoming SSYNC
|
||||
requests per device; set to 0 for
|
||||
unlimited requests per devices.
|
||||
This can help control I/O to each
|
||||
device. This does not override
|
||||
replication_concurrency described
|
||||
above, so you may need to adjust
|
||||
both parameters depending on your
|
||||
hardware or network capacity.
|
||||
replication_lock_timeout 15 Number of seconds to wait for an
|
||||
existing replication device lock
|
||||
before giving up.
|
||||
@ -675,7 +675,7 @@ eventlet_tpool_num_threads auto The number of threads in e
|
||||
only use 1 thread per process.
|
||||
This value can be overridden with an integer
|
||||
value.
|
||||
============================= ====================== ===============================================
|
||||
================================== ====================== ===============================================
|
||||
|
||||
*******************
|
||||
[object-replicator]
|
||||
|
@ -161,11 +161,12 @@ use = egg:swift#object
|
||||
# object replicator when configured to use ssync.
|
||||
# replication_concurrency = 4
|
||||
#
|
||||
# Restricts incoming SSYNC requests to one per device,
|
||||
# replication_currency above allowing. This can help control I/O to each
|
||||
# device, but you may wish to set this to False to allow multiple SSYNC
|
||||
# requests (up to the above replication_concurrency setting) per device.
|
||||
# replication_one_per_device = True
|
||||
# Set to restrict the number of concurrent incoming SSYNC requests per
|
||||
# device; set to 0 for unlimited requests per device. This can help control
|
||||
# I/O to each device. This does not override replication_concurrency described
|
||||
# above, so you may need to adjust both parameters depending on your hardware
|
||||
# or network capacity.
|
||||
# replication_concurrency_per_device = 1
|
||||
#
|
||||
# Number of seconds to wait for an existing replication device lock before
|
||||
# giving up.
|
||||
|
@ -21,7 +21,7 @@ import six
|
||||
from six.moves.configparser import ConfigParser
|
||||
from swift.common.utils import (
|
||||
config_true_value, quorum_size, whataremyips, list_from_csv,
|
||||
config_positive_int_value)
|
||||
config_positive_int_value, get_zero_indexed_base_string)
|
||||
from swift.common.ring import Ring, RingData
|
||||
from swift.common import utils
|
||||
from swift.common.exceptions import RingLoadError
|
||||
@ -92,11 +92,7 @@ class PolicyError(ValueError):
|
||||
|
||||
|
||||
def _get_policy_string(base, policy_index):
|
||||
if policy_index == 0 or policy_index is None:
|
||||
return_string = base
|
||||
else:
|
||||
return_string = base + "-%d" % int(policy_index)
|
||||
return return_string
|
||||
return get_zero_indexed_base_string(base, policy_index)
|
||||
|
||||
|
||||
def get_policy_string(base, policy_or_index):
|
||||
|
@ -2280,8 +2280,45 @@ def hash_path(account, container=None, object=None, raw_digest=False):
|
||||
+ HASH_PATH_SUFFIX).hexdigest()
|
||||
|
||||
|
||||
def get_zero_indexed_base_string(base, index):
|
||||
"""
|
||||
This allows the caller to make a list of things with indexes, where the
|
||||
first item (zero indexed) is just the bare base string, and subsequent
|
||||
indexes are appended '-1', '-2', etc.
|
||||
|
||||
e.g.::
|
||||
|
||||
'lock', None => 'lock'
|
||||
'lock', 0 => 'lock'
|
||||
'lock', 1 => 'lock-1'
|
||||
'object', 2 => 'object-2'
|
||||
|
||||
:param base: a string, the base string; when ``index`` is 0 (or None) this
|
||||
is the identity function.
|
||||
:param index: a digit, typically an integer (or None); for values other
|
||||
than 0 or None this digit is appended to the base string
|
||||
separated by a hyphen.
|
||||
"""
|
||||
if index == 0 or index is None:
|
||||
return_string = base
|
||||
else:
|
||||
return_string = base + "-%d" % int(index)
|
||||
return return_string
|
||||
|
||||
|
||||
def _get_any_lock(fds):
|
||||
for fd in fds:
|
||||
try:
|
||||
fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
return True
|
||||
except IOError as err:
|
||||
if err.errno != errno.EAGAIN:
|
||||
raise
|
||||
return False
|
||||
|
||||
|
||||
@contextmanager
|
||||
def lock_path(directory, timeout=10, timeout_class=None):
|
||||
def lock_path(directory, timeout=10, timeout_class=None, limit=1):
|
||||
"""
|
||||
Context manager that acquires a lock on a directory. This will block until
|
||||
the lock can be acquired, or the timeout time has expired (whichever occurs
|
||||
@ -2297,12 +2334,16 @@ def lock_path(directory, timeout=10, timeout_class=None):
|
||||
lock cannot be granted within the timeout. Will be
|
||||
constructed as timeout_class(timeout, lockpath). Default:
|
||||
LockTimeout
|
||||
:param limit: the maximum number of locks that may be held concurrently on
|
||||
the same directory; defaults to 1
|
||||
"""
|
||||
if timeout_class is None:
|
||||
timeout_class = swift.common.exceptions.LockTimeout
|
||||
mkdirs(directory)
|
||||
lockpath = '%s/.lock' % directory
|
||||
fd = os.open(lockpath, os.O_WRONLY | os.O_CREAT)
|
||||
fds = [os.open(get_zero_indexed_base_string(lockpath, i),
|
||||
os.O_WRONLY | os.O_CREAT)
|
||||
for i in range(limit)]
|
||||
sleep_time = 0.01
|
||||
slower_sleep_time = max(timeout * 0.01, sleep_time)
|
||||
slowdown_at = timeout * 0.01
|
||||
@ -2310,18 +2351,15 @@ def lock_path(directory, timeout=10, timeout_class=None):
|
||||
try:
|
||||
with timeout_class(timeout, lockpath):
|
||||
while True:
|
||||
try:
|
||||
fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
if _get_any_lock(fds):
|
||||
break
|
||||
except IOError as err:
|
||||
if err.errno != errno.EAGAIN:
|
||||
raise
|
||||
if time_slept > slowdown_at:
|
||||
sleep_time = slower_sleep_time
|
||||
sleep(sleep_time)
|
||||
time_slept += sleep_time
|
||||
yield True
|
||||
finally:
|
||||
for fd in fds:
|
||||
os.close(fd)
|
||||
|
||||
|
||||
|
@ -624,8 +624,28 @@ class BaseDiskFileManager(object):
|
||||
self.bytes_per_sync = int(conf.get('mb_per_sync', 512)) * 1024 * 1024
|
||||
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
|
||||
self.reclaim_age = int(conf.get('reclaim_age', DEFAULT_RECLAIM_AGE))
|
||||
self.replication_one_per_device = config_true_value(
|
||||
conf.get('replication_one_per_device', 'true'))
|
||||
replication_concurrency_per_device = conf.get(
|
||||
'replication_concurrency_per_device')
|
||||
replication_one_per_device = conf.get('replication_one_per_device')
|
||||
if replication_concurrency_per_device is None \
|
||||
and replication_one_per_device is not None:
|
||||
self.logger.warning('Option replication_one_per_device is '
|
||||
'deprecated and will be removed in a future '
|
||||
'version. Update your configuration to use '
|
||||
'option replication_concurrency_per_device.')
|
||||
if config_true_value(replication_one_per_device):
|
||||
replication_concurrency_per_device = 1
|
||||
else:
|
||||
replication_concurrency_per_device = 0
|
||||
elif replication_one_per_device is not None:
|
||||
self.logger.warning('Option replication_one_per_device ignored as '
|
||||
'replication_concurrency_per_device is '
|
||||
'defined.')
|
||||
if replication_concurrency_per_device is None:
|
||||
self.replication_concurrency_per_device = 1
|
||||
else:
|
||||
self.replication_concurrency_per_device = int(
|
||||
replication_concurrency_per_device)
|
||||
self.replication_lock_timeout = int(conf.get(
|
||||
'replication_lock_timeout', 15))
|
||||
|
||||
@ -1208,12 +1228,13 @@ class BaseDiskFileManager(object):
|
||||
:raises ReplicationLockTimeout: If the lock on the device
|
||||
cannot be granted within the configured timeout.
|
||||
"""
|
||||
if self.replication_one_per_device:
|
||||
if self.replication_concurrency_per_device:
|
||||
dev_path = self.get_dev_path(device)
|
||||
with lock_path(
|
||||
dev_path,
|
||||
timeout=self.replication_lock_timeout,
|
||||
timeout_class=ReplicationLockTimeout):
|
||||
timeout_class=ReplicationLockTimeout,
|
||||
limit=self.replication_concurrency_per_device):
|
||||
yield True
|
||||
else:
|
||||
yield True
|
||||
|
@ -19,6 +19,7 @@ from uuid import uuid4
|
||||
import os
|
||||
import time
|
||||
import shutil
|
||||
import re
|
||||
|
||||
from swiftclient import client
|
||||
from swift.obj.diskfile import get_data_dir
|
||||
@ -26,7 +27,7 @@ from swift.obj.diskfile import get_data_dir
|
||||
from test.probe.common import ReplProbeTest
|
||||
from swift.common.utils import readconf
|
||||
|
||||
EXCLUDE_FILES = ['hashes.pkl', 'hashes.invalid', '.lock']
|
||||
EXCLUDE_FILES = re.compile('^(hashes\.(pkl|invalid)|lock(-\d+)?)$')
|
||||
|
||||
|
||||
def collect_info(path_list):
|
||||
@ -43,7 +44,7 @@ def collect_info(path_list):
|
||||
temp_files_list = []
|
||||
temp_dir_list = []
|
||||
for root, dirs, files in os.walk(path):
|
||||
files = [f for f in files if f not in EXCLUDE_FILES]
|
||||
files = [f for f in files if not EXCLUDE_FILES.match(f)]
|
||||
temp_files_list += files
|
||||
temp_dir_list += dirs
|
||||
files_list.append(temp_files_list)
|
||||
|
@ -924,9 +924,20 @@ class TestUtils(unittest.TestCase):
|
||||
utils.HASH_PATH_SUFFIX = 'endcap'
|
||||
utils.HASH_PATH_PREFIX = 'startcap'
|
||||
|
||||
def test_get_zero_indexed_base_string(self):
|
||||
self.assertEqual(utils.get_zero_indexed_base_string('something', 0),
|
||||
'something')
|
||||
self.assertEqual(utils.get_zero_indexed_base_string('something', None),
|
||||
'something')
|
||||
self.assertEqual(utils.get_zero_indexed_base_string('something', 1),
|
||||
'something-1')
|
||||
self.assertRaises(ValueError, utils.get_zero_indexed_base_string,
|
||||
'something', 'not_integer')
|
||||
|
||||
def test_lock_path(self):
|
||||
tmpdir = mkdtemp()
|
||||
try:
|
||||
# 2 locks with limit=1 must fail
|
||||
with utils.lock_path(tmpdir, 0.1):
|
||||
exc = None
|
||||
success = False
|
||||
@ -937,6 +948,26 @@ class TestUtils(unittest.TestCase):
|
||||
exc = err
|
||||
self.assertTrue(exc is not None)
|
||||
self.assertTrue(not success)
|
||||
|
||||
# 2 locks with limit=2 must succeed
|
||||
with utils.lock_path(tmpdir, 0.1, limit=2):
|
||||
success = False
|
||||
with utils.lock_path(tmpdir, 0.1, limit=2):
|
||||
success = True
|
||||
self.assertTrue(success)
|
||||
|
||||
# 3 locks with limit=2 must fail
|
||||
with utils.lock_path(tmpdir, 0.1, limit=2):
|
||||
exc = None
|
||||
success = False
|
||||
with utils.lock_path(tmpdir, 0.1, limit=2):
|
||||
try:
|
||||
with utils.lock_path(tmpdir, 0.1, limit=2):
|
||||
success = True
|
||||
except LockTimeout as err:
|
||||
exc = err
|
||||
self.assertTrue(exc is not None)
|
||||
self.assertTrue(not success)
|
||||
finally:
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
|
@ -960,9 +960,66 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
locations = list(self.df_mgr.object_audit_location_generator())
|
||||
self.assertEqual(locations, [])
|
||||
|
||||
def test_replication_one_per_device_deprecation(self):
|
||||
conf = dict(**self.conf)
|
||||
mgr = diskfile.DiskFileManager(conf, FakeLogger())
|
||||
self.assertEqual(mgr.replication_concurrency_per_device, 1)
|
||||
|
||||
conf = dict(replication_concurrency_per_device='0', **self.conf)
|
||||
mgr = diskfile.DiskFileManager(conf, FakeLogger())
|
||||
self.assertEqual(mgr.replication_concurrency_per_device, 0)
|
||||
|
||||
conf = dict(replication_concurrency_per_device='2', **self.conf)
|
||||
mgr = diskfile.DiskFileManager(conf, FakeLogger())
|
||||
self.assertEqual(mgr.replication_concurrency_per_device, 2)
|
||||
|
||||
conf = dict(replication_concurrency_per_device=2, **self.conf)
|
||||
mgr = diskfile.DiskFileManager(conf, FakeLogger())
|
||||
self.assertEqual(mgr.replication_concurrency_per_device, 2)
|
||||
|
||||
# Check backward compatibility
|
||||
conf = dict(replication_one_per_device='true', **self.conf)
|
||||
mgr = diskfile.DiskFileManager(conf, FakeLogger())
|
||||
self.assertEqual(mgr.replication_concurrency_per_device, 1)
|
||||
log_lines = mgr.logger.get_lines_for_level('warning')
|
||||
self.assertIn('replication_one_per_device is deprecated',
|
||||
log_lines[-1])
|
||||
|
||||
conf = dict(replication_one_per_device='false', **self.conf)
|
||||
mgr = diskfile.DiskFileManager(conf, FakeLogger())
|
||||
self.assertEqual(mgr.replication_concurrency_per_device, 0)
|
||||
log_lines = mgr.logger.get_lines_for_level('warning')
|
||||
self.assertIn('replication_one_per_device is deprecated',
|
||||
log_lines[-1])
|
||||
|
||||
# If defined, new parameter has precedence
|
||||
conf = dict(replication_concurrency_per_device='2',
|
||||
replication_one_per_device='true', **self.conf)
|
||||
mgr = diskfile.DiskFileManager(conf, FakeLogger())
|
||||
self.assertEqual(mgr.replication_concurrency_per_device, 2)
|
||||
log_lines = mgr.logger.get_lines_for_level('warning')
|
||||
self.assertIn('replication_one_per_device ignored',
|
||||
log_lines[-1])
|
||||
|
||||
conf = dict(replication_concurrency_per_device='2',
|
||||
replication_one_per_device='false', **self.conf)
|
||||
mgr = diskfile.DiskFileManager(conf, FakeLogger())
|
||||
self.assertEqual(mgr.replication_concurrency_per_device, 2)
|
||||
log_lines = mgr.logger.get_lines_for_level('warning')
|
||||
self.assertIn('replication_one_per_device ignored',
|
||||
log_lines[-1])
|
||||
|
||||
conf = dict(replication_concurrency_per_device='0',
|
||||
replication_one_per_device='true', **self.conf)
|
||||
mgr = diskfile.DiskFileManager(conf, FakeLogger())
|
||||
self.assertEqual(mgr.replication_concurrency_per_device, 0)
|
||||
log_lines = mgr.logger.get_lines_for_level('warning')
|
||||
self.assertIn('replication_one_per_device ignored',
|
||||
log_lines[-1])
|
||||
|
||||
def test_replication_lock_on(self):
|
||||
# Double check settings
|
||||
self.df_mgr.replication_one_per_device = True
|
||||
self.df_mgr.replication_concurrency_per_device = 1
|
||||
self.df_mgr.replication_lock_timeout = 0.1
|
||||
dev_path = os.path.join(self.testdir, self.existing_device)
|
||||
with self.df_mgr.replication_lock(self.existing_device):
|
||||
@ -981,14 +1038,16 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
|
||||
def test_replication_lock_off(self):
|
||||
# Double check settings
|
||||
self.df_mgr.replication_one_per_device = False
|
||||
self.df_mgr.replication_concurrency_per_device = 0
|
||||
self.df_mgr.replication_lock_timeout = 0.1
|
||||
dev_path = os.path.join(self.testdir, self.existing_device)
|
||||
with self.df_mgr.replication_lock(dev_path):
|
||||
|
||||
# 2 locks must succeed
|
||||
with self.df_mgr.replication_lock(self.existing_device):
|
||||
lock_exc = None
|
||||
exc = None
|
||||
try:
|
||||
with self.df_mgr.replication_lock(dev_path):
|
||||
with self.df_mgr.replication_lock(self.existing_device):
|
||||
raise Exception(
|
||||
'%r was not replication locked!' % dev_path)
|
||||
except ReplicationLockTimeout as err:
|
||||
@ -998,9 +1057,62 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
self.assertTrue(lock_exc is None)
|
||||
self.assertTrue(exc is not None)
|
||||
|
||||
# 3 locks must succeed
|
||||
with self.df_mgr.replication_lock(self.existing_device):
|
||||
with self.df_mgr.replication_lock(self.existing_device):
|
||||
lock_exc = None
|
||||
exc = None
|
||||
try:
|
||||
with self.df_mgr.replication_lock(self.existing_device):
|
||||
raise Exception(
|
||||
'%r was not replication locked!' % dev_path)
|
||||
except ReplicationLockTimeout as err:
|
||||
lock_exc = err
|
||||
except Exception as err:
|
||||
exc = err
|
||||
self.assertTrue(lock_exc is None)
|
||||
self.assertTrue(exc is not None)
|
||||
|
||||
def test_replication_lock_2(self):
|
||||
# Double check settings
|
||||
self.df_mgr.replication_concurrency_per_device = 2
|
||||
self.df_mgr.replication_lock_timeout = 0.1
|
||||
dev_path = os.path.join(self.testdir, self.existing_device)
|
||||
|
||||
# 2 locks with replication_concurrency_per_device=2 must succeed
|
||||
with self.df_mgr.replication_lock(self.existing_device):
|
||||
lock_exc = None
|
||||
exc = None
|
||||
try:
|
||||
with self.df_mgr.replication_lock(self.existing_device):
|
||||
raise Exception(
|
||||
'%r was not replication locked!' % dev_path)
|
||||
except ReplicationLockTimeout as err:
|
||||
lock_exc = err
|
||||
except Exception as err:
|
||||
exc = err
|
||||
self.assertTrue(lock_exc is None)
|
||||
self.assertTrue(exc is not None)
|
||||
|
||||
# 3 locks with replication_concurrency_per_device=2 must fail
|
||||
with self.df_mgr.replication_lock(self.existing_device):
|
||||
with self.df_mgr.replication_lock(self.existing_device):
|
||||
lock_exc = None
|
||||
exc = None
|
||||
try:
|
||||
with self.df_mgr.replication_lock(self.existing_device):
|
||||
raise Exception(
|
||||
'%r was not replication locked!' % dev_path)
|
||||
except ReplicationLockTimeout as err:
|
||||
lock_exc = err
|
||||
except Exception as err:
|
||||
exc = err
|
||||
self.assertTrue(lock_exc is not None)
|
||||
self.assertTrue(exc is None)
|
||||
|
||||
def test_replication_lock_another_device_fine(self):
|
||||
# Double check settings
|
||||
self.df_mgr.replication_one_per_device = True
|
||||
self.df_mgr.replication_concurrency_per_device = 1
|
||||
self.df_mgr.replication_lock_timeout = 0.1
|
||||
with self.df_mgr.replication_lock(self.existing_device):
|
||||
lock_exc = None
|
||||
|
@ -54,7 +54,7 @@ class TestBaseSsync(BaseTest):
|
||||
conf = {
|
||||
'devices': self.rx_testdir,
|
||||
'mount_check': 'false',
|
||||
'replication_one_per_device': 'false',
|
||||
'replication_concurrency_per_device': '0',
|
||||
'log_requests': 'false'}
|
||||
self.rx_logger = debug_logger()
|
||||
self.rx_controller = server.ObjectController(conf, self.rx_logger)
|
||||
|
@ -55,7 +55,7 @@ class TestReceiver(unittest.TestCase):
|
||||
self.conf = {
|
||||
'devices': self.testdir,
|
||||
'mount_check': 'false',
|
||||
'replication_one_per_device': 'false',
|
||||
'replication_concurrency_per_device': '0',
|
||||
'log_requests': 'false'}
|
||||
utils.mkdirs(os.path.join(self.testdir, 'device', 'partition'))
|
||||
self.controller = server.ObjectController(self.conf)
|
||||
|
Loading…
Reference in New Issue
Block a user