sharder: refactor _audit_shard_container
This patch is purely a refactor. The only intended behavioral change is the following consequence of the refactor: when the audit fails to fetch a copy of the shard's own_shard_range from root, the warning for that is now logged before any other shard audit warnings rather than after. Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com> Change-Id: Ibc7d298fa8dfbc5c771db2e861cacc2e8af104cc
This commit is contained in:
parent
fa19868703
commit
429702e30d
@ -1181,94 +1181,77 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||||||
self._increment_stat('audit_root', 'success', statsd=True)
|
self._increment_stat('audit_root', 'success', statsd=True)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def _audit_shard_container(self, broker):
|
def _merge_shard_ranges_from_root(self, broker, shard_ranges,
|
||||||
self._increment_stat('audit_shard', 'attempted')
|
own_shard_range):
|
||||||
warnings = []
|
"""
|
||||||
errors = []
|
Merge appropriate items from the given ``shard_ranges`` into the
|
||||||
if not broker.account.startswith(self.shards_account_prefix):
|
``broker``. The selection of items that are merged will depend upon the
|
||||||
warnings.append('account not in shards namespace %r' %
|
state of the shard.
|
||||||
self.shards_account_prefix)
|
|
||||||
|
|
||||||
own_shard_range = broker.get_own_shard_range(no_default=True)
|
:param broker: A :class:`~swift.container.backend.ContainerBroker`.
|
||||||
|
:param shard_ranges: A list of instances of
|
||||||
|
:class:`~swift.common.utils.ShardRange` describing the shard ranges
|
||||||
|
fetched from the root container.
|
||||||
|
:param own_shard_range: A :class:`~swift.common.utils.ShardRange`
|
||||||
|
describing the shard's own shard range.
|
||||||
|
:return: a tuple of ``own_shard_range, own_shard_range_from_root``. The
|
||||||
|
returned``own_shard_range`` will have been updated if the matching
|
||||||
|
``own_shard_range_from_root`` has newer data.
|
||||||
|
``own_shard_range_from_root`` will be None if no such matching
|
||||||
|
shard range is found in ``shard_ranges``.
|
||||||
|
"""
|
||||||
|
own_shard_range_from_root = None
|
||||||
|
other_shard_ranges = []
|
||||||
|
for shard_range in shard_ranges:
|
||||||
|
# look for this shard range in the list of shard ranges received
|
||||||
|
# from root; the root may have different lower and upper bounds for
|
||||||
|
# this shard (e.g. if this shard has been expanded in the root to
|
||||||
|
# accept a shrinking shard) so we only match on name.
|
||||||
|
if shard_range.name == own_shard_range.name:
|
||||||
|
# If we find our own shard range in the root response, merge
|
||||||
|
# it and reload own shard range (note: own_range_from_root may
|
||||||
|
# not necessarily be 'newer' than the own shard range we
|
||||||
|
# already have, but merging will get us to the 'newest' state)
|
||||||
|
self.logger.debug('Updating own shard range from root')
|
||||||
|
own_shard_range_from_root = shard_range
|
||||||
|
broker.merge_shard_ranges(own_shard_range_from_root)
|
||||||
|
orig_own_shard_range = own_shard_range
|
||||||
|
own_shard_range = broker.get_own_shard_range()
|
||||||
|
if (orig_own_shard_range != own_shard_range or
|
||||||
|
orig_own_shard_range.state != own_shard_range.state):
|
||||||
|
self.logger.debug(
|
||||||
|
'Updated own shard range from %s to %s',
|
||||||
|
orig_own_shard_range, own_shard_range)
|
||||||
|
else:
|
||||||
|
other_shard_ranges.append(shard_range)
|
||||||
|
|
||||||
shard_ranges = own_shard_range_from_root = None
|
if (other_shard_ranges and own_shard_range_from_root and
|
||||||
if own_shard_range:
|
own_shard_range.state in
|
||||||
# Get the root view of the world, at least that part of the world
|
(ShardRange.SHRINKING, ShardRange.SHRUNK)):
|
||||||
# that overlaps with this shard's namespace. The
|
# If own_shard_range state is shrinking, save off *all* shards
|
||||||
# 'states=auditing' parameter will cause the root to include
|
# returned because these may contain shards into which this
|
||||||
# its own shard range in the response, which is necessary for the
|
# shard is to shrink itself; shrinking is the only case when we
|
||||||
# particular case when this shard should be shrinking to the root
|
# want to learn about *other* shard ranges from the root.
|
||||||
# container; when not shrinking to root, but to another acceptor,
|
# We need to include shrunk state too, because one replica of a
|
||||||
# the root range should be in sharded state and will not interfere
|
# shard may already have moved the own_shard_range state to
|
||||||
# with cleaving, listing or updating behaviour.
|
# shrunk while another replica may still be in the process of
|
||||||
shard_ranges = self._fetch_shard_ranges(
|
# shrinking.
|
||||||
broker, newest=True,
|
self.logger.debug('Updating %s other shard range(s) from root',
|
||||||
params={'marker': str_to_wsgi(own_shard_range.lower_str),
|
len(other_shard_ranges))
|
||||||
'end_marker': str_to_wsgi(own_shard_range.upper_str),
|
broker.merge_shard_ranges(other_shard_ranges)
|
||||||
'states': 'auditing'},
|
|
||||||
include_deleted=True)
|
|
||||||
if shard_ranges:
|
|
||||||
for shard_range in shard_ranges:
|
|
||||||
# look for this shard range in the list of shard ranges
|
|
||||||
# received from root; the root may have different lower and
|
|
||||||
# upper bounds for this shard (e.g. if this shard has been
|
|
||||||
# expanded in the root to accept a shrinking shard) so we
|
|
||||||
# only match on name.
|
|
||||||
if shard_range.name == own_shard_range.name:
|
|
||||||
own_shard_range_from_root = shard_range
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
# this is not necessarily an error - some replicas of the
|
|
||||||
# root may not yet know about this shard container
|
|
||||||
warnings.append('root has no matching shard range')
|
|
||||||
elif not own_shard_range.deleted:
|
|
||||||
warnings.append('unable to get shard ranges from root')
|
|
||||||
# else, our shard range is deleted, so root may have reclaimed it
|
|
||||||
else:
|
|
||||||
errors.append('missing own shard range')
|
|
||||||
|
|
||||||
if warnings:
|
return own_shard_range, own_shard_range_from_root
|
||||||
self.logger.warning(
|
|
||||||
'Audit warnings for shard %s (%s): %s',
|
|
||||||
broker.db_file, quote(broker.path), ', '.join(warnings))
|
|
||||||
|
|
||||||
if errors:
|
def _delete_shard_container(self, broker, own_shard_range):
|
||||||
self.logger.warning(
|
"""
|
||||||
'Audit failed for shard %s (%s) - skipping: %s',
|
Mark a shard container as deleted if it was sharded or shrunk more than
|
||||||
broker.db_file, quote(broker.path), ', '.join(errors))
|
reclaim_age in the past. (The DB file will be removed by the replicator
|
||||||
self._increment_stat('audit_shard', 'failure', statsd=True)
|
after a further reclaim_age.)
|
||||||
return False
|
|
||||||
|
|
||||||
if own_shard_range_from_root:
|
|
||||||
# iff we find our own shard range in the root response, merge it
|
|
||||||
# and reload own shard range (note: own_range_from_root may not
|
|
||||||
# necessarily be 'newer' than the own shard range we already have,
|
|
||||||
# but merging will get us to the 'newest' state)
|
|
||||||
self.logger.debug('Updating own shard range from root')
|
|
||||||
broker.merge_shard_ranges(own_shard_range_from_root)
|
|
||||||
orig_own_shard_range = own_shard_range
|
|
||||||
own_shard_range = broker.get_own_shard_range()
|
|
||||||
if (orig_own_shard_range != own_shard_range or
|
|
||||||
orig_own_shard_range.state != own_shard_range.state):
|
|
||||||
self.logger.debug(
|
|
||||||
'Updated own shard range from %s to %s',
|
|
||||||
orig_own_shard_range, own_shard_range)
|
|
||||||
if own_shard_range.state in (ShardRange.SHRINKING,
|
|
||||||
ShardRange.SHRUNK):
|
|
||||||
# If the up-to-date state is shrinking, save off *all* shards
|
|
||||||
# returned because these may contain shards into which this
|
|
||||||
# shard is to shrink itself; shrinking is the only case when we
|
|
||||||
# want to learn about *other* shard ranges from the root.
|
|
||||||
# We need to include shrunk state too, because one replica of a
|
|
||||||
# shard may already have moved the own_shard_range state to
|
|
||||||
# shrunk while another replica may still be in the process of
|
|
||||||
# shrinking.
|
|
||||||
other_shard_ranges = [sr for sr in shard_ranges
|
|
||||||
if sr is not own_shard_range_from_root]
|
|
||||||
self.logger.debug('Updating %s other shard range(s) from root',
|
|
||||||
len(other_shard_ranges))
|
|
||||||
broker.merge_shard_ranges(other_shard_ranges)
|
|
||||||
|
|
||||||
|
:param broker: A :class:`~swift.container.backend.ContainerBroker`.
|
||||||
|
:param own_shard_range: A :class:`~swift.common.utils.ShardRange`
|
||||||
|
describing the shard's own shard range.
|
||||||
|
"""
|
||||||
delete_age = time.time() - self.reclaim_age
|
delete_age = time.time() - self.reclaim_age
|
||||||
deletable_states = (ShardRange.SHARDED, ShardRange.SHRUNK)
|
deletable_states = (ShardRange.SHARDED, ShardRange.SHRUNK)
|
||||||
if (own_shard_range.state in deletable_states and
|
if (own_shard_range.state in deletable_states and
|
||||||
@ -1279,8 +1262,62 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||||||
self.logger.debug('Deleted shard container %s (%s)',
|
self.logger.debug('Deleted shard container %s (%s)',
|
||||||
broker.db_file, quote(broker.path))
|
broker.db_file, quote(broker.path))
|
||||||
|
|
||||||
self._increment_stat('audit_shard', 'success', statsd=True)
|
def _do_audit_shard_container(self, broker):
|
||||||
return True
|
warnings = []
|
||||||
|
if not broker.account.startswith(self.shards_account_prefix):
|
||||||
|
warnings.append('account not in shards namespace %r' %
|
||||||
|
self.shards_account_prefix)
|
||||||
|
|
||||||
|
own_shard_range = broker.get_own_shard_range(no_default=True)
|
||||||
|
|
||||||
|
if not own_shard_range:
|
||||||
|
self.logger.warning('Audit failed for shard %s (%s) - skipping: '
|
||||||
|
'missing own shard range',
|
||||||
|
broker.db_file, quote(broker.path))
|
||||||
|
return False, warnings
|
||||||
|
|
||||||
|
# Get the root view of the world, at least that part of the world
|
||||||
|
# that overlaps with this shard's namespace. The
|
||||||
|
# 'states=auditing' parameter will cause the root to include
|
||||||
|
# its own shard range in the response, which is necessary for the
|
||||||
|
# particular case when this shard should be shrinking to the root
|
||||||
|
# container; when not shrinking to root, but to another acceptor,
|
||||||
|
# the root range should be in sharded state and will not interfere
|
||||||
|
# with cleaving, listing or updating behaviour.
|
||||||
|
shard_ranges = self._fetch_shard_ranges(
|
||||||
|
broker, newest=True,
|
||||||
|
params={'marker': str_to_wsgi(own_shard_range.lower_str),
|
||||||
|
'end_marker': str_to_wsgi(own_shard_range.upper_str),
|
||||||
|
'states': 'auditing'},
|
||||||
|
include_deleted=True)
|
||||||
|
if shard_ranges:
|
||||||
|
own_shard_range, own_shard_range_from_root = \
|
||||||
|
self._merge_shard_ranges_from_root(
|
||||||
|
broker, shard_ranges, own_shard_range)
|
||||||
|
if not own_shard_range_from_root:
|
||||||
|
# this is not necessarily an error - some replicas of the
|
||||||
|
# root may not yet know about this shard container, or the
|
||||||
|
# shard's own shard range could become deleted and
|
||||||
|
# reclaimed from the root under rare conditions
|
||||||
|
warnings.append('root has no matching shard range')
|
||||||
|
elif not own_shard_range.deleted:
|
||||||
|
warnings.append('unable to get shard ranges from root')
|
||||||
|
# else, our shard range is deleted, so root may have reclaimed it
|
||||||
|
|
||||||
|
self._delete_shard_container(broker, own_shard_range)
|
||||||
|
|
||||||
|
return True, warnings
|
||||||
|
|
||||||
|
def _audit_shard_container(self, broker):
|
||||||
|
self._increment_stat('audit_shard', 'attempted')
|
||||||
|
success, warnings = self._do_audit_shard_container(broker)
|
||||||
|
if warnings:
|
||||||
|
self.logger.warning(
|
||||||
|
'Audit warnings for shard %s (%s): %s',
|
||||||
|
broker.db_file, quote(broker.path), ', '.join(warnings))
|
||||||
|
self._increment_stat(
|
||||||
|
'audit_shard', 'success' if success else 'failure', statsd=True)
|
||||||
|
return success
|
||||||
|
|
||||||
def _audit_cleave_contexts(self, broker):
|
def _audit_cleave_contexts(self, broker):
|
||||||
now = Timestamp.now()
|
now = Timestamp.now()
|
||||||
|
@ -5570,12 +5570,12 @@ class TestSharder(BaseTestSharder):
|
|||||||
sharder, mock_swift = self.call_audit_container(broker, shard_ranges)
|
sharder, mock_swift = self.call_audit_container(broker, shard_ranges)
|
||||||
lines = sharder.logger.get_lines_for_level('warning')
|
lines = sharder.logger.get_lines_for_level('warning')
|
||||||
self._assert_stats(expected_stats, sharder, 'audit_shard')
|
self._assert_stats(expected_stats, sharder, 'audit_shard')
|
||||||
self.assertIn('Audit warnings for shard %s' % broker.db_file, lines[0])
|
self.assertIn('Audit failed for shard %s' % broker.db_file, lines[0])
|
||||||
self.assertIn('account not in shards namespace', lines[0])
|
self.assertIn('missing own shard range', lines[0])
|
||||||
self.assertNotIn('root has no matching shard range', lines[0])
|
self.assertIn('Audit warnings for shard %s' % broker.db_file, lines[1])
|
||||||
self.assertNotIn('unable to get shard ranges from root', lines[0])
|
self.assertIn('account not in shards namespace', lines[1])
|
||||||
self.assertIn('Audit failed for shard %s' % broker.db_file, lines[1])
|
self.assertNotIn('root has no matching shard range', lines[1])
|
||||||
self.assertIn('missing own shard range', lines[1])
|
self.assertNotIn('unable to get shard ranges from root', lines[1])
|
||||||
self.assertFalse(lines[2:])
|
self.assertFalse(lines[2:])
|
||||||
self.assertFalse(broker.is_deleted())
|
self.assertFalse(broker.is_deleted())
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user