Initial Commit

Initial commit of the component to the repository

Change-Id: Ie61d9c52a4aa66b732c42f9353dd73c34521e990
This commit is contained in:
Andreas Tsagkaropoulos 2024-01-15 13:51:38 +02:00
parent e631dc2283
commit 1d5c54125f
33 changed files with 4377 additions and 13 deletions

View File

@ -35,16 +35,18 @@ spec:
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: http
containerPort: 8080
containerPort: 8070
protocol: TCP
livenessProbe:
httpGet:
path: /
port: http
exec:
command:
- ls
- /home
readinessProbe:
httpGet:
path: /
port: http
exec:
command:
- ls
- /home
resources:
{{- toYaml .Values.resources | nindent 12 }}
{{- with .Values.nodeSelector }}

View File

@ -5,7 +5,7 @@
replicaCount: 1
image:
repository: "quay.io/nebulous/monitoring-data-persistor-java-spring-boot-demo"
repository: "quay.io/nebulous/monitoring-data-persistor-monitoring-data-persistor"
pullPolicy: IfNotPresent
# Overrides the image tag whose default is the chart appVersion.
tag: ""

View File

@ -0,0 +1,42 @@
# Copyright (c) 2023 Institute of Communication and Computer Systems
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
FROM python:3.11 as source
#RUN pip install --no-cache-dir --upgrade pip
RUN mkdir /src
COPY ./src/ /src/
WORKDIR /src
RUN pip install --no-cache-dir -r requirements.txt && python3 setup.py sdist
#RUN ls ./dist/
FROM ubuntu:noble
ENV TZ=Europe/Athens
ENV VERSION=1.0.0
ENV LOG_FILE=monitoring_data_persistor.log
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
ARG DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install --no-install-recommends -y \
libffi8=3.4.4-2 \
build-essential=12.10ubuntu1 \
libssl-dev=3.0.10-1ubuntu3 \
libffi-dev=3.4.4-2 \
python3-dev=3.11.4-5ubuntu1 \
python3=3.11.4-5ubuntu1 \
python3.11-venv=3.11.7-2 \
python3-pip=23.3+dfsg-1 \
&& rm -rf /var/lib/apt/lists/*
COPY --from=source ./src/dist/monitoring-data-persistor-$VERSION.tar.gz /home/
COPY ./ /home/monitoring-data-persistor-$VERSION
WORKDIR /home
RUN python3 -m venv monitoring-data-env && /bin/sh -c ". monitoring-data-env/bin/activate && pip install --no-cache-dir /home/monitoring-data-persistor-$VERSION.tar.gz"
RUN tar -xzvf /home/monitoring-data-persistor-$VERSION.tar.gz && /bin/sh -c ". monitoring-data-env/bin/activate && pip install --no-cache-dir -r /home/monitoring-data-persistor-$VERSION/src/requirements.txt"
#RUN bash -x /home/monitoring-data-persistor-$VERSION/src/prepare_project_code.sh
#COPY ./ /home/monitoring-data-persistor
CMD ["/bin/sh","-c",". monitoring-data-env/bin/activate && python3 -u /home/monitoring-data-persistor-$VERSION/src/main/runtime/DataPersistor.py /home/monitoring-data-persistor-$VERSION/src/resources/config.properties 2>&1 > $LOG_FILE "]

View File

@ -0,0 +1,14 @@
### Necessary steps:
To further develop the component, some changes might be needed. Specifically, the NEBULOUS_BASE_NAME might need to be changed in the library code connecting the component with the broker.
### Interacting with the influx cli:
```bash
influx config create --config-name nebulous_conf \
--host-url http://localhost:8086 \
--org nebulous \
--token tq5pA5uyt3pk65g9fALRZbMXCNg-81bDXuWK3NmjAyQN-t_cT3zFAbHzhtSeX83mJ1PqwZmeXALfKfdvDlGhVQ== \
--active
```

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,6 @@
from . import core
from . import handler
from . import settings
from . import connector

View File

@ -0,0 +1,84 @@
import logging
import os
from proton.reactor import Container
from main.exn.core import state_publisher, schedule_publisher
from main.exn.core.context import Context
from .core.manager import Manager
from .settings import base
from .handler import connector_handler
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
_logger = logging.getLogger(__name__)
class EXN:
context = None
container = None
def __init__(self, component=None,
handler:connector_handler.ConnectorHandler = None,
publishers=None,
consumers=None,
**kwargs):
# Load .env file
# Validate and set connector
if not component:
_logger.error("Component cannot be empty or None")
raise ValueError("Component cannot be empty or None")
self.component = component
self.url = kwargs.get('url',os.getenv('NEBULOUS_BROKER_URL'))
self.port = kwargs.get('port', os.getenv('NEBULOUS_BROKER_PORT'))
self.username = kwargs.get('username',os.getenv('NEBULOUS_BROKER_USERNAME'))
self.password = kwargs.get('password', os.getenv('NEBULOUS_BROKER_PASSWORD'))
self.handler = handler
# Validate attributes
if not self.url:
_logger.error("URL cannot be empty or None")
raise ValueError("URL cannot be empty or None")
if not self.port:
_logger.error("PORT cannot be empty or None")
raise ValueError("PORT cannot be empty or None")
if not self.username:
_logger.error("USERNAME cannot be empty or None")
raise ValueError("USERNAME cannot be empty or None")
if not self.password:
_logger.error("PASSWORD cannot be empty or None")
raise ValueError("PASSWORD cannot be empty or None")
self.context = Context(base=f"{base.NEBULOUS_BASE_NAME}.{self.component}")
if not publishers:
publishers = []
if not consumers:
consumers = []
compiled_publishers = publishers
if kwargs.get("enable_state",False):
compiled_publishers.append(state_publisher.Publisher())
if kwargs.get("enable_health",False):
compiled_publishers.append(schedule_publisher.Publisher(
base.NEBULOUS_DEFAULT_HEALTH_CHECK_TIMEOUT,
'health',
'health',
topic=True))
for c in consumers:
self.context.register_consumers(c)
for p in compiled_publishers:
self.context.register_publisher(p)
def start(self):
self.context.start(Manager(f"{self.url}:{self.port}"),self.handler)
def stop(self):
self.context.stop()

View File

@ -0,0 +1,9 @@
from . import context
from . import handler
from . import publisher
from . import consumer
from . import state_publisher
from . import schedule_publisher

View File

@ -0,0 +1,43 @@
import logging
from proton import Event
from .handler import Handler
from . import link
from proton.handlers import MessagingHandler
_logger = logging.getLogger(__name__)
_logger.setLevel(level=logging.DEBUG)
class Consumer(link.Link, MessagingHandler):
application = None
def __init__(self, key, address, handler: Handler, application=None, topic=False, fqdn=False):
super(Consumer, self).__init__(key, address, topic, fqdn)
self.application = application
self.handler = handler
self.handler._consumer = self
def should_handle(self, event: Event):
should = event.link.name == self._link.name and \
(self.application is None or event.message.subject == self.application)
_logger.debug(f"[{self.key}] checking if link is the same {event.link.name}={self._link.name} "
f" and application {self.application}={event.message.subject} == {should}")
return should
def on_start(self, event: Event) -> None:
_logger.debug(f"[{self.key}] on_start")
def on_message(self, event):
_logger.debug(f"[{self.key}] handling event with address => {event.message.address}")
try:
if self.should_handle(event):
self.handler.on_message(self.key, event.message.address, event.message.body, event.message)
except Exception as e:
_logger.error(f"Received message: {e}")

View File

@ -0,0 +1,109 @@
import logging
from proton.reactor import Container
from . import link
from .manager import Manager
_logger = logging.getLogger(__name__)
_logger.setLevel(logging.DEBUG)
class Context:
base = None
handler = None
publishers = {}
consumers = {}
_manager = None
def __init__(self, base):
self.base = base
def start(self, manager:Manager, handler):
self._manager = manager
def on_ready():
_logger.debug("[context] on_ready" )
for key,publisher in self.publishers.items():
self._manager.start_publisher(self,publisher)
for key,consumer in self.consumers.items():
self._manager.start_consumer(self,consumer)
handler.ready(context=self)
self._manager._on_ready=on_ready
self._manager.start()
def stop(self):
if self._manager is not None and self._manager.started:
for key,publisher in self.publishers:
publisher._link.close()
for key,consumer in self.consumers:
consumer._link.close()
self._manager.close()
def register_publisher(self, publisher):
if publisher.key in self.publishers:
_logger.warning("[context] Trying to register publisher that already exists")
return
_logger.info(f"[context] registering publisher {publisher.key} {publisher.address}" )
self.publishers[publisher.key] = publisher
if self._manager is not None and self._manager.started:
self._manager.start_publisher(self,publisher)
def get_publisher(self, key):
if key in self.publishers:
return self.publishers[key]
return None
def has_publisher(self, key):
return key in self.publishers
def has_consumer(self, key):
return key in self.consumers
def register_consumers(self, consumer):
if consumer.key in self.consumers:
_logger.warning("[context] Trying to register consumer that already exists")
return
self.consumers[consumer.key] = consumer
if self._manager is not None and self._manager.started:
self._manager.start_consumer(self,consumer)
def unregister_consumer(self, key):
if not key in self.consumers:
_logger.warning("[context] Trying to unregister consumer that does not exists")
return
consumer = self.consumers.pop(key)
if self._manager is not None and self._manager.started:
consumer._link.close()
def unregister_publisher(self, key):
if not key in self.consumers:
_logger.warning("[context] Trying to unregister publisher that does not exists")
return
publisher = self.publishers.pop(key)
if self._manager is not None and self._manager.started:
publisher._link.close()
def build_address_from_link(self, link: link.Link):
if link.fqdn:
address = link.address
if link.topic and not link.address.startswith("topic://"):
address = f"topic://{address}"
return address
address = f"{self.base}.{link.address}"
if link.topic:
address = f"topic://{address}"
return address

View File

@ -0,0 +1,11 @@
import logging
from proton import Message
_logger = logging.getLogger(__name__)
class Handler:
def on_message(self, key, address, body, message: Message, context=None):
_logger.info(f"You should really override this... {key}=>{address}")

View File

@ -0,0 +1,20 @@
from proton import Link as pLink
class Link:
fqdn=False
def __init__(self, key, address, topic=False, fqdn=False):
super().__init__()
self.key = key
self.address = address
self.topic= topic
self.fqdn= fqdn
self._link = None
def set(self, link:pLink):
# The proton container creates a sender
# so we just use composition instead of extension
self._link = link

View File

@ -0,0 +1,71 @@
import logging
from proton import Event, Connection,Session
from proton.handlers import MessagingHandler
from proton.reactor import Container
from .consumer import Consumer
from .publisher import Publisher
_logger = logging.getLogger(__name__)
_logger.setLevel(logging.DEBUG)
class SessionPerConsumer(object):
def session(self, connection: Connection) -> Session:
session = connection.session()
session.open()
return session
class Manager(MessagingHandler):
uri = None
started = False
container = None
connection = None
_on_ready = None
def __init__(self, uri):
super(Manager, self).__init__()
self.uri = uri
def start(self):
_logger.info(f"[manager] starting")
self.container = Container(self)
self.container.run()
def on_start(self, event: Event) -> None:
self.connection = self.container.connect(self.uri)
self.connection._session_policy=SessionPerConsumer()
self.started=True
_logger.debug(f"[manager] on_start")
if self._on_ready is not None:
self._on_ready()
def on_message(self, event: Event) -> None:
_logger.warning(f"[manager] received generic on_message make sure you have set up your handlers"
f" properly ")
def close(self):
_logger.info(f"[manager] closing")
if self.container:
self.container.stop()
if self.connection:
self.connection.close()
def start_publisher(self, context, publisher: Publisher):
address = context.build_address_from_link(publisher)
_logger.info(f"[manager] starting publisher {publisher.key} => {address}")
publisher.set(self.container.create_sender(self.connection, address))
if hasattr(publisher, "delay"):
_logger.debug(f"{context.base} registering timer {hasattr(publisher, 'delay')}")
self.container.schedule(publisher.delay, handler=publisher)
def start_consumer(self, context, consumer: Consumer):
address = context.build_address_from_link(consumer)
_logger.info(f"[manager] starting consumer {consumer.key} => {address}")
consumer.set(self.container.create_receiver(self.connection, address , handler=consumer))

View File

@ -0,0 +1,36 @@
import datetime
import logging
from proton import Message
from . import link
_logger = logging.getLogger(__name__)
class Publisher(link.Link):
def send(self, body=None, application=None):
if not body:
body = {}
_logger.info(f"[{self.key}] sending to {self._link.target.address} for application={application} - {body} ")
msg = self._prepare_message(body)
if application:
msg.subject = application
self._link.send(msg)
def _prepare_message(self, body=None):
if not body:
body = {}
send = {"when": datetime.datetime.utcnow().isoformat()}
send.update(body)
msg = Message(
address=self._link.target.address,
body=send
)
msg.content_type = "application/json"
return msg

View File

@ -0,0 +1,24 @@
import logging
from proton.handlers import MessagingHandler
from .publisher import Publisher
_logger = logging.getLogger(__name__)
class Publisher(Publisher, MessagingHandler):
send_next = False
delay = 15
def __init__(self, delay, key, address, application=None, topic=False, fqdn=False):
super(Publisher, self).__init__(key, address, topic,fqdn)
self.delay = delay
self.application = application
def on_timer_task(self, event):
_logger.debug(f"[manager] on_timer_task")
self.send()
event.reactor.schedule(self.delay, self)
def send(self, body=None, application=None):
super(Publisher, self).send(body, self.application)

View File

@ -0,0 +1,45 @@
import datetime
import json
from enum import Enum
from proton import Message
from . import publisher
import logging
_logger = logging.getLogger(__name__)
class States(Enum):
STARTING = "starting"
STARTED = "started"
READY = "ready"
STOPPING = "stopping"
STOPPED = "stopped"
class Publisher(publisher.Publisher):
def __init__(self):
super().__init__("state","state", True)
def _send_message(self, message_type):
self.send({"state": message_type,"message": None})
def starting(self):
self._send_message(States.STARTING.value)
def started(self):
self._send_message(States.STARTED.value)
def ready(self):
self._send_message(States.READY.value)
def stopping(self):
self._send_message(States.STOPPING.value)
def stopped(self):
self._send_message(States.STOPPED.value)
def custom(self, state):
self._send_message(state)

View File

@ -0,0 +1,2 @@
from . import connector_handler

View File

@ -0,0 +1,12 @@
import logging
_logger = logging.getLogger(__name__)
class ConnectorHandler:
def ready(self, context):
pass

View File

@ -0,0 +1 @@
from . import base

View File

@ -0,0 +1,2 @@
NEBULOUS_BASE_NAME="eu.nebulouscloud"
NEBULOUS_DEFAULT_HEALTH_CHECK_TIMEOUT=15

View File

@ -0,0 +1,13 @@
class Constants:
configuration_file_location = "/home/resources/config.properties"
broker_port = "5672"
broker_ip = "localhost"
broker_username = "admin"
broker_password = "admin"
monitoring_broker_topic = "monitoring"
bucket_name = "nebulous"
organization_name = "nebulous"
db_token = "tzIfpbU9b77quyvN0yHIbWltSh1c1371-o9nl_wJYaeo5TWdk5txyxXhp2iaLVMvOvf020HnEEAkE0yy5AllKQ=="
db_hostname = "localhost"
db_port = "8086"
monitoring_prefix = "topic://eu.nebulouscloud."+monitoring_broker_topic

View File

@ -0,0 +1,62 @@
import logging
import os
import sys
import threading
import time
from jproperties import Properties
from influxdb_client import Point, WritePrecision, InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
from Constants import Constants
from InfluxDBConnector import InfluxDBConnector
from main.exn import connector, core
from main.exn.handler.connector_handler import ConnectorHandler
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.getLogger('main.exn.connector').setLevel(logging.DEBUG)
class Bootstrap(ConnectorHandler):
influx_connector = InfluxDBConnector()
def on_message(self, key, address, body, context, **kwargs):
logging.info(f"Received {key} => {address}")
application_name = "default_application"
if (str(address)).startswith(Constants.monitoring_prefix):
logging.info("New monitoring data arrived at topic "+address)
logging.info(body)
point = Point(str(address).split(".")[-1]).field("metricValue",body["metricValue"]).tag("level",body["level"]).tag("component_id",body["component_id"]).tag("application_name",application_name)
point.time(body["timestamp"],write_precision=WritePrecision.S)
self.influx_connector.write_data(point)
else:
print("Address is "+str(address)+", but it was expected for it to start with " + Constants.monitoring_prefix)
def update_properties(configuration_file_location):
p = Properties()
with open(Constants.configuration_file_location, "rb") as f:
p.load(f, "utf-8")
Constants.broker_ip, metadata = p["broker_ip"]
Constants.broker_port, metadata = p["broker_port"]
Constants.broker_username, metadata = p["broker_username"]
Constants.broker_password, metadata = p["broker_password"]
Constants.monitoring_broker_topic, metadata = p["monitoring_broker_topic"]
if __name__ == "__main__":
Constants.configuration_file_location = sys.argv[1]
update_properties(Constants.configuration_file_location)
application_handler = Bootstrap()
connector = connector.EXN('slovid', handler=application_handler,
consumers=[
core.consumer.Consumer('monitoring', Constants.monitoring_broker_topic + '.>', topic=True, handler=application_handler),
],
url=Constants.broker_ip,
port=Constants.broker_port,
username=Constants.broker_username,
password=Constants.broker_password
)
#connector.start()
thread = threading.Thread(target=connector.start,args=())
thread.start()

View File

@ -0,0 +1,62 @@
import logging
import threading
import time,random
from influxdb_client import Point, WritePrecision, InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
from Constants import Constants
from InfluxDBConnector import InfluxDBConnector
from main.exn import connector, core
from datetime import datetime
class Bootstrap(connector.connector_handler.ConnectorHandler):
pass
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.getLogger('main.exn.connector').setLevel(logging.DEBUG)
metric_list = ["cpu_usage","ram_usage"]
publisher_dict = {}
publisher_list = []
for metric in metric_list:
new_publisher = (core.publisher.Publisher("slovid","monitoring."+metric,topic=True))
publisher_dict[metric]= new_publisher
publisher_list.append(new_publisher)
connector = connector.EXN('slovid', handler=Bootstrap(),
consumers=[],
publishers=publisher_list,
url="localhost",
port="5672",
username="admin",
password="admin",
enable_health=False
)
#connector.start()
thread = threading.Thread(target=connector.start,args=())
thread.start()
time.sleep(5)
time_to_generate_time_for = 10*3600
frequency = 5
for metric_name in metric_list:
current_time = int(time.time())
counter = 0
print("Data for "+metric_name)
for time_point in range(current_time-time_to_generate_time_for,current_time,frequency):
random_value = random.uniform(0,100)
message = {
"metricValue": random_value,
"level": 1,
"component_id":"wordpress_1",
"timestamp": time_point
}
publisher_dict[metric_name].send(body=message)
if counter%50==0:
print("Sending message "+str(counter))
counter = counter +1

View File

@ -0,0 +1,25 @@
from influxdb_client import InfluxDBClient, Point, WritePrecision
from Constants import Constants
from influxdb_client.client.write_api import SYNCHRONOUS
class InfluxDBConnector:
client = InfluxDBClient(url="http://"+Constants.db_hostname+":"+Constants.db_port, token=Constants.db_token, org=Constants.organization_name)
write_api = client.write_api(write_options=SYNCHRONOUS)
def InfluxDBConnector(self):
pass
def write_data(self,data):
self.write_api.write(bucket=Constants.bucket_name, org=Constants.organization_name, record=data, write_precision=WritePrecision.S)
def get_data(self):
query_api = self.client.query_api()
query = """from(bucket: "nebulous")
|> range(start: -1m)
|> filter(fn: (r) => r._measurement == "temperature")"""
tables = query_api.query(query, org=Constants.organization_name)
for table in tables:
for record in table.records:
print(record)

View File

@ -0,0 +1,32 @@
from datetime import datetime
from Constants import Constants
from InfluxDBConnector import InfluxDBConnector
import time
## This utility assumes that the database has been filled with values for cpu usage and ram usage
metric_names = ["cpu_usage","ram_usage"]
for metric_name in metric_names:
time_interval_to_get_data_for = "10h"
print_data_from_db = True
query_string = 'from(bucket: "'+Constants.bucket_name+'") |> range(start:-'+time_interval_to_get_data_for+') |> filter(fn: (r) => r["_measurement"] == "'+metric_name+'")'
influx_connector = InfluxDBConnector()
for counter in range(10):
print("performing query")
current_time = time.time()
result = influx_connector.client.query_api().query(query_string, Constants.organization_name)
elapsed_time = time.time()-current_time
print("performed query, it took "+str(elapsed_time) + " seconds")
#print(result.to_values())
for table in result:
#print header row
print("Timestamp,ems_time,"+metric_name)
for record in table.records:
dt = datetime.fromisoformat(str(record.get_time()))
epoch_time = int(dt.timestamp())
metric_value = record.get_value()
if(print_data_from_db):
print(str(epoch_time)+","+str(epoch_time)+","+str(metric_value))
time.sleep(10)

View File

@ -0,0 +1,8 @@
#!/bin/bash
VERSION="1.0.0"
pip install monitoring-data-persistor-$VERSION.tar.gz
tar -xzvf monitoring-data-persistor-$VERSION.tar.gz
pip3 install -r requirements.txt

View File

@ -0,0 +1,5 @@
python-dotenv==1.0.0
python-qpid-proton==0.39.0
influxdb-client==1.39.0
jproperties==2.1.1
#libffi7 is required in linux first for python-qpid-proton

View File

@ -0,0 +1,5 @@
broker_ip=localhost
broker_port=5672
broker_username=admin
broker_password=admin
monitoring_broker_topic =monitoring

View File

@ -0,0 +1,16 @@
from setuptools import setup, find_packages
setup(
name='monitoring-data-persistor',
version='1.0.0',
#packages=find_packages('.'),
packages=["main","main.exn","main.exn.core","main.exn.handler","main.exn.settings","main.runtime"],
package_dir={'': '.'},
entry_points={
'console_scripts': [
'start_exsmoothing = main.runtime:DataPersistor',
],
}
# other setup configurations
)

2
requirements.txt Normal file
View File

@ -0,0 +1,2 @@
jproperties~=2.1.1
setuptools~=49.2.1

View File

@ -8,15 +8,15 @@
- nebulous-monitoring-data-persistor-container-images
description: Build the container images.
files: &image_files
- ^java-spring-boot-demo/
- ^monitoring-data-persistor/
vars: &image_vars
promote_container_image_job: nebulous-monitoring-data-persistor-upload-container-images
container_images:
- context: java-spring-boot-demo
- context: monitoring-data-persistor
registry: quay.io
repository: quay.io/nebulous/monitoring-data-persistor-java-spring-boot-demo
repository: quay.io/nebulous/monitoring-data-persistor-monitoring-data-persistor
namespace: nebulous
repo_shortname: monitoring-data-persistor-java-spring-boot-demo
repo_shortname: monitoring-data-persistor-monitoring-data-persistor
repo_description: ""
- job:
@ -44,7 +44,7 @@
description: Run Hadolint on Dockerfile(s).
vars:
dockerfiles:
- java-spring-boot-demo/Dockerfile
- monitoring-data-persistor/Dockerfile
- job:
name: nebulous-monitoring-data-persistor-helm-lint