dcmanager subcloud-group list-subclouds keyword

Change-Id: I3a1cf754b80463cca1d071ac70b41fd5aa84fc39
This commit is contained in:
Rity Menon
2025-03-03 10:44:19 -08:00
parent cce7427d14
commit f2d653a68d
3 changed files with 247 additions and 0 deletions

View File

@@ -1,6 +1,9 @@
from framework.ssh.ssh_connection import SSHConnection
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.command_wrappers import source_openrc
from keywords.cloud_platform.dcmanager.objects.dcmanager_subcloud_group_list_subcloud_output import (
DcmanagerSubcloudGroupListSubcloudOutput,
)
from keywords.cloud_platform.dcmanager.objects.dcmanager_subcloud_group_output import (
DcmanagerSubcloudGroupOutput,
)
@@ -94,3 +97,18 @@ class DcmanagerSubcloudGroupKeywords(BaseKeyword):
output = self.ssh_connection.send(source_openrc(f"dcmanager subcloud-group update {group_name} --{update_attr} '{update_value}'"))
self.validate_success_return_code(self.ssh_connection)
return DcmanagerSubcloudGroupShowOutput(output)
def get_dcmanager_subcloud_group_list_subclouds(self, group_id: str) -> DcmanagerSubcloudGroupListSubcloudOutput:
"""
Gets the dcmanager subcloud-group list-subclouds.
Args:
group_id (str): a str representing a subcloud-group's id.
Returns:
DcmanagerSubcloudGroupListSubcloudOutput: An object containing the list of subcloud groups.
"""
command = source_openrc(f"dcmanager subcloud-group list-subclouds {group_id}")
output = self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
return DcmanagerSubcloudGroupListSubcloudOutput(output)

View File

@@ -0,0 +1,85 @@
from typing import Dict, List
from framework.exceptions.keyword_exception import KeywordException
from framework.logging.automation_logger import get_logger
from keywords.cloud_platform.dcmanager.dcmanager_table_parser import (
DcManagerTableParser,
)
from keywords.cloud_platform.dcmanager.objects.dcmanager_subcloud_group_object import (
DcmanagerSubcloudGroupObject,
)
class DcmanagerSubcloudGroupListSubcloudOutput:
"""
Parses the output of the 'dcmanager subcloud-group list-subclouds' command into a list of DcmanagerSubcloudGroupObject instances.
"""
def __init__(self, dcmanager_output: str) -> None:
"""
Initializes DcmanagerSubcloudGroupOutput.
Args:
dcmanager_output (str): Output of the 'dcmanager subcloud-group list-subclouds' command.
Raises:
KeywordException: If the output format is invalid.
"""
self.dcmanager_subcloud_group: List[DcmanagerSubcloudGroupObject] = []
dc_table_parser = DcManagerTableParser(dcmanager_output)
output_values = dc_table_parser.get_output_values_list()
for value in output_values:
if self.is_valid_output(value):
dcmanager_subcloud_group = DcmanagerSubcloudGroupObject()
dcmanager_subcloud_group.set_id(value["id"])
dcmanager_subcloud_group.set_name(value["name"])
dcmanager_subcloud_group.set_description(value["description"])
dcmanager_subcloud_group.set_location(value["location"])
dcmanager_subcloud_group.set_software_version(value["software_version"])
dcmanager_subcloud_group.set_management(value["management"])
dcmanager_subcloud_group.set_availability(value["availability"])
dcmanager_subcloud_group.set_deploy_status(value["deploy_status"])
dcmanager_subcloud_group.set_management_subnet(value["management_subnet"])
dcmanager_subcloud_group.set_management_start_ip(value["management_start_ip"])
dcmanager_subcloud_group.set_management_end_ip(value["management_end_ip"])
dcmanager_subcloud_group.set_management_gateway_ip(value["management_gateway_ip"])
dcmanager_subcloud_group.set_systemcontroller_gateway_ip(value["systemcontroller_gateway_ip"])
dcmanager_subcloud_group.set_group_id(value["group_id"])
dcmanager_subcloud_group.set_peer_group_id(value["peer_group_id"])
dcmanager_subcloud_group.set_created_at(value["created_at"])
dcmanager_subcloud_group.set_updated_at(value["updated_at"])
dcmanager_subcloud_group.set_backup_status(value["backup_status"])
dcmanager_subcloud_group.set_backup_datetime(value["backup_datetime"])
dcmanager_subcloud_group.set_prestage_status(value["prestage_status"])
dcmanager_subcloud_group.set_prestage_versions(value["prestage_versions"])
self.dcmanager_subcloud_group.append(dcmanager_subcloud_group)
else:
raise KeywordException(f"The output line {value} was not valid")
def get_dcmanager_subcloud_group_list_subclouds(self) -> List[DcmanagerSubcloudGroupObject]:
"""
Retrieves the parsed dcmanager subcloud-group list-subclouds.
Returns:
List[DcmanagerSubcloudGroupObject]: A list of parsed DcmanagerSubcloudGroupObject instances.
"""
return self.dcmanager_subcloud_group
@staticmethod
def is_valid_output(value: Dict[str, str]) -> bool:
"""
Checks if the output dictionary contains all required fields.
Args:
value (Dict[str, str]): The dictionary of output values.
Returns:
bool: True if the output contains all required fields, False otherwise.
"""
required_fields = ["id", "name", "description", "location", "software_version", "management", "availability", "deploy_status", "management_subnet", "management_start_ip", "management_end_ip", "management_gateway_ip", "systemcontroller_gateway_ip", "group_id", "peer_group_id", "created_at", "updated_at", "backup_status", "backup_datetime", "prestage_status", "prestage_versions"]
for field in required_fields:
if field not in value:
get_logger().log_error(f"{field} is not in the output value")
return False
return True

