From e199192caefef068b5bf57da8b878e0bc82e3453 Mon Sep 17 00:00:00 2001
From: Romain LE DISEZ <romain.le-disez@corp.ovh.com>
Date: Wed, 26 Oct 2016 10:53:46 +0200
Subject: [PATCH] Replace replication_one_per_device by custom count

This commit replaces boolean replication_one_per_device by an integer
replication_concurrency_per_device. The new configuration parameter is
passed to utils.lock_path() which now accept as an argument a limit for
the number of locks that can be acquired for a specific path.

Instead of trying to lock path/.lock, utils.lock_path() now tries to lock
files path/.lock-X, where X is in the range (0, N), N being the limit for
the number of locks allowed for the path. The default value of limit is
set to 1.

Change-Id: I3c3193344c7a57a8a4fc7932d1b10e702efd3572
---
 doc/manpages/object-server.conf.5             |  11 +-
 doc/source/deployment_guide.rst               | 226 +++++++++---------
 etc/object-server.conf-sample                 |  11 +-
 swift/common/storage_policy.py                |   8 +-
 swift/common/utils.py                         |  54 ++++-
 swift/obj/diskfile.py                         |  29 ++-
 .../probe/test_replication_servers_working.py |   5 +-
 test/unit/common/test_utils.py                |  31 +++
 test/unit/obj/test_diskfile.py                | 122 +++++++++-
 test/unit/obj/test_ssync.py                   |   2 +-
 test/unit/obj/test_ssync_receiver.py          |   2 +-
 11 files changed, 351 insertions(+), 150 deletions(-)

diff --git a/doc/manpages/object-server.conf.5 b/doc/manpages/object-server.conf.5
index 825e2d3174..27b36a21f4 100644
--- a/doc/manpages/object-server.conf.5
+++ b/doc/manpages/object-server.conf.5
@@ -225,11 +225,12 @@ should not specify any value for "replication_server".
 Set to restrict the number of concurrent incoming SSYNC requests
 Set to 0 for unlimited (the default is 4). Note that SSYNC requests are only used
 by the object reconstructor or the object replicator when configured to use ssync.
-.IP "\fBreplication_one_per_device\fR"
-Restricts incoming SSYNC requests to one per device,
-replication_currency above allowing. This can help control I/O to each
-device, but you may wish to set this to False to allow multiple SSYNC
-requests (up to the above replication_concurrency setting) per device. The default is true.
+.IP "\fBreplication_concurrency_per_device\fR"
+Set to restrict the number of concurrent incoming SSYNC requests per device;
+set to 0 for unlimited requests per devices. This can help control I/O to each
+device. This does not override replication_concurrency described above, so you
+may need to adjust both parameters depending on your hardware or network
+capacity. Defaults to 1.
 .IP "\fBreplication_lock_timeout\fR"
 Number of seconds to wait for an existing replication device lock before
 giving up. The default is 15.
diff --git a/doc/source/deployment_guide.rst b/doc/source/deployment_guide.rst
index 28df6cde0b..4e2e79b55d 100644
--- a/doc/source/deployment_guide.rst
+++ b/doc/source/deployment_guide.rst
@@ -563,119 +563,119 @@ ionice_priority                  None        I/O scheduling priority of server
 [object-server]
 ***************
 
