Files
test/keywords/ptp/gnss_keywords.py
Guntaka Umashankar Reddy 9b23a60d21 Updated gnss power off and power on test cases
Change-Id: I9eec723890d1b4705a4d0b37157ccfb4fdcecd16
Signed-off-by: Guntaka Umashankar Reddy <umashankarguntaka.reddy@windriver.com>
2025-06-18 11:28:59 -04:00

241 lines
10 KiB
Python

import re
import time
from multiprocessing import get_logger
from config.configuration_manager import ConfigurationManager
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.cloud_platform.ssh.ptp_connection_keywords import PTPConnectionKeywords
from keywords.ptp.cat.cat_ptp_cgu_keywords import CatPtpCguKeywords
class GnssKeywords(BaseKeyword):
"""
Gnss power on and off using GNSS SSH connection.
"""
def __init__(self):
"""
Initializes the GnssKeywords.
"""
def get_pci_slot_name(self, hostname: str, interface: str) -> str:
"""
Retrieves the PCI_SLOT_NAME from the uevent file for a given PTP interface.
Args:
hostname (str) : The name of the host
interface (str): The name of the ptp interface (e.g., "enp138s0f0").
Returns:
str: The PCI slot name if found, otherwise None.
Raises:
Exception: raised when PCI_SLOT_NAME not found
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_ssh_for_hostname(hostname)
# The GNSS signal will always be on port 0 of the NIC, even if ts2phc uses ports 1, 2, 3, and so on.
interface_name = f"{interface[:-1]}0"
uevent_path = f"/sys/class/net/{interface_name}/device/uevent"
uevent_content = ssh_connection.send(f"grep PCI_SLOT_NAME {uevent_path}")
# Use regex to find the PCI_SLOT_NAME
match = re.search(r"PCI_SLOT_NAME=(.*)", " ".join(uevent_content))
if match:
return match.group(1).strip() # Return the captured value, removing leading/trailing spaces
else:
raise Exception(f"PCI_SLOT_NAME not found in {uevent_path}")
def get_gnss_serial_port_from_gnss_directory(self, hostname: str, interface: str) -> str:
"""
Get GNSS serial port from the specified gnss directory.
Args:
hostname (str) : The name of the host
interface (str): The name of the PTP interface (e.g., "enp138s0f0").
Returns:
str: The GNSS serial port value (e.g., "gnss0") if found, otherwise None.
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_ssh_for_hostname(hostname)
pci_address = self.get_pci_slot_name(hostname, interface)
gnss_dir = f"/sys/bus/pci/devices/{pci_address}/gnss"
contents = ssh_connection.send(f"ls {gnss_dir}")
if not contents:
get_logger().log_info(f"The directory {gnss_dir} is empty.")
return None
return " ".join(contents).strip() # Return the captured value in str, removing leading/trailing spaces
def extract_gnss_port(self, instance_parameters: str) -> str:
"""
Extracts the GNSS serial port value from a ts2phc.nmea_serialport configuration string using regex.
Args:
instance_parameters (str): The string containing the ts2phc.nmea_serialport setting.
Returns:
str: The GNSS serial port value (e.g., "gnss0") if found, otherwise None.
"""
match = re.search(r"ts2phc\.nmea_serialport\s*=\s*/dev/([^ ]*)", instance_parameters)
if match:
return match.group(1)
else:
return None
def gnss_power_on(self, hostname: str, nic: str) -> None:
"""
Power on gnss
Args:
hostname (str) : The name of the host
nic (str) : The name of the nic
"""
ptp_connect_keywords = PTPConnectionKeywords()
gnss_ssh_connection = ptp_connect_keywords.get_gnss_server_ssh()
host_name = hostname.replace("-", "_")
ptp_config = ConfigurationManager.get_ptp_config()
interface = ptp_config.get_host(host_name).get_nic(nic).get_base_port()
pci_address = self.get_pci_slot_name(hostname, interface)
cgu_location = f"/sys/kernel/debug/ice/{pci_address}/cgu"
gpio_switch_port = ptp_config.get_host(host_name).get_nic(nic).get_gpio_switch_port()
if not gpio_switch_port:
raise Exception(f"GPIO switch port not configured for {hostname} {nic}")
# Export GPIO pin if not already exported
export_cmd = f"if [ ! -d /sys/class/gpio/gpio{gpio_switch_port} ]; then echo {gpio_switch_port} > /sys/class/gpio/export; sleep 0.5; fi"
gnss_ssh_connection.send(export_cmd)
# sleep needed even with retry loop
time.sleep(1)
# Set direction to output
direction_cmd = f"echo out > /sys/class/gpio/gpio{gpio_switch_port}/direction"
gnss_ssh_connection.send(direction_cmd)
# sleep needed even with retry loop
time.sleep(1)
# Set GPIO value to 1 (power on GNSS)
value_cmd = f"echo 1 > /sys/class/gpio/gpio{gpio_switch_port}/value"
gnss_ssh_connection.send(value_cmd)
self.validate_sma1_and_gnss_1pps_eec_pps_dpll_status_with_retry(hostname, cgu_location, timeout=1200, polling_interval=120)
def gnss_power_off(self, hostname: str, nic: str) -> None:
"""
Power off gnss
Args:
hostname (str) : The name of the host
nic (str) : The name of the nic
"""
ptp_connect_keywords = PTPConnectionKeywords()
gnss_ssh_connection = ptp_connect_keywords.get_gnss_server_ssh()
host_name = hostname.replace("-", "_")
ptp_config = ConfigurationManager.get_ptp_config()
interface = ptp_config.get_host(host_name).get_nic(nic).get_base_port()
pci_address = self.get_pci_slot_name(hostname, interface)
cgu_location = f"/sys/kernel/debug/ice/{pci_address}/cgu"
gpio_switch_port = ptp_config.get_host(host_name).get_nic(nic).get_gpio_switch_port()
if not gpio_switch_port:
raise Exception(f"GPIO switch port not configured for {hostname} {nic}")
# Export GPIO pin if not already exported
export_cmd = f"if [ ! -d /sys/class/gpio/gpio{gpio_switch_port} ]; then echo {gpio_switch_port} > /sys/class/gpio/export; sleep 0.5; fi"
gnss_ssh_connection.send(export_cmd)
# sleep needed even with retry loop
time.sleep(1)
# Set direction to output
direction_cmd = f"echo out > /sys/class/gpio/gpio{gpio_switch_port}/direction"
gnss_ssh_connection.send(direction_cmd)
# sleep needed even with retry loop
time.sleep(1)
# Set GPIO value to 0 (power off GNSS)
value_cmd = f"echo 0 > /sys/class/gpio/gpio{gpio_switch_port}/value"
gnss_ssh_connection.send(value_cmd)
# Expected states for validation
expected_cgu_input_state = "invalid"
expected_dpll_status_list = ["holdover"]
self.validate_sma1_and_gnss_1pps_eec_pps_dpll_status_with_retry(hostname, cgu_location, expected_cgu_input_state=expected_cgu_input_state, expected_dpll_status_list=expected_dpll_status_list, timeout=1500, polling_interval=120)
def validate_sma1_and_gnss_1pps_eec_pps_dpll_status_with_retry(
self,
hostname: str,
cgu_location: str,
cgu_input: str = "GNSS-1PPS",
expected_cgu_input_state: str = "valid",
expected_dpll_status_list: list = ["locked_ho_acq"],
timeout: int = 800,
polling_interval: int = 60,
) -> None:
"""
Validates the synchronization status of SMA1, GNSS 1PPS input, and both EEC and PPS DPLLs
on the specified host within a defined timeout.
Args:
hostname (str): Hostname of the target system.
cgu_location (str): Path to the CGU debug file on the target system.
cgu_input (str): CGU input identifier (e.g., "GNSS_1PPS" or "SMA1").
expected_cgu_input_state (str): Expected CGU input state (e.g., "valid", "invalid").
expected_dpll_status_list (list): List of acceptable DPLL statuses (e.g., ["locked_ho_acq"], ["holdover", "freerun"]).
timeout (int): Maximum wait time in seconds for synchronization (default: 800).
polling_interval (int): Time in seconds between polling attempts (default: 60).
Returns: None
Raises:
TimeoutError: If expected input state or DPLL statuses are not observed within the timeout period.
Notes:
Status Meaning
locked DPLL is locked to a valid timing source.
holdover Timing is maintained using previously locked values (interim fallback).
freerun No synchronization — internal clock is free-running.
invalid Signal or lock state is not usable.
locked_ho_acq locked with holdover acquisition.
"""
get_logger().log_info("Attempting Validation - CGU input state and DPLL statuses...")
end_time = time.time() + timeout
ssh_connection = LabConnectionKeywords().get_ssh_for_hostname(hostname)
cgu_reader = CatPtpCguKeywords(ssh_connection)
# Attempt the validation
while True:
cgu_output = cgu_reader.cat_ptp_cgu(cgu_location)
cgu_component = cgu_output.get_cgu_component()
eec_dpll_status = cgu_component.get_eec_dpll().get_status()
pps_dpll_status = cgu_component.get_pps_dpll().get_status()
cgu_input_state = cgu_component.get_cgu_input(cgu_input).get_state()
if cgu_input_state == expected_cgu_input_state and eec_dpll_status in expected_dpll_status_list and pps_dpll_status in expected_dpll_status_list:
get_logger().log_info("Validation Successful - CGU input state and both DPLL statuses match expectations.")
return
else:
get_logger().log_info("Validation Failed")
get_logger().log_info(f"Expected CGU input {cgu_input} state: {expected_cgu_input_state}, Observed: {cgu_input_state}")
get_logger().log_info(f"Expected EEC DPLL status: {expected_dpll_status_list}, Observed: {eec_dpll_status}")
get_logger().log_info(f"Expected PPS DPLL status: {expected_dpll_status_list}, Observed: {pps_dpll_status}")
if time.time() < end_time:
get_logger().log_info(f"Retrying in {polling_interval}s")
time.sleep(polling_interval)
# Move on to the next iteration
else:
raise TimeoutError("Timeout exceeded: CGU input state or DPLL statuses did not meet expected values.")