Kubernetes upgrade test case

Test case for kubernetes upgrade with two cases,
upgrade directly to the central cloud version and
upgrade versions one by one.

Change-Id: Ic2a3d92923a34e4874b7465e6b5603b1d3064ba2
Signed-off-by: Gustavo Pereira <gustavo.lyrapereira@windriver.com>
This commit is contained in:
Gustavo Pereira
2025-08-08 10:47:06 -03:00
parent f9c1198bb2
commit 2be4cf65ad
8 changed files with 469 additions and 0 deletions

View File

@@ -0,0 +1,107 @@
from framework.ssh.ssh_connection import SSHConnection
from framework.validation.validation import validate_equals_with_retry
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.command_wrappers import source_openrc
from keywords.cloud_platform.dcmanager.objects.dcmanager_kube_strategy_step_show_output import DcmanagerKubeStrategyStepShowOutput
class DcmanagerKubeStrategyKeywords(BaseKeyword):
"""
This class executes kube-upgrade-strategy commands
"""
def __init__(self, ssh_connection: SSHConnection) -> None:
"""
Constructor
Args:
ssh_connection (SSHConnection): The SSH connection object used for executing commands.
"""
self.ssh_connection = ssh_connection
def wait_kube_upgrade(
self,
expected_status: str,
check_interval: int = 30,
timeout: int = 240,
) -> None:
"""
Waits for kube upgrade method to return True.
"""
def check_kube_deployment() -> str:
"""
Checks if the kube upgrade operation has been completed, either 'initial' or 'apply'.
Returns:
str: Expected status for kube deployment.
"""
kube_deployment_status = self.get_dcmanager_kube_strategy_step_show().get_dcmanager_kube_strategy_step_show().get_state()
return kube_deployment_status
validate_equals_with_retry(
function_to_execute=check_kube_deployment,
expected_value=expected_status,
validation_description=f"Waiting for sw_deployment_status {expected_status}.",
timeout=timeout,
polling_sleep_time=check_interval,
)
def dcmanager_kube_upgrade_strategy_create(self, subcloud: str, kube_version: str) -> DcmanagerKubeStrategyStepShowOutput:
"""
Kube-strategy create.
Args:
subcloud (str): Subcloud name.
kube_version (str): Kubernetes version to be upgraded to.
Returns:
DcmanagerKubeStrategyStepShowOutput: An object containing details of the kubernetes strategy .
"""
command = source_openrc(f"dcmanager kube-upgrade-strategy create {subcloud} --to-version {kube_version}")
output = self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
self.wait_kube_upgrade(expected_status="initial")
return DcmanagerKubeStrategyStepShowOutput(output)
def dcmanager_kube_upgrade_strategy_apply(self) -> DcmanagerKubeStrategyStepShowOutput:
"""
Kube-strategy apply.
Returns:
DcmanagerKubeStrategyStepShowOutput: An object containing details of the kubernetes strategy .
"""
command = source_openrc("dcmanager kube-upgrade-strategy apply")
output = self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
self.wait_kube_upgrade(expected_status="complete")
return DcmanagerKubeStrategyStepShowOutput(output)
def get_dcmanager_kube_strategy_step_show(self) -> DcmanagerKubeStrategyStepShowOutput:
"""
Gets the dcmanager kube-upgrade-strategy show.
Returns:
DcmanagerKubeStrategyStepShowOutput: An object containing details of the strategy step.
"""
command = source_openrc("dcmanager kube-upgrade-strategy show")
output = self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
return DcmanagerKubeStrategyStepShowOutput(output)
def dcmanager_kube_strategy_delete(self) -> DcmanagerKubeStrategyStepShowOutput:
"""
Delete the kubernetes upgrade strategy.
Returns:
DcmanagerKubeStrategyStepShowOutput: An object containing details of
the kubernetes strategy .
"""
command = source_openrc("dcmanager kube-upgrade-strategy delete.")
output = self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
return DcmanagerKubeStrategyStepShowOutput(output)

View File

@@ -0,0 +1,19 @@
from typing import Optional
class DcmanagerKubeStrategyStepObject:
"""
This class represents a dcmanager kube-strategy-step as an object.
"""
def __init__(self) -> None:
"""Initializes a DcmanagerKubeStrategyStepObject with default values."""
self.state: Optional[str] = None
def set_state(self, state: str):
"""Sets the state of the kube-strategy-step."""
self.state = state
def get_state(self) -> Optional[str]:
"""Gets the state of the strategy-step."""
return self.state

