Client should retry when there's just one 404 and a bunch of errors
During a rebalance, it's expected that we may get a 404 for data that does exist elsewhere in the cluster. Normally this isn't a problem; the proxy sees the 404, keeps digging, and one of the other primaries will serve the response. Previously, if the other replicas were heavily loaded, the proxy would see a bunch of timeouts and the fresh (empty) primary, treat the 404 as good, and send that on to the client. Now, have the proxy throw out that first 404 (provided it doesn't have a timestamp); it will then return a 503 to the client, indicating that it should try again. Add a new (per-policy) proxy-server config option, rebalance_missing_suppression_count; operators may use this to increase the number of 404-no-timestamp responses to discard if their rebalances are going faster than replication can keep up, or set it to zero to return to the previous behavior. Change-Id: If4bd39788642c00d66579b26144af8f116735b4d
This commit is contained in:
parent
06091172c2
commit
754defc39c
@ -200,6 +200,12 @@ use = egg:swift#proxy
|
||||
# the number of seconds configured by timing_expiry.
|
||||
# timing_expiry = 300
|
||||
#
|
||||
# Normally, you should only be moving one replica's worth of data at a time
|
||||
# when rebalancing. If you're rebalancing more aggressively, increase this
|
||||
# to avoid erroneously returning a 404 when the primary assignments that
|
||||
# *didn't* change get overloaded.
|
||||
# rebalance_missing_suppression_count = 1
|
||||
#
|
||||
# By default on a GET/HEAD swift will connect to a minimum number storage nodes
|
||||
# in a minimum number of threads - for replicated data just a single request to
|
||||
# a single node one at a time. When enabled concurrent_gets allows the proxy
|
||||
@ -307,6 +313,7 @@ use = egg:swift#proxy
|
||||
# write_affinity =
|
||||
# write_affinity_node_count =
|
||||
# write_affinity_handoff_delete_count =
|
||||
# rebalance_missing_suppression_count = 1
|
||||
# concurrent_gets = off
|
||||
# concurrency_timeout = 0.5
|
||||
# concurrent_ec_extra_requests = 0
|
||||
|
@ -873,6 +873,10 @@ class GetOrHeadHandler(object):
|
||||
self.policy = policy
|
||||
self.node = None
|
||||
self.latest_404_timestamp = Timestamp(0)
|
||||
policy_options = self.app.get_policy_options(self.policy)
|
||||
self.rebalance_missing_suppression_count = min(
|
||||
policy_options.rebalance_missing_suppression_count,
|
||||
node_iter.num_primary_nodes - 1)
|
||||
|
||||
# stuff from request
|
||||
self.req_method = req.method
|
||||
@ -1320,6 +1324,13 @@ class GetOrHeadHandler(object):
|
||||
# throw out 5XX and 404s from handoff nodes unless the data is
|
||||
# really on disk and had been DELETEd
|
||||
return False
|
||||
|
||||
if self.rebalance_missing_suppression_count > 0 and \
|
||||
possible_source.status == HTTP_NOT_FOUND and \
|
||||
not Timestamp(src_headers.get('x-backend-timestamp', 0)):
|
||||
self.rebalance_missing_suppression_count -= 1
|
||||
return False
|
||||
|
||||
self.statuses.append(possible_source.status)
|
||||
self.reasons.append(possible_source.reason)
|
||||
self.bodies.append(possible_source.read())
|
||||
|
@ -2905,8 +2905,9 @@ class ECObjectController(BaseObjectController):
|
||||
|
||||
safe_iter = GreenthreadSafeIterator(node_iter)
|
||||
|
||||
ec_request_count = policy.ec_ndata + self.app.get_policy_options(
|
||||
policy).concurrent_ec_extra_requests
|
||||
policy_options = self.app.get_policy_options(policy)
|
||||
ec_request_count = policy.ec_ndata + \
|
||||
policy_options.concurrent_ec_extra_requests
|
||||
with ContextPool(ec_request_count) as pool:
|
||||
pile = GreenAsyncPile(pool)
|
||||
buckets = ECGetResponseCollection(policy)
|
||||
@ -2998,6 +2999,9 @@ class ECObjectController(BaseObjectController):
|
||||
bodies = []
|
||||
headers = []
|
||||
best_bucket.close_conns()
|
||||
rebalance_missing_suppression_count = min(
|
||||
policy_options.rebalance_missing_suppression_count,
|
||||
node_iter.num_primary_nodes - 1)
|
||||
for status, bad_bucket in buckets.bad_buckets.items():
|
||||
for getter, _parts_iter in bad_bucket.get_responses():
|
||||
if best_bucket.durable:
|
||||
@ -3013,6 +3017,14 @@ class ECObjectController(BaseObjectController):
|
||||
# out there, it's just currently unavailable
|
||||
continue
|
||||
if getter.status:
|
||||
timestamp = Timestamp(getter.last_headers.get(
|
||||
'X-Backend-Timestamp',
|
||||
getter.last_headers.get('X-Timestamp', 0)))
|
||||
if (rebalance_missing_suppression_count > 0 and
|
||||
getter.status == HTTP_NOT_FOUND and
|
||||
not timestamp):
|
||||
rebalance_missing_suppression_count -= 1
|
||||
continue
|
||||
statuses.append(getter.status)
|
||||
reasons.append(getter.reason)
|
||||
bodies.append(getter.body)
|
||||
|
@ -148,6 +148,8 @@ class ProxyOverrideOptions(object):
|
||||
get('write_affinity_handoff_delete_count', 'auto'), None
|
||||
)
|
||||
|
||||
self.rebalance_missing_suppression_count = int(get(
|
||||
'rebalance_missing_suppression_count', 1))
|
||||
self.concurrent_gets = config_true_value(get('concurrent_gets', False))
|
||||
self.concurrency_timeout = float(get(
|
||||
'concurrency_timeout', app.conn_timeout))
|
||||
@ -163,6 +165,7 @@ class ProxyOverrideOptions(object):
|
||||
'write_affinity',
|
||||
'write_affinity_node_count',
|
||||
'write_affinity_handoff_delete_count',
|
||||
'rebalance_missing_suppression_count',
|
||||
'concurrent_gets',
|
||||
'concurrency_timeout',
|
||||
'concurrent_ec_extra_requests',
|
||||
@ -177,6 +180,7 @@ class ProxyOverrideOptions(object):
|
||||
'write_affinity',
|
||||
'write_affinity_node_count',
|
||||
'write_affinity_handoff_delete_count',
|
||||
'rebalance_missing_suppression_count',
|
||||
'concurrent_gets',
|
||||
'concurrency_timeout',
|
||||
'concurrent_ec_extra_requests',
|
||||
|
@ -118,6 +118,7 @@ class TestAccountController(unittest.TestCase):
|
||||
|
||||
def test_get_deleted_account(self):
|
||||
resp_headers = {
|
||||
'x-backend-timestamp': '123.456',
|
||||
'x-account-status': 'deleted',
|
||||
}
|
||||
controller = proxy_server.AccountController(self.app, 'a')
|
||||
@ -415,7 +416,8 @@ class TestGetAccountInfo(unittest.TestCase):
|
||||
account_ring=FakeRing(), container_ring=FakeRing())
|
||||
|
||||
def test_get_deleted_account_410(self):
|
||||
resp_headers = {'x-account-status': 'deleted'}
|
||||
resp_headers = {'x-account-status': 'deleted',
|
||||
'x-backend-timestamp': '123.456'}
|
||||
|
||||
req = Request.blank('/v1/a')
|
||||
with mock.patch('swift.proxy.controllers.base.http_connect',
|
||||
|
@ -13,6 +13,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from argparse import Namespace
|
||||
import itertools
|
||||
import json
|
||||
from collections import defaultdict
|
||||
@ -876,12 +877,15 @@ class TestFuncs(BaseTest):
|
||||
|
||||
def test_range_fast_forward(self):
|
||||
req = Request.blank('/')
|
||||
handler = GetOrHeadHandler(None, req, None, None, None, None, {})
|
||||
handler = GetOrHeadHandler(
|
||||
self.app, req, None, Namespace(num_primary_nodes=3), None, None,
|
||||
{})
|
||||
handler.fast_forward(50)
|
||||
self.assertEqual(handler.backend_headers['Range'], 'bytes=50-')
|
||||
|
||||
handler = GetOrHeadHandler(None, req, None, None, None, None,
|
||||
{'Range': 'bytes=23-50'})
|
||||
handler = GetOrHeadHandler(
|
||||
self.app, req, None, Namespace(num_primary_nodes=3), None, None,
|
||||
{'Range': 'bytes=23-50'})
|
||||
handler.fast_forward(20)
|
||||
self.assertEqual(handler.backend_headers['Range'], 'bytes=43-50')
|
||||
self.assertRaises(HTTPException,
|
||||
@ -889,13 +893,15 @@ class TestFuncs(BaseTest):
|
||||
self.assertRaises(exceptions.RangeAlreadyComplete,
|
||||
handler.fast_forward, 8)
|
||||
|
||||
handler = GetOrHeadHandler(None, req, None, None, None, None,
|
||||
{'Range': 'bytes=23-'})
|
||||
handler = GetOrHeadHandler(
|
||||
self.app, req, None, Namespace(num_primary_nodes=3), None, None,
|
||||
{'Range': 'bytes=23-'})
|
||||
handler.fast_forward(20)
|
||||
self.assertEqual(handler.backend_headers['Range'], 'bytes=43-')
|
||||
|
||||
handler = GetOrHeadHandler(None, req, None, None, None, None,
|
||||
{'Range': 'bytes=-100'})
|
||||
handler = GetOrHeadHandler(
|
||||
self.app, req, None, Namespace(num_primary_nodes=3), None, None,
|
||||
{'Range': 'bytes=-100'})
|
||||
handler.fast_forward(20)
|
||||
self.assertEqual(handler.backend_headers['Range'], 'bytes=-80')
|
||||
self.assertRaises(HTTPException,
|
||||
@ -903,8 +909,9 @@ class TestFuncs(BaseTest):
|
||||
self.assertRaises(exceptions.RangeAlreadyComplete,
|
||||
handler.fast_forward, 80)
|
||||
|
||||
handler = GetOrHeadHandler(None, req, None, None, None, None,
|
||||
{'Range': 'bytes=0-0'})
|
||||
handler = GetOrHeadHandler(
|
||||
self.app, req, None, Namespace(num_primary_nodes=3), None, None,
|
||||
{'Range': 'bytes=0-0'})
|
||||
self.assertRaises(exceptions.RangeAlreadyComplete,
|
||||
handler.fast_forward, 1)
|
||||
|
||||
@ -915,21 +922,26 @@ class TestFuncs(BaseTest):
|
||||
# bytes of data, so then we get a new node, fast_forward(0), and
|
||||
# send out a new request. That new request must be for all 1000
|
||||
# bytes.
|
||||
handler = GetOrHeadHandler(None, req, None, None, None, None, {})
|
||||
handler = GetOrHeadHandler(
|
||||
self.app, req, None, Namespace(num_primary_nodes=3), None, None,
|
||||
{})
|
||||
handler.learn_size_from_content_range(0, 999, 1000)
|
||||
handler.fast_forward(0)
|
||||
self.assertEqual(handler.backend_headers['Range'], 'bytes=0-999')
|
||||
|
||||
# Same story as above, but a 1-byte object so we can have our byte
|
||||
# indices be 0.
|
||||
handler = GetOrHeadHandler(None, req, None, None, None, None, {})
|
||||
handler = GetOrHeadHandler(
|
||||
self.app, req, None, Namespace(num_primary_nodes=3), None, None,
|
||||
{})
|
||||
handler.learn_size_from_content_range(0, 0, 1)
|
||||
handler.fast_forward(0)
|
||||
self.assertEqual(handler.backend_headers['Range'], 'bytes=0-0')
|
||||
|
||||
# last 100 bytes
|
||||
handler = GetOrHeadHandler(None, req, None, None, None, None,
|
||||
{'Range': 'bytes=-100'})
|
||||
handler = GetOrHeadHandler(
|
||||
self.app, req, None, Namespace(num_primary_nodes=3), None, None,
|
||||
{'Range': 'bytes=-100'})
|
||||
handler.learn_size_from_content_range(900, 999, 1000)
|
||||
handler.fast_forward(0)
|
||||
self.assertEqual(handler.backend_headers['Range'], 'bytes=900-999')
|
||||
@ -1013,8 +1025,9 @@ class TestFuncs(BaseTest):
|
||||
b'abcd', b'1234', b'abc', b'd1', b'234abcd1234abcd1', b'2'))
|
||||
req = Request.blank('/v1/a/c/o')
|
||||
node = {}
|
||||
handler = GetOrHeadHandler(self.app, req, None, None, None, None, {},
|
||||
client_chunk_size=8)
|
||||
handler = GetOrHeadHandler(
|
||||
self.app, req, None, Namespace(num_primary_nodes=3), None, None,
|
||||
{}, client_chunk_size=8)
|
||||
|
||||
app_iter = handler._make_app_iter(req, node, source)
|
||||
client_chunks = list(app_iter)
|
||||
@ -1057,8 +1070,8 @@ class TestFuncs(BaseTest):
|
||||
source3 = TestSource([b'lots', b'more', b'data'])
|
||||
req = Request.blank('/v1/a/c/o')
|
||||
handler = GetOrHeadHandler(
|
||||
self.app, req, 'Object', None, None, None, {},
|
||||
client_chunk_size=8)
|
||||
self.app, req, 'Object', Namespace(num_primary_nodes=1), None,
|
||||
None, {}, client_chunk_size=8)
|
||||
|
||||
range_headers = []
|
||||
sources = [(source2, node), (source3, node)]
|
||||
@ -1106,8 +1119,8 @@ class TestFuncs(BaseTest):
|
||||
source2 = TestChunkedSource([b'efgh5678'])
|
||||
req = Request.blank('/v1/a/c/o')
|
||||
handler = GetOrHeadHandler(
|
||||
self.app, req, 'Object', None, None, None, {},
|
||||
client_chunk_size=8)
|
||||
self.app, req, 'Object', Namespace(num_primary_nodes=1), None,
|
||||
None, {}, client_chunk_size=8)
|
||||
|
||||
app_iter = handler._make_app_iter(req, node, source1)
|
||||
with mock.patch.object(handler, '_get_source_and_node',
|
||||
@ -1138,7 +1151,8 @@ class TestFuncs(BaseTest):
|
||||
|
||||
node = {'ip': '1.2.3.4', 'port': 6200, 'device': 'sda'}
|
||||
handler = GetOrHeadHandler(
|
||||
self.app, req, 'Object', None, None, 'some-path', {})
|
||||
self.app, req, 'Object', Namespace(num_primary_nodes=1), None,
|
||||
'some-path', {})
|
||||
app_iter = handler._make_app_iter(req, node, source)
|
||||
app_iter.close()
|
||||
self.app.logger.warning.assert_called_once_with(
|
||||
@ -1147,7 +1161,8 @@ class TestFuncs(BaseTest):
|
||||
self.app.logger = mock.Mock()
|
||||
node = {'ip': '1.2.3.4', 'port': 6200, 'device': 'sda'}
|
||||
handler = GetOrHeadHandler(
|
||||
self.app, req, 'Object', None, None, None, {})
|
||||
self.app, req, 'Object', Namespace(num_primary_nodes=1), None,
|
||||
None, {})
|
||||
app_iter = handler._make_app_iter(req, node, source)
|
||||
next(app_iter)
|
||||
app_iter.close()
|
||||
|
@ -368,7 +368,8 @@ class TestContainerController(TestRingBase):
|
||||
([Timeout()] * nodes + [404] * handoffs, 503),
|
||||
([Timeout()] * (nodes + handoffs), 503),
|
||||
([Timeout()] * (nodes + handoffs - 1) + [404], 503),
|
||||
([Timeout()] * (nodes - 1) + [404] * (handoffs + 1), 404),
|
||||
([Timeout()] * (nodes - 1) + [404] * (handoffs + 1), 503),
|
||||
([Timeout()] * (nodes - 2) + [404] * (handoffs + 2), 404),
|
||||
([500] * (nodes - 1) + [404] * (handoffs + 1), 503),
|
||||
([503, 200], 200),
|
||||
([507, 200], 200),
|
||||
@ -394,8 +395,14 @@ class TestContainerController(TestRingBase):
|
||||
'\n'.join(failures))
|
||||
|
||||
# One more test, simulating all nodes being error-limited
|
||||
class FakeIter(object):
|
||||
num_primary_nodes = 3
|
||||
|
||||
def __iter__(self):
|
||||
return iter([])
|
||||
|
||||
with mocked_http_conn(), mock.patch.object(self.app, 'iter_nodes',
|
||||
return_value=[]):
|
||||
return_value=FakeIter()):
|
||||
req = Request.blank('/v1/a/c')
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 503)
|
||||
|
@ -1600,6 +1600,32 @@ class TestReplicatedObjController(CommonObjectControllerMixin,
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 503)
|
||||
|
||||
def test_GET_primaries_error_during_rebalance(self):
|
||||
def do_test(primary_codes, expected, include_timestamp=False):
|
||||
random.shuffle(primary_codes)
|
||||
handoff_codes = [404] * self.obj_ring.max_more_nodes
|
||||
headers = None
|
||||
if include_timestamp:
|
||||
headers = [{'X-Backend-Timestamp': '123.456'}] * 3
|
||||
headers.extend({} for _ in handoff_codes)
|
||||
with set_http_connect(*primary_codes + handoff_codes,
|
||||
headers=headers):
|
||||
req = swift.common.swob.Request.blank('/v1/a/c/o')
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, expected)
|
||||
|
||||
# with two of out three backend errors a client should retry
|
||||
do_test([Timeout(), Exception('kaboom!'), 404], 503)
|
||||
# unless there's a timestamp associated
|
||||
do_test([Timeout(), Exception('kaboom!'), 404], 404,
|
||||
include_timestamp=True)
|
||||
# when there's more 404s, we trust it more
|
||||
do_test([Timeout(), 404, 404], 404)
|
||||
# unless we explicitly *don't* want to trust it
|
||||
policy_opts = self.app.get_policy_options(None)
|
||||
policy_opts.rebalance_missing_suppression_count = 2
|
||||
do_test([Timeout(), 404, 404], 503)
|
||||
|
||||
def test_GET_primaries_mixed_explode_and_timeout(self):
|
||||
req = swift.common.swob.Request.blank('/v1/a/c/o')
|
||||
primaries = []
|
||||
@ -2299,6 +2325,31 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
|
||||
def test_GET_primaries_error_during_rebalance(self):
|
||||
req = swift.common.swob.Request.blank('/v1/a/c/o')
|
||||
codes = [404] * (2 * self.policy.object_ring.replica_count)
|
||||
with mocked_http_conn(*codes):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
for i in range(self.policy.object_ring.replica_count - 2):
|
||||
codes[i] = Timeout()
|
||||
with mocked_http_conn(*codes):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
self.app._error_limiting = {} # Reset error limiting
|
||||
|
||||
# one more timeout is past the tipping point
|
||||
codes[self.policy.object_ring.replica_count - 2] = Timeout()
|
||||
with mocked_http_conn(*codes):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 503)
|
||||
self.app._error_limiting = {} # Reset error limiting
|
||||
|
||||
# unless we have tombstones
|
||||
with mocked_http_conn(*codes, headers={'X-Backend-Timestamp': '1'}):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
|
||||
def _test_if_match(self, method):
|
||||
num_responses = self.policy.ec_ndata if method == 'GET' else 1
|
||||
|
||||
|
@ -1020,7 +1020,13 @@ class TestProxyServer(unittest.TestCase):
|
||||
req = Request.blank('/v1/account', environ={'REQUEST_METHOD': 'GET'})
|
||||
|
||||
def fake_iter_nodes(*arg, **karg):
|
||||
return iter(nodes)
|
||||
class FakeNodeIter(object):
|
||||
num_primary_nodes = 3
|
||||
|
||||
def __iter__(self):
|
||||
return iter(nodes)
|
||||
|
||||
return FakeNodeIter()
|
||||
|
||||
class FakeConn(object):
|
||||
def __init__(self, ip, *args, **kargs):
|
||||
@ -1535,18 +1541,21 @@ class TestProxyServerConfigLoading(unittest.TestCase):
|
||||
write_affinity = r1
|
||||
write_affinity_node_count = 1 * replicas
|
||||
write_affinity_handoff_delete_count = 4
|
||||
rebalance_missing_suppression_count = 2
|
||||
"""
|
||||
expected_default = {"read_affinity": "",
|
||||
"sorting_method": "shuffle",
|
||||
"write_affinity": "",
|
||||
"write_affinity_node_count_fn": 6,
|
||||
"write_affinity_handoff_delete_count": None}
|
||||
"write_affinity_handoff_delete_count": None,
|
||||
"rebalance_missing_suppression_count": 1}
|
||||
exp_options = {None: expected_default,
|
||||
POLICIES[0]: {"read_affinity": "r1=100",
|
||||
"sorting_method": "affinity",
|
||||
"write_affinity": "r1",
|
||||
"write_affinity_node_count_fn": 3,
|
||||
"write_affinity_handoff_delete_count": 4},
|
||||
"write_affinity_handoff_delete_count": 4,
|
||||
"rebalance_missing_suppression_count": 2},
|
||||
POLICIES[1]: expected_default}
|
||||
exp_is_local = {POLICIES[0]: [({'region': 1, 'zone': 2}, True),
|
||||
({'region': 2, 'zone': 1}, False)],
|
||||
@ -1560,6 +1569,7 @@ class TestProxyServerConfigLoading(unittest.TestCase):
|
||||
"'read_affinity': '', 'write_affinity': '', "
|
||||
"'write_affinity_node_count': '2 * replicas', "
|
||||
"'write_affinity_handoff_delete_count': None, "
|
||||
"'rebalance_missing_suppression_count': 1, "
|
||||
"'concurrent_gets': False, 'concurrency_timeout': 0.5, "
|
||||
"'concurrent_ec_extra_requests': 0"
|
||||
"}, app)",
|
||||
@ -1573,6 +1583,7 @@ class TestProxyServerConfigLoading(unittest.TestCase):
|
||||
"'read_affinity': 'r1=100', 'write_affinity': 'r1', "
|
||||
"'write_affinity_node_count': '1 * replicas', "
|
||||
"'write_affinity_handoff_delete_count': 4, "
|
||||
"'rebalance_missing_suppression_count': 2, "
|
||||
"'concurrent_gets': False, 'concurrency_timeout': 0.5, "
|
||||
"'concurrent_ec_extra_requests': 0"
|
||||
"}, app)",
|
||||
|
Loading…
x
Reference in New Issue
Block a user