proxy: emit metric when updating namespaces memcache set fails
Previously, if a memcache set command had an unexpected response then an error was logged but no exception raised from MemcacheRing.set. With this patch, when MemcacheRing.set fails in this way, a MemcacheConnectionError will be raised if 'raise_on_error' is True. This exception is now handled when the proxy tries to set updating namespaces in memcache, and a counter metric emitted with name 'object.shard_updating.cache.set_error'. If the set is successful an 'object.shard_updating.cache.set' counter metric is emitted. The logging of MemcacheRing errors is improved to include the memcache key being set, in line with exception logging. Change-Id: Icfc97751c80f4bb4a3373796c01bff6ed5843937
This commit is contained in:
parent
195f54ecb5
commit
965908d753
@ -245,6 +245,13 @@ class MemcacheRing(object):
|
||||
def memcache_servers(self):
|
||||
return list(self._client_cache.keys())
|
||||
|
||||
def _log_error(self, server, cmd, action, msg):
|
||||
self.logger.error(
|
||||
"Error %(action)s to memcached: %(server)s"
|
||||
": with key_prefix %(key_prefix)s, method %(method)s: %(msg)s",
|
||||
{'action': action, 'server': server, 'key_prefix': cmd.key_prefix,
|
||||
'method': cmd.method, 'msg': msg})
|
||||
|
||||
"""
|
||||
Handles exceptions.
|
||||
|
||||
@ -367,7 +374,8 @@ class MemcacheRing(object):
|
||||
self._exception_occurred(server, e, cmd, pool_start_time,
|
||||
action='connecting', sock=sock)
|
||||
if not any_yielded:
|
||||
self.logger.error('All memcached servers error-limited')
|
||||
self._log_error('ALL', cmd, 'connecting',
|
||||
'All memcached servers error-limited')
|
||||
|
||||
def _return_conn(self, server, fp, sock):
|
||||
"""Returns a server connection to the pool."""
|
||||
@ -404,6 +412,14 @@ class MemcacheRing(object):
|
||||
elif not isinstance(value, bytes):
|
||||
value = str(value).encode('utf-8')
|
||||
|
||||
if 0 <= self.item_size_warning_threshold <= len(value):
|
||||
self.logger.warning(
|
||||
"Item size larger than warning threshold: "
|
||||
"%d (%s) >= %d (%s)", len(value),
|
||||
human_readable(len(value)),
|
||||
self.item_size_warning_threshold,
|
||||
human_readable(self.item_size_warning_threshold))
|
||||
|
||||
for (server, fp, sock) in self._get_conns(cmd):
|
||||
conn_start_time = tm.time()
|
||||
try:
|
||||
@ -414,17 +430,7 @@ class MemcacheRing(object):
|
||||
if msg != b'STORED':
|
||||
if not six.PY2:
|
||||
msg = msg.decode('ascii')
|
||||
self.logger.error(
|
||||
"Error setting value in memcached: "
|
||||
"%(server)s: %(msg)s",
|
||||
{'server': server, 'msg': msg})
|
||||
if 0 <= self.item_size_warning_threshold <= len(value):
|
||||
self.logger.warning(
|
||||
"Item size larger than warning threshold: "
|
||||
"%d (%s) >= %d (%s)", len(value),
|
||||
human_readable(len(value)),
|
||||
self.item_size_warning_threshold,
|
||||
human_readable(self.item_size_warning_threshold))
|
||||
raise MemcacheConnectionError('failed set: %s' % msg)
|
||||
self._return_conn(server, fp, sock)
|
||||
return
|
||||
except (Exception, Timeout) as e:
|
||||
|
@ -345,6 +345,25 @@ class BaseObjectController(Controller):
|
||||
# there will be only one shard range in the list if any
|
||||
return shard_ranges[0] if shard_ranges else None
|
||||
|
||||
def _cache_update_namespaces(self, memcache, cache_key, namespaces):
|
||||
if not memcache:
|
||||
return
|
||||
|
||||
self.logger.info(
|
||||
'Caching updating shards for %s (%d shards)',
|
||||
cache_key, len(namespaces.bounds))
|
||||
try:
|
||||
memcache.set(
|
||||
cache_key, namespaces.bounds,
|
||||
time=self.app.recheck_updating_shard_ranges,
|
||||
raise_on_error=True)
|
||||
cache_state = 'set'
|
||||
except MemcacheConnectionError:
|
||||
cache_state = 'set_error'
|
||||
finally:
|
||||
record_cache_op_metrics(self.logger, self.server_type.lower(),
|
||||
'shard_updating', cache_state, None)
|
||||
|
||||
def _get_update_shard(self, req, account, container, obj):
|
||||
"""
|
||||
Find the appropriate shard range for an object update.
|
||||
@ -386,16 +405,9 @@ class BaseObjectController(Controller):
|
||||
if shard_ranges:
|
||||
# only store the list of namespace lower bounds and names into
|
||||
# infocache and memcache.
|
||||
cached_namespaces = NamespaceBoundList.parse(
|
||||
shard_ranges)
|
||||
infocache[cache_key] = cached_namespaces
|
||||
if memcache:
|
||||
self.logger.info(
|
||||
'Caching updating shards for %s (%d shards)',
|
||||
cache_key, len(cached_namespaces.bounds))
|
||||
memcache.set(
|
||||
cache_key, cached_namespaces.bounds,
|
||||
time=self.app.recheck_updating_shard_ranges)
|
||||
namespaces = NamespaceBoundList.parse(shard_ranges)
|
||||
infocache[cache_key] = namespaces
|
||||
self._cache_update_namespaces(memcache, cache_key, namespaces)
|
||||
update_shard = find_namespace(obj, shard_ranges or [])
|
||||
record_cache_op_metrics(
|
||||
self.logger, self.server_type.lower(), 'shard_updating',
|
||||
|
@ -410,16 +410,50 @@ class TestMemcached(unittest.TestCase):
|
||||
self.assertAlmostEqual(float(cache_timeout), esttimeout, delta=1)
|
||||
|
||||
def test_set_error(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
logger=self.logger)
|
||||
memcache_client = memcached.MemcacheRing(
|
||||
['1.2.3.4:11211'], logger=self.logger,
|
||||
item_size_warning_threshold=1)
|
||||
mock = MockMemcached()
|
||||
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock, mock)] * 2)
|
||||
memcache_client.set('too-big', [1, 2, 3])
|
||||
now = time.time()
|
||||
with patch('time.time', return_value=now):
|
||||
memcache_client.set('too-big', [1, 2, 3])
|
||||
self.assertEqual(
|
||||
self.logger.get_lines_for_level('error'),
|
||||
['Error setting value in memcached: 1.2.3.4:11211: '
|
||||
'SERVER_ERROR object too large for cache'])
|
||||
['Error talking to memcached: 1.2.3.4:11211: '
|
||||
'with key_prefix too-big, method set, time_spent 0.0, '
|
||||
'failed set: SERVER_ERROR object too large for cache'])
|
||||
warning_lines = self.logger.get_lines_for_level('warning')
|
||||
self.assertEqual(1, len(warning_lines))
|
||||
self.assertIn('Item size larger than warning threshold',
|
||||
warning_lines[0])
|
||||
self.assertTrue(mock.close_called)
|
||||
|
||||
def test_set_error_raise_on_error(self):
|
||||
memcache_client = memcached.MemcacheRing(
|
||||
['1.2.3.4:11211'], logger=self.logger,
|
||||
item_size_warning_threshold=1)
|
||||
mock = MockMemcached()
|
||||
memcache_client._client_cache[
|
||||
'1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock, mock)] * 2)
|
||||
now = time.time()
|
||||
|
||||
with self.assertRaises(MemcacheConnectionError) as cm:
|
||||
with patch('time.time', return_value=now):
|
||||
memcache_client.set('too-big', [1, 2, 3], raise_on_error=True)
|
||||
self.assertIn("No memcached connections succeeded", str(cm.exception))
|
||||
self.assertEqual(
|
||||
self.logger.get_lines_for_level('error'),
|
||||
['Error talking to memcached: 1.2.3.4:11211: '
|
||||
'with key_prefix too-big, method set, time_spent 0.0, '
|
||||
'failed set: SERVER_ERROR object too large for cache'])
|
||||
warning_lines = self.logger.get_lines_for_level('warning')
|
||||
self.assertEqual(1, len(warning_lines))
|
||||
self.assertIn('Item size larger than warning threshold',
|
||||
warning_lines[0])
|
||||
self.assertTrue(mock.close_called)
|
||||
|
||||
def test_get_failed_connection_mid_request(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
@ -620,7 +654,8 @@ class TestMemcached(unittest.TestCase):
|
||||
'with key_prefix some_key, method set, time_spent 0.0, '
|
||||
'[Errno 32] Broken pipe',
|
||||
'Error limiting server 1.2.3.4:11211',
|
||||
'All memcached servers error-limited',
|
||||
'Error connecting to memcached: ALL: with key_prefix some_key, '
|
||||
'method set: All memcached servers error-limited',
|
||||
])
|
||||
self.logger.clear()
|
||||
|
||||
@ -628,14 +663,16 @@ class TestMemcached(unittest.TestCase):
|
||||
for _ in range(12):
|
||||
memcache_client.set('some_key', [1, 2, 3])
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'All memcached servers error-limited',
|
||||
'Error connecting to memcached: ALL: with key_prefix some_key, '
|
||||
'method set: All memcached servers error-limited',
|
||||
] * 12)
|
||||
self.logger.clear()
|
||||
|
||||
# and get()s are all a "cache miss"
|
||||
self.assertIsNone(memcache_client.get('some_key'))
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'All memcached servers error-limited',
|
||||
'Error connecting to memcached: ALL: with key_prefix some_key, '
|
||||
'method get: All memcached servers error-limited',
|
||||
])
|
||||
|
||||
def test_error_disabled(self):
|
||||
@ -752,7 +789,8 @@ class TestMemcached(unittest.TestCase):
|
||||
'with key_prefix some_key, method set, time_spent 0.0, '
|
||||
'[Errno 32] Broken pipe',
|
||||
'Error limiting server 1.2.3.5:11211',
|
||||
'All memcached servers error-limited',
|
||||
'Error connecting to memcached: ALL: with key_prefix some_key, '
|
||||
'method set: All memcached servers error-limited',
|
||||
])
|
||||
|
||||
# with default error_limit_time of 60, one call per 6 secs, error limit
|
||||
@ -776,8 +814,8 @@ class TestMemcached(unittest.TestCase):
|
||||
'with key_prefix some_key, method set, time_spent 0.0, '
|
||||
'[Errno 32] Broken pipe',
|
||||
'Error limiting server 1.2.3.5:11211',
|
||||
'All memcached servers error-limited',
|
||||
])
|
||||
'Error connecting to memcached: ALL: with key_prefix some_key, '
|
||||
'method set: All memcached servers error-limited'])
|
||||
|
||||
# with error_limit_time of 70, one call per 6 secs, error_limit_count
|
||||
# of 11, 13th call triggers error limit
|
||||
@ -791,8 +829,8 @@ class TestMemcached(unittest.TestCase):
|
||||
'with key_prefix some_key, method set, time_spent 0.0, '
|
||||
'[Errno 32] Broken pipe',
|
||||
'Error limiting server 1.2.3.5:11211',
|
||||
'All memcached servers error-limited',
|
||||
])
|
||||
'Error connecting to memcached: ALL: with key_prefix some_key, '
|
||||
'method set: All memcached servers error-limited'])
|
||||
|
||||
def test_delete(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
|
@ -4431,7 +4431,8 @@ class TestReplicatedObjectController(
|
||||
'account.info.infocache.hit': 2,
|
||||
'container.info.cache.miss.200': 1,
|
||||
'container.info.infocache.hit': 1,
|
||||
'object.shard_updating.cache.miss.200': 1},
|
||||
'object.shard_updating.cache.miss.200': 1,
|
||||
'object.shard_updating.cache.set': 1},
|
||||
stats)
|
||||
self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix'])
|
||||
info_lines = self.logger.get_lines_for_level('info')
|
||||
@ -4795,7 +4796,8 @@ class TestReplicatedObjectController(
|
||||
'object.shard_updating.cache.hit': 1,
|
||||
'container.info.cache.hit': 1,
|
||||
'account.info.cache.hit': 1,
|
||||
'object.shard_updating.cache.skip.200': 1},
|
||||
'object.shard_updating.cache.skip.200': 1,
|
||||
'object.shard_updating.cache.set': 1},
|
||||
stats)
|
||||
# verify statsd prefix is not mutated
|
||||
self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix'])
|
||||
@ -4865,7 +4867,99 @@ class TestReplicatedObjectController(
|
||||
'container.info.infocache.hit': 3,
|
||||
'object.shard_updating.cache.skip.200': 1,
|
||||
'object.shard_updating.cache.hit': 1,
|
||||
'object.shard_updating.cache.error.200': 1})
|
||||
'object.shard_updating.cache.error.200': 1,
|
||||
'object.shard_updating.cache.set': 2
|
||||
})
|
||||
|
||||
do_test('POST', 'sharding')
|
||||
do_test('POST', 'sharded')
|
||||
do_test('DELETE', 'sharding')
|
||||
do_test('DELETE', 'sharded')
|
||||
do_test('PUT', 'sharding')
|
||||
do_test('PUT', 'sharded')
|
||||
|
||||
@patch_policies([
|
||||
StoragePolicy(0, 'zero', is_default=True, object_ring=FakeRing()),
|
||||
StoragePolicy(1, 'one', object_ring=FakeRing()),
|
||||
])
|
||||
def test_backend_headers_update_shard_container_cache_set_error(self):
|
||||
# verify that backend container update is directed to the shard
|
||||
# container despite memcache set error
|
||||
# reset the router post patch_policies
|
||||
self.app.obj_controller_router = proxy_server.ObjectControllerRouter()
|
||||
self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes
|
||||
self.app.recheck_updating_shard_ranges = 3600
|
||||
self.app.container_updating_shard_ranges_skip_cache = 0.001
|
||||
|
||||
def do_test(method, sharding_state):
|
||||
self.app.logger.clear() # clean capture state
|
||||
# simulate memcache error when setting updating namespaces;
|
||||
# expect 4 memcache sets: account info, container info, container
|
||||
# info again from namespaces GET subrequest, namespaces
|
||||
cache = FakeMemcache(error_on_set=[False, False, False, True])
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o', {'swift.cache': cache},
|
||||
method=method, body='', headers={'Content-Type': 'text/plain'})
|
||||
# acct HEAD, cont HEAD, cont shard GET, obj POSTs
|
||||
status_codes = (200, 200, 200, 202, 202, 202)
|
||||
resp_headers = {'X-Backend-Storage-Policy-Index': 1,
|
||||
'x-backend-sharding-state': sharding_state,
|
||||
'X-Backend-Record-Type': 'shard'}
|
||||
shard_ranges = [
|
||||
utils.ShardRange(
|
||||
'.shards_a/c_not_used', utils.Timestamp.now(), '', 'l'),
|
||||
utils.ShardRange(
|
||||
'.shards_a/c_shard', utils.Timestamp.now(), 'l', 'u'),
|
||||
utils.ShardRange(
|
||||
'.shards_a/c_nope', utils.Timestamp.now(), 'u', ''),
|
||||
]
|
||||
body = json.dumps([
|
||||
dict(shard_range)
|
||||
for shard_range in shard_ranges]).encode('ascii')
|
||||
with mock.patch('random.random', return_value=0), \
|
||||
mocked_http_conn(*status_codes, headers=resp_headers,
|
||||
body=body) as fake_conn:
|
||||
resp = req.get_response(self.app)
|
||||
|
||||
self.assertEqual(resp.status_int, 202)
|
||||
|
||||
stats = self.app.logger.statsd_client.get_increment_counts()
|
||||
self.assertEqual({'account.info.cache.miss.200': 1,
|
||||
'account.info.infocache.hit': 2,
|
||||
'container.info.cache.miss.200': 1,
|
||||
'container.info.infocache.hit': 1,
|
||||
'object.shard_updating.cache.skip.200': 1,
|
||||
'object.shard_updating.cache.set_error': 1},
|
||||
stats)
|
||||
# verify statsd prefix is not mutated
|
||||
self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix'])
|
||||
# sanity check: namespaces not in cache
|
||||
cache_key = 'shard-updating-v2/a/c'
|
||||
self.assertNotIn(cache_key, req.environ['swift.cache'].store)
|
||||
|
||||
# make sure backend requests included expected container headers
|
||||
container_headers = {}
|
||||
for backend_request in fake_conn.requests[3:]:
|
||||
req_headers = backend_request['headers']
|
||||
device = req_headers['x-container-device']
|
||||
container_headers[device] = req_headers['x-container-host']
|
||||
expectations = {
|
||||
'method': method,
|
||||
'path': '/0/a/c/o',
|
||||
'headers': {
|
||||
'X-Container-Partition': '0',
|
||||
'Host': 'localhost:80',
|
||||
'Referer': '%s http://localhost/v1/a/c/o' % method,
|
||||
'X-Backend-Storage-Policy-Index': '1',
|
||||
'X-Backend-Quoted-Container-Path': shard_ranges[1].name
|
||||
},
|
||||
}
|
||||
self._check_request(backend_request, **expectations)
|
||||
|
||||
expected = {}
|
||||
for i, device in enumerate(['sda', 'sdb', 'sdc']):
|
||||
expected[device] = '10.0.0.%d:100%d' % (i, i)
|
||||
self.assertEqual(container_headers, expected)
|
||||
|
||||
do_test('POST', 'sharding')
|
||||
do_test('POST', 'sharded')
|
||||
|
Loading…
Reference in New Issue
Block a user