USM upload test foundation and config cleanup

This change finalizes the foundation layer for USM upgrade and patch upload
test automation. It includes the following updates:

- Refactor USMConfig to remove unused extra_attributes and simplify
  internal variable handling.
- Enforce direct parsing and validation of required fields (e.g., release
  ID, ISO/SIG/PATCH paths, remote credentials).
- Rename test file staging directory from `"upload_test"` to
  `"usm_test"` for clarity.
- Add `__str__` and `__repr__` to `SoftwareListObject` and
  `SoftwareListOutput` for improved log and debug output.
- Clean up docstrings in `SoftwareListObject` and `SoftwareListOutput`,
  ensuring consistent formatting for output parsing.
- Add test module `test_usm_upload.py`, which validates:
  - Config parsing.
  - Uploading releases and patches.
  - Polling software state via software show.

This forms the foundational structure for future USM E2E coverage,
including deploy, rollback, and patch chaining workflows.

Future Work:
- Integrate file copy from remote source using rsync.
- Expand to USM deployment and rollback flows.

Change-Id: I0af37b3803fedd7824e2e395097c3a6236cb1cac
Signed-off-by: Andrew Vaillancourt <andrew.vaillancourt@windriver.com>
This commit is contained in:
Andrew Vaillancourt
2025-05-05 23:45:01 -04:00
parent 4d97d40147
commit 50317dfaec
8 changed files with 411 additions and 116 deletions

View File

@@ -12,8 +12,8 @@
// Required if usm_operation_type = "upgrade"
// If copy_from_remote=true, these must be full remote paths
// If copy_from_remote=false, these are assumed to already exist on the controller
"iso_path": "/home/sysadmin/starlingx-10.0.0.iso",
"sig_path": "/home/sysadmin/starlingx-10.0.0.sig",
"iso_path": "/home/sysadmin/usm_test/starlingx-10.0.0.iso",
"sig_path": "/home/sysadmin/usm_test/starlingx-10.0.0.sig",
// Patch file paths
// Required if usm_operation_type = "patch"
@@ -29,7 +29,7 @@
"patch_dir": "",
// Destination directory on the controller where files will be copied
"dest_dir": "/home/sysadmin/upload_test/",
"dest_dir": "/home/sysadmin/usm_test/",
// Expected release IDs to validate a successful software upload or patching.
//
@@ -53,10 +53,6 @@
// Example: "--force"
"upgrade_arguments": "",
// Extra attributes for future workflows (e.g., patch staging, etc.)
"extra_attributes": {
// Example: "staged": "true"
},
// Polling configuration for checking upload success
"upload_poll_interval_sec": 30,

View File

