Added the nginx check using cert-manager app

for the cert-manager app have added the initial scripts.

Change-Id: Ie4add77e1e2879f365a6b219e990939ab3be88ed
This commit is contained in:
ssivaman
2025-04-12 04:52:28 -04:00
parent 147e6378cc
commit 05c54b357a
14 changed files with 656 additions and 62 deletions

View File

@@ -0,0 +1,60 @@
from framework.ssh.ssh_connection import SSHConnection
from framework.validation.validation import validate_equals_with_retry
from keywords.base_keyword import BaseKeyword
from keywords.k8s.certificate.object.kubectl_get_certificate_output import KubectlGetCertsOutput
from keywords.k8s.k8s_command_wrapper import export_k8s_config
class KubectlGetCertStatusKeywords(BaseKeyword):
"""
Class for 'kubectl get certificate' keywords
"""
def __init__(self, ssh_connection: SSHConnection):
"""
Constructor
Args:
ssh_connection (SSHConnection): SSH connection object used to interact with the Kubernetes cluster.
"""
self.ssh_connection = ssh_connection
def get_certificates(self, namespace: str = None) -> KubectlGetCertsOutput:
"""
Gets the k8s certificate that are available using 'kubectl get certificate'.
Args:
namespace (str, optional): The namespace to retrieve certificates from. Defaults to None.
Returns:
KubectlGetCertsOutput: Parsed output of the 'kubectl get certificate' command.
"""
arg_namespace = ""
if namespace:
arg_namespace = f"-n {namespace}"
kubectl_get_issuer_output = self.ssh_connection.send(export_k8s_config(f"kubectl {arg_namespace} get certificate"))
self.validate_success_return_code(self.ssh_connection)
cert_list_output = KubectlGetCertsOutput(kubectl_get_issuer_output)
return cert_list_output
def wait_for_certs_status(self, certs_name: str, is_ready: bool, namespace: str = None, timeout: int = 600) -> None:
"""
Waits timeout amount of time for the given certs to be in the given status
Args:
certs_name (str): the name of the certificate
is_ready (bool): the is_ready status
namespace (str): the namespace
timeout (int, optional): the timeout in secs
"""
def get_cert_status():
cert_status = self.get_certificates(namespace).get_cert(certs_name).get_ready()
return bool(cert_status)
validate_equals_with_retry(get_cert_status, is_ready, "Verify the certs status issued", timeout=600)

View File

@@ -0,0 +1,60 @@
from framework.ssh.ssh_connection import SSHConnection
from framework.validation.validation import validate_equals_with_retry
from keywords.base_keyword import BaseKeyword
from keywords.k8s.certificate.object.kubectl_get_issuer_output import KubectlGetIssuerOutput
from keywords.k8s.k8s_command_wrapper import export_k8s_config
class KubectlGetCertIssuerKeywords(BaseKeyword):
"""
Class for 'kubectl get issuer' keywords
"""
def __init__(self, ssh_connection: SSHConnection):
"""
Constructor
Args:
ssh_connection (SSHConnection): SSH connection object used to interact with the Kubernetes cluster.
"""
self.ssh_connection = ssh_connection
def get_issuers(self, namespace: str = None) -> KubectlGetIssuerOutput:
"""
Gets the k8s issuer that are available using 'kubectl get issuer'.
Args:
namespace (str, optional): the namespace
Returns:
KubectlGetIssuerOutput: Parsed output of the 'kubectl get issuer' command.
"""
arg_namespace = ""
if namespace:
arg_namespace = f"-n {namespace}"
kubectl_get_issuer_output = self.ssh_connection.send(export_k8s_config(f"kubectl {arg_namespace} get issuer"))
self.validate_success_return_code(self.ssh_connection)
issuer_list_output = KubectlGetIssuerOutput(kubectl_get_issuer_output)
return issuer_list_output
def wait_for_issuer_status(self, issuer_name: str, is_ready: bool, namespace: str = None, timeout: int = 600) -> None:
"""
Waits timeout amount of time for the given issuer to be in the given status
Args:
issuer_name (str): the certs issuer name
is_ready (bool): the is_ready status
namespace (str , optional): the namespace
timeout (int): the timeout in secs
"""
def get_issuer_status():
issuer_status = self.get_issuers(namespace).get_issuer(issuer_name).get_ready()
return bool(issuer_status)
message = f"issuer {issuer_name}'s status is {is_ready}"
validate_equals_with_retry(get_issuer_status, is_ready, message, timeout=600)

