formpost: Add support for sha256/512 signatures

Sha1 has known to be deprecated for a while so allow the formpost
middleware to use SHA256 and SHA512. Follow the tempurl model and
accept signatures of the form:

   <hex-encoded signature>

or

   sha1:<base64-encoded signature>
   sha256:<base64-encoded signature>
   sha512:<base64-encoded signature>

where the base64-encoding can be either standard or URL-safe, and the
trailing '=' chars may be stripped off.

As part of this, pull the signature-parsing out to a new function, and
add detection for hex-encoded sha512 signatures to tempurl.

Change-Id: Iaba3725551bd47d75067a634a7571485b9afa2de
Related-Change: Ia9dd1a91cc3c9c946f5f029cdefc9e66bcf01046
Co-Authored-By: Tim Burke <tim.burke@gmail.com>
Closes-Bug: #1794601
This commit is contained in:
Matthew Oliver 2022-04-19 15:23:30 +10:00 committed by Tim Burke
parent 93c432342b
commit ef31baf3fc
7 changed files with 352 additions and 60 deletions

View File

@ -84,11 +84,11 @@ desired.
The expires attribute is the Unix timestamp before which the form
must be submitted before it is invalidated.
The signature attribute is the HMAC-SHA1 signature of the form. Here is
The signature attribute is the HMAC signature of the form. Here is
sample code for computing the signature::
import hmac
from hashlib import sha1
from hashlib import sha512
from time import time
path = '/v1/account/container/object_prefix'
redirect = 'https://srv.com/some-page' # set to '' if redirect not in form
@ -98,7 +98,7 @@ sample code for computing the signature::
key = 'mykey'
hmac_body = '%s\n%s\n%s\n%s\n%s' % (path, redirect,
max_file_size, max_file_count, expires)
signature = hmac.new(key, hmac_body, sha1).hexdigest()
signature = hmac.new(key, hmac_body, sha512).hexdigest()
The key is the value of either the account (X-Account-Meta-Temp-URL-Key,
X-Account-Meta-Temp-Url-Key-2) or the container
@ -123,7 +123,7 @@ the file are simply ignored).
__all__ = ['FormPost', 'filter_factory', 'READ_CHUNK_SIZE', 'MAX_VALUE_LENGTH']
import hmac
from hashlib import sha1
import hashlib
from time import time
import six
@ -131,10 +131,11 @@ from six.moves.urllib.parse import quote
from swift.common.constraints import valid_api_version
from swift.common.exceptions import MimeInvalid
from swift.common.middleware.tempurl import get_tempurl_keys_from_metadata
from swift.common.middleware.tempurl import get_tempurl_keys_from_metadata, \
SUPPORTED_DIGESTS
from swift.common.utils import streq_const_time, parse_content_disposition, \
parse_mime_headers, iter_multipart_mime_documents, reiterate, \
close_if_possible
close_if_possible, get_logger, extract_digest_and_algorithm
from swift.common.registry import register_swift_info
from swift.common.wsgi import make_pre_authed_env
from swift.common.swob import HTTPUnauthorized, wsgi_to_str, str_to_wsgi
@ -210,6 +211,11 @@ class FormPost(object):
self.app = app
#: The filter configuration dict.
self.conf = conf
# Defaulting to SUPPORTED_DIGESTS just so we don't completely
# deprecate sha1 yet. We'll change this to DEFAULT_ALLOWED_DIGESTS
# later.
self.allowed_digests = conf.get(
'allowed_digests', SUPPORTED_DIGESTS)
def __call__(self, env, start_response):
"""
@ -405,13 +411,22 @@ class FormPost(object):
hmac_body = hmac_body.encode('utf-8')
has_valid_sig = False
signature = attributes.get('signature', '')
try:
hash_algorithm, signature = extract_digest_and_algorithm(signature)
except ValueError:
raise FormUnauthorized('invalid signature')
if hash_algorithm not in self.allowed_digests:
raise FormUnauthorized('invalid signature')
if six.PY2:
hash_algorithm = getattr(hashlib, hash_algorithm)
for key in keys:
# Encode key like in swift.common.utls.get_hmac.
if not isinstance(key, six.binary_type):
key = key.encode('utf8')
sig = hmac.new(key, hmac_body, sha1).hexdigest()
if streq_const_time(sig, (attributes.get('signature') or
'invalid')):
sig = hmac.new(key, hmac_body, hash_algorithm).hexdigest()
if streq_const_time(sig, signature):
has_valid_sig = True
if not has_valid_sig:
raise FormUnauthorized('invalid signature')
@ -467,6 +482,18 @@ def filter_factory(global_conf, **local_conf):
conf = global_conf.copy()
conf.update(local_conf)
register_swift_info('formpost')
logger = get_logger(conf, log_route='formpost')
allowed_digests = set(conf.get('allowed_digests', '').split()) or \
SUPPORTED_DIGESTS
not_supported = allowed_digests - SUPPORTED_DIGESTS
if not_supported:
logger.warning('The following digest algorithms are configured but '
'not supported: %s', ', '.join(not_supported))
allowed_digests -= not_supported
if not allowed_digests:
raise ValueError('No valid digest algorithms are configured '
'for formpost')
info = {'allowed_digests': sorted(allowed_digests)}
register_swift_info('formpost', **info)
conf.update(info)
return lambda app: FormPost(app, conf)

View File

@ -298,7 +298,6 @@ __all__ = ['TempURL', 'filter_factory',
'DEFAULT_OUTGOING_REMOVE_HEADERS',
'DEFAULT_OUTGOING_ALLOW_HEADERS']
import binascii
from calendar import timegm
import six
from os.path import basename
@ -313,7 +312,7 @@ from swift.common.header_key_dict import HeaderKeyDict
from swift.common.swob import header_to_environ_key, HTTPUnauthorized, \
HTTPBadRequest, wsgi_to_str
from swift.common.utils import split_path, get_valid_utf8_str, \
get_hmac, streq_const_time, quote, get_logger, strict_b64decode
get_hmac, streq_const_time, quote, get_logger, extract_digest_and_algorithm
from swift.common.registry import register_swift_info, register_sensitive_param
@ -505,24 +504,11 @@ class TempURL(object):
if not temp_url_sig or not temp_url_expires:
return self._invalid(env, start_response)
if ':' in temp_url_sig:
hash_algorithm, temp_url_sig = temp_url_sig.split(':', 1)
if ('-' in temp_url_sig or '_' in temp_url_sig) and not (
'+' in temp_url_sig or '/' in temp_url_sig):
temp_url_sig = temp_url_sig.replace('-', '+').replace('_', '/')
try:
temp_url_sig = binascii.hexlify(strict_b64decode(
temp_url_sig + '=='))
if not six.PY2:
temp_url_sig = temp_url_sig.decode('ascii')
hash_algorithm, temp_url_sig = extract_digest_and_algorithm(
temp_url_sig)
except ValueError:
return self._invalid(env, start_response)
elif len(temp_url_sig) == 40:
hash_algorithm = 'sha1'
elif len(temp_url_sig) == 64:
hash_algorithm = 'sha256'
else:
return self._invalid(env, start_response)
if hash_algorithm not in self.allowed_digests:
return self._invalid(env, start_response)

View File

@ -282,6 +282,46 @@ except (InvalidHashPathConfigError, IOError):
pass
def extract_digest_and_algorithm(value):
"""
Returns a tuple of (digest_algorithm, hex_encoded_digest)
from a client-provided string of the form::
<hex-encoded digest>
or::
<algorithm>:<base64-encoded digest>
Note that hex-encoded strings must use one of sha1, sha256, or sha512.
:raises: ValueError on parse failures
"""
if ':' in value:
algo, value = value.split(':', 1)
# accept both standard and url-safe base64
if ('-' in value or '_' in value) and not (
'+' in value or '/' in value):
value = value.replace('-', '+').replace('_', '/')
value = binascii.hexlify(strict_b64decode(value + '=='))
if not six.PY2:
value = value.decode('ascii')
else:
try:
binascii.unhexlify(value) # make sure it decodes
except TypeError:
# This is just for py2
raise ValueError('Non-hexadecimal digit found')
algo = {
40: 'sha1',
64: 'sha256',
128: 'sha512',
}.get(len(value))
if not algo:
raise ValueError('Bad digest length')
return algo, value
def get_hmac(request_method, path, expires, key, digest="sha1",
ip_range=None):
"""

