Adding aio and rook ceph capabilities to the scanner Also, updated a

couple of areas where REST could have been used.

Change-Id: I3c47550f30cb17c086713cbf68935d84f9894b77
Signed-off-by: jpike <jason.pike@windriver.com>
This commit is contained in:
jpike
2025-07-03 08:42:53 -04:00
parent 43b6353e7c
commit 40ab2aaf9c
10 changed files with 703 additions and 408 deletions

View File

@@ -0,0 +1,27 @@
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.rest.cloud_rest_client import CloudRestClient
from keywords.cloud_platform.rest.get_rest_url_keywords import GetRestUrlKeywords
from keywords.cloud_platform.system.host.objects.system_host_disk_output import SystemHostDiskOutput
class GetHostDisksKeywords(BaseKeyword):
"""Keywords for retrieving host disk information via REST API."""
def __init__(self):
"""Initialize GetHostDisksKeywords with bare metal URL."""
self.bare_metal_base_url = GetRestUrlKeywords().get_bare_metal_url()
def get_disks(self, host_id: str) -> SystemHostDiskOutput:
"""Get host disks using the REST API.
Args:
host_id (str): The UUID or ID of the host to retrieve disks for.
Returns:
SystemHostDiskOutput: Object containing the host's disk information.
"""
response = CloudRestClient().get(f"{self.bare_metal_base_url}/ihosts/{host_id}/idisks")
self.validate_success_status_code(response)
system_host_disk_output = SystemHostDiskOutput(response)
return system_host_disk_output

View File

@@ -0,0 +1,26 @@
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.rest.cloud_rest_client import CloudRestClient
from keywords.cloud_platform.rest.configuration.addresses.objects.host_address_output import HostAddressOutput
from keywords.cloud_platform.rest.get_rest_url_keywords import GetRestUrlKeywords
class GetHostAddressesKeywords(BaseKeyword):
"""Keywords for retrieving host address information via REST API."""
def __init__(self):
"""Initialize GetHostAddressesKeywords with configuration URL."""
self.configuration_base_url = GetRestUrlKeywords().get_configuration_url()
def get_host_addresses(self, host_id: str) -> HostAddressOutput:
"""Get host addresses using the REST API.
Args:
host_id (str): The UUID or ID of the host to retrieve addresses for.
Returns:
HostAddressOutput: Object containing the host's address information.
"""
response = CloudRestClient().get(f"{self.configuration_base_url}/ihosts/{host_id}/addresses")
self.validate_success_status_code(response)
host_address_output = HostAddressOutput(response)
return host_address_output

View File

@@ -0,0 +1,141 @@
class HostAddressObject:
"""This class represents a Host Address as an object."""
def __init__(self):
"""Initialize a new HostAddressObject with default values."""
self.uuid = None
self.interface_uuid = None
self.ifname = None
self.address = None
self.prefix = -1
self.enable_dad = False
self.forihostid = -1
self.pool_uuid = None
def set_uuid(self, uuid: str):
"""Set the UUID of the host address.
Args:
uuid (str): The unique identifier for the host address.
"""
self.uuid = uuid
def get_uuid(self) -> str:
"""Get the UUID of the host address.
Returns:
str: The unique identifier for the host address.
"""
return self.uuid
def set_interface_uuid(self, interface_uuid: str):
"""Set the interface UUID associated with this address.
Args:
interface_uuid (str): The unique identifier of the network interface.
"""
self.interface_uuid = interface_uuid
def get_interface_uuid(self) -> str:
"""Get the interface UUID associated with this address.
Returns:
str: The unique identifier of the network interface.
"""
return self.interface_uuid
def set_ifname(self, ifname: str):
"""Set the interface name.
Args:
ifname (str): The name of the network interface (e.g., 'eth0', 'oam0').
"""
self.ifname = ifname
def get_ifname(self) -> str:
"""Get the interface name.
Returns:
str: The name of the network interface.
"""
return self.ifname
def set_address(self, address: str):
"""Set the IP address.
Args:
address (str): The IP address (IPv4 or IPv6).
"""
self.address = address
def get_address(self) -> str:
"""Get the IP address.
Returns:
str: The IP address (IPv4 or IPv6).
"""
return self.address
def set_prefix(self, prefix: int):
"""Set the network prefix length.
Args:
prefix (int): The subnet prefix length (e.g., 24 for /24).
"""
self.prefix = prefix
def get_prefix(self) -> int:
"""Get the network prefix length.
Returns:
int: The subnet prefix length.
"""
return self.prefix
def set_enable_dad(self, enable_dad: bool):
"""Set the Duplicate Address Detection (DAD) flag.
Args:
enable_dad (bool): Whether DAD is enabled for this address.
"""
self.enable_dad = enable_dad
def get_enable_dad(self) -> bool:
"""Get the Duplicate Address Detection (DAD) flag.
Returns:
bool: Whether DAD is enabled for this address.
"""
return self.enable_dad
def set_forihostid(self, forihostid: int):
"""Set the foreign host ID.
Args:
forihostid (int): The ID of the host this address belongs to.
"""
self.forihostid = forihostid
def get_forihostid(self) -> int:
"""Get the foreign host ID.
Returns:
int: The ID of the host this address belongs to.
"""
return self.forihostid
def set_pool_uuid(self, pool_uuid: str):
"""Set the address pool UUID.
Args:
pool_uuid (str): The unique identifier of the address pool.
"""
self.pool_uuid = pool_uuid
def get_pool_uuid(self) -> str:
"""Get the address pool UUID.
Returns:
str: The unique identifier of the address pool.
"""
return self.pool_uuid

View File