-=============================  ====================== ===============================================
-Option                         Default                Description
------------------------------  ---------------------- -----------------------------------------------
-use                                                   paste.deploy entry point for the
-                                                      object server.  For most cases,
-                                                      this should be
-                                                      `egg:swift#object`.
-set log_name                   object-server          Label used when logging
-set log_facility               LOG_LOCAL0             Syslog log facility
-set log_level                  INFO                   Logging level
-set log_requests               True                   Whether or not to log each
-                                                      request
-set log_address                /dev/log               Logging directory
-user                           swift                  User to run as
-max_upload_time                86400                  Maximum time allowed to upload an
-                                                      object
-slow                           0                      If > 0, Minimum time in seconds for a PUT or
-                                                      DELETE request to complete.  This is only
-                                                      useful to simulate slow devices during testing
-                                                      and development.
-mb_per_sync                    512                    On PUT requests, sync file every
-                                                      n MB
-keep_cache_size                5242880                Largest object size to keep in
-                                                      buffer cache
-keep_cache_private             false                  Allow non-public objects to stay
-                                                      in kernel's buffer cache
-allowed_headers                Content-Disposition,   Comma separated list of headers
-                               Content-Encoding,      that can be set in metadata on an object.
-                               X-Delete-At,           This list is in addition to
-                               X-Object-Manifest,     X-Object-Meta-* headers and cannot include
-                               X-Static-Large-Object  Content-Type, etag, Content-Length, or deleted
-auto_create_account_prefix     .                      Prefix used when automatically
-                                                      creating accounts.
-replication_server                                    Configure parameter for creating
-                                                      specific server. To handle all verbs,
-                                                      including replication verbs, do not
-                                                      specify "replication_server"
-                                                      (this is the default). To only
-                                                      handle replication, set to a True
-                                                      value (e.g. "True" or "1").
-                                                      To handle only non-replication
-                                                      verbs, set to "False". Unless you
-                                                      have a separate replication network, you
-                                                      should not specify any value for
-                                                      "replication_server".
-replication_concurrency        4                      Set to restrict the number of
-                                                      concurrent incoming SSYNC
-                                                      requests; set to 0 for unlimited
-replication_one_per_device     True                   Restricts incoming SSYNC
-                                                      requests to one per device,
-                                                      replication_currency above
-                                                      allowing. This can help control
-                                                      I/O to each device, but you may
-                                                      wish to set this to False to
-                                                      allow multiple SSYNC
-                                                      requests (up to the above
-                                                      replication_concurrency setting)
-                                                      per device.
-replication_lock_timeout       15                     Number of seconds to wait for an
-                                                      existing replication device lock
-                                                      before giving up.
-replication_failure_threshold  100                    The number of subrequest failures
-                                                      before the
-                                                      replication_failure_ratio is
-                                                      checked
-replication_failure_ratio      1.0                    If the value of failures /
-                                                      successes of SSYNC
-                                                      subrequests exceeds this ratio,
-                                                      the overall SSYNC request
-                                                      will be aborted
-splice                         no                     Use splice() for zero-copy object
-                                                      GETs. This requires Linux kernel
-                                                      version 3.0 or greater. If you set
-                                                      "splice = yes" but the kernel
-                                                      does not support it, error messages
-                                                      will appear in the object server
-                                                      logs at startup, but your object
-                                                      servers should continue to function.
-nice_priority                  None                   Scheduling priority of server processes.
-                                                      Niceness values range from -20 (most
-                                                      favorable to the process) to 19 (least
-                                                      favorable to the process). The default
-                                                      does not modify priority.
-ionice_class                   None                   I/O scheduling class of server processes.
-                                                      I/O niceness class values are IOPRIO_CLASS_RT
-                                                      (realtime), IOPRIO_CLASS_BE (best-effort),
-                                                      and IOPRIO_CLASS_IDLE (idle).
-                                                      The default does not modify class and
-                                                      priority. Linux supports io scheduling
-                                                      priorities and classes since 2.6.13 with
-                                                      the CFQ io scheduler.
-                                                      Work only with ionice_priority.
-ionice_priority                None                   I/O scheduling priority of server
-                                                      processes. I/O niceness priority is
-                                                      a number which goes from 0 to 7.
-                                                      The higher the value, the lower the I/O
-                                                      priority of the process. Work only with
-                                                      ionice_class.
-                                                      Ignored if IOPRIO_CLASS_IDLE is set.
-eventlet_tpool_num_threads     auto                   The number of threads in eventlet's thread pool.
-                                                      Most IO will occur in the object server's main
-                                                      thread, but certain "heavy" IO operations will
-                                                      occur in separate IO threads, managed by
-                                                      eventlet.
-                                                      The default value is auto, whose actual value
-                                                      is dependent on the servers_per_port value.
-                                                      If servers_per_port is zero then it uses
-                                                      eventlet's default (currently 20 threads).
-                                                      If the servers_per_port is nonzero then it'll
-                                                      only use 1 thread per process.
-                                                      This value can be overridden with an integer
-                                                      value.
-=============================  ====================== ===============================================
+================================== ====================== ===============================================
+Option                             Default                Description
+---------------------------------- ---------------------- -----------------------------------------------
+use                                                       paste.deploy entry point for the
+                                                          object server.  For most cases,
+                                                          this should be
+                                                          `egg:swift#object`.
+set log_name                       object-server          Label used when logging
+set log_facility                   LOG_LOCAL0             Syslog log facility
+set log_level                      INFO                   Logging level
+set log_requests                   True                   Whether or not to log each
+                                                          request
+set log_address                    /dev/log               Logging directory
+user                               swift                  User to run as
+max_upload_time                    86400                  Maximum time allowed to upload an
+                                                          object
+slow                               0                      If > 0, Minimum time in seconds for a PUT or
+                                                          DELETE request to complete.  This is only
+                                                          useful to simulate slow devices during testing
+                                                          and development.
+mb_per_sync                        512                    On PUT requests, sync file every
+                                                          n MB
+keep_cache_size                    5242880                Largest object size to keep in
+                                                          buffer cache
+keep_cache_private                 false                  Allow non-public objects to stay
+                                                          in kernel's buffer cache
+allowed_headers                    Content-Disposition,   Comma separated list of headers
+                                   Content-Encoding,      that can be set in metadata on an object.
+                                   X-Delete-At,           This list is in addition to
+                                   X-Object-Manifest,     X-Object-Meta-* headers and cannot include
+                                   X-Static-Large-Object  Content-Type, etag, Content-Length, or deleted
+auto_create_account_prefix         .                      Prefix used when automatically
+                                                          creating accounts.
+replication_server                                        Configure parameter for creating
+                                                          specific server. To handle all verbs,
+                                                          including replication verbs, do not
+                                                          specify "replication_server"
+                                                          (this is the default). To only
+                                                          handle replication, set to a True
+                                                          value (e.g. "True" or "1").
+                                                          To handle only non-replication
+                                                          verbs, set to "False". Unless you
+                                                          have a separate replication network, you
+                                                          should not specify any value for
+                                                          "replication_server".
+replication_concurrency            4                      Set to restrict the number of
+                                                          concurrent incoming SSYNC
+                                                          requests; set to 0 for unlimited
+replication_concurrency_per_device 1                      Set to restrict the number of
+                                                          concurrent incoming SSYNC
+                                                          requests per device; set to 0 for
+                                                          unlimited requests per devices.
+                                                          This can help control I/O to each
+                                                          device. This does not override
+                                                          replication_concurrency described
+                                                          above, so you may need to adjust
+                                                          both parameters depending on your
+                                                          hardware or network capacity.
+replication_lock_timeout           15                     Number of seconds to wait for an
+                                                          existing replication device lock
+                                                          before giving up.
+replication_failure_threshold      100                    The number of subrequest failures
+                                                          before the
+                                                          replication_failure_ratio is
+                                                          checked
+replication_failure_ratio          1.0                    If the value of failures /
+                                                          successes of SSYNC
+                                                          subrequests exceeds this ratio,
+                                                          the overall SSYNC request
+                                                          will be aborted
+splice                             no                     Use splice() for zero-copy object
+                                                          GETs. This requires Linux kernel
+                                                          version 3.0 or greater. If you set
+                                                          "splice = yes" but the kernel
+                                                          does not support it, error messages
+                                                          will appear in the object server
+                                                          logs at startup, but your object
+                                                          servers should continue to function.
+nice_priority                      None                   Scheduling priority of server processes.
+                                                          Niceness values range from -20 (most
+                                                          favorable to the process) to 19 (least
+                                                          favorable to the process). The default
+                                                          does not modify priority.
+ionice_class                       None                   I/O scheduling class of server processes.
+                                                          I/O niceness class values are IOPRIO_CLASS_RT
+                                                          (realtime), IOPRIO_CLASS_BE (best-effort),
+                                                          and IOPRIO_CLASS_IDLE (idle).
+                                                          The default does not modify class and
+                                                          priority. Linux supports io scheduling
+                                                          priorities and classes since 2.6.13 with
+                                                          the CFQ io scheduler.
+                                                          Work only with ionice_priority.
+ionice_priority                    None                   I/O scheduling priority of server
+                                                          processes. I/O niceness priority is
+                                                          a number which goes from 0 to 7.
+                                                          The higher the value, the lower the I/O
+                                                          priority of the process. Work only with
+                                                          ionice_class.
+                                                          Ignored if IOPRIO_CLASS_IDLE is set.
+eventlet_tpool_num_threads         auto                   The number of threads in eventlet's thread pool.
+                                                          Most IO will occur in the object server's main
+                                                          thread, but certain "heavy" IO operations will
+                                                          occur in separate IO threads, managed by
+                                                          eventlet.
+                                                          The default value is auto, whose actual value
+                                                          is dependent on the servers_per_port value.
+                                                          If servers_per_port is zero then it uses
+                                                          eventlet's default (currently 20 threads).
+                                                          If the servers_per_port is nonzero then it'll
+                                                          only use 1 thread per process.
+                                                          This value can be overridden with an integer
+                                                          value.
+================================== ====================== ===============================================
 
 *******************
 [object-replicator]
diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample
index bbe6a66148..04d04c11c5 100644
--- a/etc/object-server.conf-sample
+++ b/etc/object-server.conf-sample
@@ -161,11 +161,12 @@ use = egg:swift#object
 # object replicator when configured to use ssync.
 # replication_concurrency = 4
 #
-# Restricts incoming SSYNC requests to one per device,
-# replication_currency above allowing. This can help control I/O to each
-# device, but you may wish to set this to False to allow multiple SSYNC
-# requests (up to the above replication_concurrency setting) per device.
-# replication_one_per_device = True
+# Set to restrict the number of concurrent incoming SSYNC requests per
+# device; set to 0 for unlimited requests per device. This can help control
+# I/O to each device. This does not override replication_concurrency described
+# above, so you may need to adjust both parameters depending on your hardware
+# or network capacity.
+# replication_concurrency_per_device = 1
 #
 # Number of seconds to wait for an existing replication device lock before
 # giving up.
diff --git a/swift/common/storage_policy.py b/swift/common/storage_policy.py
index 0e62c7e92d..dc79a6414b 100644
--- a/swift/common/storage_policy.py
+++ b/swift/common/storage_policy.py
@@ -21,7 +21,7 @@ import six
 from six.moves.configparser import ConfigParser
 from swift.common.utils import (
     config_true_value, quorum_size, whataremyips, list_from_csv,
-    config_positive_int_value)
+    config_positive_int_value, get_zero_indexed_base_string)
 from swift.common.ring import Ring, RingData
 from swift.common import utils
 from swift.common.exceptions import RingLoadError
