
Added Sync Keywords Change-Id: Idb6488ab1fb3a98a56bf6b904da42b59675cf886 Signed-off-by: Abhishek jaiswal <abhishek.jaiswal@windriver.com>
554 lines
16 KiB
Python
554 lines
16 KiB
Python
from typing import List, Optional
|
|
|
|
import json5
|
|
|
|
from config.host.objects.host_configuration import HostConfiguration
|
|
from config.lab.objects.credentials import Credentials
|
|
from config.lab.objects.node import Node
|
|
from framework.resources.resource_finder import get_stx_resource_path
|
|
|
|
|
|
class LabConfig:
    """
    Class to hold lab config.

    The configuration is read once from a JSON5 file in the constructor and then
    exposed through getters/setters. Subclouds and the secondary system controller
    referenced by the file are loaded recursively as nested LabConfig objects.
    """

    # Port used to build the default Horizon URL when the config file does not supply one.
    _DEFAULT_HORIZON_PORT = 8443

    def __init__(self, config: str):
        """
        Load the lab configuration from the given file.

        Args:
            config (str): Path to the JSON5 lab config file.

        Raises:
            FileNotFoundError: If the lab config file cannot be found.
            KeyError: If a mandatory key (floating_ip, lab_name, lab_type, admin_credentials,
                bm_password, use_jump_server) is missing from the file.
        """
        try:
            # The context manager closes the file handle even when parsing fails.
            with open(config) as json_data:
                lab_dict = json5.load(json_data)
        except FileNotFoundError:
            print(f"Could not find the lab config file: {config}")
            raise

        # Mandatory settings.
        self.floating_ip = lab_dict["floating_ip"]
        self.ipv6 = self._is_ipv6_address(self.floating_ip)
        self.lab_name = lab_dict["lab_name"]
        self.lab_type = lab_dict["lab_type"]
        self.admin_credentials = Credentials(lab_dict["admin_credentials"])
        self.bm_password = lab_dict["bm_password"]
        self.use_jump_server = lab_dict["use_jump_server"]

        # Always define the attribute so the getter never raises AttributeError.
        self.jump_server_config: Optional[HostConfiguration] = None
        if "jump_server_config" in lab_dict:
            jump_host_config_location = get_stx_resource_path(lab_dict["jump_server_config"])
            self.jump_server_config = HostConfiguration(jump_host_config_location)

        self.ssh_port: int = int(lab_dict.get("ssh_port", 22))
        self.lab_capabilities = lab_dict.get("lab_capabilities", [])

        if "horizon_url" in lab_dict:
            self.horizon_url = lab_dict["horizon_url"]
        else:
            self.horizon_url = self._default_horizon_url(self.floating_ip)

        # Horizon and REST fall back to the "admin" user with the admin password.
        if "horizon_credentials" in lab_dict:
            self.horizon_credentials = Credentials(lab_dict["horizon_credentials"])
        else:
            self.horizon_credentials = Credentials({"user_name": "admin", "password": self.admin_credentials.get_password()})

        if "rest_credentials" in lab_dict:
            self.rest_credentials = Credentials(lab_dict["rest_credentials"])
        else:
            self.rest_credentials = Credentials({"user_name": "admin", "password": self.admin_credentials.get_password()})

        # Each subcloud entry maps a name to the location of that subcloud's own lab config file.
        self.subclouds: List["LabConfig"] = [
            LabConfig(get_stx_resource_path(subcloud_config))
            for subcloud_config in lab_dict.get("subclouds", {}).values()
        ]

        self.nodes = [Node(node_name, node_dict) for node_name, node_dict in lab_dict.get("nodes", {}).items()]

        # A lab with subclouds is a DC lab unless the config explicitly says otherwise.
        self.is_dc_lab = lab_dict.get("is_dc", len(self.subclouds) > 0)

        # Only get set if DC system and only in the subcloud.
        self.system_controller_ip = lab_dict.get("system_controller_ip")
        self.system_controller_name = lab_dict.get("system_controller_name")

        self.secondary_system_controller = None
        if "secondary_system_controller" in lab_dict:
            secondary_lab_file = get_stx_resource_path(lab_dict["secondary_system_controller"])
            self.secondary_system_controller = LabConfig(secondary_lab_file)

        self.lab_config_file = config

    @staticmethod
    def _is_ipv6_address(ip_address: str) -> bool:
        """
        Tell whether the given address is treated as IPv6.

        Args:
            ip_address (str): The address to check.

        Returns:
            bool: True if the address contains a ':' separator, False otherwise.
        """
        return ":" in ip_address

    @staticmethod
    def _default_horizon_url(floating_ip: str) -> str:
        """
        Build the Horizon URL used when the config file does not provide one.

        Args:
            floating_ip (str): The lab's floating ip.

        Returns:
            str: The URL, with the host wrapped in brackets only for IPv6 literals.
        """
        # Brackets are only valid around IPv6 literals in a URL authority.
        host = f"[{floating_ip}]" if LabConfig._is_ipv6_address(floating_ip) else floating_ip
        return f"https://{host}:{LabConfig._DEFAULT_HORIZON_PORT}/"

    def get_floating_ip(self) -> str:
        """
        Getter for floating ip

        Returns:
            str: The floating IP address
        """
        return self.floating_ip

    def set_floating_ip(self, floating_ip: str) -> None:
        """
        Setter for floating ip. Also refreshes the ipv4 vs ipv6 flag.

        Args:
            floating_ip (str): the host's floating ip
        """
        self.floating_ip = floating_ip
        self.ipv6 = self._is_ipv6_address(floating_ip)

    def get_lab_name(self) -> str:
        """
        Getter for lab name

        Returns:
            str: The lab name
        """
        return self.lab_name

    def set_lab_name(self, lab_name: str) -> None:
        """
        Setter for lab name

        Args:
            lab_name (str): the lab name
        """
        self.lab_name = lab_name

    def get_lab_type(self) -> str:
        """
        Getter for lab type

        Returns:
            str: The lab type
        """
        return self.lab_type

    def set_lab_type(self, lab_type: str) -> None:
        """
        Setter for lab type

        Args:
            lab_type (str): The lab type
        """
        self.lab_type = lab_type

    def get_admin_credentials(self) -> Credentials:
        """
        Getter for admin credentials

        Returns:
            Credentials: The admin credentials object
        """
        return self.admin_credentials

    def get_horizon_credentials(self) -> Credentials:
        """
        Getter for Horizon credentials

        Returns:
            Credentials: The Horizon credentials object
        """
        return self.horizon_credentials

    def get_rest_credentials(self) -> Credentials:
        """
        Getter for Rest credentials

        Returns:
            Credentials: The REST API credentials object
        """
        return self.rest_credentials

    def set_nodes(self, nodes: List[Node]) -> None:
        """
        Sets the lab's nodes (replaces any nodes already created).

        Args:
            nodes (List[Node]): The nodes to set
        """
        self.nodes = nodes

    def get_nodes(self) -> List[Node]:
        """
        Get all lab nodes.

        Returns:
            List[Node]: List of all nodes in the lab
        """
        return self.nodes

    def get_node(self, lab_node_name: str) -> Optional[Node]:
        """
        Get a lab node by name.

        Args:
            lab_node_name (str): The name of the lab node ex. Controller-0

        Returns:
            Optional[Node]: The lab node with the given name, None if there is no such node
        """
        return next((node for node in self.nodes if node.get_name() == lab_node_name), None)

    def get_computes(self) -> List[Node]:
        """
        Getter for compute nodes.

        Returns:
            List[Node]: List of the nodes whose node_type is "worker"
        """
        return [node for node in self.get_nodes() if node.node_type == "worker"]

    def get_compute(self, compute_name: str) -> Optional[Node]:
        """
        Retrieve the compute node whose name is given by 'compute_name'.

        Args:
            compute_name (str): the name of the compute node.

        Returns:
            Optional[Node]: The compute node if found, None otherwise
        """
        return next((compute for compute in self.get_computes() if compute.get_name() == compute_name), None)

    def is_ipv6(self) -> bool:
        """
        Return True if lab is ipv6, False otherwise

        Returns:
            bool: True if IPv6 is used, False otherwise
        """
        return self.ipv6

    def get_subclouds(self) -> List["LabConfig"]:
        """
        Getter for subclouds

        Returns:
            List[LabConfig]: List of subcloud configurations
        """
        return self.subclouds

    def set_subclouds(self, subclouds: List["LabConfig"]) -> None:
        """
        Setter for subclouds

        Args:
            subclouds (List[LabConfig]): List of subcloud configurations
        """
        self.subclouds = subclouds

    def get_subcloud(self, subcloud_name: str) -> Optional["LabConfig"]:
        """
        Get a subcloud configuration by name.

        Args:
            subcloud_name (str): Name of the subcloud to retrieve

        Returns:
            Optional[LabConfig]: The subcloud configuration if found, None otherwise
        """
        return next((subcloud for subcloud in self.subclouds if subcloud.get_lab_name() == subcloud_name), None)

    def get_subcloud_names(self) -> List[str]:
        """
        Getter for the names of the subclouds held by this lab config.

        Returns:
            List[str]: List of subcloud names
        """
        return [subcloud.get_lab_name() for subcloud in self.subclouds]

    def get_horizon_url(self) -> str:
        """
        Getter for Horizon URL

        Returns:
            str: The URL to connect to Horizon
        """
        return self.horizon_url

    def set_horizon_url(self, url: str) -> None:
        """
        Setter for horizon url

        Args:
            url (str): The Horizon URL to set
        """
        self.horizon_url = url

    def is_dc(self) -> bool:
        """
        Getter for is dc

        Returns:
            bool: true if it's a dc, false otherwise
        """
        return self.is_dc_lab

    def is_use_jump_server(self) -> bool:
        """
        Getter for use jump server

        Returns:
            bool: True if jump server should be used, False otherwise
        """
        return self.use_jump_server

    def get_jump_host_configuration(self) -> Optional[HostConfiguration]:
        """
        Getter for jump host configuration

        Returns:
            Optional[HostConfiguration]: The jump host configuration, None if the lab
                config file did not define "jump_server_config"
        """
        return self.jump_server_config

    def get_ssh_port(self) -> int:
        """
        Getter for the SSH port.

        Returns:
            int: The SSH port number
        """
        return self.ssh_port

    def get_lab_capabilities(self) -> List[str]:
        """
        Getter for lab capabilities

        Returns:
            List[str]: List of lab capabilities
        """
        return self.lab_capabilities

    def add_lab_capability(self, capability: str) -> None:
        """
        Add a lab capability; a capability that is already present is not added twice.

        Args:
            capability (str): The capability to add
        """
        if capability not in self.lab_capabilities:
            self.lab_capabilities.append(capability)

    def get_lab_config_file(self) -> str:
        """
        Getter for lab config file

        Returns:
            str: Path to the lab configuration file
        """
        return self.lab_config_file

    def set_lab_config_file(self, lab_config_file: str) -> None:
        """
        Setter for lab config file

        Args:
            lab_config_file (str): Path to the lab configuration file
        """
        self.lab_config_file = lab_config_file

    def get_bm_password(self) -> str:
        """
        Getter for bm password

        Returns:
            str: The bm password
        """
        return self.bm_password

    def get_system_controller_ip(self) -> Optional[str]:
        """
        Get the system controller IP address.

        Returns:
            Optional[str]: The system controller IP address, None if it was never set
        """
        return self.system_controller_ip

    def set_system_controller_ip(self, system_controller_ip: str) -> None:
        """
        Set system controller IP address.

        Args:
            system_controller_ip (str): The system controller IP address
        """
        self.system_controller_ip = system_controller_ip

    def get_system_controller_name(self) -> Optional[str]:
        """
        Get the system controller name.

        Returns:
            Optional[str]: The system controller name, None if it was never set
        """
        return self.system_controller_name

    def set_system_controller_name(self, system_controller_name: str) -> None:
        """
        Setter for system_controller_name

        Args:
            system_controller_name (str): the system_controller_name
        """
        self.system_controller_name = system_controller_name

    def get_secondary_system_controller_config(self) -> Optional["LabConfig"]:
        """
        Get the secondary system controller object.

        Returns:
            Optional[LabConfig]: Secondary dc configuration, None if not configured.
        """
        return self.secondary_system_controller

    def get_secondary_system_controller_name(self) -> Optional[str]:
        """
        Gets the secondary controller host name

        Returns:
            Optional[str]: Secondary lab name, None if no secondary system controller is configured.
        """
        if self.secondary_system_controller is None:
            return None
        return self.secondary_system_controller.get_lab_name()

    def to_log_strings(self) -> List[str]:
        """
        Convert lab configuration to a list of loggable strings.

        Returns:
            List[str]: List of strings representing the lab configuration
        """
        log_strings = [
            f"lab_name: {self.get_lab_name()}",
            f"lab_type: {self.get_lab_type()}",
            f"floating_ip: {self.get_floating_ip()}",
            f"is_ipv6: {self.is_ipv6()}",
            f"ssh_port: {self.get_ssh_port()}",
            f"admin_credentials: {self.get_admin_credentials().to_string()}",
            f"horizon_url: {self.get_horizon_url()}",
            f"horizon_credentials: {self.get_horizon_credentials().to_string()}",
            f"is_dc: {self.is_dc()}",
            f"use_jump_server: {self.is_use_jump_server()}",
        ]

        # Logging must not fail when the jump server is enabled but its config was never provided.
        jump_host_configuration = self.get_jump_host_configuration()
        if self.use_jump_server and jump_host_configuration is not None:
            log_strings.extend(f" {log_string}" for log_string in jump_host_configuration.to_log_strings())

        for node in self.get_nodes():
            log_strings.extend(node.to_log_strings())

        for subcloud in self.get_subclouds():
            log_strings.append("Subcloud")
            log_strings.extend(f" {log_string}" for log_string in subcloud.to_log_strings())

        return log_strings

    def get_controllers(self) -> List[Node]:
        """
        Getter for controller nodes.

        Returns:
            List[Node]: List of the nodes whose node_type is "controller"
        """
        return [node for node in self.get_nodes() if node.node_type == "controller"]

    def get_subclouds_by_type(self, subcloud_type: Optional[str] = None) -> List["LabConfig"]:
        """
        Get a list of subcloud configurations by type.

        Args:
            subcloud_type (Optional[str]): Type of the subclouds to retrieve (compared
                case-insensitively). When None, all subclouds are returned.

        Returns:
            List[LabConfig]: List of subcloud configurations of the specified type
        """
        if subcloud_type is None:
            return self.subclouds

        wanted_type = subcloud_type.lower()
        return [subcloud for subcloud in self.subclouds if subcloud.get_lab_type().lower() == wanted_type]

    def get_subclouds_name_by_type(self, subcloud_type: Optional[str] = None) -> List[str]:
        """
        Get a list of subcloud names by type.

        Args:
            subcloud_type (Optional[str]): Type of the subclouds to retrieve (compared
                case-insensitively). When None, the names of all subclouds are returned.

        Returns:
            List[str]: List of subcloud names of the specified type
        """
        return [subcloud.get_lab_name() for subcloud in self.get_subclouds_by_type(subcloud_type)]

    def get_first_controller(self) -> Optional[Node]:
        """
        Get the first controller node.

        Returns:
            Optional[Node]: The first controller node if found, None otherwise
        """
        controllers = self.get_controllers()
        return controllers[0] if controllers else None

    def get_counterpart_controller(self, lab_node_name: str) -> Node:
        """
        Get the paired controller.

        Returns the first controller whose name differs from the given one. Assumes
        there are exactly two controllers.

        Args:
            lab_node_name (str): The name of the current controller.

        Returns:
            Node: The counterpart / paired controller Node.

        Raises:
            Exception: If there is no controller other than the given one.
        """
        counterpart_controllers = [node for node in self.get_controllers() if node.get_name() != lab_node_name]
        if not counterpart_controllers:
            raise Exception("Others controller Not Found")
        return counterpart_controllers[0]
|