@@ -0,0 +1,76 @@
from framework.rest.rest_response import RestResponse
from keywords.cloud_platform.rest.configuration.addresses.objects.host_address_object import HostAddressObject
from keywords.python.type_converter import TypeConverter
class HostAddressOutput:
"""Parses host addresses from REST API responses."""
def __init__(self, host_address_output: RestResponse | list | dict) -> None:
"""Initialize HostAddressOutput with parsed address data.
Args:
host_address_output (RestResponse | list | dict): REST response from /v1/ihosts/{host_id}/addresses endpoint.
"""
if isinstance(host_address_output, RestResponse):
json_object = host_address_output.get_json_content()
if "addresses" in json_object:
addresses = json_object["addresses"]
else:
addresses = [json_object] if json_object else []
else:
addresses = host_address_output if isinstance(host_address_output, list) else [host_address_output]
self.host_address_objects: list[HostAddressObject] = []
for address in addresses:
host_address_object = HostAddressObject()
if "uuid" in address:
host_address_object.set_uuid(address["uuid"])
if "interface_uuid" in address:
host_address_object.set_interface_uuid(address["interface_uuid"])
if "ifname" in address:
host_address_object.set_ifname(address["ifname"])
if "address" in address:
host_address_object.set_address(address["address"])
if "prefix" in address:
host_address_object.set_prefix(int(address["prefix"]))
if "enable_dad" in address:
value = address["enable_dad"] if isinstance(address["enable_dad"], bool) else TypeConverter.str_to_bool(address["enable_dad"])
host_address_object.set_enable_dad(value)
if "forihostid" in address:
host_address_object.set_forihostid(int(address["forihostid"]))
if "pool_uuid" in address:
host_address_object.set_pool_uuid(address["pool_uuid"])
self.host_address_objects.append(host_address_object)
def get_host_address_objects(self) -> list[HostAddressObject]:
"""Get all host address objects.
Returns:
list[HostAddressObject]: List of HostAddressObject instances.
"""
return self.host_address_objects
def get_address_by_ifname(self, ifname: str) -> str | None:
"""Get IP address by interface name.
Args:
ifname (str): Interface name to search for.
Returns:
str | None: IP address string or None if not found.
"""
for addr in self.host_address_objects:
if addr.get_ifname() == ifname:
return addr.get_address()
return None

View File

@@ -0,0 +1,24 @@
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.rest.cloud_rest_client import CloudRestClient
from keywords.cloud_platform.rest.get_rest_url_keywords import GetRestUrlKeywords
from keywords.cloud_platform.system.storage.objects.system_storage_backend_output import SystemStorageBackendOutput
class GetStorageBackendKeywords(BaseKeyword):
"""Keywords for retrieving storage backend information via REST API."""
def __init__(self):
"""Initialize GetStorageBackendKeywords with configuration URL."""
self.configuration_base_url = GetRestUrlKeywords().get_configuration_url()
def get_storage_backends(self) -> SystemStorageBackendOutput:
"""Get storage backends using the REST API.
Returns:
SystemStorageBackendOutput: Object containing storage backend information.
"""
response = CloudRestClient().get(f"{self.configuration_base_url}/storage_backend")
self.validate_success_status_code(response)
storage_output = SystemStorageBackendOutput(response)
return storage_output

View File

