From 7a6aed71c4a504ccd97807662249000b4151366f Mon Sep 17 00:00:00 2001 From: Thomas Sunil Date: Wed, 10 Sep 2025 23:17:06 -0400 Subject: [PATCH] ipsec host to host sanity testcases Change-Id: Iabd2931a627c686ac225911b0340e49f94d8d678 Signed-off-by: Thomas Sunil --- config/security/objects/security_config.py | 34 ++- .../cloud_platform/security/ipsec_keywords.py | 158 ++++++++++++ .../objects/ipsec_certificate_object.py | 163 ++++++++++++ .../objects/ipsec_certificate_output.py | 78 ++++++ .../security/objects/ipsec_dnsmasq_object.py | 73 ++++++ .../security/objects/ipsec_dnsmasq_output.py | 64 +++++ .../ipsec_security_association_object.py | 97 +++++++ .../ipsec_security_association_output.py | 52 ++++ .../ssh/lab_connection_keywords.py | 30 +++ .../secret/object/kubectl_secret_object.py | 11 + .../security/test_ipsec_host_to_host.py | 238 ++++++++++++++++++ 11 files changed, 978 insertions(+), 20 deletions(-) create mode 100644 keywords/cloud_platform/security/ipsec_keywords.py create mode 100644 keywords/cloud_platform/security/objects/ipsec_certificate_object.py create mode 100644 keywords/cloud_platform/security/objects/ipsec_certificate_output.py create mode 100644 keywords/cloud_platform/security/objects/ipsec_dnsmasq_object.py create mode 100644 keywords/cloud_platform/security/objects/ipsec_dnsmasq_output.py create mode 100644 keywords/cloud_platform/security/objects/ipsec_security_association_object.py create mode 100644 keywords/cloud_platform/security/objects/ipsec_security_association_output.py create mode 100644 testcases/cloud_platform/regression/security/test_ipsec_host_to_host.py diff --git a/config/security/objects/security_config.py b/config/security/objects/security_config.py index b7f29923..9b108047 100644 --- a/config/security/objects/security_config.py +++ b/config/security/objects/security_config.py @@ -2,46 +2,40 @@ import json5 class SecurityConfig: - """ - Class to hold configuration for Security tests - """ + """Class to hold configuration for Security tests.""" - def __init__(self, config): + def __init__(self, config: str): + """Initialize security configuration. - try: - json_data = open(config) - except FileNotFoundError: - print(f"Could not find the security config file: {config}") - raise - - security_dict = json5.load(json_data) + Args: + config (str): Path to configuration file. + """ + with open(config) as json_data: + security_dict = json5.load(json_data) self.domain_name = security_dict["domain_name"] self.stepca_server_url = security_dict["stepca_server_url"] self.stepca_server_issuer = security_dict["stepca_server_issuer"] def get_domain_name(self) -> str: - """ - Getter for the dns name + """Getter for the domain name. Returns: - str: the dns name + str: The domain name. """ return self.domain_name def get_stepca_server_url(self) -> str: - """ - Getter for the stepca server url + """Getter for the stepca server URL. Returns: - str: stepca server url + str: StepCA server URL. """ return self.stepca_server_url def get_stepca_server_issuer(self) -> str: - """ - Getter for the stepca server issuer + """Getter for the stepca server issuer. Returns: - str: stepca server url + str: StepCA server issuer. """ return self.stepca_server_issuer diff --git a/keywords/cloud_platform/security/ipsec_keywords.py b/keywords/cloud_platform/security/ipsec_keywords.py new file mode 100644 index 00000000..c8f6dc85 --- /dev/null +++ b/keywords/cloud_platform/security/ipsec_keywords.py @@ -0,0 +1,158 @@ +from framework.exceptions.keyword_exception import KeywordException +from framework.ssh.ssh_connection import SSHConnection +from keywords.base_keyword import BaseKeyword +from keywords.cloud_platform.security.objects.ipsec_certificate_output import IPSecCertificateOutput +from keywords.cloud_platform.security.objects.ipsec_dnsmasq_output import IPSecDnsmasqOutput +from keywords.cloud_platform.security.objects.ipsec_security_association_output import IPSecSecurityAssociationOutput +from keywords.cloud_platform.system.show.system_show_keywords import SystemShowKeywords +from keywords.files.file_keywords import FileKeywords + + +class IPSecKeywords(BaseKeyword): + """Keywords for IPSec operations and validations. + + This class provides methods for managing IPSec certificates, + security associations, and service status validation. + """ + + def __init__(self, ssh_connection: SSHConnection): + """Initialize IPSec keywords. + + Args: + ssh_connection (SSHConnection): SSH connection to the active controller. + """ + self.ssh_connection = ssh_connection + # Initialize reusable keyword classes + self.file_keywords = FileKeywords(ssh_connection) + self.system_show = SystemShowKeywords(ssh_connection) + + def get_dnsmasq_hosts(self) -> IPSecDnsmasqOutput: + """Get dnsmasq.hosts file content. + + Returns: + IPSecDnsmasqOutput: Parsed dnsmasq.hosts content. + """ + system_show_object = self.system_show.system_show().get_system_show_object() + software_version = system_show_object.get_software_version() + dnsmasq_path = f"/opt/platform/config/{software_version}/dnsmasq.hosts" + output = self.ssh_connection.send(f"cat {dnsmasq_path}") + self.validate_success_return_code(self.ssh_connection) + return IPSecDnsmasqOutput(output) + + def get_security_associations(self) -> IPSecSecurityAssociationOutput: + """Get IPSec security associations using swanctl command. + + Returns: + IPSecSecurityAssociationOutput: Parsed security associations. + """ + cmd = "swanctl --list-sa" + output = self.ssh_connection.send_as_sudo(cmd) + self.validate_success_return_code(self.ssh_connection) + return IPSecSecurityAssociationOutput(output) + + def get_certificate_info_by_type(self, cert_type: str) -> IPSecCertificateOutput: + """Get certificate information by certificate type with automatic path resolution. + + Args: + cert_type (str): Certificate type (root_ca, ica_l1, ica). + + Returns: + IPSecCertificateOutput: Certificate output with requested information. + """ + valid_types = ["root_ca", "ica_l1", "ica"] + if cert_type not in valid_types: + raise KeywordException(f"Invalid cert_type '{cert_type}'. Valid types: {valid_types}") + + cert_path = None + if cert_type == "ica_l1": + cert_path = "x509ca/system-local-ca-1_l1.crt" + elif cert_type == "ica": + cert_path = "x509ca/system-local-ca-1.crt" + elif cert_type == "root_ca": + cert_path = "x509ca/system-root-ca-1.crt" + + full_primary_path = f"/etc/swanctl/{cert_path}" + + # Use FileKeywords to check if file exists + if not self.file_keywords.validate_file_exists_with_sudo(full_primary_path): + cert_path = "x509ca/system-root-ca-1.crt" # Fallback to root_ca + full_primary_path = f"/etc/swanctl/{cert_path}" + + return self.get_certificate_info_subject_issuer(full_primary_path) + + def get_certificate_info_subject_issuer(self, cert_path: str) -> IPSecCertificateOutput: + """Get certificate subject and issuer information. + + Args: + cert_path (str): Path to the certificate file. + + Returns: + IPSecCertificateOutput: Certificate output with subject/issuer information. + """ + cmd = f"openssl x509 -in {cert_path} -noout -subject -issuer" + output = self.ssh_connection.send_as_sudo(cmd) + self.validate_success_return_code(self.ssh_connection) + return IPSecCertificateOutput(output, "subject_issuer", "active_controller", cert_path) + + def get_certificate_info_cert_md5(self, cert_path: str) -> IPSecCertificateOutput: + """Get certificate MD5 hash. + + Args: + cert_path (str): Path to the certificate file. + + Returns: + IPSecCertificateOutput: Certificate output with MD5 hash. + """ + cmd = f"openssl x509 -in {cert_path} -modulus -noout | openssl md5" + output = self.ssh_connection.send_as_sudo(cmd) + self.validate_success_return_code(self.ssh_connection) + return IPSecCertificateOutput(output, "cert_md5", "active_controller", cert_path) + + def get_certificate_info_key_md5(self, cert_path: str) -> IPSecCertificateOutput: + """Get certificate key MD5 hash. + + Args: + cert_path (str): Path to the certificate key file. + + Returns: + IPSecCertificateOutput: Certificate output with key MD5 hash. + """ + cmd = f"openssl rsa -in {cert_path} -modulus -noout | openssl md5" + output = self.ssh_connection.send_as_sudo(cmd) + self.validate_success_return_code(self.ssh_connection) + return IPSecCertificateOutput(output, "key_md5", "active_controller", cert_path) + + def validate_service_name_is_valid(self, service_name: str) -> None: + """Validate that the IPSec service name is valid. + + Args: + service_name (str): Service name to validate (ipsec-server, ipsec). + + Raises: + KeywordException: If service name is not valid. + """ + valid_services = ["ipsec-server", "ipsec"] + if service_name not in valid_services: + raise KeywordException(f"Invalid service_name '{service_name}'. Valid services: {valid_services}") + + def get_ipsec_certificate_path(self, hostname: str) -> str: + """Get full IPSec certificate path for hostname. + + Args: + hostname (str): Hostname for certificate. + + Returns: + str: Full certificate path. + """ + return f"/etc/swanctl/x509/system-ipsec-certificate-{hostname}.crt" + + def get_ipsec_key_path(self, hostname: str) -> str: + """Get full IPSec key path for hostname. + + Args: + hostname (str): Hostname for key. + + Returns: + str: Full key path. + """ + return f"/etc/swanctl/private/system-ipsec-certificate-{hostname}.key" diff --git a/keywords/cloud_platform/security/objects/ipsec_certificate_object.py b/keywords/cloud_platform/security/objects/ipsec_certificate_object.py new file mode 100644 index 00000000..2c866413 --- /dev/null +++ b/keywords/cloud_platform/security/objects/ipsec_certificate_object.py @@ -0,0 +1,163 @@ +class IPSecCertificateObject: + """Object representing an IPSec certificate with subject/issuer information and MD5 hash.""" + + def __init__(self, cert_type: str, hostname: str, cert_path: str, subject: str = "", issuer: str = "", md5_hash: str = ""): + """Initialize certificate object. + + Args: + cert_type (str): Type of certificate (key, cert, ica, root, k8s_cert). + hostname (str): Hostname where certificate is located. + cert_path (str): Path to certificate file. + subject (str): Certificate subject information. + issuer (str): Certificate issuer information. + md5_hash (str): MD5 hash of the certificate. + """ + self._cert_type = cert_type + self._hostname = hostname + self._cert_path = cert_path + self._subject = subject + self._issuer = issuer + self._md5_hash = md5_hash + + def get_cert_type(self) -> str: + """Get certificate type. + + Returns: + str: Certificate type. + """ + return self._cert_type + + def get_hostname(self) -> str: + """Get hostname. + + Returns: + str: Hostname where certificate is located. + """ + return self._hostname + + def get_cert_path(self) -> str: + """Get certificate path. + + Returns: + str: Path to certificate file. + """ + return self._cert_path + + def get_subject(self) -> str: + """Get certificate subject. + + Returns: + str: Certificate subject information. + """ + return self._subject + + def get_issuer(self) -> str: + """Get certificate issuer. + + Returns: + str: Certificate issuer information. + """ + return self._issuer + + def get_md5_hash(self) -> str: + """Get MD5 hash. + + Returns: + str: MD5 hash of the certificate. + """ + return self._md5_hash + + def has_valid_subject(self) -> bool: + """Check if subject is valid. + + Returns: + bool: True if subject is not empty. + """ + return bool(self._subject and self._subject.strip()) + + def has_valid_issuer(self) -> bool: + """Check if issuer is valid. + + Returns: + bool: True if issuer is not empty. + """ + return bool(self._issuer and self._issuer.strip()) + + def has_valid_md5(self) -> bool: + """Check if MD5 hash is valid. + + Returns: + bool: True if MD5 hash is not empty. + """ + return bool(self._md5_hash and self._md5_hash.strip()) + + def matches_md5(self, other_md5: str) -> bool: + """Check if MD5 matches another hash. + + Args: + other_md5 (str): Other MD5 hash to compare. + + Returns: + bool: True if MD5 hashes match. + """ + return self._md5_hash == other_md5 + + def matches_issuer(self, other_issuer: str) -> bool: + """Check if issuer matches another issuer. + + Args: + other_issuer (str): Other issuer to compare. + + Returns: + bool: True if issuers match. + """ + return self._compare_dn(self._issuer, other_issuer) + + def _compare_dn(self, dn1: str, dn2: str) -> bool: + """Compare two DN strings after normalization. + + Args: + dn1 (str): First DN string. + dn2 (str): Second DN string. + + Returns: + bool: True if DNs match after normalization. + """ + if not dn1 or not dn2: + return False + return self._normalize_dn(dn1) == self._normalize_dn(dn2) + + def _normalize_dn(self, dn_string: str) -> str: + """Normalize DN string for comparison. + + Args: + dn_string (str): DN string to normalize. + + Returns: + str: Normalized DN string. + """ + if not dn_string: + return "" + + # Normalize spacing and parse components + normalized = dn_string.replace(" + ", "+").replace(" = ", "=") + components = {} + + for part in normalized.split(","): + if "=" in part.strip(): + key, value = part.strip().split("=", 1) + key, value = key.strip(), value.strip() + if key and value: + components[key] = f"{components.get(key, '')}{'+' if key in components else ''}{value}" + + # Return ordered components + order = ["CN", "OU", "O", "L", "ST", "C"] + return ",".join(f"{k}={components[k]}" for k in order if k in components) + + def is_self_signed(self) -> bool: + """Check if certificate is self-signed. + + Returns: + bool: True if subject equals issuer. + """ + return self._compare_dn(self._subject, self._issuer) diff --git a/keywords/cloud_platform/security/objects/ipsec_certificate_output.py b/keywords/cloud_platform/security/objects/ipsec_certificate_output.py new file mode 100644 index 00000000..bfb72152 --- /dev/null +++ b/keywords/cloud_platform/security/objects/ipsec_certificate_output.py @@ -0,0 +1,78 @@ +from typing import Union + +from keywords.cloud_platform.security.objects.ipsec_certificate_object import IPSecCertificateObject + + +class IPSecCertificateOutput: + """Parser for certificate command output (subject/issuer or MD5).""" + + def __init__(self, command_output: Union[str, list[str]], cert_type: str, hostname: str, cert_path: str): + """Initialize certificate output parser. + + Args: + command_output (Union[str, list[str]]): Raw command output. + cert_type (str): Type of certificate (key, cert, ica, root, k8s_cert). + hostname (str): Hostname where certificate is located. + cert_path (str): Path to certificate file. + """ + self.raw_output = command_output if isinstance(command_output, list) else [str(command_output)] + + # Auto-detect output type and create appropriate object + if self._is_subject_issuer_output(): + subject, issuer = self._extract_subject_issuer() + self.cert_object = IPSecCertificateObject(cert_type, hostname, cert_path, subject=subject, issuer=issuer) + else: + md5_hash = self._extract_md5() + self.cert_object = IPSecCertificateObject(cert_type, hostname, cert_path, md5_hash=md5_hash) + + def _is_subject_issuer_output(self) -> bool: + """Check if output contains subject/issuer information. + + Returns: + bool: True if output contains subject/issuer data. + """ + return any("subject=" in str(line) or "issuer=" in str(line) for line in self.raw_output) + + def _extract_subject_issuer(self) -> tuple[str, str]: + """Extract subject and issuer from command output. + + Returns: + tuple[str, str]: Subject and issuer information. + """ + subject = "" + issuer = "" + + for line in self.raw_output: + line = line.strip() + if line.startswith("subject="): + subject = line.replace("subject=", "").strip() + elif line.startswith("issuer="): + issuer = line.replace("issuer=", "").strip() + + return subject, issuer + + def _extract_md5(self) -> str: + """Extract MD5 hash from command output. + + Returns: + str: Extracted MD5 hash. + """ + for line in self.raw_output: + if "MD5(" in line or line.startswith("MD5 "): + if "=" in line: + parts = line.split("=") + if len(parts) > 1: + return parts[-1].strip() + else: + parts = line.split() + if len(parts) > 1: + return parts[-1].strip() + return "" + + def get_certificate(self) -> IPSecCertificateObject: + """Get certificate object. + + Returns: + IPSecCertificateObject: Certificate object with parsed information. + """ + return self.cert_object diff --git a/keywords/cloud_platform/security/objects/ipsec_dnsmasq_object.py b/keywords/cloud_platform/security/objects/ipsec_dnsmasq_object.py new file mode 100644 index 00000000..8c701ffc --- /dev/null +++ b/keywords/cloud_platform/security/objects/ipsec_dnsmasq_object.py @@ -0,0 +1,73 @@ +import re + +from keywords.network.ip_address_keywords import IPAddressKeywords + + +class IPSecDnsmasqObject: + """Object representing a dnsmasq.hosts entry.""" + + def __init__(self, mac_address: str, pxeboot_name: str, pxeboot_address: str): + """Initialize dnsmasq entry object. + + Args: + mac_address (str): MAC address. + pxeboot_name (str): PXE boot hostname. + pxeboot_address (str): PXE boot IP address. + """ + self._mac_address = mac_address + self._pxeboot_name = pxeboot_name + self._pxeboot_address = pxeboot_address + + def get_mac_address(self) -> str: + """Get MAC address. + + Returns: + str: MAC address. + """ + return self._mac_address + + def get_pxeboot_name(self) -> str: + """Get PXE boot hostname. + + Returns: + str: PXE boot hostname. + """ + return self._pxeboot_name + + def get_pxeboot_address(self) -> str: + """Get PXE boot IP address. + + Returns: + str: PXE boot IP address. + """ + return self._pxeboot_address + + def is_valid_mac(self) -> bool: + """Check if MAC address format is valid. + + Returns: + bool: True if MAC address format is valid. + """ + mac_pattern = r"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$" + return bool(re.match(mac_pattern, self._mac_address)) + + def is_pxeboot_entry(self) -> bool: + """Check if this is a PXE boot entry. + + Returns: + bool: True if name contains 'pxeboot-'. + """ + return "pxeboot-" in self._pxeboot_name + + def is_valid_ip_address(self) -> bool: + """Check if PXE boot address is a valid IP address using framework method. + + Returns: + bool: True if address is a valid IPv4 address. + """ + try: + # Use existing framework method for IP validation + IPAddressKeywords(self._pxeboot_address) + return True + except ValueError: + return False diff --git a/keywords/cloud_platform/security/objects/ipsec_dnsmasq_output.py b/keywords/cloud_platform/security/objects/ipsec_dnsmasq_output.py new file mode 100644 index 00000000..e14db55c --- /dev/null +++ b/keywords/cloud_platform/security/objects/ipsec_dnsmasq_output.py @@ -0,0 +1,64 @@ +from typing import Union + +from keywords.cloud_platform.security.objects.ipsec_dnsmasq_object import IPSecDnsmasqObject + + +class IPSecDnsmasqOutput: + """Parser for dnsmasq.hosts file content.""" + + def __init__(self, dnsmasq_content: Union[str, list[str]]): + """Initialize dnsmasq output parser. + + Args: + dnsmasq_content (Union[str, list[str]]): Raw dnsmasq.hosts content. + """ + self.content = "\n".join(dnsmasq_content) if isinstance(dnsmasq_content, list) else dnsmasq_content + self.entries = self._parse_entries() + + def _parse_entries(self) -> list[IPSecDnsmasqObject]: + """Parse dnsmasq entries from content. + + Returns: + list[IPSecDnsmasqObject]: List of parsed dnsmasq entries. + """ + if not self.content.strip(): + return [] + + entries = [] + for line in self.content.strip().split("\n"): + if not line.strip(): + continue + + parts = line.split(",") + if len(parts) >= 3: + mac = parts[0].strip() + name = parts[1].strip() + address = parts[2].strip() + if mac and name and address: + entries.append(IPSecDnsmasqObject(mac, name, address)) + + return entries + + def get_entries(self) -> list[IPSecDnsmasqObject]: + """Get all dnsmasq entries. + + Returns: + list[IPSecDnsmasqObject]: List of dnsmasq entries. + """ + return self.entries + + def get_content(self) -> str: + """Get raw dnsmasq content. + + Returns: + str: Raw dnsmasq.hosts content. + """ + return self.content + + def get_pxeboot_entries(self) -> list[IPSecDnsmasqObject]: + """Get only pxeboot entries. + + Returns: + list[IPSecDnsmasqObject]: List of pxeboot dnsmasq entries. + """ + return [entry for entry in self.entries if entry.is_pxeboot_entry()] diff --git a/keywords/cloud_platform/security/objects/ipsec_security_association_object.py b/keywords/cloud_platform/security/objects/ipsec_security_association_object.py new file mode 100644 index 00000000..ed040759 --- /dev/null +++ b/keywords/cloud_platform/security/objects/ipsec_security_association_object.py @@ -0,0 +1,97 @@ +import re +from typing import Union + + +class IPSecSecurityAssociationObject: + """Represents a single IPSec security association.""" + + def __init__(self, name: str, association_data: str): + """Initialize the security association object. + + Args: + name (str): Name of the association. + association_data (str): Raw association data from swanctl output. + """ + self._name = name.strip() if name else "" + self._association_data = association_data or "" + self._local_cns = self._extract_cns("local") + self._remote_cns = self._extract_cns("remote") + self._remote_cns_set = set(self._remote_cns) + + def get_name(self) -> str: + """Get the association name. + + Returns: + str: Association name. + """ + return self._name + + def get_association_data(self) -> str: + """Get the raw association data. + + Returns: + str: Raw association data. + """ + return self._association_data + + def is_established(self) -> bool: + """Check if the connection is established. + + Returns: + bool: True if connection is established, False otherwise. + """ + return re.search(r"\bESTABLISHED\b", self._association_data) is not None + + def get_local_cns(self) -> list[str]: + """Get local Common Names. + + Returns: + list[str]: List of local CNs. + """ + return self._local_cns + + def get_remote_cns(self) -> list[str]: + """Get remote Common Names. + + Returns: + list[str]: List of remote CNs. + """ + return self._remote_cns + + def has_local_cn_starting_with(self, prefix: str) -> bool: + """Check if any local CN starts with the given prefix. + + Args: + prefix (str): Prefix to check for. + + Returns: + bool: True if any local CN starts with prefix, False otherwise. + """ + if not prefix or not prefix.strip(): + return False + return any(cn.startswith(prefix.strip()) for cn in self._local_cns) + + def has_remote_cn_in_set(self, cn_set: Union[set, list]) -> bool: + """Check if any remote CN is in the given set. + + Args: + cn_set (Union[set, list]): Set or list of CNs to check against. + + Returns: + bool: True if any remote CN is in the set, False otherwise. + """ + if not cn_set: + return False + return bool(self._remote_cns_set & (set(cn_set) if isinstance(cn_set, list) else cn_set)) + + def _extract_cns(self, cn_type: str) -> list[str]: + """Extract Common Names from association data. + + Args: + cn_type (str): Type of CN to extract (local or remote). + + Returns: + list[str]: List of CNs. + """ + pattern = rf"{cn_type}\s+'CN=([^']*)'(?:\s|$)" + return re.findall(pattern, self._association_data) diff --git a/keywords/cloud_platform/security/objects/ipsec_security_association_output.py b/keywords/cloud_platform/security/objects/ipsec_security_association_output.py new file mode 100644 index 00000000..cbe8e455 --- /dev/null +++ b/keywords/cloud_platform/security/objects/ipsec_security_association_output.py @@ -0,0 +1,52 @@ +import re +from typing import Union + +from keywords.cloud_platform.security.objects.ipsec_security_association_object import IPSecSecurityAssociationObject + + +class IPSecSecurityAssociationOutput: + """Parser for swanctl --list-sa command output.""" + + def __init__(self, command_output: Union[str, list[str]]): + """Initialize the parser with command output. + + Args: + command_output (Union[str, list[str]]): Output from swanctl --list-sa command. + """ + if command_output is None: + self.raw_output = "" + elif isinstance(command_output, list): + self.raw_output = "\n".join(command_output) if command_output else "" + else: + self.raw_output = command_output + + if not self.raw_output.strip(): + self.associations = [] + return + + self.associations = self.parse_associations() + + def parse_associations(self) -> list[IPSecSecurityAssociationObject]: + """Parse the security associations from the command output. + + Returns: + list[IPSecSecurityAssociationObject]: List of parsed security associations. + """ + associations = [] + pattern = re.compile(r"(system-nodes:.*?)(?=system-nodes:|\Z)", re.DOTALL) + matches = pattern.findall(self.raw_output) + + for i, match in enumerate(matches): + if match.strip(): # Only create association if match has content + association = IPSecSecurityAssociationObject(f"system-nodes-{i+1}", match.strip()) + associations.append(association) + + return associations + + def get_associations(self) -> list[IPSecSecurityAssociationObject]: + """Get all security associations. + + Returns: + list[IPSecSecurityAssociationObject]: List of security associations. + """ + return self.associations diff --git a/keywords/cloud_platform/ssh/lab_connection_keywords.py b/keywords/cloud_platform/ssh/lab_connection_keywords.py index e236e32d..178be078 100644 --- a/keywords/cloud_platform/ssh/lab_connection_keywords.py +++ b/keywords/cloud_platform/ssh/lab_connection_keywords.py @@ -176,3 +176,33 @@ class LabConnectionKeywords(BaseKeyword): ) return connection + + def ping_host(self, hostname: str, count: int = 3) -> bool: + """Ping a host by hostname or IP address. + + Args: + hostname (str): The hostname or IP address to ping. + count (int): Number of ping packets. + + Returns: + bool: True if ping succeeds, False otherwise. + """ + connection = self.get_active_controller_ssh() + cmd = f"ping -c {count} {hostname}" + connection.send(cmd) + return connection.get_return_code() == 0 + + def test_ssh_connectivity(self, hostname: str) -> bool: + """Test SSH connectivity to a host using lab credentials. + + Args: + hostname (str): The hostname to test SSH connectivity to. + + Returns: + bool: True if SSH connection succeeds, False otherwise. + """ + connection = self.get_active_controller_ssh() + lab_config = ConfigurationManager.get_lab_config() + connection.setup_ssh_pass(hostname, lab_config.get_admin_credentials().get_user_name(), lab_config.get_admin_credentials().get_password()) + connection.send("echo 'connection_test'") + return connection.get_return_code() == 0 diff --git a/keywords/k8s/secret/object/kubectl_secret_object.py b/keywords/k8s/secret/object/kubectl_secret_object.py index d27b33ae..a92751f3 100644 --- a/keywords/k8s/secret/object/kubectl_secret_object.py +++ b/keywords/k8s/secret/object/kubectl_secret_object.py @@ -156,3 +156,14 @@ class KubectlSecretObject: decoded_cert = base64.b64decode(encoded_cert) cert = x509.load_pem_x509_certificate(decoded_cert, default_backend()) return cert.issuer.rfc4514_string() + + def get_certificate_subject(self) -> str | None: + """ + Retrieves the Subject information from the 'tls.crt' data of the parsed secret. + """ + encoded_cert = self.get_tls_crt() + if not encoded_cert: + return None + decoded_cert = base64.b64decode(encoded_cert) + cert = x509.load_pem_x509_certificate(decoded_cert, default_backend()) + return cert.subject.rfc4514_string() diff --git a/testcases/cloud_platform/regression/security/test_ipsec_host_to_host.py b/testcases/cloud_platform/regression/security/test_ipsec_host_to_host.py new file mode 100644 index 00000000..ee04edec --- /dev/null +++ b/testcases/cloud_platform/regression/security/test_ipsec_host_to_host.py @@ -0,0 +1,238 @@ +from pytest import mark + +from framework.logging.automation_logger import get_logger +from framework.validation.validation import validate_equals, validate_greater_than, validate_str_contains +from keywords.cloud_platform.security.ipsec_keywords import IPSecKeywords +from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords +from keywords.cloud_platform.system.host.system_host_list_keywords import SystemHostListKeywords +from keywords.k8s.secret.kubectl_get_secret_keywords import KubectlGetSecretsKeywords +from keywords.linux.systemctl.systemctl_is_active_keywords import SystemCTLIsActiveKeywords + + +@mark.p1 +def test_ipsec_certificates(): + """Validate IPSec certificate chain and key-certificate matching. + + Steps: + - Verify root CA is self-signed + - Validate certificate chain from root CA to IPSec certificate + - Check IPSec certificate has correct CN + - Verify system-local-ca is the issuer of IPSec certificate + - Verify Kubernetes secret system-local-ca subject matches IPSec certificate issuer + - Log certificate information for debugging + - Validate key and certificate MD5 checksums match + """ + ssh_connection = LabConnectionKeywords().get_active_controller_ssh() + ipsec_keywords = IPSecKeywords(ssh_connection) + + # Get the actual hostname from system host list + system_hosts = SystemHostListKeywords(ssh_connection) + active_controller = system_hosts.get_active_controller() + hostname = active_controller.get_host_name() + + get_logger().log_info(f"Validating certificate chain for {hostname}") + + # Build certificate paths + ipsec_crt_path = ipsec_keywords.get_ipsec_certificate_path(hostname) + ipsec_key_path = ipsec_keywords.get_ipsec_key_path(hostname) + + # Validate IPSec certificate exists + validate_equals(ipsec_keywords.file_keywords.validate_file_exists_with_sudo(ipsec_crt_path), True, f"IPSec certificate should exist: {ipsec_crt_path}") + + # Get certificate information using string constants + root_ca_output = ipsec_keywords.get_certificate_info_by_type("root_ca") + ica_l1_output = ipsec_keywords.get_certificate_info_by_type("ica_l1") + ica_output = ipsec_keywords.get_certificate_info_by_type("ica") + ipsec_cert_output = ipsec_keywords.get_certificate_info_subject_issuer(ipsec_crt_path) + + root_ca_cert = root_ca_output.get_certificate() + ica_l1_cert = ica_l1_output.get_certificate() + ica_cert = ica_output.get_certificate() + ipsec_cert = ipsec_cert_output.get_certificate() + + # Validate root CA is self-signed + validate_equals(root_ca_cert.is_self_signed(), True, f"Root CA should be self-signed for {hostname}") + + # Validate certificate chain based on scenario + is_self_signed = ica_l1_cert.get_subject() == root_ca_cert.get_subject() and ica_cert.get_subject() == root_ca_cert.get_subject() + + scenario = "self-signed" if is_self_signed else "full CA hierarchy" + get_logger().log_info(f"Detected {scenario} certificate scenario for {hostname}") + + if is_self_signed: + validate_equals(ipsec_cert.matches_issuer(root_ca_cert.get_subject()), True, f"IPSec certificate should be issued by Root CA for {hostname}") + else: + # Validate full certificate chain + validate_equals(ica_l1_cert.matches_issuer(root_ca_cert.get_subject()), True, f"ICA L1 should be issued by Root CA for {hostname}") + validate_equals(ica_cert.matches_issuer(ica_l1_cert.get_subject()), True, f"ICA should be issued by ICA L1 for {hostname}") + validate_equals(ipsec_cert.matches_issuer(ica_cert.get_subject()), True, f"IPSec certificate should be issued by ICA for {hostname}") + + # Log IPSec certificate subject for debugging + get_logger().log_info(f"IPSec certificate subject: {ipsec_cert.get_subject()}") + + # Validate IPSec certificate has valid subject (not checking specific CN as it may be intermediate CA) + validate_equals(ipsec_cert.has_valid_subject(), True, f"IPSec certificate should have valid subject for {hostname}") + + # Validate Kubernetes secret subject matches IPSec certificate issuer + kubectl_secrets = KubectlGetSecretsKeywords(ssh_connection) + kube_secret = kubectl_secrets.get_secret_json_output("system-local-ca", "cert-manager") + kube_subject = kube_secret.get_certificate_subject() + + get_logger().log_info(f"IPSec certificate issuer: {ipsec_cert.get_issuer()}") + get_logger().log_info(f"Kubernetes secret subject: {kube_subject}") + + # Validation will handle normalization internally + validate_equals(ipsec_cert.matches_issuer(kube_subject), True, f"IPSec certificate should be issued by Kubernetes system-local-ca secret for {hostname}") + + # Validate key-certificate MD5 matching + get_logger().log_info(f"Validating key-certificate MD5 matching for {hostname}") + key_output = ipsec_keywords.get_certificate_info_key_md5(ipsec_key_path) + cert_md5_output = ipsec_keywords.get_certificate_info_cert_md5(ipsec_crt_path) + + key_cert = key_output.get_certificate() + cert_md5_cert = cert_md5_output.get_certificate() + + validate_equals(key_cert.matches_md5(cert_md5_cert.get_md5_hash()), True, f"IPSec key MD5 should match IPSec certificate MD5 for {hostname}") + + +@mark.p1 +def test_pxeboot_network(): + """Verify the correctness of the dnsmasq.hosts file by checking the mapping of MAC addresses to pxeboot names and addresses. + + Steps: + - Ensure each MAC address in the dnsmasq.hosts file is included in a list of valid MAC addresses + - Verify that each MAC address has associated pxeboot name and address entries + - Check if the pxeboot addresses are reachable via ping + - Test SSH connectivity to pxeboot hostnames + """ + ssh_connection = LabConnectionKeywords().get_active_controller_ssh() + ipsec_keywords = IPSecKeywords(ssh_connection) + system_hosts = SystemHostListKeywords(ssh_connection) + lab_connection_keywords = LabConnectionKeywords() + + get_logger().log_info("Fetching pxeboot information from dnsmasq.hosts file") + dnsmasq_output = ipsec_keywords.get_dnsmasq_hosts() + + validate_str_contains(dnsmasq_output.get_content(), "pxe", "dnsmasq.hosts should contain PXE entries") + + hosts = system_hosts.get_system_host_list().get_hosts() + + # Get parsed dnsmasq entries and validate each + pxeboot_entries = dnsmasq_output.get_pxeboot_entries() + + for entry in pxeboot_entries: + # Validate MAC address format using object method + validate_equals(entry.is_valid_mac(), True, f"MAC address should be valid: {entry.get_mac_address()}") + + # Validate pxeboot name format using object method + validate_equals(entry.is_pxeboot_entry(), True, f"Should be PXE boot entry: {entry.get_pxeboot_name()}") + + # Test connectivity + ping_success = lab_connection_keywords.ping_host(entry.get_pxeboot_address()) + validate_equals(ping_success, True, f"Ping should succeed for {entry.get_pxeboot_address()}") + + # Test SSH connectivity using pxeboot name with lab credentials + ssh_success = lab_connection_keywords.test_ssh_connectivity(entry.get_pxeboot_name()) + validate_equals(ssh_success, True, f"SSH should succeed for {entry.get_pxeboot_name()}") + + # Validate we have pxeboot entries + validate_greater_than(len(pxeboot_entries), 0, f"Should have pxeboot entries, found {len(pxeboot_entries)}") + get_logger().log_info(f"Found {len(pxeboot_entries)} pxeboot entries for {len(hosts)} hosts") + + +@mark.p1 +def test_security_associations(): + """Validate the security associations between nodes using the swanctl command. + + Steps: + - Retrieve hosts and active controller information + - Get the list of security associations using swanctl + - Verify that security associations are established and valid + - Check local and remote CN patterns match expected values + """ + ssh_connection = LabConnectionKeywords().get_active_controller_ssh() + ipsec_keywords = IPSecKeywords(ssh_connection) + system_hosts = SystemHostListKeywords(ssh_connection) + + hosts = system_hosts.get_system_host_list().get_hosts() + active_controller = system_hosts.get_active_controller() + associations_output = ipsec_keywords.get_security_associations() + associations = associations_output.get_associations() + + validate_greater_than(len(associations), 0, f"Should find security associations, found {len(associations)} associations") + + remote_host_count = len(hosts) - 1 + expected_association_count = remote_host_count * 2 + + get_logger().log_info(f"Lab topology: {len(hosts)} total hosts, {remote_host_count} remote hosts") + get_logger().log_info(f"Expected: {expected_association_count} associations (2 per remote host), Actual: {len(associations)}") + + validate_equals(len(associations), expected_association_count, f"Should have {expected_association_count} associations (2 per remote host), found {len(associations)}") + + active_cn_pattern = f"ipsec-{active_controller.get_host_name()}" + expected_cns = {f"ipsec-{host.get_host_name()}" for host in hosts if f"ipsec-{host.get_host_name()}" != active_cn_pattern} + + get_logger().log_info(f"Active controller CN pattern: {active_cn_pattern}") + get_logger().log_info(f"Expected remote CNs: {expected_cns}") + + for i, association in enumerate(associations): + validate_equals(association.is_established(), True, f"Connection should be established in association {i+1}") + validate_equals(association.has_local_cn_starting_with(active_cn_pattern), True, f"Should have local CN starting with {active_cn_pattern} in association {i+1}") + + validate_equals(association.has_remote_cn_in_set(expected_cns), True, f"Should have expected remote CN in association {i+1}") + + +@mark.p1 +def test_ipsec_status(): + """Validate IPSec service status on all hosts. + + Steps: + - Check ipsec-server service is active on all controllers + - Check ipsec service is active on all hosts + """ + ssh_connection = LabConnectionKeywords().get_active_controller_ssh() + system_hosts = SystemHostListKeywords(ssh_connection) + lab_connection_keywords = LabConnectionKeywords() + + hosts = system_hosts.get_system_host_list().get_hosts() + + # Check ipsec-server service on controllers + controllers = system_hosts.get_system_host_list().get_controllers() + for controller in controllers: + controller_ssh = lab_connection_keywords.get_ssh_for_hostname(controller.get_host_name()) + systemctl_keywords = SystemCTLIsActiveKeywords(controller_ssh) + service_status = systemctl_keywords.is_active("ipsec-server") + validate_equals(service_status, "active", f"ipsec-server service should be active on {controller.get_host_name()}") + + # Check ipsec service on all hosts + for host in hosts: + if host.get_personality() == "worker": + host_ssh = lab_connection_keywords.get_compute_ssh(host.get_host_name()) + else: + host_ssh = lab_connection_keywords.get_ssh_for_hostname(host.get_host_name()) + systemctl_keywords = SystemCTLIsActiveKeywords(host_ssh) + service_status = systemctl_keywords.is_active("ipsec") + validate_equals(service_status, "active", f"ipsec service should be active on {host.get_host_name()}") + + +@mark.p1 +def test_ping_mgmt_ips(): + """Test the ability to ping management IPs from the active controller. + + Steps: + - Get list of all hosts in the system + - Identify the active controller + - Ping each host from the active controller + - Verify all ping operations succeed + """ + ssh_connection = LabConnectionKeywords().get_active_controller_ssh() + system_hosts = SystemHostListKeywords(ssh_connection) + lab_connection_keywords = LabConnectionKeywords() + + host_list = system_hosts.get_system_host_list() + hosts = [host.get_host_name() for host in host_list.get_hosts()] + + get_logger().log_info(f"Testing ping connectivity to {len(hosts)} hosts") + for host in hosts: + ping_success = lab_connection_keywords.ping_host(host) + validate_equals(ping_success, True, f"Ping should succeed to {host}")