View File

@@ -0,0 +1,78 @@
class KubectlCertObject:
"""
Class to hold attributes of a 'kubectl get certificate' certificate entry.
"""
def __init__(self, name: str):
"""
Constructor
Args:
name (str): Name of the certs.
"""
self.name = name
self.ready = None
self.age = None
def get_name(self) -> str:
"""
Getter for NAME entry.
Returns:
str: The name of the certs.
"""
return self.name
def set_secret(self, secret: str) -> None:
"""
Setter for SECRET
Args:
secret (str): The secret associated with the certs.
Returns: None
"""
self.secret = secret
def get_secret(self) -> str:
"""
Getter for SECRET entry
"""
return self.secret
def set_ready(self, ready: str) -> None:
"""
Setter for READY
Args:
ready (str): The ready associated with the certs.
Returns: None
"""
self.ready = ready
def get_ready(self) -> str:
"""
Getter for READY entry
"""
return self.ready
def set_age(self, age: str) -> None:
"""
Setter for AGE.
Args:
age (str): The age associated with the certs.
Returns: None
"""
self.age = age
def get_age(self) -> str:
"""
Getter for AGE entry.
Returns:
str: The age of the certs.
"""
return self.age

View File

@@ -0,0 +1,60 @@
from starlingx.keywords.k8s.certificate.object.kubectl_cert_object import KubectlCertObject
from starlingx.keywords.k8s.certificate.object.kubectl_get_certificate_table_parser import KubectlGetCertsTableParser
class KubectlGetCertsOutput:
"""
This class represents the output of the 'kubectl get certificate' command.
It parses the raw command output and provides access to certificate objects.
"""
def __init__(self, kubectl_get_certs_output: str):
"""
Constructor
Args:
kubectl_get_certs_output (str): Raw string output from running a "kubectl get certificate" command.
"""
self.kubectl_certs: [KubectlCertObject] = []
kubectl_get_certs_table_parser = KubectlGetCertsTableParser(kubectl_get_certs_output)
output_values_list = kubectl_get_certs_table_parser.get_output_values_list()
for pod_dict in output_values_list:
if "NAME" not in pod_dict:
raise ValueError(f"There is no NAME associated with the pod: {pod_dict}")
certs = KubectlCertObject(pod_dict["NAME"])
if "READY" in pod_dict:
certs.set_ready(pod_dict["READY"])
if "SECRET" in pod_dict:
certs.set_secret(pod_dict["SECRET"])
if "AGE" in pod_dict:
certs.set_age(pod_dict["AGE"])
self.kubectl_certs.append(certs)
def get_cert(self, certs_name: str) -> KubectlCertObject:
"""
This function will get the pod with the name specified from this get_pods_output.
Args:
certs_name (str): The name of the certs.
Returns:
KubectlCertObject: The certificate object with the specified name.
Raises:
ValueError: If no certificate with the specified name is found.
"""
for cert in self.kubectl_certs:
if cert.get_name() == certs_name:
return cert
else:
raise ValueError(f"There is no certs with the name {certs_name}.")

View File

@@ -0,0 +1,22 @@
from keywords.k8s.k8s_table_parser_base import K8sTableParserBase
class KubectlGetCertsTableParser(K8sTableParserBase):
"""
Class for parsing the output of "kubectl get certificate" commands.
"""
def __init__(self, k8s_output: str):
"""
Constructor
Args:
k8s_output (str): The raw String output of a kubernetes command that returns a table.
"""
super().__init__(k8s_output)
self.possible_headers = [
"NAME",
"READY",
"SECRET",
"AGE",
]

View File

