From f595a7e70492c2751d8c6f5ab60b5512e63281cf Mon Sep 17 00:00:00 2001 From: Matthew Oliver <matt@oliver.net.au> Date: Fri, 29 Aug 2014 17:14:46 +1000 Subject: [PATCH] Add concurrent reads option to proxy This change adds 2 new parameters to enable and control concurrent GETs in swift, these are 'concurrent_gets' and 'concurrency_timeout'. 'concurrent_gets' allows you to turn on or off concurrent GETs, when on it will set the GET/HEAD concurrency to replica count. And in the case of EC HEADs it will set it to ndata. The proxy will then serve only the first valid source to respond. This applies to all account, container and object GETs except for EC. For EC only HEAD requests are effected. It achieves this by changing the request sending mechanism to using GreenAsyncPile and green threads with a time out between each request. 'concurrency_timeout' is related to concurrent_gets. And is the amount of time to wait before firing the next thread. A value of 0 will fire at the same time (fully concurrent), setting another value will stagger the firing allowing you the ability to give a node a shorter chance to respond before firing the next. This value is a float and should be somewhere between 0 and node_timeout. The default is conn_timeout. Meaning by default it will stagger the firing. DocImpact Implements: blueprint concurrent-reads Change-Id: I789d39472ec48b22415ff9d9821b1eefab7da867 --- doc/source/deployment_guide.rst | 30 ++++ etc/proxy-server.conf-sample | 17 ++- swift/common/utils.py | 19 +++ swift/proxy/controllers/account.py | 4 +- swift/proxy/controllers/base.py | 167 +++++++++++++---------- swift/proxy/controllers/container.py | 4 +- swift/proxy/controllers/obj.py | 13 +- swift/proxy/server.py | 4 + test/unit/common/test_utils.py | 31 +++++ test/unit/proxy/controllers/test_base.py | 48 ++++++- test/unit/proxy/controllers/test_obj.py | 8 +- test/unit/proxy/test_server.py | 82 +++++++++++ 12 files changed, 348 insertions(+), 79 deletions(-) diff --git a/doc/source/deployment_guide.rst b/doc/source/deployment_guide.rst index 9ed83e4a30..c879c07db4 100644 --- a/doc/source/deployment_guide.rst +++ b/doc/source/deployment_guide.rst @@ -1367,6 +1367,36 @@ swift_owner_headers <see the sample These are the headers whose headers> up to the auth system in use, but usually indicates administrative responsibilities. +sorting_method shuffle Storage nodes can be chosen at + random (shuffle), by using timing + measurements (timing), or by using + an explicit match (affinity). + Using timing measurements may allow + for lower overall latency, while + using affinity allows for finer + control. In both the timing and + affinity cases, equally-sorting nodes + are still randomly chosen to spread + load. +timing_expiry 300 If the "timing" sorting_method is + used, the timings will only be valid + for the number of seconds configured + by timing_expiry. +concurrent_gets off Use replica count number of + threads concurrently during a + GET/HEAD and return with the + first successful response. In + the EC case, this parameter only + effects an EC HEAD as an EC GET + behaves differently. +concurrency_timeout conn_timeout This parameter controls how long + to wait before firing off the + next concurrent_get thread. A + value of 0 would we fully concurrent + any other number will stagger the + firing of the threads. This number + should be between 0 and node_timeout. + The default is conn_timeout (0.5). ============================ =============== ============================= [tempauth] diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample index a06e15a9a6..0314980e5a 100644 --- a/etc/proxy-server.conf-sample +++ b/etc/proxy-server.conf-sample @@ -164,13 +164,28 @@ use = egg:swift#proxy # using affinity allows for finer control. In both the timing and # affinity cases, equally-sorting nodes are still randomly chosen to # spread load. -# The valid values for sorting_method are "affinity", "shuffle", and "timing". +# The valid values for sorting_method are "affinity", "shuffle", or "timing". # sorting_method = shuffle # # If the "timing" sorting_method is used, the timings will only be valid for # the number of seconds configured by timing_expiry. # timing_expiry = 300 # +# By default on a GET/HEAD swift will connect to a storage node one at a time +# in a single thread. There is smarts in the order they are hit however. If you +# turn on concurrent_gets below, then replica count threads will be used. +# With addition of the concurrency_timeout option this will allow swift to send +# out GET/HEAD requests to the storage nodes concurrently and answer with the +# first to respond. With an EC policy the parameter only affects HEAD requests. +# concurrent_gets = off +# +# This parameter controls how long to wait before firing off the next +# concurrent_get thread. A value of 0 would be fully concurrent, any other +# number will stagger the firing of the threads. This number should be +# between 0 and node_timeout. The default is what ever you set for the +# conn_timeout parameter. +# concurrency_timeout = 0.5 +# # Set to the number of nodes to contact for a normal request. You can use # '* replicas' at the end to have it use the number given times the number of # replicas for the ring being used for the request. diff --git a/swift/common/utils.py b/swift/common/utils.py index 9547bf8f6a..e975bf1ad2 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -2471,6 +2471,10 @@ class GreenAsyncPile(object): finally: self._inflight -= 1 + @property + def inflight(self): + return self._inflight + def spawn(self, func, *args, **kwargs): """ Spawn a job in a green thread on the pile. @@ -2479,6 +2483,16 @@ class GreenAsyncPile(object): self._inflight += 1 self._pool.spawn(self._run_func, func, args, kwargs) + def waitfirst(self, timeout): + """ + Wait up to timeout seconds for first result to come in. + + :param timeout: seconds to wait for results + :returns: first item to come back, or None + """ + for result in self._wait(timeout, first_n=1): + return result + def waitall(self, timeout): """ Wait timeout seconds for any results to come in. @@ -2486,11 +2500,16 @@ class GreenAsyncPile(object): :param timeout: seconds to wait for results :returns: list of results accrued in that time """ + return self._wait(timeout) + + def _wait(self, timeout, first_n=None): results = [] try: with GreenAsyncPileWaitallTimeout(timeout): while True: results.append(next(self)) + if first_n and len(results) >= first_n: + break except (GreenAsyncPileWaitallTimeout, StopIteration): pass return results diff --git a/swift/proxy/controllers/account.py b/swift/proxy/controllers/account.py index 25cbc62187..faf4ccdee6 100644 --- a/swift/proxy/controllers/account.py +++ b/swift/proxy/controllers/account.py @@ -60,10 +60,12 @@ class AccountController(Controller): return resp partition = self.app.account_ring.get_part(self.account_name) + concurrency = self.app.account_ring.replica_count \ + if self.app.concurrent_gets else 1 node_iter = self.app.iter_nodes(self.app.account_ring, partition) resp = self.GETorHEAD_base( req, _('Account'), node_iter, partition, - req.swift_entity_path.rstrip('/')) + req.swift_entity_path.rstrip('/'), concurrency) if resp.status_int == HTTP_NOT_FOUND: if resp.headers.get('X-Account-Status', '').lower() == 'deleted': resp.status = HTTP_GONE diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index f225bba3ad..3bebd7f52b 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -623,7 +623,8 @@ def bytes_to_skip(record_size, range_start): class ResumingGetter(object): def __init__(self, app, req, server_type, node_iter, partition, path, - backend_headers, client_chunk_size=None, newest=None): + backend_headers, concurrency=1, client_chunk_size=None, + newest=None): self.app = app self.node_iter = node_iter self.server_type = server_type @@ -634,6 +635,7 @@ class ResumingGetter(object): self.skip_bytes = 0 self.used_nodes = [] self.used_source_etag = '' + self.concurrency = concurrency # stuff from request self.req_method = req.method @@ -649,6 +651,7 @@ class ResumingGetter(object): self.reasons = [] self.bodies = [] self.source_headers = [] + self.sources = [] # populated from response headers self.start_byte = self.end_byte = self.length = None @@ -971,88 +974,106 @@ class ResumingGetter(object): else: return None + def _make_node_request(self, node, node_timeout, logger_thread_locals): + self.app.logger.thread_locals = logger_thread_locals + if node in self.used_nodes: + return False + start_node_timing = time.time() + try: + with ConnectionTimeout(self.app.conn_timeout): + conn = http_connect( + node['ip'], node['port'], node['device'], + self.partition, self.req_method, self.path, + headers=self.backend_headers, + query_string=self.req_query_string) + self.app.set_node_timing(node, time.time() - start_node_timing) + + with Timeout(node_timeout): + possible_source = conn.getresponse() + # See NOTE: swift_conn at top of file about this. + possible_source.swift_conn = conn + except (Exception, Timeout): + self.app.exception_occurred( + node, self.server_type, + _('Trying to %(method)s %(path)s') % + {'method': self.req_method, 'path': self.req_path}) + return False + if self.is_good_source(possible_source): + # 404 if we know we don't have a synced copy + if not float(possible_source.getheader('X-PUT-Timestamp', 1)): + self.statuses.append(HTTP_NOT_FOUND) + self.reasons.append('') + self.bodies.append('') + self.source_headers.append([]) + close_swift_conn(possible_source) + else: + if self.used_source_etag: + src_headers = dict( + (k.lower(), v) for k, v in + possible_source.getheaders()) + + if self.used_source_etag != src_headers.get( + 'x-object-sysmeta-ec-etag', + src_headers.get('etag', '')).strip('"'): + self.statuses.append(HTTP_NOT_FOUND) + self.reasons.append('') + self.bodies.append('') + self.source_headers.append([]) + return False + + self.statuses.append(possible_source.status) + self.reasons.append(possible_source.reason) + self.bodies.append(None) + self.source_headers.append(possible_source.getheaders()) + self.sources.append((possible_source, node)) + if not self.newest: # one good source is enough + return True + else: + self.statuses.append(possible_source.status) + self.reasons.append(possible_source.reason) + self.bodies.append(possible_source.read()) + self.source_headers.append(possible_source.getheaders()) + if possible_source.status == HTTP_INSUFFICIENT_STORAGE: + self.app.error_limit(node, _('ERROR Insufficient Storage')) + elif is_server_error(possible_source.status): + self.app.error_occurred( + node, _('ERROR %(status)d %(body)s ' + 'From %(type)s Server') % + {'status': possible_source.status, + 'body': self.bodies[-1][:1024], + 'type': self.server_type}) + return False + def _get_source_and_node(self): self.statuses = [] self.reasons = [] self.bodies = [] self.source_headers = [] - sources = [] + self.sources = [] + + nodes = GreenthreadSafeIterator(self.node_iter) node_timeout = self.app.node_timeout if self.server_type == 'Object' and not self.newest: node_timeout = self.app.recoverable_node_timeout - for node in self.node_iter: - if node in self.used_nodes: - continue - start_node_timing = time.time() - try: - with ConnectionTimeout(self.app.conn_timeout): - conn = http_connect( - node['ip'], node['port'], node['device'], - self.partition, self.req_method, self.path, - headers=self.backend_headers, - query_string=self.req_query_string) - self.app.set_node_timing(node, time.time() - start_node_timing) - with Timeout(node_timeout): - possible_source = conn.getresponse() - # See NOTE: swift_conn at top of file about this. - possible_source.swift_conn = conn - except (Exception, Timeout): - self.app.exception_occurred( - node, self.server_type, - _('Trying to %(method)s %(path)s') % - {'method': self.req_method, 'path': self.req_path}) - continue - if self.is_good_source(possible_source): - # 404 if we know we don't have a synced copy - if not float(possible_source.getheader('X-PUT-Timestamp', 1)): - self.statuses.append(HTTP_NOT_FOUND) - self.reasons.append('') - self.bodies.append('') - self.source_headers.append([]) - close_swift_conn(possible_source) - else: - if self.used_source_etag: - src_headers = dict( - (k.lower(), v) for k, v in - possible_source.getheaders()) + pile = GreenAsyncPile(self.concurrency) - if self.used_source_etag != src_headers.get( - 'x-object-sysmeta-ec-etag', - src_headers.get('etag', '')).strip('"'): - self.statuses.append(HTTP_NOT_FOUND) - self.reasons.append('') - self.bodies.append('') - self.source_headers.append([]) - continue + for node in nodes: + pile.spawn(self._make_node_request, node, node_timeout, + self.app.logger.thread_locals) + _timeout = self.app.concurrency_timeout \ + if pile.inflight < self.concurrency else None + if pile.waitfirst(_timeout): + break + else: + # ran out of nodes, see if any stragglers will finish + any(pile) - self.statuses.append(possible_source.status) - self.reasons.append(possible_source.reason) - self.bodies.append(None) - self.source_headers.append(possible_source.getheaders()) - sources.append((possible_source, node)) - if not self.newest: # one good source is enough - break - else: - self.statuses.append(possible_source.status) - self.reasons.append(possible_source.reason) - self.bodies.append(possible_source.read()) - self.source_headers.append(possible_source.getheaders()) - if possible_source.status == HTTP_INSUFFICIENT_STORAGE: - self.app.error_limit(node, _('ERROR Insufficient Storage')) - elif is_server_error(possible_source.status): - self.app.error_occurred( - node, _('ERROR %(status)d %(body)s ' - 'From %(type)s Server') % - {'status': possible_source.status, - 'body': self.bodies[-1][:1024], - 'type': self.server_type}) - - if sources: - sources.sort(key=lambda s: source_key(s[0])) - source, node = sources.pop() - for src, _junk in sources: + if self.sources: + self.sources.sort(key=lambda s: source_key(s[0])) + source, node = self.sources.pop() + for src, _junk in self.sources: close_swift_conn(src) self.used_nodes.append(node) src_headers = dict( @@ -1613,7 +1634,7 @@ class Controller(object): self.app.logger.warning('Could not autocreate account %r' % path) def GETorHEAD_base(self, req, server_type, node_iter, partition, path, - client_chunk_size=None): + concurrency=1, client_chunk_size=None): """ Base handler for HTTP GET or HEAD requests. @@ -1622,6 +1643,7 @@ class Controller(object): :param node_iter: an iterator to obtain nodes from :param partition: partition :param path: path for the request + :param concurrency: number of requests to run concurrently :param client_chunk_size: chunk size for response body iterator :returns: swob.Response object """ @@ -1630,6 +1652,7 @@ class Controller(object): handler = GetOrHeadHandler(self.app, req, self.server_type, node_iter, partition, path, backend_headers, + concurrency, client_chunk_size=client_chunk_size) res = handler.get_working_response(req) diff --git a/swift/proxy/controllers/container.py b/swift/proxy/controllers/container.py index d5e52618c2..08a51f10d6 100644 --- a/swift/proxy/controllers/container.py +++ b/swift/proxy/controllers/container.py @@ -93,10 +93,12 @@ class ContainerController(Controller): return HTTPNotFound(request=req) part = self.app.container_ring.get_part( self.account_name, self.container_name) + concurrency = self.app.container_ring.replica_count \ + if self.app.concurrent_gets else 1 node_iter = self.app.iter_nodes(self.app.container_ring, part) resp = self.GETorHEAD_base( req, _('Container'), node_iter, part, - req.swift_entity_path) + req.swift_entity_path, concurrency) if 'swift.authorize' in req.environ: req.acl = resp.headers.get('x-container-read') aresp = req.environ['swift.authorize'](req) diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index dea29eab3a..fadca564f5 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -879,9 +879,11 @@ class BaseObjectController(Controller): class ReplicatedObjectController(BaseObjectController): def _get_or_head_response(self, req, node_iter, partition, policy): + concurrency = self.app.get_object_ring(policy.idx).replica_count \ + if self.app.concurrent_gets else 1 resp = self.GETorHEAD_base( req, _('Object'), node_iter, partition, - req.swift_entity_path) + req.swift_entity_path, concurrency) return resp def _connect_put_node(self, nodes, part, path, headers, @@ -2000,9 +2002,10 @@ class ECObjectController(BaseObjectController): # no fancy EC decoding here, just one plain old HEAD request to # one object server because all fragments hold all metadata # information about the object. + concurrency = policy.ec_ndata if self.app.concurrent_gets else 1 resp = self.GETorHEAD_base( req, _('Object'), node_iter, partition, - req.swift_entity_path) + req.swift_entity_path, concurrency) else: # GET request orig_range = None range_specs = [] @@ -2011,6 +2014,12 @@ class ECObjectController(BaseObjectController): range_specs = self._convert_range(req, policy) safe_iter = GreenthreadSafeIterator(node_iter) + # Sending the request concurrently to all nodes, and responding + # with the first response isn't something useful for EC as all + # nodes contain different fragments. Also EC has implemented it's + # own specific implementation of concurrent gets to ec_ndata nodes. + # So we don't need to worry about plumbing and sending a + # concurrency value to ResumingGetter. with ContextPool(policy.ec_ndata) as pool: pile = GreenAsyncPile(pool) for _junk in range(policy.ec_ndata): diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 1f23e9bb20..f8f4296a25 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -147,6 +147,10 @@ class Application(object): self.node_timings = {} self.timing_expiry = int(conf.get('timing_expiry', 300)) self.sorting_method = conf.get('sorting_method', 'shuffle').lower() + self.concurrent_gets = \ + config_true_value(conf.get('concurrent_gets')) + self.concurrency_timeout = float(conf.get('concurrency_timeout', + self.conn_timeout)) value = conf.get('request_node_count', '2 * replicas').lower().split() if len(value) == 1: rnc_value = int(value[0]) diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 3ebc8f6dc4..14e3aa8696 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -4988,6 +4988,37 @@ class TestGreenAsyncPile(unittest.TestCase): self.assertEqual(pile.waitall(0.5), [0.1, 0.1]) self.assertEqual(completed[0], 2) + def test_waitfirst_only_returns_first(self): + def run_test(name): + eventlet.sleep(0) + completed.append(name) + return name + + completed = [] + pile = utils.GreenAsyncPile(3) + pile.spawn(run_test, 'first') + pile.spawn(run_test, 'second') + pile.spawn(run_test, 'third') + self.assertEqual(pile.waitfirst(0.5), completed[0]) + # 3 still completed, but only the first was returned. + self.assertEqual(3, len(completed)) + + def test_wait_with_firstn(self): + def run_test(name): + eventlet.sleep(0) + completed.append(name) + return name + + for first_n in [None] + list(range(6)): + completed = [] + pile = utils.GreenAsyncPile(10) + for i in range(10): + pile.spawn(run_test, i) + actual = pile._wait(1, first_n) + expected_n = first_n if first_n else 10 + self.assertEqual(completed[:expected_n], actual) + self.assertEqual(10, len(completed)) + def test_pending(self): pile = utils.GreenAsyncPile(3) self.assertEqual(0, pile._pending) diff --git a/test/unit/proxy/controllers/test_base.py b/test/unit/proxy/controllers/test_base.py index 4bc8991d04..330250e2c9 100644 --- a/test/unit/proxy/controllers/test_base.py +++ b/test/unit/proxy/controllers/test_base.py @@ -28,7 +28,7 @@ from swift.common import exceptions from swift.common.utils import split_path from swift.common.header_key_dict import HeaderKeyDict from swift.common.http import is_success -from swift.common.storage_policy import StoragePolicy +from swift.common.storage_policy import StoragePolicy, POLICIES from test.unit import fake_http_connect, FakeRing, FakeMemcache from swift.proxy import server as proxy_server from swift.common.request_helpers import get_sys_meta_prefix @@ -193,6 +193,52 @@ class TestFuncs(unittest.TestCase): self.assertTrue('swift.account/a' in resp.environ) self.assertEqual(resp.environ['swift.account/a']['status'], 200) + # Run the above tests again, but this time with concurrent_reads + # turned on + policy = next(iter(POLICIES)) + concurrent_get_threads = policy.object_ring.replica_count + for concurrency_timeout in (0, 2): + self.app.concurrency_timeout = concurrency_timeout + req = Request.blank('/v1/a/c/o/with/slashes') + # NOTE: We are using slow_connect of fake_http_connect as using + # a concurrency of 0 when mocking the connection is a little too + # fast for eventlet. Network i/o will make this fine, but mocking + # it seems is too instantaneous. + with patch('swift.proxy.controllers.base.http_connect', + fake_http_connect(200, slow_connect=True)): + resp = base.GETorHEAD_base( + req, 'object', iter(nodes), 'part', '/a/c/o/with/slashes', + concurrency=concurrent_get_threads) + self.assertTrue('swift.object/a/c/o/with/slashes' in resp.environ) + self.assertEqual( + resp.environ['swift.object/a/c/o/with/slashes']['status'], 200) + req = Request.blank('/v1/a/c/o') + with patch('swift.proxy.controllers.base.http_connect', + fake_http_connect(200, slow_connect=True)): + resp = base.GETorHEAD_base( + req, 'object', iter(nodes), 'part', '/a/c/o', + concurrency=concurrent_get_threads) + self.assertTrue('swift.object/a/c/o' in resp.environ) + self.assertEqual(resp.environ['swift.object/a/c/o']['status'], 200) + req = Request.blank('/v1/a/c') + with patch('swift.proxy.controllers.base.http_connect', + fake_http_connect(200, slow_connect=True)): + resp = base.GETorHEAD_base( + req, 'container', iter(nodes), 'part', '/a/c', + concurrency=concurrent_get_threads) + self.assertTrue('swift.container/a/c' in resp.environ) + self.assertEqual(resp.environ['swift.container/a/c']['status'], + 200) + + req = Request.blank('/v1/a') + with patch('swift.proxy.controllers.base.http_connect', + fake_http_connect(200, slow_connect=True)): + resp = base.GETorHEAD_base( + req, 'account', iter(nodes), 'part', '/a', + concurrency=concurrent_get_threads) + self.assertTrue('swift.account/a' in resp.environ) + self.assertEqual(resp.environ['swift.account/a']['status'], 200) + def test_get_info(self): app = FakeApp() # Do a non cached call to account diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index 08a0be9e98..41e180eadc 100755 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -722,9 +722,15 @@ class TestReplicatedObjController(BaseObjectControllerMixin, def test_GET_error(self): req = swift.common.swob.Request.blank('/v1/a/c/o') - with set_http_connect(503, 200): + self.app.logger.txn_id = req.environ['swift.trans_id'] = 'my-txn-id' + stdout = BytesIO() + with set_http_connect(503, 200), \ + mock.patch('sys.stdout', stdout): resp = req.get_response(self.app) self.assertEqual(resp.status_int, 200) + for line in stdout.getvalue().splitlines(): + self.assertIn('my-txn-id', line) + self.assertIn('From Object Server', stdout.getvalue()) def test_GET_handoff(self): req = swift.common.swob.Request.blank('/v1/a/c/o') diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index 8aba81ffb1..d9cebdc8c2 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -928,6 +928,88 @@ class TestProxyServer(unittest.TestCase): {'region': 2, 'zone': 1, 'ip': '127.0.0.1'}] self.assertEqual(exp_sorted, app_sorted) + def test_node_concurrency(self): + nodes = [{'region': 1, 'zone': 1, 'ip': '127.0.0.1', 'port': 6010, + 'device': 'sda'}, + {'region': 2, 'zone': 2, 'ip': '127.0.0.2', 'port': 6010, + 'device': 'sda'}, + {'region': 3, 'zone': 3, 'ip': '127.0.0.3', 'port': 6010, + 'device': 'sda'}] + timings = {'127.0.0.1': 2, '127.0.0.2': 1, '127.0.0.3': 0} + statuses = {'127.0.0.1': 200, '127.0.0.2': 200, '127.0.0.3': 200} + req = Request.blank('/v1/account', environ={'REQUEST_METHOD': 'GET'}) + + def fake_iter_nodes(*arg, **karg): + return iter(nodes) + + class FakeConn(object): + def __init__(self, ip, *args, **kargs): + self.ip = ip + self.args = args + self.kargs = kargs + + def getresponse(self): + def mygetheader(header, *args, **kargs): + if header == "Content-Type": + return "" + else: + return 1 + + resp = mock.Mock() + resp.read.side_effect = ['Response from %s' % self.ip, ''] + resp.getheader = mygetheader + resp.getheaders.return_value = {} + resp.reason = '' + resp.status = statuses[self.ip] + sleep(timings[self.ip]) + return resp + + def myfake_http_connect_raw(ip, *args, **kargs): + conn = FakeConn(ip, *args, **kargs) + return conn + + with mock.patch('swift.proxy.server.Application.iter_nodes', + fake_iter_nodes): + with mock.patch('swift.common.bufferedhttp.http_connect_raw', + myfake_http_connect_raw): + app_conf = {'concurrent_gets': 'on', + 'concurrency_timeout': 0} + baseapp = proxy_server.Application(app_conf, + FakeMemcache(), + container_ring=FakeRing(), + account_ring=FakeRing()) + self.assertEqual(baseapp.concurrent_gets, True) + self.assertEqual(baseapp.concurrency_timeout, 0) + baseapp.update_request(req) + resp = baseapp.handle_request(req) + + # Should get 127.0.0.3 as this has a wait of 0 seconds. + self.assertEqual(resp.body, 'Response from 127.0.0.3') + + # lets try again, with 127.0.0.1 with 0 timing but returns an + # error. + timings['127.0.0.1'] = 0 + statuses['127.0.0.1'] = 500 + + # Should still get 127.0.0.3 as this has a wait of 0 seconds + # and a success + baseapp.update_request(req) + resp = baseapp.handle_request(req) + self.assertEqual(resp.body, 'Response from 127.0.0.3') + + # Now lets set the concurrency_timeout + app_conf['concurrency_timeout'] = 2 + baseapp = proxy_server.Application(app_conf, + FakeMemcache(), + container_ring=FakeRing(), + account_ring=FakeRing()) + self.assertEqual(baseapp.concurrency_timeout, 2) + baseapp.update_request(req) + resp = baseapp.handle_request(req) + + # Should get 127.0.0.2 as this has a wait of 1 seconds. + self.assertEqual(resp.body, 'Response from 127.0.0.2') + def test_info_defaults(self): app = proxy_server.Application({}, FakeMemcache(), account_ring=FakeRing(),