Merge "add deprecated_api test scenarios"

This commit is contained in:
Zuul
2025-07-18 19:27:42 +00:00
committed by Gerrit Code Review
6 changed files with 300 additions and 0 deletions

View File

@@ -0,0 +1,59 @@
from framework.ssh.ssh_connection import SSHConnection
from keywords.base_keyword import BaseKeyword
from keywords.k8s.k8s_command_wrapper import export_k8s_config
from keywords.k8s.raw_metrics.object.kubectl_get_raw_metrics_output import KubectlGetRawMetricsOutput
class KubectlGetRawMetricsKeywords(BaseKeyword):
"""
Keyword class for get raw metrics
"""
def __init__(self, ssh_connection: SSHConnection):
"""
Constructor
Args:
ssh_connection(SSHConnection): SSH connection object
"""
self.ssh_connection = ssh_connection
def get_raw_metrics_with_grep(self, grep: str) -> list:
"""
Retrieve raw Kubernetes metrics filtered by a grep pattern.
Executes a kubectl command to fetch raw metrics from the Kubernetes API server,
applying a grep filter to reduce the output size. The filtered metrics are then
parsed and returned as a list.
Args:
grep (str): The pattern to filter the metrics output using grep.
Returns:
list: A list of filtered raw metrics lines matching the grep pattern.
"""
command = export_k8s_config(f"kubectl get --raw /metrics | grep {grep}")
output = self.ssh_connection.send(command)
raw_metrics_output = KubectlGetRawMetricsOutput(output)
return raw_metrics_output
def is_deprecated_api_found(self, expected_deprecated_output_group: str | list) -> bool:
"""
Checks if any of the specified deprecated API groups are present in the Kubernetes API server metrics.
Args:
expected_deprecated_output_group (str | list): API group(s) to search for in the metrics.
Returns:
bool: True if any of the specified API groups are found, False otherwise.
"""
output = self.get_raw_metrics_with_grep("apiserver_requested_deprecated_apis{")
raw_metrics_obj_list = output.get_raw_metrics()
for elem in raw_metrics_obj_list:
if elem.get_labels().get("group") in expected_deprecated_output_group:
found = True
break
else:
found = False
return found

View File

@@ -0,0 +1,55 @@
import re
from keywords.k8s.raw_metrics.object.kubectl_raw_metric_object import KubectlRawMetricObject
class KubectlGetRawMetricsOutput:
"""
Class to parse the output from the `kubectl get --raw /metrics` command.
This class processes the raw metrics output, extracting metric names, labels, and values,
and stores them in instances of `KubectlRawMetricObject`.
"""
def __init__(self, kubectl_get_raw_metrics_output: str):
"""
Initializes the object by parsing the output from the `kubectl get --raw /metrics` command.
Parses each line of the provided output, extracting metric names, labels, and values, and stores them as
instances of `KubectlRawMetricObject` in the `kubectl_raw_metrics` list. Lines that are empty or start with
a '#' character are ignored.
the regex pattern matches lines in the format:
metric_name{label1="value1", label2="value2"} value
example: "apiserver_requested_deprecated_apis{group="helm.toolkit.fluxcd.io"} 1
Args:
kubectl_get_raw_metrics_output (str): The raw string output from the kubectl metrics command.
"""
pattern = re.compile(r"^(?P<metric>\w+)\{(?P<labels>[^}]*)\}\s+(?P<value>.+)$")
self.kubectl_raw_metrics: [KubectlRawMetricObject] = []
for line in kubectl_get_raw_metrics_output:
if not line or line.startswith("#"):
continue
match = pattern.match(line)
if match:
raw_metric = KubectlRawMetricObject()
metric = match.group("metric")
raw_metric.set_metric(metric)
labels = dict(item.split("=") for item in match.group("labels").split(",") if "=" in item)
labels = {k: v.strip('"') for k, v in labels.items()}
raw_metric.set_labels(labels)
value = match.group("value")
raw_metric.set_value(value)
self.kubectl_raw_metrics.append(raw_metric)
def get_raw_metrics(self) -> list[KubectlRawMetricObject]:
"""
Return parsed raw metrics objects.
Returns:
list[KubectlRawMetricObject]: List of parsed raw metrics objects.
"""
return self.kubectl_raw_metrics

View File

