Merge "Refactor obj.server.ObjectController.PUT"

This commit is contained in:
Zuul 2018-10-11 02:33:06 +00:00 committed by Gerrit Code Review
commit 97f1c3be00
2 changed files with 232 additions and 153 deletions

View File

@ -529,7 +529,12 @@ class ObjectController(BaseStorageServer):
raise HTTPRequestTimeout()
except StopIteration:
raise HTTPBadRequest(body="couldn't find footer MIME doc")
return self._parse_footer(footer_hdrs, footer_iter)
def _parse_footer(self, footer_hdrs, footer_iter):
"""
Validate footer metadata and translate JSON body into HeaderKeyDict.
"""
timeout_reader = self._make_timeout_reader(footer_iter)
try:
footer_body = ''.join(iter(timeout_reader, ''))
@ -729,23 +734,17 @@ class ObjectController(BaseStorageServer):
return HTTPAccepted(request=request, headers=resp_headers)
@public
@timing_stats()
def PUT(self, request):
"""Handle HTTP PUT requests for the Swift Object Server."""
device, partition, account, container, obj, policy = \
get_name_and_placement(request, 5, 5, True)
def _pre_create_checks(self, request, device, partition,
account, container, obj, policy):
req_timestamp = valid_timestamp(request)
error_response = check_object_creation(request, obj)
if error_response:
return error_response
new_delete_at = int(request.headers.get('X-Delete-At') or 0)
raise error_response
try:
fsize = request.message_length()
except ValueError as e:
return HTTPBadRequest(body=str(e), request=request,
content_type='text/plain')
raise HTTPBadRequest(body=str(e), request=request,
content_type='text/plain')
# In case of multipart-MIME put, the proxy sends a chunked request,
# but may let us know the real content length so we can verify that
# we have enough disk space to hold the object.
@ -755,8 +754,8 @@ class ObjectController(BaseStorageServer):
try:
fsize = int(fsize)
except ValueError as e:
return HTTPBadRequest(body=str(e), request=request,
content_type='text/plain')
raise HTTPBadRequest(body=str(e), request=request,
content_type='text/plain')
# SSYNC will include Frag-Index header for subrequests to primary
# nodes; handoff nodes should 409 subrequests to over-write an
# existing data fragment until they offloaded the existing fragment
@ -768,163 +767,202 @@ class ObjectController(BaseStorageServer):
policy=policy, frag_index=frag_index,
next_part_power=next_part_power)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
raise HTTPInsufficientStorage(drive=device, request=request)
try:
orig_metadata = disk_file.read_metadata(current_time=req_timestamp)
orig_timestamp = disk_file.data_timestamp
except DiskFileXattrNotSupported:
return HTTPInsufficientStorage(drive=device, request=request)
raise HTTPInsufficientStorage(drive=device, request=request)
except DiskFileDeleted as e:
orig_metadata = {}
orig_timestamp = e.timestamp
except (DiskFileNotExist, DiskFileQuarantined):
orig_metadata = {}
orig_timestamp = Timestamp(0)
# Checks for If-None-Match
if request.if_none_match is not None and orig_metadata:
if '*' in request.if_none_match:
# File exists already so return 412
return HTTPPreconditionFailed(request=request)
raise HTTPPreconditionFailed(request=request)
if orig_metadata.get('ETag') in request.if_none_match:
# The current ETag matches, so return 412
return HTTPPreconditionFailed(request=request)
# The current ETag matches, so raise 412
raise HTTPPreconditionFailed(request=request)
if orig_timestamp >= req_timestamp:
return HTTPConflict(
raise HTTPConflict(
request=request,
headers={'X-Backend-Timestamp': orig_timestamp.internal})
orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0)
upload_expiration = time.time() + self.max_upload_time
return disk_file, fsize, orig_metadata
def _do_multi_stage_mime_continue_headers(self, request, obj_input):
"""
If the proxy wants to send us object metadata after the object body, it
sets some headers. We have to tell the proxy, in the 100 Continue
response, that we're able to parse a multipart MIME document and
extract the object and metadata from it. If we don't, then the proxy
won't actually send the footer metadata.
If the proxy doesn't want to do any of that, this is the identity
function for obj_input and multi_stage_mime_state will be False-y.
:returns: a tuple, (obj_input, multi_stage_mime_state)
"""
have_metadata_footer = False
use_multiphase_commit = False
hundred_continue_headers = []
if config_true_value(
request.headers.get(
'X-Backend-Obj-Multiphase-Commit')):
use_multiphase_commit = True
hundred_continue_headers.append(
('X-Obj-Multiphase-Commit', 'yes'))
if config_true_value(
request.headers.get('X-Backend-Obj-Metadata-Footer')):
have_metadata_footer = True
hundred_continue_headers.append(
('X-Obj-Metadata-Footer', 'yes'))
if have_metadata_footer or use_multiphase_commit:
obj_input.set_hundred_continue_response_headers(
hundred_continue_headers)
mime_boundary = request.headers.get(
'X-Backend-Obj-Multipart-Mime-Boundary')
if not mime_boundary:
raise HTTPBadRequest("no MIME boundary")
with ChunkReadTimeout(self.client_timeout):
mime_documents_iter = iter_mime_headers_and_bodies(
request.environ['wsgi.input'],
mime_boundary, self.network_chunk_size)
_junk_hdrs, obj_input = next(mime_documents_iter)
multi_stage_mime_state = {
'have_metadata_footer': have_metadata_footer,
'use_multiphase_commit': use_multiphase_commit,
'mime_documents_iter': mime_documents_iter,
}
else:
multi_stage_mime_state = {}
return obj_input, multi_stage_mime_state
def _stage_obj_data(self, request, device, obj_input, writer, fsize):
"""
Feed the object_input into the writer.
:returns: a tuple, (upload_size, etag)
"""
writer.open()
elapsed_time = 0
upload_expiration = time.time() + self.max_upload_time
timeout_reader = self._make_timeout_reader(obj_input)
for chunk in iter(timeout_reader, ''):
start_time = time.time()
if start_time > upload_expiration:
self.logger.increment('PUT.timeouts')
raise HTTPRequestTimeout(request=request)
writer.write(chunk)
elapsed_time += time.time() - start_time
upload_size, etag = writer.chunks_finished()
if fsize is not None and fsize != upload_size:
raise HTTPClientDisconnect(request=request)
if upload_size:
self.logger.transfer_rate(
'PUT.' + device + '.timing', elapsed_time,
upload_size)
return upload_size, etag
def _get_request_metadata(self, request, upload_size, etag):
"""
Pull object metadata off the request.
:returns: metadata, a dict of object metadata
"""
metadata = {
'X-Timestamp': request.timestamp.internal,
'Content-Type': request.headers['content-type'],
'Content-Length': str(upload_size),
'ETag': etag,
}
metadata.update(val for val in request.headers.items()
if (is_sys_or_user_meta('object', val[0]) or
is_object_transient_sysmeta(val[0])))
headers_to_copy = (
request.headers.get(
'X-Backend-Replication-Headers', '').split() +
list(self.allowed_headers))
for header_key in headers_to_copy:
if header_key in request.headers:
header_caps = header_key.title()
metadata[header_caps] = request.headers[header_key]
return metadata
def _read_mime_footers_metadata(self, have_metadata_footer,
mime_documents_iter, **kwargs):
"""
Read footer metadata from the bottom of the multi-stage MIME body.
:returns: metadata, a dict
"""
if have_metadata_footer:
metadata = self._read_metadata_footer(
mime_documents_iter)
footer_etag = metadata.pop('etag', '').lower()
if footer_etag:
metadata['ETag'] = footer_etag
else:
metadata = {}
return metadata
def _apply_extra_metadata(self, request, metadata, footers_metadata):
"""
Apply extra metadata precedence to prepare metadata for storage.
"""
metadata.update(val for val in footers_metadata.items()
if (is_sys_or_user_meta('object', val[0]) or
is_object_transient_sysmeta(val[0])))
# N.B. footers_metadata is a HeaderKeyDict
received_etag = footers_metadata.get('etag', request.headers.get(
'etag', '')).strip('"')
if received_etag and received_etag != metadata['ETag']:
raise HTTPUnprocessableEntity(request=request)
def _send_multi_stage_continue_headers(self, request,
use_multiphase_commit,
mime_documents_iter, **kwargs):
"""
If the PUT requires a two-phase commit (a data and a commit phase) send
the proxy server another 100-continue response to indicate that we are
finished writing object data
"""
if use_multiphase_commit:
request.environ['wsgi.input'].\
send_hundred_continue_response()
if not self._read_put_commit_message(mime_documents_iter):
raise HTTPServerError(request=request)
def _drain_mime_request(self, mime_documents_iter, **kwargs):
"""
Drain any remaining MIME docs from the socket. There shouldn't be any,
but we must read the whole request body.
"""
try:
with disk_file.create(size=fsize) as writer:
# If the proxy wants to send us object metadata after the
# object body, it sets some headers. We have to tell the
# proxy, in the 100 Continue response, that we're able to
# parse a multipart MIME document and extract the object and
# metadata from it. If we don't, then the proxy won't
# actually send the footer metadata.
have_metadata_footer = False
use_multiphase_commit = False
mime_documents_iter = iter([])
obj_input = request.environ['wsgi.input']
while True:
with ChunkReadTimeout(self.client_timeout):
_junk_hdrs, _junk_body = next(mime_documents_iter)
drain(_junk_body, self.network_chunk_size,
self.client_timeout)
except ChunkReadError:
raise HTTPClientDisconnect()
except ChunkReadTimeout:
raise HTTPRequestTimeout()
except StopIteration:
pass
hundred_continue_headers = []
if config_true_value(
request.headers.get(
'X-Backend-Obj-Multiphase-Commit')):
use_multiphase_commit = True
hundred_continue_headers.append(
('X-Obj-Multiphase-Commit', 'yes'))
if config_true_value(
request.headers.get('X-Backend-Obj-Metadata-Footer')):
have_metadata_footer = True
hundred_continue_headers.append(
('X-Obj-Metadata-Footer', 'yes'))
if have_metadata_footer or use_multiphase_commit:
obj_input.set_hundred_continue_response_headers(
hundred_continue_headers)
mime_boundary = request.headers.get(
'X-Backend-Obj-Multipart-Mime-Boundary')
if not mime_boundary:
return HTTPBadRequest("no MIME boundary")
try:
with ChunkReadTimeout(self.client_timeout):
mime_documents_iter = iter_mime_headers_and_bodies(
request.environ['wsgi.input'],
mime_boundary, self.network_chunk_size)
_junk_hdrs, obj_input = next(mime_documents_iter)
except ChunkReadError:
return HTTPClientDisconnect(request=request)
except ChunkReadTimeout:
return HTTPRequestTimeout(request=request)
timeout_reader = self._make_timeout_reader(obj_input)
try:
for chunk in iter(timeout_reader, ''):
start_time = time.time()
if start_time > upload_expiration:
self.logger.increment('PUT.timeouts')
return HTTPRequestTimeout(request=request)
writer.write(chunk)
elapsed_time += time.time() - start_time
except ChunkReadError:
return HTTPClientDisconnect(request=request)
except ChunkReadTimeout:
return HTTPRequestTimeout(request=request)
upload_size, etag = writer.chunks_finished()
if upload_size:
self.logger.transfer_rate(
'PUT.' + device + '.timing', elapsed_time,
upload_size)
if fsize is not None and fsize != upload_size:
return HTTPClientDisconnect(request=request)
footer_meta = {}
if have_metadata_footer:
footer_meta = self._read_metadata_footer(
mime_documents_iter)
request_etag = (footer_meta.get('etag') or
request.headers.get('etag', '')).lower()
if request_etag and request_etag != etag:
return HTTPUnprocessableEntity(request=request)
metadata = {
'X-Timestamp': request.timestamp.internal,
'Content-Type': request.headers['content-type'],
'ETag': etag,
'Content-Length': str(upload_size),
}
metadata.update(val for val in request.headers.items()
if (is_sys_or_user_meta('object', val[0]) or
is_object_transient_sysmeta(val[0])))
metadata.update(val for val in footer_meta.items()
if (is_sys_or_user_meta('object', val[0]) or
is_object_transient_sysmeta(val[0])))
headers_to_copy = (
request.headers.get(
'X-Backend-Replication-Headers', '').split() +
list(self.allowed_headers))
for header_key in headers_to_copy:
if header_key in request.headers:
header_caps = header_key.title()
metadata[header_caps] = request.headers[header_key]
writer.put(metadata)
# if the PUT requires a two-phase commit (a data and a commit
# phase) send the proxy server another 100-continue response
# to indicate that we are finished writing object data
if use_multiphase_commit:
request.environ['wsgi.input'].\
send_hundred_continue_response()
if not self._read_put_commit_message(mime_documents_iter):
return HTTPServerError(request=request)
# got 2nd phase confirmation (when required), call commit to
# indicate a successful PUT
writer.commit(request.timestamp)
# Drain any remaining MIME docs from the socket. There
# shouldn't be any, but we must read the whole request body.
try:
while True:
with ChunkReadTimeout(self.client_timeout):
_junk_hdrs, _junk_body = next(mime_documents_iter)
drain(_junk_body, self.network_chunk_size,
self.client_timeout)
except ChunkReadError:
raise HTTPClientDisconnect()
except ChunkReadTimeout:
raise HTTPRequestTimeout()
except StopIteration:
pass
except (DiskFileXattrNotSupported, DiskFileNoSpace):
return HTTPInsufficientStorage(drive=device, request=request)
def _post_commit_updates(self, request, device,
account, container, obj, policy,
orig_metadata, footers_metadata, metadata):
orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0)
new_delete_at = int(request.headers.get('X-Delete-At') or 0)
if orig_delete_at != new_delete_at:
if new_delete_at:
self.delete_at_update(
@ -934,6 +972,7 @@ class ObjectController(BaseStorageServer):
self.delete_at_update(
'DELETE', orig_delete_at, account, container, obj,
request, device, policy)
update_headers = HeaderKeyDict({
'x-size': metadata['Content-Length'],
'x-content-type': metadata['Content-Type'],
@ -941,11 +980,51 @@ class ObjectController(BaseStorageServer):
'x-etag': metadata['ETag']})
# apply any container update header overrides sent with request
self._check_container_override(update_headers, request.headers,
footer_meta)
footers_metadata)
self.container_update(
'PUT', account, container, obj, request,
update_headers,
device, policy)
update_headers, device, policy)
@public
@timing_stats()
def PUT(self, request):
"""Handle HTTP PUT requests for the Swift Object Server."""
device, partition, account, container, obj, policy = \
get_name_and_placement(request, 5, 5, True)
disk_file, fsize, orig_metadata = self._pre_create_checks(
request, device, partition, account, container, obj, policy)
writer = disk_file.writer(size=fsize)
try:
obj_input = request.environ['wsgi.input']
obj_input, multi_stage_mime_state = \
self._do_multi_stage_mime_continue_headers(request, obj_input)
upload_size, etag = self._stage_obj_data(
request, device, obj_input, writer, fsize)
metadata = self._get_request_metadata(request, upload_size, etag)
if multi_stage_mime_state:
footers_metadata = self._read_mime_footers_metadata(
**multi_stage_mime_state)
else:
footers_metadata = {}
self._apply_extra_metadata(request, metadata, footers_metadata)
writer.put(metadata)
if multi_stage_mime_state:
self._send_multi_stage_continue_headers(
request, **multi_stage_mime_state)
writer.commit(request.timestamp)
if multi_stage_mime_state:
self._drain_mime_request(**multi_stage_mime_state)
except (DiskFileXattrNotSupported, DiskFileNoSpace):
return HTTPInsufficientStorage(drive=device, request=request)
except ChunkReadError:
return HTTPClientDisconnect(request=request)
except ChunkReadTimeout:
return HTTPRequestTimeout(request=request)
finally:
writer.close()
self._post_commit_updates(request, device,
account, container, obj, policy,
orig_metadata, footers_metadata, metadata)
return HTTPCreated(request=request, etag=etag)
@public

View File

@ -1705,8 +1705,8 @@ class TestObjectController(unittest.TestCase):
req.headers.pop("Content-Length", None)
resp = req.get_response(self.object_controller)
self.assertEqual(resp.etag, obj_etag)
self.assertEqual(resp.status_int, 201)
self.assertEqual(resp.etag, obj_etag)
objfile = os.path.join(
self.testdir, 'sda1',