Code improvements

Change of variable names to improve comprehensibility
Setting of constants according to values in properties file
Improvements in the creation process of an influxdb bucket

Change-Id: I69f6e4a45a3e0e044a0fb0b741d9776262949503
This commit is contained in:
Andreas Tsagkaropoulos 2024-04-25 11:27:24 +03:00
parent ee38b479de
commit 3d111f38e7
5 changed files with 69 additions and 37 deletions

View File

@ -10,13 +10,12 @@ class Constants:
broker_ip = "nebulous-activemq"
broker_username = "admin"
broker_password = "admin"
influx_username = "my-user"
influx_password = "my-password"
influxdb_username = "my-user"
influxdb_password = "my-password"
monitoring_broker_topic = "eu.nebulouscloud.monitoring"
bucket_name = "my-bucket"
organization_name = "my-org"
organization_id = "e0033247dcca0c54"
db_token = "my-super-secret-auth-token"
db_hostname = "nebulous-influxdb"
db_port = "8086"
influxdb_organization_name = "my-org"
influxdb_organization_id = "e0033247dcca0c54"
influxdb_token = "my-super-secret-auth-token"
influxdb_hostname = "nebulous-influxdb"
influxdb_port = "8086"
monitoring_prefix = "topic://"+monitoring_broker_topic+"."

View File