@@ -0,0 +1,57 @@
from starlingx.keywords.k8s.certificate.object.kubectl_get_issuer_table_parser import KubectlGetIssuerTableParser
from starlingx.keywords.k8s.certificate.object.kubectl_issuer_object import KubectlIssuerObject
class KubectlGetIssuerOutput:
"""
This class represents the output of the 'kubectl get issuer' command.
It provides methods to parse and retrieve issuer information from the command output.
"""
def __init__(self, kubectl_get_issuer_output: str):
"""
Constructor
Args:
kubectl_get_issuer_output (str): Raw string output from running a "kubectl get issuer" command.
"""
self.kubectl_issuer: [KubectlIssuerObject] = []
kubectl_get_issuer_table_parser = KubectlGetIssuerTableParser(kubectl_get_issuer_output)
output_values_list = kubectl_get_issuer_table_parser.get_output_values_list()
for pod_dict in output_values_list:
if "NAME" not in pod_dict:
raise ValueError(f"There is no NAME associated with the issuer: {pod_dict}")
issuer = KubectlIssuerObject(pod_dict["NAME"])
if "READY" in pod_dict:
issuer.set_ready(pod_dict["READY"])
if "AGE" in pod_dict:
issuer.set_age(pod_dict["AGE"])
self.kubectl_issuer.append(issuer)
def get_issuer(self, issuer_name: str) -> KubectlIssuerObject:
"""
This function will get the pod with the name specified from this get_issuer_output.
Args:
issuer_name (str): The name of the issuer.
Returns:
KubectlIssuerObject: The issuer object with the specified name.
Raises:
ValueError: If no issuer with the specified name is found.
"""
for issuer in self.kubectl_issuer:
if issuer.get_name() == issuer_name:
return issuer
else:
raise ValueError(f"There is no issuer with the name {issuer_name}.")

View File

@@ -0,0 +1,21 @@
from keywords.k8s.k8s_table_parser_base import K8sTableParserBase
class KubectlGetIssuerTableParser(K8sTableParserBase):
"""
Class for parsing the output of "kubectl get issuer" commands.
"""
def __init__(self, k8s_output: str):
"""
Constructor
Args:
k8s_output (str): The raw String output of a kubernetes command that returns a table.
"""
super().__init__(k8s_output)
self.possible_headers = [
"NAME",
"READY",
"AGE",
]

View File

@@ -0,0 +1,58 @@
class KubectlIssuerObject:
"""
Class to hold attributes of a 'kubectl get issuer' pod entry.
"""
def __init__(self, name: str):
"""
Constructor
Args:
name (str): Name of the pod.
"""
self.name = name
self.ready = None
self.age = None
def get_name(self) -> str:
"""
Getter for NAME entry
"""
return self.name
def set_ready(self, ready: str) -> None:
"""
Setter for READY
Args:
ready (str): The ready associated with the issuer.
Returns: None
"""
self.ready = ready
def get_ready(self) -> str:
"""
Getter for READY entry
"""
return self.ready
def set_age(self, age: str) -> None:
"""
Setter for AGE
Args:
age (str): The age associated with the issuer.
Returns: None
"""
self.age = age
def get_age(self) -> str:
"""
Getter for AGE entry
"""
return self.age

View File

