Code Improvements

Updates to the Dockerfile to correctly start the component
Improvements in the way of starting new connectors
Correction in the setup.py script

Change-Id: I69f6e4a45a3e0e044a0fb0b741d9776262949500
This commit is contained in:
Andreas Tsagkaropoulos 2024-04-18 12:49:57 +03:00
parent 20ed706104
commit 582fac71f0
6 changed files with 42 additions and 28 deletions

View File

@ -2,3 +2,4 @@ ignored:
- DL3008 - DL3008
- SC1091 - SC1091
- DL3015 - DL3015
- DL3015

View File

@ -47,4 +47,5 @@ RUN pip install --no-cache-dir monitoring-data-persistor-$VERSION.tar.gz && \
rm monitoring-data-persistor-$VERSION.tar.gz rm monitoring-data-persistor-$VERSION.tar.gz
# Define the entry point command # Define the entry point command
CMD ["/bin/bash", "-c", "source /home/monitoring-data-env/bin/activate && python3 -u /home/main/runtime/DataPersistor.py /home/resources/config.properties > $LOG_FILE 2>&1"] # CMD ["/bin/bash", "-c", "source /home/monitoring-data-env/bin/activate && python3 -u /home/main/runtime/DataPersistor.py /home/resources/config.properties > $LOG_FILE 2>&1"]
CMD ["/usr/local/bin/start_dp","/home/resources/config.properties","> $LOG_FILE 2>&1"]

View File

@ -1,4 +1,5 @@
class Constants: class Constants:
data_persistor_name = "data_persistor"
application_name_prefix = "nebulous_" application_name_prefix = "nebulous_"
start_forecasting_prefix = "topic://eu.nebulouscloud.forecasting.start_forecasting." start_forecasting_prefix = "topic://eu.nebulouscloud.forecasting.start_forecasting."
forecasting_method_name = "exponentialsmoothing" forecasting_method_name = "exponentialsmoothing"

View File

