You can specify X-Newest: true on GETs and HEADs to indicate you want Swift to query all backend copies and return the newest version retrieved.

This commit is contained in:
gholt 2011-06-07 23:19:48 +00:00
parent 5d7e099596
commit 6c50e9b3a1
3 changed files with 95 additions and 18 deletions
swift
test/unit/proxy

@ -623,6 +623,7 @@ class ObjectController(object):
file.keep_cache = True
if 'Content-Encoding' in file.metadata:
response.content_encoding = file.metadata['Content-Encoding']
response.headers['X-Timestamp'] = file.metadata['X-Timestamp']
return request.get_response(response)
def HEAD(self, request):
@ -657,6 +658,7 @@ class ObjectController(object):
response.content_length = file_size
if 'Content-Encoding' in file.metadata:
response.content_encoding = file.metadata['Content-Encoding']
response.headers['X-Timestamp'] = file.metadata['X-Timestamp']
return response
def DELETE(self, request):

@ -41,8 +41,8 @@ from webob.exc import HTTPBadRequest, HTTPMethodNotAllowed, \
from webob import Request, Response
from swift.common.ring import Ring
from swift.common.utils import get_logger, normalize_timestamp, split_path, \
cache_from_env, ContextPool
from swift.common.utils import cache_from_env, ContextPool, get_logger, \
normalize_timestamp, split_path, TRUE_VALUES
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_metadata, check_object_creation, \
check_utf8, CONTAINER_LISTING_LIMIT, MAX_ACCOUNT_NAME_LENGTH, \
@ -162,6 +162,7 @@ class SegmentedIterable(object):
if self.segment > 10:
sleep(max(self.next_get_time - time.time(), 0))
self.next_get_time = time.time() + 1
shuffle(nodes)
resp = self.controller.GETorHEAD_base(req, _('Object'), partition,
self.controller.iter_nodes(partition, nodes,
self.controller.app.object_ring), path,
@ -594,6 +595,8 @@ class Controller(object):
statuses = []
reasons = []
bodies = []
source = None
newest = req.headers.get('x-newest', 'f').lower() in TRUE_VALUES
for node in nodes:
if len(statuses) >= attempts:
break
@ -606,23 +609,48 @@ class Controller(object):
headers=req.headers,
query_string=req.query_string)
with Timeout(self.app.node_timeout):
source = conn.getresponse()
possible_source = conn.getresponse()
except (Exception, TimeoutError):
self.exception_occurred(node, server_type,
_('Trying to %(method)s %(path)s') %
{'method': req.method, 'path': req.path})
continue
if source.status == 507:
if possible_source.status == 507:
self.error_limit(node)
continue
if 200 <= source.status <= 399:
if 200 <= possible_source.status <= 399:
# 404 if we know we don't have a synced copy
if not float(source.getheader('X-PUT-Timestamp', '1')):
if not float(possible_source.getheader('X-PUT-Timestamp', 1)):
statuses.append(404)
reasons.append('')
bodies.append('')
source.read()
possible_source.read()
continue
if (req.method == 'GET' and
possible_source.status in (200, 206)) or \
200 <= possible_source.status <= 399:
if newest:
ts = 0
if source:
ts = float(source.getheader('x-put-timestamp',
source.getheader('x-timestamp', 0)))
pts = float(possible_source.getheader('x-put-timestamp',
possible_source.getheader('x-timestamp', 0)))
if pts > ts:
source = possible_source
continue
else:
source = possible_source
break
statuses.append(possible_source.status)
reasons.append(possible_source.reason)
bodies.append(possible_source.read())
if possible_source.status >= 500:
self.error_occurred(node, _('ERROR %(status)d %(body)s ' \
'From %(type)s Server') %
{'status': possible_source.status,
'body': bodies[-1][:1024], 'type': server_type})
if source:
if req.method == 'GET' and source.status in (200, 206):
res = Response(request=req, conditional_response=True)
res.bytes_transferred = 0
@ -662,13 +690,6 @@ class Controller(object):
res.charset = None
res.content_type = source.getheader('Content-Type')
return res
statuses.append(source.status)
reasons.append(source.reason)
bodies.append(source.read())
if source.status >= 500:
self.error_occurred(node, _('ERROR %(status)d %(body)s ' \
'From %(type)s Server') % {'status': source.status,
'body': bodies[-1][:1024], 'type': server_type})
return self.best_response(req, statuses, reasons, bodies,
'%s %s' % (server_type, req.method))
@ -723,6 +744,7 @@ class ObjectController(Controller):
lreq = Request.blank('/%s/%s?prefix=%s&format=json&marker=%s' %
(quote(self.account_name), quote(lcontainer),
quote(lprefix), quote(marker)))
shuffle(lnodes)
lresp = self.GETorHEAD_base(lreq, _('Container'), lpartition,
lnodes, lreq.path_info,
self.app.container_ring.replica_count)
@ -1174,6 +1196,7 @@ class ContainerController(Controller):
return HTTPNotFound(request=req)
part, nodes = self.app.container_ring.get_nodes(
self.account_name, self.container_name)
shuffle(nodes)
resp = self.GETorHEAD_base(req, _('Container'), part, nodes,
req.path_info, self.app.container_ring.replica_count)
@ -1304,6 +1327,7 @@ class AccountController(Controller):
def GETorHEAD(self, req):
"""Handler for HTTP GET/HEAD requests."""
partition, nodes = self.app.account_ring.get_nodes(self.account_name)
shuffle(nodes)
return self.GETorHEAD_base(req, _('Account'), partition, nodes,
req.path_info.rstrip('/'), self.app.account_ring.replica_count)