View File

@@ -15,6 +15,22 @@ class DcmanagerSubcloudGroupObject:
self.max_parallel_subclouds: int = -1
self.created_at: Optional[str] = None
self.updated_at: Optional[str] = None
self.location: Optional[str] = None
self.software_version: Optional[str] = None
self.management: Optional[str] = None
self.availability: Optional[str] = None
self.deploy_status: Optional[str] = None
self.management_subnet: Optional[str] = None
self.management_start_ip: Optional[str] = None
self.management_end_ip: Optional[str] = None
self.management_gateway_ip: Optional[str] = None
self.systemcontroller_gateway_ip: Optional[str] = None
self.group_id: int = -1
self.peer_group_id: Optional[str] = None
self.backup_status: Optional[str] = None
self.backup_datetime: Optional[str] = None
self.prestage_status: Optional[str] = None
self.prestage_versions: Optional[str] = None
def set_id(self, group_id: int) -> None:
"""Sets the ID of the subcloud group."""
@@ -71,3 +87,131 @@ class DcmanagerSubcloudGroupObject:
def get_updated_at(self) -> Optional[str]:
"""Gets the last updated timestamp of the subcloud group."""
return self.updated_at
def set_location(self, location: str) -> None:
"""Sets the location of the subcloud."""
self.location = location
def get_location(self) -> Optional[str]:
"""Gets the location of the subcloud."""
return self.location
def set_software_version(self, software_version: str) -> None:
"""Sets the software_version of the subcloud."""
self.software_version = software_version
def get_software_version(self) -> Optional[str]:
"""Gets the software_version of the subcloud."""
return self.software_version
def set_management(self, management: str) -> None:
"""Sets the management of the subcloud."""
self.management = management
def get_management(self) -> Optional[str]:
"""Gets the management of the subcloud."""
return self.management
def set_availability(self, availability: str) -> None:
"""Sets the availability of the subcloud."""
self.availability = availability
def get_availability(self) -> Optional[str]:
"""Gets the availability of the subcloud."""
return self.availability
def set_deploy_status(self, deploy_status: str) -> None:
"""Sets the deploy_status of the subcloud."""
self.deploy_status = deploy_status
def get_deploy_status(self) -> Optional[str]:
"""Gets the deploy_status of the subcloud."""
return self.deploy_status
def set_management_subnet(self, management_subnet: str) -> None:
"""Sets the management_subnet of the subcloud."""
self.management_subnet = management_subnet
def get_management_subnet(self) -> Optional[str]:
"""Gets the management_subnet of the subcloud."""
return self.management_subnet
def set_management_start_ip(self, management_start_ip: str) -> None:
"""Sets the management_start_ip of the subcloud."""
self.management_start_ip = management_start_ip
def get_management_start_ip(self) -> Optional[str]:
"""Gets the management_start_ip of the subcloud."""
return self.management_start_ip
def set_management_end_ip(self, management_end_ip: str) -> None:
"""Sets the management_end_ip of the subcloud."""
self.management_end_ip = management_end_ip
def get_management_end_ip(self) -> Optional[str]:
"""Gets the management_end_ip of the subcloud."""
return self.management_end_ip
def set_management_gateway_ip(self, management_gateway_ip: str) -> None:
"""Sets the management_gateway_ip of the subcloud."""
self.management_gateway_ip = management_gateway_ip
def get_management_gateway_ip(self) -> Optional[str]:
"""Gets the management_gateway_ip of the subcloud."""
return self.management_gateway_ip
def set_systemcontroller_gateway_ip(self, systemcontroller_gateway_ip: str) -> None:
"""Sets the systemcontroller_gateway_ip of the subcloud."""
self.systemcontroller_gateway_ip = systemcontroller_gateway_ip
def get_systemcontroller_gateway_ip(self) -> Optional[str]:
"""Gets the systemcontroller_gateway_ip of the subcloud."""
return self.systemcontroller_gateway_ip
def set_group_id(self, group_id: int) -> None:
"""Sets the group_id of the subcloud."""
self.group_id = group_id
def get_group_id(self) -> int:
"""Gets the group_id of the subcloud."""
return self.group_id
def set_peer_group_id(self, peer_group_id: str) -> None:
"""Sets the peer_group_id of the subcloud."""
self.peer_group_id = peer_group_id
def get_peer_group_id(self) -> Optional[str]:
"""Gets the peer_group_id of the subcloud."""
return self.peer_group_id
def set_backup_status(self, backup_status: str) -> None:
"""Sets the backup_status of the subcloud."""
self.backup_status = backup_status
def get_backup_status(self) -> Optional[str]:
"""Gets the backup_status of the subcloud."""
return self.backup_status
def set_backup_datetime(self, backup_datetime: str) -> None:
"""Sets the backup_datetime of the subcloud."""
self.backup_datetime = backup_datetime
def get_backup_datetime(self) -> Optional[str]:
"""Gets the backup_datetime of the subcloud."""
return self.backup_datetime
def set_prestage_status(self, prestage_status: str) -> None:
"""Sets the prestage_status of the subcloud."""
self.prestage_status = prestage_status
def get_prestage_status(self) -> Optional[str]:
"""Gets the prestage_status of the subcloud."""
return self.prestage_status
def set_prestage_versions(self, prestage_versions: str) -> None:
"""Sets the prestage_versions of the subcloud."""
self.prestage_versions = prestage_versions
def get_prestage_versions(self) -> Optional[str]:
"""Gets the prestage_versions of the subcloud."""
return self.prestage_versions