Add content-encoding parameter to formpost middleware

The content-type parameter can already be set when uploading file
objects, for example as a hidden field in the form sent by the browser.

Sometimes it is also required to set the content-encoding of an object,
for example when uploading a gzipped logfile or html file; see [1] for
further reference. This patch adds this option and tests existing and
new functionality for content-type and content-encoding.

[1] https://tools.ietf.org/html/rfc2616#section-14.11

Change-Id: I3dd47c7c999c29d62075955301fd763ad0b2c602
This commit is contained in:
Christian Schwede 2016-06-13 08:06:29 +00:00 committed by Tim Burke
parent f59ee15374
commit 90b161b74f
2 changed files with 128 additions and 6 deletions

View File

@ -38,6 +38,16 @@ form input::
<input type="hidden" name="x_delete_at" value="<unix-timestamp>" />
<input type="hidden" name="x_delete_after" value="<seconds>" />
If you want to specify the content type or content encoding of the files you
can set content-encoding or content-type by adding them to the form input::
<input type="hidden" name="content-type" value="text/html" />
<input type="hidden" name="content-encoding" value="gzip" />
The above example applies these parameters to all uploaded files. You can also
set the content-type and content-encoding on a per-file basis by adding the
parameters to each part of the upload.
The <swift-url> is the URL of the Swift destination, such as::
https://swift-cluster.example.com/v1/AUTH_account/container/object_prefix
@ -270,6 +280,10 @@ class FormPost(object):
if 'content-type' not in attributes and 'content-type' in hdrs:
attributes['content-type'] = \
hdrs['Content-Type'] or 'application/octet-stream'
if 'content-encoding' not in attributes and \
'content-encoding' in hdrs:
attributes['content-encoding'] = \
hdrs['Content-Encoding']
status, subheaders = \
self._perform_subrequest(env, attributes, fp, keys)
if not status.startswith('2'):
@ -357,6 +371,9 @@ class FormPost(object):
if 'content-type' in attributes:
subenv['CONTENT_TYPE'] = \
attributes['content-type'] or 'application/octet-stream'
if 'content-encoding' in attributes:
subenv['HTTP_CONTENT_ENCODING'] = \
attributes['content-encoding']
try:
if int(attributes.get('expires') or 0) < time():
raise FormUnauthorized('form expired')

View File

@ -190,6 +190,7 @@ class TestFormPost(unittest.TestCase):
'Content-Disposition: form-data; name="file2"; '
'filename="testfile2.txt"',
'Content-Type: text/plain',
'Content-Encoding: gzip',
'',
'Test\nFile\nTwo\n',
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
@ -768,7 +769,7 @@ class TestFormPost(unittest.TestCase):
self.formpost = formpost.filter_factory({})(self.auth)
def log_assert_int_status(env, response_status_int):
self.assertTrue(isinstance(response_status_int, int))
self.assertIsInstance(response_status_int, int)
self.formpost._log_request = log_assert_int_status
status = [None]
@ -1117,7 +1118,7 @@ class TestFormPost(unittest.TestCase):
def start_response(s, h, e=None):
pass
body = ''.join(self.formpost(env, start_response))
self.assertTrue('User-Agent' in self.app.requests[0].headers)
self.assertIn('User-Agent', self.app.requests[0].headers)
self.assertEqual(self.app.requests[0].headers['User-Agent'],
'FormPost')
@ -1675,8 +1676,8 @@ class TestFormPost(unittest.TestCase):
self.assertEqual(status, '201 Created')
self.assertTrue('201 Created' in body)
self.assertEqual(len(self.app.requests), 2)
self.assertTrue("X-Delete-At" in self.app.requests[0].headers)
self.assertTrue("X-Delete-At" in self.app.requests[1].headers)
self.assertIn("X-Delete-At", self.app.requests[0].headers)
self.assertIn("X-Delete-At", self.app.requests[1].headers)
self.assertEqual(delete_at,
self.app.requests[0].headers["X-Delete-At"])
self.assertEqual(delete_at,
@ -1754,8 +1755,8 @@ class TestFormPost(unittest.TestCase):
self.assertEqual(status, '201 Created')
self.assertTrue('201 Created' in body)
self.assertEqual(len(self.app.requests), 2)
self.assertTrue("X-Delete-After" in self.app.requests[0].headers)
self.assertTrue("X-Delete-After" in self.app.requests[1].headers)
self.assertIn("X-Delete-After", self.app.requests[0].headers)
self.assertIn("X-Delete-After", self.app.requests[1].headers)
self.assertEqual(delete_after,
self.app.requests[0].headers["X-Delete-After"])
self.assertEqual(delete_after,
@ -1796,6 +1797,110 @@ class TestFormPost(unittest.TestCase):
self.assertEqual(status, '400 Bad Request')
self.assertTrue('FormPost: x_delete_after not an integer' in body)
def test_global_content_type_encoding(self):
body_part = [
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="content-encoding"',
'',
'gzip',
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="content-type"',
'',
'text/html',
]
key = 'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', '', 1024, 10, int(time() + 86400), key)
wsgi_input = b'\r\n'.join(body_part + body)
if six.PY3:
wsgi_input = wsgi_input.encode('utf-8')
env['wsgi.input'] = BytesIO(wsgi_input)
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
env['swift.infocache'][get_cache_key(
'AUTH_test', 'container')] = {'meta': {}}
self.app = FakeApp(iter([('201 Created', {}, ''),
('201 Created', {}, '')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = ''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '201 Created')
self.assertTrue('201 Created' in body)
self.assertEqual(len(self.app.requests), 2)
self.assertIn("Content-Type", self.app.requests[0].headers)
self.assertIn("Content-Type", self.app.requests[1].headers)
self.assertIn("Content-Encoding", self.app.requests[0].headers)
self.assertIn("Content-Encoding", self.app.requests[1].headers)
self.assertEqual("text/html",
self.app.requests[0].headers["Content-Type"])
self.assertEqual("text/html",
self.app.requests[1].headers["Content-Type"])
self.assertEqual("gzip",
self.app.requests[0].headers["Content-Encoding"])
self.assertEqual("gzip",
self.app.requests[1].headers["Content-Encoding"])
def test_single_content_type_encoding(self):
key = 'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', '', 1024, 10, int(time() + 86400), key)
wsgi_input = b'\r\n'.join(body)
if six.PY3:
wsgi_input = wsgi_input.encode('utf-8')
env['wsgi.input'] = BytesIO(wsgi_input)
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
env['swift.infocache'][get_cache_key(
'AUTH_test', 'container')] = {'meta': {}}
self.app = FakeApp(iter([('201 Created', {}, ''),
('201 Created', {}, '')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = ''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '201 Created')
self.assertTrue('201 Created' in body)
self.assertEqual(len(self.app.requests), 2)
self.assertEqual(self.app.requests[1].body, 'Test\nFile\nTwo\n')
self.assertIn("Content-Type", self.app.requests[0].headers)
self.assertIn("Content-Type", self.app.requests[1].headers)
self.assertEqual("text/plain",
self.app.requests[0].headers["Content-Type"])
self.assertEqual("text/plain",
self.app.requests[1].headers["Content-Type"])
self.assertFalse("Content-Encoding" in self.app.requests[0].headers)
self.assertIn("Content-Encoding", self.app.requests[1].headers)
self.assertEqual("gzip",
self.app.requests[1].headers["Content-Encoding"])
if __name__ == '__main__':
unittest.main()