@@ -1,14 +1,15 @@
import json5
from keywords.cloud_platform.rest.configuration.system.objects.system_capabilities_object import SystemCapabilitiesObject
from keywords.cloud_platform.system.host.objects.host_capabilities_object import HostCapabilities
class SystemObject:
"""
Class for System Object
"""
"""Represents a StarlingX system with its configuration and metadata."""
def __init__(self, uuid: str):
"""Initialize a SystemObject with the given UUID.
Args:
uuid (str): The unique identifier for the system.
"""
self.uuid = uuid
self.name: str = None
@@ -22,7 +23,7 @@ class SystemObject:
self.software_version: str = None
self.timezone: str = None
self.links = None
self.capabilities: HostCapabilities = None
self.capabilities: SystemCapabilitiesObject = None
self.region_name: str = None
self.distributed_cloud_role: str = None
self.service_project_name: str = None
@@ -31,307 +32,303 @@ class SystemObject:
self.updated_at: str = None
def set_name(self, name: str):
"""
Setter for name
Args
name () - the name
"""Set the system name.
Args:
name (str): The name of the system.
"""
self.name = name
def get_name(self) -> str:
"""
Getter for name
Args
"""Get the system name.
Returns:
str: The name of the system.
"""
return self.name
def set_system_type(self, system_type: str):
"""
Setter for system_type
Args
system_type () - the system_type
"""Set the system type.
Args:
system_type (str): The type of the system.
"""
self.system_type = system_type
def get_system_type(self) -> str:
"""
Getter for system_type
Args
"""Get the system type.
Returns:
str: The type of the system.
"""
return self.system_type
def set_system_mode(self, system_mode: str):
"""
Setter for system_mode
Args
system_mode () - the system_mode
"""Set the system mode.
Args:
system_mode (str): The mode of the system.
"""
self.system_mode = system_mode
def get_system_mode(self) -> str:
"""
Getter for system_mode
Args
"""Get the system mode.
Returns:
str: The mode of the system.
"""
return self.system_mode
def set_description(self, description: str):
"""
Setter for description
Args
description () - the description
"""Set the system description.
Args:
description (str): The description of the system.
"""
self.description = description
def get_description(self) -> str:
"""
Getter for description
Args
"""Get the system description.
Returns:
str: The description of the system.
"""
return self.description
def set_contact(self, contact: str):
"""
Setter for contact
Args
contact () - the contact
"""Set the system contact information.
Args:
contact (str): The contact information for the system.
"""
self.contact = contact
def get_contact(self) -> str:
"""
Getter for contact
Args
"""Get the system contact information.
Returns:
str: The contact information for the system.
"""
return self.contact
def set_location(self, location: str):
"""
Setter for location
Args
location () - the location
"""Set the system location.
Args:
location (str): The location of the system.
"""
self.location = location
def get_location(self) -> str:
"""
Getter for location
Args
"""Get the system location.
Returns:
str: The location of the system.
"""
return self.location
def set_latitude(self, latitude: str):
"""
Setter for latitude
Args
latitude () - the latitude
"""Set the system latitude.
Args:
latitude (str): The latitude coordinate of the system.
"""
self.latitude = latitude
def get_latitude(self) -> str:
"""
Getter for latitude
Args
"""Get the system latitude.
Returns:
str: The latitude coordinate of the system.
"""
return self.latitude
def set_longitude(self, longitude: str):
"""
Setter for longitude
Args
longitude () - the longitude
"""Set the system longitude.
Args:
longitude (str): The longitude coordinate of the system.
"""
self.longitude = longitude
def get_longitude(self) -> str:
"""
Getter for longitude
Args
"""Get the system longitude.
Returns:
str: The longitude coordinate of the system.
"""
return self.longitude
def set_software_version(self, software_version: str):
"""
Setter for software_version
Args
software_version () - the software_version
"""Set the system software version.
Args:
software_version (str): The software version of the system.
"""
self.software_version = software_version
def get_software_version(self) -> str:
"""
Getter for software_version
Args
"""Get the system software version.
Returns:
str: The software version of the system.
"""
return self.software_version
def set_timezone(self, timezone: str):
"""
Setter for timezone
Args
name () - the timezone
"""Set the system timezone.
Args:
timezone (str): The timezone of the system.
"""
self.timezone = timezone
def get_timezone(self) -> str:
"""
Getter for timezone
Args
"""Get the system timezone.
Returns:
str: The timezone of the system.
"""
return self.timezone
def set_links(self, links: str):
"""
Setter for links
Args
links () - the links
"""Set the system links.
Args:
links (str): The links associated with the system.
"""
self.links = links
def get_links(self) -> str:
"""
Getter for links
Args
"""Get the system links.
Returns:
str: The links associated with the system.
"""
return self.links
def set_capabilities(self, capabilities):
"""
Setter for host capabilities
def set_capabilities(self, capabilities: dict):
"""Set the system capabilities from a dictionary.
Args:
capabilities (): the string of capabilities from the system host-list command (json format)
Returns:
capabilities (dict): Dictionary containing system capabilities data.
"""
system_capability = SystemCapabilitiesObject()
if 'region_config' in capabilities:
system_capability.set_region_config(capabilities['region_config'])
if 'vswitch_type' in capabilities:
system_capability.set_vswitch_type(capabilities['vswitch_type'])
if 'shared_services' in capabilities:
system_capability.set_shared_services(capabilities['shared_services'])
if 'sdn_enabled' in capabilities:
system_capability.set_sdn_enabled(capabilities['sdn_enabled'])
if 'https_enabled' in capabilities:
system_capability.set_https_enabled(capabilities['https_enabled'])
if 'bm_region' in capabilities:
system_capability.set_bm_region(capabilities['bm_region'])
if "region_config" in capabilities:
system_capability.set_region_config(capabilities["region_config"])
if "vswitch_type" in capabilities:
system_capability.set_vswitch_type(capabilities["vswitch_type"])
if "shared_services" in capabilities:
system_capability.set_shared_services(capabilities["shared_services"])
if "sdn_enabled" in capabilities:
system_capability.set_sdn_enabled(capabilities["sdn_enabled"])
if "https_enabled" in capabilities:
system_capability.set_https_enabled(capabilities["https_enabled"])
if "bm_region" in capabilities:
system_capability.set_bm_region(capabilities["bm_region"])
self.capabilities = capabilities
self.capabilities = system_capability
def get_capabilities(self) -> SystemCapabilitiesObject:
"""
Getter for capabilities
Returns:
"""Get the system capabilities object.
Returns:
SystemCapabilitiesObject: The system's capabilities object.
"""
return self.capabilities
def set_region_name(self, region_name: str):
"""
Setter for region_name
Args
region_name () - the region_name
"""Set the system region name.
Args:
region_name (str): The name of the region for this system.
"""
self.region_name = region_name
def get_region_name(self) -> str:
"""
Getter for region_name
Args
"""Get the system region name.
Returns:
str: The name of the region for this system.
"""
return self.region_name
def set_distributed_cloud_role(self, distributed_cloud_role: str):
"""
Setter for distributed_cloud_role
Args
distributed_cloud_role () - the distributed_cloud_role
"""Set the distributed cloud role.
Args:
distributed_cloud_role (str): The distributed cloud role of the system.
"""
self.distributed_cloud_role = distributed_cloud_role
def get_distributed_cloud_role(self) -> str:
"""
Getter for distributed_cloud_role
Args
"""Get the distributed cloud role.
Returns:
str: The distributed cloud role of the system.
"""
return self.distributed_cloud_role
def set_service_project_name(self, service_project_name: str):
"""
Setter for service_project_name
Args
service_project_name () - the service_project_name
"""Set the service project name.
Args:
service_project_name (str): The name of the service project.
"""
self.service_project_name = service_project_name
def get_service_project_name(self) -> str:
"""
Getter for service_project_name
Args
"""Get the service project name.
Returns:
str: The name of the service project.
"""
return self.service_project_name
def set_security_feature(self, security_feature: str):
"""
Setter for security_feature
Args
security_feature () - the security_feature
"""Set the security feature.
Args:
security_feature (str): The security feature of the system.
"""
self.security_feature = security_feature
def get_security_feature(self) -> str:
"""
Getter for security_feature
Args
"""Get the security feature.
Returns:
str: The security feature of the system.
"""
return self.security_feature
def set_created_at(self, created_at: str):
"""
Setter for created_at
Args
created_at () - the created_at
"""Set the creation timestamp.
Args:
created_at (str): The timestamp when the system was created.
"""
self.created_at = created_at
def get_created_at(self) -> str:
"""
Getter for created_at
Args
"""Get the creation timestamp.
Returns:
str: The timestamp when the system was created.
"""
return self.created_at
def set_updated_at(self, updated_at: str):
"""
Setter for updated_at
Args
updated_at () - the updated_at
"""Set the update timestamp.
Args:
updated_at (str): The timestamp when the system was last updated.
"""
self.updated_at = updated_at
def get_updated_at(self) -> str:
"""Get the update timestamp.
Returns:
str: The timestamp when the system was last updated.
"""
Getter for updated_at
Args
"""
return self.updated_at
return self.updated_at