@@ -0,0 +1,67 @@
class KubectlRawMetricObject:
"""
Class representing a raw metric object from Kubernetes metrics.
"""
def __init__(self):
"""
Constructor
"""
self.metric = None
self.labels = {}
self.value = None
def set_metric(self, metric: str):
"""
Set the deprecated API metric.
Args:
metric (str): The deprecated API metric.
"""
self.metric = metric
def get_metric(self) -> str:
"""
Get the deprecated API metric.
Returns:
str: The deprecated API metric.
"""
return self.metric
def set_labels(self, labels: dict):
"""
Set the labels for the deprecated API metric.
Args:
labels (dict): The labels for the deprecated API metric.
"""
self.labels = labels
def get_labels(self) -> dict:
"""
Get the labels for the deprecated API metric.
Returns:
dict: The labels for the deprecated API metric.
"""
return self.labels
def set_value(self, value: str):
"""
Set the value for the deprecated API metric.
Args:
value (str): The value for the deprecated API metric.
"""
self.value = value
def get_value(self) -> str:
"""
Get the value for the deprecated API metric.
Returns:
str: The value for the deprecated API metric.
"""
return self.value

View File

@@ -0,0 +1,101 @@
from pytest import mark
from config.configuration_manager import ConfigurationManager
from framework.logging.automation_logger import get_logger
from framework.validation.validation import validate_equals
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.cloud_platform.system.application.object.system_application_delete_input import SystemApplicationDeleteInput
from keywords.cloud_platform.system.application.object.system_application_remove_input import SystemApplicationRemoveInput
from keywords.cloud_platform.system.application.object.system_application_status_enum import SystemApplicationStatusEnum
from keywords.cloud_platform.system.application.object.system_application_upload_input import SystemApplicationUploadInput
from keywords.cloud_platform.system.application.system_application_apply_keywords import SystemApplicationApplyKeywords
from keywords.cloud_platform.system.application.system_application_delete_keywords import SystemApplicationDeleteKeywords
from keywords.cloud_platform.system.application.system_application_list_keywords import SystemApplicationListKeywords
from keywords.cloud_platform.system.application.system_application_remove_keywords import SystemApplicationRemoveKeywords
from keywords.cloud_platform.system.application.system_application_show_keywords import SystemApplicationShowKeywords
from keywords.cloud_platform.system.application.system_application_upload_keywords import SystemApplicationUploadKeywords
from keywords.k8s.raw_metrics.kubectl_get_raw_metrics_keywords import KubectlGetRawMetricsKeywords
@mark.p3
def test_deprecated_api_system_default():
"""
Test to ensure which deprecated APIs are present in cloud platform.
This test will fail if any unexpected deprecated API is found.
kubectl get --raw /metrics | grep apiserver_requested_deprecated_apis{}
Test Steps:
Step 1: Execute the command to get the raw metrics from the Kubernetes API server.
Step 2: Use grep to filter out the deprecated APIs.
Step 3: Check if any deprecated APIs are found in the output.
Step 4: If any deprecated APIs are found, the test will fail.
"""
ssh_connection = LabConnectionKeywords().get_active_controller_ssh()
get_logger().log_test_case_step("Get deprecated APIs from the Kubernetes API server")
# these groups : ["helm.toolkit.fluxcd.io", "source.toolkit.fluxcd.io"]
# are expected to be removed in the future, so we check if they are not present
deprecated_api_output = KubectlGetRawMetricsKeywords(ssh_connection).is_deprecated_api_found(["helm.toolkit.fluxcd.io", "source.toolkit.fluxcd.io"])
validate_equals(deprecated_api_output, False, "Deprecated APIs validation in the Kubernetes API server output")
@mark.p3
def test_deprecated_api_system_kubevirt():
"""
Test to ensure which deprecated APIs are present in cloud platform when kubevirt is applied.
This test will fail if any unexpected deprecated API is not found.
kubectl get --raw /metrics | grep apiserver_requested_deprecated_apis
Test Steps:
Step 1: Upload and apply the kubevirt-app to the active controller.
Step 2: Execute the command to get the raw metrics from the Kubernetes API server in a system with kubevirt-app applied.
Step 3: Use grep to filter out the deprecated APIs.
Step 4: Check if any deprecated APIs are found in the output.
Step 5: If any deprecated APIs are found, the test will fail.
"""
ssh_connection = LabConnectionKeywords().get_active_controller_ssh()
# Setups the upload input object
get_logger().log_test_case_step("Upload kubevirt-app in the active controller")
app_config = ConfigurationManager.get_app_config()
kubevirt_app_name = "kubevirt-app"
base_path = app_config.get_base_application_path()
system_application_upload_input = SystemApplicationUploadInput()
system_application_upload_input.set_app_name(kubevirt_app_name)
system_application_upload_input.set_tar_file_path(f"{base_path}{kubevirt_app_name}*.tgz")
# Uploads the app file and verifies it
SystemApplicationUploadKeywords(ssh_connection).system_application_upload(system_application_upload_input)
system_applications = SystemApplicationListKeywords(ssh_connection).get_system_application_list()
kubevirt_app_status = system_applications.get_application(kubevirt_app_name).get_status()
validate_equals(kubevirt_app_status, "uploaded", f"{kubevirt_app_name} upload status validation")
get_logger().log_test_case_step("Apply kubevirt-app in the active controller")
# Applies the app to the active controller
SystemApplicationApplyKeywords(ssh_connection).system_application_apply(kubevirt_app_name)
system_application_show_output = SystemApplicationShowKeywords(ssh_connection).validate_app_progress_contains("kubevirt-app", "completed")
validate_equals(system_application_show_output.get_status(), "applied", "kubevirt-app's status is applied")
validate_equals(system_application_show_output.get_active(), "True", "kubevirt-app's active is True")
get_logger().log_test_case_step("Get deprecated APIs from the Kubernetes API server")
expected_deprecated_output_groups = ["cdi.kubevirt.io", "kubevirt.io"]
deprecated_api_output = KubectlGetRawMetricsKeywords(ssh_connection=ssh_connection).is_deprecated_api_found(expected_deprecated_output_groups)
# Removes the application
get_logger().log_test_case_step("Remove kubevirt-app in the active controller")
system_application_remove_input = SystemApplicationRemoveInput()
system_application_remove_input.set_app_name(kubevirt_app_name)
system_application_remove_input.set_force_removal(False)
system_application_output = SystemApplicationRemoveKeywords(ssh_connection).system_application_remove(system_application_remove_input)
validate_equals(system_application_output.get_system_application_object().get_status(), SystemApplicationStatusEnum.UPLOADED.value, "Application removal status validation")
# Deletes the application
get_logger().log_test_case_step("Delete kubevirt-app in the active controller")
system_application_delete_input = SystemApplicationDeleteInput()
system_application_delete_input.set_app_name(kubevirt_app_name)
system_application_delete_input.set_force_deletion(False)
delete_msg = SystemApplicationDeleteKeywords(ssh_connection).get_system_application_delete(system_application_delete_input)
validate_equals(delete_msg, f"Application {kubevirt_app_name} deleted.\n", "Application deletion message validation")
validate_equals(deprecated_api_output, True, "Deprecated APIs validation in the Kubernetes API server output")