View File

@@ -0,0 +1,59 @@
from typing import Dict
from framework.exceptions.keyword_exception import KeywordException
from framework.logging.automation_logger import get_logger
from keywords.cloud_platform.dcmanager.dcmanager_vertical_table_parser import DcManagerVerticalTableParser
from keywords.cloud_platform.dcmanager.objects.dcmanager_kube_strategy_step_show_object import DcmanagerKubeStrategyStepObject
class DcmanagerKubeStrategyStepShowOutput:
"""
Parses the output of the 'dcmanager kube-strategy-step show' command into a DcmanagerKubeStrategyStepObject instance.
"""
def __init__(self, kube_strategy: str) -> None:
"""
Initializes DcmanagerKubeStrategyStepShowOutput.
Args:
kube_strategy (str): Output of the 'dcmanager strategy-step show' command.
Raises:
KeywordException: If the output format is invalid.
"""
dc_vertical_table_parser = DcManagerVerticalTableParser(kube_strategy)
output_values = dc_vertical_table_parser.get_output_values_dict()
if self.is_valid_output(output_values):
self.dcmanager_kube_strategy_step = DcmanagerKubeStrategyStepObject()
self.dcmanager_kube_strategy_step.set_state(output_values["state"])
else:
raise KeywordException(f"The output line {output_values} was not valid")
def get_dcmanager_kube_strategy_step_show(self) -> DcmanagerKubeStrategyStepObject:
"""
Retrieves the parsed dcmanager strategy-step show object.
Returns:
DcmanagerKubeStrategyStepObject: The parsed dcmanager kube-strategy-step show object.
"""
return self.dcmanager_kube_strategy_step
@staticmethod
def is_valid_output(value: Dict[str, str]) -> bool:
"""
Checks if the output contains all the required fields.
Args:
value (Dict[str, str]): The dictionary of output values.
Returns:
bool: True if all required fields are present, False otherwise.
"""
required_fields = ["strategy type", "subcloud apply type", "max parallel subclouds", "stop on failure", "state", "created_at", "updated_at"]
for field in required_fields:
if field not in value:
get_logger().log_error(f"{field} is not in the output value")
return False
return True

View File

@@ -0,0 +1,55 @@
from framework.ssh.ssh_connection import SSHConnection
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.command_wrappers import source_openrc
from keywords.cloud_platform.system.kubernetes.object.kubernetes_version_list_output import KubernetesVersionListOutput
class SystemKubernetesListKeywords(BaseKeyword):
"""
This class contains all the keywords related to the 'system kube-version-list' command.
"""
def __init__(self, ssh_connection: SSHConnection):
"""Constructor
Args:
ssh_connection (SSHConnection): ssh object
"""
self.ssh_connection = ssh_connection
def get_system_kube_version_list(self) -> KubernetesVersionListOutput:
"""Gets the 'system kube-version-list' output.
Returns:
KubernetesVersionListOutput: a KubernetesVersionListOutput object representing
the output of the command 'dcmanager kube-version-list list'.
"""
output = self.ssh_connection.send(source_openrc("system kube-version-list"))
self.validate_success_return_code(self.ssh_connection)
kubernetes_version_list_output = KubernetesVersionListOutput(output)
return kubernetes_version_list_output
def get_kubernetes_versions_by_state(self, state: str) -> list:
"""
Retrieves the kubernetes available versions filtered by state.
Args:
state (str): Desires kubernetes version state.
Returns:
list: List of kubernetes versions.
"""
return self.get_system_kube_version_list().get_version_by_state(state)
def get_all_kubernetes_versions(self) -> list:
"""
Retrieves all kubernetes versions.
Returns:
list: List of kubernetes versions.
"""
return self.get_system_kube_version_list().get_kubernetes_version()

View File

