From 081cd5fae9788756bd37398fd2ed7fef62af4622 Mon Sep 17 00:00:00 2001 From: Douglas Viroel Date: Thu, 12 Jun 2025 15:50:30 -0300 Subject: [PATCH] Merge decision engine services into a single one The decision engine process was built based on 2 services: a service that handle rpc requests and a scheduler to trigger watcher periodic tasks. With the new version of oslo.service, a new threading backend was added, based on cotyledon service manager, which starts a new process for each service tha it manages. These two services can't run in different process since they need access to a shared in-memory representation of the cluster (cluster data models) This patch proposes creating a Decision Engine Service which includes everything in a single main service. Change-Id: I335a97ca14b6e023fef055978a56aefebf22d433 Signed-off-by: Douglas Viroel --- watcher/cmd/eventlet/decisionengine.py | 9 +-- watcher/decision_engine/service.py | 59 ++++++++++++++++ watcher/tests/decision_engine/test_service.py | 69 +++++++++++++++++++ 3 files changed, 130 insertions(+), 7 deletions(-) create mode 100644 watcher/decision_engine/service.py create mode 100644 watcher/tests/decision_engine/test_service.py diff --git a/watcher/cmd/eventlet/decisionengine.py b/watcher/cmd/eventlet/decisionengine.py index 9b4f517fd..836e3d396 100644 --- a/watcher/cmd/eventlet/decisionengine.py +++ b/watcher/cmd/eventlet/decisionengine.py @@ -25,8 +25,7 @@ from oslo_log import log from watcher.common import service as watcher_service from watcher import conf from watcher.decision_engine import gmr -from watcher.decision_engine import manager -from watcher.decision_engine import scheduling +from watcher.decision_engine import service as decision_engine_service from watcher.decision_engine import sync LOG = log.getLogger(__name__) @@ -43,11 +42,7 @@ def main(): syncer = sync.Syncer() syncer.sync() - de_service = watcher_service.Service(manager.DecisionEngineManager) - bg_scheduler_service = scheduling.DecisionEngineSchedulingService() + de_service = decision_engine_service.DecisionEngineService() - # Only 1 process launcher = watcher_service.launch(CONF, de_service) - launcher.launch_service(bg_scheduler_service) - launcher.wait() diff --git a/watcher/decision_engine/service.py b/watcher/decision_engine/service.py new file mode 100644 index 000000000..2896a0953 --- /dev/null +++ b/watcher/decision_engine/service.py @@ -0,0 +1,59 @@ +# Copyright 2025 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from watcher.common import service as watcher_service +from watcher.decision_engine import manager +from watcher.decision_engine import scheduling + + +class DecisionEngineService(watcher_service.Service): + """Decision Engine Service that runs on a host. + + The decision engine service holds a RPC server, a notification + listener server, a heartbeat service and starts a background scheduling + service to run watcher periodic jobs. + """ + + def __init__(self): + super().__init__(manager.DecisionEngineManager) + # Background scheduler starts the cluster model collector periodic + # task, an one shot task to cancel ongoing audits and a periodic + # check for expired action plans + self._bg_scheduler = None + + @property + def bg_scheduler(self): + if self._bg_scheduler is None: + self._bg_scheduler = scheduling.DecisionEngineSchedulingService() + return self._bg_scheduler + + def start(self): + """Start service.""" + super().start() + self.bg_scheduler.start() + + def stop(self): + """Stop service.""" + super().stop() + self.bg_scheduler.stop() + + def wait(self): + """Wait for service to complete.""" + super().wait() + self.bg_scheduler.wait() + + def reset(self): + """Reset service.""" + super().reset() + self.bg_scheduler.reset() diff --git a/watcher/tests/decision_engine/test_service.py b/watcher/tests/decision_engine/test_service.py new file mode 100644 index 000000000..e605dde4a --- /dev/null +++ b/watcher/tests/decision_engine/test_service.py @@ -0,0 +1,69 @@ +# Copyright 2025 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from unittest import mock + +from watcher.common import service as watcher_service +from watcher.decision_engine import scheduling +from watcher.decision_engine import service +from watcher.tests import base + + +@mock.patch.object(scheduling.DecisionEngineSchedulingService, '__init__', + return_value=None) +@mock.patch.object(watcher_service.Service, '__init__', return_value=None) +class TestDecisionEngineService(base.TestCase): + + @mock.patch.object(scheduling.DecisionEngineSchedulingService, 'start') + @mock.patch.object(watcher_service.Service, 'start') + def test_decision_engine_service_start(self, svc_start, sch_start, + svc_init, sch_init): + de_service = service.DecisionEngineService() + de_service.start() + # Creates an DecisionEngineSchedulingService instance + self.assertIsInstance(de_service.bg_scheduler, + scheduling.DecisionEngineSchedulingService) + + svc_start.assert_called() + sch_start.assert_called() + + @mock.patch.object(scheduling.DecisionEngineSchedulingService, 'stop') + @mock.patch.object(watcher_service.Service, 'stop') + def test_decision_engine_service_stop(self, svc_stop, sch_stop, + svc_init, sch_init): + de_service = service.DecisionEngineService() + de_service.stop() + + svc_stop.assert_called() + sch_stop.assert_called() + + @mock.patch.object(scheduling.DecisionEngineSchedulingService, 'wait') + @mock.patch.object(watcher_service.Service, 'wait') + def test_decision_engine_service_wait(self, svc_wait, sch_wait, + svc_init, sch_init): + de_service = service.DecisionEngineService() + de_service.wait() + + svc_wait.assert_called() + sch_wait.assert_called() + + @mock.patch.object(scheduling.DecisionEngineSchedulingService, 'reset') + @mock.patch.object(watcher_service.Service, 'reset') + def test_decision_engine_service_reset(self, svc_reset, sch_reset, + svc_init, sch_init): + de_service = service.DecisionEngineService() + de_service.reset() + + svc_reset.assert_called() + sch_reset.assert_called()