View File

@ -840,7 +840,7 @@ class TestTempurlAlgorithms(Base):
else:
raise ValueError('Unrecognized encoding: %r' % encoding)
def _do_test(self, digest, encoding, expect_failure=False):
def _do_test(self, digest, encoding):
expires = int(time()) + 86400
sig = self.get_sig(expires, digest, encoding)
@ -852,16 +852,6 @@ class TestTempurlAlgorithms(Base):
parms = {'temp_url_sig': sig, 'temp_url_expires': str(expires)}
if expect_failure:
with self.assertRaises(ResponseError):
self.env.obj.read(parms=parms, cfg={'no_auth_token': True})
self.assert_status([401])
# ditto for HEADs
with self.assertRaises(ResponseError):
self.env.obj.info(parms=parms, cfg={'no_auth_token': True})
self.assert_status([401])
else:
contents = self.env.obj.read(
parms=parms,
cfg={'no_auth_token': True})
@ -889,8 +879,7 @@ class TestTempurlAlgorithms(Base):
@requires_digest('sha512')
def test_sha512(self):
# 128 chars seems awfully long for a signature -- let's require base64
self._do_test('sha512', 'hex', expect_failure=True)
self._do_test('sha512', 'hex')
self._do_test('sha512', 'base64')
self._do_test('sha512', 'base64-no-padding')
self._do_test('sha512', 'url-safe-base64')

