From 893deca2743cc7590dbea90d5b0adfcf999bbc8b Mon Sep 17 00:00:00 2001 From: Clay Gerrard Date: Thu, 16 May 2024 10:44:19 -0500 Subject: [PATCH] test: more test for s3api v4 checksum Related-Change: I9924ff3b8d7d246631fe61b916823e028e2c01f2 Change-Id: I2a4581e408e3012a44bfb0cd58563b82040e23fc --- .../common/middleware/s3api/test_bucket.py | 48 +++++++++++++++++++ test/unit/common/middleware/s3api/test_obj.py | 30 ++++++++++++ 2 files changed, 78 insertions(+) diff --git a/test/unit/common/middleware/s3api/test_bucket.py b/test/unit/common/middleware/s3api/test_bucket.py index ac6c5ef2d8..ef24932584 100644 --- a/test/unit/common/middleware/s3api/test_bucket.py +++ b/test/unit/common/middleware/s3api/test_bucket.py @@ -15,6 +15,7 @@ import unittest import mock +from hashlib import sha256 import six from six.moves.urllib.parse import quote, parse_qsl @@ -1490,6 +1491,53 @@ class TestS3ApiBucketNoACL(BaseS3ApiBucket, S3ApiTestCase): status, headers, body = self.call_s3api(req) self.assertEqual(status.split()[0], '200', body) + def test_bucket_PUT_v4_with_body(self): + elem = Element('CreateBucketConfiguration') + SubElement(elem, 'LocationConstraint').text = self.s3api.conf.location + req_body = tostring(elem) + body_sha = sha256(req_body).hexdigest() + headers = { + 'Authorization': 'AWS4-HMAC-SHA256 ' + ', '.join([ + 'Credential=test:tester/%s/us-east-1/s3/aws4_request' % + self.get_v4_amz_date_header().split('T', 1)[0], + 'SignedHeaders=host', + 'Signature=X', + ]), + 'Date': self.get_date_header(), + 'x-amz-content-sha256': body_sha, + } + req = Request.blank('/bucket', + environ={'REQUEST_METHOD': 'PUT'}, + headers=headers, + body=req_body) + status, headers, body = self.call_s3api(req) + self.assertEqual(status.split()[0], '200', body) + + def test_bucket_PUT_v4_with_body_bad_hash(self): + elem = Element('CreateBucketConfiguration') + SubElement(elem, 'LocationConstraint').text = self.s3api.conf.location + req_body = tostring(elem) + headers = { + 'Authorization': 'AWS4-HMAC-SHA256 ' + ', '.join([ + 'Credential=test:tester/%s/us-east-1/s3/aws4_request' % + self.get_v4_amz_date_header().split('T', 1)[0], + 'SignedHeaders=host', + 'Signature=X', + ]), + 'Date': self.get_date_header(), + 'x-amz-content-sha256': 'not the hash', + } + req = Request.blank('/bucket', + environ={'REQUEST_METHOD': 'PUT'}, + headers=headers, + body=req_body) + status, headers, body = self.call_s3api(req) + self.assertEqual(status.split()[0], '400') + self.assertEqual(self._get_error_code(body), 'BadDigest') + self.assertIn(b'X-Amz-Content-SHA56', body) + # we maybe haven't parsed the location/path yet? + self.assertNotIn('swift.backend_path', req.environ) + def test_bucket_PUT_with_canned_acl(self): req = Request.blank('/bucket', environ={'REQUEST_METHOD': 'PUT'}, diff --git a/test/unit/common/middleware/s3api/test_obj.py b/test/unit/common/middleware/s3api/test_obj.py index e186e9c416..79d4819946 100644 --- a/test/unit/common/middleware/s3api/test_obj.py +++ b/test/unit/common/middleware/s3api/test_obj.py @@ -714,6 +714,35 @@ class BaseS3ApiObj(object): # Check that s3api converts a Content-MD5 header into an etag. self.assertEqual(headers['etag'], etag) + def test_object_PUT_bad_hash(self): + # FakeSwift doesn't care if the etag matches, so we explicitly register + # the 422 that the proxy would have passed on from the object servers + self.swift.register('PUT', '/v1/AUTH_test/bucket/object', + swob.HTTPUnprocessableEntity, {}, + 'Unprocessable Entity') + + bad_etag = md5(b'not-same-content').hexdigest() + content_md5 = binascii.b2a_base64(binascii.a2b_hex(bad_etag)).strip() + if not six.PY2: + content_md5 = content_md5.decode('ascii') + + req = Request.blank( + '/bucket/object', + environ={'REQUEST_METHOD': 'PUT'}, + headers={'Authorization': 'AWS test:tester:hmac', + 'x-amz-storage-class': 'STANDARD', + 'Content-MD5': content_md5, + 'Date': self.get_date_header()}, + body=self.object_body) + req.date = datetime.now() + req.content_type = 'text/plain' + status, headers, body = self.call_s3api(req) + self.assertEqual(status.split()[0], '400') + self.assertEqual(self._get_error_code(body), 'BadDigest') + self.assertIn(b'Content-MD5', body) + self.assertEqual('/v1/AUTH_test/bucket/object', + req.environ.get('swift.backend_path')) + def test_object_PUT_quota_exceeded(self): etag = self.response_headers['etag'] content_md5 = binascii.b2a_base64(binascii.a2b_hex(etag)).strip() @@ -802,6 +831,7 @@ class BaseS3ApiObj(object): status, headers, body = self.call_s3api(req) self.assertEqual(status.split()[0], '400') self.assertEqual(self._get_error_code(body), 'BadDigest') + self.assertIn(b'X-Amz-Content-SHA56', body) self.assertEqual('/v1/AUTH_test/bucket/object', req.environ.get('swift.backend_path'))