diff --git a/doc/source/overview_encryption.rst b/doc/source/overview_encryption.rst index 8cbee8132b..5ebbbc85fc 100644 --- a/doc/source/overview_encryption.rst +++ b/doc/source/overview_encryption.rst @@ -86,6 +86,11 @@ The keymaster config option ``encryption_root_secret`` MUST be set to a value of at least 44 valid base-64 characters before the middleware is used and should be consistent across all proxy servers. The minimum length of 44 has been chosen because it is the length of a base-64 encoded 32 byte value. +Alternatives to specifying the encryption root secret directly in the +`proxy-server.conf` file are storing it in a separate file, or storing it in +an :ref:`external key management system +` such as `Barbican +`_. .. note:: @@ -119,6 +124,135 @@ into GET and PUT requests by the :ref:`copy` middleware before reaching the encryption middleware and as a result object data and metadata is decrypted and re-encrypted when copied. +.. _encryption_root_secret_in_external_kms: + +Encryption Root Secret in External Key Management System +-------------------------------------------------------- + +The benefits of using +a dedicated system for storing the encryption root secret include the +auditing and access control infrastructure that are already in place in such a +system, and the fact that an encryption root secret stored in a key management +system (KMS) may be backed by a hardware security module (HSM) for additional +security. Another significant benefit of storing the root encryption secret in +an external KMS is that it is in this case never stored on a disk in the Swift +cluster. + +Make sure the required dependencies are installed for retrieving an encryption +root secret from an external KMS. This can be done when installing Swift (add +the ``-e`` flag to install as a development version) by changing to the Swift +directory and running the following command to install Swift together with +the ``kms_keymaster`` extra dependencies:: + + sudo pip install .[kms_keymaster] + +Another way to install the dependencies is by making sure the +following lines exist in the requirements.txt file, and installing them using +``pip install -r requirements.txt``:: + + cryptography>=1.6 # BSD/Apache-2.0 + castellan>=0.6.0 + +.. note:: + + If any of the required packages is already installed, the ``--upgrade`` + flag may be required for the ``pip`` commands in order for the required + minimum version to be installed. + +To make use of an encryption root secret stored in an external KMS, +replace the keymaster middleware with the kms_keymaster middleware in the +proxy server WSGI pipeline in `proxy-server.conf`, in the order shown in this +example:: + + kms_keymaster encryption proxy-logging proxy-server + +and add a section to the same file:: + + [filter:kms_keymaster] + use = egg:swift#kms_keymaster + keymaster_config_path = file_with_kms_keymaster_config + +Create or edit the file `file_with_kms_keymaster_config` referenced above. +For further details on the middleware configuration options, see the +`keymaster.conf-sample` file. An example of the content of this file, with +optional parameters omitted, is below:: + + [kms_keymaster] + key_id = changeme + username = swift + password = password + project_name = swift + auth_endpoint = http://keystonehost:5000/v3 + +The encryption root secret shall be created and stored in the external key +management system before it can be used by the keymaster. It shall be stored +as a symmetric key, with content type ``application/octet-stream``, +``base64`` content encoding, ``AES`` algorithm, bit length ``256``, and secret +type ``symmetric``. The mode ``ctr`` may also be stored for informational +purposes - it is not currently checked by the keymaster. + +The following command can be used to store the currently configured +``encryption_root_secret`` value from the `proxy-server.conf` file +in Barbican:: + + openstack secret store --name swift_root_secret \ + --payload-content-type="application/octet-stream" \ + --payload-content-encoding="base64" --algorithm aes --bit-length 256 \ + --mode ctr --secret-type symmetric --payload + +Alternatively, the existing root secret can also be stored in Barbican using +`curl `__. + +.. note:: + + The credentials used to store the secret in Barbican shall be the same + ones that the proxy server uses to retrieve the secret, i.e., the ones + configured in the `keymaster.conf` file. For clarity reasons the commands + shown here omit the credentials - they may be specified explicitly, or in + environment variables. + +Instead of using an existing root secret, Barbican can also be asked to +generate a new 256-bit root secret, with content type +``application/octet-stream`` and algorithm ``AES`` (the ``mode`` parameter is +currently optional):: + + openstack secret order create --name swift_root_secret \ + --payload-content-type="application/octet-stream" --algorithm aes \ + --bit-length 256 --mode ctr key + +The ``order create`` creates an asynchronous request to create the actual +secret. +The order can be retrieved using ``openstack secret order get``, and once the +order completes successfully, the output will show the key id of the generated +root secret. +Keys currently stored in Barbican can be listed using the +``openstack secret list`` command. + +.. note:: + + Both the order (the asynchronous request for creating or storing a secret), + and the actual secret itself, have similar unique identifiers. Once the + order has been completed, the key id is shown in the output of the ``order + get`` command. + +The keymaster uses the explicitly configured username and password (and +project name etc.) from the `keymaster.conf` file for retrieving the encryption +root secret from an external key management system. The `Castellan library +`_ is used to communicate with +Barbican. + +For the proxy server, reading the encryption root secret directly from the +`proxy-server.conf` file, from the `keymaster.conf` file pointed to +from the `proxy-server.conf` file, or from an external key management system +such as Barbican, are all functionally equivalent. In case reading the +encryption root secret from the external key management system fails, the +proxy server will not start up. If the encryption root secret is retrieved +successfully, it is cached in memory in the proxy server. + +For further details on the configuration options, see the +`[filter:kms_keymaster]` section in the `proxy-server.conf-sample` file, and +the `keymaster.conf-sample` file. + Upgrade Considerations ---------------------- diff --git a/etc/keymaster.conf-sample b/etc/keymaster.conf-sample new file mode 100644 index 0000000000..56f069f52e --- /dev/null +++ b/etc/keymaster.conf-sample @@ -0,0 +1,77 @@ +[keymaster] +# Sets the root secret from which encryption keys are derived. This must be set +# before first use to a value that is a base64 encoding of at least 32 bytes. +# The security of all encrypted data critically depends on this key, therefore +# it should be set to a high-entropy value. For example, a suitable value may +# be obtained by base-64 encoding a 32 byte (or longer) value generated by a +# cryptographically secure random number generator. Changing the root secret is +# likely to result in data loss. If this option is set, the root secret MUST +# NOT be set in proxy-server.conf. +# encryption_root_secret = changeme + +[kms_keymaster] +# The kms_keymaster section is used for configuring a keymaster that retrieves +# the encryption root secret from an external key management system (kms), +# using the Castellan abstraction layer. Castellan can support various kms +# backends that use Keystone for authentication. Currently, the only +# implemented backend is for Barbican. + +# The api_class tells Castellan which key manager to use to access the external +# key management system. The default value that accesses Barbican is +# castellan.key_manager.barbican_key_manager.BarbicanKeyManager. +# api_class = castellan.key_manager.barbican_key_manager.BarbicanKeyManager + +# The configuration options below apply to a Barbican KMS being accessed using +# Castellan. If another KMS type is used (by specifying another value for +# api_class), then other configuration options may be required. + +# The key_id is the identifier of the root secret stored in the KMS. For +# details of how to store an existing root secret in Barbican, or how to +# generate a new root secret in Barbican, see the 'overview_encryption' +# documentation. +# The key_id is the final part of the secret href returned in the +# output of an 'openstack secret order get' command after an order to store or +# create a key has been successfully completed. See the 'overview_encryption' +# documentation for more information on this command. +# key_id = changeme + +# The Keystone username of the user used to access the key from the KMS. The +# username shall be set to match an existing user. +# username = changeme + +# The password to go with the Keystone username above. +# password = changeme + +# The Keystone project name. For security reasons, it is recommended to set +# the project_name to a project separate from the service project used by +# other OpenStack services. Thereby, if another service is compromised, it will +# not have access to the Swift root encryption secret. It is recommended that +# the swift user is the only one that has a role in this project. +# project_name = changeme +# Instead of the project name, the project id may also be used. +# project_id = changeme + +# The Keystone URL to authenticate to. The value of auth_url may be +# set according to the value of auth_uri in [filter:authtoken] in +# proxy-server.conf. Currently, the only supported version of the Identity API +# is v3, which requires that the url end in "/v3". +# auth_endpoint = http://keystonehost:5000/v3 + +# The project and user domain names may optionally be specified. If they are +# not specified, the default values of 'Default' (for *_domain_name) and +# 'default' (for *_domain_id) are used (note the capitalization). +# project_domain_name = Default +# user_domain_name = Default +# Instead of the project domain name and user domain name, the project domain +# id and user domain id may also be specified. +# project_domain_id = default +# user_domain_id = default + +# The following configuration options may also be used in addition to/instead +# of the above options. Refer to the Keystone documentation for more details +# on the usage of the options: https://docs.openstack.org/keystone/ +# user_id = changeme +# trust_id = changeme +# reauthenticate = changeme +# domain_id = changeme +# domain_name = changeme diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample index ea1620a54c..c07c48ff35 100644 --- a/etc/proxy-server.conf-sample +++ b/etc/proxy-server.conf-sample @@ -889,6 +889,24 @@ encryption_root_secret = changeme # MUST NOT be set in proxy-server.conf. # keymaster_config_path = +# To store the encryption root secret in a remote key management system (KMS) +# such as Barbican, replace the keymaster middleware with the kms_keymaster +# middleware in the proxy-server pipeline. They should be to the right of all +# other middleware apart from the final proxy-logging middleware, and in the +# order shown in this example: +# kms_keymaster encryption proxy-logging proxy-server +[filter:kms_keymaster] +use = egg:swift#kms_keymaster + +# Sets the path from which the keymaster config options should be read. This +# allows multiple processes which need to be encryption-aware (for example, +# proxy-server and container-sync) to share the same config file, ensuring +# that the encryption keys used are the same. The format expected is similar +# to other config files, with a single [kms_keymaster] section. See the +# keymaster.conf-sample file for details on the kms_keymaster configuration +# options. +# keymaster_config_path = + [filter:encryption] use = egg:swift#encryption diff --git a/requirements.txt b/requirements.txt index 9f1582e16a..1001be5723 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,4 +10,4 @@ pastedeploy>=1.3.3 six>=1.9.0 xattr>=0.4 PyECLib>=1.3.1 # BSD -cryptography>=1.0,!=1.3.0 # BSD/Apache-2.0 +cryptography!=2.0,>=1.6 # BSD/Apache-2.0 diff --git a/setup.cfg b/setup.cfg index 735c518d71..e99d858108 100644 --- a/setup.cfg +++ b/setup.cfg @@ -63,6 +63,11 @@ scripts = bin/swift-ring-builder-analyzer bin/swift-temp-url +[extras] +kms_keymaster = + oslo.config>=4.0.0,!=4.3.0,!=4.4.0 # Apache-2.0 + castellan>=0.7.0 # Apache-2.0 + [entry_points] paste.app_factory = proxy = swift.proxy.server:app_factory @@ -100,6 +105,7 @@ paste.filter_factory = copy = swift.common.middleware.copy:filter_factory keymaster = swift.common.middleware.crypto.keymaster:filter_factory encryption = swift.common.middleware.crypto:filter_factory + kms_keymaster = swift.common.middleware.crypto.kms_keymaster:filter_factory [build_sphinx] all_files = 1 diff --git a/swift/common/middleware/crypto/keymaster.py b/swift/common/middleware/crypto/keymaster.py index f7a7dd6391..574b8a85ec 100644 --- a/swift/common/middleware/crypto/keymaster.py +++ b/swift/common/middleware/crypto/keymaster.py @@ -101,42 +101,61 @@ class KeyMasterContext(WSGIContext): class KeyMaster(object): """Middleware for providing encryption keys. - The middleware requires its ``encryption_root_secret`` option to be set. - This is the root secret from which encryption keys are derived. This must - be set before first use to a value that is a base64 encoding of at least 32 - bytes. The security of all encrypted data critically depends on this key, - therefore it should be set to a high-entropy value. For example, a suitable - value may be obtained by base-64 encoding a 32 byte (or longer) value - generated by a cryptographically secure random number generator. Changing - the root secret is likely to result in data loss. + The middleware requires its encryption root secret to be set. This is the + root secret from which encryption keys are derived. This must be set before + first use to a value that is at least 256 bits. The security of all + encrypted data critically depends on this key, therefore it should be set + to a high-entropy value. For example, a suitable value may be obtained by + generating a 32 byte (or longer) value using a cryptographically secure + random number generator. Changing the root secret is likely to result in + data loss. """ def __init__(self, app, conf): self.app = app + self.keymaster_config_path = conf.get('keymaster_config_path') + # The _get_root_secret() function is overridden by other keymasters + self.root_secret = self._get_root_secret(conf) - keymaster_config_path = conf.get('keymaster_config_path') - if keymaster_config_path: - if any(opt in conf for opt in ('encryption_root_secret',)): + def _get_root_secret(self, conf): + """ + This keymaster requires its ``encryption_root_secret`` option to be + set. This must be set before first use to a value that is a base64 + encoding of at least 32 bytes. The encryption root secret is stored + in either proxy-server.conf, or in an external file referenced from + proxy-server.conf using ``keymaster_config_path``. + + :param conf: the keymaster config section from proxy-server.conf + :type conf: dict + + :return: the encryption root secret binary bytes + :rtype: bytearray + """ + if self.keymaster_config_path: + keymaster_opts = ['encryption_root_secret'] + if any(opt in conf for opt in keymaster_opts): raise ValueError('keymaster_config_path is set, but there ' - 'are other config options specified!') - conf = readconf(keymaster_config_path, 'keymaster') - - self.root_secret = conf.get('encryption_root_secret') + 'are other config options specified: %s' % + ", ".join(list( + set(keymaster_opts).intersection(conf)))) + conf = readconf(self.keymaster_config_path, 'keymaster') + b64_root_secret = conf.get('encryption_root_secret') try: # b64decode will silently discard bad characters, but we should # treat them as an error - if not isinstance(self.root_secret, six.string_types) or any( + if not isinstance(b64_root_secret, six.string_types) or any( c not in string.digits + string.ascii_letters + '/+\r\n' - for c in self.root_secret.strip('\r\n=')): + for c in b64_root_secret.strip('\r\n=')): raise ValueError - self.root_secret = base64.b64decode(self.root_secret) - if len(self.root_secret) < 32: + binary_root_secret = base64.b64decode(b64_root_secret) + if len(binary_root_secret) < 32: raise ValueError + return binary_root_secret except (TypeError, ValueError): raise ValueError( 'encryption_root_secret option in %s must be a base64 ' 'encoding of at least 32 raw bytes' % ( - keymaster_config_path or 'proxy-server.conf')) + self.keymaster_config_path or 'proxy-server.conf')) def __call__(self, env, start_response): req = Request(env) diff --git a/swift/common/middleware/crypto/kms_keymaster.py b/swift/common/middleware/crypto/kms_keymaster.py new file mode 100644 index 0000000000..383953d809 --- /dev/null +++ b/swift/common/middleware/crypto/kms_keymaster.py @@ -0,0 +1,114 @@ +# Copyright (c) 2016 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from castellan import key_manager, options +from castellan.common.credentials import keystone_password +from oslo_config import cfg +from swift.common.middleware.crypto.keymaster import KeyMaster +from swift.common.utils import readconf + + +class KmsKeyMaster(KeyMaster): + """Middleware for retrieving a encryption root secret from an external KMS. + + The middleware accesses the encryption root secret from an external key + management system (KMS), e.g., a Barbican service, using Castellan. To be + able to do so, the appropriate configuration options shall be set in the + proxy-server.conf file, or in the configuration pointed to using the + keymaster_config_path configuration value in the proxy-server.conf file. + """ + + def __init__(self, app, conf): + # Call the superclass __init__() method, which calls the overridden + # self._get_root_secret() below. + super(KmsKeyMaster, self).__init__(app, conf) + + def _get_root_secret(self, conf): + """ + Retrieve the root encryption secret from an external key management + system using Castellan. + + :param conf: the keymaster config section from proxy-server.conf + :type conf: dict + + :return: the encryption root secret binary bytes + :rtype: bytearray + """ + if self.keymaster_config_path is not None: + keymaster_opts = ['username', 'password', 'project_name', + 'user_domain_name', 'project_domain_name', + 'user_id', 'user_domain_id', 'trust_id', + 'domain_id', 'domain_name', 'project_id', + 'project_domain_id', 'reauthenticate', + 'auth_endpoint', 'api_class', 'key_id'] + if any(opt in conf for opt in keymaster_opts): + raise ValueError('keymaster_config_path is set, but there ' + 'are other config options specified: %s' % + ", ".join(list( + set(keymaster_opts).intersection(conf)))) + conf = readconf(self.keymaster_config_path, 'kms_keymaster') + ctxt = keystone_password.KeystonePassword( + username=conf.get('username'), + password=conf.get('password'), + project_name=conf.get('project_name'), + user_domain_name=conf.get('user_domain_name'), + project_domain_name=conf.get( + 'project_domain_name'), + user_id=conf.get('user_id'), + user_domain_id=conf.get('user_domain_id'), + trust_id=conf.get('trust_id'), + domain_id=conf.get('domain_id'), + domain_name=conf.get('domain_name'), + project_id=conf.get('project_id'), + project_domain_id=conf.get('project_domain_id'), + reauthenticate=conf.get('reauthenticate')) + oslo_conf = cfg.ConfigOpts() + options.set_defaults( + oslo_conf, auth_endpoint=conf.get('auth_endpoint'), + api_class=conf.get('api_class') + ) + options.enable_logging() + manager = key_manager.API(oslo_conf) + key = manager.get(ctxt, conf.get('key_id')) + if key is None: + raise ValueError("Retrieval of encryption root secret with key_id " + "'%s' returned None." % conf.get('key_id')) + try: + if (key.bit_length < 256) or (key.algorithm.lower() != "aes"): + raise ValueError('encryption root secret stored in the ' + 'external KMS must be an AES key of at least ' + '256 bits (provided key length: %d, provided ' + 'key algorithm: %s)' + % (key.bit_length, key.algorithm)) + if (key.format != 'RAW'): + raise ValueError('encryption root secret stored in the ' + 'external KMS must be in RAW format and not ' + 'e.g., as a base64 encoded string (format of ' + 'key with uuid %s: %s)' % + (conf.get('key_id'), key.format)) + except Exception: + raise ValueError("Secret with key_id '%s' is not a symmetric key " + "(type: %s)" % (conf.get('key_id'), + str(type(key)))) + return key.get_encoded() + + +def filter_factory(global_conf, **local_conf): + conf = global_conf.copy() + conf.update(local_conf) + + def kms_keymaster_filter(app): + return KmsKeyMaster(app, conf) + + return kms_keymaster_filter diff --git a/test/functional/mock_swift_key_manager.py b/test/functional/mock_swift_key_manager.py new file mode 100644 index 0000000000..388b84d5a1 --- /dev/null +++ b/test/functional/mock_swift_key_manager.py @@ -0,0 +1,59 @@ +# Copyright (c) 2017 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from castellan.tests.unit.key_manager.mock_key_manager import MockKeyManager + + +class MockSwiftKeyManager(MockKeyManager): + """Mocking key manager for Swift functional tests. + + This mock key manager implementation extends the Castellan mock key + manager with support for a pre-existing key that the Swift proxy server + can use as the root encryption secret. The actual key material bytes + for the root encryption secret changes each time this mock key manager is + instantiated, meaning that data written earlier is no longer accessible + once the proxy server is restarted. + + To use this mock key manager instead of the default Barbican key manager, + set the following property in the [kms_keymaster] section in the + keymaster.conf configuration file pointed to using the + keymaster_config_path property in the [filter:kms_keymaster] section in the + proxy-server.conf file: + + api_class = test.functional.mock_swift_key_manager.MockSwiftKeyManager + + In case of a Python import error, make sure that the swift directory under + which this mock key manager resides is early in the sys.path, e.g., by + setting it in the PYTHONPATH environment variable before starting the + proxy server. + + This key manager is not suitable for use in production deployments. + """ + + def __init__(self, configuration=None): + super(MockSwiftKeyManager, self).__init__(configuration) + ''' + Create a new, random symmetric key for use as the encryption root + secret. + ''' + existing_key = self._generate_key(algorithm='AES', length=256) + ''' + Store the key under the UUID 'mock_key_manager_existing_key', from + where it can be retrieved by the proxy server. In the kms_keymaster + configuration, set the following property to use this key: + + key_id = mock_key_manager_existing_key + ''' + self.keys['mock_key_manager_existing_key'] = existing_key diff --git a/test/unit/common/middleware/crypto/test_keymaster.py b/test/unit/common/middleware/crypto/test_keymaster.py index 676de93e36..a92ee5308c 100644 --- a/test/unit/common/middleware/crypto/test_keymaster.py +++ b/test/unit/common/middleware/crypto/test_keymaster.py @@ -213,9 +213,11 @@ class TestKeymaster(unittest.TestCase): 'keymaster_config_path': '/ets/swift/keymaster.conf'} with self.assertRaises(ValueError) as err: keymaster.KeyMaster(self.swift, conf) - self.assertEqual('keymaster_config_path is set, but there are ' - 'other config options specified!', - err.exception.message) + expected_message = ('keymaster_config_path is set, but there are ' + 'other config options specified:') + self.assertTrue(err.exception.message.startswith(expected_message), + "Error message does not start with '%s'" % + expected_message) if __name__ == '__main__': diff --git a/test/unit/common/middleware/crypto/test_kms_keymaster.py b/test/unit/common/middleware/crypto/test_kms_keymaster.py new file mode 100644 index 0000000000..a2c2206e4f --- /dev/null +++ b/test/unit/common/middleware/crypto/test_kms_keymaster.py @@ -0,0 +1,783 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2015 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import base64 +import mock +import unittest +import sys +sys.modules['castellan'] = mock.Mock() +sys.modules['castellan.common'] = mock.Mock() +sys.modules['castellan.common.credentials'] = mock.Mock() + +from keystoneauth1.exceptions.connection import ConnectFailure +from keystoneauth1.exceptions.http import Unauthorized +from keystoneclient.exceptions import DiscoveryFailure +from swift.common.middleware.crypto import kms_keymaster +from swift.common.swob import Request +from test.unit.common.middleware.helpers import FakeSwift, FakeAppThatExcepts + +TEST_KMS_INVALID_KEY_ID = 'invalid-kms-key-id' +TEST_KMS_NONEXISTENT_KEY_ID = '11111111-1111-1111-1111-ffffffffffff' +TEST_KMS_OPAQUE_KEY_ID = '22222222-2222-2222-2222-aaaaaaaaaaaa' +TEST_KMS_SHORT_KEY_ID = '22222222-2222-2222-2222-bbbbbbbbbbbb' +TEST_KMS_DES_KEY_ID = '22222222-2222-2222-2222-cccccccccccc' +TEST_KMS_NONE_KEY_ID = '22222222-2222-2222-2222-dddddddddddd' +TEST_KMS_INVALID_API_VERSION = 'vBadVersion' +TEST_KMS_INVALID_USER_DOMAIN_NAME = "baduserdomainname" +TEST_KMS_CONNECT_FAILURE_URL = 'http://endpoint_url_connect_error:45621' +TEST_KMS_NON_BARBICAN_URL = 'http://endpoint_url_nonbarbican:45621' +TEST_PROXYSERVER_CONF_EXTERNAL_KEYMASTER_CONF = { + 'keymaster_config_path': 'PATH_TO_KEYMASTER_CONFIG_FILE', +} +TEST_KMS_KEYMASTER_CONF = { + 'auth_endpoint': 'kmsauthurlv3', + 'password': 'kmspass', + 'username': 'kmsuser', + 'user_domain_id': None, + 'user_domain_name': 'default', + 'project_id': None, + 'project_name': 'kmsproject', + 'project_domain_id': None, + 'project_domain_name': 'default', + 'key_id': 'valid_kms_key_id-abcdefg-123456' +} + + +def capture_start_response(): + calls = [] + + def start_response(*args): + calls.append(args) + return start_response, calls + + +def mock_castellan_api_side_effect(*args, **kwargs): + return MockBarbicanKeyManager(args[0]) + + +def mock_options_set_defaults_side_effect(*args, **kwargs): + ''' + Add options from kwargs into args dict. + ''' + args[0].update(kwargs) + + +def mock_config_opts_side_effect(*args, **kwargs): + return dict() + + +def mock_keystone_password_side_effect(username, password, project_name, + user_domain_name, project_domain_name, + user_id, user_domain_id, trust_id, + domain_id, domain_name, project_id, + project_domain_id, reauthenticate): + return MockPassword(username, password, project_name, user_domain_name, + project_domain_name, user_id, user_domain_id, trust_id, + domain_id, domain_name, project_id, project_domain_id, + reauthenticate) + +ERR_MESSAGE_SECRET_INCORRECTLY_SPECIFIED = 'Secret incorrectly specified.' +ERR_MESSAGE_KEY_UUID_NOT_FOUND = 'Key not found, uuid: ' + + +class MockBarbicanKeyManager(object): + def __init__(self, conf): + self.conf = conf + + def get(self, ctxt, key_id): + # If authentication fails, raise an exception here. + if (TEST_KMS_KEYMASTER_CONF['username'] != + ctxt.username + or TEST_KMS_KEYMASTER_CONF['password'] != + ctxt.password or + TEST_KMS_KEYMASTER_CONF['user_domain_name'] != + ctxt.user_domain_name): + raise Unauthorized( + message='The request you have made requires authentication.', + http_status=401) + elif self.conf['auth_endpoint'] == TEST_KMS_CONNECT_FAILURE_URL: + raise ConnectFailure('Unable to establish connection') + elif self.conf['auth_endpoint'] == TEST_KMS_NON_BARBICAN_URL: + raise DiscoveryFailure( + 'Could not determine a suitable URL for the plugin') + elif (self.conf['auth_endpoint'] != + TEST_KMS_KEYMASTER_CONF['auth_endpoint']): + raise Unauthorized( + message='Cannot authorize API client.') + elif (key_id == TEST_KMS_NONEXISTENT_KEY_ID): + message = ERR_MESSAGE_KEY_UUID_NOT_FOUND + key_id + ''' + Raising a ManagedObjectNotFoundError would require importing it + from castellan.common.exception. To avoid this import, raising a + general Exception. + ''' + raise Exception(message) + elif key_id == TEST_KMS_INVALID_KEY_ID: + raise ValueError(ERR_MESSAGE_SECRET_INCORRECTLY_SPECIFIED) + elif key_id == TEST_KMS_NONE_KEY_ID: + return None + return MockBarbicanKey(b'x' * 32, key_id) + + +class MockBarbicanKey(object): + def __init__(self, key_material, key_id): + self.key_material = key_material + self.bit_length = len(key_material) * 8 + if key_id == TEST_KMS_OPAQUE_KEY_ID: + self.format = 'Opaque' + else: + self.format = 'RAW' + self.algorithm = "aes" + if key_id == TEST_KMS_DES_KEY_ID: + self.format = 'des' + if key_id == TEST_KMS_SHORT_KEY_ID: + self.bit_length = 128 + self.key_material[:128] + + def get_encoded(self): + return self.key_material + + def format(self): + return self.format + + +class MockPassword(object): + def __init__(self, username, password, project_name, user_domain_name, + project_domain_name, user_id, user_domain_id, trust_id, + domain_id, domain_name, project_id, project_domain_id, + reauthenticate): + self.password = password + self.username = username + self.user_domain_name = user_domain_name + self.project_name = project_name + self.project_domain_name = project_domain_name + self.user_id = user_id, + self.user_domain_id = user_domain_id, + self.trust_id = trust_id, + self.domain_id = domain_id, + self.domain_name = domain_name, + self.project_id = project_id, + self.project_domain_id = project_domain_id, + self.reauthenticate = reauthenticate + + +class TestKmsKeymaster(unittest.TestCase): + """ + Unit tests for storing the encryption root secret in a Barbican external + key management system accessed using Castellan. + """ + + def setUp(self): + super(TestKmsKeymaster, self).setUp() + self.swift = FakeSwift() + + """ + Tests using the v3 Identity API, where all calls to Barbican are mocked. + """ + + @mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf') + @mock.patch.object(kms_keymaster.KmsKeyMaster, + '_get_root_secret') + def test_filter_v3(self, mock_get_root_secret_from_kms, + mock_readconf): + mock_get_root_secret_from_kms.return_value = ( + base64.b64encode(b'x' * 32)) + mock_readconf.return_value = TEST_KMS_KEYMASTER_CONF + factory = kms_keymaster.filter_factory(TEST_KMS_KEYMASTER_CONF) + self.assertTrue(callable(factory)) + self.assertTrue(callable(factory(self.swift))) + + @mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf') + @mock.patch.object(kms_keymaster.KmsKeyMaster, + '_get_root_secret') + def test_app_exception_v3(self, mock_get_root_secret_from_kms, + mock_readconf): + mock_get_root_secret_from_kms.return_value = ( + base64.b64encode(b'x' * 32)) + mock_readconf.return_value = TEST_KMS_KEYMASTER_CONF + app = kms_keymaster.KmsKeyMaster( + FakeAppThatExcepts(), TEST_KMS_KEYMASTER_CONF) + req = Request.blank('/', environ={'REQUEST_METHOD': 'PUT'}) + start_response, _ = capture_start_response() + self.assertRaises(Exception, app, req.environ, start_response) + + @mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf') + @mock.patch.object(kms_keymaster.KmsKeyMaster, '_get_root_secret') + def test_get_root_secret( + self, mock_get_root_secret_from_kms, mock_readconf): + # Successful call with coarse _get_root_secret_from_kms() mock. + mock_get_root_secret_from_kms.return_value = ( + base64.b64encode(b'x' * 32)) + ''' + Return valid Barbican configuration parameters. + ''' + mock_readconf.return_value = TEST_KMS_KEYMASTER_CONF + ''' + Verify that keys are derived correctly by the keymaster. + ''' + self.app = kms_keymaster.KmsKeyMaster(self.swift, + TEST_KMS_KEYMASTER_CONF) + ''' + Verify that _get_root_secret_from_kms() was called with the + correct parameters. + ''' + mock_get_root_secret_from_kms.assert_called_with( + TEST_KMS_KEYMASTER_CONF + ) + + @mock.patch('swift.common.middleware.crypto.kms_keymaster.' + 'keystone_password.KeystonePassword') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.options') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager') + def test_mocked_castellan_keymanager( + self, mock_castellan_key_manager, mock_readconf, + mock_castellan_options, mock_oslo_config, mock_keystone_password): + # Successful call with finer grained mocks. + mock_keystone_password.side_effect = ( + mock_keystone_password_side_effect) + ''' + Set side_effect functions. + ''' + mock_castellan_key_manager.API.side_effect = ( + mock_castellan_api_side_effect) + mock_castellan_options.set_defaults.side_effect = ( + mock_options_set_defaults_side_effect) + mock_oslo_config.ConfigOpts.side_effect = ( + mock_config_opts_side_effect) + ''' + Return valid Barbican configuration parameters. + ''' + mock_readconf.return_value = TEST_KMS_KEYMASTER_CONF + + ''' + Verify that no exceptions are raised by the mocked functions. + ''' + try: + self.app = kms_keymaster.KmsKeyMaster(self.swift, + TEST_KMS_KEYMASTER_CONF) + except Exception: + print("Unexpected error: %s" % sys.exc_info()[0]) + raise + + @mock.patch('swift.common.middleware.crypto.kms_keymaster.' + 'keystone_password.KeystonePassword') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.options') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager') + def test_mocked_castellan_keymanager_invalid_key_id( + self, mock_castellan_key_manager, mock_readconf, + mock_castellan_options, mock_oslo_config, + mock_keystone_password): + # Invalid key ID. + mock_keystone_password.side_effect = ( + mock_keystone_password_side_effect) + ''' + Set side_effect functions. + ''' + mock_castellan_key_manager.API.side_effect = ( + mock_castellan_api_side_effect) + mock_castellan_options.set_defaults.side_effect = ( + mock_options_set_defaults_side_effect) + mock_oslo_config.ConfigOpts.side_effect = ( + mock_config_opts_side_effect) + ''' + Return invalid Barbican configuration parameters. + ''' + kms_conf = dict(TEST_KMS_KEYMASTER_CONF) + kms_conf['key_id'] = TEST_KMS_INVALID_KEY_ID + mock_readconf.return_value = kms_conf + + ''' + Verify that an exception is raised by the mocked function. + ''' + try: + self.app = kms_keymaster.KmsKeyMaster( + self.swift, TEST_PROXYSERVER_CONF_EXTERNAL_KEYMASTER_CONF) + raise Exception('Success even though key id invalid') + except ValueError as e: + self.assertEqual(e.message, + ERR_MESSAGE_SECRET_INCORRECTLY_SPECIFIED) + except Exception: + print("Unexpected error: %s" % sys.exc_info()[0]) + raise + + @mock.patch('swift.common.middleware.crypto.kms_keymaster.' + 'keystone_password.KeystonePassword') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.options') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager') + def test_mocked_castellan_keymanager_nonexistent_key_id( + self, mock_castellan_key_manager, mock_readconf, + mock_castellan_options, mock_oslo_config, + mock_keystone_password): + # Nonexistent key. + mock_keystone_password.side_effect = ( + mock_keystone_password_side_effect) + ''' + Set side_effect functions. + ''' + mock_castellan_key_manager.API.side_effect = ( + mock_castellan_api_side_effect) + mock_castellan_options.set_defaults.side_effect = ( + mock_options_set_defaults_side_effect) + mock_oslo_config.ConfigOpts.side_effect = ( + mock_config_opts_side_effect) + ''' + Return invalid Barbican configuration parameters. + ''' + kms_conf = dict(TEST_KMS_KEYMASTER_CONF) + kms_conf['key_id'] = TEST_KMS_NONEXISTENT_KEY_ID + mock_readconf.return_value = kms_conf + + ''' + Verify that an exception is raised by the mocked function. + ''' + try: + self.app = kms_keymaster.KmsKeyMaster( + self.swift, TEST_PROXYSERVER_CONF_EXTERNAL_KEYMASTER_CONF) + raise Exception('Success even though key id invalid') + except Exception as e: + expected_message = ('Key not found, uuid: ' + + TEST_KMS_NONEXISTENT_KEY_ID) + self.assertEqual(e.message, expected_message) + + @mock.patch('swift.common.middleware.crypto.kms_keymaster.' + 'keystone_password.KeystonePassword') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.options') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager') + def test_mocked_castellan_keymanager_invalid_key_format( + self, mock_castellan_key_manager, mock_readconf, + mock_castellan_options, mock_oslo_config, + mock_keystone_password): + # Nonexistent key. + mock_keystone_password.side_effect = ( + mock_keystone_password_side_effect) + ''' + Set side_effect functions. + ''' + mock_castellan_key_manager.API.side_effect = ( + mock_castellan_api_side_effect) + mock_castellan_options.set_defaults.side_effect = ( + mock_options_set_defaults_side_effect) + mock_oslo_config.ConfigOpts.side_effect = ( + mock_config_opts_side_effect) + ''' + Return invalid Barbican configuration parameters. + ''' + kms_conf = dict(TEST_KMS_KEYMASTER_CONF) + kms_conf['key_id'] = TEST_KMS_OPAQUE_KEY_ID + mock_readconf.return_value = kms_conf + + ''' + Verify that an exception is raised by the mocked function. + ''' + try: + self.app = kms_keymaster.KmsKeyMaster( + self.swift, TEST_PROXYSERVER_CONF_EXTERNAL_KEYMASTER_CONF) + raise Exception('Success even though key format invalid') + except ValueError: + pass + except Exception: + print("Unexpected error: %s" % sys.exc_info()[0]) + raise + + @mock.patch('swift.common.middleware.crypto.kms_keymaster.' + 'keystone_password.KeystonePassword') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.options') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager') + def test_mocked_castellan_keymanager_config_file_and_params( + self, mock_castellan_key_manager, mock_readconf, + mock_castellan_options, mock_oslo_config, + mock_keystone_password): + # Both external config file and config parameters specified. + mock_keystone_password.side_effect = ( + mock_keystone_password_side_effect) + ''' + Set side_effect functions. + ''' + mock_castellan_key_manager.API.side_effect = ( + mock_castellan_api_side_effect) + mock_castellan_options.set_defaults.side_effect = ( + mock_options_set_defaults_side_effect) + mock_oslo_config.ConfigOpts.side_effect = ( + mock_config_opts_side_effect) + ''' + Return invalid Barbican configuration parameters. + ''' + kms_conf = dict(TEST_KMS_KEYMASTER_CONF) + kms_conf['keymaster_config_path'] = ( + 'PATH_TO_KEYMASTER_CONFIG_FILE' + ) + mock_readconf.return_value = kms_conf + + ''' + Verify that an exception is raised by the mocked function. + ''' + try: + self.app = kms_keymaster.KmsKeyMaster(self.swift, kms_conf) + raise Exception('Success even though config invalid') + except Exception as e: + expected_message = ('keymaster_config_path is set, but there are ' + 'other config options specified:') + self.assertTrue(e.message.startswith(expected_message), + "Error message does not start with '%s'" % + expected_message) + + @mock.patch('swift.common.middleware.crypto.kms_keymaster.' + 'keystone_password.KeystonePassword') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.options') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager') + def test_mocked_castellan_keymanager_invalid_username( + self, mock_castellan_key_manager, mock_readconf, + mock_castellan_options, mock_oslo_config, + mock_keystone_password): + # Invalid username. + mock_keystone_password.side_effect = ( + mock_keystone_password_side_effect) + ''' + Set side_effect functions. + ''' + mock_castellan_key_manager.API.side_effect = ( + mock_castellan_api_side_effect) + mock_castellan_options.set_defaults.side_effect = ( + mock_options_set_defaults_side_effect) + mock_oslo_config.ConfigOpts.side_effect = ( + mock_config_opts_side_effect) + ''' + Return invalid Barbican configuration parameters. + ''' + kms_conf = dict(TEST_KMS_KEYMASTER_CONF) + kms_conf['username'] = 'invaliduser' + mock_readconf.return_value = kms_conf + + ''' + Verify that an exception is raised by the mocked function. + ''' + try: + self.app = kms_keymaster.KmsKeyMaster( + self.swift, TEST_PROXYSERVER_CONF_EXTERNAL_KEYMASTER_CONF) + raise Exception('Success even though username invalid') + except Unauthorized as e: + self.assertEqual(e.http_status, 401) + except Exception: + print("Unexpected error: %s" % sys.exc_info()[0]) + raise + + @mock.patch('swift.common.middleware.crypto.kms_keymaster.' + 'keystone_password.KeystonePassword') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.options') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager') + def test_mocked_castellan_keymanager_invalid_password( + self, mock_castellan_key_manager, mock_readconf, + mock_castellan_options, mock_oslo_config, + mock_keystone_password): + # Invalid password. + mock_keystone_password.side_effect = ( + mock_keystone_password_side_effect) + ''' + Set side_effect functions. + ''' + mock_castellan_key_manager.API.side_effect = ( + mock_castellan_api_side_effect) + mock_castellan_options.set_defaults.side_effect = ( + mock_options_set_defaults_side_effect) + mock_oslo_config.ConfigOpts.side_effect = ( + mock_config_opts_side_effect) + ''' + Return invalid Barbican configuration parameters. + ''' + kms_conf = dict(TEST_KMS_KEYMASTER_CONF) + kms_conf['password'] = 'invalidpassword' + mock_readconf.return_value = kms_conf + + ''' + Verify that an exception is raised by the mocked function. + ''' + try: + self.app = kms_keymaster.KmsKeyMaster( + self.swift, TEST_PROXYSERVER_CONF_EXTERNAL_KEYMASTER_CONF) + raise Exception('Success even though password invalid') + except Unauthorized as e: + self.assertEqual(e.http_status, 401) + except Exception: + print("Unexpected error: %s" % sys.exc_info()[0]) + raise + + @mock.patch('swift.common.middleware.crypto.kms_keymaster.' + 'keystone_password.KeystonePassword') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.options') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager') + def test_mocked_castellan_keymanager_connect_failure_auth_url( + self, mock_castellan_key_manager, mock_readconf, + mock_castellan_options, mock_oslo_config, mock_keystone_password): + # Connect failure kms auth_url. + mock_keystone_password.side_effect = ( + mock_keystone_password_side_effect) + ''' + Set side_effect functions. + ''' + mock_castellan_key_manager.API.side_effect = ( + mock_castellan_api_side_effect) + mock_castellan_options.set_defaults.side_effect = ( + mock_options_set_defaults_side_effect) + mock_oslo_config.ConfigOpts.side_effect = ( + mock_config_opts_side_effect) + ''' + Return invalid Barbican configuration parameters. + ''' + kms_conf = dict(TEST_KMS_KEYMASTER_CONF) + kms_conf['auth_endpoint'] = TEST_KMS_CONNECT_FAILURE_URL + mock_readconf.return_value = kms_conf + + ''' + Verify that an exception is raised by the mocked function. + ''' + try: + self.app = kms_keymaster.KmsKeyMaster( + self.swift, TEST_PROXYSERVER_CONF_EXTERNAL_KEYMASTER_CONF) + raise Exception('Success even though auth_url invalid') + except ConnectFailure: + pass + except Exception: + print("Unexpected error: %s" % sys.exc_info()[0]) + raise + + @mock.patch('swift.common.middleware.crypto.kms_keymaster.' + 'keystone_password.KeystonePassword') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.options') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager') + def test_mocked_castellan_keymanager_bad_auth_url( + self, mock_castellan_key_manager, mock_readconf, + mock_castellan_options, mock_oslo_config, + mock_keystone_password): + # Bad kms auth_url. + mock_keystone_password.side_effect = ( + mock_keystone_password_side_effect) + ''' + Set side_effect functions. + ''' + mock_castellan_key_manager.API.side_effect = ( + mock_castellan_api_side_effect) + mock_castellan_options.set_defaults.side_effect = ( + mock_options_set_defaults_side_effect) + mock_oslo_config.ConfigOpts.side_effect = ( + mock_config_opts_side_effect) + ''' + Return invalid Barbican configuration parameters. + ''' + kms_conf = dict(TEST_KMS_KEYMASTER_CONF) + kms_conf['auth_endpoint'] = TEST_KMS_NON_BARBICAN_URL + mock_readconf.return_value = kms_conf + + ''' + Verify that an exception is raised by the mocked function. + ''' + try: + self.app = kms_keymaster.KmsKeyMaster( + self.swift, TEST_PROXYSERVER_CONF_EXTERNAL_KEYMASTER_CONF) + raise Exception('Success even though auth_url invalid') + except DiscoveryFailure: + pass + except Exception: + print("Unexpected error: %s" % sys.exc_info()[0]) + raise + + @mock.patch('swift.common.middleware.crypto.kms_keymaster.' + 'keystone_password.KeystonePassword') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.options') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager') + def test_mocked_castellan_keymanager_bad_user_domain_name( + self, mock_castellan_key_manager, mock_readconf, + mock_castellan_options, mock_oslo_config, mock_keystone_password): + # Bad user domain name with mocks. + mock_keystone_password.side_effect = ( + mock_keystone_password_side_effect) + ''' + Set side_effect functions. + ''' + mock_castellan_key_manager.API.side_effect = ( + mock_castellan_api_side_effect) + mock_castellan_options.set_defaults.side_effect = ( + mock_options_set_defaults_side_effect) + mock_oslo_config.ConfigOpts.side_effect = ( + mock_config_opts_side_effect) + ''' + Return invalid Barbican configuration parameters. + ''' + kms_conf = dict(TEST_KMS_KEYMASTER_CONF) + kms_conf['user_domain_name'] = ( + TEST_KMS_INVALID_USER_DOMAIN_NAME) + mock_readconf.return_value = kms_conf + + ''' + Verify that an exception is raised by the mocked function. + ''' + try: + self.app = kms_keymaster.KmsKeyMaster( + self.swift, TEST_PROXYSERVER_CONF_EXTERNAL_KEYMASTER_CONF) + raise Exception('Success even though api_version invalid') + except Unauthorized as e: + self.assertEqual(e.http_status, 401) + except Exception: + print("Unexpected error: %s" % sys.exc_info()[0]) + raise + + @mock.patch('swift.common.middleware.crypto.kms_keymaster.' + 'keystone_password.KeystonePassword') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.options') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager') + def test_mocked_castellan_keymanager_invalid_key_algorithm( + self, mock_castellan_key_manager, mock_readconf, + mock_castellan_options, mock_oslo_config, + mock_keystone_password): + # Nonexistent key. + mock_keystone_password.side_effect = ( + mock_keystone_password_side_effect) + ''' + Set side_effect functions. + ''' + mock_castellan_key_manager.API.side_effect = ( + mock_castellan_api_side_effect) + mock_castellan_options.set_defaults.side_effect = ( + mock_options_set_defaults_side_effect) + mock_oslo_config.ConfigOpts.side_effect = ( + mock_config_opts_side_effect) + ''' + Return invalid Barbican configuration parameters. + ''' + kms_conf = dict(TEST_KMS_KEYMASTER_CONF) + kms_conf['key_id'] = TEST_KMS_DES_KEY_ID + mock_readconf.return_value = kms_conf + + ''' + Verify that an exception is raised by the mocked function. + ''' + try: + self.app = kms_keymaster.KmsKeyMaster( + self.swift, TEST_PROXYSERVER_CONF_EXTERNAL_KEYMASTER_CONF) + raise Exception('Success even though key format invalid') + except ValueError: + pass + except Exception: + print("Unexpected error: %s" % sys.exc_info()[0]) + raise + + @mock.patch('swift.common.middleware.crypto.kms_keymaster.' + 'keystone_password.KeystonePassword') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.options') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager') + def test_mocked_castellan_keymanager_invalid_key_length( + self, mock_castellan_key_manager, mock_readconf, + mock_castellan_options, mock_oslo_config, + mock_keystone_password): + # Nonexistent key. + mock_keystone_password.side_effect = ( + mock_keystone_password_side_effect) + ''' + Set side_effect functions. + ''' + mock_castellan_key_manager.API.side_effect = ( + mock_castellan_api_side_effect) + mock_castellan_options.set_defaults.side_effect = ( + mock_options_set_defaults_side_effect) + mock_oslo_config.ConfigOpts.side_effect = ( + mock_config_opts_side_effect) + ''' + Return invalid Barbican configuration parameters. + ''' + kms_conf = dict(TEST_KMS_KEYMASTER_CONF) + kms_conf['key_id'] = TEST_KMS_SHORT_KEY_ID + mock_readconf.return_value = kms_conf + + ''' + Verify that an exception is raised by the mocked function. + ''' + try: + self.app = kms_keymaster.KmsKeyMaster( + self.swift, TEST_PROXYSERVER_CONF_EXTERNAL_KEYMASTER_CONF) + raise Exception('Success even though key format invalid') + except ValueError: + pass + except Exception: + print("Unexpected error: %s" % sys.exc_info()[0]) + raise + + @mock.patch('swift.common.middleware.crypto.kms_keymaster.' + 'keystone_password.KeystonePassword') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.options') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf') + @mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager') + def test_mocked_castellan_keymanager_none_key( + self, mock_castellan_key_manager, mock_readconf, + mock_castellan_options, mock_oslo_config, + mock_keystone_password): + # Nonexistent key. + mock_keystone_password.side_effect = ( + mock_keystone_password_side_effect) + ''' + Set side_effect functions. + ''' + mock_castellan_key_manager.API.side_effect = ( + mock_castellan_api_side_effect) + mock_castellan_options.set_defaults.side_effect = ( + mock_options_set_defaults_side_effect) + mock_oslo_config.ConfigOpts.side_effect = ( + mock_config_opts_side_effect) + ''' + Return invalid Barbican configuration parameters. + ''' + kms_conf = dict(TEST_KMS_KEYMASTER_CONF) + kms_conf['key_id'] = TEST_KMS_NONE_KEY_ID + mock_readconf.return_value = kms_conf + + ''' + Verify that an exception is raised by the mocked function. + ''' + try: + self.app = kms_keymaster.KmsKeyMaster( + self.swift, TEST_PROXYSERVER_CONF_EXTERNAL_KEYMASTER_CONF) + raise Exception('Success even though None key returned') + except ValueError: + pass + except Exception: + print("Unexpected error: %s" % sys.exc_info()[0]) + raise + + +if __name__ == '__main__': + unittest.main()