Consistently apply node error limiting rules in proxy
All GET or HEAD requests consistently error limit nodes that return 507 and increment errors for nodes responding with any other 5XX. There were two places in the object PUT path where the proxy was error limiting nodes and their behavior was inconsistent. During expect-100 connect we would only error_limit nodes on 507, and during response we would increment errors for all 5XX series responses. This was pretty hard to reason about and the divergence in behavior of questionable value. An audit of base controller highlighted where make_requests would apply error_limit's on 507 but not increment errors on other 5XX responses. Now anywhere we track errors on nodes we use error_limit on 507 and error_occurred on any other 5XX series request. Additionally a Timeout or Exception that is logged through exception_occurred will bump errors - which is consistent with the approach in "Add Error Limiting to slow nodes" [1]. 1. https://review.openstack.org/#/c/112424/ Change-Id: I67e489d18afd6bdfc730bfdba76f85a2e3ca74f0
This commit is contained in:
parent
e429cd81be
commit
99d501831e
@ -1033,6 +1033,14 @@ class Controller(object):
|
|||||||
elif resp.status == HTTP_INSUFFICIENT_STORAGE:
|
elif resp.status == HTTP_INSUFFICIENT_STORAGE:
|
||||||
self.app.error_limit(node,
|
self.app.error_limit(node,
|
||||||
_('ERROR Insufficient Storage'))
|
_('ERROR Insufficient Storage'))
|
||||||
|
elif is_server_error(resp.status):
|
||||||
|
self.app.error_occurred(
|
||||||
|
node, _('ERROR %(status)d '
|
||||||
|
'Trying to %(method)s %(path)s'
|
||||||
|
'From Container Server') % {
|
||||||
|
'status': resp.status,
|
||||||
|
'method': method,
|
||||||
|
'path': path})
|
||||||
except (Exception, Timeout):
|
except (Exception, Timeout):
|
||||||
self.app.exception_occurred(
|
self.app.exception_occurred(
|
||||||
node, self.server_type,
|
node, self.server_type,
|
||||||
|
@ -47,10 +47,11 @@ from swift.common import constraints
|
|||||||
from swift.common.exceptions import ChunkReadTimeout, \
|
from swift.common.exceptions import ChunkReadTimeout, \
|
||||||
ChunkWriteTimeout, ConnectionTimeout, ListingIterNotFound, \
|
ChunkWriteTimeout, ConnectionTimeout, ListingIterNotFound, \
|
||||||
ListingIterNotAuthorized, ListingIterError
|
ListingIterNotAuthorized, ListingIterError
|
||||||
from swift.common.http import is_success, is_client_error, HTTP_CONTINUE, \
|
from swift.common.http import (
|
||||||
HTTP_CREATED, HTTP_MULTIPLE_CHOICES, HTTP_NOT_FOUND, \
|
is_success, is_client_error, is_server_error, HTTP_CONTINUE, HTTP_CREATED,
|
||||||
HTTP_INTERNAL_SERVER_ERROR, HTTP_SERVICE_UNAVAILABLE, \
|
HTTP_MULTIPLE_CHOICES, HTTP_NOT_FOUND, HTTP_INTERNAL_SERVER_ERROR,
|
||||||
HTTP_INSUFFICIENT_STORAGE, HTTP_PRECONDITION_FAILED
|
HTTP_SERVICE_UNAVAILABLE, HTTP_INSUFFICIENT_STORAGE,
|
||||||
|
HTTP_PRECONDITION_FAILED)
|
||||||
from swift.proxy.controllers.base import Controller, delay_denial, \
|
from swift.proxy.controllers.base import Controller, delay_denial, \
|
||||||
cors_validation
|
cors_validation
|
||||||
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
|
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
|
||||||
@ -372,6 +373,11 @@ class ObjectController(Controller):
|
|||||||
return conn
|
return conn
|
||||||
elif resp.status == HTTP_INSUFFICIENT_STORAGE:
|
elif resp.status == HTTP_INSUFFICIENT_STORAGE:
|
||||||
self.app.error_limit(node, _('ERROR Insufficient Storage'))
|
self.app.error_limit(node, _('ERROR Insufficient Storage'))
|
||||||
|
elif is_server_error(resp.status):
|
||||||
|
self.app.error_occurred(
|
||||||
|
node, _('ERROR %(status)d Expect: 100-continue '
|
||||||
|
'From Object Server') % {
|
||||||
|
'status': resp.status})
|
||||||
except (Exception, Timeout):
|
except (Exception, Timeout):
|
||||||
self.app.exception_occurred(
|
self.app.exception_occurred(
|
||||||
node, _('Object'),
|
node, _('Object'),
|
||||||
@ -404,7 +410,10 @@ class ObjectController(Controller):
|
|||||||
statuses.append(response.status)
|
statuses.append(response.status)
|
||||||
reasons.append(response.reason)
|
reasons.append(response.reason)
|
||||||
bodies.append(response.read())
|
bodies.append(response.read())
|
||||||
if response.status >= HTTP_INTERNAL_SERVER_ERROR:
|
if response.status == HTTP_INSUFFICIENT_STORAGE:
|
||||||
|
self.app.error_limit(conn.node,
|
||||||
|
_('ERROR Insufficient Storage'))
|
||||||
|
elif response.status >= HTTP_INTERNAL_SERVER_ERROR:
|
||||||
self.app.error_occurred(
|
self.app.error_occurred(
|
||||||
conn.node,
|
conn.node,
|
||||||
_('ERROR %(status)d %(body)s From Object Server '
|
_('ERROR %(status)d %(body)s From Object Server '
|
||||||
|
@ -452,6 +452,12 @@ class Application(object):
|
|||||||
{'msg': msg, 'ip': node['ip'],
|
{'msg': msg, 'ip': node['ip'],
|
||||||
'port': node['port'], 'device': node['device']})
|
'port': node['port'], 'device': node['device']})
|
||||||
|
|
||||||
|
def _incr_node_errors(self, node):
|
||||||
|
node_key = self._error_limit_node_key(node)
|
||||||
|
error_stats = self._error_limiting.setdefault(node_key, {})
|
||||||
|
error_stats['errors'] = error_stats.get('errors', 0) + 1
|
||||||
|
error_stats['last_error'] = time()
|
||||||
|
|
||||||
def error_occurred(self, node, msg):
|
def error_occurred(self, node, msg):
|
||||||
"""
|
"""
|
||||||
Handle logging, and handling of errors.
|
Handle logging, and handling of errors.
|
||||||
@ -459,10 +465,7 @@ class Application(object):
|
|||||||
:param node: dictionary of node to handle errors for
|
:param node: dictionary of node to handle errors for
|
||||||
:param msg: error message
|
:param msg: error message
|
||||||
"""
|
"""
|
||||||
node_key = self._error_limit_node_key(node)
|
self._incr_node_errors(node)
|
||||||
error_stats = self._error_limiting.setdefault(node_key, {})
|
|
||||||
error_stats['errors'] = error_stats.get('errors', 0) + 1
|
|
||||||
error_stats['last_error'] = time()
|
|
||||||
self.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),
|
self.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),
|
||||||
{'msg': msg, 'ip': node['ip'],
|
{'msg': msg, 'ip': node['ip'],
|
||||||
'port': node['port'], 'device': node['device']})
|
'port': node['port'], 'device': node['device']})
|
||||||
@ -531,6 +534,7 @@ class Application(object):
|
|||||||
:param typ: server type
|
:param typ: server type
|
||||||
:param additional_info: additional information to log
|
:param additional_info: additional information to log
|
||||||
"""
|
"""
|
||||||
|
self._incr_node_errors(node)
|
||||||
self.logger.exception(
|
self.logger.exception(
|
||||||
_('ERROR with %(type)s server %(ip)s:%(port)s/%(device)s re: '
|
_('ERROR with %(type)s server %(ip)s:%(port)s/%(device)s re: '
|
||||||
'%(info)s'),
|
'%(info)s'),
|
||||||
|
@ -772,7 +772,25 @@ def fake_http_connect(*code_iter, **kwargs):
|
|||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def mocked_http_conn(*args, **kwargs):
|
def mocked_http_conn(*args, **kwargs):
|
||||||
|
requests = []
|
||||||
|
|
||||||
|
def capture_requests(ip, port, method, path, headers, qs, ssl):
|
||||||
|
req = {
|
||||||
|
'ip': ip,
|
||||||
|
'port': port,
|
||||||
|
'method': method,
|
||||||
|
'path': path,
|
||||||
|
'headers': headers,
|
||||||
|
'qs': qs,
|
||||||
|
'ssl': ssl,
|
||||||
|
}
|
||||||
|
requests.append(req)
|
||||||
|
kwargs.setdefault('give_connect', capture_requests)
|
||||||
fake_conn = fake_http_connect(*args, **kwargs)
|
fake_conn = fake_http_connect(*args, **kwargs)
|
||||||
|
fake_conn.requests = requests
|
||||||
with mocklib.patch('swift.common.bufferedhttp.http_connect_raw',
|
with mocklib.patch('swift.common.bufferedhttp.http_connect_raw',
|
||||||
new=fake_conn):
|
new=fake_conn):
|
||||||
yield fake_conn
|
yield fake_conn
|
||||||
|
left_over_status = list(fake_conn.code_iter)
|
||||||
|
if left_over_status:
|
||||||
|
raise AssertionError('left over status %r' % left_over_status)
|
||||||
|
@ -16,6 +16,8 @@
|
|||||||
import mock
|
import mock
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
|
from eventlet import Timeout
|
||||||
|
|
||||||
from swift.common.swob import Request
|
from swift.common.swob import Request
|
||||||
from swift.proxy import server as proxy_server
|
from swift.proxy import server as proxy_server
|
||||||
from swift.proxy.controllers.base import headers_to_container_info
|
from swift.proxy.controllers.base import headers_to_container_info
|
||||||
@ -23,17 +25,48 @@ from test.unit import fake_http_connect, FakeRing, FakeMemcache
|
|||||||
from swift.common.storage_policy import StoragePolicy
|
from swift.common.storage_policy import StoragePolicy
|
||||||
from swift.common.request_helpers import get_sys_meta_prefix
|
from swift.common.request_helpers import get_sys_meta_prefix
|
||||||
|
|
||||||
from test.unit import patch_policies
|
from test.unit import patch_policies, mocked_http_conn, debug_logger
|
||||||
from test.unit.common.ring.test_ring import TestRingBase
|
from test.unit.common.ring.test_ring import TestRingBase
|
||||||
|
from test.unit.proxy.test_server import node_error_count
|
||||||
|
|
||||||
|
|
||||||
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())])
|
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())])
|
||||||
class TestContainerController(TestRingBase):
|
class TestContainerController(TestRingBase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
TestRingBase.setUp(self)
|
TestRingBase.setUp(self)
|
||||||
|
self.logger = debug_logger()
|
||||||
|
self.container_ring = FakeRing(max_more_nodes=9)
|
||||||
self.app = proxy_server.Application(None, FakeMemcache(),
|
self.app = proxy_server.Application(None, FakeMemcache(),
|
||||||
|
logger=self.logger,
|
||||||
account_ring=FakeRing(),
|
account_ring=FakeRing(),
|
||||||
container_ring=FakeRing())
|
container_ring=self.container_ring)
|
||||||
|
|
||||||
|
self.account_info = {
|
||||||
|
'status': 200,
|
||||||
|
'container_count': '10',
|
||||||
|
'total_object_count': '100',
|
||||||
|
'bytes': '1000',
|
||||||
|
'meta': {},
|
||||||
|
'sysmeta': {},
|
||||||
|
}
|
||||||
|
|
||||||
|
class FakeAccountInfoContainerController(
|
||||||
|
proxy_server.ContainerController):
|
||||||
|
|
||||||
|
def account_info(controller, *args, **kwargs):
|
||||||
|
patch_path = 'swift.proxy.controllers.base.get_info'
|
||||||
|
with mock.patch(patch_path) as mock_get_info:
|
||||||
|
mock_get_info.return_value = dict(self.account_info)
|
||||||
|
return super(FakeAccountInfoContainerController,
|
||||||
|
controller).account_info(
|
||||||
|
*args, **kwargs)
|
||||||
|
_orig_get_controller = self.app.get_controller
|
||||||
|
|
||||||
|
def wrapped_get_controller(*args, **kwargs):
|
||||||
|
with mock.patch('swift.proxy.server.ContainerController',
|
||||||
|
new=FakeAccountInfoContainerController):
|
||||||
|
return _orig_get_controller(*args, **kwargs)
|
||||||
|
self.app.get_controller = wrapped_get_controller
|
||||||
|
|
||||||
def test_container_info_in_response_env(self):
|
def test_container_info_in_response_env(self):
|
||||||
controller = proxy_server.ContainerController(self.app, 'a', 'c')
|
controller = proxy_server.ContainerController(self.app, 'a', 'c')
|
||||||
@ -123,6 +156,53 @@ class TestContainerController(TestRingBase):
|
|||||||
self.assertEqual(context['headers'][user_meta_key], 'bar')
|
self.assertEqual(context['headers'][user_meta_key], 'bar')
|
||||||
self.assertNotEqual(context['headers']['x-timestamp'], '1.0')
|
self.assertNotEqual(context['headers']['x-timestamp'], '1.0')
|
||||||
|
|
||||||
|
def test_node_errors(self):
|
||||||
|
self.app.sort_nodes = lambda n: n
|
||||||
|
|
||||||
|
for method in ('PUT', 'DELETE', 'POST'):
|
||||||
|
def test_status_map(statuses, expected):
|
||||||
|
self.app._error_limiting = {}
|
||||||
|
req = Request.blank('/v1/a/c', method=method)
|
||||||
|
with mocked_http_conn(*statuses) as fake_conn:
|
||||||
|
print 'a' * 50
|
||||||
|
resp = req.get_response(self.app)
|
||||||
|
self.assertEqual(resp.status_int, expected)
|
||||||
|
for req in fake_conn.requests:
|
||||||
|
self.assertEqual(req['method'], method)
|
||||||
|
self.assert_(req['path'].endswith('/a/c'))
|
||||||
|
|
||||||
|
base_status = [201] * 3
|
||||||
|
# test happy path
|
||||||
|
test_status_map(list(base_status), 201)
|
||||||
|
for i in range(3):
|
||||||
|
self.assertEqual(node_error_count(
|
||||||
|
self.app, self.container_ring.devs[i]), 0)
|
||||||
|
# single node errors and test isolation
|
||||||
|
for i in range(3):
|
||||||
|
status_list = list(base_status)
|
||||||
|
status_list[i] = 503
|
||||||
|
status_list.append(201)
|
||||||
|
test_status_map(status_list, 201)
|
||||||
|
for j in range(3):
|
||||||
|
expected = 1 if j == i else 0
|
||||||
|
self.assertEqual(node_error_count(
|
||||||
|
self.app, self.container_ring.devs[j]), expected)
|
||||||
|
# timeout
|
||||||
|
test_status_map((201, Timeout(), 201, 201), 201)
|
||||||
|
self.assertEqual(node_error_count(
|
||||||
|
self.app, self.container_ring.devs[1]), 1)
|
||||||
|
|
||||||
|
# exception
|
||||||
|
test_status_map((Exception('kaboom!'), 201, 201, 201), 201)
|
||||||
|
self.assertEqual(node_error_count(
|
||||||
|
self.app, self.container_ring.devs[0]), 1)
|
||||||
|
|
||||||
|
# insufficient storage
|
||||||
|
test_status_map((201, 201, 507, 201), 201)
|
||||||
|
self.assertEqual(node_error_count(
|
||||||
|
self.app, self.container_ring.devs[2]),
|
||||||
|
self.app.error_suppression_limit + 1)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
@ -20,6 +20,7 @@ import unittest
|
|||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
|
|
||||||
import mock
|
import mock
|
||||||
|
from eventlet import Timeout
|
||||||
|
|
||||||
import swift
|
import swift
|
||||||
from swift.common import utils, swob
|
from swift.common import utils, swob
|
||||||
@ -28,6 +29,7 @@ from swift.common.storage_policy import StoragePolicy, POLICIES
|
|||||||
|
|
||||||
from test.unit import FakeRing, FakeMemcache, fake_http_connect, \
|
from test.unit import FakeRing, FakeMemcache, fake_http_connect, \
|
||||||
debug_logger, patch_policies
|
debug_logger, patch_policies
|
||||||
|
from test.unit.proxy.test_server import node_error_count
|
||||||
|
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
@ -51,6 +53,11 @@ def set_http_connect(*args, **kwargs):
|
|||||||
|
|
||||||
|
|
||||||
class PatchedObjControllerApp(proxy_server.Application):
|
class PatchedObjControllerApp(proxy_server.Application):
|
||||||
|
"""
|
||||||
|
This patch is just a hook over handle_request to ensure that when
|
||||||
|
get_controller is called the ObjectController class is patched to
|
||||||
|
return a (possibly stubbed) ObjectController class.
|
||||||
|
"""
|
||||||
|
|
||||||
object_controller = proxy_server.ObjectController
|
object_controller = proxy_server.ObjectController
|
||||||
|
|
||||||
@ -154,6 +161,10 @@ class TestObjController(unittest.TestCase):
|
|||||||
return super(FakeContainerInfoObjController,
|
return super(FakeContainerInfoObjController,
|
||||||
controller).container_info(*args, **kwargs)
|
controller).container_info(*args, **kwargs)
|
||||||
|
|
||||||
|
# this is taking advantage of the fact that self.app is a
|
||||||
|
# PachedObjControllerApp, so handle_response will route into an
|
||||||
|
# instance of our FakeContainerInfoObjController just by
|
||||||
|
# overriding the class attribute for object_controller
|
||||||
self.app.object_controller = FakeContainerInfoObjController
|
self.app.object_controller = FakeContainerInfoObjController
|
||||||
|
|
||||||
def test_PUT_simple(self):
|
def test_PUT_simple(self):
|
||||||
@ -187,6 +198,59 @@ class TestObjController(unittest.TestCase):
|
|||||||
resp = req.get_response(self.app)
|
resp = req.get_response(self.app)
|
||||||
self.assertEquals(resp.status_int, 400)
|
self.assertEquals(resp.status_int, 400)
|
||||||
|
|
||||||
|
def test_PUT_connect_exceptions(self):
|
||||||
|
object_ring = self.app.get_object_ring(None)
|
||||||
|
self.app.sort_nodes = lambda n: n # disable shuffle
|
||||||
|
|
||||||
|
def test_status_map(statuses, expected):
|
||||||
|
self.app._error_limiting = {}
|
||||||
|
req = swob.Request.blank('/v1/a/c/o.jpg', method='PUT',
|
||||||
|
body='test body')
|
||||||
|
with set_http_connect(*statuses):
|
||||||
|
resp = req.get_response(self.app)
|
||||||
|
self.assertEqual(resp.status_int, expected)
|
||||||
|
|
||||||
|
base_status = [201] * 3
|
||||||
|
# test happy path
|
||||||
|
test_status_map(list(base_status), 201)
|
||||||
|
for i in range(3):
|
||||||
|
self.assertEqual(node_error_count(
|
||||||
|
self.app, object_ring.devs[i]), 0)
|
||||||
|
# single node errors and test isolation
|
||||||
|
for i in range(3):
|
||||||
|
status_list = list(base_status)
|
||||||
|
status_list[i] = 503
|
||||||
|
test_status_map(status_list, 201)
|
||||||
|
for j in range(3):
|
||||||
|
self.assertEqual(node_error_count(
|
||||||
|
self.app, object_ring.devs[j]), 1 if j == i else 0)
|
||||||
|
# connect errors
|
||||||
|
test_status_map((201, Timeout(), 201, 201), 201)
|
||||||
|
self.assertEqual(node_error_count(
|
||||||
|
self.app, object_ring.devs[1]), 1)
|
||||||
|
test_status_map((Exception('kaboom!'), 201, 201, 201), 201)
|
||||||
|
self.assertEqual(node_error_count(
|
||||||
|
self.app, object_ring.devs[0]), 1)
|
||||||
|
# expect errors
|
||||||
|
test_status_map((201, 201, (503, None), 201), 201)
|
||||||
|
self.assertEqual(node_error_count(
|
||||||
|
self.app, object_ring.devs[2]), 1)
|
||||||
|
test_status_map(((507, None), 201, 201, 201), 201)
|
||||||
|
self.assertEqual(
|
||||||
|
node_error_count(self.app, object_ring.devs[0]),
|
||||||
|
self.app.error_suppression_limit + 1)
|
||||||
|
# response errors
|
||||||
|
test_status_map(((100, Timeout()), 201, 201), 201)
|
||||||
|
self.assertEqual(
|
||||||
|
node_error_count(self.app, object_ring.devs[0]), 1)
|
||||||
|
test_status_map((201, 201, (100, Exception())), 201)
|
||||||
|
self.assertEqual(
|
||||||
|
node_error_count(self.app, object_ring.devs[2]), 1)
|
||||||
|
test_status_map((201, (100, 507), 201), 201)
|
||||||
|
self.assertEqual(
|
||||||
|
node_error_count(self.app, object_ring.devs[1]),
|
||||||
|
self.app.error_suppression_limit + 1)
|
||||||
|
|
||||||
def test_GET_simple(self):
|
def test_GET_simple(self):
|
||||||
req = swift.common.swob.Request.blank('/v1/a/c/o')
|
req = swift.common.swob.Request.blank('/v1/a/c/o')
|
||||||
with set_http_connect(200):
|
with set_http_connect(200):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user