Verify PTP operation and status changes during GNSS loss, phc_ctl adjustments, and service stop, start, and restart events.

Change-Id: I5734135d1871531dee265ff81d75b12656e3907a
Signed-off-by: Guntaka Umashankar Reddy <umashankarguntaka.reddy@windriver.com>
This commit is contained in:
Guntaka Umashankar Reddy
2025-05-28 13:06:46 -04:00
parent 2b0f4cbc68
commit 49367cb58c
12 changed files with 1059 additions and 260 deletions

View File

@@ -1,3 +1,4 @@
import re
import time
from framework.logging.automation_logger import get_logger
@@ -128,24 +129,74 @@ class AlarmListKeywords(BaseKeyword):
Raises:
TimeoutError: if alarms are not found within the timeout period.
"""
timeout = self.get_timeout_in_seconds()
check_interval = self.get_check_interval_in_seconds()
end_time = time.time() + self.get_timeout_in_seconds()
alarm_descriptions = ", ".join(f"[ID: {alarm.get_alarm_id()}, Reason: {alarm.get_reason_text()}, Entity: {alarm.get_entity_id()}]" for alarm in alarms)
alarm_descriptions = ", ".join(self.alarm_to_str(alarm) for alarm in alarms)
while time.time() < end_time:
current_alarms = self.alarm_list()
all_matched = all(any(current.get_alarm_id() == expected.get_alarm_id() and current.get_reason_text() == expected.get_reason_text() and current.get_entity_id() == expected.get_entity_id() for current in current_alarms) for expected in alarms)
observed_alarms = self.alarm_list()
all_matched = True
for expected_alarm_obj in alarms:
match_found = any(self.alarms_match(observed_alarm_obj, expected_alarm_obj) for observed_alarm_obj in observed_alarms)
if not match_found:
get_logger().log_info(f"Expected alarm not found yet: {self.alarm_to_str(expected_alarm_obj)}")
all_matched = False
break
if all_matched:
get_logger().log_info(f"All expected alarms are now present in SSH connection ({self.get_ssh_connection()}): {alarm_descriptions}")
get_logger().log_info(f"All expected alarms are now present: {alarm_descriptions}")
return
get_logger().log_info(f"Waiting for expected alarms to appear in SSH connection ({self.get_ssh_connection()}). " f"Retrying in {check_interval:.3f} seconds. Remaining time: {end_time - time.time():.3f} seconds.")
get_logger().log_info(f"Waiting for expected alarms. Retrying in {check_interval:.3f} seconds. Remaining time: {end_time - time.time():.3f} seconds.")
time.sleep(check_interval)
raise TimeoutError(f"The following alarms did not appear within {timeout} seconds: {alarm_descriptions}")
# Final check before raising
observed_alarms = self.alarm_list()
observed_alarm_str = [self.alarm_to_str(observed_alarm_obj) for observed_alarm_obj in observed_alarms]
raise TimeoutError(f"Timeout. Alarms not found:\nExpected: {alarm_descriptions}\nObserved alarms:\n" + "\n".join(observed_alarm_str))
def alarms_match(self, observed_alarm_object: AlarmListObject, expected_alarm_object: AlarmListObject) -> bool:
"""
Compares two AlarmListObject instances for equality based on
alarm ID, reason text, and entity ID.
Args:
observed_alarm_object (AlarmListObject): The current alarm object to compare against.
expected_alarm_object (AlarmListObject): The expected alarm object.
Returns:
bool: True if all three fields (alarm ID, reason text, and entity ID) match exactly
(after stripping whitespace for text fields), False otherwise.
"""
observed_id = observed_alarm_object.get_alarm_id()
expected_id = expected_alarm_object.get_alarm_id()
observed_reason_text = observed_alarm_object.get_reason_text()
expected_reason_text_pattern = expected_alarm_object.get_reason_text()
observed_entity_id = observed_alarm_object.get_entity_id()
expected_entity_id = expected_alarm_object.get_entity_id()
# Perform the comparisons, making each condition clear.
id_matches = observed_id == expected_id
reason_text_matches = re.fullmatch(expected_reason_text_pattern, observed_reason_text)
entity_id_matches = observed_entity_id == expected_entity_id
# Return True only if all three conditions are met.
return id_matches and reason_text_matches and entity_id_matches
def alarm_to_str(self, alarm: AlarmListObject) -> str:
"""
Formats an AlarmListObject into a human-readable string representation.
Args:
alarm (AlarmListObject): The alarm object to format.
Returns:
str: A string in the format "ID: <alarm_id>, Reason: <reason_text>, Entity: <entity_id>".
"""
return f"[ID: {alarm.get_alarm_id()}, Reason: {alarm.get_reason_text()}, Entity: {alarm.get_entity_id()}]"
def get_timeout_in_seconds(self) -> int:
"""

View File

