Corrections

Correction in the Dockerfile to allow starting the component
Correction in the requirements of the component and its configuration file
Correction in the logic of spawning new subscribers per application

Change-Id: I69f6e4a45a3e0e044a0fb0b741d97762629495df
This commit is contained in:
Andreas Tsagkaropoulos 2024-04-06 00:13:28 +03:00
parent fd1f1de653
commit 20ed706104
7 changed files with 78 additions and 43 deletions

View File

@ -1,3 +1,4 @@
ignored:
- DL3008
- SC1091
- DL3015

View File

@ -1,41 +1,50 @@
# Use python:3.11 as the source stage to build the package
FROM python:3.11 as source
# First stage: Build the application
FROM python:3.11 as builder
# Create and set the working directory
RUN mkdir /src
COPY ./src/ /src/
# Set the working directory in the container
WORKDIR /src
# Install dependencies and package the application
RUN pip install --no-cache-dir -r requirements.txt && python3 setup.py sdist
# Copy the source code into the container
COPY ./src/ .
# Start the final stage using Ubuntu as the base image
FROM ubuntu:noble
# Install dependencies and build the application package
RUN pip install --no-cache-dir -r requirements.txt && \
python setup.py sdist
# Set the environment variables
ENV TZ=Europe/Athens
ENV VERSION=1.0.0
ENV LOG_FILE=/home/monitoring_data_persistor.log
# Second stage: Prepare the runtime environment
FROM python:3.11-slim
# Set environment variables
ENV TZ=Europe/Athens \
VERSION=1.0.0 \
LOG_FILE=/home/monitoring_data_persistor.log
# Set the timezone
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
# Install the required packages
RUN apt-get update && apt-get install --no-install-recommends -y \
python3 \
python3-venv \
python3-pip \
# Install gcc and other necessary build tools for compiling C extensions
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone && apt-get update && apt-get install -y \
gcc \
g++ \
make \
libffi-dev \
libssl-dev \
&& rm -rf /var/lib/apt/lists/*
# Copy the packaged application from the source stage
COPY --from=source /src/dist/monitoring-data-persistor-$VERSION.tar.gz /home/
# Set the working directory
# Set the working directory in the container
WORKDIR /home
# Create a virtual environment and install the application package
RUN python3 -m venv monitoring-data-env \
&& . monitoring-data-env/bin/activate \
&& pip install --no-cache-dir monitoring-data-persistor-$VERSION.tar.gz
# Copy the built application package and other necessary files from the builder stage
COPY --from=builder /src/dist/monitoring-data-persistor-$VERSION.tar.gz ./
COPY --from=builder /src/resources ./resources
COPY --from=builder /src/requirements.txt .
CMD ["/bin/bash", "-c", "source monitoring-data-env/bin/activate && python3 -u /home/monitoring-data-persistor-$VERSION/src/main/runtime/DataPersistor.py /home/monitoring-data-persistor-$VERSION/src/resources/config.properties > $LOG_FILE 2>&1"]
# Install the application
# Optionally, cleanup to reduce image size
# This step removes the build tools and libraries after they are no longer needed
# Cleanup the tar.gz file
RUN pip install --no-cache-dir monitoring-data-persistor-$VERSION.tar.gz && \
pip install --no-cache-dir -r requirements.txt && \
apt-get purge -y --auto-remove gcc g++ make libffi-dev libssl-dev && \
rm monitoring-data-persistor-$VERSION.tar.gz
# Define the entry point command
CMD ["/bin/bash", "-c", "source /home/monitoring-data-env/bin/activate && python3 -u /home/main/runtime/DataPersistor.py /home/resources/config.properties > $LOG_FILE 2>&1"]

View File

@ -28,20 +28,23 @@ class ConsumerHandler(Handler):
if ((str(address)).startswith(Constants.monitoring_prefix) and not (str(address)).endswith(Constants.metric_list_topic)):
logging.info("New monitoring data arrived at topic "+address)
logging.info(body)
point = Point(str(address).split(".")[-1]).field("metricValue",body["metricValue"]).tag("level",body["level"]).tag("component_id",body["component_id"]).tag("application_name",self.application_name)
point.time(body["timestamp"],write_precision=WritePrecision.S)
point = Point(str(address).split(".")[-1]).field("metricValue",body["metricValue"]).tag("level",body["level"]).tag("application_name",self.application_name)
point.time(body["timestamp"],write_precision=WritePrecision.MS)
logging.info("Writing new monitoring data to Influx DB")
self.influx_connector.write_data(point,self.application_name)
class GenericConsumerHandler(Handler):
application_consumer_handler_connectors = {} #dictionary in which keys are applications and values are the consumer handlers.
def on_message(self, key, address, body, context, **kwargs):
if (str(address)).startswith(Constants.monitoring_prefix+Constants.metric_list_topic):
application_name = body["name"]
logging.info("New metrics list message for application "+application_name)
connector = exn.connector.EXN('slovid', handler=Bootstrap(),
logging.info("New metrics list message for application "+application_name + " - registering new connector")
if (application_name in self.application_consumer_handler_connectors.keys() is not None):
self.application_consumer_handler_connectors[application_name].stop()
connector = exn.connector.EXN('data_persistor-'+application_name, handler=Bootstrap(),
consumers=[
core.consumer.Consumer('monitoring', Constants.monitoring_broker_topic + '.>', application=application_name,topic=True, fqdn=True, handler=ConsumerHandler(application_name=application_name)),
core.consumer.Consumer('monitoring', Constants.monitoring_broker_topic + '.realtime.>', application=application_name,topic=True, fqdn=True, handler=ConsumerHandler(application_name=application_name)),
],
url=Constants.broker_ip,
port=Constants.broker_port,
@ -49,13 +52,18 @@ class GenericConsumerHandler(Handler):
password=Constants.broker_password
)
#connector.start()
self.application_consumer_handler_connectors[application_name] = connector
logging.info(f"Application specific connector registered for application {application_name}")
thread = threading.Thread(target=connector.start,args=())
thread.start()
from time import sleep
sleep(10000)
def update_properties(configuration_file_location):
p = Properties()
with open(configuration_file_location, "rb") as f:
p.load(f, "utf-8")
Constants.db_hostname, metadata = p["db_hostname"]
Constants.broker_ip, metadata = p["broker_ip"]
Constants.broker_port, metadata = p["broker_port"]
Constants.broker_username, metadata = p["broker_username"]

View File

@ -1,4 +1,4 @@
import json
import json,logging
import requests
from influxdb_client import InfluxDBClient, Point, WritePrecision
@ -33,20 +33,35 @@ def create_influxdb_bucket(application_name):
class InfluxDBConnector:
applications_with_influxdb_bucket_created = []
def __init__(self):
self.client = InfluxDBClient(url="http://"+Constants.db_hostname+":"+Constants.db_port, token=Constants.db_token, org=Constants.organization_name)
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
self.influxdb_bucket_created = False
#self.influxdb_bucket_created[application_name] = False
self.bucket_name = "demo_bucket"
def InfluxDBConnector(self):
pass
def write_data(self,data,application_name):
if not self.influxdb_bucket_created:
self.bucket_name = create_influxdb_bucket(application_name)
self.influxdb_bucket_created = True
if not application_name in self.applications_with_influxdb_bucket_created:
org_api = self.client.organizations_api()
# List all organizations
organizations = org_api.find_organizations()
# Find the organization by name and print its ID
for org in organizations:
if org.name == Constants.organization_name:
logging.info(f"Organization Name: {org.name}, ID: {org.id}")
Constants.organization_id = org.id
break
logging.info("The influxdb bucket was reported as not created")
self.bucket_name = create_influxdb_bucket(application_name)
self.applications_with_influxdb_bucket_created.append(application_name)
else:
logging.info("The influxdb bucket was reported as created")
logging.info(f"The data point is {data}")
self.write_api.write(bucket=self.bucket_name, org=Constants.organization_name, record=data, write_precision=WritePrecision.S)
logging.info("The data point has been written!")
def get_data(self,metric_name):
query_api = self.client.query_api()

View File

@ -2,4 +2,5 @@ python-dotenv==1.0.0
python-qpid-proton==0.39.0
influxdb-client==1.39.0
jproperties==2.1.1
requests==2.31.0
#libffi7 is required in linux first for python-qpid-proton

View File

@ -1,7 +1,8 @@
broker_ip=localhost
broker_ip=nebulous-activemq
broker_port=5672
broker_username=admin
broker_password=admin
db_hostname=nebulous-influxdb
db_username=my-user
db_password=my-password
monitoring_broker_topic =eu.nebulouscloud.monitoring

View File

@ -9,7 +9,7 @@ setup(
package_dir={'': '.'},
entry_points={
'console_scripts': [
'start_exsmoothing = main.runtime:DataPersistor',
'start_dp = main.runtime:DataPersistor',
],
}
# other setup configurations