Merge "test: more test for s3api v4 checksum"

This commit is contained in:
Zuul 2024-05-18 00:34:18 +00:00 committed by Gerrit Code Review
commit d5c26bb690
2 changed files with 78 additions and 0 deletions

View File

@ -15,6 +15,7 @@
import unittest import unittest
import mock import mock
from hashlib import sha256
import six import six
from six.moves.urllib.parse import quote, parse_qsl from six.moves.urllib.parse import quote, parse_qsl
@ -1490,6 +1491,53 @@ class TestS3ApiBucketNoACL(BaseS3ApiBucket, S3ApiTestCase):
status, headers, body = self.call_s3api(req) status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '200', body) self.assertEqual(status.split()[0], '200', body)
def test_bucket_PUT_v4_with_body(self):
elem = Element('CreateBucketConfiguration')
SubElement(elem, 'LocationConstraint').text = self.s3api.conf.location
req_body = tostring(elem)
body_sha = sha256(req_body).hexdigest()
headers = {
'Authorization': 'AWS4-HMAC-SHA256 ' + ', '.join([
'Credential=test:tester/%s/us-east-1/s3/aws4_request' %
self.get_v4_amz_date_header().split('T', 1)[0],
'SignedHeaders=host',
'Signature=X',
]),
'Date': self.get_date_header(),
'x-amz-content-sha256': body_sha,
}
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'PUT'},
headers=headers,
body=req_body)
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '200', body)
def test_bucket_PUT_v4_with_body_bad_hash(self):
elem = Element('CreateBucketConfiguration')
SubElement(elem, 'LocationConstraint').text = self.s3api.conf.location
req_body = tostring(elem)
headers = {
'Authorization': 'AWS4-HMAC-SHA256 ' + ', '.join([
'Credential=test:tester/%s/us-east-1/s3/aws4_request' %
self.get_v4_amz_date_header().split('T', 1)[0],
'SignedHeaders=host',
'Signature=X',
]),
'Date': self.get_date_header(),
'x-amz-content-sha256': 'not the hash',
}
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'PUT'},
headers=headers,
body=req_body)
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '400')
self.assertEqual(self._get_error_code(body), 'BadDigest')
self.assertIn(b'X-Amz-Content-SHA56', body)
# we maybe haven't parsed the location/path yet?
self.assertNotIn('swift.backend_path', req.environ)
def test_bucket_PUT_with_canned_acl(self): def test_bucket_PUT_with_canned_acl(self):
req = Request.blank('/bucket', req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'PUT'}, environ={'REQUEST_METHOD': 'PUT'},

View File

@ -714,6 +714,35 @@ class BaseS3ApiObj(object):
# Check that s3api converts a Content-MD5 header into an etag. # Check that s3api converts a Content-MD5 header into an etag.
self.assertEqual(headers['etag'], etag) self.assertEqual(headers['etag'], etag)
def test_object_PUT_bad_hash(self):
# FakeSwift doesn't care if the etag matches, so we explicitly register
# the 422 that the proxy would have passed on from the object servers
self.swift.register('PUT', '/v1/AUTH_test/bucket/object',
swob.HTTPUnprocessableEntity, {},
'Unprocessable Entity')
bad_etag = md5(b'not-same-content').hexdigest()
content_md5 = binascii.b2a_base64(binascii.a2b_hex(bad_etag)).strip()
if not six.PY2:
content_md5 = content_md5.decode('ascii')
req = Request.blank(
'/bucket/object',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'x-amz-storage-class': 'STANDARD',
'Content-MD5': content_md5,
'Date': self.get_date_header()},
body=self.object_body)
req.date = datetime.now()
req.content_type = 'text/plain'
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '400')
self.assertEqual(self._get_error_code(body), 'BadDigest')
self.assertIn(b'Content-MD5', body)
self.assertEqual('/v1/AUTH_test/bucket/object',
req.environ.get('swift.backend_path'))
def test_object_PUT_quota_exceeded(self): def test_object_PUT_quota_exceeded(self):
etag = self.response_headers['etag'] etag = self.response_headers['etag']
content_md5 = binascii.b2a_base64(binascii.a2b_hex(etag)).strip() content_md5 = binascii.b2a_base64(binascii.a2b_hex(etag)).strip()
@ -802,6 +831,7 @@ class BaseS3ApiObj(object):
status, headers, body = self.call_s3api(req) status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '400') self.assertEqual(status.split()[0], '400')
self.assertEqual(self._get_error_code(body), 'BadDigest') self.assertEqual(self._get_error_code(body), 'BadDigest')
self.assertIn(b'X-Amz-Content-SHA56', body)
self.assertEqual('/v1/AUTH_test/bucket/object', self.assertEqual('/v1/AUTH_test/bucket/object',
req.environ.get('swift.backend_path')) req.environ.get('swift.backend_path'))