Merge decision engine services into a single one
The decision engine process was built based on 2 services: a service that handle rpc requests and a scheduler to trigger watcher periodic tasks. With the new version of oslo.service, a new threading backend was added, based on cotyledon service manager, which starts a new process for each service tha it manages. These two services can't run in different process since they need access to a shared in-memory representation of the cluster (cluster data models) This patch proposes creating a Decision Engine Service which includes everything in a single main service. Change-Id: I335a97ca14b6e023fef055978a56aefebf22d433 Signed-off-by: Douglas Viroel <viroel@gmail.com>
This commit is contained in:
@@ -25,8 +25,7 @@ from oslo_log import log
|
||||
from watcher.common import service as watcher_service
|
||||
from watcher import conf
|
||||
from watcher.decision_engine import gmr
|
||||
from watcher.decision_engine import manager
|
||||
from watcher.decision_engine import scheduling
|
||||
from watcher.decision_engine import service as decision_engine_service
|
||||
from watcher.decision_engine import sync
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
@@ -43,11 +42,7 @@ def main():
|
||||
syncer = sync.Syncer()
|
||||
syncer.sync()
|
||||
|
||||
de_service = watcher_service.Service(manager.DecisionEngineManager)
|
||||
bg_scheduler_service = scheduling.DecisionEngineSchedulingService()
|
||||
de_service = decision_engine_service.DecisionEngineService()
|
||||
|
||||
# Only 1 process
|
||||
launcher = watcher_service.launch(CONF, de_service)
|
||||
launcher.launch_service(bg_scheduler_service)
|
||||
|
||||
launcher.wait()
|
||||
|
59
watcher/decision_engine/service.py
Normal file
59
watcher/decision_engine/service.py
Normal file
@@ -0,0 +1,59 @@
|
||||
# Copyright 2025 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from watcher.common import service as watcher_service
|
||||
from watcher.decision_engine import manager
|
||||
from watcher.decision_engine import scheduling
|
||||
|
||||
|
||||
class DecisionEngineService(watcher_service.Service):
|
||||
"""Decision Engine Service that runs on a host.
|
||||
|
||||
The decision engine service holds a RPC server, a notification
|
||||
listener server, a heartbeat service and starts a background scheduling
|
||||
service to run watcher periodic jobs.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(manager.DecisionEngineManager)
|
||||
# Background scheduler starts the cluster model collector periodic
|
||||
# task, an one shot task to cancel ongoing audits and a periodic
|
||||
# check for expired action plans
|
||||
self._bg_scheduler = None
|
||||
|
||||
@property
|
||||
def bg_scheduler(self):
|
||||
if self._bg_scheduler is None:
|
||||
self._bg_scheduler = scheduling.DecisionEngineSchedulingService()
|
||||
return self._bg_scheduler
|
||||
|
||||
def start(self):
|
||||
"""Start service."""
|
||||
super().start()
|
||||
self.bg_scheduler.start()
|
||||
|
||||
def stop(self):
|
||||
"""Stop service."""
|
||||
super().stop()
|
||||
self.bg_scheduler.stop()
|
||||
|
||||
def wait(self):
|
||||
"""Wait for service to complete."""
|
||||
super().wait()
|
||||
self.bg_scheduler.wait()
|
||||
|
||||
def reset(self):
|
||||
"""Reset service."""
|
||||
super().reset()
|
||||
self.bg_scheduler.reset()
|
69
watcher/tests/decision_engine/test_service.py
Normal file
69
watcher/tests/decision_engine/test_service.py
Normal file
@@ -0,0 +1,69 @@
|
||||
# Copyright 2025 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from unittest import mock
|
||||
|
||||
from watcher.common import service as watcher_service
|
||||
from watcher.decision_engine import scheduling
|
||||
from watcher.decision_engine import service
|
||||
from watcher.tests import base
|
||||
|
||||
|
||||
@mock.patch.object(scheduling.DecisionEngineSchedulingService, '__init__',
|
||||
return_value=None)
|
||||
@mock.patch.object(watcher_service.Service, '__init__', return_value=None)
|
||||
class TestDecisionEngineService(base.TestCase):
|
||||
|
||||
@mock.patch.object(scheduling.DecisionEngineSchedulingService, 'start')
|
||||
@mock.patch.object(watcher_service.Service, 'start')
|
||||
def test_decision_engine_service_start(self, svc_start, sch_start,
|
||||
svc_init, sch_init):
|
||||
de_service = service.DecisionEngineService()
|
||||
de_service.start()
|
||||
# Creates an DecisionEngineSchedulingService instance
|
||||
self.assertIsInstance(de_service.bg_scheduler,
|
||||
scheduling.DecisionEngineSchedulingService)
|
||||
|
||||
svc_start.assert_called()
|
||||
sch_start.assert_called()
|
||||
|
||||
@mock.patch.object(scheduling.DecisionEngineSchedulingService, 'stop')
|
||||
@mock.patch.object(watcher_service.Service, 'stop')
|
||||
def test_decision_engine_service_stop(self, svc_stop, sch_stop,
|
||||
svc_init, sch_init):
|
||||
de_service = service.DecisionEngineService()
|
||||
de_service.stop()
|
||||
|
||||
svc_stop.assert_called()
|
||||
sch_stop.assert_called()
|
||||
|
||||
@mock.patch.object(scheduling.DecisionEngineSchedulingService, 'wait')
|
||||
@mock.patch.object(watcher_service.Service, 'wait')
|
||||
def test_decision_engine_service_wait(self, svc_wait, sch_wait,
|
||||
svc_init, sch_init):
|
||||
de_service = service.DecisionEngineService()
|
||||
de_service.wait()
|
||||
|
||||
svc_wait.assert_called()
|
||||
sch_wait.assert_called()
|
||||
|
||||
@mock.patch.object(scheduling.DecisionEngineSchedulingService, 'reset')
|
||||
@mock.patch.object(watcher_service.Service, 'reset')
|
||||
def test_decision_engine_service_reset(self, svc_reset, sch_reset,
|
||||
svc_init, sch_init):
|
||||
de_service = service.DecisionEngineService()
|
||||
de_service.reset()
|
||||
|
||||
svc_reset.assert_called()
|
||||
sch_reset.assert_called()
|
Reference in New Issue
Block a user