View File

@ -13,18 +13,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import hmac
import hashlib
import unittest
from hashlib import sha1
from time import time
import six
if six.PY3:
from unittest import mock
else:
import mock
from io import BytesIO
from swift.common.swob import Request, Response, wsgi_quote
from swift.common.middleware import tempauth, formpost
from swift.common.utils import split_path
from swift.common import registry
from swift.proxy.controllers.base import get_cache_key
from test.debug_logger import debug_logger
def hmac_msg(path, redirect, max_file_size, max_file_count, expires):
@ -163,11 +170,23 @@ class TestFormPost(unittest.TestCase):
'meta': meta}
def _make_sig_env_body(self, path, redirect, max_file_size, max_file_count,
expires, key, user_agent=True):
sig = hmac.new(
expires, key, user_agent=True, algorithm='sha512',
prefix=True):
alg_name = algorithm
if six.PY2:
algorithm = getattr(hashlib, algorithm)
mac = hmac.new(
key,
hmac_msg(path, redirect, max_file_size, max_file_count, expires),
sha1).hexdigest()
algorithm)
if prefix:
if six.PY2:
sig = alg_name + ':' + base64.b64encode(mac.digest())
else:
sig = alg_name + ':' + base64.b64encode(
mac.digest()).decode('ascii')
else:
sig = mac.hexdigest()
body = [
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="redirect"',
@ -297,7 +316,7 @@ class TestFormPost(unittest.TestCase):
sig = hmac.new(
key,
hmac_msg(path, redirect, max_file_size, max_file_count, expires),
sha1).hexdigest()
hashlib.sha512).hexdigest()
wsgi_input = '\r\n'.join([
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="redirect"',
@ -415,7 +434,7 @@ class TestFormPost(unittest.TestCase):
sig = hmac.new(
key,
hmac_msg(path, redirect, max_file_size, max_file_count, expires),
sha1).hexdigest()
hashlib.sha512).hexdigest()
wsgi_input = '\r\n'.join([
'-----------------------------168072824752491622650073',
'Content-Disposition: form-data; name="redirect"',
@ -532,7 +551,7 @@ class TestFormPost(unittest.TestCase):
sig = hmac.new(
key,
hmac_msg(path, redirect, max_file_size, max_file_count, expires),
sha1).hexdigest()
hashlib.sha512).hexdigest()
wsgi_input = '\r\n'.join([
'------WebKitFormBoundaryq3CFxUjfsDMu8XsA',
'Content-Disposition: form-data; name="redirect"',
@ -652,7 +671,7 @@ class TestFormPost(unittest.TestCase):
sig = hmac.new(
key,
hmac_msg(path, redirect, max_file_size, max_file_count, expires),
sha1).hexdigest()
hashlib.sha512).hexdigest()
wsgi_input = '\r\n'.join([
'-----------------------------7db20d93017c',
'Content-Disposition: form-data; name="redirect"',
@ -770,7 +789,7 @@ class TestFormPost(unittest.TestCase):
sig = hmac.new(
key,
hmac_msg(path, redirect, max_file_size, max_file_count, expires),
sha1).hexdigest()
hashlib.sha512).hexdigest()
wsgi_input = '\r\n'.join([
'--------------------------dea19ac8502ca805',
'Content-Disposition: form-data; name="redirect"',
@ -1459,6 +1478,78 @@ class TestFormPost(unittest.TestCase):
self.assertEqual(self.app.requests[0].body, b'Test File\nOne\n')
self.assertEqual(self.app.requests[1].body, b'Test\nFile\nTwo\n')
def test_prefixed_and_not_prefixed_sigs_good(self):
def do_test(digest, prefixed):
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', '', 1024, 10,
int(time() + 86400), key, algorithm=digest, prefix=prefixed)
env['wsgi.input'] = BytesIO(b'\r\n'.join(body))
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
env['swift.infocache'][get_cache_key(
'AUTH_test', 'container')] = {'meta': {}}
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '201 Created')
location = None
for h, v in headers:
if h.lower() == 'location':
location = v
self.assertIsNone(location)
self.assertIsNone(exc_info)
self.assertTrue(b'201 Created' in body)
self.assertEqual(len(self.app.requests), 2)
self.assertEqual(self.app.requests[0].body, b'Test File\nOne\n')
self.assertEqual(self.app.requests[1].body, b'Test\nFile\nTwo\n')
for digest in ('sha1', 'sha256', 'sha512'):
do_test(digest, True)
do_test(digest, False)
def test_prefixed_and_not_prefixed_sigs_unsupported(self):
def do_test(digest, prefixed):
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', '', 1024, 10,
int(time() + 86400), key, algorithm=digest, prefix=prefixed)
env['wsgi.input'] = BytesIO(b'\r\n'.join(body))
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
env['swift.infocache'][get_cache_key(
'AUTH_test', 'container')] = {'meta': {}}
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
def start_response(s, h, e=None):
status[0] = s
body = b''.join(self.formpost(env, start_response))
status = status[0]
self.assertEqual(status, '401 Unauthorized')
for digest in ('md5', 'sha224'):
do_test(digest, True)
do_test(digest, False)
def test_no_redirect_expired(self):
key = b'abc'
sig, env, body = self._make_sig_env_body(
@ -1559,6 +1650,51 @@ class TestFormPost(unittest.TestCase):
self.assertIsNone(exc_info)
self.assertTrue(b'FormPost: invalid starting boundary' in body)
def test_redirect_allowed_and_unsupported_digests(self):
def do_test(digest):
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', 'http://redirect', 1024, 10,
int(time() + 86400), key, algorithm=digest)
env['wsgi.input'] = BytesIO(b'\r\n'.join(body))
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
env['swift.infocache'][get_cache_key(
'AUTH_test', 'container')] = {'meta': {}}
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
return body, status[0], headers[0], exc_info[0]
for algorithm in ('sha1', 'sha256', 'sha512'):
body, status, headers, exc_info = do_test(algorithm)
self.assertEqual(status, '303 See Other')
location = None
for h, v in headers:
if h.lower() == 'location':
location = v
self.assertEqual(location, 'http://redirect?status=201&message=')
self.assertIsNone(exc_info)
self.assertTrue(location.encode('utf-8') in body)
self.assertEqual(len(self.app.requests), 2)
self.assertEqual(self.app.requests[0].body, b'Test File\nOne\n')
self.assertEqual(self.app.requests[1].body, b'Test\nFile\nTwo\n')
# unsupported
_body, status, _headers, _exc_info = do_test("md5")
self.assertEqual(status, '401 Unauthorized')
def test_no_v1(self):
key = b'abc'
sig, env, body = self._make_sig_env_body(
@ -2099,5 +2235,45 @@ class TestFormPost(unittest.TestCase):
self.assertFalse("Content-Encoding" in self.app.requests[2].headers)
class TestSwiftInfo(unittest.TestCase):
def setUp(self):
registry._swift_info = {}
registry._swift_admin_info = {}
def test_registered_defaults(self):
formpost.filter_factory({})
swift_info = registry.get_swift_info()
self.assertIn('formpost', swift_info)
info = swift_info['formpost']
self.assertIn('allowed_digests', info)
self.assertEqual(info['allowed_digests'], ['sha1', 'sha256', 'sha512'])
def test_non_default_methods(self):
logger = debug_logger()
with mock.patch('swift.common.middleware.formpost.get_logger',
return_value=logger):
formpost.filter_factory({
'allowed_digests': 'sha1 sha512 md5 not-a-valid-digest',
})
swift_info = registry.get_swift_info()
self.assertIn('formpost', swift_info)
info = swift_info['formpost']
self.assertIn('allowed_digests', info)
self.assertEqual(info['allowed_digests'], ['sha1', 'sha512'])
warning_lines = logger.get_lines_for_level('warning')
self.assertIn(
'The following digest algorithms are configured '
'but not supported:',
warning_lines[0])
self.assertIn('not-a-valid-digest', warning_lines[0])
self.assertIn('md5', warning_lines[0])
def test_bad_config(self):
with self.assertRaises(ValueError):
formpost.filter_factory({
'allowed_digests': 'md4',
})
if __name__ == '__main__':
unittest.main()

View File

@ -220,7 +220,8 @@ class TestTempURL(unittest.TestCase):
self.tempurl = tempurl.filter_factory({
'allowed_digests': 'sha1'})(self.auth)
sig = 'valid_sigs_will_be_exactly_40_characters'
# valid sig should be exactly 40 hex chars
sig = 'deadbeefdeadbeefdeadbeefdeadbeefdeadbeef'
expires = int(time() + 1000)
p_logging.access_logger.logger = debug_logger('fake')
resp = self._make_request(

View File

@ -3836,6 +3836,79 @@ cluster_dfw1 = http://dfw1.host/v1/
self.assertEqual(u'abc_%EC%9D%BC%EC%98%81',
utils.quote(u'abc_\uc77c\uc601'))
def test_extract_digest_and_algorithm(self):
self.assertEqual(
utils.extract_digest_and_algorithm(
'b17f6ff8da0e251737aa9e3ee69a881e3e092e2f'),
('sha1', 'b17f6ff8da0e251737aa9e3ee69a881e3e092e2f'))
self.assertEqual(
utils.extract_digest_and_algorithm(
'sha1:sw3eTSuFYrhJZGbDtGsrmsUFRGE='),
('sha1', 'b30dde4d2b8562b8496466c3b46b2b9ac5054461'))
# also good with '=' stripped
self.assertEqual(
utils.extract_digest_and_algorithm(
'sha1:sw3eTSuFYrhJZGbDtGsrmsUFRGE'),
('sha1', 'b30dde4d2b8562b8496466c3b46b2b9ac5054461'))
self.assertEqual(
utils.extract_digest_and_algorithm(
'b963712313cd4236696fb4c4cf11fc56'
'ff4158e0bcbf1d4424df147783fd1045'),
('sha256', 'b963712313cd4236696fb4c4cf11fc56'
'ff4158e0bcbf1d4424df147783fd1045'))
self.assertEqual(
utils.extract_digest_and_algorithm(
'sha256:uWNxIxPNQjZpb7TEzxH8Vv9BWOC8vx1EJN8Ud4P9EEU='),
('sha256', 'b963712313cd4236696fb4c4cf11fc56'
'ff4158e0bcbf1d4424df147783fd1045'))
self.assertEqual(
utils.extract_digest_and_algorithm(
'sha256:uWNxIxPNQjZpb7TEzxH8Vv9BWOC8vx1EJN8Ud4P9EEU'),
('sha256', 'b963712313cd4236696fb4c4cf11fc56'
'ff4158e0bcbf1d4424df147783fd1045'))
self.assertEqual(
utils.extract_digest_and_algorithm(
'26df3d9d59da574d6f8d359cb2620b1b'
'86737215c38c412dfee0a410acea1ac4'
'285ad0c37229ca74e715c443979da17d'
'3d77a97d2ac79cc5e395b05bfa4bdd30'),
('sha512', '26df3d9d59da574d6f8d359cb2620b1b'
'86737215c38c412dfee0a410acea1ac4'
'285ad0c37229ca74e715c443979da17d'
'3d77a97d2ac79cc5e395b05bfa4bdd30'))
self.assertEqual(
utils.extract_digest_and_algorithm(
'sha512:Jt89nVnaV01vjTWcsmILG4ZzchXDjEEt/uCkEKzq'
'GsQoWtDDcinKdOcVxEOXnaF9PXepfSrHnMXjlbBb+kvdMA=='),
('sha512', '26df3d9d59da574d6f8d359cb2620b1b'
'86737215c38c412dfee0a410acea1ac4'
'285ad0c37229ca74e715c443979da17d'
'3d77a97d2ac79cc5e395b05bfa4bdd30'))
self.assertEqual(
utils.extract_digest_and_algorithm(
'sha512:Jt89nVnaV01vjTWcsmILG4ZzchXDjEEt_uCkEKzq'
'GsQoWtDDcinKdOcVxEOXnaF9PXepfSrHnMXjlbBb-kvdMA'),
('sha512', '26df3d9d59da574d6f8d359cb2620b1b'
'86737215c38c412dfee0a410acea1ac4'
'285ad0c37229ca74e715c443979da17d'
'3d77a97d2ac79cc5e395b05bfa4bdd30'))
with self.assertRaises(ValueError):
utils.extract_digest_and_algorithm('')
with self.assertRaises(ValueError):
utils.extract_digest_and_algorithm(
'exactly_forty_chars_but_not_hex_encoded!')
# Too short (md5)
with self.assertRaises(ValueError):
utils.extract_digest_and_algorithm(
'd41d8cd98f00b204e9800998ecf8427e')
# but you can slip it in via the prefix notation!
self.assertEqual(
utils.extract_digest_and_algorithm('md5:1B2M2Y8AsgTpgAmY7PhCfg'),
('md5', 'd41d8cd98f00b204e9800998ecf8427e'))
def test_get_hmac(self):
self.assertEqual(
utils.get_hmac('GET', '/path', 1, 'abc'),