Verify PTP operation and status changes when an interface goes down/up and when SMA is disabled/enabled.

Change-Id: I7e0497e817939bb8af4212eb10b8f7b5a26d75c8
Signed-off-by: Guntaka Umashankar Reddy <umashankarguntaka.reddy@windriver.com>
This commit is contained in:
Guntaka Umashankar Reddy
2025-05-15 07:41:20 -04:00
parent c6ef1930ae
commit 3676085c8e
9 changed files with 842 additions and 19 deletions

View File

@@ -30,7 +30,7 @@ class AlarmListKeywords(BaseKeyword):
Returns: the list of alarms
"""
output = self._ssh_connection.send(source_openrc('fm alarm-list --nowrap'))
output = self._ssh_connection.send(source_openrc("fm alarm-list --nowrap"))
self.validate_success_return_code(self._ssh_connection)
alarms = AlarmListOutput(output)
@@ -62,9 +62,7 @@ class AlarmListKeywords(BaseKeyword):
get_logger().log_info(f"All alarms in this SSH connection ({self.get_ssh_connection()}) are now cleared.")
return
alarm_ids = ", ".join([alarm.get_alarm_id() for alarm in alarms])
get_logger().log_info(
f"There are still some alarms active in this SSH connection ({self.get_ssh_connection()}). Active alarms IDs: {alarm_ids}. Waiting for {self.get_check_interval_in_seconds():.3f} more seconds. Remaining time: {(end_time - now):.3f} seconds."
)
get_logger().log_info(f"There are still some alarms active in this SSH connection ({self.get_ssh_connection()}). Active alarms IDs: {alarm_ids}. Waiting for {self.get_check_interval_in_seconds():.3f} more seconds. Remaining time: {(end_time - now):.3f} seconds.")
time.sleep(self.get_check_interval_in_seconds())
alarms = self.alarm_list()
now = time.time()
@@ -110,15 +108,45 @@ class AlarmListKeywords(BaseKeyword):
get_logger().log_info(f"All alarms defined by the following IDs: {alarm_ids} are now cleared in this SSH connection ({self.get_ssh_connection()}).")
return
get_logger().log_info(
f"Not all alarms with the following IDs: {alarm_ids} have been cleared in this SSH connection ({self.get_ssh_connection()}). Waiting for {self.get_check_interval_in_seconds():.3f} more seconds. Remaining time: {(end_time - now):.3f} seconds."
)
get_logger().log_info(f"Not all alarms with the following IDs: {alarm_ids} have been cleared in this SSH connection ({self.get_ssh_connection()}). Waiting for {self.get_check_interval_in_seconds():.3f} more seconds. Remaining time: {(end_time - now):.3f} seconds.")
time.sleep(self._check_interval_in_seconds)
current_alarms = self.alarm_list()
now = time.time()
raise TimeoutError(f"The alarms identified by the following IDs: {alarm_ids} could not be cleared within a period of {self.get_timeout_in_seconds()} seconds.")
def wait_for_alarms_to_appear(self, alarms: list[AlarmListObject]) -> None:
"""
Waits for the specified alarms to appear on the SSH connection within the timeout
period defined by 'get_timeout_in_seconds()'. Validates Alarm ID, Reason Text, and Entity ID.
Args:
alarms (list[AlarmListObject]): The list of alarms to wait for.
Returns: None
Raises:
TimeoutError: if alarms are not found within the timeout period.
"""
timeout = self.get_timeout_in_seconds()
check_interval = self.get_check_interval_in_seconds()
end_time = time.time() + self.get_timeout_in_seconds()
alarm_descriptions = ", ".join(f"[ID: {alarm.get_alarm_id()}, Reason: {alarm.get_reason_text()}, Entity: {alarm.get_entity_id()}]" for alarm in alarms)
while time.time() < end_time:
current_alarms = self.alarm_list()
all_matched = all(any(current.get_alarm_id() == expected.get_alarm_id() and current.get_reason_text() == expected.get_reason_text() and current.get_entity_id() == expected.get_entity_id() for current in current_alarms) for expected in alarms)
if all_matched:
get_logger().log_info(f"All expected alarms are now present in SSH connection ({self.get_ssh_connection()}): {alarm_descriptions}")
return
get_logger().log_info(f"Waiting for expected alarms to appear in SSH connection ({self.get_ssh_connection()}). " f"Retrying in {check_interval:.3f} seconds. Remaining time: {end_time - time.time():.3f} seconds.")
time.sleep(check_interval)
raise TimeoutError(f"The following alarms did not appear within {timeout} seconds: {alarm_descriptions}")
def get_timeout_in_seconds(self) -> int:
"""
Gets an integer representing the maximum time in seconds to wait for the alarms to be cleared.
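For reference, a minimal usage sketch of the new wait_for_alarms_to_appear keyword. The alarm ID, reason text, and entity ID are example values borrowed from the tests later in this change, and ssh_connection is assumed to be an existing active-controller connection:
expected_alarm = AlarmListObject()
expected_alarm.set_alarm_id("100.119")
expected_alarm.set_reason_text("controller-1 is not locked to remote PTP Grand Master")
expected_alarm.set_entity_id("host=controller-1.instance=ptp1.ptp=no-lock")
alarm_keywords = AlarmListKeywords(ssh_connection)
alarm_keywords.wait_for_alarms_to_appear([expected_alarm])  # raises TimeoutError if the alarm never shows up
alarm_keywords.wait_for_alarms_cleared([expected_alarm])    # raises TimeoutError if the alarm never clears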

View File

@@ -1,4 +1,4 @@
from typing import Any, List, Tuple
from typing import Any
from framework.logging.automation_logger import get_logger
from framework.validation.validation import validate_equals, validate_equals_with_retry, validate_str_contains
@@ -250,7 +250,16 @@ class PTPSetupExecutorKeywords(BaseKeyword):
expected_gnss_port = gnss_keywords.extract_gnss_port(ts2phc_instance_obj.get_instance_parameters())
if not expected_gnss_port:
continue
ifaces_to_check = [(host, iface) for host in ts2phc_instance_obj.get_instance_hostnames() for ptp_host_if in ts2phc_instance_obj.get_ptp_interfaces() for iface in filter(None, ptp_host_if.get_interfaces_for_hostname(host)) if gnss_keywords.get_gnss_serial_port_from_gnss_directory(host, iface) == expected_gnss_port]
ifaces_to_check = []
for host in ts2phc_instance_obj.get_instance_hostnames():
for ptp_host_if in ts2phc_instance_obj.get_ptp_interfaces():
interfaces = ptp_host_if.get_interfaces_for_hostname(host)
for iface in filter(None, interfaces):
gnss_port = gnss_keywords.get_gnss_serial_port_from_gnss_directory(host, iface)
if gnss_port == expected_gnss_port:
ifaces_to_check.append((host, iface))
for host, interface in ifaces_to_check:
pci_address = gnss_keywords.get_pci_slot_name(host, interface)
cgu_location = f"/sys/kernel/debug/ice/{pci_address}/cgu"

View File

@@ -24,17 +24,16 @@ class PTPVerifyConfigKeywords(BaseKeyword):
ssh_connection: An instance of an SSH connection.
"""
def __init__(self, ssh_connection, ptp_setup_template_path):
def __init__(self, ssh_connection, ptp_setup: PTPSetupKeywords):
"""
Initializes the PTPVerifyConfigKeywords with an SSH connection.
Args:
ssh_connection: An instance of an SSH connection.
ptp_setup_template_path : ptp setup template path
ptp_setup : the PTP setup object to verify against
"""
self.ssh_connection = ssh_connection
ptp_setup_keywords = PTPSetupKeywords()
ptp_setup = ptp_setup_keywords.generate_ptp_setup_from_template(ptp_setup_template_path)
self.ptp_setup = ptp_setup
self.ptp4l_setup_list = ptp_setup.get_ptp4l_setup_list()
@@ -432,7 +431,7 @@ class PTPVerifyConfigKeywords(BaseKeyword):
raise Exception("Observed port data set objects contains more entries than expected port data set objects")
for expected_port_data_set_obj, observed_port_data_set_obj in zip(expected_port_data_set_objects, observed_port_data_set_objects):
validate_equals(observed_port_data_set_obj.get_port_state(), expected_port_data_set_obj.get_port_state(), "portState value within GET PORT_DATA_SET")
validate_list_contains(observed_port_data_set_obj.get_port_state(), expected_port_data_set_obj.get_port_state(), "portState value within GET PORT_DATA_SET")
def validate_get_domain(
self,
@@ -524,6 +523,10 @@ class PTPVerifyConfigKeywords(BaseKeyword):
if not all([parent_instance_name, parent_hostname, parent_interface]):
continue # Skip incomplete entries
# If the system becomes its own master instead of using the remote,
# update the last digit in the observed parent port identity from 0 to 1.
observed_parent_port_identity = re.sub(r"-0$", "-1", observed_parent_port_identity) if (name == parent_instance_name and hostname == parent_hostname and expected_port_data_set_obj.get_interface() == parent_interface) else observed_parent_port_identity
for observed_port_data_set in port_data_set:
if observed_port_data_set.get("name") == parent_instance_name and observed_port_data_set.get("hostname") == parent_hostname and parent_interface in observed_port_data_set:
expected_port_identity = observed_port_data_set.get(parent_interface)
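To illustrate the adjustment described in the comment above: the observed parent port identity ends in "-&lt;port number&gt;", so the trailing "-0" is rewritten to "-1" when the instance is acting as its own parent. A tiny sketch (the identity value is hypothetical):
observed_parent_port_identity = "507b9d.fffe.21aef8-0"
re.sub(r"-0$", "-1", observed_parent_port_identity)  # -> "507b9d.fffe.21aef8-1"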
@@ -662,7 +665,6 @@ class PTPVerifyConfigKeywords(BaseKeyword):
for observed_port_data_set_obj, expected_port_data_set_obj in zip(observed_port_data_set_objects, expected_port_data_set_objects):
interface = expected_port_data_set_obj.get_interface()
port_data_set_dict[interface] = observed_port_data_set_obj.get_port_identity()
port_data_set_list.append(port_data_set_dict)
return port_data_set_list

View File

@@ -20,7 +20,8 @@ class PortDataSet:
self.port_state = None
if "port_state" in expected_dict:
self.port_state = expected_dict["port_state"]
port_state = expected_dict["port_state"]
self.port_state = port_state if isinstance(port_state, list) else [port_state]
self.parent_port_identity = None
if "parent_port_identity" in expected_dict:
@@ -35,12 +36,12 @@ class PortDataSet:
"""
return self.interface
def get_port_state(self) -> str:
def get_port_state(self) -> list:
"""
Gets the port state.
Returns:
str: The port state.
list: The port state.
"""
return self.port_state
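A short sketch of the new behaviour, assuming a PortDataSet can be built directly from an expected_dict fragment like the ones in the setup templates (the interface name below is hypothetical):
single = PortDataSet({"interface": "enp81s0f2", "port_state": "LISTENING"})
multi = PortDataSet({"interface": "enp81s0f2", "port_state": ["LISTENING", "FAULTY"]})
single.get_port_state()  # ["LISTENING"] - a scalar value is wrapped in a single-element list
multi.get_port_state()   # ["LISTENING", "FAULTY"] - the observed state may match any entry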

View File

@@ -1,3 +1,6 @@
import copy
from typing import Any, Dict, List, Optional, Tuple
import json5
from jinja2 import Template
@@ -46,3 +49,163 @@ class PTPSetupKeywords(BaseKeyword):
# Turn the JSON Data into a ptp_setup object.
ptp_setup = PTPSetup(json_data)
return ptp_setup
def filter_and_render_ptp_config(self, template_file_location: str, selected_instances: List[Tuple[str, str, List[str]]], custom_expected_dict_template: Optional[str] = None, expected_dict_overrides: Optional[Dict[str, Any]] = None) -> PTPSetup:
"""
Filters and renders a PTP configuration from a JSON5 template based on selected instances.
This function is useful for generating a partial configuration from a complete PTP setup,
based on specific instance names, hostnames, and interfaces. It also allows:
- Overriding expected_dict via a custom Jinja2 template string
- Applying deep overrides to specific values (e.g., changing `clock_class`)
Args:
template_file_location (str): Path to the JSON5 template file.
selected_instances (List[Tuple[str, str, List[str]]]):
List of tuples, where each tuple contains:
- ptp_instance_name (str)
- hostname (str)
- list of associated interface names (List[str])
custom_expected_dict_template (Optional[str]):
Jinja2-formatted string representing an expected_dict override. If provided,
it replaces the auto-filtered expected_dict.
expected_dict_overrides (Optional[Dict[str, Any]]):
A dictionary of specific overrides to apply on the generated or provided expected_dict.
Supports nested structure (e.g. overriding grandmaster_settings -> clock_class).
Returns:
PTPSetup: A PTPSetup object containing the filtered configuration.
Example:
filter_and_render_ptp_config(
template_file_location="resources/ptp/setup/ptp_setup_template.json5",
selected_instances=[("ptp4", "controller-1", ["ptp4if1"])],
expected_dict_overrides={
"ptp4l": [
{
"name": "ptp4",
"controller-1": {
"grandmaster_settings": {
"clock_class": 165
}
}
}
]
}
)
"""
# Load and render the JSON5 template
with open(template_file_location, "r") as template_file:
json5_template = template_file.read()
ptp_defaults_path = get_stx_resource_path("resources/ptp/ptp_default_status_values.json5")
with open(ptp_defaults_path, "r") as defaults_file:
ptp_defaults = json5.load(defaults_file)
ptp_config = ConfigurationManager.get_ptp_config()
render_context = ptp_config.get_all_hosts_dictionary()
render_context.update(ptp_defaults)
# Render main config template
rendered_config = Template(json5_template).render(render_context)
ptp_config_dict = json5.loads(rendered_config)
# Optionally render custom expected_dict
custom_expected_dict = None
if custom_expected_dict_template:
rendered_custom_expected = Template(custom_expected_dict_template).render(render_context)
custom_expected_dict = json5.loads(rendered_custom_expected)
filtered_json = {"ptp_instances": {"ptp4l": []}, "ptp_host_ifs": [], "expected_dict": {"ptp4l": []}}
ptp_selection = {}
all_required_ifaces = set()
for ptp_name, hostname, iface_list in selected_instances:
if ptp_name not in ptp_selection:
ptp_selection[ptp_name] = {}
ptp_selection[ptp_name][hostname] = iface_list
all_required_ifaces.update(iface_list)
# Filter ptp_instances.ptp4l
for instance in ptp_config_dict.get("ptp_instances", {}).get("ptp4l", []):
name = instance.get("name")
if name in ptp_selection:
hosts = ptp_selection[name]
selected_ifaces = [iface for iface_list in hosts.values() for iface in iface_list]
filtered_json["ptp_instances"]["ptp4l"].append(
{
"name": name,
"instance_hostnames": list(hosts.keys()),
"instance_parameters": instance.get("instance_parameters", ""),
"ptp_interface_names": selected_ifaces,
}
)
# Filter ptp_host_ifs
for iface in ptp_config_dict.get("ptp_host_ifs", []):
if iface.get("name") in all_required_ifaces:
filtered_json["ptp_host_ifs"].append(iface)
# Use custom expected_dict if provided
if custom_expected_dict:
filtered_json["expected_dict"]["ptp4l"] = custom_expected_dict.get("ptp4l", [])
return PTPSetup(filtered_json)
# Auto-generate expected_dict by filtering
for expected_instance in ptp_config_dict.get("expected_dict", {}).get("ptp4l", []):
name = expected_instance.get("name")
if name not in ptp_selection:
continue
filtered_instance = {"name": name}
for hostname, _ in ptp_selection[name].items():
instance_data = expected_instance.get(hostname)
if not instance_data:
continue
filtered_instance[hostname] = {key: instance_data.get(key) for key in ["parent_data_set", "time_properties_data_set", "grandmaster_settings", "port_data_set"]}
filtered_json["expected_dict"]["ptp4l"].append(filtered_instance)
# Apply single-value overrides (like clock_class)
if expected_dict_overrides:
for override in expected_dict_overrides.get("ptp4l", []):
override_name = override.get("name")
for inst in filtered_json["expected_dict"]["ptp4l"]:
if inst.get("name") == override_name:
for hostname, host_data in override.items():
if hostname == "name":
continue
inst.setdefault(hostname, {})
inst[hostname] = self.deep_merge(inst[hostname], host_data)
return PTPSetup(filtered_json)
def deep_merge(self, dest: Dict[str, Any], src: Dict[str, Any]) -> Dict[str, Any]:
"""
Recursively merges the contents of `src` into `dest`.
If both `dest` and `src` contain a value for the same key and both values are dictionaries,
they will be merged recursively. Otherwise, the value from `src` overrides the one in `dest`.
This is useful for applying nested configuration overrides without losing existing structure.
Args:
dest (Dict[str, Any]): The original dictionary to merge into.
src (Dict[str, Any]): The dictionary containing overriding or additional values.
Returns:
Dict[str, Any]: A new dictionary representing the merged result.
Example:
deep_merge({"ptp4l": [{"name": "ptp4", "controller-1": {"grandmaster_settings": {"clock_class": 165}}}]}}
"""
result = copy.deepcopy(dest)
for key, value in src.items():
if isinstance(value, dict) and isinstance(result.get(key), dict):
result[key] = self.deep_merge(result[key], value)
else:
result[key] = value
return result
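A hedged usage sketch tying the two helpers above together. The selection and override values mirror the docstring example, and the template path is the one used by the compute tests in this change:
setup_keywords = PTPSetupKeywords()
ptp_setup = setup_keywords.filter_and_render_ptp_config(
    template_file_location=get_stx_resource_path("resources/ptp/setup/ptp_configuration_expectation_compute.json5"),
    selected_instances=[("ptp4", "controller-1", ["ptp4if1"])],
    expected_dict_overrides={"ptp4l": [{"name": "ptp4", "controller-1": {"grandmaster_settings": {"clock_class": 165}}}]},
)
# deep_merge keeps untouched keys and only replaces the nested value being overridden:
setup_keywords.deep_merge({"grandmaster_settings": {"clock_class": 6, "time_source": "0x20"}}, {"grandmaster_settings": {"clock_class": 165}})
# -> {"grandmaster_settings": {"clock_class": 165, "time_source": "0x20"}}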

View File

@@ -0,0 +1,100 @@
from config.configuration_manager import ConfigurationManager
from framework.ssh.prompt_response import PromptResponse
from keywords.base_keyword import BaseKeyword
from keywords.ptp.gnss_keywords import GnssKeywords
class SmaKeywords(BaseKeyword):
"""
Disables and enables SMA output using an SSH connection.
Attributes:
ssh_connection: An instance of an SSH connection.
"""
def __init__(self, ssh_connection):
"""
Initializes the SmaKeywords with an SSH connection.
Args:
ssh_connection: An instance of an SSH connection.
"""
self.ssh_connection = ssh_connection
def disable_sma(self, hostname: str, nic: str) -> None:
"""
Disables the SMA output on the specified interface.
Args:
hostname (str): The name of the host.
nic (str): The name of the NIC.
Returns : None
"""
gnss_keywords = GnssKeywords()
# Normalize host name for PTP config access
normalized_hostname = hostname.replace("-", "_")
ptp_config = ConfigurationManager.get_ptp_config()
interface = ptp_config.get_host(normalized_hostname).get_nic(nic).get_base_port()
# Disable SMA1 pin
command = f"echo 0 1 > /sys/class/net/{interface}/device/ptp/ptp1/pins/SMA1"
# Setup expected prompts for password request and echo command
password_prompt = PromptResponse("Password:", ConfigurationManager.get_lab_config().get_admin_credentials().get_password())
root_cmd = PromptResponse("root@", command)
expected_prompts = [password_prompt, root_cmd]
# Run echo command as root to disable the SMA1 pin
self.ssh_connection.send_expect_prompts("sudo su", expected_prompts)
# Expected states for validation
expected_gnss_1pps_state = "invalid"
expected_pps_dpll_status = ["holdover"]
# Construct CGU location path
pci_address = gnss_keywords.get_pci_slot_name(hostname, interface)
cgu_location = f"/sys/kernel/debug/ice/{pci_address}/cgu"
# Validate GNSS 1PPS state and DPLL status
gnss_keywords.validate_gnss_1pps_state_and_pps_dpll_status(hostname, cgu_location, "SMA1", expected_gnss_1pps_state, expected_pps_dpll_status)
def enable_sma(self, hostname: str, nic: str) -> None:
"""
Enables the SMA output on the specified interface.
Args:
hostname (str): The name of the host.
nic (str): The name of the NIC.
Returns : None
"""
gnss_keywords = GnssKeywords()
# Normalize host name for PTP config access
normalized_hostname = hostname.replace("-", "_")
ptp_config = ConfigurationManager.get_ptp_config()
interface = ptp_config.get_host(normalized_hostname).get_nic(nic).get_base_port()
# Enable SMA1 pin
command = f"echo 1 1 > /sys/class/net/{interface}/device/ptp/ptp1/pins/SMA1"
# Setup expected prompts for password request and echo command
password_prompt = PromptResponse("Password:", ConfigurationManager.get_lab_config().get_admin_credentials().get_password())
root_cmd = PromptResponse("root@", command)
expected_prompts = [password_prompt, root_cmd]
# Run echo command as root to enable the SMA1 pin
self.ssh_connection.send_expect_prompts("sudo su", expected_prompts)
# Expected states for validation
expected_gnss_1pps_state = "valid"
expected_pps_dpll_status = ["locked_ho_acq"]
# Construct CGU location path
pci_address = gnss_keywords.get_pci_slot_name(hostname, interface)
cgu_location = f"/sys/kernel/debug/ice/{pci_address}/cgu"
# Validate GNSS 1PPS state and DPLL status
gnss_keywords.validate_gnss_1pps_state_and_pps_dpll_status(hostname, cgu_location, "SMA1", expected_gnss_1pps_state, expected_pps_dpll_status)
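As a possible debugging aid (not part of this change), the pin state written above could be read back over the same connection. A sketch of such a helper on SmaKeywords; the "<function> <channel>" content of the pins file is an assumption about the kernel PTP pin sysfs interface:
def get_sma1_pin_state(self, interface: str):
    """Reads the SMA1 pin configuration, e.g. '1 1' when enabled (assumed format)."""
    return self.ssh_connection.send(f"cat /sys/class/net/{interface}/device/ptp/ptp1/pins/SMA1")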

View File

@@ -283,7 +283,7 @@
"port_data_set": [
{
"interface" : "{{ compute_0.nic1.nic_connection.interface }}",
"port_state": "LISTENING"
"port_state": ["LISTENING", "FAULTY"]
}
]
}

View File

@@ -4,11 +4,18 @@ from pytest import mark
from framework.logging.automation_logger import get_logger
from framework.resources.resource_finder import get_stx_resource_path
from framework.validation.validation import validate_equals_with_retry
from keywords.cloud_platform.fault_management.alarms.alarm_list_keywords import AlarmListKeywords
from keywords.cloud_platform.fault_management.alarms.objects.alarm_list_object import AlarmListObject
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.cloud_platform.system.ptp.ptp_setup_executor_keywords import PTPSetupExecutorKeywords
from keywords.cloud_platform.system.ptp.ptp_teardown_executor_keywords import PTPTeardownExecutorKeywords
from keywords.cloud_platform.system.ptp.ptp_verify_config_keywords import PTPVerifyConfigKeywords
from keywords.files.file_keywords import FileKeywords
from keywords.linux.ip.ip_keywords import IPKeywords
from keywords.ptp.pmc.pmc_keywords import PMCKeywords
from keywords.ptp.setup.ptp_setup_reader import PTPSetupKeywords
from keywords.ptp.sma_keywords import SmaKeywords
@mark.p0
@@ -32,6 +39,7 @@ def test_delete_and_add_all_ptp_configuration():
@mark.p0
@mark.lab_has_compute
@mark.lab_has_ptp_configuration_compute
def test_delete_and_add_all_ptp_configuration_for_compute():
"""
Delete and Add all PTP configurations
@@ -49,8 +57,519 @@ def test_delete_and_add_all_ptp_configuration_for_compute():
ptp_setup_executor_keywords.add_all_ptp_configurations()
get_logger().log_info("Verify all PTP configuration")
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, ptp_setup_template_path)
ptp_setup_keywords = PTPSetupKeywords()
ptp_setup = ptp_setup_keywords.generate_ptp_setup_from_template(ptp_setup_template_path)
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, ptp_setup)
ptp_verify_config_keywords.verify_all_ptp_configurations()
@mark.p1
@mark.lab_has_compute
@mark.lab_has_ptp_configuration_compute
def test_ptp_operation_interface_down_and_up():
"""
Verify PTP operation and status change when an interface goes down and comes back up.
Test Steps:
- Bring down controller-0 NIC1.
- Verify that alarm "100.119" appears on controller-1.
- Wait for the PMC port states on controller-1.
- Verify the PMC data on controller-1.
- Bring up controller-0 NIC1.
- Verify that alarm "100.119" clears on controller-1.
- Wait for PMC port states on controller-1.
- Verify the PMC data on controller-1.
- Bring down controller-0 NIC2.
- Verify that alarm "100.119" appears on controller-1.
- Wait for PMC port states on controller-1.
- Verify the PMC data on controller-1.
- Bring up controller-0 NIC2.
- Verify that alarm "100.119" clears on controller-1.
- Wait for PMC port states on controller-1.
- Verify the PMC data on controller-1.
- Download the "/var/log/user.log" file from the active controller.
Notes:
- In this scenario, controller-0 NIC1 (configured with ptp1) is powered off.
Initially, ctrl0 NIC1 is in MASTER state, and ctrl1 NIC1 is in SLAVE state.
After powering off ctrl0 NIC1, ctrl1 NIC1 transitions to MASTER independently,
rather than remaining dependent on the peer controller.
- In this scenario, controller-0 NIC2 (configured with ptp3) is powered off.
Initially, ctrl0 NIC2 is in MASTER state, and ctrl1 NIC2 is in SLAVE state.
After powering off ctrl0 NIC2, ctrl1 NIC2 transitions to MASTER independently,
rather than remaining dependent on the peer controller.
GM Clock Class Examples:
- 6-7 → Primary reference (e.g., GNSS locked)
- 13-14 → Grandmaster in holdover
- 52-53 → Slave-only clocks
- 165 → Not synchronized / GNSS invalid
- 248 → Not used for synchronization
Accuracy Examples:
- 0x20 → ±25 ns (GNSS locked)
- 0x21 → ±100 ns
- 0x22 → ±250 ns
- 0xFE → Unknown (not traceable)
- 0xFF → Reserved / Invalid
Offset Scaled Log Variance Examples:
- 0xFFFF → Unknown or unspecified
- e.g., 0x0100 → Valid stability/variance info
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_active_controller_ssh()
ip_keywords = IPKeywords(ssh_connection)
ptp_setup_keywords = PTPSetupKeywords()
ptp_setup_template_path = get_stx_resource_path("resources/ptp/setup/ptp_configuration_expectation_compute.json5")
get_logger().log_info("Verify PTP operation and the corresponding status change when an interface goes down")
# This template is derived from the reference file ptp_configuration_expectation_compute.json5 and
# should maintain consistency in structure. Only the expected_dict section is intended to change in
# response to different PTP operation scenarios.
# In ptp4l (e.g., ptp1 with controller-0), only the required instances that need to be verified are included.
# Unnecessary entries in instance_hostnames and ptp_interface_names—those not relevant to the verification—are
# removed when compared to the original ptp_configuration_expectation_compute.json5 file.
# The ptp1if1 interface is used to retrieve the interface name for the down operation.
ctrl0_nic1_iface_down_ptp_selection = [("ptp1", "controller-1", ["ptp1if1"])]
ctrl0_nic1_iface_down_exp_dict = """{
"ptp4l": [
{
"name": "ptp1",
"controller-1": {
"parent_data_set": {
"gm_clock_class": 165, // The GM clock loses its connection to the Primary Reference Time Clock
"gm_clock_accuracy": "0xfe", // Unknown
"gm_offset_scaled_log_variance": "0xffff" // Unknown stability
},
"time_properties_data_set": {
"current_utc_offset": 37,
"current_utc_offset_valid": 0, // The offset is not currently valid
"time_traceable": 0, // Time is not traceable to UTC
"frequency_traceable": 0 // Frequency is not traceable to a known source.
},
"grandmaster_settings": {
"clock_class": 165, // The GM clock loses its connection to the Primary Reference Time Clock
"clock_accuracy": "0xfe", // Unknown
"offset_scaled_log_variance": "0xffff", // Unknown or unspecified stability
"time_traceable": 0, // Time is not traceable — the clock may be in holdover, unsynchronized, or degraded.
"frequency_traceable": 0, // Frequency is not traceable — may be in holdover or unsynchronized
"time_source": "0xa0",
"current_utc_offset_valid": 0 // UTC offset is not valid
},
"port_data_set": [
{
"interface": "{{ controller_1.nic1.nic_connection.interface }}",
"port_state": "MASTER", // A master port will send PTP sync messages to other device
// its own master instead of using the remote
"parent_port_identity": {
"name": "ptp1",
"hostname": "controller-1",
"interface": "{{ controller_1.nic1.nic_connection.interface }}"
}
},
{
"interface": "{{ controller_1.nic1.conn_to_proxmox }}",
"port_state": "MASTER" // A master port will send PTP sync messages to other device
}
]
}
}
]
}"""
ctrl0_nic1_iface_down_ptp_setup = ptp_setup_keywords.filter_and_render_ptp_config(ptp_setup_template_path, ctrl0_nic1_iface_down_ptp_selection, ctrl0_nic1_iface_down_exp_dict)
get_logger().log_info("Interface down Controller-0 NIC1.")
interfaces = ctrl0_nic1_iface_down_ptp_setup.get_ptp4l_setup("ptp1").get_ptp_interface("ptp1if1").get_interfaces_for_hostname("controller-0")
if not interfaces:
raise Exception("No interfaces found for controller-0 NIC1")
ctrl0_nic1_interface = interfaces[0]
ip_keywords.set_ip_port_state(ctrl0_nic1_interface, "down")
get_logger().log_info(f"Waiting for 100.119 alarm to appear after interface {ctrl0_nic1_interface} goes down.")
not_locked_alarm_obj = AlarmListObject()
not_locked_alarm_obj.set_alarm_id("100.119")
not_locked_alarm_obj.set_reason_text("controller-1 is not locked to remote PTP Grand Master")
not_locked_alarm_obj.set_entity_id("host=controller-1.instance=ptp1.ptp=no-lock")
AlarmListKeywords(ssh_connection).wait_for_alarms_to_appear([not_locked_alarm_obj])
get_logger().log_info(f"Waiting for PMC port states after interface {ctrl0_nic1_interface} goes down.")
wait_for_port_state_appear_in_port_data_set("ptp1", "controller-1", ["MASTER", "MASTER"])
get_logger().log_info(f"Verifying PMC data after interface {ctrl0_nic1_interface} goes down.")
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, ctrl0_nic1_iface_down_ptp_setup)
ptp_verify_config_keywords.verify_ptp_pmc_values()
# This template is derived from the reference file ptp_configuration_expectation_compute.json5 and
# should maintain consistency in structure. Only the expected_dict section is intended to change in
# response to different PTP operation scenarios.
# In ptp4l (e.g., ptp1 with controller-0 and controller-1), only the required instances that need to be verified
# are included. Unnecessary entries in instance_hostnames and ptp_interface_names—those not relevant to the verification—are
# removed when compared to the original ptp_configuration_expectation_compute.json5 file.
ctrl0_nic1_iface_up_ptp_selection = [("ptp1", "controller-0", []), ("ptp1", "controller-1", [])]
ctrl0_nic1_iface_up_ptp_setup = ptp_setup_keywords.filter_and_render_ptp_config(ptp_setup_template_path, ctrl0_nic1_iface_up_ptp_selection)
get_logger().log_info("Interface up Controller-0 NIC1.")
ip_keywords.set_ip_port_state(ctrl0_nic1_interface, "up")
get_logger().log_info(f"Waiting for alarm 100.119 to clear after interface {ctrl0_nic1_interface} comes up.")
AlarmListKeywords(ssh_connection).wait_for_alarms_cleared([not_locked_alarm_obj])
get_logger().log_info(f"Waiting for PMC port states after interface {ctrl0_nic1_interface} comes up.")
wait_for_port_state_appear_in_port_data_set("ptp1", "controller-1", ["SLAVE", "MASTER"])
get_logger().log_info(f"Verifying PMC data after interface {ctrl0_nic1_interface} comes up.")
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, ctrl0_nic1_iface_up_ptp_setup)
ptp_verify_config_keywords.verify_ptp_pmc_values()
# This template is derived from the reference file ptp_configuration_expectation_compute.json5 and
# should maintain consistency in structure. Only the expected_dict section is intended to change in
# response to different PTP operation scenarios.
# In ptp4l (e.g., ptp4 with controller-1), only the required instances that need to be verified are included.
# Unnecessary entries in instance_hostnames and ptp_interface_names—those not relevant to the verification—are
# removed when compared to the original ptp_configuration_expectation_compute.json5 file.
ctrl0_nic2_iface_down_ptp_selection = [("ptp4", "controller-1", [])]
ctrl0_nic2_iface_down_exp_dict = """{
"ptp4l": [
{
"name": "ptp4",
"controller-1" : {
"parent_data_set" : {
"gm_clock_class" : 165, // The GM clock loses its connection to the Primary Reference Time Clock
"gm_clock_accuracy" : "0xfe", // Unknown
"gm_offset_scaled_log_variance" : "0xffff" // Unknown stability
},
"time_properties_data_set": {
"current_utc_offset": 37,
"current_utc_offset_valid": 0, // The offset is not currently valid
"time_traceable": 0, // Time is not traceable — the clock may be in holdover, unsynchronized, or degraded.
"frequency_traceable": 0 // Frequency is not traceable
},
"grandmaster_settings": {
"clock_class": 165, // The GM clock loses its connection to the Primary Reference Time Clock
"clock_accuracy": "0xfe", // Unknown
"offset_scaled_log_variance": "0xffff", // Unknown or unspecified stability
"time_traceable": 0, // Time is not traceable — the clock may be in holdover, unsynchronized, or degraded.
"frequency_traceable": 0, // Frequency is not traceable — may be in holdover or unsynchronized
"time_source": "0xa0",
"current_utc_offset_valid": 0 // The offset is not currently valid
},
"port_data_set": [
{
"interface": "{{ controller_1.nic2.nic_connection.interface }}",
"port_state": "MASTER", // A master port will send PTP sync messages to other device
// becomes its own master instead of using the remote.
"parent_port_identity" : {
"name": "ptp4",
"hostname":"controller-1",
"interface": "{{ controller_1.nic2.nic_connection.interface }}"
},
},
{
"interface": "{{ controller_1.nic2.conn_to_proxmox }}",
"port_state": "MASTER" // A master port will send PTP sync messages to other device
}
]
}
}
]
}
"""
ctrl0_nic2_iface_down_ptp_setup = ptp_setup_keywords.filter_and_render_ptp_config(ptp_setup_template_path, ctrl0_nic2_iface_down_ptp_selection, ctrl0_nic2_iface_down_exp_dict)
# This template is derived from the reference file ptp_configuration_expectation_compute.json5 and
# should maintain consistency in structure. Only the expected_dict section is intended to change in
# response to different PTP operation scenarios.
# In ptp4l (e.g., ptp3 with controller-0 and ptp4 with controller-1), only the required instances that need
# to be verified are included. Unnecessary entries in instance_hostnames and ptp_interface_names—those not
# relevant to the verification—are removed when compared to the original ptp_configuration_expectation_compute.json5 file.
# The ptp3if1 interface is used to retrieve the interface name for the down operation.
ctrl0_nic2_iface_up_ptp_selection = [("ptp3", "controller-0", ["ptp3if1"]), ("ptp4", "controller-1", [])]
ctrl0_nic2_iface_up_exp_dict_overrides = {"ptp4l": [{"name": "ptp4", "controller-1": {"grandmaster_settings": {"clock_class": 165}}}]}
ctrl0_nic2_iface_up_ptp_setup = ptp_setup_keywords.filter_and_render_ptp_config(ptp_setup_template_path, ctrl0_nic2_iface_up_ptp_selection, expected_dict_overrides=ctrl0_nic2_iface_up_exp_dict_overrides)
get_logger().log_info("Interface down Controller-0 NIC2.")
interfaces = ctrl0_nic2_iface_up_ptp_setup.get_ptp4l_setup("ptp3").get_ptp_interface("ptp3if1").get_interfaces_for_hostname("controller-0")
if not interfaces:
raise Exception("No interfaces found for controller-0 NIC2")
ctrl0_nic2_interface = interfaces[0]
ip_keywords.set_ip_port_state(ctrl0_nic2_interface, "down")
get_logger().log_info(f"Waiting for 100.119 alarm to appear after interface {ctrl0_nic2_interface} goes down.")
not_locked_alarm_obj = AlarmListObject()
not_locked_alarm_obj.set_alarm_id("100.119")
not_locked_alarm_obj.set_reason_text("controller-1 is not locked to remote PTP Grand Master")
not_locked_alarm_obj.set_entity_id("host=controller-1.instance=ptp4.ptp=no-lock")
AlarmListKeywords(ssh_connection).wait_for_alarms_to_appear([not_locked_alarm_obj])
get_logger().log_info(f"Waiting for PMC port states after interface {ctrl0_nic2_interface} goes down.")
wait_for_port_state_appear_in_port_data_set("ptp4", "controller-1", ["MASTER", "MASTER"])
get_logger().log_info(f"Verifying PMC data after interface {ctrl0_nic2_interface} goes down.")
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, ctrl0_nic2_iface_down_ptp_setup)
ptp_verify_config_keywords.verify_ptp_pmc_values()
get_logger().log_info("Interface up Controller-0 NIC2.")
ip_keywords.set_ip_port_state(ctrl0_nic2_interface, "up")
get_logger().log_info(f"Waiting for alarm 100.119 to clear after interface {ctrl0_nic2_interface} comes up.")
AlarmListKeywords(ssh_connection).wait_for_alarms_cleared([not_locked_alarm_obj])
get_logger().log_info(f"Waiting for PMC port states after interface {ctrl0_nic2_interface} comes up.")
wait_for_port_state_appear_in_port_data_set("ptp4", "controller-1", ["SLAVE", "MASTER"])
get_logger().log_info(f"Verifying PMC data after interface {ctrl0_nic2_interface} comes up.")
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, ctrl0_nic2_iface_up_ptp_setup)
ptp_verify_config_keywords.verify_ptp_pmc_values()
get_logger().log_info("Downloading /var/log/user.log for reference.")
local_file_path = os.path.join(get_logger().get_test_case_log_dir(), "user.log")
FileKeywords(ssh_connection).download_file("/var/log/user.log", local_file_path)
@mark.p1
@mark.lab_has_compute
@mark.lab_has_ptp_configuration_compute
def test_ptp_operation_sma_disabled_and_enable():
"""
Verify PTP operation and status changes when the SMA is disabled and then re-enabled.
Test Steps:
- Disable SMA1 on Controller-0 NIC2.
- Wait for 100.119 alarms to appear.
- Wait for clock class to appear in grandmaster settings.
- Verify PTP PMC values.
- Enable SMA1 on Controller-0 NIC2.
- Wait for 100.119 alarms to clear.
- Wait for clock class to appear in grandmaster settings.
- Verify PTP PMC values.
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_active_controller_ssh()
ptp_setup_keywords = PTPSetupKeywords()
ptp_setup_template_path = get_stx_resource_path("resources/ptp/setup/ptp_configuration_expectation_compute.json5")
get_logger().log_info("Verifying PTP operation and corresponding status changes when SMA is disabled.")
# This template is derived from the reference file ptp_configuration_expectation_compute.json5 and
# should maintain consistency in structure. Only the expected_dict section is intended to change in
# response to different PTP operation scenarios.
# In ptp4l (e.g., ptp3 with controller-0 and ptp4 with controller-1), only the required instances that need
# to be verified are included. Unnecessary entries in instance_hostnames and ptp_interface_names—those not
# relevant to the verification—are removed when compared to the original ptp_configuration_expectation_compute.json5 file.
ctrl0_nic2_sma1_disable_ptp_selection = [("ptp3", "controller-0", []), ("ptp4", "controller-1", [])]
ctrl0_nic2_sma1_disable_exp_dict = """{
"ptp4l": [
{
"name": "ptp3",
"controller-0": {
"parent_data_set" : {
"gm_clock_class" : 7, // clock is a valid time reference, but not the highest possible quality
"gm_clock_accuracy" : "0xfe", // Unknown
"gm_offset_scaled_log_variance" : "0xffff" // Unknown stability
},
"time_properties_data_set": {
"current_utc_offset": 37,
"current_utc_offset_valid": 0, // The offset is not currently valid
"time_traceable": 1, // clocks time can be traced back to a valid time reference
"frequency_traceable": 1 // Frequency of the clock is traceable to a stable
},
"grandmaster_settings": {
"clock_class": 7,
"clock_accuracy": "0xfe", // Unknown
"offset_scaled_log_variance": "0xffff", // Unknown or unspecified stability
"time_traceable": 1, // clocks time can be traced back to a valid time reference
"frequency_traceable": 1, // Frequency of the clock is traceable to a stable
"time_source": "0xa0",
"current_utc_offset_valid": 0 // The offset is not currently valid
},
"port_data_set": [
{
"interface": "{{ controller_0.nic2.nic_connection.interface }}", // ctrl0 NIC2 is MASTER and ctr1 NIC2 is SLAVE
"port_state": "MASTER" // A master port will send PTP sync messages to other device
},
{
"interface": "{{ controller_0.nic2.conn_to_proxmox }}",
"port_state": "MASTER" // A master port will send PTP sync messages to other device
}
]
}
},
{
"name": "ptp4",
"controller-1": {
"parent_data_set" : {
"gm_clock_class" : 7, // clock is a valid time reference, but not the highest possible quality
"gm_clock_accuracy" : "0xfe", // Unknown
"gm_offset_scaled_log_variance" : "0xffff" // Unknown stability
},
"time_properties_data_set": {
"current_utc_offset": 37,
"current_utc_offset_valid": 0, // The offset is not currently valid
"time_traceable": 1, // clocks time can be traced back to a valid time reference
"frequency_traceable": 1 // Frequency of the clock is traceable to a stable
},
"grandmaster_settings": {
"clock_class": 165, // The GM clock loses its connection to the Primary Reference Time Clock
"clock_accuracy": "0xfe", // Unknown
"offset_scaled_log_variance": "0xffff", // Unknown or unspecified stability
"time_traceable": 0, // Time is not traceable — the clock may be in holdover, unsynchronized, or degraded.
"frequency_traceable": 0, // Frequency is not traceable — may be in holdover or unsynchronized
"time_source": "0xa0",
"current_utc_offset_valid": 0 // The offset is not currently valid
},
"port_data_set": [
{
"interface": "{{ controller_1.nic2.nic_connection.interface }}",
"port_state": "SLAVE", // The slave port is synchronizing to the master port's time
"parent_port_identity": {
"name": "ptp3",
"hostname": "controller-0",
"interface": "{{ controller_0.nic2.nic_connection.interface }}" // ctrl-0 NIC2 is Master and ctrl-1 NIC2 is slave
}
},
{
"interface": "{{ controller_1.nic2.conn_to_proxmox }}",
"port_state": "MASTER" // A master port will send PTP sync messages to other device
}
]
}
}
]
}
"""
ctrl0_nic2_sma1_disable_exp_ptp_setup = ptp_setup_keywords.filter_and_render_ptp_config(ptp_setup_template_path, ctrl0_nic2_sma1_disable_ptp_selection, ctrl0_nic2_sma1_disable_exp_dict)
get_logger().log_info("Disabled SMA1 for Controller-0 NIC2.")
sma_keywords = SmaKeywords(ssh_connection)
sma_keywords.disable_sma("controller-0", "nic2")
get_logger().log_info("Waiting for alarm 100.119 to appear after SMA is disabled.")
not_locked_alarm_obj = AlarmListObject()
not_locked_alarm_obj.set_alarm_id("100.119")
not_locked_alarm_obj.set_reason_text("controller-0 is not locked to remote PTP Grand Master")
not_locked_alarm_obj.set_entity_id("host=controller-0.instance=ptp3.ptp=no-lock")
signal_loss_alarm_obj = AlarmListObject()
signal_loss_alarm_obj.set_alarm_id("100.119")
signal_loss_alarm_obj.set_reason_text("controller-0 1PPS signal loss state: holdover")
signal_loss_alarm_obj.set_entity_id("host=controller-0.interface=enp138s0f0.ptp=1PPS-signal-loss")
AlarmListKeywords(ssh_connection).wait_for_alarms_to_appear([not_locked_alarm_obj, signal_loss_alarm_obj])
get_logger().log_info("Waiting for clock class after SMA1 is disabled")
wait_for_clock_class_appear_in_grandmaster_settings_np("ptp3", "controller-0", 7)
get_logger().log_info("Verifying PMC data after SMA1 is disabled")
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, ctrl0_nic2_sma1_disable_exp_ptp_setup)
ptp_verify_config_keywords.verify_ptp_pmc_values()
get_logger().log_info("Verifying PTP operation and corresponding status changes when SMA is enabled.")
ctrl0_nic2_sma1_enable_ptp_selection = [("ptp3", "controller-0", []), ("ptp4", "controller-1", [])]
ctrl0_nic2_sma1_enable_exp_dict_overrides = {"ptp4l": [{"name": "ptp4", "controller-1": {"grandmaster_settings": {"clock_class": 165}}}]}
ctrl0_nic2_sma1_enable_exp_ptp_setup = ptp_setup_keywords.filter_and_render_ptp_config(ptp_setup_template_path, ctrl0_nic2_sma1_enable_ptp_selection, expected_dict_overrides=ctrl0_nic2_sma1_enable_exp_dict_overrides)
sma_keywords.enable_sma("controller-0", "nic2")
get_logger().log_info("Waiting for 100.119 alarm to clear after SMA1 is enabled")
alarm_list_object = AlarmListObject()
alarm_list_object.set_alarm_id("100.119")
AlarmListKeywords(ssh_connection).wait_for_alarms_cleared([alarm_list_object])
get_logger().log_info("Waiting for clock class after SMA1 is enabled")
wait_for_clock_class_appear_in_grandmaster_settings_np("ptp3", "controller-0", 6)
get_logger().log_info("Verifying PMC data after SMA1 is enabled")
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, ctrl0_nic2_sma1_enable_exp_ptp_setup)
ptp_verify_config_keywords.verify_ptp_pmc_values()
def wait_for_port_state_appear_in_port_data_set(name: str, hostname: str, expected_port_states: list[str]) -> None:
"""
Waits until the port states observed in the port data set match the expected states, or times out.
Args:
name (str): Name of the PTP instance.
hostname (str): Hostname of the target system.
expected_port_states (list[str]): List of expected port states to wait for.
Raises:
Exception: If expected port states do not appear within the timeout.
"""
def check_port_state_in_port_data_set(name: str, hostname: str) -> list[str]:
"""
Gets the observed port states from the port data set for the given PTP instance and host.
Args:
name (str): Name of the PTP instance.
hostname (str): Hostname of the target system.
Returns:
list[str]: List of observed port states.
"""
config_file = f"/etc/linuxptp/ptpinstance/ptp4l-{name}.conf"
socket_file = f"/var/run/ptp4l-{name}"
ssh_connection = LabConnectionKeywords().get_ssh_for_hostname(hostname)
pmc_keywords = PMCKeywords(ssh_connection)
observed_states = [obj.get_port_state() for obj in pmc_keywords.pmc_get_port_data_set(config_file, socket_file).get_pmc_get_port_data_set_objects()]
return observed_states
validate_equals_with_retry(lambda: check_port_state_in_port_data_set(name, hostname), expected_port_states, "port state in port data set", 120, 30)
def wait_for_clock_class_appear_in_grandmaster_settings_np(name: str, hostname: str, expected_clock_class: int) -> None:
"""
Waits until the clock class observed in the grandmaster settings np matches the expected clock class, or times out.
Args:
name (str): Name of the PTP instance.
hostname (str): Hostname of the target system.
expected_clock_class (int): expected clock class to wait for.
Raises:
Exception: If the expected clock class does not appear within the timeout.
"""
def get_clock_class_in_grandmaster_settings_np(name: str, hostname: str) -> int:
"""
Get the observed clock class from the grandmaster settings np.
Args:
name (str): Name of the PTP instance.
hostname (str): Hostname of the target system.
Returns:
int: observed clock class.
"""
config_file = f"/etc/linuxptp/ptpinstance/ptp4l-{name}.conf"
socket_file = f"/var/run/ptp4l-{name}"
ssh_connection = LabConnectionKeywords().get_ssh_for_hostname(hostname)
pmc_keywords = PMCKeywords(ssh_connection)
get_grandmaster_settings_np_object = pmc_keywords.pmc_get_grandmaster_settings_np(config_file, socket_file).get_pmc_get_grandmaster_settings_np_object()
observed_clock_class = get_grandmaster_settings_np_object.get_clock_class()
return observed_clock_class
validate_equals_with_retry(lambda: get_clock_class_in_grandmaster_settings_np(name, hostname), expected_clock_class, "clock class in grandmaster settings np", 120, 30)

View File

@@ -33,6 +33,7 @@ markers=
lab_has_compute: mark tests that require at least one compute node
subcloud_lab_has_compute: mark tests that require at least one subcloud containing at least one compute node
lab_has_secondary_system_controller: mark tests that require a secondary system controller
lab_has_ptp_configuration_compute: mark tests that require ptp_configuration_expectation_compute.json5
#TODO: add 'lab_has_bmc_ipmi', 'lab_has_bmc_redfish', 'lab_has_bmc_dynamic', and 'lab_bmc_sensor'