Merge "Create keywords to check and verify PTP - Part 2"

This commit is contained in:
Zuul
2025-05-05 16:16:53 +00:00
committed by Gerrit Code Review
14 changed files with 871 additions and 157 deletions

View File

@@ -47,11 +47,16 @@ class SystemPTPInterfaceListOutput:
Returns the ptp interface parameters for the specified name
Args:
ptp_interface_obj (obj): PTP interface object
ptp_interface_obj (SystemPTPInterfaceListObject): PTP interface object
Returns:
str: ptp interface parameters
Example : "cmdline_opts='-s xxxx -O -37 -m' boundary_clock_jbod=1 domainNumber=24"
"""
return repr(" ".join(ptp_interface_obj.get_parameters()))
parameters = ptp_interface_obj.get_parameters()
if not parameters:
return ""
return repr(" ".join(parameters))

View File

@@ -1,16 +1,17 @@
import re
from typing import Any
from typing import Any, Dict
from framework.logging.automation_logger import get_logger
from framework.validation.validation import validate_equals
from framework.validation.validation import validate_equals, validate_equals_with_retry, validate_list_contains
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.fault_management.alarms.alarm_list_keywords import AlarmListKeywords
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.linux.systemctl.systemctl_status_keywords import SystemCTLStatusKeywords
from keywords.ptp.cat.cat_clock_conf_keywords import CatClockConfKeywords
from keywords.ptp.cat.cat_ptp_cgu_keywords import CatPtpCguKeywords
from keywords.ptp.cat.cat_ptp_config_keywords import CatPtpConfigKeywords
from keywords.ptp.gnss_keywords import GnssKeywords
from keywords.ptp.pmc.pmc_keywords import PMCKeywords
from keywords.ptp.setup.object.ptp_host_interface_setup import PTPHostInterfaceSetup
from keywords.ptp.setup.ptp_setup_reader import PTPSetupKeywords
@@ -39,6 +40,12 @@ class PTPVerifyConfigKeywords(BaseKeyword):
self.ts2phc_setup_list = ptp_setup.get_ts2phc_setup_list()
self.clock_setup_list = ptp_setup.get_clock_setup_list()
self.ptp4l_expected_list_objects = ptp_setup.get_expected_ptp4l_list()
self.expected_parent_data_set_object = ptp_setup.get_parent_data_set()
self.expected_time_properties_data_set_object = ptp_setup.get_time_properties_data_set()
self.expected_grandmaster_settings_tbc_object = ptp_setup.get_grandmaster_settings_tbc()
self.expected_grandmaster_settings_tgm_object = ptp_setup.get_grandmaster_settings_tgm()
self.ctrl0_hostname = "controller-0"
self.ctrl1_hostname = "controller-1"
self.comp0_hostname = "compute-0"
@@ -59,6 +66,8 @@ class PTPVerifyConfigKeywords(BaseKeyword):
self.verify_ptp_config_file_content()
validate_equals_with_retry(self.no_alarms, True, "Validate that no alarms on the system", 300)
def verify_gnss_status(self) -> None:
"""
verify GNSS status
@@ -77,23 +86,14 @@ class PTPVerifyConfigKeywords(BaseKeyword):
continue
for ptp_host_if in ptp_host_ifs:
controller_0_interfaces = ptp_host_if.get_controller_0_interfaces()
for interface in controller_0_interfaces:
if not interface:
continue
self.validate_gnss_status_on_hostname(self.ctrl0_hostname, interface, expected_gnss_port)
controller_1_interfaces = ptp_host_if.get_controller_1_interfaces()
for interface in controller_1_interfaces:
if not interface:
continue
self.validate_gnss_status_on_hostname(self.ctrl1_hostname, interface, expected_gnss_port)
compute_0_interfaces = ptp_host_if.get_compute_0_interfaces()
for interface in compute_0_interfaces:
if not interface:
continue
self.validate_gnss_status_on_hostname(self.comp0_hostname, interface, expected_gnss_port)
for hostname, get_interfaces in [
(self.ctrl0_hostname, ptp_host_if.get_controller_0_interfaces),
(self.ctrl1_hostname, ptp_host_if.get_controller_1_interfaces),
(self.comp0_hostname, ptp_host_if.get_compute_0_interfaces),
]:
for interface in get_interfaces():
if interface:
self.validate_gnss_status_on_hostname(hostname, interface, expected_gnss_port)
def verify_sma_status(self) -> None:
"""
@@ -108,23 +108,14 @@ class PTPVerifyConfigKeywords(BaseKeyword):
ptp_interface_parameters = ptp_host_if.get_ptp_interface_parameter()
if "input" in ptp_interface_parameters:
controller_0_interfaces = ptp_host_if.get_controller_0_interfaces()
for interface in controller_0_interfaces:
if not interface:
continue
self.validate_sma_status_on_hostname(self.ctrl0_hostname, interface)
controller_1_interfaces = ptp_host_if.get_controller_1_interfaces()
for interface in controller_1_interfaces:
if not interface:
continue
self.validate_sma_status_on_hostname(self.ctrl1_hostname, interface)
compute_0_interfaces = ptp_host_if.get_compute_0_interfaces()
for interface in compute_0_interfaces:
if not interface:
continue
self.validate_sma_status_on_hostname(self.comp0_hostname, interface)
for hostname, get_interfaces in [
(self.ctrl0_hostname, ptp_host_if.get_controller_0_interfaces),
(self.ctrl1_hostname, ptp_host_if.get_controller_1_interfaces),
(self.comp0_hostname, ptp_host_if.get_compute_0_interfaces),
]:
for interface in get_interfaces():
if interface:
self.validate_sma_status_on_hostname(hostname, interface)
def verify_systemctl_status(self) -> None:
"""
@@ -134,30 +125,22 @@ class PTPVerifyConfigKeywords(BaseKeyword):
"""
systemctl_status_Keywords = SystemCTLStatusKeywords(self.ssh_connection)
for ptp4l_instance_obj in self.ptp4l_setup_list:
name = ptp4l_instance_obj.get_name()
service_name = f"ptp4l@{name}.service"
for service_type, setup_list in [
("ptp4l", self.ptp4l_setup_list),
("phc2sys", self.phc2sys_setup_list),
("ts2phc", self.ts2phc_setup_list),
]:
for instance_obj in setup_list:
name = instance_obj.get_name()
service_name = f"{service_type}@{name}.service"
hostnames = instance_obj.get_instance_hostnames()
instance_parameters = instance_obj.get_instance_parameters()
hostnames = ptp4l_instance_obj.get_instance_hostnames()
for hostname in hostnames:
systemctl_status_Keywords.verify_status_on_hostname(hostname, name, service_name)
for phc2sys_instance_obj in self.phc2sys_setup_list:
name = phc2sys_instance_obj.get_name()
service_name = f"phc2sys@{name}.service"
hostnames = phc2sys_instance_obj.get_instance_hostnames()
instance_parameters = phc2sys_instance_obj.get_instance_parameters()
for hostname in hostnames:
systemctl_status_Keywords.verify_ptp_status_and_instance_parameters_on_hostname(hostname, name, service_name, instance_parameters)
for ts2phc_instance_obj in self.ts2phc_setup_list:
name = ts2phc_instance_obj.get_name()
service_name = f"ts2phc@{name}.service"
hostnames = ts2phc_instance_obj.get_instance_hostnames()
for hostname in hostnames:
systemctl_status_Keywords.verify_status_on_hostname(hostname, name, service_name)
for hostname in hostnames:
if service_type == "phc2sys":
systemctl_status_Keywords.verify_ptp_status_and_instance_parameters_on_hostname(hostname, name, service_name, instance_parameters)
else:
systemctl_status_Keywords.verify_status_on_hostname(hostname, name, service_name)
def verify_ptp_config_file_content(self) -> None:
"""
@@ -165,17 +148,19 @@ class PTPVerifyConfigKeywords(BaseKeyword):
Returns: None
"""
for ptp4l_instance_obj in self.ptp4l_setup_list:
config_file = f"/etc/linuxptp/ptpinstance/ptp4l-{ptp4l_instance_obj.get_name()}.conf"
hostnames = ptp4l_instance_obj.get_instance_hostnames()
for hostname in hostnames:
self.validate_ptp_config_file_content(ptp4l_instance_obj, hostname, config_file)
for ts2phc_instance_obj in self.ts2phc_setup_list:
config_file = f"/etc/linuxptp/ptpinstance/ts2phc-{ts2phc_instance_obj.get_name()}.conf"
hostnames = ts2phc_instance_obj.get_instance_hostnames()
for hostname in hostnames:
self.validate_ptp_config_file_content(ts2phc_instance_obj, hostname, config_file)
for service_type, setup_list in [
("ptp4l", self.ptp4l_setup_list),
("ts2phc", self.ts2phc_setup_list),
("clock", self.clock_setup_list),
]:
for instance_obj in setup_list:
config_file = f"/etc/linuxptp/ptpinstance/{service_type}-{instance_obj.get_name()}.conf" if service_type != "clock" else "/etc/linuxptp/ptpinstance/clock-conf.conf"
hostnames = instance_obj.get_instance_hostnames()
for hostname in hostnames:
if service_type == "clock":
self.validate_ptp_config_file_content_for_clock(instance_obj, hostname, config_file)
else:
self.validate_ptp_config_file_content(instance_obj, hostname, config_file)
def verify_ptp_pmc_values(self) -> None:
"""
@@ -183,6 +168,7 @@ class PTPVerifyConfigKeywords(BaseKeyword):
Returns: None
"""
port_data_set = self.get_port_data_set_using_interface_and_port_identity_mapping()
for ptp4l_instance_obj in self.ptp4l_setup_list:
name = ptp4l_instance_obj.get_name()
config_file = f"/etc/linuxptp/ptpinstance/ptp4l-{name}.conf"
@@ -190,14 +176,14 @@ class PTPVerifyConfigKeywords(BaseKeyword):
hostnames = ptp4l_instance_obj.get_instance_hostnames()
instance_parameters = ptp4l_instance_obj.get_instance_parameters()
ptp_role = ptp4l_instance_obj.get_ptp_role()
ptp_role = next((obj.get_ptp_role() for obj in self.ptp4l_expected_list_objects if obj.get_name() == name), None)
for hostname in hostnames:
self.validate_port_data_set(hostname, config_file, socket_file, expected_port_state="MASTER")
self.validate_port_data_set(hostname, name, config_file, socket_file)
self.validate_get_domain(hostname, instance_parameters, config_file, socket_file)
self.validate_parent_data_set(hostname, instance_parameters, config_file, socket_file)
self.validate_parent_data_set(hostname, name, port_data_set, config_file, socket_file)
self.validate_time_properties_data_set(hostname, config_file, socket_file)
@@ -369,31 +355,90 @@ class PTPVerifyConfigKeywords(BaseKeyword):
observed_tx_timestamp_timeout = get_pmc_get_default_data_set_object.get_tx_timestamp_timeout()
validate_equals(observed_tx_timestamp_timeout, expected_tx_timestamp_timeout, "tx_timestamp_timeout value within PTP config file content")
get_associated_interfaces = list(
map(
lambda ptp_host_if: ptp_host_if.get_controller_0_interfaces() if hostname == "controller-0" else ptp_host_if.get_controller_1_interfaces() if hostname == "controller-1" else ptp_host_if.get_compute_0_interfaces() if hostname == "compute-0" else [],
ptp_instance_obj.get_ptp_interfaces(),
)
)
expected_associated_interfaces = sum(get_associated_interfaces, [])
interfaces_getter = {
"controller-0": lambda x: x.get_controller_0_interfaces(),
"controller-1": lambda x: x.get_controller_1_interfaces(),
"compute-0": lambda x: x.get_compute_0_interfaces(),
}.get(hostname, lambda x: [])
expected_associated_interfaces = [interface for ptp_host_if in ptp_instance_obj.get_ptp_interfaces() for interface in interfaces_getter(ptp_host_if) if interface] # Avoid empty interface names
observed_associated_interfaces = cat_ptp_config_output.get_associated_interfaces()
validate_equals(observed_associated_interfaces, expected_associated_interfaces, "Associated interfaces within PTP config file content")
def validate_ptp_config_file_content_for_clock(
self,
ptp_instance_obj: Any,
hostname: str,
config_file: str,
) -> None:
"""
Validates the ptp config file content for clock.
Args:
ptp_instance_obj (Any) : PTP instance setup object
hostname (str): The name of the host.
config_file (str): the config file.
Returns: None
Raises:
Exception: raised when validate fails
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_ssh_for_hostname(hostname)
cat_ptp_config_keywords = CatClockConfKeywords(ssh_connection)
cat_ptp_config_output = cat_ptp_config_keywords.cat_clock_conf(config_file)
get_clock_conf_objects = cat_ptp_config_output.get_clock_conf_objects()
expected_clock_config = []
interfaces_getter = {
"controller-0": lambda x: x.get_controller_0_interfaces(),
"controller-1": lambda x: x.get_controller_1_interfaces(),
"compute-0": lambda x: x.get_compute_0_interfaces(),
}.get(hostname, lambda x: [])
for ptp_host_if in ptp_instance_obj.get_ptp_interfaces():
interfaces = interfaces_getter(ptp_host_if)
if interfaces:
expected_clock_config.append(
{
"ifname": ", ".join(interfaces),
"ptp_interface_parameter": ptp_host_if.get_ptp_interface_parameter(),
}
)
for index, clock_conf_obj in enumerate(get_clock_conf_objects):
observed_ifname = clock_conf_obj.get_ifname()
observed_sma_name = clock_conf_obj.get_sma_name()
observed_sma_mode = clock_conf_obj.get_sma_mode()
if index >= len(expected_clock_config):
raise Exception("Observed clock index is greater than expected clock list index")
expected_ifname = expected_clock_config[index].get("ifname")
expected_ptp_interface_parameter = expected_clock_config[index].get("ptp_interface_parameter")
validate_equals(observed_ifname, expected_ifname, "ifname value within PTP config file content for clock-conf.conf")
validate_list_contains(observed_sma_name, expected_ptp_interface_parameter, "sma name value within PTP config file content for clock-conf.conf")
validate_list_contains(observed_sma_mode, expected_ptp_interface_parameter, "sma mode value within PTP config file content for clock-conf.conf")
def validate_port_data_set(
self,
hostname: str,
name: str,
config_file: str,
socket_file: str,
expected_port_state: str,
) -> None:
"""
Validates the get port data set.
Args:
hostname (str): The name of the host.
name (str): The ptp instance name
config_file (str): the config file.
socket_file (str): the socket file.
expected_port_state (str): The current state of the port (e.g., MASTER, SLAVE, PASSIVE, LISTENING)
Returns: None
@@ -404,12 +449,28 @@ class PTPVerifyConfigKeywords(BaseKeyword):
ssh_connection = lab_connect_keywords.get_ssh_for_hostname(hostname)
pmc_keywords = PMCKeywords(ssh_connection)
get_port_data_set_output = pmc_keywords.pmc_get_port_data_set(config_file, socket_file)
get_pmc_get_port_data_set_object = get_port_data_set_output.get_pmc_get_port_data_set_object()
observed_port_identity = get_pmc_get_port_data_set_object.get_port_identity()
observed_port_state = get_pmc_get_port_data_set_object.get_port_state()
for ptp4l_expected_object in self.ptp4l_expected_list_objects:
if ptp4l_expected_object.get_name() == name:
port_data_set_getter = {
"controller-0": ptp4l_expected_object.get_controller_0_port_data_set,
"controller-1": ptp4l_expected_object.get_controller_1_port_data_set,
"compute-0": ptp4l_expected_object.get_compute_0_port_data_set,
}.get(hostname)
break
validate_equals(observed_port_state, expected_port_state, "portState value within GET PORT_DATA_SET")
expected_port_data_set_objects = port_data_set_getter() if port_data_set_getter else None
get_port_data_set_output = pmc_keywords.pmc_get_port_data_set(config_file, socket_file)
get_pmc_get_port_data_set_objects = get_port_data_set_output.get_pmc_get_port_data_set_objects()
for index, get_pmc_get_port_data_set_object in enumerate(get_pmc_get_port_data_set_objects):
if index >= len(expected_port_data_set_objects):
raise Exception("Observed port data set index is greater than expected port data set objects index")
expected_port_state = expected_port_data_set_objects[index].get_port_state()
observed_port_state = get_pmc_get_port_data_set_object.get_port_state()
validate_equals(observed_port_state, expected_port_state, "portState value within GET PORT_DATA_SET")
def validate_get_domain(
self,
@@ -447,7 +508,8 @@ class PTPVerifyConfigKeywords(BaseKeyword):
def validate_parent_data_set(
self,
hostname: str,
instance_parameters: str,
name: str,
port_data_set: Dict,
config_file: str,
socket_file: str,
) -> None:
@@ -456,7 +518,8 @@ class PTPVerifyConfigKeywords(BaseKeyword):
Args:
hostname (str): The name of the host.
instance_parameters (str): instance parameters
name (str): The ptp instance name
port_data_set (Dict): port data set using interface and port indentity mapping
config_file (str): the config file.
socket_file (str): the socket file.
@@ -470,27 +533,50 @@ class PTPVerifyConfigKeywords(BaseKeyword):
ssh_connection = lab_connect_keywords.get_ssh_for_hostname(hostname)
pmc_keywords = PMCKeywords(ssh_connection)
# gm.ClockClass value is always 6 if it is in a good status.
expected_gm_clock_class = 6
# gm.ClockAccuracy and gm.OffsetScaledLogVariance values can be overwritten using instance parameters,
# but for now, the default values are in use
expected_gm_clock_accuracy = "0x20"
expected_gm_offset_scaled_log_variance = "0x4e5d"
parameters = self.parse_instance_parameters_string(instance_parameters)
expected_grandmaster_priority2 = parameters.get("priority2")
expected_gm_clock_class = self.expected_parent_data_set_object.get_gm_clock_class()
expected_gm_clock_accuracy = self.expected_parent_data_set_object.get_gm_clock_accuracy()
expected_gm_offset_scaled_log_variance = self.expected_parent_data_set_object.get_gm_offset_scaled_log_variance()
get_parent_data_set_output = pmc_keywords.pmc_get_parent_data_set(config_file, socket_file)
get_parent_data_set_object = get_parent_data_set_output.get_pmc_get_parent_data_set_object()
observed_parent_port_identity = get_parent_data_set_object.get_parent_port_identity()
observed_gm_clock_class = get_parent_data_set_object.get_gm_clock_class()
observed_gm_clock_accuracy = get_parent_data_set_object.get_gm_clock_accuracy()
observed_gm_offset_scaled_log_variance = get_parent_data_set_object.get_gm_offset_scaled_log_variance()
observed_grandmaster_priority2 = get_parent_data_set_object.get_grandmaster_priority2()
validate_equals(observed_gm_clock_class, expected_gm_clock_class, "gm.ClockClass value within GET PARENT_DATA_SET")
validate_equals(observed_gm_clock_accuracy, expected_gm_clock_accuracy, "gm.ClockAccuracy value within GET PARENT_DATA_SET")
validate_equals(observed_gm_offset_scaled_log_variance, expected_gm_offset_scaled_log_variance, "gm.OffsetScaledLogVariance value within GET PARENT_DATA_SET")
validate_equals(observed_grandmaster_priority2, expected_grandmaster_priority2, "grandmasterPriority2 value within GET PARENT_DATA_SET")
# Validates the parentPortIdentity of the SLAVE's PARENT_DATA_SET against the portIdentity of the MASTER's PORT_DATA_SET.
if not port_data_set:
return
for ptp4l_expected_object in self.ptp4l_expected_list_objects:
if ptp4l_expected_object.get_name() == name:
port_data_set_getter = {
"controller-0": ptp4l_expected_object.get_controller_0_port_data_set,
"controller-1": ptp4l_expected_object.get_controller_1_port_data_set,
"compute-0": ptp4l_expected_object.get_compute_0_port_data_set,
}.get(hostname)
break
expected_port_data_set_objects = port_data_set_getter() if port_data_set_getter else None
for expected_port_data_set_object in expected_port_data_set_objects:
expected_parent_port_identity_dict = expected_port_data_set_object.get_parent_port_identity()
if expected_parent_port_identity_dict:
parent_instance_name = expected_parent_port_identity_dict.get("name")
parent_hostname = expected_parent_port_identity_dict.get("hostname")
parent_interface = expected_parent_port_identity_dict.get("interface")
if not all([parent_instance_name, parent_hostname, parent_interface]):
continue # Skip if any essential key is missing
for observed_port_data_set in port_data_set:
expected_port_identity = observed_port_data_set.get(parent_interface)
if observed_port_data_set.get("name") == parent_instance_name and observed_port_data_set.get("hostname") == parent_hostname and expected_port_identity:
validate_equals(observed_parent_port_identity, expected_port_identity, "Parent port identity matches the master port identity")
def validate_time_properties_data_set(
self,
@@ -515,11 +601,10 @@ class PTPVerifyConfigKeywords(BaseKeyword):
ssh_connection = lab_connect_keywords.get_ssh_for_hostname(hostname)
pmc_keywords = PMCKeywords(ssh_connection)
# default values if it is in a good status.
expected_current_utc_offset = 37
expected_current_utc_offset_valid = 0
expected_time_traceable = 1
expected_frequency_traceable = 1
expected_current_utc_offset = self.expected_time_properties_data_set_object.get_current_utc_offset()
expected_current_utc_offset_valid = self.expected_time_properties_data_set_object.get_current_utc_offset_valid()
expected_time_traceable = self.expected_time_properties_data_set_object.get_time_traceable()
expected_frequency_traceable = self.expected_time_properties_data_set_object.get_frequency_traceable()
get_time_properties_data_set_output = pmc_keywords.pmc_get_time_properties_data_set(config_file, socket_file)
get_time_properties_data_set_object = get_time_properties_data_set_output.get_pmc_get_time_properties_data_set_object()
@@ -554,26 +639,21 @@ class PTPVerifyConfigKeywords(BaseKeyword):
Raises:
Exception: raised when validate fails
"""
if ptp_role == "MASTER":
expected_clock_class = 6
expected_clock_accuracy = "0x20"
expected_offset_scaled_log_variance = "0x4e5d"
expected_time_traceable = 1
expected_frequency_traceable = 1
expected_time_source = "0x20"
if ptp_role == "tgm":
expected_grandmaster_settings_object = self.expected_grandmaster_settings_tgm_object
else:
expected_clock_class = 248
expected_clock_accuracy = "0xfe"
expected_offset_scaled_log_variance = "0xffff"
expected_time_traceable = 0
expected_frequency_traceable = 0
expected_time_source = "0xa0"
expected_grandmaster_settings_object = self.expected_grandmaster_settings_tbc_object
expected_current_utc_offset_valid = 0
expected_clock_class = expected_grandmaster_settings_object.get_clock_class()
expected_clock_accuracy = expected_grandmaster_settings_object.get_clock_accuracy()
expected_offset_scaled_log_variance = expected_grandmaster_settings_object.get_offset_scaled_log_variance()
expected_time_traceable = expected_grandmaster_settings_object.get_time_traceable()
expected_frequency_traceable = expected_grandmaster_settings_object.get_frequency_traceable()
expected_time_source = expected_grandmaster_settings_object.get_time_source()
expected_current_utc_offset_valid = expected_grandmaster_settings_object.get_current_utc_offset_valid()
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_ssh_for_hostname(hostname)
pmc_keywords = PMCKeywords(ssh_connection)
get_grandmaster_settings_np_output = pmc_keywords.pmc_get_grandmaster_settings_np(config_file, socket_file)
get_grandmaster_settings_np_object = get_grandmaster_settings_np_output.get_pmc_get_grandmaster_settings_np_object()
@@ -593,6 +673,56 @@ class PTPVerifyConfigKeywords(BaseKeyword):
validate_equals(observed_frequency_traceable, expected_frequency_traceable, "frequencyTraceable value within GET GRANDMASTER_SETTINGS_NP")
validate_equals(observed_time_source, expected_time_source, "timeSource value within GET GRANDMASTER_SETTINGS_NP")
def get_port_data_set_using_interface_and_port_identity_mapping(self) -> Dict:
"""
Get port data set using interface and port identity mapping to validate the parentPortIdentity of the SLAVE's
PARENT_DATA_SET against the portIdentity of the MASTER's PORT_DATA_SET.
Returns:
Dict: port data set using interface and port indentity mapping
"""
port_data_set_list = []
lab_connect_keywords = LabConnectionKeywords()
for ptp4l_instance_obj in self.ptp4l_setup_list:
name = ptp4l_instance_obj.get_name()
config_file = f"/etc/linuxptp/ptpinstance/ptp4l-{name}.conf"
socket_file = f"/var/run/ptp4l-{name}"
hostnames = ptp4l_instance_obj.get_instance_hostnames()
for hostname in hostnames:
ssh_connection = lab_connect_keywords.get_ssh_for_hostname(hostname)
pmc_keywords = PMCKeywords(ssh_connection)
port_data_set_dict = {}
port_data_set_dict["name"] = name
port_data_set_dict["hostname"] = hostname
get_port_data_set_output = pmc_keywords.pmc_get_port_data_set(config_file, socket_file)
get_pmc_get_port_data_set_objects = get_port_data_set_output.get_pmc_get_port_data_set_objects()
for ptp4l_expected_object in self.ptp4l_expected_list_objects:
if ptp4l_expected_object.get_name() == name:
port_data_set_getter = {
"controller-0": ptp4l_expected_object.get_controller_0_port_data_set,
"controller-1": ptp4l_expected_object.get_controller_1_port_data_set,
"compute-0": ptp4l_expected_object.get_compute_0_port_data_set,
}.get(hostname)
break
expected_port_data_set_objects = port_data_set_getter() if port_data_set_getter else None
for index, get_pmc_get_port_data_set_object in enumerate(get_pmc_get_port_data_set_objects):
if index >= len(expected_port_data_set_objects):
raise Exception("Observed port data set index is greater than expected port data set objects index")
port_data_set_dict[expected_port_data_set_objects[index].get_interface()] = get_pmc_get_port_data_set_object.get_port_identity()
port_data_set_list.append(port_data_set_dict)
return port_data_set_list
def parse_instance_parameters_string(self, instance_parameters: str) -> dict:
"""
Parses a string containing instance parameters and returns a dictionary.
@@ -621,3 +751,13 @@ class PTPVerifyConfigKeywords(BaseKeyword):
pass
parameters[key] = value
return parameters
def no_alarms(self) -> bool:
"""
Checks if there are no alarms on the system
Returns:
bool: True if no alarms
"""
alarms = AlarmListKeywords(self.ssh_connection).alarm_list()
return not alarms

View File

@@ -8,7 +8,6 @@ from keywords.ptp.pmc.objects.pmc_get_parent_data_set_output import PMCGetParent
from keywords.ptp.pmc.objects.pmc_get_port_data_set_output import PMCGetPortDataSetOutput
from keywords.ptp.pmc.objects.pmc_get_time_properties_data_set_output import PMCGetTimePropertiesDataSetOutput
from keywords.ptp.pmc.objects.pmc_get_time_status_np_output import PMCGetTimeStatusNpOutput
from keywords.ptp.pmc.objects.pmc_get_port_data_set_output import PMCGetPortDataSetOutput
class PMCKeywords(BaseKeyword):
@@ -154,14 +153,17 @@ class PMCKeywords(BaseKeyword):
def pmc_get_default_data_set(self, config_file: str, socket_file: str, unicast: bool = True, boundry_clock: int = 0) -> PMCGetDefaultDataSetOutput:
"""
Gets the default data set
Args:
config_file (str): the config file
socket_file (str): the socket file
unicast (bool): true to use unicast
boundry_clock (int): the boundry clock
Returns:
PMCGetDefaultDataSetOutput: the default data set output
Example: PMCKeywords(ssh_connection).pmc_get_default_data_set('/etc/linuxptp/ptpinstance/ptp4l-ptp5.conf', ' /var/run/ptp4l-ptp5')
"""
cmd = f"pmc {'-u' if unicast else ''} -b {boundry_clock} -f {config_file} -s {socket_file} 'GET DEFAULT_DATA_SET'"
@@ -172,14 +174,17 @@ class PMCKeywords(BaseKeyword):
def pmc_get_port_data_set(self, config_file: str, socket_file: str, unicast: bool = True, boundry_clock: int = 0) -> PMCGetPortDataSetOutput:
"""
Gets the port data set
Args:
config_file (str): the config file
socket_file (str): the socket file
unicast (bool): true to use unicast
boundry_clock (int): the boundry clock
Returns:
PMCGetPortDataSetOutput: the port data set output
Example: PMCKeywords(ssh_connection).pmc_get_port_data_set('/etc/linuxptp/ptpinstance/ptp4l-ptp5.conf', ' /var/run/ptp4l-ptp5')
"""
cmd = f"pmc {'-u' if unicast else ''} -b {boundry_clock} -f {config_file} -s {socket_file} 'GET PORT_DATA_SET'"

View File

@@ -0,0 +1,106 @@
from typing import Any, Dict
class GrandmasterSettings:
"""
Class models a grandmaster settings
"""
def __init__(self, expected_dict: Dict[str, Any]):
"""
Constructor.
Args:
expected_dict (Dict[str, Any]): The dictionary read from the JSON setup template file associated with this grandmaster settings
"""
if "clock_class" not in expected_dict:
raise Exception("Every expected dict should have a clock_class.")
self.clock_class = expected_dict["clock_class"]
if "clock_accuracy" not in expected_dict:
raise Exception("Every expected dict should have a clock_accuracy.")
self.clock_accuracy = expected_dict["clock_accuracy"]
if "offset_scaled_log_variance" not in expected_dict:
raise Exception("Every expected dict should have a offset_scaled_log_variance.")
self.offset_scaled_log_variance = expected_dict["offset_scaled_log_variance"]
if "time_traceable" not in expected_dict:
raise Exception("Every expected dict should have a time_traceable.")
self.time_traceable = expected_dict["time_traceable"]
if "frequency_traceable" not in expected_dict:
raise Exception("Every expected dict should have a frequency_traceable.")
self.frequency_traceable = expected_dict["frequency_traceable"]
if "time_source" not in expected_dict:
raise Exception("Every expected dict should have a time_source.")
self.time_source = expected_dict["time_source"]
if "current_utc_offset_valid" not in expected_dict:
raise Exception("Every expected dict should have a current_utc_offset_valid.")
self.current_utc_offset_valid = expected_dict["current_utc_offset_valid"]
def get_clock_class(self) -> int:
"""
Gets the clock class.
Returns:
int: The clock class.
"""
return self.clock_class
def get_clock_accuracy(self) -> str:
"""
Gets the clock accuracy
Returns:
str: The clock accuracy.
"""
return self.clock_accuracy
def get_offset_scaled_log_variance(self) -> str:
"""
Gets the offset scaled log variance.
Returns:
str: The offset scaled log variance.
"""
return self.offset_scaled_log_variance
def get_time_traceable(self) -> int:
"""
Gets the time traceability
Returns:
int: The time traceability.
"""
return self.time_traceable
def get_frequency_traceable(self) -> int:
"""
Gets the frequency traceability.
Returns:
int: The frequency traceability.
"""
return self.frequency_traceable
def get_time_source(self) -> str:
"""
Gets the time source.
Returns:
str: The time source.
"""
return self.time_source
def get_current_utc_offset_valid(self) -> int:
"""
Gets the validity of the UTC offset.
Returns:
int: The current utc offset valid.
"""
return self.current_utc_offset_valid

View File

@@ -0,0 +1,53 @@
from typing import Any, Dict
class ParentDataSet:
"""
Class models a parent data set
"""
def __init__(self, expected_dict: Dict[str, Any]):
"""
Constructor.
Args:
expected_dict (Dict[str, Any]): The dictionary read from the JSON setup template file associated with this parent data set
"""
if "gm_clock_class" not in expected_dict:
raise Exception("Every expected dict should have a gm_clock_class.")
self.gm_clock_class = expected_dict["gm_clock_class"]
if "gm_clock_accuracy" not in expected_dict:
raise Exception("Every expected dict should have a gm_clock_accuracy.")
self.gm_clock_accuracy = expected_dict["gm_clock_accuracy"]
if "gm_offset_scaled_log_variance" not in expected_dict:
raise Exception("Every expected dict should have a gm_offset_scaled_log_variance.")
self.gm_offset_scaled_log_variance = expected_dict["gm_offset_scaled_log_variance"]
def get_gm_clock_class(self) -> int:
"""
Gets the gm clock class.
Returns:
int: The gm clock class.
"""
return self.gm_clock_class
def get_gm_clock_accuracy(self) -> str:
"""
Gets the gm clock accuracy.
Returns:
str: The gm clock accuracy.
"""
return self.gm_clock_accuracy
def get_gm_offset_scaled_log_variance(self) -> str:
"""
Gets the gm offset scaled log variance.
Returns:
str: The gm offset scaled log variance.
"""
return self.gm_offset_scaled_log_variance

View File

@@ -0,0 +1,54 @@
from typing import Any, Dict
class PortDataSet:
"""
Class models a port data set
"""
def __init__(self, expected_dict: Dict[str, Any]):
"""
Constructor.
Args:
expected_dict (Dict[str, Any]): The dictionary read from the JSON setup template file associated with this port data set
"""
self.port_identity = None
if "interface" in expected_dict:
self.interface = expected_dict["interface"]
self.port_state = None
if "port_state" in expected_dict:
self.port_state = expected_dict["port_state"]
self.parent_port_identity = None
if "parent_port_identity" in expected_dict:
self.parent_port_identity = expected_dict["parent_port_identity"]
def get_interface(self) -> str:
"""
Gets the interface.
Returns:
str: The interface.
"""
return self.interface
def get_port_state(self) -> str:
"""
Gets the port state.
Returns:
str: The port state.
"""
return self.port_state
def get_parent_port_identity(self) -> str:
"""
Gets the parent port identity.
Returns:
str: The parent port identity.
"""
return self.parent_port_identity

View File

@@ -0,0 +1,104 @@
from typing import Any, Dict, List
from starlingx.keywords.ptp.setup.object.port_data_set import PortDataSet
class PTP4LExpectedDict:
"""
Class models a ptp4l expected dict
"""
def __init__(self, expected_dict: Dict[str, Any]):
"""
Constructor.
Args:
expected_dict (Dict[str, Any]): The dictionary read from the JSON setup template file associated with this ptp4l expected dict
"""
if "name" not in expected_dict:
raise Exception("Every PTP4L expected dict should have a name.")
self.name = expected_dict["name"]
if "ptp_role" not in expected_dict:
raise Exception("Every PTP4L expected dict should have a ptp_role.")
self.ptp_role = expected_dict["ptp_role"]
self.controller_0_port_data_set = None
if "controller_0_port_data_set" in expected_dict:
self.controller_0_port_data_set = expected_dict["controller_0_port_data_set"]
self.controller_1_port_data_set = None
if "controller_1_port_data_set" in expected_dict:
self.controller_1_port_data_set = expected_dict["controller_1_port_data_set"]
self.compute_0_port_data_set = None
if "compute_0_port_data_set" in expected_dict:
self.compute_0_port_data_set = expected_dict["compute_0_port_data_set"]
def __str__(self) -> str:
"""
String representation of this object.
Returns:
str: String representation of this object.
"""
return self.get_name()
def get_name(self) -> str:
"""
Gets the name of this ptp4l expected dict.
Returns:
str: The name of this ptp4l expected dict.
"""
return self.name
def get_ptp_role(self) -> str:
"""
Gets the ptp role.
Returns:
str: The ptp role.
"""
return self.ptp_role
def get_controller_0_port_data_set(self) -> List[PortDataSet]:
"""
Gets the list of controller-0 port data set.
Returns:
List[PortDataSet]: The list of controller-0 port data set.
"""
port_data_set_list = []
for port_data_set in self.controller_0_port_data_set:
port_data_set_object = PortDataSet(port_data_set)
port_data_set_list.append(port_data_set_object)
return port_data_set_list
def get_controller_1_port_data_set(self) -> List[PortDataSet]:
"""
Gets the list of controller-1 port data set.
Returns:
List[PortDataSet]: The list of controller-1 port data set.
"""
port_data_set_list = []
for port_data_set in self.controller_1_port_data_set:
port_data_set_object = PortDataSet(port_data_set)
port_data_set_list.append(port_data_set_object)
return port_data_set_list
def get_compute_0_port_data_set(self) -> List[PortDataSet]:
"""
Gets the list of compute-0 port data set.
Returns:
List[PortDataSet]: The list of compute-0 port data set.
"""
port_data_set_list = []
for port_data_set in self.compute_0_port_data_set:
port_data_set_object = PortDataSet(port_data_set)
port_data_set_list.append(port_data_set_object)
return port_data_set_list

View File

@@ -39,12 +39,6 @@ class PTP4LSetup:
ptp_interfaces.append(ptp_host_ifs_dict[ptp_interface_name])
self.ptp_interfaces = ptp_interfaces
self.ptp_role = "MASTER" # default value is MASTER
if "ptp_role" in setup_dict:
self.ptp_role = setup_dict["ptp_role"]
self.port_state = setup_dict.get("port_state")
def __str__(self) -> str:
"""
String representation of this object.
@@ -105,22 +99,3 @@ class PTP4LSetup:
if ptp_interface.get_name() == interface_name:
return ptp_interface
raise Exception(f"There is no ptp interface named {interface_name} in the ptp4l setup.")
def get_ptp_role(self) -> str:
"""
Gets the ptp role
Returns:
str: ptp role
"""
return self.ptp_role
def get_port_state(self) -> str:
"""
Gets the port state
Returns:
str: port state
"""
return self.port_state

