Refactoring of code

Changes in structure of the code
Changes in  the subscription logic
Removed requirement for particular package versions
Improved connection parameters for InfluxDB

Change-Id: Ie61d9c52a4aa66b732c42f9353dd73c34521e991
This commit is contained in:
Andreas Tsagkaropoulos 2024-03-26 15:40:43 +02:00 committed by jmarchel
parent 1d5c54125f
commit a67c14afc2
33 changed files with 163 additions and 195 deletions

3
.hadolint.yaml Normal file
View File

@ -0,0 +1,3 @@
ignored:
- DL3008
- SC1091

View File

@ -5,7 +5,7 @@
replicaCount: 1 replicaCount: 1
image: image:
repository: "quay.io/nebulous/monitoring-data-persistor-monitoring-data-persistor" repository: "quay.io/nebulous/monitoring-data-persistor"
pullPolicy: IfNotPresent pullPolicy: IfNotPresent
# Overrides the image tag whose default is the chart appVersion. # Overrides the image tag whose default is the chart appVersion.
tag: "" tag: ""

View File

@ -1,33 +0,0 @@
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/

View File

@ -1,15 +0,0 @@
#
# Build stage
#
FROM docker.io/library/maven:3.9.2-eclipse-temurin-17 AS build
COPY src /home/app/src
COPY pom.xml /home/app
RUN mvn -f /home/app/pom.xml clean package
#
# Package stage
#
FROM docker.io/library/eclipse-temurin:17-jre
COPY --from=build /home/app/target/demo-0.0.1-SNAPSHOT.jar /usr/local/lib/demo.jar
EXPOSE 8080
ENTRYPOINT ["java","-jar","/usr/local/lib/demo.jar"]

View File

@ -1,42 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.0</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -1,13 +0,0 @@
package com.example.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}

View File

@ -1,14 +0,0 @@
package com.example.demo;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class DemoController {
@RequestMapping("/")
public Object root() {
return null;
}
}

View File

@ -1,13 +0,0 @@
package com.example.demo;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class DemoApplicationTests {
@Test
void contextLoads() {
}
}

View File

