Support rotating server_certs_key_passphrase key

The server_certs_key_passphrase was introduced to encrypt
the stored amphora certificates and the configuration option
for the passphrase has a default value.

This default value cannot be changed once assigned as there
is data stored encrypted using the passphrase, if the
passphrase is changed all amphora needs failover so that
the certificates is stored encrypted using the passphrase.

This wrap the existing cryptography.fernet.Fernet logic into
a utils helper function and converts it to using MultiFernet
so that the first key in the list is used for encrypting data
and all the others is available for decryption that means
that the the first key in the list can be changed and the
current key appended to the end of the list and when you
have organically (over time) performed amphora failovers you
can remove old keys.

The server_certs_key_passphrase config option is changed to
a ListOpt which is backward compatible with StrOpt.

We introduce a new FernetKeyOpt item_type for the config
option because ListOpt of strings (or if we would have
used a MultiStrOpt) does not support the `regex` kwarg
for regex based validation.

Closes-Bug: #2106810
Change-Id: Iadf100ab45499b59421425bc6874012a8bf0bde5
This commit is contained in:
Tobias Urdin
2025-02-27 10:36:20 +01:00
parent 083dd12b08
commit 4dfcd861cd
14 changed files with 113 additions and 42 deletions

View File

@@ -12,7 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
from cryptography import fernet
from jsonschema import exceptions as js_exceptions
from jsonschema import validate
@@ -71,8 +70,7 @@ class AmphoraProviderDriver(driver_base.ProviderDriver):
topic=consts.TOPIC_AMPHORA_V2, version="2.0", fanout=False)
self.client = rpc.get_client(self.target)
self.repositories = repositories.Repositories()
key = utils.get_compatible_server_certs_key_passphrase()
self.fernet = fernet.Fernet(key)
self.fernet = utils.get_server_certs_key_passphrases_fernet()
def _validate_pool_algorithm(self, pool):
if pool.lb_algorithm not in AMPHORA_SUPPORTED_LB_ALGORITHMS:

View File

