diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 9fc8fefff5..b36a5317b9 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -76,6 +76,8 @@ class Application(object): else: self.logger = logger + self._error_limiting = {} + swift_dir = conf.get('swift_dir', '/etc/swift') self.swift_dir = swift_dir self.node_timeout = int(conf.get('node_timeout', 10)) @@ -406,6 +408,9 @@ class Application(object): timing = round(timing, 3) # sort timings to the millisecond self.node_timings[node['ip']] = (timing, now + self.timing_expiry) + def _error_limit_node_key(self, node): + return "{ip}:{port}/{device}".format(**node) + def error_limited(self, node): """ Check if the node is currently error limited. @@ -414,15 +419,16 @@ class Application(object): :returns: True if error limited, False otherwise """ now = time() - if 'errors' not in node: + node_key = self._error_limit_node_key(node) + error_stats = self._error_limiting.get(node_key) + + if error_stats is None or 'errors' not in error_stats: return False - if 'last_error' in node and node['last_error'] < \ + if 'last_error' in error_stats and error_stats['last_error'] < \ now - self.error_suppression_interval: - del node['last_error'] - if 'errors' in node: - del node['errors'] + self._error_limiting.pop(node_key, None) return False - limited = node['errors'] > self.error_suppression_limit + limited = error_stats['errors'] > self.error_suppression_limit if limited: self.logger.debug( _('Node error limited %(ip)s:%(port)s (%(device)s)'), node) @@ -438,8 +444,10 @@ class Application(object): :param node: dictionary of node to error limit :param msg: error message """ - node['errors'] = self.error_suppression_limit + 1 - node['last_error'] = time() + node_key = self._error_limit_node_key(node) + error_stats = self._error_limiting.setdefault(node_key, {}) + error_stats['errors'] = self.error_suppression_limit + 1 + error_stats['last_error'] = time() self.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'), {'msg': msg, 'ip': node['ip'], 'port': node['port'], 'device': node['device']}) @@ -451,8 +459,10 @@ class Application(object): :param node: dictionary of node to handle errors for :param msg: error message """ - node['errors'] = node.get('errors', 0) + 1 - node['last_error'] = time() + node_key = self._error_limit_node_key(node) + error_stats = self._error_limiting.setdefault(node_key, {}) + error_stats['errors'] = error_stats.get('errors', 0) + 1 + error_stats['last_error'] = time() self.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'), {'msg': msg, 'ip': node['ip'], 'port': node['port'], 'device': node['device']}) diff --git a/test/unit/__init__.py b/test/unit/__init__.py index b869c9a1e3..cbf6f176ef 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -124,7 +124,8 @@ class PatchPolicies(object): class FakeRing(Ring): - def __init__(self, replicas=3, max_more_nodes=0, part_power=0): + def __init__(self, replicas=3, max_more_nodes=0, part_power=0, + base_port=1000): """ :param part_power: make part calculation based on the path @@ -132,27 +133,23 @@ class FakeRing(Ring): out of ring methods will actually be based on the path - otherwise we exercise the real ring code, but ignore the result and return 1. """ + self._base_port = base_port + self.max_more_nodes = max_more_nodes + self._part_shift = 32 - part_power # 9 total nodes (6 more past the initial 3) is the cap, no matter if # this is set higher, or R^2 for R replicas self.set_replicas(replicas) - self.max_more_nodes = max_more_nodes - self._part_shift = 32 - part_power self._reload() def _reload(self): self._rtime = time.time() - def clear_errors(self): - for dev in self.devs: - for key in ('errors', 'last_error'): - dev.pop(key, None) - def set_replicas(self, replicas): self.replicas = replicas self._devs = [] for x in range(self.replicas): ip = '10.0.0.%s' % x - port = 1000 + x + port = self._base_port + x self._devs.append({ 'ip': ip, 'replication_ip': ip, @@ -176,7 +173,7 @@ class FakeRing(Ring): for x in xrange(self.replicas, min(self.replicas + self.max_more_nodes, self.replicas * self.replicas)): yield {'ip': '10.0.0.%s' % x, - 'port': 1000 + x, + 'port': self._base_port + x, 'device': 'sda', 'zone': x % 3, 'region': x % 2, diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index b3b18a744d..9f44d674e9 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -280,6 +280,28 @@ def sortHeaderNames(headerNames): return ', '.join(headers) +def node_error_count(proxy_app, ring_node): + # Reach into the proxy's internals to get the error count for a + # particular node + node_key = proxy_app._error_limit_node_key(ring_node) + return proxy_app._error_limiting.get(node_key, {}).get('errors', 0) + + +def node_last_error(proxy_app, ring_node): + # Reach into the proxy's internals to get the last error for a + # particular node + node_key = proxy_app._error_limit_node_key(ring_node) + return proxy_app._error_limiting.get(node_key, {}).get('last_error') + + +def set_node_errors(proxy_app, ring_node, value, last_error): + # Set the node's error count to value + node_key = proxy_app._error_limit_node_key(ring_node) + stats = proxy_app._error_limiting.setdefault(node_key, {}) + stats['errors'] = value + stats['last_error'] = last_error + + class FakeMemcacheReturnsNone(FakeMemcache): def get(self, key): @@ -923,20 +945,22 @@ class TestProxyServerLoading(unittest.TestCase): self.assert_(policy.object_ring) -@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())]) +@patch_policies([StoragePolicy(0, 'zero', True, + object_ring=FakeRing(base_port=3000))]) class TestObjectController(unittest.TestCase): def setUp(self): - self.app = proxy_server.Application(None, FakeMemcache(), - logger=debug_logger('proxy-ut'), - account_ring=FakeRing(), - container_ring=FakeRing()) + self.app = proxy_server.Application( + None, FakeMemcache(), + logger=debug_logger('proxy-ut'), + account_ring=FakeRing(), + container_ring=FakeRing()) def tearDown(self): self.app.account_ring.set_replicas(3) self.app.container_ring.set_replicas(3) for policy in POLICIES: - policy.object_ring = FakeRing() + policy.object_ring = FakeRing(base_port=3000) def assert_status_map(self, method, statuses, expected, raise_exc=False): with save_globals(): @@ -2493,9 +2517,9 @@ class TestObjectController(unittest.TestCase): self.app.log_handoffs = True self.app.logger = FakeLogger() self.app.request_node_count = lambda r: 7 - object_ring.clear_errors() - object_ring._devs[0]['errors'] = 999 - object_ring._devs[0]['last_error'] = 2 ** 63 - 1 + self.app._error_limiting = {} # clear out errors + set_node_errors(self.app, object_ring._devs[0], 999, + last_error=(2 ** 63 - 1)) collected_nodes = [] for node in self.app.iter_nodes(object_ring, partition): @@ -2510,10 +2534,10 @@ class TestObjectController(unittest.TestCase): self.app.log_handoffs = True self.app.logger = FakeLogger() self.app.request_node_count = lambda r: 7 - object_ring.clear_errors() + self.app._error_limiting = {} # clear out errors for i in range(2): - object_ring._devs[i]['errors'] = 999 - object_ring._devs[i]['last_error'] = 2 ** 63 - 1 + set_node_errors(self.app, object_ring._devs[i], 999, + last_error=(2 ** 63 - 1)) collected_nodes = [] for node in self.app.iter_nodes(object_ring, partition): @@ -2532,10 +2556,10 @@ class TestObjectController(unittest.TestCase): self.app.logger = FakeLogger() self.app.request_node_count = lambda r: 10 object_ring.set_replicas(4) # otherwise we run out of handoffs - object_ring.clear_errors() + self.app._error_limiting = {} # clear out errors for i in range(4): - object_ring._devs[i]['errors'] = 999 - object_ring._devs[i]['last_error'] = 2 ** 63 - 1 + set_node_errors(self.app, object_ring._devs[i], 999, + last_error=(2 ** 63 - 1)) collected_nodes = [] for node in self.app.iter_nodes(object_ring, partition): @@ -2593,7 +2617,8 @@ class TestObjectController(unittest.TestCase): def test_iter_nodes_with_custom_node_iter(self): object_ring = self.app.get_object_ring(None) - node_list = [dict(id=n) for n in xrange(10)] + node_list = [dict(id=n, ip='1.2.3.4', port=n, device='D') + for n in xrange(10)] with nested( mock.patch.object(self.app, 'sort_nodes', lambda n: n), mock.patch.object(self.app, 'request_node_count', @@ -2674,16 +2699,20 @@ class TestObjectController(unittest.TestCase): object_ring = controller.app.get_object_ring(None) self.assert_status_map(controller.HEAD, (200, 200, 503, 200, 200), 200) - self.assertEquals(object_ring.devs[0]['errors'], 2) - self.assert_('last_error' in object_ring.devs[0]) + self.assertEquals( + node_error_count(controller.app, object_ring.devs[0]), 2) + self.assert_(node_last_error(controller.app, object_ring.devs[0]) + is not None) for _junk in xrange(self.app.error_suppression_limit): self.assert_status_map(controller.HEAD, (200, 200, 503, 503, 503), 503) - self.assertEquals(object_ring.devs[0]['errors'], - self.app.error_suppression_limit + 1) + self.assertEquals( + node_error_count(controller.app, object_ring.devs[0]), + self.app.error_suppression_limit + 1) self.assert_status_map(controller.HEAD, (200, 200, 200, 200, 200), 503) - self.assert_('last_error' in object_ring.devs[0]) + self.assert_(node_last_error(controller.app, object_ring.devs[0]) + is not None) self.assert_status_map(controller.PUT, (200, 200, 200, 201, 201, 201), 503) self.assert_status_map(controller.POST, @@ -2699,6 +2728,34 @@ class TestObjectController(unittest.TestCase): (200, 200, 200, 204, 204, 204), 503, raise_exc=True) + def test_error_limiting_survives_ring_reload(self): + with save_globals(): + controller = proxy_server.ObjectController(self.app, 'account', + 'container', 'object') + controller.app.sort_nodes = lambda l: l + object_ring = controller.app.get_object_ring(None) + self.assert_status_map(controller.HEAD, (200, 200, 503, 200, 200), + 200) + self.assertEquals( + node_error_count(controller.app, object_ring.devs[0]), 2) + self.assert_(node_last_error(controller.app, object_ring.devs[0]) + is not None) + for _junk in xrange(self.app.error_suppression_limit): + self.assert_status_map(controller.HEAD, (200, 200, 503, 503, + 503), 503) + self.assertEquals( + node_error_count(controller.app, object_ring.devs[0]), + self.app.error_suppression_limit + 1) + + # wipe out any state in the ring + for policy in POLICIES: + policy.object_ring = FakeRing(base_port=3000) + + # and we still get an error, which proves that the + # error-limiting info survived a ring reload + self.assert_status_map(controller.HEAD, (200, 200, 200, 200, 200), + 503) + def test_PUT_error_limiting(self): with save_globals(): controller = proxy_server.ObjectController(self.app, 'account', @@ -2710,12 +2767,13 @@ class TestObjectController(unittest.TestCase): 200) # 2, not 1, because assert_status_map() calls the method twice - self.assertEquals(object_ring.devs[0].get('errors', 0), 2) - self.assertEquals(object_ring.devs[1].get('errors', 0), 0) - self.assertEquals(object_ring.devs[2].get('errors', 0), 0) - self.assert_('last_error' in object_ring.devs[0]) - self.assert_('last_error' not in object_ring.devs[1]) - self.assert_('last_error' not in object_ring.devs[2]) + odevs = object_ring.devs + self.assertEquals(node_error_count(controller.app, odevs[0]), 2) + self.assertEquals(node_error_count(controller.app, odevs[1]), 0) + self.assertEquals(node_error_count(controller.app, odevs[2]), 0) + self.assert_(node_last_error(controller.app, odevs[0]) is not None) + self.assert_(node_last_error(controller.app, odevs[1]) is None) + self.assert_(node_last_error(controller.app, odevs[2]) is None) def test_PUT_error_limiting_last_node(self): with save_globals(): @@ -2728,18 +2786,18 @@ class TestObjectController(unittest.TestCase): 200) # 2, not 1, because assert_status_map() calls the method twice - self.assertEquals(object_ring.devs[0].get('errors', 0), 0) - self.assertEquals(object_ring.devs[1].get('errors', 0), 0) - self.assertEquals(object_ring.devs[2].get('errors', 0), 2) - self.assert_('last_error' not in object_ring.devs[0]) - self.assert_('last_error' not in object_ring.devs[1]) - self.assert_('last_error' in object_ring.devs[2]) + odevs = object_ring.devs + self.assertEquals(node_error_count(controller.app, odevs[0]), 0) + self.assertEquals(node_error_count(controller.app, odevs[1]), 0) + self.assertEquals(node_error_count(controller.app, odevs[2]), 2) + self.assert_(node_last_error(controller.app, odevs[0]) is None) + self.assert_(node_last_error(controller.app, odevs[1]) is None) + self.assert_(node_last_error(controller.app, odevs[2]) is not None) def test_acc_or_con_missing_returns_404(self): with save_globals(): self.app.memcache = FakeMemcacheReturnsNone() - self.app.account_ring.clear_errors() - self.app.container_ring.clear_errors() + self.app._error_limiting = {} controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') set_http_connect(200, 200, 200, 200, 200, 200) @@ -2806,8 +2864,9 @@ class TestObjectController(unittest.TestCase): self.assertEquals(resp.status_int, 404) for dev in self.app.account_ring.devs: - dev['errors'] = self.app.error_suppression_limit + 1 - dev['last_error'] = time.time() + set_node_errors( + self.app, dev, self.app.error_suppression_limit + 1, + time.time()) set_http_connect(200) # acct [isn't actually called since everything # is error limited] @@ -2818,10 +2877,11 @@ class TestObjectController(unittest.TestCase): self.assertEquals(resp.status_int, 404) for dev in self.app.account_ring.devs: - dev['errors'] = 0 + set_node_errors(self.app, dev, 0, last_error=None) for dev in self.app.container_ring.devs: - dev['errors'] = self.app.error_suppression_limit + 1 - dev['last_error'] = time.time() + set_node_errors(self.app, dev, + self.app.error_suppression_limit + 1, + time.time()) set_http_connect(200, 200) # acct cont [isn't actually called since # everything is error limited] @@ -5170,18 +5230,19 @@ class TestObjectController(unittest.TestCase): @patch_policies([ - StoragePolicy(0, 'zero', True, object_ring=FakeRing()), - StoragePolicy(1, 'one', False, object_ring=FakeRing()), - StoragePolicy(2, 'two', False, True, object_ring=FakeRing()) + StoragePolicy(0, 'zero', True, object_ring=FakeRing(base_port=3000)), + StoragePolicy(1, 'one', False, object_ring=FakeRing(base_port=3000)), + StoragePolicy(2, 'two', False, True, object_ring=FakeRing(base_port=3000)) ]) class TestContainerController(unittest.TestCase): "Test swift.proxy_server.ContainerController" def setUp(self): - self.app = proxy_server.Application(None, FakeMemcache(), - account_ring=FakeRing(), - container_ring=FakeRing(), - logger=debug_logger()) + self.app = proxy_server.Application( + None, FakeMemcache(), + account_ring=FakeRing(), + container_ring=FakeRing(base_port=2000), + logger=debug_logger()) def test_convert_policy_to_index(self): controller = swift.proxy.controllers.ContainerController(self.app, @@ -5598,7 +5659,7 @@ class TestContainerController(unittest.TestCase): for meth in ('DELETE', 'PUT'): with save_globals(): self.app.memcache = FakeMemcacheReturnsNone() - self.app.account_ring.clear_errors() + self.app._error_limiting = {} controller = proxy_server.ContainerController(self.app, 'account', 'container') @@ -5636,8 +5697,9 @@ class TestContainerController(unittest.TestCase): self.assertEquals(resp.status_int, 404) for dev in self.app.account_ring.devs: - dev['errors'] = self.app.error_suppression_limit + 1 - dev['last_error'] = time.time() + set_node_errors(self.app, dev, + self.app.error_suppression_limit + 1, + time.time()) set_http_connect(200, 200, 200, 200, 200, 200) # Make sure it is a blank request wthout env caching req = Request.blank('/v1/a/c', @@ -5675,19 +5737,26 @@ class TestContainerController(unittest.TestCase): with save_globals(): controller = proxy_server.ContainerController(self.app, 'account', 'container') + container_ring = controller.app.container_ring controller.app.sort_nodes = lambda l: l self.assert_status_map(controller.HEAD, (200, 503, 200, 200), 200, missing_container=False) + self.assertEquals( - controller.app.container_ring.devs[0]['errors'], 2) - self.assert_('last_error' in controller.app.container_ring.devs[0]) + node_error_count(controller.app, container_ring.devs[0]), 2) + self.assert_( + node_last_error(controller.app, container_ring.devs[0]) + is not None) for _junk in xrange(self.app.error_suppression_limit): self.assert_status_map(controller.HEAD, (200, 503, 503, 503), 503) - self.assertEquals(controller.app.container_ring.devs[0]['errors'], - self.app.error_suppression_limit + 1) + self.assertEquals( + node_error_count(controller.app, container_ring.devs[0]), + self.app.error_suppression_limit + 1) self.assert_status_map(controller.HEAD, (200, 200, 200, 200), 503) - self.assert_('last_error' in controller.app.container_ring.devs[0]) + self.assert_( + node_last_error(controller.app, container_ring.devs[0]) + is not None) self.assert_status_map(controller.PUT, (200, 201, 201, 201), 503, missing_container=True) self.assert_status_map(controller.DELETE,