@@ -25,22 +25,21 @@ class USMConfig:
except FileNotFoundError:
raise FileNotFoundError(f"Could not find USM config file: {config_path}")
self._usm_operation_type = usm_dict.get("usm_operation_type")
self._requires_reboot = usm_dict.get("requires_reboot")
self._copy_from_remote = usm_dict.get("copy_from_remote")
self._iso_path = usm_dict.get("iso_path")
self._sig_path = usm_dict.get("sig_path")
self._patch_path = usm_dict.get("patch_path")
self._patch_dir = usm_dict.get("patch_dir")
self._dest_dir = usm_dict.get("dest_dir")
self._to_release_ids = usm_dict.get("to_release_ids")
self._remote_server = usm_dict.get("remote_server")
self._remote_server_username = usm_dict.get("remote_server_username")
self._remote_server_password = usm_dict.get("remote_server_password")
self._upgrade_arguments = usm_dict.get("upgrade_arguments")
self._extra_attributes = usm_dict.get("extra_attributes")
self._upload_poll_interval_sec = usm_dict.get("upload_poll_interval_sec")
self._upload_timeout_sec = usm_dict.get("upload_timeout_sec")
self.usm_operation_type = usm_dict.get("usm_operation_type")
self.requires_reboot = usm_dict.get("requires_reboot")
self.copy_from_remote = usm_dict.get("copy_from_remote")
self.iso_path = usm_dict.get("iso_path")
self.sig_path = usm_dict.get("sig_path")
self.patch_path = usm_dict.get("patch_path")
self.patch_dir = usm_dict.get("patch_dir")
self.dest_dir = usm_dict.get("dest_dir")
self.to_release_ids = usm_dict.get("to_release_ids")
self.remote_server = usm_dict.get("remote_server")
self.remote_server_username = usm_dict.get("remote_server_username")
self.remote_server_password = usm_dict.get("remote_server_password")
self.upgrade_arguments = usm_dict.get("upgrade_arguments")
self.upload_poll_interval_sec = usm_dict.get("upload_poll_interval_sec")
self.upload_timeout_sec = usm_dict.get("upload_timeout_sec")
self.validate_config()
@@ -57,22 +56,22 @@ class USMConfig:
Raises:
ValueError: If any config field is missing or inconsistent.
"""
if self._usm_operation_type not in ("upgrade", "patch"):
if self.usm_operation_type not in ("upgrade", "patch"):
raise ValueError("Invalid usm_operation_type: must be 'upgrade' or 'patch'")
if not isinstance(self._to_release_ids, list) or not self._to_release_ids:
if not isinstance(self.to_release_ids, list) or not self.to_release_ids:
raise ValueError("to_release_ids must be a non-empty list")
if self._copy_from_remote:
if not (self._remote_server and self._remote_server_username and self._remote_server_password):
if self.copy_from_remote:
if not (self.remote_server and self.remote_server_username and self.remote_server_password):
raise ValueError("Remote server credentials required when copy_from_remote is true")
if self._usm_operation_type == "upgrade":
if not self._iso_path or not self._sig_path:
if self.usm_operation_type == "upgrade":
if not self.iso_path or not self.sig_path:
raise ValueError("Upgrade requires source_iso_path and source_sig_path")
if self._usm_operation_type == "patch":
if not self._patch_path and not self._patch_dir:
if self.usm_operation_type == "patch":
if not self.patch_path and not self.patch_dir:
raise ValueError("Patch requires either patch_path or patch_dir")
def get_usm_operation_type(self) -> str:
@@ -81,7 +80,7 @@ class USMConfig:
Returns:
str: Either "upgrade" or "patch".
"""
return self._usm_operation_type
return self.usm_operation_type
def set_usm_operation_type(self, value: str) -> None:
"""Set the USM operation type.
@@ -89,7 +88,7 @@ class USMConfig:
Args:
value (str): Either "upgrade" or "patch".
"""
self._usm_operation_type = value
self.usm_operation_type = value
def get_requires_reboot(self) -> bool:
"""Get whether a reboot is required after operation.
@@ -97,7 +96,7 @@ class USMConfig:
Returns:
bool: True if a reboot is required.
"""
return self._requires_reboot
return self.requires_reboot
def set_requires_reboot(self, value: bool) -> None:
"""Set whether a reboot is required after operation.
@@ -105,7 +104,7 @@ class USMConfig:
Args:
value (bool): True if reboot is required.
"""
self._requires_reboot = value
self.requires_reboot = value
def get_copy_from_remote(self) -> bool:
"""Check if files should be copied from a remote server.
@@ -113,7 +112,7 @@ class USMConfig:
Returns:
bool: True if ISO/SIG or patch files should be pulled from a remote build server.
"""
return self._copy_from_remote
return self.copy_from_remote
def set_copy_from_remote(self, value: bool) -> None:
"""Specify whether to copy files from a remote build server.
@@ -121,7 +120,7 @@ class USMConfig:
Args:
value (bool): True to copy files from remote, False if they already exist on the controller.
"""
self._copy_from_remote = value
self.copy_from_remote = value
def get_iso_path(self) -> str:
"""Get the path to the ISO file.
@@ -129,7 +128,7 @@ class USMConfig:
Returns:
str: Absolute path to the ISO file for upgrade.
"""
return self._iso_path
return self.iso_path
def set_iso_path(self, value: str) -> None:
"""Set the path to the ISO file.
@@ -137,7 +136,7 @@ class USMConfig:
Args:
value (str): Absolute path to the ISO file.
"""
self._iso_path = value
self.iso_path = value
def get_sig_path(self) -> str:
"""Get the path to the signature file.
@@ -145,7 +144,7 @@ class USMConfig:
Returns:
str: Absolute path to the SIG file.
"""
return self._sig_path
return self.sig_path
def set_sig_path(self, value: str) -> None:
"""Set the path to the signature file.
@@ -153,7 +152,7 @@ class USMConfig:
Args:
value (str): Absolute path to the SIG file.
"""
self._sig_path = value
self.sig_path = value
def get_patch_path(self) -> str:
"""Get the path to a single patch file.
@@ -161,7 +160,7 @@ class USMConfig:
Returns:
str: Absolute path to a single .patch file.
"""
return self._patch_path
return self.patch_path
def set_patch_path(self, value: str) -> None:
"""Set the path to a single patch file.
@@ -169,7 +168,7 @@ class USMConfig:
Args:
value (str): Absolute path to a single .patch file.
"""
self._patch_path = value
self.patch_path = value
def get_patch_dir(self) -> str:
"""Get the path to a patch directory.
@@ -177,7 +176,7 @@ class USMConfig:
Returns:
str: Directory containing multiple .patch files.
"""
return self._patch_dir
return self.patch_dir
def set_patch_dir(self, value: str) -> None:
"""Set the path to a patch directory.
@@ -185,7 +184,7 @@ class USMConfig:
Args:
value (str): Directory containing multiple .patch files.
"""
self._patch_dir = value
self.patch_dir = value
def get_dest_dir(self) -> str:
"""Get the destination directory on the controller.
@@ -193,7 +192,7 @@ class USMConfig:
Returns:
str: Directory where ISO/SIG or patch files will be copied.
"""
return self._dest_dir
return self.dest_dir
def set_dest_dir(self, value: str) -> None:
"""Set the destination directory on the controller.
@@ -201,7 +200,7 @@ class USMConfig:
Args:
value (str): Path on controller where files will be copied.
"""
self._dest_dir = value
self.dest_dir = value
def get_to_release_ids(self) -> list[str]:
"""Get the expected release IDs.
@@ -209,7 +208,7 @@ class USMConfig:
Returns:
list[str]: List of release versions used to validate success.
"""
return self._to_release_ids
return self.to_release_ids
def set_to_release_ids(self, value: list[str]) -> None:
"""Set the expected release IDs.
@@ -217,7 +216,7 @@ class USMConfig:
Args:
value (list[str]): One or more release version strings.
"""
self._to_release_ids = value
self.to_release_ids = value
def get_remote_server(self) -> str:
"""Get the remote server address.
@@ -225,7 +224,7 @@ class USMConfig:
Returns:
str: Hostname or IP of the remote server.
"""
return self._remote_server
return self.remote_server
def set_remote_server(self, value: str) -> None:
"""Set the remote server address.
@@ -233,7 +232,7 @@ class USMConfig:
Args:
value (str): Hostname or IP of the remote server.
"""
self._remote_server = value
self.remote_server = value
def get_remote_server_username(self) -> str:
"""Get the remote server username.
@@ -241,7 +240,7 @@ class USMConfig:
Returns:
str: Username for authenticating with the remote server.
"""
return self._remote_server_username
return self.remote_server_username
def set_remote_server_username(self, value: str) -> None:
"""Set the remote server username.
@@ -249,7 +248,7 @@ class USMConfig:
Args:
value (str): Username for authenticating with the remote server.
"""
self._remote_server_username = value
self.remote_server_username = value
def get_remote_server_password(self) -> str:
"""Get the remote server password.
@@ -257,7 +256,7 @@ class USMConfig:
Returns:
str: Password for authenticating with the remote server.
"""
return self._remote_server_password
return self.remote_server_password
def set_remote_server_password(self, value: str) -> None:
"""Set the remote server password.
@@ -265,7 +264,7 @@ class USMConfig:
Args:
value (str): Password for authenticating with the remote server.
"""
self._remote_server_password = value
self.remote_server_password = value
def get_upgrade_arguments(self) -> str:
"""Get optional CLI arguments for upload or upgrade.
@@ -273,7 +272,7 @@ class USMConfig:
Returns:
str: Extra CLI flags like "--force".
"""
return self._upgrade_arguments
return self.upgrade_arguments
def set_upgrade_arguments(self, value: str) -> None:
"""Set optional CLI arguments for upload or upgrade.
@@ -281,23 +280,7 @@ class USMConfig:
Args:
value (str): Extra CLI flags like "--force".
"""
self._upgrade_arguments = value
def get_extra_attributes(self) -> dict:
"""Get extra user-defined attributes.
Returns:
dict: Arbitrary key-value pairs used in future workflows.
"""
return self._extra_attributes
def set_extra_attributes(self, value: dict) -> None:
"""Set extra user-defined attributes.
Args:
value (dict): Arbitrary key-value pairs for workflows like patch staging.
"""
self._extra_attributes = value
self.upgrade_arguments = value
def get_upload_poll_interval_sec(self) -> int:
"""Get polling interval for upload progress.
@@ -305,7 +288,7 @@ class USMConfig:
Returns:
int: Number of seconds between upload status checks.
"""
return self._upload_poll_interval_sec
return self.upload_poll_interval_sec
def set_upload_poll_interval_sec(self, value: int) -> None:
"""Set polling interval for upload progress.
@@ -313,7 +296,7 @@ class USMConfig:
Args:
value (int): Number of seconds between upload status checks.
"""
self._upload_poll_interval_sec = value
self.upload_poll_interval_sec = value
def get_upload_timeout_sec(self) -> int:
"""Get timeout duration for upload completion.
@@ -321,7 +304,7 @@ class USMConfig:
Returns:
int: Maximum seconds to wait for upload to complete.
"""
return self._upload_timeout_sec
return self.upload_timeout_sec
def set_upload_timeout_sec(self, value: int) -> None:
"""Set timeout duration for upload completion.
@@ -329,4 +312,4 @@ class USMConfig:
Args:
value (int): Maximum seconds to wait for upload to complete.
"""
self._upload_timeout_sec = value
self.upload_timeout_sec = value

