Use swift3's check_signature function

This adds support for v4 while getting us out of needing to know
how signatures work.

Related-Change: Iafb6114c12deb9a40d0f8324611de27b48ed95f6
Change-Id: I14be2845101f6af8f73bc46a416c09e4b9449515
This commit is contained in:
Tim Burke 2017-03-03 20:56:39 +00:00
parent f3ef616dc6
commit f90ba1acb0
2 changed files with 53 additions and 24 deletions

View File

@ -18,8 +18,6 @@ from __future__ import print_function
from time import time from time import time
from traceback import format_exc from traceback import format_exc
from uuid import uuid4 from uuid import uuid4
from hashlib import sha1
import hmac
import base64 import base64
from eventlet import Timeout from eventlet import Timeout
@ -395,20 +393,21 @@ class TempAuth(object):
s3_auth_details = env.get('swift3.auth_details') s3_auth_details = env.get('swift3.auth_details')
if s3_auth_details: if s3_auth_details:
if 'check_signature' not in s3_auth_details:
self.logger.warning(
'Swift3 did not provide a check_signature function; '
'upgrade Swift3 if you want to use it with tempauth')
return None
account_user = s3_auth_details['access_key'] account_user = s3_auth_details['access_key']
signature_from_user = s3_auth_details['signature']
if account_user not in self.users: if account_user not in self.users:
return None return None
account, user = account_user.split(':', 1) user = self.users[account_user]
account_id = self.users[account_user]['url'].rsplit('/', 1)[-1] account = account_user.split(':', 1)[0]
path = env['PATH_INFO'] account_id = user['url'].rsplit('/', 1)[-1]
env['PATH_INFO'] = path.replace(account_user, account_id, 1) if not s3_auth_details['check_signature'](user['key']):
valid_signature = base64.encodestring(hmac.new(
self.users[account_user]['key'],
s3_auth_details['string_to_sign'],
sha1).digest()).strip()
if signature_from_user != valid_signature:
return None return None
env['PATH_INFO'] = env['PATH_INFO'].replace(
account_user, account_id, 1)
groups = self._get_user_groups(account, account_user, account_id) groups = self._get_user_groups(account, account_user, account_id)
return groups return groups

View File

@ -19,7 +19,6 @@ import unittest
from contextlib import contextmanager from contextlib import contextmanager
from base64 import b64encode from base64 import b64encode
from time import time from time import time
import mock
from swift.common.middleware import tempauth as auth from swift.common.middleware import tempauth as auth
from swift.common.middleware.acl import format_acl from swift.common.middleware.acl import format_acl
@ -265,27 +264,58 @@ class TestAuth(unittest.TestCase):
self.assertEqual(req.environ['swift.authorize'], self.assertEqual(req.environ['swift.authorize'],
local_auth.denied_response) local_auth.denied_response)
def test_auth_with_s3_authorization(self): def test_auth_with_s3_authorization_good(self):
local_app = FakeApp() local_app = FakeApp()
local_auth = auth.filter_factory( local_auth = auth.filter_factory(
{'user_s3_s3': 'secret .admin'})(local_app) {'user_s3_s3': 'secret .admin'})(local_app)
req = self._make_request('/v1/AUTH_s3', environ={ req = self._make_request('/v1/s3:s3', environ={
'swift3.auth_details': {
'access_key': 's3:s3',
'signature': b64encode('sig'),
'string_to_sign': 't',
'check_signature': lambda secret: True}})
resp = req.get_response(local_auth)
self.assertEqual(resp.status_int, 404)
self.assertEqual(local_app.calls, 1)
self.assertEqual(req.environ['PATH_INFO'], '/v1/AUTH_s3')
self.assertEqual(req.environ['swift.authorize'],
local_auth.authorize)
def test_auth_with_s3_authorization_invalid(self):
local_app = FakeApp()
local_auth = auth.filter_factory(
{'user_s3_s3': 'secret .admin'})(local_app)
req = self._make_request('/v1/s3:s3', environ={
'swift3.auth_details': {
'access_key': 's3:s3',
'signature': b64encode('sig'),
'string_to_sign': 't',
'check_signature': lambda secret: False}})
resp = req.get_response(local_auth)
self.assertEqual(resp.status_int, 401)
self.assertEqual(local_app.calls, 1)
self.assertEqual(req.environ['PATH_INFO'], '/v1/s3:s3')
self.assertEqual(req.environ['swift.authorize'],
local_auth.denied_response)
def test_auth_with_old_s3_details(self):
local_app = FakeApp()
local_auth = auth.filter_factory(
{'user_s3_s3': 'secret .admin'})(local_app)
req = self._make_request('/v1/s3:s3', environ={
'swift3.auth_details': { 'swift3.auth_details': {
'access_key': 's3:s3', 'access_key': 's3:s3',
'signature': b64encode('sig'), 'signature': b64encode('sig'),
'string_to_sign': 't'}}) 'string_to_sign': 't'}})
resp = req.get_response(local_auth)
with mock.patch('hmac.new') as hmac: self.assertEqual(resp.status_int, 401)
hmac.return_value.digest.return_value = 'sig'
resp = req.get_response(local_auth)
self.assertEqual(hmac.mock_calls, [
mock.call('secret', 't', mock.ANY),
mock.call().digest()])
self.assertEqual(resp.status_int, 404)
self.assertEqual(local_app.calls, 1) self.assertEqual(local_app.calls, 1)
self.assertEqual(req.environ['PATH_INFO'], '/v1/s3:s3')
self.assertEqual(req.environ['swift.authorize'], self.assertEqual(req.environ['swift.authorize'],
local_auth.authorize) local_auth.denied_response)
def test_auth_no_reseller_prefix_no_token(self): def test_auth_no_reseller_prefix_no_token(self):
# Check that normally we set up a call back to our authorize. # Check that normally we set up a call back to our authorize.