View File

@@ -5,8 +5,8 @@ from keywords.cloud_platform.system.system_table_parser import SystemTableParser
class SystemApplicationListOutput:
"""
This class parses the output of the command 'system application-list'
"""This class parses the output of the command 'system application-list'.
The parsing result is a 'SystemApplicationListObject' instance.
Example:
@@ -27,11 +27,11 @@ class SystemApplicationListOutput:
"""
def __init__(self, system_application_list_output):
"""
Constructor
def __init__(self, system_application_list_output: str) -> None:
"""Initialize SystemApplicationListOutput with parsed application data.
Args:
system_application_list_output: the output of the command 'system application-list'.
system_application_list_output (str): The output of the 'system application-list' command.
"""
self.system_applications: [SystemApplicationListObject] = []
system_table_parser = SystemTableParser(system_application_list_output)
@@ -41,33 +41,36 @@ class SystemApplicationListOutput:
if self.is_valid_output(value):
self.system_applications.append(
SystemApplicationListObject(
value['application'],
value['version'],
value['manifest name'],
value['manifest file'],
value['status'],
value['progress'],
value["application"],
value["version"],
value["manifest name"],
value["manifest file"],
value["status"],
value["progress"],
)
)
else:
raise KeywordException(f"The output line {value} was not valid")
def get_applications(self) -> [SystemApplicationListObject]:
"""
Returns the list of applications objects
Returns:
def get_applications(self) -> list[SystemApplicationListObject]:
"""Get the list of all application objects.
Returns:
list[SystemApplicationListObject]: List of SystemApplicationListObject instances.
"""
return self.system_applications
def get_application(self, application: str) -> SystemApplicationListObject:
"""
Gets the given application
"""Get a specific application by name.
Args:
application (): the name of the application
application (str): The name of the application to retrieve.
Returns: the application object
Returns:
SystemApplicationListObject: The application object.
Raises:
KeywordException: If no application with the given name is found.
"""
applications = list(filter(lambda app: app.get_application() == application, self.system_applications))
if len(applications) == 0:
@@ -75,34 +78,46 @@ class SystemApplicationListOutput:
return applications[0]
@staticmethod
def is_valid_output(value):
"""
Checks to ensure the output has the correct keys
def application_exists(self, application: str) -> bool:
"""Check if an application exists in the list.
Args:
value (): the value to check
application (str): The name of the application to check.
Returns:
bool: True if the application exists, False otherwise.
"""
applications = list(filter(lambda app: app.get_application() == application, self.system_applications))
return len(applications) > 0
@staticmethod
def is_valid_output(value: dict) -> bool:
"""Validate that output contains all required keys.
Args:
value (dict): The parsed output value to validate.
Returns:
bool: True if the output is valid, False otherwise.
"""
valid = True
if 'application' not in value:
get_logger().log_error(f'application is not in the output value: {value}')
if "application" not in value:
get_logger().log_error(f"application is not in the output value: {value}")
valid = False
if 'version' not in value:
get_logger().log_error(f'version is not in the output value: {value}')
if "version" not in value:
get_logger().log_error(f"version is not in the output value: {value}")
valid = False
if 'manifest name' not in value:
get_logger().log_error(f'manifest name is not in the output value: {value}')
if "manifest name" not in value:
get_logger().log_error(f"manifest name is not in the output value: {value}")
valid = False
if 'manifest file' not in value:
get_logger().log_error(f'manifest file is not in the output value: {value}')
if "manifest file" not in value:
get_logger().log_error(f"manifest file is not in the output value: {value}")
valid = False
if 'status' not in value:
get_logger().log_error(f'status is not in the output value: {value}')
if "status" not in value:
get_logger().log_error(f"status is not in the output value: {value}")
valid = False
if 'progress' not in value:
get_logger().log_error(f'progress is not in the output value: {value}')
if "progress" not in value:
get_logger().log_error(f"progress is not in the output value: {value}")
valid = False
return valid

View File

@@ -1,63 +1,74 @@
from framework.rest.rest_response import RestResponse
from keywords.cloud_platform.system.host.objects.system_host_disk_object import SystemHostDiskObject
from keywords.cloud_platform.system.system_table_parser import SystemTableParser
class SystemHostDiskOutput:
"""
This class parses the output of 'system host-disk-list' commands into a list of SystemHostDiskObject
"""
"""Parses the output of 'system host-disk-list' commands into SystemHostDiskObject instances."""
def __init__(self, system_host_disk_output):
"""
Constructor
def __init__(self, system_host_disk_output: str | RestResponse) -> None:
"""Initialize SystemHostDiskOutput with parsed disk data.
Args:
system_host_disk_output: String output of 'system host-disk-list' command
system_host_disk_output (str | RestResponse): String output from 'system host-disk-list' command or RestResponse object.
"""
self.system_host_disks: list[SystemHostDiskObject] = []
self.system_host_disks: [SystemHostDiskObject] = []
system_table_parser = SystemTableParser(system_host_disk_output)
output_values = system_table_parser.get_output_values_list()
if isinstance(system_host_disk_output, RestResponse): # came from REST and is already in dict form
json_object = system_host_disk_output.get_json_content()
if "idisks" in json_object:
disks = json_object["idisks"]
else:
disks = [json_object]
else:
system_table_parser = SystemTableParser(system_host_disk_output)
disks = system_table_parser.get_output_values_list()
for value in output_values:
for value in disks:
system_host_disk_object = SystemHostDiskObject()
if 'uuid' in value:
system_host_disk_object.set_uuid(value['uuid'])
if "uuid" in value:
system_host_disk_object.set_uuid(value["uuid"])
if 'device_node' in value:
system_host_disk_object.set_device_node(value['device_node'])
if "device_node" in value:
system_host_disk_object.set_device_node(value["device_node"])
if 'device_num' in value:
system_host_disk_object.set_device_num(value['device_num'])
if "device_num" in value:
system_host_disk_object.set_device_num(value["device_num"])
if 'device_type' in value:
system_host_disk_object.set_device_type(value['device_type'])
if "device_type" in value:
system_host_disk_object.set_device_type(value["device_type"])
if 'size_gib' in value:
system_host_disk_object.set_size_gib(float(value['size_gib']))
if "size_gib" in value:
system_host_disk_object.set_size_gib(float(value["size_gib"]))
if "size_mib" in value:
system_host_disk_object.set_size_gib(float(value["size_mib"]) / 1024.0)
if 'available_gib' in value:
system_host_disk_object.set_available_gib(float(value['available_gib']))
if "available_gib" in value:
system_host_disk_object.set_available_gib(float(value["available_gib"]))
if 'rpm' in value:
system_host_disk_object.set_rpm(value['rpm'])
if "available_mib" in value:
system_host_disk_object.set_available_gib(float(value["available_mib"]) / 1024.0)
if 'serial_id' in value:
system_host_disk_object.set_serial_id(value['serial_id'])
if "rpm" in value:
system_host_disk_object.set_rpm(value["rpm"])
if 'device_path' in value:
system_host_disk_object.set_device_path(value['device_path'])
if "serial_id" in value:
system_host_disk_object.set_serial_id(value["serial_id"])
if "device_path" in value:
system_host_disk_object.set_device_path(value["device_path"])
self.system_host_disks.append(system_host_disk_object)
def has_minimum_disk_space_in_gb(self, minimum_disk_space_in_gb: float) -> bool:
"""
This function will look for a disk in the list of disks with at least <minimum_disk_space_in_gb> GB
of free space.
"""Check if any disk has at least the specified amount of free space.
Returns: True if this host has at least one disk with at least <minimum_disk_space_in_gb> GB of free space.
Args:
minimum_disk_space_in_gb (float): The minimum required free disk space in GB.
Returns:
bool: True if at least one disk has the minimum required free space, False otherwise.
"""
return any(item.get_available_gib() >= minimum_disk_space_in_gb for item in self.system_host_disks)

View File

@@ -1,4 +1,6 @@
from framework.exceptions.keyword_exception import KeywordException
from framework.rest.rest_response import RestResponse
from keywords.cloud_platform.system.host.objects.storage_capabilities_object import StorageCapabilities
from keywords.cloud_platform.system.storage.objects.system_storage_backend_object import SystemStorageBackendObject
from keywords.cloud_platform.system.system_table_parser import SystemTableParser
@@ -24,11 +26,19 @@ class SystemStorageBackendOutput:
"""
self.system_storage_backends: [SystemStorageBackendObject] = []
system_table_parser = SystemTableParser(system_storage_backend_list_output)
output_values = system_table_parser.get_output_values_list()
if isinstance(system_storage_backend_list_output, RestResponse): # came from REST and is already in dict format
json_object = system_storage_backend_list_output.get_json_content()
if "storage_backends" in json_object:
storage_backends = json_object["storage_backends"]
else:
storage_backends = [json_object]
else:
system_table_parser = SystemTableParser(system_storage_backend_list_output)
storage_backends = system_table_parser.get_output_values_list()
system_storage_backend_object = None
for value in output_values:
for value in storage_backends:
if "name" not in value:
raise KeywordException(f"The output line {value} was not valid because it is missing an 'name'.")
@@ -58,7 +68,18 @@ class SystemStorageBackendOutput:
system_storage_backend_object.set_services(value["services"])
if "capabilities" in value:
system_storage_backend_object.add_capabilities(value["capabilities"])
storage_capabilities = value["capabilities"]
# if it's from REST, then we are already in dict format
if isinstance(storage_capabilities, dict):
storage_capabilities_object = StorageCapabilities()
if "replication" in storage_capabilities:
storage_capabilities_object.set_replication(storage_capabilities["replication"])
if "min_replication" in storage_capabilities:
storage_capabilities_object.set_min_replication(storage_capabilities["min_replication"])
if "deployment_model" in storage_capabilities:
storage_capabilities_object.set_deployment_model(storage_capabilities["deployment_model"])
else:
system_storage_backend_object.add_capabilities(storage_capabilities)
self.system_storage_backends.append(system_storage_backend_object)

