diff --git a/swift/obj/server.py b/swift/obj/server.py index 1d9f7390b5..d8351bb4b9 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -529,7 +529,12 @@ class ObjectController(BaseStorageServer): raise HTTPRequestTimeout() except StopIteration: raise HTTPBadRequest(body="couldn't find footer MIME doc") + return self._parse_footer(footer_hdrs, footer_iter) + def _parse_footer(self, footer_hdrs, footer_iter): + """ + Validate footer metadata and translate JSON body into HeaderKeyDict. + """ timeout_reader = self._make_timeout_reader(footer_iter) try: footer_body = ''.join(iter(timeout_reader, '')) @@ -729,23 +734,17 @@ class ObjectController(BaseStorageServer): return HTTPAccepted(request=request, headers=resp_headers) - @public - @timing_stats() - def PUT(self, request): - """Handle HTTP PUT requests for the Swift Object Server.""" - device, partition, account, container, obj, policy = \ - get_name_and_placement(request, 5, 5, True) + def _pre_create_checks(self, request, device, partition, + account, container, obj, policy): req_timestamp = valid_timestamp(request) error_response = check_object_creation(request, obj) if error_response: - return error_response - new_delete_at = int(request.headers.get('X-Delete-At') or 0) + raise error_response try: fsize = request.message_length() except ValueError as e: - return HTTPBadRequest(body=str(e), request=request, - content_type='text/plain') - + raise HTTPBadRequest(body=str(e), request=request, + content_type='text/plain') # In case of multipart-MIME put, the proxy sends a chunked request, # but may let us know the real content length so we can verify that # we have enough disk space to hold the object. @@ -755,8 +754,8 @@ class ObjectController(BaseStorageServer): try: fsize = int(fsize) except ValueError as e: - return HTTPBadRequest(body=str(e), request=request, - content_type='text/plain') + raise HTTPBadRequest(body=str(e), request=request, + content_type='text/plain') # SSYNC will include Frag-Index header for subrequests to primary # nodes; handoff nodes should 409 subrequests to over-write an # existing data fragment until they offloaded the existing fragment @@ -768,163 +767,202 @@ class ObjectController(BaseStorageServer): policy=policy, frag_index=frag_index, next_part_power=next_part_power) except DiskFileDeviceUnavailable: - return HTTPInsufficientStorage(drive=device, request=request) + raise HTTPInsufficientStorage(drive=device, request=request) try: orig_metadata = disk_file.read_metadata(current_time=req_timestamp) orig_timestamp = disk_file.data_timestamp except DiskFileXattrNotSupported: - return HTTPInsufficientStorage(drive=device, request=request) + raise HTTPInsufficientStorage(drive=device, request=request) except DiskFileDeleted as e: orig_metadata = {} orig_timestamp = e.timestamp except (DiskFileNotExist, DiskFileQuarantined): orig_metadata = {} orig_timestamp = Timestamp(0) - # Checks for If-None-Match if request.if_none_match is not None and orig_metadata: if '*' in request.if_none_match: # File exists already so return 412 - return HTTPPreconditionFailed(request=request) + raise HTTPPreconditionFailed(request=request) if orig_metadata.get('ETag') in request.if_none_match: - # The current ETag matches, so return 412 - return HTTPPreconditionFailed(request=request) - + # The current ETag matches, so raise 412 + raise HTTPPreconditionFailed(request=request) if orig_timestamp >= req_timestamp: - return HTTPConflict( + raise HTTPConflict( request=request, headers={'X-Backend-Timestamp': orig_timestamp.internal}) - orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0) - upload_expiration = time.time() + self.max_upload_time + return disk_file, fsize, orig_metadata + + def _do_multi_stage_mime_continue_headers(self, request, obj_input): + """ + If the proxy wants to send us object metadata after the object body, it + sets some headers. We have to tell the proxy, in the 100 Continue + response, that we're able to parse a multipart MIME document and + extract the object and metadata from it. If we don't, then the proxy + won't actually send the footer metadata. + + If the proxy doesn't want to do any of that, this is the identity + function for obj_input and multi_stage_mime_state will be False-y. + + :returns: a tuple, (obj_input, multi_stage_mime_state) + """ + have_metadata_footer = False + use_multiphase_commit = False + + hundred_continue_headers = [] + if config_true_value( + request.headers.get( + 'X-Backend-Obj-Multiphase-Commit')): + use_multiphase_commit = True + hundred_continue_headers.append( + ('X-Obj-Multiphase-Commit', 'yes')) + + if config_true_value( + request.headers.get('X-Backend-Obj-Metadata-Footer')): + have_metadata_footer = True + hundred_continue_headers.append( + ('X-Obj-Metadata-Footer', 'yes')) + + if have_metadata_footer or use_multiphase_commit: + obj_input.set_hundred_continue_response_headers( + hundred_continue_headers) + mime_boundary = request.headers.get( + 'X-Backend-Obj-Multipart-Mime-Boundary') + if not mime_boundary: + raise HTTPBadRequest("no MIME boundary") + + with ChunkReadTimeout(self.client_timeout): + mime_documents_iter = iter_mime_headers_and_bodies( + request.environ['wsgi.input'], + mime_boundary, self.network_chunk_size) + _junk_hdrs, obj_input = next(mime_documents_iter) + multi_stage_mime_state = { + 'have_metadata_footer': have_metadata_footer, + 'use_multiphase_commit': use_multiphase_commit, + 'mime_documents_iter': mime_documents_iter, + } + else: + multi_stage_mime_state = {} + return obj_input, multi_stage_mime_state + + def _stage_obj_data(self, request, device, obj_input, writer, fsize): + """ + Feed the object_input into the writer. + + :returns: a tuple, (upload_size, etag) + """ + writer.open() elapsed_time = 0 + upload_expiration = time.time() + self.max_upload_time + timeout_reader = self._make_timeout_reader(obj_input) + for chunk in iter(timeout_reader, ''): + start_time = time.time() + if start_time > upload_expiration: + self.logger.increment('PUT.timeouts') + raise HTTPRequestTimeout(request=request) + writer.write(chunk) + elapsed_time += time.time() - start_time + upload_size, etag = writer.chunks_finished() + if fsize is not None and fsize != upload_size: + raise HTTPClientDisconnect(request=request) + if upload_size: + self.logger.transfer_rate( + 'PUT.' + device + '.timing', elapsed_time, + upload_size) + return upload_size, etag + + def _get_request_metadata(self, request, upload_size, etag): + """ + Pull object metadata off the request. + + :returns: metadata, a dict of object metadata + """ + metadata = { + 'X-Timestamp': request.timestamp.internal, + 'Content-Type': request.headers['content-type'], + 'Content-Length': str(upload_size), + 'ETag': etag, + } + metadata.update(val for val in request.headers.items() + if (is_sys_or_user_meta('object', val[0]) or + is_object_transient_sysmeta(val[0]))) + headers_to_copy = ( + request.headers.get( + 'X-Backend-Replication-Headers', '').split() + + list(self.allowed_headers)) + for header_key in headers_to_copy: + if header_key in request.headers: + header_caps = header_key.title() + metadata[header_caps] = request.headers[header_key] + return metadata + + def _read_mime_footers_metadata(self, have_metadata_footer, + mime_documents_iter, **kwargs): + """ + Read footer metadata from the bottom of the multi-stage MIME body. + + :returns: metadata, a dict + """ + if have_metadata_footer: + metadata = self._read_metadata_footer( + mime_documents_iter) + footer_etag = metadata.pop('etag', '').lower() + if footer_etag: + metadata['ETag'] = footer_etag + else: + metadata = {} + return metadata + + def _apply_extra_metadata(self, request, metadata, footers_metadata): + """ + Apply extra metadata precedence to prepare metadata for storage. + """ + metadata.update(val for val in footers_metadata.items() + if (is_sys_or_user_meta('object', val[0]) or + is_object_transient_sysmeta(val[0]))) + # N.B. footers_metadata is a HeaderKeyDict + received_etag = footers_metadata.get('etag', request.headers.get( + 'etag', '')).strip('"') + if received_etag and received_etag != metadata['ETag']: + raise HTTPUnprocessableEntity(request=request) + + def _send_multi_stage_continue_headers(self, request, + use_multiphase_commit, + mime_documents_iter, **kwargs): + """ + If the PUT requires a two-phase commit (a data and a commit phase) send + the proxy server another 100-continue response to indicate that we are + finished writing object data + """ + if use_multiphase_commit: + request.environ['wsgi.input'].\ + send_hundred_continue_response() + if not self._read_put_commit_message(mime_documents_iter): + raise HTTPServerError(request=request) + + def _drain_mime_request(self, mime_documents_iter, **kwargs): + """ + Drain any remaining MIME docs from the socket. There shouldn't be any, + but we must read the whole request body. + """ try: - with disk_file.create(size=fsize) as writer: - # If the proxy wants to send us object metadata after the - # object body, it sets some headers. We have to tell the - # proxy, in the 100 Continue response, that we're able to - # parse a multipart MIME document and extract the object and - # metadata from it. If we don't, then the proxy won't - # actually send the footer metadata. - have_metadata_footer = False - use_multiphase_commit = False - mime_documents_iter = iter([]) - obj_input = request.environ['wsgi.input'] + while True: + with ChunkReadTimeout(self.client_timeout): + _junk_hdrs, _junk_body = next(mime_documents_iter) + drain(_junk_body, self.network_chunk_size, + self.client_timeout) + except ChunkReadError: + raise HTTPClientDisconnect() + except ChunkReadTimeout: + raise HTTPRequestTimeout() + except StopIteration: + pass - hundred_continue_headers = [] - if config_true_value( - request.headers.get( - 'X-Backend-Obj-Multiphase-Commit')): - use_multiphase_commit = True - hundred_continue_headers.append( - ('X-Obj-Multiphase-Commit', 'yes')) - - if config_true_value( - request.headers.get('X-Backend-Obj-Metadata-Footer')): - have_metadata_footer = True - hundred_continue_headers.append( - ('X-Obj-Metadata-Footer', 'yes')) - - if have_metadata_footer or use_multiphase_commit: - obj_input.set_hundred_continue_response_headers( - hundred_continue_headers) - mime_boundary = request.headers.get( - 'X-Backend-Obj-Multipart-Mime-Boundary') - if not mime_boundary: - return HTTPBadRequest("no MIME boundary") - - try: - with ChunkReadTimeout(self.client_timeout): - mime_documents_iter = iter_mime_headers_and_bodies( - request.environ['wsgi.input'], - mime_boundary, self.network_chunk_size) - _junk_hdrs, obj_input = next(mime_documents_iter) - except ChunkReadError: - return HTTPClientDisconnect(request=request) - except ChunkReadTimeout: - return HTTPRequestTimeout(request=request) - - timeout_reader = self._make_timeout_reader(obj_input) - try: - for chunk in iter(timeout_reader, ''): - start_time = time.time() - if start_time > upload_expiration: - self.logger.increment('PUT.timeouts') - return HTTPRequestTimeout(request=request) - writer.write(chunk) - elapsed_time += time.time() - start_time - except ChunkReadError: - return HTTPClientDisconnect(request=request) - except ChunkReadTimeout: - return HTTPRequestTimeout(request=request) - upload_size, etag = writer.chunks_finished() - if upload_size: - self.logger.transfer_rate( - 'PUT.' + device + '.timing', elapsed_time, - upload_size) - if fsize is not None and fsize != upload_size: - return HTTPClientDisconnect(request=request) - - footer_meta = {} - if have_metadata_footer: - footer_meta = self._read_metadata_footer( - mime_documents_iter) - - request_etag = (footer_meta.get('etag') or - request.headers.get('etag', '')).lower() - if request_etag and request_etag != etag: - return HTTPUnprocessableEntity(request=request) - metadata = { - 'X-Timestamp': request.timestamp.internal, - 'Content-Type': request.headers['content-type'], - 'ETag': etag, - 'Content-Length': str(upload_size), - } - metadata.update(val for val in request.headers.items() - if (is_sys_or_user_meta('object', val[0]) or - is_object_transient_sysmeta(val[0]))) - metadata.update(val for val in footer_meta.items() - if (is_sys_or_user_meta('object', val[0]) or - is_object_transient_sysmeta(val[0]))) - headers_to_copy = ( - request.headers.get( - 'X-Backend-Replication-Headers', '').split() + - list(self.allowed_headers)) - for header_key in headers_to_copy: - if header_key in request.headers: - header_caps = header_key.title() - metadata[header_caps] = request.headers[header_key] - writer.put(metadata) - - # if the PUT requires a two-phase commit (a data and a commit - # phase) send the proxy server another 100-continue response - # to indicate that we are finished writing object data - if use_multiphase_commit: - request.environ['wsgi.input'].\ - send_hundred_continue_response() - if not self._read_put_commit_message(mime_documents_iter): - return HTTPServerError(request=request) - - # got 2nd phase confirmation (when required), call commit to - # indicate a successful PUT - writer.commit(request.timestamp) - - # Drain any remaining MIME docs from the socket. There - # shouldn't be any, but we must read the whole request body. - try: - while True: - with ChunkReadTimeout(self.client_timeout): - _junk_hdrs, _junk_body = next(mime_documents_iter) - drain(_junk_body, self.network_chunk_size, - self.client_timeout) - except ChunkReadError: - raise HTTPClientDisconnect() - except ChunkReadTimeout: - raise HTTPRequestTimeout() - except StopIteration: - pass - - except (DiskFileXattrNotSupported, DiskFileNoSpace): - return HTTPInsufficientStorage(drive=device, request=request) + def _post_commit_updates(self, request, device, + account, container, obj, policy, + orig_metadata, footers_metadata, metadata): + orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0) + new_delete_at = int(request.headers.get('X-Delete-At') or 0) if orig_delete_at != new_delete_at: if new_delete_at: self.delete_at_update( @@ -934,6 +972,7 @@ class ObjectController(BaseStorageServer): self.delete_at_update( 'DELETE', orig_delete_at, account, container, obj, request, device, policy) + update_headers = HeaderKeyDict({ 'x-size': metadata['Content-Length'], 'x-content-type': metadata['Content-Type'], @@ -941,11 +980,51 @@ class ObjectController(BaseStorageServer): 'x-etag': metadata['ETag']}) # apply any container update header overrides sent with request self._check_container_override(update_headers, request.headers, - footer_meta) + footers_metadata) self.container_update( 'PUT', account, container, obj, request, - update_headers, - device, policy) + update_headers, device, policy) + + @public + @timing_stats() + def PUT(self, request): + """Handle HTTP PUT requests for the Swift Object Server.""" + device, partition, account, container, obj, policy = \ + get_name_and_placement(request, 5, 5, True) + disk_file, fsize, orig_metadata = self._pre_create_checks( + request, device, partition, account, container, obj, policy) + writer = disk_file.writer(size=fsize) + try: + obj_input = request.environ['wsgi.input'] + obj_input, multi_stage_mime_state = \ + self._do_multi_stage_mime_continue_headers(request, obj_input) + upload_size, etag = self._stage_obj_data( + request, device, obj_input, writer, fsize) + metadata = self._get_request_metadata(request, upload_size, etag) + if multi_stage_mime_state: + footers_metadata = self._read_mime_footers_metadata( + **multi_stage_mime_state) + else: + footers_metadata = {} + self._apply_extra_metadata(request, metadata, footers_metadata) + writer.put(metadata) + if multi_stage_mime_state: + self._send_multi_stage_continue_headers( + request, **multi_stage_mime_state) + writer.commit(request.timestamp) + if multi_stage_mime_state: + self._drain_mime_request(**multi_stage_mime_state) + except (DiskFileXattrNotSupported, DiskFileNoSpace): + return HTTPInsufficientStorage(drive=device, request=request) + except ChunkReadError: + return HTTPClientDisconnect(request=request) + except ChunkReadTimeout: + return HTTPRequestTimeout(request=request) + finally: + writer.close() + self._post_commit_updates(request, device, + account, container, obj, policy, + orig_metadata, footers_metadata, metadata) return HTTPCreated(request=request, etag=etag) @public diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index 8f780d1250..a1a98c8caf 100644 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -1705,8 +1705,8 @@ class TestObjectController(unittest.TestCase): req.headers.pop("Content-Length", None) resp = req.get_response(self.object_controller) - self.assertEqual(resp.etag, obj_etag) self.assertEqual(resp.status_int, 201) + self.assertEqual(resp.etag, obj_etag) objfile = os.path.join( self.testdir, 'sda1',