@ -6,8 +6,8 @@ from jproperties import Properties
from influxdb_client import Point, WritePrecision from influxdb_client import Point, WritePrecision
import exn import exn
from Constants import Constants from main.runtime.Constants import Constants
from InfluxDBConnector import InfluxDBConnector from main.runtime.InfluxDBConnector import InfluxDBConnector
from exn import connector, core from exn import connector, core
from exn.core.handler import Handler from exn.core.handler import Handler
from exn.handler.connector_handler import ConnectorHandler from exn.handler.connector_handler import ConnectorHandler
@ -34,7 +34,13 @@ class ConsumerHandler(Handler):
self.influx_connector.write_data(point,self.application_name) self.influx_connector.write_data(point,self.application_name)
class GenericConsumerHandler(Handler): class GenericConsumerHandler(Handler):
connector_thread = None
initialized_connector = None
application_consumer_handler_connectors = {} #dictionary in which keys are applications and values are the consumer handlers. application_consumer_handler_connectors = {} #dictionary in which keys are applications and values are the consumer handlers.
def GenericConsumerHandler(self):
if self.connector_thread is not None:
self.initialized_connector.stop()
def on_message(self, key, address, body, context, **kwargs): def on_message(self, key, address, body, context, **kwargs):
if (str(address)).startswith(Constants.monitoring_prefix+Constants.metric_list_topic): if (str(address)).startswith(Constants.monitoring_prefix+Constants.metric_list_topic):
@ -42,22 +48,24 @@ class GenericConsumerHandler(Handler):
logging.info("New metrics list message for application "+application_name + " - registering new connector") logging.info("New metrics list message for application "+application_name + " - registering new connector")
if (application_name in self.application_consumer_handler_connectors.keys() is not None): if (application_name in self.application_consumer_handler_connectors.keys() is not None):
self.application_consumer_handler_connectors[application_name].stop() self.application_consumer_handler_connectors[application_name].stop()
connector = exn.connector.EXN('data_persistor-'+application_name, handler=Bootstrap(), self.initialized_connector = exn.connector.EXN(Constants.data_persistor_name + "-" + application_name, handler=Bootstrap(),
consumers=[ consumers=[
core.consumer.Consumer('monitoring', Constants.monitoring_broker_topic + '.realtime.>', application=application_name,topic=True, fqdn=True, handler=ConsumerHandler(application_name=application_name)), core.consumer.Consumer('monitoring', Constants.monitoring_broker_topic + '.realtime.>', application=application_name,topic=True, fqdn=True, handler=ConsumerHandler(application_name=application_name)),
], ],
url=Constants.broker_ip, url=Constants.broker_ip,
port=Constants.broker_port, port=Constants.broker_port,
username=Constants.broker_username, username=Constants.broker_username,
password=Constants.broker_password password=Constants.broker_password
) )
#connector.start() #connector.start()
self.application_consumer_handler_connectors[application_name] = connector self.application_consumer_handler_connectors[application_name] = self.initialized_connector
logging.info(f"Application specific connector registered for application {application_name}") logging.info(f"Application specific connector registered for application {application_name}")
thread = threading.Thread(target=connector.start,args=()) self.initialized_connector.start()
thread.start() logging.info(f"Application specific connector started for application {application_name}")
from time import sleep #If threading support is explicitly required, uncomment these lines
sleep(10000) #connector_thread = threading.Thread(target=self.initialized_connector.start,args=())
#connector_thread.start()
#connector_thread.join()
def update_properties(configuration_file_location): def update_properties(configuration_file_location):
p = Properties() p = Properties()
@ -72,22 +80,25 @@ def update_properties(configuration_file_location):
Constants.organization_name,metadata = p["organization_name"] Constants.organization_name,metadata = p["organization_name"]
Constants.bucket_name,metadata = p["bucket_name"] Constants.bucket_name,metadata = p["bucket_name"]
if __name__ == "__main__": def main():
Constants.configuration_file_location = sys.argv[1] Constants.configuration_file_location = sys.argv[1]
update_properties(Constants.configuration_file_location) update_properties(Constants.configuration_file_location)
component_handler = Bootstrap() component_handler = Bootstrap()
connector = connector.EXN('slovid', handler=component_handler, connector_instance = connector.EXN(Constants.data_persistor_name, handler=component_handler,
consumers=[ consumers=[
core.consumer.Consumer('data_persistor_application', Constants.monitoring_broker_topic + '.>', topic=True, fqdn=True, handler=GenericConsumerHandler()), core.consumer.Consumer('monitoring_data', Constants.monitoring_broker_topic + '.>', topic=True, fqdn=True, handler=GenericConsumerHandler()),
], ],
url=Constants.broker_ip, url=Constants.broker_ip,
port=Constants.broker_port, port=Constants.broker_port,
username=Constants.broker_username, username=Constants.broker_username,
password=Constants.broker_password password=Constants.broker_password
) )
#connector.start() #connector.start()
thread = threading.Thread(target=connector.start,args=()) thread = threading.Thread(target=connector_instance.start,args=())
thread.start() thread.start()
print("Waiting for messages at the metric list topic, in order to start receiving applications") print("Waiting for messages at the metric list topic, in order to start receiving applications")
if __name__ == "__main__":
main()

View File

@ -2,7 +2,7 @@ import json,logging
import requests import requests
from influxdb_client import InfluxDBClient, Point, WritePrecision from influxdb_client import InfluxDBClient, Point, WritePrecision
from Constants import Constants from main.runtime.Constants import Constants
from influxdb_client.client.write_api import SYNCHRONOUS from influxdb_client.client.write_api import SYNCHRONOUS
def create_influxdb_bucket(application_name): def create_influxdb_bucket(application_name):

View File

@ -9,7 +9,7 @@ setup(
package_dir={'': '.'}, package_dir={'': '.'},
entry_points={ entry_points={
'console_scripts': [ 'console_scripts': [
'start_dp = main.runtime:DataPersistor', 'start_dp = main.runtime.DataPersistor:main',
], ],
} }
# other setup configurations # other setup configurations