@@ -0,0 +1,43 @@
class KubernetesVersionListObject:
"""KubernetesVersionListObject.
This class represents a kube-version-list summary as an object.
This is typically a line in the 'dcmanager kube-version-list' command output table, as shown below.
+---------+--------+-------------+
| version | target | state |
+---------+--------+-------------+
| v1.29.2 | False | unavailable |
| v1.30.6 | False | unavailable |
| v1.31.5 | False | unavailable |
| v1.32.2 | True | active |
| v1.33.0 | False | available |
+---------+--------+-------------+
"""
def __init__(self, version: str, target: bool, state: str):
"""
Constructor
Args:
version (str): kubernetes version.
target (bool): kubernetes target version.
state (str): kubernetes version state.
"""
self.version = version
self.target = target
self.state = state
def get_version(self) -> str:
"""
Getter for kubernetes version
"""
return self.version
def get_state(self):
"""
Getter for kubernetes state
"""
return self.state

View File

@@ -0,0 +1,76 @@
from typing import Dict, List
from framework.exceptions.keyword_exception import KeywordException
from framework.logging.automation_logger import get_logger
from keywords.cloud_platform.system.kubernetes.object.kubernetes_version_list_object import KubernetesVersionListObject
from keywords.cloud_platform.system.system_table_parser import SystemTableParser
class KubernetesVersionListOutput:
"""
Represents the output of 'system kube-version-list' command as a list of KubernetesVersionListObject objects.
"""
def __init__(self, system_kubernetes_version_list_output: list[str]) -> None:
"""
Constructor
Args:
system_kubernetes_version_list_output (list[str]): output of 'system kube-version-list' command
"""
self.k8s_version_list: [KubernetesVersionListObject] = []
system_table_parser = SystemTableParser(system_kubernetes_version_list_output)
self.output_values = system_table_parser.get_output_values_list()
for value in self.output_values:
if self.is_valid_output(value):
self.k8s_version_list.append(
KubernetesVersionListObject(
value["version"],
value["target"],
value["state"],
)
)
else:
raise KeywordException(f"The output line {value} was not valid")
def get_kubernetes_version(self) -> list:
"""
Returns the list of kubernetes version objects
"""
k8s_versions = [k8s["version"] for k8s in self.output_values]
return k8s_versions
def get_version_by_state(self, state: str) -> List[str]:
"""
Gets the kubernetes version by state.
Args:
state (str): the version desired state.
Returns:
List[str]: The kubernetes version list.
"""
k8s_versions = [entry["version"] for entry in self.output_values if entry["state"] == state]
if not k8s_versions:
raise KeywordException(f"No version with state {state} was found.")
return k8s_versions
@staticmethod
def is_valid_output(value: Dict[str, str]) -> bool:
"""
Checks if the output contains all the required fields.
Args:
value (Dict[str, str]): The dictionary of output values.
Returns:
bool: True if all required fields are present, False otherwise.
"""
required_keys = ["version", "target", "state"]
for key in required_keys:
if key not in value:
get_logger().log_error(f"{key} is not in the output value: {value}")
return False
return True

View File

