diff --git a/swift/common/middleware/crypto/keymaster.py b/swift/common/middleware/crypto/keymaster.py index a23179193f..423f3543b7 100644 --- a/swift/common/middleware/crypto/keymaster.py +++ b/swift/common/middleware/crypto/keymaster.py @@ -14,7 +14,6 @@ # limitations under the License. import hashlib import hmac -import os from swift.common.exceptions import UnknownSecretIdError from swift.common.middleware.crypto.crypto_utils import CRYPTO_KEY_CALLBACK @@ -48,8 +47,8 @@ class KeyMasterContext(WSGIContext): self._keys = {} self.alternate_fetch_keys = None - def _make_key_id(self, path, secret_id): - key_id = {'v': '1', 'path': path} + def _make_key_id(self, path, secret_id, version): + key_id = {'v': version, 'path': path} if secret_id: # stash secret_id so that decrypter can pass it back to get the # same keys @@ -75,22 +74,32 @@ class KeyMasterContext(WSGIContext): """ if key_id: secret_id = key_id.get('secret_id') + version = key_id['v'] + if version not in ('1', '2'): + raise ValueError('Unknown key_id version: %s' % version) else: secret_id = self.keymaster.active_secret_id - if secret_id in self._keys: - return self._keys[secret_id] + # v1 had a bug where we would claim the path was just the object + # name if the object started with a slash. Bump versions to + # establish that we can trust the path. + version = '2' + if (secret_id, version) in self._keys: + return self._keys[(secret_id, version)] keys = {} - account_path = os.path.join(os.sep, self.account) + account_path = '/' + self.account try: if self.container: - path = os.path.join(account_path, self.container) + path = account_path + '/' + self.container keys['container'] = self.keymaster.create_key( path, secret_id=secret_id) if self.obj: - path = os.path.join(path, self.obj) + if self.obj.startswith('/') and version == '1': + path = self.obj + else: + path = path + '/' + self.obj keys['object'] = self.keymaster.create_key( path, secret_id=secret_id) @@ -104,21 +113,21 @@ class KeyMasterContext(WSGIContext): # metadata had its keys generated. Currently we have no need to # do that, so we are simply persisting this information for # future use. - keys['id'] = self._make_key_id(path, secret_id) + keys['id'] = self._make_key_id(path, secret_id, version) # pass back a list of key id dicts for all other secret ids in # case the caller is interested, in which case the caller can # call this method again for different secret ids; this avoided # changing the return type of the callback or adding another # callback. Note that the caller should assume no knowledge of # the content of these key id dicts. - keys['all_ids'] = [self._make_key_id(path, id_) + keys['all_ids'] = [self._make_key_id(path, id_, version) for id_ in self.keymaster.root_secret_ids] if self.alternate_fetch_keys: alternate_keys = self.alternate_fetch_keys( key_id=None, *args, **kwargs) keys['all_ids'].extend(alternate_keys.get('all_ids', [])) - self._keys[secret_id] = keys + self._keys[(secret_id, version)] = keys return keys except UnknownSecretIdError: diff --git a/test/unit/common/middleware/crypto/test_encryption.py b/test/unit/common/middleware/crypto/test_encryption.py index 813977a249..11f37d4230 100644 --- a/test/unit/common/middleware/crypto/test_encryption.py +++ b/test/unit/common/middleware/crypto/test_encryption.py @@ -479,7 +479,7 @@ class TestCryptoPipelineChanges(unittest.TestCase): '/a/%s/%s' % (self.container_name, self.object_name), meta['key_id']['path']) self.assertIn('v', meta['key_id']) - self.assertEqual('1', meta['key_id']['v']) + self.assertEqual('2', meta['key_id']['v']) self.assertIn('cipher', meta) self.assertEqual(Crypto.cipher, meta['cipher']) diff --git a/test/unit/common/middleware/crypto/test_keymaster.py b/test/unit/common/middleware/crypto/test_keymaster.py index 0eb86bf47f..324bf0c58a 100644 --- a/test/unit/common/middleware/crypto/test_keymaster.py +++ b/test/unit/common/middleware/crypto/test_keymaster.py @@ -51,6 +51,8 @@ class TestKeymaster(unittest.TestCase): def test_object_path(self): self.verify_keys_for_path( '/a/c/o', expected_keys=('object', 'container')) + self.verify_keys_for_path( + '/a/c//o', expected_keys=('object', 'container')) def test_container_path(self): self.verify_keys_for_path( @@ -79,7 +81,7 @@ class TestKeymaster(unittest.TestCase): self.assertIn('id', keys) id = keys.pop('id') self.assertEqual(path, id['path']) - self.assertEqual('1', id['v']) + self.assertEqual('2', id['v']) keys.pop('all_ids') self.assertListEqual(sorted(expected_keys), sorted(keys.keys()), '%s %s got keys %r, but expected %r' @@ -203,7 +205,7 @@ class TestKeymaster(unittest.TestCase): keys = copy.deepcopy(req.environ[CRYPTO_KEY_CALLBACK](key_id=None)) self.assertIn('id', keys) self.assertEqual(keys.pop('id'), { - 'v': '1', + 'v': '2', 'path': '/a/c', 'secret_id': '22', }) @@ -227,7 +229,7 @@ class TestKeymaster(unittest.TestCase): at_least_one_old_style_id = True self.assertEqual(key_id, { 'path': '/a/c', - 'v': '1', + 'v': '2', }) self.assertTrue(at_least_one_old_style_id) self.assertEqual(len(all_keys), 3) @@ -245,7 +247,7 @@ class TestKeymaster(unittest.TestCase): keys = req.environ.get(CRYPTO_KEY_CALLBACK)(key_id=None) self.assertIn('id', keys) self.assertEqual(keys.pop('id'), { - 'v': '1', + 'v': '2', 'path': '/a/c/o', 'secret_id': '22', }) @@ -267,7 +269,7 @@ class TestKeymaster(unittest.TestCase): self.assertIn(key_id.pop('secret_id'), {'22', 'my_secret_id'}) self.assertEqual(key_id, { 'path': '/a/c/o', - 'v': '1', + 'v': '2', }) self.assertTrue(at_least_one_old_style_id) self.assertEqual(len(all_keys), 3) @@ -346,8 +348,9 @@ class TestKeymaster(unittest.TestCase): # secret_id passed to fetch_crypto_keys callback for secret_id in ('my_secret_id', None): - keys = self.verify_keys_for_path('/a/c/o', ('container', 'object'), - key_id={'secret_id': secret_id}) + keys = self.verify_keys_for_path( + '/a/c/o', ('container', 'object'), + key_id={'secret_id': secret_id, 'v': '2', 'path': '/a/c/o'}) expected_keys = { 'container': hmac.new(secrets[secret_id], b'/a/c', digestmod=hashlib.sha256).digest(), @@ -381,11 +384,11 @@ class TestKeymaster(unittest.TestCase): digestmod=hashlib.sha256).digest(), 'object': hmac.new(secrets['22'], b'/a/c/o', digestmod=hashlib.sha256).digest(), - 'id': {'path': '/a/c/o', 'secret_id': '22', 'v': '1'}, + 'id': {'path': '/a/c/o', 'secret_id': '22', 'v': '2'}, 'all_ids': [ - {'path': '/a/c/o', 'v': '1'}, - {'path': '/a/c/o', 'secret_id': '22', 'v': '1'}, - {'path': '/a/c/o', 'secret_id': 'my_secret_id', 'v': '1'}]} + {'path': '/a/c/o', 'v': '2'}, + {'path': '/a/c/o', 'secret_id': '22', 'v': '2'}, + {'path': '/a/c/o', 'secret_id': 'my_secret_id', 'v': '2'}]} self.assertEqual(expected_keys, keys) self.assertEqual([('/a/c', '22'), ('/a/c/o', '22')], calls) with mock.patch.object(self.app, 'create_key', mock_create_key): @@ -394,22 +397,91 @@ class TestKeymaster(unittest.TestCase): self.assertEqual([('/a/c', '22'), ('/a/c/o', '22')], calls) self.assertEqual(expected_keys, keys) with mock.patch.object(self.app, 'create_key', mock_create_key): - keys = context.fetch_crypto_keys(key_id={'secret_id': None}) + keys = context.fetch_crypto_keys(key_id={ + 'secret_id': None, 'v': '2', 'path': '/a/c/o'}) expected_keys = { 'container': hmac.new(secrets[None], b'/a/c', digestmod=hashlib.sha256).digest(), 'object': hmac.new(secrets[None], b'/a/c/o', digestmod=hashlib.sha256).digest(), - 'id': {'path': '/a/c/o', 'v': '1'}, + 'id': {'path': '/a/c/o', 'v': '2'}, 'all_ids': [ - {'path': '/a/c/o', 'v': '1'}, - {'path': '/a/c/o', 'secret_id': '22', 'v': '1'}, - {'path': '/a/c/o', 'secret_id': 'my_secret_id', 'v': '1'}]} + {'path': '/a/c/o', 'v': '2'}, + {'path': '/a/c/o', 'secret_id': '22', 'v': '2'}, + {'path': '/a/c/o', 'secret_id': 'my_secret_id', 'v': '2'}]} self.assertEqual(expected_keys, keys) self.assertEqual([('/a/c', '22'), ('/a/c/o', '22'), ('/a/c', None), ('/a/c/o', None)], calls) + def test_v1_keys(self): + secrets = {None: os.urandom(32), + '22': os.urandom(33)} + conf = {} + for secret_id, secret in secrets.items(): + opt = ('encryption_root_secret%s' % + (('_%s' % secret_id) if secret_id else '')) + conf[opt] = base64.b64encode(secret) + conf['active_root_secret_id'] = '22' + self.app = keymaster.KeyMaster(self.swift, conf) + orig_create_key = self.app.create_key + calls = [] + + def mock_create_key(path, secret_id=None): + calls.append((path, secret_id)) + return orig_create_key(path, secret_id) + + context = keymaster.KeyMasterContext(self.app, 'a', 'c', 'o') + for version in ('1', '2'): + with mock.patch.object(self.app, 'create_key', mock_create_key): + keys = context.fetch_crypto_keys(key_id={ + 'v': version, 'path': '/a/c/o'}) + expected_keys = { + 'container': hmac.new(secrets[None], b'/a/c', + digestmod=hashlib.sha256).digest(), + 'object': hmac.new(secrets[None], b'/a/c/o', + digestmod=hashlib.sha256).digest(), + 'id': {'path': '/a/c/o', 'v': version}, + 'all_ids': [ + {'path': '/a/c/o', 'v': version}, + {'path': '/a/c/o', 'secret_id': '22', 'v': version}]} + self.assertEqual(expected_keys, keys) + self.assertEqual([('/a/c', None), ('/a/c/o', None)], calls) + del calls[:] + + context = keymaster.KeyMasterContext(self.app, 'a', 'c', '/o') + with mock.patch.object(self.app, 'create_key', mock_create_key): + keys = context.fetch_crypto_keys(key_id={ + 'v': '1', 'path': '/o'}) + expected_keys = { + 'container': hmac.new(secrets[None], b'/a/c', + digestmod=hashlib.sha256).digest(), + 'object': hmac.new(secrets[None], b'/o', + digestmod=hashlib.sha256).digest(), + 'id': {'path': '/o', 'v': '1'}, + 'all_ids': [ + {'path': '/o', 'v': '1'}, + {'path': '/o', 'secret_id': '22', 'v': '1'}]} + self.assertEqual(expected_keys, keys) + self.assertEqual([('/a/c', None), ('/o', None)], calls) + del calls[:] + + context = keymaster.KeyMasterContext(self.app, 'a', 'c', '/o') + with mock.patch.object(self.app, 'create_key', mock_create_key): + keys = context.fetch_crypto_keys(key_id={ + 'v': '2', 'path': '/a/c//o'}) + expected_keys = { + 'container': hmac.new(secrets[None], b'/a/c', + digestmod=hashlib.sha256).digest(), + 'object': hmac.new(secrets[None], b'/a/c//o', + digestmod=hashlib.sha256).digest(), + 'id': {'path': '/a/c//o', 'v': '2'}, + 'all_ids': [ + {'path': '/a/c//o', 'v': '2'}, + {'path': '/a/c//o', 'secret_id': '22', 'v': '2'}]} + self.assertEqual(expected_keys, keys) + self.assertEqual([('/a/c', None), ('/a/c//o', None)], calls) + @mock.patch('swift.common.middleware.crypto.keymaster.readconf') def test_keymaster_config_path(self, mock_readconf): for secret in (os.urandom(32), os.urandom(33), os.urandom(50)):