@@ -92,11 +92,7 @@ class PolicyError(ValueError):
 
 
 def _get_policy_string(base, policy_index):
-    if policy_index == 0 or policy_index is None:
-        return_string = base
-    else:
-        return_string = base + "-%d" % int(policy_index)
-    return return_string
+    return get_zero_indexed_base_string(base, policy_index)
 
 
 def get_policy_string(base, policy_or_index):
diff --git a/swift/common/utils.py b/swift/common/utils.py
index 02c9db1399..b7f3407886 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -2280,8 +2280,45 @@ def hash_path(account, container=None, object=None, raw_digest=False):
                    + HASH_PATH_SUFFIX).hexdigest()
 
 
+def get_zero_indexed_base_string(base, index):
+    """
+    This allows the caller to make a list of things with indexes, where the
+    first item (zero indexed) is just the bare base string, and subsequent
+    indexes are appended '-1', '-2', etc.
+
+    e.g.::
+
+      'lock', None => 'lock'
+      'lock', 0    => 'lock'
+      'lock', 1    => 'lock-1'
+      'object', 2  => 'object-2'
+
+    :param base: a string, the base string; when ``index`` is 0 (or None) this
+                 is the identity function.
+    :param index: a digit, typically an integer (or None); for values other
+                  than 0 or None this digit is appended to the base string
+                  separated by a hyphen.
+    """
+    if index == 0 or index is None:
+        return_string = base
+    else:
+        return_string = base + "-%d" % int(index)
+    return return_string
+
+
+def _get_any_lock(fds):
+    for fd in fds:
+        try:
+            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
+            return True
+        except IOError as err:
+            if err.errno != errno.EAGAIN:
+                raise
+    return False
+
+
 @contextmanager