@@ -1,5 +1,6 @@
import time
from framework.ssh.ssh_connection import SSHConnection
from keywords.base_keyword import BaseKeyword
from keywords.k8s.k8s_command_wrapper import export_k8s_config
from keywords.k8s.pods.object.kubectl_get_pods_output import KubectlGetPodsOutput
@@ -10,24 +11,26 @@ class KubectlGetPodsKeywords(BaseKeyword):
Class for 'kubectl get pods' keywords
"""
def __init__(self, ssh_connection):
def __init__(self, ssh_connection: SSHConnection):
"""
Constructor
Initialize the KubectlGetPodsKeywords class.
Args:
ssh_connection:
ssh_connection (SSHConnection): An SSH connection object to the target system.
"""
self.ssh_connection = ssh_connection
def get_pods(self, namespace: str = None) -> KubectlGetPodsOutput:
"""
Gets the k8s pods that are available using '-o wide'.
Args:
namespace ():
namespace(str, optional): The namespace to search for pods. If None, it will search in all namespaces.
Returns:
KubectlGetPodsOutput: An object containing the parsed output of the command.
"""
arg_namespace = ""
if namespace:
arg_namespace = f"-n {namespace}"
@@ -40,14 +43,11 @@ class KubectlGetPodsKeywords(BaseKeyword):
def get_pods_all_namespaces(self) -> KubectlGetPodsOutput:
"""
Gets the k8s pods that are available using '-o wide'.
Args:
namespace ():
Gets the k8s pods that are available using '-o wide' for all namespaces.
Returns:
KubectlGetPodsOutput: An object containing the parsed output of the command.
"""
kubectl_get_pods_output = self.ssh_connection.send(export_k8s_config("kubectl -o wide get pods --all-namespaces"))
self.validate_success_return_code(self.ssh_connection)
pods_list_output = KubectlGetPodsOutput(kubectl_get_pods_output)
@@ -57,16 +57,17 @@ class KubectlGetPodsKeywords(BaseKeyword):
def wait_for_pod_status(self, pod_name: str, expected_status: str, namespace: str = None, timeout: int = 600) -> bool:
"""
Waits timeout amount of time for the given pod to be in the given status
Args:
pod_name (): the pod name
expected_status (): the expected status
namespace (): the namespace
timeout (): the timeout in secs
pod_name (str): the pod name
expected_status (str): the expected status
namespace (str): the namespace
timeout (int): the timeout in secs
Returns:
bool: True if the pod is in the expected status
"""
pod_status_timeout = time.time() + timeout
while time.time() < pod_status_timeout:
@@ -80,11 +81,13 @@ class KubectlGetPodsKeywords(BaseKeyword):
def wait_for_all_pods_status(self, expected_statuses: [str], timeout: int = 600) -> bool:
"""
Wait for all pods to be in the given status(s)
Args:
expected_statuses (): list of expected statuses ex. ['Completed' , 'Running']
timeout (): the amount of time in seconds to wait
Returns: True if all expected statuses are met
Args:
expected_statuses ([str]): list of expected statuses ex. ['Completed' , 'Running']
timeout (int): the amount of time in seconds to wait
Returns:
bool: True if all expected statuses are met
"""
pod_status_timeout = time.time() + timeout

View File

@@ -3,64 +3,72 @@ from keywords.k8s.pods.object.kubectl_pod_object import KubectlPodObject
class KubectlGetPodsOutput:
"""
A class to interact with and retrieve information about Kubernetes pods.
This class provides methods to filter and retrieve pod information
using the `kubectl` command output.
"""
def __init__(self, kubectl_get_pods_output: str):
"""
Constructor
Args:
kubectl_get_pods_output: Raw string output from running a "kubectl get pods" command.
kubectl_get_pods_output (str): Raw string output from running a "kubectl get pods" command.
"""
self.kubectl_pod: [KubectlPodObject] = []
kubectl_get_pods_table_parser = KubectlGetPodsTableParser(kubectl_get_pods_output)
output_values_list = kubectl_get_pods_table_parser.get_output_values_list()
for pod_dict in output_values_list:
if 'NAME' not in pod_dict:
if "NAME" not in pod_dict:
raise ValueError(f"There is no NAME associated with the pod: {pod_dict}")
pod = KubectlPodObject(pod_dict['NAME'])
pod = KubectlPodObject(pod_dict["NAME"])
if 'NAMESPACE' in pod_dict:
pod.set_namespace(pod_dict['NAMESPACE'])
if "NAMESPACE" in pod_dict:
pod.set_namespace(pod_dict["NAMESPACE"])
if 'READY' in pod_dict:
pod.set_ready(pod_dict['READY'])
if "READY" in pod_dict:
pod.set_ready(pod_dict["READY"])
if 'STATUS' in pod_dict:
pod.set_status(pod_dict['STATUS'])
if "STATUS" in pod_dict:
pod.set_status(pod_dict["STATUS"])
if 'RESTARTS' in pod_dict:
pod.set_restarts(pod_dict['RESTARTS'])
if "RESTARTS" in pod_dict:
pod.set_restarts(pod_dict["RESTARTS"])
if 'AGE' in pod_dict:
pod.set_age(pod_dict['AGE'])
if "AGE" in pod_dict:
pod.set_age(pod_dict["AGE"])
if 'IP' in pod_dict:
pod.set_ip(pod_dict['IP'])
if "IP" in pod_dict:
pod.set_ip(pod_dict["IP"])
if 'NODE' in pod_dict:
pod.set_node(pod_dict['NODE'])
if "NODE" in pod_dict:
pod.set_node(pod_dict["NODE"])
if 'NOMINATED NODE' in pod_dict:
pod.set_nominated_node(pod_dict['NOMINATED NODE'])
if "NOMINATED NODE" in pod_dict:
pod.set_nominated_node(pod_dict["NOMINATED NODE"])
if 'READINESS GATES' in pod_dict:
pod.set_readiness_gates(pod_dict['READINESS GATES'])
if "READINESS GATES" in pod_dict:
pod.set_readiness_gates(pod_dict["READINESS GATES"])
self.kubectl_pod.append(pod)
def get_pod(self, pod_name) -> KubectlPodObject:
def get_pod(self, pod_name: str) -> KubectlPodObject:
"""
This function will get the pod with the name specified from this get_pods_output.
Args:
pod_name: The name of the pod of interest.
pod_name (str): The name of the pod of interest.
Returns: KubectlPodObject
Returns:
KubectlPodObject: The pod object with the name specified.
Raises:
ValueError: If the pod with the specified name does not exist in the output.
"""
for pod in self.kubectl_pod:
if pod.get_name() == pod_name:
@@ -68,22 +76,44 @@ class KubectlGetPodsOutput:
else:
raise ValueError(f"There is no pod with the name {pod_name}.")
def get_pods_start_with(self, starts_with) -> [KubectlPodObject]:
def get_pods_start_with(self, starts_with: str) -> [KubectlPodObject]:
"""
Returns list of pods that starts with 'starts_with'
Args:
starts_with - the str the pod name starts with
starts_with (str): the str the pod name starts with
Returns:
[KubectlPodObject]: list of pods that starts with 'starts_with'
"""
pods = list(filter(lambda pod: starts_with in pod.get_name(), self.kubectl_pod))
return pods
def get_pods(self) -> [KubectlPodObject]:
"""
Gets all pods
Gets all pods.
Returns:
[KubectlPodObject]: A list of all pods.
"""
return self.kubectl_pod
def get_unique_pod_matching_prefix(self, starts_with: str) -> str:
"""
Get the full name(s) of pod(s) that start with the given prefix.
Args:
starts_with(str): The prefix of the pod name.
Returns:
str: A string if one pod matches
Raises:
ValueError: If no pods match the prefix.
"""
pods = self.get_pods_start_with(starts_with)
if len(pods) == 0:
raise ValueError(f"No pods found starting with '{starts_with}'.")
return pods[0].get_name()

View File

@@ -1,22 +1,26 @@
import ipaddress
import socket
from typing import Optional
from config.configuration_manager import ConfigurationManager
from framework.logging.automation_logger import get_logger
from keywords.base_keyword import BaseKeyword
class IPAddressKeywords(BaseKeyword):
"""
This class contains all the keywords related to the Internet Protocol (IP) addresses in a general way, that is it,
independently of specific implemented technologies.
independently of specific implemented technologies.
"""
def __init__(self, ip: str):
"""
Constructor
Args:
ip (str): a valid IPv4 or IPv6 address representation.
"""
self.ip = None
if not self.is_valid_ip_address(ip):
@@ -27,11 +31,13 @@ class IPAddressKeywords(BaseKeyword):
def ipv6_same_network(self, ipv6: str, prefix_length: int) -> bool:
"""
Verifies if 'ipv6' and 'self.ip' addresses belong to the same network.
Args:
ipv6 (str): An IPv6 address as a string.
prefix_length (int): The prefix length (network mask).
Returns: True if 'self.ip' and 'ipv6' are in the same network, False otherwise.
Returns:
bool: True if 'self.ip' and 'ipv6' are in the same network, False otherwise.
"""
if not self.is_valid_ip_address(ipv6):
@@ -46,11 +52,13 @@ class IPAddressKeywords(BaseKeyword):
def ipv4_same_network(self, ipv4: str, netmask: str) -> bool:
"""
Verifies if 'ipv4' and 'self.ip' addresses belong to the same network.
Args:
ipv4 (str): An IPv6 address as a string.
netmask: The subnet mask as a string.
netmask (str): The subnet mask as a string.
Returns: True if 'self.ip' and 'ipv4' are in the same network, False otherwise.
Returns:
bool: True if 'self.ip' and 'ipv4' are in the same network, False otherwise.
"""
if not self.is_valid_ip_address(ipv4):
@@ -67,11 +75,13 @@ class IPAddressKeywords(BaseKeyword):
def ipv4_with_prefix_length_same_network(self, ipv4: str, prefix_length: int) -> bool:
"""
Verifies if 'ipv4' and 'self.ip' addresses belong to the same network.
Args:
ipv4 (str): An IPv6 address as a string.
prefix_length (int): The number of leading bits that corresponds to the network prefix in the IP address.
Returns: True if 'self.ip' and 'ipv4' are in the same network; False otherwise.
Returns:
bool: True if 'self.ip' and 'ipv4' are in the same network; False otherwise.
"""
if not self.is_valid_ip_address(ipv4):
@@ -85,11 +95,13 @@ class IPAddressKeywords(BaseKeyword):
def ip_same_network(self, ip: str, prefix_length: int) -> bool:
"""
Verifies if 'ipv6' and 'self.ip' addresses belong to the same network.
Args:
ip (str): An IPv4 or IPv6 address as a string.
prefix_length (int): The prefix length.
Returns: True if 'self.ip' and 'ip' are in the same network; False otherwise.
Returns:
bool: True if 'self.ip' and 'ip' are in the same network; False otherwise.
"""
if not self.is_valid_ip_address(ip):
@@ -106,14 +118,15 @@ class IPAddressKeywords(BaseKeyword):
else:
return False
def is_valid_ip_address(self, ip_address) -> bool:
def is_valid_ip_address(self, ip_address: str) -> bool:
"""
Check if 'ip_address' is a valid IPv4 or IPv6 address.
Args:
ip_address: a supposed valid either IPv4 or IPv6 address.
ip_address (str): a supposed valid either IPv4 or IPv6 address.
Returns:
boolean: True if 'ip_address' is valid IP; False otherwise.
bool: True if 'ip_address' is valid IP; False otherwise.
"""
try:
ipaddress.ip_address(ip_address)
@@ -124,10 +137,12 @@ class IPAddressKeywords(BaseKeyword):
def check_ip_version(self, ip: str) -> Optional[str]:
"""
Check if 'ip' is either an IPv4 or IPv6 address.
Args:
ip (str): a supposed valid either IPv4 or IPv6 address.
Returns:
str: "IPv4" if 'ip' is an IPv4;
"IPv6" if "ip" is an IPv6;
None, otherwise.
Optional[str]: 'IPv4' if the address is IPv4, 'IPv6' if the address is IPv6, or None if invalid.
"""
try:
ip_obj = ipaddress.ip_address(ip)
@@ -138,3 +153,25 @@ class IPAddressKeywords(BaseKeyword):
return "IPv4"
elif ip_obj.version == 6:
return "IPv6"
def check_dnsname_resolution(self, dns_name: str) -> bool:
"""
Method to verify the dnsname resolution of the lab
Args:
dns_name (str): a supposed valid dns name.
Returns:
bool: True if the DNS name resolves to an IP address, False otherwise.
"""
family = socket.AF_INET
lab_config = ConfigurationManager.get_lab_config()
oam_fip = lab_config.get_floating_ip()
if self.check_ip_version(oam_fip) == "IPv6":
family = socket.AF_INET6
try:
socket.getaddrinfo(dns_name, None, family)
return True
except socket.error as msg:
get_logger().log_error(f"nslookup failed with error '{msg}'")
return False

View File

@@ -0,0 +1,16 @@
apiVersion: crd.projectcalico.org/v1
kind: GlobalNetworkPolicy
metadata:
name: gnp-oam-overrides
spec:
ingress:
- action: Allow
destination:
ports:
- 80
- 443
protocol: TCP
order: 500
selector: has(iftype) && iftype == 'oam'
types:
- Ingress

View File

@@ -0,0 +1,24 @@
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
annotations:
cert-manager.io/issuer: stepca-issuer
kubernetes.io/ingress.class: nginx
name: kuard
namespace: pvtest
spec:
rules:
- host: '{{ dns_name }}'
http:
paths:
- backend:
service:
name: kuard
port:
number: 80
path: /
pathType: Prefix
tls:
- hosts:
- '{{ dns_name }}'
secretName: kuard-ingress-tls

View File

@@ -0,0 +1,68 @@
from pytest import mark
from config.configuration_manager import ConfigurationManager
from framework.resources.resource_finder import get_stx_resource_path
from framework.validation.validation import validate_equals
from keywords.cloud_platform.rest.cloud_rest_client import CloudRestClient
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.files.file_keywords import FileKeywords
from keywords.files.yaml_keywords import YamlKeywords
from keywords.k8s.certificate.kubectl_get_certificate_keywords import KubectlGetCertStatusKeywords
from keywords.k8s.certificate.kubectl_get_issuer_keywords import KubectlGetCertIssuerKeywords
from keywords.k8s.pods.kubectl_apply_pods_keywords import KubectlApplyPodsKeywords
from keywords.k8s.pods.kubectl_get_pods_keywords import KubectlGetPodsKeywords
from keywords.network.ip_address_keywords import IPAddressKeywords
@mark.p0
def test_app_using_nginx_controller():
"""
This test is to deploy an application which uses Nginx Ingress controller using a
certificate signed by External CA(acme stepCA)
Steps:
- Deploy and apply the app file
- Deploy and apply the globalnetworkpolicy for the acme challenge
- Verify app status
- Verify cert is issued from StepCa
- Check the app url
"""
ssh_connection = LabConnectionKeywords().get_active_controller_ssh()
lab_config = ConfigurationManager.get_lab_config()
oam_ip = lab_config.get_floating_ip()
dns_name = ConfigurationManager.get_security_config().get_dns_name()
dns_resolution_status = IPAddressKeywords(oam_ip).check_dnsname_resolution(dns_name=dns_name)
validate_equals(dns_resolution_status, True, "Verify the dns name resolution")
stepca_issuer = "stepca-issuer"
pod_name = "kuard"
cert = "kuard-ingress-tls"
base_url = f"https://{dns_name}/"
deploy_app_file_name = "deploy_app.yaml"
global_policy_file_name = "global_policy.yaml"
kuard_file_name = "kuard.yaml"
namespace = "pvtest"
file_keywords = FileKeywords(ssh_connection)
file_keywords.upload_file(get_stx_resource_path(f"resources/cloud_platform/security/cert_manager/{deploy_app_file_name}"), f"/home/sysadmin/{deploy_app_file_name}", overwrite=False)
file_keywords.upload_file(get_stx_resource_path(f"resources/cloud_platform/security/cert_manager/{global_policy_file_name}"), f"/home/sysadmin/{global_policy_file_name}", overwrite=False)
KubectlApplyPodsKeywords(ssh_connection).apply_from_yaml(f"/home/sysadmin/{global_policy_file_name}")
KubectlApplyPodsKeywords(ssh_connection).apply_from_yaml(f"/home/sysadmin/{deploy_app_file_name}")
# Check the issuer status
KubectlGetCertIssuerKeywords(ssh_connection).wait_for_issuer_status(stepca_issuer, True, namespace)
# Check the ingress pod status
get_pod_obj = KubectlGetPodsKeywords(ssh_connection)
pod_name = get_pod_obj.get_pods(namespace=namespace).get_unique_pod_matching_prefix(starts_with=pod_name)
pod_status = KubectlGetPodsKeywords(ssh_connection).wait_for_pod_status(pod_name, "Running", namespace)
validate_equals(pod_status, True, "Verify ingress pods are running")
template_file = get_stx_resource_path(f"resources/cloud_platform/security/cert_manager/{kuard_file_name}")
replacement_dictionary = {"dns_name": dns_name}
nginx_yaml = YamlKeywords(ssh_connection).generate_yaml_file_from_template(template_file, replacement_dictionary, f"{kuard_file_name}", "/home/sysadmin")
KubectlApplyPodsKeywords(ssh_connection).apply_from_yaml(nginx_yaml)
# Check the cert status
KubectlGetCertStatusKeywords(ssh_connection).wait_for_certs_status(cert, True, namespace)
# Check the app url
response = CloudRestClient().get(f"{base_url}")
validate_equals(response.get_status_code(), 200, "Verify the app url is reachable")