Use cryptography to load PKCS12 certificates
... because implementation in PyOpenSSL has been derprecated, according to the following warning. ``` DeprecationWarning: PKCS#12 support in pyOpenSSL is deprecated. You should use the APIs in cryptography. ``` Closes-Bug: #2042787 Change-Id: Ic81e98c54c4bce100e3f44ff1a2fe6ce7b7f4256
This commit is contained in:
parent
51ae11032e
commit
d1d7fe7197
@ -18,7 +18,7 @@ Common classes for pkcs12 based certificate handling
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
from cryptography.hazmat.primitives import serialization
|
from cryptography.hazmat.primitives import serialization
|
||||||
from OpenSSL import crypto
|
from cryptography.hazmat.primitives.serialization import pkcs12
|
||||||
|
|
||||||
from octavia.certificates.common import cert
|
from octavia.certificates.common import cert
|
||||||
from octavia.common import exceptions
|
from octavia.common import exceptions
|
||||||
@ -28,21 +28,21 @@ class PKCS12Cert(cert.Cert):
|
|||||||
"""Representation of a Cert for local storage."""
|
"""Representation of a Cert for local storage."""
|
||||||
def __init__(self, certbag):
|
def __init__(self, certbag):
|
||||||
try:
|
try:
|
||||||
p12 = crypto.load_pkcs12(certbag)
|
p12 = pkcs12.load_pkcs12(certbag, None)
|
||||||
except crypto.Error as e:
|
except (TypeError, ValueError) as e:
|
||||||
raise exceptions.UnreadablePKCS12(error=str(e))
|
raise exceptions.UnreadablePKCS12(error=str(e))
|
||||||
self.certificate = p12.get_certificate()
|
self.certificate = p12.cert
|
||||||
self.intermediates = p12.get_ca_certificates()
|
self.intermediates = p12.additional_certs
|
||||||
self.private_key = p12.get_privatekey()
|
self.private_key = p12.key
|
||||||
|
|
||||||
def get_certificate(self):
|
def get_certificate(self):
|
||||||
return self.certificate.to_cryptography().public_bytes(
|
return self.certificate.certificate.public_bytes(
|
||||||
encoding=serialization.Encoding.PEM).strip()
|
encoding=serialization.Encoding.PEM).strip()
|
||||||
|
|
||||||
def get_intermediates(self):
|
def get_intermediates(self):
|
||||||
if self.intermediates:
|
if self.intermediates:
|
||||||
int_data = [
|
int_data = [
|
||||||
ic.to_cryptography().public_bytes(
|
ic.certificate.public_bytes(
|
||||||
encoding=serialization.Encoding.PEM).strip()
|
encoding=serialization.Encoding.PEM).strip()
|
||||||
for ic in self.intermediates
|
for ic in self.intermediates
|
||||||
]
|
]
|
||||||
@ -50,7 +50,7 @@ class PKCS12Cert(cert.Cert):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
def get_private_key(self):
|
def get_private_key(self):
|
||||||
return self.private_key.to_cryptography_key().private_bytes(
|
return self.private_key.private_bytes(
|
||||||
encoding=serialization.Encoding.PEM,
|
encoding=serialization.Encoding.PEM,
|
||||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||||
encryption_algorithm=serialization.NoEncryption()).strip()
|
encryption_algorithm=serialization.NoEncryption()).strip()
|
||||||
|
@ -17,8 +17,9 @@
|
|||||||
"""
|
"""
|
||||||
Cert manager implementation for Barbican using a single PKCS12 secret
|
Cert manager implementation for Barbican using a single PKCS12 secret
|
||||||
"""
|
"""
|
||||||
from OpenSSL import crypto
|
from cryptography.hazmat.primitives import serialization
|
||||||
|
from cryptography.hazmat.primitives.serialization import pkcs12 as c_pkcs12
|
||||||
|
from cryptography import x509
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
from oslo_utils import encodeutils
|
from oslo_utils import encodeutils
|
||||||
@ -64,25 +65,29 @@ class BarbicanCertManager(cert_mgr.CertManager):
|
|||||||
connection = self.auth.get_barbican_client(context.project_id)
|
connection = self.auth.get_barbican_client(context.project_id)
|
||||||
|
|
||||||
LOG.info("Storing certificate secret '%s' in Barbican.", name)
|
LOG.info("Storing certificate secret '%s' in Barbican.", name)
|
||||||
p12 = crypto.PKCS12()
|
|
||||||
p12.set_friendlyname(encodeutils.to_utf8(name))
|
|
||||||
x509_cert = crypto.load_certificate(crypto.FILETYPE_PEM, certificate)
|
|
||||||
p12.set_certificate(x509_cert)
|
|
||||||
x509_pk = crypto.load_privatekey(crypto.FILETYPE_PEM, private_key)
|
|
||||||
p12.set_privatekey(x509_pk)
|
|
||||||
if intermediates:
|
|
||||||
cert_ints = list(cert_parser.get_intermediates_pems(intermediates))
|
|
||||||
x509_ints = [
|
|
||||||
crypto.load_certificate(crypto.FILETYPE_PEM, ci)
|
|
||||||
for ci in cert_ints]
|
|
||||||
p12.set_ca_certificates(x509_ints)
|
|
||||||
if private_key_passphrase:
|
if private_key_passphrase:
|
||||||
raise exceptions.CertificateStorageException(
|
raise exceptions.CertificateStorageException(
|
||||||
"Passphrase protected PKCS12 certificates are not supported.")
|
"Passphrase protected PKCS12 certificates are not supported.")
|
||||||
|
|
||||||
|
x509_cert = x509.load_pem_x509_certificate(certificate)
|
||||||
|
x509_pk = serialization.load_pem_private_key(private_key, None)
|
||||||
|
cas = None
|
||||||
|
if intermediates:
|
||||||
|
cert_ints = list(cert_parser.get_intermediates_pems(intermediates))
|
||||||
|
cas = [
|
||||||
|
x509.load_pem_x509_certificate(ci)
|
||||||
|
for ci in cert_ints]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
certificate_secret = connection.secrets.create(
|
certificate_secret = connection.secrets.create(
|
||||||
payload=p12.export(),
|
payload=c_pkcs12.serialize_key_and_certificates(
|
||||||
|
name=encodeutils.safe_encode(name),
|
||||||
|
key=x509_pk,
|
||||||
|
cert=x509_cert,
|
||||||
|
cas=cas,
|
||||||
|
encryption_algorithm=serialization.NoEncryption()
|
||||||
|
),
|
||||||
expiration=expiration,
|
expiration=expiration,
|
||||||
name=name
|
name=name
|
||||||
)
|
)
|
||||||
|
@ -18,7 +18,8 @@ Cert manager implementation for Castellan
|
|||||||
"""
|
"""
|
||||||
from castellan.common.objects import opaque_data
|
from castellan.common.objects import opaque_data
|
||||||
from castellan import key_manager
|
from castellan import key_manager
|
||||||
from OpenSSL import crypto
|
from cryptography.hazmat.primitives import serialization
|
||||||
|
from cryptography.hazmat.primitives.serialization import pkcs12 as c_pkcs12
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
|
|
||||||
@ -41,16 +42,20 @@ class CastellanCertManager(cert_mgr.CertManager):
|
|||||||
def store_cert(self, context, certificate, private_key, intermediates=None,
|
def store_cert(self, context, certificate, private_key, intermediates=None,
|
||||||
private_key_passphrase=None, expiration=None,
|
private_key_passphrase=None, expiration=None,
|
||||||
name="PKCS12 Certificate Bundle"):
|
name="PKCS12 Certificate Bundle"):
|
||||||
p12 = crypto.PKCS12()
|
|
||||||
p12.set_certificate(certificate)
|
|
||||||
p12.set_privatekey(private_key)
|
|
||||||
if intermediates:
|
|
||||||
p12.set_ca_certificates(intermediates)
|
|
||||||
if private_key_passphrase:
|
if private_key_passphrase:
|
||||||
raise exceptions.CertificateStorageException(
|
raise exceptions.CertificateStorageException(
|
||||||
"Passphrases protected PKCS12 certificates are not supported.")
|
"Passphrases protected PKCS12 certificates are not supported.")
|
||||||
|
|
||||||
p12_data = opaque_data.OpaqueData(p12.export(), name=name)
|
p12_data = opaque_data.OpaqueData(
|
||||||
|
c_pkcs12.serialize_key_and_certificates(
|
||||||
|
name=None,
|
||||||
|
key=private_key,
|
||||||
|
cert=certificate,
|
||||||
|
cas=intermediates,
|
||||||
|
encryption_algorithm=serialization.NoEncryption()
|
||||||
|
),
|
||||||
|
name=name
|
||||||
|
)
|
||||||
self.manager.store(context, p12_data)
|
self.manager.store(context, p12_data)
|
||||||
|
|
||||||
def get_cert(self, context, cert_ref, resource_ref=None, check_only=False,
|
def get_cert(self, context, cert_ref, resource_ref=None, check_only=False,
|
||||||
|
@ -15,7 +15,6 @@ from unittest import mock
|
|||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
from barbicanclient.v1 import secrets
|
from barbicanclient.v1 import secrets
|
||||||
from OpenSSL import crypto
|
|
||||||
|
|
||||||
import octavia.certificates.common.barbican as barbican_common
|
import octavia.certificates.common.barbican as barbican_common
|
||||||
import octavia.certificates.common.cert as cert
|
import octavia.certificates.common.cert as cert
|
||||||
@ -138,10 +137,11 @@ class TestBarbicanManager(base.TestCase):
|
|||||||
sorted(data.get_intermediates()))
|
sorted(data.get_intermediates()))
|
||||||
self.assertIsNone(data.get_private_key_passphrase())
|
self.assertIsNone(data.get_private_key_passphrase())
|
||||||
|
|
||||||
@mock.patch('OpenSSL.crypto.load_pkcs12')
|
@mock.patch('cryptography.hazmat.primitives.serialization.pkcs12.'
|
||||||
|
'load_pkcs12')
|
||||||
def test_get_cert_bad_pkcs12(self, mock_load_pkcs12):
|
def test_get_cert_bad_pkcs12(self, mock_load_pkcs12):
|
||||||
|
|
||||||
mock_load_pkcs12.side_effect = [crypto.Error]
|
mock_load_pkcs12.side_effect = [ValueError]
|
||||||
|
|
||||||
# Mock out the client
|
# Mock out the client
|
||||||
self.bc.secrets.get.return_value = self.secret_pkcs12
|
self.bc.secrets.get.return_value = self.secret_pkcs12
|
||||||
|
@ -38,7 +38,6 @@ python-barbicanclient>=4.5.2 # Apache-2.0
|
|||||||
python-glanceclient>=2.8.0 # Apache-2.0
|
python-glanceclient>=2.8.0 # Apache-2.0
|
||||||
python-novaclient>=9.1.0 # Apache-2.0
|
python-novaclient>=9.1.0 # Apache-2.0
|
||||||
python-cinderclient>=3.3.0 # Apache-2.0
|
python-cinderclient>=3.3.0 # Apache-2.0
|
||||||
pyOpenSSL>=19.1.0 # Apache-2.0
|
|
||||||
WSME>=0.8.0 # MIT
|
WSME>=0.8.0 # MIT
|
||||||
Jinja2>=2.10 # BSD License (3 clause)
|
Jinja2>=2.10 # BSD License (3 clause)
|
||||||
taskflow>=4.4.0 # Apache-2.0
|
taskflow>=4.4.0 # Apache-2.0
|
||||||
|
Loading…
Reference in New Issue
Block a user