View File

@@ -18,7 +18,7 @@ class SoftwareListObject:
Get release
Returns:
(str): release object
str: release object
"""
return self.release
@@ -28,7 +28,7 @@ class SoftwareListObject:
Get rr
Returns:
(str): rr object
str: rr object
"""
return self.rr
@@ -38,7 +38,25 @@ class SoftwareListObject:
Get state
Returns:
(str): state object
str: state object
"""
return self.state
def __str__(self) -> str:
"""
Return a readable string representation of the software list object.
Returns:
str: Formatted string of the release, RR, and state.
"""
return f"Release: {self.release}, RR: {self.rr}, State: {self.state}"
def __repr__(self) -> str:
"""
Return the developer-facing representation of the object.
Returns:
str: Object representation with class name and field values.
"""
return f"{self.__class__.__name__}(release={self.release}, rr={self.rr}, state={self.state})"

View File

@@ -1,25 +1,27 @@
"""Software List Output."""
from typing import List
from keywords.cloud_platform.system.system_table_parser import SystemTableParser
from keywords.cloud_platform.upgrade.objects.software_list_object import SoftwareListObject
class SoftwareListOutput:
"""
Parses the output of the 'software list' command into structured objects.
This class parses o/p 'software list' command into an object of
type SoftwareListObject.
This class uses SystemTableParser to convert the raw CLI output of
'software list' into a list of SoftwareListObject entries.
"""
def __init__(self, software_list_output):
def __init__(self, software_list_output: str):
"""
Constructor
Initialize and parse the software list output.
Args:
software_list_output (str): output of 'software list' command
software_list_output (str): Raw output from 'software list' command.
"""
self.software_list: SoftwareListObject = []
self.software_list: List[SoftwareListObject] = []
system_table_parser = SystemTableParser(software_list_output)
self.output_values = system_table_parser.get_output_values_list()
@@ -31,59 +33,65 @@ class SoftwareListOutput:
)
self.software_list.append(software_list_object)
def get_software_lists(self) -> list[SoftwareListObject]:
def get_software_lists(self) -> List[SoftwareListObject]:
"""
Get all software list objects.
Returns:
the list of software list objects
List[SoftwareListObject]: Parsed software entries.
"""
return self.software_list
def get_software_list_details(self):
def get_software_list_details(self) -> List[dict]:
"""
Get software list details in a list of dictionaries.
Returns:
list of software list dict
List[dict]: Parsed release table rows.
"""
return self.output_values
def get_release_name_by_state(self, state):
def get_release_name_by_state(self, state: str) -> List[str]:
"""
Get Release name of a release based in its state.
Get names of all releases with a given state.
Args:
state: State of the release.
state (str): Desired software release state (e.g., "deployed").
Returns:
list of release name.
List[str]: Matching release names.
"""
software_list_details = self.output_values
release_name = []
for j in range(len(software_list_details)):
if software_list_details[j]["State"] == state:
release_details = software_list_details[j]
release_name.append(release_details["Release"])
return release_name
return [entry["Release"] for entry in self.output_values if entry["State"] == state]
def get_release_state_by_release_name(self, release_name):
def get_release_state_by_release_name(self, release_name: str) -> str:
"""
Get the Release State based on the release name.
Get the state of a release by its name.
Args:
release_name: name of the release.
release_name (str): Software release name.
Returns:
state of the release
str: State of the release (e.g., "available", "deployed"). Empty string if not found.
"""
software_list_details = self.output_values
for j in range(len(software_list_details)):
for i in software_list_details:
if software_list_details[j]["Release"] == release_name:
release_details = software_list_details[j]
return release_details["State"]
for entry in self.output_values:
if entry["Release"] == release_name:
return entry["State"]
return ""
def __str__(self) -> str:
"""
Return a human-readable string representation of the software list.
Returns:
str: Formatted software entries as strings.
"""
return "\n".join([str(entry) for entry in self.software_list])
def __repr__(self) -> str:
"""
Return the developer-facing representation of the object.
Returns:
str: Class name and row count.
"""
return f"{self.__class__.__name__}(rows={len(self.software_list)})"

View File

@@ -39,3 +39,18 @@ class SoftwareShowOutput:
"""
software_show = self.software_show
return software_show
def get_property_value(self, property_name: str) -> str:
"""
Return the value for a given software show property (e.g., "State").
Args:
property_name (str): Property key to look up.
Returns:
str: The corresponding value, or empty string if not found.
"""
for entry in self.output_values:
if entry["Property"] == property_name:
return entry["Value"]
return ""