-def lock_path(directory, timeout=10, timeout_class=None):
+def lock_path(directory, timeout=10, timeout_class=None, limit=1):
     """
     Context manager that acquires a lock on a directory.  This will block until
     the lock can be acquired, or the timeout time has expired (whichever occurs
@@ -2297,12 +2334,16 @@ def lock_path(directory, timeout=10, timeout_class=None):
         lock cannot be granted within the timeout. Will be
         constructed as timeout_class(timeout, lockpath). Default:
         LockTimeout
+    :param limit: the maximum number of locks that may be held concurrently on
+        the same directory; defaults to 1
     """
     if timeout_class is None:
         timeout_class = swift.common.exceptions.LockTimeout
     mkdirs(directory)
     lockpath = '%s/.lock' % directory
-    fd = os.open(lockpath, os.O_WRONLY | os.O_CREAT)
+    fds = [os.open(get_zero_indexed_base_string(lockpath, i),
+                   os.O_WRONLY | os.O_CREAT)
+           for i in range(limit)]
     sleep_time = 0.01
     slower_sleep_time = max(timeout * 0.01, sleep_time)
     slowdown_at = timeout * 0.01
@@ -2310,19 +2351,16 @@ def lock_path(directory, timeout=10, timeout_class=None):
     try:
         with timeout_class(timeout, lockpath):
             while True:
-                try:
-                    fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
+                if _get_any_lock(fds):
                     break
-                except IOError as err:
-                    if err.errno != errno.EAGAIN:
-                        raise
                 if time_slept > slowdown_at:
                     sleep_time = slower_sleep_time
                 sleep(sleep_time)
                 time_slept += sleep_time
         yield True
     finally:
-        os.close(fd)
+        for fd in fds:
+            os.close(fd)
 
 
 @contextmanager
diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py
index 86e53b6b4d..5824a8f780 100644
--- a/swift/obj/diskfile.py
+++ b/swift/obj/diskfile.py
@@ -624,8 +624,28 @@ class BaseDiskFileManager(object):
         self.bytes_per_sync = int(conf.get('mb_per_sync', 512)) * 1024 * 1024
         self.mount_check = config_true_value(conf.get('mount_check', 'true'))
         self.reclaim_age = int(conf.get('reclaim_age', DEFAULT_RECLAIM_AGE))
-        self.replication_one_per_device = config_true_value(
-            conf.get('replication_one_per_device', 'true'))
+        replication_concurrency_per_device = conf.get(
+            'replication_concurrency_per_device')
+        replication_one_per_device = conf.get('replication_one_per_device')
+        if replication_concurrency_per_device is None \
+                and replication_one_per_device is not None:
+            self.logger.warning('Option replication_one_per_device is '
+                                'deprecated and will be removed in a future '
+                                'version. Update your configuration to use '
+                                'option replication_concurrency_per_device.')
+            if config_true_value(replication_one_per_device):
+                replication_concurrency_per_device = 1
+            else:
+                replication_concurrency_per_device = 0
+        elif replication_one_per_device is not None:
+            self.logger.warning('Option replication_one_per_device ignored as '
+                                'replication_concurrency_per_device is '
+                                'defined.')
+        if replication_concurrency_per_device is None:
+            self.replication_concurrency_per_device = 1
+        else:
+            self.replication_concurrency_per_device = int(
+                replication_concurrency_per_device)
         self.replication_lock_timeout = int(conf.get(
             'replication_lock_timeout', 15))
 
