Files
test/keywords/ptp/phc_ctl_keywords.py
Guntaka Umashankar Reddy 4878c9d90e Updated alarm message for phc_ctl time change and timeouts.
Change-Id: I83890f8484d8a977f727358b46d83feb19e9cccf
Signed-off-by: Guntaka Umashankar Reddy <umashankarguntaka.reddy@windriver.com>
2025-09-17 12:23:33 -04:00

178 lines
7.7 KiB
Python

import time
from multiprocessing import get_logger
from typing import List
from config.configuration_manager import ConfigurationManager
from framework.ssh.prompt_response import PromptResponse
from framework.ssh.ssh_connection import SSHConnection
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.fault_management.alarms.alarm_list_keywords import AlarmListKeywords
from keywords.cloud_platform.fault_management.alarms.objects.alarm_list_object import AlarmListObject
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
class PhcCtlKeywords(BaseKeyword):
"""Directly control PHC device clock using given SSH connection."""
def __init__(self, ssh_connection: SSHConnection):
"""
Initializes the PhcCtlKeywords with an SSH connection.
Args:
ssh_connection (SSHConnection): An instance of an SSH connection.
"""
self.ssh_connection = ssh_connection
def phc_ctl_get(self, device: str) -> str:
"""
Get the current time of the PHC clock device
Args:
device (str): may be either CLOCK_REALTIME, any /dev/ptpX device, or any ethernet device which supports ethtool's get_ts_info ioctl.
Example:
phc_ctl[643764.828]: clock time is 1739856255.215802036 or Tue Feb 18 05:24:15 2025
Returns:
str: output of command
"""
output = self.ssh_connection.send_as_sudo(f"phc_ctl {device} get")
self.validate_success_return_code(self.ssh_connection)
output_str = "".join(output).replace("\n", "")
if output_str and len(output_str.split()) > 4:
return output_str.split()[4]
else:
raise "output_str.split() is expected to be a List with four elements."
def phc_ctl_cmp(self, device: str) -> str:
"""
Compare the PHC clock device to CLOCK_REALTIME
Args:
device (str): may be either CLOCK_REALTIME, any /dev/ptpX device, or any ethernet device which supports ethtool's get_ts_info ioctl.
Example:
phc_ctl[645639.878]: offset from CLOCK_REALTIME is -37000000008ns
Returns:
str: output of command
"""
output = self.ssh_connection.send_as_sudo(f"phc_ctl {device} cmp")
self.validate_success_return_code(self.ssh_connection)
output_str = "".join(output)
if output_str and len(output_str.split()) > 5:
return output_str.split()[5]
else:
raise "output_str.split() is expected to be a List with five elements."
def phc_ctl_adj(self, device: str, seconds: str) -> str:
"""
Adjust the PHC clock by an amount of seconds provided
Args:
device (str): may be either CLOCK_REALTIME, any /dev/ptpX device, or any ethernet device which supports ethtool's get_ts_info ioctl.
seconds (str): seconds
Example:
phc_ctl[646368.470]: adjusted clock by 0.000001 seconds
Returns:
str: output of command
"""
output = self.ssh_connection.send_as_sudo(f"phc_ctl {device} adj {seconds}")
self.validate_success_return_code(self.ssh_connection)
output_str = "".join(output).replace("\n", "")
if output_str and len(output_str.split()) > 5:
return output_str.split()[4]
else:
raise "output_str.split() is expected to be a List with five elements."
def phc_ctl_set(self, device: str, seconds: str = None) -> str:
"""
Set the PHC clock time to the value specified in seconds. Defaults to reading CLOCK_REALTIME if no value is provided.
Args:
device (str): may be either CLOCK_REALTIME, any /dev/ptpX device, or any ethernet device which supports ethtool's get_ts_info ioctl.
seconds (str): seconds
Example :
phc_ctl[647759.416]: set clock time to 1739860212.789318498 or Tue Feb 18 06:30:12 2025
Returns:
str: output of command
"""
if seconds:
cmd = f"phc_ctl {device} set {seconds}"
else:
cmd = f"phc_ctl {device} set"
output = self.ssh_connection.send_as_sudo(cmd)
self.validate_success_return_code(self.ssh_connection)
output_str = "".join(output).replace("\n", "")
if output_str and len(output_str.split()) > 5:
return output_str.split()[5]
else:
raise "output_str.split() is expected to be a List with five elements."
def wait_for_phc_ctl_adjustment_alarm(self, interface: str, alarms: List[AlarmListObject], timeout: int = 180, polling_interval: int = 10) -> None:
"""
Run a remote phc_ctl adjustment loop on the controller as root, and stop it once the specified PTP alarm(s) are detected or a timeout occurs.
Args:
interface (str): The interface to apply phc_ctl adjustments to.
alarms (List[AlarmListObject]): A list of expected alarm objects to wait for.
timeout (int): Maximum wait time in seconds (default: 120).
polling_interval (int): Interval in seconds between polling attempts (default: 10).
Raises:
TimeoutError: If the expected alarms are not observed within the timeout period.
"""
# Prepare prompt responses for entering sudo
password = ConfigurationManager.get_lab_config().get_admin_credentials().get_password()
password_prompt = PromptResponse("Password:", password)
def run_as_root(command: str) -> None:
"""
Executes a given shell command on the remote host as the root user using 'sudo su'.
Args:
command (str): The shell command to be executed with root privileges.
"""
root_prompt = PromptResponse("#", command)
self.ssh_connection.send_expect_prompts("sudo su", [password_prompt, root_prompt])
# Create and store the phc_ctl loop script
remote_script_path = "/tmp/phc_loop.sh"
loop_script = f"while true; do phc_ctl {interface} -q adj 0.0001; sleep 1; done"
run_as_root(f"echo '{loop_script}' > {remote_script_path}")
run_as_root(f"chmod +x {remote_script_path}")
run_as_root(f"nohup bash {remote_script_path} & echo $! > /tmp/phc_loop.pid")
alarm_keywords = AlarmListKeywords(LabConnectionKeywords().get_active_controller_ssh())
alarm_descriptions = ", ".join(str(alarm_obj) for alarm_obj in alarms)
get_logger().log_info(f"Waiting for alarms: {alarm_descriptions}")
end_time = time.time() + timeout
all_matched = False
while time.time() < end_time:
observed_alarms = alarm_keywords.alarm_list()
all_matched = all(any(alarm_keywords.alarms_match(observed_alarm_obj, expected_alarm_obj) for observed_alarm_obj in observed_alarms) for expected_alarm_obj in alarms)
if all_matched:
get_logger().log_info("All expected alarms have been observed.")
break
remaining = end_time - time.time()
get_logger().log_info(f"Expected alarms not fully observed yet. Retrying in {polling_interval}s. " f"Time remaining: {remaining:.2f}s")
time.sleep(polling_interval)
# Clean up: stop script and remove temp files
run_as_root("test -f /tmp/phc_loop.pid && kill $(cat /tmp/phc_loop.pid) 2>/dev/null")
run_as_root("rm -f /tmp/phc_loop.sh /tmp/phc_loop.pid")
if not all_matched:
observed_alarm_strs = [str(observed_alarm_obj) for observed_alarm_obj in observed_alarms]
raise TimeoutError(f"Timeout: Expected alarms not found within {timeout}s.\n" f"Expected: {alarm_descriptions}\n" f"Observed:\n" + "\n".join(observed_alarm_strs))