diff --git a/glance/common/signature_utils.py b/glance/common/signature_utils.py index 3e6d952a36..f8ca992a6f 100644 --- a/glance/common/signature_utils.py +++ b/glance/common/signature_utils.py @@ -21,6 +21,7 @@ import datetime from castellan import key_manager from cryptography import exceptions as crypto_exception from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives import hashes @@ -45,15 +46,20 @@ HASH_METHODS = { 'SHA-512': hashes.SHA512() } -# These are the currently supported signature key types -(RSA_PSS,) = ( - 'RSA-PSS', -) +# Currently supported signature key types +# RSA Options +RSA_PSS = 'RSA-PSS' -# This includes the supported public key type for the signature key type -SIGNATURE_KEY_TYPES = { - RSA_PSS: rsa.RSAPublicKey -} +# ECC curves -- note that only those with key sizes >=384 are included +# Note also that some of these may not be supported by the cryptography backend +ECC_CURVES = ( + ec.SECT571K1(), + ec.SECT409K1(), + ec.SECT571R1(), + ec.SECT409R1(), + ec.SECP521R1(), + ec.SECP384R1(), +) # These are the currently supported certificate formats (X_509,) = ( @@ -93,6 +99,43 @@ MASK_GEN_ALGORITHMS = { ) +class SignatureKeyType(object): + + _REGISTERED_TYPES = {} + + def __init__(self, name, public_key_type, create_verifier): + self.name = name + self.public_key_type = public_key_type + self.create_verifier = create_verifier + + @classmethod + def register(cls, name, public_key_type, create_verifier): + """Register a signature key type. + + :param name: the name of the signature key type + :param public_key_type: e.g. RSAPublicKey, DSAPublicKey, etc. + :param create_verifier: a function to create a verifier for this type + """ + cls._REGISTERED_TYPES[name] = cls(name, + public_key_type, + create_verifier) + + @classmethod + def lookup(cls, name): + """Look up the signature key type. + + :param name: the name of the signature key type + :returns: the SignatureKeyType object + :raises: glance.common.exception.SignatureVerificationError if + signature key type is invalid + """ + if name not in cls._REGISTERED_TYPES: + raise exception.SignatureVerificationError( + _('Invalid signature key type: %s') % name + ) + return cls._REGISTERED_TYPES[name] + + # each key type will require its own verifier def create_verifier_for_pss(signature, hash_method, public_key, image_properties): @@ -136,10 +179,32 @@ def create_verifier_for_pss(signature, hash_method, public_key, ) +def create_verifier_for_ecc(signature, hash_method, public_key, + image_properties): + """Create the verifier to use when the key type is ECC_*. + + :param signature: the decoded signature to use + :param hash_method: the hash method to use, as a cryptography object + :param public_key: the public key to use, as a cryptography object + :param image_properties: the key-value properties about the image + :return: the verifier to use to verify the signature for ECC_* + """ + # return the verifier + return public_key.verifier( + signature, + ec.ECDSA(hash_method) + ) + + # map the key type to the verifier function to use -KEY_TYPE_METHODS = { - RSA_PSS: create_verifier_for_pss -} +SignatureKeyType.register(RSA_PSS, rsa.RSAPublicKey, create_verifier_for_pss) + +# Register the elliptic curves which are supported by the backend +for curve in ECC_CURVES: + if default_backend().elliptic_curve_supported(curve): + SignatureKeyType.register('ECC_' + curve.name.upper(), + ec.EllipticCurvePublicKey, + create_verifier_for_ecc) def should_create_verifier(image_properties): @@ -175,7 +240,7 @@ def get_verifier(context, image_properties): signature = get_signature(image_properties[SIGNATURE]) hash_method = get_hash_method(image_properties[HASH_METHOD]) - signature_key_type = get_signature_key_type( + signature_key_type = SignatureKeyType.lookup( image_properties[KEY_TYPE]) public_key = get_public_key(context, image_properties[CERT_UUID], @@ -183,10 +248,10 @@ def get_verifier(context, image_properties): # create the verifier based on the signature key type try: - verifier = KEY_TYPE_METHODS[signature_key_type](signature, - hash_method, - public_key, - image_properties) + verifier = signature_key_type.create_verifier(signature, + hash_method, + public_key, + image_properties) except crypto_exception.UnsupportedAlgorithm as e: msg = (_LE("Unable to create verifier since algorithm is " "unsupported: %(e)s") @@ -249,7 +314,7 @@ def verify_signature(context, checksum_hash, image_properties): signature = get_signature(image_properties[OLD_SIGNATURE]) hash_method = get_hash_method(image_properties[OLD_HASH_METHOD]) - signature_key_type = get_signature_key_type( + signature_key_type = SignatureKeyType.lookup( image_properties[OLD_KEY_TYPE]) public_key = get_public_key(context, image_properties[OLD_CERT_UUID], @@ -257,10 +322,10 @@ def verify_signature(context, checksum_hash, image_properties): # create the verifier based on the signature key type try: - verifier = KEY_TYPE_METHODS[signature_key_type](signature, - hash_method, - public_key, - image_properties) + verifier = signature_key_type.create_verifier(signature, + hash_method, + public_key, + image_properties) except crypto_exception.UnsupportedAlgorithm as e: msg = (_LE("Unable to create verifier since algorithm is " "unsupported: %(e)s") @@ -315,27 +380,13 @@ def get_hash_method(hash_method_name): return HASH_METHODS[hash_method_name] -def get_signature_key_type(signature_key_type): - """Verify the signature key type. - - :param signature_key_type: the key type of the signature - :returns: the validated signature key type - :raises: SignatureVerificationError if the signature key type is invalid - """ - if signature_key_type not in SIGNATURE_KEY_TYPES: - raise exception.SignatureVerificationError( - 'Invalid signature key type: %s' % signature_key_type) - - return signature_key_type - - def get_public_key(context, signature_certificate_uuid, signature_key_type): """Create the public key object from a retrieved certificate. :param context: the user context for authentication :param signature_certificate_uuid: the uuid to use to retrieve the certificate - :param signature_key_type: the key type of the signature + :param signature_key_type: a SignatureKeyType object :returns: the public key cryptography object :raises: SignatureVerificationError if public key format is invalid """ @@ -346,10 +397,10 @@ def get_public_key(context, signature_certificate_uuid, signature_key_type): public_key = certificate.public_key() # Confirm the type is of the type expected based on the signature key type - if not isinstance(public_key, SIGNATURE_KEY_TYPES[signature_key_type]): + if not isinstance(public_key, signature_key_type.public_key_type): raise exception.SignatureVerificationError( 'Invalid public key type for signature key type: %s' - % signature_key_type) + % signature_key_type.name) return public_key diff --git a/glance/tests/unit/common/test_signature_utils.py b/glance/tests/unit/common/test_signature_utils.py index 7015ad84dd..323b807b95 100644 --- a/glance/tests/unit/common/test_signature_utils.py +++ b/glance/tests/unit/common/test_signature_utils.py @@ -20,6 +20,7 @@ import unittest from cryptography import exceptions as crypto_exception from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives import hashes @@ -29,9 +30,13 @@ from glance.common import exception from glance.common import signature_utils from glance.tests import utils as test_utils -TEST_PRIVATE_KEY = rsa.generate_private_key(public_exponent=3, - key_size=1024, - backend=default_backend()) +TEST_RSA_PRIVATE_KEY = rsa.generate_private_key(public_exponent=3, + key_size=1024, + backend=default_backend()) + +# secp521r1 is assumed to be available on all supported platforms +TEST_ECC_PRIVATE_KEY = ec.generate_private_key(ec.SECP521R1(), + default_backend()) # Required image property names (SIGNATURE, HASH_METHOD, KEY_TYPE, CERT_UUID) = ( @@ -90,7 +95,7 @@ class FakeCastellanCertificate(object): class FakeCryptoCertificate(object): - def __init__(self, pub_key=TEST_PRIVATE_KEY.public_key(), + def __init__(self, pub_key=TEST_RSA_PRIVATE_KEY.public_key(), not_valid_before=(datetime.datetime.utcnow() - datetime.timedelta(hours=1)), not_valid_after=(datetime.datetime.utcnow() + @@ -153,9 +158,9 @@ class TestSignatureUtils(test_utils.BaseTestCase): @mock.patch('glance.common.signature_utils.get_public_key') def test_old_verify_signature_PSS(self, mock_get_pub_key): checksum_hash = b'224626ae19824466f2a7f39ab7b80f7f' - mock_get_pub_key.return_value = TEST_PRIVATE_KEY.public_key() + mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key() for hash_name, hash_alg in signature_utils.HASH_METHODS.items(): - signer = TEST_PRIVATE_KEY.signer( + signer = TEST_RSA_PRIVATE_KEY.signer( padding.PSS( mgf=padding.MGF1(hash_alg), salt_length=padding.PSS.MAX_LENGTH @@ -180,10 +185,10 @@ class TestSignatureUtils(test_utils.BaseTestCase): @mock.patch('glance.common.signature_utils.get_public_key') def test_old_verify_signature_custom_PSS_salt(self, mock_get_pub_key): checksum_hash = b'224626ae19824466f2a7f39ab7b80f7f' - mock_get_pub_key.return_value = TEST_PRIVATE_KEY.public_key() + mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key() custom_salt_length = 32 for hash_name, hash_alg in signature_utils.HASH_METHODS.items(): - signer = TEST_PRIVATE_KEY.signer( + signer = TEST_RSA_PRIVATE_KEY.signer( padding.PSS( mgf=padding.MGF1(hash_alg), salt_length=custom_salt_length @@ -209,7 +214,7 @@ class TestSignatureUtils(test_utils.BaseTestCase): @mock.patch('glance.common.signature_utils.get_public_key') def test_old_verify_signature_bad_signature(self, mock_get_pub_key): checksum_hash = '224626ae19824466f2a7f39ab7b80f7f' - mock_get_pub_key.return_value = TEST_PRIVATE_KEY.public_key() + mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key() image_properties = {OLD_CERT_UUID: 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693', OLD_HASH_METHOD: 'SHA-256', @@ -236,7 +241,7 @@ class TestSignatureUtils(test_utils.BaseTestCase): @mock.patch('glance.common.signature_utils.get_public_key') def test_old_verify_signature_bad_sig_key_type(self, mock_get_pub_key): checksum_hash = '224626ae19824466f2a7f39ab7b80f7f' - mock_get_pub_key.return_value = TEST_PRIVATE_KEY.public_key() + mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key() image_properties = {OLD_CERT_UUID: 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693', OLD_HASH_METHOD: 'SHA-256', @@ -254,7 +259,7 @@ class TestSignatureUtils(test_utils.BaseTestCase): @mock.patch('glance.common.signature_utils.get_public_key') def test_old_verify_signature_RSA_no_mask_gen(self, mock_get_pub_key): checksum_hash = '224626ae19824466f2a7f39ab7b80f7f' - mock_get_pub_key.return_value = TEST_PRIVATE_KEY.public_key() + mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key() image_properties = {OLD_CERT_UUID: 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693', OLD_HASH_METHOD: 'SHA-256', @@ -269,7 +274,7 @@ class TestSignatureUtils(test_utils.BaseTestCase): @mock.patch('glance.common.signature_utils.get_public_key') def test_old_verify_signature_RSA_bad_mask_gen(self, mock_get_pub_key): checksum_hash = '224626ae19824466f2a7f39ab7b80f7f' - mock_get_pub_key.return_value = TEST_PRIVATE_KEY.public_key() + mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key() image_properties = {OLD_CERT_UUID: 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693', OLD_HASH_METHOD: 'SHA-256', @@ -285,7 +290,7 @@ class TestSignatureUtils(test_utils.BaseTestCase): @mock.patch('glance.common.signature_utils.get_public_key') def test_old_verify_signature_bad_pss_salt(self, mock_get_pub_key): checksum_hash = '224626ae19824466f2a7f39ab7b80f7f' - mock_get_pub_key.return_value = TEST_PRIVATE_KEY.public_key() + mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key() image_properties = {OLD_CERT_UUID: 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693', OLD_HASH_METHOD: 'SHA-256', @@ -320,7 +325,7 @@ class TestSignatureUtils(test_utils.BaseTestCase): def test_old_verify_signature_unsupported_algorithm(self, mock_get_pub_key): checksum_hash = '224626ae19824466f2a7f39ab7b80f7f' - public_key = TEST_PRIVATE_KEY.public_key() + public_key = TEST_RSA_PRIVATE_KEY.public_key() public_key.verifier = mock.MagicMock( side_effect=crypto_exception.UnsupportedAlgorithm( "When OpenSSL is older than 1.0.1 then only SHA1 is " @@ -368,9 +373,9 @@ class TestSignatureUtils(test_utils.BaseTestCase): @mock.patch('glance.common.signature_utils.get_public_key') def test_verify_signature_PSS(self, mock_get_pub_key): data = b'224626ae19824466f2a7f39ab7b80f7f' - mock_get_pub_key.return_value = TEST_PRIVATE_KEY.public_key() + mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key() for hash_name, hash_alg in signature_utils.HASH_METHODS.items(): - signer = TEST_PRIVATE_KEY.signer( + signer = TEST_RSA_PRIVATE_KEY.signer( padding.PSS( mgf=padding.MGF1(hash_alg), salt_length=padding.PSS.MAX_LENGTH @@ -388,12 +393,44 @@ class TestSignatureUtils(test_utils.BaseTestCase): verifier.update(data) verifier.verify() + @mock.patch('glance.common.signature_utils.get_public_key') + def test_verify_signature_ECC(self, mock_get_pub_key): + data = b'224626ae19824466f2a7f39ab7b80f7f' + # test every ECC curve + for curve in signature_utils.ECC_CURVES: + key_type_name = 'ECC_' + curve.name.upper() + try: + signature_utils.SignatureKeyType.lookup(key_type_name) + except exception.SignatureVerificationError: + import warnings + warnings.warn("ECC curve '%s' not supported" % curve.name) + continue + + # Create a private key to use + private_key = ec.generate_private_key(curve, + default_backend()) + mock_get_pub_key.return_value = private_key.public_key() + for hash_name, hash_alg in signature_utils.HASH_METHODS.items(): + signer = private_key.signer( + ec.ECDSA(hash_alg) + ) + signer.update(data) + signature = base64.b64encode(signer.finalize()) + image_props = {CERT_UUID: + 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693', + HASH_METHOD: hash_name, + KEY_TYPE: key_type_name, + SIGNATURE: signature} + verifier = signature_utils.get_verifier(None, image_props) + verifier.update(data) + verifier.verify() + @unittest.skipIf(not default_backend().hash_supported(hashes.SHA256()), "SHA-2 hash algorithms not supported by backend") @mock.patch('glance.common.signature_utils.get_public_key') def test_verify_signature_bad_signature(self, mock_get_pub_key): data = b'224626ae19824466f2a7f39ab7b80f7f' - mock_get_pub_key.return_value = TEST_PRIVATE_KEY.public_key() + mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key() image_properties = {CERT_UUID: 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693', HASH_METHOD: 'SHA-256', @@ -407,7 +444,7 @@ class TestSignatureUtils(test_utils.BaseTestCase): @mock.patch('glance.common.signature_utils.get_public_key') def test_verify_signature_unsupported_algorithm(self, mock_get_pub_key): - public_key = TEST_PRIVATE_KEY.public_key() + public_key = TEST_RSA_PRIVATE_KEY.public_key() public_key.verifier = mock.MagicMock( side_effect=crypto_exception.UnsupportedAlgorithm( "When OpenSSL is older than 1.0.1 then only SHA1 is " @@ -437,7 +474,7 @@ class TestSignatureUtils(test_utils.BaseTestCase): @mock.patch('glance.common.signature_utils.get_public_key') def test_verify_signature_bad_sig_key_type(self, mock_get_pub_key): - mock_get_pub_key.return_value = TEST_PRIVATE_KEY.public_key() + mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key() image_properties = {CERT_UUID: 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693', HASH_METHOD: 'SHA-256', @@ -485,33 +522,47 @@ class TestSignatureUtils(test_utils.BaseTestCase): 'Invalid signature hash method: .*', signature_utils.get_hash_method, 'SHA-2') - def test_get_signature_key_type(self): - for sig_format in signature_utils.SIGNATURE_KEY_TYPES: - result = signature_utils.get_signature_key_type(sig_format) - self.assertEqual(sig_format, result) + def test_get_signature_key_type_lookup(self): + for sig_format in ['RSA-PSS', 'ECC_SECT571K1']: + sig_key_type = signature_utils.SignatureKeyType.lookup(sig_format) + self.assertIsInstance(sig_key_type, + signature_utils.SignatureKeyType) + self.assertEqual(sig_format, sig_key_type.name) - def test_get_signature_key_type_fail(self): + def test_signature_key_type_lookup_fail(self): self.assertRaisesRegex(exception.SignatureVerificationError, 'Invalid signature key type: .*', - signature_utils.get_signature_key_type, + signature_utils.SignatureKeyType.lookup, 'RSB-PSS') @mock.patch('glance.common.signature_utils.get_certificate') - def test_get_public_key(self, mock_get_cert): + def test_get_public_key_rsa(self, mock_get_cert): fake_cert = FakeCryptoCertificate() mock_get_cert.return_value = fake_cert - result_pub_key = signature_utils.get_public_key(None, None, 'RSA-PSS') + sig_key_type = signature_utils.SignatureKeyType.lookup('RSA-PSS') + result_pub_key = signature_utils.get_public_key(None, None, + sig_key_type) + self.assertEqual(fake_cert.public_key(), result_pub_key) + + @mock.patch('glance.common.signature_utils.get_certificate') + def test_get_public_key_ecc(self, mock_get_cert): + fake_cert = FakeCryptoCertificate(TEST_ECC_PRIVATE_KEY.public_key()) + mock_get_cert.return_value = fake_cert + sig_key_type = signature_utils.SignatureKeyType.lookup('ECC_SECP521R1') + result_pub_key = signature_utils.get_public_key(None, None, + sig_key_type) self.assertEqual(fake_cert.public_key(), result_pub_key) @mock.patch('glance.common.signature_utils.get_certificate') def test_get_public_key_invalid_key(self, mock_get_certificate): bad_pub_key = 'A' * 256 mock_get_certificate.return_value = FakeCryptoCertificate(bad_pub_key) + sig_key_type = signature_utils.SignatureKeyType.lookup('RSA-PSS') self.assertRaisesRegex(exception.SignatureVerificationError, 'Invalid public key type for ' 'signature key type: .*', signature_utils.get_public_key, None, - None, 'RSA-PSS') + None, sig_key_type) @mock.patch('cryptography.x509.load_der_x509_certificate') @mock.patch('castellan.key_manager.API', return_value=FakeKeyManager())