@ -1,42 +1,41 @@
# Copyright (c) 2023 Institute of Communication and Computer Systems # Use python:3.11 as the source stage to build the package
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
FROM python:3.11 as source FROM python:3.11 as source
#RUN pip install --no-cache-dir --upgrade pip
# Create and set the working directory
RUN mkdir /src RUN mkdir /src
COPY ./src/ /src/ COPY ./src/ /src/
WORKDIR /src WORKDIR /src
# Install dependencies and package the application
RUN pip install --no-cache-dir -r requirements.txt && python3 setup.py sdist RUN pip install --no-cache-dir -r requirements.txt && python3 setup.py sdist
#RUN ls ./dist/
# Start the final stage using Ubuntu as the base image
FROM ubuntu:noble FROM ubuntu:noble
# Set the environment variables
ENV TZ=Europe/Athens ENV TZ=Europe/Athens
ENV VERSION=1.0.0 ENV VERSION=1.0.0
ENV LOG_FILE=monitoring_data_persistor.log ENV LOG_FILE=/home/monitoring_data_persistor.log
# Set the timezone
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
ARG DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install --no-install-recommends -y \
libffi8=3.4.4-2 \
build-essential=12.10ubuntu1 \
libssl-dev=3.0.10-1ubuntu3 \
libffi-dev=3.4.4-2 \
python3-dev=3.11.4-5ubuntu1 \
python3=3.11.4-5ubuntu1 \
python3.11-venv=3.11.7-2 \
python3-pip=23.3+dfsg-1 \
&& rm -rf /var/lib/apt/lists/*
COPY --from=source ./src/dist/monitoring-data-persistor-$VERSION.tar.gz /home/ # Install the required packages
COPY ./ /home/monitoring-data-persistor-$VERSION RUN apt-get update && apt-get install --no-install-recommends -y \
python3 \
python3-venv \
python3-pip \
&& rm -rf /var/lib/apt/lists/*
# Copy the packaged application from the source stage
COPY --from=source /src/dist/monitoring-data-persistor-$VERSION.tar.gz /home/
# Set the working directory
WORKDIR /home WORKDIR /home
RUN python3 -m venv monitoring-data-env && /bin/sh -c ". monitoring-data-env/bin/activate && pip install --no-cache-dir /home/monitoring-data-persistor-$VERSION.tar.gz"
RUN tar -xzvf /home/monitoring-data-persistor-$VERSION.tar.gz && /bin/sh -c ". monitoring-data-env/bin/activate && pip install --no-cache-dir -r /home/monitoring-data-persistor-$VERSION/src/requirements.txt" # Create a virtual environment and install the application package
#RUN bash -x /home/monitoring-data-persistor-$VERSION/src/prepare_project_code.sh RUN python3 -m venv monitoring-data-env \
#COPY ./ /home/monitoring-data-persistor && . monitoring-data-env/bin/activate \
CMD ["/bin/sh","-c",". monitoring-data-env/bin/activate && python3 -u /home/monitoring-data-persistor-$VERSION/src/main/runtime/DataPersistor.py /home/monitoring-data-persistor-$VERSION/src/resources/config.properties 2>&1 > $LOG_FILE "] && pip install --no-cache-dir monitoring-data-persistor-$VERSION.tar.gz
CMD ["/bin/bash", "-c", "source monitoring-data-env/bin/activate && python3 -u /home/monitoring-data-persistor-$VERSION/src/main/runtime/DataPersistor.py /home/monitoring-data-persistor-$VERSION/src/resources/config.properties > $LOG_FILE 2>&1"]

View File

@ -3,8 +3,8 @@ import os
from proton.reactor import Container from proton.reactor import Container
from main.exn.core import state_publisher, schedule_publisher from exn.core import state_publisher, schedule_publisher
from main.exn.core.context import Context from exn.core.context import Context
from .core.manager import Manager from .core.manager import Manager
from .settings import base from .settings import base
from .handler import connector_handler from .handler import connector_handler

View File

@ -1,13 +1,19 @@
class Constants: class Constants:
application_name_prefix = "nebulous_"
start_forecasting_prefix = "topic://eu.nebulouscloud.forecasting.start_forecasting."
forecasting_method_name = "exponentialsmoothing"
debug_messages = True
metric_list_topic = "metric_list"
configuration_file_location = "/home/resources/config.properties" configuration_file_location = "/home/resources/config.properties"
broker_port = "5672" broker_port = "5672"
broker_ip = "localhost" broker_ip = "localhost"
broker_username = "admin" broker_username = "my-user"
broker_password = "admin" broker_password = "my-password"
monitoring_broker_topic = "monitoring" monitoring_broker_topic = "eu.nebulouscloud.monitoring"
bucket_name = "nebulous" bucket_name = "my-bucket"
organization_name = "nebulous" organization_name = "my-org"
db_token = "tzIfpbU9b77quyvN0yHIbWltSh1c1371-o9nl_wJYaeo5TWdk5txyxXhp2iaLVMvOvf020HnEEAkE0yy5AllKQ==" organization_id = "e0033247dcca0c54"
db_token = "my-super-secret-auth-token"
db_hostname = "localhost" db_hostname = "localhost"
db_port = "8086" db_port = "8086"
monitoring_prefix = "topic://eu.nebulouscloud."+monitoring_broker_topic monitoring_prefix = "topic://"+monitoring_broker_topic+"."

View File

@ -8,48 +8,73 @@ from jproperties import Properties
from influxdb_client import Point, WritePrecision, InfluxDBClient from influxdb_client import Point, WritePrecision, InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS from influxdb_client.client.write_api import SYNCHRONOUS
import exn
from Constants import Constants from Constants import Constants
from InfluxDBConnector import InfluxDBConnector from InfluxDBConnector import InfluxDBConnector
from main.exn import connector, core from exn import connector, core
from main.exn.handler.connector_handler import ConnectorHandler from exn.core.handler import Handler
from exn.handler.connector_handler import ConnectorHandler
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.getLogger('main.exn.connector').setLevel(logging.DEBUG) logging.getLogger('main.exn.connector').setLevel(logging.DEBUG)
class Bootstrap(ConnectorHandler): class Bootstrap(ConnectorHandler):
pass
class ConsumerHandler(Handler):
influx_connector = InfluxDBConnector() influx_connector = InfluxDBConnector()
application_name = ""
def __init__(self,application_name):
self.application_name = application_name
def on_message(self, key, address, body, context, **kwargs): def on_message(self, key, address, body, context, **kwargs):
logging.info(f"Received {key} => {address}") logging.info(f"Received {key} => {address}")
application_name = "default_application" if ((str(address)).startswith(Constants.monitoring_prefix) and not (str(address)).endswith(Constants.metric_list_topic)):
if (str(address)).startswith(Constants.monitoring_prefix):
logging.info("New monitoring data arrived at topic "+address) logging.info("New monitoring data arrived at topic "+address)
logging.info(body) logging.info(body)
point = Point(str(address).split(".")[-1]).field("metricValue",body["metricValue"]).tag("level",body["level"]).tag("component_id",body["component_id"]).tag("application_name",application_name) point = Point(str(address).split(".")[-1]).field("metricValue",body["metricValue"]).tag("level",body["level"]).tag("component_id",body["component_id"]).tag("application_name",self.application_name)
point.time(body["timestamp"],write_precision=WritePrecision.S) point.time(body["timestamp"],write_precision=WritePrecision.S)
self.influx_connector.write_data(point) self.influx_connector.write_data(point,self.application_name)
else:
print("Address is "+str(address)+", but it was expected for it to start with " + Constants.monitoring_prefix)
class GenericConsumerHandler(Handler):
def on_message(self, key, address, body, context, **kwargs):
if (str(address)).startswith(Constants.monitoring_prefix+Constants.metric_list_topic):
application_name = body["name"]
logging.info("New metrics list message for application "+application_name)
connector = exn.connector.EXN('slovid', handler=Bootstrap(),
consumers=[
core.consumer.Consumer('monitoring', Constants.monitoring_broker_topic + '.>', application=application_name,topic=True, fqdn=True, handler=ConsumerHandler(application_name=application_name)),
],
url=Constants.broker_ip,
port=Constants.broker_port,
username=Constants.broker_username,
password=Constants.broker_password
)
#connector.start()
thread = threading.Thread(target=connector.start,args=())
thread.start()
def update_properties(configuration_file_location): def update_properties(configuration_file_location):
p = Properties() p = Properties()
with open(Constants.configuration_file_location, "rb") as f: with open(configuration_file_location, "rb") as f:
p.load(f, "utf-8") p.load(f, "utf-8")
Constants.broker_ip, metadata = p["broker_ip"] Constants.broker_ip, metadata = p["broker_ip"]
Constants.broker_port, metadata = p["broker_port"] Constants.broker_port, metadata = p["broker_port"]
Constants.broker_username, metadata = p["broker_username"] Constants.broker_username, metadata = p["broker_username"]
Constants.broker_password, metadata = p["broker_password"] Constants.broker_password, metadata = p["broker_password"]
Constants.monitoring_broker_topic, metadata = p["monitoring_broker_topic"] Constants.monitoring_broker_topic, metadata = p["monitoring_broker_topic"]
Constants.organization_name,metadata = p["organization_name"]
Constants.bucket_name,metadata = p["bucket_name"]
if __name__ == "__main__": if __name__ == "__main__":
Constants.configuration_file_location = sys.argv[1] Constants.configuration_file_location = sys.argv[1]
update_properties(Constants.configuration_file_location) update_properties(Constants.configuration_file_location)
application_handler = Bootstrap() component_handler = Bootstrap()
connector = connector.EXN('slovid', handler=application_handler,
connector = connector.EXN('slovid', handler=component_handler,
consumers=[ consumers=[
core.consumer.Consumer('monitoring', Constants.monitoring_broker_topic + '.>', topic=True, handler=application_handler), core.consumer.Consumer('data_persistor_application', Constants.monitoring_broker_topic + '.>', topic=True, fqdn=True, handler=GenericConsumerHandler()),
], ],
url=Constants.broker_ip, url=Constants.broker_ip,
port=Constants.broker_port, port=Constants.broker_port,
@ -60,3 +85,5 @@ if __name__ == "__main__":
thread = threading.Thread(target=connector.start,args=()) thread = threading.Thread(target=connector.start,args=())
thread.start() thread.start()
print("Waiting for messages at the metric list topic, in order to start receiving applications")

View File

@ -1,13 +1,14 @@
import logging import logging
import threading import threading
import time,random import time,random
import traceback
from influxdb_client import Point, WritePrecision, InfluxDBClient from influxdb_client import Point, WritePrecision, InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS from influxdb_client.client.write_api import SYNCHRONOUS
from Constants import Constants from Constants import Constants
from InfluxDBConnector import InfluxDBConnector from InfluxDBConnector import InfluxDBConnector
from main.exn import connector, core from exn import connector, core
from datetime import datetime from datetime import datetime
@ -18,12 +19,12 @@ class Bootstrap(connector.connector_handler.ConnectorHandler):
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.getLogger('main.exn.connector').setLevel(logging.DEBUG) logging.getLogger('main.exn.connector').setLevel(logging.DEBUG)
application_name = "_Application1"
metric_list = ["cpu_usage","ram_usage"] metric_list = ["cpu_usage","ram_usage"]
publisher_dict = {} publisher_dict = {}
publisher_list = [] publisher_list = []
for metric in metric_list: for metric in metric_list:
new_publisher = (core.publisher.Publisher("slovid","monitoring."+metric,topic=True)) new_publisher = (core.publisher.Publisher("demopublisher_"+metric,"eu.nebulouscloud.monitoring."+metric,topic=True,fqdn=True))
publisher_dict[metric]= new_publisher publisher_dict[metric]= new_publisher
publisher_list.append(new_publisher) publisher_list.append(new_publisher)
@ -55,8 +56,11 @@ for metric_name in metric_list:
"component_id":"wordpress_1", "component_id":"wordpress_1",
"timestamp": time_point "timestamp": time_point
} }
publisher_dict[metric_name].send(body=message) try:
if counter%50==0: publisher_dict[metric_name].send(body=message,application=application_name)
print("Sending message "+str(counter)) if counter%50==0:
counter = counter +1 print("Sending message "+str(counter))
counter = counter +1
except Exception as e:
print(traceback.format_exc())

View File

@ -1,17 +1,53 @@
import json
import requests
from influxdb_client import InfluxDBClient, Point, WritePrecision from influxdb_client import InfluxDBClient, Point, WritePrecision
from Constants import Constants from Constants import Constants
from influxdb_client.client.write_api import SYNCHRONOUS from influxdb_client.client.write_api import SYNCHRONOUS
def create_influxdb_bucket(application_name):
bucket_name = Constants.application_name_prefix+application_name+"_bucket"
# Replace with your actual values
url = 'http://' + Constants.db_hostname + ':8086/api/v2/buckets'
token = Constants.db_token
headers = {
'Authorization': 'Token {}'.format(token),
'Content-Type': 'application/json'
}
data = {
'name': bucket_name,
'orgID': Constants.organization_id,
'retentionRules': [
{
'type': 'expire',
'everySeconds': 2592000 #30 days (30*24*3600)
}
]
}
response = requests.post(url, headers=headers, data=json.dumps(data))
print(response.status_code)
print(response.json())
return bucket_name
class InfluxDBConnector: class InfluxDBConnector:
client = InfluxDBClient(url="http://"+Constants.db_hostname+":"+Constants.db_port, token=Constants.db_token, org=Constants.organization_name)
write_api = client.write_api(write_options=SYNCHRONOUS)
def __init__(self):
self.client = InfluxDBClient(url="http://"+Constants.db_hostname+":"+Constants.db_port, token=Constants.db_token, org=Constants.organization_name)
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
self.influxdb_bucket_created = False
self.bucket_name = "demo_bucket"
def InfluxDBConnector(self): def InfluxDBConnector(self):
pass pass
def write_data(self,data): def write_data(self,data,application_name):
self.write_api.write(bucket=Constants.bucket_name, org=Constants.organization_name, record=data, write_precision=WritePrecision.S) if not self.influxdb_bucket_created:
self.bucket_name = create_influxdb_bucket(application_name)
self.influxdb_bucket_created = True
self.write_api.write(bucket=self.bucket_name, org=Constants.organization_name, record=data, write_precision=WritePrecision.S)
def get_data(self): def get_data(self):
query_api = self.client.query_api() query_api = self.client.query_api()

View File

@ -0,0 +1,19 @@
import threading
from Constants import Constants
from exn import connector, core
from main.runtime.DataPersistor import Bootstrap
if __name__=="__main__":
application_handler = Bootstrap()
connector = connector.EXN('slovid', handler=application_handler,
consumers=[
core.consumer.Consumer('monitoring', Constants.monitoring_broker_topic + '.>', topic=True,fqdn=True, handler=application_handler),
],
url=Constants.broker_ip,
port=Constants.broker_port,
username=Constants.broker_username,
password=Constants.broker_password
)
#connector.start()
thread = threading.Thread(target=connector.start,args=())
thread.start()

View File

@ -2,4 +2,8 @@ broker_ip=localhost
broker_port=5672 broker_port=5672
broker_username=admin broker_username=admin
broker_password=admin broker_password=admin
monitoring_broker_topic =monitoring db_username=my-user
db_password=my-password
monitoring_broker_topic =eu.nebulouscloud.monitoring
organization_name=my-org
bucket_name=my-bucket

View File

@ -4,7 +4,7 @@ setup(
name='monitoring-data-persistor', name='monitoring-data-persistor',
version='1.0.0', version='1.0.0',
#packages=find_packages('.'), #packages=find_packages('.'),
packages=["main","main.exn","main.exn.core","main.exn.handler","main.exn.settings","main.runtime"], packages=["main","exn","exn.core","exn.handler","exn.settings","main.runtime"],
package_dir={'': '.'}, package_dir={'': '.'},
entry_points={ entry_points={

View File

@ -14,9 +14,9 @@
container_images: container_images:
- context: monitoring-data-persistor - context: monitoring-data-persistor
registry: quay.io registry: quay.io
repository: quay.io/nebulous/monitoring-data-persistor-monitoring-data-persistor repository: quay.io/nebulous/monitoring-data-persistor
namespace: nebulous namespace: nebulous
repo_shortname: monitoring-data-persistor-monitoring-data-persistor repo_shortname: monitoring-data-persistor
repo_description: "" repo_description: ""
- job: - job: