Testcases for PTP error handling

Change-Id: Ief98eae1297bea064a11f276dd42aa5664ff4bcb
Signed-off-by: Guntaka Umashankar Reddy <umashankarguntaka.reddy@windriver.com>
This commit is contained in:
Guntaka Umashankar Reddy
2025-06-02 12:43:43 -04:00
parent 29544ff548
commit 5c8ce1fe4f
18 changed files with 1112 additions and 200 deletions

View File

@@ -137,6 +137,15 @@ class PTPNic:
"""
return self.base_port
def get_nic_connection(self) -> PTPNicConnection:
"""
Gets the nic connection.
Returns:
PTPNicConnection: PTP NIC connection.
"""
return self.nic_connection
def get_sma1(self) -> SMAConnector:
"""
Gets the SMAConnector associated with sma1

View File

@@ -132,15 +132,14 @@ class AlarmListKeywords(BaseKeyword):
check_interval = self.get_check_interval_in_seconds()
end_time = time.time() + self.get_timeout_in_seconds()
alarm_descriptions = ", ".join(self.alarm_to_str(alarm) for alarm in alarms)
alarm_descriptions = ", ".join(str(alarm) for alarm in alarms)
while time.time() < end_time:
observed_alarms = self.alarm_list()
all_matched = True
for expected_alarm_obj in alarms:
match_found = any(self.alarms_match(observed_alarm_obj, expected_alarm_obj) for observed_alarm_obj in observed_alarms)
if not match_found:
get_logger().log_info(f"Expected alarm not found yet: {self.alarm_to_str(expected_alarm_obj)}")
get_logger().log_info(f"Expected alarm not found yet: {expected_alarm_obj}")
all_matched = False
break
@@ -153,7 +152,7 @@ class AlarmListKeywords(BaseKeyword):
# Final check before raising
observed_alarms = self.alarm_list()
observed_alarm_str = [self.alarm_to_str(observed_alarm_obj) for observed_alarm_obj in observed_alarms]
observed_alarm_str = [str(observed_alarm_obj) for observed_alarm_obj in observed_alarms]
raise TimeoutError(f"Timeout. Alarms not found:\nExpected: {alarm_descriptions}\nObserved alarms:\n" + "\n".join(observed_alarm_str))
def alarms_match(self, observed_alarm_object: AlarmListObject, expected_alarm_object: AlarmListObject) -> bool:
@@ -186,18 +185,6 @@ class AlarmListKeywords(BaseKeyword):
# Return True only if all three conditions are met.
return id_matches and reason_text_matches and entity_id_matches
def alarm_to_str(self, alarm: AlarmListObject) -> str:
"""
Formats an AlarmListObject into a human-readable string representation.
Args:
alarm (AlarmListObject): The alarm object to format.
Returns:
str: A string in the format "ID: <alarm_id>, Reason: <reason_text>, Entity: <entity_id>".
"""
return f"[ID: {alarm.get_alarm_id()}, Reason: {alarm.get_reason_text()}, Entity: {alarm.get_entity_id()}]"
def get_timeout_in_seconds(self) -> int:
"""
Gets an integer representing the maximum time in seconds to wait for the alarms to be cleared.

View File

@@ -20,7 +20,10 @@ class AlarmListObject:
str: String representation of this object.
"""
return self.get_alarm_id()
alarm_id = self.get_alarm_id() or ""
reason_text = self.get_reason_text() or ""
entity_id = self.get_entity_id() or ""
return f"[ID: {alarm_id}, Reason: {reason_text}, Entity: {entity_id}]"
def set_alarm_id(self, alarm_id: str):
"""

View File

@@ -1,9 +1,8 @@
from framework.validation.validation import validate_equals_with_retry
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.ptp.pmc.pmc_keywords import PMCKeywords
class PTPReadinessWatcher:
class PTPReadinessKeywords:
"""
PMC (PTP Management Client) operations to check various PTP parameters with retry logic.
@@ -11,24 +10,14 @@ class PTPReadinessWatcher:
ssh_connection: An instance of an SSH connection.
"""
def __init__(self):
def __init__(self, ssh_connection):
"""
Initializes the PTPReadinessWatcher.
"""
def _get_ptp_instance_paths(self, name: str) -> tuple[str, str]:
"""
Helper method to get the config and socket file paths for a PTP instance.
Initializes the PTPReadinessKeywords with an SSH connection.
Args:
name (str): Name of the PTP instance.
Returns:
tuple[str, str]: A tuple containing (config_file_path, socket_file_path).
ssh_connection: An instance of an SSH connection.
"""
config_file = f"/etc/linuxptp/ptpinstance/ptp4l-{name}.conf"
socket_file = f"/var/run/ptp4l-{name}"
return config_file, socket_file
self.ssh_connection = ssh_connection
def wait_for_port_state_appear_in_port_data_set(self, name: str, hostname: str, expected_port_states: list[str]) -> None:
"""
@@ -43,21 +32,20 @@ class PTPReadinessWatcher:
Exception: If expected port states do not appear within the timeout.
"""
def check_port_state_in_port_data_set(name: str, hostname: str) -> list[str]:
def check_port_state_in_port_data_set(name: str) -> list[str]:
"""
Checks whether the observed port states from the port data set match the expected port states.
Args:
name (str): Name of the PTP instance.
hostname (str): Hostname of the target system.
Returns:
list[str]: List of expected port states.
"""
config_file, socket_file = self._get_ptp_instance_paths(name)
config_file = f"/etc/linuxptp/ptpinstance/ptp4l-{name}.conf"
socket_file = f"/var/run/ptp4l-{name}"
ssh_connection = LabConnectionKeywords().get_ssh_for_hostname(hostname)
pmc_keywords = PMCKeywords(ssh_connection)
pmc_keywords = PMCKeywords(self.ssh_connection)
observed_states = [obj.get_port_state() for obj in pmc_keywords.pmc_get_port_data_set(config_file, socket_file).get_pmc_get_port_data_set_objects()]
@@ -78,21 +66,20 @@ class PTPReadinessWatcher:
Exception: If expected clock class do not appear within the timeout.
"""
def get_clock_class_in_grandmaster_settings_np(name: str, hostname: str) -> int:
def get_clock_class_in_grandmaster_settings_np(name: str) -> int:
"""
Get the observed clock class from the grandmaster settings np.
Args:
name (str): Name of the PTP instance.
hostname (str): Hostname of the target system.
Returns:
int: observed clock class.
"""
config_file, socket_file = self._get_ptp_instance_paths(name)
config_file = f"/etc/linuxptp/ptpinstance/ptp4l-{name}.conf"
socket_file = f"/var/run/ptp4l-{name}"
ssh_connection = LabConnectionKeywords().get_ssh_for_hostname(hostname)
pmc_keywords = PMCKeywords(ssh_connection)
pmc_keywords = PMCKeywords(self.ssh_connection)
get_grandmaster_settings_np_object = pmc_keywords.pmc_get_grandmaster_settings_np(config_file, socket_file).get_pmc_get_grandmaster_settings_np_object()
observed_clock_class = get_grandmaster_settings_np_object.get_clock_class()
@@ -114,21 +101,20 @@ class PTPReadinessWatcher:
Exception: If expected gm clock class do not appear within the timeout.
"""
def get_gm_clock_class_in_parent_data_set(name: str, hostname: str) -> int:
def get_gm_clock_class_in_parent_data_set(name: str) -> int:
"""
Get the observed gm clock class from the parent data set.
Args:
name (str): Name of the PTP instance.
hostname (str): Hostname of the target system.
Returns:
int: observed gm clock class.
"""
config_file, socket_file = self._get_ptp_instance_paths(name)
config_file = f"/etc/linuxptp/ptpinstance/ptp4l-{name}.conf"
socket_file = f"/var/run/ptp4l-{name}"
ssh_connection = LabConnectionKeywords().get_ssh_for_hostname(hostname)
pmc_keywords = PMCKeywords(ssh_connection)
pmc_keywords = PMCKeywords(self.ssh_connection)
parent_data_set_obj = pmc_keywords.pmc_get_parent_data_set(config_file, socket_file).get_pmc_get_parent_data_set_object()
observed_gm_clock_class = parent_data_set_obj.get_gm_clock_class()

View File

@@ -3,6 +3,7 @@ from keywords.cloud_platform.command_wrappers import source_openrc
from keywords.cloud_platform.system.ptp.objects.system_host_if_ptp_assign_output import SystemHostIfPTPAssignOutput
from keywords.cloud_platform.system.ptp.objects.system_host_if_ptp_list_output import SystemHostIfPTPListOutput
class SystemHostIfPTPKeywords(BaseKeyword):
"""
Provides methods to interact with the system host-if-ptp-instance
@@ -21,56 +22,101 @@ class SystemHostIfPTPKeywords(BaseKeyword):
"""
self.ssh_connection = ssh_connection
def system_host_if_ptp_assign(self,hostname: str, interface: str, ptp_interface: str) -> SystemHostIfPTPAssignOutput :
def system_host_if_ptp_assign(self, hostname: str, interface: str, ptp_interface: str) -> SystemHostIfPTPAssignOutput:
"""
Associate PTP to an interface at host.
Args:
hostname: Hostname or id to assign the interface to
interface: Host's interface name or uuid to assign ptp interface to
ptp_interface: PTP interface name or uuid to assign
hostname (str): Hostname or id to assign the interface to
interface (str): Host's interface name or uuid to assign ptp interface to
ptp_interface (str): PTP interface name or uuid to assign
Returns:
SystemHostIfPTPAssignOutput: Output of the command
"""
cmd = f"system host-if-ptp-assign {hostname} {interface} {ptp_interface}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
system_host_if_ptp_assign_output = SystemHostIfPTPAssignOutput(output)
return system_host_if_ptp_assign_output
def system_host_if_ptp_remove(self,hostname: str, interface: str, ptp_interface: str) -> SystemHostIfPTPAssignOutput :
def system_host_if_ptp_assign_with_error(self, hostname: str, interface: str, ptp_interface: str) -> str:
"""
Disassociate PTP to an interface at host.
Associate PTP to an interface at host for errors.
Args:
hostname: Hostname or id to disassociate ptp interface from
interface: Host's interface name or uuid to disassociate ptp interface from
ptp_interface: PTP interface name or uuid to disassociate
hostname (str): Hostname or id to assign the interface to
interface (str): Host's interface name or uuid to assign ptp interface to
ptp_interface (str): PTP interface name or uuid to assign
Returns:
str: Output of the command
Raises:
Exception: If the command fails with an error message
"""
cmd = f"system host-if-ptp-assign {hostname} {interface} {ptp_interface}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
if isinstance(output, list) and len(output) > 0:
return output[0].strip()
return output.strip() if isinstance(output, str) else str(output)
def system_host_if_ptp_remove(self, hostname: str, interface: str, ptp_interface: str) -> SystemHostIfPTPAssignOutput:
"""
Disassociate PTP to an interface at host.
Args:
hostname (str): Hostname or id to disassociate ptp interface from
interface (str): Host's interface name or uuid to disassociate ptp interface from
ptp_interface (str): PTP interface name or uuid to disassociate
Returns:
SystemHostIfPTPAssignOutput: Output of the command
"""
cmd = f"system host-if-ptp-remove {hostname} {interface} {ptp_interface}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
system_host_if_ptp_remove_output = SystemHostIfPTPAssignOutput(output)
return system_host_if_ptp_remove_output
def get_system_host_if_ptp_list(self,hostname: str) -> SystemHostIfPTPListOutput :
def system_host_if_ptp_remove_with_error(self, hostname: str, interface: str, ptp_interface: str) -> str:
"""
Disassociate PTP to an interface at host for errors.
Args:
hostname (str): Hostname or id to disassociate ptp interface from
interface (str): Host's interface name or uuid to disassociate ptp interface from
ptp_interface (str): PTP interface name or uuid to disassociate
Returns:
str: Output of the command
"""
cmd = f"system host-if-ptp-remove {hostname} {interface} {ptp_interface}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
if isinstance(output, list) and len(output) > 0:
return output[0].strip()
return output.strip() if isinstance(output, str) else str(output)
def get_system_host_if_ptp_list(self, hostname: str) -> SystemHostIfPTPListOutput:
"""
List all PTP interfaces on the specified host
Args:
hostname: Hostname or id
hostname (str): Hostname or id
Returns:
SystemHostIfPTPListOutput: system host if ptp list output
"""
command = source_openrc(f'system host-if-ptp-list --nowrap {hostname}')
command = source_openrc(f"system host-if-ptp-list --nowrap {hostname}")
output = self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
system_host_if_ptp_list_output = SystemHostIfPTPListOutput(output)
return system_host_if_ptp_list_output

View File

@@ -2,6 +2,7 @@ from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.command_wrappers import source_openrc
from keywords.cloud_platform.system.ptp.objects.system_host_ptp_instance_output import SystemHostPTPInstanceOutput
class SystemHostPTPInstanceKeywords(BaseKeyword):
"""
Provides methods to interact with the system host-ptp-instance
@@ -20,50 +21,91 @@ class SystemHostPTPInstanceKeywords(BaseKeyword):
"""
self.ssh_connection = ssh_connection
def system_host_ptp_instance_assign(self,hostname: str, name: str) -> SystemHostPTPInstanceOutput :
def system_host_ptp_instance_assign(self, hostname: str, name: str) -> SystemHostPTPInstanceOutput:
"""
Assign the ptp instance to a host
Args:
hostname : hostname or id to assign the ptp instance to host
name: name or UUID for ptp instance assign
hostname (str): hostname or id to assign the ptp instance to host
name (str): name or UUID for ptp instance assign
Returns:
SystemHostPTPInstanceOutput: Output of the command
"""
cmd = f"system host-ptp-instance-assign {hostname} {name}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
system_host_ptp_instance_assign_output = SystemHostPTPInstanceOutput(output)
return system_host_ptp_instance_assign_output
def system_host_ptp_instance_remove(self,hostname: str, name: str) -> SystemHostPTPInstanceOutput :
def system_host_ptp_instance_assign_with_error(self, hostname: str, name: str) -> str:
"""
Remove host association
Assign the ptp instance to a host for errors.
Args:
hostname : hostname or id
name: name or UUID for ptp instance name
hostname (str): hostname or id to assign the ptp instance to host
name (str): name or UUID for ptp instance assign
Returns:
str: Output of the command
"""
cmd = f"system host-ptp-instance-assign {hostname} {name}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
if isinstance(output, list) and len(output) > 0:
return output[0].strip()
return output.strip() if isinstance(output, str) else str(output)
def system_host_ptp_instance_remove(self, hostname: str, name: str) -> SystemHostPTPInstanceOutput:
"""
Remove host association
Args:
hostname (str): hostname or id
name (str): name or UUID for ptp instance name
Returns:
SystemHostPTPInstanceOutput: Output of the command
"""
cmd = f"system host-ptp-instance-remove {hostname} {name}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
system_host_ptp_instance_remove_output = SystemHostPTPInstanceOutput(output)
return system_host_ptp_instance_remove_output
def get_system_host_ptp_instance_list(self,hostname: str) -> SystemHostPTPInstanceOutput :
def system_host_ptp_instance_remove_with_error(self, hostname: str, name: str) -> str:
"""
List all PTP instances on the specified host
Remove host association for errors.
Args:
hostname : hostname or id
hostname (str): hostname or id
name (str): name or UUID for ptp instance name
Returns:
str: Output of the command
"""
cmd = f"system host-ptp-instance-remove {hostname} {name}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
if isinstance(output, list) and len(output) > 0:
return output[0].strip()
return output.strip() if isinstance(output, str) else str(output)
def get_system_host_ptp_instance_list(self, hostname: str) -> SystemHostPTPInstanceOutput:
"""
List all PTP instances on the specified host
Args:
hostname (str): hostname or id
Returns:
SystemHostPTPInstanceOutput: system host ptp instance output
"""
cmd = f"system host-ptp-instance-list {hostname}"
command = source_openrc(cmd)
@@ -71,4 +113,4 @@ class SystemHostPTPInstanceKeywords(BaseKeyword):
self.validate_success_return_code(self.ssh_connection)
system_host_ptp_instance_list_output = SystemHostPTPInstanceOutput(output)
return system_host_ptp_instance_list_output
return system_host_ptp_instance_list_output

View File

@@ -1,7 +1,8 @@
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.command_wrappers import source_openrc
from keywords.cloud_platform.system.ptp.objects.system_ptp_instance_output import SystemPTPInstanceOutput
from keywords.cloud_platform.system.ptp.objects.system_ptp_instance_list_output import SystemPTPInstanceListOutput
from keywords.cloud_platform.system.ptp.objects.system_ptp_instance_output import SystemPTPInstanceOutput
class SystemPTPInstanceKeywords(BaseKeyword):
"""
@@ -21,66 +22,119 @@ class SystemPTPInstanceKeywords(BaseKeyword):
"""
self.ssh_connection = ssh_connection
def system_ptp_instance_add(self,name: str, service: str) -> SystemPTPInstanceOutput :
def system_ptp_instance_add(self, name: str, service: str) -> SystemPTPInstanceOutput:
"""
Create an instance by providing a name and type
Args:
name : name of instance
service: type of instance
name (str): name of instance
service (str): type of instance
Returns:
SystemPTPInstanceOutput: Output of the command
"""
cmd = f"system ptp-instance-add {name} {service}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
system_ptp_instance_add_output = SystemPTPInstanceOutput(output)
return system_ptp_instance_add_output
def system_ptp_instance_delete(self,name: str) -> str :
def system_ptp_instance_add_with_error(self, name: str, service: str) -> str:
"""
Delete an instance
Create an instance by providing a name and type for errors.
Args:
name : name or UUID of instance
name (str): name of instance
service (str): type of instance
Returns:
str: Output of the command
"""
cmd = f"system ptp-instance-add {name} {service}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
if isinstance(output, list) and len(output) > 0:
return output[0].strip()
return output.strip() if isinstance(output, str) else str(output)
def system_ptp_instance_delete(self, name: str) -> str:
"""
Delete an instance
Args:
name (str): name or UUID of instance
Returns:
str: Success message
"""
cmd = f"system ptp-instance-delete {name}"
command = source_openrc(cmd)
self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
def system_ptp_instance_delete_with_error(self, name: str) -> str:
"""
Delete an instance for errors.
Args:
name (str): name or UUID of instance
Returns:
str: Command output
"""
cmd = f"system ptp-instance-delete {name}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
# output is a List of 1 string. "['Deleted PTP instance: xxxx-xxxx-xxx\n']"
if output and len(output) > 0:
if isinstance(output, list) and len(output) > 0:
return output[0].strip()
else:
raise "Output is expected to be a List with one element."
def get_system_ptp_instance_show(self,name: str) -> SystemPTPInstanceOutput :
return output.strip() if isinstance(output, str) else str(output)
def get_system_ptp_instance_show(self, name: str) -> SystemPTPInstanceOutput:
"""
Show PTP instance attributes.
Args:
name : name or UUID of instance
name (str): name or UUID of instance
Returns:
SystemPTPInstanceOutput: Output of the command
"""
cmd = f"system ptp-instance-show {name}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
system_ptp_instance_show_output = SystemPTPInstanceOutput(output)
return system_ptp_instance_show_output
def get_system_ptp_instance_list(self) -> SystemPTPInstanceListOutput :
def get_system_ptp_instance_show_with_error(self, name: str) -> str:
"""
Show PTP instance attributes for errors.
Args:
name (str): name or UUID of instance
Returns:
str: Output of the command
"""
cmd = f"system ptp-instance-show {name}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
if isinstance(output, list) and len(output) > 0:
return output[0].strip()
return output.strip() if isinstance(output, str) else str(output)
def get_system_ptp_instance_list(self) -> SystemPTPInstanceListOutput:
"""
List all PTP instances
Returns:
SystemPTPInstanceListOutput: system ptp instance list output
"""
command = source_openrc("system ptp-instance-list")
output = self.ssh_connection.send(command)
@@ -89,11 +143,12 @@ class SystemPTPInstanceKeywords(BaseKeyword):
return system_ptp_instance_list_output
def system_ptp_instance_apply(self) -> str :
def system_ptp_instance_apply(self) -> str:
"""
Apply the PTP Instance config
Returns:
str: Command output
"""
command = source_openrc("system ptp-instance-apply")
output = self.ssh_connection.send(command)
@@ -103,4 +158,4 @@ class SystemPTPInstanceKeywords(BaseKeyword):
if output and len(output) > 0:
return output[0].strip()
else:
raise "Output is expected to be a List with one element."
raise "Output is expected to be a List with one element."

View File

@@ -2,6 +2,7 @@ from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.command_wrappers import source_openrc
from keywords.cloud_platform.system.ptp.objects.system_ptp_instance_output import SystemPTPInstanceOutput
class SystemPTPInstanceParameterKeywords(BaseKeyword):
"""
Provides methods to interact with the system ptp-instance-parameter
@@ -20,42 +21,78 @@ class SystemPTPInstanceParameterKeywords(BaseKeyword):
"""
self.ssh_connection = ssh_connection
def system_ptp_instance_parameter_add(self,name: str, parameters: str) -> SystemPTPInstanceOutput :
def system_ptp_instance_parameter_add(self, instance_name: str, parameter: str) -> SystemPTPInstanceOutput:
"""
Add parameter to instance
Add a parameter to a PTP instance.
Args:
name : name or UUID of instance
parameters: parameters can be applied to the service of a instance
Ex : system ptp-instance-parameter-add ptp1 masterOnly=0
system ptp-instance-parameter-add ptp1 masterOnly=0 domainNumber=0
instance_name (str): Name or UUID of the PTP instance
parameter (str): Parameter key=value pair to add
Returns:
SystemPTPInstanceOutput: Output of the command
"""
cmd = f"system ptp-instance-parameter-add {name} {parameters}"
cmd = f"system ptp-instance-parameter-add {instance_name} {parameter}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
system_ptp_instance_parameter_add_output = SystemPTPInstanceOutput(output)
return system_ptp_instance_parameter_add_output
def system_ptp_instance_parameter_delete(self,name: str, parameters: str) -> SystemPTPInstanceOutput :
def system_ptp_instance_parameter_delete(self, instance_name: str, parameter: str) -> SystemPTPInstanceOutput:
"""
Delete parameter to instance
Delete a parameter from a PTP instance.
Args:
name : name or UUID of instance
parameters: parameters can be delete from the service of a instance
Ex : system ptp-instance-parameter-delete ptp1 masterOnly=0
system ptp-instance-parameter-delete ptp1 masterOnly=0 domainNumber=0
instance_name (str): Name or UUID of the PTP instance
parameter (str): Parameter key to delete
Returns:
SystemPTPInstanceOutput: Output of the command
"""
cmd = f"system ptp-instance-parameter-delete {name} {parameters}"
cmd = f"system ptp-instance-parameter-delete {instance_name} {parameter}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
system_ptp_instance_parameter_delete_output = SystemPTPInstanceOutput(output)
return system_ptp_instance_parameter_delete_output
return system_ptp_instance_parameter_delete_output
def system_ptp_instance_parameter_add_with_error(self, instance: str, parameter: str) -> str:
"""
Add a parameter to a PTP instance for errors.
Args:
instance (str): name or UUID of instance
parameter (str): parameter keypair to add
Returns:
str: Command output
"""
cmd = f"system ptp-instance-parameter-add {instance} {parameter}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
if isinstance(output, list) and len(output) > 0:
return output[0].strip()
return output.strip() if isinstance(output, str) else str(output)
def system_ptp_instance_parameter_delete_with_error(self, instance: str, parameter: str) -> str:
"""
Delete a parameter from a PTP instance for errors.
Args:
instance (str): name or UUID of instance
parameter (str): name or UUID of parameter
Returns:
str: Command output
"""
cmd = f"system ptp-instance-parameter-delete {instance} {parameter}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
if isinstance(output, list) and len(output) > 0:
return output[0].strip()
return output.strip() if isinstance(output, str) else str(output)

View File

@@ -1,7 +1,8 @@
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.command_wrappers import source_openrc
from keywords.cloud_platform.system.ptp.objects.system_ptp_interface_output import SystemPTPInterfaceOutput
from keywords.cloud_platform.system.ptp.objects.system_ptp_interface_list_output import SystemPTPInterfaceListOutput
from keywords.cloud_platform.system.ptp.objects.system_ptp_interface_output import SystemPTPInterfaceOutput
class SystemPTPInterfaceKeywords(BaseKeyword):
"""
@@ -21,66 +22,125 @@ class SystemPTPInterfaceKeywords(BaseKeyword):
"""
self.ssh_connection = ssh_connection
def system_ptp_interface_add(self,name: str, ptp_instance_name: str) -> SystemPTPInterfaceOutput :
def system_ptp_interface_add(self, name: str, ptp_instance_name: str) -> SystemPTPInterfaceOutput:
"""
Add a PTP interface
Args:
name : name of interface
ptp_instance_name: ptp instance name
name (str): name of interface
ptp_instance_name (str): ptp instance name
Returns:
SystemPTPInterfaceOutput: Output of the command
"""
cmd = f"system ptp-interface-add {name} {ptp_instance_name}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
system_ptp_interface_add_output = SystemPTPInterfaceOutput(output)
return system_ptp_interface_add_output
def system_ptp_interface_delete(self,name: str) -> str :
def system_ptp_interface_add_with_error(self, name: str, ptp_instance_name: str) -> str:
"""
Delete an interface
Add a PTP interface for errors.
Args:
name : name or UUID of interface
name (str): name of interface
ptp_instance_name (str): ptp instance name
Returns:
str: Output of the command
"""
cmd = f"system ptp-interface-add {name} {ptp_instance_name}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
if isinstance(output, list) and len(output) > 0:
return output[0].strip()
return output.strip() if isinstance(output, str) else str(output)
def system_ptp_interface_delete(self, name: str) -> str:
"""
Delete an interface
Args:
name (str): name or UUID of interface
Returns:
str: Success message
"""
cmd = f"system ptp-interface-delete {name}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
# output is a List of 1 string. "['Deleted PTP interface: xxxx-xxxx-xxx\n']"
if output and len(output) > 0:
return output[0].strip()
else:
raise "Output is expected to be a List with one element."
def get_system_ptp_interface_show(self,name: str) -> SystemPTPInterfaceOutput :
raise Exception("Output is expected to be a List with one element.")
def system_ptp_interface_delete_with_error(self, name: str) -> str:
"""
Show PTP interface attributes.
Delete an interface for errors.
Args:
name : name or UUID of interface
name (str): name or UUID of interface
Returns:
str: Command output
"""
cmd = f"system ptp-interface-delete {name}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
if isinstance(output, list) and len(output) > 0:
return output[0].strip()
return output.strip() if isinstance(output, str) else str(output)
def get_system_ptp_interface_show(self, name: str) -> SystemPTPInterfaceOutput:
"""
Show PTP interface attributes.
Args:
name (str): name or UUID of interface
Returns:
SystemPTPInterfaceOutput: Output of the command
"""
cmd = f"system ptp-interface-show {name}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
system_ptp_interface_show_output = SystemPTPInterfaceOutput(output)
return system_ptp_interface_show_output
def get_system_ptp_interface_list(self) -> SystemPTPInterfaceListOutput :
def get_system_ptp_interface_show_with_error(self, name: str) -> str:
"""
Show PTP interface attributes for errors.
Args:
name (str): name or UUID of interface
Returns:
str: Output of the command
"""
cmd = f"system ptp-interface-show {name}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
if isinstance(output, list) and len(output) > 0:
return output[0].strip()
return output.strip() if isinstance(output, str) else str(output)
def get_system_ptp_interface_list(self) -> SystemPTPInterfaceListOutput:
"""
List all PTP interfaces
Returns:
SystemPTPInterfaceListOutput: System PTP Interface List Output
"""
command = source_openrc("system ptp-interface-list")
output = self.ssh_connection.send(command)
@@ -89,42 +149,86 @@ class SystemPTPInterfaceKeywords(BaseKeyword):
return system_ptp_interface_list_output
def system_ptp_interface_parameter_add(self,name: str, parameters: str) -> SystemPTPInterfaceOutput :
def system_ptp_interface_parameter_add(self, name: str, parameters: str) -> SystemPTPInterfaceOutput:
"""
Add interface level parameters
Args:
name : name or uuid of interface
parameters: parameters can be applied to the interface
name (str): name or uuid of interface
parameters (str): parameters can be applied to the interface
Ex : system ptp-interface-parameter-add ptpinterface masterOnly=1
system ptp-interface-parameter-add ptpinterface masterOnly=0 domainNumber=0
Returns:
SystemPTPInterfaceOutput: Output of the command
"""
cmd = f"system ptp-interface-parameter-add {name} {parameters}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
system_ptp_interface_parameter_add_output = SystemPTPInterfaceOutput(output)
return system_ptp_interface_parameter_add_output
def system_ptp_interface_parameter_delete(self,name: str, parameters: str) -> SystemPTPInterfaceOutput :
def system_ptp_interface_parameter_add_with_error(self, name: str, parameters: str) -> str:
"""
Add interface level parameters for errors.
Args:
name (str): name or uuid of interface
parameters (str): parameters can be applied to the interface
Ex : system ptp-interface-parameter-add ptpinterface masterOnly=1
system ptp-interface-parameter-add ptpinterface masterOnly=0 domainNumber=0
Returns:
str: Output of the command
"""
cmd = f"system ptp-interface-parameter-add {name} {parameters}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
if isinstance(output, list) and len(output) > 0:
return output[0].strip()
return output.strip() if isinstance(output, str) else str(output)
def system_ptp_interface_parameter_delete(self, name: str, parameters: str) -> SystemPTPInterfaceOutput:
"""
Delete interface level parameters
Args:
name : name or uuid of interface
parameters: parameters can be delete from the interface
name (str): name or uuid of interface
parameters (str): parameters can be delete from the interface
Ex : system ptp-interface-parameter-delete ptpinterface masterOnly=1
system ptp-interface-parameter-delete ptpinterface masterOnly=0 domainNumber=0
Returns:
SystemPTPInterfaceOutput: Output of the command
"""
cmd = f"system ptp-interface-parameter-delete {name} {parameters}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
system_ptp_interface_parameter_delete_output = SystemPTPInterfaceOutput(output)
return system_ptp_interface_parameter_delete_output
def system_ptp_interface_parameter_delete_with_error(self, name: str, parameters: str) -> str:
"""
Delete interface level parameters for errors.
Args:
name (str): name or uuid of interface
parameters (str): parameters can be delete from the interface
Ex : system ptp-interface-parameter-delete ptpinterface masterOnly=1
system ptp-interface-parameter-delete ptpinterface masterOnly=0 domainNumber=0
Returns:
str: Output of the command
"""
cmd = f"system ptp-interface-parameter-delete {name} {parameters}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
if isinstance(output, list) and len(output) > 0:
return output[0].strip()
return output.strip() if isinstance(output, str) else str(output)

View File

@@ -1,7 +1,8 @@
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.command_wrappers import source_openrc
from keywords.cloud_platform.system.ptp.objects.system_ptp_parameter_output import SystemPTPParameterOutput
from keywords.cloud_platform.system.ptp.objects.system_ptp_parameter_list_output import SystemPTPParameterListOutput
from keywords.cloud_platform.system.ptp.objects.system_ptp_parameter_output import SystemPTPParameterOutput
class SystemPTPParameterKeywords(BaseKeyword):
"""
@@ -21,46 +22,68 @@ class SystemPTPParameterKeywords(BaseKeyword):
"""
self.ssh_connection = ssh_connection
def system_ptp_parameter_modify(self,uuid: str, new_value: str) -> SystemPTPParameterOutput :
def system_ptp_parameter_modify(self, uuid: str, new_value: str) -> SystemPTPParameterOutput:
"""
Modify a PTP parameter
Args:
uuid : uuid for ptp parameter name
new_value: new value for PTP parameter
uuid (str): uuid for ptp parameter name
new_value (str): new value for PTP parameter
Returns:
SystemPTPParameterOutput: Output of the command
"""
cmd = f"system ptp-parameter-modify {uuid} {new_value}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
system_ptp_parameter_modify_output = SystemPTPParameterOutput(output)
return system_ptp_parameter_modify_output
def get_system_ptp_parameter_show(self,uuid: str) -> SystemPTPParameterOutput :
def system_ptp_parameter_modify_with_error(self, uuid: str, new_value: str) -> str:
"""
show an PTP parameter attributes.
Modify a PTP parameter for errors.
Args:
uuid : uuid for ptp parameter name
uuid (str): uuid for ptp parameter name
new_value (str): new value for PTP parameter
Returns:
str: Output of the command
"""
cmd = f"system ptp-parameter-modify {uuid} {new_value}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
if isinstance(output, list) and len(output) > 0:
return output[0].strip()
return output.strip() if isinstance(output, str) else str(output)
def get_system_ptp_parameter_show(self, uuid: str) -> SystemPTPParameterOutput:
"""
show an PTP parameter attributes.
Args:
uuid (str): uuid for ptp parameter name
Returns:
SystemPTPParameterOutput: system ptp parameter output
"""
cmd = f"system ptp-parameter-show {uuid}"
command = source_openrc(cmd)
output = self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
system_ptp_parameter_show_output = SystemPTPParameterOutput(output)
return system_ptp_parameter_show_output
def get_system_ptp_parameter_list(self) -> SystemPTPParameterListOutput :
def get_system_ptp_parameter_list(self) -> SystemPTPParameterListOutput:
"""
List all PTP parameters
Returns:
SystemPTPParameterListOutput: system ptp parameter list output
"""
command = source_openrc("system ptp-parameter-list")
output = self.ssh_connection.send(command)
@@ -68,5 +91,3 @@ class SystemPTPParameterKeywords(BaseKeyword):
system_ptp_parameter_list_output = SystemPTPParameterListOutput(output)
return system_ptp_parameter_list_output

View File

@@ -108,17 +108,20 @@ class GnssKeywords(BaseKeyword):
cgu_location = f"/sys/kernel/debug/ice/{pci_address}/cgu"
gpio_switch_port = ptp_config.get_host(host_name).get_nic(nic).get_gpio_switch_port()
if not gpio_switch_port:
raise Exception(f"GPIO switch port not configured for {hostname} {nic}")
export_cmd = f"[ ! -d /sys/class/gpio/gpio{gpio_switch_port} ] && " f"echo {gpio_switch_port} | sudo tee /sys/class/gpio/export > /dev/null"
gnss_ssh_connection.send_as_sudo(export_cmd)
# Export GPIO pin if not already exported
export_cmd = f"if [ ! -d /sys/class/gpio/gpio{gpio_switch_port} ]; then echo {gpio_switch_port} > /sys/class/gpio/export; fi"
gnss_ssh_connection.send(export_cmd)
# Set direction to output
direction_cmd = f"echo out | tee /sys/class/gpio/gpio{gpio_switch_port}/direction > /dev/null"
gnss_ssh_connection.send_as_sudo(direction_cmd)
direction_cmd = f"echo out > /sys/class/gpio/gpio{gpio_switch_port}/direction"
gnss_ssh_connection.send(direction_cmd)
# Set GPIO value to 1 (power on GNSS)
value_cmd = f"echo 1 | tee /sys/class/gpio/gpio{gpio_switch_port}/value > /dev/null"
gnss_ssh_connection.send_as_sudo(value_cmd)
value_cmd = f"echo 1 > /sys/class/gpio/gpio{gpio_switch_port}/value"
gnss_ssh_connection.send(value_cmd)
self.validate_sma1_and_gnss_1pps_eec_pps_dpll_status_with_retry(hostname, cgu_location, timeout=1200, polling_interval=120)
@@ -140,16 +143,20 @@ class GnssKeywords(BaseKeyword):
cgu_location = f"/sys/kernel/debug/ice/{pci_address}/cgu"
gpio_switch_port = ptp_config.get_host(host_name).get_nic(nic).get_gpio_switch_port()
export_cmd = f"[ ! -d /sys/class/gpio/gpio{gpio_switch_port} ] && " f"echo {gpio_switch_port} | sudo tee /sys/class/gpio/export > /dev/null"
gnss_ssh_connection.send_as_sudo(export_cmd)
if not gpio_switch_port:
raise Exception(f"GPIO switch port not configured for {hostname} {nic}")
# Export GPIO pin if not already exported
export_cmd = f"if [ ! -d /sys/class/gpio/gpio{gpio_switch_port} ]; then echo {gpio_switch_port} > /sys/class/gpio/export; fi"
gnss_ssh_connection.send(export_cmd)
# Set direction to output
direction_cmd = f"echo out | tee /sys/class/gpio/gpio{gpio_switch_port}/direction > /dev/null"
gnss_ssh_connection.send_as_sudo(direction_cmd)
direction_cmd = f"echo out > /sys/class/gpio/gpio{gpio_switch_port}/direction"
gnss_ssh_connection.send(direction_cmd)
# Set GPIO value to 0 (power off GNSS)
value_cmd = f"echo 0 | tee /sys/class/gpio/gpio{gpio_switch_port}/value > /dev/null"
gnss_ssh_connection.send_as_sudo(value_cmd)
value_cmd = f"echo 0 > /sys/class/gpio/gpio{gpio_switch_port}/value"
gnss_ssh_connection.send(value_cmd)
# Expected states for validation
expected_cgu_input_state = "invalid"

View File

@@ -32,12 +32,13 @@ class PhcCtlKeywords(BaseKeyword):
Get the current time of the PHC clock device
Args:
device : may be either CLOCK_REALTIME, any /dev/ptpX device, or any ethernet device which supports ethtool's get_ts_info ioctl.
device (str): may be either CLOCK_REALTIME, any /dev/ptpX device, or any ethernet device which supports ethtool's get_ts_info ioctl.
Example:
phc_ctl[643764.828]: clock time is 1739856255.215802036 or Tue Feb 18 05:24:15 2025
Returns:
str: output of command
"""
output = self.ssh_connection.send_as_sudo(f"phc_ctl {device} get")
self.validate_success_return_code(self.ssh_connection)
@@ -52,12 +53,13 @@ class PhcCtlKeywords(BaseKeyword):
Compare the PHC clock device to CLOCK_REALTIME
Args:
device : may be either CLOCK_REALTIME, any /dev/ptpX device, or any ethernet device which supports ethtool's get_ts_info ioctl.
device (str): may be either CLOCK_REALTIME, any /dev/ptpX device, or any ethernet device which supports ethtool's get_ts_info ioctl.
Example:
phc_ctl[645639.878]: offset from CLOCK_REALTIME is -37000000008ns
Returns:
str: output of command
"""
output = self.ssh_connection.send_as_sudo(f"phc_ctl {device} cmp")
self.validate_success_return_code(self.ssh_connection)
@@ -72,13 +74,14 @@ class PhcCtlKeywords(BaseKeyword):
Adjust the PHC clock by an amount of seconds provided
Args:
device : may be either CLOCK_REALTIME, any /dev/ptpX device, or any ethernet device which supports ethtool's get_ts_info ioctl.
seconds :
device (str): may be either CLOCK_REALTIME, any /dev/ptpX device, or any ethernet device which supports ethtool's get_ts_info ioctl.
seconds (str): seconds
Example:
phc_ctl[646368.470]: adjusted clock by 0.000001 seconds
Returns:
str: output of command
"""
output = self.ssh_connection.send_as_sudo(f"phc_ctl {device} adj {seconds}")
self.validate_success_return_code(self.ssh_connection)
@@ -93,13 +96,14 @@ class PhcCtlKeywords(BaseKeyword):
Set the PHC clock time to the value specified in seconds. Defaults to reading CLOCK_REALTIME if no value is provided.
Args:
device : may be either CLOCK_REALTIME, any /dev/ptpX device, or any ethernet device which supports ethtool's get_ts_info ioctl.
seconds :
device (str): may be either CLOCK_REALTIME, any /dev/ptpX device, or any ethernet device which supports ethtool's get_ts_info ioctl.
seconds (str): seconds
Example :
phc_ctl[647759.416]: set clock time to 1739860212.789318498 or Tue Feb 18 06:30:12 2025
Returns:
str: output of command
"""
if seconds:
cmd = f"phc_ctl {device} set {seconds}"
@@ -156,7 +160,7 @@ class PhcCtlKeywords(BaseKeyword):
run_as_root(f"nohup bash {remote_script_path} & echo $! > /tmp/phc_loop.pid")
alarm_keywords = AlarmListKeywords(LabConnectionKeywords().get_active_controller_ssh())
alarm_descriptions = ", ".join(alarm_keywords.alarm_to_str(alarm_obj) for alarm_obj in alarms)
alarm_descriptions = ", ".join(str(alarm_obj) for alarm_obj in alarms)
get_logger().log_info(f"Waiting for alarms: {alarm_descriptions}")
@@ -180,5 +184,5 @@ class PhcCtlKeywords(BaseKeyword):
run_as_root("rm -f /tmp/phc_loop.sh /tmp/phc_loop.pid")
if not all_matched:
observed_alarm_strs = [alarm_keywords.alarm_to_str(observed_alarm_obj) for observed_alarm_obj in observed_alarms]
observed_alarm_strs = [str(observed_alarm_obj) for observed_alarm_obj in observed_alarms]
raise TimeoutError(f"Timeout: Expected alarms not found within {timeout}s.\n" f"Expected: {alarm_descriptions}\n" f"Observed:\n" + "\n".join(observed_alarm_strs))

View File

@@ -6,11 +6,10 @@ from pytest import mark
from config.configuration_manager import ConfigurationManager
from framework.logging.automation_logger import get_logger
from framework.resources.resource_finder import get_stx_resource_path
from framework.validation.validation import validate_equals_with_retry
from keywords.cloud_platform.fault_management.alarms.alarm_list_keywords import AlarmListKeywords
from keywords.cloud_platform.fault_management.alarms.objects.alarm_list_object import AlarmListObject
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.cloud_platform.system.ptp.ptp_readiness_watcher import PTPReadinessWatcher
from keywords.cloud_platform.system.ptp.ptp_readiness_keywords import PTPReadinessKeywords
from keywords.cloud_platform.system.ptp.ptp_setup_executor_keywords import PTPSetupExecutorKeywords
from keywords.cloud_platform.system.ptp.ptp_teardown_executor_keywords import PTPTeardownExecutorKeywords
from keywords.cloud_platform.system.ptp.ptp_verify_config_keywords import PTPVerifyConfigKeywords
@@ -131,7 +130,6 @@ def test_ptp_operation_interface_down_and_up():
ssh_connection = lab_connect_keywords.get_active_controller_ssh()
ip_keywords = IPKeywords(ssh_connection)
ptp_readiness_watcher = PTPReadinessWatcher()
ptp_setup_keywords = PTPSetupKeywords()
ptp_setup_template_path = get_stx_resource_path("resources/ptp/setup/ptp_configuration_expectation_compute.json5")
@@ -203,7 +201,8 @@ def test_ptp_operation_interface_down_and_up():
AlarmListKeywords(ssh_connection).wait_for_alarms_to_appear([not_locked_alarm_obj])
get_logger().log_info(f"Waiting for PMC port states after interface {ctrl0_nic1_interface} goes down.")
ptp_readiness_watcher.wait_for_port_state_appear_in_port_data_set("ptp1", "controller-1", ["MASTER", "MASTER"])
ptp_readiness_keywords = PTPReadinessKeywords(LabConnectionKeywords().get_ssh_for_hostname("controller-1"))
ptp_readiness_keywords.wait_for_port_state_appear_in_port_data_set("ptp1", "controller-1", ["MASTER", "MASTER"])
get_logger().log_info(f"Verifying PMC data after interface {ctrl0_nic1_interface} goes down.")
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, ctrl0_nic1_iface_down_ptp_setup)
@@ -219,7 +218,8 @@ def test_ptp_operation_interface_down_and_up():
AlarmListKeywords(ssh_connection).wait_for_alarms_cleared([not_locked_alarm_obj])
get_logger().log_info(f"Waiting for PMC port states after interface {ctrl0_nic1_interface} comes up.")
ptp_readiness_watcher.wait_for_port_state_appear_in_port_data_set("ptp1", "controller-1", ["SLAVE", "MASTER"])
ptp_readiness_keywords = PTPReadinessKeywords(LabConnectionKeywords().get_ssh_for_hostname("controller-1"))
ptp_readiness_keywords.wait_for_port_state_appear_in_port_data_set("ptp1", "controller-1", ["SLAVE", "MASTER"])
get_logger().log_info(f"Verifying PMC data after interface {ctrl0_nic1_interface} comes up.")
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, ctrl0_nic1_iface_up_ptp_setup)
@@ -293,7 +293,8 @@ def test_ptp_operation_interface_down_and_up():
AlarmListKeywords(ssh_connection).wait_for_alarms_to_appear([not_locked_alarm_obj])
get_logger().log_info(f"Waiting for PMC port states after interface {ctrl0_nic2_interface} goes down.")
ptp_readiness_watcher.wait_for_port_state_appear_in_port_data_set("ptp4", "controller-1", ["MASTER", "MASTER"])
ptp_readiness_keywords = PTPReadinessKeywords(LabConnectionKeywords().get_ssh_for_hostname("controller-1"))
ptp_readiness_keywords.wait_for_port_state_appear_in_port_data_set("ptp4", "controller-1", ["MASTER", "MASTER"])
get_logger().log_info(f"Verifying PMC data after interface {ctrl0_nic2_interface} goes down.")
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, ctrl0_nic2_iface_down_ptp_setup)
@@ -306,7 +307,8 @@ def test_ptp_operation_interface_down_and_up():
AlarmListKeywords(ssh_connection).wait_for_alarms_cleared([not_locked_alarm_obj])
get_logger().log_info(f"Waiting for PMC port states after interface {ctrl0_nic2_interface} comes up.")
ptp_readiness_watcher.wait_for_port_state_appear_in_port_data_set("ptp4", "controller-1", ["SLAVE", "MASTER"])
ptp_readiness_keywords = PTPReadinessKeywords(LabConnectionKeywords().get_ssh_for_hostname("controller-1"))
ptp_readiness_keywords.wait_for_port_state_appear_in_port_data_set("ptp4", "controller-1", ["SLAVE", "MASTER"])
get_logger().log_info(f"Verifying PMC data after interface {ctrl0_nic2_interface} comes up.")
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, ctrl0_nic2_iface_up_ptp_setup)
@@ -342,8 +344,6 @@ def test_ptp_operation_sma_disabled_and_enable():
ptp_setup_keywords = PTPSetupKeywords()
ptp_setup_template_path = get_stx_resource_path("resources/ptp/setup/ptp_configuration_expectation_compute.json5")
ptp_readiness_watcher = PTPReadinessWatcher()
get_logger().log_info("Verifying PTP operation and corresponding status changes when SMA is disabled.")
ctrl0_nic2_sma1_disable_ptp_selection = [("ptp3", "controller-0", []), ("ptp4", "controller-1", [])]
@@ -450,7 +450,8 @@ def test_ptp_operation_sma_disabled_and_enable():
AlarmListKeywords(ssh_connection).wait_for_alarms_to_appear([not_locked_alarm_obj, signal_loss_alarm_obj])
get_logger().log_info("Waiting for clock class after SMA1 is disabled")
ptp_readiness_watcher.wait_for_clock_class_appear_in_grandmaster_settings_np("ptp3", "controller-0", 7)
ptp_readiness_keywords = PTPReadinessKeywords(LabConnectionKeywords().get_ssh_for_hostname("controller-0"))
ptp_readiness_keywords.wait_for_clock_class_appear_in_grandmaster_settings_np("ptp3", "controller-0", 7)
get_logger().log_info("Verifying PMC data after SMA1 is disabled")
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, ctrl0_nic2_sma1_disable_exp_ptp_setup)
@@ -469,7 +470,8 @@ def test_ptp_operation_sma_disabled_and_enable():
AlarmListKeywords(ssh_connection).wait_for_alarms_cleared([alarm_list_object])
get_logger().log_info("Waiting for clock class after SMA1 is enabled")
ptp_readiness_watcher.wait_for_clock_class_appear_in_grandmaster_settings_np("ptp3", "controller-0", 6)
ptp_readiness_keywords = PTPReadinessKeywords(LabConnectionKeywords().get_ssh_for_hostname("controller-0"))
ptp_readiness_keywords.wait_for_clock_class_appear_in_grandmaster_settings_np("ptp3", "controller-0", 6)
get_logger().log_info("Verifying PMC data after SMA1 is enabled")
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, ctrl0_nic2_sma1_enable_exp_ptp_setup)
@@ -499,8 +501,6 @@ def test_ptp_operation_gnss_off_and_on():
ptp_setup_keywords = PTPSetupKeywords()
ptp_setup_template_path = get_stx_resource_path("resources/ptp/setup/ptp_configuration_expectation_compute.json5")
ptp_readiness_watcher = PTPReadinessWatcher()
get_logger().log_info("Verifying PTP operation and status when GNSS is turned off...")
selected_instances = [("ptp1", "controller-0", []), ("ptp1", "controller-1", []), ("ptp3", "controller-0", []), ("ptp4", "controller-1", [])]
@@ -696,10 +696,11 @@ def test_ptp_operation_gnss_off_and_on():
get_logger().log_info("Verifying clock class degradation after GNSS is off.")
# The clock is in "Holdover" or "Degraded" mode
ptp_readiness_watcher.wait_for_clock_class_appear_in_grandmaster_settings_np("ptp1", "controller-0", 248)
ptp_readiness_watcher.wait_for_clock_class_appear_in_grandmaster_settings_np("ptp3", "controller-0", 248)
ptp_readiness_keywords = PTPReadinessKeywords(LabConnectionKeywords().get_ssh_for_hostname("controller-0"))
ptp_readiness_keywords.wait_for_clock_class_appear_in_grandmaster_settings_np("ptp1", "controller-0", 248)
ptp_readiness_keywords.wait_for_clock_class_appear_in_grandmaster_settings_np("ptp3", "controller-0", 248)
# GNSS loss
ptp_readiness_watcher.wait_for_gm_clock_class_appear_in_parent_data_set("ptp1", "controller-0", 165)
ptp_readiness_keywords.wait_for_gm_clock_class_appear_in_parent_data_set("ptp1", "controller-0", 165)
get_logger().log_info("Verifying PMC configuration after GNSS is off.")
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, ctrl0_nic1_gnss_disable_exp_ptp_setup)
@@ -712,9 +713,9 @@ def test_ptp_operation_gnss_off_and_on():
AlarmListKeywords(ssh_connection).wait_for_alarms_cleared([ptp1_not_locked_alarm_obj, ptp4_not_locked_alarm_obj, pps_signal_loss_alarm_obj, gnss_signal_loss_alarm_obj])
get_logger().log_info("Verifying clock class restoration after GNSS is on.")
ptp_readiness_watcher.wait_for_clock_class_appear_in_grandmaster_settings_np("ptp1", "controller-0", 6)
ptp_readiness_watcher.wait_for_clock_class_appear_in_grandmaster_settings_np("ptp3", "controller-0", 6)
ptp_readiness_watcher.wait_for_gm_clock_class_appear_in_parent_data_set("ptp1", "controller-0", 6)
ptp_readiness_keywords.wait_for_clock_class_appear_in_grandmaster_settings_np("ptp1", "controller-0", 6)
ptp_readiness_keywords.wait_for_clock_class_appear_in_grandmaster_settings_np("ptp3", "controller-0", 6)
ptp_readiness_keywords.wait_for_gm_clock_class_appear_in_parent_data_set("ptp1", "controller-0", 6)
get_logger().log_info("Verifying PMC configuration after GNSS is restored.")
ctrl0_nic1_gnss_enable_exp_dict_overrides = {"ptp4l": [{"name": "ptp4", "controller-1": {"grandmaster_settings": {"clock_class": 165}}}]}
@@ -820,7 +821,6 @@ def test_ptp_operation_service_stop_start_restart():
ptp_setup_template_path = get_stx_resource_path("resources/ptp/setup/ptp_configuration_expectation_compute.json5")
systemctl_keywords = SystemCTLKeywords(ssh_connection)
ptp_service_status_validator = PTPServiceStatusValidator(ssh_connection)
ptp_readiness_watcher = PTPReadinessWatcher()
get_logger().log_info("Stopping ptp4l@ptp1.service on controller-0...")
@@ -921,7 +921,8 @@ def test_ptp_operation_service_stop_start_restart():
AlarmListKeywords(ssh_connection).wait_for_alarms_to_appear([ctrl0_alarm, ctrl1_alarm])
get_logger().log_info("Verifying degraded PMC values after service stop...")
ptp_readiness_watcher.wait_for_gm_clock_class_appear_in_parent_data_set("ptp1", "controller-1", 165)
ptp_readiness_keywords = PTPReadinessKeywords(LabConnectionKeywords().get_ssh_for_hostname("controller-1"))
ptp_readiness_keywords.wait_for_gm_clock_class_appear_in_parent_data_set("ptp1", "controller-1", 165)
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, ctrl0_ptp1_service_stop_exp_ptp_setup)
ptp_verify_config_keywords.verify_ptp_pmc_values(check_domain=False)
@@ -934,7 +935,7 @@ def test_ptp_operation_service_stop_start_restart():
AlarmListKeywords(ssh_connection).wait_for_alarms_cleared([ctrl0_alarm, ctrl1_alarm])
get_logger().log_info("Verifying full PMC configuration after service start...")
ptp_readiness_watcher.wait_for_gm_clock_class_appear_in_parent_data_set("ptp1", "controller-1", 6)
ptp_readiness_keywords.wait_for_gm_clock_class_appear_in_parent_data_set("ptp1", "controller-1", 6)
start_exp_ptp_setup = ptp_setup_keywords.filter_and_render_ptp_config(ptp_setup_template_path, selected_instances)
ptp_verify_config_keywords = PTPVerifyConfigKeywords(ssh_connection, start_exp_ptp_setup)
ptp_verify_config_keywords.verify_ptp_pmc_values(check_domain=False)
@@ -948,6 +949,7 @@ def test_ptp_operation_service_stop_start_restart():
AlarmListKeywords(ssh_connection).wait_for_alarms_cleared([ctrl0_alarm, ctrl1_alarm])
get_logger().log_info("Verifying PMC configuration and clock class after service restart...")
ptp_readiness_watcher.wait_for_clock_class_appear_in_grandmaster_settings_np("ptp1", "controller-0", 6)
ptp_readiness_watcher.wait_for_gm_clock_class_appear_in_parent_data_set("ptp1", "controller-0", 6)
ptp_readiness_keywords = PTPReadinessKeywords(LabConnectionKeywords().get_ssh_for_hostname("controller-0"))
ptp_readiness_keywords.wait_for_clock_class_appear_in_grandmaster_settings_np("ptp1", "controller-0", 6)
ptp_readiness_keywords.wait_for_gm_clock_class_appear_in_parent_data_set("ptp1", "controller-0", 6)
ptp_verify_config_keywords.verify_ptp_pmc_values(check_domain=False)

View File

@@ -0,0 +1,147 @@
from pytest import mark
from config.configuration_manager import ConfigurationManager
from framework.logging.automation_logger import get_logger
from framework.validation.validation import validate_str_contains
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.cloud_platform.system.ptp.system_host_if_ptp_keywords import SystemHostIfPTPKeywords
from keywords.cloud_platform.system.ptp.system_ptp_instance_keywords import SystemPTPInstanceKeywords
from keywords.cloud_platform.system.ptp.system_ptp_interface_keywords import SystemPTPInterfaceKeywords
@mark.p3
def test_host_if_ptp_assign_errors():
"""
Test error scenarios for host-if-ptp-assign command.
Test Steps:
- Try to assign non-existent interface to host
- Try to assign interface to non-existent host
- Try to make already existent association more than once
- Try to assign interface from non-existent ptp instance to host
Expected Results:
- Appropriate error messages are returned for each scenario
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_active_controller_ssh()
system_host_if_ptp_keywords = SystemHostIfPTPKeywords(ssh_connection)
system_ptp_instance_keywords = SystemPTPInstanceKeywords(ssh_connection)
system_ptp_interface_keywords = SystemPTPInterfaceKeywords(ssh_connection)
valid_hostname = "controller-0"
host_name = valid_hostname.replace("-", "_")
# Fetch interface from default.json5 using ConfigurationManager
ptp_config = ConfigurationManager.get_ptp_config()
valid_interface = ptp_config.get_host(host_name).get_nic("nic1").get_nic_connection().get_interface()
valid_instance = "ptp7"
valid_ptp_interface = "ptp7if1"
non_existent_interface = "00000000-0000-0000-0000-000000000001"
non_existent_hostname = "compute-100"
non_existent_ptp_interface = "00000000-0000-0000-0000-000000000002"
get_logger().log_info("Trying to assign non-existent interface to host")
error_message = system_host_if_ptp_keywords.system_host_if_ptp_assign_with_error(valid_hostname, non_existent_interface, valid_ptp_interface)
expected_error = f"Interface not found: host {valid_hostname} interface {non_existent_interface}"
validate_str_contains(error_message, expected_error, "Error message for non-existent interface")
get_logger().log_info("Trying to assign interface to non-existent host")
error_message = system_host_if_ptp_keywords.system_host_if_ptp_assign_with_error(non_existent_hostname, valid_interface, valid_ptp_interface)
expected_error = f"host not found: {non_existent_hostname}"
validate_str_contains(error_message, expected_error, "Error message for non-existent host")
# Create PTP instance
get_logger().log_info(f"Creating PTP instance {valid_instance}")
if "PTP instance not found" in system_ptp_instance_keywords.get_system_ptp_instance_show_with_error(valid_instance):
system_ptp_instance_keywords.system_ptp_instance_add(valid_instance, "ptp4l")
else:
get_logger().log_info("Failed to create PTP instance or resources already exist")
# Create PTP interface
get_logger().log_info(f"Creating PTP interface {valid_ptp_interface}")
if "PTP interface not found" in system_ptp_interface_keywords.get_system_ptp_interface_show_with_error(valid_ptp_interface):
system_ptp_interface_keywords.system_ptp_interface_add(valid_ptp_interface, valid_instance)
else:
get_logger().log_info("Faled to create PTP interface or resources already exist")
# Associate PTP to an interface at host.
try:
get_logger().log_info("Associate PTP to an interface at host")
system_host_if_ptp_keywords.system_host_if_ptp_assign(valid_hostname, valid_interface, valid_ptp_interface)
except Exception:
get_logger().log_info("Association may already exist, continuing with test")
get_logger().log_info("Trying to make already existent association more than once")
error_message = system_host_if_ptp_keywords.system_host_if_ptp_assign_with_error(valid_hostname, valid_interface, valid_ptp_interface)
expected_error = "PTP interface is already associated to interface"
validate_str_contains(error_message, expected_error, "Error message for duplicate association")
get_logger().log_info("Trying to assign interface from non-existent ptp instance to host")
error_message = system_host_if_ptp_keywords.system_host_if_ptp_assign_with_error(valid_hostname, valid_interface, non_existent_ptp_interface)
expected_error = f"PTP interface not found: {non_existent_ptp_interface}"
validate_str_contains(error_message, expected_error, "Error message for non-existent PTP interface")
@mark.p3
def test_host_if_ptp_remove_errors():
"""
Test error scenarios for host-if-ptp-remove command.
Test Steps:
- Try to remove non-existent interface from host
- Try to remove interface from non-existent host
- Try to remove interface from non-existent ptp instance
Expected Results:
- Appropriate error messages are returned for each scenario
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_active_controller_ssh()
system_host_if_ptp_keywords = SystemHostIfPTPKeywords(ssh_connection)
system_ptp_instance_keywords = SystemPTPInstanceKeywords(ssh_connection)
system_ptp_interface_keywords = SystemPTPInterfaceKeywords(ssh_connection)
valid_instance = "ptp7"
valid_ptp_interface = "ptp7if1"
valid_hostname = "controller-0"
host_name = valid_hostname.replace("-", "_")
# Fetch interface from default.json5 using ConfigurationManager
ptp_config = ConfigurationManager.get_ptp_config()
valid_interface = ptp_config.get_host(host_name).get_nic("nic1").get_nic_connection().get_interface()
valid_ptp_interface = "ptp7if1"
non_existent_interface = "00000000-0000-0000-0000-000000000001"
non_existent_hostname = "compute-100"
non_existent_ptp_interface = "00000000-0000-0000-0000-000000000002"
# Create PTP instance
get_logger().log_info(f"Creating PTP instance {valid_instance}")
if "PTP instance not found" in system_ptp_instance_keywords.get_system_ptp_instance_show_with_error(valid_instance):
system_ptp_instance_keywords.system_ptp_instance_add(valid_instance, "ptp4l")
else:
get_logger().log_info("Failed to create PTP instance or resources already exist")
# Create PTP interface
get_logger().log_info(f"Creating PTP interface {valid_ptp_interface}")
if "PTP interface not found" in system_ptp_interface_keywords.get_system_ptp_interface_show_with_error(valid_ptp_interface):
system_ptp_interface_keywords.system_ptp_interface_add(valid_ptp_interface, valid_instance)
else:
get_logger().log_info("Faled to create PTP interface or resources already exist")
get_logger().log_info("Trying to remove non-existent interface from host")
error_message = system_host_if_ptp_keywords.system_host_if_ptp_remove_with_error(valid_hostname, non_existent_interface, valid_ptp_interface)
expected_error = f"Interface not found: host {valid_hostname} interface {non_existent_interface}"
validate_str_contains(error_message, expected_error, "Error message for non-existent interface")
get_logger().log_info("Trying to remove interface from non-existent host")
error_message = system_host_if_ptp_keywords.system_host_if_ptp_remove_with_error(non_existent_hostname, valid_interface, valid_ptp_interface)
expected_error = f"host not found: {non_existent_hostname}"
validate_str_contains(error_message, expected_error, "Error message for non-existent host")
get_logger().log_info("Trying to remove interface from non-existent ptp instance")
error_message = system_host_if_ptp_keywords.system_host_if_ptp_remove_with_error(valid_hostname, valid_interface, non_existent_ptp_interface)
expected_error = f"PTP interface not found: {non_existent_ptp_interface}"
validate_str_contains(error_message, expected_error, "Error message for non-existent PTP interface")

View File

@@ -0,0 +1,87 @@
from pytest import mark
from framework.logging.automation_logger import get_logger
from framework.validation.validation import validate_str_contains
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.cloud_platform.system.ptp.system_host_ptp_instance_keywords import SystemHostPTPInstanceKeywords
from starlingx.keywords.cloud_platform.system.ptp.system_ptp_instance_keywords import SystemPTPInstanceKeywords
@mark.p3
def test_host_ptp_instance_assign_errors():
"""
Test error scenarios for host-ptp-instance-assign command.
Test Steps:
- Try to assign instance to a non-existent host
- Try to assign non-existent instance to a host
- Try to assign instance to a host more than once
Expected Results:
- Appropriate error messages are returned for each scenario
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_active_controller_ssh()
system_host_ptp_instance_keywords = SystemHostPTPInstanceKeywords(ssh_connection)
system_ptp_instance_keywords = SystemPTPInstanceKeywords(ssh_connection)
valid_hostname = "controller-0"
valid_instance = "ptp7"
non_existent_hostname = "compute-100"
non_existent_instance = "no_ptp8"
# Create PTP instance
get_logger().log_info(f"Creating PTP instance {valid_instance}")
if "PTP instance not found" in system_ptp_instance_keywords.get_system_ptp_instance_show_with_error(valid_instance):
system_ptp_instance_keywords.system_ptp_instance_add(valid_instance, "ptp4l")
else:
get_logger().log_info("Failed to create PTP instance or resources already exist")
get_logger().log_info("Trying to assign instance to a non-existent host")
error_message = system_host_ptp_instance_keywords.system_host_ptp_instance_assign_with_error(non_existent_hostname, valid_instance)
validate_str_contains(error_message, f"host not found: {non_existent_hostname}", "Error message for non-existent host")
get_logger().log_info("Trying to assign non-existent instance to a host")
error_message = system_host_ptp_instance_keywords.system_host_ptp_instance_assign_with_error(valid_hostname, non_existent_instance)
validate_str_contains(error_message, f"PTP instance not found: {non_existent_instance}", "Error message for non-existent instance")
try:
get_logger().log_info("Creating initial association")
system_host_ptp_instance_keywords.system_host_ptp_instance_assign(valid_hostname, valid_instance)
except Exception:
get_logger().log_info("Association may already exist, continuing with test")
get_logger().log_info("Trying to assign instance to a host more than once")
error_message = system_host_ptp_instance_keywords.system_host_ptp_instance_assign_with_error(valid_hostname, valid_instance)
validate_str_contains(error_message, "PTP instance is already associated to host", "Error message for duplicate association")
@mark.p3
def test_host_ptp_instance_remove_errors():
"""
Test error scenarios for host-ptp-instance-remove command.
Test Steps:
- Try to remove instance from non-existent host
- Try to remove non-existent instance from host
Expected Results:
- Appropriate error messages are returned for each scenario
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_active_controller_ssh()
system_host_ptp_instance_keywords = SystemHostPTPInstanceKeywords(ssh_connection)
# Use specified values
valid_hostname = "controller-0"
valid_instance = "ptp7"
non_existent_hostname = "compute-100"
non_existent_instance = "no_ptp8"
get_logger().log_info("Trying to remove instance from non-existent host")
error_message = system_host_ptp_instance_keywords.system_host_ptp_instance_remove_with_error(non_existent_hostname, valid_instance)
validate_str_contains(error_message, f"host not found: {non_existent_hostname}", "Error message for non-existent host")
get_logger().log_info("Trying to remove non-existent instance from host")
error_message = system_host_ptp_instance_keywords.system_host_ptp_instance_remove_with_error(valid_hostname, non_existent_instance)
validate_str_contains(error_message, f"PTP instance not found: {non_existent_instance}", "Error message for non-existent instance")

View File

@@ -0,0 +1,148 @@
from pytest import mark
from framework.logging.automation_logger import get_logger
from framework.validation.validation import validate_str_contains
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.cloud_platform.system.ptp.system_ptp_instance_keywords import SystemPTPInstanceKeywords
from keywords.cloud_platform.system.ptp.system_ptp_instance_parameter_keywords import SystemPTPInstanceParameterKeywords
@mark.p3
def test_ptp_instance_add_errors():
"""
Test error scenarios for ptp-instance-add command.
Test Steps:
- Try to create a ptp-instance of invalid type
Expected Results:
- Appropriate error message is returned
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_active_controller_ssh()
system_ptp_instance_keywords = SystemPTPInstanceKeywords(ssh_connection)
instance_name = "ptp7"
invalid_type = "random"
get_logger().log_info("Trying to create a ptp-instance of invalid type")
error_message = system_ptp_instance_keywords.system_ptp_instance_add_with_error(instance_name, invalid_type)
validate_str_contains(error_message, "usage: system ptp-instance-add", "Error message for invalid PTP instance type")
@mark.p3
def test_ptp_instance_delete_errors():
"""
Test error scenarios for ptp-instance-delete command.
Test Steps:
- Try to delete a non-existent ptp-instance
Expected Results:
- Appropriate error message is returned
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_active_controller_ssh()
system_ptp_instance_keywords = SystemPTPInstanceKeywords(ssh_connection)
non_existent_instance = "no_ptp8"
get_logger().log_info("Trying to delete a non-existent ptp-instance")
error_message = system_ptp_instance_keywords.system_ptp_instance_delete_with_error(non_existent_instance)
validate_str_contains(error_message, f"PTP instance not found: {non_existent_instance}", "Error message for non-existent PTP instance")
@mark.p3
def test_ptp_instance_show_errors():
"""
Test error scenarios for ptp-instance-show command.
Test Steps:
- Try to show info from a non-existent ptp-instance
Expected Results:
- Appropriate error message is returned
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_active_controller_ssh()
system_ptp_instance_keywords = SystemPTPInstanceKeywords(ssh_connection)
non_existent_instance = "no_ptp8"
get_logger().log_info("Trying to show info from a non-existent ptp-instance")
error_message = system_ptp_instance_keywords.get_system_ptp_instance_show_with_error(non_existent_instance)
validate_str_contains(error_message, f"PTP instance not found: {non_existent_instance}", "Error message for non-existent PTP instance")
@mark.p3
def test_ptp_instance_parameter_delete_errors():
"""
Test error scenarios for ptp-instance-parameter-delete command.
Test Steps:
- Try to delete parameter from an invalid instance
- Try to delete parameter with an invalid parameter UUID
Expected Results:
- Appropriate error messages are returned for each scenario
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_active_controller_ssh()
system_ptp_instance_keywords = SystemPTPInstanceKeywords(ssh_connection)
system_ptp_instance_parameter_keywords = SystemPTPInstanceParameterKeywords(ssh_connection)
invalid_instance = "no_ptp8"
valid_instance = "ptp7"
invalid_parameter = "invalid_param"
# Create PTP instance and interface
get_logger().log_info(f"Creating PTP instance {valid_instance}")
if "PTP instance not found" in system_ptp_instance_keywords.get_system_ptp_instance_show_with_error(valid_instance):
system_ptp_instance_keywords.system_ptp_instance_add(valid_instance, "ptp4l")
else:
get_logger().log_info("Failed to create PTP instance or resources already exist")
get_logger().log_info("Trying to delete parameter from an invalid instance")
error_message = system_ptp_instance_parameter_keywords.system_ptp_instance_parameter_delete_with_error(invalid_instance, "param")
validate_str_contains(error_message, f"PTP instance not found: {invalid_instance}", "Error message for invalid instance")
get_logger().log_info("Trying to delete parameter with an invalid parameter UUID")
error_message = system_ptp_instance_parameter_keywords.system_ptp_instance_parameter_delete_with_error(valid_instance, invalid_parameter)
validate_str_contains(error_message, f"Bad PTP parameter keypair: {invalid_parameter}", "Error message for invalid parameter")
@mark.p3
def test_ptp_instance_parameter_add_errors():
"""
Test error scenarios for ptp-instance-parameter-add command.
Test Steps:
- Try to add a parameter to an invalid instance
- Try to add an invalid parameter
Expected Results:
- Appropriate error messages are returned for each scenario
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_active_controller_ssh()
system_ptp_instance_keywords = SystemPTPInstanceKeywords(ssh_connection)
system_ptp_instance_parameter_keywords = SystemPTPInstanceParameterKeywords(ssh_connection)
invalid_instance = "no_ptp8"
valid_instance = "ptp7"
invalid_parameter = "invalid_param"
# Create PTP instance and interface
get_logger().log_info(f"Creating PTP instance {valid_instance}")
if "PTP instance not found" in system_ptp_instance_keywords.get_system_ptp_instance_show_with_error(valid_instance):
system_ptp_instance_keywords.system_ptp_instance_add(valid_instance, "ptp4l")
else:
get_logger().log_info("Failed to create PTP instance or resources already exist")
get_logger().log_info("Trying to add a parameter to an invalid instance")
error_message = system_ptp_instance_parameter_keywords.system_ptp_instance_parameter_add_with_error(invalid_instance, "param=value")
validate_str_contains(error_message, f"PTP instance not found: {invalid_instance}", "Error message for invalid instance")
get_logger().log_info("Trying to add an invalid parameter")
error_message = system_ptp_instance_parameter_keywords.system_ptp_instance_parameter_add_with_error(valid_instance, invalid_parameter)
validate_str_contains(error_message, f"Bad PTP parameter keypair: {invalid_parameter}", "Error message for invalid parameter")

View File

@@ -0,0 +1,191 @@
from pytest import mark
from config.configuration_manager import ConfigurationManager
from framework.logging.automation_logger import get_logger
from framework.validation.validation import validate_str_contains
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.cloud_platform.system.ptp.system_ptp_instance_keywords import SystemPTPInstanceKeywords
from keywords.cloud_platform.system.ptp.system_ptp_interface_keywords import SystemPTPInterfaceKeywords
@mark.p3
def test_ptp_interface_add_errors():
"""
Test error scenarios for ptp-interface-add command.
Test Steps:
- Try to add an interface to an invalid ptp instance
Expected Results:
- Appropriate error message is returned
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_active_controller_ssh()
system_ptp_interface_keywords = SystemPTPInterfaceKeywords(ssh_connection)
valid_hostname = "controller-0"
host_name = valid_hostname.replace("-", "_")
# Fetch interface from default.json5 using ConfigurationManager
ptp_config = ConfigurationManager.get_ptp_config()
interface_name = ptp_config.get_host(host_name).get_nic("nic1").get_base_port()
non_existent_instance = "no_ptp8"
get_logger().log_info("Trying to add an interface to an invalid ptp instance")
error_message = system_ptp_interface_keywords.system_ptp_interface_add_with_error(interface_name, non_existent_instance)
validate_str_contains(error_message, f"PTP instance not found: {non_existent_instance}", "Error message for non-existent PTP instance")
@mark.p3
def test_ptp_interface_delete_errors():
"""
Test error scenarios for ptp-interface-delete command.
Test Steps:
- Try to delete an interface that does not exist
- Try to delete an interface with an invalid UUID
Expected Results:
- Appropriate error messages are returned for each scenario
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_active_controller_ssh()
system_ptp_interface_keywords = SystemPTPInterfaceKeywords(ssh_connection)
non_existent_uuid = "00000000-0000-0000-0000-000000000001"
invalid_uuid = "invalid_uuid"
get_logger().log_info("Trying to delete an interface that does not exist")
error_message = system_ptp_interface_keywords.system_ptp_interface_delete_with_error(non_existent_uuid)
validate_str_contains(error_message, f"PTP interface not found: {non_existent_uuid}", "Error message for non-existent interface")
get_logger().log_info("Trying to delete an interface with an invalid UUID")
error_message = system_ptp_interface_keywords.system_ptp_interface_delete_with_error(invalid_uuid)
validate_str_contains(error_message, f"PTP interface not found: {invalid_uuid}", "Error message for invalid UUID")
@mark.p3
def test_ptp_interface_parameter_add_errors():
"""
Test error scenarios for ptp-interface-parameter-add command.
Test Steps:
- Create PTP instance and interface
- Try to add a bad keypair ptp-parameter to the interface
- Trying to add a parameter using a non-existent interface name
Expected Results:
- Appropriate error messages are returned for each scenario
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_active_controller_ssh()
system_ptp_interface_keywords = SystemPTPInterfaceKeywords(ssh_connection)
system_ptp_instance_keywords = SystemPTPInstanceKeywords(ssh_connection)
invalid_parameter = "sma1"
instance_name = "ptp7"
interface_name = "ptp7if1"
valid_parameter = "sma1=output"
non_existent_interface = "non_existent_interface"
# Create PTP instance and interface
get_logger().log_info(f"Creating PTP instance {instance_name}")
if "PTP instance not found" in system_ptp_instance_keywords.get_system_ptp_instance_show_with_error(instance_name):
system_ptp_instance_keywords.system_ptp_instance_add(instance_name, "ptp4l")
else:
get_logger().log_info("Failed to create PTP instance or resources already exist")
get_logger().log_info(f"Creating PTP interface {interface_name}")
if "PTP interface not found" in system_ptp_interface_keywords.get_system_ptp_interface_show_with_error(interface_name):
system_ptp_interface_keywords.system_ptp_interface_add(interface_name, instance_name)
else:
get_logger().log_info("Faled to create PTP interface or resources already exist")
get_logger().log_info("Trying to add a bad keypair ptp-parameter to the interface")
error_message = system_ptp_interface_keywords.system_ptp_interface_parameter_add_with_error(interface_name, invalid_parameter)
validate_str_contains(error_message, f"Bad PTP parameter keypair: {invalid_parameter}", "Error message for bad key pair parameter")
get_logger().log_info("Trying to add a parameter with non-existent using invalid interface name")
error_message = system_ptp_interface_keywords.system_ptp_interface_parameter_add_with_error(non_existent_interface, valid_parameter)
validate_str_contains(error_message, f"PTP interface not found: {non_existent_interface}", "Error message for invalid interface name")
@mark.p3
def test_ptp_interface_show_errors():
"""
Test error scenarios for ptp-interface-show command.
Test Steps:
- Try to show an interface with invalid UUID
- Try to show an interface with non-existent UUID
Expected Results:
- Appropriate error messages are returned for each scenario
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_active_controller_ssh()
system_ptp_interface_keywords = SystemPTPInterfaceKeywords(ssh_connection)
invalid_uuid = "invalid_uuid"
non_existent_uuid = "00000000-0000-0000-0000-000000000001"
get_logger().log_info("Trying to show an interface with invalid UUID")
error_message = system_ptp_interface_keywords.get_system_ptp_interface_show_with_error(invalid_uuid)
validate_str_contains(error_message, f"PTP interface not found: {invalid_uuid}", "Error message for invalid UUID")
get_logger().log_info("Trying to show an interface with non-existent UUID")
error_message = system_ptp_interface_keywords.get_system_ptp_interface_show_with_error(non_existent_uuid)
validate_str_contains(error_message, f"PTP interface not found: {non_existent_uuid}", "Error message for non-existent interface")
@mark.p3
def test_ptp_interface_parameter_delete_errors():
"""
Test error scenarios for ptp-interface-parameter-delete command.
Test Steps:
- Try to delete an interface parameter with non-existent interface UUID
- Try to delete an interface parameter with invalid parameter UUID
Expected Results:
- Appropriate error messages are returned for each scenario
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_active_controller_ssh()
system_ptp_instance_keywords = SystemPTPInstanceKeywords(ssh_connection)
system_ptp_interface_keywords = SystemPTPInterfaceKeywords(ssh_connection)
non_existent_uuid = "00000000-0000-0000-0000-000000000001"
invalid_parameter = "sma1"
valid_parameter = "sma1=output"
instance_name = "ptp7"
interface_name = "ptp7if1"
# Create PTP instance and interface
get_logger().log_info(f"Creating PTP instance {instance_name}")
if "PTP instance not found" in system_ptp_instance_keywords.get_system_ptp_instance_show_with_error(instance_name):
system_ptp_instance_keywords.system_ptp_instance_add(instance_name, "ptp4l")
else:
get_logger().log_info("Failed to create PTP instance or resources already exist")
get_logger().log_info(f"Creating PTP interface {interface_name}")
if "PTP interface not found" in system_ptp_interface_keywords.get_system_ptp_interface_show_with_error(interface_name):
system_ptp_interface_keywords.system_ptp_interface_add(interface_name, instance_name)
else:
get_logger().log_info("Faled to create PTP interface or resources already exist")
get_logger().log_info(f"Adding PTP interface parameter {interface_name}")
if valid_parameter not in system_ptp_interface_keywords.get_system_ptp_interface_show(interface_name).get_ptp_interface().get_parameters():
system_ptp_interface_keywords.system_ptp_interface_parameter_add(interface_name, valid_parameter)
else:
get_logger().log_info("Faled to create PTP interface or resources already exist")
get_logger().log_info("Trying to delete an interface parameter with non-existent interface UUID")
error_message = system_ptp_interface_keywords.system_ptp_interface_parameter_delete_with_error(non_existent_uuid, valid_parameter)
validate_str_contains(error_message, f"PTP interface not found: {non_existent_uuid}", "Error message for non-existent interface")
get_logger().log_info("Trying to delete an interface parameter with invalid parameter")
valid_uuid = system_ptp_interface_keywords.get_system_ptp_interface_show(interface_name).get_ptp_interface().get_uuid()
error_message = system_ptp_interface_keywords.system_ptp_interface_parameter_delete_with_error(valid_uuid, invalid_parameter)
validate_str_contains(error_message, f"Bad PTP parameter keypair: {invalid_parameter}", "Error message for invalid parameter")

View File

@@ -0,0 +1,36 @@
from pytest import mark
from framework.logging.automation_logger import get_logger
from framework.validation.validation import validate_str_contains
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.cloud_platform.system.ptp.system_ptp_parameter_keywords import SystemPTPParameterKeywords
@mark.p3
def test_ptp_parameter_modify_errors():
"""
Test error scenarios for ptp-parameter-modify command.
Test Steps:
- Try to modify a parameter with non-existent id
- Try to modify a parameter with invalid UUID
Expected Results:
- Appropriate error messages are returned for each scenario
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_active_controller_ssh()
system_ptp_parameter_keywords = SystemPTPParameterKeywords(ssh_connection)
# Use specified values
non_existent_uuid = "00000000-0000-0000-0000-000000000001"
invalid_uuid = "invalid_param"
new_value = "2"
get_logger().log_info("Trying to modify a parameter with non-existent id")
error_message = system_ptp_parameter_keywords.system_ptp_parameter_modify_with_error(non_existent_uuid, new_value)
validate_str_contains(error_message, f"No PTP parameter with id {non_existent_uuid} found", "Error message for non-existent parameter")
get_logger().log_info("Trying to modify a parameter with invalid UUID")
error_message = system_ptp_parameter_keywords.system_ptp_parameter_modify_with_error(invalid_uuid, new_value)
validate_str_contains(error_message, "Invalid input for field/attribute uuid", "Error message for invalid UUID")