Bug fix and improvements

Fixed the initialization of the Influxdb client

Added and improved diagnostic messages

Change-Id: Ic8aaa0728a4b936cd4c6e1ed5a0e01ba8f0fb003
This commit is contained in:
Andreas Tsagkaropoulos 2024-05-16 15:33:36 +03:00
parent c9109ba8ed
commit 73faebe3fa
5 changed files with 23 additions and 11 deletions

View File

@ -1,4 +1,4 @@
#Tue May 14 15:38:04 UTC 2024 #Thu May 16 12:31:21 UTC 2024
APP_NAME=default_application APP_NAME=default_application
METHOD=exponential_smoothing METHOD=exponential_smoothing
INFLUXDB_HOSTNAME=nebulous-influxdb INFLUXDB_HOSTNAME=nebulous-influxdb

View File

@ -60,7 +60,7 @@ def sanitize_prediction_statistics(prediction_confidence_interval, prediction_va
confidence_interval_modified = True confidence_interval_modified = True
if confidence_interval_modified: if confidence_interval_modified:
new_prediction_confidence_interval = str(lower_value_prediction_confidence_interval)+","+str(upper_value_prediction_confidence_interval) new_prediction_confidence_interval = str(lower_value_prediction_confidence_interval)+","+str(upper_value_prediction_confidence_interval)
print_with_time("The confidence interval "+prediction_confidence_interval+"was modified, becoming "+str(new_prediction_confidence_interval)+", taking into account the values of the metric") print_with_time("The confidence interval "+prediction_confidence_interval+" was modified, becoming "+str(new_prediction_confidence_interval)+", taking into account the values of the metric")
if (prediction_value<lower_bound_value[metric_name]): if (prediction_value<lower_bound_value[metric_name]):
print_with_time("The prediction value of " + str(prediction_value) + " for metric " + metric_name + " was sanitized to " + str(lower_bound_value)) print_with_time("The prediction value of " + str(prediction_value) + " for metric " + metric_name + " was sanitized to " + str(lower_bound_value))
prediction_value = lower_bound_value prediction_value = lower_bound_value

View File

@ -92,11 +92,13 @@ class ApplicationState:
print_data_from_db = True print_data_from_db = True
query_string = 'from(bucket: "'+self.influxdb_bucket+'") |> range(start:-'+time_interval_to_get_data_for+') |> filter(fn: (r) => r["_measurement"] == "'+metric_name+'")' query_string = 'from(bucket: "'+self.influxdb_bucket+'") |> range(start:-'+time_interval_to_get_data_for+') |> filter(fn: (r) => r["_measurement"] == "'+metric_name+'")'
influx_connector = InfluxDBConnector() influx_connector = InfluxDBConnector()
print("performing query for application with bucket "+str(self.influxdb_bucket)) logging.info("performing query for application with bucket "+str(self.influxdb_bucket))
logging.info("The body of the query is "+query_string)
logging.info("The configuration of the client is "+Utilities.get_fields_and_values(influx_connector))
current_time = time.time() current_time = time.time()
result = influx_connector.client.query_api().query(query_string, EsPredictorState.influxdb_organization) result = influx_connector.client.query_api().query(query_string, EsPredictorState.influxdb_organization)
elapsed_time = time.time()-current_time elapsed_time = time.time()-current_time
print("performed query, it took "+str(elapsed_time) + " seconds") logging.info("performed query, it took "+str(elapsed_time) + " seconds")
#print(result.to_values()) #print(result.to_values())
with open(self.get_prediction_data_filename(EsPredictorState.configuration_file_location, metric_name), 'w') as file: with open(self.get_prediction_data_filename(EsPredictorState.configuration_file_location, metric_name), 'w') as file:
for table in result: for table in result:

View File

@ -1,7 +1,7 @@
from influxdb_client import InfluxDBClient, Point, WritePrecision from influxdb_client import InfluxDBClient, Point, WritePrecision
from datetime import datetime from datetime import datetime
from influxdb_client.client.write_api import SYNCHRONOUS from influxdb_client.client.write_api import SYNCHRONOUS
import logging
from runtime.operational_status.EsPredictorState import EsPredictorState from runtime.operational_status.EsPredictorState import EsPredictorState
#import influxdb_client, os, time #import influxdb_client, os, time
@ -40,11 +40,11 @@ from runtime.operational_status.EsPredictorState import EsPredictorState
class InfluxDBConnector: class InfluxDBConnector:
client = InfluxDBClient(url="http://" + EsPredictorState.influxdb_hostname + ":" + str(EsPredictorState.influxdb_port), token=EsPredictorState.influxdb_token, org=EsPredictorState.influxdb_organization)
write_api = client.write_api(write_options=SYNCHRONOUS)
def InfluxDBConnector(self): def __init__(self):
pass self.client = InfluxDBClient(url="http://" + EsPredictorState.influxdb_hostname + ":" + str(EsPredictorState.influxdb_port), token=EsPredictorState.influxdb_token, org=EsPredictorState.influxdb_organization)
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
logging.info("Successfully created InfluxDB connector, client is "+str(self.client))
def write_data(self,data,bucket): def write_data(self,data,bucket):
self.write_api.write(bucket=bucket, org=EsPredictorState.influxdb_organization, record=data, write_precision=WritePrecision.S) self.write_api.write(bucket=bucket, org=EsPredictorState.influxdb_organization, record=data, write_precision=WritePrecision.S)

View File

@ -8,7 +8,7 @@ import pathlib
#from morphemic.dataset import DatasetMaker #from morphemic.dataset import DatasetMaker
import datetime import datetime
import logging,os import logging,os
from dateutil import parser import json
from influxdb_client import InfluxDBClient from influxdb_client import InfluxDBClient
from runtime.operational_status.EsPredictorState import EsPredictorState from runtime.operational_status.EsPredictorState import EsPredictorState
@ -42,7 +42,7 @@ class Utilities:
EsPredictorState.influxdb_password = EsPredictorState.configuration_details.get("INFLUXDB_PASSWORD").data EsPredictorState.influxdb_password = EsPredictorState.configuration_details.get("INFLUXDB_PASSWORD").data
EsPredictorState.influxdb_org = EsPredictorState.configuration_details.get("INFLUXDB_ORG").data EsPredictorState.influxdb_org = EsPredictorState.configuration_details.get("INFLUXDB_ORG").data
#This method accesses influx db to retrieve the most recent metric values. #This method accesses influx db to retrieve the most recent metric values.
Utilities.print_with_time("The configuration effective currently is the following\n "+Utilities.get_fields_and_values(EsPredictorState))
@staticmethod @staticmethod
def update_influxdb_organization_id(): def update_influxdb_organization_id():
@ -63,3 +63,13 @@ class Utilities:
return path return path
else: else:
return path + os.sep return path + os.sep
@staticmethod
def default_to_string(obj):
return str(obj)
@classmethod
def get_fields_and_values(cls,object):
#Returns those fields that do not start with __ (and their values)
fields_values = {key: value for key, value in object.__dict__.items() if not key.startswith("__")}
return json.dumps(fields_values,indent=4,default=cls.default_to_string)