View File

@@ -1,10 +1,14 @@
from typing import Dict, List
from keywords.ptp.setup.object.clock_setup import ClockSetup
from keywords.ptp.setup.object.grandmaster_settings import GrandmasterSettings
from keywords.ptp.setup.object.parent_data_set import ParentDataSet
from keywords.ptp.setup.object.phc2sys_setup import PHC2SysSetup
from keywords.ptp.setup.object.ptp4l_expected_dict import PTP4LExpectedDict
from keywords.ptp.setup.object.ptp4l_setup import PTP4LSetup
from keywords.ptp.setup.object.ptp_host_interface_setup import PTPHostInterfaceSetup
from keywords.ptp.setup.object.ts2phc_setup import TS2PHCSetup
from keywords.ptp.setup.time_properties_data_set import TimePropertiesDataSet
class PTPSetup:
@@ -25,6 +29,11 @@ class PTPSetup:
self.ts2phc_setup_list: List[TS2PHCSetup] = []
self.clock_setup_list: List[ClockSetup] = []
self.host_ptp_if_dict: Dict[str, PTPHostInterfaceSetup] = {} # Name -> PTPHostInterfaceSetup
self.ptp4l_expected_list: List[PTP4LExpectedDict] = []
self.parent_data_set: Dict[str, ParentDataSet] = {}
self.time_properties_data_set: Dict[str, TimePropertiesDataSet] = {}
self.grandmaster_settings_tgm: Dict[str, GrandmasterSettings] = {}
self.grandmaster_settings_tbc: Dict[str, GrandmasterSettings] = {}
if "ptp_instances" not in setup_dict:
raise Exception("You must define a ptp_instances section in your ptp setup_dict")
@@ -63,6 +72,25 @@ class PTPSetup:
clock_setup = ClockSetup(clock_entry_dict, self.host_ptp_if_dict)
self.clock_setup_list.append(clock_setup)
expected_dict = setup_dict["expected_dict"]
if "ptp4l" in expected_dict:
ptp4l_list = expected_dict["ptp4l"]
for ptp4l_expected_dict in ptp4l_list:
ptp4l_expected = PTP4LExpectedDict(ptp4l_expected_dict)
self.ptp4l_expected_list.append(ptp4l_expected)
if "parent_data_set" in expected_dict:
self.parent_data_set = ParentDataSet(expected_dict["parent_data_set"])
if "time_properties_data_set" in expected_dict:
self.time_properties_data_set = TimePropertiesDataSet(expected_dict["time_properties_data_set"])
if "grandmaster_settings_tgm" in expected_dict:
self.grandmaster_settings_tgm = GrandmasterSettings(expected_dict["grandmaster_settings_tgm"])
if "grandmaster_settings_tbc" in expected_dict:
self.grandmaster_settings_tbc = GrandmasterSettings(expected_dict["grandmaster_settings_tbc"])
def __str__(self) -> str:
"""
String representation of this object.
@@ -168,3 +196,48 @@ class PTPSetup:
if setup.get_name() == setup_name:
return setup
raise Exception(f"There is no clock setup named {setup_name}")
def get_expected_ptp4l_list(self) -> List[PTP4LExpectedDict]:
"""
Getter for the list of expected ptp4l list.
Returns:
List[PTP4LExpectedDict]: list of ptp4l expected dict
"""
return self.ptp4l_expected_list
def get_parent_data_set(self) -> ParentDataSet:
"""
Getter for the parent data set.
Returns:
ParentDataSet: The parent data set
"""
return self.parent_data_set
def get_time_properties_data_set(self) -> TimePropertiesDataSet:
"""
Getter for the time properties data set.
Returns:
TimePropertiesDataSet: The time properties data set
"""
return self.time_properties_data_set
def get_grandmaster_settings_tgm(self) -> GrandmasterSettings:
"""
Getter for the grandmaster settings tgm.
Returns:
GrandmasterSettings: The grandmaster settings tgm
"""
return self.grandmaster_settings_tgm
def get_grandmaster_settings_tbc(self) -> GrandmasterSettings:
"""
Getter for the grandmaster settings tbc.
Returns:
GrandmasterSettings: The grandmaster settings tbc
"""
return self.grandmaster_settings_tbc

View File

@@ -2,6 +2,7 @@ import json5
from jinja2 import Template
from config.configuration_manager import ConfigurationManager
from framework.resources.resource_finder import get_stx_resource_path
from keywords.base_keyword import BaseKeyword
from keywords.ptp.setup.object.ptp_setup import PTPSetup
@@ -26,10 +27,17 @@ class PTPSetupKeywords(BaseKeyword):
with open(template_file_location, "r") as template_file:
json5_template = template_file.read()
ptp_default_status_values_file_path = get_stx_resource_path("resources/ptp/ptp_default_status_values.json5")
with open(ptp_default_status_values_file_path, "r") as ptp_default_status_values_template_file:
ptp_default_status_values_template = json5.load(ptp_default_status_values_template_file)
# Build a replacement dictionary from the PTP Config
ptp_config = ConfigurationManager.get_ptp_config()
replacement_dictionary = ptp_config.get_all_hosts_dictionary()
# Update lab_topology dict with ptp_default_status_values
replacement_dictionary.update(ptp_default_status_values_template)
# Render the JSON5 file by replacing the tokens.
template = Template(json5_template)
rendered_json_string = template.render(replacement_dictionary)

View File

@@ -0,0 +1,67 @@
from typing import Any, Dict
class TimePropertiesDataSet:
"""
Class models a time properties data set
"""
def __init__(self, expected_dict: Dict[str, Any]):
"""
Constructor.
Args:
expected_dict (Dict[str, Any]): The dictionary read from the JSON setup template file associated with this time properties data set
"""
if "current_utc_offset" not in expected_dict:
raise Exception("Every expected dict should have a current_utc_offset.")
self.current_utc_offset = expected_dict["current_utc_offset"]
if "current_utc_offset_valid" not in expected_dict:
raise Exception("Every expected dict should have a current_utc_offset_valid.")
self.current_utc_offset_valid = expected_dict["current_utc_offset_valid"]
if "time_traceable" not in expected_dict:
raise Exception("Every expected dict should have a time_traceable.")
self.time_traceable = expected_dict["time_traceable"]
if "frequency_traceable" not in expected_dict:
raise Exception("Every expected dict should have a frequency_traceable.")
self.frequency_traceable = expected_dict["frequency_traceable"]
def get_current_utc_offset(self) -> int:
"""
Gets the current UTC offset.
Returns:
int: The current UTC offset.
"""
return self.current_utc_offset
def get_current_utc_offset_valid(self) -> int:
"""
Gets the validity of the current UTC offset.
Returns:
int: The offset scaled log variance.
"""
return self.current_utc_offset_valid
def get_time_traceable(self) -> int:
"""
Gets the time traceability.
Returns:
int: The time traceability.
"""
return self.time_traceable
def get_frequency_traceable(self) -> int:
"""
Gets the frequency traceability.
Returns:
int: The frequency traceability.
"""
return self.frequency_traceable

View File

@@ -0,0 +1,33 @@
{
"parent_data_set_tgm_default" : {
"gm_clock_class" : 6,
"gm_clock_accuracy" : "0x20",
"gm_offset_scaled_log_variance" : "0x4e5d"
},
"time_properties_data_set_tgm_default": {
"current_utc_offset": 37,
"current_utc_offset_valid": 0,
"time_traceable": 1,
"frequency_traceable": 1
},
"grandmaster_settings": {
"grandmaster_settings_tgm_default": {
"clock_class": 6,
"clock_accuracy": "0x20",
"offset_scaled_log_variance": "0x4e5d",
"time_traceable": 1,
"frequency_traceable": 1,
"time_source": "0x20",
"current_utc_offset_valid": 0
},
"grandmaster_settings_tbc_default": {
"clock_class": 248,
"clock_accuracy": "0xfe",
"offset_scaled_log_variance": "0xffff",
"time_traceable": 0,
"frequency_traceable": 0,
"time_source": "0xa0",
"current_utc_offset_valid": 0
}
}
}

View File

@@ -11,7 +11,6 @@
"ptp1if1",
"ptp1if2",
],
ptp_role: "MASTER"
},
{
@@ -22,7 +21,6 @@
"ptp2if1",
"ptp2if2"
],
ptp_role: "MASTER"
},
{
@@ -33,7 +31,6 @@
"ptp3if1",
"ptp3if2"
],
ptp_role: "MASTER"
},
{
@@ -44,7 +41,6 @@
"ptp4if1",
"ptp4if2"
],
ptp_role: "SLAVE"
}
],
@@ -232,4 +228,88 @@
ptp_interface_parameter : "sma1={{ controller_0.nic2.sma1.is_input_or_output }}",
},
],
}
// This section is for validation purposes. All expected values are maintained here
"expected_dict": {
"ptp4l": [
{
"name": "ptp1",
"ptp_role": "tgm",
"controller_0_port_data_set": [
{
"interface" : "{{ controller_0.nic1.nic_connection.interface }}",
"port_state": "MASTER"
},
{
"interface" : "{{ controller_0.nic1.conn_to_proxmox }}",
"port_state": "MASTER"
}
],
"controller_1_port_data_set": [
{
"interface" : "{{ controller_1.nic1.nic_connection.interface }}",
"port_state": "PASSIVE"
},
{
"interface" : "{{ controller_1.nic1.conn_to_proxmox }}",
"port_state": "MASTER"
}
],
"compute_0_port_data_set": [
{
"interface" : "{{ compute_0.nic1.nic_connection.interface }}",
"port_state": "LISTENING"
}
]
},
{
"name": "ptp2",
"ptp_role": "tgm"
},
{
"name": "ptp3",
"ptp_role": "tgm",
"controller_0_port_data_set": [
{
"interface": "{{ controller_0.nic2.nic_connection.interface }}", // ctrl0 NIC2 is MASTER and ctr1 NIC2 is SLAVE
"port_state": "MASTER"
},
{
"interface": "{{ controller_0.nic2.conn_to_proxmox }}",
"port_state": "MASTER"
}
],
"compute_0_port_data_set": [
{
"interface": "{{ compute_0.nic2.nic_connection.interface }}",
"port_state": "MASTER"
}
]
},
{
"name": "ptp4",
"ptp_role": "tbc",
"controller_1_port_data_set": [
{
"interface": "{{ controller_1.nic2.nic_connection.interface }}",
"port_state": "SLAVE",
"parent_port_identity" : {
"name": "ptp3",
"hostname":"controller-0",
"interface": "{{ controller_0.nic2.nic_connection.interface }}" // ctrl-0 NIC2 is Master and ctrl-1 NIC2 is slave
},
},
{
"interface": "{{ controller_1.nic2.conn_to_proxmox }}",
"port_state": "MASTER"
}
]
}
],
"parent_data_set" : {{ parent_data_set_tgm_default }},
"time_properties_data_set": {{ time_properties_data_set_tgm_default }},
"grandmaster_settings": {
"tgm" : {{ grandmaster_settings.grandmaster_settings_tgm_default }},
"tbm" : {{ grandmaster_settings.grandmaster_settings_tbc_default }}
}
},
}

View File

@@ -1,3 +1,5 @@
import os
from pytest import mark
from framework.logging.automation_logger import get_logger
@@ -5,6 +7,8 @@ from framework.resources.resource_finder import get_stx_resource_path
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.cloud_platform.system.ptp.ptp_setup_executor_keywords import PTPSetupExecutorKeywords
from keywords.cloud_platform.system.ptp.ptp_teardown_executor_keywords import PTPTeardownExecutorKeywords
from keywords.cloud_platform.system.ptp.ptp_verify_config import PTPVerifyConfigKeywords
from keywords.files.file_keywords import FileKeywords
@mark.p0
@@ -40,6 +44,13 @@ def test_delete_and_add_all_ptp_configuration_for_compute():
ptp_teardown_keywords.delete_all_ptp_configurations()
get_logger().log_info("Add all PTP configuration")
ptp_setup_template_path = get_stx_resource_path("resources/ptp/setup/ptp_setup_template_with_compute.json5")
ptp_setup_template_path = get_stx_resource_path("resources/ptp/setup/ptp_configuration_expectation_compute.json5")
ptp_setup_keywords = PTPSetupExecutorKeywords(ssh_connection, ptp_setup_template_path)
ptp_setup_keywords.add_all_ptp_configurations()
get_logger().log_info("Verify all PTP configuration")
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, ptp_setup_template_path)
ptp_verify_config_keywords.verify_all_ptp_configurations()
local_file_path = os.path.join(get_logger().get_test_case_log_dir(), "user.log")
FileKeywords(ssh_connection).download_file("/var/log/user.log", local_file_path)