Merge "proxy: Get rid of MetricsPrefixLoggerAdapter"
This commit is contained in:
commit
d99ad8fb31
@ -1577,28 +1577,23 @@ class SwiftLoggerAdapter(logging.LoggerAdapter):
|
||||
# py3 does this for us already; add it for py2
|
||||
return self.logger.name
|
||||
|
||||
def get_metric_name(self, metric):
|
||||
# subclasses may override this method to annotate the metric name
|
||||
return metric
|
||||
def update_stats(self, *a, **kw):
|
||||
return self.logger.update_stats(*a, **kw)
|
||||
|
||||
def update_stats(self, metric, *a, **kw):
|
||||
return self.logger.update_stats(self.get_metric_name(metric), *a, **kw)
|
||||
def increment(self, *a, **kw):
|
||||
return self.logger.increment(*a, **kw)
|
||||
|
||||
def increment(self, metric, *a, **kw):
|
||||
return self.logger.increment(self.get_metric_name(metric), *a, **kw)
|
||||
def decrement(self, *a, **kw):
|
||||
return self.logger.decrement(*a, **kw)
|
||||
|
||||
def decrement(self, metric, *a, **kw):
|
||||
return self.logger.decrement(self.get_metric_name(metric), *a, **kw)
|
||||
def timing(self, *a, **kw):
|
||||
return self.logger.timing(*a, **kw)
|
||||
|
||||
def timing(self, metric, *a, **kw):
|
||||
return self.logger.timing(self.get_metric_name(metric), *a, **kw)
|
||||
def timing_since(self, *a, **kw):
|
||||
return self.logger.timing_since(*a, **kw)
|
||||
|
||||
def timing_since(self, metric, *a, **kw):
|
||||
return self.logger.timing_since(self.get_metric_name(metric), *a, **kw)
|
||||
|
||||
def transfer_rate(self, metric, *a, **kw):
|
||||
return self.logger.transfer_rate(
|
||||
self.get_metric_name(metric), *a, **kw)
|
||||
def transfer_rate(self, *a, **kw):
|
||||
return self.logger.transfer_rate(*a, **kw)
|
||||
|
||||
@property
|
||||
def thread_locals(self):
|
||||
@ -1635,27 +1630,6 @@ class PrefixLoggerAdapter(SwiftLoggerAdapter):
|
||||
return (msg, kwargs)
|
||||
|
||||
|
||||
class MetricsPrefixLoggerAdapter(SwiftLoggerAdapter):
|
||||
"""
|
||||
Adds a prefix to all Statsd metrics' names.
|
||||
"""
|
||||
|
||||
def __init__(self, logger, extra, metric_prefix):
|
||||
"""
|
||||
:param logger: an instance of logging.Logger
|
||||
:param extra: a dict-like object
|
||||
:param metric_prefix: A prefix that will be added to the start of each
|
||||
metric name such that the metric name is transformed to:
|
||||
``<metric_prefix>.<metric name>``. Note that the logger's
|
||||
StatsdClient also adds its configured prefix to metric names.
|
||||
"""
|
||||
super(MetricsPrefixLoggerAdapter, self).__init__(logger, extra)
|
||||
self.metric_prefix = metric_prefix
|
||||
|
||||
def get_metric_name(self, metric):
|
||||
return '%s.%s' % (self.metric_prefix, metric)
|
||||
|
||||
|
||||
# double inheritance to support property with setter
|
||||
class LogAdapter(logging.LoggerAdapter, object):
|
||||
"""
|
||||
|
@ -63,7 +63,8 @@ class AccountController(Controller):
|
||||
partition = self.app.account_ring.get_part(self.account_name)
|
||||
concurrency = self.app.account_ring.replica_count \
|
||||
if self.app.get_policy_options(None).concurrent_gets else 1
|
||||
node_iter = NodeIter(self.app, self.app.account_ring, partition,
|
||||
node_iter = NodeIter(
|
||||
'account', self.app, self.app.account_ring, partition,
|
||||
self.logger, req)
|
||||
params = req.params
|
||||
params['format'] = 'json'
|
||||
|
@ -44,7 +44,7 @@ from swift.common.utils import Timestamp, WatchdogTimeout, config_true_value, \
|
||||
public, split_path, list_from_csv, GreenthreadSafeIterator, \
|
||||
GreenAsyncPile, quorum_size, parse_content_type, drain_and_close, \
|
||||
document_iters_to_http_response_body, ShardRange, cache_from_env, \
|
||||
MetricsPrefixLoggerAdapter, CooperativeIterator
|
||||
CooperativeIterator
|
||||
from swift.common.bufferedhttp import http_connect
|
||||
from swift.common import constraints
|
||||
from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \
|
||||
@ -423,9 +423,9 @@ def _record_ac_info_cache_metrics(
|
||||
logger = None
|
||||
else:
|
||||
logger = proxy_app.logger
|
||||
op_type = 'container.info' if container else 'account.info'
|
||||
server_type = 'container' if container else 'account'
|
||||
if logger:
|
||||
record_cache_op_metrics(logger, op_type, cache_state, resp)
|
||||
record_cache_op_metrics(logger, server_type, 'info', cache_state, resp)
|
||||
|
||||
|
||||
def get_container_info(env, app, swift_source=None, cache_only=False):
|
||||
@ -774,11 +774,12 @@ def _get_info_from_infocache(env, account, container=None):
|
||||
|
||||
|
||||
def record_cache_op_metrics(
|
||||
logger, op_type, cache_state, resp=None):
|
||||
logger, server_type, op_type, cache_state, resp=None):
|
||||
"""
|
||||
Record a single cache operation into its corresponding metrics.
|
||||
|
||||
:param logger: the metrics logger
|
||||
:param server_type: 'account' or 'container'
|
||||
:param op_type: the name of the operation type, includes 'shard_listing',
|
||||
'shard_updating', and etc.
|
||||
:param cache_state: the state of this cache operation. When it's
|
||||
@ -787,21 +788,23 @@ def record_cache_op_metrics(
|
||||
which will make to backend, expect a valid 'resp'.
|
||||
:param resp: the response from backend for all cases except cache hits.
|
||||
"""
|
||||
server_type = server_type.lower()
|
||||
if cache_state == 'infocache_hit':
|
||||
logger.increment('%s.infocache.hit' % op_type)
|
||||
logger.increment('%s.%s.infocache.hit' % (server_type, op_type))
|
||||
elif cache_state == 'hit':
|
||||
# memcache hits.
|
||||
logger.increment('%s.cache.hit' % op_type)
|
||||
logger.increment('%s.%s.cache.hit' % (server_type, op_type))
|
||||
else:
|
||||
# the cases of cache_state is memcache miss, error, skip, force_skip
|
||||
# or disabled.
|
||||
if resp:
|
||||
logger.increment(
|
||||
'%s.cache.%s.%d' % (op_type, cache_state, resp.status_int))
|
||||
logger.increment('%s.%s.cache.%s.%d' % (
|
||||
server_type, op_type, cache_state, resp.status_int))
|
||||
else:
|
||||
# In some situation, we choose not to lookup backend after cache
|
||||
# miss.
|
||||
logger.increment('%s.cache.%s' % (op_type, cache_state))
|
||||
logger.increment('%s.%s.cache.%s' % (
|
||||
server_type, op_type, cache_state))
|
||||
|
||||
|
||||
def _get_info_from_memcache(app, env, account, container=None):
|
||||
@ -1383,7 +1386,8 @@ class GetOrHeadHandler(GetterBase):
|
||||
self.logger.info(
|
||||
'Client did not read from proxy within %ss',
|
||||
self.app.client_timeout)
|
||||
self.logger.increment('client_timeouts')
|
||||
self.logger.increment('%s.client_timeouts' %
|
||||
self.server_type.lower())
|
||||
except GeneratorExit:
|
||||
warn = True
|
||||
req_range = self.backend_headers['Range']
|
||||
@ -1644,6 +1648,7 @@ class NodeIter(object):
|
||||
may not, depending on how logging is configured, the vagaries of
|
||||
socket IO and eventlet, and the phase of the moon.)
|
||||
|
||||
:param server_type: one of 'account', 'container', or 'object'
|
||||
:param app: a proxy app
|
||||
:param ring: ring to get yield nodes from
|
||||
:param partition: ring partition to yield nodes for
|
||||
@ -1656,8 +1661,9 @@ class NodeIter(object):
|
||||
None for an account or container ring.
|
||||
"""
|
||||
|
||||
def __init__(self, app, ring, partition, logger, request, node_iter=None,
|
||||
policy=None):
|
||||
def __init__(self, server_type, app, ring, partition, logger, request,
|
||||
node_iter=None, policy=None):
|
||||
self.server_type = server_type
|
||||
self.app = app
|
||||
self.ring = ring
|
||||
self.partition = partition
|
||||
@ -1704,12 +1710,14 @@ class NodeIter(object):
|
||||
return
|
||||
extra_handoffs = handoffs - self.expected_handoffs
|
||||
if extra_handoffs > 0:
|
||||
self.logger.increment('handoff_count')
|
||||
self.logger.increment('%s.handoff_count' %
|
||||
self.server_type.lower())
|
||||
self.logger.warning(
|
||||
'Handoff requested (%d)' % handoffs)
|
||||
if (extra_handoffs == self.num_primary_nodes):
|
||||
# all the primaries were skipped, and handoffs didn't help
|
||||
self.logger.increment('handoff_all_count')
|
||||
self.logger.increment('%s.handoff_all_count' %
|
||||
self.server_type.lower())
|
||||
|
||||
def set_node_provider(self, callback):
|
||||
"""
|
||||
@ -1786,9 +1794,10 @@ class Controller(object):
|
||||
self.trans_id = '-'
|
||||
self._allowed_methods = None
|
||||
self._private_methods = None
|
||||
# adapt the app logger to prefix statsd metrics with the server type
|
||||
self.logger = MetricsPrefixLoggerAdapter(
|
||||
self.app.logger, {}, self.server_type.lower())
|
||||
|
||||
@property
|
||||
def logger(self):
|
||||
return self.app.logger
|
||||
|
||||
@property
|
||||
def allowed_methods(self):
|
||||
@ -2006,9 +2015,8 @@ class Controller(object):
|
||||
:param node_iterator: optional node iterator.
|
||||
:returns: a swob.Response object
|
||||
"""
|
||||
nodes = GreenthreadSafeIterator(
|
||||
node_iterator or NodeIter(self.app, ring, part, self.logger, req)
|
||||
)
|
||||
nodes = GreenthreadSafeIterator(node_iterator or NodeIter(
|
||||
self.server_type.lower(), self.app, ring, part, self.logger, req))
|
||||
node_number = node_count or len(ring.get_part_nodes(part))
|
||||
pile = GreenAsyncPile(node_number)
|
||||
|
||||
|
@ -103,7 +103,8 @@ class ContainerController(Controller):
|
||||
self.account_name, self.container_name)
|
||||
concurrency = self.app.container_ring.replica_count \
|
||||
if self.app.get_policy_options(None).concurrent_gets else 1
|
||||
node_iter = NodeIter(self.app, self.app.container_ring, part,
|
||||
node_iter = NodeIter(
|
||||
'container', self.app, self.app.container_ring, part,
|
||||
self.logger, req)
|
||||
resp = self.GETorHEAD_base(
|
||||
req, 'Container', node_iter, part,
|
||||
@ -327,7 +328,8 @@ class ContainerController(Controller):
|
||||
|
||||
if should_record:
|
||||
record_cache_op_metrics(
|
||||
self.logger, 'shard_listing', cache_state, resp)
|
||||
self.logger, self.server_type.lower(), 'shard_listing',
|
||||
cache_state, resp)
|
||||
|
||||
def _GET_using_cache(self, req, info):
|
||||
# It may be possible to fulfil the request from cache: we only reach
|
||||
|
@ -201,7 +201,8 @@ class BaseObjectController(Controller):
|
||||
policy_options = self.app.get_policy_options(policy)
|
||||
is_local = policy_options.write_affinity_is_local_fn
|
||||
if is_local is None:
|
||||
return NodeIter(self.app, ring, partition, self.logger, request,
|
||||
return NodeIter(
|
||||
'object', self.app, ring, partition, self.logger, request,
|
||||
policy=policy)
|
||||
|
||||
primary_nodes = ring.get_part_nodes(partition)
|
||||
@ -235,7 +236,8 @@ class BaseObjectController(Controller):
|
||||
(node for node in all_nodes if node not in preferred_nodes)
|
||||
)
|
||||
|
||||
return NodeIter(self.app, ring, partition, self.logger, request,
|
||||
return NodeIter(
|
||||
'object', self.app, ring, partition, self.logger, request,
|
||||
node_iter=node_iter, policy=policy)
|
||||
|
||||
def GETorHEAD(self, req):
|
||||
@ -255,7 +257,8 @@ class BaseObjectController(Controller):
|
||||
return aresp
|
||||
partition = obj_ring.get_part(
|
||||
self.account_name, self.container_name, self.object_name)
|
||||
node_iter = NodeIter(self.app, obj_ring, partition, self.logger, req,
|
||||
node_iter = NodeIter(
|
||||
'object', self.app, obj_ring, partition, self.logger, req,
|
||||
policy=policy)
|
||||
|
||||
resp = self._get_or_head_response(req, node_iter, partition, policy)
|
||||
@ -337,7 +340,8 @@ class BaseObjectController(Controller):
|
||||
shard_ranges, response = self._get_shard_ranges(
|
||||
req, account, container, states='updating', includes=obj)
|
||||
record_cache_op_metrics(
|
||||
self.logger, 'shard_updating', 'disabled', response)
|
||||
self.logger, self.server_type.lower(), 'shard_updating',
|
||||
'disabled', response)
|
||||
# there will be only one shard range in the list if any
|
||||
return shard_ranges[0] if shard_ranges else None
|
||||
|
||||
@ -394,7 +398,8 @@ class BaseObjectController(Controller):
|
||||
time=self.app.recheck_updating_shard_ranges)
|
||||
update_shard = find_namespace(obj, shard_ranges or [])
|
||||
record_cache_op_metrics(
|
||||
self.logger, 'shard_updating', cache_state, response)
|
||||
self.logger, self.server_type.lower(), 'shard_updating',
|
||||
cache_state, response)
|
||||
return update_shard
|
||||
|
||||
def _get_update_target(self, req, container_info):
|
||||
@ -1046,7 +1051,7 @@ class ReplicatedObjectController(BaseObjectController):
|
||||
if ml and bytes_transferred < ml:
|
||||
self.logger.warning(
|
||||
'Client disconnected without sending enough data')
|
||||
self.logger.increment('client_disconnects')
|
||||
self.logger.increment('object.client_disconnects')
|
||||
raise HTTPClientDisconnect(request=req)
|
||||
|
||||
trail_md = self._get_footers(req)
|
||||
@ -1061,14 +1066,14 @@ class ReplicatedObjectController(BaseObjectController):
|
||||
except ChunkReadTimeout as err:
|
||||
self.logger.warning(
|
||||
'ERROR Client read timeout (%ss)', err.seconds)
|
||||
self.logger.increment('client_timeouts')
|
||||
self.logger.increment('object.client_timeouts')
|
||||
raise HTTPRequestTimeout(request=req)
|
||||
except HTTPException:
|
||||
raise
|
||||
except ChunkReadError:
|
||||
self.logger.warning(
|
||||
'Client disconnected without sending last chunk')
|
||||
self.logger.increment('client_disconnects')
|
||||
self.logger.increment('object.client_disconnects')
|
||||
raise HTTPClientDisconnect(request=req)
|
||||
except Timeout:
|
||||
self.logger.exception(
|
||||
@ -2624,7 +2629,7 @@ class ECFragGetter(GetterBase):
|
||||
self.logger.warning(
|
||||
'Client did not read from proxy within %ss' %
|
||||
self.app.client_timeout)
|
||||
self.logger.increment('client_timeouts')
|
||||
self.logger.increment('object.client_timeouts')
|
||||
except GeneratorExit:
|
||||
warn = True
|
||||
req_range = self.backend_headers['Range']
|
||||
@ -3226,7 +3231,7 @@ class ECObjectController(BaseObjectController):
|
||||
if ml and bytes_transferred < ml:
|
||||
self.logger.warning(
|
||||
'Client disconnected without sending enough data')
|
||||
self.logger.increment('client_disconnects')
|
||||
self.logger.increment('object.client_disconnects')
|
||||
raise HTTPClientDisconnect(request=req)
|
||||
|
||||
send_chunk(b'') # flush out any buffered data
|
||||
@ -3296,12 +3301,12 @@ class ECObjectController(BaseObjectController):
|
||||
except ChunkReadTimeout as err:
|
||||
self.logger.warning(
|
||||
'ERROR Client read timeout (%ss)', err.seconds)
|
||||
self.logger.increment('client_timeouts')
|
||||
self.logger.increment('object.client_timeouts')
|
||||
raise HTTPRequestTimeout(request=req)
|
||||
except ChunkReadError:
|
||||
self.logger.warning(
|
||||
'Client disconnected without sending last chunk')
|
||||
self.logger.increment('client_disconnects')
|
||||
self.logger.increment('object.client_disconnects')
|
||||
raise HTTPClientDisconnect(request=req)
|
||||
except HTTPException:
|
||||
raise
|
||||
|
@ -85,7 +85,8 @@ class FakeStatsdClient(utils.StatsdClient):
|
||||
counts = defaultdict(int)
|
||||
for metric in self.get_increments():
|
||||
counts[metric] += 1
|
||||
return counts
|
||||
# convert to normal dict for better failure messages
|
||||
return dict(counts)
|
||||
|
||||
def get_update_stats(self):
|
||||
return [call[0][:2] for call in self.calls['update_stats']]
|
||||
@ -94,7 +95,8 @@ class FakeStatsdClient(utils.StatsdClient):
|
||||
counts = defaultdict(int)
|
||||
for metric, step in self.get_update_stats():
|
||||
counts[metric] += step
|
||||
return counts
|
||||
# convert to normal dict for better failure messages
|
||||
return dict(counts)
|
||||
|
||||
|
||||
class CaptureLog(object):
|
||||
|
@ -5747,57 +5747,6 @@ class TestSwiftLoggerAdapter(unittest.TestCase):
|
||||
mocked.assert_called_with('Caught: Connection refused')
|
||||
|
||||
|
||||
class TestMetricsPrefixLoggerAdapter(unittest.TestCase):
|
||||
def test_metric_prefix(self):
|
||||
logger = utils.get_logger({}, 'logger_name')
|
||||
adapter1 = utils.MetricsPrefixLoggerAdapter(logger, {}, 'one')
|
||||
adapter2 = utils.MetricsPrefixLoggerAdapter(logger, {}, 'two')
|
||||
adapter3 = utils.SwiftLoggerAdapter(logger, {})
|
||||
self.assertEqual('logger_name', logger.name)
|
||||
self.assertEqual('logger_name', adapter1.logger.name)
|
||||
self.assertEqual('logger_name', adapter2.logger.name)
|
||||
self.assertEqual('logger_name', adapter3.logger.name)
|
||||
|
||||
with mock.patch.object(logger, 'increment') as mock_increment:
|
||||
adapter1.increment('test1')
|
||||
adapter2.increment('test2')
|
||||
adapter3.increment('test3')
|
||||
logger.increment('test')
|
||||
self.assertEqual(
|
||||
[mock.call('one.test1'), mock.call('two.test2'),
|
||||
mock.call('test3'), mock.call('test')],
|
||||
mock_increment.call_args_list)
|
||||
|
||||
adapter1.metric_prefix = 'not one'
|
||||
with mock.patch.object(logger, 'increment') as mock_increment:
|
||||
adapter1.increment('test1')
|
||||
adapter2.increment('test2')
|
||||
adapter3.increment('test3')
|
||||
logger.increment('test')
|
||||
self.assertEqual(
|
||||
[mock.call('not one.test1'), mock.call('two.test2'),
|
||||
mock.call('test3'), mock.call('test')],
|
||||
mock_increment.call_args_list)
|
||||
|
||||
def test_wrapped_prefixing(self):
|
||||
logger = utils.get_logger({}, 'logger_name')
|
||||
adapter1 = utils.MetricsPrefixLoggerAdapter(logger, {}, 'one')
|
||||
adapter2 = utils.MetricsPrefixLoggerAdapter(adapter1, {}, 'two')
|
||||
self.assertEqual('logger_name', logger.name)
|
||||
self.assertEqual('logger_name', adapter1.logger.name)
|
||||
self.assertEqual('logger_name', adapter2.logger.name)
|
||||
|
||||
with mock.patch.object(logger, 'increment') as mock_increment:
|
||||
adapter1.increment('test1')
|
||||
adapter2.increment('test2')
|
||||
logger.increment('test')
|
||||
self.assertEqual(
|
||||
[mock.call('one.test1'),
|
||||
mock.call('one.two.test2'),
|
||||
mock.call('test')],
|
||||
mock_increment.call_args_list)
|
||||
|
||||
|
||||
class TestAuditLocationGenerator(unittest.TestCase):
|
||||
|
||||
def test_drive_tree_access(self):
|
||||
|
@ -682,36 +682,36 @@ class TestFuncs(BaseTest):
|
||||
|
||||
def test_record_cache_op_metrics(self):
|
||||
record_cache_op_metrics(
|
||||
self.logger, 'shard_listing', 'infocache_hit')
|
||||
self.logger, 'container', 'shard_listing', 'infocache_hit')
|
||||
self.assertEqual(
|
||||
self.logger.statsd_client.get_increment_counts().get(
|
||||
'shard_listing.infocache.hit'),
|
||||
'container.shard_listing.infocache.hit'),
|
||||
1)
|
||||
record_cache_op_metrics(
|
||||
self.logger, 'shard_listing', 'hit')
|
||||
self.logger, 'container', 'shard_listing', 'hit')
|
||||
self.assertEqual(
|
||||
self.logger.statsd_client.get_increment_counts().get(
|
||||
'shard_listing.cache.hit'),
|
||||
'container.shard_listing.cache.hit'),
|
||||
1)
|
||||
resp = FakeResponse(status_int=200)
|
||||
record_cache_op_metrics(
|
||||
self.logger, 'shard_updating', 'skip', resp)
|
||||
self.logger, 'object', 'shard_updating', 'skip', resp)
|
||||
self.assertEqual(
|
||||
self.logger.statsd_client.get_increment_counts().get(
|
||||
'shard_updating.cache.skip.200'),
|
||||
'object.shard_updating.cache.skip.200'),
|
||||
1)
|
||||
resp = FakeResponse(status_int=503)
|
||||
record_cache_op_metrics(
|
||||
self.logger, 'shard_updating', 'disabled', resp)
|
||||
self.logger, 'object', 'shard_updating', 'disabled', resp)
|
||||
self.assertEqual(
|
||||
self.logger.statsd_client.get_increment_counts().get(
|
||||
'shard_updating.cache.disabled.503'),
|
||||
'object.shard_updating.cache.disabled.503'),
|
||||
1)
|
||||
|
||||
# test a cache miss call without response, expect no metric recorded.
|
||||
self.app.logger = mock.Mock()
|
||||
record_cache_op_metrics(
|
||||
self.logger, 'shard_updating', 'miss')
|
||||
self.logger, 'object', 'shard_updating', 'miss')
|
||||
self.app.logger.increment.assert_not_called()
|
||||
|
||||
def test_get_account_info_swift_source(self):
|
||||
@ -1667,7 +1667,7 @@ class TestNodeIter(BaseTest):
|
||||
def test_iter_default_fake_ring(self):
|
||||
for ring in (self.account_ring, self.container_ring):
|
||||
self.assertEqual(ring.replica_count, 3.0)
|
||||
node_iter = NodeIter(self.app, ring, 0, self.logger,
|
||||
node_iter = NodeIter('db', self.app, ring, 0, self.logger,
|
||||
request=Request.blank(''))
|
||||
self.assertEqual(6, node_iter.nodes_left)
|
||||
self.assertEqual(3, node_iter.primaries_left)
|
||||
@ -1682,7 +1682,8 @@ class TestNodeIter(BaseTest):
|
||||
def test_iter_with_handoffs(self):
|
||||
ring = FakeRing(replicas=3, max_more_nodes=20) # handoffs available
|
||||
policy = StoragePolicy(0, 'zero', object_ring=ring)
|
||||
node_iter = NodeIter(self.app, policy.object_ring, 0, self.logger,
|
||||
node_iter = NodeIter(
|
||||
'object', self.app, policy.object_ring, 0, self.logger,
|
||||
policy=policy, request=Request.blank(''))
|
||||
self.assertEqual(6, node_iter.nodes_left)
|
||||
self.assertEqual(3, node_iter.primaries_left)
|
||||
@ -1706,11 +1707,13 @@ class TestNodeIter(BaseTest):
|
||||
policy = StoragePolicy(0, 'ec', object_ring=ring)
|
||||
|
||||
# sanity
|
||||
node_iter = NodeIter(self.app, policy.object_ring, 0, self.logger,
|
||||
node_iter = NodeIter(
|
||||
'object', self.app, policy.object_ring, 0, self.logger,
|
||||
policy=policy, request=Request.blank(''))
|
||||
self.assertEqual(16, len([n for n in node_iter]))
|
||||
|
||||
node_iter = NodeIter(self.app, policy.object_ring, 0, self.logger,
|
||||
node_iter = NodeIter(
|
||||
'object', self.app, policy.object_ring, 0, self.logger,
|
||||
policy=policy, request=Request.blank(''))
|
||||
self.assertEqual(16, node_iter.nodes_left)
|
||||
self.assertEqual(8, node_iter.primaries_left)
|
||||
@ -1741,14 +1744,16 @@ class TestNodeIter(BaseTest):
|
||||
ring = FakeRing(replicas=8, max_more_nodes=20)
|
||||
policy = StoragePolicy(0, 'ec', object_ring=ring)
|
||||
|
||||
node_iter = NodeIter(self.app, policy.object_ring, 0, self.logger,
|
||||
node_iter = NodeIter(
|
||||
'object', self.app, policy.object_ring, 0, self.logger,
|
||||
policy=policy, request=Request.blank(''))
|
||||
for node in node_iter:
|
||||
self.assertIn('use_replication', node)
|
||||
self.assertFalse(node['use_replication'])
|
||||
|
||||
req = Request.blank('a/c')
|
||||
node_iter = NodeIter(self.app, policy.object_ring, 0, self.logger,
|
||||
node_iter = NodeIter(
|
||||
'object', self.app, policy.object_ring, 0, self.logger,
|
||||
policy=policy, request=req)
|
||||
for node in node_iter:
|
||||
self.assertIn('use_replication', node)
|
||||
@ -1756,7 +1761,8 @@ class TestNodeIter(BaseTest):
|
||||
|
||||
req = Request.blank(
|
||||
'a/c', headers={'x-backend-use-replication-network': 'False'})
|
||||
node_iter = NodeIter(self.app, policy.object_ring, 0, self.logger,
|
||||
node_iter = NodeIter(
|
||||
'object', self.app, policy.object_ring, 0, self.logger,
|
||||
policy=policy, request=req)
|
||||
for node in node_iter:
|
||||
self.assertIn('use_replication', node)
|
||||
@ -1764,7 +1770,8 @@ class TestNodeIter(BaseTest):
|
||||
|
||||
req = Request.blank(
|
||||
'a/c', headers={'x-backend-use-replication-network': 'yes'})
|
||||
node_iter = NodeIter(self.app, policy.object_ring, 0, self.logger,
|
||||
node_iter = NodeIter(
|
||||
'object', self.app, policy.object_ring, 0, self.logger,
|
||||
policy=policy, request=req)
|
||||
for node in node_iter:
|
||||
self.assertIn('use_replication', node)
|
||||
@ -1774,7 +1781,8 @@ class TestNodeIter(BaseTest):
|
||||
ring = FakeRing(replicas=8, max_more_nodes=20)
|
||||
policy = StoragePolicy(0, 'ec', object_ring=ring)
|
||||
other_iter = ring.get_part_nodes(0)
|
||||
node_iter = NodeIter(self.app, policy.object_ring, 0, self.logger,
|
||||
node_iter = NodeIter(
|
||||
'object', self.app, policy.object_ring, 0, self.logger,
|
||||
policy=policy, node_iter=iter(other_iter),
|
||||
request=Request.blank(''))
|
||||
nodes = list(node_iter)
|
||||
|
@ -2802,7 +2802,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
||||
controller = self.controller_cls(
|
||||
self.app, 'a', 'c', 'o')
|
||||
safe_iter = utils.GreenthreadSafeIterator(NodeIter(
|
||||
self.app, self.policy.object_ring, 0, self.logger,
|
||||
'object', self.app, self.policy.object_ring, 0, self.logger,
|
||||
policy=self.policy, request=Request.blank('')))
|
||||
controller._fragment_GET_request = lambda *a, **k: next(safe_iter)
|
||||
pile = utils.GreenAsyncPile(self.policy.ec_ndata)
|
||||
|
@ -5473,8 +5473,8 @@ class TestReplicatedObjectController(
|
||||
'object')
|
||||
collected_nodes = []
|
||||
for node in proxy_base.NodeIter(
|
||||
self.app, object_ring, partition, self.logger,
|
||||
request=Request.blank('')):
|
||||
'object', self.app, object_ring, partition,
|
||||
self.logger, request=Request.blank('')):
|
||||
collected_nodes.append(node)
|
||||
self.assertEqual(len(collected_nodes), 5)
|
||||
|
||||
@ -5485,8 +5485,8 @@ class TestReplicatedObjectController(
|
||||
'object')
|
||||
collected_nodes = []
|
||||
for node in proxy_base.NodeIter(
|
||||
self.app, object_ring, partition, self.logger,
|
||||
request=Request.blank('')):
|
||||
'object', self.app, object_ring, partition,
|
||||
self.logger, request=Request.blank('')):
|
||||
collected_nodes.append(node)
|
||||
self.assertEqual(len(collected_nodes), 9)
|
||||
|
||||
@ -5500,8 +5500,8 @@ class TestReplicatedObjectController(
|
||||
'object')
|
||||
collected_nodes = []
|
||||
for node in proxy_base.NodeIter(
|
||||
self.app, object_ring, partition, self.logger,
|
||||
request=Request.blank('')):
|
||||
'object', self.app, object_ring, partition,
|
||||
self.logger, request=Request.blank('')):
|
||||
collected_nodes.append(node)
|
||||
self.assertEqual(len(collected_nodes), 7)
|
||||
self.assertEqual(self.app.logger.log_dict['warning'], [])
|
||||
@ -5518,8 +5518,8 @@ class TestReplicatedObjectController(
|
||||
|
||||
collected_nodes = []
|
||||
for node in proxy_base.NodeIter(
|
||||
self.app, object_ring, partition, self.logger,
|
||||
request=Request.blank('')):
|
||||
'object', self.app, object_ring, partition,
|
||||
self.logger, request=Request.blank('')):
|
||||
collected_nodes.append(node)
|
||||
self.assertEqual(len(collected_nodes), 7)
|
||||
self.assertEqual(
|
||||
@ -5527,7 +5527,7 @@ class TestReplicatedObjectController(
|
||||
'Handoff requested (5)'])
|
||||
self.assertEqual(
|
||||
self.app.logger.statsd_client.get_increments(),
|
||||
['error_limiter.is_limited', 'handoff_count'])
|
||||
['error_limiter.is_limited', 'object.handoff_count'])
|
||||
|
||||
# two error-limited primary nodes -> two handoff warnings
|
||||
self.app.log_handoffs = True
|
||||
@ -5540,8 +5540,8 @@ class TestReplicatedObjectController(
|
||||
|
||||
collected_nodes = []
|
||||
for node in proxy_base.NodeIter(
|
||||
self.app, object_ring, partition, self.logger,
|
||||
request=Request.blank('')):
|
||||
'object', self.app, object_ring, partition,
|
||||
self.logger, request=Request.blank('')):
|
||||
collected_nodes.append(node)
|
||||
self.assertEqual(len(collected_nodes), 7)
|
||||
self.assertEqual(
|
||||
@ -5551,7 +5551,7 @@ class TestReplicatedObjectController(
|
||||
])
|
||||
stats = self.app.logger.statsd_client.get_increment_counts()
|
||||
self.assertEqual(2, stats.get('error_limiter.is_limited', 0))
|
||||
self.assertEqual(2, stats.get('handoff_count', 0))
|
||||
self.assertEqual(2, stats.get('object.handoff_count', 0))
|
||||
|
||||
# all error-limited primary nodes -> four handoff warnings,
|
||||
# plus a handoff-all metric
|
||||
@ -5566,8 +5566,8 @@ class TestReplicatedObjectController(
|
||||
|
||||
collected_nodes = []
|
||||
for node in proxy_base.NodeIter(
|
||||
self.app, object_ring, partition, self.logger,
|
||||
request=Request.blank('')):
|
||||
'object', self.app, object_ring, partition,
|
||||
self.logger, request=Request.blank('')):
|
||||
collected_nodes.append(node)
|
||||
self.assertEqual(len(collected_nodes), 10)
|
||||
self.assertEqual(
|
||||
@ -5579,8 +5579,8 @@ class TestReplicatedObjectController(
|
||||
])
|
||||
stats = self.app.logger.statsd_client.get_increment_counts()
|
||||
self.assertEqual(4, stats.get('error_limiter.is_limited', 0))
|
||||
self.assertEqual(4, stats.get('handoff_count', 0))
|
||||
self.assertEqual(1, stats.get('handoff_all_count', 0))
|
||||
self.assertEqual(4, stats.get('object.handoff_count', 0))
|
||||
self.assertEqual(1, stats.get('object.handoff_all_count', 0))
|
||||
|
||||
finally:
|
||||
object_ring.max_more_nodes = 0
|
||||
@ -5597,7 +5597,7 @@ class TestReplicatedObjectController(
|
||||
side_effect=fake_sort_nodes):
|
||||
object_ring = self.app.get_object_ring(None)
|
||||
for node in proxy_base.NodeIter(
|
||||
self.app, object_ring, 0, self.logger,
|
||||
'object', self.app, object_ring, 0, self.logger,
|
||||
request=Request.blank('')):
|
||||
pass
|
||||
self.assertEqual(called, [
|
||||
@ -5609,10 +5609,10 @@ class TestReplicatedObjectController(
|
||||
lambda n, *args, **kwargs: n):
|
||||
object_ring = self.app.get_object_ring(None)
|
||||
first_nodes = list(proxy_base.NodeIter(
|
||||
self.app, object_ring, 0, self.logger,
|
||||
'object', self.app, object_ring, 0, self.logger,
|
||||
request=Request.blank('')))
|
||||
second_nodes = list(proxy_base.NodeIter(
|
||||
self.app, object_ring, 0, self.logger,
|
||||
'object', self.app, object_ring, 0, self.logger,
|
||||
request=Request.blank('')))
|
||||
self.assertIn(first_nodes[0], second_nodes)
|
||||
|
||||
@ -5633,14 +5633,14 @@ class TestReplicatedObjectController(
|
||||
% (node_to_string(first_nodes[0]), 'test')), line)
|
||||
|
||||
second_nodes = list(proxy_base.NodeIter(
|
||||
self.app, object_ring, 0, self.logger,
|
||||
'object', self.app, object_ring, 0, self.logger,
|
||||
request=Request.blank('')))
|
||||
self.assertNotIn(first_nodes[0], second_nodes)
|
||||
self.assertEqual(
|
||||
1, self.logger.statsd_client.get_increment_counts().get(
|
||||
'error_limiter.is_limited', 0))
|
||||
third_nodes = list(proxy_base.NodeIter(
|
||||
self.app, object_ring, 0, self.logger,
|
||||
'object', self.app, object_ring, 0, self.logger,
|
||||
request=Request.blank('')))
|
||||
self.assertNotIn(first_nodes[0], third_nodes)
|
||||
self.assertEqual(
|
||||
@ -5655,11 +5655,11 @@ class TestReplicatedObjectController(
|
||||
lambda r: 6), \
|
||||
mock.patch.object(object_ring, 'max_more_nodes', 99):
|
||||
first_nodes = list(proxy_base.NodeIter(
|
||||
self.app, object_ring, 0, self.logger,
|
||||
'object', self.app, object_ring, 0, self.logger,
|
||||
request=Request.blank('')))
|
||||
second_nodes = []
|
||||
for node in proxy_base.NodeIter(
|
||||
self.app, object_ring, 0, self.logger,
|
||||
'object', self.app, object_ring, 0, self.logger,
|
||||
request=Request.blank('')):
|
||||
if not second_nodes:
|
||||
self.app.error_limit(node, 'test')
|
||||
@ -5678,8 +5678,8 @@ class TestReplicatedObjectController(
|
||||
mock.patch.object(self.app, 'request_node_count',
|
||||
lambda r: 3):
|
||||
got_nodes = list(proxy_base.NodeIter(
|
||||
self.app, object_ring, 0, self.logger, Request.blank(''),
|
||||
node_iter=iter(node_list)))
|
||||
'object', self.app, object_ring, 0, self.logger,
|
||||
Request.blank(''), node_iter=iter(node_list)))
|
||||
self.assertEqual(expected[:3], got_nodes)
|
||||
|
||||
req = Request.blank('/v1/a/c')
|
||||
@ -5690,7 +5690,7 @@ class TestReplicatedObjectController(
|
||||
mock.patch.object(self.app, 'request_node_count',
|
||||
lambda r: 1000000):
|
||||
got_nodes = list(proxy_base.NodeIter(
|
||||
self.app, object_ring, 0, self.logger, req,
|
||||
'object', self.app, object_ring, 0, self.logger, req,
|
||||
node_iter=iter(node_list)))
|
||||
self.assertEqual(expected, got_nodes)
|
||||
|
||||
@ -5706,7 +5706,7 @@ class TestReplicatedObjectController(
|
||||
mock.patch.object(self.app, 'request_node_count',
|
||||
lambda r: 3):
|
||||
got_nodes = list(proxy_base.NodeIter(
|
||||
self.app, object_ring, 0, self.logger, req,
|
||||
'object', self.app, object_ring, 0, self.logger, req,
|
||||
node_iter=iter(node_list)))
|
||||
expected = [dict(n, use_replication=True) for n in node_list]
|
||||
self.assertEqual(expected[:3], got_nodes)
|
||||
@ -5718,7 +5718,7 @@ class TestReplicatedObjectController(
|
||||
mock.patch.object(self.app, 'request_node_count',
|
||||
lambda r: 13):
|
||||
got_nodes = list(proxy_base.NodeIter(
|
||||
self.app, object_ring, 0, self.logger, req,
|
||||
'object', self.app, object_ring, 0, self.logger, req,
|
||||
node_iter=iter(node_list)))
|
||||
self.assertEqual(expected, got_nodes)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user