Sw-deploy orchestration test case

This commit adds the new sw-deploy orchestrator test case
and its dependencies.

Test Plan:

PASS: Deploy a patch to the controllers and verify that the
subcloud is out-of-sync. Run the test case against the dc and
verify that the test finishes successfully.

Change-Id: Ifc0053653f2a9b1e1459cdbe842f20694b9c8c8a
Signed-off-by: Gustavo Pereira <gustavo.lyrapereira@windriver.com>
This commit is contained in:
Gustavo Pereira
2025-04-03 19:50:39 -03:00
parent a190fd6dbf
commit 084e2b9dcf
5 changed files with 284 additions and 1 deletions

View File

@@ -0,0 +1,82 @@
from config.configuration_manager import ConfigurationManager
from framework.ssh.ssh_connection import SSHConnection
from framework.validation.validation import validate_equals_with_retry
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.command_wrappers import source_openrc
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_show_keywords import DcManagerSubcloudShowKeywords
class DcmanagerSubcloudPrestage(BaseKeyword):
"""
This class executes subcloud prestage command
"""
def __init__(self, ssh_connection: SSHConnection) -> None:
"""
Constructor.
Args:
ssh_connection (SSHConnection): The SSH connection object used for executing commands.
"""
self.ssh_connection = ssh_connection
def dcmanager_subcloud_prestage(self, subcloud_name: str, syspass: str) -> None:
"""
Runs dcmanager subcloud prestage command.
Args:
subcloud_name (str): The name of the subcloud to check.
syspass (str): The sysadmin password to be passed to the command.
"""
lab_config = ConfigurationManager.get_lab_config()
pasw = lab_config.get_admin_credentials().get_password()
command = source_openrc(f"dcmanager subcloud prestage --for-sw-deploy {subcloud_name}" f" --sysadmin-password {pasw}")
self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
# Wait for the subcloud to acheive 'prestaging-packages' status
self.wait_for_prestage(prestaging_packages=True, subcloud=subcloud_name, check_interval=2, timeout=10)
# Wait for the subcloud to complete prestage operation
self.wait_for_prestage(subcloud=subcloud_name)
def wait_for_prestage(
self,
subcloud: str,
prestaging_packages: bool = False,
check_interval: int = 30,
timeout: int = 120,
) -> None:
"""
Waits for the prestage verification to be completed.
Args:
subcloud (str): Subcloud name.
prestaging_packages (bool): Sets the value to check if should wait for prestaging-packages status or complete status.
check_interval (int): Interval to wait before looping again.
timeout (int): Sets the await timeout.
"""
def check_prestage() -> bool:
"""
Checks if the prestage has been completed.
Returns:
bool: If prestage status is correct.
"""
prestaged = DcManagerSubcloudShowKeywords(self.ssh_connection).get_dcmanager_subcloud_show(subcloud).get_dcmanager_subcloud_show_object().get_prestage_status()
if prestaging_packages:
return prestaged == "prestaging-packages"
else:
return prestaged == "complete"
validate_equals_with_retry(
function_to_execute=check_prestage,
expected_value=True,
validation_description=f"Waiting for {subcloud} prestage.",
timeout=timeout,
polling_sleep_time=check_interval,
)

View File