@ -51,8 +51,6 @@ class GenericConsumerHandler(Handler):
logging.info("Stopping the old existing connector...")
self.application_consumer_handler_connectors[application_name].stop()
logging.info("Attempting to register new connector...")
from time import sleep
sleep(10)
self.initialized_connector = exn.connector.EXN(
Constants.data_persistor_name + "-" + application_name, handler=Bootstrap(),
consumers=[
@ -84,14 +82,16 @@ def update_properties(configuration_file_location):
p = Properties()
with open(configuration_file_location, "rb") as f:
p.load(f, "utf-8")
Constants.db_hostname, metadata = p["db_hostname"]
Constants.broker_ip, metadata = p["broker_ip"]
Constants.broker_port, metadata = p["broker_port"]
Constants.broker_username, metadata = p["broker_username"]
Constants.broker_password, metadata = p["broker_password"]
Constants.monitoring_broker_topic, metadata = p["monitoring_broker_topic"]
Constants.organization_name,metadata = p["organization_name"]
Constants.bucket_name,metadata = p["bucket_name"]
Constants.influxdb_hostname, metadata = p["influxdb_hostname"]
Constants.influxdb_password, metadata = p["influxdb_password"]
Constants.influxdb_username, metadata = p["influxdb_username"]
Constants.influxdb_token, metadata = p["influxdb_token"]
Constants.influxdb_organization_name,metadata = p["influxdb_organization_name"]
def main():
Constants.configuration_file_location = sys.argv[1]

View File

@ -5,19 +5,21 @@ from influxdb_client import InfluxDBClient, Point, WritePrecision
from main.runtime.Constants import Constants
from influxdb_client.client.write_api import SYNCHRONOUS
def create_influxdb_bucket(application_name):
def get_influxdb_bucket(application_name):
bucket_name = Constants.application_name_prefix+application_name+"_bucket"
# Replace with your actual values
url = 'http://' + Constants.db_hostname + ':8086/api/v2/buckets'
token = Constants.db_token
token = Constants.influxdb_token
list_bucket_url = 'http://' + Constants.influxdb_hostname + ':8086/api/v2/buckets?name=' + bucket_name
create_bucket_url = 'http://' + Constants.influxdb_hostname + ':8086/api/v2/buckets'
headers = {
'Authorization': 'Token {}'.format(token),
'Content-Type': 'application/json'
'Content-Type': 'application/json',
'Accept': 'application/json'
}
data = {
'name': bucket_name,
'orgID': Constants.organization_id,
'orgID': Constants.influxdb_organization_id,
'retentionRules': [
{
'type': 'expire',
@ -26,16 +28,47 @@ def create_influxdb_bucket(application_name):
]
}
response = requests.post(url, headers=headers, data=json.dumps(data))
logging.info("The response code for our attempt in trying to create the bucket is "+str(response.status_code))
logging.info("The response json for our attempt in trying to create the bucket is "+str(response.json()))
response = requests.get(list_bucket_url, headers=headers)
logging.info("The response for listing a possibly existing bucket is "+str(response.status_code)+" for application "+application_name)
if ((response.status_code==200) and ("buckets" in response.json()) and (len(response.json()["buckets"])>0)):
logging.info("The bucket already existed for the particular application, skipping its creation...")
else:
logging.info("The response in the request to list a bucket is "+str(response.json()))
logging.info("The bucket did not exist for the particular application, creation in process...")
response = requests.post(create_bucket_url, headers=headers, data=json.dumps(data))
logging.info("The response for creating a new bucket is "+str(response.status_code))
return bucket_name
# Replace with your actual values
#url = 'http://' + Constants.influxdb_hostname + ':8086/api/v2/buckets'
#token = Constants.influxdb_token
#headers = {
# 'Authorization': 'Token {}'.format(token),
# 'Content-Type': 'application/json'
#}
#data = {
# 'name': bucket_name,
# 'orgID': Constants.influxdb_organization_id,
# 'retentionRules': [
# {
# 'type': 'expire',
# 'everySeconds': 2592000 #30 days (30*24*3600)
# }
# ]
#}
#
#response = requests.post(url, headers=headers, data=json.dumps(data))
#logging.info("The response code for our attempt in trying to create the bucket is "+str(response.status_code))
#logging.info("The response json for our attempt in trying to create the bucket is "+str(response.json()))
#return bucket_name
class InfluxDBConnector:
applications_with_influxdb_bucket_created = []
def __init__(self):
self.client = InfluxDBClient(url="http://"+Constants.db_hostname+":"+Constants.db_port, token=Constants.db_token, org=Constants.organization_name)
self.client = InfluxDBClient(url="http://" + Constants.influxdb_hostname + ":" + Constants.influxdb_port, token=Constants.influxdb_token, org=Constants.influxdb_organization_name)
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
#self.influxdb_bucket_created[application_name] = False
self.bucket_name = "demo_bucket"
@ -49,18 +82,18 @@ class InfluxDBConnector:
# Find the organization by name and print its ID
for org in organizations:
if org.name == Constants.organization_name:
if org.name == Constants.influxdb_organization_name:
logging.info(f"Organization Name: {org.name}, ID: {org.id}")
Constants.organization_id = org.id
Constants.influxdb_organization_id = org.id
break
logging.info("The influxdb bucket was reported as not created")
self.bucket_name = create_influxdb_bucket(application_name)
logging.info("Retrieving the influxdb bucket relevant for the application name (perhaps it is not created yet)")
self.bucket_name = get_influxdb_bucket(application_name)
self.applications_with_influxdb_bucket_created.append(application_name)
else:
logging.info("The influxdb bucket was reported as created")
logging.info(f"The data point is {data}")
self.write_api.write(bucket=self.bucket_name, org=Constants.organization_name, record=data, write_precision=WritePrecision.S)
self.write_api.write(bucket=self.bucket_name, org=Constants.influxdb_organization_name, record=data, write_precision=WritePrecision.S)
logging.info("The data point has been written!")
def get_data(self,metric_name):
@ -68,7 +101,7 @@ class InfluxDBConnector:
query = f"""from(bucket: "nebulous")
|> range(start: -1m)
|> filter(fn: (r) => r._measurement == "{metric_name}")"""
tables = query_api.query(query, org=Constants.organization_name)
tables = query_api.query(query, org=Constants.influxdb_organization_name)
for table in tables:
for record in table.records:

View File

@ -4,17 +4,17 @@ from InfluxDBConnector import InfluxDBConnector
import time
## This utility assumes that the database has been filled with values for cpu usage and ram usage
influxdb_bucket = "nebulous__Application1_bucket"
metric_names = ["cpu_usage","ram_usage"]
for metric_name in metric_names:
time_interval_to_get_data_for = "10h"
print_data_from_db = True
query_string = 'from(bucket: "'+Constants.bucket_name+'") |> range(start:-'+time_interval_to_get_data_for+') |> filter(fn: (r) => r["_measurement"] == "'+metric_name+'")'
query_string = 'from(bucket: "' + influxdb_bucket + '") |> range(start:-' + time_interval_to_get_data_for + ') |> filter(fn: (r) => r["_measurement"] == "' + metric_name + '")'
influx_connector = InfluxDBConnector()
for counter in range(10):
print("performing query")
current_time = time.time()
result = influx_connector.client.query_api().query(query_string, Constants.organization_name)
result = influx_connector.client.query_api().query(query_string, Constants.influxdb_organization_name)
elapsed_time = time.time()-current_time
print("performed query, it took "+str(elapsed_time) + " seconds")
#print(result.to_values())

View File

@ -2,9 +2,9 @@ broker_ip=nebulous-activemq
broker_port=5672
broker_username=admin
broker_password=admin
db_hostname=nebulous-influxdb
db_username=my-user
db_password=my-password
influxdb_hostname=nebulous-influxdb
influxdb_username=my-user
influxdb_password=my-password
influxdb_token=my-super-secret-auth-token
monitoring_broker_topic =eu.nebulouscloud.monitoring
organization_name=my-org
bucket_name=my-bucket
organization_name=my-org