View File

@@ -5,84 +5,42 @@ from optparse import OptionParser
import json5
from config.configuration_file_locations_manager import (
ConfigurationFileLocationsManager,
)
from config.configuration_file_locations_manager import ConfigurationFileLocationsManager
from config.configuration_manager import ConfigurationManager
from config.lab.objects.lab_config import LabConfig
from config.lab.objects.node import Node
from framework.database.objects.lab_capability import LabCapability
from framework.database.operations.capability_operation import CapabilityOperation
from framework.database.operations.lab_capability_operation import (
LabCapabilityOperation,
)
from framework.database.operations.lab_capability_operation import LabCapabilityOperation
from framework.database.operations.lab_operation import LabOperation
from framework.logging.automation_logger import get_logger
from framework.ssh.prompt_response import PromptResponse
from framework.ssh.ssh_connection import SSHConnection
from keywords.bmc.ipmitool.is_ipmitool_keywords import IsIPMIToolKeywords
from keywords.bmc.ipmitool.sensor.ipmitool_sensor_keywords import IPMIToolSensorKeywords
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_list_keywords import (
DcManagerSubcloudListKeywords,
)
from keywords.cloud_platform.dcmanager.objects.dcmanger_subcloud_list_availability_enum import (
DcManagerSubcloudListAvailabilityEnum,
)
from keywords.cloud_platform.dcmanager.objects.dcmanger_subcloud_list_management_enum import (
DcManagerSubcloudListManagementEnum,
)
from keywords.cloud_platform.openstack.endpoint.openstack_endpoint_list_keywords import (
OpenStackEndpointListKeywords,
)
from keywords.cloud_platform.rest.bare_metal.hosts.get_hosts_cpus_keywords import (
GetHostsCpusKeywords,
)
from keywords.cloud_platform.rest.bare_metal.hosts.get_hosts_keywords import (
GetHostsKeywords,
)
from keywords.cloud_platform.rest.bare_metal.memory.get_host_memory_keywords import (
GetHostMemoryKeywords,
)
from keywords.cloud_platform.rest.bare_metal.ports.get_host_ports_keywords import (
GetHostPortsKeywords,
)
from keywords.cloud_platform.rest.configuration.devices.system_host_device_keywords import (
GetHostDevicesKeywords,
)
from keywords.cloud_platform.rest.configuration.interfaces.get_interfaces_keywords import (
GetInterfacesKeywords,
)
from keywords.cloud_platform.rest.configuration.storage.get_storage_keywords import (
GetStorageKeywords,
)
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_list_keywords import DcManagerSubcloudListKeywords
from keywords.cloud_platform.dcmanager.objects.dcmanger_subcloud_list_availability_enum import DcManagerSubcloudListAvailabilityEnum
from keywords.cloud_platform.dcmanager.objects.dcmanger_subcloud_list_management_enum import DcManagerSubcloudListManagementEnum
from keywords.cloud_platform.rest.bare_metal.disks.get_host_disks_keywords import GetHostDisksKeywords
from keywords.cloud_platform.rest.bare_metal.hosts.get_hosts_cpus_keywords import GetHostsCpusKeywords
from keywords.cloud_platform.rest.bare_metal.hosts.get_hosts_keywords import GetHostsKeywords
from keywords.cloud_platform.rest.bare_metal.memory.get_host_memory_keywords import GetHostMemoryKeywords
from keywords.cloud_platform.rest.bare_metal.ports.get_host_ports_keywords import GetHostPortsKeywords
from keywords.cloud_platform.rest.configuration.addresses.get_host_addresses_keywords import GetHostAddressesKeywords
from keywords.cloud_platform.rest.configuration.devices.system_host_device_keywords import GetHostDevicesKeywords
from keywords.cloud_platform.rest.configuration.interfaces.get_interfaces_keywords import GetInterfacesKeywords
from keywords.cloud_platform.rest.configuration.storage.get_storage_backends_keyword import GetStorageBackendKeywords
from keywords.cloud_platform.rest.configuration.storage.get_storage_keywords import GetStorageKeywords
from keywords.cloud_platform.rest.configuration.system.get_system_keywords import GetSystemKeywords
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.cloud_platform.system.host.objects.system_host_if_output import (
SystemHostInterfaceOutput,
)
from keywords.cloud_platform.system.host.objects.system_host_show_output import (
SystemHostShowOutput,
)
from keywords.cloud_platform.system.host.system_host_disk_keywords import (
SystemHostDiskKeywords,
)
from keywords.cloud_platform.system.host.system_host_list_keywords import (
SystemHostListKeywords,
)
from keywords.cloud_platform.system.host.system_host_show_keywords import (
SystemHostShowKeywords,
)
from keywords.cloud_platform.system.oam.objects.system_oam_show_output import (
SystemOamShowOutput,
)
from keywords.cloud_platform.system.oam.system_oam_show_keywords import (
SystemOamShowKeywords,
)
from keywords.cloud_platform.system.host.objects.system_host_if_output import SystemHostInterfaceOutput
from keywords.cloud_platform.system.oam.objects.system_oam_show_output import SystemOamShowOutput
from keywords.cloud_platform.system.oam.system_oam_show_keywords import SystemOamShowKeywords
from testcases.conftest import log_configuration
def find_capabilities(lab_config: LabConfig) -> list[str]:
"""
Finds the capabilities of the given lab.
"""Find the capabilities of the given lab.
Args:
lab_config (LabConfig): The lab configuration object.
@@ -93,10 +51,12 @@ def find_capabilities(lab_config: LabConfig) -> list[str]:
lab_config.lab_capabilities = []
ssh_connection = LabConnectionKeywords().get_active_controller_ssh()
endpoint_output = OpenStackEndpointListKeywords(ssh_connection).endpoint_list()
lab_config.set_horizon_url(endpoint_output.get_horizon_url())
lab_config.set_horizon_url(get_horizon_url())
is_dc_system = endpoint_output.is_endpoint("dcmanager")
# Check if this is a DC system by checking distributed_cloud_role
system_output = GetSystemKeywords().get_system()
system_object = system_output.get_system_object()
is_dc_system = system_object.get_distributed_cloud_role() == "systemcontroller"
if is_dc_system:
subclouds = retrieve_subclouds(lab_config, ssh_connection)
lab_config.set_subclouds(subclouds[:])
@@ -113,8 +73,7 @@ def find_capabilities(lab_config: LabConfig) -> list[str]:
def find_subclouds_capabilities(lab_config: LabConfig) -> list[str]:
"""
Finds the capabilities of the subclouds from the given lab.
"""Find the capabilities of the subclouds from the given lab.
Args:
lab_config (LabConfig): The lab configuration object.
@@ -149,8 +108,7 @@ def find_subclouds_capabilities(lab_config: LabConfig) -> list[str]:
def get_subcloud_name_from_path(subcloud_config_file_path: str) -> str:
"""
Returns the name of the cloud from a subcloud's config file path.
"""Extract the subcloud name from a config file path.
Args:
subcloud_config_file_path (str): The subcloud config file path.
@@ -164,19 +122,15 @@ def get_subcloud_name_from_path(subcloud_config_file_path: str) -> str:
def create_subcloud_config_file_if_needed(host: LabConfig, subcloud_name: str, subcloud_config_file_path: str) -> None:
"""
Creates a new config file for the related subcloud in the given path.
"""Create a new config file for the subcloud if it doesn't exist.
Args:
host (LabConfig): The host lab configuration.
subcloud_name (str): The name of the subcloud.
subcloud_config_file_path (str): The subcloud's config file path.
Returns:
None:
Note:
The initial content of this created file is the main part of the host config file, but with the IP empty.
The initial content is based on the host config but with empty IP.
"""
if os.path.isfile(subcloud_config_file_path):
return
@@ -194,8 +148,7 @@ def create_subcloud_config_file_if_needed(host: LabConfig, subcloud_name: str, s
def is_sriov(host_interface_list_output: SystemHostInterfaceOutput) -> bool:
"""
Returns True if SR-IOV is enabled on the given node.
"""Check if SR-IOV is enabled on the given node.
Args:
host_interface_list_output (SystemHostInterfaceOutput): Output of the system host interface list command.
@@ -209,41 +162,11 @@ def is_sriov(host_interface_list_output: SystemHostInterfaceOutput) -> bool:
return False
def has_min_space_30G(ssh_connection: SSHConnection, node: Node) -> bool:
"""
Returns true if the node has at least 30 GB of free space in one of its disks.
Args:
ssh_connection (SSHConnection): The SSH connection to the node.
node (Node): the node
Returns:
bool: True if the node has at least 30 GB of free space in one of its disks,, False otherwise.
"""
host_disk_output = SystemHostDiskKeywords(ssh_connection).get_system_host_disk_list(node.get_name())
return host_disk_output.has_minimum_disk_space_in_gb(30)
def get_host_show_output(ssh_connection: SSHConnection, node: Node) -> SystemHostShowOutput:
"""
Returns an object of SystemHostShowOutput. This object represents the output of the 'system host-show' command.
Args:
ssh_connection (SSHConnection): The SSH connection to execute the command.
node (Node): The node whose details are being retrieved.
Returns:
SystemHostShowOutput: An object representing the output of the 'system host-show' command.
"""
return SystemHostShowKeywords(ssh_connection).get_system_host_show_output(node.get_name())
def has_host_bmc_sensor(ssh_connection: SSHConnection) -> bool:
"""
Returns True if the node has BMC sensors.
"""Check if the node has BMC sensors.
Args:
ssh_connection (SSHConnection): The SSH connection.
ssh_connection (SSHConnection): The SSH connection to the host.
Returns:
bool: True if the node has BMC sensors, False otherwise.
@@ -259,18 +182,16 @@ def has_host_bmc_sensor(ssh_connection: SSHConnection) -> bool:
def retrieve_subclouds(lab_config: LabConfig, ssh_connection: SSHConnection) -> list[LabConfig]:
"""
Gets the list of subclouds on this lab.
"""Get the list of online and managed subclouds.
Subclouds whose 'availability' is different from 'online' and 'management' is different from 'managed' are not
considered.
Only subclouds with 'availability' = 'online' and 'management' = 'managed' are considered.
Args:
lab_config (LabConfig): The lab config object.
ssh_connection (SSHConnection): Connection to the active controller of the central cloud.
Returns:
list[LabConfig]: the list of LabConfig objects matching the online and available subclouds of this lab.
list[LabConfig]: List of LabConfig objects for online and managed subclouds.
"""
subclouds: [LabConfig] = []
@@ -305,15 +226,14 @@ def retrieve_subclouds(lab_config: LabConfig, ssh_connection: SSHConnection) ->
def get_subcloud_ip(subcloud_name: str, central_cloud_ssh_connection: SSHConnection) -> str:
"""
Gets the external IP associated with the 'subcloud_name'.
"""Get the external IP address of a subcloud.
Args:
subcloud_name (str): The name of the cloud from which one wants to obtain the IP.
central_cloud_ssh_connection (SSHConnection): The SSH connection to a central cloud.
subcloud_name (str): The name of the subcloud.
central_cloud_ssh_connection (SSHConnection): SSH connection to the central cloud.
Returns:
str: subcloud's IP.
str: The subcloud's IP address.
"""
# Executes the command 'system oam-show' on the subcloud to get the subcloud's IP.
password_prompt = PromptResponse("password:", ConfigurationManager.get_lab_config().get_admin_credentials().get_password())
@@ -363,12 +283,13 @@ def scan_hosts(lab_config: LabConfig, ssh_connection: SSHConnection) -> list[Nod
Raises:
RuntimeError: If no controller node is found in the lab.
"""
hosts = SystemHostListKeywords(ssh_connection).get_system_host_with_extra_column(["subfunctions", "bm_ip", "bm_username"])
host_show_output = GetHostsKeywords().get_hosts()
hosts = host_show_output.get_all_system_host_show_objects()
nodes = []
# Count the controllers to decide if the lab is Simplex or not.
controllers_count = 0
for host in hosts.get_hosts():
for host in hosts:
if host.get_personality() == "controller":
controllers_count += 1
@@ -380,22 +301,22 @@ def scan_hosts(lab_config: LabConfig, ssh_connection: SSHConnection) -> list[Nod
raise RuntimeError("Failed to find at least one controller on this lab.")
# Look at the Capabilities of each host individually.
for host in hosts.get_hosts():
for host in hosts:
name = host.get_host_name()
name = host.get_hostname()
node_dict = {
"ip": get_controller_ip(ssh_connection, name),
"ip": get_controller_ip(host_show_output, name),
"node_type": host.get_personality(),
"node_capabilities": [],
}
node = Node(host.get_host_name(), node_dict)
node = Node(host.get_hostname(), node_dict)
node.set_subfunctions(host.get_sub_functions())
node.set_subfunctions(host.get_subfunctions())
node.set_bm_username(host.get_bm_username())
node.set_bm_ip(host.get_bm_ip())
# Gather data from the system into objects.
host_show_output = GetHostsKeywords().get_hosts()
host_uuid = host_show_output.get_system_host_show_object(node.get_name()).get_uuid()
host_cpu_output = GetHostsCpusKeywords().get_hosts_cpus(host_uuid)
@@ -404,6 +325,7 @@ def scan_hosts(lab_config: LabConfig, ssh_connection: SSHConnection) -> list[Nod
host_port_output = GetHostPortsKeywords().get_ports(host_uuid)
host_memory_output = GetHostMemoryKeywords().get_memory(host_uuid)
host_storage_output = GetStorageKeywords().get_storage(host_uuid)
host_disk_output = GetHostDisksKeywords().get_disks(host_uuid)
# Parse the data to define the lab's capabilities.
if is_sriov(host_interface_list_output):
@@ -463,7 +385,7 @@ def scan_hosts(lab_config: LabConfig, ssh_connection: SSHConnection) -> list[Nod
node.append_node_capability("lab_has_columbiaville")
lab_config.add_lab_capability("lab_has_columbiaville")
if has_min_space_30G(ssh_connection, node):
if host_disk_output.has_minimum_disk_space_in_gb(30):
node.append_node_capability("lab_has_min_space_30G")
lab_config.add_lab_capability("lab_has_min_space_30G")
@@ -511,47 +433,48 @@ def scan_hosts(lab_config: LabConfig, ssh_connection: SSHConnection) -> list[Nod
return nodes
def get_controller_ip(ssh_connection: SSHConnection, controller_name: str) -> str | None:
"""
Getter for controller ip.
def get_controller_ip(host_show_output: object, controller_name: str) -> str | None:
"""Get the IP address of a controller.
Args:
ssh_connection (SSHConnection): The SSH connection to the lab.
host_show_output (object): The host show output object.
controller_name (str): The name of the controller.
Returns:
str | None: The IP address of the controller, or None if not found.
"""
system_oam_output = SystemOamShowKeywords(ssh_connection).oam_show()
host_uuid = host_show_output.get_system_host_show_object(controller_name).get_uuid()
if controller_name == "controller-0":
return system_oam_output.get_oam_c0_ip()
elif controller_name == "controller-1":
return system_oam_output.get_oam_c1_ip()
else:
get_logger().log_error(f"GET IP for {controller_name} has not been implemented")
if host_uuid is None:
get_logger().log_error(f"Host {controller_name} not found")
return None
return None
host_addresses_output = GetHostAddressesKeywords().get_host_addresses(host_uuid)
return host_addresses_output.get_address_by_ifname("oam0")
def get_horizon_url() -> str:
"""
Getter for Horizon URL.
"""Get the Horizon URL using system CLI commands.
Returns:
str: The formatted Horizon URL.
"""
# Get system capabilities to check if HTTPS is enabled
system_output = GetSystemKeywords().get_system()
system_capabilities = system_output.get_system_object().get_capabilities()
https_enabled = system_capabilities.get_https_enabled() if system_capabilities else False
# Get OAM IP information using system CLI
ssh_connection = LabConnectionKeywords().get_active_controller_ssh()
endpoint_output = OpenStackEndpointListKeywords(ssh_connection).endpoint_list()
oam_show_output = SystemOamShowKeywords(ssh_connection).oam_show()
# Remove port from orignal url and then add 8443 for https or 8080 for http
url = endpoint_output.get_endpoint("keystone", "public").get_url().rsplit(":", 1)[0]
if "https" in url:
url += ":8443/"
else:
url += ":8080/"
# Get the appropriate IP (oam_ip for vbox, oam_floating_ip for physical)
oam_ip = oam_show_output.get_oam_ip() or oam_show_output.get_oam_floating_ip()
return url
# Construct Horizon URL
protocol = "https" if https_enabled else "http"
port = "8443" if https_enabled else "8080"
return f"{protocol}://{oam_ip}:{port}/"
def get_lab_type(lab_config: LabConfig) -> str:
@@ -587,6 +510,32 @@ def get_lab_type(lab_config: LabConfig) -> str:
return "Duplex"
def is_aio(lab_config: LabConfig) -> bool:
"""Checks if the lab is AIO.
Args:
lab_config (LabConfig): The lab configuration object.
Returns:
bool: True if the lab is AIO, False otherwise.
"""
nodes = lab_config.get_nodes()
worker_nodes = list(filter(lambda node: node.get_type() == "worker", nodes))
return len(worker_nodes) == 0
def is_rook_ceph() -> bool:
"""
Checks if the lab is using Rook Ceph.
Returns:
bool: True if the lab is using Rook Ceph, False otherwise.
"""
backends = GetStorageBackendKeywords().get_storage_backends()
return backends.is_backend_configured("ceph-rook")
def write_config(lab_config: LabConfig) -> None:
"""
Writes the new config out to the current config
@@ -764,6 +713,14 @@ if __name__ == "__main__":
lab_type = get_lab_type(lab_config)
lab_config.set_lab_type(lab_type)
# check if the lab is an aio
if is_aio(lab_config):
lab_config.add_lab_capability("lab_is_aio")
# check if the lab is using rook ceph
if is_rook_ceph():
lab_config.add_lab_capability("lab_has_rook_ceph")
if ConfigurationManager.get_database_config().use_database():
# insert lab into db if it doesn't already exist
lab_name = lab_config.get_lab_name()