Merge "Update certificate generator implementations"

This commit is contained in:
Jenkins 2015-01-14 19:03:49 +00:00 committed by Gerrit Code Review
commit 88cdb78acc
4 changed files with 204 additions and 12 deletions

View File

@ -16,18 +16,24 @@
""" """
Cert generator implementation for Barbican Cert generator implementation for Barbican
""" """
import random
import time
from octavia.certificates.common import barbican as barbican_common
from octavia.certificates.generator import cert_gen from octavia.certificates.generator import cert_gen
from octavia.openstack.common import log as logging from octavia.openstack.common import log as logging
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
MAX_ATTEMPTS = 10
class BarbicanCertGenerator(cert_gen.CertGenerator): class BarbicanCertGenerator(cert_gen.CertGenerator):
"""Certificate Generator that wraps the Barbican client API.""" """Certificate Generator that wraps the Barbican client API."""
@staticmethod @classmethod
def sign_cert(csr, validity): def sign_cert(cls, csr, validity):
"""Signs a certificate using our private CA based on the specified CSR. """Signs a certificate using our private CA based on the specified CSR.
:param csr: A Certificate Signing Request :param csr: A Certificate Signing Request
@ -37,3 +43,68 @@ class BarbicanCertGenerator(cert_gen.CertGenerator):
:raises Exception: if certificate signing fails :raises Exception: if certificate signing fails
""" """
raise NotImplementedError("Barbican does not yet support signing.") raise NotImplementedError("Barbican does not yet support signing.")
@classmethod
def _generate_private_key(cls, bit_length=2048, passphrase=None,
create_only=False):
"""Generates a private key
:param bit_length: Private key bit length (default 2048)
:param passphrase: Passphrase to use for encrypting the private key
:return: PEM encoded private key
:raises Exception: If private key generation fails
"""
connection = barbican_common.BarbicanKeystoneAuth.get_barbican_client()
order = connection.orders.create_asymmetric(
bit_length=bit_length,
algorithm='rsa',
pass_phrase=passphrase,
payload_content_type='application/octet-stream'
)
order.submit()
attempts = 0
while (order.container_ref is None and order.status != 'ERROR'
and attempts < MAX_ATTEMPTS):
backoff = float(1 << attempts) + random.random() * attempts
time.sleep(backoff)
order = connection.orders.get(order.order_ref)
attempts += 1
if order.status != 'ACTIVE':
raise Exception("Barbican failed to generate a private key.")
container = connection.containers.get(order.container_ref)
secret = container.private_key
if not create_only:
try:
pk = secret.payload
except ValueError:
secret = connection.secrets.get(
secret_ref=secret.secret_ref,
payload_content_type='application/octet-stream'
)
pk = secret.payload
return pk
return secret.secret_ref
@classmethod
def _sign_cert_from_stored_key(cls, cn, pk_ref, validity):
raise NotImplementedError("Barbican does not yet support signing.")
@classmethod
def generate_cert_key_pair(cls, cn, validity, bit_length=2048,
passphrase=None):
# This code will essentially work once Barbican enables CertOrders
# pk = cls._generate_private_key(
# bit_length=bit_length,
# passphrase=passphrase,
# create_only=True
# )
# cert_container = cls._sign_cert_from_stored_key(cn=cn, pk_ref=pk,
# validity=validity)
# return barbican_common.BarbicanCert(cert_container)
raise NotImplementedError("Barbican does not yet support signing.")

View File

