Add reinstall of standby controller with Rook Ceph; fix typos and docstrings

Patch 2 : Refactor and test improvements

- Moved the reinstall keywords to keywords/cloud_platform/system/host/system_host_reinstall_keywords.py
- Updated code to use get_return_code() directly
- Marked the test case priority
- Added short description for the test case

Patch 3 : Improve the unlock host function
Patch 4 : Change the parameter name timeout to unlock_accepted_timeout, and increase the default timeout to 300s
Change-Id: I8586530d69a67ea030cd0ec5710f68d5c84e5cbc
Signed-off-by: Dario Oliveira <Dario.DeOliveiraFilho@windriver.com>
This commit is contained in:
Dario Oliveira
2025-09-15 12:53:12 -03:00
parent 95f7771ac8
commit 89307ac3cf
4 changed files with 198 additions and 27 deletions

View File

@@ -1,20 +1,28 @@
from typing import Any
from framework.logging.automation_logger import get_logger
from framework.rest.rest_response import RestResponse
from framework.ssh.ssh_connection import SSHConnection
class BaseKeyword:
"""Base class for keyword implementations.
def pretty_print(self, value) -> str:
This class provides shared functionality for custom keywords,
including validation helpers and utility methods that can be
reused by subclasses.
"""
def pretty_print(self, value: Any) -> str:
"""
This function will return a log-readable version of value to add to a logging statement.
Args:
value: An kind of parameter that we want to log.
Returns: str
value (Any): An kind of parameter that we want to log.
Returns:
str: A formatted string representation of the value.
"""
if value is None:
return "None"
@@ -49,14 +57,17 @@ class BaseKeyword:
return "?UNKNOWN?"
def on_every_keyword(self, name: str, *args, **kwargs):
def on_every_keyword(self, name: str, *args: Any, **kwargs: Any):
"""
Hook executed whenever a keyword function is invoked.
This function is a hook that gets called any time a Keyword function is invoked.
It will log information about the keyword and the parameters passed in.
Args:
name: The name of the function being called.
*args: arguments that have been passed to the keyword
**kwargs: kwargs that have been passed in to the keyword.
name (str): The name of the function being called.
*args (Any): arguments that have been passed to the keyword
**kwargs (Any): kwargs that have been passed in to the keyword.
Returns: None
@@ -75,19 +86,21 @@ class BaseKeyword:
get_logger().log_keyword(f"{name}({args_string})")
def __getattribute__(self, name):
"""
def __getattribute__(self, name: str) -> Any:
"""Intercept attribute access on a Keyword object.
This is a default Python hook that gets called whenever Python tries to access a field or
a function in a Keyword object. We are intercepting it here to place the on_every_keyword
hook every time that we are calling a function.
Args:
name: The attribute or function getting accessed.
Returns: If we are accessing a function, we return the function, wrapped in the
Args:
name (str): The attribute or function getting accessed.
Returns:
Any: If we are accessing a function, we return the function, wrapped in the
on_every_keyword hook. Otherwise, we return the field directly.
"""
# Avoid an infinite recursive loop with the wrapper on_every_keyword or pretty_print.
if name == "on_every_keyword" or name == "pretty_print":
return object.__getattribute__(self, name)
@@ -111,20 +124,20 @@ class BaseKeyword:
def validate_success_return_code(self, ssh_connection: SSHConnection):
"""
Validates a successful return code was received
Args:
ssh_connection (): the ssh connection
Returns:
Args:
ssh_connection (SSHConnection): the ssh connection
"""
rc = ssh_connection.get_return_code()
assert 0 == rc, f"Return code was {rc}"
def validate_cmd_rejection_return_code(self, ssh_connection: SSHConnection):
def validate_cmd_rejection_return_code(self, ssh_connection: SSHConnection) -> bool:
"""
Validates a command rejection return code was received
Args:
ssh_connection (): the ssh connection
ssh_connection (SSHConnection): the ssh connection
Returns:
bool: True if command was correctly rejected, False if it wasn't.
@@ -137,12 +150,10 @@ class BaseKeyword:
def validate_success_status_code(self, rest_response: RestResponse, expected_status_code: int = 200):
"""
Validates a successful status code was received
Args:
rest_response (): the rest reponse object
expected_status_code(): the expected status code - default is 200
Returns:
rest_response (RestResponse): the rest response object
expected_status_code(int): the expected status code - default is 200
"""
rc = rest_response.get_status_code()
assert expected_status_code == rc, f"Status code was {rc}"

View File

@@ -106,12 +106,13 @@ class SystemHostLockKeywords(BaseKeyword):
return True
return False
def unlock_host(self, host_name: str) -> bool:
def unlock_host(self, host_name: str, unlock_accepted_timeout: int = 300) -> bool:
"""
Unlocks the given host
Args:
host_name (str): the host name
unlock_accepted_timeout (int): unlock_accepted_timeout to wait to try unlock the host
Returns:
bool: True if the unlock is successful
@@ -122,6 +123,20 @@ class SystemHostLockKeywords(BaseKeyword):
"""
self.unlock_host_pre_check()
self.ssh_connection.send(source_openrc(f"system host-unlock {host_name}"))
# Checking whether the host can be unlocked; if not, the process will retry until the timeout is reached.
start = time.time()
while time.time() - start < unlock_accepted_timeout:
if self.ssh_connection.get_return_code() == 1:
get_logger().log_info("Fail to unlock, trying again in 5 seconds")
time.sleep(5)
self.ssh_connection.send(source_openrc(f"system host-unlock {host_name}"))
else:
get_logger().log_info(f"The unlock of host {host_name} was started")
break
else:
raise KeywordException(f"Timeout: failed to unlock host {host_name}")
self.validate_success_return_code(self.ssh_connection)
is_host_unlocked = self.wait_for_host_unlocked(host_name)
if not is_host_unlocked:

View File

@@ -0,0 +1,99 @@
import time
from framework.exceptions.keyword_exception import KeywordException
from framework.logging.automation_logger import get_logger
from framework.ssh.ssh_connection import SSHConnection
from framework.validation.validation import validate_equals_with_retry
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.command_wrappers import source_openrc
from keywords.cloud_platform.system.host.system_host_list_keywords import SystemHostListKeywords
class SystemHostReinstallKeywords(BaseKeyword):
    """
    Keywords for System Reinstall Host commands
    """

    def __init__(self, ssh_connection: SSHConnection):
        """
        Constructor

        Args:
            ssh_connection (SSHConnection): the ssh connection
        """
        self.ssh_connection = ssh_connection

    def wait_for_host_reinstall(self, host_name: str, reinstall_wait_timeout: int = 1800) -> bool:
        """
        Wait for the host to be reinstalled

        Polls is_host_reinstalled() every 5 seconds until it returns True or the timeout expires.

        Args:
            host_name (str): the host name
            reinstall_wait_timeout (int): the amount of time in secs to wait for the host to reinstall

        Returns:
            bool: True if host is reinstalled, False if the timeout expired first
        """
        deadline = time.time() + reinstall_wait_timeout
        refresh_time = 5

        while time.time() < deadline:
            try:
                if self.is_host_reinstalled(host_name):
                    return True
            except Exception as ex:
                # Best-effort polling: a failed query is treated as "not reinstalled yet".
                # The exception text is logged so a later timeout can be diagnosed.
                get_logger().log_info(f"Found an exception when checking the health of the system: {ex}. Trying again after {refresh_time} seconds")

            time.sleep(refresh_time)

        return False

    def is_host_reinstalled(self, host_name: str) -> bool:
        """
        Returns true if the host is reinstalled

        The host is considered reinstalled when 'system host-list' reports it as
        online, locked and disabled.

        Args:
            host_name (str): the name of the host

        Returns:
            bool: True if host is reinstalled
        """
        # Check System Host-List
        host_value = SystemHostListKeywords(self.ssh_connection).get_system_host_list().get_host(host_name)
        if host_value.get_availability() == "online" and host_value.get_administrative() == "locked" and host_value.get_operational() == "disabled":
            get_logger().log_info("The host is in a good state from system host list.")
            return True

        return False

    def reinstall_host(self, host_name: str, reinstall_wait_timeout: int = 1800) -> bool:
        """
        Reinstall the given host

        Sends 'system host-reinstall', waits for the host to be reported offline,
        then waits for it to come back online in the locked/disabled state.

        Args:
            host_name (str): the host name
            reinstall_wait_timeout (int): the amount of time in secs to wait for the host to reinstall

        Returns:
            bool: True if the reinstall is successful

        Raises:
            KeywordException: If reinstall does not occur in the given time
        """
        self.ssh_connection.send(source_openrc(f"system host-reinstall {host_name}"))
        self.validate_success_return_code(self.ssh_connection)

        # The host must first be seen offline; otherwise the pre-reinstall
        # "online/locked/disabled" state could be mistaken for completion.
        validate_equals_with_retry(lambda: SystemHostListKeywords(self.ssh_connection).get_system_host_list().get_host(host_name).get_availability(), expected_value="offline", validation_description="Waiting for host to go offline")

        is_host_reinstalled = self.wait_for_host_reinstall(host_name, reinstall_wait_timeout)
        if not is_host_reinstalled:
            # Capture the final host state so the failure message shows where it got stuck.
            host_value = SystemHostListKeywords(self.ssh_connection).get_system_host_list().get_host(host_name)
            raise KeywordException("Host reinstall did not complete within the required time. Host values were: " f"Operational: {host_value.get_operational()} " f"Administrative: {host_value.get_administrative()} " f"Availability: {host_value.get_availability()}")

        return True

View File

@@ -5,6 +5,7 @@ from framework.logging.automation_logger import get_logger
from framework.validation.validation import validate_equals, validate_equals_with_retry, validate_str_contains
from keywords.ceph.ceph_osd_pool_ls_detail_keywords import CephOsdPoolLsDetailKeywords
from keywords.ceph.ceph_status_keywords import CephStatusKeywords
from keywords.cloud_platform.fault_management.alarms.alarm_list_keywords import AlarmListKeywords
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.cloud_platform.system.application.system_application_apply_keywords import SystemApplicationApplyKeywords
from keywords.cloud_platform.system.application.system_application_list_keywords import SystemApplicationListKeywords
@@ -12,6 +13,7 @@ from keywords.cloud_platform.system.host.system_host_fs_keywords import SystemHo
from keywords.cloud_platform.system.host.system_host_list_keywords import SystemHostListKeywords
from keywords.cloud_platform.system.host.system_host_lock_keywords import SystemHostLockKeywords
from keywords.cloud_platform.system.host.system_host_reboot_keywords import SystemHostRebootKeywords
from keywords.cloud_platform.system.host.system_host_reinstall_keywords import SystemHostReinstallKeywords
from keywords.cloud_platform.system.host.system_host_swact_keywords import SystemHostSwactKeywords
from keywords.cloud_platform.system.storage.system_storage_backend_keywords import SystemStorageBackendKeywords
@@ -709,10 +711,10 @@ def test_monitor_operations_rook_ceph():
Args: None
"""
active_controller_ssh_connection = LabConnectionKeywords().get_active_controller_ssh()
ceph_status_keywords = CephStatusKeywords(active_controller_ssh_connection)
system_host_fs_keywords = SystemHostFSKeywords(active_controller_ssh_connection)
system_application_apply_keywords = SystemApplicationApplyKeywords(active_controller_ssh_connection)
system_application_list_keywords = SystemApplicationListKeywords(active_controller_ssh_connection)
ceph_status_keywords = CephStatusKeywords(active_controller_ssh_connection)
app_status_list = ["applied"]
app_name = "rook-ceph"
no_monitor_hosts = []
@@ -762,3 +764,47 @@ def test_monitor_operations_rook_ceph():
get_logger().log_test_case_step("Verify final rook-ceph health.")
ceph_status_keywords.wait_for_ceph_health_status(expect_health_status=True)
@mark.lab_has_standby_controller
def test_reinstall_standby_host():
    """
    Validate that the standby controller can be reinstalled and rook-ceph stays healthy.

    Test Steps:
        - Check that there are no active alarms
        - Lock standby controller
        - Reinstall standby controller
        - Unlock standby controller
        - Check again that there are no active alarms
        - Check rook-ceph health after reinstall

    Args: None
    """
    active_ssh = LabConnectionKeywords().get_active_controller_ssh()
    standby_name = SystemHostListKeywords(active_ssh).get_standby_controller().get_host_name()

    lock_keywords = SystemHostLockKeywords(active_ssh)
    reinstall_keywords = SystemHostReinstallKeywords(active_ssh)
    ceph_keywords = CephStatusKeywords(active_ssh)
    alarm_keywords = AlarmListKeywords(active_ssh)

    def check_no_active_alarms():
        # Same alarm validation is run before and after the reinstall cycle.
        get_logger().log_test_case_step("Checking if there are any active alarms")
        current_alarms = alarm_keywords.alarm_list()
        validate_equals(current_alarms, [], "No active alarms")

    check_no_active_alarms()

    get_logger().log_test_case_step("Lock standby controller")
    lock_keywords.lock_host(standby_name)

    get_logger().log_test_case_step("Reinstall standby controller")
    reinstall_keywords.reinstall_host(standby_name)

    get_logger().log_test_case_step("Unlock standby controller")
    lock_keywords.unlock_host(standby_name)

    check_no_active_alarms()

    get_logger().log_test_case_step("Checking rook-ceph health after reinstall.")
    ceph_keywords.wait_for_ceph_health_status(expect_health_status=True)