@@ -0,0 +1,97 @@
from framework.logging.automation_logger import get_logger
from framework.ssh.ssh_connection import SSHConnection
from framework.validation.validation import validate_equals_with_retry
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.command_wrappers import source_openrc
from keywords.cloud_platform.dcmanager.dcmanager_strategy_step_keywords import DcmanagerStrategyStepKeywords
class DcmanagerSwDeployStrategy(BaseKeyword):
"""
This class executes sw-deploy-strategy commands
"""
def __init__(self, ssh_connection: SSHConnection) -> None:
"""
Constructor
Args:
ssh_connection (SSHConnection): The SSH connection object used for executing commands.
"""
self.ssh_connection = ssh_connection
def dcmanager_sw_deploy_strategy_create(self, subcloud_name: str, sw_version: str):
"""
Runs dcmanager sw-deploy-strategy create command.
Args:
subcloud_name (str): The subcloud name.
sw_version (str): The software version to be deployed.
"""
command = source_openrc(f"dcmanager sw-deploy-strategy create {subcloud_name} {sw_version}")
self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
self.wait_sw_deployment(subcloud=subcloud_name, expected_status="initial")
def dcmanager_sw_deploy_strategy_apply(self, subcloud_name: str):
"""
Runs dcmanager sw-deploy-strategy apply command.
Args:
subcloud_name (str): The subcloud name.
"""
command = source_openrc("dcmanager sw-deploy-strategy apply")
self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
self.wait_sw_deployment(subcloud=subcloud_name, expected_status="complete", timeout=600, check_interval=30)
def check_sw_deploy_strategy_delete_output(self):
"""
Verifies dcmanager sw-deploy-strategy delete output.
"""
command = source_openrc("dcmanager sw-deploy-strategy delete")
output = self.ssh_connection.send(command)
if "Strategy with type sw-deploy doesn't exist." in "".join(output):
get_logger().log_info("No strategy to be deleted, moving on...")
return True
elif "Strategy in state deleting cannot be deleted" in "".join(output):
return False
def dcmanager_sw_deploy_strategy_delete(self):
"""
Starts sw-deploy-strategy deletion process if there is a strategy in progress and waits for its deletion.
"""
validate_equals_with_retry(function_to_execute=self.check_sw_deploy_strategy_delete_output, expected_value=True, validation_description="Waits for strategy deletion", timeout=60, polling_sleep_time=10)
def wait_sw_deployment(
self,
subcloud: str,
expected_status: str,
check_interval: int = 30,
timeout: int = 240,
) -> None:
"""
Waits for check_sw_deployment method to return True.
"""
def check_sw_deployment() -> str:
"""
Checks if the sw deployment operation has been completed, either 'create' or 'apply'.
Returns:
str: Expected status for sw deployment.
"""
sw_deployment_status = DcmanagerStrategyStepKeywords(self.ssh_connection).get_dcmanager_strategy_step_show(subcloud).get_dcmanager_strategy_step_show().get_state()
return sw_deployment_status
validate_equals_with_retry(
function_to_execute=check_sw_deployment,
expected_value=expected_status,
validation_description=f"Waiting for sw_deployment_status {expected_status}.",
timeout=timeout,
polling_sleep_time=check_interval,
)

View File

