Files
sunbeam-charms/tests/local/zaza/sunbeam/charm_tests/api_audit/tests.py
Lucian Petrut 58a4c980bb Enable audit middleware
We're enabling the audit middleware for the following API services:

* cinder
* nova
* neutron
* glance
* heat
* barbican
* octavia
* gnocchi
  * config.verbose was deprecated a long time ago and Gnocchi doesn't
    support a separate "logging.conf" file
     * as such, Gnocchi can't be configured to use "info" level logging
     * the audit logs will only be emitted in debug mode
  * cf001e8428/gnocchi/service.py (L72-L79)
* newly defined audit maps (the other services already had
  definitions, which were updated):
  * aodh
  * designate
  * magnum
  * masakari
    * updated wsgi log configuration

The following services do not support the audit middleware,
since their support for api paste was dropped:
* placement
* watcher
* keystone (has its own pycadf audit implementation)
  * emits notifications using oslo.notifications
  * configurable driver, possible options include "messagingv2" and "log"
  * we can't just use the log driver since the user may request amqp
    notifications using:
      "juju config keystone enable-telemetry-notifications=True"
  * we'll listen for amqp notifications using a separate service

Reference:
* audit middleware configuration: https://docs.openstack.org/keystonemiddleware/latest/audit.html
* api map samples: https://opendev.org/openstack/pycadf/src/tag/3.1.1/etc/pycadf

Change-Id: I28a261b85067704221d6ab3d949c5d2a27a4a9d7
2025-06-10 13:52:25 +00:00

205 lines
6.5 KiB
Python

# Copyright (c) 2024 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import subprocess
import barbicanclient.client as barbican_client
import zaza.openstack.charm_tests.test_utils as test_utils
from zaza.openstack.utilities import openstack as openstack_utils
class OpenStackAPIAuditTest(test_utils.BaseCharmTest):
    """Charm tests for API audit logging.

    Subclasses set ``application_name`` and override
    ``_trigger_audit_event`` with an API call that should produce an
    audit record in the application's pod logs.
    """

    # Name of the application under test; overridden by subclasses.
    application_name = None

    @classmethod
    def setUpClass(cls):
        """Run the base setup and create a shared Keystone session/client."""
        super().setUpClass(application_name=cls.application_name)
        cls.keystone_session = openstack_utils.get_overcloud_keystone_session()
        auth = openstack_utils.get_overcloud_auth()
        cls.keystone_client = openstack_utils.get_keystone_client(auth)

    @classmethod
    def get_pod_logs(cls, pod_name, since="5m",
                     container=None, all_containers=True):
        """Return the logs of a pod as text, fetched through kubectl.

        :param pod_name: name of the pod, without the ``pod/`` prefix.
        :param since: only return logs newer than this relative duration
            (kubectl ``--since`` syntax, e.g. ``"5m"``). A falsy value
            disables the filter and returns the entire log.
        :param container: name of a single container to read from. When
            set, it takes precedence over ``all_containers``.
        :param all_containers: read the logs of every container in the
            pod. Ignored when ``container`` is given.
        :returns: the decoded standard output of ``kubectl logs``.
        :raises subprocess.CalledProcessError: if kubectl exits non-zero.
        """
        # We expect the k8s namespace name to match the model name.
        namespace = cls.model_name
        cmd = ["sudo", "k8s", "kubectl", "logs", "-n", namespace,
               f"pod/{pod_name}"]
        if since:
            # Limit the output to recent entries so that stale messages
            # from earlier activity can not satisfy a later check.
            cmd += [f"--since={since}"]
        if container:
            # kubectl rejects --all-containers combined with an explicit
            # container name, so the explicit container wins.
            cmd += ["--container", container]
        elif all_containers:
            cmd += ["--all-containers"]
        result = subprocess.run(cmd, check=True, capture_output=True)
        # Service logs are not guaranteed to be valid UTF-8; do not let a
        # stray byte abort the test with an unrelated decoding error.
        return result.stdout.decode(errors="replace")

    def _trigger_audit_event(self):
        """Perform an API action that is expected to trigger an audit event.

        The base implementation does nothing; subclasses override it.
        """
        pass

    def check_audit_logs(self, exp_msg):
        """Trigger an audit event and assert that it shows up in the logs.

        :param exp_msg: substring that must be present in the pod logs.
        :raises AssertionError: if the logs do not contain ``exp_msg``.
        """
        self._trigger_audit_event()

        # For simplicity we expect there to be just one pod.
        # If needed, we can check multiple pods or aggregate the
        # logs using COS.
        pod_name = f"{self.application_name}-0"
        pod_logs = self.get_pod_logs(pod_name)
        assert exp_msg in pod_logs, (
            f"{pod_name} logs do not contain the expected message: {exp_msg}")
