diff --git a/.hadolint.yaml b/.hadolint.yaml new file mode 100644 index 0000000..0aa5f7c --- /dev/null +++ b/.hadolint.yaml @@ -0,0 +1,2 @@ +ignored: + - DL3008 \ No newline at end of file diff --git a/charts/nebulous-exponential-smoothing-predictor/values.yaml b/charts/nebulous-exponential-smoothing-predictor/values.yaml index ca1a32d..015c22a 100644 --- a/charts/nebulous-exponential-smoothing-predictor/values.yaml +++ b/charts/nebulous-exponential-smoothing-predictor/values.yaml @@ -5,7 +5,7 @@ replicaCount: 1 image: - repository: "quay.io/nebulous/exponential-smoothing-predictor-java-spring-boot-demo" + repository: "quay.io/nebulous/exponential-smoothing-predictor" pullPolicy: IfNotPresent # Overrides the image tag whose default is the chart appVersion. tag: "" diff --git a/exponential-smoothing-predictor/Dockerfile b/exponential-smoothing-predictor/Dockerfile index 15c980b..5434210 100644 --- a/exponential-smoothing-predictor/Dockerfile +++ b/exponential-smoothing-predictor/Dockerfile @@ -9,64 +9,65 @@ FROM python:3.11 as source RUN mkdir /src COPY ./src/ /src/ - +#COPY src/requirements.txt /src/ WORKDIR /src RUN pip install --no-cache-dir -r requirements.txt && python3 setup.py sdist -FROM ubuntu:noble +#FROM ubuntu:noble +FROM python:3.11-slim RUN mkdir -p /home/r_predictions #RUN apt-get update +ENV LOG_FILE=exponential_smoothing.log ENV TZ=Europe/Athens RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone ARG DEBIAN_FRONTEND=noninteractive RUN apt-get update && apt-get install --no-install-recommends -y \ -libcurl4-openssl-dev=8.5.0-2ubuntu1 \ -build-essential=12.10ubuntu1 \ -r-base-core=4.3.2-1build1 \ -r-base-dev=4.3.2-1build1 \ -r-cran-digest=0.6.34-1 \ -r-cran-boot=1.3-28.1-1 \ -r-cran-class=7.3-22-2 \ -r-cran-cluster=2.1.6-1 \ -r-cran-codetools=0.2-19-1 \ -r-cran-foreign=0.8.86-1 \ -r-cran-kernsmooth=2.23-22-1 \ -r-cran-lattice=0.22-5-1 \ -r-cran-littler=0.3.19-1 \ -r-cran-mass=7.3-60-2 \ -r-cran-matrix=1.6-4-1 \ -r-cran-mgcv=1.9-1-1 \ -r-cran-nlme=3.1.164-1 \ -r-cran-nnet=7.3-19-2 \ -r-cran-pkgkitten=0.2.3-1 \ -r-cran-rcpp=1.0.11-1 \ -r-cran-rpart=4.1.23-1 \ -r-cran-spatial=7.3-17-1 \ -r-cran-survival=3.5-7-1 \ -r-doc-html=4.3.2-1build1 \ -r-recommended=4.3.2-1build1 \ -python3=3.11.4-5ubuntu1 \ -python3-pip=23.3+dfsg-1 \ -python3.11-venv=3.11.7-2 \ +libcurl4-openssl-dev \ +build-essential \ +r-base-core \ +r-base-dev \ +r-cran-digest \ +r-cran-boot \ +r-cran-class \ +r-cran-cluster \ +r-cran-codetools \ +r-cran-foreign \ +r-cran-kernsmooth \ +r-cran-lattice \ +r-cran-littler \ +r-cran-mass \ +r-cran-matrix \ +r-cran-mgcv \ +r-cran-nlme \ +r-cran-nnet \ +r-cran-pkgkitten \ +r-cran-rcpp \ +r-cran-rpart \ +r-cran-spatial \ +r-cran-survival \ +r-doc-html \ +r-recommended \ && rm -rf /var/lib/apt/lists/* -COPY ./src/r_predictors/r_commands.R /home/r_predictions/ +COPY --from=source /src/r_predictors/r_commands.R /home/r_predictions/ RUN Rscript /home/r_predictions/r_commands.R #install prerequisite libraries -COPY --from=source ./src/dist/esm_forecaster-0.1.0.tar.gz /home/r_predictions/ -COPY ./src/requirements.txt /home/r_predictions/ -COPY ./src/prepare_python_dependencies.sh /home/r_predictions/ +COPY --from=source /src/dist/esm_forecaster-0.1.0.tar.gz /home/r_predictions/ +COPY --from=source /src/requirements.txt /home/r_predictions/ +COPY --from=source /src/prepare_python_dependencies.sh /home/r_predictions/ RUN bash -x /home/r_predictions/prepare_python_dependencies.sh -COPY ./src/r_predictors/forecasting_real_workload.R /home/r_predictions/ +COPY --from=source /src/r_predictors/forecasting_real_workload.R /home/r_predictions/ +COPY --from=source /src/r_predictors/prediction_configuration.properties /home/r_predictions/ + +#The two commented lines below only serve for experiments with predictive functionality -#below two commented lines only serve for experiments with predictive functionality #COPY ./default_application.csv /home/r_predictions #RUN Rscript forecasting_real_workload.R default_application.csv MinimumCores 1638878119 -WORKDIR /home/r_predictions/esm_forecaster-0.1.0 +#WORKDIR /home/r_predictions/esm_forecaster-0.1.0 -CMD ["/bin/sh","-c",". /home/forecasting_env/bin/activate && python3 -u /home/r_predictions/esm_forecaster-0.1.0/runtime/Predictor.py /home/r_predictions/esm_forecaster-0.1.0/r_predictors/prediction_configuration.properties 2>&1 > $LOG_FILE "] +CMD ["/usr/local/bin/start_exsmoothing","/home/r_predictions/prediction_configuration.properties"," > $LOG_FILE 2>&1 "] diff --git a/exponential-smoothing-predictor/src/exn/__init__.py b/exponential-smoothing-predictor/src/exn/__init__.py new file mode 100644 index 0000000..0b73db1 --- /dev/null +++ b/exponential-smoothing-predictor/src/exn/__init__.py @@ -0,0 +1,6 @@ + + +from . import core +from . import handler +from . import settings +from . import connector \ No newline at end of file diff --git a/exponential-smoothing-predictor/src/exn/connector.py b/exponential-smoothing-predictor/src/exn/connector.py new file mode 100644 index 0000000..d0193c2 --- /dev/null +++ b/exponential-smoothing-predictor/src/exn/connector.py @@ -0,0 +1,84 @@ +import logging +import os + +from proton.reactor import Container + +from exn.core import state_publisher, schedule_publisher +from exn.core.context import Context +from .core.manager import Manager +from .settings import base +from .handler import connector_handler + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +_logger = logging.getLogger(__name__) + + +class EXN: + + context = None + container = None + + def __init__(self, component=None, + handler:connector_handler.ConnectorHandler = None, + publishers=None, + consumers=None, + **kwargs): + + # Load .env file + # Validate and set connector + if not component: + _logger.error("Component cannot be empty or None") + raise ValueError("Component cannot be empty or None") + self.component = component + + self.url = kwargs.get('url',os.getenv('NEBULOUS_BROKER_URL')) + self.port = kwargs.get('port', os.getenv('NEBULOUS_BROKER_PORT')) + self.username = kwargs.get('username',os.getenv('NEBULOUS_BROKER_USERNAME')) + self.password = kwargs.get('password', os.getenv('NEBULOUS_BROKER_PASSWORD')) + self.handler = handler + + # Validate attributes + if not self.url: + _logger.error("URL cannot be empty or None") + raise ValueError("URL cannot be empty or None") + if not self.port: + _logger.error("PORT cannot be empty or None") + raise ValueError("PORT cannot be empty or None") + if not self.username: + _logger.error("USERNAME cannot be empty or None") + raise ValueError("USERNAME cannot be empty or None") + if not self.password: + _logger.error("PASSWORD cannot be empty or None") + raise ValueError("PASSWORD cannot be empty or None") + + self.context = Context(base=f"{base.NEBULOUS_BASE_NAME}.{self.component}") + + if not publishers: + publishers = [] + + if not consumers: + consumers = [] + + compiled_publishers = publishers + if kwargs.get("enable_state",False): + compiled_publishers.append(state_publisher.Publisher()) + + if kwargs.get("enable_health",False): + compiled_publishers.append(schedule_publisher.Publisher( + base.NEBULOUS_DEFAULT_HEALTH_CHECK_TIMEOUT, + 'health', + 'health', + topic=True)) + + for c in consumers: + self.context.register_consumers(c) + + for p in compiled_publishers: + self.context.register_publisher(p) + + def start(self): + self.context.start(Manager(f"{self.url}:{self.port}"),self.handler) + + + def stop(self): + self.context.stop() diff --git a/exponential-smoothing-predictor/src/runtime/exn/core/__init__.py b/exponential-smoothing-predictor/src/exn/core/__init__.py similarity index 85% rename from exponential-smoothing-predictor/src/runtime/exn/core/__init__.py rename to exponential-smoothing-predictor/src/exn/core/__init__.py index bdc524b..8fb3eb4 100644 --- a/exponential-smoothing-predictor/src/runtime/exn/core/__init__.py +++ b/exponential-smoothing-predictor/src/exn/core/__init__.py @@ -1,6 +1,7 @@ from . import context +from . import handler from . import publisher from . import consumer from . import state_publisher diff --git a/exponential-smoothing-predictor/src/exn/core/consumer.py b/exponential-smoothing-predictor/src/exn/core/consumer.py new file mode 100644 index 0000000..a92a1c2 --- /dev/null +++ b/exponential-smoothing-predictor/src/exn/core/consumer.py @@ -0,0 +1,43 @@ +import logging + +from proton import Event +from .handler import Handler + +from . import link + +from proton.handlers import MessagingHandler + +_logger = logging.getLogger(__name__) +_logger.setLevel(level=logging.DEBUG) + + +class Consumer(link.Link, MessagingHandler): + application = None + + def __init__(self, key, address, handler: Handler, application=None, topic=False, fqdn=False): + super(Consumer, self).__init__(key, address, topic, fqdn) + self.application = application + self.handler = handler + self.handler._consumer = self + + def should_handle(self, event: Event): + + should = event.link.name == self._link.name and \ + (self.application is None or event.message.subject == self.application) + + _logger.debug(f"[{self.key}] checking if link is the same {event.link.name}={self._link.name} " + f" and application {self.application}={event.message.subject} == {should}") + + return should + + def on_start(self, event: Event) -> None: + _logger.debug(f"[{self.key}] on_start") + + def on_message(self, event): + _logger.debug(f"[{self.key}] handling event with address => {event.message.address}") + try: + if self.should_handle(event): + self.handler.on_message(self.key, event.message.address, event.message.body, event.message) + + except Exception as e: + _logger.error(f"Received message: {e}") diff --git a/exponential-smoothing-predictor/src/exn/core/context.py b/exponential-smoothing-predictor/src/exn/core/context.py new file mode 100644 index 0000000..db5cde6 --- /dev/null +++ b/exponential-smoothing-predictor/src/exn/core/context.py @@ -0,0 +1,109 @@ +import logging + +from proton.reactor import Container + +from . import link +from .manager import Manager + + +_logger = logging.getLogger(__name__) +_logger.setLevel(logging.DEBUG) + +class Context: + + base = None + handler = None + publishers = {} + consumers = {} + _manager = None + + def __init__(self, base): + + self.base = base + + def start(self, manager:Manager, handler): + self._manager = manager + + def on_ready(): + _logger.debug("[context] on_ready" ) + for key,publisher in self.publishers.items(): + self._manager.start_publisher(self,publisher) + + for key,consumer in self.consumers.items(): + self._manager.start_consumer(self,consumer) + + handler.ready(context=self) + + self._manager._on_ready=on_ready + self._manager.start() + + def stop(self): + if self._manager is not None and self._manager.started: + for key,publisher in self.publishers: + publisher._link.close() + for key,consumer in self.consumers: + consumer._link.close() + + self._manager.close() + + + def register_publisher(self, publisher): + if publisher.key in self.publishers: + _logger.warning("[context] Trying to register publisher that already exists") + return + _logger.info(f"[context] registering publisher {publisher.key} {publisher.address}" ) + self.publishers[publisher.key] = publisher + if self._manager is not None and self._manager.started: + self._manager.start_publisher(self,publisher) + + def get_publisher(self, key): + if key in self.publishers: + return self.publishers[key] + return None + + def has_publisher(self, key): + return key in self.publishers + + def has_consumer(self, key): + return key in self.consumers + + def register_consumers(self, consumer): + if consumer.key in self.consumers: + _logger.warning("[context] Trying to register consumer that already exists") + return + + self.consumers[consumer.key] = consumer + if self._manager is not None and self._manager.started: + self._manager.start_consumer(self,consumer) + + def unregister_consumer(self, key): + if not key in self.consumers: + _logger.warning("[context] Trying to unregister consumer that does not exists") + return + + consumer = self.consumers.pop(key) + if self._manager is not None and self._manager.started: + consumer._link.close() + + def unregister_publisher(self, key): + if not key in self.consumers: + _logger.warning("[context] Trying to unregister publisher that does not exists") + return + publisher = self.publishers.pop(key) + if self._manager is not None and self._manager.started: + publisher._link.close() + + def build_address_from_link(self, link: link.Link): + + if link.fqdn: + address = link.address + if link.topic and not link.address.startswith("topic://"): + address = f"topic://{address}" + return address + + address = f"{self.base}.{link.address}" + if link.topic: + address = f"topic://{address}" + + return address + diff --git a/exponential-smoothing-predictor/src/exn/core/handler.py b/exponential-smoothing-predictor/src/exn/core/handler.py new file mode 100644 index 0000000..8fd1bfb --- /dev/null +++ b/exponential-smoothing-predictor/src/exn/core/handler.py @@ -0,0 +1,11 @@ +import logging + +from proton import Message + +_logger = logging.getLogger(__name__) + + +class Handler: + + def on_message(self, key, address, body, message: Message, context=None): + _logger.info(f"You should really override this... {key}=>{address}") diff --git a/exponential-smoothing-predictor/src/runtime/exn/core/link.py b/exponential-smoothing-predictor/src/exn/core/link.py similarity index 93% rename from exponential-smoothing-predictor/src/runtime/exn/core/link.py rename to exponential-smoothing-predictor/src/exn/core/link.py index 65d6395..bf91199 100644 --- a/exponential-smoothing-predictor/src/runtime/exn/core/link.py +++ b/exponential-smoothing-predictor/src/exn/core/link.py @@ -1,16 +1,18 @@ + from proton import Link as pLink + class Link: fqdn=False - def __init__(self, key, address, topic=False, fqdn=False): + def __init__(self, key, address, topic=False, fqdn=False): + super().__init__() self.key = key self.address = address - self._link = None self.topic= topic self.fqdn= fqdn - + self._link = None def set(self, link:pLink): # The proton container creates a sender diff --git a/exponential-smoothing-predictor/src/exn/core/manager.py b/exponential-smoothing-predictor/src/exn/core/manager.py new file mode 100644 index 0000000..dd4025e --- /dev/null +++ b/exponential-smoothing-predictor/src/exn/core/manager.py @@ -0,0 +1,71 @@ +import logging + +from proton import Event, Connection,Session + +from proton.handlers import MessagingHandler +from proton.reactor import Container + +from .consumer import Consumer +from .publisher import Publisher + +_logger = logging.getLogger(__name__) +_logger.setLevel(logging.DEBUG) + + +class SessionPerConsumer(object): + def session(self, connection: Connection) -> Session: + session = connection.session() + session.open() + return session + + +class Manager(MessagingHandler): + uri = None + started = False + container = None + connection = None + + _on_ready = None + + def __init__(self, uri): + super(Manager, self).__init__() + self.uri = uri + + def start(self): + _logger.info(f"[manager] starting") + self.container = Container(self) + self.container.run() + + def on_start(self, event: Event) -> None: + self.connection = self.container.connect(self.uri) + self.connection._session_policy=SessionPerConsumer() + + self.started=True + _logger.debug(f"[manager] on_start") + if self._on_ready is not None: + self._on_ready() + + def on_message(self, event: Event) -> None: + _logger.warning(f"[manager] received generic on_message make sure you have set up your handlers" + f" properly ") + + def close(self): + _logger.info(f"[manager] closing") + if self.container: + self.container.stop() + + if self.connection: + self.connection.close() + + def start_publisher(self, context, publisher: Publisher): + address = context.build_address_from_link(publisher) + _logger.info(f"[manager] starting publisher {publisher.key} => {address}") + publisher.set(self.container.create_sender(self.connection, address)) + if hasattr(publisher, "delay"): + _logger.debug(f"{context.base} registering timer {hasattr(publisher, 'delay')}") + self.container.schedule(publisher.delay, handler=publisher) + + def start_consumer(self, context, consumer: Consumer): + address = context.build_address_from_link(consumer) + _logger.info(f"[manager] starting consumer {consumer.key} => {address}") + consumer.set(self.container.create_receiver(self.connection, address , handler=consumer)) diff --git a/exponential-smoothing-predictor/src/runtime/exn/core/publisher.py b/exponential-smoothing-predictor/src/exn/core/publisher.py similarity index 72% rename from exponential-smoothing-predictor/src/runtime/exn/core/publisher.py rename to exponential-smoothing-predictor/src/exn/core/publisher.py index 2768c5d..e15ec6a 100644 --- a/exponential-smoothing-predictor/src/runtime/exn/core/publisher.py +++ b/exponential-smoothing-predictor/src/exn/core/publisher.py @@ -10,12 +10,15 @@ _logger = logging.getLogger(__name__) class Publisher(link.Link): - def send(self, body=None): + def send(self, body=None, application=None): if not body: body = {} - _logger.debug(f"{self.address} Sending {body} ") + _logger.info(f"[{self.key}] sending to {self._link.target.address} for application={application} - {body} ") msg = self._prepare_message(body) + if application: + msg.subject = application + self._link.send(msg) def _prepare_message(self, body=None): diff --git a/exponential-smoothing-predictor/src/exn/core/schedule_publisher.py b/exponential-smoothing-predictor/src/exn/core/schedule_publisher.py new file mode 100644 index 0000000..f7dc10c --- /dev/null +++ b/exponential-smoothing-predictor/src/exn/core/schedule_publisher.py @@ -0,0 +1,24 @@ +import logging + +from proton.handlers import MessagingHandler +from .publisher import Publisher + +_logger = logging.getLogger(__name__) + + +class Publisher(Publisher, MessagingHandler): + send_next = False + delay = 15 + + def __init__(self, delay, key, address, application=None, topic=False, fqdn=False): + super(Publisher, self).__init__(key, address, topic,fqdn) + self.delay = delay + self.application = application + + def on_timer_task(self, event): + _logger.debug(f"[manager] on_timer_task") + self.send() + event.reactor.schedule(self.delay, self) + + def send(self, body=None, application=None): + super(Publisher, self).send(body, self.application) diff --git a/exponential-smoothing-predictor/src/runtime/exn/core/state_publisher.py b/exponential-smoothing-predictor/src/exn/core/state_publisher.py similarity index 74% rename from exponential-smoothing-predictor/src/runtime/exn/core/state_publisher.py rename to exponential-smoothing-predictor/src/exn/core/state_publisher.py index 8ea6af7..f8ae1cb 100644 --- a/exponential-smoothing-predictor/src/runtime/exn/core/state_publisher.py +++ b/exponential-smoothing-predictor/src/exn/core/state_publisher.py @@ -27,19 +27,19 @@ class Publisher(publisher.Publisher): self.send({"state": message_type,"message": None}) def starting(self): - self._send_message(States.STARTING) + self._send_message(States.STARTING.value) def started(self): - self._send_message(States.STARTED) + self._send_message(States.STARTED.value) def ready(self): - self._send_message(States.READY) + self._send_message(States.READY.value) def stopping(self): - self._send_message(States.STOPPING) + self._send_message(States.STOPPING.value) def stopped(self): - self._send_message(States.STOPPED) + self._send_message(States.STOPPED.value) def custom(self, state): self._send_message(state) diff --git a/exponential-smoothing-predictor/src/exn/handler/__init__.py b/exponential-smoothing-predictor/src/exn/handler/__init__.py new file mode 100644 index 0000000..a7a404b --- /dev/null +++ b/exponential-smoothing-predictor/src/exn/handler/__init__.py @@ -0,0 +1,2 @@ + +from . import connector_handler \ No newline at end of file diff --git a/exponential-smoothing-predictor/src/exn/handler/connector_handler.py b/exponential-smoothing-predictor/src/exn/handler/connector_handler.py new file mode 100644 index 0000000..dafe9d8 --- /dev/null +++ b/exponential-smoothing-predictor/src/exn/handler/connector_handler.py @@ -0,0 +1,12 @@ + +import logging + + +_logger = logging.getLogger(__name__) + +class ConnectorHandler: + + def ready(self, context): + pass + + diff --git a/exponential-smoothing-predictor/src/runtime/exn/settings/__init__.py b/exponential-smoothing-predictor/src/exn/settings/__init__.py similarity index 100% rename from exponential-smoothing-predictor/src/runtime/exn/settings/__init__.py rename to exponential-smoothing-predictor/src/exn/settings/__init__.py diff --git a/exponential-smoothing-predictor/src/exn/settings/base.py b/exponential-smoothing-predictor/src/exn/settings/base.py new file mode 100644 index 0000000..baf5475 --- /dev/null +++ b/exponential-smoothing-predictor/src/exn/settings/base.py @@ -0,0 +1,2 @@ +NEBULOUS_BASE_NAME="eu.nebulouscloud" +NEBULOUS_DEFAULT_HEALTH_CHECK_TIMEOUT=15 \ No newline at end of file diff --git a/exponential-smoothing-predictor/src/prepare_python_dependencies.sh b/exponential-smoothing-predictor/src/prepare_python_dependencies.sh index a9d6707..64e8d5c 100644 --- a/exponential-smoothing-predictor/src/prepare_python_dependencies.sh +++ b/exponential-smoothing-predictor/src/prepare_python_dependencies.sh @@ -1,10 +1,10 @@ EXPONENTIAL_SMOOTHING_VERSION="0.1.0" -python3 -m venv /home/forecasting_env -. /home/forecasting_env/bin/activate +##python3 -m venv /home/forecasting_env +##. /home/forecasting_env/bin/activate pip3 install --no-cache-dir -r /home/r_predictions/requirements.txt cd /home/r_predictions # Install the module itself (provided that the tar.gz file of the module has already been copied inside the container) -pip install esm_forecaster-$EXPONENTIAL_SMOOTHING_VERSION.tar.gz -tar -xzvf esm_forecaster-$EXPONENTIAL_SMOOTHING_VERSION.tar.gz +pip install esm_forecaster-$EXPONENTIAL_SMOOTHING_VERSION.tar.gz #--break-system-packages +##tar -xzvf esm_forecaster-$EXPONENTIAL_SMOOTHING_VERSION.tar.gz diff --git a/exponential-smoothing-predictor/src/r_predictors/forecasting_real_workload.R b/exponential-smoothing-predictor/src/r_predictors/forecasting_real_workload.R index 88c08ac..36ee019 100644 --- a/exponential-smoothing-predictor/src/r_predictors/forecasting_real_workload.R +++ b/exponential-smoothing-predictor/src/r_predictors/forecasting_real_workload.R @@ -20,8 +20,8 @@ library(purrr) # # This forecasting script relies on the presence of a dataset which contains the metric values to be forecasted. It is called with three main parameters - dataset path, metric to be forecasted and the time for which the forecast should be produced - and two optional parameters, the alpha and beta coefficients to be used during forecasting. The time for which the forecast should be produced may be ommitted under some circumstances. # -# To create the final dataset which will be used for predictions, this script creates a timeseries with all times from the beginning of the observations in the dataset, until its end, using 1-second intervals (to allow for predictions based on epoch). In order for the exponential smoothing forecaster to operate satisfactorily, it is necessary to set the `number_of_seconds_to_aggregate_on` variable to a value which is large enough to smooth small fluctuations, yet small enough to allow for reasonable reaction times (e.g 300 seconds). -# Once the creation of the dataset is over, the `configuration_forecasting_horizon` configuration property is evaluated. If this value is positive, the time for which the forecast should be made should be provided as a command line argument, and this allows the formation of a training dataset and a test dataset. If a non-positive horizon is provided, then the `realtime_mode` configuration property is evaluated. In case that this is false, the prediction time does not need to be provided (it means we simply want to evaluate the predictive functionality based on past data), and the next prediction time will be the time of the last observation in the dataset. If the realtime mode parameter is true, then the prediction time needs to be provided, and the script will try to create a prediction using the minimum value between the next prediction time and the last observation time which is available in the dataset - in this case the next prediction time is also needed(TODO:perhaps this behaviour should be changed). +# To create the final dataset which will be used for predictions, this script creates a timeseries with all times from the beginning of the observations in the dataset, until its end, using 1-second intervals (to allow for predictions based on epoch). In order for the exponential smoothing forecaster to operate satisfactorily, it is necessary to set the `number_of_seconds_to_aggregate_on` variable to a value which is large enough to smooth small fluctuations, yet small enough to allow for reasonable reaction times (e.g 300 seconds). Beware, this number_of_seconds_to_aggregate_on variable is changed (probably increased) at runtime from its initial configuration so that the forecaster does not consume too much time trying to create predictions. This means that more observations will be necessary to guarantee accurate predictions +# Once the creation of the dataset is over, the `configuration_forecasting_horizon` configuration property is evaluated. If this value is positive, the time for which the forecast should be made should be provided as a command line argument, and this allows the formation of a training dataset and a test dataset. If a non-positive horizon is provided, then the `realtime_mode` configuration property is evaluated. In case that this is false, the prediction time does not need to be provided (it means we simply want to evaluate the predictive functionality based on past data), and the next prediction time will be the time of the last observation in the dataset. If the realtime mode parameter is true, then the prediction time needs to be provided, and the script will try to create a prediction using the maximum value between the next prediction time and the last observation time which is available in the dataset - in this case the next prediction time is needed as well. #Then, the final data points which will be used for the forecasting are determined, and the forecasting models are created, to produce predictions. The user of the script can opt to try finding the best parameters manually, using the `try_to_optimize_parameters` configuration parameter. find_smape <- function(actual, forecast) { @@ -92,7 +92,14 @@ beta_value_argument <- as.double(args[5]) #mydata <- read.csv(configuration_properties$input_data_file, sep=",", header=TRUE) #mydata <- read.csv(dataset_to_process, sep=",", header=TRUE) -data_to_process <- read.csv(dataset_to_process, sep=",", header=TRUE) +if (file.info(dataset_to_process)$size > 0) { + # File is not empty, proceed with reading + data_to_process <- read.csv(dataset_to_process, sep=",", header=TRUE) +} else { + # File is empty, handle accordingly (e.g., show a message or skip the reading process) + print(paste("The file ",dataset_to_process," is empty. Please provide a non-empty file.")) + stop() +} #sanitize data_to_process by removing any very old values which may have been accidentally introduced. For this reason we remove all data points before now - number_of_days*24hrs*3600sec/hr seconds, and we additionally subtract configuration_properties$prediction_processing_time_safety_margin_seconds in order to account for the time it takes to create the dataset and start the prediction process) current_time <- get_current_epoch_time() if (!realtime_mode){ @@ -293,13 +300,13 @@ if (try_to_optimize_parameters){ #Creation of forecasting model if (try_to_optimize_parameters){ - holt_winters_forecasting_model <- HoltWinters(mydata_trainseries,alpha=optimal_alpha,beta=optimal_beta,gamma=optimal_gamma) +holt_winters_forecasting_model <- HoltWinters(mydata_trainseries,alpha=optimal_alpha,beta=optimal_beta,gamma=optimal_gamma) - ets_forecasting_model <- tryCatch({ - ets(mydata_trainseries,alpha = optimal_alpha,beta = optimal_beta,gamma = optimal_gamma) #phi is left to be optimized - }, error = function(e) { - NULL - }) +ets_forecasting_model <- tryCatch({ +ets(mydata_trainseries,alpha = optimal_alpha,beta = optimal_beta,gamma = optimal_gamma) #phi is left to be optimized +}, error = function(e) { +NULL +}) @@ -332,6 +339,19 @@ if (try_to_optimize_parameters){ }, error = function(e) { NULL }) + if (length(mydata_trainseries)<3){ + print("Possible issue expected with a very small trainseries (length is less than 3). The contents of the trainseries are the following:") + print(mydata_trainseries) + print("This trainseries originated from the following aggregated data:") + print(mydata.train) + print(paste("The number of seconds to aggregate on is:",number_of_seconds_to_aggregate_on)) + print("The above aggregated data was based on these training data points: ") + print(training_datapoints) + print("These training data points originate from these data points:") + print(data_points) + print(paste("by using the first", number_of_data_points_used_for_training, "data points")) + + } holt_winters_forecasting_model <- HoltWinters(mydata_trainseries,gamma=FALSE) } } diff --git a/exponential-smoothing-predictor/src/r_predictors/prediction_configuration-windows.properties b/exponential-smoothing-predictor/src/r_predictors/prediction_configuration-windows.properties index 9dc0496..3afcc14 100644 --- a/exponential-smoothing-predictor/src/r_predictors/prediction_configuration-windows.properties +++ b/exponential-smoothing-predictor/src/r_predictors/prediction_configuration-windows.properties @@ -5,15 +5,15 @@ APP_NAME=default_application METHOD=exponential_smoothing -INFLUXDB_HOSTNAME=localhost +INFLUXDB_HOSTNAME=nebulous-influxdb INFLUXDB_PORT=8086 -INFLUXDB_USERNAME=morphemic -INFLUXDB_PASSWORD=password -INFLUXDB_DBNAME=morphemic -INFLUXDB_ORG=morphemic +INFLUXDB_USERNAME=my-user +INFLUXDB_PASSWORD=my-password +INFLUXDB_ORG=my-org +INFLUXDB_ORG_ID=9c929742d57cca02 -broker_address=localhost -broker_port=61613 +broker_address=nebulous-activemq +broker_port=5672 broker_username=admin broker_password=admin prediction_method=Holt-Winters diff --git a/exponential-smoothing-predictor/src/r_predictors/prediction_configuration.properties b/exponential-smoothing-predictor/src/r_predictors/prediction_configuration.properties index 47623fc..4175ec9 100644 --- a/exponential-smoothing-predictor/src/r_predictors/prediction_configuration.properties +++ b/exponential-smoothing-predictor/src/r_predictors/prediction_configuration.properties @@ -1,14 +1,14 @@ -#Fri Jan 12 17:06:48 UTC 2024 +#Tue May 14 12:59:33 UTC 2024 APP_NAME=default_application METHOD=exponential_smoothing -INFLUXDB_HOSTNAME=localhost +INFLUXDB_HOSTNAME=nebulous-influxdb INFLUXDB_PORT=8086 -INFLUXDB_USERNAME=morphemic -INFLUXDB_PASSWORD=password -INFLUXDB_DBNAME=morphemic -INFLUXDB_ORG=morphemic -broker_address=localhost -broker_port=61610 +INFLUXDB_USERNAME=my-user +INFLUXDB_PASSWORD=my-password +INFLUXDB_ORG=my-org +INFLUXDB_ORG_ID=9c929742d57cca02 +broker_address=nebulous-activemq +broker_port=5672 broker_username=morphemic broker_password=morphemic prediction_method=Holt-Winters diff --git a/exponential-smoothing-predictor/src/requirements.txt b/exponential-smoothing-predictor/src/requirements.txt index 54e7dfd..ff95d8a 100644 --- a/exponential-smoothing-predictor/src/requirements.txt +++ b/exponential-smoothing-predictor/src/requirements.txt @@ -1,9 +1,7 @@ -python-slugify==8.0.1 jproperties==2.1.1 requests==2.31.0 -msgpack==1.0.7 numpy==1.26.3 -pandas==2.1.4 -python-dotenv==1.0.0 python-qpid-proton==0.39.0 -influxdb-client==1.39.0 \ No newline at end of file +influxdb-client==1.39.0 +python-dotenv==1.0.0 +python-dateutil==2.8.2 \ No newline at end of file diff --git a/exponential-smoothing-predictor/src/runtime/Predictor.py b/exponential-smoothing-predictor/src/runtime/Predictor.py index bcb5d19..631ebaf 100644 --- a/exponential-smoothing-predictor/src/runtime/Predictor.py +++ b/exponential-smoothing-predictor/src/runtime/Predictor.py @@ -12,56 +12,105 @@ import os, sys import multiprocessing import traceback from subprocess import PIPE, run -from runtime.exn import core +from exn import core import logging -from runtime.exn import connector +from exn import connector +from exn.core.handler import Handler +from exn.handler.connector_handler import ConnectorHandler +from runtime.operational_status.ApplicationState import ApplicationState from runtime.predictions.Prediction import Prediction -from runtime.operational_status.State import State +from runtime.operational_status.EsPredictorState import EsPredictorState from runtime.utilities.PredictionPublisher import PredictionPublisher from runtime.utilities.Utilities import Utilities print_with_time = Utilities.print_with_time +def sanitize_prediction_statistics(prediction_confidence_interval, prediction_value, metric_name, application_state): + + print_with_time("Inside the sanitization process with an interval of " + prediction_confidence_interval +" and a prediction of " + str(prediction_value)) + lower_value_prediction_confidence_interval = float(prediction_confidence_interval.split(",")[0]) + upper_value_prediction_confidence_interval = float(prediction_confidence_interval.split(",")[1]) + + """if (not application_name in EsPredictorState.individual_application_state): + print_with_time("There is an issue with the application name"+application_name+" not existing in individual application states") + return prediction_confidence_interval,prediction_value_produced""" + + lower_bound_value = application_state.lower_bound_value + upper_bound_value = application_state.upper_bound_value + + print("Lower_bound_value is "+str(lower_bound_value)) + confidence_interval_modified = False + new_prediction_confidence_interval = prediction_confidence_interval + if (not (metric_name in lower_bound_value)) or (not (metric_name in upper_bound_value)): + print_with_time(f"Lower value is unmodified - {lower_value_prediction_confidence_interval} and upper value is unmodified - {upper_value_prediction_confidence_interval}") + return new_prediction_confidence_interval,prediction_value + + if (lower_value_prediction_confidence_interval < lower_bound_value[metric_name]): + lower_value_prediction_confidence_interval = lower_bound_value[metric_name] + confidence_interval_modified = True + elif (lower_value_prediction_confidence_interval > upper_bound_value[metric_name]): + lower_value_prediction_confidence_interval = upper_bound_value[metric_name] + confidence_interval_modified = True + if (upper_value_prediction_confidence_interval> upper_bound_value[metric_name]): + upper_value_prediction_confidence_interval = upper_bound_value[metric_name] + confidence_interval_modified = True + elif (upper_value_prediction_confidence_interval < lower_bound_value[metric_name]): + upper_value_prediction_confidence_interval = lower_bound_value[metric_name] + confidence_interval_modified = True + if confidence_interval_modified: + new_prediction_confidence_interval = str(lower_value_prediction_confidence_interval)+","+str(upper_value_prediction_confidence_interval) + print_with_time("The confidence interval "+prediction_confidence_interval+"was modified, becoming "+str(new_prediction_confidence_interval)+", taking into account the values of the metric") + if (prediction_value upper_bound_value[metric_name]): + print_with_time("The prediction value of " + str(prediction_value) + " for metric " + metric_name + " was sanitized to " + str(upper_bound_value)) + prediction_value = upper_bound_value + + return new_prediction_confidence_interval,prediction_value - - -def predict_attribute(attribute, configuration_file_location,next_prediction_time): +def predict_attribute(application_state, attribute, configuration_file_location,next_prediction_time): prediction_confidence_interval_produced = False prediction_value_produced = False prediction_valid = False #os.chdir(os.path.dirname(configuration_file_location)) - State.prediction_data_filename = Utilities.get_prediction_data_filename(configuration_file_location,attribute) + application_state.prediction_data_filename = application_state.get_prediction_data_filename(configuration_file_location,attribute) from sys import platform - if State.testing_prediction_functionality: + if EsPredictorState.testing_prediction_functionality: print_with_time("Testing, so output will be based on the horizon setting from the properties file and the last timestamp in the data") - print_with_time("Issuing command: Rscript forecasting_real_workload.R "+str(State.prediction_data_filename)+" "+attribute) + print_with_time("Issuing command: Rscript forecasting_real_workload.R "+str(application_state.prediction_data_filename)+" "+attribute) # Windows if platform == "win32": - command = ['Rscript', 'forecasting_real_workload.R', State.prediction_data_filename, attribute] + os.chdir("exponential-smoothing-predictor/src/r_predictors") + command = ['Rscript', 'forecasting_real_workload.R', application_state.prediction_data_filename, attribute] # linux elif platform == "linux" or platform == "linux2": - command = ["Rscript forecasting_real_workload.R "+str(State.prediction_data_filename) + " "+ str(attribute)] + os.chdir("/home/r_predictions") + command = ["Rscript forecasting_real_workload.R "+str(application_state.prediction_data_filename) + " "+ str(attribute)] #Choosing the solution of linux else: - command = ["Rscript forecasting_real_workload.R "+str(State.prediction_data_filename) + " "+ str(attribute)] + command = ["Rscript forecasting_real_workload.R "+str(application_state.prediction_data_filename) + " "+ str(attribute)] else: print_with_time("The current directory is "+os.path.abspath(os.getcwd())) - print_with_time("Issuing command: Rscript forecasting_real_workload.R "+str(State.prediction_data_filename)+" "+attribute+" "+next_prediction_time) + print_with_time("Issuing command: Rscript forecasting_real_workload.R "+str(application_state.prediction_data_filename)+" "+attribute+" "+next_prediction_time) # Windows if platform == "win32": - command = ['Rscript', 'forecasting_real_workload.R', State.prediction_data_filename, attribute, next_prediction_time] + os.chdir("exponential-smoothing-predictor/src/r_predictors") + command = ['Rscript', 'forecasting_real_workload.R', application_state.prediction_data_filename, attribute, next_prediction_time] # Linux elif platform == "linux" or platform == "linux2": - command = ["Rscript forecasting_real_workload.R "+str(State.prediction_data_filename) + " "+ str(attribute)+" "+str(next_prediction_time) + " 2>&1"] + os.chdir("/home/r_predictions") + command = ["Rscript forecasting_real_workload.R "+str(application_state.prediction_data_filename) + " "+ str(attribute)+" "+str(next_prediction_time) + " 2>&1"] #Choosing the solution of linux else: - command = ["Rscript forecasting_real_workload.R "+str(State.prediction_data_filename) + " "+ str(attribute)+" "+str(next_prediction_time)] + os.chdir("/home/r_predictions") + command = ["Rscript forecasting_real_workload.R "+str(application_state.prediction_data_filename) + " "+ str(attribute)+" "+str(next_prediction_time)] process_output = run(command, shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True) if (process_output.stdout==""): @@ -91,6 +140,7 @@ def predict_attribute(attribute, configuration_file_location,next_prediction_tim elif (string.startswith("smape:")): prediction_smape = string.replace("smape:", "") if (prediction_confidence_interval_produced and prediction_value_produced): + prediction_confidence_interval,prediction_value = sanitize_prediction_statistics(prediction_confidence_interval,float(prediction_value),attribute,application_state) prediction_valid = True print_with_time("The prediction for attribute " + attribute + " is " + str(prediction_value)+ " and the confidence interval is "+prediction_confidence_interval) else: @@ -101,7 +151,8 @@ def predict_attribute(attribute, configuration_file_location,next_prediction_tim return output_prediction -def predict_attributes(attributes,next_prediction_time): +def predict_attributes(application_state,next_prediction_time): + attributes = application_state.metrics_to_predict pool = multiprocessing.Pool(len(attributes)) print_with_time("Prediction thread pool size set to " + str(len(attributes))) attribute_predictions = {} @@ -109,7 +160,7 @@ def predict_attributes(attributes,next_prediction_time): for attribute in attributes: print_with_time("Starting " + attribute + " prediction thread") start_time = time.time() - attribute_predictions[attribute] = pool.apply_async(predict_attribute, args=[attribute, State.configuration_file_location,str(next_prediction_time)]) + attribute_predictions[attribute] = pool.apply_async(predict_attribute, args=[application_state,attribute, EsPredictorState.configuration_file_location, str(next_prediction_time)]) #attribute_predictions[attribute] = pool.apply_async(predict_attribute, args=[attribute, configuration_file_location,str(next_prediction_time)]).get() for attribute in attributes: @@ -138,43 +189,45 @@ def update_prediction_time(epoch_start,prediction_horizon,maximum_time_for_predi return prediction_time -def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_for_prediction): - while Bootstrap.start_forecasting: - print_with_time("Using " + State.configuration_file_location + " for configuration details...") - State.next_prediction_time = update_prediction_time(State.epoch_start, prediction_horizon,maximum_time_required_for_prediction) +def calculate_and_publish_predictions(application_state,maximum_time_required_for_prediction): + start_forecasting = application_state.start_forecasting - for attribute in State.metrics_to_predict: - if ((State.previous_prediction is not None) and (State.previous_prediction[attribute] is not None) and (State.previous_prediction[attribute].last_prediction_time_needed>maximum_time_required_for_prediction)): - maximum_time_required_for_prediction = State.previous_prediction[attribute].last_prediction_time_needed + while start_forecasting: + print_with_time("Using " + EsPredictorState.configuration_file_location + " for configuration details...") + application_state.next_prediction_time = update_prediction_time(application_state.epoch_start, application_state.prediction_horizon,maximum_time_required_for_prediction) + + for attribute in application_state.metrics_to_predict: + if ((application_state.previous_prediction is not None) and (application_state.previous_prediction[attribute] is not None) and (application_state.previous_prediction[attribute].last_prediction_time_needed>maximum_time_required_for_prediction)): + maximum_time_required_for_prediction = application_state.previous_prediction[attribute].last_prediction_time_needed #Below we subtract one reconfiguration interval, as we cannot send a prediction for a time point later than one prediction_horizon interval - wait_time = State.next_prediction_time - prediction_horizon - time.time() - print_with_time("Waiting for "+str((int(wait_time*100))/100)+" seconds, until time "+datetime.datetime.fromtimestamp(State.next_prediction_time - prediction_horizon).strftime('%Y-%m-%d %H:%M:%S')) + wait_time = application_state.next_prediction_time - application_state.prediction_horizon - time.time() + print_with_time("Waiting for "+str((int(wait_time*100))/100)+" seconds, until time "+datetime.datetime.fromtimestamp(application_state.next_prediction_time - application_state.prediction_horizon).strftime('%Y-%m-%d %H:%M:%S')) if (wait_time>0): time.sleep(wait_time) - if(not Bootstrap.start_forecasting): + if(not start_forecasting): break Utilities.load_configuration() - Utilities.update_monitoring_data() + application_state.update_monitoring_data() first_prediction = None - for prediction_index in range(0,State.total_time_intervals_to_predict): - prediction_time = int(State.next_prediction_time)+prediction_index*prediction_horizon + for prediction_index in range(0, EsPredictorState.total_time_intervals_to_predict): + prediction_time = int(application_state.next_prediction_time)+prediction_index*application_state.prediction_horizon try: - print_with_time ("Initiating predictions for all metrics for next_prediction_time, which is "+str(State.next_prediction_time)) - prediction = predict_attributes(State.metrics_to_predict,prediction_time) - if (prediction_time == int(State.next_prediction_time)): + print_with_time ("Initiating predictions for all metrics for next_prediction_time, which is "+str(application_state.next_prediction_time)) + prediction = predict_attributes(application_state,prediction_time) + if (prediction_time == int(application_state.next_prediction_time)): first_prediction = prediction except Exception as e: - print_with_time("Could not create a prediction for some or all of the metrics for time point "+str(State.next_prediction_time)+", proceeding to next prediction time. However, "+str(prediction_index)+" predictions were produced (out of the configured "+State.total_time_intervals_to_predict+"). The encountered exception trace follows:") - print(e) + print_with_time("Could not create a prediction for some or all of the metrics for time point " + str(application_state.next_prediction_time) +", proceeding to next prediction time. However, " + str(prediction_index) +" predictions were produced (out of the configured " + str(EsPredictorState.total_time_intervals_to_predict) + "). The encountered exception trace follows:") + print(traceback.format_exc()) #continue was here, to continue while loop, replaced by break break - for attribute in State.metrics_to_predict: + for attribute in application_state.metrics_to_predict: if(not prediction[attribute].prediction_valid): #continue was here, to continue while loop, replaced by break break - if (State.disconnected or State.check_stale_connection()): + if (EsPredictorState.disconnected or EsPredictorState.check_stale_connection()): logging.info("Possible problem due to disconnection or a stale connection") #State.connection.connect() message_not_sent = True @@ -183,16 +236,13 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f "metricValue": float(prediction[attribute].value), "level": 3, "timestamp": current_time, - "probability": 0.95, + "probability": 0.95, #This is the default second parameter of the prediction intervals (first is 80%) created as part of the HoltWinters forecasting mode in R "confidence_interval": [float(prediction[attribute].lower_confidence_interval_value) , float( prediction[attribute].upper_confidence_interval_value)], "predictionTime": prediction_time, - "refersTo": "todo", - "cloud": "todo", - "provider": "todo", } training_models_message_body = { - "metrics": State.metrics_to_predict, + "metrics": application_state.metrics_to_predict, "forecasting_method": "exponentialsmoothing", "timestamp": current_time, } @@ -200,7 +250,7 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f try: #for publisher in State.broker_publishers: # if publisher. - for publisher in State.broker_publishers: + for publisher in EsPredictorState.broker_publishers: #if publisher.address=="eu.nebulouscloud.monitoring.preliminary_predicted.exponentialsmoothing"+attribute: if publisher.key=="publisher_"+attribute: @@ -211,16 +261,16 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f #State.connection.send_to_topic('training_models',training_models_message_body) message_not_sent = False - print_with_time("Successfully sent prediction message for %s to topic eu.nebulouscloud.preliminary_predicted.%s.%s\n\n%s\n\n" % (attribute, id, attribute, prediction_message_body)) + print_with_time("Successfully sent prediction message for %s to topic eu.nebulouscloud.preliminary_predicted.%s.%s:\n\n%s\n\n" % (attribute, EsPredictorState.forecaster_name, attribute, prediction_message_body)) except ConnectionError as exception: #State.connection.disconnect() #State.connection = messaging.morphemic.Connection('admin', 'admin') #State.connection.connect() logging.error("Error sending intermediate prediction"+str(exception)) - State.disconnected = False + EsPredictorState.disconnected = False if (first_prediction is not None): - State.previous_prediction = first_prediction #first_prediction is the first of the batch of the predictions which are produced. The size of this batch is set by the State.total_time_intervals_to_predict (currently set to 8) + application_state.previous_prediction = first_prediction #first_prediction is the first of the batch of the predictions which are produced. The size of this batch is set by the State.total_time_intervals_to_predict (currently set to 8) #State.number_of_days_to_use_data_from = (prediction_horizon - State.prediction_processing_time_safety_margin_seconds) / (wait_time / State.number_of_days_to_use_data_from) #State.number_of_days_to_use_data_from = 1 + int( @@ -230,10 +280,10 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f #class Listener(messaging.listener.MorphemicListener): +class BootStrap(ConnectorHandler): + pass +class ConsumerHandler(Handler): -class Bootstrap(connector.ConnectorHandler): - - start_forecasting = None # Whether the component should start (or keep on) forecasting prediction_thread = None def ready(self, context): @@ -244,55 +294,102 @@ class Bootstrap(connector.ConnectorHandler): context.publishers['state'].stopping() context.publishers['state'].stopped() - context.publishers['publisher_cpu_usage'].send({ - 'hello': 'world' - }) + #context.publishers['publisher_cpu_usage'].send({ + # 'hello': 'world' + #}) def on_message(self, key, address, body, context, **kwargs): - application_name = "default_application" - address = address.replace("topic://eu.nebulouscloud.","") - if (address).startswith(State.MONITORING_DATA_PREFIX): - address = address.replace(State.MONITORING_DATA_PREFIX+".","",1) + address = address.replace("topic://"+EsPredictorState.GENERAL_TOPIC_PREFIX,"") + if (address).startswith(EsPredictorState.MONITORING_DATA_PREFIX): + address = address.replace(EsPredictorState.MONITORING_DATA_PREFIX, "", 1) logging.info("New monitoring data arrived at topic "+address) - logging.info(body) + if address == 'metric_list': + application_name = body["name"] + message_version = body["version"] + application_state = None + individual_application_state = {} + application_already_defined = application_name in EsPredictorState.individual_application_state + if ( application_already_defined and + ( message_version == EsPredictorState.individual_application_state[application_state].message_version ) + ): + individual_application_state = EsPredictorState.individual_application_state + application_state = individual_application_state[application_name] - elif (address).startswith(State.FORECASTING_CONTROL_PREFIX): - address = address.replace(State.FORECASTING_CONTROL_PREFIX+".","",1) - logging.info("The address is " + address) + print_with_time("Using existing application definition for "+application_name) + else: + if (application_already_defined): + print_with_time("Updating application "+application_name+" based on new metrics list message") + else: + print_with_time("Creating new application "+application_name) + application_state = ApplicationState(application_name,message_version) + metric_list_object = body["metric_list"] + lower_bound_value = application_state.lower_bound_value + upper_bound_value = application_state.upper_bound_value + for metric_object in metric_list_object: + lower_bound_value[metric_object["name"]]=float(metric_object["lower_bound"]) + upper_bound_value[metric_object["name"]]=float(metric_object["upper_bound"]) - if address == 'metrics_to_predict': + application_state.lower_bound_value.update(lower_bound_value) + application_state.upper_bound_value.update(upper_bound_value) - State.initial_metric_list_received = True - print_with_time("Inside message handler for metrics_to predict") + application_state.initial_metric_list_received = True + + individual_application_state[application_name] = application_state + EsPredictorState.individual_application_state.update(individual_application_state) #body = json.loads(body) #for element in body: # State.metrics_to_predict.append(element["metric"]) - elif address == 'test.exponentialsmoothing': - State.testing_prediction_functionality = True + + elif (address).startswith(EsPredictorState.FORECASTING_CONTROL_PREFIX): + address = address.replace(EsPredictorState.FORECASTING_CONTROL_PREFIX, "", 1) + logging.info("The address is " + address) + + + if address == 'test.exponentialsmoothing': + EsPredictorState.testing_prediction_functionality = True elif address == 'start_forecasting.exponentialsmoothing': try: - State.metrics_to_predict = body["metrics"] - print_with_time("Received request to start predicting the following metrics: "+ ",".join(State.metrics_to_predict)) - State.broker_publishers = [] - for metric in State.metrics_to_predict: - State.broker_publishers.append (PredictionPublisher(metric)) - State.publishing_connector = connector.EXN('publishing_exsmoothing', handler=Bootstrap(),#consumers=list(State.broker_consumers), - consumers=[], - publishers=State.broker_publishers, - url="localhost", - port="5672", - username="admin", - password="admin" - ) + application_name = body["name"] + message_version = 0 + if (not "version" in body): + logging.info("There was an issue in finding the message version in the body of the start forecasting message, assuming it is 1") + message_version = 1 + else: + message_version = body["version"] + if (application_name in EsPredictorState.individual_application_state) and (message_version <= EsPredictorState.individual_application_state[application_name].message_version): + application_state = EsPredictorState.individual_application_state[application_name] + else: + EsPredictorState.individual_application_state[application_name] = ApplicationState(application_name,message_version) + application_state = EsPredictorState.individual_application_state[application_name] - thread = threading.Thread(target=State.publishing_connector.start,args=()) + if (not application_state.start_forecasting) or ((application_state.metrics_to_predict is not None) and (len(application_state.metrics_to_predict)<=len(body["metrics"]))): + application_state.metrics_to_predict = body["metrics"] + print_with_time("Received request to start predicting the following metrics: "+ ",".join(application_state.metrics_to_predict)+" for application "+application_name+", proceeding with the prediction process") + else: + application_state.metrics_to_predict = body["metrics"] + print_with_time("Received request to start predicting the following metrics: "+ body["metrics"]+" for application "+application_name+"but it was perceived as a duplicate") + return + application_state.broker_publishers = [] + for metric in application_state.metrics_to_predict: + EsPredictorState.broker_publishers.append (PredictionPublisher(application_name,metric)) + EsPredictorState.publishing_connector = connector.EXN('publishing_'+EsPredictorState.forecaster_name+'-'+application_name, handler=BootStrap(), #consumers=list(State.broker_consumers), + consumers=[], + publishers=EsPredictorState.broker_publishers, + url=EsPredictorState.broker_address, + port=EsPredictorState.broker_port, + username=EsPredictorState.broker_username, + password=EsPredictorState.broker_password + ) + #EsPredictorState.publishing_connector.start() + thread = threading.Thread(target=EsPredictorState.publishing_connector.start, args=()) thread.start() except Exception as e: print_with_time("Could not load json object to process the start forecasting message \n"+str(body)) + print(traceback.format_exc()) return #if (not State.initial_metric_list_received): @@ -301,49 +398,53 @@ class Bootstrap(connector.ConnectorHandler): # return try: - Bootstrap.start_forecasting = True - State.epoch_start = body["epoch_start"] - prediction_horizon = int(body["prediction_horizon"]) - State.next_prediction_time = update_prediction_time(State.epoch_start,prediction_horizon,State.prediction_processing_time_safety_margin_seconds) # State.next_prediction_time was assigned the value of State.epoch_start here, but this re-initializes targeted prediction times after each start_forecasting message, which is not desired necessarily - print_with_time("A start_forecasting message has been received, epoch start and prediction horizon are "+str(State.epoch_start)+", and "+str(prediction_horizon)+ " seconds respectively") + application_state = EsPredictorState.individual_application_state[application_name] + application_state.start_forecasting = True + application_state.epoch_start = body["epoch_start"] + application_state.prediction_horizon = int(body["prediction_horizon"]) + application_state.next_prediction_time = update_prediction_time(application_state.epoch_start,application_state.prediction_horizon,EsPredictorState.prediction_processing_time_safety_margin_seconds) # State.next_prediction_time was assigned the value of State.epoch_start here, but this re-initializes targeted prediction times after each start_forecasting message, which is not desired necessarily + print_with_time("A start_forecasting message has been received, epoch start and prediction horizon are "+str(application_state.epoch_start)+", and "+str(application_state.prediction_horizon)+ " seconds respectively") except Exception as e: print_with_time("Problem while retrieving epoch start and/or prediction_horizon") + print(traceback.format_exc()) return - with open(State.configuration_file_location, "r+b") as f: + with open(EsPredictorState.configuration_file_location, "r+b") as f: - State.configuration_details.load(f, "utf-8") + EsPredictorState.configuration_details.load(f, "utf-8") # Do stuff with the p object... - initial_seconds_aggregation_value, metadata = State.configuration_details["number_of_seconds_to_aggregate_on"] + initial_seconds_aggregation_value, metadata = EsPredictorState.configuration_details["number_of_seconds_to_aggregate_on"] initial_seconds_aggregation_value = int(initial_seconds_aggregation_value) - if (prediction_horizon None: - - self.conn = event.container.connect(self.context.connection) - for publisher in self.publishers: - _logger.info(f"{publisher.address} registering sender") - address = self.context.build_address_from_link(publisher) - publisher.set(event.container.create_sender(self.conn,address)) - self.context.register_publisher(publisher) - _logger.debug(f"{self.context.base} Registering timer { hasattr(publisher, 'delay')}") - if hasattr(publisher, "delay"): - _logger.debug(f"{self.context.base} Registering timer") - event.reactor.schedule(publisher.delay, self) - - for consumer in self.consumers: - address = self.context.build_address_from_link(consumer) - _logger.info(f"{self.context.base} Registering consumer {address}") - consumer.set(event.container.create_receiver(self.conn, address)) - self.context.register_consumers(consumer) - - def on_sendable(self, event): - if not self.handler.initialized: - self.handler.set_ready(True, self.context) - - def on_timer_task(self, event): - _logger.debug(f"{self.context.base} On timer") - for publisher in self._delay_publishers(): - publisher.send() - event.reactor.schedule(publisher.delay, self) - - def on_message(self, event): - try: - for consumer in self.consumers: - if consumer.should_handle(event): - _logger.debug(f"Received message: {event.message.address}") - self.handler.on_message(consumer.key, event.message.address, event.message.body, self.context, event=event) - except Exception as e: - _logger.error(f"Received message: {e}") - - - def close(self): - if self.conn: - self.conn.close() - else: - _logger.warning(f"{self.context.base} No open connection") - - def _delay_publishers(self): - return [p for p in self.publishers if hasattr(p,'delay')] - - -class EXN: - def __init__(self, component=None, - handler:ConnectorHandler = None, - publishers=[], - consumers=[], - **kwargs): - - # Load .env file - load_dotenv() - - # Validate and set connector - if not component: - _logger.error("Component cannot be empty or None") - raise ValueError("Component cannot be empty or None") - self.component = component - self.handler = handler - - self.url = kwargs.get('url',os.getenv('NEBULOUS_BROKER_URL')) - self.port = kwargs.get('port', os.getenv('NEBULOUS_BROKER_PORT')) - self.username = kwargs.get('username',os.getenv('NEBULOUS_BROKER_USERNAME')) - self.password = kwargs.get('password', os.getenv('NEBULOUS_BROKER_PASSWORD')) - - # Validate attributes - if not self.url: - _logger.error("URL cannot be empty or None") - raise ValueError("URL cannot be empty or None") - if not self.port: - _logger.error("PORT cannot be empty or None") - raise ValueError("PORT cannot be empty or None") - if not self.username: - _logger.error("USERNAME cannot be empty or None") - raise ValueError("USERNAME cannot be empty or None") - if not self.password: - _logger.error("PASSWORD cannot be empty or None") - raise ValueError("PASSWORD cannot be empty or None") - - ctx = core_context.Context( - connection=f"{self.url}:{self.port}", - base=f"{base.NEBULOUS_BASE_NAME}.{self.component}", - ) - - if kwargs.get("enable_state",False): - publishers.append(state_publisher.Publisher()) - - if kwargs.get("enable_health",False): - publishers.append(schedule_publisher.Publisher( - base.NEBULOUS_DEFAULT_HEALTH_CHECK_TIMEOUT, - 'health', - 'health', - True)) - - core_handler = CoreHandler( - ctx, - handler, - publishers, - consumers - ) - - self.container = Container(core_handler) - - def start(self): - self.container.run() diff --git a/exponential-smoothing-predictor/src/runtime/exn/core/consumer.py b/exponential-smoothing-predictor/src/runtime/exn/core/consumer.py deleted file mode 100644 index e5c7424..0000000 --- a/exponential-smoothing-predictor/src/runtime/exn/core/consumer.py +++ /dev/null @@ -1,17 +0,0 @@ -import datetime - -from proton import Message, Event -from . import link -import logging - -_logger = logging.getLogger(__name__) - - -class Consumer(link.Link): - - def on_message(self, body, **kwargs): - _logger.debug(f"{self.address} Got {body} ") - - def should_handle(self, event: Event): - if event.link == self._link: - return True diff --git a/exponential-smoothing-predictor/src/runtime/exn/core/context.py b/exponential-smoothing-predictor/src/runtime/exn/core/context.py deleted file mode 100644 index 1b99056..0000000 --- a/exponential-smoothing-predictor/src/runtime/exn/core/context.py +++ /dev/null @@ -1,63 +0,0 @@ -from . import link - - -class Context: - - def __init__(self, connection, base): - - self.connection = connection - self.base = base - self.publishers = {} - self.consumers = {} - - def get_publisher(self, key): - if key in self.publishers: - return self.publishers[key] - return None - - def has_publisher(self, key): - return key in self.publishers - - def has_consumer(self, key): - return key in self.consumers - - def register_publisher(self, publisher): - self.publishers[publisher.key] = publisher - - def register_consumers(self, consumer): - self.consumers[consumer.key] = consumer - - def build_address_from_link(self, link: link.Link): - - if link.fqdn: - address = link.address - if link.topic and not link.address.startswith("topic://"): - address = f"topic://{address}" - return address - - address = f"{self.base}.{link.address}" - if link.topic: - address = f"topic://{address}" - - return address - - def match_address(self, l: link.Link, event): - - if not event \ - or not event.message \ - or not event.message.address: - return False - - address = self.build_address_from_link(l) - return address == event.message.address - - def build_address(self, *actions, topic=False): - - if len(actions) <= 0: - return self.base - - address = f"{self.base}.{'.'.join(actions)}" - if topic: - address = f"topic://{address}" - - return address diff --git a/exponential-smoothing-predictor/src/runtime/exn/core/schedule_publisher.py b/exponential-smoothing-predictor/src/runtime/exn/core/schedule_publisher.py deleted file mode 100644 index 9cf8b64..0000000 --- a/exponential-smoothing-predictor/src/runtime/exn/core/schedule_publisher.py +++ /dev/null @@ -1,14 +0,0 @@ -import logging - -from . import publisher - -_logger = logging.getLogger(__name__) - - -class Publisher(publisher.Publisher): - send_next = False - delay = 15 - - def __init__(self, delay, key, address, topic=False): - super(Publisher, self).__init__(key, address, topic) - self.delay = delay diff --git a/exponential-smoothing-predictor/src/runtime/exn/settings/base.py b/exponential-smoothing-predictor/src/runtime/exn/settings/base.py deleted file mode 100644 index 27ac8a6..0000000 --- a/exponential-smoothing-predictor/src/runtime/exn/settings/base.py +++ /dev/null @@ -1,2 +0,0 @@ -NEBULOUS_BASE_NAME="eu.nebulous" -NEBULOUS_DEFAULT_HEALTH_CHECK_TIMEOUT=15 \ No newline at end of file diff --git a/exponential-smoothing-predictor/src/runtime/operational_status/ApplicationState.py b/exponential-smoothing-predictor/src/runtime/operational_status/ApplicationState.py new file mode 100644 index 0000000..c9e85bf --- /dev/null +++ b/exponential-smoothing-predictor/src/runtime/operational_status/ApplicationState.py @@ -0,0 +1,117 @@ +import logging +import time +import traceback + +import requests +import json + +from runtime.operational_status.EsPredictorState import EsPredictorState +from runtime.utilities.InfluxDBConnector import InfluxDBConnector +from runtime.utilities.Utilities import Utilities +from dateutil import parser + + +class ApplicationState: + + #Forecaster variables + + def get_prediction_data_filename(self,configuration_file_location,metric_name): + from jproperties import Properties + p = Properties() + with open(configuration_file_location, "rb") as f: + p.load(f, "utf-8") + path_to_datasets, metadata = p["path_to_datasets"] + #application_name, metadata = p["application_name"] + path_to_datasets = Utilities.fix_path_ending(path_to_datasets) + return "" + str(path_to_datasets) + str(self.application_name) + "_"+metric_name+ ".csv" + def __init__(self,application_name, message_version): + self.message_version = message_version + self.application_name = application_name + self.influxdb_bucket = EsPredictorState.application_name_prefix+application_name+"_bucket" + token = EsPredictorState.influxdb_token + + list_bucket_url = 'http://' + EsPredictorState.influxdb_hostname + ':8086/api/v2/buckets?name='+self.influxdb_bucket + create_bucket_url = 'http://' + EsPredictorState.influxdb_hostname + ':8086/api/v2/buckets' + headers = { + 'Authorization': 'Token {}'.format(token), + 'Content-Type': 'application/json', + 'Accept': 'application/json' + } + data = { + 'name': self.influxdb_bucket, + 'orgID': EsPredictorState.influxdb_organization_id, + 'retentionRules': [ + { + 'type': 'expire', + 'everySeconds': 2592000 #30 days (30*24*3600) + } + ] + } + + response = requests.get(list_bucket_url, headers=headers) + + logging.info("The response for listing a possibly existing bucket is "+str(response.status_code)+" for application "+application_name) + if ((response.status_code==200) and ("buckets" in response.json()) and (len(response.json()["buckets"])>0)): + logging.info("The bucket already existed for the particular application, skipping its creation...") + else: + logging.info("The response in the request to list a bucket is "+str(response.json())) + logging.info("The bucket did not exist for the particular application, creation in process...") + response = requests.post(create_bucket_url, headers=headers, data=json.dumps(data)) + logging.info("The response for creating a new bucket is "+str(response.status_code)) + self.start_forecasting = False # Whether the component should start (or keep on) forecasting + self.prediction_data_filename = application_name+".csv" + self.dataset_file_name = "exponential_smoothing_dataset_"+application_name+".csv" + self.metrics_to_predict = [] + self.epoch_start = 0 + self.next_prediction_time = 0 + self.prediction_horizon = 120 + self.previous_prediction = None + self.initial_metric_list_received = False + self.lower_bound_value = {} + self.upper_bound_value = {} + + + def update_monitoring_data(self): + #query(metrics_to_predict,number_of_days_for_which_data_was_retrieved) + #save_new_file() + Utilities.print_with_time("Starting dataset creation process...") + + try: + """ + Deprecated functionality to retrieve dataset creation details. Relevant functionality moved inside the load configuration method + influxdb_hostname = os.environ.get("INFLUXDB_HOSTNAME","localhost") + influxdb_port = int(os.environ.get("INFLUXDB_PORT","8086")) + influxdb_username = os.environ.get("INFLUXDB_USERNAME","morphemic") + influxdb_password = os.environ.get("INFLUXDB_PASSWORD","password") + influxdb_dbname = os.environ.get("INFLUXDB_DBNAME","morphemic") + influxdb_org = os.environ.get("INFLUXDB_ORG","morphemic") + application_name = "default_application" + """ + for metric_name in self.metrics_to_predict: + time_interval_to_get_data_for = str(EsPredictorState.number_of_days_to_use_data_from) + "d" + print_data_from_db = True + query_string = 'from(bucket: "'+self.influxdb_bucket+'") |> range(start:-'+time_interval_to_get_data_for+') |> filter(fn: (r) => r["_measurement"] == "'+metric_name+'")' + influx_connector = InfluxDBConnector() + print("performing query") + current_time = time.time() + result = influx_connector.client.query_api().query(query_string, EsPredictorState.influxdb_organization) + elapsed_time = time.time()-current_time + print("performed query, it took "+str(elapsed_time) + " seconds") + #print(result.to_values()) + with open(self.get_prediction_data_filename(EsPredictorState.configuration_file_location, metric_name), 'w') as file: + for table in result: + #print header row + file.write("Timestamp,ems_time,"+metric_name+"\r\n") + for record in table.records: + dt = parser.isoparse(str(record.get_time())) + epoch_time = int(dt.timestamp()) + metric_value = record.get_value() + if(print_data_from_db): + file.write(str(epoch_time)+","+str(epoch_time)+","+str(metric_value)+"\r\n") + # Write the string data to the file + + + + except Exception as e: + Utilities.print_with_time("Could not create new dataset as an exception was thrown") + print(traceback.format_exc()) \ No newline at end of file diff --git a/exponential-smoothing-predictor/src/runtime/operational_status/State.py b/exponential-smoothing-predictor/src/runtime/operational_status/EsPredictorState.py similarity index 57% rename from exponential-smoothing-predictor/src/runtime/operational_status/State.py rename to exponential-smoothing-predictor/src/runtime/operational_status/EsPredictorState.py index d6146d4..e7d49cc 100644 --- a/exponential-smoothing-predictor/src/runtime/operational_status/State.py +++ b/exponential-smoothing-predictor/src/runtime/operational_status/EsPredictorState.py @@ -4,43 +4,46 @@ # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at https://mozilla.org/MPL/2.0/. -import threading +import threading, logging + +from influxdb_client import InfluxDBClient from jproperties import Properties -class State: +class EsPredictorState: + + """ + The name of the predictor + """ + forecaster_name = "exponentialsmoothing" + """ + A dictionary containing statistics on the application state of individual applications + """ + individual_application_state = {} """ Fail-safe default values introduced below """ - - prediction_data_filename = "default_application.csv" - MONITORING_DATA_PREFIX = "monitoring" - FORECASTING_CONTROL_PREFIX = "forecasting" + application_name_prefix = "nebulous_" + GENERAL_TOPIC_PREFIX = "eu.nebulouscloud." + MONITORING_DATA_PREFIX = "monitoring." + FORECASTING_CONTROL_PREFIX = "forecasting." #Used to create the dataset from the InfluxDB - application_name = "default_application" - influxdb_bucket = "nebulous" - influxdb_organization = "nebulous" - influxdb_token = "tzIfpbU9b77quyvN0yHIbWltSh1c1371-o9nl_wJYaeo5TWdk5txyxXhp2iaLVMvOvf020HnEEAkE0yy5AllKQ==" - influxdb_dbname = "nebulous" - influxdb_password = "adminadmin" - influxdb_username = "admin" + influxdb_organization = "my-org" + influxdb_organization_id = "e0033247dcca0c54" + influxdb_token = "my-super-secret-auth-token" + influxdb_password = "my-password" + influxdb_username = "my-user" influxdb_port = 8086 influxdb_hostname = "localhost" path_to_datasets = "./datasets" - dataset_file_name = "exponential_smoothing_dataset.csv" number_of_days_to_use_data_from = 365 - #Forecaster variables - metrics_to_predict = [] - epoch_start = 0 - next_prediction_time = 0 - previous_prediction = None - configuration_file_location="prediction_configuration.properties" + + configuration_file_location="exponential-smoothing-predictor/prediction_configuration.properties" configuration_details = Properties() prediction_processing_time_safety_margin_seconds = 20 disconnected = True disconnection_handler = threading.Condition() - initial_metric_list_received = False testing_prediction_functionality = False total_time_intervals_to_predict = 8 @@ -59,4 +62,6 @@ class State: @staticmethod #TODO inspect State.connection def check_stale_connection(): - return (not State.subscribing_connector) + return (not EsPredictorState.subscribing_connector) + + diff --git a/exponential-smoothing-predictor/src/runtime/utilities/InfluxDBConnector.py b/exponential-smoothing-predictor/src/runtime/utilities/InfluxDBConnector.py index 1229963..34b65a4 100644 --- a/exponential-smoothing-predictor/src/runtime/utilities/InfluxDBConnector.py +++ b/exponential-smoothing-predictor/src/runtime/utilities/InfluxDBConnector.py @@ -2,7 +2,7 @@ from influxdb_client import InfluxDBClient, Point, WritePrecision from datetime import datetime from influxdb_client.client.write_api import SYNCHRONOUS -from runtime.operational_status.State import State +from runtime.operational_status.EsPredictorState import EsPredictorState #import influxdb_client, os, time #from influxdb_client import InfluxDBClient, Point, WritePrecision @@ -28,32 +28,32 @@ from runtime.operational_status.State import State # time.sleep(1) # separate points by 1 second -data = [ - { - "measurement": "temperature", - "tags": {"location": "Prague"}, - "fields": {"temperature": 25.3} - } -] +#data = [ +# { +# "measurement": "temperature", +# "tags": {"location": "Prague"}, +# "fields": {"temperature": 25.3} +# } +#] class InfluxDBConnector: - client = InfluxDBClient(url="http://"+State.influxdb_hostname+":"+str(State.influxdb_port), token=State.influxdb_token, org=State.influxdb_organization) + client = InfluxDBClient(url="http://" + EsPredictorState.influxdb_hostname + ":" + str(EsPredictorState.influxdb_port), token=EsPredictorState.influxdb_token, org=EsPredictorState.influxdb_organization) write_api = client.write_api(write_options=SYNCHRONOUS) def InfluxDBConnector(self): pass - def write_data(self,data): - self.write_api.write(bucket=State.influxdb_bucket, org=State.influxdb_organization, record=data, write_precision=WritePrecision.S) + def write_data(self,data,bucket): + self.write_api.write(bucket=bucket, org=EsPredictorState.influxdb_organization, record=data, write_precision=WritePrecision.S) def get_data(self): query_api = self.client.query_api() query = """from(bucket: "nebulous") |> range(start: -1m) |> filter(fn: (r) => r._measurement == "temperature")""" - tables = query_api.query(query, org=State.influxdb_organization) + tables = query_api.query(query, org=EsPredictorState.influxdb_organization) for table in tables: for record in table.records: diff --git a/exponential-smoothing-predictor/src/runtime/utilities/PredictionPublisher.py b/exponential-smoothing-predictor/src/runtime/utilities/PredictionPublisher.py index 0c0a83b..af63a49 100644 --- a/exponential-smoothing-predictor/src/runtime/utilities/PredictionPublisher.py +++ b/exponential-smoothing-predictor/src/runtime/utilities/PredictionPublisher.py @@ -1,11 +1,11 @@ -from runtime.exn import core +from exn import core class PredictionPublisher(core.publisher.Publisher): metric_name = "" - def __init__(self,metric_name): - super().__init__('publisher_'+metric_name, 'eu.nebulouscloud.preliminary_predicted.'+metric_name, True,True) + def __init__(self,application_name,metric_name): + super().__init__('publisher_'+application_name+'-'+metric_name, 'eu.nebulouscloud.preliminary_predicted.'+metric_name, True,True) self.metric_name = metric_name - def send(self, body={}): + def send(self, body={}, application=""): super(PredictionPublisher, self).send(body) diff --git a/exponential-smoothing-predictor/src/runtime/utilities/Utilities.py b/exponential-smoothing-predictor/src/runtime/utilities/Utilities.py index 8dbbc1d..5b2d02c 100644 --- a/exponential-smoothing-predictor/src/runtime/utilities/Utilities.py +++ b/exponential-smoothing-predictor/src/runtime/utilities/Utilities.py @@ -7,17 +7,14 @@ import pathlib #from morphemic.dataset import DatasetMaker import datetime -import time,os +import logging,os from dateutil import parser +from influxdb_client import InfluxDBClient -from runtime.operational_status.State import State +from runtime.operational_status.EsPredictorState import EsPredictorState from runtime.utilities.InfluxDBConnector import InfluxDBConnector -class DatasetMaker: - pass - - class Utilities: @staticmethod @@ -27,85 +24,39 @@ class Utilities: @staticmethod def load_configuration(): - with open(State.configuration_file_location,'rb') as config_file: - State.configuration_details.load(config_file) + with open(EsPredictorState.configuration_file_location, 'rb') as config_file: + EsPredictorState.configuration_details.load(config_file) #prediction_horizon = configuration_details.get("prediction_horizon") - State.dataset_file_name = State.configuration_details.get("input_data_file").data - State.number_of_days_to_use_data_from = int(State.configuration_details.get("number_of_days_to_use_data_from").data) - State.prediction_processing_time_safety_margin_seconds = int(State.configuration_details.get("prediction_processing_time_safety_margin_seconds").data) - State.testing_prediction_functionality = State.configuration_details.get("testing_prediction_functionality").data.lower() == "true" - State.path_to_datasets = State.configuration_details.get("path_to_datasets").data - State.broker_address = State.configuration_details.get("broker_address").data - State.broker_port = int(State.configuration_details.get("broker_port").data) - State.broker_username = State.configuration_details.get("broker_username").data - State.broker_password = State.configuration_details.get("broker_password").data + EsPredictorState.number_of_days_to_use_data_from = int(EsPredictorState.configuration_details.get("number_of_days_to_use_data_from").data) + EsPredictorState.prediction_processing_time_safety_margin_seconds = int(EsPredictorState.configuration_details.get("prediction_processing_time_safety_margin_seconds").data) + EsPredictorState.testing_prediction_functionality = EsPredictorState.configuration_details.get("testing_prediction_functionality").data.lower() == "true" + EsPredictorState.path_to_datasets = EsPredictorState.configuration_details.get("path_to_datasets").data + EsPredictorState.broker_address = EsPredictorState.configuration_details.get("broker_address").data + EsPredictorState.broker_port = int(EsPredictorState.configuration_details.get("broker_port").data) + EsPredictorState.broker_username = EsPredictorState.configuration_details.get("broker_username").data + EsPredictorState.broker_password = EsPredictorState.configuration_details.get("broker_password").data - State.influxdb_hostname = State.configuration_details.get("INFLUXDB_HOSTNAME").data - State.influxdb_port = int(State.configuration_details.get("INFLUXDB_PORT").data) - State.influxdb_username = State.configuration_details.get("INFLUXDB_USERNAME").data - State.influxdb_password = State.configuration_details.get("INFLUXDB_PASSWORD").data - State.influxdb_dbname = State.configuration_details.get("INFLUXDB_DBNAME").data - State.influxdb_org = State.configuration_details.get("INFLUXDB_ORG").data - State.application_name = State.configuration_details.get("APP_NAME").data + EsPredictorState.influxdb_hostname = EsPredictorState.configuration_details.get("INFLUXDB_HOSTNAME").data + EsPredictorState.influxdb_port = int(EsPredictorState.configuration_details.get("INFLUXDB_PORT").data) + EsPredictorState.influxdb_username = EsPredictorState.configuration_details.get("INFLUXDB_USERNAME").data + EsPredictorState.influxdb_password = EsPredictorState.configuration_details.get("INFLUXDB_PASSWORD").data + EsPredictorState.influxdb_org = EsPredictorState.configuration_details.get("INFLUXDB_ORG").data #This method accesses influx db to retrieve the most recent metric values. - @staticmethod - def update_monitoring_data(): - #query(metrics_to_predict,number_of_days_for_which_data_was_retrieved) - #save_new_file() - Utilities.print_with_time("Starting dataset creation process...") - try: - """ - Deprecated functionality to retrieve dataset creation details. Relevant functionality moved inside the load configuration method - influxdb_hostname = os.environ.get("INFLUXDB_HOSTNAME","localhost") - influxdb_port = int(os.environ.get("INFLUXDB_PORT","8086")) - influxdb_username = os.environ.get("INFLUXDB_USERNAME","morphemic") - influxdb_password = os.environ.get("INFLUXDB_PASSWORD","password") - influxdb_dbname = os.environ.get("INFLUXDB_DBNAME","morphemic") - influxdb_org = os.environ.get("INFLUXDB_ORG","morphemic") - application_name = "default_application" - """ - metric_names = ["cpu_usage","ram_usage"] - for metric_name in State.metrics_to_predict: - time_interval_to_get_data_for = str(State.number_of_days_to_use_data_from)+"d" - print_data_from_db = True - query_string = 'from(bucket: "'+State.influxdb_bucket+'") |> range(start:-'+time_interval_to_get_data_for+') |> filter(fn: (r) => r["_measurement"] == "'+metric_name+'")' - influx_connector = InfluxDBConnector() - print("performing query") - current_time = time.time() - result = influx_connector.client.query_api().query(query_string,State.influxdb_organization) - elapsed_time = time.time()-current_time - print("performed query, it took "+str(elapsed_time) + " seconds") - #print(result.to_values()) - with open(Utilities.get_prediction_data_filename(State.configuration_file_location,metric_name), 'w') as file: - for table in result: - #print header row - file.write("Timestamp,ems_time,"+metric_name+"\r\n") - for record in table.records: - dt = parser.isoparse(str(record.get_time())) - epoch_time = int(dt.timestamp()) - metric_value = record.get_value() - if(print_data_from_db): - file.write(str(epoch_time)+","+str(epoch_time)+","+str(metric_value)+"\r\n") - # Write the string data to the file - - - - except Exception as e: - Utilities.print_with_time("Could not create new dataset as an exception was thrown") - print(e) @staticmethod - def get_prediction_data_filename(configuration_file_location,metric_name): - from jproperties import Properties - p = Properties() - with open(configuration_file_location, "rb") as f: - p.load(f, "utf-8") - path_to_datasets, metadata = p["path_to_datasets"] - application_name, metadata = p["application_name"] - path_to_datasets = Utilities.fix_path_ending(path_to_datasets) - return "" + str(path_to_datasets) + str(application_name) + "_"+metric_name+ ".csv" + def update_influxdb_organization_id(): + client = InfluxDBClient(url="http://" + EsPredictorState.influxdb_hostname + ":" + str(EsPredictorState.influxdb_port), token=EsPredictorState.influxdb_token) + org_api = client.organizations_api() + # List all organizations + organizations = org_api.find_organizations() + # Find the organization by name and print its ID + for org in organizations: + if org.name == EsPredictorState.influxdb_organization: + logging.info(f"Organization Name: {org.name}, ID: {org.id}") + EsPredictorState.influxdb_organization_id = org.id + break @staticmethod def fix_path_ending(path): if (path[-1] is os.sep): diff --git a/exponential-smoothing-predictor/src/setup.py b/exponential-smoothing-predictor/src/setup.py index 5f7e6c4..1ac1643 100644 --- a/exponential-smoothing-predictor/src/setup.py +++ b/exponential-smoothing-predictor/src/setup.py @@ -18,7 +18,7 @@ setup( author_email="atsagkaropoulos@mail.ntua.gr", # Packages - packages=["r_predictors","runtime","runtime.exn","runtime.operational_status","runtime.utilities","runtime.predictions"], + packages=["r_predictors","runtime","exn","exn.core","exn.handler","exn.settings","runtime.operational_status","runtime.utilities","runtime.predictions"], # Include additional files into the package include_package_data=True, @@ -37,4 +37,10 @@ setup( "python-slugify", "jproperties" ], + #package_dir={'': '.'}, + entry_points={ + 'console_scripts': [ + 'start_exsmoothing = runtime.Predictor:main', + ], + } ) diff --git a/zuul.d/jobs.yaml b/zuul.d/jobs.yaml index b969c27..166e228 100644 --- a/zuul.d/jobs.yaml +++ b/zuul.d/jobs.yaml @@ -14,9 +14,9 @@ container_images: - context: exponential-smoothing-predictor registry: quay.io - repository: quay.io/nebulous/exponential-smoothing-predictor-exponential-smoothing-predictor + repository: quay.io/nebulous/exponential-smoothing-predictor namespace: nebulous - repo_shortname: exponential-smoothing-predictor-exponential-smoothing-predictor + repo_shortname: exponential-smoothing-predictor repo_description: "" - job: