DEV: Implement gnss monitoring in collectd

This commit supports new alarms based upon monitoring ptp-instance.
instance-level alarm: gpsd daemon fault
device-level alarms:
   GNSS signal loss/no lock
   Number of satellites below configured threshold
   Average SNR below configured threshold

This reads configured threshold values and devices from
/etc/linuxptp/ptpinstance/monitoring-*.conf and compares with
gpsd data, to trigger raise/clear alarms.

Unit tests has been added for testing gpsd protocol.

ptp_monitoring_cli.py added for testing gpsd data polling on live
system.

TEST PLAN:
PASS: Deploy on system where no gnss signal received
   system ptp-instance-add test-monitor monitoring
   system ptp-instance-parameter-add test-monitor satellite_count=12
   system ptp-instance-parameter-add test-monitor signal_quality_db=30
   system ptp-instance-parameter-add test-monitor devices="/dev/gnss0 /dev/gnss1"
   system ptp-instance-parameter-add test-monitor cmdline_opts="-D 7"

   system host-ptp-instance-assign controller-0 test-monitor
   system host-update controller-0 clock_synchronization=ptp
   system ptp-instance-apply

   - Below alarms received for both device_path=/dev/gnss0 and
    device_path=/dev/gnss1
      controller-0 GNSS signal quality db below threshold state:
         signal_quality_db 0 (expected: >= 30.0)
      controller-0 GNSS satellite count below threshold state:
         satellite count 0 (expected: >= 12)
      controller-0 GNSS signal loss state: signal lock False (expected: True)

   - gpsd.service process running and no alarm for this service
   - "systemctl stop gpsd.service" triggers "gpsd.service enabled
      but not running"
   - "systemctl start gpsd.service" clears "gpsd.service enabled
      but not running"

PASS: without devices, raises no device-specific alarms are reported,
     no errors on collectd.log
   system ptp-instance-parameter-delete test-monitor devices=
       "/dev/gnss0 /dev/gnss1"
   system ptp-instance-apply
   -  fm alarm-list # no alarms
   - "systemctl stop gpsd.service" triggers "gpsd.service enabled but
     not running" alarm
   - "systemctl start gpsd.service" clears "gpsd.service enabled but
     not running" alarm

PASS: Add wrong satellite_count value, only that alarm get excluded
   system ptp-instance-parameter-add test-monitor devices=
       "/dev/gnss0 /dev/gnss1"
   system ptp-instance-parameter-add test-monitor 'satellite_count=x'
   system ptp-instance-apply
   collectd log: ptp plugin Reading satellite_count from monitoring
       config file /etc/linuxptp/ptpinstance/monitoring-ptp.conf
       failed. error: invalid literal for int() with base 10: 'x'
   - only satellite_count alarm get excluded, working as expected

PASS: Add wrong signal_quality_db value, only that alarm get excluded
   system ptp-instance-parameter-add test-monitor
       'signal_quality_db=100.x'
   system ptp-instance-apply
   collectd log: ptp plugin Reading signal_quality_db from monitoring
       config file /etc/linuxptp/ptpinstance/monitoring-ptp.conf failed.
       error: could not convert string to float: '100.x'
   - No traceback on collectd.log, signal_quality_db has no effect on
       other alarms.

PASS: Test with single device and float value
   system ptp-instance-parameter-add test-monitor devices="/dev/gnss0"
   system ptp-instance-parameter-add test-monitor
       'signal_quality_db=100.7'
   system ptp-instance-apply
   fm alarm-list
   controller-0 GNSS signal quality db below threshold state:
       signal_quality_db 0 (expected: >= 100.7)
   controller-0 GNSS satellite count below threshold state:
       satellite count 0 (expected: >= 5)
   controller-0 GNSS signal loss state: signal lock False (expected: True)

PASS: Deploy on system where gnss signal received
   Test with cli first: sudo python /usr/rootdirs/opt/
       collectd/extensions/python/ptp_monitoring_cli.py
       /dev/gnss0's gps_data: GpsData(gpsd_running=1, lock_state=1,
       satellite_count=10,
       signal_quality_db=SignalQualityDb(min=31.0, max=48.0, avg=43.6))
       /dev/gnss1's gps_data: GpsData(gpsd_running=1, lock_state=1,
       satellite_count=10,
       signal_quality_db=SignalQualityDb(min=30.0, max=48.0, avg=43.5))
       Error: ptp plugin /dev/gnssx is not being monitored by GPSD
       /dev/gnssx's gps_data: GpsData(gpsd_running=1, lock_state=0,
       satellite_count=0, signal_quality_db=SignalQualityDb
       (min=0, max=0, avg=0))

       This shows satellite count and signal_quality_db of devices, that
       can be tested against on following tests.

   system ptp-instance-add test-monitor monitoring
   system ptp-instance-parameter-add test-monitor satellite_count=8
   system ptp-instance-parameter-add test-monitor signal_quality_db=30
   system ptp-instance-parameter-add test-monitor devices=
       "/dev/gnss0 /dev/gnss1"
   system ptp-instance-parameter-add test-monitor cmdline_opts="-D 7"

   system host-ptp-instance-assign controller-0 test-monitor
   system host-update controller-0 clock_synchronization=ptp
   system ptp-instance-apply

   - fm alarm-list # reports no alarms
   - check collectd.log for actual GpsData:
       info ptp plugin instance monitoring-ptp device /dev/gnss0 data: GpsData(..)

  PASS: increase threshold to check device specific alarms triggered
   system ptp-instance-parameter-add test-monitor satellite_count=100
   system ptp-instance-parameter-add test-monitor signal_quality_db=300
   system ptp-instance-apply

   - fm alarm-list # reports both alarms on both devices

  PASS: test with monitoring instance with other instances (except ts2phc)
  PASS: remove monitoring instance, keep other instances

Story: 2011345
Task: 52521

Change-Id: I52d1451cd7cac364bcaeff850a424ddcc8e8de94
Signed-off-by: Tara Nath Subedi <tara.subedi@windriver.com>
This commit is contained in:
Tara Subedi
2025-07-07 14:26:12 -04:00
parent c023988751
commit 709613a1fa
11 changed files with 1008 additions and 88 deletions

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
.tox
.coverage
.stestr
__pycache__/

View File

@@ -11,6 +11,7 @@
- kube-memory-tox-py39
- kube-memory-tox-flake8
- kube-memory-tox-pylint
- collectd-extensions-py39
gate:
jobs:
- starlingx-common-tox-linters
@@ -20,6 +21,7 @@
- kube-memory-tox-py39
- kube-memory-tox-flake8
- kube-memory-tox-pylint
- collectd-extensions-py39
post:
jobs:
- stx-monitoring-upload-git-mirror
@@ -123,6 +125,21 @@
tox_envlist: py39
tox_extra_args: -c kube-memory/kube-memory/tox.ini
- job:
name: collectd-extensions-py39
parent: tox-py39
description: |
Run py39 for collectd-extensions
required-projects:
- starlingx/config
nodeset: debian-bullseye
files:
- collectd-extensions/*
vars:
python_version: 3.9
tox_envlist: py39
tox_extra_args: -c collectd-extensions/tox.ini
- secret:
name: stx-monitoring-github-secret2
data:

View File

@@ -46,6 +46,8 @@ override_dh_install:
install -m 700 ptp.py $(LOCAL_PYTHON_EXTENSIONS_DIR)
install -m 700 cgu_handler.py $(LOCAL_PYTHON_EXTENSIONS_DIR)
install -m 700 ptp_interface.py $(LOCAL_PYTHON_EXTENSIONS_DIR)
install -m 700 ptp_monitoring.py $(LOCAL_PYTHON_EXTENSIONS_DIR)
install -m 700 ptp_monitoring_cli.py $(LOCAL_PYTHON_EXTENSIONS_DIR)
install -m 700 ovs_interface.py $(LOCAL_PYTHON_EXTENSIONS_DIR)
install -m 700 service_res.py $(LOCAL_PYTHON_EXTENSIONS_DIR)

View File

@@ -44,6 +44,7 @@ from glob import glob
from oslo_utils import timeutils
from functools import lru_cache
from ptp_interface import Interface
import ptp_monitoring as pm
from cgu_handler import CguHandler
from pynetlink import DeviceType
from pynetlink import LockStatus
@@ -113,6 +114,7 @@ PTPINSTANCE_CLOCK_CONF_FILE_PATTERN = PTPINSTANCE_PATH + 'clock-*.conf'
PTPINSTANCE_PTP4L_CONF_FILE_PATTERN = PTPINSTANCE_PATH + 'ptp4l-*.conf'
PTPINSTANCE_PHC2SYS_CONF_FILE_PATTERN = PTPINSTANCE_PATH + 'phc2sys-*.conf'
PTPINSTANCE_TS2PHC_CONF_FILE_PATTERN = PTPINSTANCE_PATH + 'ts2phc-*.conf'
PTPINSTANCE_MONITORING_CONF_FILE_PATTERN = PTPINSTANCE_PATH + "monitoring-*.conf"
def _get_ptp_options_path():
@@ -133,6 +135,7 @@ PTP_INSTANCE_TYPE_PTP4L = 'ptp4l'
PTP_INSTANCE_TYPE_PHC2SYS = 'phc2sys'
PTP_INSTANCE_TYPE_TS2PHC = 'ts2phc'
PTP_INSTANCE_TYPE_CLOCK = 'clock'
PTP_INSTANCE_TYPE_MONITORING = "monitoring"
# Tools used by plugin
SYSTEMCTL = '/usr/bin/systemctl'
@@ -182,6 +185,11 @@ ALARM_CAUSE__PHC2SYS_CLOCK_SOURCE_LOSS = 22
ALARM_CAUSE__PHC2SYS_CLOCK_SOURCE_NO_LOCK = 23
ALARM_CAUSE__PHC2SYS_CLOCK_SOURCE_FORCED_SELECTION = 24
# Monitoring Alarm codes
ALARM_CAUSE__MONITORING_GNSS_SIGNAL_LOSS = 30
ALARM_CAUSE__MONITORING_SATELLITE_COUNT = 31
ALARM_CAUSE__MONITORING_SIGNAL_QUALITY_DB = 32
# Run Phase
RUN_PHASE__INIT = 0
RUN_PHASE__DISABLED = 1
@@ -455,6 +463,30 @@ ts2phc_instance_map = {}
timing_instance_list = []
def read_monitoring_ptp_config():
"""read monitoring-ptp conf files"""
filenames = glob(PTPINSTANCE_MONITORING_CONF_FILE_PATTERN)
if len(filenames) == 0:
collectd.debug(
"%s No PTP conf file located for %s"
% (PLUGIN, PTP_INSTANCE_TYPE_MONITORING)
)
else:
for filename in filenames:
instance = TimingInstance(filename)
ptpinstances[instance.instance_name] = None
create_interface_alarm_objects(
'dummy', instance.instance_name, PTP_INSTANCE_TYPE_MONITORING
)
collectd.info("ptpinstances = %s" % ptpinstances)
for device_path in instance.device_paths:
create_interface_alarm_objects(
device_path, instance.instance_name, PTP_INSTANCE_TYPE_MONITORING
)
ptpinstances[instance.instance_name].timing_instance = instance
def read_files_for_timing_instances():
"""read phc2sys conf files"""
filenames = glob(PTPINSTANCE_PHC2SYS_CONF_FILE_PATTERN)
@@ -487,7 +519,7 @@ class TimingInstance:
"""The purpose of TimingInstance is to track the config and state data of a ptp instance.
By supplying a config file path, a TimingInstance object will parse and store the instance
configuration into a dict and provide functions for reading the state datafor each instance.
configuration into a dict and provide functions for reading the state data for each instance.
At this time, only the phc2sys instance type is in use, but some of the other basic instance
functions have been defined for future enhancements.
@@ -496,17 +528,32 @@ class TimingInstance:
def __init__(self, config_file_path) -> None:
self.config_file_path = config_file_path
self.interfaces = set() # use a python set to prevent duplicates
self.device_paths = (
set()
) # set to hold device_path list from monitoring instance
self.config = {} # dict of params from config file
self.state = {} # dict to hold the values read from pmc or cgu
self.state = {} # dict to hold the values read from pmc or cgu or gpsd
# synce4l handling to be included when full synce4l support is implemented
self.instance_types = ["clock", "phc2sys", "ptp4l", "ts2phc"]
self.config_parsers_dict = {"clock": self.parse_clock_config,
"phc2sys": self.parse_phc2sys_config,
"ptp4l": self.parse_ptp4l_config,
"ts2phc": self.parse_ts2phc_config}
self.instance_types = [
"clock",
"phc2sys",
"ptp4l",
"ts2phc",
PTP_INSTANCE_TYPE_MONITORING,
]
self.config_parsers_dict = {
"clock": self.parse_clock_config,
"phc2sys": self.parse_phc2sys_config,
"ptp4l": self.parse_ptp4l_config,
"ts2phc": self.parse_ts2phc_config,
PTP_INSTANCE_TYPE_MONITORING: self.parse_monitoring_config,
}
self.state_setter_dict = {"phc2sys": self.set_phc2sys_state}
self.state_setter_dict = {
"phc2sys": self.set_phc2sys_state,
PTP_INSTANCE_TYPE_MONITORING: self.set_monitoring_state,
}
# Determine instance name and type
# Instance is guaranteed to be one of the valid types because that was checked in
@@ -521,9 +568,13 @@ class TimingInstance:
collectd.info("%s Config file %s matches instance type %s"
% (PLUGIN, config_file_path, item))
self.instance_type = item
self.instance_name = instance
if item == PTP_INSTANCE_TYPE_MONITORING:
self.instance_name = f"{item}-{instance}"
else:
self.instance_name = instance
# Select the appropriate parser to initialize self.interfaces and self.config
# Select the appropriate parser to initialize self.interfaces/self.device_paths
# and self.config
self.parse_instance_config()
if self.instance_type == PTP_INSTANCE_TYPE_PHC2SYS:
@@ -541,7 +592,44 @@ class TimingInstance:
def set_instance_state_data(self):
collectd.debug("%s Setting state for %s" %
(PLUGIN, self.instance_name))
self.state_setter_dict[self.instance_type]()
self.state = self.state_setter_dict[self.instance_type]()
def config_to_dict(self, config):
return {section: dict(config[section]) for section in config}
def parse_monitoring_config(self):
config = pm.parse_monitoring_config(self.config_file_path)
collectd.info(f"{PLUGIN} parsing of {self.config_file_path}: {self.config_to_dict(config)}")
try:
self.device_paths = set(
[
device_str.strip()
for device_str in config["global"]["devices"].split(" ")
]
)
except Exception as exc:
collectd.error(
"%s Reading devices from monitoring config file %s failed. error: %s"
% (PLUGIN, self.config_file_path, exc)
)
return config
def set_monitoring_state(self):
collectd.debug(
"%s Setting state for monitoring instance %s" % (PLUGIN, self.instance_name)
)
state = {}
for device_path in self.device_paths:
collectd.info((
f"{PLUGIN} instance {self.instance_name} reading gpsd data for"
f" {device_path}"
))
state[device_path] = pm.get_gps_data(device_path)
collectd.info((
f"{PLUGIN} instance {self.instance_name} device {device_path}"
f" data: {state[device_path]}"
))
return state
def parse_clock_config(self) -> dict:
# Clock config is not an .ini style format, parse it manually
@@ -646,12 +734,17 @@ class TimingInstance:
def set_phc2sys_state(self):
collectd.debug("%s Setting state for phc2sys instance %s" %
(PLUGIN, self.instance_name))
self.state['phc2sys_source_interface'] = self.query_phc2sys_socket('clock source',
self.phc2sys_com_socket)
self.state['phc2sys_forced_lock'] = self.query_phc2sys_socket('forced lock',
self.phc2sys_com_socket)
self.state['phc2sys_valid_sources'] = self.query_phc2sys_socket('valid sources',
self.phc2sys_com_socket)
state = {}
state["phc2sys_source_interface"] = self.query_phc2sys_socket(
"clock source", self.phc2sys_com_socket
)
state["phc2sys_forced_lock"] = self.query_phc2sys_socket(
"forced lock", self.phc2sys_com_socket
)
state["phc2sys_valid_sources"] = self.query_phc2sys_socket(
"valid sources", self.phc2sys_com_socket
)
return state
def query_phc2sys_socket(self, query, unix_socket=None):
if unix_socket:
@@ -817,8 +910,13 @@ def raise_alarm(alarm_cause,
"mode is supported by this host."
)
elif alarm_cause in [ALARM_CAUSE__1PPS_SIGNAL_LOSS,
ALARM_CAUSE__GNSS_SIGNAL_LOSS]:
elif alarm_cause in [
ALARM_CAUSE__1PPS_SIGNAL_LOSS,
ALARM_CAUSE__GNSS_SIGNAL_LOSS,
ALARM_CAUSE__MONITORING_GNSS_SIGNAL_LOSS,
ALARM_CAUSE__MONITORING_SATELLITE_COUNT,
ALARM_CAUSE__MONITORING_SIGNAL_QUALITY_DB,
]:
reason += ' state: ' + str(data)
elif alarm_cause == ALARM_CAUSE__PHC2SYS_CLOCK_SOURCE_LOW_PRIORITY:
@@ -980,76 +1078,130 @@ def create_interface_alarm_objects(interface, instance=None, instance_type=PTP_I
if interface and not ptpinterfaces.get(interface, None):
# Create required interface based alarm objects for supplied interface
o = PTP_alarm_object(interface)
# 1-PPS signal loss
o.alarm = ALARM_CAUSE__1PPS_SIGNAL_LOSS
o.severity = fm_constants.FM_ALARM_SEVERITY_MAJOR
o.reason = obj.hostname
o.reason += ' 1PPS signal loss'
o.repair = 'Check network'
o.eid = obj.base_eid + '.interface=' + interface + '.ptp=1PPS-signal-loss'
o.cause = fm_constants.ALARM_PROBABLE_CAUSE_29 # loss-of-signal
ALARM_OBJ_LIST.append(o)
if instance_type != PTP_INSTANCE_TYPE_MONITORING:
o = PTP_alarm_object(interface)
# 1-PPS signal loss
o.alarm = ALARM_CAUSE__1PPS_SIGNAL_LOSS
o.severity = fm_constants.FM_ALARM_SEVERITY_MAJOR
o.reason = obj.hostname
o.reason += ' 1PPS signal loss'
o.repair = 'Check network'
o.eid = obj.base_eid + '.interface=' + interface + '.ptp=1PPS-signal-loss'
o.cause = fm_constants.ALARM_PROBABLE_CAUSE_29 # loss-of-signal
ALARM_OBJ_LIST.append(o)
o = PTP_alarm_object(interface)
# Clock source selection change
o.alarm = ALARM_CAUSE__PHC2SYS_CLOCK_SOURCE_SELECTION_CHANGE
o.severity = fm_constants.FM_ALARM_SEVERITY_WARNING
o.reason = obj.hostname
o.reason += ' phc2sys HA source selection algorithm selected new active source'
o.repair += 'Check network'
o.eid = obj.base_eid + '.phc2sys=' + instance + '.interface=' + interface\
+ '.phc2sys=source-failover'
o.cause = fm_constants.ALARM_PROBABLE_CAUSE_51 # timing-problem
ALARM_OBJ_LIST.append(o)
o = PTP_alarm_object(interface)
# Clock source selection change
o.alarm = ALARM_CAUSE__PHC2SYS_CLOCK_SOURCE_SELECTION_CHANGE
o.severity = fm_constants.FM_ALARM_SEVERITY_WARNING
o.reason = obj.hostname
o.reason += ' phc2sys HA source selection algorithm selected new active source'
o.repair += 'Check network'
o.eid = obj.base_eid + '.phc2sys=' + instance + '.interface=' + interface\
+ '.phc2sys=source-failover'
o.cause = fm_constants.ALARM_PROBABLE_CAUSE_51 # timing-problem
ALARM_OBJ_LIST.append(o)
o = PTP_alarm_object(interface)
# Source clock no lock
o.alarm = ALARM_CAUSE__PHC2SYS_CLOCK_SOURCE_NO_LOCK
o.severity = fm_constants.FM_ALARM_SEVERITY_MAJOR
o.reason = obj.hostname
o.reason += ' phc2sys HA source clock is not locked to a PRC'
o.repair += 'Check network and ptp4l configuration'
o.eid = obj.base_eid + '.phc2sys=' + instance + '.interface=' + interface + \
'.phc2sys=source-clock-no-prc-lock'
o.cause = fm_constants.ALARM_PROBABLE_CAUSE_29 # loss-of-signal
ALARM_OBJ_LIST.append(o)
o = PTP_alarm_object(interface)
# Source clock no lock
o.alarm = ALARM_CAUSE__PHC2SYS_CLOCK_SOURCE_NO_LOCK
o.severity = fm_constants.FM_ALARM_SEVERITY_MAJOR
o.reason = obj.hostname
o.reason += ' phc2sys HA source clock is not locked to a PRC'
o.repair += 'Check network and ptp4l configuration'
o.eid = obj.base_eid + '.phc2sys=' + instance + '.interface=' + interface + \
'.phc2sys=source-clock-no-prc-lock'
o.cause = fm_constants.ALARM_PROBABLE_CAUSE_29 # loss-of-signal
ALARM_OBJ_LIST.append(o)
o = PTP_alarm_object(interface)
o.alarm = ALARM_CAUSE__UNSUPPORTED_HW
o.severity = fm_constants.FM_ALARM_SEVERITY_MAJOR
o.reason = obj.hostname + " '" + interface + "' does not support "
o.reason += PTP + ' Hardware timestamping'
o.repair = 'Check host hardware reference manual to verify PTP '
o.repair += 'Hardware timestamping is supported by this interface'
o.eid = obj.base_eid + '.ptp=' + interface
o.eid += '.unsupported=hardware-timestamping'
o.cause = fm_constants.ALARM_PROBABLE_CAUSE_7 # 'config error'
ALARM_OBJ_LIST.append(o)
o = PTP_alarm_object(interface)
o.alarm = ALARM_CAUSE__UNSUPPORTED_HW
o.severity = fm_constants.FM_ALARM_SEVERITY_MAJOR
o.reason = obj.hostname + " '" + interface + "' does not support "
o.reason += PTP + ' Hardware timestamping'
o.repair = 'Check host hardware reference manual to verify PTP '
o.repair += 'Hardware timestamping is supported by this interface'
o.eid = obj.base_eid + '.ptp=' + interface
o.eid += '.unsupported=hardware-timestamping'
o.cause = fm_constants.ALARM_PROBABLE_CAUSE_7 # 'config error'
ALARM_OBJ_LIST.append(o)
o = PTP_alarm_object(interface)
o.alarm = ALARM_CAUSE__UNSUPPORTED_SW
o.severity = fm_constants.FM_ALARM_SEVERITY_MAJOR
o.reason = obj.hostname + " '" + interface + "' does not support "
o.reason += PTP + ' Software timestamping'
o.repair = 'Check host hardware reference manual to verify PTP '
o.repair += 'Software timestamping is supported by this interface'
o.eid = obj.base_eid + '.ptp=' + interface
o.eid += '.unsupported=software-timestamping'
o.cause = fm_constants.ALARM_PROBABLE_CAUSE_7 # 'config error'
ALARM_OBJ_LIST.append(o)
o = PTP_alarm_object(interface)
o.alarm = ALARM_CAUSE__UNSUPPORTED_SW
o.severity = fm_constants.FM_ALARM_SEVERITY_MAJOR
o.reason = obj.hostname + " '" + interface + "' does not support "
o.reason += PTP + ' Software timestamping'
o.repair = 'Check host hardware reference manual to verify PTP '
o.repair += 'Software timestamping is supported by this interface'
o.eid = obj.base_eid + '.ptp=' + interface
o.eid += '.unsupported=software-timestamping'
o.cause = fm_constants.ALARM_PROBABLE_CAUSE_7 # 'config error'
ALARM_OBJ_LIST.append(o)
o = PTP_alarm_object(interface)
o.alarm = ALARM_CAUSE__UNSUPPORTED_LEGACY
o.severity = fm_constants.FM_ALARM_SEVERITY_MAJOR
o.reason = obj.hostname + " '" + interface + "' does not support "
o.reason += PTP + " Legacy timestamping"
o.repair = 'Check host hardware reference manual to verify PTP '
o.repair += 'Legacy or Raw Clock is supported by this host'
o.eid = obj.base_eid + '.ptp=' + interface
o.eid += '.unsupported=legacy-timestamping'
o.cause = fm_constants.ALARM_PROBABLE_CAUSE_7 # 'config error'
ALARM_OBJ_LIST.append(o)
o = PTP_alarm_object(interface)
o.alarm = ALARM_CAUSE__UNSUPPORTED_LEGACY
o.severity = fm_constants.FM_ALARM_SEVERITY_MAJOR
o.reason = obj.hostname + " '" + interface + "' does not support "
o.reason += PTP + " Legacy timestamping"
o.repair = 'Check host hardware reference manual to verify PTP '
o.repair += 'Legacy or Raw Clock is supported by this host'
o.eid = obj.base_eid + '.ptp=' + interface
o.eid += '.unsupported=legacy-timestamping'
o.cause = fm_constants.ALARM_PROBABLE_CAUSE_7 # 'config error'
ALARM_OBJ_LIST.append(o)
else: # instance_type == PTP_INSTANCE_TYPE_MONITORING
# Create monitoring instance specific required device_path based alarm
# objects for supplied interface (here interface means device_path e.g. /dev/gnss0)
o = PTP_alarm_object(interface)
o.alarm = ALARM_CAUSE__MONITORING_GNSS_SIGNAL_LOSS
o.severity = fm_constants.FM_ALARM_SEVERITY_MAJOR
o.reason = obj.hostname
o.reason += " GNSS signal loss"
o.repair += "Check network"
o.eid = (
obj.base_eid
+ ".monitoring="
+ instance
+ ".device_path="
+ interface
+ ".ptp=GNSS-signal-loss"
)
o.cause = fm_constants.ALARM_PROBABLE_CAUSE_29 # loss-of-signal
ALARM_OBJ_LIST.append(o)
o = PTP_alarm_object(interface)
o.alarm = ALARM_CAUSE__MONITORING_SATELLITE_COUNT
o.severity = fm_constants.FM_ALARM_SEVERITY_MAJOR
o.reason = obj.hostname
o.reason += " GNSS satellite count below threshold"
o.repair += "Check network"
o.eid = (
obj.base_eid
+ ".monitoring="
+ instance
+ ".device_path="
+ interface
+ ".ptp=GNSS-satellite-count"
)
o.cause = fm_constants.ALARM_PROBABLE_CAUSE_50 # THRESHOLD CROSS
ALARM_OBJ_LIST.append(o)
o = PTP_alarm_object(interface)
o.alarm = ALARM_CAUSE__MONITORING_SIGNAL_QUALITY_DB
o.severity = fm_constants.FM_ALARM_SEVERITY_MAJOR
o.reason = obj.hostname
o.reason += " GNSS signal quality db below threshold"
o.repair += "Check network"
o.eid = (
obj.base_eid
+ ".monitoring="
+ instance
+ ".device_path="
+ interface
+ ".ptp=GNSS-signal-quality-db"
)
o.cause = fm_constants.ALARM_PROBABLE_CAUSE_50 # THRESHOLD CROSS
ALARM_OBJ_LIST.append(o)
# Add interface to ptpinterfaces dict if not present
ptpinterfaces[interface] = []
@@ -1455,6 +1607,7 @@ def init_func():
read_clock_config()
# Initialize TimingInstance for HA phc2sys
read_files_for_timing_instances()
read_monitoring_ptp_config()
for key, ctrl in ptpinstances.items():
collectd.info("%s instance:%s type:%s found" %
(PLUGIN, key, ctrl.instance_type))
@@ -2301,9 +2454,14 @@ def read_func():
collectd.info("%s Instance: %s Instance type: %s"
% (PLUGIN, instance_name, ctrl.instance_type))
instance = instance_name
ptp_service = ctrl.instance_type + '@' + instance_name + '.service'
conf_file = (PTPINSTANCE_PATH + ctrl.instance_type +
'-' + instance_name + '.conf')
if ctrl.instance_type == PTP_INSTANCE_TYPE_MONITORING:
ptp_service = "gpsd.service"
conf_file = PTPINSTANCE_PATH + instance_name + ".conf"
else:
ptp_service = ctrl.instance_type + "@" + instance_name + ".service"
conf_file = (
PTPINSTANCE_PATH + ctrl.instance_type + "-" + instance_name + ".conf"
)
# Clock instance does not have a service, thus check non-clock instance type
if ctrl.instance_type != PTP_INSTANCE_TYPE_CLOCK:
@@ -2348,7 +2506,8 @@ def read_func():
if ctrl.process_alarm_object.alarm == ALARM_CAUSE__PROCESS and ctrl.instance_type \
in [PTP_INSTANCE_TYPE_PTP4L,
PTP_INSTANCE_TYPE_PHC2SYS,
PTP_INSTANCE_TYPE_TS2PHC]:
PTP_INSTANCE_TYPE_TS2PHC,
PTP_INSTANCE_TYPE_MONITORING]:
# If the process is not running, raise the process alarm
if ctrl.process_alarm_object.raised is False:
collectd.error("%s PTP service %s enabled but not running" %
@@ -2419,9 +2578,119 @@ def read_func():
if ctrl.instance_type == PTP_INSTANCE_TYPE_PHC2SYS and ctrl.phc2sys_ha_enabled is True:
process_phc2sys_ha(ctrl)
if ctrl.instance_type == PTP_INSTANCE_TYPE_MONITORING:
process_monitoring(ctrl)
return 0
def process_monitoring_alarm(ctrl, alarm_obj, device_path, raise_condition, state):
if raise_condition:
rc = raise_alarm(alarm_obj.alarm, device_path, state, alarm_obj)
if rc is True:
alarm_obj.raised = True
if not (ctrl.log_throttle_count % obj.INIT_LOG_THROTTLE):
collectd.info(
"%s %s instance %s device_path %s alarm raised: %s"
% (
PLUGIN,
ctrl.instance_type,
ctrl.timing_instance.instance_name,
device_path,
state,
)
)
ctrl.log_throttle_count += 1
else:
if alarm_obj.raised is True:
if clear_alarm(alarm_obj.eid) is True:
alarm_obj.raised = False
collectd.info(
"%s %s instance %s device path %s alarm cleared: %s"
% (
PLUGIN,
ctrl.instance_type,
ctrl.timing_instance.instance_name,
device_path,
state,
)
)
def process_monitoring(ctrl):
collectd.debug(f"{PLUGIN} {ctrl.timing_instance.instance_name} process_monitoring")
# per device signal lock expected
expected_signal_lock = True
# per device expected satellite count, read from config file.
expected_satellite_count = None
try:
expected_satellite_count = int(
ctrl.timing_instance.config["global"]["satellite_count"]
)
except Exception as exc:
collectd.error(
"%s Reading satellite_count from monitoring config file %s failed. error: %s"
% (PLUGIN, ctrl.timing_instance.config_file_path, exc)
)
# per device expected signal quality db, read from config file.
expected_signal_quality_db = None
try:
expected_signal_quality_db = float(
ctrl.timing_instance.config['global']['signal_quality_db']
)
except Exception as exc:
collectd.error(
"%s Reading signal_quality_db from monitoring config file %s failed. error: %s"
% (PLUGIN, ctrl.timing_instance.config_file_path, exc)
)
ctrl.timing_instance.set_instance_state_data()
for device_path in ctrl.timing_instance.device_paths:
# alarm ALARM_CAUSE__MONITORING_GNSS_SIGNAL_LOSS
signal_lock = ctrl.timing_instance.state[device_path].lock_state
raise_condition = signal_lock != expected_signal_lock
alarm_obj = get_alarm_object(
ALARM_CAUSE__MONITORING_GNSS_SIGNAL_LOSS, device_path
)
state = f"signal lock {bool(signal_lock)} (expected: {expected_signal_lock})"
process_monitoring_alarm(ctrl, alarm_obj, device_path, raise_condition, state)
# alarm ALARM_CAUSE__MONITORING_SATELLITE_COUNT
if expected_satellite_count is not None:
satellite_count = ctrl.timing_instance.state[device_path].satellite_count
raise_condition = satellite_count < expected_satellite_count
alarm_obj = get_alarm_object(
ALARM_CAUSE__MONITORING_SATELLITE_COUNT, device_path
)
state = f"satellite count {satellite_count} (expected: >= {expected_satellite_count})"
process_monitoring_alarm(ctrl, alarm_obj, device_path, raise_condition, state)
# alarm ALARM_CAUSE__MONITORING_SIGNAL_QUALITY_DB
if expected_signal_quality_db is not None:
# alarm is based upon avg snr
signal_quality_db = ctrl.timing_instance.state[
device_path
].signal_quality_db.avg
raise_condition = signal_quality_db < expected_signal_quality_db
alarm_obj = get_alarm_object(
ALARM_CAUSE__MONITORING_SIGNAL_QUALITY_DB, device_path
)
state = (
f"signal_quality_db {signal_quality_db}"
f" (expected: >= {expected_signal_quality_db})"
)
process_monitoring_alarm(ctrl, alarm_obj, device_path, raise_condition, state)
def process_phc2sys_ha(ctrl):
# Update state for phc2sys instances
collectd.debug(f"{PLUGIN} {ctrl.timing_instance.instance_name} process_phc2sys_ha")

View File

@@ -0,0 +1,205 @@
#
# Copyright (c) 2025 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
############################################################################
#
# This file is part of the collectd 'Precision Time Protocol' Service Monitor.
#
############################################################################
import collectd
import configparser
from dataclasses import dataclass
import gps
import traceback
# name of the plugin - all logs produced by this plugin are prefixed with this
PLUGIN = "ptp plugin"
GPSD_TCP_PORT = "2947"
@dataclass
class SignalQualityDb:
min: float = 0
max: float = 0
avg: float = 0
@dataclass
class GpsData:
gpsd_running: int = 0
lock_state: int = 0
satellite_count: int = 0
signal_quality_db: SignalQualityDb = SignalQualityDb()
def parse_monitoring_config(config_file_path):
# https://docs.python.org/3/library/configparser.html
# You can access the parameters like so:
# config['global']['parameter_name']
config = configparser.ConfigParser(delimiters=" ")
config.read(config_file_path)
return config
def set_nmea_device(device_path):
# Put GPS into NMEA mode
# Equivalent to: gpsctl --nmea <device_path>
#
# gpsd daemon changes GPS device to binary mode, and the device output is no
# longer NMEA sentence. This function can be used to revert back to NMEA mode
# before shutting down gpsd daemon.
#
# This function is for reference purpose and has not been used yet.
try:
session = gps.gps(host="localhost", port=GPSD_TCP_PORT)
except Exception as exc:
message = (
f"{PLUGIN} Could not connect to localhost:{GPSD_TCP_PORT} error: {exc}"
)
collectd.error(message)
return
device_command = (
'?DEVICE={"path":"' + device_path + '", "driver": "NMEA0183", "native": 0}'
)
session.send(device_command)
message = f"{PLUGIN} {device_path} set to driver:NMEA0183 and native:0"
collectd.info(message)
session.close()
def get_device_paths(devices):
# Input: [{'class': 'DEVICE', 'path': '/dev/gnss0', ..}]
# Where path is optional
device_paths = []
for device_dict in devices:
if "path" in device_dict:
device_paths.append(device_dict["path"])
return device_paths
def trunc(num, precision):
x = 10 ** precision
return int(num * x) / x
def get_signal_to_noise_ratio(satellites):
# Input: [{'PRN': 5, 'el': 31.0, 'az': 86.0, 'ss': 45.0, 'used': True, 'gnssid': 0,
# 'svid': 5, 'health': 1},..]
# ss is optional key, Signal to Noise ratio in dBHz
# used key is always present, satellites may be flagged used if the solution has
# corrections from them, but not all drivers make this information available
snr = SignalQualityDb(min=0, max=0, avg=0)
signal_to_noise_data = []
for satellite in satellites:
if satellite["used"] and "ss" in satellite:
signal_to_noise_data.append(satellite["ss"])
if signal_to_noise_data:
snr.min = min(signal_to_noise_data)
snr.max = max(signal_to_noise_data)
snr.avg = trunc(sum(signal_to_noise_data) / len(signal_to_noise_data), 3)
return snr
def get_gps_data_by_session(session, device_path):
# Details are in:
# https://gpsd.io/client-howto.html#_how_the_gpsd_wire_protocol_works
# https://gpsd.gitlab.io/gpsd/gpsd_json.html
data = GpsData(
gpsd_running=1,
lock_state=0,
satellite_count=0,
signal_quality_db=SignalQualityDb(min=0, max=0, avg=0),
)
try:
for report in session:
if report["class"] in ["VERSION", "WATCH", "DEVICE"]:
continue
elif report["class"] == "DEVICES" and device_path not in get_device_paths(
report["devices"]
):
message = f"{PLUGIN} {device_path} is not being monitored by GPSD"
collectd.error(message)
break
# device key is optional in TPV class
elif (
report["class"] == "TPV"
and "device" in report
and report["device"] == device_path
):
# mode is always present on TPV class.
# NMEA mode: 0=unknown, 1=no fix, 2=2D, 3=3D.
# Until the sensor achieves satellite lock, the fixes (reports) will be
# "mode 1" - no valid data (mode 2 is a 2D fix, mode 3 is a 3D fix).
if report["mode"] in [0, 1]:
message = f"{PLUGIN} {device_path} have not achieved satellite lock: {report}"
collectd.debug(message)
data.lock_state = 0
# reset satellite_count and signal_quality_db
data.satellite_count = 0
data.signal_quality_db = SignalQualityDb(min=0, max=0, avg=0)
break
else:
data.lock_state = 1
# device key is optional in SKY class
elif (
report["class"] == "SKY"
and "device" in report
and report["device"] == device_path
):
# uSat key is optional in SKY class, Number of satellites used in navigation solution
if "uSat" in report:
data.satellite_count = report["uSat"]
# satellites key is optional in SKY class, List of satellite objects in skyview
if "satellites" in report:
data.signal_quality_db = get_signal_to_noise_ratio(
report["satellites"]
)
# All reports collected, No more polling required.
break
except Exception as exc:
# In case of parsing error, should be reported instead of throwing exception
message = f"{PLUGIN} Programming error occured: {type(exc)}:{exc}"
collectd.error(message)
collectd.error(traceback.format_exc())
data = GpsData(
gpsd_running=1,
lock_state=0,
satellite_count=0,
signal_quality_db=SignalQualityDb(min=0, max=0, avg=0),
)
return data
def get_gps_data(device_path):
# Whenever there is client connection, only then gpsd polls
# on serial devices, otherwise it won't keep looking for sensor
# data to save energy. So here we are opening and closing client
# connection, and not keeping session open forever.
# Here we also ask to WATCH per device
try:
session = gps.gps(host="localhost", port=GPSD_TCP_PORT)
except Exception as exc:
message = (
f"{PLUGIN} Could not connect to localhost:{GPSD_TCP_PORT} error: {exc}"
)
collectd.error(message)
return GpsData(
gpsd_running=0,
lock_state=0,
satellite_count=0,
signal_quality_db=SignalQualityDb(min=0, max=0, avg=0),
)
session.stream(flags=gps.WATCH_JSON)
session.stream(flags=gps.WATCH_DEVICE, devpath=device_path)
data = get_gps_data_by_session(session, device_path)
session.close()
return data

View File

@@ -0,0 +1,47 @@
#
# Copyright (c) 2025 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
############################################################################
#
# This file is part of the collectd 'Precision Time Protocol' Service Monitor.
#
############################################################################
import sys
class Collectd():
def info(self, message):
print(f"Info: {message}")
def debug(self, message):
print(f"Debug: {message}")
def error(self, message):
print(f"Error: {message}")
# bypass 'import collectd' as it's C-based daemon, and cannot be directly imported.
sys.modules["collectd"] = Collectd()
import ptp_monitoring as pm
if __name__ == "__main__":
for device_path in ["/dev/gnss0", "/dev/gnss1", "/dev/gnssx"]:
data = pm.get_gps_data(device_path)
message = f"{device_path}'s gps_data: {data}"
print(message)
# set back to NMEA mode
# pm.set_nmea_device(device_path)
## Program Output:
# /dev/gnss0's gps_data: GpsData(gpsd_running=1, lock_state=1, satellite_count=9,
# signal_quality_db=SignalQualityDb(min=45.0, max=48.0, avg=45.111))
# /dev/gnss1 have not achieved satellite lock: <dictwrapper: {'class': 'TPV',
# 'device': '/dev/gnss1', 'mode': 1, 'leapseconds': 18}>
# /dev/gnss1's gps_data: GpsData(gpsd_running=1, lock_state=0, satellite_count=0,
# signal_quality_db=SignalQualityDb(min=0, max=0, avg=0))
# /dev/gnssx is not being monitored by GPSD
# /dev/gnssx's gps_data: GpsData(gpsd_running=1, lock_state=0, satellite_count=0,
# signal_quality_db=SignalQualityDb(min=0, max=0, avg=0))

View File

@@ -0,0 +1 @@
stestr

View File

View File

@@ -0,0 +1,4 @@
[global]
devices /dev/gnss0 /dev/gnss1
satellite_count 12
signal_quality_db 30

View File

@@ -0,0 +1,356 @@
#
# Copyright (c) 2025 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import sys
import unittest
from unittest.mock import MagicMock
# bypass 'import collectd' as it's C-based daemon, and cannot be directly imported.
sys.modules["collectd"] = mock_collectd = MagicMock()
sys.modules["gps"] = MagicMock()
from src import ptp_monitoring as ptp_monitoring
class TestPtpMonitoring(unittest.TestCase):
def test_parse_monitoring_config(self):
config_file_path = (
"./tests/test_input_files/monitoring-ptp.conf"
)
expected_devices = "/dev/gnss0 /dev/gnss1"
expected_satellite_count = "12"
expected_signal_quality_db = "30"
config = ptp_monitoring.parse_monitoring_config(config_file_path)
self.assertEqual(config["global"]["devices"], expected_devices)
self.assertEqual(config["global"]["satellite_count"], expected_satellite_count)
self.assertEqual(
config["global"]["signal_quality_db"], expected_signal_quality_db
)
def test_get_gps_data_by_session_empty_devices(self):
device_path = "/dev/gnss0"
session = [
{
"class": "VERSION",
"release": "3.22",
"rev": "3.22",
"proto_major": 3,
"proto_minor": 14,
},
{"class": "DEVICES", "devices": []},
{
"class": "WATCH",
"enable": True,
"json": True,
"nmea": False,
"raw": 0,
"scaled": False,
"timing": False,
"split24": False,
"pps": False,
},
]
data = ptp_monitoring.get_gps_data_by_session(session, device_path)
expected_data = ptp_monitoring.GpsData(
gpsd_running=1,
lock_state=0,
satellite_count=0,
signal_quality_db=ptp_monitoring.SignalQualityDb(min=0, max=0, avg=0),
)
expected_error_log = (
f"{ptp_monitoring.PLUGIN} {device_path} is not being monitored by GPSD"
)
mock_collectd.error.assert_called_with(expected_error_log)
self.assertEqual(
data,
expected_data,
msg=f"actual {data} not equal to expected {expected_data} ",
)
def test_get_gps_data_by_session_mode_1(self):
# NMEA mode: 0=unknown, 1=no fix, 2=2D, 3=3D.
# Until the sensor achieves satellite lock, the fixes (reports) will be
# "mode 1" - no valid data
device_path = "/dev/gnss0"
session = [
{
"class": "VERSION",
"release": "3.22",
"rev": "3.22",
"proto_major": 3,
"proto_minor": 14,
},
{
"class": "DEVICES",
"devices": [
{
"class": "DEVICE",
"path": "/dev/gnss0",
"activated": "2025-06-13T12:52:15.463Z",
"native": 0,
"bps": 9600,
"parity": "N",
"stopbits": 0,
"cycle": 1.0,
},
{
"class": "DEVICE",
"path": "/dev/gnss1",
"activated": "2025-06-13T12:52:19.637Z",
"native": 0,
"bps": 9600,
"parity": "N",
"stopbits": 0,
"cycle": 1.0,
},
],
},
{
"class": "WATCH",
"enable": True,
"json": True,
"nmea": False,
"raw": 0,
"scaled": False,
"timing": False,
"split24": False,
"pps": False,
},
{
"class": "DEVICE",
"path": "/dev/gnss0",
"driver": "NMEA0183",
"activated": "2025-06-13T12:52:19.637Z",
"native": 0,
"bps": 9600,
"parity": "N",
"stopbits": 0,
"cycle": 1.0,
},
{"class": "TPV", "device": "/dev/gnss0", "mode": 1},
{"class": "TPV", "device": "/dev/gnss0", "mode": 1},
{"class": "TPV", "device": "/dev/gnss0", "mode": 1},
{
"class": "DEVICE",
"path": "/dev/gnss1",
"driver": "NMEA0183",
"activated": "2025-06-13T12:52:20.168Z",
"native": 0,
"bps": 9600,
"parity": "N",
"stopbits": 0,
"cycle": 1.0,
},
{"class": "TPV", "device": "/dev/gnss1", "mode": 1},
{
"class": "DEVICE",
"path": "/dev/gnss0",
"driver": "u-blox",
"subtype": "SW EXT CORE 1.00 (3fda8e),HW 00190000",
"subtype1": (
"ROM BASE 0x118B2060,FWVER=TIM 2.20,PROTVER=29.20,MOD=ZED-F9T,"
"GPS;GLO;GAL;BDS,SBAS;QZSS,NAVIC"
),
"activated": "2025-06-13T12:52:20.716Z",
"flags": 1,
"native": 1,
"bps": 9600,
"parity": "N",
"stopbits": 0,
"cycle": 1.0,
"mincycle": 0.02,
},
{"class": "TPV", "device": "/dev/gnss0", "mode": 1},
]
data = ptp_monitoring.get_gps_data_by_session(session, device_path)
expected_data = ptp_monitoring.GpsData(
gpsd_running=1,
lock_state=0,
satellite_count=0,
signal_quality_db=ptp_monitoring.SignalQualityDb(min=0, max=0, avg=0),
)
expected_debug_log = (
f"{ptp_monitoring.PLUGIN} {device_path} "
f"have not achieved satellite lock: {session[4]}"
)
mock_collectd.debug.assert_called_with(expected_debug_log)
self.assertEqual(
data,
expected_data,
msg=f"actual {data} not equal to expected {expected_data} ",
)
def test_get_gps_data_by_session_valid(self):
device_path = "/dev/gnss0"
session = [
{
"class": "VERSION",
"release": "3.22",
"rev": "3.22",
"proto_major": 3,
"proto_minor": 14,
},
{
"class": "DEVICES",
"devices": [
{
"class": "DEVICE",
"path": "/dev/gnss0",
"activated": "2025-06-13T12:52:15.463Z",
"native": 0,
"bps": 9600,
"parity": "N",
"stopbits": 0,
"cycle": 1.0,
},
{
"class": "DEVICE",
"path": "/dev/gnss1",
"activated": "2025-06-13T12:52:19.637Z",
"native": 0,
"bps": 9600,
"parity": "N",
"stopbits": 0,
"cycle": 1.0,
},
],
},
{
"class": "WATCH",
"enable": True,
"json": True,
"nmea": False,
"raw": 0,
"scaled": False,
"timing": False,
"split24": False,
"pps": False,
},
{
"class": "DEVICE",
"path": "/dev/gnss0",
"driver": "NMEA0183",
"activated": "2025-06-13T12:52:19.637Z",
"native": 0,
"bps": 9600,
"parity": "N",
"stopbits": 0,
"cycle": 1.0,
},
{"class": "TPV", "device": "/dev/gnss0", "mode": 2},
{"class": "TPV", "device": "/dev/gnss0", "mode": 3},
{
"class": "SKY",
"device": "/dev/gnss0",
"time": "2019-04-10T20:27:52.000Z",
"xdop": 0.69,
"ydop": 0.68,
"vdop": 1.33,
"tdop": 0.88,
"hdop": 0.97,
"gdop": 1.86,
"pdop": 1.64,
"nSat": 4,
"uSat": 3,
"satellites": [
{
"PRN": 5,
"el": 31.0,
"az": 86.0,
"ss": 45.2,
"used": True,
"gnssid": 0,
"svid": 5,
"health": 1,
},
{
"PRN": 10,
"el": 10.0,
"az": 278.0,
"ss": 40.0,
"used": False,
"gnssid": 0,
"svid": 10,
"health": 1,
},
{
"PRN": 13,
"el": 44.0,
"az": 53.0,
"ss": 47.4,
"used": True,
"gnssid": 0,
"svid": 13,
"health": 1,
},
{
"PRN": 15,
"el": 80.0,
"az": 68.0,
"ss": 48.8,
"used": True,
"gnssid": 0,
"svid": 15,
"health": 1,
},
],
},
]
snr = [45.2, 47.4, 48.8]
avg = sum(snr) / len(snr)
trunc_avg = int(avg * 1000) / 1000
expected_signal_quality_db = ptp_monitoring.SignalQualityDb(
min=min(snr), max=max(snr), avg=trunc_avg
)
expected_data = ptp_monitoring.GpsData(
gpsd_running=1,
lock_state=1,
satellite_count=3,
signal_quality_db=expected_signal_quality_db,
)
data = ptp_monitoring.get_gps_data_by_session(session, device_path)
mock_collectd.assert_not_called()
self.assertEqual(
data,
expected_data,
msg=f"actual {data} not equal to expected {expected_data} ",
)
def test_get_gps_data_by_session_with_garbase_data(self):
device_path = "/dev/gnss0"
session = [
{
"GARBASE-class": "VERSION",
"release": "3.22",
"rev": "3.22",
"proto_major": 3,
"proto_minor": 14,
},
]
data = ptp_monitoring.get_gps_data_by_session(session, device_path)
expected_data = ptp_monitoring.GpsData(
gpsd_running=1,
lock_state=0,
satellite_count=0,
signal_quality_db=ptp_monitoring.SignalQualityDb(min=0, max=0, avg=0),
)
expected_error_log_1 = (
f"{ptp_monitoring.PLUGIN} Programming error occured: <class 'KeyError'>:'class'"
)
expected_error_log_substring_2 = "Traceback (most recent call last)"
self.assertEqual(mock_collectd.error.call_count, 2)
call_arg_1 = mock_collectd.error.call_args_list[0].args[0]
self.assertEqual(call_arg_1, expected_error_log_1)
call_arg_2 = mock_collectd.error.call_args_list[1].args[0]
self.assertIn(expected_error_log_substring_2, call_arg_2)
print(call_arg_2)
self.assertEqual(
data,
expected_data,
msg=f"actual {data} not equal to expected {expected_data} ",
)

View File

@@ -0,0 +1,18 @@
[tox]
envlist = py39
minversion = 2.3
skipsdist = True
sitepackages=False
[stestr]
test_path=./tests
[testenv:py39]
basepython = python3.9
passenv = TOX_DEBUG_LEVEL
commands =
stestr run {posargs}
stestr slowest
deps =
-r{toxinidir}/test-requirements.txt