From 6942b25cc109258804989c32fa3ffe2b536c6e77 Mon Sep 17 00:00:00 2001 From: Alistair Coles Date: Thu, 13 Jan 2022 16:40:50 +0000 Subject: [PATCH] Fix statsd prefix mutation in proxy controllers Swift loggers encapsulate a StatsdClient that is typically initialised with a prefix, equal to the logger name (e.g. 'proxy_server'), that is prepended to metrics names. The proxy server would previously mutate its logger's prefix, using its set_statsd_prefix method, each time a controller was instantiated, extending it with the controller type (e.g. changing the prefix 'proxy_server.object'). As a result, when an object request spawned container subrequests, for example, the statsd client would be left with a 'proxy_server.container' prefix part for subsequent object request related metrics. The proxy server logger is now wrapped with a new MetricsPrefixLoggerAdapter each time a controller is instantiated, and the adapter applies the correct prefix for the controller type for the lifetime of the controller. Change-Id: I0522b1953722ca96021a0002cf93432b973ce626 --- swift/common/utils.py | 68 ++++++++--- swift/proxy/controllers/account.py | 3 +- swift/proxy/controllers/base.py | 77 +++++++------ swift/proxy/controllers/container.py | 31 ++--- swift/proxy/controllers/obj.py | 108 +++++++++--------- swift/proxy/server.py | 8 +- .../common/middleware/test_container_sync.py | 3 +- test/unit/common/test_utils.py | 65 +++++++++++ test/unit/proxy/controllers/test_container.py | 19 +-- test/unit/proxy/controllers/test_info.py | 3 +- test/unit/proxy/controllers/test_obj.py | 2 +- test/unit/proxy/test_server.py | 96 ++++++++++------ 12 files changed, 311 insertions(+), 172 deletions(-) diff --git a/swift/common/utils.py b/swift/common/utils.py index 803b1aa19f..c0fb7a9902 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -2084,28 +2084,46 @@ def timing_stats(**dec_kwargs): class SwiftLoggerAdapter(logging.LoggerAdapter): """ A logging.LoggerAdapter subclass that also passes through StatsD method - calls. + calls. Adds an optional metric_prefix to statsd metric names. Like logging.LoggerAdapter, you have to subclass this and override the process() method to accomplish anything useful. """ - def update_stats(self, *a, **kw): - return self.logger.update_stats(*a, **kw) + def get_metric_name(self, metric): + # subclasses may override this method to annotate the metric name + return metric - def increment(self, *a, **kw): - return self.logger.increment(*a, **kw) + def update_stats(self, metric, *a, **kw): + return self.logger.update_stats(self.get_metric_name(metric), *a, **kw) - def decrement(self, *a, **kw): - return self.logger.decrement(*a, **kw) + def increment(self, metric, *a, **kw): + return self.logger.increment(self.get_metric_name(metric), *a, **kw) - def timing(self, *a, **kw): - return self.logger.timing(*a, **kw) + def decrement(self, metric, *a, **kw): + return self.logger.decrement(self.get_metric_name(metric), *a, **kw) - def timing_since(self, *a, **kw): - return self.logger.timing_since(*a, **kw) + def timing(self, metric, *a, **kw): + return self.logger.timing(self.get_metric_name(metric), *a, **kw) - def transfer_rate(self, *a, **kw): - return self.logger.transfer_rate(*a, **kw) + def timing_since(self, metric, *a, **kw): + return self.logger.timing_since(self.get_metric_name(metric), *a, **kw) + + def transfer_rate(self, metric, *a, **kw): + return self.logger.transfer_rate( + self.get_metric_name(metric), *a, **kw) + + @property + def thread_locals(self): + return self.logger.thread_locals + + @thread_locals.setter + def thread_locals(self, thread_locals): + self.logger.thread_locals = thread_locals + + def exception(self, msg, *a, **kw): + # We up-call to exception() where stdlib uses error() so we can get + # some of the traceback suppression from LogAdapter, below + self.logger.exception(msg, *a, **kw) class PrefixLoggerAdapter(SwiftLoggerAdapter): @@ -2119,9 +2137,7 @@ class PrefixLoggerAdapter(SwiftLoggerAdapter): def exception(self, msg, *a, **kw): if 'prefix' in self.extra: msg = self.extra['prefix'] + msg - # We up-call to exception() where stdlib uses error() so we can get - # some of the traceback suppression from LogAdapter, below - self.logger.exception(msg, *a, **kw) + super(PrefixLoggerAdapter, self).exception(msg, *a, **kw) def process(self, msg, kwargs): msg, kwargs = super(PrefixLoggerAdapter, self).process(msg, kwargs) @@ -2130,6 +2146,26 @@ class PrefixLoggerAdapter(SwiftLoggerAdapter): return (msg, kwargs) +class MetricsPrefixLoggerAdapter(SwiftLoggerAdapter): + """ + Adds a prefix to all Statsd metrics' names. + """ + def __init__(self, logger, extra, metric_prefix): + """ + :param logger: an instance of logging.Logger + :param extra: a dict-like object + :param metric_prefix: A prefix that will be added to the start of each + metric name such that the metric name is transformed to: + ``.``. Note that the logger's + StatsdClient also adds its configured prefix to metric names. + """ + super(MetricsPrefixLoggerAdapter, self).__init__(logger, extra) + self.metric_prefix = metric_prefix + + def get_metric_name(self, metric): + return '%s.%s' % (self.metric_prefix, metric) + + # double inheritance to support property with setter class LogAdapter(logging.LoggerAdapter, object): """ diff --git a/swift/proxy/controllers/account.py b/swift/proxy/controllers/account.py index 193b244367..f6dd3b2cfa 100644 --- a/swift/proxy/controllers/account.py +++ b/swift/proxy/controllers/account.py @@ -63,7 +63,8 @@ class AccountController(Controller): partition = self.app.account_ring.get_part(self.account_name) concurrency = self.app.account_ring.replica_count \ if self.app.get_policy_options(None).concurrent_gets else 1 - node_iter = self.app.iter_nodes(self.app.account_ring, partition) + node_iter = self.app.iter_nodes(self.app.account_ring, partition, + self.logger) params = req.params params['format'] = 'json' req.params = params diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index d083926010..2667b67ea0 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -46,7 +46,7 @@ from swift.common.utils import Timestamp, WatchdogTimeout, config_true_value, \ public, split_path, list_from_csv, GreenthreadSafeIterator, \ GreenAsyncPile, quorum_size, parse_content_type, drain_and_close, \ document_iters_to_http_response_body, ShardRange, find_shard_range, \ - cache_from_env + cache_from_env, MetricsPrefixLoggerAdapter from swift.common.bufferedhttp import http_connect from swift.common import constraints from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \ @@ -956,7 +956,7 @@ class ByteCountEnforcer(object): class GetOrHeadHandler(object): def __init__(self, app, req, server_type, node_iter, partition, path, backend_headers, concurrency=1, policy=None, - client_chunk_size=None, newest=None): + client_chunk_size=None, newest=None, logger=None): self.app = app self.node_iter = node_iter self.server_type = server_type @@ -964,6 +964,7 @@ class GetOrHeadHandler(object): self.path = path self.backend_headers = backend_headers self.client_chunk_size = client_chunk_size + self.logger = logger or app.logger self.skip_bytes = 0 self.bytes_used_from_backend = 0 self.used_nodes = [] @@ -1312,10 +1313,10 @@ class GetOrHeadHandler(object): 'Trying to read during GET') raise except ChunkWriteTimeout: - self.app.logger.info( + self.logger.info( 'Client did not read from proxy within %ss', self.app.client_timeout) - self.app.logger.increment('client_timeouts') + self.logger.increment('client_timeouts') except GeneratorExit: warn = True req_range = self.backend_headers['Range'] @@ -1327,11 +1328,11 @@ class GetOrHeadHandler(object): if end - begin + 1 == self.bytes_used_from_backend: warn = False if not req.environ.get('swift.non_client_disconnect') and warn: - self.app.logger.info('Client disconnected on read of %r', - self.path) + self.logger.info('Client disconnected on read of %r', + self.path) raise except Exception: - self.app.logger.exception('Trying to send to client') + self.logger.exception('Trying to send to client') raise finally: # Close-out the connection as best as possible. @@ -1353,7 +1354,7 @@ class GetOrHeadHandler(object): return None def _make_node_request(self, node, node_timeout, logger_thread_locals): - self.app.logger.thread_locals = logger_thread_locals + self.logger.thread_locals = logger_thread_locals if node in self.used_nodes: return False req_headers = dict(self.backend_headers) @@ -1473,7 +1474,7 @@ class GetOrHeadHandler(object): for node in nodes: pile.spawn(self._make_node_request, node, node_timeout, - self.app.logger.thread_locals) + self.logger.thread_locals) _timeout = self.app.get_policy_options( self.policy).concurrency_timeout \ if pile.inflight < self.concurrency else None @@ -1543,7 +1544,7 @@ class GetOrHeadHandler(object): return document_iters_to_http_response_body( (add_content_type(pi) for pi in parts_iter), - boundary, is_multipart, self.app.logger) + boundary, is_multipart, self.logger) def get_working_response(self, req): source, node = self._get_source_and_node() @@ -1589,9 +1590,11 @@ class NodeIter(object): want to filter or reorder the nodes. :param policy: an instance of :class:`BaseStoragePolicy`. This should be None for an account or container ring. + :param logger: a logger instance; defaults to the app logger """ - def __init__(self, app, ring, partition, node_iter=None, policy=None): + def __init__(self, app, ring, partition, node_iter=None, policy=None, + logger=None): self.app = app self.ring = ring self.partition = partition @@ -1611,6 +1614,7 @@ class NodeIter(object): policy=policy) self.handoff_iter = node_iter self._node_provider = None + self.logger = logger or self.app.logger @property def primaries_left(self): @@ -1636,12 +1640,12 @@ class NodeIter(object): return extra_handoffs = handoffs - self.expected_handoffs if extra_handoffs > 0: - self.app.logger.increment('handoff_count') - self.app.logger.warning( + self.logger.increment('handoff_count') + self.logger.warning( 'Handoff requested (%d)' % handoffs) if (extra_handoffs == self.num_primary_nodes): # all the primaries were skipped, and handoffs didn't help - self.app.logger.increment('handoff_all_count') + self.logger.increment('handoff_all_count') def set_node_provider(self, callback): """ @@ -1704,6 +1708,9 @@ class Controller(object): self.trans_id = '-' self._allowed_methods = None self._private_methods = None + # adapt the app logger to prefix statsd metrics with the server type + self.logger = MetricsPrefixLoggerAdapter( + self.app.logger, {}, self.server_type.lower()) @property def allowed_methods(self): @@ -1857,11 +1864,11 @@ class Controller(object): :param body: byte string to use as the request body. Try to keep it small. :param logger_thread_locals: The thread local values to be set on the - self.app.logger to retain transaction + self.logger to retain transaction logging information. :returns: a swob.Response object, or None if no responses were received """ - self.app.logger.thread_locals = logger_thread_locals + self.logger.thread_locals = logger_thread_locals if body: if not isinstance(body, bytes): raise TypeError('body must be bytes, not %s' % type(body)) @@ -1926,14 +1933,14 @@ class Controller(object): :returns: a swob.Response object """ nodes = GreenthreadSafeIterator( - node_iterator or self.app.iter_nodes(ring, part) + node_iterator or self.app.iter_nodes(ring, part, self.logger) ) node_number = node_count or len(ring.get_part_nodes(part)) pile = GreenAsyncPile(node_number) for head in headers: pile.spawn(self._make_request, nodes, part, method, path, - head, query_string, body, self.app.logger.thread_locals) + head, query_string, body, self.logger.thread_locals) response = [] statuses = [] for resp in pile: @@ -2026,8 +2033,8 @@ class Controller(object): if not resp: resp = HTTPServiceUnavailable(request=req) - self.app.logger.error('%(type)s returning 503 for %(statuses)s', - {'type': server_type, 'statuses': statuses}) + self.logger.error('%(type)s returning 503 for %(statuses)s', + {'type': server_type, 'statuses': statuses}) return resp @@ -2100,12 +2107,11 @@ class Controller(object): self.app.account_ring, partition, 'PUT', path, [headers] * len(nodes)) if is_success(resp.status_int): - self.app.logger.info('autocreate account %r', path) + self.logger.info('autocreate account %r', path) clear_info_cache(self.app, req.environ, account) return True else: - self.app.logger.warning('Could not autocreate account %r', - path) + self.logger.warning('Could not autocreate account %r', path) return False def GETorHEAD_base(self, req, server_type, node_iter, partition, path, @@ -2129,7 +2135,8 @@ class Controller(object): handler = GetOrHeadHandler(self.app, req, self.server_type, node_iter, partition, path, backend_headers, concurrency, policy=policy, - client_chunk_size=client_chunk_size) + client_chunk_size=client_chunk_size, + logger=self.logger) res = handler.get_working_response(req) if not res: @@ -2148,7 +2155,7 @@ class Controller(object): if policy: res.headers['X-Storage-Policy'] = policy.name else: - self.app.logger.error( + self.logger.error( 'Could not translate %s (%r) from %r to policy', 'X-Backend-Storage-Policy-Index', res.headers['X-Backend-Storage-Policy-Index'], path) @@ -2260,7 +2267,7 @@ class Controller(object): def _parse_listing_response(self, req, response): if not is_success(response.status_int): - self.app.logger.warning( + self.logger.warning( 'Failed to get container listing from %s: %s', req.path_qs, response.status_int) return None @@ -2271,7 +2278,7 @@ class Controller(object): raise ValueError('not a list') return data except ValueError as err: - self.app.logger.error( + self.logger.error( 'Problem with listing response from %s: %r', req.path_qs, err) return None @@ -2300,7 +2307,7 @@ class Controller(object): if headers: subreq.headers.update(headers) subreq.params = params - self.app.logger.debug( + self.logger.debug( 'Get listing from %s %s' % (subreq.path_qs, headers)) response = self.app.handle_request(subreq) data = self._parse_listing_response(req, response) @@ -2313,15 +2320,15 @@ class Controller(object): record_type = response.headers.get('x-backend-record-type') if record_type != 'shard': err = 'unexpected record type %r' % record_type - self.app.logger.error("Failed to get shard ranges from %s: %s", - req.path_qs, err) + self.logger.error("Failed to get shard ranges from %s: %s", + req.path_qs, err) return None try: return [ShardRange.from_dict(shard_range) for shard_range in listing] except (ValueError, TypeError, KeyError) as err: - self.app.logger.error( + self.logger.error( "Failed to get shard ranges from %s: invalid data: %r", req.path_qs, err) return None @@ -2373,7 +2380,7 @@ class Controller(object): # caching is disabled; fall back to old behavior shard_ranges, response = self._get_shard_ranges( req, account, container, states='updating', includes=obj) - self.app.logger.increment( + self.logger.increment( 'shard_updating.backend.%s' % response.status_int) if not shard_ranges: return None @@ -2388,10 +2395,10 @@ class Controller(object): skip_chance = \ self.app.container_updating_shard_ranges_skip_cache if skip_chance and random.random() < skip_chance: - self.app.logger.increment('shard_updating.cache.skip') + self.logger.increment('shard_updating.cache.skip') else: cached_ranges = memcache.get(cache_key) - self.app.logger.increment('shard_updating.cache.%s' % ( + self.logger.increment('shard_updating.cache.%s' % ( 'hit' if cached_ranges else 'miss')) if cached_ranges: @@ -2401,7 +2408,7 @@ class Controller(object): else: shard_ranges, response = self._get_shard_ranges( req, account, container, states='updating') - self.app.logger.increment( + self.logger.increment( 'shard_updating.backend.%s' % response.status_int) if shard_ranges: cached_ranges = [dict(sr) for sr in shard_ranges] diff --git a/swift/proxy/controllers/container.py b/swift/proxy/controllers/container.py index 36cc53b3a5..0550cb3c69 100644 --- a/swift/proxy/controllers/container.py +++ b/swift/proxy/controllers/container.py @@ -101,7 +101,8 @@ class ContainerController(Controller): self.account_name, self.container_name) concurrency = self.app.container_ring.replica_count \ if self.app.get_policy_options(None).concurrent_gets else 1 - node_iter = self.app.iter_nodes(self.app.container_ring, part) + node_iter = self.app.iter_nodes(self.app.container_ring, part, + self.logger) resp = self.GETorHEAD_base( req, 'Container', node_iter, part, req.swift_entity_path, concurrency) @@ -131,7 +132,7 @@ class ContainerController(Controller): # X-Newest is true then we always fetch from the backend servers. get_newest = config_true_value(req.headers.get('x-newest', False)) if get_newest: - self.app.logger.debug( + self.logger.debug( 'Skipping shard cache lookup (x-newest) for %s', req.path_qs) info = None else: @@ -154,17 +155,17 @@ class ContainerController(Controller): skip_chance = \ self.app.container_listing_shard_ranges_skip_cache if skip_chance and random.random() < skip_chance: - self.app.logger.increment('shard_listing.cache.skip') + self.logger.increment('shard_listing.cache.skip') else: cached_ranges = memcache.get(cache_key) - self.app.logger.increment('shard_listing.cache.%s' % ( + self.logger.increment('shard_listing.cache.%s' % ( 'hit' if cached_ranges else 'miss')) if cached_ranges is not None: infocache[cache_key] = tuple(cached_ranges) # shard ranges can be returned from cache - self.app.logger.debug('Found %d shards in cache for %s', - len(cached_ranges), req.path_qs) + self.logger.debug('Found %d shards in cache for %s', + len(cached_ranges), req.path_qs) headers.update({'x-backend-record-type': 'shard', 'x-backend-cached-results': 'true'}) shard_range_body = self._filter_resp_shard_ranges( @@ -224,8 +225,8 @@ class ContainerController(Controller): memcache = cache_from_env(req.environ, True) if memcache and cached_ranges: # cache in memcache only if shard ranges as expected - self.app.logger.debug('Caching %d shards for %s', - len(cached_ranges), req.path_qs) + self.logger.debug('Caching %d shards for %s', + len(cached_ranges), req.path_qs) memcache.set( cache_key, cached_ranges, time=self.app.recheck_listing_shard_ranges) @@ -340,8 +341,8 @@ class ContainerController(Controller): shard_listing_history.append((self.account_name, self.container_name)) shard_ranges = [ShardRange.from_dict(data) for data in json.loads(resp.body)] - self.app.logger.debug('GET listing from %s shards for: %s', - len(shard_ranges), req.path_qs) + self.logger.debug('GET listing from %s shards for: %s', + len(shard_ranges), req.path_qs) if not shard_ranges: # can't find ranges or there was a problem getting the ranges. So # return what we have. @@ -404,7 +405,7 @@ class ContainerController(Controller): if just_past < shard_range: continue - self.app.logger.debug( + self.logger.debug( 'Getting listing part %d from shard %s %s with %s', i, shard_range, shard_range.name, headers) objs, shard_resp = self._get_container_listing( @@ -417,7 +418,7 @@ class ContainerController(Controller): if objs is None: # give up if any non-success response from shard containers - self.app.logger.error( + self.logger.error( 'Aborting listing from shards due to bad response: %r' % all_resp_status) return HTTPServiceUnavailable(request=req) @@ -426,12 +427,12 @@ class ContainerController(Controller): shard_resp.headers[policy_key] ) if shard_policy != req.headers[policy_key]: - self.app.logger.error( + self.logger.error( 'Aborting listing from shards due to bad shard policy ' 'index: %s (expected %s)', shard_policy, req.headers[policy_key]) return HTTPServiceUnavailable(request=req) - self.app.logger.debug( + self.logger.debug( 'Found %d objects in shard (state=%s), total = %d', len(objs), sharding_state, len(objs) + len(objects)) @@ -457,7 +458,7 @@ class ContainerController(Controller): constrained = any(req.params.get(constraint) for constraint in ( 'marker', 'end_marker', 'path', 'prefix', 'delimiter')) if not constrained and len(objects) < req_limit: - self.app.logger.debug('Setting object count to %s' % len(objects)) + self.logger.debug('Setting object count to %s' % len(objects)) # prefer the actual listing stats over the potentially outdated # root stats. This condition is only likely when a sharded # container is shrinking or in tests; typically a sharded container diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index da61b7e110..c3989bcead 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -196,7 +196,8 @@ class BaseObjectController(Controller): policy_options = self.app.get_policy_options(policy) is_local = policy_options.write_affinity_is_local_fn if is_local is None: - return self.app.iter_nodes(ring, partition, policy=policy) + return self.app.iter_nodes(ring, partition, self.logger, + policy=policy) primary_nodes = ring.get_part_nodes(partition) handoff_nodes = ring.get_more_nodes(partition) @@ -229,8 +230,8 @@ class BaseObjectController(Controller): (node for node in all_nodes if node not in preferred_nodes) ) - return self.app.iter_nodes(ring, partition, node_iter=node_iter, - policy=policy) + return self.app.iter_nodes(ring, partition, self.logger, + node_iter=node_iter, policy=policy) def GETorHEAD(self, req): """Handle HTTP GET or HEAD requests.""" @@ -249,7 +250,8 @@ class BaseObjectController(Controller): return aresp partition = obj_ring.get_part( self.account_name, self.container_name, self.object_name) - node_iter = self.app.iter_nodes(obj_ring, partition, policy=policy) + node_iter = self.app.iter_nodes(obj_ring, partition, self.logger, + policy=policy) resp = self._get_or_head_response(req, node_iter, partition, policy) @@ -409,7 +411,7 @@ class BaseObjectController(Controller): def _get_conn_response(self, putter, path, logger_thread_locals, final_phase, **kwargs): - self.app.logger.thread_locals = logger_thread_locals + self.logger.thread_locals = logger_thread_locals try: resp = putter.await_response( self.app.node_timeout, not final_phase) @@ -466,7 +468,7 @@ class BaseObjectController(Controller): if putter.failed: continue pile.spawn(self._get_conn_response, putter, req.path, - self.app.logger.thread_locals, final_phase=final_phase) + self.logger.thread_locals, final_phase=final_phase) def _handle_response(putter, response): statuses.append(response.status) @@ -563,7 +565,7 @@ class BaseObjectController(Controller): putter.resp.status for putter in putters if putter.resp] if HTTP_PRECONDITION_FAILED in statuses: # If we find any copy of the file, it shouldn't be uploaded - self.app.logger.debug( + self.logger.debug( 'Object PUT returning 412, %(statuses)r', {'statuses': statuses}) raise HTTPPreconditionFailed(request=req) @@ -576,7 +578,7 @@ class BaseObjectController(Controller): putter.resp.getheaders()).get( 'X-Backend-Timestamp', 'unknown') } for putter in putters if putter.resp] - self.app.logger.debug( + self.logger.debug( 'Object PUT returning 202 for 409: ' '%(req_timestamp)s <= %(timestamps)r', {'req_timestamp': req.timestamp.internal, @@ -614,11 +616,11 @@ class BaseObjectController(Controller): :param req: a swob Request :param headers: request headers :param logger_thread_locals: The thread local values to be set on the - self.app.logger to retain transaction + self.logger to retain transaction logging information. :return: an instance of a Putter """ - self.app.logger.thread_locals = logger_thread_locals + self.logger.thread_locals = logger_thread_locals for node in nodes: try: putter = self._make_putter(node, part, req, headers) @@ -653,7 +655,7 @@ class BaseObjectController(Controller): del nheaders['Content-Length'] nheaders['Expect'] = '100-continue' pile.spawn(self._connect_put_node, node_iter, partition, - req, nheaders, self.app.logger.thread_locals) + req, nheaders, self.logger.thread_locals) putters = [putter for putter in pile if putter] @@ -664,8 +666,7 @@ class BaseObjectController(Controller): 'required connections') if len(putters) < min_conns: - self.app.logger.error((msg), - {'conns': len(putters), 'nodes': min_conns}) + self.logger.error(msg, {'conns': len(putters), 'nodes': min_conns}) raise HTTPServiceUnavailable(request=req) def _get_footers(self, req): @@ -874,7 +875,7 @@ class ReplicatedObjectController(BaseObjectController): node_timeout=self.app.node_timeout, write_timeout=self.app.node_timeout, send_exception_handler=self.app.exception_occurred, - logger=self.app.logger, + logger=self.logger, need_multiphase=False) else: te = ',' + headers.get('Transfer-Encoding', '') @@ -884,7 +885,7 @@ class ReplicatedObjectController(BaseObjectController): node_timeout=self.app.node_timeout, write_timeout=self.app.node_timeout, send_exception_handler=self.app.exception_occurred, - logger=self.app.logger, + logger=self.logger, chunked=te.endswith(',chunked')) return putter @@ -927,9 +928,9 @@ class ReplicatedObjectController(BaseObjectController): ml = req.message_length() if ml and bytes_transferred < ml: - self.app.logger.warning( + self.logger.warning( 'Client disconnected without sending enough data') - self.app.logger.increment('client_disconnects') + self.logger.increment('client_disconnects') raise HTTPClientDisconnect(request=req) trail_md = self._get_footers(req) @@ -942,23 +943,23 @@ class ReplicatedObjectController(BaseObjectController): msg='Object PUT exceptions after last send, ' '%(conns)s/%(nodes)s required connections') except ChunkReadTimeout as err: - self.app.logger.warning( + self.logger.warning( 'ERROR Client read timeout (%ss)', err.seconds) - self.app.logger.increment('client_timeouts') + self.logger.increment('client_timeouts') raise HTTPRequestTimeout(request=req) except HTTPException: raise except ChunkReadError: - self.app.logger.warning( + self.logger.warning( 'Client disconnected without sending last chunk') - self.app.logger.increment('client_disconnects') + self.logger.increment('client_disconnects') raise HTTPClientDisconnect(request=req) except Timeout: - self.app.logger.exception( + self.logger.exception( 'ERROR Exception causing client disconnect') raise HTTPClientDisconnect(request=req) except Exception: - self.app.logger.exception( + self.logger.exception( 'ERROR Exception transferring data to object servers %s', {'path': req.path}) raise HTTPInternalServerError(request=req) @@ -1002,7 +1003,7 @@ class ReplicatedObjectController(BaseObjectController): putter.close() if len(etags) > 1: - self.app.logger.error( + self.logger.error( 'Object servers returned %s mismatched etags', len(etags)) return HTTPServerError(request=req) etag = etags.pop() if len(etags) else None @@ -2376,7 +2377,8 @@ def is_good_source(status): class ECFragGetter(object): def __init__(self, app, req, node_iter, partition, policy, path, - backend_headers, header_provider, logger_thread_locals): + backend_headers, header_provider, logger_thread_locals, + logger): self.app = app self.req = req self.node_iter = node_iter @@ -2390,6 +2392,7 @@ class ECFragGetter(object): self.bytes_used_from_backend = 0 self.source = None self.logger_thread_locals = logger_thread_locals + self.logger = logger def fast_forward(self, num_bytes): """ @@ -2574,7 +2577,7 @@ class ECFragGetter(object): try: self.fast_forward(self.bytes_used_from_backend) except (HTTPException, ValueError): - self.app.logger.exception('Unable to fast forward') + self.logger.exception('Unable to fast forward') six.reraise(exc_type, exc_value, exc_traceback) except RangeAlreadyComplete: break @@ -2701,10 +2704,10 @@ class ECFragGetter(object): 'Trying to read during GET') raise except ChunkWriteTimeout: - self.app.logger.warning( + self.logger.warning( 'Client did not read from proxy within %ss' % self.app.client_timeout) - self.app.logger.increment('client_timeouts') + self.logger.increment('client_timeouts') except GeneratorExit: warn = True req_range = self.backend_headers['Range'] @@ -2716,11 +2719,11 @@ class ECFragGetter(object): if end - begin + 1 == self.bytes_used_from_backend: warn = False if not req.environ.get('swift.non_client_disconnect') and warn: - self.app.logger.warning( + self.logger.warning( 'Client disconnected on read of EC frag %r', self.path) raise except Exception: - self.app.logger.exception('Trying to send to client') + self.logger.exception('Trying to send to client') raise finally: # Close-out the connection as best as possible. @@ -2739,7 +2742,7 @@ class ECFragGetter(object): return HeaderKeyDict() def _make_node_request(self, node, node_timeout): - self.app.logger.thread_locals = self.logger_thread_locals + self.logger.thread_locals = self.logger_thread_locals req_headers = dict(self.backend_headers) ip, port = get_ip_port(node, req_headers) req_headers.update(self.header_provider()) @@ -2774,8 +2777,8 @@ class ECFragGetter(object): not Timestamp(src_headers.get('x-backend-timestamp', 0)): # throw out 5XX and 404s from handoff nodes unless the data is # really on disk and had been DELETEd - self.app.logger.debug('Ignoring %s from handoff' % - possible_source.status) + self.logger.debug('Ignoring %s from handoff' % + possible_source.status) conn.close() return None @@ -2798,7 +2801,7 @@ class ECFragGetter(object): {'status': possible_source.status, 'body': self.body[:1024]}) else: - self.app.logger.debug( + self.logger.debug( 'Ignoring %s from primary' % possible_source.status) return None @@ -2830,7 +2833,7 @@ class ECFragGetter(object): # _make_node_request only returns good sources continue if source.getheader('X-Object-Sysmeta-EC-ETag') != used_etag: - self.app.logger.warning( + self.logger.warning( 'Skipping source (etag mismatch: got %s, expected %s)', source.getheader('X-Object-Sysmeta-EC-ETag'), used_etag) else: @@ -2846,13 +2849,14 @@ class ECObjectController(BaseObjectController): """ Makes a GET request for a fragment. """ - self.app.logger.thread_locals = logger_thread_locals + self.logger.thread_locals = logger_thread_locals backend_headers = self.generate_request_headers( req, additional=req.headers) getter = ECFragGetter(self.app, req, node_iter, partition, policy, req.swift_entity_path, backend_headers, - header_provider, logger_thread_locals) + header_provider, logger_thread_locals, + self.logger) return (getter, getter.response_parts_iter(req)) def _convert_range(self, req, policy): @@ -2965,14 +2969,14 @@ class ECObjectController(BaseObjectController): pile.spawn(self._fragment_GET_request, req, safe_iter, partition, policy, buckets.get_extra_headers, - self.app.logger.thread_locals) + self.logger.thread_locals) feeder_q = None if policy_options.concurrent_gets: feeder_q = Queue() pool.spawn(self.feed_remaining_primaries, safe_iter, pile, req, partition, policy, buckets, feeder_q, - self.app.logger.thread_locals) + self.logger.thread_locals) extra_requests = 0 # max_extra_requests is an arbitrary hard limit for spawning extra @@ -2987,7 +2991,7 @@ class ECObjectController(BaseObjectController): try: buckets.add_response(get, parts_iter) except ValueError as err: - self.app.logger.error( + self.logger.error( "Problem with fragment response: %s", err) best_bucket = buckets.best_bucket if best_bucket.durable and best_bucket.shortfall <= 0: @@ -3001,7 +3005,7 @@ class ECObjectController(BaseObjectController): extra_requests += 1 pile.spawn(self._fragment_GET_request, req, safe_iter, partition, policy, buckets.get_extra_headers, - self.app.logger.thread_locals) + self.logger.thread_locals) if feeder_q: feeder_q.put('stop') @@ -3024,7 +3028,7 @@ class ECObjectController(BaseObjectController): policy, [p_iter for _getter, p_iter in best_bucket.get_responses()], range_specs, fa_length, obj_length, - self.app.logger) + self.logger) resp = Response( request=req, conditional_response=True, @@ -3146,7 +3150,7 @@ class ECObjectController(BaseObjectController): node_timeout=self.app.node_timeout, write_timeout=self.app.node_timeout, send_exception_handler=self.app.exception_occurred, - logger=self.app.logger, + logger=self.logger, need_multiphase=True) def _determine_chunk_destinations(self, putters, policy): @@ -3285,9 +3289,9 @@ class ECObjectController(BaseObjectController): ml = req.message_length() if ml and bytes_transferred < ml: - self.app.logger.warning( + self.logger.warning( 'Client disconnected without sending enough data') - self.app.logger.increment('client_disconnects') + self.logger.increment('client_disconnects') raise HTTPClientDisconnect(request=req) send_chunk(b'') # flush out any buffered data @@ -3327,7 +3331,7 @@ class ECObjectController(BaseObjectController): min_responses=min_conns) if not self.have_quorum( statuses, len(nodes), quorum=min_conns): - self.app.logger.error( + self.logger.error( 'Not enough object servers ack\'ed (got %d)', statuses.count(HTTP_CONTINUE)) raise HTTPServiceUnavailable(request=req) @@ -3355,23 +3359,23 @@ class ECObjectController(BaseObjectController): for putter in putters: putter.send_commit_confirmation() except ChunkReadTimeout as err: - self.app.logger.warning( + self.logger.warning( 'ERROR Client read timeout (%ss)', err.seconds) - self.app.logger.increment('client_timeouts') + self.logger.increment('client_timeouts') raise HTTPRequestTimeout(request=req) except ChunkReadError: - self.app.logger.warning( + self.logger.warning( 'Client disconnected without sending last chunk') - self.app.logger.increment('client_disconnects') + self.logger.increment('client_disconnects') raise HTTPClientDisconnect(request=req) except HTTPException: raise except Timeout: - self.app.logger.exception( + self.logger.exception( 'ERROR Exception causing client disconnect') raise HTTPClientDisconnect(request=req) except Exception: - self.app.logger.exception( + self.logger.exception( 'ERROR Exception transferring data to object servers %s', {'path': req.path}) raise HTTPInternalServerError(request=req) diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 1e1388c419..091ac8cf5c 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -198,6 +198,7 @@ class Application(object): self.logger = get_logger(conf, log_route='proxy-server') else: self.logger = logger + self.logger.set_statsd_prefix('proxy-server') self._error_limiting = {} swift_dir = conf.get('swift_dir', '/etc/swift') @@ -514,7 +515,6 @@ class Application(object): :param req: swob.Request object """ try: - self.logger.set_statsd_prefix('proxy-server') if req.content_length and req.content_length < 0: self.logger.increment('errors') return HTTPBadRequest(request=req, @@ -546,8 +546,6 @@ class Application(object): req.host.split(':')[0] in self.deny_host_headers: return HTTPForbidden(request=req, body='Invalid host header') - self.logger.set_statsd_prefix('proxy-server.' + - controller.server_type.lower()) controller = controller(self, **path_parts) if 'swift.trans_id' not in req.environ: # if this wasn't set by an earlier middleware, set it now @@ -702,9 +700,9 @@ class Application(object): 'msg': msg, 'ip': node['ip'], 'port': node['port'], 'device': node['device']}) - def iter_nodes(self, ring, partition, node_iter=None, policy=None): + def iter_nodes(self, ring, partition, logger, node_iter=None, policy=None): return NodeIter(self, ring, partition, node_iter=node_iter, - policy=policy) + policy=policy, logger=logger) def exception_occurred(self, node, typ, additional_info, **kwargs): diff --git a/test/unit/common/middleware/test_container_sync.py b/test/unit/common/middleware/test_container_sync.py index 4bfeb0221f..d5c98d5387 100644 --- a/test/unit/common/middleware/test_container_sync.py +++ b/test/unit/common/middleware/test_container_sync.py @@ -34,7 +34,8 @@ class FakeApp(object): def __call__(self, env, start_response): if env.get('PATH_INFO') == '/info': controller = InfoController( - app=None, version=None, expose_info=True, + app=mock.Mock(logger=debug_logger()), + version=None, expose_info=True, disallowed_sections=[], admin_key=None) handler = getattr(controller, env.get('REQUEST_METHOD')) return handler(swob.Request(env))(env, start_response) diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 97be1251f1..6c242569a6 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -6344,6 +6344,71 @@ class TestStatsdLoggingDelegation(unittest.TestCase): self.assertEqual(called, [12345]) +class TestSwiftLoggerAdapter(unittest.TestCase): + @reset_logger_state + def test_thread_locals(self): + logger = utils.get_logger({}, 'foo') + adapter1 = utils.SwiftLoggerAdapter(logger, {}) + adapter2 = utils.SwiftLoggerAdapter(logger, {}) + locals1 = ('tx_123', '1.2.3.4') + adapter1.thread_locals = locals1 + self.assertEqual(adapter1.thread_locals, locals1) + self.assertEqual(adapter2.thread_locals, locals1) + self.assertEqual(logger.thread_locals, locals1) + + locals2 = ('tx_456', '1.2.3.456') + logger.thread_locals = locals2 + self.assertEqual(adapter1.thread_locals, locals2) + self.assertEqual(adapter2.thread_locals, locals2) + self.assertEqual(logger.thread_locals, locals2) + logger.thread_locals = (None, None) + + def test_exception(self): + # verify that the adapter routes exception calls to utils.LogAdapter + # for special case handling + logger = utils.get_logger({}) + adapter = utils.SwiftLoggerAdapter(logger, {}) + try: + raise OSError(errno.ECONNREFUSED, 'oserror') + except OSError: + with mock.patch('logging.LoggerAdapter.error') as mocked: + adapter.exception('Caught') + mocked.assert_called_with('Caught: Connection refused') + + +class TestMetricsPrefixLoggerAdapter(unittest.TestCase): + def test_metric_prefix(self): + logger = utils.get_logger({}, 'logger_name') + adapter1 = utils.MetricsPrefixLoggerAdapter(logger, {}, 'one') + adapter2 = utils.MetricsPrefixLoggerAdapter(logger, {}, 'two') + adapter3 = utils.SwiftLoggerAdapter(logger, {}) + self.assertEqual('logger_name', logger.name) + self.assertEqual('logger_name', adapter1.logger.name) + self.assertEqual('logger_name', adapter2.logger.name) + self.assertEqual('logger_name', adapter3.logger.name) + + with mock.patch.object(logger, 'increment') as mock_increment: + adapter1.increment('test1') + adapter2.increment('test2') + adapter3.increment('test3') + logger.increment('test') + self.assertEqual( + [mock.call('one.test1'), mock.call('two.test2'), + mock.call('test3'), mock.call('test')], + mock_increment.call_args_list) + + adapter1.metric_prefix = 'not one' + with mock.patch.object(logger, 'increment') as mock_increment: + adapter1.increment('test1') + adapter2.increment('test2') + adapter3.increment('test3') + logger.increment('test') + self.assertEqual( + [mock.call('not one.test1'), mock.call('two.test2'), + mock.call('test3'), mock.call('test')], + mock_increment.call_args_list) + + class TestAuditLocationGenerator(unittest.TestCase): def test_drive_tree_access(self): diff --git a/test/unit/proxy/controllers/test_container.py b/test/unit/proxy/controllers/test_container.py index 85bda3f037..7570be9bdb 100644 --- a/test/unit/proxy/controllers/test_container.py +++ b/test/unit/proxy/controllers/test_container.py @@ -2094,7 +2094,7 @@ class TestContainerController(TestRingBase): # container is sharded but proxy does not have that state cached; # expect a backend request and expect shard ranges to be cached self.memcache.clear_calls() - self.logger.logger.log_dict['increment'][:] = [] + self.logger.clear() req = self._build_request({'X-Backend-Record-Type': record_type}, {'states': 'listing'}, {}) backend_req, resp = self._capture_backend_request( @@ -2130,7 +2130,7 @@ class TestContainerController(TestRingBase): # no shard ranges cached; expect a cache miss and write-back self.memcache.delete('shard-listing/a/c') self.memcache.clear_calls() - self.logger.logger.log_dict['increment'][:] = [] + self.logger.clear() req = self._build_request({'X-Backend-Record-Type': record_type}, {'states': 'listing'}, {}) backend_req, resp = self._capture_backend_request( @@ -2161,12 +2161,12 @@ class TestContainerController(TestRingBase): req.environ['swift.infocache']['shard-listing/a/c']) self.assertEqual( [x[0][0] for x in self.logger.logger.log_dict['increment']], - ['shard_listing.cache.miss']) + ['container.shard_listing.cache.miss']) # container is sharded and proxy does have that state cached and # also has shard ranges cached; expect a read from cache self.memcache.clear_calls() - self.logger.logger.log_dict['increment'][:] = [] + self.logger.clear() req = self._build_request({'X-Backend-Record-Type': record_type}, {'states': 'listing'}, {}) resp = req.get_response(self.app) @@ -2184,11 +2184,11 @@ class TestContainerController(TestRingBase): req.environ['swift.infocache']['shard-listing/a/c']) self.assertEqual( [x[0][0] for x in self.logger.logger.log_dict['increment']], - ['shard_listing.cache.hit']) + ['container.shard_listing.cache.hit']) # if there's a chance to skip cache, maybe we go to disk again... self.memcache.clear_calls() - self.logger.logger.log_dict['increment'][:] = [] + self.logger.clear() self.app.container_listing_shard_ranges_skip_cache = 0.10 req = self._build_request({'X-Backend-Record-Type': record_type}, {'states': 'listing'}, {}) @@ -2220,11 +2220,11 @@ class TestContainerController(TestRingBase): req.environ['swift.infocache']['shard-listing/a/c']) self.assertEqual( [x[0][0] for x in self.logger.logger.log_dict['increment']], - ['shard_listing.cache.skip']) + ['container.shard_listing.cache.skip']) # ... or maybe we serve from cache self.memcache.clear_calls() - self.logger.logger.log_dict['increment'][:] = [] + self.logger.clear() req = self._build_request({'X-Backend-Record-Type': record_type}, {'states': 'listing'}, {}) with mock.patch('random.random', return_value=0.11): @@ -2243,7 +2243,8 @@ class TestContainerController(TestRingBase): req.environ['swift.infocache']['shard-listing/a/c']) self.assertEqual( [x[0][0] for x in self.logger.logger.log_dict['increment']], - ['shard_listing.cache.hit']) + ['container.shard_listing.cache.hit']) + # put this back the way we found it for later subtests self.app.container_listing_shard_ranges_skip_cache = 0.0 diff --git a/test/unit/proxy/controllers/test_info.py b/test/unit/proxy/controllers/test_info.py index 2317acfbe1..4dbba5a141 100644 --- a/test/unit/proxy/controllers/test_info.py +++ b/test/unit/proxy/controllers/test_info.py @@ -22,6 +22,7 @@ from swift.proxy.controllers import InfoController from swift.proxy.server import Application as ProxyApp from swift.common import utils from swift.common.swob import Request, HTTPException +from test.debug_logger import debug_logger class TestInfoController(unittest.TestCase): @@ -34,7 +35,7 @@ class TestInfoController(unittest.TestCase): admin_key=None): disallowed_sections = disallowed_sections or [] - app = Mock(spec=ProxyApp) + app = Mock(spec=ProxyApp, logger=debug_logger()) return InfoController(app, None, expose_info, disallowed_sections, admin_key) diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index baddfe7f24..01440906df 100644 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -2526,7 +2526,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): controller = self.controller_cls( self.app, 'a', 'c', 'o') safe_iter = utils.GreenthreadSafeIterator(self.app.iter_nodes( - self.policy.object_ring, 0, policy=self.policy)) + self.policy.object_ring, 0, self.logger, policy=self.policy)) controller._fragment_GET_request = lambda *a, **k: next(safe_iter) pile = utils.GreenAsyncPile(self.policy.ec_ndata) for i in range(self.policy.ec_ndata): diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index ac12c089b6..8b47e68458 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -1335,6 +1335,13 @@ class TestProxyServer(unittest.TestCase): self.assertTrue('swift.valid_api_versions' in path_parts.get('disallowed_sections')) + def test_statsd_prefix(self): + app = proxy_server.Application({}, logger=debug_logger(), + account_ring=FakeRing(), + container_ring=FakeRing()) + self.assertEqual([(('proxy-server',), {})], + app.logger.log_dict['set_statsd_prefix']) + @patch_policies([ StoragePolicy(0, 'zero', is_default=True), @@ -2294,9 +2301,10 @@ class TestReplicatedObjectController( def setUp(self): skip_if_no_xattrs() _test_servers[0]._error_limiting = {} # clear out errors + self.logger = debug_logger('proxy-ut') self.app = proxy_server.Application( None, - logger=debug_logger('proxy-ut'), + logger=self.logger, account_ring=FakeRing(), container_ring=FakeRing()) super(TestReplicatedObjectController, self).setUp() @@ -3955,7 +3963,7 @@ class TestReplicatedObjectController( self.app.recheck_updating_shard_ranges = 0 def do_test(method, sharding_state): - self.app.logger = debug_logger('proxy-ut') # clean capture state + self.app.logger.clear() # clean capture state req = Request.blank('/v1/a/c/o', {}, method=method, body='', headers={'Content-Type': 'text/plain'}) @@ -3975,8 +3983,10 @@ class TestReplicatedObjectController( self.assertEqual(resp.status_int, 202) stats = self.app.logger.get_increment_counts() - self.assertEqual({'shard_updating.backend.200': 1}, stats) + self.assertEqual({'object.shard_updating.backend.200': 1}, stats) backend_requests = fake_conn.requests + # verify statsd prefix is not mutated + self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix']) account_request = backend_requests[0] self._check_request( @@ -4035,7 +4045,7 @@ class TestReplicatedObjectController( self.app.recheck_updating_shard_ranges = 3600 def do_test(method, sharding_state): - self.app.logger = debug_logger('proxy-ut') # clean capture state + self.app.logger.clear() # clean capture state req = Request.blank( '/v1/a/c/o', {'swift.cache': FakeMemcache()}, method=method, body='', headers={'Content-Type': 'text/plain'}) @@ -4064,8 +4074,10 @@ class TestReplicatedObjectController( self.assertEqual(resp.status_int, 202) stats = self.app.logger.get_increment_counts() - self.assertEqual({'shard_updating.cache.miss': 1, - 'shard_updating.backend.200': 1}, stats) + self.assertEqual({'object.shard_updating.cache.miss': 1, + 'object.shard_updating.backend.200': 1}, stats) + # verify statsd prefix is not mutated + self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix']) backend_requests = fake_conn.requests account_request = backend_requests[0] @@ -4133,7 +4145,7 @@ class TestReplicatedObjectController( self.app.recheck_updating_shard_ranges = 3600 def do_test(method, sharding_state): - self.app.logger = debug_logger('proxy-ut') # clean capture state + self.app.logger.clear() # clean capture state shard_ranges = [ utils.ShardRange( '.shards_a/c_not_used', utils.Timestamp.now(), '', 'l'), @@ -4162,7 +4174,9 @@ class TestReplicatedObjectController( self.assertEqual(resp.status_int, 202) stats = self.app.logger.get_increment_counts() - self.assertEqual({'shard_updating.cache.hit': 1}, stats) + self.assertEqual({'object.shard_updating.cache.hit': 1}, stats) + # verify statsd prefix is not mutated + self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix']) backend_requests = fake_conn.requests account_request = backend_requests[0] @@ -4224,7 +4238,7 @@ class TestReplicatedObjectController( self.app.container_updating_shard_ranges_skip_cache = 0.001 def do_test(method, sharding_state): - self.app.logger = debug_logger('proxy-ut') # clean capture state + self.app.logger.clear() # clean capture state cached_shard_ranges = [ utils.ShardRange( '.shards_a/c_nope', utils.Timestamp.now(), '', 'l'), @@ -4254,7 +4268,7 @@ class TestReplicatedObjectController( self.assertEqual(resp.status_int, 202) stats = self.app.logger.get_increment_counts() - self.assertEqual({'shard_updating.cache.hit': 1}, stats) + self.assertEqual({'object.shard_updating.cache.hit': 1}, stats) # cached shard ranges are still there cache_key = 'shard-updating/a/c' @@ -4292,9 +4306,11 @@ class TestReplicatedObjectController( self.assertEqual(resp.status_int, 202) stats = self.app.logger.get_increment_counts() - self.assertEqual({'shard_updating.cache.skip': 1, - 'shard_updating.cache.hit': 1, - 'shard_updating.backend.200': 1}, stats) + self.assertEqual({'object.shard_updating.cache.skip': 1, + 'object.shard_updating.cache.hit': 1, + 'object.shard_updating.backend.200': 1}, stats) + # verify statsd prefix is not mutated + self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix']) backend_requests = fake_conn.requests container_request_shard = backend_requests[0] @@ -4356,7 +4372,7 @@ class TestReplicatedObjectController( self.app.recheck_updating_shard_ranges = 0 def do_test(method, sharding_state): - self.app.logger = debug_logger('proxy-ut') # clean capture state + self.app.logger.clear() # clean capture state req = Request.blank('/v1/a/c/o', {}, method=method, body='', headers={'Content-Type': 'text/plain'}) @@ -4372,7 +4388,7 @@ class TestReplicatedObjectController( self.assertEqual(resp.status_int, 202) stats = self.app.logger.get_increment_counts() - self.assertEqual({'shard_updating.backend.404': 1}, stats) + self.assertEqual({'object.shard_updating.backend.404': 1}, stats) backend_requests = fake_conn.requests account_request = backend_requests[0] @@ -4936,8 +4952,8 @@ class TestReplicatedObjectController( 'container', 'object') collected_nodes = [] - for node in self.app.iter_nodes(object_ring, - partition): + for node in self.app.iter_nodes(object_ring, partition, + self.logger): collected_nodes.append(node) self.assertEqual(len(collected_nodes), 5) @@ -4947,21 +4963,22 @@ class TestReplicatedObjectController( 'container', 'object') collected_nodes = [] - for node in self.app.iter_nodes(object_ring, - partition): + for node in self.app.iter_nodes(object_ring, partition, + self.logger): collected_nodes.append(node) self.assertEqual(len(collected_nodes), 9) # zero error-limited primary nodes -> no handoff warnings self.app.log_handoffs = True - self.app.logger = debug_logger() + self.app.logger.clear() # clean capture state self.app.request_node_count = lambda r: 7 object_ring.max_more_nodes = 20 partition, nodes = object_ring.get_nodes('account', 'container', 'object') collected_nodes = [] - for node in self.app.iter_nodes(object_ring, partition): + for node in self.app.iter_nodes(object_ring, partition, + self.logger): collected_nodes.append(node) self.assertEqual(len(collected_nodes), 7) self.assertEqual(self.app.logger.log_dict['warning'], []) @@ -4969,14 +4986,15 @@ class TestReplicatedObjectController( # one error-limited primary node -> one handoff warning self.app.log_handoffs = True - self.app.logger = debug_logger() + self.app.logger.clear() # clean capture state self.app.request_node_count = lambda r: 7 self.app._error_limiting = {} # clear out errors set_node_errors(self.app, object_ring._devs[0], 999, last_error=(2 ** 63 - 1)) collected_nodes = [] - for node in self.app.iter_nodes(object_ring, partition): + for node in self.app.iter_nodes(object_ring, partition, + self.logger): collected_nodes.append(node) self.assertEqual(len(collected_nodes), 7) self.assertEqual( @@ -4987,7 +5005,7 @@ class TestReplicatedObjectController( # two error-limited primary nodes -> two handoff warnings self.app.log_handoffs = True - self.app.logger = debug_logger() + self.app.logger.clear() # clean capture state self.app.request_node_count = lambda r: 7 self.app._error_limiting = {} # clear out errors for i in range(2): @@ -4995,7 +5013,8 @@ class TestReplicatedObjectController( last_error=(2 ** 63 - 1)) collected_nodes = [] - for node in self.app.iter_nodes(object_ring, partition): + for node in self.app.iter_nodes(object_ring, partition, + self.logger): collected_nodes.append(node) self.assertEqual(len(collected_nodes), 7) self.assertEqual( @@ -5010,7 +5029,7 @@ class TestReplicatedObjectController( # all error-limited primary nodes -> four handoff warnings, # plus a handoff-all metric self.app.log_handoffs = True - self.app.logger = debug_logger() + self.app.logger.clear() # clean capture state self.app.request_node_count = lambda r: 10 object_ring.set_replicas(4) # otherwise we run out of handoffs self.app._error_limiting = {} # clear out errors @@ -5019,7 +5038,8 @@ class TestReplicatedObjectController( last_error=(2 ** 63 - 1)) collected_nodes = [] - for node in self.app.iter_nodes(object_ring, partition): + for node in self.app.iter_nodes(object_ring, partition, + self.logger): collected_nodes.append(node) self.assertEqual(len(collected_nodes), 10) self.assertEqual( @@ -5050,7 +5070,7 @@ class TestReplicatedObjectController( with mock.patch.object(self.app, 'sort_nodes', side_effect=fake_sort_nodes): object_ring = self.app.get_object_ring(None) - for node in self.app.iter_nodes(object_ring, 0): + for node in self.app.iter_nodes(object_ring, 0, self.logger): pass self.assertEqual(called, [ mock.call(object_ring.get_part_nodes(0), policy=None) @@ -5060,12 +5080,15 @@ class TestReplicatedObjectController( with mock.patch.object(self.app, 'sort_nodes', lambda n, *args, **kwargs: n): object_ring = self.app.get_object_ring(None) - first_nodes = list(self.app.iter_nodes(object_ring, 0)) - second_nodes = list(self.app.iter_nodes(object_ring, 0)) + first_nodes = list(self.app.iter_nodes( + object_ring, 0, self.logger)) + second_nodes = list(self.app.iter_nodes( + object_ring, 0, self.logger)) self.assertIn(first_nodes[0], second_nodes) self.app.error_limit(first_nodes[0], 'test') - second_nodes = list(self.app.iter_nodes(object_ring, 0)) + second_nodes = list(self.app.iter_nodes( + object_ring, 0, self.logger)) self.assertNotIn(first_nodes[0], second_nodes) def test_iter_nodes_gives_extra_if_error_limited_inline(self): @@ -5075,9 +5098,10 @@ class TestReplicatedObjectController( mock.patch.object(self.app, 'request_node_count', lambda r: 6), \ mock.patch.object(object_ring, 'max_more_nodes', 99): - first_nodes = list(self.app.iter_nodes(object_ring, 0)) + first_nodes = list(self.app.iter_nodes( + object_ring, 0, self.logger)) second_nodes = [] - for node in self.app.iter_nodes(object_ring, 0): + for node in self.app.iter_nodes(object_ring, 0, self.logger): if not second_nodes: self.app.error_limit(node, 'test') second_nodes.append(node) @@ -5092,7 +5116,7 @@ class TestReplicatedObjectController( lambda n, *args, **kwargs: n), \ mock.patch.object(self.app, 'request_node_count', lambda r: 3): - got_nodes = list(self.app.iter_nodes(object_ring, 0, + got_nodes = list(self.app.iter_nodes(object_ring, 0, self.logger, node_iter=iter(node_list))) self.assertEqual(node_list[:3], got_nodes) @@ -5100,7 +5124,7 @@ class TestReplicatedObjectController( lambda n, *args, **kwargs: n), \ mock.patch.object(self.app, 'request_node_count', lambda r: 1000000): - got_nodes = list(self.app.iter_nodes(object_ring, 0, + got_nodes = list(self.app.iter_nodes(object_ring, 0, self.logger, node_iter=iter(node_list))) self.assertEqual(node_list, got_nodes) @@ -11144,7 +11168,7 @@ class FakeObjectController(object): resp = Response(app_iter=iter(body)) return resp - def iter_nodes(self, ring, partition): + def iter_nodes(self, ring, partition, logger): for node in ring.get_part_nodes(partition): yield node for node in ring.get_more_nodes(partition):