@@ -1208,12 +1228,13 @@ class BaseDiskFileManager(object):
         :raises ReplicationLockTimeout: If the lock on the device
             cannot be granted within the configured timeout.
         """
-        if self.replication_one_per_device:
+        if self.replication_concurrency_per_device:
             dev_path = self.get_dev_path(device)
             with lock_path(
                     dev_path,
                     timeout=self.replication_lock_timeout,
-                    timeout_class=ReplicationLockTimeout):
+                    timeout_class=ReplicationLockTimeout,
+                    limit=self.replication_concurrency_per_device):
                 yield True
         else:
             yield True
diff --git a/test/probe/test_replication_servers_working.py b/test/probe/test_replication_servers_working.py
index 4721c82ad6..88e419dfac 100644
--- a/test/probe/test_replication_servers_working.py
+++ b/test/probe/test_replication_servers_working.py
@@ -19,6 +19,7 @@ from uuid import uuid4
 import os
 import time
 import shutil
+import re
 
 from swiftclient import client
 from swift.obj.diskfile import get_data_dir
@@ -26,7 +27,7 @@ from swift.obj.diskfile import get_data_dir
 from test.probe.common import ReplProbeTest
 from swift.common.utils import readconf
 
-EXCLUDE_FILES = ['hashes.pkl', 'hashes.invalid', '.lock']
+EXCLUDE_FILES = re.compile('^(hashes\.(pkl|invalid)|lock(-\d+)?)$')
 
 
 def collect_info(path_list):
@@ -43,7 +44,7 @@ def collect_info(path_list):
         temp_files_list = []
         temp_dir_list = []
         for root, dirs, files in os.walk(path):
-            files = [f for f in files if f not in EXCLUDE_FILES]
+            files = [f for f in files if not EXCLUDE_FILES.match(f)]
             temp_files_list += files
             temp_dir_list += dirs
         files_list.append(temp_files_list)
diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py
index 634f29e61e..56533b87ca 100644
--- a/test/unit/common/test_utils.py
+++ b/test/unit/common/test_utils.py
@@ -924,9 +924,20 @@ class TestUtils(unittest.TestCase):
         utils.HASH_PATH_SUFFIX = 'endcap'
         utils.HASH_PATH_PREFIX = 'startcap'
 
+    def test_get_zero_indexed_base_string(self):
+        self.assertEqual(utils.get_zero_indexed_base_string('something', 0),
+                         'something')
+        self.assertEqual(utils.get_zero_indexed_base_string('something', None),
+                         'something')
+        self.assertEqual(utils.get_zero_indexed_base_string('something', 1),
+                         'something-1')
+        self.assertRaises(ValueError, utils.get_zero_indexed_base_string,
+                          'something', 'not_integer')
+
     def test_lock_path(self):
         tmpdir = mkdtemp()
         try:
+            # 2 locks with limit=1 must fail
             with utils.lock_path(tmpdir, 0.1):
                 exc = None
                 success = False
@@ -937,6 +948,26 @@ class TestUtils(unittest.TestCase):
                     exc = err
                 self.assertTrue(exc is not None)
                 self.assertTrue(not success)
+
+            # 2 locks with limit=2 must succeed
+            with utils.lock_path(tmpdir, 0.1, limit=2):
+                success = False
+                with utils.lock_path(tmpdir, 0.1, limit=2):
+                    success = True
+                self.assertTrue(success)
+
+            # 3 locks with limit=2 must fail
+            with utils.lock_path(tmpdir, 0.1, limit=2):
+                exc = None
+                success = False
+                with utils.lock_path(tmpdir, 0.1, limit=2):
+                    try:
+                        with utils.lock_path(tmpdir, 0.1, limit=2):
+                            success = True
+                    except LockTimeout as err:
+                        exc = err
+                self.assertTrue(exc is not None)
+                self.assertTrue(not success)
         finally:
             shutil.rmtree(tmpdir)
 
diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py
index 8795ac40bc..56b8cbaa06 100644
--- a/test/unit/obj/test_diskfile.py
+++ b/test/unit/obj/test_diskfile.py
@@ -960,9 +960,66 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
         locations = list(self.df_mgr.object_audit_location_generator())
         self.assertEqual(locations, [])
 
+    def test_replication_one_per_device_deprecation(self):
+        conf = dict(**self.conf)
+        mgr = diskfile.DiskFileManager(conf, FakeLogger())
+        self.assertEqual(mgr.replication_concurrency_per_device, 1)
+
+        conf = dict(replication_concurrency_per_device='0', **self.conf)
+        mgr = diskfile.DiskFileManager(conf, FakeLogger())
+        self.assertEqual(mgr.replication_concurrency_per_device, 0)
+
+        conf = dict(replication_concurrency_per_device='2', **self.conf)
+        mgr = diskfile.DiskFileManager(conf, FakeLogger())
+        self.assertEqual(mgr.replication_concurrency_per_device, 2)
+
+        conf = dict(replication_concurrency_per_device=2, **self.conf)
+        mgr = diskfile.DiskFileManager(conf, FakeLogger())
+        self.assertEqual(mgr.replication_concurrency_per_device, 2)
+
+        # Check backward compatibility
+        conf = dict(replication_one_per_device='true', **self.conf)
+        mgr = diskfile.DiskFileManager(conf, FakeLogger())
+        self.assertEqual(mgr.replication_concurrency_per_device, 1)
+        log_lines = mgr.logger.get_lines_for_level('warning')
+        self.assertIn('replication_one_per_device is deprecated',
+                      log_lines[-1])
+
+        conf = dict(replication_one_per_device='false', **self.conf)
+        mgr = diskfile.DiskFileManager(conf, FakeLogger())
+        self.assertEqual(mgr.replication_concurrency_per_device, 0)
+        log_lines = mgr.logger.get_lines_for_level('warning')
+        self.assertIn('replication_one_per_device is deprecated',
+                      log_lines[-1])
+
+        # If defined, new parameter has precedence
+        conf = dict(replication_concurrency_per_device='2',
+                    replication_one_per_device='true', **self.conf)
+        mgr = diskfile.DiskFileManager(conf, FakeLogger())
+        self.assertEqual(mgr.replication_concurrency_per_device, 2)
+        log_lines = mgr.logger.get_lines_for_level('warning')
+        self.assertIn('replication_one_per_device ignored',
+                      log_lines[-1])
+
+        conf = dict(replication_concurrency_per_device='2',
+                    replication_one_per_device='false', **self.conf)
+        mgr = diskfile.DiskFileManager(conf, FakeLogger())
+        self.assertEqual(mgr.replication_concurrency_per_device, 2)
+        log_lines = mgr.logger.get_lines_for_level('warning')
+        self.assertIn('replication_one_per_device ignored',
+                      log_lines[-1])
+
+        conf = dict(replication_concurrency_per_device='0',
+                    replication_one_per_device='true', **self.conf)
+        mgr = diskfile.DiskFileManager(conf, FakeLogger())
+        self.assertEqual(mgr.replication_concurrency_per_device, 0)
+        log_lines = mgr.logger.get_lines_for_level('warning')
+        self.assertIn('replication_one_per_device ignored',
+                      log_lines[-1])
+
     def test_replication_lock_on(self):
         # Double check settings
-        self.df_mgr.replication_one_per_device = True
+        self.df_mgr.replication_concurrency_per_device = 1
         self.df_mgr.replication_lock_timeout = 0.1
         dev_path = os.path.join(self.testdir, self.existing_device)
         with self.df_mgr.replication_lock(self.existing_device):
@@ -981,14 +1038,16 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
 
     def test_replication_lock_off(self):
         # Double check settings
-        self.df_mgr.replication_one_per_device = False
+        self.df_mgr.replication_concurrency_per_device = 0
         self.df_mgr.replication_lock_timeout = 0.1
         dev_path = os.path.join(self.testdir, self.existing_device)
-        with self.df_mgr.replication_lock(dev_path):
+
+        # 2 locks must succeed
+        with self.df_mgr.replication_lock(self.existing_device):
             lock_exc = None
             exc = None
             try:
-                with self.df_mgr.replication_lock(dev_path):
+                with self.df_mgr.replication_lock(self.existing_device):
                     raise Exception(
                         '%r was not replication locked!' % dev_path)
             except ReplicationLockTimeout as err:
@@ -998,9 +1057,62 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
             self.assertTrue(lock_exc is None)
             self.assertTrue(exc is not None)
 
+        # 3 locks must succeed
+        with self.df_mgr.replication_lock(self.existing_device):
+            with self.df_mgr.replication_lock(self.existing_device):
+                lock_exc = None
+                exc = None
+                try:
+                    with self.df_mgr.replication_lock(self.existing_device):
+                        raise Exception(
+                            '%r was not replication locked!' % dev_path)
+                except ReplicationLockTimeout as err:
+                    lock_exc = err
+                except Exception as err:
+                    exc = err
+                self.assertTrue(lock_exc is None)
+                self.assertTrue(exc is not None)
+
+    def test_replication_lock_2(self):
+        # Double check settings
+        self.df_mgr.replication_concurrency_per_device = 2
+        self.df_mgr.replication_lock_timeout = 0.1
+        dev_path = os.path.join(self.testdir, self.existing_device)
+
+        # 2 locks with replication_concurrency_per_device=2 must succeed
+        with self.df_mgr.replication_lock(self.existing_device):
+            lock_exc = None
+            exc = None
+            try:
+                with self.df_mgr.replication_lock(self.existing_device):
+                    raise Exception(
+                        '%r was not replication locked!' % dev_path)
+            except ReplicationLockTimeout as err:
+                lock_exc = err
+            except Exception as err:
+                exc = err
+            self.assertTrue(lock_exc is None)
+            self.assertTrue(exc is not None)
+
+        # 3 locks with replication_concurrency_per_device=2 must fail
+        with self.df_mgr.replication_lock(self.existing_device):
+            with self.df_mgr.replication_lock(self.existing_device):
+                lock_exc = None
+                exc = None
+                try:
+                    with self.df_mgr.replication_lock(self.existing_device):
+                        raise Exception(
+                            '%r was not replication locked!' % dev_path)
+                except ReplicationLockTimeout as err:
+                    lock_exc = err
+                except Exception as err:
+                    exc = err
+                self.assertTrue(lock_exc is not None)
+                self.assertTrue(exc is None)
+
     def test_replication_lock_another_device_fine(self):
         # Double check settings
-        self.df_mgr.replication_one_per_device = True
+        self.df_mgr.replication_concurrency_per_device = 1
         self.df_mgr.replication_lock_timeout = 0.1
         with self.df_mgr.replication_lock(self.existing_device):
             lock_exc = None
diff --git a/test/unit/obj/test_ssync.py b/test/unit/obj/test_ssync.py
index 130e583d22..9594719208 100644
--- a/test/unit/obj/test_ssync.py
+++ b/test/unit/obj/test_ssync.py
@@ -54,7 +54,7 @@ class TestBaseSsync(BaseTest):
         conf = {
             'devices': self.rx_testdir,
             'mount_check': 'false',
-            'replication_one_per_device': 'false',
+            'replication_concurrency_per_device': '0',
             'log_requests': 'false'}
         self.rx_logger = debug_logger()
         self.rx_controller = server.ObjectController(conf, self.rx_logger)
diff --git a/test/unit/obj/test_ssync_receiver.py b/test/unit/obj/test_ssync_receiver.py
index 739d28d38d..5ce3bccd27 100644
--- a/test/unit/obj/test_ssync_receiver.py
+++ b/test/unit/obj/test_ssync_receiver.py
@@ -55,7 +55,7 @@ class TestReceiver(unittest.TestCase):
         self.conf = {
             'devices': self.testdir,
             'mount_check': 'false',
-            'replication_one_per_device': 'false',
+            'replication_concurrency_per_device': '0',
             'log_requests': 'false'}
         utils.mkdirs(os.path.join(self.testdir, 'device', 'partition'))
         self.controller = server.ObjectController(self.conf)