View File

@@ -37,3 +37,16 @@ class SoftwareShowKeywords(BaseKeyword):
software_show_output = SoftwareShowOutput(output)
return software_show_output
def get_release_state(self, release_id: str) -> str:
"""
Return the release state of a specific version using 'software show'.
Args:
release_id (str): The software release ID to show.
Returns:
str: The state string (e.g., "available", "deployed", etc.)
"""
output = self.get_software_show(sudo=True, release_id=release_id)
return output.get_property_value("State")

View File

@@ -0,0 +1,138 @@
from framework.exceptions.keyword_exception import KeywordException
from framework.logging.automation_logger import get_logger
from framework.ssh.ssh_connection import SSHConnection
from framework.validation.validation import validate_equals_with_retry
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.upgrade.software_show_keywords import SoftwareShowKeywords
class USMKeywords(BaseKeyword):
"""
Keywords for USM software operations.
Supports: commands for upgrade and patch management.
"""
def __init__(self, ssh_connection: SSHConnection):
self.ssh_connection = ssh_connection
def upload_patch_file(self, patch_file_path: str) -> None:
"""
Upload a single patch file using 'software upload'.
Args:
patch_file_path (str): Absolute path to a .patch file.
Raises:
KeywordException: On failure to upload.
"""
get_logger().log_info(f"Uploading patch file: {patch_file_path}")
output = self.ssh_connection.send_as_sudo(f"software upload {patch_file_path}")
self.validate_success_return_code(self.ssh_connection)
get_logger().log_info("Upload completed:\n" + "\n".join(output))
def upload_patch_dir(self, patch_dir_path: str) -> None:
"""
Upload all patches in a directory using 'software upload-dir'.
Args:
patch_dir_path (str): Absolute path to a directory of .patch files.
Raises:
KeywordException: On failure to upload.
"""
get_logger().log_info(f"Uploading patch directory: {patch_dir_path}")
output = self.ssh_connection.send_as_sudo(f"software upload-dir {patch_dir_path}")
self.validate_success_return_code(self.ssh_connection)
get_logger().log_info("Upload directory completed:\n" + "\n".join(output))
def show_software_release(self, release_id: str) -> list[str]:
"""
Show info for a specific release using 'software show'.
Args:
release_id (str): ID of the release (e.g., "starlingx-10.0.0")
Returns:
list[str]: Raw output lines.
Raises:
KeywordException: If release_id is missing or the command fails.
"""
if not release_id:
raise KeywordException("Missing release ID for software show")
get_logger().log_info(f"Showing software release: {release_id}")
output = self.ssh_connection.send_as_sudo(f"software show {release_id}")
self.validate_success_return_code(self.ssh_connection)
return output
def upload_release(self, iso_path: str, sig_path: str) -> None:
"""
Upload a full software release using 'software upload'.
Args:
iso_path (str): Absolute path to the .iso file.
sig_path (str): Absolute path to the corresponding .sig file.
Raises:
KeywordException: On failure to upload.
"""
get_logger().log_info(f"Uploading software release: ISO={iso_path}, SIG={sig_path}")
cmd = f"software upload {iso_path} {sig_path}"
output = self.ssh_connection.send_as_sudo(cmd)
self.validate_success_return_code(self.ssh_connection)
get_logger().log_info("Release upload completed:\n" + "\n".join(output))
def upload_and_verify_patch_file(self, patch_file_path: str, expected_release_id: str, timeout: int, poll_interval: int) -> None:
"""Upload a patch and verify that it becomes available.
This method is used for USM patching operations. It uploads a `.patch` file
using `software upload` and polls for the corresponding release ID to appear
with state "available" using `software show`.
Args:
patch_file_path (str): Absolute path to the `.patch` file.
expected_release_id (str): Expected release ID after patch upload.
timeout (int): Maximum number of seconds to wait for the release to appear.
poll_interval (int): Interval (in seconds) between poll attempts.
Raises:
KeywordException: If upload fails or release does not become available in time.
"""
self.upload_patch_file(patch_file_path)
validate_equals_with_retry(
function_to_execute=lambda: SoftwareShowKeywords(self.ssh_connection).get_release_state(expected_release_id),
expected_value="available",
validation_description=f"Wait for patch release {expected_release_id} to become available",
timeout=timeout,
polling_sleep_time=poll_interval,
)
def upload_and_verify_release(self, iso_path: str, sig_path: str, expected_release_id: str, timeout: int, poll_interval: int) -> None:
"""Upload a software release and verify that it becomes available.
This method is used for USM upgrade operations. It uploads a `.iso` and `.sig`
pair using `software upload` and waits for the release ID to appear with
state "available" using `software show`.
Args:
iso_path (str): Absolute path to the `.iso` file.
sig_path (str): Absolute path to the `.sig` signature file.
expected_release_id (str): Expected release ID after upload.
timeout (int): Maximum number of seconds to wait for the release to appear.
poll_interval (int): Interval (in seconds) between poll attempts.
Raises:
KeywordException: If upload fails or release does not become available in time.
"""
self.upload_release(iso_path, sig_path)
validate_equals_with_retry(
function_to_execute=lambda: SoftwareShowKeywords(self.ssh_connection).get_release_state(expected_release_id),
expected_value="available",
validation_description=f"Wait for release {expected_release_id} to become available",
timeout=timeout,
polling_sleep_time=poll_interval,
)

