Support user supplied certs

Change-Id: I0bc72bb0e4c907fa042e9e4d4154b9541247ff15
This commit is contained in:
Liam Young 2021-08-12 16:37:08 +00:00
parent 0c279b8ad5
commit 56b48dcd37
3 changed files with 123 additions and 18 deletions

View File

@ -78,3 +78,21 @@ options:
default: ""
description: |
Message of the day settings. Should be in the format "severity|expires|message". Set to "" to disable.
ssl_cert:
type: string
default:
description: |
SSL certificate to install and use for API ports. Setting this value
and ssl_key will enable reverse proxying, point Neutron's entry in the
Keystone catalog to use https, and override any certificate and key
issued by Keystone (if it is configured to do so).
ssl_key:
type: string
default:
description: SSL key to use with certificate specified as ssl_cert.
ssl_ca:
type: string
default:
description: |
SSL CA to use with the certificate and key provided - this is only
required if you are providing a privately signed ssl_cert and ssl_key.

View File

@ -13,8 +13,9 @@ from ops.framework import StoredState
from ops.main import main
from ops.model import ActiveStatus, BlockedStatus, StatusBase
from ops.charm import ActionEvent
from typing import List, Union
from typing import List, Union, Tuple
import base64
import interface_tls_certificates.ca_client as ca_client
import re
import secrets
@ -32,6 +33,8 @@ from pathlib import Path
logger = logging.getLogger(__name__)
TLS_Config = Tuple[Union[bytes, None], Union[bytes, None], Union[bytes, None]]
class CephDashboardCharm(ops_openstack.core.OSBaseCharm):
"""Ceph Dashboard charm."""
@ -160,7 +163,7 @@ class CephDashboardCharm(ops_openstack.core.OSBaseCharm):
self._on_ca_available)
self.framework.observe(
self.ca_client.on.tls_server_config_ready,
self._on_tls_server_config_ready)
self._configure_dashboard)
self.framework.observe(self.on.add_user_action, self._add_user_action)
self.ingress = interface_api_endpoints.APIEndpointsRequires(
self,
@ -243,6 +246,7 @@ class CephDashboardCharm(ops_openstack.core.OSBaseCharm):
if self.unit.is_leader() and not ceph_utils.is_dashboard_enabled():
ceph_utils.mgr_enable_dashboard()
self._apply_ceph_config_from_charm_config()
self._configure_tls()
ceph_utils.mgr_config_set(
'mgr/dashboard/{hostname}/server_addr'.format(
hostname=socket.gethostname()),
@ -254,24 +258,57 @@ class CephDashboardCharm(ops_openstack.core.OSBaseCharm):
binding = self.model.get_binding('public')
return str(binding.network.ingress_address)
def _on_tls_server_config_ready(self, _) -> None:
"""Configure TLS."""
self.TLS_KEY_PATH.write_bytes(
self.ca_client.server_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()))
self.TLS_CERT_PATH.write_bytes(
self.ca_client.server_certificate.public_bytes(
encoding=serialization.Encoding.PEM))
self.TLS_CA_CERT_PATH.write_bytes(
def _get_tls_from_config(self) -> TLS_Config:
"""Extract TLS config from charm config."""
raw_key = self.config.get("ssl_key")
raw_cert = self.config.get("ssl_cert")
raw_ca_cert = self.config.get("ssl_ca")
if not (raw_key and raw_key):
return None, None, None
key = base64.b64decode(raw_key)
cert = base64.b64decode(raw_cert)
if raw_ca_cert:
ca_cert = base64.b64decode(raw_ca_cert)
else:
ca_cert = None
return key, cert, ca_cert
def _get_tls_from_relation(self) -> TLS_Config:
"""Extract TLS config from certificatees relation."""
if not self.ca_client.is_server_cert_ready:
return None, None, None
key = self.ca_client.server_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption())
cert = self.ca_client.server_certificate.public_bytes(
encoding=serialization.Encoding.PEM)
ca_cert = (
self.ca_client.ca_certificate.public_bytes(
encoding=serialization.Encoding.PEM) +
self.ca_client.root_ca_chain.public_bytes(
encoding=serialization.Encoding.PEM))
return key, cert, ca_cert
def _configure_tls(self) -> None:
"""Configure TLS."""
logging.debug("Attempting to collect TLS config from relation")
key, cert, ca_cert = self._get_tls_from_relation()
if not (key and cert):
logging.debug("Attempting to collect TLS config from charm "
"config")
key, cert, ca_cert = self._get_tls_from_config()
if not (key and cert):
logging.warn(
"Not configuring TLS, not all data present")
return
self.TLS_KEY_PATH.write_bytes(key)
self.TLS_CERT_PATH.write_bytes(cert)
if ca_cert:
self.TLS_CA_CERT_PATH.write_bytes(ca_cert)
subprocess.check_call(['update-ca-certificates'])
hostname = socket.gethostname()
subprocess.check_call(['update-ca-certificates'])
ceph_utils.dashboard_set_ssl_certificate(
self.TLS_CERT_PATH,
hostname=hostname)

View File

@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import unittest
import sys
@ -345,22 +346,32 @@ class TestCephDashboardCharmBase(CharmTestCase):
'10.0.0.10')
@patch('socket.gethostname')
def test__on_tls_server_config_ready(self, _gethostname):
def test_certificates_relation(self, _gethostname):
self.ceph_utils.is_dashboard_enabled.return_value = True
mock_TLS_KEY_PATH = MagicMock()
mock_TLS_CERT_PATH = MagicMock()
mock_TLS_CA_CERT_PATH = MagicMock()
_gethostname.return_value = 'server1'
rel_id = self.harness.add_relation('certificates', 'vault')
cert_rel_id = self.harness.add_relation('certificates', 'vault')
dash_rel_id = self.harness.add_relation('dashboard', 'ceph-mon')
self.harness.begin()
self.harness.set_leader()
self.harness.charm.TLS_CERT_PATH = mock_TLS_CERT_PATH
self.harness.charm.TLS_CA_CERT_PATH = mock_TLS_CA_CERT_PATH
self.harness.charm.TLS_KEY_PATH = mock_TLS_KEY_PATH
self.harness.add_relation_unit(
rel_id,
dash_rel_id,
'ceph-mon/0')
self.harness.update_relation_data(
dash_rel_id,
'ceph-mon/0',
{
'mon-ready': 'True'})
self.harness.add_relation_unit(
cert_rel_id,
'vault/0')
self.harness.update_relation_data(
rel_id,
cert_rel_id,
'vault/0',
{
'ceph-dashboard_0.server.cert': TEST_CERT,
@ -384,6 +395,45 @@ class TestCephDashboardCharmBase(CharmTestCase):
self.ceph_utils.mgr_disable_dashboard.assert_called_once_with()
self.ceph_utils.mgr_enable_dashboard.assert_called_once_with()
def test_certificates_from_config(self):
self.ceph_utils.is_dashboard_enabled.return_value = True
mock_TLS_KEY_PATH = MagicMock()
mock_TLS_CERT_PATH = MagicMock()
mock_TLS_CA_CERT_PATH = MagicMock()
dash_rel_id = self.harness.add_relation('dashboard', 'ceph-mon')
self.harness.begin()
self.harness.set_leader()
self.harness.add_relation_unit(
dash_rel_id,
'ceph-mon/0')
self.harness.update_relation_data(
dash_rel_id,
'ceph-mon/0',
{
'mon-ready': 'True'})
self.harness.charm.TLS_CERT_PATH = mock_TLS_CERT_PATH
self.harness.charm.TLS_CA_CERT_PATH = mock_TLS_CA_CERT_PATH
self.harness.charm.TLS_KEY_PATH = mock_TLS_KEY_PATH
self.subprocess.check_call.reset_mock()
self.harness.update_config(
key_values={
'ssl_key': base64.b64encode(TEST_KEY.encode("utf-8")),
'ssl_cert': base64.b64encode(TEST_CERT.encode("utf-8")),
'ssl_ca': base64.b64encode(TEST_CA.encode("utf-8"))})
self.subprocess.check_call.assert_called_once_with(
['update-ca-certificates'])
self.ceph_utils.dashboard_set_ssl_certificate.assert_has_calls([
call(mock_TLS_CERT_PATH, hostname='server1'),
call(mock_TLS_CERT_PATH)])
self.ceph_utils.dashboard_set_ssl_certificate_key.assert_has_calls([
call(mock_TLS_KEY_PATH, hostname='server1'),
call(mock_TLS_KEY_PATH)])
self.ceph_utils.mgr_config_set.assert_has_calls([
call('mgr/dashboard/standby_behaviour', 'redirect'),
call('mgr/dashboard/ssl', 'true')])
self.ceph_utils.mgr_disable_dashboard.assert_called_once_with()
self.ceph_utils.mgr_enable_dashboard.assert_called_once_with()
@patch.object(charm.secrets, 'choice')
def test__gen_user_password(self, _choice):
self.harness.begin()