Merge "Move multipart MIME parser into utils"
This commit is contained in:
commit
a81b2d2c74
@ -145,6 +145,10 @@ class ReplicationLockTimeout(LockTimeout):
|
||||
pass
|
||||
|
||||
|
||||
class MimeInvalid(SwiftException):
|
||||
pass
|
||||
|
||||
|
||||
class ClientException(Exception):
|
||||
|
||||
def __init__(self, msg, http_scheme='', http_host='', http_port='',
|
||||
|
@ -112,14 +112,15 @@ the file are simply ignored).
|
||||
__all__ = ['FormPost', 'filter_factory', 'READ_CHUNK_SIZE', 'MAX_VALUE_LENGTH']
|
||||
|
||||
import hmac
|
||||
import re
|
||||
import rfc822
|
||||
from hashlib import sha1
|
||||
from time import time
|
||||
from urllib import quote
|
||||
|
||||
from swift.common.exceptions import MimeInvalid
|
||||
from swift.common.middleware.tempurl import get_tempurl_keys_from_metadata
|
||||
from swift.common.utils import streq_const_time, register_swift_info
|
||||
from swift.common.utils import streq_const_time, register_swift_info, \
|
||||
parse_content_disposition, iter_multipart_mime_documents
|
||||
from swift.common.wsgi import make_pre_authed_env
|
||||
from swift.common.swob import HTTPUnauthorized
|
||||
from swift.proxy.controllers.base import get_account_info
|
||||
@ -132,9 +133,6 @@ READ_CHUNK_SIZE = 4096
|
||||
#: truncated.
|
||||
MAX_VALUE_LENGTH = 4096
|
||||
|
||||
#: Regular expression to match form attributes.
|
||||
ATTRIBUTES_RE = re.compile(r'(\w+)=(".*?"|[^";]+)(; ?|$)')
|
||||
|
||||
|
||||
class FormInvalid(Exception):
|
||||
pass
|
||||
@ -144,125 +142,6 @@ class FormUnauthorized(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def _parse_attrs(header):
|
||||
"""
|
||||
Given the value of a header like:
|
||||
Content-Disposition: form-data; name="somefile"; filename="test.html"
|
||||
|
||||
Return data like
|
||||
("form-data", {"name": "somefile", "filename": "test.html"})
|
||||
|
||||
:param header: Value of a header (the part after the ': ').
|
||||
:returns: (value name, dict) of the attribute data parsed (see above).
|
||||
"""
|
||||
attributes = {}
|
||||
attrs = ''
|
||||
if '; ' in header:
|
||||
header, attrs = header.split('; ', 1)
|
||||
m = True
|
||||
while m:
|
||||
m = ATTRIBUTES_RE.match(attrs)
|
||||
if m:
|
||||
attrs = attrs[len(m.group(0)):]
|
||||
attributes[m.group(1)] = m.group(2).strip('"')
|
||||
return header, attributes
|
||||
|
||||
|
||||
class _IterRequestsFileLikeObject(object):
|
||||
|
||||
def __init__(self, wsgi_input, boundary, input_buffer):
|
||||
self.no_more_data_for_this_file = False
|
||||
self.no_more_files = False
|
||||
self.wsgi_input = wsgi_input
|
||||
self.boundary = boundary
|
||||
self.input_buffer = input_buffer
|
||||
|
||||
def read(self, length=None):
|
||||
if not length:
|
||||
length = READ_CHUNK_SIZE
|
||||
if self.no_more_data_for_this_file:
|
||||
return ''
|
||||
|
||||
# read enough data to know whether we're going to run
|
||||
# into a boundary in next [length] bytes
|
||||
if len(self.input_buffer) < length + len(self.boundary) + 2:
|
||||
to_read = length + len(self.boundary) + 2
|
||||
while to_read > 0:
|
||||
chunk = self.wsgi_input.read(to_read)
|
||||
to_read -= len(chunk)
|
||||
self.input_buffer += chunk
|
||||
if not chunk:
|
||||
self.no_more_files = True
|
||||
break
|
||||
|
||||
boundary_pos = self.input_buffer.find(self.boundary)
|
||||
|
||||
# boundary does not exist in the next (length) bytes
|
||||
if boundary_pos == -1 or boundary_pos > length:
|
||||
ret = self.input_buffer[:length]
|
||||
self.input_buffer = self.input_buffer[length:]
|
||||
# if it does, just return data up to the boundary
|
||||
else:
|
||||
ret, self.input_buffer = self.input_buffer.split(self.boundary, 1)
|
||||
self.no_more_files = self.input_buffer.startswith('--')
|
||||
self.no_more_data_for_this_file = True
|
||||
self.input_buffer = self.input_buffer[2:]
|
||||
return ret
|
||||
|
||||
def readline(self):
|
||||
if self.no_more_data_for_this_file:
|
||||
return ''
|
||||
boundary_pos = newline_pos = -1
|
||||
while newline_pos < 0 and boundary_pos < 0:
|
||||
chunk = self.wsgi_input.read(READ_CHUNK_SIZE)
|
||||
self.input_buffer += chunk
|
||||
newline_pos = self.input_buffer.find('\r\n')
|
||||
boundary_pos = self.input_buffer.find(self.boundary)
|
||||
if not chunk:
|
||||
self.no_more_files = True
|
||||
break
|
||||
# found a newline
|
||||
if newline_pos >= 0 and \
|
||||
(boundary_pos < 0 or newline_pos < boundary_pos):
|
||||
# Use self.read to ensure any logic there happens...
|
||||
ret = ''
|
||||
to_read = newline_pos + 2
|
||||
while to_read > 0:
|
||||
chunk = self.read(to_read)
|
||||
# Should never happen since we're reading from input_buffer,
|
||||
# but just for completeness...
|
||||
if not chunk:
|
||||
break
|
||||
to_read -= len(chunk)
|
||||
ret += chunk
|
||||
return ret
|
||||
else: # no newlines, just return up to next boundary
|
||||
return self.read(len(self.input_buffer))
|
||||
|
||||
|
||||
def _iter_requests(wsgi_input, boundary):
|
||||
"""
|
||||
Given a multi-part mime encoded input file object and boundary,
|
||||
yield file-like objects for each part.
|
||||
|
||||
:param wsgi_input: The file-like object to read from.
|
||||
:param boundary: The mime boundary to separate new file-like
|
||||
objects on.
|
||||
:returns: A generator of file-like objects for each part.
|
||||
"""
|
||||
boundary = '--' + boundary
|
||||
if wsgi_input.readline(len(boundary + '\r\n')).strip() != boundary:
|
||||
raise FormInvalid('invalid starting boundary')
|
||||
boundary = '\r\n' + boundary
|
||||
input_buffer = ''
|
||||
done = False
|
||||
while not done:
|
||||
it = _IterRequestsFileLikeObject(wsgi_input, boundary, input_buffer)
|
||||
yield it
|
||||
done = it.no_more_files
|
||||
input_buffer = it.input_buffer
|
||||
|
||||
|
||||
class _CappedFileLikeObject(object):
|
||||
"""
|
||||
A file-like object wrapping another file-like object that raises
|
||||
@ -328,7 +207,7 @@ class FormPost(object):
|
||||
if env['REQUEST_METHOD'] == 'POST':
|
||||
try:
|
||||
content_type, attrs = \
|
||||
_parse_attrs(env.get('CONTENT_TYPE') or '')
|
||||
parse_content_disposition(env.get('CONTENT_TYPE') or '')
|
||||
if content_type == 'multipart/form-data' and \
|
||||
'boundary' in attrs:
|
||||
http_user_agent = "%s FormPost" % (
|
||||
@ -338,7 +217,7 @@ class FormPost(object):
|
||||
env, attrs['boundary'])
|
||||
start_response(status, headers)
|
||||
return [body]
|
||||
except (FormInvalid, EOFError) as err:
|
||||
except (FormInvalid, MimeInvalid, EOFError) as err:
|
||||
body = 'FormPost: %s' % err
|
||||
start_response(
|
||||
'400 Bad Request',
|
||||
@ -365,10 +244,11 @@ class FormPost(object):
|
||||
attributes = {}
|
||||
subheaders = []
|
||||
file_count = 0
|
||||
for fp in _iter_requests(env['wsgi.input'], boundary):
|
||||
for fp in iter_multipart_mime_documents(
|
||||
env['wsgi.input'], boundary, read_chunk_size=READ_CHUNK_SIZE):
|
||||
hdrs = rfc822.Message(fp, 0)
|
||||
disp, attrs = \
|
||||
_parse_attrs(hdrs.getheader('Content-Disposition', ''))
|
||||
disp, attrs = parse_content_disposition(
|
||||
hdrs.getheader('Content-Disposition', ''))
|
||||
if disp == 'form-data' and attrs.get('filename'):
|
||||
file_count += 1
|
||||
try:
|
||||
|
@ -2986,3 +2986,129 @@ def get_expirer_container(x_delete_at, expirer_divisor, acc, cont, obj):
|
||||
shard_int = int(hash_path(acc, cont, obj), 16) % 100
|
||||
return normalize_delete_at_timestamp(
|
||||
int(x_delete_at) / expirer_divisor * expirer_divisor - shard_int)
|
||||
|
||||
|
||||
class _MultipartMimeFileLikeObject(object):
|
||||
|
||||
def __init__(self, wsgi_input, boundary, input_buffer, read_chunk_size):
|
||||
self.no_more_data_for_this_file = False
|
||||
self.no_more_files = False
|
||||
self.wsgi_input = wsgi_input
|
||||
self.boundary = boundary
|
||||
self.input_buffer = input_buffer
|
||||
self.read_chunk_size = read_chunk_size
|
||||
|
||||
def read(self, length=None):
|
||||
if not length:
|
||||
length = self.read_chunk_size
|
||||
if self.no_more_data_for_this_file:
|
||||
return ''
|
||||
|
||||
# read enough data to know whether we're going to run
|
||||
# into a boundary in next [length] bytes
|
||||
if len(self.input_buffer) < length + len(self.boundary) + 2:
|
||||
to_read = length + len(self.boundary) + 2
|
||||
while to_read > 0:
|
||||
chunk = self.wsgi_input.read(to_read)
|
||||
to_read -= len(chunk)
|
||||
self.input_buffer += chunk
|
||||
if not chunk:
|
||||
self.no_more_files = True
|
||||
break
|
||||
|
||||
boundary_pos = self.input_buffer.find(self.boundary)
|
||||
|
||||
# boundary does not exist in the next (length) bytes
|
||||
if boundary_pos == -1 or boundary_pos > length:
|
||||
ret = self.input_buffer[:length]
|
||||
self.input_buffer = self.input_buffer[length:]
|
||||
# if it does, just return data up to the boundary
|
||||
else:
|
||||
ret, self.input_buffer = self.input_buffer.split(self.boundary, 1)
|
||||
self.no_more_files = self.input_buffer.startswith('--')
|
||||
self.no_more_data_for_this_file = True
|
||||
self.input_buffer = self.input_buffer[2:]
|
||||
return ret
|
||||
|
||||
def readline(self):
|
||||
if self.no_more_data_for_this_file:
|
||||
return ''
|
||||
boundary_pos = newline_pos = -1
|
||||
while newline_pos < 0 and boundary_pos < 0:
|
||||
chunk = self.wsgi_input.read(self.read_chunk_size)
|
||||
self.input_buffer += chunk
|
||||
newline_pos = self.input_buffer.find('\r\n')
|
||||
boundary_pos = self.input_buffer.find(self.boundary)
|
||||
if not chunk:
|
||||
self.no_more_files = True
|
||||
break
|
||||
# found a newline
|
||||
if newline_pos >= 0 and \
|
||||
(boundary_pos < 0 or newline_pos < boundary_pos):
|
||||
# Use self.read to ensure any logic there happens...
|
||||
ret = ''
|
||||
to_read = newline_pos + 2
|
||||
while to_read > 0:
|
||||
chunk = self.read(to_read)
|
||||
# Should never happen since we're reading from input_buffer,
|
||||
# but just for completeness...
|
||||
if not chunk:
|
||||
break
|
||||
to_read -= len(chunk)
|
||||
ret += chunk
|
||||
return ret
|
||||
else: # no newlines, just return up to next boundary
|
||||
return self.read(len(self.input_buffer))
|
||||
|
||||
|
||||
def iter_multipart_mime_documents(wsgi_input, boundary, read_chunk_size=4096):
|
||||
"""
|
||||
Given a multi-part-mime-encoded input file object and boundary,
|
||||
yield file-like objects for each part.
|
||||
|
||||
:param wsgi_input: The file-like object to read from.
|
||||
:param boundary: The mime boundary to separate new file-like
|
||||
objects on.
|
||||
:returns: A generator of file-like objects for each part.
|
||||
:raises: MimeInvalid if the document is malformed
|
||||
"""
|
||||
boundary = '--' + boundary
|
||||
if wsgi_input.readline(len(boundary + '\r\n')).strip() != boundary:
|
||||
raise swift.common.exceptions.MimeInvalid('invalid starting boundary')
|
||||
boundary = '\r\n' + boundary
|
||||
input_buffer = ''
|
||||
done = False
|
||||
while not done:
|
||||
it = _MultipartMimeFileLikeObject(wsgi_input, boundary, input_buffer,
|
||||
read_chunk_size)
|
||||
yield it
|
||||
done = it.no_more_files
|
||||
input_buffer = it.input_buffer
|
||||
|
||||
|
||||
#: Regular expression to match form attributes.
|
||||
ATTRIBUTES_RE = re.compile(r'(\w+)=(".*?"|[^";]+)(; ?|$)')
|
||||
|
||||
|
||||
def parse_content_disposition(header):
|
||||
"""
|
||||
Given the value of a header like:
|
||||
Content-Disposition: form-data; name="somefile"; filename="test.html"
|
||||
|
||||
Return data like
|
||||
("form-data", {"name": "somefile", "filename": "test.html"})
|
||||
|
||||
:param header: Value of a header (the part after the ': ').
|
||||
:returns: (value name, dict) of the attribute data parsed (see above).
|
||||
"""
|
||||
attributes = {}
|
||||
attrs = ''
|
||||
if '; ' in header:
|
||||
header, attrs = header.split('; ', 1)
|
||||
m = True
|
||||
while m:
|
||||
m = ATTRIBUTES_RE.match(attrs)
|
||||
if m:
|
||||
attrs = attrs[len(m.group(0)):]
|
||||
attributes[m.group(1)] = m.group(2).strip('"')
|
||||
return header, attributes
|
||||
|
@ -68,167 +68,6 @@ class FakeApp(object):
|
||||
return ['Client Disconnect\n']
|
||||
|
||||
|
||||
class TestParseAttrs(unittest.TestCase):
|
||||
|
||||
def test_basic_content_type(self):
|
||||
name, attrs = formpost._parse_attrs('text/plain')
|
||||
self.assertEquals(name, 'text/plain')
|
||||
self.assertEquals(attrs, {})
|
||||
|
||||
def test_content_type_with_charset(self):
|
||||
name, attrs = formpost._parse_attrs('text/plain; charset=UTF8')
|
||||
self.assertEquals(name, 'text/plain')
|
||||
self.assertEquals(attrs, {'charset': 'UTF8'})
|
||||
|
||||
def test_content_disposition(self):
|
||||
name, attrs = formpost._parse_attrs(
|
||||
'form-data; name="somefile"; filename="test.html"')
|
||||
self.assertEquals(name, 'form-data')
|
||||
self.assertEquals(attrs, {'name': 'somefile', 'filename': 'test.html'})
|
||||
|
||||
|
||||
class TestIterRequests(unittest.TestCase):
|
||||
|
||||
def test_bad_start(self):
|
||||
it = formpost._iter_requests(StringIO('blah'), 'unique')
|
||||
exc = None
|
||||
try:
|
||||
it.next()
|
||||
except formpost.FormInvalid as err:
|
||||
exc = err
|
||||
self.assertEquals(str(exc), 'invalid starting boundary')
|
||||
|
||||
def test_empty(self):
|
||||
it = formpost._iter_requests(StringIO('--unique'), 'unique')
|
||||
fp = it.next()
|
||||
self.assertEquals(fp.read(), '')
|
||||
exc = None
|
||||
try:
|
||||
it.next()
|
||||
except StopIteration as err:
|
||||
exc = err
|
||||
self.assertTrue(exc is not None)
|
||||
|
||||
def test_basic(self):
|
||||
it = formpost._iter_requests(
|
||||
StringIO('--unique\r\nabcdefg\r\n--unique--'), 'unique')
|
||||
fp = it.next()
|
||||
self.assertEquals(fp.read(), 'abcdefg')
|
||||
exc = None
|
||||
try:
|
||||
it.next()
|
||||
except StopIteration as err:
|
||||
exc = err
|
||||
self.assertTrue(exc is not None)
|
||||
|
||||
def test_basic2(self):
|
||||
it = formpost._iter_requests(
|
||||
StringIO('--unique\r\nabcdefg\r\n--unique\r\nhijkl\r\n--unique--'),
|
||||
'unique')
|
||||
fp = it.next()
|
||||
self.assertEquals(fp.read(), 'abcdefg')
|
||||
fp = it.next()
|
||||
self.assertEquals(fp.read(), 'hijkl')
|
||||
exc = None
|
||||
try:
|
||||
it.next()
|
||||
except StopIteration as err:
|
||||
exc = err
|
||||
self.assertTrue(exc is not None)
|
||||
|
||||
def test_tiny_reads(self):
|
||||
it = formpost._iter_requests(
|
||||
StringIO('--unique\r\nabcdefg\r\n--unique\r\nhijkl\r\n--unique--'),
|
||||
'unique')
|
||||
fp = it.next()
|
||||
self.assertEquals(fp.read(2), 'ab')
|
||||
self.assertEquals(fp.read(2), 'cd')
|
||||
self.assertEquals(fp.read(2), 'ef')
|
||||
self.assertEquals(fp.read(2), 'g')
|
||||
self.assertEquals(fp.read(2), '')
|
||||
fp = it.next()
|
||||
self.assertEquals(fp.read(), 'hijkl')
|
||||
exc = None
|
||||
try:
|
||||
it.next()
|
||||
except StopIteration as err:
|
||||
exc = err
|
||||
self.assertTrue(exc is not None)
|
||||
|
||||
def test_big_reads(self):
|
||||
it = formpost._iter_requests(
|
||||
StringIO('--unique\r\nabcdefg\r\n--unique\r\nhijkl\r\n--unique--'),
|
||||
'unique')
|
||||
fp = it.next()
|
||||
self.assertEquals(fp.read(65536), 'abcdefg')
|
||||
self.assertEquals(fp.read(), '')
|
||||
fp = it.next()
|
||||
self.assertEquals(fp.read(), 'hijkl')
|
||||
exc = None
|
||||
try:
|
||||
it.next()
|
||||
except StopIteration as err:
|
||||
exc = err
|
||||
self.assertTrue(exc is not None)
|
||||
|
||||
def test_broken_mid_stream(self):
|
||||
# We go ahead and accept whatever is sent instead of rejecting the
|
||||
# whole request, in case the partial form is still useful.
|
||||
it = formpost._iter_requests(
|
||||
StringIO('--unique\r\nabc'), 'unique')
|
||||
fp = it.next()
|
||||
self.assertEquals(fp.read(), 'abc')
|
||||
exc = None
|
||||
try:
|
||||
it.next()
|
||||
except StopIteration as err:
|
||||
exc = err
|
||||
self.assertTrue(exc is not None)
|
||||
|
||||
def test_readline(self):
|
||||
it = formpost._iter_requests(
|
||||
StringIO('--unique\r\nab\r\ncd\ref\ng\r\n--unique\r\nhi\r\n\r\n'
|
||||
'jkl\r\n\r\n--unique--'), 'unique')
|
||||
fp = it.next()
|
||||
self.assertEquals(fp.readline(), 'ab\r\n')
|
||||
self.assertEquals(fp.readline(), 'cd\ref\ng')
|
||||
self.assertEquals(fp.readline(), '')
|
||||
fp = it.next()
|
||||
self.assertEquals(fp.readline(), 'hi\r\n')
|
||||
self.assertEquals(fp.readline(), '\r\n')
|
||||
self.assertEquals(fp.readline(), 'jkl\r\n')
|
||||
exc = None
|
||||
try:
|
||||
it.next()
|
||||
except StopIteration as err:
|
||||
exc = err
|
||||
self.assertTrue(exc is not None)
|
||||
|
||||
def test_readline_with_tiny_chunks(self):
|
||||
orig_read_chunk_size = formpost.READ_CHUNK_SIZE
|
||||
try:
|
||||
formpost.READ_CHUNK_SIZE = 2
|
||||
it = formpost._iter_requests(
|
||||
StringIO('--unique\r\nab\r\ncd\ref\ng\r\n--unique\r\nhi\r\n'
|
||||
'\r\njkl\r\n\r\n--unique--'), 'unique')
|
||||
fp = it.next()
|
||||
self.assertEquals(fp.readline(), 'ab\r\n')
|
||||
self.assertEquals(fp.readline(), 'cd\ref\ng')
|
||||
self.assertEquals(fp.readline(), '')
|
||||
fp = it.next()
|
||||
self.assertEquals(fp.readline(), 'hi\r\n')
|
||||
self.assertEquals(fp.readline(), '\r\n')
|
||||
self.assertEquals(fp.readline(), 'jkl\r\n')
|
||||
exc = None
|
||||
try:
|
||||
it.next()
|
||||
except StopIteration as err:
|
||||
exc = err
|
||||
self.assertTrue(exc is not None)
|
||||
finally:
|
||||
formpost.READ_CHUNK_SIZE = orig_read_chunk_size
|
||||
|
||||
|
||||
class TestCappedFileLikeObject(unittest.TestCase):
|
||||
|
||||
def test_whole(self):
|
||||
|
@ -54,7 +54,8 @@ from mock import MagicMock, patch
|
||||
|
||||
from swift.common.exceptions import (Timeout, MessageTimeout,
|
||||
ConnectionTimeout, LockTimeout,
|
||||
ReplicationLockTimeout)
|
||||
ReplicationLockTimeout,
|
||||
MimeInvalid)
|
||||
from swift.common import utils
|
||||
from swift.common.container_sync_realms import ContainerSyncRealms
|
||||
from swift.common.swob import Request, Response
|
||||
@ -4168,5 +4169,165 @@ class TestLRUCache(unittest.TestCase):
|
||||
self.assertEqual(f.size(), 4)
|
||||
|
||||
|
||||
class TestParseContentDisposition(unittest.TestCase):
|
||||
|
||||
def test_basic_content_type(self):
|
||||
name, attrs = utils.parse_content_disposition('text/plain')
|
||||
self.assertEquals(name, 'text/plain')
|
||||
self.assertEquals(attrs, {})
|
||||
|
||||
def test_content_type_with_charset(self):
|
||||
name, attrs = utils.parse_content_disposition(
|
||||
'text/plain; charset=UTF8')
|
||||
self.assertEquals(name, 'text/plain')
|
||||
self.assertEquals(attrs, {'charset': 'UTF8'})
|
||||
|
||||
def test_content_disposition(self):
|
||||
name, attrs = utils.parse_content_disposition(
|
||||
'form-data; name="somefile"; filename="test.html"')
|
||||
self.assertEquals(name, 'form-data')
|
||||
self.assertEquals(attrs, {'name': 'somefile', 'filename': 'test.html'})
|
||||
|
||||
|
||||
class TestIterMultipartMimeDocuments(unittest.TestCase):
|
||||
|
||||
def test_bad_start(self):
|
||||
it = utils.iter_multipart_mime_documents(StringIO('blah'), 'unique')
|
||||
exc = None
|
||||
try:
|
||||
it.next()
|
||||
except MimeInvalid as err:
|
||||
exc = err
|
||||
self.assertEquals(str(exc), 'invalid starting boundary')
|
||||
|
||||
def test_empty(self):
|
||||
it = utils.iter_multipart_mime_documents(StringIO('--unique'),
|
||||
'unique')
|
||||
fp = it.next()
|
||||
self.assertEquals(fp.read(), '')
|
||||
exc = None
|
||||
try:
|
||||
it.next()
|
||||
except StopIteration as err:
|
||||
exc = err
|
||||
self.assertTrue(exc is not None)
|
||||
|
||||
def test_basic(self):
|
||||
it = utils.iter_multipart_mime_documents(
|
||||
StringIO('--unique\r\nabcdefg\r\n--unique--'), 'unique')
|
||||
fp = it.next()
|
||||
self.assertEquals(fp.read(), 'abcdefg')
|
||||
exc = None
|
||||
try:
|
||||
it.next()
|
||||
except StopIteration as err:
|
||||
exc = err
|
||||
self.assertTrue(exc is not None)
|
||||
|
||||
def test_basic2(self):
|
||||
it = utils.iter_multipart_mime_documents(
|
||||
StringIO('--unique\r\nabcdefg\r\n--unique\r\nhijkl\r\n--unique--'),
|
||||
'unique')
|
||||
fp = it.next()
|
||||
self.assertEquals(fp.read(), 'abcdefg')
|
||||
fp = it.next()
|
||||
self.assertEquals(fp.read(), 'hijkl')
|
||||
exc = None
|
||||
try:
|
||||
it.next()
|
||||
except StopIteration as err:
|
||||
exc = err
|
||||
self.assertTrue(exc is not None)
|
||||
|
||||
def test_tiny_reads(self):
|
||||
it = utils.iter_multipart_mime_documents(
|
||||
StringIO('--unique\r\nabcdefg\r\n--unique\r\nhijkl\r\n--unique--'),
|
||||
'unique')
|
||||
fp = it.next()
|
||||
self.assertEquals(fp.read(2), 'ab')
|
||||
self.assertEquals(fp.read(2), 'cd')
|
||||
self.assertEquals(fp.read(2), 'ef')
|
||||
self.assertEquals(fp.read(2), 'g')
|
||||
self.assertEquals(fp.read(2), '')
|
||||
fp = it.next()
|
||||
self.assertEquals(fp.read(), 'hijkl')
|
||||
exc = None
|
||||
try:
|
||||
it.next()
|
||||
except StopIteration as err:
|
||||
exc = err
|
||||
self.assertTrue(exc is not None)
|
||||
|
||||
def test_big_reads(self):
|
||||
it = utils.iter_multipart_mime_documents(
|
||||
StringIO('--unique\r\nabcdefg\r\n--unique\r\nhijkl\r\n--unique--'),
|
||||
'unique')
|
||||
fp = it.next()
|
||||
self.assertEquals(fp.read(65536), 'abcdefg')
|
||||
self.assertEquals(fp.read(), '')
|
||||
fp = it.next()
|
||||
self.assertEquals(fp.read(), 'hijkl')
|
||||
exc = None
|
||||
try:
|
||||
it.next()
|
||||
except StopIteration as err:
|
||||
exc = err
|
||||
self.assertTrue(exc is not None)
|
||||
|
||||
def test_broken_mid_stream(self):
|
||||
# We go ahead and accept whatever is sent instead of rejecting the
|
||||
# whole request, in case the partial form is still useful.
|
||||
it = utils.iter_multipart_mime_documents(
|
||||
StringIO('--unique\r\nabc'), 'unique')
|
||||
fp = it.next()
|
||||
self.assertEquals(fp.read(), 'abc')
|
||||
exc = None
|
||||
try:
|
||||
it.next()
|
||||
except StopIteration as err:
|
||||
exc = err
|
||||
self.assertTrue(exc is not None)
|
||||
|
||||
def test_readline(self):
|
||||
it = utils.iter_multipart_mime_documents(
|
||||
StringIO('--unique\r\nab\r\ncd\ref\ng\r\n--unique\r\nhi\r\n\r\n'
|
||||
'jkl\r\n\r\n--unique--'), 'unique')
|
||||
fp = it.next()
|
||||
self.assertEquals(fp.readline(), 'ab\r\n')
|
||||
self.assertEquals(fp.readline(), 'cd\ref\ng')
|
||||
self.assertEquals(fp.readline(), '')
|
||||
fp = it.next()
|
||||
self.assertEquals(fp.readline(), 'hi\r\n')
|
||||
self.assertEquals(fp.readline(), '\r\n')
|
||||
self.assertEquals(fp.readline(), 'jkl\r\n')
|
||||
exc = None
|
||||
try:
|
||||
it.next()
|
||||
except StopIteration as err:
|
||||
exc = err
|
||||
self.assertTrue(exc is not None)
|
||||
|
||||
def test_readline_with_tiny_chunks(self):
|
||||
it = utils.iter_multipart_mime_documents(
|
||||
StringIO('--unique\r\nab\r\ncd\ref\ng\r\n--unique\r\nhi\r\n'
|
||||
'\r\njkl\r\n\r\n--unique--'),
|
||||
'unique',
|
||||
read_chunk_size=2)
|
||||
fp = it.next()
|
||||
self.assertEquals(fp.readline(), 'ab\r\n')
|
||||
self.assertEquals(fp.readline(), 'cd\ref\ng')
|
||||
self.assertEquals(fp.readline(), '')
|
||||
fp = it.next()
|
||||
self.assertEquals(fp.readline(), 'hi\r\n')
|
||||
self.assertEquals(fp.readline(), '\r\n')
|
||||
self.assertEquals(fp.readline(), 'jkl\r\n')
|
||||
exc = None
|
||||
try:
|
||||
it.next()
|
||||
except StopIteration as err:
|
||||
exc = err
|
||||
self.assertTrue(exc is not None)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
Loading…
x
Reference in New Issue
Block a user