@@ -0,0 +1,60 @@
from pytest import fail, mark
from framework.logging.automation_logger import get_logger
from framework.validation.validation import validate_equals
from keywords.cloud_platform.dcmanager.dcmanager_kube_deploy_strategy_keywords import DcmanagerKubeStrategyKeywords
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_list_keywords import DcManagerSubcloudListKeywords
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.cloud_platform.system.kubernetes.kubernetes_version_list_keywords import SystemKubernetesListKeywords
@mark.p2
@mark.lab_has_subcloud
def test_kube_upgrade_one_by_one(request):
"""
Verify subcloud kubernetes upgrade to central
cloud kubernetes version.
Test Steps:
- Create the kubernetes strategy for the
central cloud kubernetes active version.
- Apply strategy.
"""
central_ssh = LabConnectionKeywords().get_active_controller_ssh()
# Gets the lowest subcloud (the subcloud with the lowest id).
subcloud_list_keywords = DcManagerSubcloudListKeywords(central_ssh)
lowest_subcloud = subcloud_list_keywords.get_dcmanager_subcloud_list().get_lower_id_async_subcloud()
subcloud_name = lowest_subcloud.get_name()
# get subcloud ssh
subcloud_ssh = LabConnectionKeywords().get_subcloud_ssh(subcloud_name)
central_k8s_active_version = SystemKubernetesListKeywords(central_ssh).get_kubernetes_versions_by_state("active")[0]
central_versions = SystemKubernetesListKeywords(central_ssh).get_all_kubernetes_versions()
remote_k8s_active_version = SystemKubernetesListKeywords(subcloud_ssh).get_kubernetes_versions_by_state("active")[0]
if remote_k8s_active_version >= central_k8s_active_version:
fail("Subcloud k8s version is equal or higher than central cloud version.")
step_list = []
for version in central_versions:
if version == remote_k8s_active_version:
continue
if version == central_k8s_active_version:
step_list.append(version)
break
step_list.append(version)
get_logger().log_info(f"Central cloud active kubernetes version: {central_k8s_active_version}, subcloud kubernetes active version: {remote_k8s_active_version}")
for k8s_version in step_list:
get_logger().log_info(f"Upgrading subcloud {subcloud_name} kubernetes to: {k8s_version}")
DcmanagerKubeStrategyKeywords(central_ssh).dcmanager_kube_upgrade_strategy_create(subcloud=subcloud_name, kube_version=k8s_version)
DcmanagerKubeStrategyKeywords(central_ssh).dcmanager_kube_upgrade_strategy_apply()
DcmanagerKubeStrategyKeywords(central_ssh).dcmanager_kube_strategy_delete()
current_subcloud_k8s_version = SystemKubernetesListKeywords(subcloud_ssh).get_kubernetes_versions_by_state("active")[0]
validate_equals(central_k8s_active_version, current_subcloud_k8s_version, "Validate that subcloud and centra cloud has the same kubernetes version.")

View File

@@ -0,0 +1,50 @@
from pytest import fail, mark
from framework.logging.automation_logger import get_logger
from framework.validation.validation import validate_equals
from keywords.cloud_platform.dcmanager.dcmanager_kube_deploy_strategy_keywords import DcmanagerKubeStrategyKeywords
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_list_keywords import DcManagerSubcloudListKeywords
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.cloud_platform.system.kubernetes.kubernetes_version_list_keywords import SystemKubernetesListKeywords
@mark.p2
@mark.lab_has_subcloud
def test_kube_upgrade_to_active_version(request):
"""
Verify subcloud kubernetes upgrade to central
cloud kubernetes version one by one.
Test Steps:
- Create the kubernetes strategy upgrade
for N+1 subcloud version available in
the central cloud.
- Apply strategy.
- Loop over the versions until reaching
central cloud active kubernetes version.
"""
central_ssh = LabConnectionKeywords().get_active_controller_ssh()
# Gets the lowest subcloud (the subcloud with the lowest id).
subcloud_list_keywords = DcManagerSubcloudListKeywords(central_ssh)
lowest_subcloud = subcloud_list_keywords.get_dcmanager_subcloud_list().get_lower_id_async_subcloud()
subcloud_name = lowest_subcloud.get_name()
# get subcloud ssh
subcloud_ssh = LabConnectionKeywords().get_subcloud_ssh(subcloud_name)
central_k8s_active_version = SystemKubernetesListKeywords(central_ssh).get_kubernetes_versions_by_state("active")[0]
remote_k8s_active_version = SystemKubernetesListKeywords(subcloud_ssh).get_kubernetes_versions_by_state("active")[0]
get_logger().log_info(f"Central cloud active kubernetes version: {central_k8s_active_version}, subcloud kubernetes active version: {remote_k8s_active_version}")
if remote_k8s_active_version >= central_k8s_active_version:
fail("Subcloud k8s version is equal or higher than central cloud version.")
get_logger().log_info(f"Upgrading subcloud kubernetes to {central_k8s_active_version}...")
DcmanagerKubeStrategyKeywords(central_ssh).dcmanager_kube_upgrade_strategy_create(subcloud=subcloud_name, kube_version=central_k8s_active_version)
DcmanagerKubeStrategyKeywords(central_ssh).dcmanager_kube_upgrade_strategy_apply()
DcmanagerKubeStrategyKeywords(central_ssh).dcmanager_kube_strategy_delete()
current_subcloud_k8s_version = SystemKubernetesListKeywords(subcloud_ssh).get_kubernetes_versions_by_state("active")[0]
validate_equals(central_k8s_active_version, current_subcloud_k8s_version, "Validate that subcloud and centra cloud has the same kubernetes version.")