sharder: always get ranges from root while shrinking
While auditing a shard container that is in a shrinking state, the sharder will merge any shard ranges fetched from the root that cover the shard's namespace. Previously this was conditional upon the sharder also fetching the shard's *own* shard range from the root. However, in extreme circumstances, the shard's own shard range could become deleted and reclaimed from the root while the shard DB still needs to shrink to its acceptor ranges. This patch therefore allows the shard to be updated with potential acceptor ranges from the root even when its own shard range is not fetched from the root. Also change the log level from debug to info when logging an update to a shard's own shard range from root. Change-Id: I17957cf0ef4936f91e69c6d9ae21551972f0df31
This commit is contained in:
parent
429702e30d
commit
f15b92084f
@ -5316,6 +5316,7 @@ class ShardRange(object):
|
||||
SHARDED: 'sharded',
|
||||
SHRUNK: 'shrunk'}
|
||||
STATES_BY_NAME = dict((v, k) for k, v in STATES.items())
|
||||
SHRINKING_STATES = (SHRINKING, SHRUNK)
|
||||
|
||||
@functools.total_ordering
|
||||
class MaxBound(ShardRangeOuterBound):
|
||||
|
@ -1219,15 +1219,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
own_shard_range = broker.get_own_shard_range()
|
||||
if (orig_own_shard_range != own_shard_range or
|
||||
orig_own_shard_range.state != own_shard_range.state):
|
||||
self.logger.debug(
|
||||
self.logger.info(
|
||||
'Updated own shard range from %s to %s',
|
||||
orig_own_shard_range, own_shard_range)
|
||||
else:
|
||||
other_shard_ranges.append(shard_range)
|
||||
|
||||
if (other_shard_ranges and own_shard_range_from_root and
|
||||
own_shard_range.state in
|
||||
(ShardRange.SHRINKING, ShardRange.SHRUNK)):
|
||||
if (other_shard_ranges and
|
||||
own_shard_range.state in ShardRange.SHRINKING_STATES):
|
||||
# If own_shard_range state is shrinking, save off *all* shards
|
||||
# returned because these may contain shards into which this
|
||||
# shard is to shrink itself; shrinking is the only case when we
|
||||
@ -1259,7 +1258,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
own_shard_range.timestamp < delete_age and
|
||||
broker.empty()):
|
||||
broker.delete_db(Timestamp.now().internal)
|
||||
self.logger.debug('Deleted shard container %s (%s)',
|
||||
self.logger.debug('Marked shard container as deleted %s (%s)',
|
||||
broker.db_file, quote(broker.path))
|
||||
|
||||
def _do_audit_shard_container(self, broker):
|
||||
@ -1796,7 +1795,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
quote(broker.path), shard_range)
|
||||
|
||||
replication_quorum = self.existing_shard_replication_quorum
|
||||
if own_shard_range.state in (ShardRange.SHRINKING, ShardRange.SHRUNK):
|
||||
if own_shard_range.state in ShardRange.SHRINKING_STATES:
|
||||
if shard_range.includes(own_shard_range):
|
||||
# When shrinking to a single acceptor that completely encloses
|
||||
# this shard's namespace, include deleted own (donor) shard
|
||||
@ -2001,8 +2000,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
quote(broker.path))
|
||||
return False
|
||||
own_shard_range.update_meta(0, 0)
|
||||
if own_shard_range.state in (ShardRange.SHRINKING,
|
||||
ShardRange.SHRUNK):
|
||||
if own_shard_range.state in ShardRange.SHRINKING_STATES:
|
||||
own_shard_range.update_state(ShardRange.SHRUNK)
|
||||
modified_shard_ranges = []
|
||||
else:
|
||||
|
@ -45,7 +45,8 @@ from swift.container.sharder import ContainerSharder, sharding_enabled, \
|
||||
is_sharding_candidate, find_paths, rank_paths, ContainerSharderConf, \
|
||||
find_paths_with_gaps
|
||||
from swift.common.utils import ShardRange, Timestamp, hash_path, \
|
||||
encode_timestamps, parse_db_filename, quorum_size, Everything, md5
|
||||
encode_timestamps, parse_db_filename, quorum_size, Everything, md5, \
|
||||
ShardName
|
||||
from test import annotate_failure
|
||||
|
||||
from test.debug_logger import debug_logger
|
||||
@ -5605,10 +5606,17 @@ class TestSharder(BaseTestSharder):
|
||||
self.assertTrue(shard_ranges[1].update_state(ShardRange.ACTIVE,
|
||||
state_timestamp=root_ts))
|
||||
shard_ranges[1].timestamp = root_ts
|
||||
sharder, mock_swift = self.call_audit_container(broker, shard_ranges)
|
||||
with mock_timestamp_now() as ts_now:
|
||||
sharder, mock_swift = self.call_audit_container(
|
||||
broker, shard_ranges)
|
||||
self._assert_stats(expected_stats, sharder, 'audit_shard')
|
||||
self.assertEqual(['Updating own shard range from root', mock.ANY],
|
||||
self.assertEqual(['Updating own shard range from root'],
|
||||
sharder.logger.get_lines_for_level('debug'))
|
||||
own_shard_range.meta_timestamp = ts_now
|
||||
expected = shard_ranges[1].copy(meta_timestamp=ts_now)
|
||||
self.assertEqual(['Updated own shard range from %s to %s'
|
||||
% (own_shard_range, expected)],
|
||||
sharder.logger.get_lines_for_level('info'))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('error'))
|
||||
self.assertFalse(broker.is_deleted())
|
||||
@ -5695,19 +5703,27 @@ class TestSharder(BaseTestSharder):
|
||||
# make own shard range match one in root, but different state
|
||||
own_ts = next(self.ts_iter)
|
||||
shard_ranges[1].timestamp = own_ts
|
||||
broker.merge_shard_ranges([shard_ranges[1]])
|
||||
own_shard_range = shard_ranges[1].copy()
|
||||
broker.merge_shard_ranges([own_shard_range])
|
||||
root_ts = next(self.ts_iter)
|
||||
shard_ranges[1].update_state(ShardRange.SHARDING,
|
||||
state_timestamp=root_ts)
|
||||
sharder, mock_swift = self.call_audit_container(broker, shard_ranges)
|
||||
with mock_timestamp_now() as ts_now:
|
||||
sharder, mock_swift = self.call_audit_container(
|
||||
broker, shard_ranges)
|
||||
self.assert_no_audit_messages(sharder, mock_swift)
|
||||
self.assertFalse(broker.is_deleted())
|
||||
self.assertEqual(['Updating own shard range from root'],
|
||||
sharder.logger.get_lines_for_level('debug'))
|
||||
own_shard_range.meta_timestamp = ts_now
|
||||
expected = shard_ranges[1].copy(meta_timestamp=ts_now)
|
||||
self.assertEqual(['Updated own shard range from %s to %s'
|
||||
% (own_shard_range, expected)],
|
||||
sharder.logger.get_lines_for_level('info'))
|
||||
# own shard range state is updated from root version
|
||||
own_shard_range = broker.get_own_shard_range()
|
||||
self.assertEqual(ShardRange.SHARDING, own_shard_range.state)
|
||||
self.assertEqual(root_ts, own_shard_range.state_timestamp)
|
||||
self.assertEqual(['Updating own shard range from root', mock.ANY],
|
||||
sharder.logger.get_lines_for_level('debug'))
|
||||
|
||||
own_shard_range.update_state(ShardRange.SHARDED,
|
||||
state_timestamp=next(self.ts_iter))
|
||||
@ -5834,60 +5850,181 @@ class TestSharder(BaseTestSharder):
|
||||
self._do_test_audit_shard_container_merge_other_ranges('Quoted-Root',
|
||||
'a/c')
|
||||
|
||||
def _do_test_audit_shard_container_with_root_ranges(self, *args):
|
||||
# shards may merge acceptors and the root range when shrinking; verify
|
||||
# that shard audit is ok with merged ranges
|
||||
def check_audit(own_state, acceptor_state, root_state):
|
||||
broker = self._make_broker(
|
||||
account='.shards_a',
|
||||
container='shard_c_%s' % next(self.ts_iter).normal)
|
||||
broker.set_sharding_sysmeta(*args)
|
||||
own_sr = broker.get_own_shard_range().copy(
|
||||
state=own_state, state_timestamp=next(self.ts_iter),
|
||||
lower='a', upper='b', timestamp=next(self.ts_iter))
|
||||
broker.merge_shard_ranges([own_sr])
|
||||
def _assert_merge_into_shard(self, own_shard_range, shard_ranges,
|
||||
root_shard_ranges, expected, *args):
|
||||
# create a shard broker, initialise with shard_ranges, run audit on it
|
||||
# supplying given root_shard_ranges and verify that the broker ends up
|
||||
# with expected shard ranges.
|
||||
broker = self._make_broker(account=own_shard_range.account,
|
||||
container=own_shard_range.container)
|
||||
broker.set_sharding_sysmeta(*args)
|
||||
broker.merge_shard_ranges([own_shard_range] + shard_ranges)
|
||||
self.assertFalse(broker.is_root_container())
|
||||
|
||||
# make acceptor and root ranges that overlap with the shard
|
||||
overlaps = self._make_shard_ranges([('a', 'c'), ('', '')],
|
||||
[acceptor_state, root_state])
|
||||
sharder, mock_swift = self.call_audit_container(
|
||||
broker, [own_sr] + overlaps)
|
||||
expected_headers = {'X-Backend-Record-Type': 'shard',
|
||||
'X-Newest': 'true',
|
||||
'X-Backend-Include-Deleted': 'True',
|
||||
'X-Backend-Override-Deleted': 'true'}
|
||||
params = {'format': 'json', 'marker': 'a', 'end_marker': 'b',
|
||||
'states': 'auditing'}
|
||||
mock_swift.make_request.assert_called_once_with(
|
||||
'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
|
||||
params=params)
|
||||
if own_state in (ShardRange.SHRINKING, ShardRange.SHRUNK):
|
||||
# check acceptor & root are merged into audited shard
|
||||
self.assertEqual(
|
||||
[dict(sr) for sr in overlaps],
|
||||
[dict(sr) for sr in broker.get_shard_ranges()])
|
||||
return sharder
|
||||
sharder, mock_swift = self.call_audit_container(
|
||||
broker, root_shard_ranges)
|
||||
expected_headers = {'X-Backend-Record-Type': 'shard',
|
||||
'X-Newest': 'true',
|
||||
'X-Backend-Include-Deleted': 'True',
|
||||
'X-Backend-Override-Deleted': 'true'}
|
||||
params = {'format': 'json', 'marker': 'a', 'end_marker': 'b',
|
||||
'states': 'auditing'}
|
||||
mock_swift.make_request.assert_called_once_with(
|
||||
'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
|
||||
params=params)
|
||||
|
||||
def assert_ok(own_state, acceptor_state, root_state):
|
||||
sharder = check_audit(own_state, acceptor_state, root_state)
|
||||
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0}
|
||||
self._assert_shard_ranges_equal(expected, broker.get_shard_ranges())
|
||||
self.assertEqual(own_shard_range,
|
||||
broker.get_own_shard_range(no_default=True))
|
||||
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0}
|
||||
self._assert_stats(expected_stats, sharder, 'audit_shard')
|
||||
return sharder
|
||||
|
||||
def _do_test_audit_shard_root_ranges_not_merged(self, *args):
|
||||
# Make root and other ranges that fully contain the shard namespace...
|
||||
root_own_sr = ShardRange('a/c', next(self.ts_iter))
|
||||
acceptor = ShardRange(
|
||||
str(ShardName.create('.shards_a', 'c', 'c',
|
||||
next(self.ts_iter), 1)),
|
||||
next(self.ts_iter), 'a', 'c')
|
||||
|
||||
def do_test(own_state, acceptor_state, root_state):
|
||||
acceptor_from_root = acceptor.copy(
|
||||
timestamp=next(self.ts_iter), state=acceptor_state)
|
||||
root_from_root = root_own_sr.copy(
|
||||
timestamp=next(self.ts_iter), state=root_state)
|
||||
with annotate_failure('with states %s %s %s'
|
||||
% (own_state, acceptor_state, root_state)):
|
||||
self._assert_stats(expected_stats, sharder, 'audit_shard')
|
||||
own_sr_name = ShardName.create(
|
||||
'.shards_a', 'c', 'c', next(self.ts_iter), 0)
|
||||
own_sr = ShardRange(
|
||||
str(own_sr_name), next(self.ts_iter), state=own_state,
|
||||
state_timestamp=next(self.ts_iter), lower='a', upper='b')
|
||||
expected = existing = []
|
||||
sharder = self._assert_merge_into_shard(
|
||||
own_sr, existing,
|
||||
[own_sr, acceptor_from_root, root_from_root],
|
||||
expected, *args)
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('error'))
|
||||
|
||||
for own_state in ShardRange.STATES:
|
||||
if own_state in ShardRange.SHRINKING_STATES:
|
||||
# shrinking states are covered by other tests
|
||||
continue
|
||||
for acceptor_state in ShardRange.STATES:
|
||||
for root_state in ShardRange.STATES:
|
||||
assert_ok(own_state, acceptor_state, root_state)
|
||||
do_test(own_state, acceptor_state, root_state)
|
||||
|
||||
def test_audit_old_style_shard_container_with_root_ranges(self):
|
||||
self._do_test_audit_shard_container_with_root_ranges('Root', 'a/c')
|
||||
def test_audit_old_style_shard_root_ranges_not_merged_not_shrinking(self):
|
||||
# verify that other shard ranges from root are NOT merged into shard
|
||||
# when it is NOT in a shrinking state
|
||||
self._do_test_audit_shard_root_ranges_not_merged('Root', 'a/c')
|
||||
|
||||
def test_audit_shard_container_with_root_ranges(self):
|
||||
self._do_test_audit_shard_container_with_root_ranges('Quoted-Root',
|
||||
'a/c')
|
||||
def test_audit_shard_root_ranges_not_merged_not_shrinking(self):
|
||||
# verify that other shard ranges from root are NOT merged into shard
|
||||
# when it is NOT in a shrinking state
|
||||
self._do_test_audit_shard_root_ranges_not_merged('Quoted-Root', 'a/c')
|
||||
|
||||
def test_audit_shard_root_ranges_with_own_merged_while_shrinking(self):
|
||||
# Verify that shrinking shard will merge root and other ranges,
|
||||
# including root range.
|
||||
# Make root and other ranges that fully contain the shard namespace...
|
||||
root_own_sr = ShardRange('a/c', next(self.ts_iter))
|
||||
acceptor = ShardRange(
|
||||
str(ShardName.create('.shards_a', 'c', 'c',
|
||||
next(self.ts_iter), 1)),
|
||||
next(self.ts_iter), 'a', 'c')
|
||||
|
||||
def do_test(own_state, acceptor_state, root_state):
|
||||
acceptor_from_root = acceptor.copy(
|
||||
timestamp=next(self.ts_iter), state=acceptor_state)
|
||||
root_from_root = root_own_sr.copy(
|
||||
timestamp=next(self.ts_iter), state=root_state)
|
||||
ts = next(self.ts_iter)
|
||||
own_sr = ShardRange(
|
||||
str(ShardName.create('.shards_a', 'c', 'c', ts, 0)),
|
||||
ts, lower='a', upper='b', state=own_state, state_timestamp=ts)
|
||||
expected = [acceptor_from_root, root_from_root]
|
||||
with annotate_failure('with states %s %s %s'
|
||||
% (own_state, acceptor_state, root_state)):
|
||||
sharder = self._assert_merge_into_shard(
|
||||
own_sr, [],
|
||||
# own sr is in ranges fetched from root
|
||||
[own_sr, acceptor_from_root, root_from_root],
|
||||
expected, 'Quoted-Root', 'a/c')
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('error'))
|
||||
|
||||
for own_state in ShardRange.SHRINKING_STATES:
|
||||
for acceptor_state in ShardRange.STATES:
|
||||
for root_state in ShardRange.STATES:
|
||||
do_test(own_state, acceptor_state, root_state)
|
||||
|
||||
def test_audit_shard_root_ranges_missing_own_merged_while_shrinking(self):
|
||||
# Verify that shrinking shard will merge root and other ranges,
|
||||
# including root range.
|
||||
# Make root and other ranges that fully contain the shard namespace...
|
||||
root_own_sr = ShardRange('a/c', next(self.ts_iter))
|
||||
acceptor = ShardRange(
|
||||
str(ShardName.create('.shards_a', 'c', 'c',
|
||||
next(self.ts_iter), 1)),
|
||||
next(self.ts_iter), 'a', 'c')
|
||||
|
||||
def do_test(own_state, acceptor_state, root_state):
|
||||
acceptor_from_root = acceptor.copy(
|
||||
timestamp=next(self.ts_iter), state=acceptor_state)
|
||||
root_from_root = root_own_sr.copy(
|
||||
timestamp=next(self.ts_iter), state=root_state)
|
||||
ts = next(self.ts_iter)
|
||||
own_sr = ShardRange(
|
||||
str(ShardName.create('.shards_a', 'c', 'c', ts, 0)),
|
||||
ts, lower='a', upper='b', state=own_state, state_timestamp=ts)
|
||||
expected = [acceptor_from_root, root_from_root]
|
||||
with annotate_failure('with states %s %s %s'
|
||||
% (own_state, acceptor_state, root_state)):
|
||||
sharder = self._assert_merge_into_shard(
|
||||
own_sr, [],
|
||||
# own sr is NOT in ranges fetched from root
|
||||
[acceptor_from_root, root_from_root],
|
||||
expected, 'Quoted-Root', 'a/c')
|
||||
warning_lines = sharder.logger.get_lines_for_level('warning')
|
||||
self.assertEqual(1, len(warning_lines))
|
||||
self.assertIn('root has no matching shard range',
|
||||
warning_lines[0])
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('error'))
|
||||
|
||||
for own_state in ShardRange.SHRINKING_STATES:
|
||||
for acceptor_state in ShardRange.STATES:
|
||||
for root_state in ShardRange.STATES:
|
||||
do_test(own_state, acceptor_state, root_state)
|
||||
|
||||
def test_audit_shard_root_ranges_fetch_fails_while_shrinking(self):
|
||||
# check audit copes with failed response while shard is shrinking
|
||||
ts = next(self.ts_iter)
|
||||
own_sr = ShardRange(
|
||||
str(ShardName.create('.shards_a', 'c', 'c', ts, 0)),
|
||||
ts, lower='a', upper='b', state=ShardRange.SHRINKING,
|
||||
state_timestamp=ts)
|
||||
broker = self._make_broker(account=own_sr.account,
|
||||
container=own_sr.container)
|
||||
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
|
||||
broker.merge_shard_ranges(own_sr)
|
||||
self.assertFalse(broker.is_root_container())
|
||||
|
||||
sharder, mock_swift = self.call_audit_container(
|
||||
broker, [], exc=internal_client.UnexpectedResponse('bad', 'resp'))
|
||||
self.assertEqual([], broker.get_shard_ranges())
|
||||
self.assertEqual(own_sr, broker.get_own_shard_range(no_default=True))
|
||||
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0}
|
||||
self._assert_stats(expected_stats, sharder, 'audit_shard')
|
||||
warning_lines = sharder.logger.get_lines_for_level('warning')
|
||||
self.assertEqual(2, len(warning_lines))
|
||||
self.assertIn('Failed to get shard ranges from a/c: bad',
|
||||
warning_lines[0])
|
||||
self.assertIn('unable to get shard ranges from root',
|
||||
warning_lines[1])
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('error'))
|
||||
|
||||
def test_audit_shard_deleted_range_in_root_container(self):
|
||||
# verify that shard DB is marked deleted when its own shard range is
|
||||
|
Loading…
Reference in New Issue
Block a user