ipsec host to host sanity testcases

Change-Id: Iabd2931a627c686ac225911b0340e49f94d8d678
Signed-off-by: Thomas Sunil <sunil.thomas@windriver.com>
This commit is contained in:
Thomas Sunil
2025-09-10 23:17:06 -04:00
parent 10b3e0f068
commit 7a6aed71c4
11 changed files with 978 additions and 20 deletions

View File

@@ -2,46 +2,40 @@ import json5
class SecurityConfig:
"""
Class to hold configuration for Security tests
"""
"""Class to hold configuration for Security tests."""
def __init__(self, config):
def __init__(self, config: str):
"""Initialize security configuration.
try:
json_data = open(config)
except FileNotFoundError:
print(f"Could not find the security config file: {config}")
raise
security_dict = json5.load(json_data)
Args:
config (str): Path to configuration file.
"""
with open(config) as json_data:
security_dict = json5.load(json_data)
self.domain_name = security_dict["domain_name"]
self.stepca_server_url = security_dict["stepca_server_url"]
self.stepca_server_issuer = security_dict["stepca_server_issuer"]
def get_domain_name(self) -> str:
"""
Getter for the dns name
"""Getter for the domain name.
Returns:
str: the dns name
str: The domain name.
"""
return self.domain_name
def get_stepca_server_url(self) -> str:
"""
Getter for the stepca server url
"""Getter for the stepca server URL.
Returns:
str: stepca server url
str: StepCA server URL.
"""
return self.stepca_server_url
def get_stepca_server_issuer(self) -> str:
"""
Getter for the stepca server issuer
"""Getter for the stepca server issuer.
Returns:
str: stepca server url
str: StepCA server issuer.
"""
return self.stepca_server_issuer

View File

@@ -0,0 +1,158 @@
from framework.exceptions.keyword_exception import KeywordException
from framework.ssh.ssh_connection import SSHConnection
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.security.objects.ipsec_certificate_output import IPSecCertificateOutput
from keywords.cloud_platform.security.objects.ipsec_dnsmasq_output import IPSecDnsmasqOutput
from keywords.cloud_platform.security.objects.ipsec_security_association_output import IPSecSecurityAssociationOutput
from keywords.cloud_platform.system.show.system_show_keywords import SystemShowKeywords
from keywords.files.file_keywords import FileKeywords
class IPSecKeywords(BaseKeyword):
"""Keywords for IPSec operations and validations.
This class provides methods for managing IPSec certificates,
security associations, and service status validation.
"""
def __init__(self, ssh_connection: SSHConnection):
"""Initialize IPSec keywords.
Args:
ssh_connection (SSHConnection): SSH connection to the active controller.
"""
self.ssh_connection = ssh_connection
# Initialize reusable keyword classes
self.file_keywords = FileKeywords(ssh_connection)
self.system_show = SystemShowKeywords(ssh_connection)
def get_dnsmasq_hosts(self) -> IPSecDnsmasqOutput:
"""Get dnsmasq.hosts file content.
Returns:
IPSecDnsmasqOutput: Parsed dnsmasq.hosts content.
"""
system_show_object = self.system_show.system_show().get_system_show_object()
software_version = system_show_object.get_software_version()
dnsmasq_path = f"/opt/platform/config/{software_version}/dnsmasq.hosts"
output = self.ssh_connection.send(f"cat {dnsmasq_path}")
self.validate_success_return_code(self.ssh_connection)
return IPSecDnsmasqOutput(output)
def get_security_associations(self) -> IPSecSecurityAssociationOutput:
"""Get IPSec security associations using swanctl command.
Returns:
IPSecSecurityAssociationOutput: Parsed security associations.
"""
cmd = "swanctl --list-sa"
output = self.ssh_connection.send_as_sudo(cmd)
self.validate_success_return_code(self.ssh_connection)
return IPSecSecurityAssociationOutput(output)
def get_certificate_info_by_type(self, cert_type: str) -> IPSecCertificateOutput:
"""Get certificate information by certificate type with automatic path resolution.
Args:
cert_type (str): Certificate type (root_ca, ica_l1, ica).
Returns:
IPSecCertificateOutput: Certificate output with requested information.
"""
valid_types = ["root_ca", "ica_l1", "ica"]
if cert_type not in valid_types:
raise KeywordException(f"Invalid cert_type '{cert_type}'. Valid types: {valid_types}")
cert_path = None
if cert_type == "ica_l1":
cert_path = "x509ca/system-local-ca-1_l1.crt"
elif cert_type == "ica":
cert_path = "x509ca/system-local-ca-1.crt"
elif cert_type == "root_ca":
cert_path = "x509ca/system-root-ca-1.crt"
full_primary_path = f"/etc/swanctl/{cert_path}"
# Use FileKeywords to check if file exists
if not self.file_keywords.validate_file_exists_with_sudo(full_primary_path):
cert_path = "x509ca/system-root-ca-1.crt" # Fallback to root_ca
full_primary_path = f"/etc/swanctl/{cert_path}"
return self.get_certificate_info_subject_issuer(full_primary_path)
def get_certificate_info_subject_issuer(self, cert_path: str) -> IPSecCertificateOutput:
"""Get certificate subject and issuer information.
Args:
cert_path (str): Path to the certificate file.
Returns:
IPSecCertificateOutput: Certificate output with subject/issuer information.
"""
cmd = f"openssl x509 -in {cert_path} -noout -subject -issuer"
output = self.ssh_connection.send_as_sudo(cmd)
self.validate_success_return_code(self.ssh_connection)
return IPSecCertificateOutput(output, "subject_issuer", "active_controller", cert_path)
def get_certificate_info_cert_md5(self, cert_path: str) -> IPSecCertificateOutput:
"""Get certificate MD5 hash.
Args:
cert_path (str): Path to the certificate file.
Returns:
IPSecCertificateOutput: Certificate output with MD5 hash.
"""
cmd = f"openssl x509 -in {cert_path} -modulus -noout | openssl md5"
output = self.ssh_connection.send_as_sudo(cmd)
self.validate_success_return_code(self.ssh_connection)
return IPSecCertificateOutput(output, "cert_md5", "active_controller", cert_path)
def get_certificate_info_key_md5(self, cert_path: str) -> IPSecCertificateOutput:
"""Get certificate key MD5 hash.
Args:
cert_path (str): Path to the certificate key file.
Returns:
IPSecCertificateOutput: Certificate output with key MD5 hash.
"""
cmd = f"openssl rsa -in {cert_path} -modulus -noout | openssl md5"
output = self.ssh_connection.send_as_sudo(cmd)
self.validate_success_return_code(self.ssh_connection)
return IPSecCertificateOutput(output, "key_md5", "active_controller", cert_path)
def validate_service_name_is_valid(self, service_name: str) -> None:
"""Validate that the IPSec service name is valid.
Args:
service_name (str): Service name to validate (ipsec-server, ipsec).
Raises:
KeywordException: If service name is not valid.
"""
valid_services = ["ipsec-server", "ipsec"]
if service_name not in valid_services:
raise KeywordException(f"Invalid service_name '{service_name}'. Valid services: {valid_services}")
def get_ipsec_certificate_path(self, hostname: str) -> str:
"""Get full IPSec certificate path for hostname.
Args:
hostname (str): Hostname for certificate.
Returns:
str: Full certificate path.
"""
return f"/etc/swanctl/x509/system-ipsec-certificate-{hostname}.crt"
def get_ipsec_key_path(self, hostname: str) -> str:
"""Get full IPSec key path for hostname.
Args:
hostname (str): Hostname for key.
Returns:
str: Full key path.
"""
return f"/etc/swanctl/private/system-ipsec-certificate-{hostname}.key"

View File

@@ -0,0 +1,163 @@
class IPSecCertificateObject:
"""Object representing an IPSec certificate with subject/issuer information and MD5 hash."""
def __init__(self, cert_type: str, hostname: str, cert_path: str, subject: str = "", issuer: str = "", md5_hash: str = ""):
"""Initialize certificate object.
Args:
cert_type (str): Type of certificate (key, cert, ica, root, k8s_cert).
hostname (str): Hostname where certificate is located.
cert_path (str): Path to certificate file.
subject (str): Certificate subject information.
issuer (str): Certificate issuer information.
md5_hash (str): MD5 hash of the certificate.
"""
self._cert_type = cert_type
self._hostname = hostname
self._cert_path = cert_path
self._subject = subject
self._issuer = issuer
self._md5_hash = md5_hash
def get_cert_type(self) -> str:
"""Get certificate type.
Returns:
str: Certificate type.
"""
return self._cert_type
def get_hostname(self) -> str:
"""Get hostname.
Returns:
str: Hostname where certificate is located.
"""
return self._hostname
def get_cert_path(self) -> str:
"""Get certificate path.
Returns:
str: Path to certificate file.
"""
return self._cert_path
def get_subject(self) -> str:
"""Get certificate subject.
Returns:
str: Certificate subject information.
"""
return self._subject
def get_issuer(self) -> str:
"""Get certificate issuer.
Returns:
str: Certificate issuer information.
"""
return self._issuer
def get_md5_hash(self) -> str:
"""Get MD5 hash.
Returns:
str: MD5 hash of the certificate.
"""
return self._md5_hash
def has_valid_subject(self) -> bool:
"""Check if subject is valid.
Returns:
bool: True if subject is not empty.
"""
return bool(self._subject and self._subject.strip())
def has_valid_issuer(self) -> bool:
"""Check if issuer is valid.
Returns:
bool: True if issuer is not empty.
"""
return bool(self._issuer and self._issuer.strip())
def has_valid_md5(self) -> bool:
"""Check if MD5 hash is valid.
Returns:
bool: True if MD5 hash is not empty.
"""
return bool(self._md5_hash and self._md5_hash.strip())
def matches_md5(self, other_md5: str) -> bool:
"""Check if MD5 matches another hash.
Args:
other_md5 (str): Other MD5 hash to compare.
Returns:
bool: True if MD5 hashes match.
"""
return self._md5_hash == other_md5
def matches_issuer(self, other_issuer: str) -> bool:
"""Check if issuer matches another issuer.
Args:
other_issuer (str): Other issuer to compare.
Returns:
bool: True if issuers match.
"""
return self._compare_dn(self._issuer, other_issuer)
def _compare_dn(self, dn1: str, dn2: str) -> bool:
"""Compare two DN strings after normalization.
Args:
dn1 (str): First DN string.
dn2 (str): Second DN string.
Returns:
bool: True if DNs match after normalization.
"""
if not dn1 or not dn2:
return False
return self._normalize_dn(dn1) == self._normalize_dn(dn2)
def _normalize_dn(self, dn_string: str) -> str:
"""Normalize DN string for comparison.
Args:
dn_string (str): DN string to normalize.
Returns:
str: Normalized DN string.
"""
if not dn_string:
return ""
# Normalize spacing and parse components
normalized = dn_string.replace(" + ", "+").replace(" = ", "=")
components = {}
for part in normalized.split(","):
if "=" in part.strip():
key, value = part.strip().split("=", 1)
key, value = key.strip(), value.strip()
if key and value:
components[key] = f"{components.get(key, '')}{'+' if key in components else ''}{value}"
# Return ordered components
order = ["CN", "OU", "O", "L", "ST", "C"]
return ",".join(f"{k}={components[k]}" for k in order if k in components)
def is_self_signed(self) -> bool:
"""Check if certificate is self-signed.
Returns:
bool: True if subject equals issuer.
"""
return self._compare_dn(self._subject, self._issuer)

View File

@@ -0,0 +1,78 @@
from typing import Union
from keywords.cloud_platform.security.objects.ipsec_certificate_object import IPSecCertificateObject
class IPSecCertificateOutput:
"""Parser for certificate command output (subject/issuer or MD5)."""
def __init__(self, command_output: Union[str, list[str]], cert_type: str, hostname: str, cert_path: str):
"""Initialize certificate output parser.
Args:
command_output (Union[str, list[str]]): Raw command output.
cert_type (str): Type of certificate (key, cert, ica, root, k8s_cert).
hostname (str): Hostname where certificate is located.
cert_path (str): Path to certificate file.
"""
self.raw_output = command_output if isinstance(command_output, list) else [str(command_output)]
# Auto-detect output type and create appropriate object
if self._is_subject_issuer_output():
subject, issuer = self._extract_subject_issuer()
self.cert_object = IPSecCertificateObject(cert_type, hostname, cert_path, subject=subject, issuer=issuer)
else:
md5_hash = self._extract_md5()
self.cert_object = IPSecCertificateObject(cert_type, hostname, cert_path, md5_hash=md5_hash)
def _is_subject_issuer_output(self) -> bool:
"""Check if output contains subject/issuer information.
Returns:
bool: True if output contains subject/issuer data.
"""
return any("subject=" in str(line) or "issuer=" in str(line) for line in self.raw_output)
def _extract_subject_issuer(self) -> tuple[str, str]:
"""Extract subject and issuer from command output.
Returns:
tuple[str, str]: Subject and issuer information.
"""
subject = ""
issuer = ""
for line in self.raw_output:
line = line.strip()
if line.startswith("subject="):
subject = line.replace("subject=", "").strip()
elif line.startswith("issuer="):
issuer = line.replace("issuer=", "").strip()
return subject, issuer
def _extract_md5(self) -> str:
"""Extract MD5 hash from command output.
Returns:
str: Extracted MD5 hash.
"""
for line in self.raw_output:
if "MD5(" in line or line.startswith("MD5 "):
if "=" in line:
parts = line.split("=")
if len(parts) > 1:
return parts[-1].strip()
else:
parts = line.split()
if len(parts) > 1:
return parts[-1].strip()
return ""
def get_certificate(self) -> IPSecCertificateObject:
"""Get certificate object.
Returns:
IPSecCertificateObject: Certificate object with parsed information.
"""
return self.cert_object

View File

@@ -0,0 +1,73 @@
import re
from keywords.network.ip_address_keywords import IPAddressKeywords
class IPSecDnsmasqObject:
"""Object representing a dnsmasq.hosts entry."""
def __init__(self, mac_address: str, pxeboot_name: str, pxeboot_address: str):
"""Initialize dnsmasq entry object.
Args:
mac_address (str): MAC address.
pxeboot_name (str): PXE boot hostname.
pxeboot_address (str): PXE boot IP address.
"""
self._mac_address = mac_address
self._pxeboot_name = pxeboot_name
self._pxeboot_address = pxeboot_address
def get_mac_address(self) -> str:
"""Get MAC address.
Returns:
str: MAC address.
"""
return self._mac_address
def get_pxeboot_name(self) -> str:
"""Get PXE boot hostname.
Returns:
str: PXE boot hostname.
"""
return self._pxeboot_name
def get_pxeboot_address(self) -> str:
"""Get PXE boot IP address.
Returns:
str: PXE boot IP address.
"""
return self._pxeboot_address
def is_valid_mac(self) -> bool:
"""Check if MAC address format is valid.
Returns:
bool: True if MAC address format is valid.
"""
mac_pattern = r"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$"
return bool(re.match(mac_pattern, self._mac_address))
def is_pxeboot_entry(self) -> bool:
"""Check if this is a PXE boot entry.
Returns:
bool: True if name contains 'pxeboot-'.
"""
return "pxeboot-" in self._pxeboot_name
def is_valid_ip_address(self) -> bool:
"""Check if PXE boot address is a valid IP address using framework method.
Returns:
bool: True if address is a valid IPv4 address.
"""
try:
# Use existing framework method for IP validation
IPAddressKeywords(self._pxeboot_address)
return True
except ValueError:
return False

View File

@@ -0,0 +1,64 @@
from typing import Union
from keywords.cloud_platform.security.objects.ipsec_dnsmasq_object import IPSecDnsmasqObject
class IPSecDnsmasqOutput:
"""Parser for dnsmasq.hosts file content."""
def __init__(self, dnsmasq_content: Union[str, list[str]]):
"""Initialize dnsmasq output parser.
Args:
dnsmasq_content (Union[str, list[str]]): Raw dnsmasq.hosts content.
"""
self.content = "\n".join(dnsmasq_content) if isinstance(dnsmasq_content, list) else dnsmasq_content
self.entries = self._parse_entries()
def _parse_entries(self) -> list[IPSecDnsmasqObject]:
"""Parse dnsmasq entries from content.
Returns:
list[IPSecDnsmasqObject]: List of parsed dnsmasq entries.
"""
if not self.content.strip():
return []
entries = []
for line in self.content.strip().split("\n"):
if not line.strip():
continue
parts = line.split(",")
if len(parts) >= 3:
mac = parts[0].strip()
name = parts[1].strip()
address = parts[2].strip()
if mac and name and address:
entries.append(IPSecDnsmasqObject(mac, name, address))
return entries
def get_entries(self) -> list[IPSecDnsmasqObject]:
"""Get all dnsmasq entries.
Returns:
list[IPSecDnsmasqObject]: List of dnsmasq entries.
"""
return self.entries
def get_content(self) -> str:
"""Get raw dnsmasq content.
Returns:
str: Raw dnsmasq.hosts content.
"""
return self.content
def get_pxeboot_entries(self) -> list[IPSecDnsmasqObject]:
"""Get only pxeboot entries.
Returns:
list[IPSecDnsmasqObject]: List of pxeboot dnsmasq entries.
"""
return [entry for entry in self.entries if entry.is_pxeboot_entry()]

View File

@@ -0,0 +1,97 @@
import re
from typing import Union
class IPSecSecurityAssociationObject:
"""Represents a single IPSec security association."""
def __init__(self, name: str, association_data: str):
"""Initialize the security association object.
Args:
name (str): Name of the association.
association_data (str): Raw association data from swanctl output.
"""
self._name = name.strip() if name else ""
self._association_data = association_data or ""
self._local_cns = self._extract_cns("local")
self._remote_cns = self._extract_cns("remote")
self._remote_cns_set = set(self._remote_cns)
def get_name(self) -> str:
"""Get the association name.
Returns:
str: Association name.
"""
return self._name
def get_association_data(self) -> str:
"""Get the raw association data.
Returns:
str: Raw association data.
"""
return self._association_data
def is_established(self) -> bool:
"""Check if the connection is established.
Returns:
bool: True if connection is established, False otherwise.
"""
return re.search(r"\bESTABLISHED\b", self._association_data) is not None
def get_local_cns(self) -> list[str]:
"""Get local Common Names.
Returns:
list[str]: List of local CNs.
"""
return self._local_cns
def get_remote_cns(self) -> list[str]:
"""Get remote Common Names.
Returns:
list[str]: List of remote CNs.
"""
return self._remote_cns
def has_local_cn_starting_with(self, prefix: str) -> bool:
"""Check if any local CN starts with the given prefix.
Args:
prefix (str): Prefix to check for.
Returns:
bool: True if any local CN starts with prefix, False otherwise.
"""
if not prefix or not prefix.strip():
return False
return any(cn.startswith(prefix.strip()) for cn in self._local_cns)
def has_remote_cn_in_set(self, cn_set: Union[set, list]) -> bool:
"""Check if any remote CN is in the given set.
Args:
cn_set (Union[set, list]): Set or list of CNs to check against.
Returns:
bool: True if any remote CN is in the set, False otherwise.
"""
if not cn_set:
return False
return bool(self._remote_cns_set & (set(cn_set) if isinstance(cn_set, list) else cn_set))
def _extract_cns(self, cn_type: str) -> list[str]:
"""Extract Common Names from association data.
Args:
cn_type (str): Type of CN to extract (local or remote).
Returns:
list[str]: List of CNs.
"""
pattern = rf"{cn_type}\s+'CN=([^']*)'(?:\s|$)"
return re.findall(pattern, self._association_data)

View File

@@ -0,0 +1,52 @@
import re
from typing import Union
from keywords.cloud_platform.security.objects.ipsec_security_association_object import IPSecSecurityAssociationObject
class IPSecSecurityAssociationOutput:
"""Parser for swanctl --list-sa command output."""
def __init__(self, command_output: Union[str, list[str]]):
"""Initialize the parser with command output.
Args:
command_output (Union[str, list[str]]): Output from swanctl --list-sa command.
"""
if command_output is None:
self.raw_output = ""
elif isinstance(command_output, list):
self.raw_output = "\n".join(command_output) if command_output else ""
else:
self.raw_output = command_output
if not self.raw_output.strip():
self.associations = []
return
self.associations = self.parse_associations()
def parse_associations(self) -> list[IPSecSecurityAssociationObject]:
"""Parse the security associations from the command output.
Returns:
list[IPSecSecurityAssociationObject]: List of parsed security associations.
"""
associations = []
pattern = re.compile(r"(system-nodes:.*?)(?=system-nodes:|\Z)", re.DOTALL)
matches = pattern.findall(self.raw_output)
for i, match in enumerate(matches):
if match.strip(): # Only create association if match has content
association = IPSecSecurityAssociationObject(f"system-nodes-{i+1}", match.strip())
associations.append(association)
return associations
def get_associations(self) -> list[IPSecSecurityAssociationObject]:
"""Get all security associations.
Returns:
list[IPSecSecurityAssociationObject]: List of security associations.
"""
return self.associations

View File

@@ -176,3 +176,33 @@ class LabConnectionKeywords(BaseKeyword):
)
return connection
def ping_host(self, hostname: str, count: int = 3) -> bool:
"""Ping a host by hostname or IP address.
Args:
hostname (str): The hostname or IP address to ping.
count (int): Number of ping packets.
Returns:
bool: True if ping succeeds, False otherwise.
"""
connection = self.get_active_controller_ssh()
cmd = f"ping -c {count} {hostname}"
connection.send(cmd)
return connection.get_return_code() == 0
def test_ssh_connectivity(self, hostname: str) -> bool:
"""Test SSH connectivity to a host using lab credentials.
Args:
hostname (str): The hostname to test SSH connectivity to.
Returns:
bool: True if SSH connection succeeds, False otherwise.
"""
connection = self.get_active_controller_ssh()
lab_config = ConfigurationManager.get_lab_config()
connection.setup_ssh_pass(hostname, lab_config.get_admin_credentials().get_user_name(), lab_config.get_admin_credentials().get_password())
connection.send("echo 'connection_test'")
return connection.get_return_code() == 0

View File

@@ -156,3 +156,14 @@ class KubectlSecretObject:
decoded_cert = base64.b64decode(encoded_cert)
cert = x509.load_pem_x509_certificate(decoded_cert, default_backend())
return cert.issuer.rfc4514_string()
def get_certificate_subject(self) -> str | None:
"""
Retrieves the Subject information from the 'tls.crt' data of the parsed secret.
"""
encoded_cert = self.get_tls_crt()
if not encoded_cert:
return None
decoded_cert = base64.b64decode(encoded_cert)
cert = x509.load_pem_x509_certificate(decoded_cert, default_backend())
return cert.subject.rfc4514_string()

View File

@@ -0,0 +1,238 @@
from pytest import mark
from framework.logging.automation_logger import get_logger
from framework.validation.validation import validate_equals, validate_greater_than, validate_str_contains
from keywords.cloud_platform.security.ipsec_keywords import IPSecKeywords
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.cloud_platform.system.host.system_host_list_keywords import SystemHostListKeywords
from keywords.k8s.secret.kubectl_get_secret_keywords import KubectlGetSecretsKeywords
from keywords.linux.systemctl.systemctl_is_active_keywords import SystemCTLIsActiveKeywords
@mark.p1
def test_ipsec_certificates():
"""Validate IPSec certificate chain and key-certificate matching.
Steps:
- Verify root CA is self-signed
- Validate certificate chain from root CA to IPSec certificate
- Check IPSec certificate has correct CN
- Verify system-local-ca is the issuer of IPSec certificate
- Verify Kubernetes secret system-local-ca subject matches IPSec certificate issuer
- Log certificate information for debugging
- Validate key and certificate MD5 checksums match
"""
ssh_connection = LabConnectionKeywords().get_active_controller_ssh()
ipsec_keywords = IPSecKeywords(ssh_connection)
# Get the actual hostname from system host list
system_hosts = SystemHostListKeywords(ssh_connection)
active_controller = system_hosts.get_active_controller()
hostname = active_controller.get_host_name()
get_logger().log_info(f"Validating certificate chain for {hostname}")
# Build certificate paths
ipsec_crt_path = ipsec_keywords.get_ipsec_certificate_path(hostname)
ipsec_key_path = ipsec_keywords.get_ipsec_key_path(hostname)
# Validate IPSec certificate exists
validate_equals(ipsec_keywords.file_keywords.validate_file_exists_with_sudo(ipsec_crt_path), True, f"IPSec certificate should exist: {ipsec_crt_path}")
# Get certificate information using string constants
root_ca_output = ipsec_keywords.get_certificate_info_by_type("root_ca")
ica_l1_output = ipsec_keywords.get_certificate_info_by_type("ica_l1")
ica_output = ipsec_keywords.get_certificate_info_by_type("ica")
ipsec_cert_output = ipsec_keywords.get_certificate_info_subject_issuer(ipsec_crt_path)
root_ca_cert = root_ca_output.get_certificate()
ica_l1_cert = ica_l1_output.get_certificate()
ica_cert = ica_output.get_certificate()
ipsec_cert = ipsec_cert_output.get_certificate()
# Validate root CA is self-signed
validate_equals(root_ca_cert.is_self_signed(), True, f"Root CA should be self-signed for {hostname}")
# Validate certificate chain based on scenario
is_self_signed = ica_l1_cert.get_subject() == root_ca_cert.get_subject() and ica_cert.get_subject() == root_ca_cert.get_subject()
scenario = "self-signed" if is_self_signed else "full CA hierarchy"
get_logger().log_info(f"Detected {scenario} certificate scenario for {hostname}")
if is_self_signed:
validate_equals(ipsec_cert.matches_issuer(root_ca_cert.get_subject()), True, f"IPSec certificate should be issued by Root CA for {hostname}")
else:
# Validate full certificate chain
validate_equals(ica_l1_cert.matches_issuer(root_ca_cert.get_subject()), True, f"ICA L1 should be issued by Root CA for {hostname}")
validate_equals(ica_cert.matches_issuer(ica_l1_cert.get_subject()), True, f"ICA should be issued by ICA L1 for {hostname}")
validate_equals(ipsec_cert.matches_issuer(ica_cert.get_subject()), True, f"IPSec certificate should be issued by ICA for {hostname}")
# Log IPSec certificate subject for debugging
get_logger().log_info(f"IPSec certificate subject: {ipsec_cert.get_subject()}")
# Validate IPSec certificate has valid subject (not checking specific CN as it may be intermediate CA)
validate_equals(ipsec_cert.has_valid_subject(), True, f"IPSec certificate should have valid subject for {hostname}")
# Validate Kubernetes secret subject matches IPSec certificate issuer
kubectl_secrets = KubectlGetSecretsKeywords(ssh_connection)
kube_secret = kubectl_secrets.get_secret_json_output("system-local-ca", "cert-manager")
kube_subject = kube_secret.get_certificate_subject()
get_logger().log_info(f"IPSec certificate issuer: {ipsec_cert.get_issuer()}")
get_logger().log_info(f"Kubernetes secret subject: {kube_subject}")
# Validation will handle normalization internally
validate_equals(ipsec_cert.matches_issuer(kube_subject), True, f"IPSec certificate should be issued by Kubernetes system-local-ca secret for {hostname}")
# Validate key-certificate MD5 matching
get_logger().log_info(f"Validating key-certificate MD5 matching for {hostname}")
key_output = ipsec_keywords.get_certificate_info_key_md5(ipsec_key_path)
cert_md5_output = ipsec_keywords.get_certificate_info_cert_md5(ipsec_crt_path)
key_cert = key_output.get_certificate()
cert_md5_cert = cert_md5_output.get_certificate()
validate_equals(key_cert.matches_md5(cert_md5_cert.get_md5_hash()), True, f"IPSec key MD5 should match IPSec certificate MD5 for {hostname}")
@mark.p1
def test_pxeboot_network():
"""Verify the correctness of the dnsmasq.hosts file by checking the mapping of MAC addresses to pxeboot names and addresses.
Steps:
- Ensure each MAC address in the dnsmasq.hosts file is included in a list of valid MAC addresses
- Verify that each MAC address has associated pxeboot name and address entries
- Check if the pxeboot addresses are reachable via ping
- Test SSH connectivity to pxeboot hostnames
"""
ssh_connection = LabConnectionKeywords().get_active_controller_ssh()
ipsec_keywords = IPSecKeywords(ssh_connection)
system_hosts = SystemHostListKeywords(ssh_connection)
lab_connection_keywords = LabConnectionKeywords()
get_logger().log_info("Fetching pxeboot information from dnsmasq.hosts file")
dnsmasq_output = ipsec_keywords.get_dnsmasq_hosts()
validate_str_contains(dnsmasq_output.get_content(), "pxe", "dnsmasq.hosts should contain PXE entries")
hosts = system_hosts.get_system_host_list().get_hosts()
# Get parsed dnsmasq entries and validate each
pxeboot_entries = dnsmasq_output.get_pxeboot_entries()
for entry in pxeboot_entries:
# Validate MAC address format using object method
validate_equals(entry.is_valid_mac(), True, f"MAC address should be valid: {entry.get_mac_address()}")
# Validate pxeboot name format using object method
validate_equals(entry.is_pxeboot_entry(), True, f"Should be PXE boot entry: {entry.get_pxeboot_name()}")
# Test connectivity
ping_success = lab_connection_keywords.ping_host(entry.get_pxeboot_address())
validate_equals(ping_success, True, f"Ping should succeed for {entry.get_pxeboot_address()}")
# Test SSH connectivity using pxeboot name with lab credentials
ssh_success = lab_connection_keywords.test_ssh_connectivity(entry.get_pxeboot_name())
validate_equals(ssh_success, True, f"SSH should succeed for {entry.get_pxeboot_name()}")
# Validate we have pxeboot entries
validate_greater_than(len(pxeboot_entries), 0, f"Should have pxeboot entries, found {len(pxeboot_entries)}")
get_logger().log_info(f"Found {len(pxeboot_entries)} pxeboot entries for {len(hosts)} hosts")
@mark.p1
def test_security_associations():
"""Validate the security associations between nodes using the swanctl command.
Steps:
- Retrieve hosts and active controller information
- Get the list of security associations using swanctl
- Verify that security associations are established and valid
- Check local and remote CN patterns match expected values
"""
ssh_connection = LabConnectionKeywords().get_active_controller_ssh()
ipsec_keywords = IPSecKeywords(ssh_connection)
system_hosts = SystemHostListKeywords(ssh_connection)
hosts = system_hosts.get_system_host_list().get_hosts()
active_controller = system_hosts.get_active_controller()
associations_output = ipsec_keywords.get_security_associations()
associations = associations_output.get_associations()
validate_greater_than(len(associations), 0, f"Should find security associations, found {len(associations)} associations")
remote_host_count = len(hosts) - 1
expected_association_count = remote_host_count * 2
get_logger().log_info(f"Lab topology: {len(hosts)} total hosts, {remote_host_count} remote hosts")
get_logger().log_info(f"Expected: {expected_association_count} associations (2 per remote host), Actual: {len(associations)}")
validate_equals(len(associations), expected_association_count, f"Should have {expected_association_count} associations (2 per remote host), found {len(associations)}")
active_cn_pattern = f"ipsec-{active_controller.get_host_name()}"
expected_cns = {f"ipsec-{host.get_host_name()}" for host in hosts if f"ipsec-{host.get_host_name()}" != active_cn_pattern}
get_logger().log_info(f"Active controller CN pattern: {active_cn_pattern}")
get_logger().log_info(f"Expected remote CNs: {expected_cns}")
for i, association in enumerate(associations):
validate_equals(association.is_established(), True, f"Connection should be established in association {i+1}")
validate_equals(association.has_local_cn_starting_with(active_cn_pattern), True, f"Should have local CN starting with {active_cn_pattern} in association {i+1}")
validate_equals(association.has_remote_cn_in_set(expected_cns), True, f"Should have expected remote CN in association {i+1}")
@mark.p1
def test_ipsec_status():
"""Validate IPSec service status on all hosts.
Steps:
- Check ipsec-server service is active on all controllers
- Check ipsec service is active on all hosts
"""
ssh_connection = LabConnectionKeywords().get_active_controller_ssh()
system_hosts = SystemHostListKeywords(ssh_connection)
lab_connection_keywords = LabConnectionKeywords()
hosts = system_hosts.get_system_host_list().get_hosts()
# Check ipsec-server service on controllers
controllers = system_hosts.get_system_host_list().get_controllers()
for controller in controllers:
controller_ssh = lab_connection_keywords.get_ssh_for_hostname(controller.get_host_name())
systemctl_keywords = SystemCTLIsActiveKeywords(controller_ssh)
service_status = systemctl_keywords.is_active("ipsec-server")
validate_equals(service_status, "active", f"ipsec-server service should be active on {controller.get_host_name()}")
# Check ipsec service on all hosts
for host in hosts:
if host.get_personality() == "worker":
host_ssh = lab_connection_keywords.get_compute_ssh(host.get_host_name())
else:
host_ssh = lab_connection_keywords.get_ssh_for_hostname(host.get_host_name())
systemctl_keywords = SystemCTLIsActiveKeywords(host_ssh)
service_status = systemctl_keywords.is_active("ipsec")
validate_equals(service_status, "active", f"ipsec service should be active on {host.get_host_name()}")
@mark.p1
def test_ping_mgmt_ips():
"""Test the ability to ping management IPs from the active controller.
Steps:
- Get list of all hosts in the system
- Identify the active controller
- Ping each host from the active controller
- Verify all ping operations succeed
"""
ssh_connection = LabConnectionKeywords().get_active_controller_ssh()
system_hosts = SystemHostListKeywords(ssh_connection)
lab_connection_keywords = LabConnectionKeywords()
host_list = system_hosts.get_system_host_list()
hosts = [host.get_host_name() for host in host_list.get_hosts()]
get_logger().log_info(f"Testing ping connectivity to {len(hosts)} hosts")
for host in hosts:
ping_success = lab_connection_keywords.ping_host(host)
validate_equals(ping_success, True, f"Ping should succeed to {host}")