Anchor support to Octavia
Use Anchor for certificate signing to make the octavia communication more secure. Anchor Ref url: https://github.com/openstack/anchor Co-Authored-By: bharath <bharath.stacker@gmail.com> Co-Authored-By: German Eichberger <german.eichberger@hp.com> Change-Id: Id77b2b1540377db661f15d4eeafc4922f446d987
This commit is contained in:
parent
0a1d45f696
commit
d2072ae0ae
22
doc/source/main/Anchor.rst
Normal file
22
doc/source/main/Anchor.rst
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
======
|
||||||
|
Anchor
|
||||||
|
======
|
||||||
|
Anchor (see https://wiki.openstack.org/wiki/Security/Projects/Anchor) is
|
||||||
|
an ephemeral PKI system built to enable cryptographic trust in OpenStack
|
||||||
|
services. In the context of Octavia it can be used to sign the certificates
|
||||||
|
which secure the amphora - controller communication.
|
||||||
|
|
||||||
|
Basic Setup
|
||||||
|
-----------
|
||||||
|
# Download/Install/Start Anchor from https://github.com/openstack/anchor
|
||||||
|
# Change the listening port in config.py to 9999
|
||||||
|
# I found it useful to run anchor in an additional devstack screen
|
||||||
|
# Set in octavia.conf
|
||||||
|
## [controller_worker] cert_generator to anchor
|
||||||
|
## [haproxy_amphora] server_ca = /opt/stack/anchor/CA/root-ca.crt (Anchor CA)
|
||||||
|
# Restart o-cw o-hm o-hk
|
||||||
|
|
||||||
|
Benefit
|
||||||
|
-------
|
||||||
|
In bigger cloud installations Anchor can be a gateway to a more secure
|
||||||
|
certificate management system than our default local signing.
|
@ -28,6 +28,12 @@ description of these terms.
|
|||||||
back-end amphora corresponding with the driver. This communication
|
back-end amphora corresponding with the driver. This communication
|
||||||
happens over the LB network.
|
happens over the LB network.
|
||||||
|
|
||||||
|
Anchor
|
||||||
|
Is an OpenStack project for an ephemeral PKI system (see
|
||||||
|
https://wiki.openstack.org/wiki/Security/Projects/Anchor). In Octavia
|
||||||
|
we can use Anchor to sign the certificates we use to authenticate/secure
|
||||||
|
controller <-> amphora communication.
|
||||||
|
|
||||||
Apolocation
|
Apolocation
|
||||||
Term used to describe when two or more amphorae are not colocated on
|
Term used to describe when two or more amphorae are not colocated on
|
||||||
the same physical hardware (which is often essential in HA topologies).
|
the same physical hardware (which is often essential in HA topologies).
|
||||||
|
@ -73,6 +73,12 @@
|
|||||||
# barbican_cert_manager
|
# barbican_cert_manager
|
||||||
# cert_manager=local_cert_manager
|
# cert_manager=local_cert_manager
|
||||||
|
|
||||||
|
[anchor]
|
||||||
|
# Use OpenStack anchor to sign the amphora REST API certificates
|
||||||
|
# url = http://localhost:9999/v1/sign/default
|
||||||
|
# username = myusername
|
||||||
|
# password = simplepassword
|
||||||
|
|
||||||
[networking]
|
[networking]
|
||||||
# Network to communicate with amphora
|
# Network to communicate with amphora
|
||||||
# lb_network_name =
|
# lb_network_name =
|
||||||
@ -137,6 +143,7 @@
|
|||||||
#
|
#
|
||||||
# Certificate Generator options are local_cert_generator
|
# Certificate Generator options are local_cert_generator
|
||||||
# barbican_cert_generator
|
# barbican_cert_generator
|
||||||
|
# anchor_cert_generator
|
||||||
# cert_generator = local_cert_generator
|
# cert_generator = local_cert_generator
|
||||||
|
|
||||||
[task_flow]
|
[task_flow]
|
||||||
|
68
octavia/certificates/generator/anchor.py
Normal file
68
octavia/certificates/generator/anchor.py
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
# Copyright (c) 2015 Hewlett Packard Enterprise Development Company LP
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from oslo_config import cfg
|
||||||
|
from oslo_log import log as logging
|
||||||
|
import requests
|
||||||
|
|
||||||
|
from octavia.certificates.generator import local
|
||||||
|
from octavia.common import exceptions
|
||||||
|
from octavia.i18n import _LE
|
||||||
|
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
CONF.import_group('anchor', 'octavia.common.config')
|
||||||
|
|
||||||
|
|
||||||
|
class AnchorException(exceptions.CertificateGenerationException):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class AnchorCertGenerator(local.LocalCertGenerator):
|
||||||
|
"""Cert Generator Interface that signs certs with Anchor."""
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def sign_cert(cls, csr, validity=None, **kwargs):
|
||||||
|
"""Signs a certificate using Anchor based on the specified CSR
|
||||||
|
|
||||||
|
:param csr: A Certificate Signing Request
|
||||||
|
:param validity: Will be ignored for now
|
||||||
|
:param kwargs: Will be ignored for now
|
||||||
|
|
||||||
|
:return: Signed certificate
|
||||||
|
:raises Exception: if certificate signing fails
|
||||||
|
"""
|
||||||
|
LOG.debug("Signing a certificate request using Anchor")
|
||||||
|
|
||||||
|
try:
|
||||||
|
LOG.debug('Certificate: %s', csr)
|
||||||
|
r = requests.post(CONF.anchor.url, data={
|
||||||
|
'user': CONF.anchor.username,
|
||||||
|
'secret': CONF.anchor.password,
|
||||||
|
'encoding': 'pem',
|
||||||
|
'csr': csr})
|
||||||
|
|
||||||
|
if r.status_code != 200:
|
||||||
|
LOG.debug('Anchor returned: %s', r.content)
|
||||||
|
raise AnchorException("Anchor returned Status Code : "
|
||||||
|
+ str(r.status_code))
|
||||||
|
|
||||||
|
return r.content
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
LOG.error(_LE("Unable to sign certificate."))
|
||||||
|
raise exceptions.CertificateGenerationException(msg=e)
|
@ -177,7 +177,10 @@ class LocalCertGenerator(cert_gen.CertGenerator):
|
|||||||
csr = x509.CertificateSigningRequestBuilder().subject_name(
|
csr = x509.CertificateSigningRequestBuilder().subject_name(
|
||||||
x509.Name([
|
x509.Name([
|
||||||
x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, cn),
|
x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, cn),
|
||||||
])).sign(pk, hashes.SHA256(), backends.default_backend())
|
])).add_extension(
|
||||||
|
x509.BasicConstraints(
|
||||||
|
ca=False, path_length=None), critical=True,
|
||||||
|
).sign(pk, hashes.SHA256(), backends.default_backend())
|
||||||
return csr.public_bytes(serialization.Encoding.PEM)
|
return csr.public_bytes(serialization.Encoding.PEM)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
@ -286,7 +286,19 @@ house_keeping_opts = [
|
|||||||
default=10,
|
default=10,
|
||||||
help=_('Number of threads performing amphora certificate'
|
help=_('Number of threads performing amphora certificate'
|
||||||
' rotation'))
|
' rotation'))
|
||||||
|
]
|
||||||
|
|
||||||
|
anchor_opts = [
|
||||||
|
cfg.StrOpt('url',
|
||||||
|
default='http://localhost:9999/v1/sign/default',
|
||||||
|
help=_('Anchor URL')),
|
||||||
|
cfg.StrOpt('username',
|
||||||
|
default='myusername',
|
||||||
|
help=_('Anchor username')),
|
||||||
|
cfg.StrOpt('password',
|
||||||
|
default='simplepassword',
|
||||||
|
help=_('Anchor password'),
|
||||||
|
secret=True)
|
||||||
]
|
]
|
||||||
|
|
||||||
# Register the configuration options
|
# Register the configuration options
|
||||||
@ -299,6 +311,7 @@ cfg.CONF.register_opts(controller_worker_opts, group='controller_worker')
|
|||||||
cfg.CONF.register_opts(task_flow_opts, group='task_flow')
|
cfg.CONF.register_opts(task_flow_opts, group='task_flow')
|
||||||
cfg.CONF.register_opts(oslo_messaging_opts, group='oslo_messaging')
|
cfg.CONF.register_opts(oslo_messaging_opts, group='oslo_messaging')
|
||||||
cfg.CONF.register_opts(house_keeping_opts, group='house_keeping')
|
cfg.CONF.register_opts(house_keeping_opts, group='house_keeping')
|
||||||
|
cfg.CONF.register_opts(anchor_opts, group='anchor')
|
||||||
cfg.CONF.register_cli_opts(core_cli_opts)
|
cfg.CONF.register_cli_opts(core_cli_opts)
|
||||||
cfg.CONF.register_opts(certificate_opts, group='certificates')
|
cfg.CONF.register_opts(certificate_opts, group='certificates')
|
||||||
cfg.CONF.register_cli_opts(healthmanager_opts, group='health_manager')
|
cfg.CONF.register_cli_opts(healthmanager_opts, group='health_manager')
|
||||||
@ -306,6 +319,7 @@ cfg.CONF.import_group('keystone_authtoken', 'keystonemiddleware.auth_token')
|
|||||||
cfg.CONF.register_opts(keystone_authtoken_v3_opts,
|
cfg.CONF.register_opts(keystone_authtoken_v3_opts,
|
||||||
group='keystone_authtoken_v3')
|
group='keystone_authtoken_v3')
|
||||||
|
|
||||||
|
|
||||||
# Ensure that the control exchange is set correctly
|
# Ensure that the control exchange is set correctly
|
||||||
messaging.set_transport_defaults(control_exchange='octavia')
|
messaging.set_transport_defaults(control_exchange='octavia')
|
||||||
_SQL_CONNECTION_DEFAULT = 'sqlite://'
|
_SQL_CONNECTION_DEFAULT = 'sqlite://'
|
||||||
|
115
octavia/tests/unit/certificates/generator/local_csr.py
Normal file
115
octavia/tests/unit/certificates/generator/local_csr.py
Normal file
@ -0,0 +1,115 @@
|
|||||||
|
# Copyright 2015 Hewlett Packard Enterprise Development Company LP
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from cryptography.hazmat import backends
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||||
|
from cryptography.hazmat.primitives import hashes
|
||||||
|
from cryptography.hazmat.primitives import serialization
|
||||||
|
from cryptography import x509
|
||||||
|
import mock
|
||||||
|
|
||||||
|
import octavia.tests.unit.base as base
|
||||||
|
|
||||||
|
|
||||||
|
class BaseLocalCSRTestCase(base.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.signing_digest = "sha256"
|
||||||
|
|
||||||
|
# Set up CSR data
|
||||||
|
csr_key = rsa.generate_private_key(
|
||||||
|
public_exponent=65537,
|
||||||
|
key_size=2048,
|
||||||
|
backend=backends.default_backend()
|
||||||
|
)
|
||||||
|
csr = x509.CertificateSigningRequestBuilder().subject_name(
|
||||||
|
x509.Name([
|
||||||
|
x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, u"test"),
|
||||||
|
])).sign(csr_key, hashes.SHA256(), backends.default_backend())
|
||||||
|
self.certificate_signing_request = csr.public_bytes(
|
||||||
|
serialization.Encoding.PEM)
|
||||||
|
|
||||||
|
# Set up keys
|
||||||
|
self.ca_key = rsa.generate_private_key(
|
||||||
|
public_exponent=65537,
|
||||||
|
key_size=2048,
|
||||||
|
backend=backends.default_backend()
|
||||||
|
)
|
||||||
|
|
||||||
|
self.ca_private_key_passphrase = b"Testing"
|
||||||
|
self.ca_private_key = self.ca_key.private_bytes(
|
||||||
|
encoding=serialization.Encoding.PEM,
|
||||||
|
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||||
|
encryption_algorithm=serialization.BestAvailableEncryption(
|
||||||
|
self.ca_private_key_passphrase),
|
||||||
|
)
|
||||||
|
|
||||||
|
super(BaseLocalCSRTestCase, self).setUp()
|
||||||
|
|
||||||
|
def test_generate_csr(self):
|
||||||
|
cn = 'test_cn'
|
||||||
|
# Attempt to generate a CSR
|
||||||
|
csr = self.cert_generator._generate_csr(
|
||||||
|
cn=cn,
|
||||||
|
private_key=self.ca_private_key,
|
||||||
|
passphrase=self.ca_private_key_passphrase
|
||||||
|
)
|
||||||
|
|
||||||
|
# Attempt to load the generated CSR
|
||||||
|
csro = x509.load_pem_x509_csr(data=csr,
|
||||||
|
backend=backends.default_backend())
|
||||||
|
|
||||||
|
# Make sure the CN is correct
|
||||||
|
self.assertEqual(cn, csro.subject.get_attributes_for_oid(
|
||||||
|
x509.oid.NameOID.COMMON_NAME)[0].value)
|
||||||
|
|
||||||
|
def test_generate_private_key(self):
|
||||||
|
bit_length = 1024
|
||||||
|
# Attempt to generate a private key
|
||||||
|
pk = self.cert_generator._generate_private_key(
|
||||||
|
bit_length=bit_length
|
||||||
|
)
|
||||||
|
|
||||||
|
# Attempt to load the generated private key
|
||||||
|
pko = serialization.load_pem_private_key(
|
||||||
|
data=pk, password=None, backend=backends.default_backend())
|
||||||
|
|
||||||
|
# Make sure the bit_length is what we set
|
||||||
|
self.assertEqual(pko.key_size, bit_length)
|
||||||
|
|
||||||
|
def test_generate_private_key_with_passphrase(self):
|
||||||
|
bit_length = 2048
|
||||||
|
# Attempt to generate a private key
|
||||||
|
pk = self.cert_generator._generate_private_key(
|
||||||
|
bit_length=bit_length,
|
||||||
|
passphrase=self.ca_private_key_passphrase
|
||||||
|
)
|
||||||
|
|
||||||
|
# Attempt to load the generated private key
|
||||||
|
pko = serialization.load_pem_private_key(
|
||||||
|
data=pk, password=self.ca_private_key_passphrase,
|
||||||
|
backend=backends.default_backend())
|
||||||
|
|
||||||
|
# Make sure the bit_length is what we set
|
||||||
|
self.assertEqual(pko.key_size, bit_length)
|
||||||
|
|
||||||
|
def test_generate_cert_key_pair_mock(self):
|
||||||
|
cn = 'test_cn'
|
||||||
|
|
||||||
|
with mock.patch.object(self.cert_generator, 'sign_cert') as m:
|
||||||
|
# Attempt to generate a cert/key pair
|
||||||
|
self.cert_generator.generate_cert_key_pair(
|
||||||
|
cn=cn,
|
||||||
|
validity=2 * 365 * 24 * 60 * 60,
|
||||||
|
)
|
||||||
|
self.assertTrue(m.called)
|
48
octavia/tests/unit/certificates/generator/test_anchor.py
Normal file
48
octavia/tests/unit/certificates/generator/test_anchor.py
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
# Copyright 2015 Hewlett Packard Enterprise Development Company LP
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from oslo_config import cfg
|
||||||
|
import requests_mock
|
||||||
|
import six
|
||||||
|
|
||||||
|
from octavia.certificates.generator import anchor
|
||||||
|
from octavia.common import exceptions
|
||||||
|
from octavia.tests.unit.certificates.generator import local_csr
|
||||||
|
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
CONF.import_group('anchor', 'octavia.common.config')
|
||||||
|
|
||||||
|
|
||||||
|
class TestAnchorGenerator(local_csr.BaseLocalCSRTestCase):
|
||||||
|
def setUp(self):
|
||||||
|
super(TestAnchorGenerator, self).setUp()
|
||||||
|
self.cert_generator = anchor.AnchorCertGenerator
|
||||||
|
|
||||||
|
@requests_mock.mock()
|
||||||
|
def test_sign_cert(self, m):
|
||||||
|
|
||||||
|
m.post(CONF.anchor.url, content=six.b('test'))
|
||||||
|
|
||||||
|
# Attempt to sign a cert
|
||||||
|
signed_cert = self.cert_generator.sign_cert(
|
||||||
|
csr=self.certificate_signing_request
|
||||||
|
)
|
||||||
|
self.assertEqual("test", signed_cert.decode('ascii'))
|
||||||
|
self.assertTrue(m.called)
|
||||||
|
|
||||||
|
m.post(CONF.anchor.url, status_code=400)
|
||||||
|
self.assertRaises(exceptions.CertificateGenerationException,
|
||||||
|
self.cert_generator.sign_cert,
|
||||||
|
self.certificate_signing_request)
|
@ -15,46 +15,20 @@ import datetime
|
|||||||
|
|
||||||
from cryptography import exceptions as crypto_exceptions
|
from cryptography import exceptions as crypto_exceptions
|
||||||
from cryptography.hazmat import backends
|
from cryptography.hazmat import backends
|
||||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
|
||||||
from cryptography.hazmat.primitives import hashes
|
from cryptography.hazmat.primitives import hashes
|
||||||
from cryptography.hazmat.primitives import serialization
|
from cryptography.hazmat.primitives import serialization
|
||||||
from cryptography import x509
|
from cryptography import x509
|
||||||
|
|
||||||
import octavia.certificates.generator.local as local_cert_gen
|
import octavia.certificates.generator.local as local_cert_gen
|
||||||
import octavia.tests.unit.base as base
|
from octavia.tests.unit.certificates.generator import local_csr
|
||||||
|
|
||||||
|
|
||||||
class TestLocalGenerator(base.TestCase):
|
class TestLocalGenerator(local_csr.BaseLocalCSRTestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
|
super(TestLocalGenerator, self).setUp()
|
||||||
self.signing_digest = "sha256"
|
self.signing_digest = "sha256"
|
||||||
|
|
||||||
# Set up CSR data
|
# Setup CA data
|
||||||
csr_key = rsa.generate_private_key(
|
|
||||||
public_exponent=65537,
|
|
||||||
key_size=2048,
|
|
||||||
backend=backends.default_backend()
|
|
||||||
)
|
|
||||||
csr = x509.CertificateSigningRequestBuilder().subject_name(
|
|
||||||
x509.Name([
|
|
||||||
x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, u"test"),
|
|
||||||
])).sign(csr_key, hashes.SHA256(), backends.default_backend())
|
|
||||||
self.certificate_signing_request = csr.public_bytes(
|
|
||||||
serialization.Encoding.PEM)
|
|
||||||
|
|
||||||
# Set up CA data
|
|
||||||
ca_key = rsa.generate_private_key(
|
|
||||||
public_exponent=65537,
|
|
||||||
key_size=2048,
|
|
||||||
backend=backends.default_backend()
|
|
||||||
)
|
|
||||||
|
|
||||||
self.ca_private_key_passphrase = b"Testing"
|
|
||||||
self.ca_private_key = ca_key.private_bytes(
|
|
||||||
encoding=serialization.Encoding.PEM,
|
|
||||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
|
||||||
encryption_algorithm=serialization.BestAvailableEncryption(
|
|
||||||
self.ca_private_key_passphrase),
|
|
||||||
)
|
|
||||||
|
|
||||||
ca_cert = x509.CertificateBuilder()
|
ca_cert = x509.CertificateBuilder()
|
||||||
valid_from_datetime = datetime.datetime.utcnow()
|
valid_from_datetime = datetime.datetime.utcnow()
|
||||||
@ -75,19 +49,19 @@ class TestLocalGenerator(base.TestCase):
|
|||||||
])
|
])
|
||||||
ca_cert = ca_cert.subject_name(subject_name)
|
ca_cert = ca_cert.subject_name(subject_name)
|
||||||
ca_cert = ca_cert.issuer_name(subject_name)
|
ca_cert = ca_cert.issuer_name(subject_name)
|
||||||
ca_cert = ca_cert.public_key(ca_key.public_key())
|
ca_cert = ca_cert.public_key(self.ca_key.public_key())
|
||||||
signed_cert = ca_cert.sign(private_key=ca_key,
|
signed_cert = ca_cert.sign(private_key=self.ca_key,
|
||||||
algorithm=hashes.SHA256(),
|
algorithm=hashes.SHA256(),
|
||||||
backend=backends.default_backend())
|
backend=backends.default_backend())
|
||||||
|
|
||||||
self.ca_certificate = signed_cert.public_bytes(
|
self.ca_certificate = signed_cert.public_bytes(
|
||||||
encoding=serialization.Encoding.PEM)
|
encoding=serialization.Encoding.PEM)
|
||||||
|
|
||||||
super(TestLocalGenerator, self).setUp()
|
self.cert_generator = local_cert_gen.LocalCertGenerator
|
||||||
|
|
||||||
def test_sign_cert(self):
|
def test_sign_cert(self):
|
||||||
# Attempt sign a cert
|
# Attempt sign a cert
|
||||||
signed_cert = local_cert_gen.LocalCertGenerator.sign_cert(
|
signed_cert = self.cert_generator.sign_cert(
|
||||||
csr=self.certificate_signing_request,
|
csr=self.certificate_signing_request,
|
||||||
validity=2 * 365 * 24 * 60 * 60,
|
validity=2 * 365 * 24 * 60 * 60,
|
||||||
ca_cert=self.ca_certificate,
|
ca_cert=self.ca_certificate,
|
||||||
@ -128,7 +102,7 @@ class TestLocalGenerator(base.TestCase):
|
|||||||
def test_sign_cert_invalid_algorithm(self):
|
def test_sign_cert_invalid_algorithm(self):
|
||||||
self.assertRaises(
|
self.assertRaises(
|
||||||
crypto_exceptions.UnsupportedAlgorithm,
|
crypto_exceptions.UnsupportedAlgorithm,
|
||||||
local_cert_gen.LocalCertGenerator.sign_cert,
|
self.cert_generator.sign_cert,
|
||||||
csr=self.certificate_signing_request,
|
csr=self.certificate_signing_request,
|
||||||
validity=2 * 365 * 24 * 60 * 60,
|
validity=2 * 365 * 24 * 60 * 60,
|
||||||
ca_cert=self.ca_certificate,
|
ca_cert=self.ca_certificate,
|
||||||
@ -137,58 +111,12 @@ class TestLocalGenerator(base.TestCase):
|
|||||||
ca_digest='not_an_algorithm'
|
ca_digest='not_an_algorithm'
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_generate_private_key(self):
|
|
||||||
bit_length = 1024
|
|
||||||
# Attempt to generate a private key
|
|
||||||
pk = local_cert_gen.LocalCertGenerator._generate_private_key(
|
|
||||||
bit_length=bit_length
|
|
||||||
)
|
|
||||||
|
|
||||||
# Attempt to load the generated private key
|
|
||||||
pko = serialization.load_pem_private_key(
|
|
||||||
data=pk, password=None, backend=backends.default_backend())
|
|
||||||
|
|
||||||
# Make sure the bit_length is what we set
|
|
||||||
self.assertEqual(pko.key_size, bit_length)
|
|
||||||
|
|
||||||
def test_generate_private_key_with_passphrase(self):
|
|
||||||
bit_length = 2048
|
|
||||||
# Attempt to generate a private key
|
|
||||||
pk = local_cert_gen.LocalCertGenerator._generate_private_key(
|
|
||||||
bit_length=bit_length,
|
|
||||||
passphrase=self.ca_private_key_passphrase
|
|
||||||
)
|
|
||||||
|
|
||||||
# Attempt to load the generated private key
|
|
||||||
pko = serialization.load_pem_private_key(
|
|
||||||
data=pk, password=self.ca_private_key_passphrase,
|
|
||||||
backend=backends.default_backend())
|
|
||||||
|
|
||||||
# Make sure the bit_length is what we set
|
|
||||||
self.assertEqual(pko.key_size, bit_length)
|
|
||||||
|
|
||||||
def test_generate_csr(self):
|
|
||||||
cn = 'test_cn'
|
|
||||||
# Attempt to generate a CSR
|
|
||||||
csr = local_cert_gen.LocalCertGenerator._generate_csr(
|
|
||||||
cn=cn,
|
|
||||||
private_key=self.ca_private_key,
|
|
||||||
passphrase=self.ca_private_key_passphrase
|
|
||||||
)
|
|
||||||
|
|
||||||
# Attempt to load the generated CSR
|
|
||||||
csro = x509.load_pem_x509_csr(data=csr,
|
|
||||||
backend=backends.default_backend())
|
|
||||||
|
|
||||||
# Make sure the CN is correct
|
|
||||||
self.assertEqual(cn, csro.subject.get_attributes_for_oid(
|
|
||||||
x509.oid.NameOID.COMMON_NAME)[0].value)
|
|
||||||
|
|
||||||
def test_generate_cert_key_pair(self):
|
def test_generate_cert_key_pair(self):
|
||||||
cn = 'test_cn'
|
cn = 'test_cn'
|
||||||
bit_length = 512
|
bit_length = 512
|
||||||
|
|
||||||
# Attempt to generate a cert/key pair
|
# Attempt to generate a cert/key pair
|
||||||
cert_object = local_cert_gen.LocalCertGenerator.generate_cert_key_pair(
|
cert_object = self.cert_generator.generate_cert_key_pair(
|
||||||
cn=cn,
|
cn=cn,
|
||||||
validity=2 * 365 * 24 * 60 * 60,
|
validity=2 * 365 * 24 * 60 * 60,
|
||||||
bit_length=bit_length,
|
bit_length=bit_length,
|
||||||
@ -207,4 +135,4 @@ class TestLocalGenerator(base.TestCase):
|
|||||||
data=cert_object.private_key,
|
data=cert_object.private_key,
|
||||||
password=cert_object.private_key_passphrase,
|
password=cert_object.private_key_passphrase,
|
||||||
backend=backends.default_backend())
|
backend=backends.default_backend())
|
||||||
self.assertIsNotNone(key)
|
self.assertIsNotNone(key)
|
@ -59,6 +59,7 @@ octavia.network.drivers =
|
|||||||
octavia.cert_generator =
|
octavia.cert_generator =
|
||||||
local_cert_generator = octavia.certificates.generator.local:LocalCertGenerator
|
local_cert_generator = octavia.certificates.generator.local:LocalCertGenerator
|
||||||
barbican_cert_generator = octavia.certificates.generator.barbican:BarbicanCertGenerator
|
barbican_cert_generator = octavia.certificates.generator.barbican:BarbicanCertGenerator
|
||||||
|
anchor_cert_generator = octavia.certificates.generator.anchor:AnchorCertGenerator
|
||||||
octavia.cert_manager =
|
octavia.cert_manager =
|
||||||
local_cert_manager = octavia.certificates.manager.local:LocalCertManager
|
local_cert_manager = octavia.certificates.manager.local:LocalCertManager
|
||||||
barbican_cert_manager = octavia.certificates.manager.barbican:BarbicanCertManager
|
barbican_cert_manager = octavia.certificates.manager.barbican:BarbicanCertManager
|
||||||
|
Loading…
Reference in New Issue
Block a user