class KeystoneAPIAuditTest(OpenStackAPIAuditTest):
    """Audit logging test for Keystone.

    Unlike the other services covered by this module, this test toggles
    the ``enable-telemetry-notifications`` charm option and looks for a
    message logged by the ``keystone_audit_monitor`` notification
    listener rather than for an audit middleware record.
    """

    application_name = "keystone"

    def _trigger_audit_event(self):
        """Update the current user's email to trigger an audit event."""
        auth = openstack_utils.get_overcloud_auth()
        username = auth["OS_USERNAME"]
        user = self.keystone_client.users.find(name=username)

        # A random suffix is used, presumably so that repeated runs write
        # a different value each time.
        rand_num = random.randint(0, 32768)
        # The local part and the domain must be separated by "@" for the
        # value to be a well-formed email address.
        email = f"test{rand_num}@example.com"
        self.keystone_client.users.update(user.id, email=email)

    def test_audit(self):
        """Enable telemetry notifications and check for the audit message."""
        # Expects the following relation: rabbitmq:amqp keystone:amqp
        default_config = {'enable-telemetry-notifications': False}
        alternate_config = {'enable-telemetry-notifications': True}
        with self.config_change(
                default_config=default_config,
                alternate_config=alternate_config,
                application_name="keystone"):
            exp_msg = (
                "keystone_audit_monitor Received info notification, "
                "publisher: identity.keystone")
            self.check_audit_logs(exp_msg)
class AuditMiddlewareTest(OpenStackAPIAuditTest):
    """Base class for services that use the audit paste middleware."""

    def test_audit(self):
        """Check that an audit middleware notification reaches the pod logs."""
        self.check_audit_logs(
            "oslo.messaging.notification.audit.http.request")
class AodhAPIAuditTest(AuditMiddlewareTest):
    """Audit middleware test for the aodh application."""

    application_name = "aodh"

    def _trigger_audit_event(self):
        """List alarms so that an API request is issued."""
        session = self.keystone_session
        aodh = openstack_utils.get_aodh_session_client(session)
        aodh.alarm.list()
class BarbicanAPIAuditTest(AuditMiddlewareTest):
    """Audit middleware test for the barbican application."""

    application_name = "barbican"

    def _trigger_audit_event(self):
        """List secrets so that an API request is issued."""
        # The client is pointed at the public key-manager endpoint taken
        # from the Keystone service catalog.
        catalog = self.keystone_client.service_catalog
        endpoint = catalog.url_for(
            service_type='key-manager', interface='publicURL')
        barbican = barbican_client.Client(
            session=self.keystone_session, endpoint=endpoint)
        barbican.secrets.list()
class CinderAPIAuditTest(AuditMiddlewareTest):
    """Audit middleware test for the cinder application."""

    application_name = "cinder"

    def _trigger_audit_event(self):
        """List volumes so that an API request is issued."""
        session = self.keystone_session
        cinder = openstack_utils.get_cinder_session_client(session)
        cinder.volumes.list()
class DesignateAPIAuditTest(AuditMiddlewareTest):
    """Audit middleware test for the designate application."""

    application_name = "designate"

    def _trigger_audit_event(self):
        """List zones so that an API request is issued."""
        keystone_session = self.keystone_session
        designate = openstack_utils.get_designate_session_client(
            session=keystone_session)
        designate.zones.list()
class GlanceAPIAuditTest(AuditMiddlewareTest):
    """Audit middleware test for the glance application."""

    application_name = "glance"

    def _trigger_audit_event(self):
        """List images so that an API request is issued."""
        session = self.keystone_session
        glance = openstack_utils.get_glance_session_client(session)
        glance.images.list()
class HeatAPIAuditTest(AuditMiddlewareTest):
    """Audit middleware test for the heat application."""

    application_name = "heat"

    def _trigger_audit_event(self):
        """List stacks so that an API request is issued."""
        session = self.keystone_session
        heat = openstack_utils.get_heat_session_client(session)
        heat.stacks.list()
class MagnumAPIAuditTest(AuditMiddlewareTest):
    """Audit middleware test for the magnum application."""

    application_name = "magnum"

    def _trigger_audit_event(self):
        """List clusters so that an API request is issued."""
        session = self.keystone_session
        magnum = openstack_utils.get_magnum_session_client(session)
        magnum.clusters.list()
class MasakariAPIAuditTest(AuditMiddlewareTest):
    """Audit middleware test for the masakari application."""

    application_name = "masakari"

    def _trigger_audit_event(self):
        """Walk through the segments so that an API request is issued."""
        session = self.keystone_session
        masakari = openstack_utils.get_masakari_session_client(session)
        # The result is iterated and the items are discarded; presumably
        # segments() is lazy and only sends the request when consumed.
        for _segment in masakari.segments():
            pass
class NovaAPIAuditTest(AuditMiddlewareTest):
    """Audit middleware test for the nova application."""

    application_name = "nova"

    def _trigger_audit_event(self):
        """List servers so that an API request is issued."""
        session = self.keystone_session
        nova = openstack_utils.get_nova_session_client(session)
        nova.servers.list()
class NeutronAPIAuditTest(AuditMiddlewareTest):
    """Audit middleware test for the neutron application."""

    application_name = "neutron"

    def _trigger_audit_event(self):
        """List networks so that an API request is issued."""
        session = self.keystone_session
        neutron = openstack_utils.get_neutron_session_client(session)
        neutron.list_networks()
class OctaviaAPIAuditTest(AuditMiddlewareTest):
    """Audit middleware test for the octavia application."""

    application_name = "octavia"

    def _trigger_audit_event(self):
        """List amphorae so that an API request is issued."""
        session = self.keystone_session
        octavia = openstack_utils.get_octavia_session_client(session)
        octavia.amphora_list()