This commit captures the state of software releases (or patches for older versions) before taking backup

- Store this state in /tmp/pre_backup_software_list.txt for later comparison after restore.
- Copied sw-patch query keywords from wrcp to starlingx under keywords/cloud_platform/sw_patch/.
- Removed software_state_output.py.

Change-Id: Ic7ee421f96af5f9648ad41c5d791f7c718ed7b41
Signed-off-by: Reema Menon <reema.menon@windriver.com>
This commit is contained in:
reemenon
2025-07-14 01:26:39 -04:00
committed by Reema Menon
parent 6da8408968
commit 1175d7f19a
6 changed files with 276 additions and 26 deletions

View File

@@ -1,25 +0,0 @@
class SoftwareStateOutput:
"""
Class to hold attributes of a software state
"""
def __init__(self, patch_states: dict) -> None:
"""
Constructor
Args:
patch_states (dict): Dictionary containing patch states.
"""
self.patch_states = patch_states
def get_state(self, patch_id: str) -> str:
"""
Get the state for a given patch_id.
Args:
patch_id (str): The patch ID to look up.
Returns:
str: The state of the patch, or None if not found.
"""
return self.patch_states.get(patch_id)

View File

@@ -0,0 +1,64 @@
class SwPatchQueryObject:
"""Represents an applied software patch returned by the sw-patch query."""
def __init__(self, patch_id: str, reboot_required: str, release: str, state: str):
"""Initializes a SwPatchQueryObject instance.
Args:
patch_id (str): Unique identifier of the patch.
reboot_required (str): Indicates if a reboot is required.
release (str): Software release version associated with the patch.
state (str): Current state of the patch.
"""
self.patch_id = patch_id
self.reboot_required = reboot_required
self.release = release
self.state = state
def set_patch_id(self, patch_id):
"""
Setter for patch_id
"""
self.patch_id = patch_id
def get_patch_id(self) -> str:
"""
Getter for patch_id
"""
return self.patch_id
def set_reboot_required(self, reboot_required):
"""
Setter for reboot_required
"""
self.reboot_required = reboot_required
def get_reboot_required(self) -> str:
"""
Getter for reboot_required
"""
return self.reboot_required
def set_release(self, release):
"""
Setter for release
"""
self.release = release
def get_release(self) -> str:
"""
Getter for release
"""
return self.release
def set_state(self, state):
"""
Setter for state
"""
self.state = state
def get_state(self) -> str:
"""
Getter for state
"""
return self.state

View File

@@ -0,0 +1,70 @@
"""Module for parsing and validating software patch query output."""
from framework.exceptions.keyword_exception import KeywordException
from framework.logging.automation_logger import get_logger
from keywords.cloud_platform.sw_patch.objects.query_object import SwPatchQueryObject
from keywords.cloud_platform.sw_patch.query_parser import SwPatchQueryParser
class SwPatchQueryOutput:
"""Processes and stores software patch query output."""
def __init__(self, sw_patch_output: str):
"""Initializes SwPatchQueryOutput by parsing the patch query.
Args:
sw_patch_output (str): The raw output of the sw-patch query.
Raises:
KeywordException: If the output format is invalid.
"""
self.sw_patches: list[SwPatchQueryObject] = []
sw_query_parser = SwPatchQueryParser(sw_patch_output)
output_values = sw_query_parser.to_list_of_dicts()
for value in output_values:
if self.is_valid_output(value):
patch_obj = SwPatchQueryObject(
patch_id=value["Patch ID"],
reboot_required=value["RR"],
release=value["Release"],
state=value["Patch State"],
)
self.sw_patches.append(patch_obj)
else:
raise KeywordException(f"Invalid output line: {value}")
def get_patches(self) -> list[SwPatchQueryObject]:
"""Returns the list of parsed software patches.
Returns:
list[SwPatchQueryObject]: List of software patch objects.
"""
return self.sw_patches
def get_last_patch(self) -> SwPatchQueryObject:
"""Returns the last software patch in the list.
Returns:
SwPatchQueryObject: The last software patch object.
"""
if not self.sw_patches:
raise KeywordException("No patches found in the output.")
return self.sw_patches[-1]
@staticmethod
def is_valid_output(value: dict) -> bool:
"""Validates the presence of required keys in the patch output.
Args:
value (dict): Dictionary containing patch information.
Returns:
bool: True if all required keys are present, otherwise False.
"""
required_keys = {"Patch ID", "RR", "Release", "Patch State"}
missing_keys = required_keys - value.keys()
for key in missing_keys:
get_logger().log_error(f"{key} is missing in the output value: {value}")
return not missing_keys

View File

@@ -0,0 +1,49 @@
"""Module for parsing software patch query output."""
import re
from framework.exceptions.keyword_exception import KeywordException
class SwPatchQueryParser:
"""Parses output from the `sw-patch query` command."""
def __init__(self, output_list):
"""Initializes the parser with raw output lines.
Args:
output_list (list): List of strings representing command output.
"""
self.out_lines = output_list
def parse_data(self):
"""Parses the output and returns a list of dictionaries.
Returns:
list[dict]: List of dictionaries containing parsed patch data.
"""
if len(self.out_lines) < 2: # Check if there are at least header and one data row
return [] # Return empty list if no data
header_line = self.out_lines[0]
data_lines = self.out_lines[2:] # Skip the separator line
header_names = [h.strip() for h in re.split(r"\s{2,}", header_line) if h.strip()]
parsed_data = []
for line in data_lines:
if line.strip('\n'):
values = [v.strip() for v in re.split(r"\s{2,}", line) if v.strip()]
if len(values) == len(header_names):
parsed_data.append(dict(zip(header_names, values)))
else:
raise KeywordException("Number of headers and values do not match")
return parsed_data
def to_list_of_dicts(self):
"""Converts parsed data into a list of dictionaries.
Returns:
list[dict]: List of dictionaries representing parsed patches.
"""
return self.parse_data()

