diff --git a/.hadolint.yaml b/.hadolint.yaml index 669e664..7e3a7b1 100644 --- a/.hadolint.yaml +++ b/.hadolint.yaml @@ -2,3 +2,4 @@ ignored: - DL3008 - SC1091 - DL3015 + - DL3015 diff --git a/monitoring-data-persistor/Dockerfile b/monitoring-data-persistor/Dockerfile index f83e220..a830870 100644 --- a/monitoring-data-persistor/Dockerfile +++ b/monitoring-data-persistor/Dockerfile @@ -47,4 +47,5 @@ RUN pip install --no-cache-dir monitoring-data-persistor-$VERSION.tar.gz && \ rm monitoring-data-persistor-$VERSION.tar.gz # Define the entry point command -CMD ["/bin/bash", "-c", "source /home/monitoring-data-env/bin/activate && python3 -u /home/main/runtime/DataPersistor.py /home/resources/config.properties > $LOG_FILE 2>&1"] +# CMD ["/bin/bash", "-c", "source /home/monitoring-data-env/bin/activate && python3 -u /home/main/runtime/DataPersistor.py /home/resources/config.properties > $LOG_FILE 2>&1"] +CMD ["/usr/local/bin/start_dp","/home/resources/config.properties","> $LOG_FILE 2>&1"] \ No newline at end of file diff --git a/monitoring-data-persistor/src/main/runtime/Constants.py b/monitoring-data-persistor/src/main/runtime/Constants.py index 0c667c0..b5d9286 100644 --- a/monitoring-data-persistor/src/main/runtime/Constants.py +++ b/monitoring-data-persistor/src/main/runtime/Constants.py @@ -1,4 +1,5 @@ class Constants: + data_persistor_name = "data_persistor" application_name_prefix = "nebulous_" start_forecasting_prefix = "topic://eu.nebulouscloud.forecasting.start_forecasting." forecasting_method_name = "exponentialsmoothing" diff --git a/monitoring-data-persistor/src/main/runtime/DataPersistor.py b/monitoring-data-persistor/src/main/runtime/DataPersistor.py index 2b9db6a..22bdbe6 100644 --- a/monitoring-data-persistor/src/main/runtime/DataPersistor.py +++ b/monitoring-data-persistor/src/main/runtime/DataPersistor.py @@ -6,8 +6,8 @@ from jproperties import Properties from influxdb_client import Point, WritePrecision import exn -from Constants import Constants -from InfluxDBConnector import InfluxDBConnector +from main.runtime.Constants import Constants +from main.runtime.InfluxDBConnector import InfluxDBConnector from exn import connector, core from exn.core.handler import Handler from exn.handler.connector_handler import ConnectorHandler @@ -34,7 +34,13 @@ class ConsumerHandler(Handler): self.influx_connector.write_data(point,self.application_name) class GenericConsumerHandler(Handler): + connector_thread = None + initialized_connector = None application_consumer_handler_connectors = {} #dictionary in which keys are applications and values are the consumer handlers. + + def GenericConsumerHandler(self): + if self.connector_thread is not None: + self.initialized_connector.stop() def on_message(self, key, address, body, context, **kwargs): if (str(address)).startswith(Constants.monitoring_prefix+Constants.metric_list_topic): @@ -42,22 +48,24 @@ class GenericConsumerHandler(Handler): logging.info("New metrics list message for application "+application_name + " - registering new connector") if (application_name in self.application_consumer_handler_connectors.keys() is not None): self.application_consumer_handler_connectors[application_name].stop() - connector = exn.connector.EXN('data_persistor-'+application_name, handler=Bootstrap(), - consumers=[ + self.initialized_connector = exn.connector.EXN(Constants.data_persistor_name + "-" + application_name, handler=Bootstrap(), + consumers=[ core.consumer.Consumer('monitoring', Constants.monitoring_broker_topic + '.realtime.>', application=application_name,topic=True, fqdn=True, handler=ConsumerHandler(application_name=application_name)), ], - url=Constants.broker_ip, - port=Constants.broker_port, - username=Constants.broker_username, - password=Constants.broker_password - ) + url=Constants.broker_ip, + port=Constants.broker_port, + username=Constants.broker_username, + password=Constants.broker_password + ) #connector.start() - self.application_consumer_handler_connectors[application_name] = connector + self.application_consumer_handler_connectors[application_name] = self.initialized_connector logging.info(f"Application specific connector registered for application {application_name}") - thread = threading.Thread(target=connector.start,args=()) - thread.start() - from time import sleep - sleep(10000) + self.initialized_connector.start() + logging.info(f"Application specific connector started for application {application_name}") + #If threading support is explicitly required, uncomment these lines + #connector_thread = threading.Thread(target=self.initialized_connector.start,args=()) + #connector_thread.start() + #connector_thread.join() def update_properties(configuration_file_location): p = Properties() @@ -72,22 +80,25 @@ def update_properties(configuration_file_location): Constants.organization_name,metadata = p["organization_name"] Constants.bucket_name,metadata = p["bucket_name"] -if __name__ == "__main__": +def main(): Constants.configuration_file_location = sys.argv[1] update_properties(Constants.configuration_file_location) component_handler = Bootstrap() - connector = connector.EXN('slovid', handler=component_handler, - consumers=[ - core.consumer.Consumer('data_persistor_application', Constants.monitoring_broker_topic + '.>', topic=True, fqdn=True, handler=GenericConsumerHandler()), + connector_instance = connector.EXN(Constants.data_persistor_name, handler=component_handler, + consumers=[ + core.consumer.Consumer('monitoring_data', Constants.monitoring_broker_topic + '.>', topic=True, fqdn=True, handler=GenericConsumerHandler()), ], - url=Constants.broker_ip, - port=Constants.broker_port, - username=Constants.broker_username, - password=Constants.broker_password - ) + url=Constants.broker_ip, + port=Constants.broker_port, + username=Constants.broker_username, + password=Constants.broker_password + ) #connector.start() - thread = threading.Thread(target=connector.start,args=()) + thread = threading.Thread(target=connector_instance.start,args=()) thread.start() - print("Waiting for messages at the metric list topic, in order to start receiving applications") \ No newline at end of file + print("Waiting for messages at the metric list topic, in order to start receiving applications") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/monitoring-data-persistor/src/main/runtime/InfluxDBConnector.py b/monitoring-data-persistor/src/main/runtime/InfluxDBConnector.py index 2dcf630..040cde3 100644 --- a/monitoring-data-persistor/src/main/runtime/InfluxDBConnector.py +++ b/monitoring-data-persistor/src/main/runtime/InfluxDBConnector.py @@ -2,7 +2,7 @@ import json,logging import requests from influxdb_client import InfluxDBClient, Point, WritePrecision -from Constants import Constants +from main.runtime.Constants import Constants from influxdb_client.client.write_api import SYNCHRONOUS def create_influxdb_bucket(application_name): diff --git a/monitoring-data-persistor/src/setup.py b/monitoring-data-persistor/src/setup.py index 2d09305..bd9ccdd 100644 --- a/monitoring-data-persistor/src/setup.py +++ b/monitoring-data-persistor/src/setup.py @@ -9,7 +9,7 @@ setup( package_dir={'': '.'}, entry_points={ 'console_scripts': [ - 'start_dp = main.runtime:DataPersistor', + 'start_dp = main.runtime.DataPersistor:main', ], } # other setup configurations