s3api: Clean up some errors

- SHA256 mismatches should trip XAmzContentSHA256Mismatch errors,
  not BadDigest. This should include ClientComputedContentSHA256 and
  S3ComputedContentSHA256 elements.
- BadDigest responses should include ExpectedDigest elements.
- Fix a typo in InvalidDigest error message.
- Requests with a v4 authorization header require a sha256 header,
  rejecting with InvalidRequest on failure (and pretty darn early!).
- Requests with a v4 authorization header perform a
  looks-like-a-valid-sha256 check, rejecting with InvalidArgument
  on failure.
- Invalid SHA256 should take precedence over invalid MD5.
- v2-signed requests can still raise XAmzContentSHA256Mismatch errors
  (though they *don't* do the looks-like-a-valid-sha256 check).
- If provided, SHA256 should be used in calculating canonical request
  for v4 pre-signed URLs.

Change-Id: I06c2a16126886bab8807d704294b9809844be086
This commit is contained in:
Tim Burke 2024-05-22 11:21:01 -07:00
parent ec8166be33
commit 7bf2797799
9 changed files with 1683 additions and 101 deletions

View File

@ -57,7 +57,7 @@ from swift.common.middleware.s3api.s3response import AccessDenied, \
MalformedXML, InvalidRequest, RequestTimeout, InvalidBucketName, \
BadDigest, AuthorizationHeaderMalformed, SlowDown, \
AuthorizationQueryParametersError, ServiceUnavailable, BrokenMPU, \
InvalidPartNumber, InvalidPartArgument
InvalidPartNumber, InvalidPartArgument, XAmzContentSHA256Mismatch
from swift.common.middleware.s3api.exception import NotS3Request
from swift.common.middleware.s3api.utils import utf8encode, \
S3Timestamp, mktime, MULTIUPLOAD_SUFFIX
@ -129,6 +129,9 @@ class S3InputSHA256Mismatch(BaseException):
through all the layers of the pipeline back to us. It should never escape
the s3api middleware.
"""
def __init__(self, expected, computed):
self.expected = expected
self.computed = computed
class HashingInput(object):
@ -141,6 +144,13 @@ class HashingInput(object):
self._to_read = content_length
self._hasher = hasher()
self._expected = expected_hex_hash
if content_length == 0 and \
self._hasher.hexdigest() != self._expected.lower():
self.close()
raise XAmzContentSHA256Mismatch(
client_computed_content_s_h_a256=self._expected,
s3_computed_content_s_h_a256=self._hasher.hexdigest(),
)
def read(self, size=None):
chunk = self._input.read(size)
@ -149,12 +159,12 @@ class HashingInput(object):
short_read = bool(chunk) if size is None else (len(chunk) < size)
if self._to_read < 0 or (short_read and self._to_read) or (
self._to_read == 0 and
self._hasher.hexdigest() != self._expected):
self._hasher.hexdigest() != self._expected.lower()):
self.close()
# Since we don't return the last chunk, the PUT never completes
raise S3InputSHA256Mismatch(
'The X-Amz-Content-SHA56 you specified did not match '
'what we received.')
self._expected,
self._hasher.hexdigest())
return chunk
def close(self):
@ -249,6 +259,28 @@ class SigV4Mixin(object):
if int(self.timestamp) + expires < S3Timestamp.now():
raise AccessDenied('Request has expired', reason='expired')
def _validate_sha256(self):
aws_sha256 = self.headers.get('x-amz-content-sha256')
looks_like_sha256 = (
aws_sha256 and len(aws_sha256) == 64 and
all(c in '0123456789abcdef' for c in aws_sha256.lower()))
if not aws_sha256:
if 'X-Amz-Credential' in self.params:
pass # pre-signed URL; not required
else:
msg = 'Missing required header for this request: ' \
'x-amz-content-sha256'
raise InvalidRequest(msg)
elif aws_sha256 == 'UNSIGNED-PAYLOAD':
pass
elif not looks_like_sha256 and 'X-Amz-Credential' not in self.params:
raise InvalidArgument(
'x-amz-content-sha256',
aws_sha256,
'x-amz-content-sha256 must be UNSIGNED-PAYLOAD, or '
'a valid sha256 value.')
return aws_sha256
def _parse_credential(self, credential_string):
parts = credential_string.split("/")
# credential must be in following format:
@ -459,30 +491,9 @@ class SigV4Mixin(object):
cr.append(b';'.join(swob.wsgi_to_bytes(k) for k, v in headers_to_sign))
# 6. Add payload string at the tail
if 'X-Amz-Credential' in self.params:
# V4 with query parameters only
hashed_payload = 'UNSIGNED-PAYLOAD'
elif 'X-Amz-Content-SHA256' not in self.headers:
msg = 'Missing required header for this request: ' \
'x-amz-content-sha256'
raise InvalidRequest(msg)
else:
hashed_payload = self.headers['X-Amz-Content-SHA256']
if hashed_payload != 'UNSIGNED-PAYLOAD':
if self.content_length == 0:
if hashed_payload.lower() != sha256().hexdigest():
raise BadDigest(
'The X-Amz-Content-SHA56 you specified did not '
'match what we received.')
elif self.content_length:
self.environ['wsgi.input'] = HashingInput(
self.environ['wsgi.input'],
self.content_length,
sha256,
hashed_payload.lower())
# else, length not provided -- Swift will kick out a
# 411 Length Required which will get translated back
# to a S3-style response in S3Request._swift_error_codes
hashed_payload = self.headers.get('X-Amz-Content-SHA256',
'UNSIGNED-PAYLOAD')
cr.append(swob.wsgi_to_bytes(hashed_payload))
return b'\n'.join(cr)
@ -810,6 +821,9 @@ class S3Request(swob.Request):
if delta > self.conf.allowable_clock_skew:
raise RequestTimeTooSkewed()
def _validate_sha256(self):
return self.headers.get('x-amz-content-sha256')
def _validate_headers(self):
if 'CONTENT_LENGTH' in self.environ:
try:
@ -820,21 +834,6 @@ class S3Request(swob.Request):
raise InvalidArgument('Content-Length',
self.environ['CONTENT_LENGTH'])
value = _header_strip(self.headers.get('Content-MD5'))
if value is not None:
if not re.match('^[A-Za-z0-9+/]+={0,2}$', value):
# Non-base64-alphabet characters in value.
raise InvalidDigest(content_md5=value)
try:
self.headers['ETag'] = binascii.b2a_hex(
binascii.a2b_base64(value))
except binascii.Error:
# incorrect padding, most likely
raise InvalidDigest(content_md5=value)
if len(self.headers['ETag']) != 32:
raise InvalidDigest(content_md5=value)
if self.method == 'PUT' and any(h in self.headers for h in (
'If-Match', 'If-None-Match',
'If-Modified-Since', 'If-Unmodified-Since')):
@ -880,6 +879,38 @@ class S3Request(swob.Request):
if 'x-amz-website-redirect-location' in self.headers:
raise S3NotImplemented('Website redirection is not supported.')
aws_sha256 = self._validate_sha256()
if (aws_sha256
and aws_sha256 != 'UNSIGNED-PAYLOAD'
and self.content_length is not None):
# Even if client-provided SHA doesn't look like a SHA, wrap the
# input anyway so we'll send the SHA of what the client sent in
# the eventual error
self.environ['wsgi.input'] = HashingInput(
self.environ['wsgi.input'],
self.content_length,
sha256,
aws_sha256)
# If no content-length, either client's trying to do a HTTP chunked
# transfer, or a HTTP/1.0-style transfer (in which case swift will
# reject with length-required and we'll translate back to
# MissingContentLength)
value = _header_strip(self.headers.get('Content-MD5'))
if value is not None:
if not re.match('^[A-Za-z0-9+/]+={0,2}$', value):
# Non-base64-alphabet characters in value.
raise InvalidDigest(content_md5=value)
try:
self.headers['ETag'] = binascii.b2a_hex(
binascii.a2b_base64(value))
except binascii.Error:
# incorrect padding, most likely
raise InvalidDigest(content_md5=value)
if len(self.headers['ETag']) != 32:
raise InvalidDigest(content_md5=value)
# https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html
# describes some of what would be required to support this
if any(['aws-chunked' in self.headers.get('content-encoding', ''),
@ -922,7 +953,10 @@ class S3Request(swob.Request):
try:
body = self.body_file.read(max_length)
except S3InputSHA256Mismatch as err:
raise BadDigest(err.args[0])
raise XAmzContentSHA256Mismatch(
client_computed_content_s_h_a256=err.expected,
s3_computed_content_s_h_a256=err.computed,
)
else:
# No (or zero) Content-Length provided, and not chunked transfer;
# no body. Assume zero-length, and enforce a required body below.
@ -1368,6 +1402,16 @@ class S3Request(swob.Request):
return NoSuchKey(obj)
return NoSuchBucket(container)
# Since BadDigest ought to plumb in some client-provided values,
# defer evaluation until we know they're provided
def bad_digest_handler():
etag = binascii.hexlify(base64.b64decode(
env['HTTP_CONTENT_MD5']))
return BadDigest(
expected_digest=etag, # yes, really hex
# TODO: plumb in calculated_digest, as b64
)
code_map = {
'HEAD': {
HTTP_NOT_FOUND: not_found_handler,
@ -1379,7 +1423,7 @@ class S3Request(swob.Request):
},
'PUT': {
HTTP_NOT_FOUND: (NoSuchBucket, container),
HTTP_UNPROCESSABLE_ENTITY: BadDigest,
HTTP_UNPROCESSABLE_ENTITY: bad_digest_handler,
HTTP_REQUEST_ENTITY_TOO_LARGE: EntityTooLarge,
HTTP_LENGTH_REQUIRED: MissingContentLength,
HTTP_REQUEST_TIMEOUT: RequestTimeout,
@ -1420,7 +1464,10 @@ class S3Request(swob.Request):
# hopefully by now any modifications to the path (e.g. tenant to
# account translation) will have been made by auth middleware
self.environ['s3api.backend_path'] = sw_req.environ['PATH_INFO']
raise BadDigest(err.args[0])
raise XAmzContentSHA256Mismatch(
client_computed_content_s_h_a256=err.expected,
s3_computed_content_s_h_a256=err.computed,
)
else:
# reuse account
_, self.account, _ = split_path(sw_resp.environ['PATH_INFO'],

View File

@ -327,6 +327,12 @@ class BadDigest(ErrorResponse):
_msg = 'The Content-MD5 you specified did not match what we received.'
class XAmzContentSHA256Mismatch(ErrorResponse):
_status = '400 Bad Request'
_msg = "The provided 'x-amz-content-sha256' header does not match what " \
"was computed."
class BucketAlreadyExists(ErrorResponse):
_status = '409 Conflict'
_msg = 'The requested bucket name is not available. The bucket ' \
@ -443,7 +449,7 @@ class InvalidBucketState(ErrorResponse):
class InvalidDigest(ErrorResponse):
_status = '400 Bad Request'
_msg = 'The Content-MD5 you specified was an invalid.'
_msg = 'The Content-MD5 you specified was invalid.'
class InvalidLocationConstraint(ErrorResponse):

View File

@ -147,14 +147,16 @@ def get_s3_client(user=1, signature_version='s3v4', addressing_style='path'):
TEST_PREFIX = 's3api-test-'
class BaseS3TestCase(unittest.TestCase):
class BaseS3Mixin(object):
# Default to v4 signatures (as aws-cli does), but subclasses can override
signature_version = 's3v4'
def get_s3_client(self, user):
return get_s3_client(user, self.signature_version)
@classmethod
def get_s3_client(cls, user):
return get_s3_client(user, cls.signature_version)
def _remove_all_object_versions_from_bucket(self, client, bucket_name):
@classmethod
def _remove_all_object_versions_from_bucket(cls, client, bucket_name):
resp = client.list_object_versions(Bucket=bucket_name)
objs_to_delete = (resp.get('Versions', []) +
resp.get('DeleteMarkers', []))
@ -180,10 +182,11 @@ class BaseS3TestCase(unittest.TestCase):
objs_to_delete = (resp.get('Versions', []) +
resp.get('DeleteMarkers', []))
def clear_bucket(self, client, bucket_name):
@classmethod
def clear_bucket(cls, client, bucket_name):
timeout = time.time() + 10
backoff = 0.1
self._remove_all_object_versions_from_bucket(client, bucket_name)
cls._remove_all_object_versions_from_bucket(client, bucket_name)
try:
client.delete_bucket(Bucket=bucket_name)
except ClientError as e:
@ -196,7 +199,7 @@ class BaseS3TestCase(unittest.TestCase):
Bucket=bucket_name,
VersioningConfiguration={'Status': 'Suspended'})
while True:
self._remove_all_object_versions_from_bucket(
cls._remove_all_object_versions_from_bucket(
client, bucket_name)
# also try some version-unaware operations...
for key in client.list_objects(Bucket=bucket_name).get(
@ -218,16 +221,20 @@ class BaseS3TestCase(unittest.TestCase):
else:
break
def create_name(self, slug):
@classmethod
def create_name(cls, slug):
return '%s%s-%s' % (TEST_PREFIX, slug, uuid.uuid4().hex)
def clear_account(self, client):
@classmethod
def clear_account(cls, client):
for bucket in client.list_buckets()['Buckets']:
if not bucket['Name'].startswith(TEST_PREFIX):
# these tests run against real s3 accounts
continue
self.clear_bucket(client, bucket['Name'])
cls.clear_bucket(client, bucket['Name'])
class BaseS3TestCase(BaseS3Mixin, unittest.TestCase):
def tearDown(self):
client = self.get_s3_client(1)
self.clear_account(client)
@ -237,3 +244,22 @@ class BaseS3TestCase(unittest.TestCase):
pass
else:
self.clear_account(client)
class BaseS3TestCaseWithBucket(BaseS3Mixin, unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.bucket_name = cls.create_name('test-bucket')
client = cls.get_s3_client(1)
client.create_bucket(Bucket=cls.bucket_name)
@classmethod
def tearDownClass(cls):
client = cls.get_s3_client(1)
cls.clear_account(client)
try:
client = cls.get_s3_client(2)
except ConfigError:
pass
else:
cls.clear_account(client)

File diff suppressed because it is too large Load Diff

View File

@ -1525,7 +1525,7 @@ class TestS3ApiBucketNoACL(BaseS3ApiBucket, S3ApiTestCase):
'Signature=X',
]),
'Date': self.get_date_header(),
'x-amz-content-sha256': 'not the hash',
'x-amz-content-sha256': '0' * 64,
}
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'PUT'},
@ -1533,8 +1533,9 @@ class TestS3ApiBucketNoACL(BaseS3ApiBucket, S3ApiTestCase):
body=req_body)
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '400')
self.assertEqual(self._get_error_code(body), 'BadDigest')
self.assertIn(b'X-Amz-Content-SHA56', body)
self.assertEqual(self._get_error_code(body),
'XAmzContentSHA256Mismatch')
self.assertIn(b'x-amz-content-sha256', body)
# we maybe haven't parsed the location/path yet?
self.assertNotIn('swift.backend_path', req.environ)

View File

@ -988,14 +988,15 @@ class TestS3ApiMultiUpload(BaseS3ApiMultiUpload, S3ApiTestCase):
method='PUT',
headers={'Authorization': authz_header,
'X-Amz-Date': self.get_v4_amz_date_header(),
'X-Amz-Content-SHA256': 'not_the_hash'},
'X-Amz-Content-SHA256': '0' * 64},
body=b'test')
with patch('swift.common.middleware.s3api.s3request.'
'get_container_info',
lambda env, app, swift_source: {'status': 204}):
status, headers, body = self.call_s3api(req)
self.assertEqual(status, '400 Bad Request')
self.assertEqual(self._get_error_code(body), 'BadDigest')
self.assertEqual(self._get_error_code(body),
'XAmzContentSHA256Mismatch')
self.assertEqual([
('HEAD', '/v1/AUTH_test/bucket+segments/object/X'),
('PUT', '/v1/AUTH_test/bucket+segments/object/X/1'),
@ -1717,7 +1718,8 @@ class TestS3ApiMultiUpload(BaseS3ApiMultiUpload, S3ApiTestCase):
body=XML)
status, headers, body = self.call_s3api(req)
self.assertEqual('400 Bad Request', status)
self.assertEqual(self._get_error_code(body), 'BadDigest')
self.assertEqual(self._get_error_code(body),
'XAmzContentSHA256Mismatch')
self.assertEqual('/v1/AUTH_test/bucket+segments/object/X',
req.environ.get('swift.backend_path'))

View File

@ -629,8 +629,10 @@ class BaseS3ApiObj(object):
code = self._test_method_error('PUT', '/bucket/object',
swob.HTTPServerError)
self.assertEqual(code, 'InternalError')
code = self._test_method_error('PUT', '/bucket/object',
swob.HTTPUnprocessableEntity)
code = self._test_method_error(
'PUT', '/bucket/object',
swob.HTTPUnprocessableEntity,
headers={'Content-MD5': '1B2M2Y8AsgTpgAmY7PhCfg=='})
self.assertEqual(code, 'BadDigest')
code = self._test_method_error('PUT', '/bucket/object',
swob.HTTPLengthRequired)
@ -811,6 +813,31 @@ class BaseS3ApiObj(object):
self.s3api.app = error_catching_app
req = Request.blank(
'/bucket/object',
environ={'REQUEST_METHOD': 'PUT'},
headers={
'Authorization':
'AWS4-HMAC-SHA256 '
'Credential=test:tester/%s/us-east-1/s3/aws4_request, '
'SignedHeaders=host;x-amz-date, '
'Signature=hmac' % (
self.get_v4_amz_date_header().split('T', 1)[0]),
'x-amz-date': self.get_v4_amz_date_header(),
'x-amz-storage-class': 'STANDARD',
'x-amz-content-sha256': '0' * 64,
'Date': self.get_date_header()},
body=self.object_body)
req.date = datetime.now()
req.content_type = 'text/plain'
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '400')
self.assertEqual(self._get_error_code(body),
'XAmzContentSHA256Mismatch')
self.assertIn(b'x-amz-content-sha256', body)
self.assertEqual('/v1/AUTH_test/bucket/object',
req.environ.get('swift.backend_path'))
req = Request.blank(
'/bucket/object',
environ={'REQUEST_METHOD': 'PUT'},
@ -830,10 +857,11 @@ class BaseS3ApiObj(object):
req.content_type = 'text/plain'
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '400')
self.assertEqual(self._get_error_code(body), 'BadDigest')
self.assertIn(b'X-Amz-Content-SHA56', body)
self.assertEqual('/v1/AUTH_test/bucket/object',
req.environ.get('swift.backend_path'))
self.assertEqual(self._get_error_code(body),
'InvalidArgument')
self.assertIn(b'<ArgumentName>x-amz-content-sha256</ArgumentName>',
body)
self.assertNotIn('swift.backend_path', req.environ)
def test_object_PUT_v4_unsigned_payload(self):
req = Request.blank(

View File

@ -1137,7 +1137,7 @@ class TestS3ApiMiddleware(S3ApiTestCase):
headers = {
'Authorization': authz_header,
'X-Amz-Date': self.get_v4_amz_date_header(),
'X-Amz-Content-SHA256': '0123456789'}
'X-Amz-Content-SHA256': '0' * 64}
req = Request.blank('/bucket/object', environ=environ, headers=headers)
req.content_type = 'text/plain'
status, headers, body = self.call_s3api(req)

View File

@ -33,8 +33,9 @@ from swift.common.middleware.s3api.s3request import S3Request, \
S3InputSHA256Mismatch
from swift.common.middleware.s3api.s3response import InvalidArgument, \
NoSuchBucket, InternalError, ServiceUnavailable, \
AccessDenied, SignatureDoesNotMatch, RequestTimeTooSkewed, BadDigest, \
InvalidPartArgument, InvalidPartNumber, InvalidRequest
AccessDenied, SignatureDoesNotMatch, RequestTimeTooSkewed, \
InvalidPartArgument, InvalidPartNumber, InvalidRequest, \
XAmzContentSHA256Mismatch
from swift.common.utils import md5
from test.debug_logger import debug_logger
@ -412,7 +413,7 @@ class TestRequest(S3ApiTestCase):
'Signature=X' % (
scope_date,
';'.join(sorted(['host', included_header]))),
'X-Amz-Content-SHA256': '0123456789'}
'X-Amz-Content-SHA256': '0' * 64}
headers.update(date_header)
req = Request.blank('/', environ=environ, headers=headers)
@ -594,7 +595,7 @@ class TestRequest(S3ApiTestCase):
'Credential=test/%s/us-east-1/s3/aws4_request, '
'SignedHeaders=host;x-amz-content-sha256;x-amz-date,'
'Signature=X' % self.get_v4_amz_date_header().split('T', 1)[0],
'X-Amz-Content-SHA256': '0123456789',
'X-Amz-Content-SHA256': '0' * 64,
'Date': self.get_date_header(),
'X-Amz-Date': x_amz_date}
@ -604,7 +605,7 @@ class TestRequest(S3ApiTestCase):
headers_to_sign = sigv4_req._headers_to_sign()
self.assertEqual(headers_to_sign, [
('host', 'localhost:80'),
('x-amz-content-sha256', '0123456789'),
('x-amz-content-sha256', '0' * 64),
('x-amz-date', x_amz_date)])
# no x-amz-date
@ -614,7 +615,7 @@ class TestRequest(S3ApiTestCase):
'Credential=test/%s/us-east-1/s3/aws4_request, '
'SignedHeaders=host;x-amz-content-sha256,'
'Signature=X' % self.get_v4_amz_date_header().split('T', 1)[0],
'X-Amz-Content-SHA256': '0123456789',
'X-Amz-Content-SHA256': '1' * 64,
'Date': self.get_date_header()}
req = Request.blank('/', environ=environ, headers=headers)
@ -623,7 +624,7 @@ class TestRequest(S3ApiTestCase):
headers_to_sign = sigv4_req._headers_to_sign()
self.assertEqual(headers_to_sign, [
('host', 'localhost:80'),
('x-amz-content-sha256', '0123456789')])
('x-amz-content-sha256', '1' * 64)])
# SignedHeaders says, host and x-amz-date included but there is not
# X-Amz-Date header
@ -633,7 +634,7 @@ class TestRequest(S3ApiTestCase):
'Credential=test/%s/us-east-1/s3/aws4_request, '
'SignedHeaders=host;x-amz-content-sha256;x-amz-date,'
'Signature=X' % self.get_v4_amz_date_header().split('T', 1)[0],
'X-Amz-Content-SHA256': '0123456789',
'X-Amz-Content-SHA256': '2' * 64,
'Date': self.get_date_header()}
req = Request.blank('/', environ=environ, headers=headers)
@ -730,7 +731,7 @@ class TestRequest(S3ApiTestCase):
'Credential=test/%s/us-east-1/s3/aws4_request, '
'SignedHeaders=host;x-amz-content-sha256;x-amz-date,'
'Signature=X' % self.get_v4_amz_date_header().split('T', 1)[0],
'X-Amz-Content-SHA256': '0123456789',
'X-Amz-Content-SHA256': '0' * 64,
'Date': self.get_date_header(),
'X-Amz-Date': x_amz_date}
@ -872,7 +873,7 @@ class TestRequest(S3ApiTestCase):
'Credential=test/%s/us-east-1/s3/aws4_request, '
'SignedHeaders=host;x-amz-content-sha256;x-amz-date,'
'Signature=X' % amz_date_header.split('T', 1)[0],
'X-Amz-Content-SHA256': '0123456789',
'X-Amz-Content-SHA256': '0' * 64,
'X-Amz-Date': amz_date_header
})
sigv4_req = SigV4Request(
@ -942,18 +943,18 @@ class TestRequest(S3ApiTestCase):
# Virtual hosted-style
self.s3api.conf.storage_domains = ['s3.test.com']
# bad sha256
# bad sha256 -- but note that SHAs are not checked for GET/HEAD!
environ = {
'HTTP_HOST': 'bucket.s3.test.com',
'REQUEST_METHOD': 'GET'}
'REQUEST_METHOD': 'PUT'}
headers = {
'Authorization':
'AWS4-HMAC-SHA256 '
'Credential=test/20210104/us-east-1/s3/aws4_request, '
'SignedHeaders=host;x-amz-content-sha256;x-amz-date,'
'Signature=f721a7941d5b7710344bc62cc45f87e66f4bb1dd00d9075ee61'
'5b1a5c72b0f8c',
'X-Amz-Content-SHA256': 'bad',
'Signature=5f31c77dbc63e7c6ffc84dae60a9261c57c44884fe7927baeb9'
'84f418d4d511a',
'X-Amz-Content-SHA256': '0' * 64,
'Date': 'Mon, 04 Jan 2021 10:26:23 -0000',
'X-Amz-Date': '20210104T102623Z',
'Content-Length': 0,
@ -961,15 +962,15 @@ class TestRequest(S3ApiTestCase):
# lowercase sha256
req = Request.blank('/', environ=environ, headers=headers)
self.assertRaises(BadDigest, SigV4Request, req.environ)
self.assertRaises(XAmzContentSHA256Mismatch, SigV4Request, req.environ)
sha256_of_nothing = hashlib.sha256().hexdigest().encode('ascii')
headers = {
'Authorization':
'AWS4-HMAC-SHA256 '
'Credential=test/20210104/us-east-1/s3/aws4_request, '
'SignedHeaders=host;x-amz-content-sha256;x-amz-date,'
'Signature=d90542e8b4c0d2f803162040a948e8e51db00b62a59ffb16682'
'ef433718fde12',
'Signature=96df261d8f0b617b7c6368e0c5d96ee61f1ec84005e826ece65'
'c0e0f97eba945',
'X-Amz-Content-SHA256': sha256_of_nothing,
'Date': 'Mon, 04 Jan 2021 10:26:23 -0000',
'X-Amz-Date': '20210104T102623Z',
@ -981,14 +982,14 @@ class TestRequest(S3ApiTestCase):
sigv4_req._canonical_request().endswith(sha256_of_nothing))
self.assertTrue(sigv4_req.check_signature('secret'))
# uppercase sha256
# uppercase sha256 -- signature changes, but content's valid
headers = {
'Authorization':
'AWS4-HMAC-SHA256 '
'Credential=test/20210104/us-east-1/s3/aws4_request, '
'SignedHeaders=host;x-amz-content-sha256;x-amz-date,'
'Signature=4aab5102e58e9e40f331417d322465c24cac68a7ce77260e9bf'
'5ce9a6200862b',
'Signature=7a3c396fd6043fb397888e6f4d6acc294a99636ff0bb57b283d'
'9e075ed87fce2',
'X-Amz-Content-SHA256': sha256_of_nothing.upper(),
'Date': 'Mon, 04 Jan 2021 10:26:23 -0000',
'X-Amz-Date': '20210104T102623Z',
@ -1000,6 +1001,91 @@ class TestRequest(S3ApiTestCase):
sigv4_req._canonical_request().endswith(sha256_of_nothing.upper()))
self.assertTrue(sigv4_req.check_signature('secret'))
@patch.object(S3Request, '_validate_dates', lambda *a: None)
def test_v4_req_xmz_content_sha256_mismatch(self):
# Virtual hosted-style
def fake_app(environ, start_response):
environ['wsgi.input'].read()
self.s3api.conf.storage_domains = ['s3.test.com']
environ = {
'HTTP_HOST': 'bucket.s3.test.com',
'REQUEST_METHOD': 'PUT'}
sha256_of_body = hashlib.sha256(b'body').hexdigest()
headers = {
'Authorization':
'AWS4-HMAC-SHA256 '
'Credential=test/20210104/us-east-1/s3/aws4_request, '
'SignedHeaders=host;x-amz-date,'
'Signature=5f31c77dbc63e7c6ffc84dae60a9261c57c44884fe7927baeb9'
'84f418d4d511a',
'Date': 'Mon, 04 Jan 2021 10:26:23 -0000',
'X-Amz-Date': '20210104T102623Z',
'Content-Length': 4,
'X-Amz-Content-SHA256': sha256_of_body,
}
req = Request.blank('/', environ=environ, headers=headers,
body=b'not_body')
with self.assertRaises(XAmzContentSHA256Mismatch) as caught:
SigV4Request(req.environ).get_response(fake_app)
self.assertIn(b'<Code>XAmzContentSHA256Mismatch</Code>',
caught.exception.body)
self.assertIn(
('<ClientComputedContentSHA256>%s</ClientComputedContentSHA256>'
% sha256_of_body).encode('ascii'),
caught.exception.body)
self.assertIn(
('<S3ComputedContentSHA256>%s</S3ComputedContentSHA256>'
% hashlib.sha256(b'not_body').hexdigest()).encode('ascii'),
caught.exception.body)
@patch.object(S3Request, '_validate_dates', lambda *a: None)
def test_v4_req_xmz_content_sha256_missing(self):
# Virtual hosted-style
self.s3api.conf.storage_domains = ['s3.test.com']
environ = {
'HTTP_HOST': 'bucket.s3.test.com',
'REQUEST_METHOD': 'PUT'}
headers = {
'Authorization':
'AWS4-HMAC-SHA256 '
'Credential=test/20210104/us-east-1/s3/aws4_request, '
'SignedHeaders=host;x-amz-date,'
'Signature=5f31c77dbc63e7c6ffc84dae60a9261c57c44884fe7927baeb9'
'84f418d4d511a',
'Date': 'Mon, 04 Jan 2021 10:26:23 -0000',
'X-Amz-Date': '20210104T102623Z',
'Content-Length': 0,
}
req = Request.blank('/', environ=environ, headers=headers)
self.assertRaises(InvalidRequest, SigV4Request, req.environ)
@patch.object(S3Request, '_validate_dates', lambda *a: None)
def test_v4_req_x_mz_content_sha256_bad_format(self):
# Virtual hosted-style
self.s3api.conf.storage_domains = ['s3.test.com']
environ = {
'HTTP_HOST': 'bucket.s3.test.com',
'REQUEST_METHOD': 'PUT'}
headers = {
'Authorization':
'AWS4-HMAC-SHA256 '
'Credential=test/20210104/us-east-1/s3/aws4_request, '
'SignedHeaders=host;x-amz-date,'
'Signature=5f31c77dbc63e7c6ffc84dae60a9261c57c44884fe7927baeb9'
'84f418d4d511a',
'Date': 'Mon, 04 Jan 2021 10:26:23 -0000',
'X-Amz-Date': '20210104T102623Z',
'Content-Length': 0,
'X-Amz-Content-SHA256': '0' * 63 # too short
}
req = Request.blank('/', environ=environ, headers=headers)
self.assertRaises(InvalidArgument, SigV4Request, req.environ)
headers['X-Amz-Content-SHA256'] = '0' * 63 + 'x' # bad character
req = Request.blank('/', environ=environ, headers=headers)
self.assertRaises(InvalidArgument, SigV4Request, req.environ)
def test_validate_part_number(self):
sw_req = Request.blank('/nojunk',
environ={'REQUEST_METHOD': 'GET'},
@ -1113,7 +1199,7 @@ class TestSigV4Request(S3ApiTestCase):
x_amz_date = self.get_v4_amz_date_header()
headers = {
'Authorization': auth,
'X-Amz-Content-SHA256': '0123456789',
'X-Amz-Content-SHA256': '0' * 64,
'Date': self.get_date_header(),
'X-Amz-Date': x_amz_date}
req = Request.blank('/', environ=environ, headers=headers)
@ -1144,7 +1230,7 @@ class TestSigV4Request(S3ApiTestCase):
x_amz_date = self.get_v4_amz_date_header()
headers = {
'Authorization': auth,
'X-Amz-Content-SHA256': '0123456789',
'X-Amz-Content-SHA256': '0' * 64,
'Date': self.get_date_header(),
'X-Amz-Date': x_amz_date}
req = Request.blank('/', environ=environ, headers=headers)
@ -1202,7 +1288,7 @@ class TestSigV4Request(S3ApiTestCase):
x_amz_date = self.get_v4_amz_date_header()
params['X-Amz-Date'] = x_amz_date
signed_headers = {
'X-Amz-Content-SHA256': '0123456789',
'X-Amz-Content-SHA256': '0' * 64,
'Date': self.get_date_header(),
'X-Amz-Date': x_amz_date}
req = Request.blank('/', environ=environ, headers=signed_headers,
@ -1237,7 +1323,7 @@ class TestSigV4Request(S3ApiTestCase):
x_amz_date = self.get_v4_amz_date_header()
params['X-Amz-Date'] = x_amz_date
signed_headers = {
'X-Amz-Content-SHA256': '0123456789',
'X-Amz-Content-SHA256': '0' * 64,
'Date': self.get_date_header(),
'X-Amz-Date': x_amz_date}
req = Request.blank('/', environ=environ, headers=signed_headers,
@ -1294,7 +1380,7 @@ class TestSigV4Request(S3ApiTestCase):
'Signature=X' % self.get_v4_amz_date_header().split('T', 1)[0])
headers = {
'Authorization': auth,
'X-Amz-Content-SHA256': '0123456789',
'X-Amz-Content-SHA256': '0' * 64,
'Date': self.get_date_header(),
'X-Amz-Date': x_amz_date}
@ -1346,7 +1432,7 @@ class TestSigV4Request(S3ApiTestCase):
'Signature=X' % self.get_v4_amz_date_header().split('T', 1)[0])
headers = {
'Authorization': auth,
'X-Amz-Content-SHA256': '0123456789',
'X-Amz-Content-SHA256': '0' * 64,
'Date': self.get_date_header(),
'X-Amz-Date': x_amz_date}
@ -1438,10 +1524,12 @@ class TestHashingInput(S3ApiTestCase):
self.assertTrue(wrapped._input.closed)
def test_empty_bad_hash(self):
wrapped = HashingInput(BytesIO(b''), 0, hashlib.sha256, 'nope')
with self.assertRaises(S3InputSHA256Mismatch):
wrapped.read(3)
self.assertTrue(wrapped._input.closed)
_input = BytesIO(b'')
self.assertFalse(_input.closed)
with self.assertRaises(XAmzContentSHA256Mismatch):
# Don't even get a chance to try to read it
HashingInput(_input, 0, hashlib.sha256, 'nope')
self.assertTrue(_input.closed)
if __name__ == '__main__':