Fix EC ring validation at ring reload
Swift EC has a strong constraint about the ring must have a number of replicas which fits ec_k + ec_m. That is validated when servers waking up. However, Swift has more chance to load such an invalid ring when a request comming, calling some node iteration like get_nodes, get_part_nodes or so, and no ring validation is there. This patch moves ring validation from policy validate_ring into the ring instance as validation_hook that will run at ring reload. Since this patch, ring instance will allow to use the old ring if the reload is not fourced. Note that the exception if invalid ring found was changed from RingValidationError to RingLoadError because RingValidationError is a child of RingBuilderError but the ring reload is obviously outside of "builder". Closes-Bug: #1534572 Change-Id: I6428fbfb04e0c79679b917d5e57bd2a34f2a0875
This commit is contained in:
parent
574a666a43
commit
1eb96397e7
@ -145,6 +145,10 @@ class LockTimeout(MessageTimeout):
|
||||
pass
|
||||
|
||||
|
||||
class RingLoadError(SwiftException):
|
||||
pass
|
||||
|
||||
|
||||
class RingBuilderError(SwiftException):
|
||||
pass
|
||||
|
||||
|
@ -29,6 +29,7 @@ from tempfile import NamedTemporaryFile
|
||||
|
||||
from six.moves import range
|
||||
|
||||
from swift.common.exceptions import RingLoadError
|
||||
from swift.common.utils import hash_path, validate_configuration
|
||||
from swift.common.ring.utils import tiers_for_dev
|
||||
|
||||
@ -156,9 +157,14 @@ class Ring(object):
|
||||
|
||||
:param serialized_path: path to serialized RingData instance
|
||||
:param reload_time: time interval in seconds to check for a ring change
|
||||
:param ring_name: ring name string (basically specified from policy)
|
||||
:param validation_hook: hook point to validate ring configuration ontime
|
||||
|
||||
:raises: RingLoadError if the loaded ring data violates its constraint
|
||||
"""
|
||||
|
||||
def __init__(self, serialized_path, reload_time=15, ring_name=None):
|
||||
def __init__(self, serialized_path, reload_time=15, ring_name=None,
|
||||
validation_hook=lambda ring_data: None):
|
||||
# can't use the ring unless HASH_PATH_SUFFIX is set
|
||||
validate_configuration()
|
||||
if ring_name:
|
||||
@ -167,12 +173,24 @@ class Ring(object):
|
||||
else:
|
||||
self.serialized_path = os.path.join(serialized_path)
|
||||
self.reload_time = reload_time
|
||||
self._validation_hook = validation_hook
|
||||
self._reload(force=True)
|
||||
|
||||
def _reload(self, force=False):
|
||||
self._rtime = time() + self.reload_time
|
||||
if force or self.has_changed():
|
||||
ring_data = RingData.load(self.serialized_path)
|
||||
|
||||
try:
|
||||
self._validation_hook(ring_data)
|
||||
except RingLoadError:
|
||||
if force:
|
||||
raise
|
||||
else:
|
||||
# In runtime reload at working server, it's ok to use old
|
||||
# ring data if the new ring data is invalid.
|
||||
return
|
||||
|
||||
self._mtime = getmtime(self.serialized_path)
|
||||
self._devs = ring_data.devs
|
||||
# NOTE(akscram): Replication parameters like replication_ip
|
||||
|
@ -21,7 +21,7 @@ from swift.common.utils import (
|
||||
config_true_value, SWIFT_CONF_FILE, whataremyips, list_from_csv)
|
||||
from swift.common.ring import Ring, RingData
|
||||
from swift.common.utils import quorum_size
|
||||
from swift.common.exceptions import RingValidationError
|
||||
from swift.common.exceptions import RingLoadError
|
||||
from pyeclib.ec_iface import ECDriver, ECDriverError, VALID_EC_TYPES
|
||||
|
||||
LEGACY_POLICY_NAME = 'Policy-0'
|
||||
@ -350,13 +350,6 @@ class BaseStoragePolicy(object):
|
||||
self._validate_policy_name(name)
|
||||
self.alias_list.insert(0, name)
|
||||
|
||||
def _validate_ring(self):
|
||||
"""
|
||||
Hook, called when the ring is loaded. Can be used to
|
||||
validate the ring against the StoragePolicy configuration.
|
||||
"""
|
||||
pass
|
||||
|
||||
def load_ring(self, swift_dir):
|
||||
"""
|
||||
Load the ring for this policy immediately.
|
||||
@ -367,9 +360,6 @@ class BaseStoragePolicy(object):
|
||||
return
|
||||
self.object_ring = Ring(swift_dir, ring_name=self.ring_name)
|
||||
|
||||
# Validate ring to make sure it conforms to policy requirements
|
||||
self._validate_ring()
|
||||
|
||||
@property
|
||||
def quorum(self):
|
||||
"""
|
||||
@ -552,25 +542,6 @@ class ECStoragePolicy(BaseStoragePolicy):
|
||||
info.pop('ec_type')
|
||||
return info
|
||||
|
||||
def _validate_ring(self):
|
||||
"""
|
||||
EC specific validation
|
||||
|
||||
Replica count check - we need _at_least_ (#data + #parity) replicas
|
||||
configured. Also if the replica count is larger than exactly that
|
||||
number there's a non-zero risk of error for code that is considering
|
||||
the number of nodes in the primary list from the ring.
|
||||
"""
|
||||
if not self.object_ring:
|
||||
raise PolicyError('Ring is not loaded')
|
||||
nodes_configured = self.object_ring.replica_count
|
||||
if nodes_configured != (self.ec_ndata + self.ec_nparity):
|
||||
raise RingValidationError(
|
||||
'EC ring for policy %s needs to be configured with '
|
||||
'exactly %d nodes. Got %d.' % (
|
||||
self.name, self.ec_ndata + self.ec_nparity,
|
||||
nodes_configured))
|
||||
|
||||
@property
|
||||
def quorum(self):
|
||||
"""
|
||||
@ -593,6 +564,37 @@ class ECStoragePolicy(BaseStoragePolicy):
|
||||
"""
|
||||
return self._ec_quorum_size
|
||||
|
||||
def load_ring(self, swift_dir):
|
||||
"""
|
||||
Load the ring for this policy immediately.
|
||||
|
||||
:param swift_dir: path to rings
|
||||
"""
|
||||
if self.object_ring:
|
||||
return
|
||||
|
||||
def validate_ring_data(ring_data):
|
||||
"""
|
||||
EC specific validation
|
||||
|
||||
Replica count check - we need _at_least_ (#data + #parity) replicas
|
||||
configured. Also if the replica count is larger than exactly that
|
||||
number there's a non-zero risk of error for code that is
|
||||
considering the number of nodes in the primary list from the ring.
|
||||
"""
|
||||
|
||||
nodes_configured = len(ring_data._replica2part2dev_id)
|
||||
if nodes_configured != (self.ec_ndata + self.ec_nparity):
|
||||
raise RingLoadError(
|
||||
'EC ring for policy %s needs to be configured with '
|
||||
'exactly %d replicas. Got %d.' % (
|
||||
self.name, self.ec_ndata + self.ec_nparity,
|
||||
nodes_configured))
|
||||
|
||||
self.object_ring = Ring(
|
||||
swift_dir, ring_name=self.ring_name,
|
||||
validation_hook=validate_ring_data)
|
||||
|
||||
|
||||
class StoragePolicyCollection(object):
|
||||
"""
|
||||
|
@ -26,7 +26,7 @@ from swift.common.storage_policy import (
|
||||
BaseStoragePolicy, StoragePolicy, ECStoragePolicy, REPL_POLICY, EC_POLICY,
|
||||
VALID_EC_TYPES, DEFAULT_EC_OBJECT_SEGMENT_SIZE, BindPortsCache)
|
||||
from swift.common.ring import RingData
|
||||
from swift.common.exceptions import RingValidationError
|
||||
from swift.common.exceptions import RingLoadError
|
||||
from pyeclib.ec_iface import ECDriver
|
||||
|
||||
|
||||
@ -1146,23 +1146,32 @@ class TestStoragePolicies(unittest.TestCase):
|
||||
test_policies = [
|
||||
ECStoragePolicy(0, 'ec8-2', ec_type=DEFAULT_TEST_EC_TYPE,
|
||||
ec_ndata=8, ec_nparity=2,
|
||||
object_ring=FakeRing(replicas=8),
|
||||
is_default=True),
|
||||
ECStoragePolicy(1, 'ec10-4', ec_type=DEFAULT_TEST_EC_TYPE,
|
||||
ec_ndata=10, ec_nparity=4,
|
||||
object_ring=FakeRing(replicas=10)),
|
||||
ec_ndata=10, ec_nparity=4),
|
||||
ECStoragePolicy(2, 'ec4-2', ec_type=DEFAULT_TEST_EC_TYPE,
|
||||
ec_ndata=4, ec_nparity=2,
|
||||
object_ring=FakeRing(replicas=7)),
|
||||
ec_ndata=4, ec_nparity=2),
|
||||
]
|
||||
actual_load_ring_replicas = [8, 10, 7]
|
||||
policies = StoragePolicyCollection(test_policies)
|
||||
|
||||
for policy in policies:
|
||||
def create_mock_ring_data(num_replica):
|
||||
class mock_ring_data_klass(object):
|
||||
def __init__(self):
|
||||
self._replica2part2dev_id = [0] * num_replica
|
||||
|
||||
return mock_ring_data_klass()
|
||||
|
||||
for policy, ring_replicas in zip(policies, actual_load_ring_replicas):
|
||||
with mock.patch('swift.common.ring.ring.RingData.load',
|
||||
return_value=create_mock_ring_data(ring_replicas)):
|
||||
with mock.patch(
|
||||
'swift.common.ring.ring.validate_configuration'):
|
||||
msg = 'EC ring for policy %s needs to be configured with ' \
|
||||
'exactly %d nodes.' % \
|
||||
'exactly %d replicas.' % \
|
||||
(policy.name, policy.ec_ndata + policy.ec_nparity)
|
||||
self.assertRaisesWithMessage(RingValidationError, msg,
|
||||
policy._validate_ring)
|
||||
self.assertRaisesWithMessage(RingLoadError, msg,
|
||||
policy.load_ring, 'mock')
|
||||
|
||||
def test_storage_policy_get_info(self):
|
||||
test_policies = [
|
||||
|
@ -51,6 +51,9 @@ class TestObjectController(test_server.TestObjectController):
|
||||
def test_PUT_ec_fragment_archive_etag_mismatch(self):
|
||||
pass
|
||||
|
||||
def test_reload_ring_ec(self):
|
||||
pass
|
||||
|
||||
|
||||
class TestContainerController(test_server.TestContainerController):
|
||||
pass
|
||||
|
@ -24,7 +24,7 @@ import sys
|
||||
import traceback
|
||||
import unittest
|
||||
from contextlib import contextmanager
|
||||
from shutil import rmtree
|
||||
from shutil import rmtree, copyfile
|
||||
import gc
|
||||
import time
|
||||
from textwrap import dedent
|
||||
@ -3018,6 +3018,96 @@ class TestObjectController(unittest.TestCase):
|
||||
test_content_type('test.css', iter(['', '', 'text/css',
|
||||
'text/css', 'text/css']))
|
||||
|
||||
@unpatch_policies
|
||||
def test_reload_ring_ec(self):
|
||||
policy = POLICIES[3]
|
||||
self.put_container("ec", "ec-con")
|
||||
|
||||
orig_rtime = policy.object_ring._rtime
|
||||
# save original file as back up
|
||||
copyfile(policy.object_ring.serialized_path,
|
||||
policy.object_ring.serialized_path + '.bak')
|
||||
|
||||
try:
|
||||
# overwrite with 2 replica, 2 devices ring
|
||||
obj_devs = []
|
||||
obj_devs.append(
|
||||
{'port': _test_sockets[-3].getsockname()[1],
|
||||
'device': 'sdg1'})
|
||||
obj_devs.append(
|
||||
{'port': _test_sockets[-2].getsockname()[1],
|
||||
'device': 'sdh1'})
|
||||
write_fake_ring(policy.object_ring.serialized_path,
|
||||
*obj_devs)
|
||||
|
||||
def get_ring_reloaded_response(method):
|
||||
# force to reload at the request
|
||||
policy.object_ring._rtime = 0
|
||||
|
||||
trans_data = ['%s /v1/a/ec-con/o2 HTTP/1.1\r\n' % method,
|
||||
'Host: localhost\r\n',
|
||||
'Connection: close\r\n',
|
||||
'X-Storage-Token: t\r\n']
|
||||
|
||||
if method == 'PUT':
|
||||
# small, so we don't get multiple EC stripes
|
||||
obj = 'abCD' * 10
|
||||
|
||||
extra_trans_data = [
|
||||
'Etag: "%s"\r\n' % md5(obj).hexdigest(),
|
||||
'Content-Length: %d\r\n' % len(obj),
|
||||
'Content-Type: application/octet-stream\r\n',
|
||||
'\r\n%s' % obj
|
||||
]
|
||||
trans_data.extend(extra_trans_data)
|
||||
else:
|
||||
trans_data.append('\r\n')
|
||||
|
||||
prolis = _test_sockets[0]
|
||||
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
|
||||
fd = sock.makefile()
|
||||
fd.write(''.join(trans_data))
|
||||
fd.flush()
|
||||
headers = readuntil2crlfs(fd)
|
||||
|
||||
# use older ring with rollbacking
|
||||
return headers
|
||||
|
||||
for method in ('PUT', 'HEAD', 'GET', 'POST', 'DELETE'):
|
||||
headers = get_ring_reloaded_response(method)
|
||||
exp = 'HTTP/1.1 20'
|
||||
self.assertEqual(headers[:len(exp)], exp)
|
||||
|
||||
# proxy didn't load newest ring, use older one
|
||||
self.assertEqual(3, policy.object_ring.replica_count)
|
||||
|
||||
if method == 'POST':
|
||||
# Take care fast post here!
|
||||
orig_post_as_copy = getattr(
|
||||
_test_servers[0], 'object_post_as_copy', None)
|
||||
try:
|
||||
_test_servers[0].object_post_as_copy = False
|
||||
with mock.patch.object(
|
||||
_test_servers[0],
|
||||
'object_post_as_copy', False):
|
||||
headers = get_ring_reloaded_response(method)
|
||||
finally:
|
||||
if orig_post_as_copy is None:
|
||||
del _test_servers[0].object_post_as_copy
|
||||
else:
|
||||
_test_servers[0].object_post_as_copy = \
|
||||
orig_post_as_copy
|
||||
|
||||
exp = 'HTTP/1.1 20'
|
||||
self.assertEqual(headers[:len(exp)], exp)
|
||||
# sanity
|
||||
self.assertEqual(3, policy.object_ring.replica_count)
|
||||
|
||||
finally:
|
||||
policy.object_ring._rtime = orig_rtime
|
||||
os.rename(policy.object_ring.serialized_path + '.bak',
|
||||
policy.object_ring.serialized_path)
|
||||
|
||||
def test_custom_mime_types_files(self):
|
||||
swift_dir = mkdtemp()
|
||||
try:
|
||||
|
Loading…
Reference in New Issue
Block a user