@@ -12,6 +12,16 @@ class AlarmListObject:
self.severity = None
self.time_stamp = None
def __str__(self):
"""
String representation of this object.
Returns:
str: String representation of this object.
"""
return self.get_alarm_id()
def set_alarm_id(self, alarm_id: str):
"""
Setter for alarm id
@@ -110,7 +120,4 @@ class AlarmListObject:
def __eq__(self, alarm_list_object):
if not isinstance(alarm_list_object, AlarmListObject):
return False
return (self.get_alarm_id() == alarm_list_object.get_alarm_id() and
self.get_severity() == alarm_list_object.get_severity() and
self.get_entity_id() == alarm_list_object.get_entity_id())
return self.get_alarm_id() == alarm_list_object.get_alarm_id() and self.get_severity() == alarm_list_object.get_severity() and self.get_entity_id() == alarm_list_object.get_entity_id()

View File

@@ -0,0 +1,138 @@
from framework.validation.validation import validate_equals_with_retry
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.ptp.pmc.pmc_keywords import PMCKeywords
class PTPReadinessWatcher:
"""
PMC (PTP Management Client) operations to check various PTP parameters with retry logic.
Attributes:
ssh_connection: An instance of an SSH connection.
"""
def __init__(self):
"""
Initializes the PTPReadinessWatcher.
"""
def _get_ptp_instance_paths(self, name: str) -> tuple[str, str]:
"""
Helper method to get the config and socket file paths for a PTP instance.
Args:
name (str): Name of the PTP instance.
Returns:
tuple[str, str]: A tuple containing (config_file_path, socket_file_path).
"""
config_file = f"/etc/linuxptp/ptpinstance/ptp4l-{name}.conf"
socket_file = f"/var/run/ptp4l-{name}"
return config_file, socket_file
def wait_for_port_state_appear_in_port_data_set(self, name: str, hostname: str, expected_port_states: list[str]) -> None:
"""
Waits until the port states observed in the port data set match the expected states, or times out.
Args:
name (str): Name of the PTP instance.
hostname (str): Hostname of the target system.
expected_port_states (list[str]): List of expected port states to wait for.
Raises:
Exception: If expected port states do not appear within the timeout.
"""
def check_port_state_in_port_data_set(name: str, hostname: str) -> list[str]:
"""
Checks whether the observed port states from the port data set match the expected port states.
Args:
name (str): Name of the PTP instance.
hostname (str): Hostname of the target system.
Returns:
list[str]: List of expected port states.
"""
config_file, socket_file = self._get_ptp_instance_paths(name)
ssh_connection = LabConnectionKeywords().get_ssh_for_hostname(hostname)
pmc_keywords = PMCKeywords(ssh_connection)
observed_states = [obj.get_port_state() for obj in pmc_keywords.pmc_get_port_data_set(config_file, socket_file).get_pmc_get_port_data_set_objects()]
return observed_states
validate_equals_with_retry(lambda: check_port_state_in_port_data_set(name, hostname), expected_port_states, "port state in port data set", 120, 30)
def wait_for_clock_class_appear_in_grandmaster_settings_np(self, name: str, hostname: str, expected_clock_class: int) -> None:
"""
Waits until the clock class observed in the grandmaster settings np match the expected clock class, or times out.
Args:
name (str): Name of the PTP instance.
hostname (str): Hostname of the target system.
expected_clock_class (int): expected clock class to wait for.
Raises:
Exception: If expected clock class do not appear within the timeout.
"""
def get_clock_class_in_grandmaster_settings_np(name: str, hostname: str) -> int:
"""
Get the observed clock class from the grandmaster settings np.
Args:
name (str): Name of the PTP instance.
hostname (str): Hostname of the target system.
Returns:
int: observed clock class.
"""
config_file, socket_file = self._get_ptp_instance_paths(name)
ssh_connection = LabConnectionKeywords().get_ssh_for_hostname(hostname)
pmc_keywords = PMCKeywords(ssh_connection)
get_grandmaster_settings_np_object = pmc_keywords.pmc_get_grandmaster_settings_np(config_file, socket_file).get_pmc_get_grandmaster_settings_np_object()
observed_clock_class = get_grandmaster_settings_np_object.get_clock_class()
return observed_clock_class
validate_equals_with_retry(lambda: get_clock_class_in_grandmaster_settings_np(name, hostname), expected_clock_class, "clock class in grandmaster settings np", 120, 30)
def wait_for_gm_clock_class_appear_in_parent_data_set(self, name: str, hostname: str, expected_gm_clock_class: int) -> None:
"""
Waits until the gm clock class observed in the parent data set match the expected clock class, or times out.
Args:
name (str): Name of the PTP instance.
hostname (str): Hostname of the target system.
expected_gm_clock_class (int): expected gm clock class to wait for.
Raises:
Exception: If expected gm clock class do not appear within the timeout.
"""
def get_gm_clock_class_in_parent_data_set(name: str, hostname: str) -> int:
"""
Get the observed gm clock class from the parent data set.
Args:
name (str): Name of the PTP instance.
hostname (str): Hostname of the target system.
Returns:
int: observed gm clock class.
"""
config_file, socket_file = self._get_ptp_instance_paths(name)
ssh_connection = LabConnectionKeywords().get_ssh_for_hostname(hostname)
pmc_keywords = PMCKeywords(ssh_connection)
parent_data_set_obj = pmc_keywords.pmc_get_parent_data_set(config_file, socket_file).get_pmc_get_parent_data_set_object()
observed_gm_clock_class = parent_data_set_obj.get_gm_clock_class()
return observed_gm_clock_class
validate_equals_with_retry(lambda: get_gm_clock_class_in_parent_data_set(name, hostname), expected_gm_clock_class, "gm clock class in parent data set", 120, 30)

View File

@@ -237,7 +237,7 @@ class PTPSetupExecutorKeywords(BaseKeyword):
for host, interface in ifaces_to_check:
pci_address = gnss_keywords.get_pci_slot_name(host, interface)
cgu_location = f"/sys/kernel/debug/ice/{pci_address}/cgu"
gnss_keywords.validate_gnss_1pps_state_and_pps_dpll_status(host, cgu_location, "SMA1", "valid", ["locked_ho_acq"], 120, 30)
gnss_keywords.validate_sma1_and_gnss_1pps_eec_pps_dpll_status_with_retry(host, cgu_location, "SMA1", timeout=120, polling_interval=30)
check_sma_status = True
break
@@ -263,7 +263,7 @@ class PTPSetupExecutorKeywords(BaseKeyword):
for host, interface in ifaces_to_check:
pci_address = gnss_keywords.get_pci_slot_name(host, interface)
cgu_location = f"/sys/kernel/debug/ice/{pci_address}/cgu"
gnss_keywords.validate_gnss_1pps_state_and_pps_dpll_status(host, cgu_location, "GNSS-1PPS", "valid", ["locked_ho_acq"], 120, 30)
gnss_keywords.validate_sma1_and_gnss_1pps_eec_pps_dpll_status_with_retry(host, cgu_location, timeout=120, polling_interval=30)
check_gnss_status = True
break

View File

@@ -13,6 +13,7 @@ from keywords.ptp.cat.cat_ptp_cgu_keywords import CatPtpCguKeywords
from keywords.ptp.cat.cat_ptp_config_keywords import CatPtpConfigKeywords
from keywords.ptp.gnss_keywords import GnssKeywords
from keywords.ptp.pmc.pmc_keywords import PMCKeywords
from keywords.ptp.ptp4l.ptp_service_status_validator import PTPServiceStatusValidator
from keywords.ptp.setup.ptp_setup_reader import PTPSetupKeywords
@@ -112,7 +113,7 @@ class PTPVerifyConfigKeywords(BaseKeyword):
Returns: None
"""
systemctl_status_Keywords = SystemCTLStatusKeywords(self.ssh_connection)
ptp_service_status_validator = PTPServiceStatusValidator(self.ssh_connection)
for service_type, setup_list in [
("ptp4l", self.ptp4l_setup_list),
@@ -127,9 +128,9 @@ class PTPVerifyConfigKeywords(BaseKeyword):
for hostname in hostnames:
if service_type == "phc2sys" and "cmdline_opts" in instance_parameters: # Here the PHC service is using the clock from the NIC, not from the PTP instance.
systemctl_status_Keywords.verify_ptp_status_and_instance_parameters_on_hostname(hostname, name, service_name, instance_parameters)
ptp_service_status_validator.verify_status_and_instance_parameters_on_hostname(hostname, name, service_name, instance_parameters)
else:
systemctl_status_Keywords.verify_status_on_hostname(hostname, name, service_name)
ptp_service_status_validator.verify_status_on_hostname(hostname, name, service_name)
def verify_ptp_config_file_content(self) -> None:
"""
@@ -152,9 +153,12 @@ class PTPVerifyConfigKeywords(BaseKeyword):
else:
self.validate_ptp_config_file_content(instance_obj, hostname, config_file)
def verify_ptp_pmc_values(self) -> None:
def verify_ptp_pmc_values(self, check_domain: bool = True) -> None:
"""
verify ptp pmc values
Verify PTP PMC values across all ptp4l instances and host mappings.
Args:
check_domain (bool): Whether to validate the PTP domain number (default: True).
Returns: None
"""
@@ -171,7 +175,8 @@ class PTPVerifyConfigKeywords(BaseKeyword):
self.validate_port_data_set(hostname, name, config_file, socket_file)
self.validate_get_domain(hostname, instance_parameters, config_file, socket_file)
if check_domain:
self.validate_get_domain(hostname, instance_parameters, config_file, socket_file)
self.validate_parent_data_set(hostname, name, port_data_set, config_file, socket_file)

View File

@@ -0,0 +1,47 @@
from framework.ssh.ssh_connection import SSHConnection
from keywords.base_keyword import BaseKeyword
class SystemCTLKeywords(BaseKeyword):
"""
Keywords for systemctl stop/start/restart <service_name> cmds
"""
def __init__(self, ssh_connection: SSHConnection):
self.ssh_connection = ssh_connection
def systemctl_start(self, service_name: str, instance_name: str) -> None:
"""
Starts a systemd service instance remotely using systemctl.
Args:
service_name (str): The base name of the service (e.g., 'ptp4l').
instance_name (str): The specific instance name (e.g., 'ptp1').
Returns: None
"""
self.ssh_connection.send_as_sudo(f"systemctl start {service_name}@{instance_name}.service")
def systemctl_stop(self, service_name: str, instance_name: str) -> None:
"""
Stops a systemd service instance remotely using systemctl.
Args:
service_name (str): The base name of the service (e.g., 'ptp4l').
instance_name (str): The specific instance name (e.g., 'ptp1').
Returns: None
"""
self.ssh_connection.send_as_sudo(f"systemctl stop {service_name}@{instance_name}.service")
def systemctl_restart(self, service_name: str, instance_name: str) -> None:
"""
Restarts a systemd service instance remotely using systemctl.
Args:
service_name (str): The base name of the service (e.g., 'ptp4l').
instance_name (str): The specific instance name (e.g., 'ptp1').
Returns: None
"""
self.ssh_connection.send_as_sudo(f"systemctl restart {service_name}@{instance_name}.service")

View File

@@ -1,10 +1,5 @@
from multiprocessing import get_logger
from framework.ssh.ssh_connection import SSHConnection
from keywords.base_keyword import BaseKeyword
from keywords.ptp.ptp4l.objects.ptp4l_status_output import PTP4LStatusOutput
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from starlingx.framework.validation.validation import validate_equals
class SystemCTLStatusKeywords(BaseKeyword):
@@ -21,82 +16,8 @@ class SystemCTLStatusKeywords(BaseKeyword):
Args:
service_name (str): the service name
Returns: the output as a list of strings - this should be consumed by a parser for the given output type
Returns:
list[str]: the output as a list of strings - this should be consumed by a parser for the given output type
"""
output = self.ssh_connection.send(f'systemctl status {service_name}')
self.validate_success_return_code(self.ssh_connection)
output = self.ssh_connection.send(f"systemctl status {service_name}")
return output
def verify_status_on_hostname(self, hostname :str, name : str, service_name : str) -> None:
"""
verify systemctl ptp service status on hostname
Args:
hostname (str): The name of the host
name (str): name of instance (e.g., "phc1")
service_name (str): service name (e.g., "phc2sys@phc1.service")
Returns: None
Raises:
Exception: raised when validate fails
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_ssh_for_hostname(hostname)
output = SystemCTLStatusKeywords(ssh_connection).get_status(service_name)
ptp_service_status_output = PTP4LStatusOutput(output)
expected_service_status = "active (running)"
observed_service_status = ptp_service_status_output.get_ptp4l_object(name).get_active()
if expected_service_status in observed_service_status :
get_logger().log_info(f"Validation Successful - systemctl status {service_name}")
else:
get_logger().log_info(f"Validation Failed - systemctl status {service_name}")
get_logger().log_info(f"Expected service status: {expected_service_status}")
get_logger().log_info(f"Observed service status: {observed_service_status}")
raise Exception("Validation Failed")
def verify_ptp_status_and_instance_parameters_on_hostname(self, hostname :str, name : str, service_name : str, instance_parameters : str) -> None:
"""
verify systemctl ptp service status and instance parameters on hostname
Args:
hostname (str): The name of the host
name (str) : name of instance (e.g., "phc1")
service_name (str): service name (e.g., "phc2sys@phc1.service")
instance_parameters (str) : instance parameters
Returns: None
Raises:
Exception: raised when validate fails
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_ssh_for_hostname(hostname)
output = SystemCTLStatusKeywords(ssh_connection).get_status(service_name)
ptp_service_status_output = PTP4LStatusOutput(output)
expected_service_status = "active (running)"
observed_service_status = ptp_service_status_output.get_ptp4l_object(name).get_active()
get_command = ptp_service_status_output.get_ptp4l_object(name).get_command()
# From the input string "cmdline_opts='-s enpXXs0f2 -O -37 -m'"
# The extracted output string is '-s enpXXs0f2 -O -37 -m'
instance_parameter = eval(instance_parameters.split("=")[1])
if expected_service_status in observed_service_status and instance_parameter in get_command :
get_logger().log_info(f"Validation Successful - systemctl status {service_name}")
else:
get_logger().log_info(f"Validation Failed - systemctl status {service_name}")
get_logger().log_info(f"Expected service status: {expected_service_status}")
get_logger().log_info(f"Observed service status: {observed_service_status}")
get_logger().log_info(f"Expected instance parameter: {instance_parameter}")
get_logger().log_info(f"Observed instance parameter: {get_command}")
raise Exception("Validation Failed")

View File

@@ -1,7 +1,6 @@
import re
import time
from multiprocessing import get_logger
from time import sleep
from config.configuration_manager import ConfigurationManager
from keywords.base_keyword import BaseKeyword
@@ -109,13 +108,19 @@ class GnssKeywords(BaseKeyword):
cgu_location = f"/sys/kernel/debug/ice/{pci_address}/cgu"
gpio_switch_port = ptp_config.get_host(host_name).get_nic(nic).get_gpio_switch_port()
command = f"echo 1 > /sys/class/gpio/gpio{gpio_switch_port}/value"
# power on gnss
gnss_ssh_connection.send_as_sudo(command)
expected_gnss_1pps_state = "valid"
expected_pps_dpll_status = ["locked_ho_acq"]
self.validate_gnss_1pps_state_and_pps_dpll_status(hostname, cgu_location, "GNSS-1PPS", expected_gnss_1pps_state, expected_pps_dpll_status)
export_cmd = f"[ ! -d /sys/class/gpio/gpio{gpio_switch_port} ] && " f"echo {gpio_switch_port} | sudo tee /sys/class/gpio/export > /dev/null"
gnss_ssh_connection.send_as_sudo(export_cmd)
# Set direction to output
direction_cmd = f"echo out | tee /sys/class/gpio/gpio{gpio_switch_port}/direction > /dev/null"
gnss_ssh_connection.send_as_sudo(direction_cmd)
# Set GPIO value to 1 (power on GNSS)
value_cmd = f"echo 1 | tee /sys/class/gpio/gpio{gpio_switch_port}/value > /dev/null"
gnss_ssh_connection.send_as_sudo(value_cmd)
self.validate_sma1_and_gnss_1pps_eec_pps_dpll_status_with_retry(hostname, cgu_location, timeout=1200, polling_interval=120)
def gnss_power_off(self, hostname: str, nic: str) -> None:
"""
@@ -135,74 +140,86 @@ class GnssKeywords(BaseKeyword):
cgu_location = f"/sys/kernel/debug/ice/{pci_address}/cgu"
gpio_switch_port = ptp_config.get_host(host_name).get_nic(nic).get_gpio_switch_port()
command = f"echo 0 > /sys/class/gpio/gpio{gpio_switch_port}/value"
# power off gnss
gnss_ssh_connection.send_as_sudo(command)
export_cmd = f"[ ! -d /sys/class/gpio/gpio{gpio_switch_port} ] && " f"echo {gpio_switch_port} | sudo tee /sys/class/gpio/export > /dev/null"
gnss_ssh_connection.send_as_sudo(export_cmd)
expected_gnss_1pps_state = "invalid"
expected_pps_dpll_status = ["holdover", "freerun"]
self.validate_gnss_1pps_state_and_pps_dpll_status(hostname, cgu_location, "GNSS-1PPS", expected_gnss_1pps_state, expected_pps_dpll_status)
# Set direction to output
direction_cmd = f"echo out | tee /sys/class/gpio/gpio{gpio_switch_port}/direction > /dev/null"
gnss_ssh_connection.send_as_sudo(direction_cmd)
def validate_gnss_1pps_state_and_pps_dpll_status(
# Set GPIO value to 0 (power off GNSS)
value_cmd = f"echo 0 | tee /sys/class/gpio/gpio{gpio_switch_port}/value > /dev/null"
gnss_ssh_connection.send_as_sudo(value_cmd)
# Expected states for validation
expected_cgu_input_state = "invalid"
expected_dpll_status_list = ["holdover"]
self.validate_sma1_and_gnss_1pps_eec_pps_dpll_status_with_retry(hostname, cgu_location, expected_cgu_input_state=expected_cgu_input_state, expected_dpll_status_list=expected_dpll_status_list, timeout=1500, polling_interval=120)
def validate_sma1_and_gnss_1pps_eec_pps_dpll_status_with_retry(
self,
hostname: str,
cgu_location: str,
cgu_input: str,
expected_gnss_1pps_state: str,
expected_pps_dpll_status: list,
cgu_input: str = "GNSS-1PPS",
expected_cgu_input_state: str = "valid",
expected_dpll_status_list: list = ["locked_ho_acq"],
timeout: int = 800,
polling_sleep_time: int = 60,
polling_interval: int = 60,
) -> None:
"""
Validates the GNSS-1PPS state and PPS DPLL status within the specified time.
Validates the synchronization status of SMA1, GNSS 1PPS input, and both EEC and PPS DPLLs
on the specified host within a defined timeout.
Args:
hostname (str): The name of the host.
cgu_location (str): the cgu location.
cgu_input (str): the cgu input name.
expected_gnss_1pps_state (str): The expected gnss 1pss state value.
expected_pps_dpll_status (list): expected list of PPS DPLL status values.
timeout (int): The maximum time (in seconds) to wait for the match.
polling_sleep_time (int): The time period to wait to receive the expected output.
hostname (str): Hostname of the target system.
cgu_location (str): Path to the CGU debug file on the target system.
cgu_input (str): CGU input identifier (e.g., "GNSS_1PPS" or "SMA1").
expected_cgu_input_state (str): Expected CGU input state (e.g., "valid", "invalid").
expected_dpll_status_list (list): List of acceptable DPLL statuses (e.g., ["locked_ho_acq"], ["holdover", "freerun"]).
timeout (int): Maximum wait time in seconds for synchronization (default: 800).
polling_interval (int): Time in seconds between polling attempts (default: 60).
Returns: None
Raises:
TimeoutError: raised when validate does not equal in the required time
TimeoutError: If expected input state or DPLL statuses are not observed within the timeout period.
Notes:
Status Meaning
locked DPLL is locked to a valid timing source.
holdover Timing is maintained using previously locked values (interim fallback).
freerun No synchronization — internal clock is free-running.
invalid Signal or lock state is not usable.
locked_ho_acq locked with holdover acquisition.
"""
get_logger().log_info("Attempting Validation - GNSS-1PPS state and PPS DPLL status")
get_logger().log_info("Attempting Validation - CGU input state and DPLL statuses...")
end_time = time.time() + timeout
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_ssh_for_hostname(hostname)
cat_ptp_cgu_keywords = CatPtpCguKeywords(ssh_connection)
ssh_connection = LabConnectionKeywords().get_ssh_for_hostname(hostname)
cgu_reader = CatPtpCguKeywords(ssh_connection)
# Attempt the validation
while True:
cgu_output = cgu_reader.cat_ptp_cgu(cgu_location)
cgu_component = cgu_output.get_cgu_component()
# Compute the actual status and state that we are trying to validate.
ptp_cgu_output = cat_ptp_cgu_keywords.cat_ptp_cgu(cgu_location)
ptp_cgu_component = ptp_cgu_output.get_cgu_component()
eec_dpll_status = cgu_component.get_eec_dpll().get_status()
pps_dpll_status = cgu_component.get_pps_dpll().get_status()
cgu_input_state = cgu_component.get_cgu_input(cgu_input).get_state()
pps_dpll_object = ptp_cgu_component.get_pps_dpll()
status = pps_dpll_object.get_status()
input_object = ptp_cgu_component.get_cgu_input(cgu_input)
state = input_object.get_state()
if status in expected_pps_dpll_status and state == expected_gnss_1pps_state:
get_logger().log_info("Validation Successful - GNSS-1PPS state and PPS DPLL status")
if cgu_input_state == expected_cgu_input_state and eec_dpll_status in expected_dpll_status_list and pps_dpll_status in expected_dpll_status_list:
get_logger().log_info("Validation Successful - CGU input state and both DPLL statuses match expectations.")
return
else:
get_logger().log_info("Validation Failed")
get_logger().log_info(f"Expected GNSS-1PPS state: {expected_gnss_1pps_state}")
get_logger().log_info(f"Observed GNSS-1PPS state: {state}")
get_logger().log_info(f"Expected PPS DPLL status: {expected_pps_dpll_status}")
get_logger().log_info(f"Observed PPS DPLL status: {status}")
get_logger().log_info(f"Expected CGU input {cgu_input} state: {expected_cgu_input_state}, Observed: {cgu_input_state}")
get_logger().log_info(f"Expected EEC DPLL status: {expected_dpll_status_list}, Observed: {eec_dpll_status}")
get_logger().log_info(f"Expected PPS DPLL status: {expected_dpll_status_list}, Observed: {pps_dpll_status}")
if time.time() < end_time:
get_logger().log_info(f"Retrying in {polling_sleep_time}s")
sleep(polling_sleep_time)
get_logger().log_info(f"Retrying in {polling_interval}s")
time.sleep(polling_interval)
# Move on to the next iteration
else:
raise TimeoutError("Timeout performing validation - GNSS-1PPS state and PPS DPLL status")
raise TimeoutError("Timeout exceeded: CGU input state or DPLL statuses did not meet expected values.")

View File

@@ -1,4 +1,13 @@
import time
from multiprocessing import get_logger
from typing import List
from config.configuration_manager import ConfigurationManager
from framework.ssh.prompt_response import PromptResponse
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.fault_management.alarms.alarm_list_keywords import AlarmListKeywords
from keywords.cloud_platform.fault_management.alarms.objects.alarm_list_object import AlarmListObject
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
class PhcCtlKeywords(BaseKeyword):
@@ -32,7 +41,7 @@ class PhcCtlKeywords(BaseKeyword):
"""
output = self.ssh_connection.send_as_sudo(f"phc_ctl {device} get")
self.validate_success_return_code(self.ssh_connection)
output_str = ''.join(output).replace('\n', '')
output_str = "".join(output).replace("\n", "")
if output_str and len(output_str.split()) > 4:
return output_str.split()[4]
else:
@@ -52,7 +61,7 @@ class PhcCtlKeywords(BaseKeyword):
"""
output = self.ssh_connection.send_as_sudo(f"phc_ctl {device} cmp")
self.validate_success_return_code(self.ssh_connection)
output_str = ''.join(output)
output_str = "".join(output)
if output_str and len(output_str.split()) > 5:
return output_str.split()[5]
else:
@@ -73,7 +82,7 @@ class PhcCtlKeywords(BaseKeyword):
"""
output = self.ssh_connection.send_as_sudo(f"phc_ctl {device} adj {seconds}")
self.validate_success_return_code(self.ssh_connection)
output_str = ''.join(output).replace('\n', '')
output_str = "".join(output).replace("\n", "")
if output_str and len(output_str.split()) > 5:
return output_str.split()[4]
else:
@@ -99,8 +108,77 @@ class PhcCtlKeywords(BaseKeyword):
output = self.ssh_connection.send_as_sudo(cmd)
self.validate_success_return_code(self.ssh_connection)
output_str = ''.join(output).replace('\n', '')
output_str = "".join(output).replace("\n", "")
if output_str and len(output_str.split()) > 5:
return output_str.split()[5]
else:
raise "output_str.split() is expected to be a List with five elements."
def wait_for_phc_ctl_adjustment_alarm(self, interface: str, alarms: List[AlarmListObject], timeout: int = 120, polling_interval: int = 10) -> None:
"""
Run a remote phc_ctl adjustment loop on the controller as root,
and stop it once the specified PTP alarm(s) are detected or
a timeout occurs.
Args:
interface (str): The interface to apply phc_ctl adjustments to.
alarms (List[AlarmListObject]): A list of expected alarm objects to wait for.
timeout (int): Maximum wait time in seconds (default: 120).
polling_interval (int): Interval in seconds between polling attempts (default: 10).
Returns: None
Raises:
TimeoutError: If the expected alarms are not observed within the timeout period.
"""
# Prepare prompt responses for entering sudo
password = ConfigurationManager.get_lab_config().get_admin_credentials().get_password()
password_prompt = PromptResponse("Password:", password)
def run_as_root(command: str) -> None:
"""
Executes a given shell command on the remote host as the root user using 'sudo su'.
Args:
command (str): The shell command to be executed with root privileges.
Returns:
None
"""
root_prompt = PromptResponse("#", command)
self.ssh_connection.send_expect_prompts("sudo su", [password_prompt, root_prompt])
# Create and store the phc_ctl loop script
remote_script_path = "/tmp/phc_loop.sh"
loop_script = f"while true; do phc_ctl {interface} -q adj 0.0001; sleep 1; done"
run_as_root(f"echo '{loop_script}' > {remote_script_path}")
run_as_root(f"chmod +x {remote_script_path}")
run_as_root(f"nohup bash {remote_script_path} & echo $! > /tmp/phc_loop.pid")
alarm_keywords = AlarmListKeywords(LabConnectionKeywords().get_active_controller_ssh())
alarm_descriptions = ", ".join(alarm_keywords.alarm_to_str(alarm_obj) for alarm_obj in alarms)
get_logger().log_info(f"Waiting for alarms: {alarm_descriptions}")
end_time = time.time() + timeout
all_matched = False
while time.time() < end_time:
observed_alarms = alarm_keywords.alarm_list()
all_matched = all(any(alarm_keywords.alarms_match(observed_alarm_obj, expected_alarm_obj) for observed_alarm_obj in observed_alarms) for expected_alarm_obj in alarms)
if all_matched:
get_logger().log_info("All expected alarms have been observed.")
break
remaining = end_time - time.time()
get_logger().log_info(f"Expected alarms not fully observed yet. Retrying in {polling_interval}s. " f"Time remaining: {remaining:.2f}s")
time.sleep(polling_interval)
# Clean up: stop script and remove temp files
run_as_root("test -f /tmp/phc_loop.pid && kill $(cat /tmp/phc_loop.pid) 2>/dev/null")
run_as_root("rm -f /tmp/phc_loop.sh /tmp/phc_loop.pid")
if not all_matched:
observed_alarm_strs = [alarm_keywords.alarm_to_str(observed_alarm_obj) for observed_alarm_obj in observed_alarms]
raise TimeoutError(f"Timeout: Expected alarms not found within {timeout}s.\n" f"Expected: {alarm_descriptions}\n" f"Observed:\n" + "\n".join(observed_alarm_strs))

View File

@@ -0,0 +1,161 @@
import re
from datetime import datetime, timedelta, timezone
from multiprocessing import get_logger
from framework.ssh.ssh_connection import SSHConnection
from framework.validation.validation import validate_equals, validate_str_contains
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.linux.systemctl.systemctl_status_keywords import SystemCTLStatusKeywords
from keywords.ptp.ptp4l.objects.ptp4l_status_output import PTP4LStatusOutput
class PTPServiceStatusValidator(BaseKeyword):
"""
A class to validate the status and parameters of PTP (Precision Time Protocol)
services on a target host using systemctl.
"""
def __init__(self, ssh_connection: SSHConnection):
"""
Initializes the PTPServiceStatusValidator with an SSH connection.
Args:
ssh_connection: An instance of an SSH connection.
"""
self.ssh_connection = ssh_connection
def verify_status_on_hostname(self, hostname: str, name: str, service_name: str) -> None:
"""
verify systemctl ptp service status on hostname
Args:
hostname (str): The name of the host
name (str): name of instance (e.g., "phc1")
service_name (str): service name (e.g., "phc2sys@phc1.service")
Returns: None
Raises:
Exception: raised when validate fails
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_ssh_for_hostname(hostname)
output = SystemCTLStatusKeywords(ssh_connection).get_status(service_name)
ptp_service_status_output = PTP4LStatusOutput(output)
expected_service_status = "active (running)"
observed_service_status = ptp_service_status_output.get_ptp4l_object(name).get_active()
validate_str_contains(observed_service_status, expected_service_status, f"systemctl status {service_name}")
def verify_status_and_instance_parameters_on_hostname(self, hostname: str, name: str, service_name: str, instance_parameters: str) -> None:
"""
verify systemctl service status and instance parameters on hostname
Args:
hostname (str): The name of the host
name (str) : name of instance (e.g., "phc1")
service_name (str): service name (e.g., "phc2sys@phc1.service")
instance_parameters (str) : instance parameters
Returns: None
Raises:
Exception: raised when validate fails
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_ssh_for_hostname(hostname)
output = SystemCTLStatusKeywords(ssh_connection).get_status(service_name)
service_status_output = PTP4LStatusOutput(output)
expected_service_status = "active (running)"
observed_service_status = service_status_output.get_ptp4l_object(name).get_active()
get_command = service_status_output.get_ptp4l_object(name).get_command()
# From the input string "cmdline_opts='-s enpXXs0f2 -O -37 -m'"
# The extracted output string is '-s enpXXs0f2 -O -37 -m'
instance_parameter = eval(instance_parameters.split("=")[1])
if expected_service_status in observed_service_status and instance_parameter in get_command:
get_logger().log_info(f"Validation Successful - systemctl status {service_name}")
else:
get_logger().log_info(f"Validation Failed - systemctl status {service_name}")
get_logger().log_info(f"Expected service status: {expected_service_status}")
get_logger().log_info(f"Observed service status: {observed_service_status}")
get_logger().log_info(f"Expected instance parameter: {instance_parameter}")
get_logger().log_info(f"Observed instance parameter: {get_command}")
raise Exception("Validation Failed")
def _is_service_event_recent(self, status_line: str, threshold_seconds: int) -> bool:
"""
Determines if a service event (start, stop, restart) occurred within a given threshold.
Args:
status_line (str): A line like:
'active (running) since Wed 2025-05-28 13:00:00 UTC; 10s ago'
'inactive (dead) since Wed 2025-05-28 12:22:49 UTC; 52min ago'
threshold_seconds (int): Time threshold in seconds.
Returns:
bool: True if the event occurred within the threshold.
"""
match = re.search(r"since (.+? UTC);\s+(\d+)(s|min|h) ago", status_line)
if not match:
raise ValueError(f"Could not parse systemctl status line: {status_line}")
datetime_str, value_str, unit = match.groups()
try:
datetime.strptime(datetime_str.strip(), "%a %Y-%m-%d %H:%M:%S UTC")
except ValueError:
raise ValueError(f"Could not parse timestamp: {datetime_str.strip()}")
# Convert "52min" or "10s" into timedelta
value = int(value_str)
if unit == "s":
delta = timedelta(seconds=value)
elif unit == "min":
delta = timedelta(minutes=value)
elif unit == "h":
delta = timedelta(hours=value)
else:
raise ValueError(f"Unsupported time unit: {unit}")
# Estimate the actual event time from 'ago'
now = datetime.now(timezone.utc)
estimated_event_time = now - delta
# Compare time difference
return (now - estimated_event_time).total_seconds() <= threshold_seconds
def verify_service_status_and_recent_event(self, service_name: str, instance_name: str, threshold_seconds: int, expected_service_status: str = "active (running)") -> None:
"""
Verifies that the given PTP service is in the expected systemctl status and
that its most recent state change occurred within the given threshold.
Args:
service_name (str): service name (e.g., "phc2sys")
instance_name (str): name of instance (e.g., "phc1")
threshold_seconds (int): Time threshold in seconds to check service recency.
expected_service_status (str, optional): Expected status string to match from `systemctl` (default: "active (running)").
Returns: None
Raises:
Exception: If service status is not as expected, or event is too old.
"""
template_instance = f"{service_name}@{instance_name}.service"
output = SystemCTLStatusKeywords(self.ssh_connection).get_status(template_instance)
service_status_output = PTP4LStatusOutput(output)
service_status = service_status_output.get_ptp4l_object(instance_name)
status_line = service_status.get_active()
# Check if the service event (start/stop/restart) was recent
recent_event = self._is_service_event_recent(status_line, threshold_seconds)
validate_equals(recent_event, True, "Service event recency check")
# Validate actual status
validate_str_contains(status_line, expected_service_status, f"systemctl status {template_instance}")

View File

@@ -50,15 +50,15 @@ class SmaKeywords(BaseKeyword):
self.ssh_connection.send_expect_prompts("sudo su", expected_prompts)
# Expected states for validation
expected_gnss_1pps_state = "invalid"
expected_pps_dpll_status = ["holdover"]
expected_cgu_input_state = "invalid"
expected_dpll_status_list = ["holdover"]
# Construct CGU location path
pci_address = gnss_keywords.get_pci_slot_name(hostname, interface)
cgu_location = f"/sys/kernel/debug/ice/{pci_address}/cgu"
# Validate GNSS 1PPS state and DPLL status
gnss_keywords.validate_gnss_1pps_state_and_pps_dpll_status(hostname, cgu_location, "SMA1", expected_gnss_1pps_state, expected_pps_dpll_status)
gnss_keywords.validate_sma1_and_gnss_1pps_eec_pps_dpll_status_with_retry(hostname, cgu_location, "SMA1", expected_cgu_input_state, expected_dpll_status_list)
def enable_sma(self, hostname: str, nic: str) -> None:
"""
@@ -88,13 +88,9 @@ class SmaKeywords(BaseKeyword):
# Run echo command to crash standby controller
self.ssh_connection.send_expect_prompts("sudo su", expected_prompts)
# Expected states for validation
expected_gnss_1pps_state = "valid"
expected_pps_dpll_status = ["locked_ho_acq"]
# Construct CGU location path
pci_address = gnss_keywords.get_pci_slot_name(hostname, interface)
cgu_location = f"/sys/kernel/debug/ice/{pci_address}/cgu"
# Validate GNSS 1PPS state and DPLL status
gnss_keywords.validate_gnss_1pps_state_and_pps_dpll_status(hostname, cgu_location, "SMA1", expected_gnss_1pps_state, expected_pps_dpll_status)
gnss_keywords.validate_sma1_and_gnss_1pps_eec_pps_dpll_status_with_retry(hostname, cgu_location, "SMA1")

View File

@@ -1,19 +1,25 @@
import os
import time
from pytest import mark
from config.configuration_manager import ConfigurationManager
from framework.logging.automation_logger import get_logger
from framework.resources.resource_finder import get_stx_resource_path
from framework.validation.validation import validate_equals_with_retry
from keywords.cloud_platform.fault_management.alarms.alarm_list_keywords import AlarmListKeywords
from keywords.cloud_platform.fault_management.alarms.objects.alarm_list_object import AlarmListObject
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.cloud_platform.system.ptp.ptp_readiness_watcher import PTPReadinessWatcher
from keywords.cloud_platform.system.ptp.ptp_setup_executor_keywords import PTPSetupExecutorKeywords
from keywords.cloud_platform.system.ptp.ptp_teardown_executor_keywords import PTPTeardownExecutorKeywords
from keywords.cloud_platform.system.ptp.ptp_verify_config_keywords import PTPVerifyConfigKeywords
from keywords.files.file_keywords import FileKeywords
from keywords.linux.ip.ip_keywords import IPKeywords
from keywords.ptp.pmc.pmc_keywords import PMCKeywords
from keywords.linux.systemctl.systemctl_keywords import SystemCTLKeywords
from keywords.ptp.gnss_keywords import GnssKeywords
from keywords.ptp.phc_ctl_keywords import PhcCtlKeywords
from keywords.ptp.ptp4l.ptp_service_status_validator import PTPServiceStatusValidator
from keywords.ptp.setup.ptp_setup_reader import PTPSetupKeywords
from keywords.ptp.sma_keywords import SmaKeywords
@@ -70,7 +76,6 @@ def test_ptp_operation_interface_down_and_up():
"""
Verify PTP operation and status change when an interface goes down and comes back up.
Test Steps:
- Bring down controller-0 NIC1.
- Verify that alarm "100.119" appears on controller-1.
@@ -90,6 +95,9 @@ def test_ptp_operation_interface_down_and_up():
- Verify the PMC data on controller-1.
- Download the "/var/log/user.log" file from the active controller.
Preconditions:
- System is set up with valid PTP configuration as defined in ptp_configuration_expectation_compute.json5.
Notes:
- In this scenario, controller-0 NIC1 (configured with ptp1) is powered off.
Initially, ctrl0 NIC1 is in MASTER state, and ctrl1 NIC1 is in SLAVE state.
@@ -123,21 +131,13 @@ def test_ptp_operation_interface_down_and_up():
ssh_connection = lab_connect_keywords.get_active_controller_ssh()
ip_keywords = IPKeywords(ssh_connection)
ptp_readiness_watcher = PTPReadinessWatcher()
ptp_setup_keywords = PTPSetupKeywords()
ptp_setup_template_path = get_stx_resource_path("resources/ptp/setup/ptp_configuration_expectation_compute.json5")
get_logger().log_info("Verify PTP operation and the corresponding status change when an interface goes down")
# This template is derived from the reference file ptp_configuration_expectation_compute.json5 and
# should maintain consistency in structure. Only the expected_dict section is intended to change in
# response to different PTP operation scenarios.
# In ptp4l (e.g., ptp1 with controller-0), only the required instances that need to be verified are included.
# Unnecessary entries in instance_hostnames and ptp_interface_names—those not relevant to the verification—are
# removed when compared to the original ptp_configuration_expectation_compute.json5 file.
# The ptp1if1 interface is used to retrieve the interface name for the down operation.
ctrl0_nic1_iface_down_ptp_selection = [("ptp1", "controller-1", ["ptp1if1"])]
ctrl0_nic1_iface_down_exp_dict = """{
@@ -203,19 +203,12 @@ def test_ptp_operation_interface_down_and_up():
AlarmListKeywords(ssh_connection).wait_for_alarms_to_appear([not_locked_alarm_obj])
get_logger().log_info(f"Waiting for PMC port states after interface {ctrl0_nic1_interface} goes down.")
wait_for_port_state_appear_in_port_data_set("ptp1", "controller-1", ["MASTER", "MASTER"])
ptp_readiness_watcher.wait_for_port_state_appear_in_port_data_set("ptp1", "controller-1", ["MASTER", "MASTER"])
get_logger().log_info(f"Verifying PMC data after interface {ctrl0_nic1_interface} goes down.")
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, ctrl0_nic1_iface_down_ptp_setup)
ptp_verify_config_keywords.verify_ptp_pmc_values()
# This template is derived from the reference file ptp_configuration_expectation_compute.json5 and
# should maintain consistency in structure. Only the expected_dict section is intended to change in
# response to different PTP operation scenarios.
# In ptp4l (e.g., ptp1 with controller-0 and controller-1), only the required instances that need to be verified
# are included. Unnecessary entries in instance_hostnames and ptp_interface_names—those not relevant to the verification—are
# removed when compared to the original ptp_configuration_expectation_compute.json5 file.
ctrl0_nic1_iface_up_ptp_selection = [("ptp1", "controller-0", []), ("ptp1", "controller-1", [])]
ctrl0_nic1_iface_up_ptp_setup = ptp_setup_keywords.filter_and_render_ptp_config(ptp_setup_template_path, ctrl0_nic1_iface_up_ptp_selection)
@@ -226,19 +219,12 @@ def test_ptp_operation_interface_down_and_up():
AlarmListKeywords(ssh_connection).wait_for_alarms_cleared([not_locked_alarm_obj])
get_logger().log_info(f"Waiting for PMC port states after interface {ctrl0_nic1_interface} comes up.")
wait_for_port_state_appear_in_port_data_set("ptp1", "controller-1", ["SLAVE", "MASTER"])
ptp_readiness_watcher.wait_for_port_state_appear_in_port_data_set("ptp1", "controller-1", ["SLAVE", "MASTER"])
get_logger().log_info(f"Verifying PMC data after interface {ctrl0_nic1_interface} comes up.")
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, ctrl0_nic1_iface_up_ptp_setup)
ptp_verify_config_keywords.verify_ptp_pmc_values()
# This template is derived from the reference file ptp_configuration_expectation_compute.json5 and
# should maintain consistency in structure. Only the expected_dict section is intended to change in
# response to different PTP operation scenarios.
# In ptp4l (e.g., ptp4 with controller-1), only the required instances that need to be verified are included.
# Unnecessary entries in instance_hostnames and ptp_interface_names—those not relevant to the verification—are
# removed when compared to the original ptp_configuration_expectation_compute.json5 file.
ctrl0_nic2_iface_down_ptp_selection = [("ptp4", "controller-1", [])]
ctrl0_nic2_iface_down_exp_dict = """{
"ptp4l": [
@@ -288,15 +274,6 @@ def test_ptp_operation_interface_down_and_up():
"""
ctrl0_nic2_iface_down_ptp_setup = ptp_setup_keywords.filter_and_render_ptp_config(ptp_setup_template_path, ctrl0_nic2_iface_down_ptp_selection, ctrl0_nic2_iface_down_exp_dict)
# This template is derived from the reference file ptp_configuration_expectation_compute.json5 and
# should maintain consistency in structure. Only the expected_dict section is intended to change in
# response to different PTP operation scenarios.
# In ptp4l (e.g., ptp3 with controller-0 and ptp4 with controller-1), only the required instances that need
# to be verified are included. Unnecessary entries in instance_hostnames and ptp_interface_names—those not
# relevant to the verification—are removed when compared to the original ptp_configuration_expectation_compute.json5 file.
# The ptp3if1 interface is used to retrieve the interface name for the down operation.
ctrl0_nic2_iface_up_ptp_selection = [("ptp3", "controller-0", ["ptp3if1"]), ("ptp4", "controller-1", [])]
ctrl0_nic2_iface_up_exp_dict_overrides = {"ptp4l": [{"name": "ptp4", "controller-1": {"grandmaster_settings": {"clock_class": 165}}}]}
ctrl0_nic2_iface_up_ptp_setup = ptp_setup_keywords.filter_and_render_ptp_config(ptp_setup_template_path, ctrl0_nic2_iface_up_ptp_selection, expected_dict_overrides=ctrl0_nic2_iface_up_exp_dict_overrides)
@@ -316,7 +293,7 @@ def test_ptp_operation_interface_down_and_up():
AlarmListKeywords(ssh_connection).wait_for_alarms_to_appear([not_locked_alarm_obj])
get_logger().log_info(f"Waiting for PMC port states after interface {ctrl0_nic2_interface} goes down.")
wait_for_port_state_appear_in_port_data_set("ptp4", "controller-1", ["MASTER", "MASTER"])
ptp_readiness_watcher.wait_for_port_state_appear_in_port_data_set("ptp4", "controller-1", ["MASTER", "MASTER"])
get_logger().log_info(f"Verifying PMC data after interface {ctrl0_nic2_interface} goes down.")
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, ctrl0_nic2_iface_down_ptp_setup)
@@ -329,7 +306,7 @@ def test_ptp_operation_interface_down_and_up():
AlarmListKeywords(ssh_connection).wait_for_alarms_cleared([not_locked_alarm_obj])
get_logger().log_info(f"Waiting for PMC port states after interface {ctrl0_nic2_interface} comes up.")
wait_for_port_state_appear_in_port_data_set("ptp4", "controller-1", ["SLAVE", "MASTER"])
ptp_readiness_watcher.wait_for_port_state_appear_in_port_data_set("ptp4", "controller-1", ["SLAVE", "MASTER"])
get_logger().log_info(f"Verifying PMC data after interface {ctrl0_nic2_interface} comes up.")
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, ctrl0_nic2_iface_up_ptp_setup)
@@ -356,22 +333,19 @@ def test_ptp_operation_sma_disabled_and_enable():
- Wait for 100.119 to alarm to clear.
- Wait for clock class to appear in grandmaster settings.
- Verify PTP PMC values.
Preconditions:
- System is set up with valid PTP configuration as defined in ptp_configuration_expectation_compute.json5.
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_active_controller_ssh()
ptp_setup_keywords = PTPSetupKeywords()
ptp_setup_template_path = get_stx_resource_path("resources/ptp/setup/ptp_configuration_expectation_compute.json5")
ptp_readiness_watcher = PTPReadinessWatcher()
get_logger().log_info("Verifying PTP operation and corresponding status changes when SMA is disabled.")
# This template is derived from the reference file ptp_configuration_expectation_compute.json5 and
# should maintain consistency in structure. Only the expected_dict section is intended to change in
# response to different PTP operation scenarios.
# In ptp4l (e.g., ptp3 with controller-0 and ptp4 with controller-1), only the required instances that need
# to be verified are included. Unnecessary entries in instance_hostnames and ptp_interface_names—those not
# relevant to the verification—are removed when compared to the original ptp_configuration_expectation_compute.json5 file.
ctrl0_nic2_sma1_disable_ptp_selection = [("ptp3", "controller-0", []), ("ptp4", "controller-1", [])]
ctrl0_nic2_sma1_disable_exp_dict = """{
"ptp4l": [
@@ -460,6 +434,8 @@ def test_ptp_operation_sma_disabled_and_enable():
sma_keywords.disable_sma("controller-0", "nic2")
get_logger().log_info("Waiting for alarm 100.119 to appear after SMA is disabled.")
ptp_config = ConfigurationManager.get_ptp_config()
interface = ptp_config.get_host("controller_0").get_nic("nic2").get_base_port()
not_locked_alarm_obj = AlarmListObject()
not_locked_alarm_obj.set_alarm_id("100.119")
@@ -469,12 +445,12 @@ def test_ptp_operation_sma_disabled_and_enable():
signal_loss_alarm_obj = AlarmListObject()
signal_loss_alarm_obj.set_alarm_id("100.119")
signal_loss_alarm_obj.set_reason_text("controller-0 1PPS signal loss state: holdover")
signal_loss_alarm_obj.set_entity_id("host=controller-0.interface=enp138s0f0.ptp=1PPS-signal-loss")
signal_loss_alarm_obj.set_entity_id(f"host=controller-0.interface={interface}.ptp=1PPS-signal-loss")
AlarmListKeywords(ssh_connection).wait_for_alarms_to_appear([not_locked_alarm_obj, signal_loss_alarm_obj])
get_logger().log_info("Waiting for clock class after SMA1 is disabled")
wait_for_clock_class_appear_in_grandmaster_settings_np("ptp3", "controller-0", 7)
ptp_readiness_watcher.wait_for_clock_class_appear_in_grandmaster_settings_np("ptp3", "controller-0", 7)
get_logger().log_info("Verifying PMC data after SMA1 is disabled")
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, ctrl0_nic2_sma1_disable_exp_ptp_setup)
@@ -493,83 +469,485 @@ def test_ptp_operation_sma_disabled_and_enable():
AlarmListKeywords(ssh_connection).wait_for_alarms_cleared([alarm_list_object])
get_logger().log_info("Waiting for clock class after SMA1 is enabled")
wait_for_clock_class_appear_in_grandmaster_settings_np("ptp3", "controller-0", 6)
ptp_readiness_watcher.wait_for_clock_class_appear_in_grandmaster_settings_np("ptp3", "controller-0", 6)
get_logger().log_info("Verifying PMC data after SMA1 is enabled")
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, ctrl0_nic2_sma1_enable_exp_ptp_setup)
ptp_verify_config_keywords.verify_ptp_pmc_values()
def wait_for_port_state_appear_in_port_data_set(name: str, hostname: str, expected_port_states: list[str]) -> None:
@mark.p1
@mark.lab_has_compute
@mark.lab_has_ptp_configuration_compute
def test_ptp_operation_gnss_off_and_on():
"""
Waits until the port states observed in the port data set match the expected states, or times out.
Verify PTP behavior when GNSS is powered off and then back on.
Args:
name (str): Name of the PTP instance.
hostname (str): Hostname of the target system.
expected_port_states (list[str]): List of expected port states to wait for.
Test Steps:
- Powers off the GNSS input for Controller-0 NIC1.
- Verifies the expected PTP alarms and clock class degradation.
- Verifies expected PTP PMC configuration when GNSS is off.
- Powers the GNSS back on.
- Confirms the alarms are cleared and clock class is restored.
- Verifies expected PTP PMC configuration when GNSS is back on.
Raises:
Exception: If expected port states do not appear within the timeout.
Preconditions:
- System is set up with valid PTP configuration as defined in ptp_configuration_expectation_compute.json5.
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_active_controller_ssh()
ptp_setup_keywords = PTPSetupKeywords()
ptp_setup_template_path = get_stx_resource_path("resources/ptp/setup/ptp_configuration_expectation_compute.json5")
def check_port_state_in_port_data_set(name: str, hostname: str) -> list[str]:
"""
Checks whether the observed port states from the port data set match the expected port states.
ptp_readiness_watcher = PTPReadinessWatcher()
Args:
name (str): Name of the PTP instance.
hostname (str): Hostname of the target system.
get_logger().log_info("Verifying PTP operation and status when GNSS is turned off...")
Returns:
list[str]: List of expected port states.
"""
config_file = f"/etc/linuxptp/ptpinstance/ptp4l-{name}.conf"
socket_file = f"/var/run/ptp4l-{name}"
selected_instances = [("ptp1", "controller-0", []), ("ptp1", "controller-1", []), ("ptp3", "controller-0", []), ("ptp4", "controller-1", [])]
ssh_connection = LabConnectionKeywords().get_ssh_for_hostname(hostname)
pmc_keywords = PMCKeywords(ssh_connection)
# controller-0 with GNSS disabled demotes itself (clockClass: 248) and becomes a SLAVE.
# controller-1 (still in degraded state but better than 248) becomes MASTER.
# Synchronization continues from controller-1 to controller-0.
# External clients (e.g., Proxmox) continue receiving PTP sync via remaining MASTER ports.
# Clock quality degradation is tracked via clock_class, accuracy, and variance.
ctrl0_nic1_gnss_disable_exp_dict = """{
"ptp4l": [
{
"name": "ptp1",
"controller-0": {
"parent_data_set": {
"gm_clock_class": 165, // GM is in holdover or degraded mode due to GNSS loss
"gm_clock_accuracy": "0xfe", // Accuracy unknown due to GNSS signal loss
"gm_offset_scaled_log_variance": "0xffff" // Clock stability unknown
},
"time_properties_data_set": {
"current_utc_offset": 37, // Standard UTC offset (can be static)
"current_utc_offset_valid": 0, // UTC offset is not currently valid
"time_traceable": 0, // Time is not traceable to a valid source
"frequency_traceable": 0 // Frequency is not traceable to a valid reference
},
"grandmaster_settings": {
"clock_class": 248, // Indicates GNSS signal is lost (free-running or degraded)
"clock_accuracy": "0xfe", // Accuracy is unknown
"offset_scaled_log_variance": "0xffff", // Clock variance is unknown
"time_traceable": 0, // Time not traceable
"frequency_traceable": 0, // Frequency not traceable
"time_source": "0xa0", // Time source originally GNSS (0xA0)
"current_utc_offset_valid": 0 // UTC offset invalid due to signal loss
},
"port_data_set": [
{
"interface": "{{ controller_0.nic1.nic_connection.interface }}",
"port_state": "SLAVE", // Now syncing from controller-1, no longer acting as GM
"parent_port_identity" : {
"name": "ptp1", // Source PTP instance (controller-1)
"hostname":"controller-1", // Controller now acting as GM
"interface": "{{ controller_1.nic1.nic_connection.interface }}" // Source interface on controller-1
},
},
{
"interface": "{{ controller_0.nic1.conn_to_proxmox }}",
"port_state": "MASTER" // Continues to send time sync to Proxmox as master
}
]
},
"controller-1": {
"parent_data_set": {
"gm_clock_class": 165, // Controller-1 is GM but in holdover/degraded state
"gm_clock_accuracy": "0xfe", // Unknown accuracy
"gm_offset_scaled_log_variance": "0xffff" // Unknown clock variance
},
"time_properties_data_set": {
"current_utc_offset": 37,
"current_utc_offset_valid": 0,
"time_traceable": 0,
"frequency_traceable": 0
},
"grandmaster_settings": {
"clock_class": 165, // Controller-1 is acting as the best available GM
"clock_accuracy": "0xfe",
"offset_scaled_log_variance": "0xffff",
"time_traceable": 0,
"frequency_traceable": 0,
"time_source": "0xa0",
"current_utc_offset_valid": 0
},
"port_data_set": [
{
"interface": "{{ controller_1.nic1.nic_connection.interface }}",
"port_state": "MASTER" // Now acting as the Grandmaster
},
{
"interface": "{{ controller_1.nic1.conn_to_proxmox }}",
"port_state": "MASTER" // Continues acting as master toward external clients
}
]
}
},
{
"name": "ptp3",
"controller-0": {
"parent_data_set": {
"gm_clock_class": 165, // Degraded GM status
"gm_clock_accuracy": "0xfe",
"gm_offset_scaled_log_variance": "0xffff"
},
"time_properties_data_set": {
"current_utc_offset": 37,
"current_utc_offset_valid": 0,
"time_traceable": 0,
"frequency_traceable": 0
},
"grandmaster_settings": {
"clock_class": 248, // Lost GNSS, degraded state
"clock_accuracy": "0xfe",
"offset_scaled_log_variance": "0xffff",
"time_traceable": 0,
"frequency_traceable": 0,
"time_source": "0xa0",
"current_utc_offset_valid": 0
},
"port_data_set": [
{
"interface": "{{ controller_0.nic2.nic_connection.interface }}",
"port_state": "SLAVE", // Now following ptp4 on controller-1 NIC2
"parent_port_identity" : {
"name": "ptp4",
"hostname":"controller-1",
"interface": "{{ controller_1.nic2.nic_connection.interface }}"
},
},
{
"interface": "{{ controller_0.nic2.conn_to_proxmox }}",
"port_state": "MASTER" // Still acting as master to Proxmox
}
]
}
},
{
"name": "ptp4",
"controller-1": {
"parent_data_set": {
"gm_clock_class": 165,
"gm_clock_accuracy": "0xfe",
"gm_offset_scaled_log_variance": "0xffff"
},
"time_properties_data_set": {
"current_utc_offset": 37,
"current_utc_offset_valid": 0,
"time_traceable": 0,
"frequency_traceable": 0
},
"grandmaster_settings": {
"clock_class": 165,
"clock_accuracy": "0xfe",
"offset_scaled_log_variance": "0xffff",
"time_traceable": 0,
"frequency_traceable": 0,
"time_source": "0xa0",
"current_utc_offset_valid": 0
},
"port_data_set": [
{
"interface": "{{ controller_1.nic2.nic_connection.interface }}",
"port_state": "MASTER" // Acting as GM for NIC2
},
{
"interface": "{{ controller_1.nic2.conn_to_proxmox }}",
"port_state": "MASTER" // Acts as GM for external Proxmox
}
]
}
}
]
}"""
observed_states = [obj.get_port_state() for obj in pmc_keywords.pmc_get_port_data_set(config_file, socket_file).get_pmc_get_port_data_set_objects()]
ctrl0_nic1_gnss_disable_exp_ptp_setup = ptp_setup_keywords.filter_and_render_ptp_config(ptp_setup_template_path, selected_instances, ctrl0_nic1_gnss_disable_exp_dict)
return observed_states
get_logger().log_info("Turning off GNSS for Controller-0 NIC1.")
gnss_keywords = GnssKeywords()
gnss_keywords.gnss_power_off("controller-0", "nic1")
validate_equals_with_retry(lambda: check_port_state_in_port_data_set(name, hostname), expected_port_states, "port state in port data set", 120, 30)
get_logger().log_info("Waiting for alarm 100.119 to appear due to GNSS off.")
ptp_config = ConfigurationManager.get_ptp_config()
interface = ptp_config.get_host("controller_0").get_nic("nic1").get_base_port()
ptp1_not_locked_alarm_obj = AlarmListObject()
ptp1_not_locked_alarm_obj.set_alarm_id("100.119")
ptp1_not_locked_alarm_obj.set_reason_text("controller-1 is not locked to remote PTP Grand Master")
ptp1_not_locked_alarm_obj.set_entity_id("host=controller-1.instance=ptp1.ptp=no-lock")
ptp4_not_locked_alarm_obj = AlarmListObject()
ptp4_not_locked_alarm_obj.set_alarm_id("100.119")
ptp4_not_locked_alarm_obj.set_reason_text("controller-1 is not locked to remote PTP Grand Master")
ptp4_not_locked_alarm_obj.set_entity_id("host=controller-1.instance=ptp4.ptp=no-lock")
pps_signal_loss_alarm_obj = AlarmListObject()
pps_signal_loss_alarm_obj.set_alarm_id("100.119")
pps_signal_loss_alarm_obj.set_reason_text("controller-0 1PPS signal loss state: holdover")
pps_signal_loss_alarm_obj.set_entity_id(f"host=controller-0.interface={interface}.ptp=1PPS-signal-loss")
gnss_signal_loss_alarm_obj = AlarmListObject()
gnss_signal_loss_alarm_obj.set_alarm_id("100.119")
gnss_signal_loss_alarm_obj.set_reason_text("controller-0 GNSS signal loss state: holdover")
gnss_signal_loss_alarm_obj.set_entity_id(f"host=controller-0.interface={interface}.ptp=GNSS-signal-loss")
AlarmListKeywords(ssh_connection).wait_for_alarms_to_appear([ptp1_not_locked_alarm_obj, ptp4_not_locked_alarm_obj, pps_signal_loss_alarm_obj, gnss_signal_loss_alarm_obj])
get_logger().log_info("Verifying clock class degradation after GNSS is off.")
# The clock is in "Holdover" or "Degraded" mode
ptp_readiness_watcher.wait_for_clock_class_appear_in_grandmaster_settings_np("ptp1", "controller-0", 248)
ptp_readiness_watcher.wait_for_clock_class_appear_in_grandmaster_settings_np("ptp3", "controller-0", 248)
# GNSS loss
ptp_readiness_watcher.wait_for_gm_clock_class_appear_in_parent_data_set("ptp1", "controller-0", 165)
get_logger().log_info("Verifying PMC configuration after GNSS is off.")
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, ctrl0_nic1_gnss_disable_exp_ptp_setup)
ptp_verify_config_keywords.verify_ptp_pmc_values()
get_logger().log_info("Turning GNSS back on for Controller-0 NIC1...")
gnss_keywords.gnss_power_on("controller-0", "nic1")
get_logger().log_info("Waiting for alarm 100.119 to clear after GNSS is back on.")
AlarmListKeywords(ssh_connection).wait_for_alarms_cleared([ptp1_not_locked_alarm_obj, ptp4_not_locked_alarm_obj, pps_signal_loss_alarm_obj, gnss_signal_loss_alarm_obj])
get_logger().log_info("Verifying clock class restoration after GNSS is on.")
ptp_readiness_watcher.wait_for_clock_class_appear_in_grandmaster_settings_np("ptp1", "controller-0", 6)
ptp_readiness_watcher.wait_for_clock_class_appear_in_grandmaster_settings_np("ptp3", "controller-0", 6)
ptp_readiness_watcher.wait_for_gm_clock_class_appear_in_parent_data_set("ptp1", "controller-0", 6)
get_logger().log_info("Verifying PMC configuration after GNSS is restored.")
ctrl0_nic1_gnss_enable_exp_dict_overrides = {"ptp4l": [{"name": "ptp4", "controller-1": {"grandmaster_settings": {"clock_class": 165}}}]}
ctrl0_nic1_gnss_enable_exp_ptp_setup = ptp_setup_keywords.filter_and_render_ptp_config(ptp_setup_template_path, selected_instances, expected_dict_overrides=ctrl0_nic1_gnss_enable_exp_dict_overrides)
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, ctrl0_nic1_gnss_enable_exp_ptp_setup)
ptp_verify_config_keywords.verify_ptp_pmc_values()
def wait_for_clock_class_appear_in_grandmaster_settings_np(name: str, hostname: str, expected_clock_class: int) -> None:
@mark.p1
@mark.lab_has_compute
@mark.lab_has_ptp_configuration_compute
def test_ptp_operation_phc_ctl_time_change():
"""
Waits until the clock class observed in the grandmaster settings np match the expected clock class, or times out.
Verify PTP behavior when the PHC (Precision Hardware Clock) is adjusted manually using `phc_ctl`
and then returned to normal.
Args:
name (str): Name of the PTP instance.
hostname (str): Hostname of the target system.
expected_clock_class (int): expected clock class to wait for.
Test Steps:
- Identify controller-0 NIC1 interface.
- Start phc_ctl loop on controller-0 NIC1 and verify that out-of-tolerance alarms (ID 100.119) are triggered.
- Stop the adjustment and wait for alarms to clear.
- Identify controller-1 NIC2 interface.
- Start phc_ctl loop on controller-1 NIC2 and verify that out-of-tolerance alarms (ID 100.119) are triggered.
- Stop the adjustment and wait for alarms to clear.
Raises:
Exception: If expected clock class do not appear within the timeout.
Preconditions:
- The system must have a valid PTP configuration as defined in `ptp_configuration_expectation_compute.json5`.
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_active_controller_ssh()
def get_clock_class_in_grandmaster_settings_np(name: str, hostname: str) -> int:
"""
Get the observed clock class from the grandmaster settings np.
ptp_setup_template_path = get_stx_resource_path("resources/ptp/setup/ptp_configuration_expectation_compute.json5")
ptp_setup_keywords = PTPSetupKeywords()
ptp_setup = ptp_setup_keywords.generate_ptp_setup_from_template(ptp_setup_template_path)
Args:
name (str): Name of the PTP instance.
hostname (str): Hostname of the target system.
get_logger().log_info("Verifying PTP operation with phc_ctl time change on controller-0 NIC1...")
interfaces = ptp_setup.get_ptp4l_setup("ptp1").get_ptp_interface("ptp1if1").get_interfaces_for_hostname("controller-0")
if not interfaces:
raise Exception("No interfaces found for controller-0 NIC1")
ctrl0_nic1_interface = interfaces[0]
Returns:
int: observed clock class.
"""
config_file = f"/etc/linuxptp/ptpinstance/ptp4l-{name}.conf"
socket_file = f"/var/run/ptp4l-{name}"
ctrl0_ptp3_oot_alarm_obj = AlarmListObject()
ctrl0_ptp3_oot_alarm_obj.set_alarm_id("100.119")
ctrl0_ptp3_oot_alarm_obj.set_reason_text(r"controller-0 Precision Time Protocol \(PTP\) clocking is out of tolerance by (\d+\.\d+) (milli|micro)secs")
ctrl0_ptp3_oot_alarm_obj.set_entity_id("host=controller-0.instance=ptp3.ptp=out-of-tolerance")
ssh_connection = LabConnectionKeywords().get_ssh_for_hostname(hostname)
pmc_keywords = PMCKeywords(ssh_connection)
ctrl0_ptp1_oot_alarm_obj = AlarmListObject()
ctrl0_ptp1_oot_alarm_obj.set_alarm_id("100.119")
ctrl0_ptp1_oot_alarm_obj.set_reason_text(r"controller-0 Precision Time Protocol \(PTP\) clocking is out of tolerance by (\d+\.\d+) (milli|micro)secs")
ctrl0_ptp1_oot_alarm_obj.set_entity_id("host=controller-0.instance=ptp1.ptp=out-of-tolerance")
get_grandmaster_settings_np_object = pmc_keywords.pmc_get_grandmaster_settings_np(config_file, socket_file).get_pmc_get_grandmaster_settings_np_object()
observed_clock_class = get_grandmaster_settings_np_object.get_clock_class()
ctrl1_ptp1_oot_alarm_obj = AlarmListObject()
ctrl1_ptp1_oot_alarm_obj.set_alarm_id("100.119")
ctrl1_ptp1_oot_alarm_obj.set_reason_text(r"controller-1 Precision Time Protocol \(PTP\) clocking is out of tolerance by (\d+\.\d+) (milli|micro)secs")
ctrl1_ptp1_oot_alarm_obj.set_entity_id("host=controller-1.instance=ptp1.ptp=out-of-tolerance")
return observed_clock_class
phc_ctl_keywords = PhcCtlKeywords(lab_connect_keywords.get_ssh_for_hostname("controller-0"))
phc_ctl_keywords.wait_for_phc_ctl_adjustment_alarm(ctrl0_nic1_interface, [ctrl0_ptp3_oot_alarm_obj, ctrl0_ptp1_oot_alarm_obj, ctrl1_ptp1_oot_alarm_obj])
validate_equals_with_retry(lambda: get_clock_class_in_grandmaster_settings_np(name, hostname), expected_clock_class, "clock class in grandmaster settings np", 120, 30)
get_logger().log_info("Waiting for alarm 100.119 to clear after stopping phc_ctl on controller-0...")
AlarmListKeywords(ssh_connection).wait_for_alarms_cleared([ctrl0_ptp3_oot_alarm_obj, ctrl0_ptp1_oot_alarm_obj, ctrl1_ptp1_oot_alarm_obj])
get_logger().log_info("Verifying PTP operation with phc_ctl time change on controller-1 NIC2...")
interfaces = ptp_setup.get_ptp4l_setup("ptp4").get_ptp_interface("ptp4if1").get_interfaces_for_hostname("controller-1")
if not interfaces:
raise Exception("No interfaces found for controller-1 NIC2")
ctrl1_nic2_interface = interfaces[0]
ctrl1_ptp4_oot_alarm_obj = AlarmListObject()
ctrl1_ptp4_oot_alarm_obj.set_alarm_id("100.119")
ctrl1_ptp4_oot_alarm_obj.set_reason_text(r"controller-1 Precision Time Protocol \(PTP\) clocking is out of tolerance by (\d+\.\d+) (milli|micro)secs")
ctrl1_ptp4_oot_alarm_obj.set_entity_id("host=controller-1.instance=ptp4.ptp=out-of-tolerance")
phc_ctl_keywords = PhcCtlKeywords(lab_connect_keywords.get_ssh_for_hostname("controller-1"))
phc_ctl_keywords.wait_for_phc_ctl_adjustment_alarm(ctrl1_nic2_interface, [ctrl1_ptp1_oot_alarm_obj, ctrl1_ptp4_oot_alarm_obj])
get_logger().log_info("Waiting for alarm 100.119 to clear after stopping phc_ctl on controller-1...")
AlarmListKeywords(ssh_connection).wait_for_alarms_cleared([ctrl1_ptp1_oot_alarm_obj, ctrl1_ptp4_oot_alarm_obj])
@mark.p1
@mark.lab_has_compute
@mark.lab_has_ptp_configuration_compute
def test_ptp_operation_service_stop_start_restart():
"""
Verify Precision Time Protocol (PTP) behavior when the PTP service is stopped, started, and restarted.
Test Steps:
- Stop the PTP service (ptp4l@ptp1.service) on controller-0.
- Verify service status becomes inactive and appropriate alarms are raised.
- Verify degradation in PTP configuration (clock class, grandmaster settings).
- Start the PTP service on controller-0.
- Verify service status becomes active and alarms clear.
- Verify full restoration of PTP configuration.
- Restart the PTP service.
- Verify service reactivation, alarm clearance, and final configuration validation.
Preconditions:
- System is configured with a valid PTP setup (as per ptp_configuration_expectation_compute.json5).
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_active_controller_ssh()
ptp_setup_keywords = PTPSetupKeywords()
ptp_setup_template_path = get_stx_resource_path("resources/ptp/setup/ptp_configuration_expectation_compute.json5")
systemctl_keywords = SystemCTLKeywords(ssh_connection)
ptp_service_status_validator = PTPServiceStatusValidator(ssh_connection)
ptp_readiness_watcher = PTPReadinessWatcher()
get_logger().log_info("Stopping ptp4l@ptp1.service on controller-0...")
selected_instances = [("ptp1", "controller-0", []), ("ptp1", "controller-1", [])]
# Expected degraded configuration after service stop on controller-0
ctrl0_ptp1_service_stop_exp_dict = """{
"ptp4l": [
{
"name": "ptp1",
"controller-0": {
"parent_data_set": {
"gm_clock_class": -1,
"gm_clock_accuracy": "",
"gm_offset_scaled_log_variance": ""
},
"time_properties_data_set": {
"current_utc_offset": -1,
"current_utc_offset_valid": -1,
"time_traceable": -1,
"frequency_traceable": -1
},
"grandmaster_settings": {
"clock_class": -1,
"clock_accuracy": "",
"offset_scaled_log_variance": "",
"time_traceable": -1,
"frequency_traceable": -1,
"time_source": "",
"current_utc_offset_valid": -1
},
"port_data_set": [
{
"interface": "{{ controller_1.nic1.nic_connection.interface }}",
"port_state": ""
},
{
"interface": "{{ controller_1.nic1.conn_to_proxmox }}",
"port_state": ""
}
]
},
"controller-1": {
"parent_data_set": {
"gm_clock_class": 165, // Controller-1 is now acting as the Grandmaster (GM) in degraded mode
"gm_clock_accuracy": "0xfe", // Clock accuracy unknown
"gm_offset_scaled_log_variance": "0xffff" // Clock stability/variance is unknown
},
"time_properties_data_set": {
"current_utc_offset": 37, // Standard UTC offset
"current_utc_offset_valid": 0, // UTC offset is not currently valid (as time source is degraded)
"time_traceable": 0, // The time is not traceable to a known accurate source
"frequency_traceable": 0 // The frequency is not traceable either
},
"grandmaster_settings": {
"clock_class": 165, // Degraded or holdover mode (not traceable to GNSS or accurate time)
"clock_accuracy": "0xfe", // Accuracy unknown
"offset_scaled_log_variance": "0xffff", // Stability unknown
"time_traceable": 0, // Time not traceable
"frequency_traceable": 0, // Frequency not traceable
"time_source": "0xa0", // GNSS is the original source (0xA0), but signal is currently not valid
"current_utc_offset_valid": 0 // UTC offset validity flag is unset
},
"port_data_set": [
{
"interface": "{{ controller_1.nic1.nic_connection.interface }}",
"port_state": "MASTER" // controller-1's NIC1 is now the active master (providing time)
},
{
"interface": "{{ controller_1.nic1.conn_to_proxmox }}",
"port_state": "MASTER" // controller-1 continues to serve time externally (to Proxmox or others)
}
]
}
}
]
}"""
ctrl0_ptp1_service_stop_exp_ptp_setup = ptp_setup_keywords.filter_and_render_ptp_config(ptp_setup_template_path, selected_instances, ctrl0_ptp1_service_stop_exp_dict)
systemctl_keywords.systemctl_stop("ptp4l", "ptp1")
time.sleep(10)
get_logger().log_info("Verifying ptp service status and recent stop event...")
ptp_service_status_validator.verify_service_status_and_recent_event("ptp4l", "ptp1", 30, "inactive (dead)")
get_logger().log_info("Waiting for alarms 100.119 due to service stop...")
ctrl0_alarm = AlarmListObject()
ctrl0_alarm.set_alarm_id("100.119")
ctrl0_alarm.set_reason_text(r"Provisioned Precision Time Protocol \(PTP\) 'hardware' time stamping mode seems to be unsupported by this host")
ctrl0_alarm.set_entity_id("host=controller-0.instance=ptp1.ptp")
ctrl1_alarm = AlarmListObject()
ctrl1_alarm.set_alarm_id("100.119")
ctrl1_alarm.set_reason_text("controller-1 is not locked to remote PTP Grand Master")
ctrl1_alarm.set_entity_id("host=controller-1.instance=ptp1.ptp=no-lock")
AlarmListKeywords(ssh_connection).wait_for_alarms_to_appear([ctrl0_alarm, ctrl1_alarm])
get_logger().log_info("Verifying degraded PMC values after service stop...")
ptp_readiness_watcher.wait_for_gm_clock_class_appear_in_parent_data_set("ptp1", "controller-1", 165)
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, ctrl0_ptp1_service_stop_exp_ptp_setup)
ptp_verify_config_keywords.verify_ptp_pmc_values(check_domain=False)
get_logger().log_info("Starting ptp4l@ptp1.service on controller-0...")
systemctl_keywords.systemctl_start("ptp4l", "ptp1")
time.sleep(10)
ptp_service_status_validator.verify_service_status_and_recent_event("ptp4l", "ptp1", 30)
get_logger().log_info("Waiting for alarms to clear after start...")
AlarmListKeywords(ssh_connection).wait_for_alarms_cleared([ctrl0_alarm, ctrl1_alarm])
get_logger().log_info("Verifying full PMC configuration after service start...")
ptp_readiness_watcher.wait_for_gm_clock_class_appear_in_parent_data_set("ptp1", "controller-1", 6)
start_exp_ptp_setup = ptp_setup_keywords.filter_and_render_ptp_config(ptp_setup_template_path, selected_instances)
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, start_exp_ptp_setup)
ptp_verify_config_keywords.verify_ptp_pmc_values(check_domain=False)
get_logger().log_info("Restarting ptp4l@ptp1.service on controller-0...")
systemctl_keywords.systemctl_restart("ptp4l", "ptp1")
time.sleep(10)
ptp_service_status_validator.verify_service_status_and_recent_event("ptp4l", "ptp1", 30)
get_logger().log_info("Waiting for alarms to clear after restart...")
AlarmListKeywords(ssh_connection).wait_for_alarms_cleared([ctrl0_alarm, ctrl1_alarm])
get_logger().log_info("Verifying PMC configuration and clock class after service restart...")
ptp_readiness_watcher.wait_for_clock_class_appear_in_grandmaster_settings_np("ptp1", "controller-0", 6)
ptp_readiness_watcher.wait_for_gm_clock_class_appear_in_parent_data_set("ptp1", "controller-0", 6)
ptp_verify_config_keywords.verify_ptp_pmc_values(check_domain=False)