@ -150,7 +150,7 @@ def fake_http_connect(*code_iter, **kwargs):
class FakeConn(object):
def __init__(self, status, etag=None, body=''):
def __init__(self, status, etag=None, body='', timestamp='1'):
self.status = status
self.reason = 'Fake'
self.host = '1.2.3.4'
@ -159,6 +159,7 @@ def fake_http_connect(*code_iter, **kwargs):
self.received = 0
self.etag = etag
self.body = body
self.timestamp = timestamp
def getresponse(self):
if kwargs.get('raise_exc'):
@ -173,7 +174,8 @@ def fake_http_connect(*code_iter, **kwargs):
def getheaders(self):
headers = {'content-length': len(self.body),
'content-type': 'x-application/test',
'x-timestamp': '1',
'x-timestamp': self.timestamp,
'last-modified': self.timestamp,
'x-object-meta-test': 'testing',
'etag':
self.etag or '"68b329da9893e34099c7d8ad5cb9c940"',
@ -209,7 +211,8 @@ def fake_http_connect(*code_iter, **kwargs):
def getheader(self, name, default=None):
return dict(self.getheaders()).get(name.lower(), default)
etag_iter = iter(kwargs.get('etags') or [None] * len(code_iter))
timestamps_iter = iter(kwargs.get('timestamps') or [None] * len(code_iter))
etag_iter = iter(kwargs.get('etags') or ['1'] * len(code_iter))
x = kwargs.get('missing_container', [False] * len(code_iter))
if not isinstance(x, (tuple, list)):
x = [x] * len(code_iter)
@ -226,9 +229,11 @@ def fake_http_connect(*code_iter, **kwargs):
kwargs['give_connect'](*args, **ckwargs)
status = code_iter.next()
etag = etag_iter.next()
timestamp = timestamps_iter.next()
if status == -1:
raise HTTPException()
return FakeConn(status, etag, body=kwargs.get('body', ''))
return FakeConn(status, etag, body=kwargs.get('body', ''),
timestamp=timestamp)
return connect
@ -986,6 +991,51 @@ class TestObjectController(unittest.TestCase):
test_status_map((404, 404, 500), 404)
test_status_map((500, 500, 500), 503)
def test_HEAD_newest(self):
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
def test_status_map(statuses, expected, timestamps,
expected_timestamp):
proxy_server.http_connect = \
fake_http_connect(*statuses, timestamps=timestamps)
self.app.memcache.store = {}
req = Request.blank('/a/c/o', {}, headers={'x-newest': 'true'})
self.app.update_request(req)
res = controller.HEAD(req)
self.assertEquals(res.status[:len(str(expected))],
str(expected))
self.assertEquals(res.headers.get('last-modified'),
expected_timestamp)
test_status_map((200, 200, 200), 200, ('1', '2', '3'), '3')
test_status_map((200, 200, 200), 200, ('1', '3', '2'), '3')
test_status_map((200, 200, 200), 200, ('1', '3', '1'), '3')
test_status_map((200, 200, 200), 200, ('3', '3', '1'), '3')
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
def test_status_map(statuses, expected, timestamps,
expected_timestamp):
proxy_server.http_connect = \
fake_http_connect(*statuses, timestamps=timestamps)
self.app.memcache.store = {}
req = Request.blank('/a/c/o', {})
self.app.update_request(req)
res = controller.HEAD(req)
self.assertEquals(res.status[:len(str(expected))],
str(expected))
self.assertEquals(res.headers.get('last-modified'),
expected_timestamp)
test_status_map((200, 200, 200), 200, ('1', '2', '3'), '1')
test_status_map((200, 200, 200), 200, ('1', '3', '2'), '1')
test_status_map((200, 200, 200), 200, ('1', '3', '1'), '1')
test_status_map((200, 200, 200), 200, ('3', '3', '1'), '3')
def test_POST_meta_val_len(self):
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
@ -2772,6 +2822,7 @@ class TestContainerController(unittest.TestCase):
def test_error_limiting(self):
with save_globals():
proxy_server.shuffle = lambda l: None
controller = proxy_server.ContainerController(self.app, 'account',
'container')
self.assert_status_map(controller.HEAD, (200, 503, 200, 200), 200,