Merge "sharder: show path and db file in logs"

This commit is contained in:
Zuul 2023-03-02 08:48:25 +00:00 committed by Gerrit Code Review
commit 55bf0d1224
2 changed files with 438 additions and 246 deletions

View File

@ -15,6 +15,7 @@
import collections
import errno
import json
import logging
import operator
import time
from collections import defaultdict
@ -908,6 +909,44 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
self.stats_interval = float(conf.get('stats_interval', '3600'))
self.reported = 0
def _format_log_msg(self, broker, msg, *args):
# make best effort to include broker properties...
try:
db_file = broker.db_file
except Exception: # noqa
db_file = ''
try:
path = broker.path
except Exception: # noqa
path = ''
if args:
msg = msg % args
return '%s, path: %s, db: %s' % (msg, quote(path), db_file)
def _log(self, level, broker, msg, *args):
if not self.logger.isEnabledFor(level):
return
self.logger.log(level, self._format_log_msg(broker, msg, *args))
def debug(self, broker, msg, *args, **kwargs):
self._log(logging.DEBUG, broker, msg, *args, **kwargs)
def info(self, broker, msg, *args, **kwargs):
self._log(logging.INFO, broker, msg, *args, **kwargs)
def warning(self, broker, msg, *args, **kwargs):
self._log(logging.WARNING, broker, msg, *args, **kwargs)
def error(self, broker, msg, *args, **kwargs):
self._log(logging.ERROR, broker, msg, *args, **kwargs)
def exception(self, broker, msg, *args, **kwargs):
if not self.logger.isEnabledFor(logging.ERROR):
return
self.logger.exception(self._format_log_msg(broker, msg, *args))
def _zero_stats(self):
"""Zero out the stats."""
super(ContainerSharder, self)._zero_stats()
@ -1040,14 +1079,13 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# container DB, which predicates sharding starting. But s-m-s-r and
# auto-sharding do set epoch and then merge, so we use it to tell
# whether sharding has been taking too long or not.
self.logger.warning(
'Cleaving has not completed in %.2f seconds since %s.'
' Container DB file and path: %s (%s), DB state: %s,'
' own_shard_range state: %s, state count of shard ranges: %s' %
(time.time() - float(own_shard_range.epoch),
own_shard_range.epoch.isoformat, broker.db_file,
quote(broker.path), db_state,
own_shard_range.state_text, str(state_count)))
self.warning(broker,
'Cleaving has not completed in %.2f seconds since %s.'
'DB state: %s, own_shard_range state: %s, '
'state count of shard ranges: %s' %
(time.time() - float(own_shard_range.epoch),
own_shard_range.epoch.isoformat, db_state,
own_shard_range.state_text, str(state_count)))
def _report_stats(self):
# report accumulated stats since start of one sharder cycle
@ -1127,14 +1165,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
'GET', path, headers, acceptable_statuses=(2,),
params=params)
except internal_client.UnexpectedResponse as err:
self.logger.warning("Failed to get shard ranges from %s: %s",
quote(broker.root_path), err)
self.warning(broker, "Failed to get shard ranges from %s: %s",
quote(broker.root_path), err)
return None
record_type = resp.headers.get('x-backend-record-type')
if record_type != 'shard':
err = 'unexpected record type %r' % record_type
self.logger.error("Failed to get shard ranges from %s: %s",
quote(broker.root_path), err)
self.error(broker, "Failed to get shard ranges from %s: %s",
quote(broker.root_path), err)
return None
try:
@ -1144,32 +1182,33 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
return [ShardRange.from_dict(shard_range)
for shard_range in data]
except (ValueError, TypeError, KeyError) as err:
self.logger.error(
"Failed to get shard ranges from %s: invalid data: %r",
quote(broker.root_path), err)
self.error(broker,
"Failed to get shard ranges from %s: invalid data: %r",
quote(broker.root_path), err)
return None
def _put_container(self, node, part, account, container, headers, body):
def _put_container(self, broker, node, part, account, container, headers,
body):
try:
direct_put_container(node, part, account, container,
conn_timeout=self.conn_timeout,
response_timeout=self.node_timeout,
headers=headers, contents=body)
except DirectClientException as err:
self.logger.warning(
'Failed to put shard ranges to %s:%s/%s %s/%s: %s',
node['ip'], node['port'], node['device'],
quote(account), quote(container), err.http_status)
self.warning(broker,
'Failed to put shard ranges to %s:%s/%s %s/%s: %s',
node['ip'], node['port'], node['device'],
quote(account), quote(container), err.http_status)
except (Exception, Timeout) as err:
self.logger.exception(
'Failed to put shard ranges to %s:%s/%s %s/%s: %s',
node['ip'], node['port'], node['device'],
quote(account), quote(container), err)
self.exception(broker,
'Failed to put shard ranges to %s:%s/%s %s/%s: %s',
node['ip'], node['port'], node['device'],
quote(account), quote(container), err)
else:
return True
return False
def _send_shard_ranges(self, account, container, shard_ranges,
def _send_shard_ranges(self, broker, account, container, shard_ranges,
headers=None):
body = json.dumps([dict(sr, reported=0)
for sr in shard_ranges]).encode('ascii')
@ -1184,7 +1223,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
pool = GreenAsyncPile(len(nodes))
for node in nodes:
pool.spawn(self._put_container, node, part, account,
pool.spawn(self._put_container, broker, node, part, account,
container, headers, body)
results = pool.waitall(None)
@ -1291,9 +1330,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
% broker.db_epoch)
if warnings:
self.logger.warning(
'Audit failed for root %s (%s): %s',
broker.db_file, quote(broker.path), ', '.join(warnings))
self.warning(broker, 'Audit failed for root: %s',
', '.join(warnings))
self._increment_stat('audit_root', 'failure', statsd=True)
return False
@ -1332,15 +1370,16 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# it and reload own shard range (note: own_range_from_root may
# not necessarily be 'newer' than the own shard range we
# already have, but merging will get us to the 'newest' state)
self.logger.debug('Updating own shard range from root')
self.debug(broker, 'Updating own shard range from root')
own_shard_range_from_root = shard_range
broker.merge_shard_ranges(own_shard_range_from_root)
orig_own_shard_range = own_shard_range
own_shard_range = broker.get_own_shard_range()
if (orig_own_shard_range != own_shard_range or
orig_own_shard_range.state != own_shard_range.state):
self.logger.info('Updated own shard range from %s to %s',
orig_own_shard_range, own_shard_range)
self.info(broker,
'Updated own shard range from %s to %s',
orig_own_shard_range, own_shard_range)
elif shard_range.is_child_of(own_shard_range):
children_shard_ranges.append(shard_range)
else:
@ -1351,8 +1390,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# DB is fully cleaved and reaches SHARDED DB state, after which it
# is useful for debugging for the set of sub-shards to which a
# shards has sharded to be frozen.
self.logger.debug('Updating %d children shard ranges from root',
len(children_shard_ranges))
self.debug(broker, 'Updating %d children shard ranges from root',
len(children_shard_ranges))
broker.merge_shard_ranges(children_shard_ranges)
if (other_shard_ranges
@ -1415,9 +1454,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
combined_shard_ranges, own_shard_range)
if not (overlaps or paths_with_gaps):
# only merge if shard ranges appear to be *good*
self.logger.debug(
'Updating %s other shard range(s) from root',
len(filtered_other_shard_ranges))
self.debug(broker,
'Updating %s other shard range(s) from root',
len(filtered_other_shard_ranges))
broker.merge_shard_ranges(filtered_other_shard_ranges)
return own_shard_range, own_shard_range_from_root
@ -1439,8 +1478,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
own_shard_range.timestamp < delete_age and
broker.empty()):
broker.delete_db(Timestamp.now().internal)
self.logger.debug('Marked shard container as deleted %s (%s)',
broker.db_file, quote(broker.path))
self.debug(broker, 'Marked shard container as deleted')
def _do_audit_shard_container(self, broker):
warnings = []
@ -1451,9 +1489,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
own_shard_range = broker.get_own_shard_range(no_default=True)
if not own_shard_range:
self.logger.warning('Audit failed for shard %s (%s) - skipping: '
'missing own shard range',
broker.db_file, quote(broker.path))
self.warning(broker, 'Audit failed for shard: missing own shard '
'range (skipping)')
return False, warnings
# Get the root view of the world, at least that part of the world
@ -1492,9 +1529,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
self._increment_stat('audit_shard', 'attempted')
success, warnings = self._do_audit_shard_container(broker)
if warnings:
self.logger.warning(
'Audit warnings for shard %s (%s): %s',
broker.db_file, quote(broker.path), ', '.join(warnings))
self.warning(broker, 'Audit warnings for shard: %s',
', '.join(warnings))
self._increment_stat(
'audit_shard', 'success' if success else 'failure', statsd=True)
return success
@ -1513,9 +1549,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
if broker.is_deleted():
if broker.is_old_enough_to_reclaim(time.time(), self.reclaim_age) \
and not broker.is_empty_enough_to_reclaim():
self.logger.warning(
'Reclaimable db stuck waiting for shrinking: %s (%s)',
broker.db_file, quote(broker.path))
self.warning(broker,
'Reclaimable db stuck waiting for shrinking')
# if the container has been marked as deleted, all metadata will
# have been erased so no point auditing. But we want it to pass, in
# case any objects exist inside it.
@ -1563,12 +1598,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
end_marker=src_shard_range.end_marker,
include_deleted=include_deleted,
since_row=since_row)
self.logger.debug(
'got %s rows (deleted=%s) from %s in %ss',
len(objects),
include_deleted,
broker.db_file,
time.time() - start)
self.debug(broker, 'got %s rows (deleted=%s) in %ss',
len(objects), include_deleted, time.time() - start)
if objects:
yield objects, info
@ -1649,9 +1680,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
part, dest_broker.db_file, node_id)
quorum = quorum_size(self.ring.replica_count)
if not success and responses.count(True) < quorum:
self.logger.warning(
'Failed to sufficiently replicate misplaced objects: %s in %s '
'(not removing)', dest_shard_range, quote(broker.path))
self.warning(broker, 'Failed to sufficiently replicate misplaced '
'objects to %s (not removing)',
dest_shard_range)
return False
if broker.get_info()['id'] != info['id']:
@ -1669,9 +1700,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
success = True
if not success:
self.logger.warning(
'Refused to remove misplaced objects: %s in %s',
dest_shard_range, quote(broker.path))
self.warning(broker,
'Refused to remove misplaced objects for dest %s',
dest_shard_range)
return success
def _move_objects(self, src_broker, src_shard_range, policy_index,
@ -1689,8 +1720,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
continue
if dest_shard_range.name == src_broker.path:
self.logger.debug(
'Skipping source as misplaced objects destination')
self.debug(src_broker,
'Skipping source as misplaced objects destination')
# in shrinking context, the misplaced objects might actually be
# correctly placed if the root has expanded this shard but this
# broker has not yet been updated
@ -1715,14 +1746,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
placed += len(objs)
if unplaced:
self.logger.warning(
'Failed to find destination for at least %s misplaced objects '
'in %s', unplaced, quote(src_broker.path))
self.warning(src_broker, 'Failed to find destination for at least '
'%s misplaced objects', unplaced)
# TODO: consider executing the replication jobs concurrently
for dest_shard_range, dest_args in dest_brokers.items():
self.logger.debug('moving misplaced objects found in range %s' %
dest_shard_range)
self.debug(src_broker,
'moving misplaced objects found in range %s',
dest_shard_range)
success &= self._replicate_and_delete(
src_broker, dest_shard_range, **dest_args)
@ -1802,8 +1833,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
:return: True if all misplaced objects were sufficiently replicated to
their correct shard containers, False otherwise
"""
self.logger.debug('Looking for misplaced objects in %s (%s)',
quote(broker.path), broker.db_file)
self.debug(broker, 'Looking for misplaced objects')
self._increment_stat('misplaced', 'attempted')
src_broker = src_broker or broker
if src_bounds is None:
@ -1811,7 +1841,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# (ab)use ShardRange instances to encapsulate source namespaces
src_ranges = [ShardRange('dont/care', Timestamp.now(), lower, upper)
for lower, upper in src_bounds]
self.logger.debug('misplaced object source bounds %s' % src_bounds)
self.debug(broker, 'misplaced object source bounds %s', src_bounds)
policy_index = broker.storage_policy_index
success = True
num_placed = num_unplaced = 0
@ -1827,11 +1857,11 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# the found stat records the number of DBs in which any misplaced
# rows were found, not the total number of misplaced rows
self._increment_stat('misplaced', 'found', statsd=True)
self.logger.debug('Placed %s misplaced objects (%s unplaced)',
num_placed, num_unplaced)
self.debug(broker, 'Placed %s misplaced objects (%s unplaced)',
num_placed, num_unplaced)
self._increment_stat('misplaced', 'success' if success else 'failure',
statsd=True)
self.logger.debug('Finished handling misplaced objects')
self.debug(broker, 'Finished handling misplaced objects')
return success
def _find_shard_ranges(self, broker):
@ -1847,12 +1877,10 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
own_shard_range = broker.get_own_shard_range()
shard_ranges = broker.get_shard_ranges()
if shard_ranges and shard_ranges[-1].upper >= own_shard_range.upper:
self.logger.debug('Scan for shard ranges already completed for %s',
quote(broker.path))
self.debug(broker, 'Scan for shard ranges already completed')
return 0
self.logger.info('Starting scan for shard ranges on %s',
quote(broker.path))
self.info(broker, 'Starting scan for shard ranges')
self._increment_stat('scanned', 'attempted')
start = time.time()
@ -1864,11 +1892,11 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
if not shard_data:
if last_found:
self.logger.info("Already found all shard ranges")
self.info(broker, "Already found all shard ranges")
self._increment_stat('scanned', 'success', statsd=True)
else:
# we didn't find anything
self.logger.warning("No shard ranges found")
self.warning(broker, "No shard ranges found")
self._increment_stat('scanned', 'failure', statsd=True)
return 0
@ -1876,14 +1904,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
broker, shard_data, self.shards_account_prefix)
broker.merge_shard_ranges(shard_ranges)
num_found = len(shard_ranges)
self.logger.info(
"Completed scan for shard ranges: %d found", num_found)
self.info(broker, "Completed scan for shard ranges: %d found",
num_found)
self._update_stat('scanned', 'found', step=num_found)
self._min_stat('scanned', 'min_time', round(elapsed / num_found, 3))
self._max_stat('scanned', 'max_time', round(elapsed / num_found, 3))
if last_found:
self.logger.info("Final shard range reached.")
self.info(broker, "Final shard range reached.")
self._increment_stat('scanned', 'success', statsd=True)
return num_found
@ -1910,16 +1938,15 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# may think they are in fact roots, but it cleans up well enough
# once everyone's upgraded.
success = self._send_shard_ranges(
shard_range.account, shard_range.container,
broker, shard_range.account, shard_range.container,
[shard_range], headers=headers)
if success:
self.logger.debug('PUT new shard range container for %s',
shard_range)
self.debug(broker, 'PUT new shard range container for %s',
shard_range)
self._increment_stat('created', 'success', statsd=True)
else:
self.logger.error(
'PUT of new shard container %r failed for %s.',
shard_range, quote(broker.path))
self.error(broker, 'PUT of new shard container %r failed',
shard_range)
self._increment_stat('created', 'failure', statsd=True)
# break, not continue, because elsewhere it is assumed that
# finding and cleaving shard ranges progresses linearly, so we
@ -1931,12 +1958,10 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
if created_ranges:
broker.merge_shard_ranges(created_ranges)
if not broker.is_root_container():
self._send_shard_ranges(
broker.root_account, broker.root_container, created_ranges)
self.logger.info(
"Completed creating shard range containers: %d created, "
"from sharding container %s",
len(created_ranges), quote(broker.path))
self._send_shard_ranges(broker, broker.root_account,
broker.root_container, created_ranges)
self.info(broker, "Completed creating %d shard range containers",
len(created_ranges))
return len(created_ranges)
def _cleave_shard_broker(self, broker, cleaving_context, shard_range,
@ -1962,8 +1987,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
since_row=sync_from_row):
shard_broker.merge_items(objects)
if objects is None:
self.logger.info("Cleaving '%s': %r - zero objects found",
quote(broker.path), shard_range)
self.info(broker, "Cleaving %r - zero objects found",
shard_range)
if shard_broker.get_info()['put_timestamp'] == put_timestamp:
# This was just created; don't need to replicate this
# SR because there was nothing there. So cleanup and
@ -1986,8 +2011,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
[{'sync_point': source_max_row, 'remote_id': source_db_id}] +
source_broker.get_syncs())
else:
self.logger.debug("Cleaving '%s': %r - shard db already in sync",
quote(broker.path), shard_range)
self.debug(broker, "Cleaving %r - shard db already in sync",
shard_range)
replication_quorum = self.existing_shard_replication_quorum
if own_shard_range.state in ShardRange.SHRINKING_STATES:
@ -2022,9 +2047,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
if result == CLEAVE_EMPTY:
self.delete_db(shard_broker)
else: # result == CLEAVE_SUCCESS:
self.logger.info(
'Replicating new shard container %s for %s',
quote(shard_broker.path), own_shard_range)
self.info(broker, 'Replicating new shard container %s for %s',
quote(shard_broker.path), own_shard_range)
success, responses = self._replicate_object(
shard_part, shard_broker.db_file, node_id)
@ -2035,20 +2059,18 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# insufficient replication or replication not even attempted;
# break because we don't want to progress the cleave cursor
# until each shard range has been successfully cleaved
self.logger.warning(
'Failed to sufficiently replicate cleaved shard %s for %s:'
' %s successes, %s required.', shard_range,
quote(broker.path),
replication_successes, replication_quorum)
self.warning(broker,
'Failed to sufficiently replicate cleaved shard '
'%s: %s successes, %s required', shard_range,
replication_successes, replication_quorum)
self._increment_stat('cleaved', 'failure', statsd=True)
result = CLEAVE_FAILED
else:
elapsed = round(time.time() - start, 3)
self._min_stat('cleaved', 'min_time', elapsed)
self._max_stat('cleaved', 'max_time', elapsed)
self.logger.info(
'Cleaved %s for shard range %s in %gs.',
quote(broker.path), shard_range, elapsed)
self.info(broker, 'Cleaved %s in %gs', shard_range,
elapsed)
self._increment_stat('cleaved', 'success', statsd=True)
if result in (CLEAVE_SUCCESS, CLEAVE_EMPTY):
@ -2062,10 +2084,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
def _cleave_shard_range(self, broker, cleaving_context, shard_range,
own_shard_range):
self.logger.info("Cleaving '%s' from row %s into %s for %r",
quote(broker.path),
cleaving_context.last_cleave_to_row,
quote(shard_range.name), shard_range)
self.info(broker, "Cleaving from row %s into %s for %r",
cleaving_context.last_cleave_to_row,
quote(shard_range.name), shard_range)
self._increment_stat('cleaved', 'attempted')
policy_index = broker.storage_policy_index
shard_part, shard_broker, node_id, put_timestamp = \
@ -2081,8 +2102,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# Returns True if misplaced objects have been moved and the entire
# container namespace has been successfully cleaved, False otherwise
if broker.is_sharded():
self.logger.debug('Passing over already sharded container %s',
quote(broker.path))
self.debug(broker, 'Passing over already sharded container')
return True
cleaving_context = CleavingContext.load(broker)
@ -2090,9 +2110,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# ensure any misplaced objects in the source broker are moved; note
# that this invocation of _move_misplaced_objects is targetted at
# the *retiring* db.
self.logger.debug(
'Moving any misplaced objects from sharding container: %s',
quote(broker.path))
self.debug(broker,
'Moving any misplaced objects from sharding container')
bounds = self._make_default_misplaced_object_bounds(broker)
cleaving_context.misplaced_done = self._move_misplaced_objects(
broker, src_broker=broker.get_brokers()[0],
@ -2100,8 +2119,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
cleaving_context.store(broker)
if cleaving_context.cleaving_done:
self.logger.debug('Cleaving already complete for container %s',
quote(broker.path))
self.debug(broker, 'Cleaving already complete for container')
return cleaving_context.misplaced_done
shard_ranges = broker.get_shard_ranges(marker=cleaving_context.marker)
@ -2116,25 +2134,23 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# always update ranges_todo in case shard ranges have changed since
# last visit
cleaving_context.ranges_todo = len(ranges_todo)
self.logger.debug('Continuing to cleave (%s done, %s todo): %s',
cleaving_context.ranges_done,
cleaving_context.ranges_todo,
quote(broker.path))
self.debug(broker, 'Continuing to cleave (%s done, %s todo)',
cleaving_context.ranges_done,
cleaving_context.ranges_todo)
else:
cleaving_context.start()
own_shard_range = broker.get_own_shard_range()
cleaving_context.cursor = own_shard_range.lower_str
cleaving_context.ranges_todo = len(ranges_todo)
self.logger.info('Starting to cleave (%s todo): %s',
cleaving_context.ranges_todo, quote(broker.path))
self.info(broker, 'Starting to cleave (%s todo)',
cleaving_context.ranges_todo)
own_shard_range = broker.get_own_shard_range(no_default=True)
if own_shard_range is None:
# A default should never be SHRINKING or SHRUNK but because we
# may write own_shard_range back to broker, let's make sure
# it can't be defaulted.
self.logger.warning('Failed to get own_shard_range for %s',
quote(broker.path))
self.warning(broker, 'Failed to get own_shard_range')
ranges_todo = [] # skip cleaving
ranges_done = []
@ -2150,14 +2166,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
break
if shard_range.lower > cleaving_context.cursor:
self.logger.info('Stopped cleave at gap: %r - %r' %
(cleaving_context.cursor, shard_range.lower))
self.info(broker, 'Stopped cleave at gap: %r - %r' %
(cleaving_context.cursor, shard_range.lower))
break
if shard_range.state not in (ShardRange.CREATED,
ShardRange.CLEAVED,
ShardRange.ACTIVE):
self.logger.info('Stopped cleave at unready %s', shard_range)
self.info(broker, 'Stopped cleave at unready %s', shard_range)
break
cleave_result = self._cleave_shard_range(
@ -2174,9 +2190,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# that here in case we hit a failure right off the bat or ended loop
# with skipped ranges
cleaving_context.store(broker)
self.logger.debug(
'Cleaved %s shard ranges for %s',
len(ranges_done), quote(broker.path))
self.debug(broker, 'Cleaved %s shard ranges', len(ranges_done))
return (cleaving_context.misplaced_done and
cleaving_context.cleaving_done)
@ -2191,8 +2205,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# This is more of a belts and braces, not sure we could even
# get this far with without an own_shard_range. But because
# we will be writing own_shard_range back, we need to make sure
self.logger.warning('Failed to get own_shard_range for %s',
quote(broker.path))
self.warning(broker, 'Failed to get own_shard_range')
return False
own_shard_range.update_meta(0, 0)
if own_shard_range.state in ShardRange.SHRINKING_STATES:
@ -2213,13 +2226,10 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
if broker.set_sharded_state():
return True
else:
self.logger.warning(
'Failed to remove retiring db file for %s',
quote(broker.path))
self.warning(broker, 'Failed to remove retiring db file')
else:
self.logger.warning(
'Repeat cleaving required for %r with context: %s',
broker.db_files[0], dict(cleaving_context))
self.warning(broker, 'Repeat cleaving required, context: %s',
dict(cleaving_context))
cleaving_context.reset()
cleaving_context.store(broker)
@ -2229,33 +2239,32 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
candidates = find_sharding_candidates(
broker, self.shard_container_threshold, shard_ranges)
if candidates:
self.logger.debug('Identified %s sharding candidates',
len(candidates))
self.debug(broker, 'Identified %s sharding candidates',
len(candidates))
broker.merge_shard_ranges(candidates)
def _find_and_enable_shrinking_candidates(self, broker):
if not broker.is_sharded():
self.logger.warning('Cannot shrink a not yet sharded container %s',
quote(broker.path))
self.warning(broker, 'Cannot shrink a not yet sharded container')
return
compactible_sequences = find_compactible_shard_sequences(
broker, self.shrink_threshold, self.expansion_limit,
self.max_shrinking, self.max_expanding, include_shrinking=True)
self.logger.debug('Found %s compactible sequences of length(s) %s' %
(len(compactible_sequences),
[len(s) for s in compactible_sequences]))
self.debug(broker, 'Found %s compactible sequences of length(s) %s' %
(len(compactible_sequences),
[len(s) for s in compactible_sequences]))
process_compactible_shard_sequences(broker, compactible_sequences)
own_shard_range = broker.get_own_shard_range()
for sequence in compactible_sequences:
acceptor = sequence[-1]
donors = ShardRangeList(sequence[:-1])
self.logger.debug(
'shrinking %d objects from %d shard ranges into %s in %s' %
(donors.object_count, len(donors), acceptor, broker.db_file))
self.debug(broker,
'shrinking %d objects from %d shard ranges into %s' %
(donors.object_count, len(donors), acceptor))
if acceptor.name != own_shard_range.name:
self._send_shard_ranges(
acceptor.account, acceptor.container, [acceptor])
self._send_shard_ranges(broker, acceptor.account,
acceptor.container, [acceptor])
acceptor.increment_meta(donors.object_count, donors.bytes_used)
# Now send a copy of the expanded acceptor, with an updated
# timestamp, to each donor container. This forces each donor to
@ -2265,8 +2274,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# the acceptor will then update the root to have the deleted donor
# shard range.
for donor in donors:
self._send_shard_ranges(
donor.account, donor.container, [donor, acceptor])
self._send_shard_ranges(broker, donor.account,
donor.container, [donor, acceptor])
def _update_root_container(self, broker):
own_shard_range = broker.get_own_shard_range(no_default=True)
@ -2279,8 +2288,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# count that is consistent with the current object_count
reclaimer = self._reclaim(broker)
tombstones = reclaimer.get_tombstone_count()
self.logger.debug('tombstones in %s = %d',
quote(broker.path), tombstones)
self.debug(broker, 'tombstones = %d', tombstones)
# shrinking candidates are found in the root DB so that's the only
# place we need up to date tombstone stats.
own_shard_range.update_tombstones(tombstones)
@ -2301,25 +2309,23 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
include_own=True,
include_deleted=True)
# send everything
if self._send_shard_ranges(
broker.root_account, broker.root_container, shard_ranges,
{'Referer': quote(broker.path)}):
if self._send_shard_ranges(broker, broker.root_account,
broker.root_container, shard_ranges,
{'Referer': quote(broker.path)}):
# on success, mark ourselves as reported so we don't keep
# hammering the root
own_shard_range.reported = True
broker.merge_shard_ranges(own_shard_range)
self.logger.debug(
'updated root objs=%d, tombstones=%s (%s)',
own_shard_range.object_count, own_shard_range.tombstones,
quote(broker.path))
self.debug(broker, 'updated root objs=%d, tombstones=%s',
own_shard_range.object_count,
own_shard_range.tombstones)
def _process_broker(self, broker, node, part):
broker.get_info() # make sure account/container are populated
state = broker.get_db_state()
is_deleted = broker.is_deleted()
self.logger.debug('Starting processing %s state %s%s',
quote(broker.path), state,
' (deleted)' if is_deleted else '')
self.debug(broker, 'Starting processing, state %s%s', state,
' (deleted)' if is_deleted else '')
if not self._audit_container(broker):
return
@ -2344,18 +2350,17 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# or manually triggered cleaving.
if broker.set_sharding_state():
state = SHARDING
self.logger.info('Kick off container cleaving on %s, '
'own shard range in state %r',
quote(broker.path),
own_shard_range.state_text)
self.info(broker, 'Kick off container cleaving, '
'own shard range in state %r',
own_shard_range.state_text)
elif is_leader:
if broker.set_sharding_state():
state = SHARDING
else:
self.logger.debug(
'Own shard range in state %r but no shard ranges '
'and not leader; remaining unsharded: %s',
own_shard_range.state_text, quote(broker.path))
self.debug(broker,
'Own shard range in state %r but no shard '
'ranges and not leader; remaining unsharded',
own_shard_range.state_text)
if state == SHARDING:
if is_leader:
@ -2377,13 +2382,11 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
if self._complete_sharding(broker):
state = SHARDED
self._increment_stat('visited', 'completed', statsd=True)
self.logger.info(
'Completed cleaving of %s, DB set to sharded state',
quote(broker.path))
self.info(broker, 'Completed cleaving, DB set to sharded '
'state')
else:
self.logger.info(
'Completed cleaving of %s, DB remaining in sharding '
'state', quote(broker.path))
self.info(broker, 'Completed cleaving, DB remaining in '
'sharding state')
if not broker.is_deleted():
if state == SHARDED and broker.is_root_container():
@ -2394,9 +2397,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
self._find_and_enable_sharding_candidates(broker)
for shard_range in broker.get_shard_ranges(
states=[ShardRange.SHARDING]):
self._send_shard_ranges(
shard_range.account, shard_range.container,
[shard_range])
self._send_shard_ranges(broker, shard_range.account,
shard_range.container,
[shard_range])
if not broker.is_root_container():
# Update the root container with this container's shard range
@ -2407,9 +2410,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# simultaneously become deleted.
self._update_root_container(broker)
self.logger.debug('Finished processing %s state %s%s',
quote(broker.path), broker.get_db_state(),
' (deleted)' if is_deleted else '')
self.debug(broker,
'Finished processing, state %s%s',
broker.get_db_state(), ' (deleted)' if is_deleted else '')
def _one_shard_cycle(self, devices_to_shard, partitions_to_shard):
"""
@ -2477,15 +2480,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
self._increment_stat('visited', 'skipped')
except (Exception, Timeout) as err:
self._increment_stat('visited', 'failure', statsd=True)
self.logger.exception(
'Unhandled exception while processing %s: %s', path, err)
self.exception(broker, 'Unhandled exception while processing: '
'%s', err)
error = err
try:
self._record_sharding_progress(broker, node, error)
except (Exception, Timeout) as error:
self.logger.exception(
'Unhandled exception while dumping progress for %s: %s',
path, error)
self.exception(broker, 'Unhandled exception while dumping '
'progress: %s', error)
self._periodic_report_stats()
self._report_stats()

View File

@ -426,6 +426,101 @@ class TestSharder(BaseTestSharder):
_do_test_init_ic_log_name({'log_name': 'container-sharder-6021'},
'container-sharder-6021-ic')
def test_log_broker(self):
broker = self._make_broker(container='c@d')
def do_test(level):
with self._mock_sharder() as sharder:
func = getattr(sharder, level)
func(broker, 'bonjour %s %s', 'mes', 'amis')
func(broker, 'hello my %s', 'friend%04ds')
func(broker, 'greetings friend%04ds')
self.assertEqual(
['bonjour mes amis, path: a/c%40d, db: ' + broker.db_file,
'hello my friend%04ds, path: a/c%40d, db: ' + broker.db_file,
'greetings friend%04ds, path: a/c%40d, db: ' + broker.db_file
], sharder.logger.get_lines_for_level(level))
for log_level, lines in sharder.logger.all_log_lines().items():
if log_level == level:
continue
else:
self.assertFalse(lines)
do_test('debug')
do_test('info')
do_test('warning')
do_test('error')
def test_log_broker_exception(self):
broker = self._make_broker()
with self._mock_sharder() as sharder:
try:
raise ValueError('test')
except ValueError as err:
sharder.exception(broker, 'exception: %s', err)
self.assertEqual(
['exception: test, path: a/c, db: %s: ' % broker.db_file],
sharder.logger.get_lines_for_level('error'))
for log_level, lines in sharder.logger.all_log_lines().items():
if log_level == 'error':
continue
else:
self.assertFalse(lines)
def test_log_broker_levels(self):
# verify that the broker is not queried if the log level is not enabled
broker = self._make_broker()
# erase cached properties...
broker.account = broker.container = None
with self._mock_sharder() as sharder:
with mock.patch.object(sharder.logger, 'isEnabledFor',
return_value=False):
sharder.debug(broker, 'test')
sharder.info(broker, 'test')
sharder.warning(broker, 'test')
sharder.error(broker, 'test')
# cached properties have not been set...
self.assertIsNone(broker.account)
self.assertIsNone(broker.container)
self.assertFalse(sharder.logger.all_log_lines())
def test_log_broker_exception_while_logging(self):
broker = self._make_broker()
def do_test(level):
with self._mock_sharder() as sharder:
func = getattr(sharder, level)
with mock.patch.object(broker, '_populate_instance_cache',
side_effect=Exception()):
func(broker, 'bonjour %s %s', 'mes', 'amis')
broker._db_files = None
with mock.patch.object(broker, 'reload_db_files',
side_effect=Exception()):
func(broker, 'bonjour %s %s', 'mes', 'amis')
self.assertEqual(
['bonjour mes amis, path: , db: %s' % broker.db_file,
'bonjour mes amis, path: a/c, db: '],
sharder.logger.get_lines_for_level(level))
for log_level, lines in sharder.logger.all_log_lines().items():
if log_level == level:
continue
else:
self.assertFalse(lines)
do_test('debug')
do_test('info')
do_test('warning')
do_test('error')
def _assert_stats(self, expected, sharder, category):
# assertEqual doesn't work with a stats defaultdict so copy to a dict
# before comparing
@ -587,6 +682,7 @@ class TestSharder(BaseTestSharder):
lines = sharder.logger.get_lines_for_level('error')
self.assertIn(
'Unhandled exception while dumping progress', lines[0])
self.assertIn('path: a/c', lines[0]) # match one of the brokers
self.assertIn('Test over', lines[0])
def check_recon(data, time, last, expected_stats):
@ -782,6 +878,7 @@ class TestSharder(BaseTestSharder):
self.assertEqual({'a/c0', 'a/c1', 'a/c2'}, set(processed_paths))
lines = sharder.logger.get_lines_for_level('error')
self.assertIn('Unhandled exception while processing', lines[0])
self.assertIn('path: a/c', lines[0]) # match one of the brokers
self.assertFalse(lines[1:])
sharder.logger.clear()
expected_stats = {'attempted': 3, 'success': 2, 'failure': 1,
@ -1983,7 +2080,8 @@ class TestSharder(BaseTestSharder):
self.assertEqual(UNSHARDED, broker.get_db_state())
warning_lines = sharder.logger.get_lines_for_level('warning')
self.assertEqual(warning_lines[0],
'Failed to get own_shard_range for a/c')
'Failed to get own_shard_range, path: a/c, db: %s'
% broker.db_file)
sharder._replicate_object.assert_not_called()
context = CleavingContext.load(broker)
self.assertTrue(context.misplaced_done)
@ -2374,10 +2472,13 @@ class TestSharder(BaseTestSharder):
self.assertEqual(12, context.max_row) # note that max row increased
lines = sharder.logger.get_lines_for_level('info')
self.assertEqual(
["Kick off container cleaving on a/c, own shard range in state "
"'sharding'", "Starting to cleave (2 todo): a/c"], lines[:2])
self.assertIn('Completed cleaving of a/c, DB remaining in '
'sharding state', lines[1:])
["Kick off container cleaving, own shard range in state "
"'sharding', path: a/c, db: %s" % broker.db_file,
"Starting to cleave (2 todo), path: a/c, db: %s"
% broker.db_file], lines[:2])
self.assertIn('Completed cleaving, DB remaining in sharding state, '
'path: a/c, db: %s'
% broker.db_file, lines[1:])
lines = sharder.logger.get_lines_for_level('warning')
self.assertIn('Repeat cleaving required', lines[0])
self.assertFalse(lines[1:])
@ -2407,9 +2508,12 @@ class TestSharder(BaseTestSharder):
self._check_shard_range(shard_ranges[1], updated_shard_ranges[1])
self._check_objects(new_objects[1:], expected_shard_dbs[1])
lines = sharder.logger.get_lines_for_level('info')
self.assertEqual('Starting to cleave (2 todo): a/c', lines[0])
self.assertIn('Completed cleaving of a/c, DB set to sharded state',
lines[1:])
self.assertEqual(
'Starting to cleave (2 todo), path: a/c, db: %s'
% broker.db_file, lines[0])
self.assertIn(
'Completed cleaving, DB set to sharded state, path: a/c, db: %s'
% broker.db_file, lines[1:])
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
def test_cleave_multiple_storage_policies(self):
@ -3103,9 +3207,7 @@ class TestSharder(BaseTestSharder):
with self._mock_sharder() as sharder:
self.assertFalse(sharder._complete_sharding(broker))
warning_lines = sharder.logger.get_lines_for_level('warning')
self.assertIn(
'Repeat cleaving required for %r' % broker.db_files[0],
warning_lines[0])
self.assertIn('Repeat cleaving required', warning_lines[0])
self.assertFalse(warning_lines[1:])
sharder.logger.clear()
context = CleavingContext.load(broker)
@ -3214,7 +3316,8 @@ class TestSharder(BaseTestSharder):
self.assertEqual(SHARDING, broker.get_db_state())
warning_lines = sharder.logger.get_lines_for_level('warning')
self.assertEqual(warning_lines[0],
'Failed to get own_shard_range for a/c')
'Failed to get own_shard_range, path: a/c, db: %s'
% broker.db_file)
def test_sharded_record_sharding_progress_missing_contexts(self):
broker = self._check_complete_sharding(
@ -3981,8 +4084,11 @@ class TestSharder(BaseTestSharder):
self._assert_stats(expected_stats, sharder, 'misplaced')
lines = sharder.logger.get_lines_for_level('warning')
self.assertIn('Refused to remove misplaced objects', lines[0])
self.assertIn('Refused to remove misplaced objects', lines[1])
shard_ranges = broker.get_shard_ranges()
self.assertIn('Refused to remove misplaced objects for dest %s'
% shard_ranges[2], lines[0])
self.assertIn('Refused to remove misplaced objects for dest %s'
% shard_ranges[3], lines[1])
self.assertFalse(lines[2:])
# they will be moved again on next cycle
@ -5042,6 +5148,7 @@ class TestSharder(BaseTestSharder):
self.assertTrue(sharding_enabled(broker))
def test_send_shard_ranges(self):
broker = self._make_broker()
shard_ranges = self._make_shard_ranges((('', 'h'), ('h', '')))
def do_test(replicas, *resp_codes):
@ -5054,7 +5161,7 @@ class TestSharder(BaseTestSharder):
with mocked_http_conn(*resp_codes, give_send=on_send) as conn:
with mock_timestamp_now() as now:
res = sharder._send_shard_ranges(
'a', 'c', shard_ranges)
broker, 'a', 'c', shard_ranges)
self.assertEqual(sharder.ring.replica_count, len(conn.requests))
expected_body = json.dumps([dict(sr) for sr in shard_ranges])
@ -5092,6 +5199,9 @@ class TestSharder(BaseTestSharder):
self.assertEqual([True], [
'Failed to put shard ranges' in line for line in
sharder.logger.get_lines_for_level('warning')])
self.assertEqual([True], [
'path: a/c, db: %s' % broker.db_file in line for line in
sharder.logger.get_lines_for_level('warning')])
self.assertFalse(sharder.logger.get_lines_for_level('error'))
res, sharder = do_test(replicas, 202, 202, Exception)
self.assertTrue(res)
@ -5099,27 +5209,42 @@ class TestSharder(BaseTestSharder):
self.assertEqual([True], [
'Failed to put shard ranges' in line for line in
sharder.logger.get_lines_for_level('error')])
self.assertEqual([True], [
'path: a/c, db: %s' % broker.db_file in line for line in
sharder.logger.get_lines_for_level('error')])
res, sharder = do_test(replicas, 202, 404, 404)
self.assertFalse(res)
self.assertEqual([True, True], [
'Failed to put shard ranges' in line for line in
sharder.logger.get_lines_for_level('warning')])
self.assertEqual([True, True], [
'path: a/c, db: %s' % broker.db_file in line for line in
sharder.logger.get_lines_for_level('warning')])
self.assertFalse(sharder.logger.get_lines_for_level('error'))
res, sharder = do_test(replicas, 500, 500, 500)
self.assertFalse(res)
self.assertEqual([True, True, True], [
'Failed to put shard ranges' in line for line in
sharder.logger.get_lines_for_level('warning')])
self.assertEqual([True, True, True], [
'path: a/c, db: %s' % broker.db_file in line for line in
sharder.logger.get_lines_for_level('warning')])
self.assertFalse(sharder.logger.get_lines_for_level('error'))
res, sharder = do_test(replicas, Exception, Exception, 202)
self.assertEqual([True, True], [
'Failed to put shard ranges' in line for line in
sharder.logger.get_lines_for_level('error')])
self.assertEqual([True, True], [
'path: a/c, db: %s' % broker.db_file in line for line in
sharder.logger.get_lines_for_level('error')])
res, sharder = do_test(replicas, Exception, eventlet.Timeout(), 202)
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
self.assertEqual([True, True], [
'Failed to put shard ranges' in line for line in
sharder.logger.get_lines_for_level('error')])
self.assertEqual([True, True], [
'path: a/c, db: %s' % broker.db_file in line for line in
sharder.logger.get_lines_for_level('error')])
replicas = 2
res, sharder = do_test(replicas, 202, 202)
@ -5131,6 +5256,9 @@ class TestSharder(BaseTestSharder):
self.assertEqual([True], [
'Failed to put shard ranges' in line for line in
sharder.logger.get_lines_for_level('warning')])
self.assertEqual([True], [
'path: a/c, db: %s' % broker.db_file in line for line in
sharder.logger.get_lines_for_level('warning')])
self.assertFalse(sharder.logger.get_lines_for_level('error'))
res, sharder = do_test(replicas, 202, Exception)
self.assertTrue(res)
@ -5138,11 +5266,17 @@ class TestSharder(BaseTestSharder):
self.assertEqual([True], [
'Failed to put shard ranges' in line for line in
sharder.logger.get_lines_for_level('error')])
self.assertEqual([True], [
'path: a/c, db: %s' % broker.db_file in line for line in
sharder.logger.get_lines_for_level('error')])
res, sharder = do_test(replicas, 404, 404)
self.assertFalse(res)
self.assertEqual([True, True], [
'Failed to put shard ranges' in line for line in
sharder.logger.get_lines_for_level('warning')])
self.assertEqual([True, True], [
'path: a/c, db: %s' % broker.db_file in line for line in
sharder.logger.get_lines_for_level('warning')])
self.assertFalse(sharder.logger.get_lines_for_level('error'))
res, sharder = do_test(replicas, Exception, Exception)
self.assertFalse(res)
@ -5150,12 +5284,18 @@ class TestSharder(BaseTestSharder):
self.assertEqual([True, True], [
'Failed to put shard ranges' in line for line in
sharder.logger.get_lines_for_level('error')])
self.assertEqual([True, True], [
'path: a/c, db: %s' % broker.db_file in line for line in
sharder.logger.get_lines_for_level('error')])
res, sharder = do_test(replicas, eventlet.Timeout(), Exception)
self.assertFalse(res)
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
self.assertEqual([True, True], [
'Failed to put shard ranges' in line for line in
sharder.logger.get_lines_for_level('error')])
self.assertEqual([True, True], [
'path: a/c, db: %s' % broker.db_file in line for line in
sharder.logger.get_lines_for_level('error')])
replicas = 4
res, sharder = do_test(replicas, 202, 202, 202, 202)
@ -5167,6 +5307,9 @@ class TestSharder(BaseTestSharder):
self.assertEqual([True, True], [
'Failed to put shard ranges' in line for line in
sharder.logger.get_lines_for_level('warning')])
self.assertEqual([True, True], [
'path: a/c, db: %s' % broker.db_file in line for line in
sharder.logger.get_lines_for_level('warning')])
self.assertFalse(sharder.logger.get_lines_for_level('error'))
res, sharder = do_test(replicas, 202, 202, Exception, Exception)
self.assertTrue(res)
@ -5174,35 +5317,56 @@ class TestSharder(BaseTestSharder):
self.assertEqual([True, True], [
'Failed to put shard ranges' in line for line in
sharder.logger.get_lines_for_level('error')])
self.assertEqual([True, True], [
'path: a/c, db: %s' % broker.db_file in line for line in
sharder.logger.get_lines_for_level('error')])
res, sharder = do_test(replicas, 202, 404, 404, 404)
self.assertFalse(res)
self.assertEqual([True, True, True], [
'Failed to put shard ranges' in line for line in
sharder.logger.get_lines_for_level('warning')])
self.assertEqual([True, True, True], [
'path: a/c, db: %s' % broker.db_file in line for line in
sharder.logger.get_lines_for_level('warning')])
self.assertFalse(sharder.logger.get_lines_for_level('error'))
res, sharder = do_test(replicas, 500, 500, 500, 202)
self.assertFalse(res)
self.assertEqual([True, True, True], [
'Failed to put shard ranges' in line for line in
sharder.logger.get_lines_for_level('warning')])
self.assertEqual([True, True, True], [
'path: a/c, db: %s' % broker.db_file in line for line in
sharder.logger.get_lines_for_level('warning')])
self.assertFalse(sharder.logger.get_lines_for_level('error'))
res, sharder = do_test(replicas, Exception, Exception, 202, 404)
self.assertFalse(res)
self.assertEqual([True], [
all(msg in line for msg in ('Failed to put shard ranges', '404'))
for line in sharder.logger.get_lines_for_level('warning')])
self.assertEqual([True], [
'path: a/c, db: %s' % broker.db_file in line for line in
sharder.logger.get_lines_for_level('warning')])
self.assertEqual([True, True], [
'Failed to put shard ranges' in line for line in
sharder.logger.get_lines_for_level('error')])
self.assertEqual([True, True], [
'path: a/c, db: %s' % broker.db_file in line for line in
sharder.logger.get_lines_for_level('error')])
res, sharder = do_test(
replicas, eventlet.Timeout(), eventlet.Timeout(), 202, 404)
self.assertFalse(res)
self.assertEqual([True], [
all(msg in line for msg in ('Failed to put shard ranges', '404'))
for line in sharder.logger.get_lines_for_level('warning')])
self.assertEqual([True], [
'path: a/c, db: %s' % broker.db_file in line for line in
sharder.logger.get_lines_for_level('warning')])
self.assertEqual([True, True], [
'Failed to put shard ranges' in line for line in
sharder.logger.get_lines_for_level('error')])
self.assertEqual([True, True], [
'path: a/c, db: %s' % broker.db_file in line for line in
sharder.logger.get_lines_for_level('error')])
def test_process_broker_not_sharding_no_others(self):
# verify that sharding process will not start when own shard range is
@ -5270,8 +5434,8 @@ class TestSharder(BaseTestSharder):
self.assertEqual(SHARDED, broker.get_db_state())
self.assertEqual(epoch.normal, parse_db_filename(broker.db_file)[1])
lines = broker.logger.get_lines_for_level('info')
self.assertIn('Completed creating shard range containers: 2 created, '
'from sharding container a/c', lines)
self.assertIn('Completed creating 2 shard range containers, '
'path: a/c, db: %s' % broker.db_file, lines)
self.assertFalse(broker.logger.get_lines_for_level('warning'))
self.assertFalse(broker.logger.get_lines_for_level('error'))
self.assertEqual(deleted, broker.is_deleted())
@ -5711,8 +5875,9 @@ class TestSharder(BaseTestSharder):
mocked.assert_not_called()
def assert_overlap_warning(line, state_text):
self.assertIn(
'Audit failed for root %s' % broker.db_file, line)
self.assertIn('Audit failed for root', line)
self.assertIn(broker.db_file, line)
self.assertIn(broker.path, line)
self.assertIn(
'overlapping ranges in state %r: k-t s-y, y-z y-z'
% state_text, line)
@ -5781,9 +5946,10 @@ class TestSharder(BaseTestSharder):
broker.merge_shard_ranges(shard_ranges)
def assert_missing_warning(line):
self.assertIn(
'Audit failed for root %s' % broker.db_file, line)
self.assertIn('Audit failed for root', line)
self.assertIn('missing range(s): -a j-k z-', line)
self.assertIn('path: %s, db: %s' % (broker.path, broker.db_file),
line)
def check_missing():
own_shard_range = broker.get_own_shard_range()
@ -5896,9 +6062,10 @@ class TestSharder(BaseTestSharder):
'swift.container.sharder.time.time',
return_value=future_time), self._mock_sharder() as sharder:
sharder._audit_container(broker)
message = 'Reclaimable db stuck waiting for shrinking: %s (%s)' % (
broker.db_file, broker.path)
self.assertEqual([message], self.logger.get_lines_for_level('warning'))
self.assertEqual(
['Reclaimable db stuck waiting for shrinking, path: %s, db: %s'
% (broker.path, broker.db_file)],
self.logger.get_lines_for_level('warning'))
# delete all shard ranges
for sr in shard_ranges:
@ -5970,10 +6137,14 @@ class TestSharder(BaseTestSharder):
sharder, mock_swift = self.call_audit_container(broker, shard_ranges)
lines = sharder.logger.get_lines_for_level('warning')
self._assert_stats(expected_stats, sharder, 'audit_shard')
self.assertIn('Audit failed for shard %s' % broker.db_file, lines[0])
self.assertIn('Audit failed for shard', lines[0])
self.assertIn('missing own shard range', lines[0])
self.assertIn('Audit warnings for shard %s' % broker.db_file, lines[1])
self.assertIn('path: %s, db: %s' % (broker.path, broker.db_file),
lines[0])
self.assertIn('Audit warnings for shard', lines[1])
self.assertIn('account not in shards namespace', lines[1])
self.assertIn('path: %s, db: %s' % (broker.path, broker.db_file),
lines[1])
self.assertNotIn('root has no matching shard range', lines[1])
self.assertNotIn('unable to get shard ranges from root', lines[1])
self.assertFalse(lines[2:])
@ -5984,8 +6155,10 @@ class TestSharder(BaseTestSharder):
sharder, mock_swift = self.call_audit_container(broker, shard_ranges)
lines = sharder.logger.get_lines_for_level('warning')
self._assert_stats(expected_stats, sharder, 'audit_shard')
self.assertIn('Audit failed for shard %s' % broker.db_file, lines[0])
self.assertIn('Audit failed for shard', lines[0])
self.assertIn('missing own shard range', lines[0])
self.assertIn('path: %s, db: %s' % (broker.path, broker.db_file),
lines[0])
self.assertNotIn('unable to get shard ranges from root', lines[0])
self.assertFalse(lines[1:])
self.assertFalse(sharder.logger.get_lines_for_level('error'))
@ -6009,12 +6182,14 @@ class TestSharder(BaseTestSharder):
sharder, mock_swift = self.call_audit_container(
broker, shard_ranges)
self._assert_stats(expected_stats, sharder, 'audit_shard')
self.assertEqual(['Updating own shard range from root'],
self.assertEqual(['Updating own shard range from root, path: '
'.shards_a/shard_c, db: %s' % broker.db_file],
sharder.logger.get_lines_for_level('debug'))
expected = shard_ranges[1].copy()
self.assertEqual(['Updated own shard range from %s to %s'
% (own_shard_range, expected)],
sharder.logger.get_lines_for_level('info'))
self.assertEqual(
['Updated own shard range from %s to %s, path: .shards_a/shard_c, '
'db: %s' % (own_shard_range, expected, broker.db_file)],
sharder.logger.get_lines_for_level('info'))
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
self.assertFalse(sharder.logger.get_lines_for_level('error'))
self.assertFalse(broker.is_deleted())
@ -6049,7 +6224,8 @@ class TestSharder(BaseTestSharder):
sharder, mock_swift = self.call_audit_container(broker, shard_ranges)
self._assert_stats(expected_stats, sharder, 'audit_shard')
self.assertEqual(['Updating own shard range from root'],
self.assertEqual(['Updating own shard range from root, path: '
'.shards_a/shard_c, db: %s' % broker.db_file],
sharder.logger.get_lines_for_level('debug'))
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
self.assertFalse(sharder.logger.get_lines_for_level('error'))
@ -6083,7 +6259,9 @@ class TestSharder(BaseTestSharder):
exc=internal_client.UnexpectedResponse('bad', 'resp'))
lines = sharder.logger.get_lines_for_level('warning')
self.assertIn('Failed to get shard ranges', lines[0])
self.assertIn('Audit warnings for shard %s' % broker.db_file, lines[1])
self.assertIn('Audit warnings for shard', lines[1])
self.assertIn('path: %s, db: %s' % (broker.path, broker.db_file),
lines[1])
self.assertNotIn('account not in shards namespace', lines[1])
self.assertNotIn('missing own shard range', lines[1])
self.assertNotIn('root has no matching shard range', lines[1])
@ -6111,12 +6289,14 @@ class TestSharder(BaseTestSharder):
broker, shard_ranges)
self.assert_no_audit_messages(sharder, mock_swift)
self.assertFalse(broker.is_deleted())
self.assertEqual(['Updating own shard range from root'],
self.assertEqual(['Updating own shard range from root, path: '
'.shards_a/shard_c, db: %s' % broker.db_file],
sharder.logger.get_lines_for_level('debug'))
expected = shard_ranges[1].copy()
self.assertEqual(['Updated own shard range from %s to %s'
% (own_shard_range, expected)],
sharder.logger.get_lines_for_level('info'))
self.assertEqual(
['Updated own shard range from %s to %s, path: .shards_a/shard_c, '
'db: %s' % (own_shard_range, expected, broker.db_file)],
sharder.logger.get_lines_for_level('info'))
# own shard range state is updated from root version
own_shard_range = broker.get_own_shard_range()
self.assertEqual(ShardRange.SHARDING, own_shard_range.state)
@ -6157,9 +6337,9 @@ class TestSharder(BaseTestSharder):
shard_ranges = self._make_shard_ranges(shard_bounds, shard_states)
def check_audit(own_state, root_state):
broker = self._make_broker(
account='.shards_a',
container='shard_c_%s' % root_ts.normal)
shard_container = 'shard_c_%s' % root_ts.normal
broker = self._make_broker(account='.shards_a',
container=shard_container)
broker.set_sharding_sysmeta(*args)
shard_ranges[1].name = broker.path
@ -6180,8 +6360,10 @@ class TestSharder(BaseTestSharder):
self._assert_stats(expected_stats, sharder, 'audit_shard')
debug_lines = sharder.logger.get_lines_for_level('debug')
self.assertGreater(len(debug_lines), 0)
self.assertEqual('Updating own shard range from root',
debug_lines[0])
self.assertEqual(
'Updating own shard range from root, path: .shards_a/%s, '
'db: %s' % (shard_container, broker.db_file),
sharder.logger.get_lines_for_level('debug')[0])
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
self.assertFalse(sharder.logger.get_lines_for_level('error'))
self.assertFalse(broker.is_deleted())
@ -7254,8 +7436,10 @@ class TestSharder(BaseTestSharder):
self._assert_shard_ranges_equal([donor, acceptor, shard_ranges[2]],
broker.get_shard_ranges())
sharder._send_shard_ranges.assert_has_calls(
[mock.call(acceptor.account, acceptor.container, [acceptor]),
mock.call(donor.account, donor.container, [donor, acceptor])]
[mock.call(broker, acceptor.account, acceptor.container,
[acceptor]),
mock.call(broker, donor.account, donor.container,
[donor, acceptor])]
)
# check idempotency
@ -7266,8 +7450,10 @@ class TestSharder(BaseTestSharder):
self._assert_shard_ranges_equal([donor, acceptor, shard_ranges[2]],
broker.get_shard_ranges())
sharder._send_shard_ranges.assert_has_calls(
[mock.call(acceptor.account, acceptor.container, [acceptor]),
mock.call(donor.account, donor.container, [donor, acceptor])]
[mock.call(broker, acceptor.account, acceptor.container,
[acceptor]),
mock.call(broker, donor.account, donor.container,
[donor, acceptor])]
)
# acceptor falls below threshold - not a candidate
@ -7280,8 +7466,10 @@ class TestSharder(BaseTestSharder):
self._assert_shard_ranges_equal([donor, acceptor, shard_ranges[2]],
broker.get_shard_ranges())
sharder._send_shard_ranges.assert_has_calls(
[mock.call(acceptor.account, acceptor.container, [acceptor]),
mock.call(donor.account, donor.container, [donor, acceptor])]
[mock.call(broker, acceptor.account, acceptor.container,
[acceptor]),
mock.call(broker, donor.account, donor.container,
[donor, acceptor])]
)
# ...until donor has shrunk
@ -7300,9 +7488,9 @@ class TestSharder(BaseTestSharder):
[donor, new_donor, new_acceptor],
broker.get_shard_ranges(include_deleted=True))
sharder._send_shard_ranges.assert_has_calls(
[mock.call(new_acceptor.account, new_acceptor.container,
[mock.call(broker, new_acceptor.account, new_acceptor.container,
[new_acceptor]),
mock.call(new_donor.account, new_donor.container,
mock.call(broker, new_donor.account, new_donor.container,
[new_donor, new_acceptor])]
)
@ -7321,7 +7509,7 @@ class TestSharder(BaseTestSharder):
[donor, new_donor, final_donor],
broker.get_shard_ranges(include_deleted=True))
sharder._send_shard_ranges.assert_has_calls(
[mock.call(final_donor.account, final_donor.container,
[mock.call(broker, final_donor.account, final_donor.container,
[final_donor, broker.get_own_shard_range()])]
)
@ -7367,8 +7555,10 @@ class TestSharder(BaseTestSharder):
broker.get_shard_ranges())
for donor, acceptor in (shard_ranges[:2], shard_ranges[3:5]):
sharder._send_shard_ranges.assert_has_calls(
[mock.call(acceptor.account, acceptor.container, [acceptor]),
mock.call(donor.account, donor.container, [donor, acceptor])]
[mock.call(broker, acceptor.account, acceptor.container,
[acceptor]),
mock.call(broker, donor.account, donor.container,
[donor, acceptor])]
)
def test_partition_and_device_filters(self):