View File

@@ -0,0 +1,18 @@
from keywords.k8s.raw_metrics.object.kubectl_get_raw_metrics_output import KubectlGetRawMetricsOutput
default_deprecated_apis = [
'apiserver_requested_deprecated_apis{group="cdi.kubevirt.io",removed_release="",resource="objecttransfers",subresource="",version="v1beta1"} 1\n',
'apiserver_requested_deprecated_apis{group="helm.toolkit.fluxcd.io",removed_release="",resource="helmreleases",subresource="",version="v2beta1"} 1\n',
]
def test_raw_metrics_output():
"""
Test all methods in KubectlGetRawMetricsOutput class.
"""
deprecated_api_default = KubectlGetRawMetricsOutput(default_deprecated_apis)
deprecated_api_default_obj_list = deprecated_api_default.get_raw_metrics()
for deprecated_api_default_obj in deprecated_api_default_obj_list:
assert deprecated_api_default_obj.get_metric() == "apiserver_requested_deprecated_apis"
assert deprecated_api_default_obj.get_labels() == {"group": "cdi.kubevirt.io", "removed_release": "", "resource": "objecttransfers", "subresource": "", "version": "v1beta1"} or {"group": "helm.toolkit.fluxcd.io", "removed_release": "", "resource": "helmreleases", "subresource": "", "version": "v2beta1"}
assert deprecated_api_default_obj.get_value() == "1"