Refactor obj.server.ObjectController.PUT

Change-Id: Iebc5cd4c22db159db3b26685e02b37a028eb2be6
This commit is contained in:
Clay Gerrard 2018-09-12 15:04:14 -05:00
parent 52ecbf9539
commit cbfa585d3b
2 changed files with 232 additions and 153 deletions

View File

@ -529,7 +529,12 @@ class ObjectController(BaseStorageServer):
raise HTTPRequestTimeout() raise HTTPRequestTimeout()
except StopIteration: except StopIteration:
raise HTTPBadRequest(body="couldn't find footer MIME doc") raise HTTPBadRequest(body="couldn't find footer MIME doc")
return self._parse_footer(footer_hdrs, footer_iter)
def _parse_footer(self, footer_hdrs, footer_iter):
"""
Validate footer metadata and translate JSON body into HeaderKeyDict.
"""
timeout_reader = self._make_timeout_reader(footer_iter) timeout_reader = self._make_timeout_reader(footer_iter)
try: try:
footer_body = ''.join(iter(timeout_reader, '')) footer_body = ''.join(iter(timeout_reader, ''))
@ -729,23 +734,17 @@ class ObjectController(BaseStorageServer):
return HTTPAccepted(request=request, headers=resp_headers) return HTTPAccepted(request=request, headers=resp_headers)
@public def _pre_create_checks(self, request, device, partition,
@timing_stats() account, container, obj, policy):
def PUT(self, request):
"""Handle HTTP PUT requests for the Swift Object Server."""
device, partition, account, container, obj, policy = \
get_name_and_placement(request, 5, 5, True)
req_timestamp = valid_timestamp(request) req_timestamp = valid_timestamp(request)
error_response = check_object_creation(request, obj) error_response = check_object_creation(request, obj)
if error_response: if error_response:
return error_response raise error_response
new_delete_at = int(request.headers.get('X-Delete-At') or 0)
try: try:
fsize = request.message_length() fsize = request.message_length()
except ValueError as e: except ValueError as e:
return HTTPBadRequest(body=str(e), request=request, raise HTTPBadRequest(body=str(e), request=request,
content_type='text/plain') content_type='text/plain')
# In case of multipart-MIME put, the proxy sends a chunked request, # In case of multipart-MIME put, the proxy sends a chunked request,
# but may let us know the real content length so we can verify that # but may let us know the real content length so we can verify that
# we have enough disk space to hold the object. # we have enough disk space to hold the object.
@ -755,7 +754,7 @@ class ObjectController(BaseStorageServer):
try: try:
fsize = int(fsize) fsize = int(fsize)
except ValueError as e: except ValueError as e:
return HTTPBadRequest(body=str(e), request=request, raise HTTPBadRequest(body=str(e), request=request,
content_type='text/plain') content_type='text/plain')
# SSYNC will include Frag-Index header for subrequests to primary # SSYNC will include Frag-Index header for subrequests to primary
# nodes; handoff nodes should 409 subrequests to over-write an # nodes; handoff nodes should 409 subrequests to over-write an
@ -768,47 +767,47 @@ class ObjectController(BaseStorageServer):
policy=policy, frag_index=frag_index, policy=policy, frag_index=frag_index,
next_part_power=next_part_power) next_part_power=next_part_power)
except DiskFileDeviceUnavailable: except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request) raise HTTPInsufficientStorage(drive=device, request=request)
try: try:
orig_metadata = disk_file.read_metadata(current_time=req_timestamp) orig_metadata = disk_file.read_metadata(current_time=req_timestamp)
orig_timestamp = disk_file.data_timestamp orig_timestamp = disk_file.data_timestamp
except DiskFileXattrNotSupported: except DiskFileXattrNotSupported:
return HTTPInsufficientStorage(drive=device, request=request) raise HTTPInsufficientStorage(drive=device, request=request)
except DiskFileDeleted as e: except DiskFileDeleted as e:
orig_metadata = {} orig_metadata = {}
orig_timestamp = e.timestamp orig_timestamp = e.timestamp
except (DiskFileNotExist, DiskFileQuarantined): except (DiskFileNotExist, DiskFileQuarantined):
orig_metadata = {} orig_metadata = {}
orig_timestamp = Timestamp(0) orig_timestamp = Timestamp(0)
# Checks for If-None-Match # Checks for If-None-Match
if request.if_none_match is not None and orig_metadata: if request.if_none_match is not None and orig_metadata:
if '*' in request.if_none_match: if '*' in request.if_none_match:
# File exists already so return 412 # File exists already so return 412
return HTTPPreconditionFailed(request=request) raise HTTPPreconditionFailed(request=request)
if orig_metadata.get('ETag') in request.if_none_match: if orig_metadata.get('ETag') in request.if_none_match:
# The current ETag matches, so return 412 # The current ETag matches, so raise 412
return HTTPPreconditionFailed(request=request) raise HTTPPreconditionFailed(request=request)
if orig_timestamp >= req_timestamp: if orig_timestamp >= req_timestamp:
return HTTPConflict( raise HTTPConflict(
request=request, request=request,
headers={'X-Backend-Timestamp': orig_timestamp.internal}) headers={'X-Backend-Timestamp': orig_timestamp.internal})
orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0) return disk_file, fsize, orig_metadata
upload_expiration = time.time() + self.max_upload_time
elapsed_time = 0 def _do_multi_stage_mime_continue_headers(self, request, obj_input):
try: """
with disk_file.create(size=fsize) as writer: If the proxy wants to send us object metadata after the object body, it
# If the proxy wants to send us object metadata after the sets some headers. We have to tell the proxy, in the 100 Continue
# object body, it sets some headers. We have to tell the response, that we're able to parse a multipart MIME document and
# proxy, in the 100 Continue response, that we're able to extract the object and metadata from it. If we don't, then the proxy
# parse a multipart MIME document and extract the object and won't actually send the footer metadata.
# metadata from it. If we don't, then the proxy won't
# actually send the footer metadata. If the proxy doesn't want to do any of that, this is the identity
function for obj_input and multi_stage_mime_state will be False-y.
:returns: a tuple, (obj_input, multi_stage_mime_state)
"""
have_metadata_footer = False have_metadata_footer = False
use_multiphase_commit = False use_multiphase_commit = False
mime_documents_iter = iter([])
obj_input = request.environ['wsgi.input']
hundred_continue_headers = [] hundred_continue_headers = []
if config_true_value( if config_true_value(
@ -830,61 +829,63 @@ class ObjectController(BaseStorageServer):
mime_boundary = request.headers.get( mime_boundary = request.headers.get(
'X-Backend-Obj-Multipart-Mime-Boundary') 'X-Backend-Obj-Multipart-Mime-Boundary')
if not mime_boundary: if not mime_boundary:
return HTTPBadRequest("no MIME boundary") raise HTTPBadRequest("no MIME boundary")
try:
with ChunkReadTimeout(self.client_timeout): with ChunkReadTimeout(self.client_timeout):
mime_documents_iter = iter_mime_headers_and_bodies( mime_documents_iter = iter_mime_headers_and_bodies(
request.environ['wsgi.input'], request.environ['wsgi.input'],
mime_boundary, self.network_chunk_size) mime_boundary, self.network_chunk_size)
_junk_hdrs, obj_input = next(mime_documents_iter) _junk_hdrs, obj_input = next(mime_documents_iter)
except ChunkReadError: multi_stage_mime_state = {
return HTTPClientDisconnect(request=request) 'have_metadata_footer': have_metadata_footer,
except ChunkReadTimeout: 'use_multiphase_commit': use_multiphase_commit,
return HTTPRequestTimeout(request=request) 'mime_documents_iter': mime_documents_iter,
}
else:
multi_stage_mime_state = {}
return obj_input, multi_stage_mime_state
def _stage_obj_data(self, request, device, obj_input, writer, fsize):
"""
Feed the object_input into the writer.
:returns: a tuple, (upload_size, etag)
"""
writer.open()
elapsed_time = 0
upload_expiration = time.time() + self.max_upload_time
timeout_reader = self._make_timeout_reader(obj_input) timeout_reader = self._make_timeout_reader(obj_input)
try:
for chunk in iter(timeout_reader, ''): for chunk in iter(timeout_reader, ''):
start_time = time.time() start_time = time.time()
if start_time > upload_expiration: if start_time > upload_expiration:
self.logger.increment('PUT.timeouts') self.logger.increment('PUT.timeouts')
return HTTPRequestTimeout(request=request) raise HTTPRequestTimeout(request=request)
writer.write(chunk) writer.write(chunk)
elapsed_time += time.time() - start_time elapsed_time += time.time() - start_time
except ChunkReadError:
return HTTPClientDisconnect(request=request)
except ChunkReadTimeout:
return HTTPRequestTimeout(request=request)
upload_size, etag = writer.chunks_finished() upload_size, etag = writer.chunks_finished()
if fsize is not None and fsize != upload_size:
raise HTTPClientDisconnect(request=request)
if upload_size: if upload_size:
self.logger.transfer_rate( self.logger.transfer_rate(
'PUT.' + device + '.timing', elapsed_time, 'PUT.' + device + '.timing', elapsed_time,
upload_size) upload_size)
if fsize is not None and fsize != upload_size: return upload_size, etag
return HTTPClientDisconnect(request=request)
footer_meta = {} def _get_request_metadata(self, request, upload_size, etag):
if have_metadata_footer: """
footer_meta = self._read_metadata_footer( Pull object metadata off the request.
mime_documents_iter)
request_etag = (footer_meta.get('etag') or :returns: metadata, a dict of object metadata
request.headers.get('etag', '')).lower() """
if request_etag and request_etag != etag:
return HTTPUnprocessableEntity(request=request)
metadata = { metadata = {
'X-Timestamp': request.timestamp.internal, 'X-Timestamp': request.timestamp.internal,
'Content-Type': request.headers['content-type'], 'Content-Type': request.headers['content-type'],
'ETag': etag,
'Content-Length': str(upload_size), 'Content-Length': str(upload_size),
'ETag': etag,
} }
metadata.update(val for val in request.headers.items() metadata.update(val for val in request.headers.items()
if (is_sys_or_user_meta('object', val[0]) or if (is_sys_or_user_meta('object', val[0]) or
is_object_transient_sysmeta(val[0]))) is_object_transient_sysmeta(val[0])))
metadata.update(val for val in footer_meta.items()
if (is_sys_or_user_meta('object', val[0]) or
is_object_transient_sysmeta(val[0])))
headers_to_copy = ( headers_to_copy = (
request.headers.get( request.headers.get(
'X-Backend-Replication-Headers', '').split() + 'X-Backend-Replication-Headers', '').split() +
@ -893,23 +894,57 @@ class ObjectController(BaseStorageServer):
if header_key in request.headers: if header_key in request.headers:
header_caps = header_key.title() header_caps = header_key.title()
metadata[header_caps] = request.headers[header_key] metadata[header_caps] = request.headers[header_key]
writer.put(metadata) return metadata
# if the PUT requires a two-phase commit (a data and a commit def _read_mime_footers_metadata(self, have_metadata_footer,
# phase) send the proxy server another 100-continue response mime_documents_iter, **kwargs):
# to indicate that we are finished writing object data """
Read footer metadata from the bottom of the multi-stage MIME body.
:returns: metadata, a dict
"""
if have_metadata_footer:
metadata = self._read_metadata_footer(
mime_documents_iter)
footer_etag = metadata.pop('etag', '').lower()
if footer_etag:
metadata['ETag'] = footer_etag
else:
metadata = {}
return metadata
def _apply_extra_metadata(self, request, metadata, footers_metadata):
"""
Apply extra metadata precedence to prepare metadata for storage.
"""
metadata.update(val for val in footers_metadata.items()
if (is_sys_or_user_meta('object', val[0]) or
is_object_transient_sysmeta(val[0])))
# N.B. footers_metadata is a HeaderKeyDict
received_etag = footers_metadata.get('etag', request.headers.get(
'etag', '')).strip('"')
if received_etag and received_etag != metadata['ETag']:
raise HTTPUnprocessableEntity(request=request)
def _send_multi_stage_continue_headers(self, request,
use_multiphase_commit,
mime_documents_iter, **kwargs):
"""
If the PUT requires a two-phase commit (a data and a commit phase) send
the proxy server another 100-continue response to indicate that we are
finished writing object data
"""
if use_multiphase_commit: if use_multiphase_commit:
request.environ['wsgi.input'].\ request.environ['wsgi.input'].\
send_hundred_continue_response() send_hundred_continue_response()
if not self._read_put_commit_message(mime_documents_iter): if not self._read_put_commit_message(mime_documents_iter):
return HTTPServerError(request=request) raise HTTPServerError(request=request)
# got 2nd phase confirmation (when required), call commit to def _drain_mime_request(self, mime_documents_iter, **kwargs):
# indicate a successful PUT """
writer.commit(request.timestamp) Drain any remaining MIME docs from the socket. There shouldn't be any,
but we must read the whole request body.
# Drain any remaining MIME docs from the socket. There """
# shouldn't be any, but we must read the whole request body.
try: try:
while True: while True:
with ChunkReadTimeout(self.client_timeout): with ChunkReadTimeout(self.client_timeout):
@ -923,8 +958,11 @@ class ObjectController(BaseStorageServer):
except StopIteration: except StopIteration:
pass pass
except (DiskFileXattrNotSupported, DiskFileNoSpace): def _post_commit_updates(self, request, device,
return HTTPInsufficientStorage(drive=device, request=request) account, container, obj, policy,
orig_metadata, footers_metadata, metadata):
orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0)
new_delete_at = int(request.headers.get('X-Delete-At') or 0)
if orig_delete_at != new_delete_at: if orig_delete_at != new_delete_at:
if new_delete_at: if new_delete_at:
self.delete_at_update( self.delete_at_update(
@ -934,6 +972,7 @@ class ObjectController(BaseStorageServer):
self.delete_at_update( self.delete_at_update(
'DELETE', orig_delete_at, account, container, obj, 'DELETE', orig_delete_at, account, container, obj,
request, device, policy) request, device, policy)
update_headers = HeaderKeyDict({ update_headers = HeaderKeyDict({
'x-size': metadata['Content-Length'], 'x-size': metadata['Content-Length'],
'x-content-type': metadata['Content-Type'], 'x-content-type': metadata['Content-Type'],
@ -941,11 +980,51 @@ class ObjectController(BaseStorageServer):
'x-etag': metadata['ETag']}) 'x-etag': metadata['ETag']})
# apply any container update header overrides sent with request # apply any container update header overrides sent with request
self._check_container_override(update_headers, request.headers, self._check_container_override(update_headers, request.headers,
footer_meta) footers_metadata)
self.container_update( self.container_update(
'PUT', account, container, obj, request, 'PUT', account, container, obj, request,
update_headers, update_headers, device, policy)
device, policy)
@public
@timing_stats()
def PUT(self, request):
"""Handle HTTP PUT requests for the Swift Object Server."""
device, partition, account, container, obj, policy = \
get_name_and_placement(request, 5, 5, True)
disk_file, fsize, orig_metadata = self._pre_create_checks(
request, device, partition, account, container, obj, policy)
writer = disk_file.writer(size=fsize)
try:
obj_input = request.environ['wsgi.input']
obj_input, multi_stage_mime_state = \
self._do_multi_stage_mime_continue_headers(request, obj_input)
upload_size, etag = self._stage_obj_data(
request, device, obj_input, writer, fsize)
metadata = self._get_request_metadata(request, upload_size, etag)
if multi_stage_mime_state:
footers_metadata = self._read_mime_footers_metadata(
**multi_stage_mime_state)
else:
footers_metadata = {}
self._apply_extra_metadata(request, metadata, footers_metadata)
writer.put(metadata)
if multi_stage_mime_state:
self._send_multi_stage_continue_headers(
request, **multi_stage_mime_state)
writer.commit(request.timestamp)
if multi_stage_mime_state:
self._drain_mime_request(**multi_stage_mime_state)
except (DiskFileXattrNotSupported, DiskFileNoSpace):
return HTTPInsufficientStorage(drive=device, request=request)
except ChunkReadError:
return HTTPClientDisconnect(request=request)
except ChunkReadTimeout:
return HTTPRequestTimeout(request=request)
finally:
writer.close()
self._post_commit_updates(request, device,
account, container, obj, policy,
orig_metadata, footers_metadata, metadata)
return HTTPCreated(request=request, etag=etag) return HTTPCreated(request=request, etag=etag)
@public @public

View File

@ -1705,8 +1705,8 @@ class TestObjectController(unittest.TestCase):
req.headers.pop("Content-Length", None) req.headers.pop("Content-Length", None)
resp = req.get_response(self.object_controller) resp = req.get_response(self.object_controller)
self.assertEqual(resp.etag, obj_etag)
self.assertEqual(resp.status_int, 201) self.assertEqual(resp.status_int, 201)
self.assertEqual(resp.etag, obj_etag)
objfile = os.path.join( objfile = os.path.join(
self.testdir, 'sda1', self.testdir, 'sda1',