View File

@@ -0,0 +1,124 @@
"""
Basic USM Upload Tests (Foundation Layer)
=========================================
This module provides **foundational test coverage** for the USM (Upgrade and Software Management) system,
specifically focusing on uploading software releases and patches. These tests verify that an `.iso` + `.sig`
pair (for major upgrades) or `.patch` file (for patches) can be uploaded successfully to a StarlingX controller
and reach the "available" state.
Scope:
------
These tests are **intentionally minimal** and serve as a starting point. They do not implement full
end-to-end upgrade or patch deployment flows. Their goal is to validate:
- Configuration parsing via `UsmConfig`.
- Optional remote-to-local file copy using `rsync` via `FileKeywords`.
- Uploading files via the `software upload` or `software upload-dir` CLI.
- Polling for availability using `software show`.
Key Concepts:
-------------
- The config file is parsed using `ConfigurationManager.get_usm_config()` and provides:
- ISO/SIG/PATCH paths (`get_iso_path()`, `get_sig_path()`, `get_patch_path()`).
- Destination and expected release ID (`get_to_release_ids()`).
- Upload timeouts and polling intervals.
- If `copy_from_remote` is `True`, contributors should use
`FileKeywords(ssh_connection).copy_from_remote(remote_path, local_path, ...)`
to retrieve the necessary `.iso`, `.sig`, or `.patch` files from a remote server
before calling the upload keyword. This supports workflows where files are staged
on build hosts or CI/CD artifacts servers.
How to Extend:
--------------
This module is meant to be built upon. Contributors are encouraged to:
- Validate `software list` and `software show` parsing logic.
- Chain upload -> deploy precheck -> deploy start -> deploy complete steps for full upgrade flows.
- Cover patch rollback, deploy delete, and state recovery.
Location of Supporting Logic:
-----------------------------
- Upload Keywords: `keywords/cloud_platform/upgrade/usm_keywords.py`.
- Contains `upload_patch_file()`, `upload_release()`, and verification methods that call `software show`.
- Release State Polling: `keywords/cloud_platform/upgrade/software_show_keywords.py`.
- Wraps `software show` and extracts release state (e.g., `"available"`).
- Config Management: `config/usm/objects/usm_config.py`.
- Parses structured JSON5 configuration and validates upgrade parameters.
- Remote Copy Support: `keywords/files/file_keywords.py`.
- Implements `copy_from_remote()` for fetching `.iso`, `.sig`, or `.patch` files using `rsync`.
These tests form a solid base for contributors to validate the upload mechanism
before tackling the broader USM lifecycle.
"""
from config.configuration_manager import ConfigurationManager
from framework.logging.automation_logger import get_logger
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.cloud_platform.upgrade.usm_keywords import USMKeywords
def test_usm_upload_release_from_local():
"""
Upload a USM ISO already present on the controller and verify the upload was successful.
Assumes that:
- ISO and SIG files are already present at the expected paths.
- No remote copy is required (copy_from_remote is False).
- The release ID is known and matches the ISO being uploaded.
"""
get_logger().log_info("Starting local upload test for USM release ISO.")
ssh_connection = LabConnectionKeywords().get_active_controller_ssh()
usm_keywords = USMKeywords(ssh_connection)
usm_config = ConfigurationManager.get_usm_config()
iso_path = usm_config.get_iso_path()
sig_path = usm_config.get_sig_path()
release_id = usm_config.get_to_release_ids()[0]
timeout = usm_config.get_upload_timeout_sec()
poll_interval = usm_config.get_upload_poll_interval_sec()
get_logger().log_test_case_step(f"Uploading software release: ISO={iso_path}, SIG={sig_path}")
usm_keywords.upload_and_verify_release(
iso_path=iso_path,
sig_path=sig_path,
expected_release_id=release_id,
timeout=timeout,
poll_interval=poll_interval,
)
get_logger().log_info(f"Upload verification complete for release: {release_id}")
def test_usm_upload_patch_from_local():
"""
Upload a USM patch file already present on the controller and verify it becomes available.
Assumes that:
- The patch file is already located at the configured path.
- No remote copy is required (copy_from_remote is False).
- The expected release ID after applying the patch is known.
"""
get_logger().log_info("Starting local upload test for USM patch file.")
ssh_connection = LabConnectionKeywords().get_active_controller_ssh()
usm_keywords = USMKeywords(ssh_connection)
usm_config = ConfigurationManager.get_usm_config()
patch_file_path = usm_config.get_patch_path()
expected_release_id = usm_config.get_to_release_ids()[0]
timeout = usm_config.get_upload_timeout_sec()
poll_interval = usm_config.get_upload_poll_interval_sec()
get_logger().log_test_case_step(f"Uploading patch file: {patch_file_path}")
usm_keywords.upload_and_verify_patch_file(
patch_file_path=patch_file_path,
expected_release_id=expected_release_id,
timeout=timeout,
poll_interval=poll_interval,
)
get_logger().log_info(f"Upload verification complete for patch release: {expected_release_id}")