@@ -42,6 +42,24 @@ class DcManagerSubcloudListObjectFilter:
healthy_filter.set_sync(DcManagerSubcloudListSyncEnum.IN_SYNC)
return healthy_filter
@staticmethod
def get_out_of_sync_subcloud_filter():
"""
Static method to create an instance of DcManagerSubcloudListFilter.
Args: None
Returns:
DcManagerSubcloudListObjectFilter: a configuration of this filter configured as managed, online,
deploy complete, and out-of-sync.
"""
out_of_sync_filter = DcManagerSubcloudListObjectFilter()
out_of_sync_filter.set_management(DcManagerSubcloudListManagementEnum.MANAGED)
out_of_sync_filter.set_availability(DcManagerSubcloudListAvailabilityEnum.ONLINE)
out_of_sync_filter.set_deploy_status(DcManagerSubcloudListDeployEnum.COMPLETE)
out_of_sync_filter.set_sync(DcManagerSubcloudListSyncEnum.OUT_OF_SYNC)
return out_of_sync_filter
def get_id(self) -> str:
"""
Getter for the filter by Subcloud Id.

View File

@@ -186,4 +186,28 @@ class DcManagerSubcloudListOutput:
dcmanager_subcloud_list_object_filter.set_name(subcloud_name)
subclouds = self.get_dcmanager_subcloud_list_objects_filtered(dcmanager_subcloud_list_object_filter)
return bool(subclouds)
return bool(subclouds)
def get_lower_id_async_subcloud(self) -> DcManagerSubcloudListObject:
""""
Gets an instance of DcManagerSubcloudListObject with the lowest ID and that satisfies the criteria:
_ Managed;
_ Online;
_ Deploy completed;
_ out-of-sync
Returns:
DcManagerSubcloudListObject: the instance of DcManagerSubcloudListObject with the lowest ID that satisfies
the above criteria.
"""
dcmanager_subcloud_list_obj_filter = DcManagerSubcloudListObjectFilter.get_out_of_sync_subcloud_filter()
subclouds = self.get_dcmanager_subcloud_list_objects_filtered(
dcmanager_subcloud_list_obj_filter)
if not subclouds:
error_message = "In this DC system, there is no subcloud managed, online, deploy completed, and out-of-sync."
get_logger().log_exception(error_message)
raise ValueError(error_message)
return min(subclouds, key=lambda subcloud: int(subcloud.get_id()))

View File

@@ -0,0 +1,62 @@
from pytest import fail, mark
from config.configuration_manager import ConfigurationManager
from framework.logging.automation_logger import get_logger
from framework.validation.validation import validate_equals
from keywords.cloud_platform.dcmanager.dcmanager_strategy_step_keywords import DcmanagerStrategyStepKeywords
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_list_keywords import DcManagerSubcloudListKeywords
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_prestage import DcmanagerSubcloudPrestage
from keywords.cloud_platform.dcmanager.dcmanager_sw_deploy_strategy_keywords import DcmanagerSwDeployStrategy
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.cloud_platform.upgrade.software_list_keywords import SoftwareListKeywords
@mark.p2
@mark.lab_has_subcloud
def test_patch_apply(request):
"""
Verify patch application on subcloud
Test Steps:
- Prestage the subcloud with 25.09.1
- Create the sw deploy strategy for 25.09.1
- Apply strategy steps to subcloud
"""
central_ssh = LabConnectionKeywords().get_active_controller_ssh()
# Gets the lowest subcloud (the subcloud with the lowest id).
dcmanager_subcloud_list_keywords = DcManagerSubcloudListKeywords(central_ssh)
lowest_subcloud = dcmanager_subcloud_list_keywords.get_dcmanager_subcloud_list().get_lower_id_async_subcloud()
subcloud_name = lowest_subcloud.get_name()
# Gets the lowest subcloud sysadmin password needed for backup creation.
lab_config = ConfigurationManager.get_lab_config().get_subcloud(subcloud_name)
subcloud_password = lab_config.get_admin_credentials().get_password()
sw_release = SoftwareListKeywords(central_ssh).get_software_list().get_release_name_by_state("deployed")
latest_deployed_release = max(sw_release)
if len(sw_release) <= 1:
fail("Only one release in system controller, lab must have at least two releases deployed.")
# Attempt sw-deploy-strategy delete to prevent sw-deploy-strategy create failure.
DcmanagerSwDeployStrategy(central_ssh).dcmanager_sw_deploy_strategy_delete()
# Prestage the subcloud with the latest software deployed in the controller
get_logger().log_info(f"Prestage {subcloud_name} with {sw_release}.")
DcmanagerSubcloudPrestage(central_ssh).dcmanager_subcloud_prestage(subcloud_name=subcloud_name, syspass=subcloud_password)
# Create software deploy strategy
get_logger().log_info(f"Create sw-deploy strategy for {subcloud_name}.")
DcmanagerSwDeployStrategy(central_ssh).dcmanager_sw_deploy_strategy_create(subcloud_name=subcloud_name, sw_version=latest_deployed_release)
# Apply the previously created strategy
get_logger().log_info(f"Apply strategy for {subcloud_name}.")
DcmanagerSwDeployStrategy(central_ssh).dcmanager_sw_deploy_strategy_apply(subcloud_name=subcloud_name)
strategy_status = DcmanagerStrategyStepKeywords(central_ssh).get_dcmanager_strategy_step_show(subcloud_name).get_dcmanager_strategy_step_show().get_state()
# Verify that the strategy was applied correctly
validate_equals(strategy_status, "complete", f"Software deploy completed successfully for subcloud {subcloud_name}.")