Merge "py3: Fix formpost unicode filename issues"

This commit is contained in:
Zuul 2020-01-21 05:21:14 +00:00 committed by Gerrit Code Review
commit b7c1f8d149
2 changed files with 129 additions and 5 deletions

View File

@ -129,13 +129,14 @@ from time import time
import six import six
from six.moves.urllib.parse import quote from six.moves.urllib.parse import quote
from swift.common.constraints import valid_api_version
from swift.common.exceptions import MimeInvalid from swift.common.exceptions import MimeInvalid
from swift.common.middleware.tempurl import get_tempurl_keys_from_metadata from swift.common.middleware.tempurl import get_tempurl_keys_from_metadata
from swift.common.utils import streq_const_time, register_swift_info, \ from swift.common.utils import streq_const_time, register_swift_info, \
parse_content_disposition, parse_mime_headers, \ parse_content_disposition, parse_mime_headers, \
iter_multipart_mime_documents, reiterate, close_if_possible iter_multipart_mime_documents, reiterate, close_if_possible
from swift.common.wsgi import make_pre_authed_env from swift.common.wsgi import make_pre_authed_env
from swift.common.swob import HTTPUnauthorized, wsgi_to_str from swift.common.swob import HTTPUnauthorized, wsgi_to_str, str_to_wsgi
from swift.proxy.controllers.base import get_account_info, get_container_info from swift.proxy.controllers.base import get_account_info, get_container_info
@ -365,7 +366,8 @@ class FormPost(object):
if not subenv['PATH_INFO'].endswith('/') and \ if not subenv['PATH_INFO'].endswith('/') and \
subenv['PATH_INFO'].count('/') < 4: subenv['PATH_INFO'].count('/') < 4:
subenv['PATH_INFO'] += '/' subenv['PATH_INFO'] += '/'
subenv['PATH_INFO'] += attributes['filename'] or 'filename' subenv['PATH_INFO'] += str_to_wsgi(
attributes['filename'] or 'filename')
if 'x_delete_at' in attributes: if 'x_delete_at' in attributes:
try: try:
subenv['HTTP_X_DELETE_AT'] = int(attributes['x_delete_at']) subenv['HTTP_X_DELETE_AT'] = int(attributes['x_delete_at'])
@ -442,8 +444,8 @@ class FormPost(object):
:returns: list of tempurl keys :returns: list of tempurl keys
""" """
parts = env['PATH_INFO'].split('/', 4) parts = env['PATH_INFO'].split('/', 4)
if len(parts) < 4 or parts[0] or parts[1] != 'v1' or not parts[2] or \ if len(parts) < 4 or parts[0] or not valid_api_version(parts[1]) \
not parts[3]: or not parts[2] or not parts[3]:
return [] return []
account_info = get_account_info(env, self.app, swift_source='FP') account_info = get_account_info(env, self.app, swift_source='FP')

View File