@ -19,6 +19,7 @@ from OpenSSL import crypto
from oslo.config import cfg from oslo.config import cfg
import six import six
from octavia.certificates.common import local as local_common
from octavia.certificates.generator import cert_gen from octavia.certificates.generator import cert_gen
from octavia.common import exceptions from octavia.common import exceptions
from octavia.i18n import _LE, _LI from octavia.i18n import _LE, _LI
@ -33,13 +34,13 @@ CONF = cfg.CONF
class LocalCertGenerator(cert_gen.CertGenerator): class LocalCertGenerator(cert_gen.CertGenerator):
"""Cert Generator Interface that signs certs locally.""" """Cert Generator Interface that signs certs locally."""
@staticmethod @classmethod
def _new_serial(): def _new_serial(cls):
return int(binascii.hexlify(os.urandom(20)), 16) return int(binascii.hexlify(os.urandom(20)), 16)
@staticmethod @classmethod
def sign_cert(csr, validity, ca_cert=None, ca_key=None, ca_key_pass=None, def sign_cert(cls, csr, validity, ca_cert=None, ca_key=None,
ca_digest=None): ca_key_pass=None, ca_digest=None):
"""Signs a certificate using our private CA based on the specified CSR """Signs a certificate using our private CA based on the specified CSR
The signed certificate will be valid from now until <validity> seconds The signed certificate will be valid from now until <validity> seconds
@ -64,7 +65,7 @@ class LocalCertGenerator(cert_gen.CertGenerator):
ca_cert = open(CONF.certificates.ca_certificate).read() ca_cert = open(CONF.certificates.ca_certificate).read()
except IOError: except IOError:
raise exceptions.CertificateGenerationException( raise exceptions.CertificateGenerationException(
"Failed to load {0}." msg="Failed to load {0}."
.format(CONF.certificates.ca_certificate) .format(CONF.certificates.ca_certificate)
) )
if not ca_key: if not ca_key:
@ -73,7 +74,7 @@ class LocalCertGenerator(cert_gen.CertGenerator):
ca_key = open(CONF.certificates.ca_private_key).read() ca_key = open(CONF.certificates.ca_private_key).read()
except IOError: except IOError:
raise exceptions.CertificateGenerationException( raise exceptions.CertificateGenerationException(
"Failed to load {0}." msg="Failed to load {0}."
.format(CONF.certificates.ca_certificate) .format(CONF.certificates.ca_certificate)
) )
if not ca_key_pass: if not ca_key_pass:
@ -97,7 +98,7 @@ class LocalCertGenerator(cert_gen.CertGenerator):
new_cert = crypto.X509() new_cert = crypto.X509()
new_cert.set_version(2) new_cert.set_version(2)
new_cert.set_serial_number(LocalCertGenerator._new_serial()) new_cert.set_serial_number(cls._new_serial())
new_cert.gmtime_adj_notBefore(0) new_cert.gmtime_adj_notBefore(0)
new_cert.gmtime_adj_notAfter(validity) new_cert.gmtime_adj_notAfter(validity)
new_cert.set_issuer(lo_cert.get_subject()) new_cert.set_issuer(lo_cert.get_subject())
@ -122,4 +123,50 @@ class LocalCertGenerator(cert_gen.CertGenerator):
return crypto.dump_certificate(crypto.FILETYPE_PEM, new_cert) return crypto.dump_certificate(crypto.FILETYPE_PEM, new_cert)
except Exception as e: except Exception as e:
LOG.error(_LE("Unable to sign certificate.")) LOG.error(_LE("Unable to sign certificate."))
raise exceptions.CertificateGenerationException(e) raise exceptions.CertificateGenerationException(msg=e)
@classmethod
def _generate_private_key(cls, bit_length=2048, passphrase=None):
pk = crypto.PKey()
pk.generate_key(crypto.TYPE_RSA, bit_length)
if passphrase:
return crypto.dump_privatekey(
crypto.FILETYPE_PEM,
pk,
'aes-256-cbc',
passphrase
)
else:
return crypto.dump_privatekey(
crypto.FILETYPE_PEM,
pk
)
@classmethod
def _generate_csr(cls, cn, private_key, passphrase=None):
pk = crypto.load_privatekey(
crypto.FILETYPE_PEM,
private_key,
passphrase
)
csr = crypto.X509Req()
csr.set_pubkey(pk)
subject = csr.get_subject()
subject.CN = cn
return crypto.dump_certificate_request(
crypto.FILETYPE_PEM,
csr
)
@classmethod
def generate_cert_key_pair(cls, cn, validity, bit_length=2048,
passphrase=None, **kwargs):
pk = cls._generate_private_key(bit_length, passphrase)
csr = cls._generate_csr(cn, pk, passphrase)
cert = cls.sign_cert(csr, validity, **kwargs)
cert_object = local_common.LocalCert(
certificate=cert,
private_key=pk,
private_key_passphrase=passphrase
)
return cert_object

View File

@ -1,4 +1,4 @@
# Copyright 2014 Rackspace # Copyright 2014 Rackspace US, Inc
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
@ -64,3 +64,6 @@ class TestBarbicanGenerator(base.TestCase):
# create order should be called once # create order should be called once
# should get back a valid order # should get back a valid order
bc.orders.create.assert_called_once_with() bc.orders.create.assert_called_once_with()
def test_generate_cert_key_pair(self):
self.skipTest("Barbican does not yet support signing.")

View File

@ -108,3 +108,74 @@ class TestLocalGenerator(base.TestCase):
# Make sure this cert can't sign other certs # Make sure this cert can't sign other certs
self.assertIn(six.b("CA:FALSE"), cert_text) self.assertIn(six.b("CA:FALSE"), cert_text)
def test_generate_private_key(self):
bit_length = 1024
# Attempt to generate a private key
pk = local_cert_gen.LocalCertGenerator._generate_private_key(
bit_length=bit_length
)
# Attempt to load the generated private key
pko = crypto.load_privatekey(crypto.FILETYPE_PEM, pk)
# Make sure the bit_length is what we set
self.assertEqual(pko.bits(), bit_length)
def test_generate_private_key_with_passphrase(self):
bit_length = 2048
# Attempt to generate a private key
pk = local_cert_gen.LocalCertGenerator._generate_private_key(
bit_length=bit_length,
passphrase=self.ca_private_key_passphrase
)
# Attempt to load the generated private key
pko = crypto.load_privatekey(crypto.FILETYPE_PEM, pk,
self.ca_private_key_passphrase)
# Make sure the bit_length is what we set
self.assertEqual(pko.bits(), bit_length)
def test_generate_csr(self):
cn = 'test_cn'
# Attempt to generate a CSR
csr = local_cert_gen.LocalCertGenerator._generate_csr(
cn=cn,
private_key=self.ca_private_key,
passphrase=self.ca_private_key_passphrase
)
# Attempt to load the generated CSR
csro = crypto.load_certificate_request(crypto.FILETYPE_PEM, csr)
# Make sure the CN is correct
self.assertEqual(csro.get_subject().CN, cn)
def test_generate_cert_key_pair(self):
cn = 'test_cn'
bit_length = 512
# Attempt to generate a cert/key pair
cert_object = local_cert_gen.LocalCertGenerator.generate_cert_key_pair(
cn=cn,
validity=2 * 365 * 24 * 60 * 60,
bit_length=bit_length,
passphrase=self.ca_private_key_passphrase,
ca_cert=self.ca_certificate,
ca_key=self.ca_private_key,
ca_key_pass=self.ca_private_key_passphrase
)
# Validate that the cert and key are loadable
cert = crypto.load_certificate(
crypto.FILETYPE_PEM,
cert_object.certificate
)
self.assertIsNotNone(cert)
key = crypto.load_privatekey(
crypto.FILETYPE_PEM,
cert_object.private_key,
cert_object.private_key_passphrase
)
self.assertIsNotNone(key)