Merge "Adding ptp keywords for pmc command"
This commit is contained in:
27
keywords/ptp/cat/cat_ptp_config_keywords.py
Normal file
27
keywords/ptp/cat/cat_ptp_config_keywords.py
Normal file
@@ -0,0 +1,27 @@
|
||||
from framework.ssh.ssh_connection import SSHConnection
|
||||
from keywords.base_keyword import BaseKeyword
|
||||
from keywords.ptp.cat.objects.cat_ptp_config_output import CATPtpConfigOutput
|
||||
|
||||
|
||||
class CatPtpConfigKeywords(BaseKeyword):
|
||||
"""
|
||||
Class for Cat Ptp Config Keywords
|
||||
"""
|
||||
|
||||
def __init__(self, ssh_connection: SSHConnection):
|
||||
self.ssh_connection = ssh_connection
|
||||
|
||||
def cat_ptp_config(self, config_file: str) -> CATPtpConfigOutput:
|
||||
"""
|
||||
Run cat cpt config command
|
||||
Args:
|
||||
config_file (): the ptp config file
|
||||
|
||||
Returns: CatPtpConfigOutput
|
||||
|
||||
"""
|
||||
output = self.ssh_connection.send(f'cat {config_file}')
|
||||
self.validate_success_return_code(self.ssh_connection)
|
||||
cat_ptp_config_output = CATPtpConfigOutput(output)
|
||||
return cat_ptp_config_output
|
||||
|
53
keywords/ptp/cat/cat_ptp_table_parser.py
Normal file
53
keywords/ptp/cat/cat_ptp_table_parser.py
Normal file
@@ -0,0 +1,53 @@
|
||||
from framework.exceptions.keyword_exception import KeywordException
|
||||
|
||||
|
||||
class CatPtpTableParser:
|
||||
"""
|
||||
Class for cat PTP table parsing
|
||||
Example:
|
||||
twoStepFlag 1
|
||||
slaveOnly 0
|
||||
socket_priority 0
|
||||
priority1 128
|
||||
priority2 128
|
||||
domainNumber 0
|
||||
#utc_offset 37
|
||||
clockClass 248
|
||||
clockAccuracy 0xFE
|
||||
offsetScaledLogVariance 0xFFFF
|
||||
free_running 0
|
||||
freq_est_interval 1
|
||||
dscp_event 0
|
||||
dscp_general 0
|
||||
dataset_comparison ieee1588
|
||||
G.8275.defaultDS.localPriority 128
|
||||
maxStepsRemoved 255
|
||||
"""
|
||||
|
||||
def __init__(self, cat_ptp_output):
|
||||
"""
|
||||
Constructor
|
||||
Args:
|
||||
cat_ptp_output (list[str]): a list of strings representing the output of a 'cat ptp' command.
|
||||
"""
|
||||
self.cat_ptp_output = cat_ptp_output
|
||||
|
||||
def get_output_values_dict(
|
||||
self,
|
||||
):
|
||||
"""
|
||||
Getter for output values dict
|
||||
Returns: the output values dict
|
||||
|
||||
"""
|
||||
|
||||
output_values_dict = {}
|
||||
|
||||
for row in self.cat_ptp_output:
|
||||
values = row.split(None, 1) # split once
|
||||
if len(values) == 2:
|
||||
key, value = values
|
||||
output_values_dict[key.strip()] = value.strip()
|
||||
else:
|
||||
raise KeywordException(f"Line with values: {row} was not in the expected format")
|
||||
return output_values_dict
|
247
keywords/ptp/cat/objects/cat_ptp_config_output.py
Normal file
247
keywords/ptp/cat/objects/cat_ptp_config_output.py
Normal file
@@ -0,0 +1,247 @@
|
||||
from keywords.ptp.cat.objects.clock_description_output import ClockDescriptionOutput
|
||||
from keywords.ptp.cat.objects.default_data_set_output import DefaultDataSetOutput
|
||||
from keywords.ptp.cat.objects.default_interface_options_output import DefaultInterfaceOptionsOutput
|
||||
from keywords.ptp.cat.objects.port_data_set_output import PortDataSetOutput
|
||||
from keywords.ptp.cat.objects.run_time_options_output import RunTimeOptionsOutput
|
||||
from keywords.ptp.cat.objects.servo_options_output import ServoOptionsOutput
|
||||
from keywords.ptp.cat.objects.transport_options_output import TransportOptionsOutput
|
||||
|
||||
|
||||
class CATPtpConfigOutput:
|
||||
"""
|
||||
This class parses the output of command cat ' /etc/linuxptp/ptp4l.conf'
|
||||
|
||||
Example:
|
||||
[global]
|
||||
#
|
||||
# Default Data Set
|
||||
#
|
||||
twoStepFlag 1
|
||||
slaveOnly 0
|
||||
socket_priority 0
|
||||
priority1 128
|
||||
priority2 128
|
||||
domainNumber 0
|
||||
#utc_offset 37
|
||||
clockClass 248
|
||||
clockAccuracy 0xFE
|
||||
offsetScaledLogVariance 0xFFFF
|
||||
free_running 0
|
||||
freq_est_interval 1
|
||||
dscp_event 0
|
||||
dscp_general 0
|
||||
dataset_comparison ieee1588
|
||||
G.8275.defaultDS.localPriority 128
|
||||
maxStepsRemoved 255
|
||||
#
|
||||
# Port Data Set
|
||||
#
|
||||
logAnnounceInterval 1
|
||||
logSyncInterval 0
|
||||
operLogSyncInterval 0
|
||||
logMinDelayReqInterval 0
|
||||
logMinPdelayReqInterval 0
|
||||
operLogPdelayReqInterval 0
|
||||
announceReceiptTimeout 3
|
||||
syncReceiptTimeout 0
|
||||
delayAsymmetry 0
|
||||
fault_reset_interval 4
|
||||
neighborPropDelayThresh 20000000
|
||||
masterOnly 0
|
||||
G.8275.portDS.localPriority 128
|
||||
asCapable auto
|
||||
BMCA ptp
|
||||
inhibit_announce 0
|
||||
inhibit_delay_req 0
|
||||
ignore_source_id 0
|
||||
#
|
||||
# Run time options
|
||||
#
|
||||
assume_two_step 0
|
||||
logging_level 6
|
||||
path_trace_enabled 0
|
||||
follow_up_info 0
|
||||
hybrid_e2e 0
|
||||
inhibit_multicast_service 0
|
||||
net_sync_monitor 0
|
||||
tc_spanning_tree 0
|
||||
tx_timestamp_timeout 1
|
||||
unicast_listen 0
|
||||
unicast_master_table 0
|
||||
unicast_req_duration 3600
|
||||
use_syslog 1
|
||||
verbose 0
|
||||
summary_interval 0
|
||||
kernel_leap 1
|
||||
check_fup_sync 0
|
||||
#
|
||||
# Servo Options
|
||||
#
|
||||
pi_proportional_const 0.0
|
||||
pi_integral_const 0.0
|
||||
pi_proportional_scale 0.0
|
||||
pi_proportional_exponent -0.3
|
||||
pi_proportional_norm_max 0.7
|
||||
pi_integral_scale 0.0
|
||||
pi_integral_exponent 0.4
|
||||
pi_integral_norm_max 0.3
|
||||
step_threshold 0.0
|
||||
first_step_threshold 0.00002
|
||||
max_frequency 900000000
|
||||
clock_servo pi
|
||||
sanity_freq_limit 200000000
|
||||
ntpshm_segment 0
|
||||
msg_interval_request 0
|
||||
servo_num_offset_values 10
|
||||
servo_offset_threshold 0
|
||||
write_phase_mode 0
|
||||
#
|
||||
# Transport options
|
||||
#
|
||||
transportSpecific 0x0
|
||||
ptp_dst_mac 01:1B:19:00:00:00
|
||||
p2p_dst_mac 01:80:C2:00:00:0E
|
||||
udp_ttl 1
|
||||
udp6_scope 0x0E
|
||||
uds_address /var/run/ptp4l
|
||||
uds_ro_address /var/run/ptp4lro
|
||||
#
|
||||
# Default interface options
|
||||
#
|
||||
clock_type OC
|
||||
network_transport UDPv4
|
||||
delay_mechanism E2E
|
||||
time_stamping hardware
|
||||
tsproc_mode filter
|
||||
delay_filter moving_median
|
||||
delay_filter_length 10
|
||||
egressLatency 0
|
||||
ingressLatency 0
|
||||
boundary_clock_jbod 0
|
||||
#
|
||||
# Clock description
|
||||
#
|
||||
productDescription ;;
|
||||
revisionData ;;
|
||||
manufacturerIdentity 00:00:00
|
||||
userDescription ;
|
||||
timeSource 0xA0
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, cat_config_output: [str]):
|
||||
"""
|
||||
Constructor.
|
||||
Create an list of config objects for the given output
|
||||
Args:
|
||||
cat_config_output (list[str]): a list of strings representing the output of the cat config command
|
||||
|
||||
"""
|
||||
self.data_set_output: DefaultDataSetOutput = None
|
||||
self.port_data_set_output: PortDataSetOutput = None
|
||||
self.run_time_options_output: RunTimeOptionsOutput = None
|
||||
self.servo_options_output: ServoOptionsOutput = None
|
||||
self.transport_options_output: TransportOptionsOutput = None
|
||||
self.default_interface_options_output: DefaultInterfaceOptionsOutput = None
|
||||
self.clock_description_output: ClockDescriptionOutput = None
|
||||
|
||||
in_header = False
|
||||
in_body = False
|
||||
config_object = ''
|
||||
body_str = []
|
||||
for line in cat_config_output:
|
||||
if not in_header and line == '#\n':
|
||||
self.create_config_object(config_object, body_str)
|
||||
in_header = True
|
||||
elif in_header and line != '#\n':
|
||||
config_object = line.strip()
|
||||
elif line == '#\n' and in_header: # we are exiting the header
|
||||
in_header = False
|
||||
in_body = True
|
||||
# reset the body str
|
||||
body_str = []
|
||||
elif in_body:
|
||||
body_str.append(line)
|
||||
# Create the last config item
|
||||
self.create_config_object(config_object, body_str)
|
||||
|
||||
def create_config_object(self, config_object: str, body_str: [str]):
|
||||
"""
|
||||
Creates the config object
|
||||
Args:
|
||||
config_object (): the object to be created
|
||||
body_str (): the body of the config
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
if 'Default Data Set' in config_object:
|
||||
self.data_set_output = DefaultDataSetOutput(body_str)
|
||||
if 'Port Data Set' in config_object:
|
||||
self.port_data_set_output = PortDataSetOutput(body_str)
|
||||
if 'Run time options' in config_object:
|
||||
self.run_time_options_output = RunTimeOptionsOutput(body_str)
|
||||
if 'Servo Options' in config_object:
|
||||
self.servo_options_output = ServoOptionsOutput(body_str)
|
||||
if 'Transport options' in config_object:
|
||||
self.transport_options_output = TransportOptionsOutput(body_str)
|
||||
if 'Default interface options' in config_object:
|
||||
self.default_interface_options_output = DefaultInterfaceOptionsOutput(body_str)
|
||||
if 'Clock description' in config_object:
|
||||
self.clock_description_output = ClockDescriptionOutput(body_str)
|
||||
|
||||
def get_data_set_output(self) -> DefaultDataSetOutput:
|
||||
"""
|
||||
Getter for the default data set output
|
||||
Returns: a PMCGetDefaultDataSetOutput object
|
||||
|
||||
"""
|
||||
return self.data_set_output
|
||||
|
||||
def get_default_interface_options_output(self) -> DefaultInterfaceOptionsOutput:
|
||||
"""
|
||||
Getter for default interface options ouput
|
||||
Returns: a DefaultInterfaceOptionsOutput object
|
||||
|
||||
"""
|
||||
return self.default_interface_options_output
|
||||
|
||||
def get_port_data_set_output(self) -> PortDataSetOutput:
|
||||
"""
|
||||
Getter for port data set output
|
||||
Returns: a PortDataSetOutput object
|
||||
|
||||
"""
|
||||
return self.port_data_set_output
|
||||
|
||||
def get_run_time_options_output(self) -> RunTimeOptionsOutput:
|
||||
"""
|
||||
Getter for run time options output
|
||||
Returns: a RunTimeOptionsOutput object
|
||||
|
||||
"""
|
||||
return self.run_time_options_output
|
||||
|
||||
def get_servo_options_output(self) -> ServoOptionsOutput:
|
||||
"""
|
||||
Getter for servo options output
|
||||
Returns: a ServoOptionsOutput object
|
||||
|
||||
"""
|
||||
return self.servo_options_output
|
||||
|
||||
def get_transport_options_output(self) -> TransportOptionsOutput:
|
||||
"""
|
||||
Getter for transport options output
|
||||
Returns: a TransportOptionsOutput object
|
||||
|
||||
"""
|
||||
return self.transport_options_output
|
||||
|
||||
def get_clock_description_output(self) -> ClockDescriptionOutput:
|
||||
"""
|
||||
Getter for clock description output
|
||||
Returns: a ClockDescriptionOutput object
|
||||
|
||||
"""
|
||||
return self.clock_description_output
|
107
keywords/ptp/cat/objects/clock_description_object.py
Normal file
107
keywords/ptp/cat/objects/clock_description_object.py
Normal file
@@ -0,0 +1,107 @@
|
||||
class ClockDescriptionObject:
|
||||
"""
|
||||
Object to hold the values of Clock description Object
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.product_description: str = ''
|
||||
self.revision_data: str = ''
|
||||
self.manufacturer_identity: str = ''
|
||||
self.user_description: str = ''
|
||||
self.time_source: str = ''
|
||||
|
||||
def get_product_description(self) -> str:
|
||||
"""
|
||||
Getter for product_description
|
||||
Returns: product_description
|
||||
|
||||
"""
|
||||
return self.product_description
|
||||
|
||||
def set_product_description(self, product_description: str):
|
||||
"""
|
||||
Setter for product_description
|
||||
Args:
|
||||
product_description (): the product_description value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.product_description = product_description
|
||||
|
||||
def get_revision_data(self) -> str:
|
||||
"""
|
||||
Getter for revision_data
|
||||
Returns: revision_data value
|
||||
|
||||
"""
|
||||
return self.revision_data
|
||||
|
||||
def set_revision_data(self, revision_data: str):
|
||||
"""
|
||||
Setter for revision_data
|
||||
Args:
|
||||
revision_data (): revision_data value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.revision_data = revision_data
|
||||
|
||||
def get_manufacturer_identity(self) -> str:
|
||||
"""
|
||||
Getter for manufacturer_identity
|
||||
Returns: manufacturer_identity value
|
||||
|
||||
"""
|
||||
return self.manufacturer_identity
|
||||
|
||||
def set_manufacturer_identity(self, manufacturer_identity: str):
|
||||
"""
|
||||
Setter for manufacturer_identity
|
||||
Args:
|
||||
manufacturer_identity (): manufacturer_identity value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.manufacturer_identity = manufacturer_identity
|
||||
|
||||
def get_user_description(self) -> str:
|
||||
"""
|
||||
Getter for user_description
|
||||
Returns: the user_description value
|
||||
|
||||
"""
|
||||
return self.user_description
|
||||
|
||||
def set_user_description(self, user_description: str):
|
||||
"""
|
||||
Setter for user_description
|
||||
Args:
|
||||
user_description (): the user_description value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.user_description = user_description
|
||||
|
||||
def get_time_source(self) -> str:
|
||||
"""
|
||||
Getter for time_source
|
||||
Returns: time_source value
|
||||
|
||||
"""
|
||||
return self.time_source
|
||||
|
||||
def set_time_source(self, time_source: str):
|
||||
"""
|
||||
Setter for time_source
|
||||
Args:
|
||||
time_source (): the time_source value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.time_source = time_source
|
||||
|
53
keywords/ptp/cat/objects/clock_description_output.py
Normal file
53
keywords/ptp/cat/objects/clock_description_output.py
Normal file
@@ -0,0 +1,53 @@
|
||||
from keywords.ptp.cat.cat_ptp_table_parser import CatPtpTableParser
|
||||
from keywords.ptp.cat.objects.clock_description_object import ClockDescriptionObject
|
||||
|
||||
|
||||
class ClockDescriptionOutput:
|
||||
"""
|
||||
This class parses the output of Clock description
|
||||
|
||||
Example:
|
||||
productDescription ;;
|
||||
revisionData ;;
|
||||
manufacturerIdentity 00:00:00
|
||||
userDescription ;
|
||||
timeSource 0xA0
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, clock_description_output: [str]):
|
||||
"""
|
||||
Constructor.
|
||||
Create an internal ClockDescriptionObject from the passed parameter.
|
||||
Args:
|
||||
clock_description_output (list[str]): a list of strings representing the clock description output
|
||||
|
||||
"""
|
||||
cat_ptp_table_parser = CatPtpTableParser(clock_description_output)
|
||||
output_values = cat_ptp_table_parser.get_output_values_dict()
|
||||
self.clock_description_object = ClockDescriptionObject()
|
||||
|
||||
if 'productDescription' in output_values:
|
||||
self.clock_description_object.set_product_description(output_values['productDescription'])
|
||||
|
||||
if 'revisionData' in output_values:
|
||||
self.clock_description_object.set_revision_data(output_values['revisionData'])
|
||||
|
||||
if 'manufacturerIdentity' in output_values:
|
||||
self.clock_description_object.set_manufacturer_identity(output_values['manufacturerIdentity'])
|
||||
|
||||
if 'userDescription' in output_values:
|
||||
self.clock_description_object.set_user_description(output_values['userDescription'])
|
||||
|
||||
if 'timeSource' in output_values:
|
||||
self.clock_description_object.set_time_source(output_values['timeSource'])
|
||||
|
||||
def get_clock_description_object(self) -> ClockDescriptionObject:
|
||||
"""
|
||||
Getter for ClockDescriptionObject object.
|
||||
|
||||
Returns:
|
||||
A ClockDescriptionObject
|
||||
|
||||
"""
|
||||
return self.clock_description_object
|
97
keywords/ptp/cat/objects/default_data_set_output.py
Normal file
97
keywords/ptp/cat/objects/default_data_set_output.py
Normal file
@@ -0,0 +1,97 @@
|
||||
from keywords.ptp.cat.cat_ptp_table_parser import CatPtpTableParser
|
||||
from keywords.ptp.pmc.objects.pmc_get_default_data_set_object import PMCGetDefaultDataSetObject
|
||||
|
||||
|
||||
class DefaultDataSetOutput:
|
||||
"""
|
||||
This class parses the Default Data Set Output
|
||||
|
||||
Example:
|
||||
twoStepFlag 1
|
||||
slaveOnly 0
|
||||
numberPorts 1
|
||||
priority1 128
|
||||
clockClass 248
|
||||
clockAccuracy 0xfe
|
||||
offsetScaledLogVariance 0xffff
|
||||
priority2 128
|
||||
clockIdentity 507c6f.fffe.0b5a4d
|
||||
domainNumber 0.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, cat_ptp_output: [str]):
|
||||
"""
|
||||
Constructor.
|
||||
Create an internal PMCGetDefaultDataSet from the passed parameter.
|
||||
Args:
|
||||
cat_ptp_output (list[str]): a list of strings representing the output of the cat ptp command
|
||||
|
||||
"""
|
||||
cat_ptp_table_parser = CatPtpTableParser(cat_ptp_output)
|
||||
output_values = cat_ptp_table_parser.get_output_values_dict()
|
||||
self.pmc_get_default_data_set_object = PMCGetDefaultDataSetObject()
|
||||
|
||||
if 'twoStepFlag' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_two_step_flag(int(output_values['twoStepFlag']))
|
||||
|
||||
if 'slaveOnly' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_slave_only(int(output_values['slaveOnly']))
|
||||
|
||||
if 'socket_priority' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_socket_priority(int(output_values['socket_priority']))
|
||||
|
||||
if 'numberPorts' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_number_ports(int(output_values['numberPorts']))
|
||||
|
||||
if 'priority1' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_priority1(int(output_values['priority1']))
|
||||
|
||||
if 'clockClass' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_clock_class(int(output_values['clockClass']))
|
||||
|
||||
if 'clockAccuracy' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_clock_accuracy(output_values['clockAccuracy'])
|
||||
|
||||
if 'offsetScaledLogVariance' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_offset_scaled_log_variance(output_values['offsetScaledLogVariance'])
|
||||
|
||||
if 'priority2' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_priority2(int(output_values['priority2']))
|
||||
|
||||
if 'clockIdentity' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_clock_identity(output_values['clockIdentity'])
|
||||
|
||||
if 'domainNumber' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_domain_number(output_values['domainNumber'])
|
||||
|
||||
if 'free_running' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_free_running(int(output_values['free_running']))
|
||||
|
||||
if 'freq_est_interval' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_freq_est_interval(int(output_values['freq_est_interval']))
|
||||
|
||||
if 'dscp_event' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_dscp_event(int(output_values['dscp_event']))
|
||||
|
||||
if 'dscp_general' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_dscp_general(int(output_values['dscp_general']))
|
||||
|
||||
if 'dataset_comparison' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_dataset_comparison(output_values['dataset_comparison'])
|
||||
|
||||
if 'maxStepsRemoved' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_max_steps_removed(int(output_values['maxStepsRemoved']))
|
||||
|
||||
if '#utc_offset' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_utc_offset(int(output_values['#utc_offset']))
|
||||
|
||||
def get_pmc_get_default_data_set_object(self) -> PMCGetDefaultDataSetObject:
|
||||
"""
|
||||
Getter for pmc_get_default_data_set_object object.
|
||||
|
||||
Returns:
|
||||
A PMCGetDefaultDataSetObject
|
||||
|
||||
"""
|
||||
return self.pmc_get_default_data_set_object
|
206
keywords/ptp/cat/objects/default_interface_options_object.py
Normal file
206
keywords/ptp/cat/objects/default_interface_options_object.py
Normal file
@@ -0,0 +1,206 @@
|
||||
class DefaultInterfaceOptionsObject:
|
||||
"""
|
||||
Object to hold the values of Default Interface Options Object
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.clock_type: str = ''
|
||||
self.network_transport: str = ''
|
||||
self.delay_mechanism: str = ''
|
||||
self.time_stamping: str = ''
|
||||
self.tsproc_mode: str = ''
|
||||
self.delay_filter: str = ''
|
||||
self.delay_filter_length: int = -1
|
||||
self.egress_latency: int = -1
|
||||
self.ingress_latency: int = -1
|
||||
self.boundary_clock_jbod: int = -1
|
||||
|
||||
def get_clock_type(self) -> str:
|
||||
"""
|
||||
Getter for clock_type
|
||||
Returns: clock_type
|
||||
|
||||
"""
|
||||
return self.clock_type
|
||||
|
||||
def set_clock_type(self, clock_type: str):
|
||||
"""
|
||||
Setter for clock_type
|
||||
Args:
|
||||
clock_type (): the clock_type value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.clock_type = clock_type
|
||||
|
||||
def get_network_transport(self) -> str:
|
||||
"""
|
||||
Getter for network_transport
|
||||
Returns: network_transport value
|
||||
|
||||
"""
|
||||
return self.network_transport
|
||||
|
||||
def set_network_transport(self, network_transport: str):
|
||||
"""
|
||||
Setter for network_transport
|
||||
Args:
|
||||
network_transport (): network_transport value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.network_transport = network_transport
|
||||
|
||||
def get_delay_mechanism(self) -> str:
|
||||
"""
|
||||
Getter for delay_mechanism
|
||||
Returns: delay_mechanism value
|
||||
|
||||
"""
|
||||
return self.delay_mechanism
|
||||
|
||||
def set_delay_mechanism(self, delay_mechanism: str):
|
||||
"""
|
||||
Setter for delay_mechanism
|
||||
Args:
|
||||
delay_mechanism (): delay_mechanism value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.delay_mechanism = delay_mechanism
|
||||
|
||||
def get_time_stamping(self) -> str:
|
||||
"""
|
||||
Getter for time_stamping
|
||||
Returns: the time_stamping value
|
||||
|
||||
"""
|
||||
return self.time_stamping
|
||||
|
||||
def set_time_stamping(self, time_stamping: str):
|
||||
"""
|
||||
Setter for time_stamping
|
||||
Args:
|
||||
time_stamping (): the time_stamping value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.time_stamping = time_stamping
|
||||
|
||||
def get_tsproc_mode(self) -> str:
|
||||
"""
|
||||
Getter for tsproc_mode
|
||||
Returns: tsproc_mode value
|
||||
|
||||
"""
|
||||
return self.tsproc_mode
|
||||
|
||||
def set_tsproc_mode(self, tsproc_mode: str):
|
||||
"""
|
||||
Setter for tsproc_mode
|
||||
Args:
|
||||
tsproc_mode (): the tsproc_mode value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.tsproc_mode = tsproc_mode
|
||||
|
||||
def get_delay_filter(self) -> str:
|
||||
"""
|
||||
Getter for delay_filter
|
||||
Returns: delay_filter value
|
||||
|
||||
"""
|
||||
return self.delay_filter
|
||||
|
||||
def set_delay_filter(self, delay_filter: str):
|
||||
"""
|
||||
Setter for delay_filter
|
||||
Args:
|
||||
delay_filter (): the delay_filter value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.delay_filter = delay_filter
|
||||
|
||||
def get_delay_filter_length(self) -> int:
|
||||
"""
|
||||
Getter for delay_filter_length
|
||||
Returns: delay_filter_length value
|
||||
|
||||
"""
|
||||
return self.delay_filter_length
|
||||
|
||||
def set_delay_filter_length(self, delay_filter_length: int):
|
||||
"""
|
||||
Setter for delay_filter_length
|
||||
Args:
|
||||
delay_filter_length (): the delay_filter_length value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.delay_filter_length = delay_filter_length
|
||||
|
||||
def get_egress_latency(self) -> int:
|
||||
"""
|
||||
Getter for egress_latency
|
||||
Returns: egress_latency value
|
||||
|
||||
"""
|
||||
return self.egress_latency
|
||||
|
||||
def set_egress_latency(self, egress_latency: int):
|
||||
"""
|
||||
Setter for egress_latency
|
||||
Args:
|
||||
egress_latency (): the egress_latency value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.egress_latency = egress_latency
|
||||
|
||||
def get_ingress_latency(self) -> int:
|
||||
"""
|
||||
Getter for ingress_latency
|
||||
Returns: ingress_latency value
|
||||
|
||||
"""
|
||||
return self.ingress_latency
|
||||
|
||||
def set_ingress_latency(self, ingress_latency: int):
|
||||
"""
|
||||
Setter for ingress_latency
|
||||
Args:
|
||||
ingress_latency (): the ingress_latency value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.ingress_latency = ingress_latency
|
||||
|
||||
def get_boundary_clock_jbod(self) -> int:
|
||||
"""
|
||||
Getter for boundary_clock_jbod
|
||||
Returns: boundary_clock_jbod value
|
||||
|
||||
"""
|
||||
return self.boundary_clock_jbod
|
||||
|
||||
def set_boundary_clock_jbod(self, boundary_clock_jbod: int):
|
||||
"""
|
||||
Setter for boundary_clock_jbod
|
||||
Args:
|
||||
boundary_clock_jbod (): the boundary_clock_jbod value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.boundary_clock_jbod = boundary_clock_jbod
|
73
keywords/ptp/cat/objects/default_interface_options_output.py
Normal file
73
keywords/ptp/cat/objects/default_interface_options_output.py
Normal file
@@ -0,0 +1,73 @@
|
||||
from keywords.ptp.cat.cat_ptp_table_parser import CatPtpTableParser
|
||||
from keywords.ptp.cat.objects.default_interface_options_object import DefaultInterfaceOptionsObject
|
||||
|
||||
|
||||
class DefaultInterfaceOptionsOutput:
|
||||
"""
|
||||
This class parses the output of Default Interface Options
|
||||
|
||||
Example:
|
||||
clock_type OC
|
||||
network_transport UDPv4
|
||||
delay_mechanism E2E
|
||||
time_stamping hardware
|
||||
tsproc_mode filter
|
||||
delay_filter moving_median
|
||||
delay_filter_length 10
|
||||
egressLatency 0
|
||||
ingressLatency 0
|
||||
boundary_clock_jbod 0
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, default_interface_options_output: [str]):
|
||||
"""
|
||||
Constructor.
|
||||
Create an internal DefaultInterfaceOptionsObject from the passed parameter.
|
||||
Args:
|
||||
default_interface_options_output (list[str]): a list of strings representing the default interface options output
|
||||
|
||||
"""
|
||||
cat_ptp_table_parser = CatPtpTableParser(default_interface_options_output)
|
||||
output_values = cat_ptp_table_parser.get_output_values_dict()
|
||||
self.default_interface_options_object = DefaultInterfaceOptionsObject()
|
||||
|
||||
if 'clock_type' in output_values:
|
||||
self.default_interface_options_object.set_clock_type(output_values['clock_type'])
|
||||
|
||||
if 'network_transport' in output_values:
|
||||
self.default_interface_options_object.set_network_transport(output_values['network_transport'])
|
||||
|
||||
if 'delay_mechanism' in output_values:
|
||||
self.default_interface_options_object.set_delay_mechanism(output_values['delay_mechanism'])
|
||||
|
||||
if 'time_stamping' in output_values:
|
||||
self.default_interface_options_object.set_time_stamping(output_values['time_stamping'])
|
||||
|
||||
if 'tsproc_mode' in output_values:
|
||||
self.default_interface_options_object.set_tsproc_mode(output_values['tsproc_mode'])
|
||||
|
||||
if 'delay_filter' in output_values:
|
||||
self.default_interface_options_object.set_delay_filter(output_values['delay_filter'])
|
||||
|
||||
if 'delay_filter_length' in output_values:
|
||||
self.default_interface_options_object.set_delay_filter_length(int(output_values['delay_filter_length']))
|
||||
|
||||
if 'egressLatency' in output_values:
|
||||
self.default_interface_options_object.set_egress_latency(int(output_values['egressLatency']))
|
||||
|
||||
if 'ingressLatency' in output_values:
|
||||
self.default_interface_options_object.set_ingress_latency(int(output_values['ingressLatency']))
|
||||
|
||||
if 'boundary_clock_jbod' in output_values:
|
||||
self.default_interface_options_object.set_boundary_clock_jbod(int(output_values['boundary_clock_jbod']))
|
||||
|
||||
def get_default_interface_options_object(self) -> DefaultInterfaceOptionsObject:
|
||||
"""
|
||||
Getter for DefaultInterfaceOptionsObject object.
|
||||
|
||||
Returns:
|
||||
A DefaultInterfaceOptionsObject
|
||||
|
||||
"""
|
||||
return self.default_interface_options_object
|
351
keywords/ptp/cat/objects/port_data_set_object.py
Normal file
351
keywords/ptp/cat/objects/port_data_set_object.py
Normal file
@@ -0,0 +1,351 @@
|
||||
class PortDataSetObject:
|
||||
"""
|
||||
Object to hold the values of port data set
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.log_announce_interval: int = -1
|
||||
self.log_sync_interval: int = -1
|
||||
self.oper_log_sync_interval: int = -1
|
||||
self.log_min_delay_req_interval: int = -1
|
||||
self.log_min_p_delay_req_interval: int = -1
|
||||
self.oper_log_p_delay_req_interval: int = -1
|
||||
self.announce_receipt_timeout: int = -1
|
||||
self.sync_receipt_timeout: int = -1
|
||||
self.delay_asymmetry: int = -1
|
||||
self.fault_reset_interval: int = -1
|
||||
self.neighbor_prop_delay_thresh: int = -1
|
||||
self.master_only: int = -1
|
||||
self.as_capable: str = ''
|
||||
self.bmca: str = ''
|
||||
self.inhibit_announce: int = -1
|
||||
self.inhibit_delay_req: int = ''
|
||||
self.ignore_source_id: int = -1
|
||||
|
||||
def get_log_announce_interval(self) -> int:
|
||||
"""
|
||||
Getter for log_announce_interval
|
||||
Returns: log_announce_interval
|
||||
|
||||
"""
|
||||
return self.log_announce_interval
|
||||
|
||||
def set_log_announce_interval(self, log_announce_interval: int):
|
||||
"""
|
||||
Setter for two_step_flag
|
||||
Args:
|
||||
log_announce_interval (): the log_announce_interval value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.log_announce_interval = log_announce_interval
|
||||
|
||||
def get_log_sync_interval(self) -> int:
|
||||
"""
|
||||
Getter for log_sync_interval
|
||||
Returns: log_sync_interval value
|
||||
|
||||
"""
|
||||
return self.log_sync_interval
|
||||
|
||||
def set_log_sync_interval(self, log_sync_interval: int):
|
||||
"""
|
||||
Setter for log_sync_interval
|
||||
Args:
|
||||
log_sync_interval (): log_sync_interval value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.log_sync_interval = log_sync_interval
|
||||
|
||||
def get_oper_log_sync_interval(self) -> int:
|
||||
"""
|
||||
Getter for oper_log_sync_interval
|
||||
Returns: oper_log_sync_interval value
|
||||
|
||||
"""
|
||||
return self.oper_log_sync_interval
|
||||
|
||||
def set_oper_log_sync_interval(self, oper_log_sync_interval: int):
|
||||
"""
|
||||
Setter for oper_log_sync_interval
|
||||
Args:
|
||||
oper_log_sync_interval (): oper_log_sync_interval value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.oper_log_sync_interval = oper_log_sync_interval
|
||||
|
||||
def get_log_min_delay_req_interval(self) -> int:
|
||||
"""
|
||||
Getter for log_min_delay_req_interval
|
||||
Returns: the log_min_delay_req_interval value
|
||||
|
||||
"""
|
||||
return self.log_min_delay_req_interval
|
||||
|
||||
def set_log_min_delay_req_interval(self, log_min_delay_req_interval: int):
|
||||
"""
|
||||
Setter for log_min_delay_req_interval
|
||||
Args:
|
||||
log_min_delay_req_interval (): the log_min_delay_req_interval value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.log_min_delay_req_interval = log_min_delay_req_interval
|
||||
|
||||
def get_log_min_p_delay_req_interval(self) -> int:
|
||||
"""
|
||||
Getter for log_min_p_delay_req_interval
|
||||
Returns: log_min_p_delay_req_interval value
|
||||
|
||||
"""
|
||||
return self.log_min_p_delay_req_interval
|
||||
|
||||
def set_log_min_p_delay_req_interval(self, log_min_p_delay_req_interval: int):
|
||||
"""
|
||||
Setter for log_min_p_delay_req_interval
|
||||
Args:
|
||||
log_min_p_delay_req_interval (): the log_min_p_delay_req_interval value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.log_min_p_delay_req_interval = log_min_p_delay_req_interval
|
||||
|
||||
def get_oper_log_p_delay_req_interval(self) -> int:
|
||||
"""
|
||||
Getter for oper_log_p_delay_req_interval
|
||||
Returns: the oper_log_p_delay_req_interval value
|
||||
|
||||
"""
|
||||
return self.oper_log_p_delay_req_interval
|
||||
|
||||
def set_oper_log_p_delay_req_interval(self, oper_log_p_delay_req_interval: int):
|
||||
"""
|
||||
Setter for oper_log_p_delay_req_interval
|
||||
Args:
|
||||
oper_log_p_delay_req_interval (): the oper_log_p_delay_req_interval value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.oper_log_p_delay_req_interval = oper_log_p_delay_req_interval
|
||||
|
||||
def get_announce_receipt_timeout(self) -> int:
|
||||
"""
|
||||
Getter for announce_receipt_timeout
|
||||
Returns: the announce_receipt_timeout value
|
||||
|
||||
"""
|
||||
return self.announce_receipt_timeout
|
||||
|
||||
def set_announce_receipt_timeout(self, announce_receipt_timeout: int):
|
||||
"""
|
||||
Setter for announce_receipt_timeout
|
||||
Args:
|
||||
announce_receipt_timeout (): the announce_receipt_timeout value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.announce_receipt_timeout = announce_receipt_timeout
|
||||
|
||||
def get_sync_receipt_timeout(self) -> int:
|
||||
"""
|
||||
Getter for sync_receipt_timeout
|
||||
Returns: the sync_receipt_timeout value
|
||||
|
||||
"""
|
||||
return self.sync_receipt_timeout
|
||||
|
||||
def set_sync_receipt_timeout(self, sync_receipt_timeout: int):
|
||||
"""
|
||||
Setter for sync_receipt_timeout
|
||||
Args:
|
||||
sync_receipt_timeout (): the sync_receipt_timeout value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.sync_receipt_timeout = sync_receipt_timeout
|
||||
|
||||
def get_delay_asymmetry(self) -> int:
|
||||
"""
|
||||
Getter for delay_asymmetry
|
||||
Returns: the delay_asymmetry value
|
||||
|
||||
"""
|
||||
return self.delay_asymmetry
|
||||
|
||||
def set_delay_asymmetry(self, delay_asymmetry: int):
|
||||
"""
|
||||
Setter for delay_asymmetry
|
||||
Args:
|
||||
delay_asymmetry (): the delay_asymmetry value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.delay_asymmetry = delay_asymmetry
|
||||
|
||||
def get_fault_reset_interval(self) -> int:
|
||||
"""
|
||||
Getter for fault_reset_interval
|
||||
Returns: the fault_reset_interval value
|
||||
|
||||
"""
|
||||
return self.fault_reset_interval
|
||||
|
||||
def set_fault_reset_interval(self, fault_reset_interval: int):
|
||||
"""
|
||||
Setter for fault_reset_interval
|
||||
Args:
|
||||
fault_reset_interval (): the fault_reset_interval value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.fault_reset_interval = fault_reset_interval
|
||||
|
||||
def get_neighbor_prop_delay_thresh(self) -> int:
|
||||
"""
|
||||
Getter for neighbor_prop_delay_thresh
|
||||
Returns: the neighbor_prop_delay_thresh value
|
||||
|
||||
"""
|
||||
return self.neighbor_prop_delay_thresh
|
||||
|
||||
def set_neighbor_prop_delay_thresh(self, neighbor_prop_delay_thresh: int):
|
||||
"""
|
||||
Setter for neighbor_prop_delay_thresh
|
||||
Args:
|
||||
neighbor_prop_delay_thresh (): the neighbor_prop_delay_thresh value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.neighbor_prop_delay_thresh = neighbor_prop_delay_thresh
|
||||
|
||||
def get_master_only(self) -> int:
|
||||
"""
|
||||
Getter for master_only
|
||||
Returns: the master_only value
|
||||
|
||||
"""
|
||||
return self.master_only
|
||||
|
||||
def set_master_only(self, master_only: int):
|
||||
"""
|
||||
Setter for master_only
|
||||
Args:
|
||||
master_only (): the master_only value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.master_only = master_only
|
||||
|
||||
def get_as_capable(self) -> str:
|
||||
"""
|
||||
Getter for as_capable
|
||||
Returns: the as_capable value
|
||||
|
||||
"""
|
||||
return self.as_capable
|
||||
|
||||
def set_as_capable(self, as_capable: str):
|
||||
"""
|
||||
Setter for as_capable
|
||||
Args:
|
||||
as_capable (): the as_capable value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.as_capable = as_capable
|
||||
|
||||
def get_bmca(self) -> str:
|
||||
"""
|
||||
Getter for bmca
|
||||
Returns: the bmca value
|
||||
|
||||
"""
|
||||
return self.bmca
|
||||
|
||||
def set_bmca(self, bmca: str):
|
||||
"""
|
||||
Setter for bmca
|
||||
Args:
|
||||
bmca (): the bmca value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.bmca = bmca
|
||||
|
||||
def get_inhibit_announce(self) -> int:
|
||||
"""
|
||||
Getter for inhibit_announce
|
||||
Returns: the inhibit_announce value
|
||||
|
||||
"""
|
||||
return self.inhibit_announce
|
||||
|
||||
def set_inhibit_announce(self, inhibit_announce: int):
|
||||
"""
|
||||
Setter for inhibit_announce
|
||||
Args:
|
||||
inhibit_announce (): the inhibit_announce value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.inhibit_announce = inhibit_announce
|
||||
|
||||
def get_inhibit_delay_req(self) -> int:
|
||||
"""
|
||||
Getter for inhibit_delay_req
|
||||
Returns: the inhibit_delay_req value
|
||||
|
||||
"""
|
||||
return self.inhibit_delay_req
|
||||
|
||||
def set_inhibit_delay_req(self, inhibit_delay_req: int):
|
||||
"""
|
||||
Setter for inhibit_delay_req
|
||||
Args:
|
||||
inhibit_delay_req (): the inhibit_delay_req value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.inhibit_delay_req = inhibit_delay_req
|
||||
|
||||
def get_ignore_source_id(self) -> int:
|
||||
"""
|
||||
Getter for ignore_source_id
|
||||
Returns: the ignore_source_id value
|
||||
|
||||
"""
|
||||
return self.ignore_source_id
|
||||
|
||||
def set_ignore_source_id(self, ignore_source_id: int):
|
||||
"""
|
||||
Setter for ignore_source_id
|
||||
Args:
|
||||
ignore_source_id (): the ignore_source_id value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.ignore_source_id = ignore_source_id
|
||||
|
||||
|
||||
|
||||
|
||||
|
102
keywords/ptp/cat/objects/port_data_set_output.py
Normal file
102
keywords/ptp/cat/objects/port_data_set_output.py
Normal file
@@ -0,0 +1,102 @@
|
||||
from keywords.ptp.cat.cat_ptp_table_parser import CatPtpTableParser
|
||||
from keywords.ptp.cat.objects.port_data_set_object import PortDataSetObject
|
||||
|
||||
|
||||
class PortDataSetOutput:
|
||||
"""
|
||||
This class parses the output of Port Data Set
|
||||
|
||||
Example:
|
||||
logAnnounceInterval 1
|
||||
logSyncInterval 0
|
||||
operLogSyncInterval 0
|
||||
logMinDelayReqInterval 0
|
||||
logMinPdelayReqInterval 0
|
||||
operLogPdelayReqInterval 0
|
||||
announceReceiptTimeout 3
|
||||
syncReceiptTimeout 0
|
||||
delayAsymmetry 0
|
||||
fault_reset_interval 4
|
||||
neighborPropDelayThresh 20000000
|
||||
masterOnly 0
|
||||
G.8275.portDS.localPriority 128
|
||||
asCapable auto
|
||||
BMCA ptp
|
||||
inhibit_announce 0
|
||||
inhibit_delay_req 0
|
||||
ignore_source_id 0
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, port_data_set_output: [str]):
|
||||
"""
|
||||
Constructor.
|
||||
Create an internal PortDataSetObject from the passed parameter.
|
||||
Args:
|
||||
port_data_set_output (list[str]): a list of strings representing the port data set output
|
||||
|
||||
"""
|
||||
cat_ptp_table_parser = CatPtpTableParser(port_data_set_output)
|
||||
output_values = cat_ptp_table_parser.get_output_values_dict()
|
||||
self.port_data_set_object = PortDataSetObject()
|
||||
|
||||
if 'logAnnounceInterval' in output_values:
|
||||
self.port_data_set_object.set_log_announce_interval(int(output_values['logAnnounceInterval']))
|
||||
|
||||
if 'logSyncInterval' in output_values:
|
||||
self.port_data_set_object.set_log_sync_interval(int(output_values['logSyncInterval']))
|
||||
|
||||
if 'operLogSyncInterval' in output_values:
|
||||
self.port_data_set_object.set_oper_log_sync_interval(int(output_values['operLogSyncInterval']))
|
||||
|
||||
if 'logMinDelayReqInterval' in output_values:
|
||||
self.port_data_set_object.set_log_min_delay_req_interval(int(output_values['logMinDelayReqInterval']))
|
||||
|
||||
if 'logMinPdelayReqInterval' in output_values:
|
||||
self.port_data_set_object.set_log_min_p_delay_req_interval(int(output_values['logMinPdelayReqInterval']))
|
||||
|
||||
if 'operLogPdelayReqInterval' in output_values:
|
||||
self.port_data_set_object.set_oper_log_p_delay_req_interval(int(output_values['operLogPdelayReqInterval']))
|
||||
|
||||
if 'announceReceiptTimeout' in output_values:
|
||||
self.port_data_set_object.set_announce_receipt_timeout(int(output_values['announceReceiptTimeout']))
|
||||
|
||||
if 'syncReceiptTimeout' in output_values:
|
||||
self.port_data_set_object.set_sync_receipt_timeout(int(output_values['syncReceiptTimeout']))
|
||||
|
||||
if 'delayAsymmetry' in output_values:
|
||||
self.port_data_set_object.set_delay_asymmetry(int(output_values['delayAsymmetry']))
|
||||
|
||||
if 'fault_reset_interval' in output_values:
|
||||
self.port_data_set_object.set_fault_reset_interval(int(output_values['fault_reset_interval']))
|
||||
|
||||
if 'neighborPropDelayThresh' in output_values:
|
||||
self.port_data_set_object.set_neighbor_prop_delay_thresh(int(output_values['neighborPropDelayThresh']))
|
||||
|
||||
if 'masterOnly' in output_values:
|
||||
self.port_data_set_object.set_master_only(int(output_values['masterOnly']))
|
||||
|
||||
if 'asCapable' in output_values:
|
||||
self.port_data_set_object.set_as_capable(output_values['asCapable'])
|
||||
|
||||
if 'BMCA' in output_values:
|
||||
self.port_data_set_object.set_bmca(output_values['BMCA'])
|
||||
|
||||
if 'inhibit_announce' in output_values:
|
||||
self.port_data_set_object.set_inhibit_announce(int(output_values['inhibit_announce']))
|
||||
|
||||
if 'inhibit_delay_req' in output_values:
|
||||
self.port_data_set_object.set_inhibit_delay_req(int(output_values['inhibit_delay_req']))
|
||||
|
||||
if 'ignore_source_id' in output_values:
|
||||
self.port_data_set_object.set_ignore_source_id(int(output_values['ignore_source_id']))
|
||||
|
||||
def get_port_data_set_object(self) -> PortDataSetObject:
|
||||
"""
|
||||
Getter for port_data_set_object object.
|
||||
|
||||
Returns:
|
||||
A PortDataSetObject
|
||||
|
||||
"""
|
||||
return self.port_data_set_object
|
352
keywords/ptp/cat/objects/run_time_options_object.py
Normal file
352
keywords/ptp/cat/objects/run_time_options_object.py
Normal file
@@ -0,0 +1,352 @@
|
||||
class RunTimeOptionsObject:
|
||||
"""
|
||||
Object to hold the values of Run time Options
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.assume_two_step: int = -1
|
||||
self.logging_level: int = -1
|
||||
self.path_trace_enabled: int = -1
|
||||
self.follow_up_info: int = -1
|
||||
self.hybrid_e2e: int = -1
|
||||
self.inhibit_multicast_service: int = -1
|
||||
self.net_sync_monitor: int = -1
|
||||
self.tc_spanning_tree: int = -1
|
||||
self.tx_timestamp_timeout: int = -1
|
||||
self.unicast_listen: int = -1
|
||||
self.unicast_master_table: int = -1
|
||||
self.unicast_req_duration: int = -1
|
||||
self.use_syslog: int = -1
|
||||
self.verbose: int = -1
|
||||
self.summary_interval: int = -1
|
||||
self.kernel_leap: int = ''
|
||||
self.check_fup_sync: int = -1
|
||||
|
||||
def get_assume_two_step(self) -> int:
|
||||
"""
|
||||
Getter for assume_two_step
|
||||
Returns: assume_two_step
|
||||
|
||||
"""
|
||||
return self.assume_two_step
|
||||
|
||||
def set_assume_two_step(self, assume_two_step: int):
|
||||
"""
|
||||
Setter for assume_two_step
|
||||
Args:
|
||||
assume_two_step (): the assume_two_step value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.assume_two_step = assume_two_step
|
||||
|
||||
def get_logging_level(self) -> int:
|
||||
"""
|
||||
Getter for logging_level
|
||||
Returns: logging_level value
|
||||
|
||||
"""
|
||||
return self.logging_level
|
||||
|
||||
def set_logging_level(self, logging_level: int):
|
||||
"""
|
||||
Setter for logging_level
|
||||
Args:
|
||||
log_sync_interval (): log_sync_interval value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.logging_level = logging_level
|
||||
|
||||
def get_path_trace_enabled(self) -> int:
|
||||
"""
|
||||
Getter for path_trace_enabled
|
||||
Returns: path_trace_enabled value
|
||||
|
||||
"""
|
||||
return self.path_trace_enabled
|
||||
|
||||
def set_path_trace_enabled(self, path_trace_enabled: int):
|
||||
"""
|
||||
Setter for path_trace_enabled
|
||||
Args:
|
||||
path_trace_enabled (): path_trace_enabled value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.path_trace_enabled = path_trace_enabled
|
||||
|
||||
def get_follow_up_info(self) -> int:
|
||||
"""
|
||||
Getter for follow_up_info
|
||||
Returns: the follow_up_info value
|
||||
|
||||
"""
|
||||
return self.follow_up_info
|
||||
|
||||
def set_follow_up_info(self, follow_up_info: int):
|
||||
"""
|
||||
Setter for follow_up_info
|
||||
Args:
|
||||
follow_up_info (): the follow_up_info value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.follow_up_info = follow_up_info
|
||||
|
||||
def get_hybrid_e2e(self) -> int:
|
||||
"""
|
||||
Getter for hybrid_e2e
|
||||
Returns: hybrid_e2e value
|
||||
|
||||
"""
|
||||
return self.hybrid_e2e
|
||||
|
||||
def set_hybrid_e2e(self, hybrid_e2e: int):
|
||||
"""
|
||||
Setter for hybrid_e2e
|
||||
Args:
|
||||
hybrid_e2e (): the hybrid_e2e value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.hybrid_e2e = hybrid_e2e
|
||||
|
||||
def get_inhibit_multicast_service(self) -> int:
|
||||
"""
|
||||
Getter for inhibit_multicast_service
|
||||
Returns: the inhibit_multicast_service value
|
||||
|
||||
"""
|
||||
return self.inhibit_multicast_service
|
||||
|
||||
def set_inhibit_multicast_service(self, inhibit_multicast_service: int):
|
||||
"""
|
||||
Setter for inhibit_multicast_service
|
||||
Args:
|
||||
inhibit_multicast_service (): the inhibit_multicast_service value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.inhibit_multicast_service = inhibit_multicast_service
|
||||
|
||||
def get_net_sync_monitor(self) -> int:
|
||||
"""
|
||||
Getter for net_sync_monitor
|
||||
Returns: the net_sync_monitor value
|
||||
|
||||
"""
|
||||
return self.net_sync_monitor
|
||||
|
||||
def set_net_sync_monitor(self, net_sync_monitor: int):
|
||||
"""
|
||||
Setter for net_sync_monitor
|
||||
Args:
|
||||
net_sync_monitor (): the net_sync_monitor value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.net_sync_monitor = net_sync_monitor
|
||||
|
||||
def get_tc_spanning_tree(self) -> int:
|
||||
"""
|
||||
Getter for tc_spanning_tree
|
||||
Returns: the tc_spanning_tree value
|
||||
|
||||
"""
|
||||
return self.tc_spanning_tree
|
||||
|
||||
def set_tc_spanning_tree(self, tc_spanning_tree: int):
|
||||
"""
|
||||
Setter for sync_receipt_timeout
|
||||
Args:
|
||||
tc_spanning_tree (): the tc_spanning_tree value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.tc_spanning_tree = tc_spanning_tree
|
||||
|
||||
def get_tx_timestamp_timeout(self) -> int:
|
||||
"""
|
||||
Getter for tx_timestamp_timeout
|
||||
Returns: the tx_timestamp_timeout value
|
||||
|
||||
"""
|
||||
return self.tx_timestamp_timeout
|
||||
|
||||
def set_tx_timestamp_timeout(self, tx_timestamp_timeout: int):
|
||||
"""
|
||||
Setter for tx_timestamp_timeout
|
||||
Args:
|
||||
tx_timestamp_timeout (): the tx_timestamp_timeout value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.tx_timestamp_timeout = tx_timestamp_timeout
|
||||
|
||||
def get_unicast_listen(self) -> int:
|
||||
"""
|
||||
Getter for unicast_listen
|
||||
Returns: the unicast_listen value
|
||||
|
||||
"""
|
||||
return self.unicast_listen
|
||||
|
||||
def set_unicast_listen(self, unicast_listen: int):
|
||||
"""
|
||||
Setter for unicast_listen
|
||||
Args:
|
||||
unicast_listen (): the unicast_listen value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.unicast_listen = unicast_listen
|
||||
|
||||
def get_unicast_master_table(self) -> int:
|
||||
"""
|
||||
Getter for unicast_master_table
|
||||
Returns: the unicast_master_table value
|
||||
|
||||
"""
|
||||
return self.unicast_master_table
|
||||
|
||||
def set_unicast_master_table(self, unicast_master_table: int):
|
||||
"""
|
||||
Setter for unicast_master_table
|
||||
Args:
|
||||
unicast_master_table (): the unicast_master_table value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.unicast_master_table = unicast_master_table
|
||||
|
||||
def get_unicast_req_duration(self) -> int:
|
||||
"""
|
||||
Getter for unicast_req_duration
|
||||
Returns: the unicast_req_duration value
|
||||
|
||||
"""
|
||||
return self.unicast_req_duration
|
||||
|
||||
def set_unicast_req_duration(self, unicast_req_duration: int):
|
||||
"""
|
||||
Setter for unicast_req_duration
|
||||
Args:
|
||||
unicast_req_duration (): the unicast_req_duration value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.unicast_req_duration = unicast_req_duration
|
||||
|
||||
def get_use_syslog(self) -> int:
|
||||
"""
|
||||
Getter for use_syslog
|
||||
Returns: the use_syslog value
|
||||
|
||||
"""
|
||||
return self.use_syslog
|
||||
|
||||
def set_use_syslog(self, use_syslog: int):
|
||||
"""
|
||||
Setter for use_syslog
|
||||
Args:
|
||||
use_syslog (): the use_syslog value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.use_syslog = use_syslog
|
||||
|
||||
def get_verbose(self) -> int:
|
||||
"""
|
||||
Getter for verbose
|
||||
Returns: the verbose value
|
||||
|
||||
"""
|
||||
return self.verbose
|
||||
|
||||
def set_verbose(self, verbose: int):
|
||||
"""
|
||||
Setter for verbose
|
||||
Args:
|
||||
verbose (): the verbose value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.verbose = verbose
|
||||
|
||||
def get_summary_interval(self) -> int:
|
||||
"""
|
||||
Getter for summary_interval
|
||||
Returns: the summary_interval value
|
||||
|
||||
"""
|
||||
return self.summary_interval
|
||||
|
||||
def set_summary_interval(self, summary_interval: int):
|
||||
"""
|
||||
Setter for summary_interval
|
||||
Args:
|
||||
summary_interval (): the summary_interval value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.summary_interval = summary_interval
|
||||
|
||||
def get_kernel_leap(self) -> int:
|
||||
"""
|
||||
Getter for kernel_leap
|
||||
Returns: the kernel_leap value
|
||||
|
||||
"""
|
||||
return self.kernel_leap
|
||||
|
||||
def set_kernel_leap(self, kernel_leap: int):
|
||||
"""
|
||||
Setter for kernel_leap
|
||||
Args:
|
||||
kernel_leap (): the kernel_leap value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.kernel_leap = kernel_leap
|
||||
|
||||
def get_check_fup_sync(self) -> int:
|
||||
"""
|
||||
Getter for check_fup_sync
|
||||
Returns: the check_fup_sync value
|
||||
|
||||
"""
|
||||
return self.check_fup_sync
|
||||
|
||||
def set_check_fup_sync(self, check_fup_sync: int):
|
||||
"""
|
||||
Setter for check_fup_sync
|
||||
Args:
|
||||
check_fup_sync (): the check_fup_sync value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.check_fup_sync = check_fup_sync
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
101
keywords/ptp/cat/objects/run_time_options_output.py
Normal file
101
keywords/ptp/cat/objects/run_time_options_output.py
Normal file
@@ -0,0 +1,101 @@
|
||||
from keywords.ptp.cat.cat_ptp_table_parser import CatPtpTableParser
|
||||
from keywords.ptp.cat.objects.run_time_options_object import RunTimeOptionsObject
|
||||
|
||||
|
||||
class RunTimeOptionsOutput:
|
||||
"""
|
||||
This class parses the output of Run time Options
|
||||
|
||||
Example:
|
||||
assume_two_step 0
|
||||
logging_level 6
|
||||
path_trace_enabled 0
|
||||
follow_up_info 0
|
||||
hybrid_e2e 0
|
||||
inhibit_multicast_service 0
|
||||
net_sync_monitor 0
|
||||
tc_spanning_tree 0
|
||||
tx_timestamp_timeout 1
|
||||
unicast_listen 0
|
||||
unicast_master_table 0
|
||||
unicast_req_duration 3600
|
||||
use_syslog 1
|
||||
verbose 0
|
||||
summary_interval 0
|
||||
kernel_leap 1
|
||||
check_fup_sync 0
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, run_time_options_output: [str]):
|
||||
"""
|
||||
Constructor.
|
||||
Create an internal RunTimeOptionsObject from the passed parameter.
|
||||
Args:
|
||||
run_time_options_output (list[str]): a list of strings representing the run time options output
|
||||
|
||||
"""
|
||||
cat_ptp_table_parser = CatPtpTableParser(run_time_options_output)
|
||||
output_values = cat_ptp_table_parser.get_output_values_dict()
|
||||
self.run_time_options_object = RunTimeOptionsObject()
|
||||
|
||||
if 'assume_two_step' in output_values:
|
||||
self.run_time_options_object.set_assume_two_step(int(output_values['assume_two_step']))
|
||||
|
||||
if 'logging_level' in output_values:
|
||||
self.run_time_options_object.set_logging_level(int(output_values['logging_level']))
|
||||
|
||||
if 'path_trace_enabled' in output_values:
|
||||
self.run_time_options_object.set_path_trace_enabled(int(output_values['path_trace_enabled']))
|
||||
|
||||
if 'follow_up_info' in output_values:
|
||||
self.run_time_options_object.set_follow_up_info(int(output_values['follow_up_info']))
|
||||
|
||||
if 'hybrid_e2e' in output_values:
|
||||
self.run_time_options_object.set_hybrid_e2e(int(output_values['hybrid_e2e']))
|
||||
|
||||
if 'inhibit_multicast_service' in output_values:
|
||||
self.run_time_options_object.set_inhibit_multicast_service(int(output_values['inhibit_multicast_service']))
|
||||
|
||||
if 'net_sync_monitor' in output_values:
|
||||
self.run_time_options_object.set_net_sync_monitor(int(output_values['net_sync_monitor']))
|
||||
|
||||
if 'tc_spanning_tree' in output_values:
|
||||
self.run_time_options_object.set_tc_spanning_tree(int(output_values['tc_spanning_tree']))
|
||||
|
||||
if 'tx_timestamp_timeout' in output_values:
|
||||
self.run_time_options_object.set_tx_timestamp_timeout(int(output_values['tx_timestamp_timeout']))
|
||||
|
||||
if 'unicast_listen' in output_values:
|
||||
self.run_time_options_object.set_unicast_listen(int(output_values['unicast_listen']))
|
||||
|
||||
if 'unicast_master_table' in output_values:
|
||||
self.run_time_options_object.set_unicast_master_table(int(output_values['unicast_master_table']))
|
||||
|
||||
if 'unicast_req_duration' in output_values:
|
||||
self.run_time_options_object.set_unicast_req_duration(int(output_values['unicast_req_duration']))
|
||||
|
||||
if 'use_syslog' in output_values:
|
||||
self.run_time_options_object.set_use_syslog(int(output_values['use_syslog']))
|
||||
|
||||
if 'verbose' in output_values:
|
||||
self.run_time_options_object.set_verbose(int(output_values['verbose']))
|
||||
|
||||
if 'summary_interval' in output_values:
|
||||
self.run_time_options_object.set_summary_interval(int(output_values['summary_interval']))
|
||||
|
||||
if 'kernel_leap' in output_values:
|
||||
self.run_time_options_object.set_kernel_leap(int(output_values['kernel_leap']))
|
||||
|
||||
if 'check_fup_sync' in output_values:
|
||||
self.run_time_options_object.set_check_fup_sync(int(output_values['check_fup_sync']))
|
||||
|
||||
def get_run_time_options_object(self) -> RunTimeOptionsObject:
|
||||
"""
|
||||
Getter for run_time_options_object object.
|
||||
|
||||
Returns:
|
||||
A RunTimeOptionsObject
|
||||
|
||||
"""
|
||||
return self.run_time_options_object
|
373
keywords/ptp/cat/objects/servo_options_object.py
Normal file
373
keywords/ptp/cat/objects/servo_options_object.py
Normal file
@@ -0,0 +1,373 @@
|
||||
class ServoOptionsObject:
|
||||
"""
|
||||
Object to hold the values of Servo Options Options
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.pi_proportional_const: float = None
|
||||
self.pi_integral_const: float = None
|
||||
self.pi_proportional_scale: float = None
|
||||
self.pi_proportional_exponent: float = None
|
||||
self.pi_proportional_norm_max: float = None
|
||||
self.pi_integral_scale: float = None
|
||||
self.pi_integral_exponent: float = None
|
||||
self.pi_integral_norm_max: float = None
|
||||
self.step_threshold: float = None
|
||||
self.first_step_threshold: float = None
|
||||
self.max_frequency: int = None
|
||||
self.clock_servo: str = ''
|
||||
self.sanity_freq_limit: int = -1
|
||||
self.ntpshm_segment: int = -1
|
||||
self.msg_interval_request: int = -1
|
||||
self.servo_num_offset_values: int = -1
|
||||
self.servo_offset_threshold: int = -1
|
||||
self.write_phase_mode: int = -1
|
||||
|
||||
def get_pi_proportional_const(self) -> float:
|
||||
"""
|
||||
Getter for pi_proportional
|
||||
Returns: pi_proportional
|
||||
|
||||
"""
|
||||
return self.pi_proportional_const
|
||||
|
||||
def set_pi_proportional_const(self, pi_proportional_const: float):
|
||||
"""
|
||||
Setter for pi_proportional_const
|
||||
Args:
|
||||
pi_proportional_const (): the pi_proportional_const value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.pi_proportional_const = pi_proportional_const
|
||||
|
||||
def get_pi_integral_const(self) -> float:
|
||||
"""
|
||||
Getter for pi_integral_const
|
||||
Returns: pi_integral_const value
|
||||
|
||||
"""
|
||||
return self.pi_integral_const
|
||||
|
||||
def set_pi_integral_const(self, pi_integral_const: float):
|
||||
"""
|
||||
Setter for pi_integral_const
|
||||
Args:
|
||||
pi_integral_const (): pi_integral_const value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.pi_integral_const = pi_integral_const
|
||||
|
||||
def get_pi_proportional_scale(self) -> float:
|
||||
"""
|
||||
Getter for pi_proportional_scale
|
||||
Returns: pi_proportional_scale value
|
||||
|
||||
"""
|
||||
return self.pi_proportional_scale
|
||||
|
||||
def set_pi_proportional_scale(self, pi_proportional_scale: float):
|
||||
"""
|
||||
Setter for pi_proportional_scale
|
||||
Args:
|
||||
pi_proportional_scale (): pi_proportional_scale value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.pi_proportional_scale = pi_proportional_scale
|
||||
|
||||
def get_pi_proportional_exponent(self) -> float:
|
||||
"""
|
||||
Getter for pi_proportional_exponent
|
||||
Returns: the pi_proportional_exponent value
|
||||
|
||||
"""
|
||||
return self.pi_proportional_exponent
|
||||
|
||||
def set_pi_proportional_exponent(self, pi_proportional_exponent: float):
|
||||
"""
|
||||
Setter for pi_proportional_exponent
|
||||
Args:
|
||||
pi_proportional_exponent (): the pi_proportional_exponent value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.pi_proportional_exponent = pi_proportional_exponent
|
||||
|
||||
def get_pi_proportional_norm_max(self) -> float:
|
||||
"""
|
||||
Getter for pi_proportional_norm_max
|
||||
Returns: pi_proportional_norm_max value
|
||||
|
||||
"""
|
||||
return self.pi_proportional_norm_max
|
||||
|
||||
def set_pi_proportional_norm_max(self, pi_proportional_norm_max: float):
|
||||
"""
|
||||
Setter for pi_proportional_norm_max
|
||||
Args:
|
||||
pi_proportional_norm_max (): the pi_proportional_norm_max value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.pi_proportional_norm_max = pi_proportional_norm_max
|
||||
|
||||
def get_pi_integral_scale(self) -> float:
|
||||
"""
|
||||
Getter for pi_integral_scale
|
||||
Returns: pi_integral_scale value
|
||||
|
||||
"""
|
||||
return self.pi_integral_scale
|
||||
|
||||
def set_pi_integral_scale(self, pi_integral_scale: float):
|
||||
"""
|
||||
Setter for pi_integral_scale
|
||||
Args:
|
||||
pi_integral_scale (): the pi_integral_scale value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.pi_integral_scale = pi_integral_scale
|
||||
|
||||
def get_pi_integral_exponent(self) -> float:
|
||||
"""
|
||||
Getter for pi_integral_exponent
|
||||
Returns: pi_integral_exponent value
|
||||
|
||||
"""
|
||||
return self.pi_integral_exponent
|
||||
|
||||
def set_pi_integral_exponent(self, pi_integral_exponent: float):
|
||||
"""
|
||||
Setter for pi_integral_exponent
|
||||
Args:
|
||||
pi_integral_exponent (): the pi_integral_exponent value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.pi_integral_exponent = pi_integral_exponent
|
||||
|
||||
def get_pi_integral_norm_max(self) -> float:
|
||||
"""
|
||||
Getter for pi_integral_norm_max
|
||||
Returns: pi_integral_norm_max value
|
||||
|
||||
"""
|
||||
return self.pi_integral_norm_max
|
||||
|
||||
def set_pi_integral_norm_max(self, pi_integral_norm_max: float):
|
||||
"""
|
||||
Setter for pi_integral_norm_max
|
||||
Args:
|
||||
pi_integral_norm_max (): the pi_integral_norm_max value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.pi_integral_norm_max = pi_integral_norm_max
|
||||
|
||||
def get_step_threshold(self) -> float:
|
||||
"""
|
||||
Getter for step_threshold
|
||||
Returns: the step_threshold value
|
||||
|
||||
"""
|
||||
return self.step_threshold
|
||||
|
||||
def set_step_threshold(self, step_threshold: float):
|
||||
"""
|
||||
Setter for step_threshold
|
||||
Args:
|
||||
step_threshold (): the step_threshold value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.step_threshold = step_threshold
|
||||
|
||||
def get_first_step_threshold(self) -> float:
|
||||
"""
|
||||
Getter for first_step_threshold
|
||||
Returns: the first_step_threshold value
|
||||
|
||||
"""
|
||||
return self.first_step_threshold
|
||||
|
||||
def set_first_step_threshold(self, first_step_threshold: float):
|
||||
"""
|
||||
Setter for first_step_threshold
|
||||
Args:
|
||||
first_step_threshold (): the first_step_threshold value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.first_step_threshold = first_step_threshold
|
||||
|
||||
def get_max_frequency(self) -> int:
|
||||
"""
|
||||
Getter for max_frequency
|
||||
Returns: the max_frequency value
|
||||
|
||||
"""
|
||||
return self.max_frequency
|
||||
|
||||
def set_max_frequency(self, max_frequency: int):
|
||||
"""
|
||||
Setter for max_frequency
|
||||
Args:
|
||||
max_frequency (): the max_frequency value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.max_frequency = max_frequency
|
||||
|
||||
def get_clock_servo(self) -> str:
|
||||
"""
|
||||
Getter for clock_servo
|
||||
Returns: the clock_servo value
|
||||
|
||||
"""
|
||||
return self.clock_servo
|
||||
|
||||
def set_clock_servo(self, clock_servo: str):
|
||||
"""
|
||||
Setter for clock_servo
|
||||
Args:
|
||||
clock_servo (): the clock_servo value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.clock_servo = clock_servo
|
||||
|
||||
def get_sanity_freq_limit(self) -> int:
|
||||
"""
|
||||
Getter for sanity_freq_limit
|
||||
Returns: the sanity_freq_limit value
|
||||
|
||||
"""
|
||||
return self.sanity_freq_limit
|
||||
|
||||
def set_sanity_freq_limit(self, sanity_freq_limit: int):
|
||||
"""
|
||||
Setter for sanity_freq_limit
|
||||
Args:
|
||||
sanity_freq_limit (): the sanity_freq_limit value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.sanity_freq_limit = sanity_freq_limit
|
||||
|
||||
def get_ntpshm_segment(self) -> int:
|
||||
"""
|
||||
Getter for ntpshm_segment
|
||||
Returns: the ntpshm_segment value
|
||||
|
||||
"""
|
||||
return self.ntpshm_segment
|
||||
|
||||
def set_ntpshm_segment(self, ntpshm_segment: int):
|
||||
"""
|
||||
Setter for ntpshm_segment
|
||||
Args:
|
||||
ntpshm_segment (): the ntpshm_segment value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.ntpshm_segment = ntpshm_segment
|
||||
|
||||
def get_msg_interval_request(self) -> int:
|
||||
"""
|
||||
Getter for msg_interval_request
|
||||
Returns: the msg_interval_request value
|
||||
|
||||
"""
|
||||
return self.msg_interval_request
|
||||
|
||||
def set_msg_interval_request(self, msg_interval_request: int):
|
||||
"""
|
||||
Setter for msg_interval_request
|
||||
Args:
|
||||
msg_interval_request (): the msg_interval_request value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.msg_interval_request = msg_interval_request
|
||||
|
||||
def get_servo_num_offset_values(self) -> int:
|
||||
"""
|
||||
Getter for servo_num_offset_values
|
||||
Returns: the servo_num_offset_values value
|
||||
|
||||
"""
|
||||
return self.servo_num_offset_values
|
||||
|
||||
def set_servo_num_offset_values(self, servo_num_offset_values: int):
|
||||
"""
|
||||
Setter for servo_num_offset_values
|
||||
Args:
|
||||
servo_num_offset_values (): the servo_num_offset_values value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.servo_num_offset_values = servo_num_offset_values
|
||||
|
||||
def get_servo_offset_threshold(self) -> int:
|
||||
"""
|
||||
Getter for servo_offset_threshold
|
||||
Returns: the servo_offset_threshold value
|
||||
|
||||
"""
|
||||
return self.servo_offset_threshold
|
||||
|
||||
def set_servo_offset_threshold(self, servo_offset_threshold: int):
|
||||
"""
|
||||
Setter for servo_offset_threshold
|
||||
Args:
|
||||
servo_offset_threshold (): the servo_offset_threshold value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.servo_offset_threshold = servo_offset_threshold
|
||||
|
||||
def get_write_phase_mode(self) -> int:
|
||||
"""
|
||||
Getter for write_phase_mode
|
||||
Returns: the write_phase_mode value
|
||||
|
||||
"""
|
||||
return self.write_phase_mode
|
||||
|
||||
def set_write_phase_mode(self, write_phase_mode: int):
|
||||
"""
|
||||
Setter for write_phase_mode
|
||||
Args:
|
||||
write_phase_mode (): the write_phase_mode value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.write_phase_mode = write_phase_mode
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
105
keywords/ptp/cat/objects/servo_options_output.py
Normal file
105
keywords/ptp/cat/objects/servo_options_output.py
Normal file
@@ -0,0 +1,105 @@
|
||||
from keywords.ptp.cat.cat_ptp_table_parser import CatPtpTableParser
|
||||
from keywords.ptp.cat.objects.servo_options_object import ServoOptionsObject
|
||||
|
||||
|
||||
class ServoOptionsOutput:
|
||||
"""
|
||||
This class parses the output of Servo Options
|
||||
|
||||
Example:
|
||||
pi_proportional_const 0.0
|
||||
pi_integral_const 0.0
|
||||
pi_proportional_scale 0.0
|
||||
pi_proportional_exponent -0.3
|
||||
pi_proportional_norm_max 0.7
|
||||
pi_integral_scale 0.0
|
||||
pi_integral_exponent 0.4
|
||||
pi_integral_norm_max 0.3
|
||||
step_threshold 0.0
|
||||
first_step_threshold 0.00002
|
||||
max_frequency 900000000
|
||||
clock_servo pi
|
||||
sanity_freq_limit 200000000
|
||||
ntpshm_segment 0
|
||||
msg_interval_request 0
|
||||
servo_num_offset_values 10
|
||||
servo_offset_threshold 0
|
||||
write_phase_mode 0
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, servo_options_output: [str]):
|
||||
"""
|
||||
Constructor.
|
||||
Create an internal ServoOptionsObject from the passed parameter.
|
||||
Args:
|
||||
servo_options_output (list[str]): a list of strings representing the servo options output
|
||||
|
||||
"""
|
||||
cat_ptp_table_parser = CatPtpTableParser(servo_options_output)
|
||||
output_values = cat_ptp_table_parser.get_output_values_dict()
|
||||
self.servo_options_object = ServoOptionsObject()
|
||||
|
||||
if 'pi_proportional_const' in output_values:
|
||||
self.servo_options_object.set_pi_proportional_const(float(output_values['pi_proportional_const']))
|
||||
|
||||
if 'pi_integral_const' in output_values:
|
||||
self.servo_options_object.set_pi_integral_const(float(output_values['pi_integral_const']))
|
||||
|
||||
if 'pi_proportional_scale' in output_values:
|
||||
self.servo_options_object.set_pi_proportional_scale(float(output_values['pi_proportional_scale']))
|
||||
|
||||
if 'pi_proportional_exponent' in output_values:
|
||||
self.servo_options_object.set_pi_proportional_exponent(float(output_values['pi_proportional_exponent']))
|
||||
|
||||
if 'pi_proportional_norm_max' in output_values:
|
||||
self.servo_options_object.set_pi_proportional_norm_max(float(output_values['pi_proportional_norm_max']))
|
||||
|
||||
if 'pi_integral_scale' in output_values:
|
||||
self.servo_options_object.set_pi_integral_scale(float(output_values['pi_integral_scale']))
|
||||
|
||||
if 'pi_integral_exponent' in output_values:
|
||||
self.servo_options_object.set_pi_integral_exponent(float(output_values['pi_integral_exponent']))
|
||||
|
||||
if 'pi_integral_norm_max' in output_values:
|
||||
self.servo_options_object.set_pi_integral_norm_max(float(output_values['pi_integral_norm_max']))
|
||||
|
||||
if 'step_threshold' in output_values:
|
||||
self.servo_options_object.set_step_threshold(float(output_values['step_threshold']))
|
||||
|
||||
if 'first_step_threshold' in output_values:
|
||||
self.servo_options_object.set_first_step_threshold(float(output_values['first_step_threshold']))
|
||||
|
||||
if 'max_frequency' in output_values:
|
||||
self.servo_options_object.set_max_frequency(int(output_values['max_frequency']))
|
||||
|
||||
if 'clock_servo' in output_values:
|
||||
self.servo_options_object.set_clock_servo(output_values['clock_servo'])
|
||||
|
||||
if 'sanity_freq_limit' in output_values:
|
||||
self.servo_options_object.set_sanity_freq_limit(int(output_values['sanity_freq_limit']))
|
||||
|
||||
if 'ntpshm_segment' in output_values:
|
||||
self.servo_options_object.set_ntpshm_segment(int(output_values['ntpshm_segment']))
|
||||
|
||||
if 'msg_interval_request' in output_values:
|
||||
self.servo_options_object.set_msg_interval_request(int(output_values['msg_interval_request']))
|
||||
|
||||
if 'servo_num_offset_values' in output_values:
|
||||
self.servo_options_object.set_servo_num_offset_values(int(output_values['servo_num_offset_values']))
|
||||
|
||||
if 'servo_offset_threshold' in output_values:
|
||||
self.servo_options_object.set_servo_offset_threshold(int(output_values['servo_offset_threshold']))
|
||||
|
||||
if 'write_phase_mode' in output_values:
|
||||
self.servo_options_object.set_write_phase_mode(int(output_values['write_phase_mode']))
|
||||
|
||||
def get_servo_options_object(self) -> ServoOptionsObject:
|
||||
"""
|
||||
Getter for servo_options_object object.
|
||||
|
||||
Returns:
|
||||
A ServoOptionsObject
|
||||
|
||||
"""
|
||||
return self.servo_options_object
|
146
keywords/ptp/cat/objects/transport_options_object.py
Normal file
146
keywords/ptp/cat/objects/transport_options_object.py
Normal file
@@ -0,0 +1,146 @@
|
||||
class TransportOptionsObject:
|
||||
"""
|
||||
Object to hold the values of Transport Options Object
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.transport_specific: str = ''
|
||||
self.ptp_dst_mac: str = ''
|
||||
self.p2p_dst_mac: str = ''
|
||||
self.udp_ttl: int = -1
|
||||
self.udp6_scope: str = ''
|
||||
self.uds_address: str = ''
|
||||
self.uds_ro_address: str = ''
|
||||
|
||||
def get_transport_specific(self) -> str:
|
||||
"""
|
||||
Getter for transport_specific
|
||||
Returns: transport_specific
|
||||
|
||||
"""
|
||||
return self.transport_specific
|
||||
|
||||
def set_transport_specific(self, transport_specific: str):
|
||||
"""
|
||||
Setter for transport_specific
|
||||
Args:
|
||||
transport_specific (): the transport_specific value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.transport_specific = transport_specific
|
||||
|
||||
def get_ptp_dst_mac(self) -> str:
|
||||
"""
|
||||
Getter for ptp_dst_mac
|
||||
Returns: ptp_dst_mac value
|
||||
|
||||
"""
|
||||
return self.ptp_dst_mac
|
||||
|
||||
def set_ptp_dst_mac(self, ptp_dst_mac: str):
|
||||
"""
|
||||
Setter for ptp_dst_mac
|
||||
Args:
|
||||
ptp_dst_mac (): ptp_dst_mac value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.ptp_dst_mac = ptp_dst_mac
|
||||
|
||||
def get_p2p_dst_mac(self) -> str:
|
||||
"""
|
||||
Getter for p2p_dst_mac
|
||||
Returns: p2p_dst_mac value
|
||||
|
||||
"""
|
||||
return self.p2p_dst_mac
|
||||
|
||||
def set_p2p_dst_mac(self, p2p_dst_mac: str):
|
||||
"""
|
||||
Setter for p2p_dst_mac
|
||||
Args:
|
||||
p2p_dst_mac (): p2p_dst_mac value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.p2p_dst_mac = p2p_dst_mac
|
||||
|
||||
def get_udp_ttl(self) -> int:
|
||||
"""
|
||||
Getter for udp_ttl
|
||||
Returns: the udp_ttl value
|
||||
|
||||
"""
|
||||
return self.udp_ttl
|
||||
|
||||
def set_udp_ttl(self, udp_ttl: int):
|
||||
"""
|
||||
Setter for udp_ttl
|
||||
Args:
|
||||
udp_ttl (): the udp_ttl value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.udp_ttl = udp_ttl
|
||||
|
||||
def get_udp6_scope(self) -> str:
|
||||
"""
|
||||
Getter for udp6_scope
|
||||
Returns: udp6_scope value
|
||||
|
||||
"""
|
||||
return self.udp6_scope
|
||||
|
||||
def set_udp6_scope(self, udp6_scope: str):
|
||||
"""
|
||||
Setter for udp6_scope
|
||||
Args:
|
||||
udp6_scope (): the udp6_scope value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.udp6_scope = udp6_scope
|
||||
|
||||
def get_uds_address(self) -> str:
|
||||
"""
|
||||
Getter for uds_address
|
||||
Returns: uds_address value
|
||||
|
||||
"""
|
||||
return self.uds_address
|
||||
|
||||
def set_uds_address(self, uds_address: str):
|
||||
"""
|
||||
Setter for uds_address
|
||||
Args:
|
||||
uds_address (): the uds_address value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.uds_address = uds_address
|
||||
|
||||
def get_uds_ro_address(self) -> str:
|
||||
"""
|
||||
Getter for uds_ro_address
|
||||
Returns: uds_ro_address value
|
||||
|
||||
"""
|
||||
return self.uds_ro_address
|
||||
|
||||
def set_uds_ro_address(self, uds_ro_address: str):
|
||||
"""
|
||||
Setter for uds_ro_address
|
||||
Args:
|
||||
uds_ro_address (): the uds_ro_address value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.uds_ro_address = uds_ro_address
|
61
keywords/ptp/cat/objects/transport_options_output.py
Normal file
61
keywords/ptp/cat/objects/transport_options_output.py
Normal file
@@ -0,0 +1,61 @@
|
||||
from keywords.ptp.cat.cat_ptp_table_parser import CatPtpTableParser
|
||||
from keywords.ptp.cat.objects.transport_options_object import TransportOptionsObject
|
||||
|
||||
|
||||
class TransportOptionsOutput:
|
||||
"""
|
||||
This class parses the output of Transport Options
|
||||
|
||||
Example:
|
||||
transportSpecific 0x0
|
||||
ptp_dst_mac 01:1B:19:00:00:00
|
||||
p2p_dst_mac 01:80:C2:00:00:0E
|
||||
udp_ttl 1
|
||||
udp6_scope 0x0E
|
||||
uds_address /var/run/ptp4l
|
||||
uds_ro_address /var/run/ptp4lro
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, transport_options_output: [str]):
|
||||
"""
|
||||
Constructor.
|
||||
Create an internal ServoOptionsObject from the passed parameter.
|
||||
Args:
|
||||
transport_options_output (list[str]): a list of strings representing the servo options output
|
||||
|
||||
"""
|
||||
cat_ptp_table_parser = CatPtpTableParser(transport_options_output)
|
||||
output_values = cat_ptp_table_parser.get_output_values_dict()
|
||||
self.transport_options_object = TransportOptionsObject()
|
||||
|
||||
if 'transportSpecific' in output_values:
|
||||
self.transport_options_object.set_transport_specific(output_values['transportSpecific'])
|
||||
|
||||
if 'ptp_dst_mac' in output_values:
|
||||
self.transport_options_object.set_ptp_dst_mac(output_values['ptp_dst_mac'])
|
||||
|
||||
if 'p2p_dst_mac' in output_values:
|
||||
self.transport_options_object.set_p2p_dst_mac(output_values['p2p_dst_mac'])
|
||||
|
||||
if 'udp_ttl' in output_values:
|
||||
self.transport_options_object.set_udp_ttl(int(output_values['udp_ttl']))
|
||||
|
||||
if 'udp6_scope' in output_values:
|
||||
self.transport_options_object.set_udp6_scope(output_values['udp6_scope'])
|
||||
|
||||
if 'uds_address' in output_values:
|
||||
self.transport_options_object.set_uds_address(output_values['uds_address'])
|
||||
|
||||
if 'uds_ro_address' in output_values:
|
||||
self.transport_options_object.set_uds_ro_address(output_values['uds_ro_address'])
|
||||
|
||||
def get_transport_options_object(self) -> TransportOptionsObject:
|
||||
"""
|
||||
Getter for TransportOptionsObject object.
|
||||
|
||||
Returns:
|
||||
A TransportOptionsObject
|
||||
|
||||
"""
|
||||
return self.transport_options_object
|
@@ -7,6 +7,7 @@ class PMCGetDefaultDataSetObject:
|
||||
self.two_step_flag: int = -1
|
||||
self.slave_only: int = -1
|
||||
self.number_ports: int = -1
|
||||
self.socket_priority: int = -1
|
||||
self.priority1: int = -1
|
||||
self.clock_class: int = -1
|
||||
self.clock_accuracy: str = ''
|
||||
@@ -14,6 +15,13 @@ class PMCGetDefaultDataSetObject:
|
||||
self.priority2: int = -1
|
||||
self.clock_identity: str = ''
|
||||
self.domain_number: str = ''
|
||||
self.free_running: int = -1
|
||||
self.freq_est_interval: int = -1
|
||||
self.dscp_event: int = -1
|
||||
self.dscp_general: int = -1
|
||||
self.dataset_comparison: str = ''
|
||||
self.max_steps_removed: int = -1
|
||||
self.utc_offset: int = -1
|
||||
|
||||
def get_two_step_flag(self) -> int:
|
||||
"""
|
||||
@@ -53,6 +61,25 @@ class PMCGetDefaultDataSetObject:
|
||||
"""
|
||||
self.slave_only = slave_only
|
||||
|
||||
def get_socket_priority(self) -> int:
|
||||
"""
|
||||
Getter for socket_priority
|
||||
Returns: socket_priority value
|
||||
|
||||
"""
|
||||
return self.socket_priority
|
||||
|
||||
def set_socket_priority(self, socket_priority: int):
|
||||
"""
|
||||
Setter for socket_priority
|
||||
Args:
|
||||
socket_priority (): socket_priority value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.socket_priority = socket_priority
|
||||
|
||||
def get_number_ports(self) -> int:
|
||||
"""
|
||||
Getter for number_ports
|
||||
@@ -204,3 +231,158 @@ class PMCGetDefaultDataSetObject:
|
||||
|
||||
"""
|
||||
self.domain_number = domain_number
|
||||
|
||||
def get_free_running(self) -> int:
|
||||
"""
|
||||
Getter for free_running
|
||||
Returns: the free_running value
|
||||
|
||||
"""
|
||||
return self.free_running
|
||||
|
||||
def set_free_running(self, free_running: int):
|
||||
"""
|
||||
Setter for free_running
|
||||
Args:
|
||||
free_running (): the free_running value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.free_running = free_running
|
||||
|
||||
def get_freq_est_interval(self) -> int:
|
||||
"""
|
||||
Getter for freq_est_interval
|
||||
Returns: the freq_est_interval value
|
||||
|
||||
"""
|
||||
return self.freq_est_interval
|
||||
|
||||
def set_freq_est_interval(self, freq_est_interval: int):
|
||||
"""
|
||||
Setter for freq_est_interval
|
||||
Args:
|
||||
freq_est_interval (): the freq_est_interval value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.freq_est_interval = freq_est_interval
|
||||
|
||||
def get_dscp_event(self) -> int:
|
||||
"""
|
||||
Getter for dscp_event
|
||||
Returns: the dscp_event value
|
||||
|
||||
"""
|
||||
return self.dscp_event
|
||||
|
||||
def set_dscp_event(self, dscp_event: int):
|
||||
"""
|
||||
Setter for dscp_event
|
||||
Args:
|
||||
dscp_event (): the dscp_event value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.dscp_event = dscp_event
|
||||
|
||||
def get_dscp_event(self) -> int:
|
||||
"""
|
||||
Getter for priority2
|
||||
Returns: the priority2 value
|
||||
|
||||
"""
|
||||
return self.dscp_event
|
||||
|
||||
def set_dscp_event(self, dscp_event: int):
|
||||
"""
|
||||
Setter for dscp_event
|
||||
Args:
|
||||
dscp_event (): the dscp_event value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.dscp_event = dscp_event
|
||||
|
||||
def get_dscp_general(self) -> int:
|
||||
"""
|
||||
Getter for dscp_general
|
||||
Returns: the dscp_general value
|
||||
|
||||
"""
|
||||
return self.dscp_general
|
||||
|
||||
def set_dscp_general(self, dscp_general: int):
|
||||
"""
|
||||
Setter for dscp_general
|
||||
Args:
|
||||
dscp_general (): the dscp_general value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.dscp_general = dscp_general
|
||||
|
||||
def get_dataset_comparison(self) -> str:
|
||||
"""
|
||||
Getter for dataset_comparison
|
||||
Returns: the dataset_comparison value
|
||||
|
||||
"""
|
||||
return self.dataset_comparison
|
||||
|
||||
def set_dataset_comparison(self, dataset_comparison: str):
|
||||
"""
|
||||
Setter for dataset_comparison
|
||||
Args:
|
||||
dataset_comparison (): the dataset_comparison value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.dataset_comparison = dataset_comparison
|
||||
|
||||
def get_max_steps_removed(self) -> int:
|
||||
"""
|
||||
Getter for max_steps_removed
|
||||
Returns: the max_steps_removed value
|
||||
|
||||
"""
|
||||
return self.max_steps_removed
|
||||
|
||||
def set_max_steps_removed(self, max_steps_removed: int):
|
||||
"""
|
||||
Setter for max_steps_removed
|
||||
Args:
|
||||
max_steps_removed (): the max_steps_removed value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.max_steps_removed = max_steps_removed
|
||||
|
||||
def get_utc_offset(self) -> int:
|
||||
"""
|
||||
Getter for utc_offset
|
||||
Returns: the utc_offset value
|
||||
|
||||
"""
|
||||
return self.utc_offset
|
||||
|
||||
def set_utc_offset(self, utc_offset: int):
|
||||
"""
|
||||
Setter for utc_offset
|
||||
Args:
|
||||
utc_offset (): the utc_offset value
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.utc_offset = utc_offset
|
||||
|
||||
|
||||
|
||||
|
@@ -40,6 +40,9 @@ class PMCGetDefaultDataSetOutput:
|
||||
if 'slaveOnly' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_slave_only(int(output_values['slaveOnly']))
|
||||
|
||||
if 'socket_priority' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_socket_priority(int(output_values['socket_priority']))
|
||||
|
||||
if 'numberPorts' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_number_ports(int(output_values['numberPorts']))
|
||||
|
||||
@@ -64,6 +67,27 @@ class PMCGetDefaultDataSetOutput:
|
||||
if 'domainNumber' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_domain_number(output_values['domainNumber'])
|
||||
|
||||
if 'free_running' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_free_running(int(output_values['free_running']))
|
||||
|
||||
if 'freq_est_interval' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_freq_est_interval(int(output_values['freq_est_interval']))
|
||||
|
||||
if 'dscp_event' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_dscp_event(int(output_values['dscp_event']))
|
||||
|
||||
if 'dscp_general' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_dscp_general(int(output_values['dscp_general']))
|
||||
|
||||
if 'dataset_comparison' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_dataset_comparison(output_values['dataset_comparison'])
|
||||
|
||||
if 'maxStepsRemoved' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_max_steps_removed(int(output_values['maxStepsRemoved']))
|
||||
|
||||
if '#utc_offset' in output_values:
|
||||
self.pmc_get_default_data_set_object.set_utc_offset(int(output_values['#utc_offset']))
|
||||
|
||||
def get_pmc_get_default_data_set_object(self) -> PMCGetDefaultDataSetObject:
|
||||
"""
|
||||
Getter for pmc_get_default_data_set_object object.
|
||||
|
239
unit_tests/parser/ptp/cat_ptp_config_test.py
Normal file
239
unit_tests/parser/ptp/cat_ptp_config_test.py
Normal file
@@ -0,0 +1,239 @@
|
||||
from keywords.ptp.cat.objects.cat_ptp_config_output import CATPtpConfigOutput
|
||||
|
||||
cat_ptp_config = [
|
||||
'[global] \n',
|
||||
'#\n',
|
||||
'# Default Data Set\n',
|
||||
'#\n',
|
||||
'twoStepFlag 1\n',
|
||||
'slaveOnly 0\n',
|
||||
'socket_priority 0\n',
|
||||
'priority1 128\n',
|
||||
'priority2 128\n',
|
||||
'domainNumber 0\n',
|
||||
'#utc_offset 37\n',
|
||||
'clockClass 248\n',
|
||||
'clockAccuracy 0xFE\n',
|
||||
'offsetScaledLogVariance 0xFFFF\n',
|
||||
'free_running 0\n',
|
||||
'freq_est_interval 1\n',
|
||||
'dscp_event 0\n',
|
||||
'dscp_general 0\n',
|
||||
'dataset_comparison ieee1588\n',
|
||||
'G.8275.defaultDS.localPriority 128\n',
|
||||
'maxStepsRemoved 255\n',
|
||||
'#\n',
|
||||
'# Port Data Set\n',
|
||||
'#\n',
|
||||
'logAnnounceInterval 1\n',
|
||||
'logSyncInterval 0\n',
|
||||
'operLogSyncInterval 0\n',
|
||||
'logMinDelayReqInterval 0\n',
|
||||
'logMinPdelayReqInterval 0\n',
|
||||
'operLogPdelayReqInterval 0\n',
|
||||
'announceReceiptTimeout 3\n',
|
||||
'syncReceiptTimeout 0\n',
|
||||
'delayAsymmetry 0\n',
|
||||
'fault_reset_interval 4\n',
|
||||
'neighborPropDelayThresh 20000000\n',
|
||||
'masterOnly 0\n',
|
||||
'G.8275.portDS.localPriority 128\n',
|
||||
'asCapable auto\n',
|
||||
'BMCA ptp\n',
|
||||
'inhibit_announce 0\n',
|
||||
'inhibit_delay_req 0\n',
|
||||
'ignore_source_id 0\n',
|
||||
'#\n',
|
||||
'# Run time options\n',
|
||||
'#\n',
|
||||
'assume_two_step 0\n',
|
||||
'logging_level 6\n',
|
||||
'path_trace_enabled 0\n',
|
||||
'follow_up_info 0\n',
|
||||
'hybrid_e2e 0\n',
|
||||
'inhibit_multicast_service 0\n',
|
||||
'net_sync_monitor 0\n',
|
||||
'tc_spanning_tree 0\n',
|
||||
'tx_timestamp_timeout 1\n',
|
||||
'unicast_listen 0\n',
|
||||
'unicast_master_table 0\n',
|
||||
'unicast_req_duration 3600\n',
|
||||
'use_syslog 1\n',
|
||||
'verbose 0\n',
|
||||
'summary_interval 0\n',
|
||||
'kernel_leap 1\n',
|
||||
'check_fup_sync 0\n',
|
||||
'#\n',
|
||||
'# Servo Options\n',
|
||||
'#\n',
|
||||
'pi_proportional_const 0.0\n',
|
||||
'pi_integral_const 0.0\n',
|
||||
'pi_proportional_scale 0.0\n',
|
||||
'pi_proportional_exponent -0.3\n',
|
||||
'pi_proportional_norm_max 0.7\n',
|
||||
'pi_integral_scale 0.0\n',
|
||||
'pi_integral_exponent 0.4\n',
|
||||
'pi_integral_norm_max 0.3\n',
|
||||
'step_threshold 0.0\n',
|
||||
'first_step_threshold 0.00002\n',
|
||||
'max_frequency 900000000\n',
|
||||
'clock_servo pi\n',
|
||||
'sanity_freq_limit 200000000\n',
|
||||
'ntpshm_segment 0\n',
|
||||
'msg_interval_request 0\n',
|
||||
'servo_num_offset_values 10\n',
|
||||
'servo_offset_threshold 0\n',
|
||||
'write_phase_mode 0\n',
|
||||
'#\n',
|
||||
'# Transport options\n',
|
||||
'#\n',
|
||||
'transportSpecific 0x0\n',
|
||||
'ptp_dst_mac 01:1B:19:00:00:00\n',
|
||||
'p2p_dst_mac 01:80:C2:00:00:0E\n',
|
||||
'udp_ttl 1\n',
|
||||
'udp6_scope 0x0E\n',
|
||||
'uds_address /var/run/ptp4l\n',
|
||||
'uds_ro_address /var/run/ptp4lro\n',
|
||||
'#\n',
|
||||
'# Default interface options\n',
|
||||
'#\n',
|
||||
'clock_type OC\n',
|
||||
'network_transport UDPv4\n',
|
||||
'delay_mechanism E2E\n',
|
||||
'time_stamping hardware\n',
|
||||
'tsproc_mode filter\n',
|
||||
'delay_filter moving_median\n',
|
||||
'delay_filter_length 10\n',
|
||||
'egressLatency 0\n',
|
||||
'ingressLatency 0\n',
|
||||
'boundary_clock_jbod 0\n',
|
||||
'#\n',
|
||||
'# Clock description\n',
|
||||
'#\n',
|
||||
'productDescription ;;\n',
|
||||
'revisionData ;;\n',
|
||||
'manufacturerIdentity 00:00:00\n',
|
||||
'userDescription ;\n',
|
||||
'timeSource 0xA0\n',
|
||||
]
|
||||
|
||||
|
||||
def test_cat_ptp_config_output():
|
||||
"""
|
||||
Tests cat_ptp_config_output
|
||||
Returns:
|
||||
|
||||
"""
|
||||
cat_ptp_config_output = CATPtpConfigOutput(cat_ptp_config)
|
||||
|
||||
# validate default data set
|
||||
default_data_set_object = cat_ptp_config_output.get_data_set_output().get_pmc_get_default_data_set_object()
|
||||
assert default_data_set_object.get_two_step_flag() == 1
|
||||
assert default_data_set_object.get_slave_only() == 0
|
||||
assert default_data_set_object.get_socket_priority() == 0
|
||||
assert default_data_set_object.get_priority1() == 128
|
||||
assert default_data_set_object.get_priority2() == 128
|
||||
assert default_data_set_object.get_domain_number() == '0'
|
||||
assert default_data_set_object.get_utc_offset() == 37
|
||||
assert default_data_set_object.get_clock_class() == 248
|
||||
assert default_data_set_object.get_clock_accuracy() == '0xFE'
|
||||
assert default_data_set_object.get_offset_scaled_log_variance() == '0xFFFF'
|
||||
assert default_data_set_object.get_free_running() == 0
|
||||
assert default_data_set_object.get_freq_est_interval() == 1
|
||||
assert default_data_set_object.get_dscp_event() == 0
|
||||
assert default_data_set_object.get_dscp_general() == 0
|
||||
assert default_data_set_object.get_dataset_comparison() == 'ieee1588'
|
||||
assert default_data_set_object.get_max_steps_removed() == 255
|
||||
|
||||
# validate the port data object
|
||||
port_data_object = cat_ptp_config_output.get_port_data_set_output().get_port_data_set_object()
|
||||
assert port_data_object.get_log_announce_interval() == 1
|
||||
assert port_data_object.get_log_sync_interval() == 0
|
||||
assert port_data_object.get_oper_log_sync_interval() == 0
|
||||
assert port_data_object.get_log_min_delay_req_interval() == 0
|
||||
assert port_data_object.get_log_min_p_delay_req_interval() == 0
|
||||
assert port_data_object.get_oper_log_p_delay_req_interval() == 0
|
||||
assert port_data_object.get_announce_receipt_timeout() == 3
|
||||
assert port_data_object.get_sync_receipt_timeout() == 0
|
||||
assert port_data_object.get_delay_asymmetry() == 0
|
||||
assert port_data_object.get_fault_reset_interval() == 4
|
||||
assert port_data_object.get_neighbor_prop_delay_thresh() == 20000000
|
||||
assert port_data_object.get_master_only() == 0
|
||||
assert port_data_object.get_as_capable() == 'auto'
|
||||
assert port_data_object.get_bmca() == 'ptp'
|
||||
assert port_data_object.get_inhibit_announce() == 0
|
||||
assert port_data_object.get_inhibit_delay_req() == 0
|
||||
assert port_data_object.get_ignore_source_id() == 0
|
||||
|
||||
# validate run time options
|
||||
run_time_options_object = cat_ptp_config_output.get_run_time_options_output().get_run_time_options_object()
|
||||
assert run_time_options_object.get_assume_two_step() == 0
|
||||
assert run_time_options_object.get_logging_level() == 6
|
||||
assert run_time_options_object.get_path_trace_enabled() == 0
|
||||
assert run_time_options_object.get_follow_up_info() == 0
|
||||
assert run_time_options_object.get_hybrid_e2e() == 0
|
||||
assert run_time_options_object.get_inhibit_multicast_service() == 0
|
||||
assert run_time_options_object.get_net_sync_monitor() == 0
|
||||
assert run_time_options_object.get_tc_spanning_tree() == 0
|
||||
assert run_time_options_object.get_tx_timestamp_timeout() == 1
|
||||
assert run_time_options_object.get_unicast_listen() == 0
|
||||
assert run_time_options_object.get_unicast_master_table() == 0
|
||||
assert run_time_options_object.get_unicast_req_duration() == 3600
|
||||
assert run_time_options_object.get_use_syslog() == 1
|
||||
assert run_time_options_object.get_verbose() == 0
|
||||
assert run_time_options_object.get_summary_interval() == 0
|
||||
assert run_time_options_object.get_kernel_leap() == 1
|
||||
assert run_time_options_object.get_check_fup_sync() == 0
|
||||
|
||||
# validate Servo Options
|
||||
servo_options_object = cat_ptp_config_output.get_servo_options_output().get_servo_options_object()
|
||||
assert servo_options_object.get_pi_proportional_const() == 0.0
|
||||
assert servo_options_object.get_pi_integral_const() == 0.0
|
||||
assert servo_options_object.get_pi_proportional_scale() == 0.0
|
||||
assert servo_options_object.get_pi_proportional_exponent() == -0.3
|
||||
assert servo_options_object.get_pi_proportional_norm_max() == 0.7
|
||||
assert servo_options_object.get_pi_integral_scale() == 0.0
|
||||
assert servo_options_object.get_pi_integral_exponent() == 0.4
|
||||
assert servo_options_object.get_pi_integral_norm_max() == 0.3
|
||||
assert servo_options_object.get_step_threshold() == 0.0
|
||||
assert servo_options_object.get_first_step_threshold() == 0.00002
|
||||
assert servo_options_object.get_max_frequency() == 900000000
|
||||
assert servo_options_object.get_clock_servo() == 'pi'
|
||||
assert servo_options_object.get_sanity_freq_limit() == 200000000
|
||||
assert servo_options_object.get_ntpshm_segment() == 0
|
||||
assert servo_options_object.get_msg_interval_request() == 0
|
||||
assert servo_options_object.get_servo_num_offset_values() == 10
|
||||
assert servo_options_object.get_servo_offset_threshold() == 0
|
||||
assert servo_options_object.get_write_phase_mode() == 0
|
||||
|
||||
# validate transport options
|
||||
|
||||
transport_options_object = cat_ptp_config_output.get_transport_options_output().get_transport_options_object()
|
||||
assert transport_options_object.get_transport_specific() == '0x0'
|
||||
assert transport_options_object.get_ptp_dst_mac() == '01:1B:19:00:00:00'
|
||||
assert transport_options_object.get_p2p_dst_mac() == '01:80:C2:00:00:0E'
|
||||
assert transport_options_object.get_udp_ttl() == 1
|
||||
assert transport_options_object.get_udp6_scope() == '0x0E'
|
||||
assert transport_options_object.get_uds_address() == '/var/run/ptp4l'
|
||||
assert transport_options_object.get_uds_ro_address() == '/var/run/ptp4lro'
|
||||
|
||||
# validate default interface options
|
||||
default_interface_object = cat_ptp_config_output.get_default_interface_options_output().get_default_interface_options_object()
|
||||
assert default_interface_object.get_clock_type() == 'OC'
|
||||
assert default_interface_object.get_network_transport() == 'UDPv4'
|
||||
assert default_interface_object.get_delay_mechanism() == 'E2E'
|
||||
assert default_interface_object.get_time_stamping() == 'hardware'
|
||||
assert default_interface_object.get_tsproc_mode() == 'filter'
|
||||
assert default_interface_object.get_delay_filter() == 'moving_median'
|
||||
assert default_interface_object.get_delay_filter_length() == 10
|
||||
assert default_interface_object.get_egress_latency() == 0
|
||||
assert default_interface_object.get_ingress_latency() == 0
|
||||
assert default_interface_object.get_boundary_clock_jbod() == 0
|
||||
|
||||
# validate clock description
|
||||
clock_description_object = cat_ptp_config_output.get_clock_description_output().get_clock_description_object()
|
||||
assert clock_description_object.get_product_description() == ';;'
|
||||
assert clock_description_object.get_revision_data() == ';;'
|
||||
assert clock_description_object.get_manufacturer_identity() == '00:00:00'
|
||||
assert clock_description_object.get_user_description() == ';'
|
||||
assert clock_description_object.get_time_source() == '0xA0'
|
Reference in New Issue
Block a user