@ -21,7 +21,7 @@ from time import time
import six import six
from io import BytesIO from io import BytesIO
from swift.common.swob import Request, Response from swift.common.swob import Request, Response, wsgi_quote
from swift.common.middleware import tempauth, formpost from swift.common.middleware import tempauth, formpost
from swift.common.utils import split_path from swift.common.utils import split_path
from swift.proxy.controllers.base import get_cache_key from swift.proxy.controllers.base import get_cache_key
@ -49,6 +49,8 @@ class FakeApp(object):
self.check_no_query_string = check_no_query_string self.check_no_query_string = check_no_query_string
def __call__(self, env, start_response): def __call__(self, env, start_response):
# use wsgi_quote to spot check that it really *is* a WSGI string
wsgi_quote(env['PATH_INFO'])
try: try:
if self.check_no_query_string and env.get('QUERY_STRING'): if self.check_no_query_string and env.get('QUERY_STRING'):
raise Exception('Query string %s should have been discarded!' % raise Exception('Query string %s should have been discarded!' %
@ -756,6 +758,126 @@ class TestFormPost(unittest.TestCase):
self.assertEqual(self.app.requests[0].body, b'Test File\nOne\n') self.assertEqual(self.app.requests[0].body, b'Test File\nOne\n')
self.assertEqual(self.app.requests[1].body, b'Test\nFile\nTwo\n') self.assertEqual(self.app.requests[1].body, b'Test\nFile\nTwo\n')
def test_curl_with_unicode(self):
key = b'abc'
path = u'/v1/AUTH_test/container/let_it_\N{SNOWFLAKE}/'
if six.PY2:
path = path.encode('utf-8')
redirect = 'http://brim.net'
max_file_size = 1024
max_file_count = 10
expires = int(time() + 86400)
sig = hmac.new(
key,
hmac_msg(path, redirect, max_file_size, max_file_count, expires),
sha1).hexdigest()
wsgi_input = '\r\n'.join([
'--------------------------dea19ac8502ca805',
'Content-Disposition: form-data; name="redirect"',
'',
redirect,
'--------------------------dea19ac8502ca805',
'Content-Disposition: form-data; name="max_file_size"',
'',
str(max_file_size),
'--------------------------dea19ac8502ca805',
'Content-Disposition: form-data; name="max_file_count"',
'',
str(max_file_count),
'--------------------------dea19ac8502ca805',
'Content-Disposition: form-data; name="expires"',
'',
str(expires),
'--------------------------dea19ac8502ca805',
'Content-Disposition: form-data; name="signature"',
'',
sig,
'--------------------------dea19ac8502ca805',
'Content-Disposition: form-data; name="file1"; '
'filename="\xe2\x98\x83.txt"',
'Content-Type: text/plain',
'',
'Test File\nOne\n',
'--------------------------dea19ac8502ca805',
'Content-Disposition: form-data; name="file2"; '
'filename="testfile2.txt"',
'Content-Type: text/plain',
'',
'Test\nFile\nTwo\n',
'--------------------------dea19ac8502ca805',
'Content-Disposition: form-data; name="file3"; filename=""',
'Content-Type: application/octet-stream',
'',
'',
'--------------------------dea19ac8502ca805--',
''
])
if not six.PY2:
wsgi_input = wsgi_input.encode('latin1')
wsgi_input = BytesIO(wsgi_input)
wsgi_errors = six.StringIO()
env = {
'CONTENT_LENGTH': str(len(wsgi_input.getvalue())),
'CONTENT_TYPE': 'multipart/form-data; '
'boundary=------------------------dea19ac8502ca805',
'HTTP_ACCEPT': '*/*',
'HTTP_HOST': 'ubuntu:8080',
'HTTP_USER_AGENT': 'curl/7.58.0',
'PATH_INFO': '/v1/AUTH_test/container/let_it_\xE2\x9D\x84/',
'REMOTE_ADDR': '172.16.83.1',
'REQUEST_METHOD': 'POST',
'SCRIPT_NAME': '',
'SERVER_NAME': '172.16.83.128',
'SERVER_PORT': '8080',
'SERVER_PROTOCOL': 'HTTP/1.0',
'swift.infocache': {
get_cache_key('AUTH_test'): self._fake_cache_env(
'AUTH_test', [key]),
get_cache_key('AUTH_test', 'container'): {
'meta': {}}},
'wsgi.errors': wsgi_errors,
'wsgi.input': wsgi_input,
'wsgi.multiprocess': False,
'wsgi.multithread': True,
'wsgi.run_once': False,
'wsgi.url_scheme': 'http',
'wsgi.version': (1, 0),
}
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '303 See Other', body)
location = None
for h, v in headers:
if h.lower() == 'location':
location = v
self.assertEqual(location, 'http://brim.net?status=201&message=')
self.assertIsNone(exc_info)
self.assertTrue(b'http://brim.net?status=201&message=' in body)
self.assertEqual(len(self.app.requests), 2)
self.assertEqual(
self.app.requests[0].path,
'/v1/AUTH_test/container/let_it_%E2%9D%84/%E2%98%83.txt')
self.assertEqual(self.app.requests[0].body, b'Test File\nOne\n')
self.assertEqual(
self.app.requests[1].path,
'/v1/AUTH_test/container/let_it_%E2%9D%84/testfile2.txt')
self.assertEqual(self.app.requests[1].body, b'Test\nFile\nTwo\n')
def test_messed_up_start(self): def test_messed_up_start(self):
key = b'abc' key = b'abc'
sig, env, body = self._make_sig_env_body( sig, env, body = self._make_sig_env_body(