Migrate backup test cases under test_delete_subcloud_backup.py (2/2)

- migrate backup test cases from old framework to new framework.
  - Add test case for test_delete_backup_group_on_central
  - Implement keywords related to backup operations.

Change-Id: I3b3c654adbc4b137e28ee8582060a7864528c4e3
Signed-off-by: Swapna Gorre <swapna.gorre@windriver.com>
This commit is contained in:
Swapna Gorre
2025-03-04 06:55:53 -05:00
parent b4bab62043
commit 70f3f930c6
4 changed files with 422 additions and 242 deletions

View File

@@ -25,12 +25,14 @@ class DcManagerSubcloudBackupKeywords(BaseKeyword):
self, self,
sysadmin_password: str, sysadmin_password: str,
con_ssh: SSHConnection, con_ssh: SSHConnection,
path: str, path: Optional[str] = None,
subcloud: Optional[str] = None, subcloud: Optional[str] = None,
local_only: bool = False, local_only: bool = False,
backup_yaml: Optional[str] = None, backup_yaml: Optional[str] = None,
group: Optional[str] = None, group: Optional[str] = None,
registry: bool = False, registry: bool = False,
release: Optional[str] = None,
subcloud_list: Optional[list] = None,
) -> None: ) -> None:
""" """
Creates a backup of the specified subcloud. Creates a backup of the specified subcloud.
@@ -38,20 +40,20 @@ class DcManagerSubcloudBackupKeywords(BaseKeyword):
Args: Args:
sysadmin_password (str): Subcloud sysadmin password needed for backup creation. sysadmin_password (str): Subcloud sysadmin password needed for backup creation.
con_ssh (SSHConnection): SSH connection to execute the command (central_ssh or subcloud_ssh). con_ssh (SSHConnection): SSH connection to execute the command (central_ssh or subcloud_ssh).
path (str): The directory path where the backup file will be checked. path (Optional[str]): The directory path where the backup file will be checked.
subcloud (Optional[str]): The name of the subcloud to backup. Defaults to None. subcloud (Optional[str]): The name of the subcloud to backup. Defaults to None.
local_only (bool): If True, backup will be stored only in the subcloud. Defaults to False. local_only (bool): If True, backup will be stored only in the subcloud. Defaults to False.
backup_yaml (Optional[str]): path to use the yaml file. Defaults to None. backup_yaml (Optional[str]): path to use the yaml file. Defaults to None.
group (Optional[str]): Subcloud group name to create backup. Defaults to None. group (Optional[str]): Subcloud group name to create backup. Defaults to None.
registry (bool): Option to add the registry backup in the same task. Defaults to False. registry (bool): Option to add the registry backup in the same task. Defaults to False.
release (Optional[str]): Release version required to check backup. Defaults to None.
subcloud_list (Optional[list]): List of subcloud names when backing up a group. Defaults to None.
Returns: Returns:
None: None:
""" """
# Command construction # Command construction
cmd = ( cmd = f"dcmanager subcloud-backup create --sysadmin-password {sysadmin_password}"
f"dcmanager subcloud-backup create --sysadmin-password {sysadmin_password}"
)
if subcloud: if subcloud:
cmd += f" --subcloud {subcloud}" cmd += f" --subcloud {subcloud}"
if local_only: if local_only:
@@ -65,8 +67,12 @@ class DcManagerSubcloudBackupKeywords(BaseKeyword):
self.ssh_connection.send(source_openrc(cmd)) self.ssh_connection.send(source_openrc(cmd))
self.validate_success_return_code(self.ssh_connection) self.validate_success_return_code(self.ssh_connection)
if group:
# Use wait_for_backup_creation to ensure the file is created # Use wait_for_backup_creation to ensure the file is created
for subcloud_name in subcloud_list:
central_path = f"/opt/dc-vault/backups/{subcloud_name}/{release}"
self.wait_for_backup_creation(con_ssh, central_path, subcloud_name)
else:
self.wait_for_backup_creation(con_ssh, path, subcloud) self.wait_for_backup_creation(con_ssh, path, subcloud)
def wait_for_backup_creation( def wait_for_backup_creation(
@@ -115,24 +121,26 @@ class DcManagerSubcloudBackupKeywords(BaseKeyword):
def delete_subcloud_backup( def delete_subcloud_backup(
self, self,
con_ssh: SSHConnection, con_ssh: SSHConnection,
path: str,
release: str, release: str,
path: Optional[str] = None,
subcloud: Optional[str] = None, subcloud: Optional[str] = None,
local_only: bool = False, local_only: bool = False,
group: Optional[str] = None, group: Optional[str] = None,
sysadmin_password: str = None, sysadmin_password: str = None,
subcloud_list: Optional[list] = None,
) -> None: ) -> None:
""" """
Sends the command to delete the backup of the specified subcloud and waits for confirmation of its deletion. Sends the command to delete the backup of the specified subcloud and waits for confirmation of its deletion.
Args: Args:
con_ssh (SSHConnection): SSH connection to execute the command (central_ssh or subcloud_ssh). con_ssh (SSHConnection): SSH connection to execute the command (central_ssh or subcloud_ssh).
path (str): The path where the backup file is located.
release (str): Required to delete a release backup. release (str): Required to delete a release backup.
path (Optional[str]): The path where the backup file is located. Defaults to None.
subcloud (Optional[str]): The name of the subcloud to delete the backup. Defaults to None. subcloud (Optional[str]): The name of the subcloud to delete the backup. Defaults to None.
local_only (bool): If True, only deletes the local backup in the subcloud. Defaults to False. local_only (bool): If True, only deletes the local backup in the subcloud. Defaults to False.
group (Optional[str]): Subcloud group name to delete backup. Defaults to None. group (Optional[str]): Subcloud group name to delete backup. Defaults to None.
sysadmin_password (str): Subcloud sysadmin password needed for deletion on local_path. Defaults to None. sysadmin_password (str): Subcloud sysadmin password needed for deletion on local_path. Defaults to None.
subcloud_list (Optional[list]): List of subcloud names when deleting backups for a group. Defaults to None.
Returns: Returns:
None: None:
@@ -151,12 +159,14 @@ class DcManagerSubcloudBackupKeywords(BaseKeyword):
self.ssh_connection.send(source_openrc(cmd)) self.ssh_connection.send(source_openrc(cmd))
self.validate_success_return_code(self.ssh_connection) self.validate_success_return_code(self.ssh_connection)
# Call wait_for_backup_deletion method to wait and verify the backup deletion. if group:
for subcloud_name in subcloud_list:
central_path = f"/opt/dc-vault/backups/{subcloud_name}/{release}"
self.wait_for_backup_deletion(con_ssh, central_path, subcloud_name)
else:
self.wait_for_backup_deletion(con_ssh, path, subcloud) self.wait_for_backup_deletion(con_ssh, path, subcloud)
def wait_for_backup_deletion( def wait_for_backup_deletion(self, con_ssh: SSHConnection, path: str, subcloud: str) -> None:
self, con_ssh: SSHConnection, path: str, subcloud: str
) -> None:
""" """
Waits for the backup to be deleted by checking for the absence of the backup file. Waits for the backup to be deleted by checking for the absence of the backup file.

View File

@@ -4,78 +4,123 @@ import shutil
from optparse import OptionParser from optparse import OptionParser
import json5 import json5
from config.configuration_file_locations_manager import ConfigurationFileLocationsManager
from config.configuration_file_locations_manager import (
ConfigurationFileLocationsManager,
)
from config.configuration_manager import ConfigurationManager from config.configuration_manager import ConfigurationManager
from config.lab.objects.lab_config import LabConfig from config.lab.objects.lab_config import LabConfig
from config.lab.objects.node import Node from config.lab.objects.node import Node
from framework.database.objects.lab_capability import LabCapability from framework.database.objects.lab_capability import LabCapability
from framework.database.operations.capability_operation import CapabilityOperation from framework.database.operations.capability_operation import CapabilityOperation
from framework.database.operations.lab_capability_operation import LabCapabilityOperation from framework.database.operations.lab_capability_operation import (
LabCapabilityOperation,
)
from framework.database.operations.lab_operation import LabOperation from framework.database.operations.lab_operation import LabOperation
from framework.logging.automation_logger import get_logger from framework.logging.automation_logger import get_logger
from framework.ssh.prompt_response import PromptResponse from framework.ssh.prompt_response import PromptResponse
from framework.ssh.ssh_connection import SSHConnection from framework.ssh.ssh_connection import SSHConnection
from keywords.bmc.ipmitool.is_ipmitool_keywords import IsIPMIToolKeywords from keywords.bmc.ipmitool.is_ipmitool_keywords import IsIPMIToolKeywords
from keywords.bmc.ipmitool.sensor.ipmitool_sensor_keywords import IPMIToolSensorKeywords from keywords.bmc.ipmitool.sensor.ipmitool_sensor_keywords import IPMIToolSensorKeywords
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_list_keywords import DcManagerSubcloudListKeywords from keywords.cloud_platform.dcmanager.dcmanager_subcloud_list_keywords import (
from keywords.cloud_platform.dcmanager.objects.dcmanger_subcloud_list_availability_enum import DcManagerSubcloudListAvailabilityEnum DcManagerSubcloudListKeywords,
from keywords.cloud_platform.dcmanager.objects.dcmanger_subcloud_list_management_enum import DcManagerSubcloudListManagementEnum )
from keywords.cloud_platform.openstack.endpoint.openstack_endpoint_list_keywords import OpenStackEndpointListKeywords from keywords.cloud_platform.dcmanager.objects.dcmanger_subcloud_list_availability_enum import (
from keywords.cloud_platform.rest.bare_metal.hosts.get_hosts_cpus_keywords import GetHostsCpusKeywords DcManagerSubcloudListAvailabilityEnum,
from keywords.cloud_platform.rest.bare_metal.hosts.get_hosts_keywords import GetHostsKeywords )
from keywords.cloud_platform.rest.bare_metal.memory.get_host_memory_keywords import GetHostMemoryKeywords from keywords.cloud_platform.dcmanager.objects.dcmanger_subcloud_list_management_enum import (
from keywords.cloud_platform.rest.bare_metal.ports.get_host_ports_keywords import GetHostPortsKeywords DcManagerSubcloudListManagementEnum,
from keywords.cloud_platform.rest.configuration.devices.system_host_device_keywords import GetHostDevicesKeywords )
from keywords.cloud_platform.rest.configuration.interfaces.get_interfaces_keywords import GetInterfacesKeywords from keywords.cloud_platform.openstack.endpoint.openstack_endpoint_list_keywords import (
from keywords.cloud_platform.rest.configuration.storage.get_storage_keywords import GetStorageKeywords OpenStackEndpointListKeywords,
)
from keywords.cloud_platform.rest.bare_metal.hosts.get_hosts_cpus_keywords import (
GetHostsCpusKeywords,
)
from keywords.cloud_platform.rest.bare_metal.hosts.get_hosts_keywords import (
GetHostsKeywords,
)
from keywords.cloud_platform.rest.bare_metal.memory.get_host_memory_keywords import (
GetHostMemoryKeywords,
)
from keywords.cloud_platform.rest.bare_metal.ports.get_host_ports_keywords import (
GetHostPortsKeywords,
)
from keywords.cloud_platform.rest.configuration.devices.system_host_device_keywords import (
GetHostDevicesKeywords,
)
from keywords.cloud_platform.rest.configuration.interfaces.get_interfaces_keywords import (
GetInterfacesKeywords,
)
from keywords.cloud_platform.rest.configuration.storage.get_storage_keywords import (
GetStorageKeywords,
)
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.cloud_platform.system.host.objects.system_host_if_output import SystemHostInterfaceOutput from keywords.cloud_platform.system.host.objects.system_host_if_output import (
from keywords.cloud_platform.system.host.objects.system_host_show_output import SystemHostShowOutput SystemHostInterfaceOutput,
from keywords.cloud_platform.system.host.system_host_disk_keywords import SystemHostDiskKeywords )
from keywords.cloud_platform.system.host.system_host_list_keywords import SystemHostListKeywords from keywords.cloud_platform.system.host.objects.system_host_show_output import (
from keywords.cloud_platform.system.host.system_host_show_keywords import SystemHostShowKeywords SystemHostShowOutput,
from keywords.cloud_platform.system.oam.objects.system_oam_show_output import SystemOamShowOutput )
from keywords.cloud_platform.system.oam.system_oam_show_keywords import SystemOamShowKeywords from keywords.cloud_platform.system.host.system_host_disk_keywords import (
SystemHostDiskKeywords,
)
from keywords.cloud_platform.system.host.system_host_list_keywords import (
SystemHostListKeywords,
)
from keywords.cloud_platform.system.host.system_host_show_keywords import (
SystemHostShowKeywords,
)
from keywords.cloud_platform.system.oam.objects.system_oam_show_output import (
SystemOamShowOutput,
)
from keywords.cloud_platform.system.oam.system_oam_show_keywords import (
SystemOamShowKeywords,
)
from testcases.conftest import log_configuration from testcases.conftest import log_configuration
def find_capabilities(lab_config: LabConfig): def find_capabilities(lab_config: LabConfig) -> list[str]:
""" """
Finds the capabilities from the given lab Finds the capabilities of the given lab.
Args: Args:
host (): the host lab config lab_config (LabConfig): The lab configuration object.
Returns: Returns:
list[str]: A list of capabilities found in the lab.
""" """
lab_config.lab_capabilities = [] lab_config.lab_capabilities = []
ssh_connection = LabConnectionKeywords().get_active_controller_ssh() ssh_connection = LabConnectionKeywords().get_active_controller_ssh()
endpoint_output = OpenStackEndpointListKeywords(ssh_connection).endpoint_list() endpoint_output = OpenStackEndpointListKeywords(ssh_connection).endpoint_list()
lab_config.set_horizon_url(endpoint_output.get_horizon_url()) lab_config.set_horizon_url(endpoint_output.get_horizon_url())
is_dc_system = endpoint_output.is_endpoint('dcmanager') is_dc_system = endpoint_output.is_endpoint("dcmanager")
if is_dc_system: if is_dc_system:
subclouds = retrieve_subclouds(lab_config, ssh_connection) subclouds = retrieve_subclouds(lab_config, ssh_connection)
lab_config.set_subclouds(subclouds[:]) lab_config.set_subclouds(subclouds[:])
find_subclouds_capabilities(lab_config) find_subclouds_capabilities(lab_config)
if len(lab_config.get_subclouds()) != 0: if len(lab_config.get_subclouds()) != 0:
lab_config.add_lab_capability('lab_has_subcloud') lab_config.add_lab_capability("lab_has_subcloud")
if len(lab_config.get_subclouds()) >= 2:
lab_config.add_lab_capability("lab_has_min_2_subclouds")
nodes = scan_hosts(lab_config, ssh_connection) nodes = scan_hosts(lab_config, ssh_connection)
lab_config.set_nodes(nodes) lab_config.set_nodes(nodes)
def find_subclouds_capabilities(lab_config: LabConfig): def find_subclouds_capabilities(lab_config: LabConfig) -> list[str]:
""" """
Finds the capabilities of the subclouds from the given lab. Finds the capabilities of the subclouds from the given lab.
Args: Args:
host (): the host lab config lab_config (LabConfig): The lab configuration object.
Returns: Returns:
list[str]: A list of capabilities found in the subclouds.
""" """
if len(lab_config.get_subclouds()) == 0: if len(lab_config.get_subclouds()) == 0:
return return
@@ -106,28 +151,32 @@ def find_subclouds_capabilities(lab_config: LabConfig):
def get_subcloud_name_from_path(subcloud_config_file_path: str) -> str: def get_subcloud_name_from_path(subcloud_config_file_path: str) -> str:
""" """
Returns the name of the cloud from a subcloud's config file path. Returns the name of the cloud from a subcloud's config file path.
Args: Args:
subcloud_config_file_path (str): the subcloud config file path. subcloud_config_file_path (str): The subcloud config file path.
Returns: the name of the subcloud.
Returns:
str: The name of the subcloud.
""" """
_, subcloud_config_filename = os.path.split(subcloud_config_file_path) _, subcloud_config_filename = os.path.split(subcloud_config_file_path)
subcloud_name, _ = os.path.splitext(subcloud_config_filename) subcloud_name, _ = os.path.splitext(subcloud_config_filename)
return subcloud_name return subcloud_name
def create_subcloud_config_file_if_needed(host: LabConfig, subcloud_name: str, subcloud_config_file_path: str): def create_subcloud_config_file_if_needed(host: LabConfig, subcloud_name: str, subcloud_config_file_path: str) -> None:
""" """
Creates a new config file for the related subcloud in the given path. Creates a new config file for the related subcloud in the given path.
Args: Args:
host (LabConfig): the host lab config host (LabConfig): The host lab configuration.
subcloud_name: the name of the subcloud subcloud_name (str): The name of the subcloud.
subcloud_config_file_path (str): the subcloud's config file path. subcloud_config_file_path (str): The subcloud's config file path.
Returns: None. Returns:
None:
Note: the initial content of this created file is the main part of the host config file, but with the ip empty Note:
The initial content of this created file is the main part of the host config file, but with the IP empty.
""" """
if os.path.isfile(subcloud_config_file_path): if os.path.isfile(subcloud_config_file_path):
return return
@@ -136,22 +185,23 @@ def create_subcloud_config_file_if_needed(host: LabConfig, subcloud_name: str, s
subcloud_config.set_floating_ip("") subcloud_config.set_floating_ip("")
subcloud_config.set_lab_name(subcloud_name) subcloud_config.set_lab_name(subcloud_name)
new_config = '{' new_config = "{"
new_config += get_main_lab_config(subcloud_config) new_config += get_main_lab_config(subcloud_config)
new_config += '}' new_config += "}"
with open(subcloud_config_file_path, 'w') as config: with open(subcloud_config_file_path, "w") as config:
config.write(json5.dumps(json5.loads(new_config), indent=4)) config.write(json5.dumps(json5.loads(new_config), indent=4))
def is_sriov(host_interface_list_output: SystemHostInterfaceOutput) -> bool: def is_sriov(host_interface_list_output: SystemHostInterfaceOutput) -> bool:
""" """
Returns true if sriov is enabled on the given node Returns True if SR-IOV is enabled on the given node.
Args: Args:
host_interface_list_output: Output of the system host interfact list command. host_interface_list_output (SystemHostInterfaceOutput): Output of the system host interface list command.
Returns: Returns:
bool: True if SR-IOV is enabled, False otherwise.
""" """
sriov_list: [] = host_interface_list_output.get_interfaces_by_class("pci-sriov") sriov_list: [] = host_interface_list_output.get_interfaces_by_class("pci-sriov")
if len(sriov_list) > 0: if len(sriov_list) > 0:
@@ -162,42 +212,42 @@ def is_sriov(host_interface_list_output: SystemHostInterfaceOutput) -> bool:
def has_min_space_30G(ssh_connection: SSHConnection, node: Node) -> bool: def has_min_space_30G(ssh_connection: SSHConnection, node: Node) -> bool:
""" """
Returns true if the node has at least 30 GB of free space in one of its disks. Returns true if the node has at least 30 GB of free space in one of its disks.
Args: Args:
ssh_connection (): the ssh connection ssh_connection (SSHConnection): The SSH connection to the node.
node (): the node node (Node): the node
Returns: true if the node has at least 30 GB of free space in one of its disks, false otherwise.
Returns:
bool: True if the node has at least 30 GB of free space in one of its disks,, False otherwise.
""" """
host_disk_output = SystemHostDiskKeywords(ssh_connection).get_system_host_disk_list(node.get_name()) host_disk_output = SystemHostDiskKeywords(ssh_connection).get_system_host_disk_list(node.get_name())
return host_disk_output.has_minimum_disk_space_in_gb(30) return host_disk_output.has_minimum_disk_space_in_gb(30)
def get_host_show_output(ssh_connection, node) -> SystemHostShowOutput: def get_host_show_output(ssh_connection: SSHConnection, node: Node) -> SystemHostShowOutput:
""" """
Returns an object of SystemHostShowOutput. This object represents the output of 'system host-show' command. Returns an object of SystemHostShowOutput. This object represents the output of the 'system host-show' command.
Args: Args:
ssh_connection (): the ssh connection ssh_connection (SSHConnection): The SSH connection to execute the command.
node (): the node node (Node): The node whose details are being retrieved.
Returns: Returns:
SystemHostShowOutput: an object of SystemHostShowOutput that represents SystemHostShowOutput: An object representing the output of the 'system host-show' command.
the output of 'system host-show' command.
""" """
return SystemHostShowKeywords(ssh_connection).get_system_host_show_output(node.get_name()) return SystemHostShowKeywords(ssh_connection).get_system_host_show_output(node.get_name())
def has_host_bmc_sensor(ssh_connection) -> bool: def has_host_bmc_sensor(ssh_connection: SSHConnection) -> bool:
""" """
Returns true if the node has BMC sensors. Returns True if the node has BMC sensors.
Args: Args:
ssh_connection (): the ssh connection ssh_connection (SSHConnection): The SSH connection.
Returns: True if the node has BMC sensors, False otherwise
Returns:
bool: True if the node has BMC sensors, False otherwise.
""" """
# First check if the lab has ipmitool available. # First check if the lab has ipmitool available.
is_ipmi_tool_available = IsIPMIToolKeywords(ssh_connection).is_ipmi_tool_available() is_ipmi_tool_available = IsIPMIToolKeywords(ssh_connection).is_ipmi_tool_available()
if not is_ipmi_tool_available: if not is_ipmi_tool_available:
@@ -211,14 +261,16 @@ def has_host_bmc_sensor(ssh_connection) -> bool:
def retrieve_subclouds(lab_config: LabConfig, ssh_connection: SSHConnection) -> list[LabConfig]: def retrieve_subclouds(lab_config: LabConfig, ssh_connection: SSHConnection) -> list[LabConfig]:
""" """
Gets the list of subclouds on this lab. Gets the list of subclouds on this lab.
Subclouds whose 'availability' is different from 'online' and 'management' is different from 'managed' are not Subclouds whose 'availability' is different from 'online' and 'management' is different from 'managed' are not
considered. considered.
Args: Args:
lab_config (LabConfig): the lab config object lab_config (LabConfig): The lab config object.
ssh_connection: Connection to the active controller of the central cloud. ssh_connection (SSHConnection): Connection to the active controller of the central cloud.
Returns: the list of LabConfig objects matching the online and available subclouds of this lab.
Returns:
list[LabConfig]: the list of LabConfig objects matching the online and available subclouds of this lab.
""" """
subclouds: [LabConfig] = [] subclouds: [LabConfig] = []
@@ -232,9 +284,7 @@ def retrieve_subclouds(lab_config: LabConfig, ssh_connection: SSHConnection) ->
subcloud_name = dcmanager_subcloud.get_name() subcloud_name = dcmanager_subcloud.get_name()
if dcmanager_subcloud.get_availability() != DcManagerSubcloudListAvailabilityEnum.ONLINE.value or dcmanager_subcloud.get_management() != DcManagerSubcloudListManagementEnum.MANAGED.value: if dcmanager_subcloud.get_availability() != DcManagerSubcloudListAvailabilityEnum.ONLINE.value or dcmanager_subcloud.get_management() != DcManagerSubcloudListManagementEnum.MANAGED.value:
get_logger().log_info( get_logger().log_info(f"Subcloud {subcloud_name} will not be scanned because it is not {DcManagerSubcloudListAvailabilityEnum.ONLINE.value} and {DcManagerSubcloudListManagementEnum.MANAGED.value}.")
f"Subcloud {subcloud_name} will not be scanned because it is not {DcManagerSubcloudListAvailabilityEnum.ONLINE.value} and {DcManagerSubcloudListManagementEnum.MANAGED.value}."
)
continue continue
subcloud_config_file_path = f"{lab_config_directory}/{subcloud_name}.json5" subcloud_config_file_path = f"{lab_config_directory}/{subcloud_name}.json5"
@@ -257,12 +307,13 @@ def retrieve_subclouds(lab_config: LabConfig, ssh_connection: SSHConnection) ->
def get_subcloud_ip(subcloud_name: str, central_cloud_ssh_connection: SSHConnection) -> str: def get_subcloud_ip(subcloud_name: str, central_cloud_ssh_connection: SSHConnection) -> str:
""" """
Gets the external IP associated with the 'subcloud_name'. Gets the external IP associated with the 'subcloud_name'.
Args: Args:
subcloud_name (str): the name of the cloud from which one wants to obtain the IP. subcloud_name (str): The name of the cloud from which one wants to obtain the IP.
central_cloud_ssh_connection (SSHConnection): the SSH connection to a central cloud. central_cloud_ssh_connection (SSHConnection): The SSH connection to a central cloud.
Returns: subcloud's IP (str).
Returns:
str: subcloud's IP.
""" """
# Executes the command 'system oam-show' on the subcloud to get the subcloud's IP. # Executes the command 'system oam-show' on the subcloud to get the subcloud's IP.
password_prompt = PromptResponse("password:", ConfigurationManager.get_lab_config().get_admin_credentials().get_password()) password_prompt = PromptResponse("password:", ConfigurationManager.get_lab_config().get_admin_credentials().get_password())
@@ -272,7 +323,7 @@ def get_subcloud_ip(subcloud_name: str, central_cloud_ssh_connection: SSHConnect
expected_prompts = [password_prompt, open_rc_prompt, system_oam_show_prompt, end_prompt] expected_prompts = [password_prompt, open_rc_prompt, system_oam_show_prompt, end_prompt]
system_oam_show_output_list = central_cloud_ssh_connection.send_expect_prompts(f'ssh {subcloud_name} -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no', expected_prompts) system_oam_show_output_list = central_cloud_ssh_connection.send_expect_prompts(f"ssh {subcloud_name} -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no", expected_prompts)
system_oam_show_output: SystemOamShowOutput = SystemOamShowOutput(system_oam_show_output_list) system_oam_show_output: SystemOamShowOutput = SystemOamShowOutput(system_oam_show_output_list)
# Get the oam_ip if available (used in vbox environments). # Get the oam_ip if available (used in vbox environments).
@@ -284,31 +335,35 @@ def get_subcloud_ip(subcloud_name: str, central_cloud_ssh_connection: SSHConnect
return subcloud_ip return subcloud_ip
def get_nodes(lab_config: LabConfig) -> [Node]: def get_nodes(lab_config: LabConfig) -> list[Node]:
""" """
Gets the nodes on this lab Gets the nodes on this lab.
Args: Args:
lab_config (LabConfig): the lab config object. lab_config (LabConfig): The lab config object.
Returns: list of nodes (list[Node])
Returns:
list[Node]: list of Nodes.
""" """
ssh_connection = LabConnectionKeywords().get_active_controller_ssh() ssh_connection = LabConnectionKeywords().get_active_controller_ssh()
return scan_hosts(lab_config, ssh_connection) return scan_hosts(lab_config, ssh_connection)
def scan_hosts(lab_config: LabConfig, ssh_connection: SSHConnection) -> [Node]: def scan_hosts(lab_config: LabConfig, ssh_connection: SSHConnection) -> list[Node]:
""" """
Scans the nodes on this lab and return a list of Nodes. Scans the nodes on this lab and return a list of Nodes.
Args: Args:
lab_config (LabConfig): the lab config object. lab_config (LabConfig): the lab config object.
ssh_connection (SSHConnection): the ssh connection to the host one wants to scan. ssh_connection (SSHConnection): The SSH connection to the host one wants to scan.
Returns: list of nodes (list[Node]) Returns:
list[Node]: list of Nodes.
Raises:
RuntimeError: If no controller node is found in the lab.
""" """
hosts = SystemHostListKeywords(ssh_connection).get_system_host_with_extra_column(["subfunctions", "bm_ip", "bm_username"])
hosts = SystemHostListKeywords(ssh_connection).get_system_host_with_extra_column(['subfunctions', 'bm_ip', 'bm_username'])
nodes = [] nodes = []
# Count the controllers to decide if the lab is Simplex or not. # Count the controllers to decide if the lab is Simplex or not.
@@ -318,9 +373,9 @@ def scan_hosts(lab_config: LabConfig, ssh_connection: SSHConnection) -> [Node]:
controllers_count += 1 controllers_count += 1
if controllers_count == 1: if controllers_count == 1:
lab_config.add_lab_capability('lab_is_simplex') lab_config.add_lab_capability("lab_is_simplex")
elif controllers_count > 1: elif controllers_count > 1:
lab_config.add_lab_capability('lab_has_standby_controller') lab_config.add_lab_capability("lab_has_standby_controller")
else: else:
raise RuntimeError("Failed to find at least one controller on this lab.") raise RuntimeError("Failed to find at least one controller on this lab.")
@@ -329,9 +384,9 @@ def scan_hosts(lab_config: LabConfig, ssh_connection: SSHConnection) -> [Node]:
name = host.get_host_name() name = host.get_host_name()
node_dict = { node_dict = {
'ip': get_controller_ip(ssh_connection, name), "ip": get_controller_ip(ssh_connection, name),
'node_type': host.get_personality(), "node_type": host.get_personality(),
'node_capabilities': [], "node_capabilities": [],
} }
node = Node(host.get_host_name(), node_dict) node = Node(host.get_host_name(), node_dict)
@@ -352,147 +407,149 @@ def scan_hosts(lab_config: LabConfig, ssh_connection: SSHConnection) -> [Node]:
# Parse the data to define the lab's capabilities. # Parse the data to define the lab's capabilities.
if is_sriov(host_interface_list_output): if is_sriov(host_interface_list_output):
node.append_node_capability('lab_has_sriov') node.append_node_capability("lab_has_sriov")
lab_config.add_lab_capability('lab_has_sriov') lab_config.add_lab_capability("lab_has_sriov")
if node.get_type() == 'worker': if node.get_type() == "worker":
node.append_node_capability('lab_has_compute') node.append_node_capability("lab_has_compute")
lab_config.add_lab_capability('lab_has_compute') lab_config.add_lab_capability("lab_has_compute")
if node.get_type() == 'worker' or 'worker' in node.get_subfunctions(): if node.get_type() == "worker" or "worker" in node.get_subfunctions():
node.append_node_capability('lab_has_worker') node.append_node_capability("lab_has_worker")
lab_config.add_lab_capability('lab_has_worker') lab_config.add_lab_capability("lab_has_worker")
if node.get_type() == 'storage': if node.get_type() == "storage":
node.append_node_capability('lab_has_storage') node.append_node_capability("lab_has_storage")
lab_config.add_lab_capability('lab_has_storage') lab_config.add_lab_capability("lab_has_storage")
if 'lowlatency' in node.get_subfunctions(): if "lowlatency" in node.get_subfunctions():
node.append_node_capability('lab_has_low_latency') node.append_node_capability("lab_has_low_latency")
lab_config.add_lab_capability('lab_has_low_latency') lab_config.add_lab_capability("lab_has_low_latency")
else: else:
node.append_node_capability('lab_has_non_low_latency') node.append_node_capability("lab_has_non_low_latency")
lab_config.add_lab_capability('lab_has_non_low_latency') lab_config.add_lab_capability("lab_has_non_low_latency")
if host_cpu_output.is_host_hyperthreaded(): if host_cpu_output.is_host_hyperthreaded():
node.append_node_capability('lab_has_hyperthreading') node.append_node_capability("lab_has_hyperthreading")
lab_config.add_lab_capability('lab_has_hyperthreading') lab_config.add_lab_capability("lab_has_hyperthreading")
else: else:
node.append_node_capability('lab_has_no_hyperthreading') node.append_node_capability("lab_has_no_hyperthreading")
lab_config.add_lab_capability('lab_has_no_hyperthreading') lab_config.add_lab_capability("lab_has_no_hyperthreading")
if lab_config.is_ipv6(): if lab_config.is_ipv6():
node.append_node_capability('lab_is_ipv6') node.append_node_capability("lab_is_ipv6")
lab_config.add_lab_capability('lab_is_ipv6') lab_config.add_lab_capability("lab_is_ipv6")
else: else:
node.append_node_capability('lab_is_ipv4') node.append_node_capability("lab_is_ipv4")
lab_config.add_lab_capability('lab_is_ipv4') lab_config.add_lab_capability("lab_is_ipv4")
if host_device_output.has_host_n3000(): if host_device_output.has_host_n3000():
node.append_node_capability('lab_has_n3000') node.append_node_capability("lab_has_n3000")
lab_config.add_lab_capability('lab_has_n3000') lab_config.add_lab_capability("lab_has_n3000")
if host_device_output.has_host_fpga(): if host_device_output.has_host_fpga():
node.append_node_capability('lab_has_fpga') node.append_node_capability("lab_has_fpga")
lab_config.add_lab_capability('lab_has_fpga') lab_config.add_lab_capability("lab_has_fpga")
if host_device_output.has_host_acc100(): if host_device_output.has_host_acc100():
node.append_node_capability('lab_has_acc100') node.append_node_capability("lab_has_acc100")
lab_config.add_lab_capability('lab_has_acc100') lab_config.add_lab_capability("lab_has_acc100")
if host_device_output.has_host_acc200(): if host_device_output.has_host_acc200():
node.append_node_capability('lab_has_acc200') node.append_node_capability("lab_has_acc200")
lab_config.add_lab_capability('lab_has_acc200') lab_config.add_lab_capability("lab_has_acc200")
if host_port_output.has_host_columbiaville(): if host_port_output.has_host_columbiaville():
node.append_node_capability('lab_has_columbiaville') node.append_node_capability("lab_has_columbiaville")
lab_config.add_lab_capability('lab_has_columbiaville') lab_config.add_lab_capability("lab_has_columbiaville")
if has_min_space_30G(ssh_connection, node): if has_min_space_30G(ssh_connection, node):
node.append_node_capability('lab_has_min_space_30G') node.append_node_capability("lab_has_min_space_30G")
lab_config.add_lab_capability('lab_has_min_space_30G') lab_config.add_lab_capability("lab_has_min_space_30G")
if host_cpu_output.has_minimum_number_processors(2): if host_cpu_output.has_minimum_number_processors(2):
node.append_node_capability('lab_has_processor_min_2') node.append_node_capability("lab_has_processor_min_2")
lab_config.add_lab_capability('lab_has_processor_min_2') lab_config.add_lab_capability("lab_has_processor_min_2")
if host_memory_output.has_page_size_1gb(): if host_memory_output.has_page_size_1gb():
node.append_node_capability('lab_has_page_size_1gb') node.append_node_capability("lab_has_page_size_1gb")
lab_config.add_lab_capability('lab_has_page_size_1gb') lab_config.add_lab_capability("lab_has_page_size_1gb")
if host_interface_list_output.has_ae_interface(): if host_interface_list_output.has_ae_interface():
node.append_node_capability('lab_has_ae_interface') node.append_node_capability("lab_has_ae_interface")
lab_config.add_lab_capability('lab_has_ae_interface') lab_config.add_lab_capability("lab_has_ae_interface")
if host_interface_list_output.has_minimum_number_physical_interface(2): if host_interface_list_output.has_minimum_number_physical_interface(2):
node.append_node_capability('lab_has_physical_interface_min_2') node.append_node_capability("lab_has_physical_interface_min_2")
lab_config.add_lab_capability('lab_has_physical_interface_min_2') lab_config.add_lab_capability("lab_has_physical_interface_min_2")
if host_interface_list_output.has_bond_interface(): if host_interface_list_output.has_bond_interface():
node.append_node_capability('lab_has_bond_interface') node.append_node_capability("lab_has_bond_interface")
lab_config.add_lab_capability('lab_has_bond_interface') lab_config.add_lab_capability("lab_has_bond_interface")
if host_storage_output.has_minimum_number_physical_interface(6): if host_storage_output.has_minimum_number_physical_interface(6):
node.append_node_capability('lab_has_storage_6_osd') node.append_node_capability("lab_has_storage_6_osd")
lab_config.add_lab_capability('lab_has_storage_6_osd') lab_config.add_lab_capability("lab_has_storage_6_osd")
if host_show_output.has_host_bmc_ipmi(name): if host_show_output.has_host_bmc_ipmi(name):
node.append_node_capability('lab_has_bmc_ipmi') node.append_node_capability("lab_has_bmc_ipmi")
lab_config.add_lab_capability('lab_has_bmc_ipmi') lab_config.add_lab_capability("lab_has_bmc_ipmi")
elif host_show_output.has_host_bmc_redfish(name): elif host_show_output.has_host_bmc_redfish(name):
node.append_node_capability('lab_has_bmc_redfish') node.append_node_capability("lab_has_bmc_redfish")
lab_config.add_lab_capability('lab_has_bmc_redfish') lab_config.add_lab_capability("lab_has_bmc_redfish")
elif host_show_output.has_host_bmc_dynamic(name): elif host_show_output.has_host_bmc_dynamic(name):
node.append_node_capability('lab_has_bmc_dynamic') node.append_node_capability("lab_has_bmc_dynamic")
lab_config.add_lab_capability('lab_has_bmc_dynamic') lab_config.add_lab_capability("lab_has_bmc_dynamic")
if has_host_bmc_sensor(ssh_connection): if has_host_bmc_sensor(ssh_connection):
node.append_node_capability('lab_bmc_sensor') node.append_node_capability("lab_bmc_sensor")
lab_config.add_lab_capability('lab_bmc_sensor') lab_config.add_lab_capability("lab_bmc_sensor")
nodes.append(node) nodes.append(node)
return nodes return nodes
def get_controller_ip(ssh_connection: SSHConnection, controller_name): def get_controller_ip(ssh_connection: SSHConnection, controller_name: str) -> str | None:
""" """
Getter for controller ip Getter for controller ip.
Args: Args:
ssh_connection (): the ssh connection ssh_connection (SSHConnection): The SSH connection to the lab.
controller_name (): the controller name controller_name (str): The name of the controller.
Returns: Returns:
str | None: The IP address of the controller, or None if not found.
""" """
system_oam_output = SystemOamShowKeywords(ssh_connection).oam_show() system_oam_output = SystemOamShowKeywords(ssh_connection).oam_show()
if controller_name == 'controller-0': if controller_name == "controller-0":
return system_oam_output.get_oam_c0_ip() return system_oam_output.get_oam_c0_ip()
elif controller_name == 'controller-1': elif controller_name == "controller-1":
return system_oam_output.get_oam_c1_ip() return system_oam_output.get_oam_c1_ip()
else: else:
get_logger().log_error(f'GET IP for {controller_name} has not been implemented') get_logger().log_error(f"GET IP for {controller_name} has not been implemented")
return None return None
def get_horizon_url(): def get_horizon_url() -> str:
""" """
Getter for horizon url Getter for Horizon URL.
Returns:
Returns:
str: The formatted Horizon URL.
""" """
ssh_connection = LabConnectionKeywords().get_active_controller_ssh() ssh_connection = LabConnectionKeywords().get_active_controller_ssh()
endpoint_output = OpenStackEndpointListKeywords(ssh_connection).endpoint_list() endpoint_output = OpenStackEndpointListKeywords(ssh_connection).endpoint_list()
# Remove port from orignal url and then add 8443 for https or 8080 for http # Remove port from orignal url and then add 8443 for https or 8080 for http
url = endpoint_output.get_endpoint('keystone', 'public').get_url().rsplit(':', 1)[0] url = endpoint_output.get_endpoint("keystone", "public").get_url().rsplit(":", 1)[0]
if 'https' in url: if "https" in url:
url += ':8443/' url += ":8443/"
else: else:
url += ':8080/' url += ":8080/"
return url return url
@@ -500,64 +557,68 @@ def get_horizon_url():
def get_lab_type(lab_config: LabConfig) -> str: def get_lab_type(lab_config: LabConfig) -> str:
""" """
Gets the lab type Gets the lab type
Args: Args:
lab_config (): the lab config lab_config (LabConfig): the lab config.
Returns: the lab type
Returns:
str: the lab type
""" """
# if lab has subclouds, then it's a DC # if lab has subclouds, then it's a DC
if len(lab_config.get_subclouds()) > 0: if len(lab_config.get_subclouds()) > 0:
return 'DC' return "DC"
nodes = lab_config.get_nodes() nodes = lab_config.get_nodes()
controller_nodes = list(filter(lambda node: node.get_type() == 'controller', nodes)) controller_nodes = list(filter(lambda node: node.get_type() == "controller", nodes))
worker_nodes = list(filter(lambda node: node.get_type() == 'worker', nodes)) worker_nodes = list(filter(lambda node: node.get_type() == "worker", nodes))
storage_nodes = list(filter(lambda node: node.get_type() == 'storage', nodes)) storage_nodes = list(filter(lambda node: node.get_type() == "storage", nodes))
if len(controller_nodes) < 2: if len(controller_nodes) < 2:
return 'Simplex' return "Simplex"
# if we have storage nodes or compute nodes and the controllers have work subfunction, then AIO+ # if we have storage nodes or compute nodes and the controllers have work subfunction, then AIO+
if (len(storage_nodes) > 0 or len(worker_nodes) > 0) and len(list(filter(lambda controller: 'worker' in controller.get_subfunctions(), controller_nodes))) > 1: if (len(storage_nodes) > 0 or len(worker_nodes) > 0) and len(list(filter(lambda controller: "worker" in controller.get_subfunctions(), controller_nodes))) > 1:
return 'AIO+' return "AIO+"
if len(storage_nodes) > 0: if len(storage_nodes) > 0:
return 'Storage' return "Storage"
if len(worker_nodes) > 0: if len(worker_nodes) > 0:
return 'Standard' return "Standard"
# more than 2 controller but no computes or storage == Duplex # more than 2 controller but no computes or storage == Duplex
return 'Duplex' return "Duplex"
def write_config(lab_config: LabConfig): def write_config(lab_config: LabConfig) -> None:
""" """
Writes the new config out to the current config Writes the new config out to the current config
Returns:
Args:
lab_config (LabConfig): The lab configuration object.
Returns:
None:
""" """
new_config = '{' new_config = "{"
new_config += get_main_lab_config(lab_config) new_config += get_main_lab_config(lab_config)
new_config += get_nodes_config(lab_config) new_config += get_nodes_config(lab_config)
new_config += get_subclouds_config(lab_config) new_config += get_subclouds_config(lab_config)
new_config += '}' new_config += "}"
lab_config_file = lab_config.get_lab_config_file() lab_config_file = lab_config.get_lab_config_file()
shutil.move(lab_config_file, f"{lab_config_file}_bak") shutil.move(lab_config_file, f"{lab_config_file}_bak")
with open(lab_config_file, 'w') as config: with open(lab_config_file, "w") as config:
config.write(json5.dumps(json5.loads(new_config), indent=4)) config.write(json5.dumps(json5.loads(new_config), indent=4))
def clean_subcloud_config_files(lab_config: LabConfig): def clean_subcloud_config_files(lab_config: LabConfig) -> None:
""" """
Clears config files created during the scan process that are not useful anymore. Clears config files created during the scan process that are not useful anymore.
Args: Args:
lab_config (LabConfig): the lab config object. lab_config (LabConfig): The lab configuration object.
Returns: Returns:
None None:
""" """
lab_config_file = lab_config.get_lab_config_file() lab_config_file = lab_config.get_lab_config_file()
lab_config_directory, _ = os.path.split(lab_config_file) lab_config_directory, _ = os.path.split(lab_config_file)
@@ -567,8 +628,8 @@ def clean_subcloud_config_files(lab_config: LabConfig):
for dcmanager_subcloud in dcmanager_subclouds.get_dcmanager_subcloud_list_objects(): for dcmanager_subcloud in dcmanager_subclouds.get_dcmanager_subcloud_list_objects():
subcloud_name = dcmanager_subcloud.get_name() subcloud_name = dcmanager_subcloud.get_name()
subcloud_file_name = f'{subcloud_name}.json5' subcloud_file_name = f"{subcloud_name}.json5"
subcloud_backup_name = f'{subcloud_name}.json5_bak' subcloud_backup_name = f"{subcloud_name}.json5_bak"
subcloud_file_path = os.path.join(lab_config_directory, subcloud_file_name) subcloud_file_path = os.path.join(lab_config_directory, subcloud_file_name)
subcloud_backup_path = os.path.join(lab_config_directory, subcloud_backup_name) subcloud_backup_path = os.path.join(lab_config_directory, subcloud_backup_name)
if os.path.exists(subcloud_file_path): if os.path.exists(subcloud_file_path):
@@ -581,63 +642,65 @@ def clean_subcloud_config_files(lab_config: LabConfig):
os.remove(subcloud_backup_path) os.remove(subcloud_backup_path)
def get_main_lab_config(lab_config: LabConfig): def get_main_lab_config(lab_config: LabConfig) -> str:
""" """
Gets the config lines for the 'main' lab Gets the configuration lines for the 'main' lab.
Args: Args:
lab_config (): the lab config lab_config (LabConfig): The lab configuration object.
Returns: Returns:
str: The formatted configuration for the main lab.
""" """
main_config = f'floating_ip: "{lab_config.get_floating_ip()}",' main_config = f'floating_ip: "{lab_config.get_floating_ip()}",'
main_config += f'lab_name: "{lab_config.get_lab_name()}",' main_config += f'lab_name: "{lab_config.get_lab_name()}",'
main_config += f'lab_type: "{lab_config.get_lab_type()}",' main_config += f'lab_type: "{lab_config.get_lab_type()}",'
main_config += 'admin_credentials: {' main_config += "admin_credentials: {"
main_config += f'user_name: "{lab_config.get_admin_credentials().get_user_name()}",' main_config += f'user_name: "{lab_config.get_admin_credentials().get_user_name()}",'
main_config += f'password: "{lab_config.get_admin_credentials().get_password()}",' main_config += f'password: "{lab_config.get_admin_credentials().get_password()}",'
main_config += '},' main_config += "},"
main_config += f'bm_password: "{lab_config.get_bm_password()}",' main_config += f'bm_password: "{lab_config.get_bm_password()}",'
use_jump_server = 'true' if lab_config.is_use_jump_server() else 'false' use_jump_server = "true" if lab_config.is_use_jump_server() else "false"
main_config += f'use_jump_server: {use_jump_server},' main_config += f"use_jump_server: {use_jump_server},"
if lab_config.is_use_jump_server(): if lab_config.is_use_jump_server():
main_config += f'jump_server_config: "{lab_config.get_jump_host_configuration().get_host_config_file()}",' main_config += f'jump_server_config: "{lab_config.get_jump_host_configuration().get_host_config_file()}",'
if lab_config.get_ssh_port(): if lab_config.get_ssh_port():
main_config += f'ssh_port: {lab_config.get_ssh_port()},' main_config += f"ssh_port: {lab_config.get_ssh_port()},"
main_config += f'horizon_url: "{lab_config.get_horizon_url()}",' main_config += f'horizon_url: "{lab_config.get_horizon_url()}",'
if lab_config.get_system_controller_ip(): if lab_config.get_system_controller_ip():
main_config += f'system_controller_ip: "{lab_config.get_system_controller_ip()}",' main_config += f'system_controller_ip: "{lab_config.get_system_controller_ip()}",'
if lab_config.get_system_controller_name(): if lab_config.get_system_controller_name():
main_config += f'system_controller_name: "{lab_config.get_system_controller_name()}",' main_config += f'system_controller_name: "{lab_config.get_system_controller_name()}",'
lab_capabilities_as_str = ', \n'.join('"{}"'.format(capability) for capability in lab_config.get_lab_capabilities()) lab_capabilities_as_str = ", \n".join('"{}"'.format(capability) for capability in lab_config.get_lab_capabilities())
main_config += f'"lab_capabilities": [\n{lab_capabilities_as_str}],\n' main_config += f'"lab_capabilities": [\n{lab_capabilities_as_str}],\n'
return main_config return main_config
def get_nodes_config(lab_config: LabConfig): def get_nodes_config(lab_config: LabConfig) -> str:
""" """
Getter for the node configs Retrieves the configuration settings for the nodes.
Args: Args:
lab_config (): lab_config (LabConfig): The lab configuration object.
Returns: Returns:
str: The formatted configuration for the nodes.
""" """
if not lab_config.get_nodes(): if not lab_config.get_nodes():
return "" return ""
node_config = '"nodes": {' node_config = '"nodes": {'
for node in lab_config.get_nodes(): for node in lab_config.get_nodes():
node_config += f'"{node.get_name()}": ' + '{' node_config += f'"{node.get_name()}": ' + "{"
node_config += f'"ip": "{node.get_ip()}",' node_config += f'"ip": "{node.get_ip()}",'
node_config += f'"node_type": "{node.get_type()}",' node_config += f'"node_type": "{node.get_type()}",'
node_config += f'"bm_ip": "{node.get_bm_ip()}",' node_config += f'"bm_ip": "{node.get_bm_ip()}",'
node_config += f'"bm_username": "{node.get_bm_username()}",' node_config += f'"bm_username": "{node.get_bm_username()}",'
node_capabilities_as_str = ', \n'.join('"{}"'.format(capability) for capability in node.get_node_capabilities()) node_capabilities_as_str = ", \n".join('"{}"'.format(capability) for capability in node.get_node_capabilities())
node_config += f'"node_capabilities": [\n{node_capabilities_as_str}],\n' node_config += f'"node_capabilities": [\n{node_capabilities_as_str}],\n'
node_config += '},' node_config += "},"
node_config += '},' node_config += "},"
return node_config return node_config
@@ -645,11 +708,12 @@ def get_nodes_config(lab_config: LabConfig):
def get_subclouds_config(lab_config: LabConfig) -> str: def get_subclouds_config(lab_config: LabConfig) -> str:
""" """
Getter for the subcloud configs (the portion in lab config file where are specified the subcloud config file paths). Getter for the subcloud configs (the portion in lab config file where are specified the subcloud config file paths).
Args: Args:
lab_config (LabConfig): the subcloud LabConfig object. lab_config (LabConfig): The subcloud LabConfig object.
Returns: Returns:
str: The formatted subcloud configuration.
""" """
if not lab_config.get_subclouds(): if not lab_config.get_subclouds():
return "" return ""
@@ -659,12 +723,12 @@ def get_subclouds_config(lab_config: LabConfig) -> str:
subclouds_sorted = sorted(subclouds, key=lambda subcloud: subcloud.get_lab_name()) subclouds_sorted = sorted(subclouds, key=lambda subcloud: subcloud.get_lab_name())
for subcloud in subclouds_sorted: for subcloud in subclouds_sorted:
subcloud_config += f'"{subcloud.get_lab_name()}": "{subcloud.get_lab_config_file()}",' subcloud_config += f'"{subcloud.get_lab_name()}": "{subcloud.get_lab_config_file()}",'
subcloud_config += '}' subcloud_config += "}"
return subcloud_config return subcloud_config
if __name__ == '__main__': if __name__ == "__main__":
""" """
This Function will update the given configuration with a list of capabilities. It scans the current lab This Function will update the given configuration with a list of capabilities. It scans the current lab
for known capabilities and adds them. for known capabilities and adds them.
@@ -681,7 +745,7 @@ if __name__ == '__main__':
# add an option for floating ip on command line # add an option for floating ip on command line
parser = OptionParser() parser = OptionParser()
parser.add_option('--floating_ip', action='store', type='str', dest='floating_ip', help='The floating ip of the lab if overriding the config') parser.add_option("--floating_ip", action="store", type="str", dest="floating_ip", help="The floating ip of the lab if overriding the config")
configuration_locations_manager.set_configs_from_options_parser(parser) configuration_locations_manager.set_configs_from_options_parser(parser)
ConfigurationManager.load_configs(configuration_locations_manager) ConfigurationManager.load_configs(configuration_locations_manager)
@@ -724,5 +788,5 @@ if __name__ == '__main__':
lab_capability_operation.insert_lab_capability(lab_capability) lab_capability_operation.insert_lab_capability(lab_capability)
write_config(lab_config) write_config(lab_config)
if 'lab_has_subcloud' in lab_config.get_lab_capabilities(): if "lab_has_subcloud" in lab_config.get_lab_capabilities():
clean_subcloud_config_files(lab_config) clean_subcloud_config_files(lab_config)

View File

@@ -1,13 +1,25 @@
from pytest import mark from typing import List
from pytest import fail, mark
from config.configuration_manager import ConfigurationManager from config.configuration_manager import ConfigurationManager
from framework.logging.automation_logger import get_logger from framework.logging.automation_logger import get_logger
from framework.validation.validation import validate_equals
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_backup_keywords import ( from keywords.cloud_platform.dcmanager.dcmanager_subcloud_backup_keywords import (
DcManagerSubcloudBackupKeywords, DcManagerSubcloudBackupKeywords,
) )
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_group_keywords import (
DcmanagerSubcloudGroupKeywords,
)
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_list_keywords import ( from keywords.cloud_platform.dcmanager.dcmanager_subcloud_list_keywords import (
DcManagerSubcloudListKeywords, DcManagerSubcloudListKeywords,
) )
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_update_keywords import (
DcManagerSubcloudUpdateKeywords,
)
from keywords.cloud_platform.dcmanager.objects.dcmanager_subcloud_list_object_filter import (
DcManagerSubcloudListObjectFilter,
)
from keywords.cloud_platform.rest.bare_metal.hosts.get_hosts_keywords import ( from keywords.cloud_platform.rest.bare_metal.hosts.get_hosts_keywords import (
GetHostsKeywords, GetHostsKeywords,
) )
@@ -36,9 +48,7 @@ def test_delete_backup_central(request):
# Gets the lowest subcloud (the subcloud with the lowest id). # Gets the lowest subcloud (the subcloud with the lowest id).
dcmanager_subcloud_list_keywords = DcManagerSubcloudListKeywords(central_ssh) dcmanager_subcloud_list_keywords = DcManagerSubcloudListKeywords(central_ssh)
lowest_subcloud = ( lowest_subcloud = dcmanager_subcloud_list_keywords.get_dcmanager_subcloud_list().get_healthy_subcloud_with_lowest_id()
dcmanager_subcloud_list_keywords.get_dcmanager_subcloud_list().get_healthy_subcloud_with_lowest_id()
)
subcloud_name = lowest_subcloud.get_name() subcloud_name = lowest_subcloud.get_name()
subcloud_ssh = LabConnectionKeywords().get_subcloud_ssh(subcloud_name) subcloud_ssh = LabConnectionKeywords().get_subcloud_ssh(subcloud_name)
@@ -66,15 +76,11 @@ def test_delete_backup_central(request):
# Create a sbcloud backup # Create a sbcloud backup
get_logger().log_info(f"Create {subcloud_name} backup on Central Cloud") get_logger().log_info(f"Create {subcloud_name} backup on Central Cloud")
dc_manager_backup.create_subcloud_backup( dc_manager_backup.create_subcloud_backup(subcloud_password, central_ssh, path=central_path, subcloud=subcloud_name)
subcloud_password, central_ssh, central_path, subcloud=subcloud_name
)
# Delete the backup created # Delete the backup created
get_logger().log_info(f"Delete {subcloud_name} backup on Central Cloud") get_logger().log_info(f"Delete {subcloud_name} backup on Central Cloud")
dc_manager_backup.delete_subcloud_backup( dc_manager_backup.delete_subcloud_backup(central_ssh, release, path=central_path, subcloud=subcloud_name)
central_ssh, central_path, release, subcloud=subcloud_name
)
@mark.p2 @mark.p2
@@ -96,9 +102,7 @@ def test_delete_backup_local(request):
# Gets the lowest subcloud (the subcloud with the lowest id). # Gets the lowest subcloud (the subcloud with the lowest id).
dcmanager_subcloud_list_keywords = DcManagerSubcloudListKeywords(central_ssh) dcmanager_subcloud_list_keywords = DcManagerSubcloudListKeywords(central_ssh)
lowest_subcloud = ( lowest_subcloud = dcmanager_subcloud_list_keywords.get_dcmanager_subcloud_list().get_healthy_subcloud_with_lowest_id()
dcmanager_subcloud_list_keywords.get_dcmanager_subcloud_list().get_healthy_subcloud_with_lowest_id()
)
subcloud_name = lowest_subcloud.get_name() subcloud_name = lowest_subcloud.get_name()
subcloud_ssh = LabConnectionKeywords().get_subcloud_ssh(subcloud_name) subcloud_ssh = LabConnectionKeywords().get_subcloud_ssh(subcloud_name)
@@ -115,9 +119,7 @@ def test_delete_backup_local(request):
dc_manager_backup = DcManagerSubcloudBackupKeywords(central_ssh) dc_manager_backup = DcManagerSubcloudBackupKeywords(central_ssh)
# Path to where the backup file will store. # Path to where the backup file will store.
local_path = ( local_path = f"/opt/platform-backup/backups/{release}/{subcloud_name}_platform_backup_*.tgz"
f"/opt/platform-backup/backups/{release}/{subcloud_name}_platform_backup_*.tgz"
)
def teardown(): def teardown():
get_logger().log_info("Removing test files during teardown") get_logger().log_info("Removing test files during teardown")
@@ -130,7 +132,7 @@ def test_delete_backup_local(request):
dc_manager_backup.create_subcloud_backup( dc_manager_backup.create_subcloud_backup(
subcloud_password, subcloud_password,
subcloud_ssh, subcloud_ssh,
local_path, path=local_path,
subcloud=subcloud_name, subcloud=subcloud_name,
local_only=True, local_only=True,
) )
@@ -142,9 +144,112 @@ def test_delete_backup_local(request):
get_logger().log_info(f"Delete {subcloud_name} backup on Central Cloud") get_logger().log_info(f"Delete {subcloud_name} backup on Central Cloud")
dc_manager_backup.delete_subcloud_backup( dc_manager_backup.delete_subcloud_backup(
subcloud_ssh, subcloud_ssh,
path,
release, release,
path=path,
subcloud=subcloud_name, subcloud=subcloud_name,
local_only=True, local_only=True,
sysadmin_password=subcloud_password, sysadmin_password=subcloud_password,
) )
def create_subcloud_group(subcloud_list: List[str]) -> None:
"""
Creates a subcloud group, assigns subclouds to it, and verifies the assignment.
Args:
subcloud_list (List[str]): List of subcloud names to be added to the group.
Returns:
None:
"""
group_name = "Test"
group_description = "Feature Testing"
central_ssh = LabConnectionKeywords().get_active_controller_ssh()
# Create a subcloud group
get_logger().log_info(f"Creating subcloud group: {group_name}")
subcloud_group_keywords = DcmanagerSubcloudGroupKeywords(central_ssh)
subcloud_group_keywords.dcmanager_subcloud_group_add(group_name)
subcloud_group_keywords.dcmanager_subcloud_group_update(group_name, "description", group_description)
# Adding subclouds to group created
get_logger().log_info(f"Assigning subclouds to group: {group_name}")
subcloud_update = DcManagerSubcloudUpdateKeywords(central_ssh)
for subcloud_name in subcloud_list:
subcloud_update.dcmanager_subcloud_update(subcloud_name, "group", group_name)
# Checking Subcloud's assigned to the group correctly
get_logger().log_info("Checking Subcloud's in the new group")
group_list = subcloud_group_keywords.get_dcmanager_subcloud_group_list_subclouds(group_name).get_dcmanager_subcloud_group_list_subclouds()
subclouds = [subcloud.name for subcloud in group_list]
validate_equals(subclouds, subcloud_list, "Checking Subcloud's assigned to the group correctly")
@mark.p2
@mark.lab_has_min_2_subclouds
def test_delete_backup_group_on_central(request):
"""
Verify delete subcloud group backup on central path
Test Steps:
- Create a subcloud group and add 2 subclouds
- Create a Subcloud backup and check it on central path
- Delete the backup created and verify the backup is deleted
Teardown:
- Remove files created while the Tc was running.
- Delete the subcloud group
"""
group_name = "Test"
central_ssh = LabConnectionKeywords().get_active_controller_ssh()
host = SystemHostListKeywords(central_ssh).get_active_controller().get_host_name()
host_show_output = GetHostsKeywords().get_hosts().get_system_host_show_object(host)
# Get the sw_version if available (used in vbox environments).
release = host_show_output.get_sw_version()
# If sw_version is not available, fall back to software_load (used in physical labs).
if not release:
release = host_show_output.get_software_load()
# Retrieves the subclouds. Considers only subclouds that are online, managed, and synced.
dcmanager_subcloud_list_input = DcManagerSubcloudListObjectFilter.get_healthy_subcloud_filter()
dcmanager_subcloud_list_keywords = DcManagerSubcloudListKeywords(central_ssh)
dcmanager_subcloud_list_objects_filtered = dcmanager_subcloud_list_keywords.get_dcmanager_subcloud_list().get_dcmanager_subcloud_list_objects_filtered(dcmanager_subcloud_list_input)
subcloud_list = [subcloud.name for subcloud in dcmanager_subcloud_list_objects_filtered]
if len(subcloud_list) < 2:
get_logger().log_info("At least two subclouds managed are required to run the test")
fail("At least two subclouds managed are required to run the test")
# Gets the subcloud sysadmin password needed for backup creation.
subcloud_password = ConfigurationManager.get_lab_config().get_subcloud(subcloud_list[0]).get_admin_credentials().get_password()
# Create a subcloud group and add 2 subclouds
create_subcloud_group(subcloud_list)
dc_manager_backup = DcManagerSubcloudBackupKeywords(central_ssh)
def teardown_backup():
get_logger().log_info("Removing test files during teardown")
central_ssh.send_as_sudo("rm -r -f /opt/dc-vault/backups/")
request.addfinalizer(teardown_backup)
def teardown_group():
get_logger().log_info("Removing the created subcloud group during teardown")
for subcloud_name in subcloud_list:
DcManagerSubcloudUpdateKeywords(central_ssh).dcmanager_subcloud_update(subcloud_name, "group", "Default")
DcmanagerSubcloudGroupKeywords(central_ssh).dcmanager_subcloud_group_delete(group_name)
request.addfinalizer(teardown_group)
# Create a subcloud backup
get_logger().log_info(f"Create backup on Central Cloud for subcloud group: {group_name}")
dc_manager_backup.create_subcloud_backup(subcloud_password, central_ssh, group=group_name, release=release, subcloud_list=subcloud_list)
# Delete the backup created
get_logger().log_info(f"Delete backup on Central Cloud for subcloud group: {group_name}")
dc_manager_backup.delete_subcloud_backup(central_ssh, release=release, group=group_name, subcloud_list=subcloud_list)

View File

@@ -29,6 +29,7 @@ markers=
lab_has_bond_interface: mark tests where the host must have a bond interface lab_has_bond_interface: mark tests where the host must have a bond interface
lab_has_storage_6_osd: mark tests that use specific labs with 6 or more OSDs lab_has_storage_6_osd: mark tests that use specific labs with 6 or more OSDs
lab_has_subcloud: mark tests that require at least one subcloud lab_has_subcloud: mark tests that require at least one subcloud
lab_has_min_2_subclouds: mark tests that require at least 2 subcloud
lab_has_compute: mark tests that require at least one compute node lab_has_compute: mark tests that require at least one compute node
subcloud_lab_has_compute: mark tests that require at least one subcloud containing at least one compute node subcloud_lab_has_compute: mark tests that require at least one subcloud containing at least one compute node
#TODO: add 'lab_has_bmc_ipmi', 'lab_has_bmc_redfish', 'lab_has_bmc_dynamic', and 'lab_bmc_sensor' #TODO: add 'lab_has_bmc_ipmi', 'lab_has_bmc_redfish', 'lab_has_bmc_dynamic', and 'lab_bmc_sensor'