Multi-key KMS keymaster

Now that the trivial keymaster supports multiple keys, let's not
forget about the KMS/Barbican keymaster. Additional keys are
configured as:

    key_id_<secret_id> = <KMS unique identifier>

As with the trivial keymaster, the key to use for PUTs and POSTs is
specified with:

    active_root_secret_id = <secret_id>

Change-Id: I4f485dcb31e5bea511c4e539c54681091fc5bb1c
This commit is contained in:
Matthew Oliver 2018-08-14 03:09:35 +00:00 committed by Tim Burke
parent e58dce1e4e
commit fda3052a2d
2 changed files with 95 additions and 24 deletions

View File

@ -33,7 +33,8 @@ class KmsKeyMaster(BaseKeyMaster):
'user_id', 'user_domain_id', 'trust_id', 'user_id', 'user_domain_id', 'trust_id',
'domain_id', 'domain_name', 'project_id', 'domain_id', 'domain_name', 'project_id',
'project_domain_id', 'reauthenticate', 'project_domain_id', 'reauthenticate',
'auth_endpoint', 'api_class', 'key_id') 'auth_endpoint', 'api_class', 'key_id*',
'active_root_secret_id')
keymaster_conf_section = 'kms_keymaster' keymaster_conf_section = 'kms_keymaster'
def _get_root_secret(self, conf): def _get_root_secret(self, conf):
@ -70,28 +71,33 @@ class KmsKeyMaster(BaseKeyMaster):
) )
options.enable_logging() options.enable_logging()
manager = key_manager.API(oslo_conf) manager = key_manager.API(oslo_conf)
key = manager.get(ctxt, conf.get('key_id'))
if key is None: root_secrets = {}
raise ValueError("Retrieval of encryption root secret with key_id " for opt, secret_id, key_id in self._load_multikey_opts(
"'%s' returned None." % conf.get('key_id')) conf, 'key_id'):
try: key = manager.get(ctxt, key_id)
if (key.bit_length < 256) or (key.algorithm.lower() != "aes"): if key is None:
raise ValueError('encryption root secret stored in the ' raise ValueError("Retrieval of encryption root secret with "
'external KMS must be an AES key of at least ' "key_id '%s' returned None."
'256 bits (provided key length: %d, provided ' % (key_id, ))
'key algorithm: %s)' try:
% (key.bit_length, key.algorithm)) if (key.bit_length < 256) or (key.algorithm.lower() != "aes"):
if (key.format != 'RAW'): raise ValueError('encryption root secret stored in the '
raise ValueError('encryption root secret stored in the ' 'external KMS must be an AES key of at '
'external KMS must be in RAW format and not ' 'least 256 bits (provided key '
'e.g., as a base64 encoded string (format of ' 'length: %d, provided key algorithm: %s)'
'key with uuid %s: %s)' % % (key.bit_length, key.algorithm))
(conf.get('key_id'), key.format)) if (key.format != 'RAW'):
except Exception: raise ValueError('encryption root secret stored in the '
raise ValueError("Secret with key_id '%s' is not a symmetric key " 'external KMS must be in RAW format and '
"(type: %s)" % (conf.get('key_id'), 'not e.g., as a base64 encoded string '
str(type(key)))) '(format of key with uuid %s: %s)' %
return key.get_encoded() (key_id, key.format))
except Exception:
raise ValueError("Secret with key_id '%s' is not a symmetric "
"key (type: %s)" % (key_id, str(type(key))))
root_secrets[secret_id] = key.get_encoded()
return root_secrets
def filter_factory(global_conf, **local_conf): def filter_factory(global_conf, **local_conf):

View File

@ -129,7 +129,8 @@ class MockBarbicanKeyManager(object):
raise ValueError(ERR_MESSAGE_SECRET_INCORRECTLY_SPECIFIED) raise ValueError(ERR_MESSAGE_SECRET_INCORRECTLY_SPECIFIED)
elif key_id == TEST_KMS_NONE_KEY_ID: elif key_id == TEST_KMS_NONE_KEY_ID:
return None return None
return MockBarbicanKey(b'x' * 32, key_id) key_str = (str(key_id[0]) * 32).encode('utf8')
return MockBarbicanKey(key_str, key_id)
class MockBarbicanKey(object): class MockBarbicanKey(object):
@ -792,6 +793,70 @@ class TestKmsKeymaster(unittest.TestCase):
print("Unexpected error: %s" % sys.exc_info()[0]) print("Unexpected error: %s" % sys.exc_info()[0])
raise raise
@mock.patch('swift.common.middleware.crypto.kms_keymaster.'
'keystone_password.KeystonePassword', MockPassword)
@mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg')
@mock.patch('swift.common.middleware.crypto.kms_keymaster.options')
@mock.patch('swift.common.middleware.crypto.keymaster.readconf')
@mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager')
def test_get_root_secret_multiple_keys(
self, mock_castellan_key_manager, mock_readconf,
mock_castellan_options, mock_oslo_config,):
config = dict(TEST_KMS_KEYMASTER_CONF)
config.update({
'key_id_foo': 'foo-valid_kms_key_id-123456',
'key_id_bar': 'bar-valid_kms_key_id-123456',
'active_root_secret_id': 'foo'})
# Set side_effect functions.
mock_castellan_key_manager.API.side_effect = (
mock_castellan_api_side_effect)
mock_castellan_options.set_defaults.side_effect = (
mock_options_set_defaults_side_effect)
mock_oslo_config.ConfigOpts.side_effect = (
mock_config_opts_side_effect)
# Return valid Barbican configuration parameters.
mock_readconf.return_value = config
self.app = kms_keymaster.KmsKeyMaster(self.swift,
config)
expected_secrets = {
None: b'vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv',
'foo': b'ffffffffffffffffffffffffffffffff',
'bar': b'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb'}
self.assertDictEqual(self.app._root_secrets, expected_secrets)
self.assertEqual(self.app.active_secret_id, 'foo')
@mock.patch('swift.common.middleware.crypto.kms_keymaster.'
'keystone_password.KeystonePassword', MockPassword)
@mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg')
@mock.patch('swift.common.middleware.crypto.kms_keymaster.options')
@mock.patch('swift.common.middleware.crypto.keymaster.readconf')
@mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager')
def test_get_root_secret_legacy_key_id(
self, mock_castellan_key_manager, mock_readconf,
mock_castellan_options, mock_oslo_config):
# Set side_effect functions.
mock_castellan_key_manager.API.side_effect = (
mock_castellan_api_side_effect)
mock_castellan_options.set_defaults.side_effect = (
mock_options_set_defaults_side_effect)
mock_oslo_config.ConfigOpts.side_effect = (
mock_config_opts_side_effect)
# Return valid Barbican configuration parameters.
mock_readconf.return_value = TEST_KMS_KEYMASTER_CONF
self.app = kms_keymaster.KmsKeyMaster(self.swift,
TEST_KMS_KEYMASTER_CONF)
expected_secrets = {None: b'vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv'}
self.assertDictEqual(self.app._root_secrets, expected_secrets)
self.assertIsNone(self.app.active_secret_id)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()