Merge "system ceph-mon keywords"

This commit is contained in:
Zuul
2025-01-10 21:15:53 +00:00
committed by Gerrit Code Review
4 changed files with 282 additions and 0 deletions

View File

@@ -0,0 +1,98 @@
class SystemCephMonObject:
"""
This class represents a Host Disk as an object.
This is typically a line in the system ceph-mon output table.
"""
def __init__(self):
self.uuid:str = None
self.ceph_mon_gib:int = -1
self.hostname:str = None
self.state:str = None
self.task:str = None
self.created_at: str = None
self.updated_at: str = None
def set_uuid(self, uuid: str):
"""
Setter for the uuid
"""
self.uuid = uuid
def get_uuid(self) -> str:
"""
Getter for this uuid
"""
return self.uuid
def set_ceph_mon_gib(self, ceph_mon_gib: int):
"""
Setter for the ceph_mon_gib
"""
self.ceph_mon_gib = ceph_mon_gib
def get_ceph_mon_gib(self) -> int:
"""
Getter for ceph_mon_gib
"""
return self.ceph_mon_gib
def set_hostname(self, hostname: str):
"""
Setter for the hostname
"""
self.hostname = hostname
def get_hostname(self) -> str:
"""
Getter for the hostname
"""
return self.hostname
def set_state(self, state: str):
"""
Setter for the state
"""
self.state = state
def get_state(self) -> str:
"""
Getter for the state
"""
return self.state
def set_task(self, task: str):
"""
Setter for the task
"""
self.task = task
def get_task(self) -> str:
"""
Getter for the task
"""
return self.task
def set_created_at(self, created_at):
"""
Setter for created_at
"""
self.created_at = created_at
def get_created_at(self) -> str:
"""
Getter for created_at
"""
return self.created_at
def set_updated_at(self, updated_at):
"""
Setter for updated_at
"""
self.updated_at = updated_at
def get_updated_at(self) -> str:
"""
Getter for updated_at
"""
return self.updated_at

View File

@@ -0,0 +1,67 @@
from framework.exceptions.keyword_exception import KeywordException
from framework.logging.automation_logger import get_logger
from keywords.cloud_platform.system.ceph.objects.system_ceph_mon_object import SystemCephMonObject
from keywords.cloud_platform.system.system_table_parser import SystemTableParser
class SystemCephMonOutput:
"""
This class parses the output of 'system ceph-mon-list' command into an object of type SystemCephMonObject.
"""
def __init__(self, system_output):
"""
Constructor
Args:
system_output (str): Output of the 'system ceph-mon-list' command.
Raises:
KeywordException: If the output is not valid.
"""
self.system_ceph_mon : [SystemCephMonObject] = []
system_table_parser = SystemTableParser(system_output)
output_values = system_table_parser.get_output_values_list()
for value in output_values:
if self.is_valid_output(value):
system_ceph_mon = SystemCephMonObject()
system_ceph_mon.set_uuid(value['uuid'])
system_ceph_mon.set_ceph_mon_gib(value['ceph_mon_gib'])
system_ceph_mon.set_hostname(value['hostname'])
system_ceph_mon.set_state(value['state'])
system_ceph_mon.set_task(value['task'])
self.system_ceph_mon.append(system_ceph_mon)
else:
raise KeywordException(f"The output line {value} was not valid")
def get_system_ceph_mon_list(self):
"""
Returns the parsed system ceph_mon_list object.
Returns:
SystemCephMonObject: The parsed system ceph-mon object.
"""
return self.system_ceph_mon
@staticmethod
def is_valid_output(value):
"""
Checks if the output contains all the expected fields.
Args:
value (dict): The dictionary of output values.
Returns:
bool: True if the output contains all required fields, False otherwise.
"""
required_fields = ["uuid", "ceph_mon_gib", "hostname", "state", "task"]
valid = True
for field in required_fields:
if field not in value:
get_logger().log_error(f'{field} is not in the output value')
valid = False
break
return valid

View File

@@ -0,0 +1,66 @@
from framework.exceptions.keyword_exception import KeywordException
from framework.logging.automation_logger import get_logger
from keywords.cloud_platform.system.ceph.objects.system_ceph_mon_object import SystemCephMonObject
from keywords.cloud_platform.system.system_vertical_table_parser import SystemVerticalTableParser
class SystemCephMonShowOutput:
"""
This class parses the output of 'system ceph-mon-show' command into an object of type SystemCephMonObject.
"""
def __init__(self, system_output):
"""
Constructor
Args:
system_output (str): Output of the 'system ceph-mon-show' command.
Raises:
KeywordException: If the output is not valid.
"""
system_vertical_table_parser = SystemVerticalTableParser(system_output)
output_values = system_vertical_table_parser.get_output_values_dict()
if self.is_valid_output(output_values):
self.system_ceph_mon = SystemCephMonObject()
self.system_ceph_mon.set_uuid(output_values['uuid'])
self.system_ceph_mon.set_ceph_mon_gib(output_values['ceph_mon_gib'])
self.system_ceph_mon.set_state(output_values['state'])
self.system_ceph_mon.set_task(output_values['task'])
self.system_ceph_mon.set_created_at(output_values['created_at'])
self.system_ceph_mon.set_updated_at(output_values['updated_at'])
else:
raise KeywordException(f"The output line {output_values} was not valid")
def get_system_ceph_mon_show(self):
"""
Returns the parsed system ceph-mon object.
Returns:
SystemCephMonObject: The parsed system ceph-mon object.
"""
return self.system_ceph_mon
@staticmethod
def is_valid_output(value):
"""
Checks if the output contains all the expected fields.
Args:
value (dict): The dictionary of output values.
Returns:
bool: True if the output contains all required fields, False otherwise.
"""
required_fields = ["uuid", "ceph_mon_gib", "state", "task", "created_at",
"updated_at"]
valid = True
for field in required_fields:
if field not in value:
get_logger().log_error(f'{field} is not in the output value')
valid = False
break
return valid

View File

@@ -0,0 +1,51 @@
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.command_wrappers import source_openrc
from keywords.cloud_platform.system.ceph.objects.system_ceph_mon_output import SystemCephMonOutput
from keywords.cloud_platform.system.ceph.objects.system_ceph_mon_show_output import SystemCephMonShowOutput
class SystemCephMonKeywords(BaseKeyword):
"""
This class contains all the keywords related to the 'system ceph-mon' commands.
"""
def __init__(self, ssh_connection):
"""
Constructor
Args:
ssh_connection:
"""
self.ssh_connection = ssh_connection
def get_system_ceph_mon_list(self) -> SystemCephMonOutput:
"""
Gets the system ceph-mon-list
Args:
Returns:
SystemCephMonOutput object with the list of ceph-mon.
"""
command = source_openrc(f'system ceph-mon-list')
output = self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
system_ceph_mon_output = SystemCephMonOutput(output)
return system_ceph_mon_output
def get_system_ceph_mon_show(self, host_id) -> SystemCephMonShowOutput:
"""
Gets the system ceph-mon-show
Args:
host_id: name or id of the host
Returns:
SystemCephMonShowOutput object.
"""
command = source_openrc(f'system ceph-mon-show {host_id}')
output = self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
system_ceph_mon_show_output = SystemCephMonShowOutput(output)
return system_ceph_mon_show_output