Fix statsd prefix mutation in proxy controllers
Swift loggers encapsulate a StatsdClient that is typically initialised with a prefix, equal to the logger name (e.g. 'proxy_server'), that is prepended to metrics names. The proxy server would previously mutate its logger's prefix, using its set_statsd_prefix method, each time a controller was instantiated, extending it with the controller type (e.g. changing the prefix 'proxy_server.object'). As a result, when an object request spawned container subrequests, for example, the statsd client would be left with a 'proxy_server.container' prefix part for subsequent object request related metrics. The proxy server logger is now wrapped with a new MetricsPrefixLoggerAdapter each time a controller is instantiated, and the adapter applies the correct prefix for the controller type for the lifetime of the controller. Change-Id: I0522b1953722ca96021a0002cf93432b973ce626
This commit is contained in:
parent
8c6ccb5fd4
commit
6942b25cc1
@ -2084,28 +2084,46 @@ def timing_stats(**dec_kwargs):
|
||||
class SwiftLoggerAdapter(logging.LoggerAdapter):
|
||||
"""
|
||||
A logging.LoggerAdapter subclass that also passes through StatsD method
|
||||
calls.
|
||||
calls. Adds an optional metric_prefix to statsd metric names.
|
||||
|
||||
Like logging.LoggerAdapter, you have to subclass this and override the
|
||||
process() method to accomplish anything useful.
|
||||
"""
|
||||
def update_stats(self, *a, **kw):
|
||||
return self.logger.update_stats(*a, **kw)
|
||||
def get_metric_name(self, metric):
|
||||
# subclasses may override this method to annotate the metric name
|
||||
return metric
|
||||
|
||||
def increment(self, *a, **kw):
|
||||
return self.logger.increment(*a, **kw)
|
||||
def update_stats(self, metric, *a, **kw):
|
||||
return self.logger.update_stats(self.get_metric_name(metric), *a, **kw)
|
||||
|
||||
def decrement(self, *a, **kw):
|
||||
return self.logger.decrement(*a, **kw)
|
||||
def increment(self, metric, *a, **kw):
|
||||
return self.logger.increment(self.get_metric_name(metric), *a, **kw)
|
||||
|
||||
def timing(self, *a, **kw):
|
||||
return self.logger.timing(*a, **kw)
|
||||
def decrement(self, metric, *a, **kw):
|
||||
return self.logger.decrement(self.get_metric_name(metric), *a, **kw)
|
||||
|
||||
def timing_since(self, *a, **kw):
|
||||
return self.logger.timing_since(*a, **kw)
|
||||
def timing(self, metric, *a, **kw):
|
||||
return self.logger.timing(self.get_metric_name(metric), *a, **kw)
|
||||
|
||||
def transfer_rate(self, *a, **kw):
|
||||
return self.logger.transfer_rate(*a, **kw)
|
||||
def timing_since(self, metric, *a, **kw):
|
||||
return self.logger.timing_since(self.get_metric_name(metric), *a, **kw)
|
||||
|
||||
def transfer_rate(self, metric, *a, **kw):
|
||||
return self.logger.transfer_rate(
|
||||
self.get_metric_name(metric), *a, **kw)
|
||||
|
||||
@property
|
||||
def thread_locals(self):
|
||||
return self.logger.thread_locals
|
||||
|
||||
@thread_locals.setter
|
||||
def thread_locals(self, thread_locals):
|
||||
self.logger.thread_locals = thread_locals
|
||||
|
||||
def exception(self, msg, *a, **kw):
|
||||
# We up-call to exception() where stdlib uses error() so we can get
|
||||
# some of the traceback suppression from LogAdapter, below
|
||||
self.logger.exception(msg, *a, **kw)
|
||||
|
||||
|
||||
class PrefixLoggerAdapter(SwiftLoggerAdapter):
|
||||
@ -2119,9 +2137,7 @@ class PrefixLoggerAdapter(SwiftLoggerAdapter):
|
||||
def exception(self, msg, *a, **kw):
|
||||
if 'prefix' in self.extra:
|
||||
msg = self.extra['prefix'] + msg
|
||||
# We up-call to exception() where stdlib uses error() so we can get
|
||||
# some of the traceback suppression from LogAdapter, below
|
||||
self.logger.exception(msg, *a, **kw)
|
||||
super(PrefixLoggerAdapter, self).exception(msg, *a, **kw)
|
||||
|
||||
def process(self, msg, kwargs):
|
||||
msg, kwargs = super(PrefixLoggerAdapter, self).process(msg, kwargs)
|
||||
@ -2130,6 +2146,26 @@ class PrefixLoggerAdapter(SwiftLoggerAdapter):
|
||||
return (msg, kwargs)
|
||||
|
||||
|
||||
class MetricsPrefixLoggerAdapter(SwiftLoggerAdapter):
|
||||
"""
|
||||
Adds a prefix to all Statsd metrics' names.
|
||||
"""
|
||||
def __init__(self, logger, extra, metric_prefix):
|
||||
"""
|
||||
:param logger: an instance of logging.Logger
|
||||
:param extra: a dict-like object
|
||||
:param metric_prefix: A prefix that will be added to the start of each
|
||||
metric name such that the metric name is transformed to:
|
||||
``<metric_prefix>.<metric name>``. Note that the logger's
|
||||
StatsdClient also adds its configured prefix to metric names.
|
||||
"""
|
||||
super(MetricsPrefixLoggerAdapter, self).__init__(logger, extra)
|
||||
self.metric_prefix = metric_prefix
|
||||
|
||||
def get_metric_name(self, metric):
|
||||
return '%s.%s' % (self.metric_prefix, metric)
|
||||
|
||||
|
||||
# double inheritance to support property with setter
|
||||
class LogAdapter(logging.LoggerAdapter, object):
|
||||
"""
|
||||
|
@ -63,7 +63,8 @@ class AccountController(Controller):
|
||||
partition = self.app.account_ring.get_part(self.account_name)
|
||||
concurrency = self.app.account_ring.replica_count \
|
||||
if self.app.get_policy_options(None).concurrent_gets else 1
|
||||
node_iter = self.app.iter_nodes(self.app.account_ring, partition)
|
||||
node_iter = self.app.iter_nodes(self.app.account_ring, partition,
|
||||
self.logger)
|
||||
params = req.params
|
||||
params['format'] = 'json'
|
||||
req.params = params
|
||||
|
@ -46,7 +46,7 @@ from swift.common.utils import Timestamp, WatchdogTimeout, config_true_value, \
|
||||
public, split_path, list_from_csv, GreenthreadSafeIterator, \
|
||||
GreenAsyncPile, quorum_size, parse_content_type, drain_and_close, \
|
||||
document_iters_to_http_response_body, ShardRange, find_shard_range, \
|
||||
cache_from_env
|
||||
cache_from_env, MetricsPrefixLoggerAdapter
|
||||
from swift.common.bufferedhttp import http_connect
|
||||
from swift.common import constraints
|
||||
from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \
|
||||
@ -956,7 +956,7 @@ class ByteCountEnforcer(object):
|
||||
class GetOrHeadHandler(object):
|
||||
def __init__(self, app, req, server_type, node_iter, partition, path,
|
||||
backend_headers, concurrency=1, policy=None,
|
||||
client_chunk_size=None, newest=None):
|
||||
client_chunk_size=None, newest=None, logger=None):
|
||||
self.app = app
|
||||
self.node_iter = node_iter
|
||||
self.server_type = server_type
|
||||
@ -964,6 +964,7 @@ class GetOrHeadHandler(object):
|
||||
self.path = path
|
||||
self.backend_headers = backend_headers
|
||||
self.client_chunk_size = client_chunk_size
|
||||
self.logger = logger or app.logger
|
||||
self.skip_bytes = 0
|
||||
self.bytes_used_from_backend = 0
|
||||
self.used_nodes = []
|
||||
@ -1312,10 +1313,10 @@ class GetOrHeadHandler(object):
|
||||
'Trying to read during GET')
|
||||
raise
|
||||
except ChunkWriteTimeout:
|
||||
self.app.logger.info(
|
||||
self.logger.info(
|
||||
'Client did not read from proxy within %ss',
|
||||
self.app.client_timeout)
|
||||
self.app.logger.increment('client_timeouts')
|
||||
self.logger.increment('client_timeouts')
|
||||
except GeneratorExit:
|
||||
warn = True
|
||||
req_range = self.backend_headers['Range']
|
||||
@ -1327,11 +1328,11 @@ class GetOrHeadHandler(object):
|
||||
if end - begin + 1 == self.bytes_used_from_backend:
|
||||
warn = False
|
||||
if not req.environ.get('swift.non_client_disconnect') and warn:
|
||||
self.app.logger.info('Client disconnected on read of %r',
|
||||
self.path)
|
||||
self.logger.info('Client disconnected on read of %r',
|
||||
self.path)
|
||||
raise
|
||||
except Exception:
|
||||
self.app.logger.exception('Trying to send to client')
|
||||
self.logger.exception('Trying to send to client')
|
||||
raise
|
||||
finally:
|
||||
# Close-out the connection as best as possible.
|
||||
@ -1353,7 +1354,7 @@ class GetOrHeadHandler(object):
|
||||
return None
|
||||
|
||||
def _make_node_request(self, node, node_timeout, logger_thread_locals):
|
||||
self.app.logger.thread_locals = logger_thread_locals
|
||||
self.logger.thread_locals = logger_thread_locals
|
||||
if node in self.used_nodes:
|
||||
return False
|
||||
req_headers = dict(self.backend_headers)
|
||||
@ -1473,7 +1474,7 @@ class GetOrHeadHandler(object):
|
||||
|
||||
for node in nodes:
|
||||
pile.spawn(self._make_node_request, node, node_timeout,
|
||||
self.app.logger.thread_locals)
|
||||
self.logger.thread_locals)
|
||||
_timeout = self.app.get_policy_options(
|
||||
self.policy).concurrency_timeout \
|
||||
if pile.inflight < self.concurrency else None
|
||||
@ -1543,7 +1544,7 @@ class GetOrHeadHandler(object):
|
||||
|
||||
return document_iters_to_http_response_body(
|
||||
(add_content_type(pi) for pi in parts_iter),
|
||||
boundary, is_multipart, self.app.logger)
|
||||
boundary, is_multipart, self.logger)
|
||||
|
||||
def get_working_response(self, req):
|
||||
source, node = self._get_source_and_node()
|
||||
@ -1589,9 +1590,11 @@ class NodeIter(object):
|
||||
want to filter or reorder the nodes.
|
||||
:param policy: an instance of :class:`BaseStoragePolicy`. This should be
|
||||
None for an account or container ring.
|
||||
:param logger: a logger instance; defaults to the app logger
|
||||
"""
|
||||
|
||||
def __init__(self, app, ring, partition, node_iter=None, policy=None):
|
||||
def __init__(self, app, ring, partition, node_iter=None, policy=None,
|
||||
logger=None):
|
||||
self.app = app
|
||||
self.ring = ring
|
||||
self.partition = partition
|
||||
@ -1611,6 +1614,7 @@ class NodeIter(object):
|
||||
policy=policy)
|
||||
self.handoff_iter = node_iter
|
||||
self._node_provider = None
|
||||
self.logger = logger or self.app.logger
|
||||
|
||||
@property
|
||||
def primaries_left(self):
|
||||
@ -1636,12 +1640,12 @@ class NodeIter(object):
|
||||
return
|
||||
extra_handoffs = handoffs - self.expected_handoffs
|
||||
if extra_handoffs > 0:
|
||||
self.app.logger.increment('handoff_count')
|
||||
self.app.logger.warning(
|
||||
self.logger.increment('handoff_count')
|
||||
self.logger.warning(
|
||||
'Handoff requested (%d)' % handoffs)
|
||||
if (extra_handoffs == self.num_primary_nodes):
|
||||
# all the primaries were skipped, and handoffs didn't help
|
||||
self.app.logger.increment('handoff_all_count')
|
||||
self.logger.increment('handoff_all_count')
|
||||
|
||||
def set_node_provider(self, callback):
|
||||
"""
|
||||
@ -1704,6 +1708,9 @@ class Controller(object):
|
||||
self.trans_id = '-'
|
||||
self._allowed_methods = None
|
||||
self._private_methods = None
|
||||
# adapt the app logger to prefix statsd metrics with the server type
|
||||
self.logger = MetricsPrefixLoggerAdapter(
|
||||
self.app.logger, {}, self.server_type.lower())
|
||||
|
||||
@property
|
||||
def allowed_methods(self):
|
||||
@ -1857,11 +1864,11 @@ class Controller(object):
|
||||
:param body: byte string to use as the request body.
|
||||
Try to keep it small.
|
||||
:param logger_thread_locals: The thread local values to be set on the
|
||||
self.app.logger to retain transaction
|
||||
self.logger to retain transaction
|
||||
logging information.
|
||||
:returns: a swob.Response object, or None if no responses were received
|
||||
"""
|
||||
self.app.logger.thread_locals = logger_thread_locals
|
||||
self.logger.thread_locals = logger_thread_locals
|
||||
if body:
|
||||
if not isinstance(body, bytes):
|
||||
raise TypeError('body must be bytes, not %s' % type(body))
|
||||
@ -1926,14 +1933,14 @@ class Controller(object):
|
||||
:returns: a swob.Response object
|
||||
"""
|
||||
nodes = GreenthreadSafeIterator(
|
||||
node_iterator or self.app.iter_nodes(ring, part)
|
||||
node_iterator or self.app.iter_nodes(ring, part, self.logger)
|
||||
)
|
||||
node_number = node_count or len(ring.get_part_nodes(part))
|
||||
pile = GreenAsyncPile(node_number)
|
||||
|
||||
for head in headers:
|
||||
pile.spawn(self._make_request, nodes, part, method, path,
|
||||
head, query_string, body, self.app.logger.thread_locals)
|
||||
head, query_string, body, self.logger.thread_locals)
|
||||
response = []
|
||||
statuses = []
|
||||
for resp in pile:
|
||||
@ -2026,8 +2033,8 @@ class Controller(object):
|
||||
|
||||
if not resp:
|
||||
resp = HTTPServiceUnavailable(request=req)
|
||||
self.app.logger.error('%(type)s returning 503 for %(statuses)s',
|
||||
{'type': server_type, 'statuses': statuses})
|
||||
self.logger.error('%(type)s returning 503 for %(statuses)s',
|
||||
{'type': server_type, 'statuses': statuses})
|
||||
|
||||
return resp
|
||||
|
||||
@ -2100,12 +2107,11 @@ class Controller(object):
|
||||
self.app.account_ring, partition, 'PUT',
|
||||
path, [headers] * len(nodes))
|
||||
if is_success(resp.status_int):
|
||||
self.app.logger.info('autocreate account %r', path)
|
||||
self.logger.info('autocreate account %r', path)
|
||||
clear_info_cache(self.app, req.environ, account)
|
||||
return True
|
||||
else:
|
||||
self.app.logger.warning('Could not autocreate account %r',
|
||||
path)
|
||||
self.logger.warning('Could not autocreate account %r', path)
|
||||
return False
|
||||
|
||||
def GETorHEAD_base(self, req, server_type, node_iter, partition, path,
|
||||
@ -2129,7 +2135,8 @@ class Controller(object):
|
||||
handler = GetOrHeadHandler(self.app, req, self.server_type, node_iter,
|
||||
partition, path, backend_headers,
|
||||
concurrency, policy=policy,
|
||||
client_chunk_size=client_chunk_size)
|
||||
client_chunk_size=client_chunk_size,
|
||||
logger=self.logger)
|
||||
res = handler.get_working_response(req)
|
||||
|
||||
if not res:
|
||||
@ -2148,7 +2155,7 @@ class Controller(object):
|
||||
if policy:
|
||||
res.headers['X-Storage-Policy'] = policy.name
|
||||
else:
|
||||
self.app.logger.error(
|
||||
self.logger.error(
|
||||
'Could not translate %s (%r) from %r to policy',
|
||||
'X-Backend-Storage-Policy-Index',
|
||||
res.headers['X-Backend-Storage-Policy-Index'], path)
|
||||
@ -2260,7 +2267,7 @@ class Controller(object):
|
||||
|
||||
def _parse_listing_response(self, req, response):
|
||||
if not is_success(response.status_int):
|
||||
self.app.logger.warning(
|
||||
self.logger.warning(
|
||||
'Failed to get container listing from %s: %s',
|
||||
req.path_qs, response.status_int)
|
||||
return None
|
||||
@ -2271,7 +2278,7 @@ class Controller(object):
|
||||
raise ValueError('not a list')
|
||||
return data
|
||||
except ValueError as err:
|
||||
self.app.logger.error(
|
||||
self.logger.error(
|
||||
'Problem with listing response from %s: %r',
|
||||
req.path_qs, err)
|
||||
return None
|
||||
@ -2300,7 +2307,7 @@ class Controller(object):
|
||||
if headers:
|
||||
subreq.headers.update(headers)
|
||||
subreq.params = params
|
||||
self.app.logger.debug(
|
||||
self.logger.debug(
|
||||
'Get listing from %s %s' % (subreq.path_qs, headers))
|
||||
response = self.app.handle_request(subreq)
|
||||
data = self._parse_listing_response(req, response)
|
||||
@ -2313,15 +2320,15 @@ class Controller(object):
|
||||
record_type = response.headers.get('x-backend-record-type')
|
||||
if record_type != 'shard':
|
||||
err = 'unexpected record type %r' % record_type
|
||||
self.app.logger.error("Failed to get shard ranges from %s: %s",
|
||||
req.path_qs, err)
|
||||
self.logger.error("Failed to get shard ranges from %s: %s",
|
||||
req.path_qs, err)
|
||||
return None
|
||||
|
||||
try:
|
||||
return [ShardRange.from_dict(shard_range)
|
||||
for shard_range in listing]
|
||||
except (ValueError, TypeError, KeyError) as err:
|
||||
self.app.logger.error(
|
||||
self.logger.error(
|
||||
"Failed to get shard ranges from %s: invalid data: %r",
|
||||
req.path_qs, err)
|
||||
return None
|
||||
@ -2373,7 +2380,7 @@ class Controller(object):
|
||||
# caching is disabled; fall back to old behavior
|
||||
shard_ranges, response = self._get_shard_ranges(
|
||||
req, account, container, states='updating', includes=obj)
|
||||
self.app.logger.increment(
|
||||
self.logger.increment(
|
||||
'shard_updating.backend.%s' % response.status_int)
|
||||
if not shard_ranges:
|
||||
return None
|
||||
@ -2388,10 +2395,10 @@ class Controller(object):
|
||||
skip_chance = \
|
||||
self.app.container_updating_shard_ranges_skip_cache
|
||||
if skip_chance and random.random() < skip_chance:
|
||||
self.app.logger.increment('shard_updating.cache.skip')
|
||||
self.logger.increment('shard_updating.cache.skip')
|
||||
else:
|
||||
cached_ranges = memcache.get(cache_key)
|
||||
self.app.logger.increment('shard_updating.cache.%s' % (
|
||||
self.logger.increment('shard_updating.cache.%s' % (
|
||||
'hit' if cached_ranges else 'miss'))
|
||||
|
||||
if cached_ranges:
|
||||
@ -2401,7 +2408,7 @@ class Controller(object):
|
||||
else:
|
||||
shard_ranges, response = self._get_shard_ranges(
|
||||
req, account, container, states='updating')
|
||||
self.app.logger.increment(
|
||||
self.logger.increment(
|
||||
'shard_updating.backend.%s' % response.status_int)
|
||||
if shard_ranges:
|
||||
cached_ranges = [dict(sr) for sr in shard_ranges]
|
||||
|
@ -101,7 +101,8 @@ class ContainerController(Controller):
|
||||
self.account_name, self.container_name)
|
||||
concurrency = self.app.container_ring.replica_count \
|
||||
if self.app.get_policy_options(None).concurrent_gets else 1
|
||||
node_iter = self.app.iter_nodes(self.app.container_ring, part)
|
||||
node_iter = self.app.iter_nodes(self.app.container_ring, part,
|
||||
self.logger)
|
||||
resp = self.GETorHEAD_base(
|
||||
req, 'Container', node_iter, part,
|
||||
req.swift_entity_path, concurrency)
|
||||
@ -131,7 +132,7 @@ class ContainerController(Controller):
|
||||
# X-Newest is true then we always fetch from the backend servers.
|
||||
get_newest = config_true_value(req.headers.get('x-newest', False))
|
||||
if get_newest:
|
||||
self.app.logger.debug(
|
||||
self.logger.debug(
|
||||
'Skipping shard cache lookup (x-newest) for %s', req.path_qs)
|
||||
info = None
|
||||
else:
|
||||
@ -154,17 +155,17 @@ class ContainerController(Controller):
|
||||
skip_chance = \
|
||||
self.app.container_listing_shard_ranges_skip_cache
|
||||
if skip_chance and random.random() < skip_chance:
|
||||
self.app.logger.increment('shard_listing.cache.skip')
|
||||
self.logger.increment('shard_listing.cache.skip')
|
||||
else:
|
||||
cached_ranges = memcache.get(cache_key)
|
||||
self.app.logger.increment('shard_listing.cache.%s' % (
|
||||
self.logger.increment('shard_listing.cache.%s' % (
|
||||
'hit' if cached_ranges else 'miss'))
|
||||
|
||||
if cached_ranges is not None:
|
||||
infocache[cache_key] = tuple(cached_ranges)
|
||||
# shard ranges can be returned from cache
|
||||
self.app.logger.debug('Found %d shards in cache for %s',
|
||||
len(cached_ranges), req.path_qs)
|
||||
self.logger.debug('Found %d shards in cache for %s',
|
||||
len(cached_ranges), req.path_qs)
|
||||
headers.update({'x-backend-record-type': 'shard',
|
||||
'x-backend-cached-results': 'true'})
|
||||
shard_range_body = self._filter_resp_shard_ranges(
|
||||
@ -224,8 +225,8 @@ class ContainerController(Controller):
|
||||
memcache = cache_from_env(req.environ, True)
|
||||
if memcache and cached_ranges:
|
||||
# cache in memcache only if shard ranges as expected
|
||||
self.app.logger.debug('Caching %d shards for %s',
|
||||
len(cached_ranges), req.path_qs)
|
||||
self.logger.debug('Caching %d shards for %s',
|
||||
len(cached_ranges), req.path_qs)
|
||||
memcache.set(
|
||||
cache_key, cached_ranges,
|
||||
time=self.app.recheck_listing_shard_ranges)
|
||||
@ -340,8 +341,8 @@ class ContainerController(Controller):
|
||||
shard_listing_history.append((self.account_name, self.container_name))
|
||||
shard_ranges = [ShardRange.from_dict(data)
|
||||
for data in json.loads(resp.body)]
|
||||
self.app.logger.debug('GET listing from %s shards for: %s',
|
||||
len(shard_ranges), req.path_qs)
|
||||
self.logger.debug('GET listing from %s shards for: %s',
|
||||
len(shard_ranges), req.path_qs)
|
||||
if not shard_ranges:
|
||||
# can't find ranges or there was a problem getting the ranges. So
|
||||
# return what we have.
|
||||
@ -404,7 +405,7 @@ class ContainerController(Controller):
|
||||
if just_past < shard_range:
|
||||
continue
|
||||
|
||||
self.app.logger.debug(
|
||||
self.logger.debug(
|
||||
'Getting listing part %d from shard %s %s with %s',
|
||||
i, shard_range, shard_range.name, headers)
|
||||
objs, shard_resp = self._get_container_listing(
|
||||
@ -417,7 +418,7 @@ class ContainerController(Controller):
|
||||
|
||||
if objs is None:
|
||||
# give up if any non-success response from shard containers
|
||||
self.app.logger.error(
|
||||
self.logger.error(
|
||||
'Aborting listing from shards due to bad response: %r'
|
||||
% all_resp_status)
|
||||
return HTTPServiceUnavailable(request=req)
|
||||
@ -426,12 +427,12 @@ class ContainerController(Controller):
|
||||
shard_resp.headers[policy_key]
|
||||
)
|
||||
if shard_policy != req.headers[policy_key]:
|
||||
self.app.logger.error(
|
||||
self.logger.error(
|
||||
'Aborting listing from shards due to bad shard policy '
|
||||
'index: %s (expected %s)',
|
||||
shard_policy, req.headers[policy_key])
|
||||
return HTTPServiceUnavailable(request=req)
|
||||
self.app.logger.debug(
|
||||
self.logger.debug(
|
||||
'Found %d objects in shard (state=%s), total = %d',
|
||||
len(objs), sharding_state, len(objs) + len(objects))
|
||||
|
||||
@ -457,7 +458,7 @@ class ContainerController(Controller):
|
||||
constrained = any(req.params.get(constraint) for constraint in (
|
||||
'marker', 'end_marker', 'path', 'prefix', 'delimiter'))
|
||||
if not constrained and len(objects) < req_limit:
|
||||
self.app.logger.debug('Setting object count to %s' % len(objects))
|
||||
self.logger.debug('Setting object count to %s' % len(objects))
|
||||
# prefer the actual listing stats over the potentially outdated
|
||||
# root stats. This condition is only likely when a sharded
|
||||
# container is shrinking or in tests; typically a sharded container
|
||||
|
@ -196,7 +196,8 @@ class BaseObjectController(Controller):
|
||||
policy_options = self.app.get_policy_options(policy)
|
||||
is_local = policy_options.write_affinity_is_local_fn
|
||||
if is_local is None:
|
||||
return self.app.iter_nodes(ring, partition, policy=policy)
|
||||
return self.app.iter_nodes(ring, partition, self.logger,
|
||||
policy=policy)
|
||||
|
||||
primary_nodes = ring.get_part_nodes(partition)
|
||||
handoff_nodes = ring.get_more_nodes(partition)
|
||||
@ -229,8 +230,8 @@ class BaseObjectController(Controller):
|
||||
(node for node in all_nodes if node not in preferred_nodes)
|
||||
)
|
||||
|
||||
return self.app.iter_nodes(ring, partition, node_iter=node_iter,
|
||||
policy=policy)
|
||||
return self.app.iter_nodes(ring, partition, self.logger,
|
||||
node_iter=node_iter, policy=policy)
|
||||
|
||||
def GETorHEAD(self, req):
|
||||
"""Handle HTTP GET or HEAD requests."""
|
||||
@ -249,7 +250,8 @@ class BaseObjectController(Controller):
|
||||
return aresp
|
||||
partition = obj_ring.get_part(
|
||||
self.account_name, self.container_name, self.object_name)
|
||||
node_iter = self.app.iter_nodes(obj_ring, partition, policy=policy)
|
||||
node_iter = self.app.iter_nodes(obj_ring, partition, self.logger,
|
||||
policy=policy)
|
||||
|
||||
resp = self._get_or_head_response(req, node_iter, partition, policy)
|
||||
|
||||
@ -409,7 +411,7 @@ class BaseObjectController(Controller):
|
||||
|
||||
def _get_conn_response(self, putter, path, logger_thread_locals,
|
||||
final_phase, **kwargs):
|
||||
self.app.logger.thread_locals = logger_thread_locals
|
||||
self.logger.thread_locals = logger_thread_locals
|
||||
try:
|
||||
resp = putter.await_response(
|
||||
self.app.node_timeout, not final_phase)
|
||||
@ -466,7 +468,7 @@ class BaseObjectController(Controller):
|
||||
if putter.failed:
|
||||
continue
|
||||
pile.spawn(self._get_conn_response, putter, req.path,
|
||||
self.app.logger.thread_locals, final_phase=final_phase)
|
||||
self.logger.thread_locals, final_phase=final_phase)
|
||||
|
||||
def _handle_response(putter, response):
|
||||
statuses.append(response.status)
|
||||
@ -563,7 +565,7 @@ class BaseObjectController(Controller):
|
||||
putter.resp.status for putter in putters if putter.resp]
|
||||
if HTTP_PRECONDITION_FAILED in statuses:
|
||||
# If we find any copy of the file, it shouldn't be uploaded
|
||||
self.app.logger.debug(
|
||||
self.logger.debug(
|
||||
'Object PUT returning 412, %(statuses)r',
|
||||
{'statuses': statuses})
|
||||
raise HTTPPreconditionFailed(request=req)
|
||||
@ -576,7 +578,7 @@ class BaseObjectController(Controller):
|
||||
putter.resp.getheaders()).get(
|
||||
'X-Backend-Timestamp', 'unknown')
|
||||
} for putter in putters if putter.resp]
|
||||
self.app.logger.debug(
|
||||
self.logger.debug(
|
||||
'Object PUT returning 202 for 409: '
|
||||
'%(req_timestamp)s <= %(timestamps)r',
|
||||
{'req_timestamp': req.timestamp.internal,
|
||||
@ -614,11 +616,11 @@ class BaseObjectController(Controller):
|
||||
:param req: a swob Request
|
||||
:param headers: request headers
|
||||
:param logger_thread_locals: The thread local values to be set on the
|
||||
self.app.logger to retain transaction
|
||||
self.logger to retain transaction
|
||||
logging information.
|
||||
:return: an instance of a Putter
|
||||
"""
|
||||
self.app.logger.thread_locals = logger_thread_locals
|
||||
self.logger.thread_locals = logger_thread_locals
|
||||
for node in nodes:
|
||||
try:
|
||||
putter = self._make_putter(node, part, req, headers)
|
||||
@ -653,7 +655,7 @@ class BaseObjectController(Controller):
|
||||
del nheaders['Content-Length']
|
||||
nheaders['Expect'] = '100-continue'
|
||||
pile.spawn(self._connect_put_node, node_iter, partition,
|
||||
req, nheaders, self.app.logger.thread_locals)
|
||||
req, nheaders, self.logger.thread_locals)
|
||||
|
||||
putters = [putter for putter in pile if putter]
|
||||
|
||||
@ -664,8 +666,7 @@ class BaseObjectController(Controller):
|
||||
'required connections')
|
||||
|
||||
if len(putters) < min_conns:
|
||||
self.app.logger.error((msg),
|
||||
{'conns': len(putters), 'nodes': min_conns})
|
||||
self.logger.error(msg, {'conns': len(putters), 'nodes': min_conns})
|
||||
raise HTTPServiceUnavailable(request=req)
|
||||
|
||||
def _get_footers(self, req):
|
||||
@ -874,7 +875,7 @@ class ReplicatedObjectController(BaseObjectController):
|
||||
node_timeout=self.app.node_timeout,
|
||||
write_timeout=self.app.node_timeout,
|
||||
send_exception_handler=self.app.exception_occurred,
|
||||
logger=self.app.logger,
|
||||
logger=self.logger,
|
||||
need_multiphase=False)
|
||||
else:
|
||||
te = ',' + headers.get('Transfer-Encoding', '')
|
||||
@ -884,7 +885,7 @@ class ReplicatedObjectController(BaseObjectController):
|
||||
node_timeout=self.app.node_timeout,
|
||||
write_timeout=self.app.node_timeout,
|
||||
send_exception_handler=self.app.exception_occurred,
|
||||
logger=self.app.logger,
|
||||
logger=self.logger,
|
||||
chunked=te.endswith(',chunked'))
|
||||
return putter
|
||||
|
||||
@ -927,9 +928,9 @@ class ReplicatedObjectController(BaseObjectController):
|
||||
|
||||
ml = req.message_length()
|
||||
if ml and bytes_transferred < ml:
|
||||
self.app.logger.warning(
|
||||
self.logger.warning(
|
||||
'Client disconnected without sending enough data')
|
||||
self.app.logger.increment('client_disconnects')
|
||||
self.logger.increment('client_disconnects')
|
||||
raise HTTPClientDisconnect(request=req)
|
||||
|
||||
trail_md = self._get_footers(req)
|
||||
@ -942,23 +943,23 @@ class ReplicatedObjectController(BaseObjectController):
|
||||
msg='Object PUT exceptions after last send, '
|
||||
'%(conns)s/%(nodes)s required connections')
|
||||
except ChunkReadTimeout as err:
|
||||
self.app.logger.warning(
|
||||
self.logger.warning(
|
||||
'ERROR Client read timeout (%ss)', err.seconds)
|
||||
self.app.logger.increment('client_timeouts')
|
||||
self.logger.increment('client_timeouts')
|
||||
raise HTTPRequestTimeout(request=req)
|
||||
except HTTPException:
|
||||
raise
|
||||
except ChunkReadError:
|
||||
self.app.logger.warning(
|
||||
self.logger.warning(
|
||||
'Client disconnected without sending last chunk')
|
||||
self.app.logger.increment('client_disconnects')
|
||||
self.logger.increment('client_disconnects')
|
||||
raise HTTPClientDisconnect(request=req)
|
||||
except Timeout:
|
||||
self.app.logger.exception(
|
||||
self.logger.exception(
|
||||
'ERROR Exception causing client disconnect')
|
||||
raise HTTPClientDisconnect(request=req)
|
||||
except Exception:
|
||||
self.app.logger.exception(
|
||||
self.logger.exception(
|
||||
'ERROR Exception transferring data to object servers %s',
|
||||
{'path': req.path})
|
||||
raise HTTPInternalServerError(request=req)
|
||||
@ -1002,7 +1003,7 @@ class ReplicatedObjectController(BaseObjectController):
|
||||
putter.close()
|
||||
|
||||
if len(etags) > 1:
|
||||
self.app.logger.error(
|
||||
self.logger.error(
|
||||
'Object servers returned %s mismatched etags', len(etags))
|
||||
return HTTPServerError(request=req)
|
||||
etag = etags.pop() if len(etags) else None
|
||||
@ -2376,7 +2377,8 @@ def is_good_source(status):
|
||||
class ECFragGetter(object):
|
||||
|
||||
def __init__(self, app, req, node_iter, partition, policy, path,
|
||||
backend_headers, header_provider, logger_thread_locals):
|
||||
backend_headers, header_provider, logger_thread_locals,
|
||||
logger):
|
||||
self.app = app
|
||||
self.req = req
|
||||
self.node_iter = node_iter
|
||||
@ -2390,6 +2392,7 @@ class ECFragGetter(object):
|
||||
self.bytes_used_from_backend = 0
|
||||
self.source = None
|
||||
self.logger_thread_locals = logger_thread_locals
|
||||
self.logger = logger
|
||||
|
||||
def fast_forward(self, num_bytes):
|
||||
"""
|
||||
@ -2574,7 +2577,7 @@ class ECFragGetter(object):
|
||||
try:
|
||||
self.fast_forward(self.bytes_used_from_backend)
|
||||
except (HTTPException, ValueError):
|
||||
self.app.logger.exception('Unable to fast forward')
|
||||
self.logger.exception('Unable to fast forward')
|
||||
six.reraise(exc_type, exc_value, exc_traceback)
|
||||
except RangeAlreadyComplete:
|
||||
break
|
||||
@ -2701,10 +2704,10 @@ class ECFragGetter(object):
|
||||
'Trying to read during GET')
|
||||
raise
|
||||
except ChunkWriteTimeout:
|
||||
self.app.logger.warning(
|
||||
self.logger.warning(
|
||||
'Client did not read from proxy within %ss' %
|
||||
self.app.client_timeout)
|
||||
self.app.logger.increment('client_timeouts')
|
||||
self.logger.increment('client_timeouts')
|
||||
except GeneratorExit:
|
||||
warn = True
|
||||
req_range = self.backend_headers['Range']
|
||||
@ -2716,11 +2719,11 @@ class ECFragGetter(object):
|
||||
if end - begin + 1 == self.bytes_used_from_backend:
|
||||
warn = False
|
||||
if not req.environ.get('swift.non_client_disconnect') and warn:
|
||||
self.app.logger.warning(
|
||||
self.logger.warning(
|
||||
'Client disconnected on read of EC frag %r', self.path)
|
||||
raise
|
||||
except Exception:
|
||||
self.app.logger.exception('Trying to send to client')
|
||||
self.logger.exception('Trying to send to client')
|
||||
raise
|
||||
finally:
|
||||
# Close-out the connection as best as possible.
|
||||
@ -2739,7 +2742,7 @@ class ECFragGetter(object):
|
||||
return HeaderKeyDict()
|
||||
|
||||
def _make_node_request(self, node, node_timeout):
|
||||
self.app.logger.thread_locals = self.logger_thread_locals
|
||||
self.logger.thread_locals = self.logger_thread_locals
|
||||
req_headers = dict(self.backend_headers)
|
||||
ip, port = get_ip_port(node, req_headers)
|
||||
req_headers.update(self.header_provider())
|
||||
@ -2774,8 +2777,8 @@ class ECFragGetter(object):
|
||||
not Timestamp(src_headers.get('x-backend-timestamp', 0)):
|
||||
# throw out 5XX and 404s from handoff nodes unless the data is
|
||||
# really on disk and had been DELETEd
|
||||
self.app.logger.debug('Ignoring %s from handoff' %
|
||||
possible_source.status)
|
||||
self.logger.debug('Ignoring %s from handoff' %
|
||||
possible_source.status)
|
||||
conn.close()
|
||||
return None
|
||||
|
||||
@ -2798,7 +2801,7 @@ class ECFragGetter(object):
|
||||
{'status': possible_source.status,
|
||||
'body': self.body[:1024]})
|
||||
else:
|
||||
self.app.logger.debug(
|
||||
self.logger.debug(
|
||||
'Ignoring %s from primary' % possible_source.status)
|
||||
|
||||
return None
|
||||
@ -2830,7 +2833,7 @@ class ECFragGetter(object):
|
||||
# _make_node_request only returns good sources
|
||||
continue
|
||||
if source.getheader('X-Object-Sysmeta-EC-ETag') != used_etag:
|
||||
self.app.logger.warning(
|
||||
self.logger.warning(
|
||||
'Skipping source (etag mismatch: got %s, expected %s)',
|
||||
source.getheader('X-Object-Sysmeta-EC-ETag'), used_etag)
|
||||
else:
|
||||
@ -2846,13 +2849,14 @@ class ECObjectController(BaseObjectController):
|
||||
"""
|
||||
Makes a GET request for a fragment.
|
||||
"""
|
||||
self.app.logger.thread_locals = logger_thread_locals
|
||||
self.logger.thread_locals = logger_thread_locals
|
||||
backend_headers = self.generate_request_headers(
|
||||
req, additional=req.headers)
|
||||
|
||||
getter = ECFragGetter(self.app, req, node_iter, partition,
|
||||
policy, req.swift_entity_path, backend_headers,
|
||||
header_provider, logger_thread_locals)
|
||||
header_provider, logger_thread_locals,
|
||||
self.logger)
|
||||
return (getter, getter.response_parts_iter(req))
|
||||
|
||||
def _convert_range(self, req, policy):
|
||||
@ -2965,14 +2969,14 @@ class ECObjectController(BaseObjectController):
|
||||
pile.spawn(self._fragment_GET_request,
|
||||
req, safe_iter, partition,
|
||||
policy, buckets.get_extra_headers,
|
||||
self.app.logger.thread_locals)
|
||||
self.logger.thread_locals)
|
||||
|
||||
feeder_q = None
|
||||
if policy_options.concurrent_gets:
|
||||
feeder_q = Queue()
|
||||
pool.spawn(self.feed_remaining_primaries, safe_iter, pile, req,
|
||||
partition, policy, buckets, feeder_q,
|
||||
self.app.logger.thread_locals)
|
||||
self.logger.thread_locals)
|
||||
|
||||
extra_requests = 0
|
||||
# max_extra_requests is an arbitrary hard limit for spawning extra
|
||||
@ -2987,7 +2991,7 @@ class ECObjectController(BaseObjectController):
|
||||
try:
|
||||
buckets.add_response(get, parts_iter)
|
||||
except ValueError as err:
|
||||
self.app.logger.error(
|
||||
self.logger.error(
|
||||
"Problem with fragment response: %s", err)
|
||||
best_bucket = buckets.best_bucket
|
||||
if best_bucket.durable and best_bucket.shortfall <= 0:
|
||||
@ -3001,7 +3005,7 @@ class ECObjectController(BaseObjectController):
|
||||
extra_requests += 1
|
||||
pile.spawn(self._fragment_GET_request, req, safe_iter,
|
||||
partition, policy, buckets.get_extra_headers,
|
||||
self.app.logger.thread_locals)
|
||||
self.logger.thread_locals)
|
||||
if feeder_q:
|
||||
feeder_q.put('stop')
|
||||
|
||||
@ -3024,7 +3028,7 @@ class ECObjectController(BaseObjectController):
|
||||
policy,
|
||||
[p_iter for _getter, p_iter in best_bucket.get_responses()],
|
||||
range_specs, fa_length, obj_length,
|
||||
self.app.logger)
|
||||
self.logger)
|
||||
resp = Response(
|
||||
request=req,
|
||||
conditional_response=True,
|
||||
@ -3146,7 +3150,7 @@ class ECObjectController(BaseObjectController):
|
||||
node_timeout=self.app.node_timeout,
|
||||
write_timeout=self.app.node_timeout,
|
||||
send_exception_handler=self.app.exception_occurred,
|
||||
logger=self.app.logger,
|
||||
logger=self.logger,
|
||||
need_multiphase=True)
|
||||
|
||||
def _determine_chunk_destinations(self, putters, policy):
|
||||
@ -3285,9 +3289,9 @@ class ECObjectController(BaseObjectController):
|
||||
|
||||
ml = req.message_length()
|
||||
if ml and bytes_transferred < ml:
|
||||
self.app.logger.warning(
|
||||
self.logger.warning(
|
||||
'Client disconnected without sending enough data')
|
||||
self.app.logger.increment('client_disconnects')
|
||||
self.logger.increment('client_disconnects')
|
||||
raise HTTPClientDisconnect(request=req)
|
||||
|
||||
send_chunk(b'') # flush out any buffered data
|
||||
@ -3327,7 +3331,7 @@ class ECObjectController(BaseObjectController):
|
||||
min_responses=min_conns)
|
||||
if not self.have_quorum(
|
||||
statuses, len(nodes), quorum=min_conns):
|
||||
self.app.logger.error(
|
||||
self.logger.error(
|
||||
'Not enough object servers ack\'ed (got %d)',
|
||||
statuses.count(HTTP_CONTINUE))
|
||||
raise HTTPServiceUnavailable(request=req)
|
||||
@ -3355,23 +3359,23 @@ class ECObjectController(BaseObjectController):
|
||||
for putter in putters:
|
||||
putter.send_commit_confirmation()
|
||||
except ChunkReadTimeout as err:
|
||||
self.app.logger.warning(
|
||||
self.logger.warning(
|
||||
'ERROR Client read timeout (%ss)', err.seconds)
|
||||
self.app.logger.increment('client_timeouts')
|
||||
self.logger.increment('client_timeouts')
|
||||
raise HTTPRequestTimeout(request=req)
|
||||
except ChunkReadError:
|
||||
self.app.logger.warning(
|
||||
self.logger.warning(
|
||||
'Client disconnected without sending last chunk')
|
||||
self.app.logger.increment('client_disconnects')
|
||||
self.logger.increment('client_disconnects')
|
||||
raise HTTPClientDisconnect(request=req)
|
||||
except HTTPException:
|
||||
raise
|
||||
except Timeout:
|
||||
self.app.logger.exception(
|
||||
self.logger.exception(
|
||||
'ERROR Exception causing client disconnect')
|
||||
raise HTTPClientDisconnect(request=req)
|
||||
except Exception:
|
||||
self.app.logger.exception(
|
||||
self.logger.exception(
|
||||
'ERROR Exception transferring data to object servers %s',
|
||||
{'path': req.path})
|
||||
raise HTTPInternalServerError(request=req)
|
||||
|
@ -198,6 +198,7 @@ class Application(object):
|
||||
self.logger = get_logger(conf, log_route='proxy-server')
|
||||
else:
|
||||
self.logger = logger
|
||||
self.logger.set_statsd_prefix('proxy-server')
|
||||
self._error_limiting = {}
|
||||
|
||||
swift_dir = conf.get('swift_dir', '/etc/swift')
|
||||
@ -514,7 +515,6 @@ class Application(object):
|
||||
:param req: swob.Request object
|
||||
"""
|
||||
try:
|
||||
self.logger.set_statsd_prefix('proxy-server')
|
||||
if req.content_length and req.content_length < 0:
|
||||
self.logger.increment('errors')
|
||||
return HTTPBadRequest(request=req,
|
||||
@ -546,8 +546,6 @@ class Application(object):
|
||||
req.host.split(':')[0] in self.deny_host_headers:
|
||||
return HTTPForbidden(request=req, body='Invalid host header')
|
||||
|
||||
self.logger.set_statsd_prefix('proxy-server.' +
|
||||
controller.server_type.lower())
|
||||
controller = controller(self, **path_parts)
|
||||
if 'swift.trans_id' not in req.environ:
|
||||
# if this wasn't set by an earlier middleware, set it now
|
||||
@ -702,9 +700,9 @@ class Application(object):
|
||||
'msg': msg, 'ip': node['ip'],
|
||||
'port': node['port'], 'device': node['device']})
|
||||
|
||||
def iter_nodes(self, ring, partition, node_iter=None, policy=None):
|
||||
def iter_nodes(self, ring, partition, logger, node_iter=None, policy=None):
|
||||
return NodeIter(self, ring, partition, node_iter=node_iter,
|
||||
policy=policy)
|
||||
policy=policy, logger=logger)
|
||||
|
||||
def exception_occurred(self, node, typ, additional_info,
|
||||
**kwargs):
|
||||
|
@ -34,7 +34,8 @@ class FakeApp(object):
|
||||
def __call__(self, env, start_response):
|
||||
if env.get('PATH_INFO') == '/info':
|
||||
controller = InfoController(
|
||||
app=None, version=None, expose_info=True,
|
||||
app=mock.Mock(logger=debug_logger()),
|
||||
version=None, expose_info=True,
|
||||
disallowed_sections=[], admin_key=None)
|
||||
handler = getattr(controller, env.get('REQUEST_METHOD'))
|
||||
return handler(swob.Request(env))(env, start_response)
|
||||
|
@ -6344,6 +6344,71 @@ class TestStatsdLoggingDelegation(unittest.TestCase):
|
||||
self.assertEqual(called, [12345])
|
||||
|
||||
|
||||
class TestSwiftLoggerAdapter(unittest.TestCase):
|
||||
@reset_logger_state
|
||||
def test_thread_locals(self):
|
||||
logger = utils.get_logger({}, 'foo')
|
||||
adapter1 = utils.SwiftLoggerAdapter(logger, {})
|
||||
adapter2 = utils.SwiftLoggerAdapter(logger, {})
|
||||
locals1 = ('tx_123', '1.2.3.4')
|
||||
adapter1.thread_locals = locals1
|
||||
self.assertEqual(adapter1.thread_locals, locals1)
|
||||
self.assertEqual(adapter2.thread_locals, locals1)
|
||||
self.assertEqual(logger.thread_locals, locals1)
|
||||
|
||||
locals2 = ('tx_456', '1.2.3.456')
|
||||
logger.thread_locals = locals2
|
||||
self.assertEqual(adapter1.thread_locals, locals2)
|
||||
self.assertEqual(adapter2.thread_locals, locals2)
|
||||
self.assertEqual(logger.thread_locals, locals2)
|
||||
logger.thread_locals = (None, None)
|
||||
|
||||
def test_exception(self):
|
||||
# verify that the adapter routes exception calls to utils.LogAdapter
|
||||
# for special case handling
|
||||
logger = utils.get_logger({})
|
||||
adapter = utils.SwiftLoggerAdapter(logger, {})
|
||||
try:
|
||||
raise OSError(errno.ECONNREFUSED, 'oserror')
|
||||
except OSError:
|
||||
with mock.patch('logging.LoggerAdapter.error') as mocked:
|
||||
adapter.exception('Caught')
|
||||
mocked.assert_called_with('Caught: Connection refused')
|
||||
|
||||
|
||||
class TestMetricsPrefixLoggerAdapter(unittest.TestCase):
|
||||
def test_metric_prefix(self):
|
||||
logger = utils.get_logger({}, 'logger_name')
|
||||
adapter1 = utils.MetricsPrefixLoggerAdapter(logger, {}, 'one')
|
||||
adapter2 = utils.MetricsPrefixLoggerAdapter(logger, {}, 'two')
|
||||
adapter3 = utils.SwiftLoggerAdapter(logger, {})
|
||||
self.assertEqual('logger_name', logger.name)
|
||||
self.assertEqual('logger_name', adapter1.logger.name)
|
||||
self.assertEqual('logger_name', adapter2.logger.name)
|
||||
self.assertEqual('logger_name', adapter3.logger.name)
|
||||
|
||||
with mock.patch.object(logger, 'increment') as mock_increment:
|
||||
adapter1.increment('test1')
|
||||
adapter2.increment('test2')
|
||||
adapter3.increment('test3')
|
||||
logger.increment('test')
|
||||
self.assertEqual(
|
||||
[mock.call('one.test1'), mock.call('two.test2'),
|
||||
mock.call('test3'), mock.call('test')],
|
||||
mock_increment.call_args_list)
|
||||
|
||||
adapter1.metric_prefix = 'not one'
|
||||
with mock.patch.object(logger, 'increment') as mock_increment:
|
||||
adapter1.increment('test1')
|
||||
adapter2.increment('test2')
|
||||
adapter3.increment('test3')
|
||||
logger.increment('test')
|
||||
self.assertEqual(
|
||||
[mock.call('not one.test1'), mock.call('two.test2'),
|
||||
mock.call('test3'), mock.call('test')],
|
||||
mock_increment.call_args_list)
|
||||
|
||||
|
||||
class TestAuditLocationGenerator(unittest.TestCase):
|
||||
|
||||
def test_drive_tree_access(self):
|
||||
|
@ -2094,7 +2094,7 @@ class TestContainerController(TestRingBase):
|
||||
# container is sharded but proxy does not have that state cached;
|
||||
# expect a backend request and expect shard ranges to be cached
|
||||
self.memcache.clear_calls()
|
||||
self.logger.logger.log_dict['increment'][:] = []
|
||||
self.logger.clear()
|
||||
req = self._build_request({'X-Backend-Record-Type': record_type},
|
||||
{'states': 'listing'}, {})
|
||||
backend_req, resp = self._capture_backend_request(
|
||||
@ -2130,7 +2130,7 @@ class TestContainerController(TestRingBase):
|
||||
# no shard ranges cached; expect a cache miss and write-back
|
||||
self.memcache.delete('shard-listing/a/c')
|
||||
self.memcache.clear_calls()
|
||||
self.logger.logger.log_dict['increment'][:] = []
|
||||
self.logger.clear()
|
||||
req = self._build_request({'X-Backend-Record-Type': record_type},
|
||||
{'states': 'listing'}, {})
|
||||
backend_req, resp = self._capture_backend_request(
|
||||
@ -2161,12 +2161,12 @@ class TestContainerController(TestRingBase):
|
||||
req.environ['swift.infocache']['shard-listing/a/c'])
|
||||
self.assertEqual(
|
||||
[x[0][0] for x in self.logger.logger.log_dict['increment']],
|
||||
['shard_listing.cache.miss'])
|
||||
['container.shard_listing.cache.miss'])
|
||||
|
||||
# container is sharded and proxy does have that state cached and
|
||||
# also has shard ranges cached; expect a read from cache
|
||||
self.memcache.clear_calls()
|
||||
self.logger.logger.log_dict['increment'][:] = []
|
||||
self.logger.clear()
|
||||
req = self._build_request({'X-Backend-Record-Type': record_type},
|
||||
{'states': 'listing'}, {})
|
||||
resp = req.get_response(self.app)
|
||||
@ -2184,11 +2184,11 @@ class TestContainerController(TestRingBase):
|
||||
req.environ['swift.infocache']['shard-listing/a/c'])
|
||||
self.assertEqual(
|
||||
[x[0][0] for x in self.logger.logger.log_dict['increment']],
|
||||
['shard_listing.cache.hit'])
|
||||
['container.shard_listing.cache.hit'])
|
||||
|
||||
# if there's a chance to skip cache, maybe we go to disk again...
|
||||
self.memcache.clear_calls()
|
||||
self.logger.logger.log_dict['increment'][:] = []
|
||||
self.logger.clear()
|
||||
self.app.container_listing_shard_ranges_skip_cache = 0.10
|
||||
req = self._build_request({'X-Backend-Record-Type': record_type},
|
||||
{'states': 'listing'}, {})
|
||||
@ -2220,11 +2220,11 @@ class TestContainerController(TestRingBase):
|
||||
req.environ['swift.infocache']['shard-listing/a/c'])
|
||||
self.assertEqual(
|
||||
[x[0][0] for x in self.logger.logger.log_dict['increment']],
|
||||
['shard_listing.cache.skip'])
|
||||
['container.shard_listing.cache.skip'])
|
||||
|
||||
# ... or maybe we serve from cache
|
||||
self.memcache.clear_calls()
|
||||
self.logger.logger.log_dict['increment'][:] = []
|
||||
self.logger.clear()
|
||||
req = self._build_request({'X-Backend-Record-Type': record_type},
|
||||
{'states': 'listing'}, {})
|
||||
with mock.patch('random.random', return_value=0.11):
|
||||
@ -2243,7 +2243,8 @@ class TestContainerController(TestRingBase):
|
||||
req.environ['swift.infocache']['shard-listing/a/c'])
|
||||
self.assertEqual(
|
||||
[x[0][0] for x in self.logger.logger.log_dict['increment']],
|
||||
['shard_listing.cache.hit'])
|
||||
['container.shard_listing.cache.hit'])
|
||||
|
||||
# put this back the way we found it for later subtests
|
||||
self.app.container_listing_shard_ranges_skip_cache = 0.0
|
||||
|
||||
|
@ -22,6 +22,7 @@ from swift.proxy.controllers import InfoController
|
||||
from swift.proxy.server import Application as ProxyApp
|
||||
from swift.common import utils
|
||||
from swift.common.swob import Request, HTTPException
|
||||
from test.debug_logger import debug_logger
|
||||
|
||||
|
||||
class TestInfoController(unittest.TestCase):
|
||||
@ -34,7 +35,7 @@ class TestInfoController(unittest.TestCase):
|
||||
admin_key=None):
|
||||
disallowed_sections = disallowed_sections or []
|
||||
|
||||
app = Mock(spec=ProxyApp)
|
||||
app = Mock(spec=ProxyApp, logger=debug_logger())
|
||||
return InfoController(app, None, expose_info,
|
||||
disallowed_sections, admin_key)
|
||||
|
||||
|
@ -2526,7 +2526,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
||||
controller = self.controller_cls(
|
||||
self.app, 'a', 'c', 'o')
|
||||
safe_iter = utils.GreenthreadSafeIterator(self.app.iter_nodes(
|
||||
self.policy.object_ring, 0, policy=self.policy))
|
||||
self.policy.object_ring, 0, self.logger, policy=self.policy))
|
||||
controller._fragment_GET_request = lambda *a, **k: next(safe_iter)
|
||||
pile = utils.GreenAsyncPile(self.policy.ec_ndata)
|
||||
for i in range(self.policy.ec_ndata):
|
||||
|
@ -1335,6 +1335,13 @@ class TestProxyServer(unittest.TestCase):
|
||||
self.assertTrue('swift.valid_api_versions' in
|
||||
path_parts.get('disallowed_sections'))
|
||||
|
||||
def test_statsd_prefix(self):
|
||||
app = proxy_server.Application({}, logger=debug_logger(),
|
||||
account_ring=FakeRing(),
|
||||
container_ring=FakeRing())
|
||||
self.assertEqual([(('proxy-server',), {})],
|
||||
app.logger.log_dict['set_statsd_prefix'])
|
||||
|
||||
|
||||
@patch_policies([
|
||||
StoragePolicy(0, 'zero', is_default=True),
|
||||
@ -2294,9 +2301,10 @@ class TestReplicatedObjectController(
|
||||
def setUp(self):
|
||||
skip_if_no_xattrs()
|
||||
_test_servers[0]._error_limiting = {} # clear out errors
|
||||
self.logger = debug_logger('proxy-ut')
|
||||
self.app = proxy_server.Application(
|
||||
None,
|
||||
logger=debug_logger('proxy-ut'),
|
||||
logger=self.logger,
|
||||
account_ring=FakeRing(),
|
||||
container_ring=FakeRing())
|
||||
super(TestReplicatedObjectController, self).setUp()
|
||||
@ -3955,7 +3963,7 @@ class TestReplicatedObjectController(
|
||||
self.app.recheck_updating_shard_ranges = 0
|
||||
|
||||
def do_test(method, sharding_state):
|
||||
self.app.logger = debug_logger('proxy-ut') # clean capture state
|
||||
self.app.logger.clear() # clean capture state
|
||||
req = Request.blank('/v1/a/c/o', {}, method=method, body='',
|
||||
headers={'Content-Type': 'text/plain'})
|
||||
|
||||
@ -3975,8 +3983,10 @@ class TestReplicatedObjectController(
|
||||
|
||||
self.assertEqual(resp.status_int, 202)
|
||||
stats = self.app.logger.get_increment_counts()
|
||||
self.assertEqual({'shard_updating.backend.200': 1}, stats)
|
||||
self.assertEqual({'object.shard_updating.backend.200': 1}, stats)
|
||||
backend_requests = fake_conn.requests
|
||||
# verify statsd prefix is not mutated
|
||||
self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix'])
|
||||
|
||||
account_request = backend_requests[0]
|
||||
self._check_request(
|
||||
@ -4035,7 +4045,7 @@ class TestReplicatedObjectController(
|
||||
self.app.recheck_updating_shard_ranges = 3600
|
||||
|
||||
def do_test(method, sharding_state):
|
||||
self.app.logger = debug_logger('proxy-ut') # clean capture state
|
||||
self.app.logger.clear() # clean capture state
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o', {'swift.cache': FakeMemcache()},
|
||||
method=method, body='', headers={'Content-Type': 'text/plain'})
|
||||
@ -4064,8 +4074,10 @@ class TestReplicatedObjectController(
|
||||
|
||||
self.assertEqual(resp.status_int, 202)
|
||||
stats = self.app.logger.get_increment_counts()
|
||||
self.assertEqual({'shard_updating.cache.miss': 1,
|
||||
'shard_updating.backend.200': 1}, stats)
|
||||
self.assertEqual({'object.shard_updating.cache.miss': 1,
|
||||
'object.shard_updating.backend.200': 1}, stats)
|
||||
# verify statsd prefix is not mutated
|
||||
self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix'])
|
||||
|
||||
backend_requests = fake_conn.requests
|
||||
account_request = backend_requests[0]
|
||||
@ -4133,7 +4145,7 @@ class TestReplicatedObjectController(
|
||||
self.app.recheck_updating_shard_ranges = 3600
|
||||
|
||||
def do_test(method, sharding_state):
|
||||
self.app.logger = debug_logger('proxy-ut') # clean capture state
|
||||
self.app.logger.clear() # clean capture state
|
||||
shard_ranges = [
|
||||
utils.ShardRange(
|
||||
'.shards_a/c_not_used', utils.Timestamp.now(), '', 'l'),
|
||||
@ -4162,7 +4174,9 @@ class TestReplicatedObjectController(
|
||||
|
||||
self.assertEqual(resp.status_int, 202)
|
||||
stats = self.app.logger.get_increment_counts()
|
||||
self.assertEqual({'shard_updating.cache.hit': 1}, stats)
|
||||
self.assertEqual({'object.shard_updating.cache.hit': 1}, stats)
|
||||
# verify statsd prefix is not mutated
|
||||
self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix'])
|
||||
|
||||
backend_requests = fake_conn.requests
|
||||
account_request = backend_requests[0]
|
||||
@ -4224,7 +4238,7 @@ class TestReplicatedObjectController(
|
||||
self.app.container_updating_shard_ranges_skip_cache = 0.001
|
||||
|
||||
def do_test(method, sharding_state):
|
||||
self.app.logger = debug_logger('proxy-ut') # clean capture state
|
||||
self.app.logger.clear() # clean capture state
|
||||
cached_shard_ranges = [
|
||||
utils.ShardRange(
|
||||
'.shards_a/c_nope', utils.Timestamp.now(), '', 'l'),
|
||||
@ -4254,7 +4268,7 @@ class TestReplicatedObjectController(
|
||||
|
||||
self.assertEqual(resp.status_int, 202)
|
||||
stats = self.app.logger.get_increment_counts()
|
||||
self.assertEqual({'shard_updating.cache.hit': 1}, stats)
|
||||
self.assertEqual({'object.shard_updating.cache.hit': 1}, stats)
|
||||
|
||||
# cached shard ranges are still there
|
||||
cache_key = 'shard-updating/a/c'
|
||||
@ -4292,9 +4306,11 @@ class TestReplicatedObjectController(
|
||||
|
||||
self.assertEqual(resp.status_int, 202)
|
||||
stats = self.app.logger.get_increment_counts()
|
||||
self.assertEqual({'shard_updating.cache.skip': 1,
|
||||
'shard_updating.cache.hit': 1,
|
||||
'shard_updating.backend.200': 1}, stats)
|
||||
self.assertEqual({'object.shard_updating.cache.skip': 1,
|
||||
'object.shard_updating.cache.hit': 1,
|
||||
'object.shard_updating.backend.200': 1}, stats)
|
||||
# verify statsd prefix is not mutated
|
||||
self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix'])
|
||||
|
||||
backend_requests = fake_conn.requests
|
||||
container_request_shard = backend_requests[0]
|
||||
@ -4356,7 +4372,7 @@ class TestReplicatedObjectController(
|
||||
self.app.recheck_updating_shard_ranges = 0
|
||||
|
||||
def do_test(method, sharding_state):
|
||||
self.app.logger = debug_logger('proxy-ut') # clean capture state
|
||||
self.app.logger.clear() # clean capture state
|
||||
req = Request.blank('/v1/a/c/o', {}, method=method, body='',
|
||||
headers={'Content-Type': 'text/plain'})
|
||||
|
||||
@ -4372,7 +4388,7 @@ class TestReplicatedObjectController(
|
||||
|
||||
self.assertEqual(resp.status_int, 202)
|
||||
stats = self.app.logger.get_increment_counts()
|
||||
self.assertEqual({'shard_updating.backend.404': 1}, stats)
|
||||
self.assertEqual({'object.shard_updating.backend.404': 1}, stats)
|
||||
|
||||
backend_requests = fake_conn.requests
|
||||
account_request = backend_requests[0]
|
||||
@ -4936,8 +4952,8 @@ class TestReplicatedObjectController(
|
||||
'container',
|
||||
'object')
|
||||
collected_nodes = []
|
||||
for node in self.app.iter_nodes(object_ring,
|
||||
partition):
|
||||
for node in self.app.iter_nodes(object_ring, partition,
|
||||
self.logger):
|
||||
collected_nodes.append(node)
|
||||
self.assertEqual(len(collected_nodes), 5)
|
||||
|
||||
@ -4947,21 +4963,22 @@ class TestReplicatedObjectController(
|
||||
'container',
|
||||
'object')
|
||||
collected_nodes = []
|
||||
for node in self.app.iter_nodes(object_ring,
|
||||
partition):
|
||||
for node in self.app.iter_nodes(object_ring, partition,
|
||||
self.logger):
|
||||
collected_nodes.append(node)
|
||||
self.assertEqual(len(collected_nodes), 9)
|
||||
|
||||
# zero error-limited primary nodes -> no handoff warnings
|
||||
self.app.log_handoffs = True
|
||||
self.app.logger = debug_logger()
|
||||
self.app.logger.clear() # clean capture state
|
||||
self.app.request_node_count = lambda r: 7
|
||||
object_ring.max_more_nodes = 20
|
||||
partition, nodes = object_ring.get_nodes('account',
|
||||
'container',
|
||||
'object')
|
||||
collected_nodes = []
|
||||
for node in self.app.iter_nodes(object_ring, partition):
|
||||
for node in self.app.iter_nodes(object_ring, partition,
|
||||
self.logger):
|
||||
collected_nodes.append(node)
|
||||
self.assertEqual(len(collected_nodes), 7)
|
||||
self.assertEqual(self.app.logger.log_dict['warning'], [])
|
||||
@ -4969,14 +4986,15 @@ class TestReplicatedObjectController(
|
||||
|
||||
# one error-limited primary node -> one handoff warning
|
||||
self.app.log_handoffs = True
|
||||
self.app.logger = debug_logger()
|
||||
self.app.logger.clear() # clean capture state
|
||||
self.app.request_node_count = lambda r: 7
|
||||
self.app._error_limiting = {} # clear out errors
|
||||
set_node_errors(self.app, object_ring._devs[0], 999,
|
||||
last_error=(2 ** 63 - 1))
|
||||
|
||||
collected_nodes = []
|
||||
for node in self.app.iter_nodes(object_ring, partition):
|
||||
for node in self.app.iter_nodes(object_ring, partition,
|
||||
self.logger):
|
||||
collected_nodes.append(node)
|
||||
self.assertEqual(len(collected_nodes), 7)
|
||||
self.assertEqual(
|
||||
@ -4987,7 +5005,7 @@ class TestReplicatedObjectController(
|
||||
|
||||
# two error-limited primary nodes -> two handoff warnings
|
||||
self.app.log_handoffs = True
|
||||
self.app.logger = debug_logger()
|
||||
self.app.logger.clear() # clean capture state
|
||||
self.app.request_node_count = lambda r: 7
|
||||
self.app._error_limiting = {} # clear out errors
|
||||
for i in range(2):
|
||||
@ -4995,7 +5013,8 @@ class TestReplicatedObjectController(
|
||||
last_error=(2 ** 63 - 1))
|
||||
|
||||
collected_nodes = []
|
||||
for node in self.app.iter_nodes(object_ring, partition):
|
||||
for node in self.app.iter_nodes(object_ring, partition,
|
||||
self.logger):
|
||||
collected_nodes.append(node)
|
||||
self.assertEqual(len(collected_nodes), 7)
|
||||
self.assertEqual(
|
||||
@ -5010,7 +5029,7 @@ class TestReplicatedObjectController(
|
||||
# all error-limited primary nodes -> four handoff warnings,
|
||||
# plus a handoff-all metric
|
||||
self.app.log_handoffs = True
|
||||
self.app.logger = debug_logger()
|
||||
self.app.logger.clear() # clean capture state
|
||||
self.app.request_node_count = lambda r: 10
|
||||
object_ring.set_replicas(4) # otherwise we run out of handoffs
|
||||
self.app._error_limiting = {} # clear out errors
|
||||
@ -5019,7 +5038,8 @@ class TestReplicatedObjectController(
|
||||
last_error=(2 ** 63 - 1))
|
||||
|
||||
collected_nodes = []
|
||||
for node in self.app.iter_nodes(object_ring, partition):
|
||||
for node in self.app.iter_nodes(object_ring, partition,
|
||||
self.logger):
|
||||
collected_nodes.append(node)
|
||||
self.assertEqual(len(collected_nodes), 10)
|
||||
self.assertEqual(
|
||||
@ -5050,7 +5070,7 @@ class TestReplicatedObjectController(
|
||||
with mock.patch.object(self.app, 'sort_nodes',
|
||||
side_effect=fake_sort_nodes):
|
||||
object_ring = self.app.get_object_ring(None)
|
||||
for node in self.app.iter_nodes(object_ring, 0):
|
||||
for node in self.app.iter_nodes(object_ring, 0, self.logger):
|
||||
pass
|
||||
self.assertEqual(called, [
|
||||
mock.call(object_ring.get_part_nodes(0), policy=None)
|
||||
@ -5060,12 +5080,15 @@ class TestReplicatedObjectController(
|
||||
with mock.patch.object(self.app, 'sort_nodes',
|
||||
lambda n, *args, **kwargs: n):
|
||||
object_ring = self.app.get_object_ring(None)
|
||||
first_nodes = list(self.app.iter_nodes(object_ring, 0))
|
||||
second_nodes = list(self.app.iter_nodes(object_ring, 0))
|
||||
first_nodes = list(self.app.iter_nodes(
|
||||
object_ring, 0, self.logger))
|
||||
second_nodes = list(self.app.iter_nodes(
|
||||
object_ring, 0, self.logger))
|
||||
self.assertIn(first_nodes[0], second_nodes)
|
||||
|
||||
self.app.error_limit(first_nodes[0], 'test')
|
||||
second_nodes = list(self.app.iter_nodes(object_ring, 0))
|
||||
second_nodes = list(self.app.iter_nodes(
|
||||
object_ring, 0, self.logger))
|
||||
self.assertNotIn(first_nodes[0], second_nodes)
|
||||
|
||||
def test_iter_nodes_gives_extra_if_error_limited_inline(self):
|
||||
@ -5075,9 +5098,10 @@ class TestReplicatedObjectController(
|
||||
mock.patch.object(self.app, 'request_node_count',
|
||||
lambda r: 6), \
|
||||
mock.patch.object(object_ring, 'max_more_nodes', 99):
|
||||
first_nodes = list(self.app.iter_nodes(object_ring, 0))
|
||||
first_nodes = list(self.app.iter_nodes(
|
||||
object_ring, 0, self.logger))
|
||||
second_nodes = []
|
||||
for node in self.app.iter_nodes(object_ring, 0):
|
||||
for node in self.app.iter_nodes(object_ring, 0, self.logger):
|
||||
if not second_nodes:
|
||||
self.app.error_limit(node, 'test')
|
||||
second_nodes.append(node)
|
||||
@ -5092,7 +5116,7 @@ class TestReplicatedObjectController(
|
||||
lambda n, *args, **kwargs: n), \
|
||||
mock.patch.object(self.app, 'request_node_count',
|
||||
lambda r: 3):
|
||||
got_nodes = list(self.app.iter_nodes(object_ring, 0,
|
||||
got_nodes = list(self.app.iter_nodes(object_ring, 0, self.logger,
|
||||
node_iter=iter(node_list)))
|
||||
self.assertEqual(node_list[:3], got_nodes)
|
||||
|
||||
@ -5100,7 +5124,7 @@ class TestReplicatedObjectController(
|
||||
lambda n, *args, **kwargs: n), \
|
||||
mock.patch.object(self.app, 'request_node_count',
|
||||
lambda r: 1000000):
|
||||
got_nodes = list(self.app.iter_nodes(object_ring, 0,
|
||||
got_nodes = list(self.app.iter_nodes(object_ring, 0, self.logger,
|
||||
node_iter=iter(node_list)))
|
||||
self.assertEqual(node_list, got_nodes)
|
||||
|
||||
@ -11144,7 +11168,7 @@ class FakeObjectController(object):
|
||||
resp = Response(app_iter=iter(body))
|
||||
return resp
|
||||
|
||||
def iter_nodes(self, ring, partition):
|
||||
def iter_nodes(self, ring, partition, logger):
|
||||
for node in ring.get_part_nodes(partition):
|
||||
yield node
|
||||
for node in ring.get_more_nodes(partition):
|
||||
|
Loading…
Reference in New Issue
Block a user