View File

@@ -0,0 +1,72 @@
"""Module containing keyword functions for querying software patches."""
from framework.logging.automation_logger import get_logger
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.command_wrappers import source_sudo_openrc
from keywords.cloud_platform.sw_patch.objects.query_output import SwPatchQueryOutput
class SwPatchQueryKeywords(BaseKeyword):
"""Provides keyword functions for software patch queries."""
def __init__(self, ssh_connection):
"""Initializes SwPatchQueryKeywords with an SSH connection.
Args:
ssh_connection: SSH connection to the target system.
"""
self.ssh_connection = ssh_connection
def get_sw_patch_query(self) -> SwPatchQueryOutput:
"""Executes the `sw-patch query` command and returns the parsed output.
Returns:
SwPatchQueryOutput: Parsed output containing software patches.
"""
cmd_out = self.ssh_connection.send('/usr/sbin/sw-patch query')
self.validate_success_return_code(self.ssh_connection)
get_logger().log_info("get_sw_patch_query")
sw_query_output = SwPatchQueryOutput(cmd_out)
return sw_query_output
def sw_patch_upload(self, region_name, patch_file_path) -> str:
"""Uploads a software patch to the system.
This function executes the `sw-patch upload` command to upload a patch
file. If a region name is provided, the command includes the
`--os-region-name` flag.
Args:
region_name (str, optional): The target system's region name.
patch_file_path (str): The full path to the patch file to upload.
Returns:
str: The command output after attempting to upload the patch.
"""
command = f'/usr/sbin/sw-patch upload {patch_file_path}'
if region_name:
command = f'/usr/sbin/sw-patch --os-region-name {region_name} upload {patch_file_path}'
# command should run by sudo user
get_logger().log_info(f"Executing patch upload: {command}")
cmd_out = self.ssh_connection.send_as_sudo(source_sudo_openrc(command))
get_logger().log_info(f"Patch upload command output: {cmd_out}")
self.validate_success_return_code(self.ssh_connection)
return cmd_out[0].strip('\n')
def sw_patch_apply(self, patch_name) -> str:
"""Apply a software patch to the system.
Args:
patch_name: The patch file to apply.
Returns:
str: The command output after attempting to apply the patch.
"""
command = f'/usr/sbin/sw-patch apply {patch_name}'
# command should run by sudo user
get_logger().log_info(f"Executing patch apply: {command}")
cmd_out = self.ssh_connection.send_as_sudo(source_sudo_openrc(command))
get_logger().log_info(f"Patch apply command output: {cmd_out}")
self.validate_success_return_code(self.ssh_connection)
return cmd_out[0].strip('\n')

View File

@@ -6,7 +6,10 @@ from keywords.cloud_platform.ansible_playbook.ansible_playbook_keywords import A
from keywords.cloud_platform.ansible_playbook.backup_files_upload_keywords import BackUpFilesUploadKeywords
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.files.file_keywords import FileKeywords
from keywords.cloud_platform.version_info.cloud_platform_software_version import CloudPlatformSoftwareVersion
from keywords.cloud_platform.version_info.cloud_platform_version_manager import CloudPlatformVersionManager
from keywords.cloud_platform.sw_patch.software_patch_keywords import SwPatchQueryKeywords
from keywords.cloud_platform.upgrade.software_list_keywords import SoftwareListKeywords
@mark.p0
def test_backup():
@@ -23,6 +26,20 @@ def test_backup():
backup_dir = "/opt/backups"
local_backup_folder_path = "/tmp/bnr"
ssh_connection = LabConnectionKeywords().get_active_controller_ssh()
# Capture the state of software releases (or patches for older versions) before backup
current_version = CloudPlatformVersionManager.get_sw_version()
if current_version.is_after_or_equal_to(CloudPlatformSoftwareVersion.STARLINGX_10_0):
get_logger().log_info("Getting software list (24.09+)")
sw_list = SoftwareListKeywords(ssh_connection).get_software_list().get_software_lists()
info_list = [f"{sw.get_release()}:{sw.get_state()}" for sw in sw_list]
else:
get_logger().log_info("Getting sw-patch query (pre-24.09)")
sw_patch_output = SwPatchQueryKeywords(ssh_connection).get_sw_patch_query()
info_list = [f"{patch.get_patch_id()}:{patch.get_state()}" for patch in sw_patch_output.get_patches()]
FileKeywords(ssh_connection).create_file_with_echo("/tmp/pre_backup_software_list.txt", "\n".join(info_list))
get_logger().log_info("Delete old backup files if present in back up directory")
backup_files = FileKeywords(ssh_connection).get_files_in_dir(backup_dir)
for backup_file in backup_files:
@@ -35,6 +52,9 @@ def test_backup():
ansible_playbook_backup_output = AnsiblePlaybookKeywords(ssh_connection).ansible_playbook_backup(backup_dir)
validate_equals(ansible_playbook_backup_output, True, "Ansible backup command execution")
# Copy software list to backup directory
ssh_connection.send_as_sudo(f"cp /tmp/pre_backup_software_list.txt {backup_dir}/")
backup_file_upload_status = BackUpFilesUploadKeywords(ssh_connection).backup_file(backup_dir, local_backup_folder_path)
validate_equals(backup_file_upload_status, True, "Backup file upload to local directory")