@@ -19,6 +19,7 @@ Common classes for local filesystem certificate handling
import os
from oslo_config import cfg
from oslo_config import types as cfg_types
from octavia.certificates.common import cert
@@ -37,6 +38,22 @@ TLS_STORAGE_DEFAULT = os.environ.get(
'OS_OCTAVIA_TLS_STORAGE', '/var/lib/octavia/certificates/'
)
class FernetKeyOpt:
regex_pattern = r'^[A-Za-z0-9\-_=]{32}$'
def __init__(self, value: str):
string_type = cfg_types.String(
choices=None, regex=self.regex_pattern)
self.value = string_type(value)
def __repr__(self):
return self.value.__repr__()
def __str__(self):
return self.value.__str__()
certgen_opts = [
cfg.StrOpt('ca_certificate',
default=TLS_CERT_DEFAULT,
@@ -51,15 +68,17 @@ certgen_opts = [
help='Passphrase for the Private Key. Defaults'
' to env[OS_OCTAVIA_CA_KEY_PASS] or None.',
secret=True),
cfg.StrOpt('server_certs_key_passphrase',
default=TLS_PASS_AMPS_DEFAULT,
help='Passphrase for encrypting Amphora Certificates and '
'Private Keys. Must be 32, base64(url) compatible, '
'characters long. Defaults to env[TLS_PASS_AMPS_DEFAULT] '
'or insecure-key-do-not-use-this-key',
regex=r'^[A-Za-z0-9\-_=]{32}$',
required=True,
secret=True),
cfg.ListOpt('server_certs_key_passphrase',
default=[TLS_PASS_AMPS_DEFAULT],
item_type=FernetKeyOpt,
help='List of passphrase for encrypting Amphora Certificates '
'and Private Keys, first in list is used for encryption while '
'all other keys is used to decrypt previously encrypted data. '
'Each key must be 32, base64(url) compatible, characters long.'
' Defaults to env[TLS_PASS_AMPS_DEFAULT] or '
'a list with default key insecure-key-do-not-use-this-key',
required=True,
secret=True),
cfg.StrOpt('signing_digest',
default=TLS_DIGEST_DEFAULT,
help='Certificate signing digest. Defaults'
@@ -80,6 +99,7 @@ certmgr_opts = [
class LocalCert(cert.Cert):
"""Representation of a Cert for local storage."""
def __init__(self, certificate, private_key, intermediates=None,
private_key_passphrase=None):
self.certificate = certificate

View File

@@ -25,6 +25,7 @@ import re
import socket
import typing
from cryptography import fernet
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
@@ -131,11 +132,24 @@ def get_compatible_value(value):
return value
def get_compatible_server_certs_key_passphrase():
key = CONF.certificates.server_certs_key_passphrase
if isinstance(key, str):
key = key.encode('utf-8')
return base64.urlsafe_b64encode(key)
def _get_compatible_server_certs_key_passphrases():
key_opts = CONF.certificates.server_certs_key_passphrase
keys = []
for key_opt in key_opts:
key = str(key_opt)
if isinstance(key, str):
key = key.encode('utf-8')
keys.append(
base64.urlsafe_b64encode(key))
return keys
def get_server_certs_key_passphrases_fernet() -> fernet.MultiFernet:
"""Get a cryptography.MultiFernet with loaded keys."""
keys = [
fernet.Fernet(x) for x in
_get_compatible_server_certs_key_passphrases()]
return fernet.MultiFernet(keys)
def subnet_ip_availability(nw_ip_avail, subnet_id, req_num_ips):
@@ -178,6 +192,7 @@ class exception_logger:
any occurred
"""
def __init__(self, logger=None):
self.logger = logger

View File

@@ -17,7 +17,6 @@ import copy
from typing import List
from typing import Optional
from cryptography import fernet
from oslo_config import cfg
from oslo_log import log as logging
from stevedore import driver as stevedore_driver
@@ -458,8 +457,7 @@ class AmphoraCertUpload(BaseAmphoraTask):
def execute(self, amphora, server_pem):
"""Execute cert_update_amphora routine."""
LOG.debug("Upload cert in amphora REST driver")
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
fer = utils.get_server_certs_key_passphrases_fernet()
session = db_apis.get_session()
with session.begin():
db_amp = self.amphora_repo.get(session,

View File

@@ -13,7 +13,6 @@
# under the License.
#
from cryptography import fernet
from oslo_config import cfg
from stevedore import driver as stevedore_driver
from taskflow import task
@@ -45,8 +44,7 @@ class GenerateServerPEMTask(BaseCertTask):
cert = self.cert_generator.generate_cert_key_pair(
cn=amphora_id,
validity=CONF.certificates.cert_validity_time)
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
fer = utils.get_server_certs_key_passphrases_fernet()
# storing in db requires conversion bytes to string
# (required for python3)

View File

@@ -15,7 +15,6 @@
import time
from cryptography import fernet
from oslo_config import cfg
from oslo_log import log as logging
from stevedore import driver as stevedore_driver
@@ -190,8 +189,7 @@ class CertComputeCreate(ComputeCreate):
encoding='utf-8') as client_ca:
ca = client_ca.read()
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
fer = utils.get_server_certs_key_passphrases_fernet()
config_drive_files = {
'/etc/octavia/certs/server.pem': fer.decrypt(
server_pem.encode("utf-8")).decode("utf-8"),

View File

@@ -13,7 +13,6 @@
# under the License.
#
from cryptography import fernet
from octavia_lib.common import constants as lib_consts
from oslo_config import cfg
from oslo_db import exception as odb_exceptions
@@ -1047,8 +1046,7 @@ class UpdateAmphoraDBCertExpiration(BaseDatabaseTask):
LOG.debug("Update DB cert expiry date of amphora id: %s", amphora_id)
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
fer = utils.get_server_certs_key_passphrases_fernet()
cert_expiration = cert_parser.get_cert_expiration(
fer.decrypt(server_pem.encode("utf-8")))
LOG.debug("Certificate expiration date is %s ", cert_expiration)

View File

@@ -878,7 +878,7 @@ class TestAmphoraDriver(base.TestRpc):
self.amp_driver.validate_availability_zone,
'bogus')
@mock.patch('cryptography.fernet.Fernet')
@mock.patch('cryptography.fernet.MultiFernet')
def test_encrypt_listener_dict(self, mock_fernet):
mock_fern = mock.MagicMock()
mock_fernet.return_value = mock_fern

View File

@@ -13,7 +13,9 @@
# under the License.
from unittest import mock
from cryptography import fernet
from octavia_lib.common import constants as lib_consts
from oslo_config import cfg
from oslo_utils import uuidutils
from octavia.common import constants
@@ -178,3 +180,42 @@ class TestConfig(base.TestCase):
result = utils.map_protocol_to_nftable_protocol(
{constants.PROTOCOL: lib_consts.PROTOCOL_PROMETHEUS})
self.assertEqual({constants.PROTOCOL: lib_consts.PROTOCOL_TCP}, result)
def test_rotate_server_certs_key_passphrase(self):
"""Test rotate server_certs_key_passphrase."""
# Use one key (default) and encrypt/decrypt it
cfg.CONF.set_override(
'server_certs_key_passphrase',
['insecure-key-do-not-use-this-key'],
group='certificates')
fer = utils.get_server_certs_key_passphrases_fernet()
data1 = 'some data one'
enc1 = fer.encrypt(data1.encode('utf-8'))
self.assertEqual(
data1, fer.decrypt(enc1).decode('utf-8'))
# Use two keys, first key is new and used for encrypting
# and default key can still be used for decryption
cfg.CONF.set_override(
'server_certs_key_passphrase',
['insecure-key-do-not-use-this-ke2',
'insecure-key-do-not-use-this-key'],
group='certificates')
fer = utils.get_server_certs_key_passphrases_fernet()
data2 = 'some other data'
enc2 = fer.encrypt(data2.encode('utf-8'))
self.assertEqual(
data2, fer.decrypt(enc2).decode('utf-8'))
self.assertEqual(
data1, fer.decrypt(enc1).decode('utf-8'))
# Remove first key and we should only be able to
# decrypt the newest data
cfg.CONF.set_override(
'server_certs_key_passphrase',
['insecure-key-do-not-use-this-ke2'],
group='certificates')
fer = utils.get_server_certs_key_passphrases_fernet()
self.assertEqual(
data2, fer.decrypt(enc2).decode('utf-8'))
self.assertRaises(fernet.InvalidToken, fer.decrypt, enc1)

View File

@@ -14,7 +14,6 @@
#
from unittest import mock
from cryptography import fernet
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils
@@ -840,9 +839,8 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_listener_repo_update,
mock_amphora_repo_get,
mock_amphora_repo_update):
key = utils.get_compatible_server_certs_key_passphrase()
mock_amphora_repo_get.return_value = _db_amphora_mock
fer = fernet.Fernet(key)
fer = utils.get_server_certs_key_passphrases_fernet()
pem_file_mock = fer.encrypt(
utils.get_compatible_value('test-pem-file')).decode('utf-8')
amphora_cert_upload_mock = amphora_driver_tasks.AmphoraCertUpload()

View File

@@ -14,7 +14,6 @@
#
from unittest import mock
from cryptography import fernet
from oslo_config import cfg
from octavia.certificates.common import local
@@ -29,8 +28,7 @@ class TestCertTasks(base.TestCase):
@mock.patch('stevedore.driver.DriverManager.driver')
def test_execute(self, mock_driver):
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
fer = utils.get_server_certs_key_passphrases_fernet()
dummy_cert = local.LocalCert(
utils.get_compatible_value('test_cert'),
utils.get_compatible_value('test_key'))

View File

@@ -14,7 +14,6 @@
#
from unittest import mock
from cryptography import fernet
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils
@@ -375,8 +374,7 @@ class TestComputeTasks(base.TestCase):
def test_compute_create_cert(self, mock_driver, mock_ud_conf,
mock_conf, mock_jinja, mock_log_cfg):
createcompute = compute_tasks.CertComputeCreate()
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
fer = utils.get_server_certs_key_passphrases_fernet()
mock_log_cfg.return_value = 'FAKE CFG'
mock_driver.build.return_value = COMPUTE_ID

View File

@@ -16,7 +16,6 @@ import copy
import random
from unittest import mock
from cryptography import fernet
from oslo_db import exception as odb_exceptions
from oslo_utils import uuidutils
from sqlalchemy.orm import exc
@@ -1201,8 +1200,7 @@ class TestDatabaseTasks(base.TestCase):
mock_amphora_repo_delete):
update_amp_cert = database_tasks.UpdateAmphoraDBCertExpiration()
key = utils.get_compatible_server_certs_key_passphrase()
fer = fernet.Fernet(key)
fer = utils.get_server_certs_key_passphrases_fernet()
_pem_mock = fer.encrypt(
utils.get_compatible_value('test_cert')
).decode('utf-8')

View File

@@ -0,0 +1,13 @@
---
features:
- |
Added support for multiple Fernet keys in the ``[certificates]/server_certs_key_passphrase``
configuration option by changing it to a ListOpt. The first key is used for
encryption and other keys is used for decryption adding support for rotating
the passphrase.
upgrade:
- |
The ``[certificates]/server_certs_key_passphrase`` configuration option is
now a ListOpt so multiple keys can be specified, the first key is used for
encryption and other keys is used for decryption adding support for rotating
the passphrase.