Implement relation for user ops

Change-Id: I81f9cfc548e002de71dbafa9254c6059fb9226d9
This commit is contained in:
Guillaume Boutry
2023-10-23 14:54:38 +02:00
parent f4afc7a17b
commit 72ee41376c

View File

@@ -14,14 +14,19 @@
"""Base classes for defining a charm using the Operator framework."""
import hashlib
import json
import logging
import secrets
import string
from typing import (
Callable,
Dict,
FrozenSet,
List,
Optional,
Tuple,
Union,
)
from urllib.parse import (
urlparse,
@@ -1141,10 +1146,10 @@ class IdentityResourceRequiresHandler(RelationHandler):
def setup_event_handler(self):
"""Configure event handlers for an Identity resource relation."""
import charms.keystone_k8s.v0.identity_resource as id_ops
import charms.keystone_k8s.v0.identity_resource as ops_svc
logger.debug("Setting up Identity Resource event handler")
ops_svc = id_ops.IdentityResourceRequires(
ops_svc = ops_svc.IdentityResourceRequires(
self.charm,
self.relation_name,
)
@@ -1318,3 +1323,440 @@ class CephAccessRequiresHandler(RelationHandler):
ctxt["key"] = data.get("key")
ctxt["uuid"] = data.get("uuid")
return ctxt
ExtraOpsProcess = Callable[[ops.EventBase, dict], None]
class UserIdentityResourceRequiresHandler(RelationHandler):
"""Handle user management on IdentityResource relation."""
CREDENTIALS_SECRET_PREFIX = "user-identity-resource-"
CONFIGURE_SECRET_PREFIX = "configure-credential-"
resource_identifiers: FrozenSet[str] = frozenset(
{
"name",
"email",
"description",
"domain",
"project",
"project_domain",
"enable",
"may_exist",
}
)
def __init__(
self,
charm: ops.CharmBase,
relation_name: str,
callback_f: Callable,
mandatory: bool,
name: str,
domain: str,
email: Optional[str] = None,
description: Optional[str] = None,
project: Optional[str] = None,
project_domain: Optional[str] = None,
enable: bool = True,
may_exist: bool = True,
role: Optional[str] = None,
add_suffix: bool = False,
rotate: ops.SecretRotate = ops.SecretRotate.NEVER,
extra_ops: Optional[List[Union[dict, Callable]]] = None,
extra_ops_process: Optional[ExtraOpsProcess] = None,
):
self.username = name
super().__init__(charm, relation_name, callback_f, mandatory)
self.charm = charm
self.add_suffix = add_suffix
# add_suffix is used to add suffix to username to create unique user
self.role = role
self.rotate = rotate
self.extra_ops = extra_ops
self.extra_ops_process = extra_ops_process
self._params = {}
_locals = locals()
for keys in self.resource_identifiers:
value = _locals.get(keys)
if value is not None:
self._params[keys] = value
def setup_event_handler(self) -> ops.Object:
"""Configure event handlers for the relation."""
import charms.keystone_k8s.v0.identity_resource as id_ops
logger.debug("Setting up Identity Resource event handler")
ops_svc = id_ops.IdentityResourceRequires(
self.charm,
self.relation_name,
)
self.framework.observe(
ops_svc.on.provider_ready,
self._on_provider_ready,
)
self.framework.observe(
ops_svc.on.provider_goneaway,
self._on_provider_goneaway,
)
self.framework.observe(
ops_svc.on.response_available,
self._on_response_available,
)
self.framework.observe(
self.charm.on.secret_changed, self._on_secret_changed
)
self.framework.observe(
self.charm.on.secret_rotate, self._on_secret_rotate
)
self.framework.observe(
self.charm.on.secret_remove, self._on_secret_remove
)
return ops_svc
def _hash_ops(self, ops: list) -> str:
"""Hash ops request."""
return hashlib.sha256(json.dumps(ops).encode()).hexdigest()
@property
def label(self) -> str:
"""Secret label to share over keystone resource relation."""
return self.CREDENTIALS_SECRET_PREFIX + self.username
@property
def config_label(self) -> str:
"""Secret label to template configuration from."""
return self.CONFIGURE_SECRET_PREFIX + self.username
@property
def _create_user_tag(self) -> str:
return "create_user_" + self.username
@property
def _delete_user_tag(self) -> str:
return "delete_user_" + self.username
def random_string(self, length: int) -> str:
"""Utility function to generate secure random string."""
alphabet = string.ascii_letters + string.digits
return "".join(secrets.choice(alphabet) for i in range(length))
def _ensure_credentials(self, refresh_user: bool = False) -> str:
credentials_id = self.charm.leader_get(self.label)
suffix_length = 6
password_length = 18
if credentials_id:
if refresh_user:
username = self.username
if self.add_suffix:
suffix = self.random_string(suffix_length)
username += "-" + suffix
secret = self.model.get_secret(id=credentials_id)
secret.set_content(
{
"username": username,
"password": self.random_string(password_length),
}
)
return credentials_id
username = self.username
password = self.random_string(password_length)
if self.add_suffix:
suffix = self.random_string(suffix_length)
username += "-" + suffix
secret = self.model.app.add_secret(
{"username": username, "password": password},
label=self.label,
rotate=self.rotate,
)
self.charm.leader_set({self.label: secret.id})
return secret.id # type: ignore[union-attr]
def _grant_ops_secret(self, relation: ops.Relation):
secret = self.model.get_secret(id=self._ensure_credentials())
secret.grant(relation)
def _get_credentials(self) -> Tuple[str, str]:
credentials_id = self._ensure_credentials()
secret = self.model.get_secret(id=credentials_id)
content = secret.get_content()
return content["username"], content["password"]
def get_config_credentials(self) -> Optional[Tuple[str, str]]:
"""Get credential from config secret."""
credentials_id = self.charm.leader_get(self.config_label)
if not credentials_id:
return None
secret = self.model.get_secret(id=credentials_id)
content = secret.get_content()
return content["username"], content["password"]
def _update_config_credentials(self) -> bool:
"""Update config credentials.
Returns True if credentials are updated, False otherwise.
"""
credentials_id = self.charm.leader_get(self.config_label)
username, password = self._get_credentials()
content = {"username": username, "password": password}
if credentials_id is None:
secret = self.model.app.add_secret(
content, label=self.config_label
)
self.charm.leader_set({self.config_label: secret.id})
return True
secret = self.model.get_secret(id=credentials_id)
old_content = secret.get_content()
if old_content != content:
secret.set_content(content)
return True
return False
def _create_user_request(self) -> dict:
credentials_id = self._ensure_credentials()
username, _ = self._get_credentials()
requests = []
domain = self._params["domain"]
create_domain = {
"name": "create_domain",
"params": {"name": domain, "enable": True},
}
requests.append(create_domain)
if self.role:
create_role = {
"name": "create_role",
"params": {"name": self.role},
}
requests.append(create_role)
params = self._params.copy()
params.pop("name", None)
create_user = {
"name": "create_user",
"params": {
"name": username,
"password": credentials_id,
**params,
},
}
requests.append(create_user)
requests.extend(self._create_role_requests(username, domain))
if self.extra_ops:
for extra_op in self.extra_ops:
if isinstance(extra_op, dict):
requests.append(extra_op)
elif callable(extra_op):
requests.append(extra_op())
else:
logger.debug(f"Invalid type of extra_op: {extra_op!r}")
request = {
"id": self._hash_ops(requests),
"tag": self._create_user_tag,
"ops": requests,
}
return request
def _create_role_requests(
self, username, domain: Optional[str]
) -> List[dict]:
requests = []
if self.role:
params = {
"role": self.role,
}
if domain:
params["domain"] = domain
params["user_domain"] = domain
project_domain = self._params.get("project_domain")
if project_domain:
params["project_domain"] = project_domain
params["user"] = username
grant_role_domain = {"name": "grant_role", "params": params}
requests.append(grant_role_domain)
project = self._params.get("project")
if project:
requests.append(
{
"name": "show_project",
"params": {
"name": project,
"domain": project_domain or domain,
},
}
)
params = {
"project": "{{ show_project[0].id }}",
"role": "{{ create_role[0].id }}",
"user": "{{ create_user[0].id }}",
"user_domain": "{{ create_domain[0].id }}",
}
if project_domain:
params[
"project_domain"
] = "{{ show_project[0].domain_id }}"
requests.append(
{
"name": "grant_role",
"params": params,
}
)
return requests
def _delete_user_request(self, users: List[str]) -> dict:
requests = []
for user in users:
params = {"name": user}
domain = self._params.get("domain")
if domain:
params["domain"] = domain
requests.append(
{
"name": "delete_user",
"params": params,
}
)
return {
"id": self._hash_ops(requests),
"tag": self._delete_user_tag,
"ops": requests,
}
def _process_create_user_response(self, response: dict) -> None:
if {op.get("return-code") for op in response.get("ops", [])} == {0}:
logger.debug("Create user completed.")
config_credentials = self.get_config_credentials()
credentials_updated = self._update_config_credentials()
if config_credentials and credentials_updated:
username = config_credentials[0]
self.add_user_to_delete_user_list(username)
else:
logger.debug("Error in creation of user ops " f"{response}")
def add_user_to_delete_user_list(self, user: str) -> None:
"""Update users list to delete."""
logger.debug(f"Adding user to delete list {user}")
old_users = self.charm.leader_get("old_users")
delete_users = json.loads(old_users) if old_users else []
if user not in delete_users:
delete_users.append(user)
self.charm.leader_set({"old_users": json.dumps(delete_users)})
def _process_delete_user_response(self, response: dict) -> None:
deleted_users = []
for op in response.get("ops", []):
if op.get("return-code") == 0:
deleted_users.append(op.get("value").get("name"))
else:
logger.debug(f"Error in running delete user for op {op}")
if deleted_users:
logger.debug(f"Deleted users: {deleted_users}")
old_users = self.charm.leader_get("old_users")
users_to_delete = json.loads(old_users) if old_users else []
new_users_to_delete = [
x for x in users_to_delete if x not in deleted_users
]
self.charm.leader_set({"old_users": json.dumps(new_users_to_delete)})
def _on_secret_changed(self, event: ops.SecretChangedEvent):
logger.debug(
f"secret-changed triggered for label {event.secret.label}"
)
# Secret change on configured user secret
if event.secret.label == self.config_label:
logger.debug(
"Calling configure charm to populate user info in "
"configuration files"
)
self.callback_f(event)
else:
logger.debug(
"Ignoring the secret-changed event for label "
f"{event.secret.label}"
)
def _on_secret_rotate(self, event: ops.SecretRotateEvent):
# All the juju secrets are created on leader unit, so return
# if unit is not leader at this stage instead of checking at
# each secret.
logger.debug(f"secret-rotate triggered for label {event.secret.label}")
if not self.model.unit.is_leader():
logger.debug("Not leader unit, no action required")
return
# Secret rotate on stack user secret sent to ops
if event.secret.label == self.label:
self._ensure_credentials(refresh_user=True)
request = self._create_user_request()
logger.debug(f"Sending ops request: {request}")
self.interface.request_ops(request)
else:
logger.debug(
"Ignoring the secret-rotate event for label "
f"{event.secret.label}"
)
def _on_secret_remove(self, event: ops.SecretRemoveEvent):
logger.debug(f"secret-remove triggered for label {event.secret.label}")
if not self.model.unit.is_leader():
logger.debug("Not leader unit, no action required")
return
# Secret remove on configured stack admin secret
if event.secret.label == self.config_label:
old_users = self.charm.leader_get("old_users")
users_to_delete = json.loads(old_users) if old_users else []
if not users_to_delete:
return
request = self._delete_user_request(users_to_delete)
logger.debug(f"Sending ops request: {request}")
self.interface.request_ops(request)
else:
logger.debug(
"Ignoring the secret-remove event for label "
f"{event.secret.label}"
)
def _on_provider_ready(self, event) -> None:
"""Handles response available events."""
logger.info("Handle response from identity ops")
if not self.model.unit.is_leader():
return
self.interface.request_ops(self._create_user_request())
self._grant_ops_secret(event.relation)
self.callback_f(event)
def _on_response_available(self, event) -> None:
"""Handles response available events."""
if not self.model.unit.is_leader():
return
logger.info("Handle response from identity ops")
response = self.interface.response
tag = response.get("tag")
if tag == self._create_user_tag:
self._process_create_user_response(response)
if self.extra_ops_process is not None:
self.extra_ops_process(event, response)
elif tag == self._delete_user_tag:
self._process_delete_user_response(response)
self.callback_f(event)
def _on_provider_goneaway(self, event) -> None:
"""Handle gone_away event."""
self.callback_f(event)
@property
def ready(self) -> bool:
"""Whether the relation is ready."""
return self.get_config_credentials() is not None