Merge "Extend decision engine to support threading mode"

This commit is contained in:
Zuul
2025-08-06 15:38:31 +00:00
committed by Gerrit Code Review
16 changed files with 270 additions and 58 deletions

View File

@@ -271,6 +271,13 @@
devstack_localrc: devstack_localrc:
NODE_EXPORTER_COLLECTOR_EXCLUDE: "" NODE_EXPORTER_COLLECTOR_EXCLUDE: ""
- job:
name: watcher-prometheus-integration-threading
parent: watcher-sg-core-tempest-base
vars:
devstack_localrc:
'SYSTEMD_ENV_VARS["watcher-decision-engine"]': OS_WATCHER_DISABLE_EVENTLET_PATCHING=true
- project: - project:
queue: watcher queue: watcher
templates: templates:
@@ -290,6 +297,7 @@
- ^watcher/api/* - ^watcher/api/*
- watcher-tempest-functional-ipv6-only - watcher-tempest-functional-ipv6-only
- watcher-prometheus-integration - watcher-prometheus-integration
- watcher-prometheus-integration-threading
gate: gate:
jobs: jobs:
- watcher-tempest-functional - watcher-tempest-functional

View File

@@ -52,18 +52,43 @@ types of concurrency used in various services of Watcher.
.. _wait_for_any: https://docs.openstack.org/futurist/latest/reference/index.html#waiters .. _wait_for_any: https://docs.openstack.org/futurist/latest/reference/index.html#waiters
Concurrency modes
#################
Evenlet has been the main concurrency library within the OpenStack community
for the last 10 years since the removal of twisted. Over the last few years,
the maintenance of eventlet has decreased and the efforts to remove the GIL
from Python (PEP 703), have fundamentally changed how concurrency is making
eventlet no longer viable. While transitioning to a new native thread
solution, Watcher services will be supporting both modes, with the usage of
native threading mode initially classified as ``experimental``.
It is possible to enable the new native threading mode by setting the following
environment variable in the corresponding service configuration:
.. code:: bash
OS_WATCHER_DISABLE_EVENTLET_PATCHING=true
.. note::
The only service that supports two different concurrency modes is the
``decision engine``.
Decision engine concurrency Decision engine concurrency
*************************** ***************************
The concurrency in the decision engine is governed by two independent The concurrency in the decision engine is governed by two independent
threadpools. Both of these threadpools are GreenThreadPoolExecutor_ from the threadpools. These threadpools can be configured as GreenThreadPoolExecutor_
futurist_ library. One of these is used automatically and most contributors or ThreadPoolExecutor_, both from the futurist_ library, depending on the
service configuration. One of these is used automatically and most contributors
will not interact with it while developing new features. The other threadpool will not interact with it while developing new features. The other threadpool
can frequently be used while developing new features or updating existing ones. can frequently be used while developing new features or updating existing ones.
It is known as the DecisionEngineThreadpool and allows to achieve performance It is known as the DecisionEngineThreadpool and allows to achieve performance
improvements in network or I/O bound operations. improvements in network or I/O bound operations.
.. _GreenThreadPoolExecutor: https://docs.openstack.org/futurist/latest/reference/index.html#executors .. _GreenThreadPoolExecutor: https://docs.openstack.org/futurist/latest/reference/index.html#futurist.GreenThreadPoolExecutor
.. _ThreadPoolExecutor: https://docs.openstack.org/futurist/latest/reference/index.html#futurist.ThreadPoolExecutor
AuditEndpoint AuditEndpoint
############# #############

View File

@@ -0,0 +1,12 @@
---
features:
- |
The Decision Engine service now supports running with ``native threading``
mode enabled as opposed to the use of the Eventlet library.
Note that the use of ``native threading`` is still ``experimental``,
and is disabled by default. It should not be used in production. To
switch from Eventlet to native threading mode, the environment variable
``OS_WATCHER_DISABLE_EVENTLET_PATCHING=true`` needs to be added to
the decision engine service configuration. For more information,
please check `eventlet removal
<https://wiki.openstack.org/wiki/Eventlet-removal>`__ documentation.

View File

@@ -22,7 +22,7 @@ oslo.messaging>=14.1.0 # Apache-2.0
oslo.policy>=4.5.0 # Apache-2.0 oslo.policy>=4.5.0 # Apache-2.0
oslo.reports>=1.27.0 # Apache-2.0 oslo.reports>=1.27.0 # Apache-2.0
oslo.serialization>=2.25.0 # Apache-2.0 oslo.serialization>=2.25.0 # Apache-2.0
oslo.service>=1.30.0 # Apache-2.0 oslo.service[threading]>=4.2.1 # Apache-2.0
oslo.upgradecheck>=1.3.0 # Apache-2.0 oslo.upgradecheck>=1.3.0 # Apache-2.0
oslo.utils>=7.0.0 # Apache-2.0 oslo.utils>=7.0.0 # Apache-2.0
oslo.versionedobjects>=1.32.0 # Apache-2.0 oslo.versionedobjects>=1.32.0 # Apache-2.0

View File

@@ -25,5 +25,12 @@
# That is problematic and can lead to errors on python 3.12+. # That is problematic and can lead to errors on python 3.12+.
# The maas support added asyncio to the codebase which is unsafe to mix # The maas support added asyncio to the codebase which is unsafe to mix
# with eventlets by default. # with eventlets by default.
from watcher.common import oslo_service_helper as helper
from watcher import eventlet from watcher import eventlet
eventlet.patch() eventlet.patch()
# NOTE(dviroel): oslo service backend needs to be initialize
# as soon as possible, before importing oslo service. If eventlet
# patching is enabled, it should be patched before calling this
# function
helper.init_oslo_service_backend()

View File

@@ -0,0 +1,51 @@
# Copyright 2025 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from apscheduler.executors import pool as pool_executor
import futurist
from watcher import eventlet as eventlet_helper
def get_futurist_pool_executor(max_workers=10):
"""Returns a futurist pool executor
:param max_workers: the maximum number of spawned threads
:return: a futurist pool executor
:rtype: futurist.ThreadPoolExecutor or futurist.GreenThreadPoolExecutor
depending if eventlet patching is enabled or not
"""
if eventlet_helper.is_patched():
return futurist.GreenThreadPoolExecutor(max_workers)
else:
return futurist.ThreadPoolExecutor(max_workers)
class APSchedulerThreadPoolExecutor(pool_executor.BasePoolExecutor):
"""Thread pool executor for APScheduler based classes
This will return an executor for APScheduler based class which
will be constructed using the futurist.ThreadPoolExecutor or
futurist.GreenThreadPoolExecutor as pool, depending if eventlet
patching is enabled or not.
:param max_workers: the maximum number of spawned threads
:return: a thread pool executor
:rtype: an APScheduler pool executor object
"""
def __init__(self, max_workers=10):
pool = get_futurist_pool_executor(max_workers)
super(APSchedulerThreadPoolExecutor, self).__init__(pool)

View File

@@ -0,0 +1,33 @@
# Copyright 2025 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
from oslo_service import backend
from watcher import eventlet as eventlet_helper
LOG = log.getLogger(__name__)
def init_oslo_service_backend():
if eventlet_helper.is_patched():
backend.init_backend(backend.BackendType.EVENTLET)
LOG.warning(
"Service is starting with Eventlet based service backend.")
else:
backend.init_backend(backend.BackendType.THREADING)
LOG.warning(
"Service is starting with Threading based service backend. "
"This is an experimental feature, do not use it in production.")

View File

@@ -16,43 +16,24 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import eventlet
from apscheduler import events from apscheduler import events
from apscheduler.executors import pool as pool_executor
from apscheduler.schedulers import background from apscheduler.schedulers import background
import futurist
from oslo_service import service from oslo_service import service
from watcher.common import executor
from watcher import eventlet as eventlet_helper from watcher import eventlet as eventlet_helper
job_events = events job_events = events
class GreenThreadPoolExecutor(pool_executor.BasePoolExecutor):
"""Green thread pool
An executor that runs jobs in a green thread pool.
Plugin alias: ``threadpool``
:param max_workers: the maximum number of spawned threads.
"""
def __init__(self, max_workers=10):
pool = futurist.GreenThreadPoolExecutor(int(max_workers))
super(GreenThreadPoolExecutor, self).__init__(pool)
executors = { executors = {
'default': GreenThreadPoolExecutor(), 'default': executor.APSchedulerThreadPoolExecutor(),
} }
class BackgroundSchedulerService( class BackgroundSchedulerService(
service.ServiceBase, background.BackgroundScheduler): service.ServiceBase, background.BackgroundScheduler):
def __init__(self, gconfig=None, **options): def __init__(self, gconfig=None, **options):
self.should_patch = eventlet_helper.is_patched()
if options is None: if options is None:
options = {'executors': executors} options = {'executors': executors}
else: else:
@@ -61,12 +42,10 @@ class BackgroundSchedulerService(
super().__init__(gconfig or {}, **options) super().__init__(gconfig or {}, **options)
def _main_loop(self): def _main_loop(self):
if self.should_patch: # NOTE(dviroel): to make sure that we monkey patch when needed.
# NOTE(sean-k-mooney): is_patched and monkey_patch form # helper patch() now checks a environment variable to see if
# watcher.eventlet check a non thread local variable to early out # the service should or not be patched.
# as we do not use eventlet_helper.patch() here to ensure eventlet_helper.patch()
# eventlet.monkey_patch() is actually called.
eventlet.monkey_patch()
super()._main_loop() super()._main_loop()
def start(self): def start(self):

View File

@@ -16,12 +16,10 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# #
import futurist
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log from oslo_log import log
from watcher.decision_engine.audit import continuous as c_handler from watcher.common import executor
from watcher.decision_engine.audit import event as e_handler from watcher.decision_engine.audit import event as e_handler
from watcher.decision_engine.audit import oneshot as o_handler from watcher.decision_engine.audit import oneshot as o_handler
@@ -35,10 +33,10 @@ class AuditEndpoint(object):
def __init__(self, messaging): def __init__(self, messaging):
self._messaging = messaging self._messaging = messaging
self._executor = futurist.GreenThreadPoolExecutor( self.amount_workers = CONF.watcher_decision_engine.max_audit_workers
max_workers=CONF.watcher_decision_engine.max_audit_workers) self._executor = (
executor.get_futurist_pool_executor(self.amount_workers))
self._oneshot_handler = o_handler.OneShotAuditHandler() self._oneshot_handler = o_handler.OneShotAuditHandler()
self._continuous_handler = c_handler.ContinuousAuditHandler().start()
self._event_handler = e_handler.EventAuditHandler() self._event_handler = e_handler.EventAuditHandler()
@property @property

View File

@@ -13,6 +13,7 @@
# under the License. # under the License.
from watcher.common import service as watcher_service from watcher.common import service as watcher_service
from watcher.decision_engine.audit import continuous as c_handler
from watcher.decision_engine import manager from watcher.decision_engine import manager
from watcher.decision_engine import scheduling from watcher.decision_engine import scheduling
@@ -31,6 +32,7 @@ class DecisionEngineService(watcher_service.Service):
# task, an one shot task to cancel ongoing audits and a periodic # task, an one shot task to cancel ongoing audits and a periodic
# check for expired action plans # check for expired action plans
self._bg_scheduler = None self._bg_scheduler = None
self._continuous_handler = None
@property @property
def bg_scheduler(self): def bg_scheduler(self):
@@ -38,10 +40,17 @@ class DecisionEngineService(watcher_service.Service):
self._bg_scheduler = scheduling.DecisionEngineSchedulingService() self._bg_scheduler = scheduling.DecisionEngineSchedulingService()
return self._bg_scheduler return self._bg_scheduler
@property
def continuous_handler(self):
if self._continuous_handler is None:
self._continuous_handler = c_handler.ContinuousAuditHandler()
return self._continuous_handler
def start(self): def start(self):
"""Start service.""" """Start service."""
super().start() super().start()
self.bg_scheduler.start() self.bg_scheduler.start()
self.continuous_handler.start()
def stop(self): def stop(self):
"""Stop service.""" """Stop service."""

View File

@@ -17,13 +17,14 @@
# limitations under the License. # limitations under the License.
import copy import copy
import futurist
from futurist import waiters from futurist import waiters
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log from oslo_log import log
from oslo_service import service from oslo_service import service
from watcher.common import executor
CONF = cfg.CONF CONF = cfg.CONF
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@@ -33,8 +34,8 @@ class DecisionEngineThreadPool(object, metaclass=service.Singleton):
def __init__(self): def __init__(self):
self.amount_workers = CONF.watcher_decision_engine.max_general_workers self.amount_workers = CONF.watcher_decision_engine.max_general_workers
self._threadpool = futurist.GreenThreadPoolExecutor( self._threadpool = (
max_workers=self.amount_workers) executor.get_futurist_pool_executor(self.amount_workers))
def submit(self, fn, *args, **kwargs): def submit(self, fn, *args, **kwargs):
"""Will submit the job to the underlying threadpool """Will submit the job to the underlying threadpool

View File

@@ -22,20 +22,25 @@ def is_patched():
def _monkey_patch(): def _monkey_patch():
if is_patched():
return
# Anything imported here will not be monkey patched. It is # Anything imported here will not be monkey patched. It is
# important to take care not to import anything here which requires monkey # important to take care not to import anything here which requires monkey
# patching. eventlet processes environment variables at import-time. # patching. eventlet processes environment variables at import-time.
# as such any eventlet configuration should happen here if needed. # as such any eventlet configuration should happen here if needed.
import eventlet import eventlet
eventlet.monkey_patch() eventlet.monkey_patch()
global MONKEY_PATCHED
MONKEY_PATCHED = True
def _is_patching_enabled():
if (os.environ.get('OS_WATCHER_DISABLE_EVENTLET_PATCHING', '').lower()
not in ('1', 'true', 'yes', 'y')):
return True
return False
def patch(): def patch():
# This is only for debugging, this should not be used in production. # NOTE(dviroel): monkey_patch when called, even if is already patched.
if (os.environ.get('OS_WATCHER_DISABLE_EVENTLET_PATCHING', '').lower() # Ignore if the control flag is disabling patching
not in ('1', 'true', 'yes', 'y')): if _is_patching_enabled():
_monkey_patch() _monkey_patch()
global MONKEY_PATCHED
MONKEY_PATCHED = True

View File

@@ -0,0 +1,38 @@
# Copyright 2025 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import futurist
from unittest import mock
from watcher.common import executor
from watcher import eventlet as eventlet_helper
from watcher.tests import base
@mock.patch.object(eventlet_helper, 'is_patched')
class TestFuturistPoolExecutor(base.TestCase):
def test_get_futurist_pool_executor_eventlet(self, eventlet_patched_mock):
eventlet_patched_mock.return_value = True
pool_executor = executor.get_futurist_pool_executor(max_workers=1)
self.assertIsInstance(pool_executor, futurist.GreenThreadPoolExecutor)
def test_get_futurist_pool_executor_threading(self, eventlet_patched_mock):
eventlet_patched_mock.return_value = False
pool_executor = executor.get_futurist_pool_executor(max_workers=1)
self.assertIsInstance(pool_executor, futurist.ThreadPoolExecutor)

View File

@@ -0,0 +1,50 @@
# Copyright 2025 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from oslo_service import backend
from watcher.common import oslo_service_helper
from watcher import eventlet as eventlet_helper
from watcher.tests import base
class TestOsloServiceHelper(base.TestCase):
@mock.patch.object(eventlet_helper, 'is_patched')
@mock.patch.object(backend, 'init_backend')
def test_init_oslo_backend_eventlet(self, mock_init_backend,
mock_is_patched):
mock_is_patched.return_value = True
oslo_service_helper.init_oslo_service_backend()
mock_init_backend.assert_called_once_with(
backend.BackendType.EVENTLET
)
@mock.patch.object(eventlet_helper, 'is_patched')
@mock.patch.object(backend, 'init_backend')
def test_init_oslo_backend_threading(self, mock_init_backend,
mock_is_patched):
mock_is_patched.return_value = False
oslo_service_helper.init_oslo_service_backend()
mock_init_backend.assert_called_once_with(
backend.BackendType.THREADING
)

View File

@@ -11,8 +11,6 @@
# under the License. # under the License.
from unittest import mock from unittest import mock
import eventlet
from apscheduler.schedulers import background from apscheduler.schedulers import background
from watcher.common import scheduling from watcher.common import scheduling
@@ -54,15 +52,9 @@ class TestSchedulerMonkeyPatching(base.BaseTestCase):
mock_main_loop.assert_called_once_with() mock_main_loop.assert_called_once_with()
@mock.patch.object(background.BackgroundScheduler, '_main_loop') @mock.patch.object(background.BackgroundScheduler, '_main_loop')
@mock.patch.object(eventlet, 'monkey_patch') @mock.patch.object(eventlet_helper, 'patch')
def test_main_loop_is_monkey_patched( def test_main_loop_is_monkey_patched(
self, mock_monky_patch, mock_main_loop): self, mock_eventlet_patch, mock_main_loop):
self.test_scheduler._main_loop() self.test_scheduler._main_loop()
self.assertEqual( mock_eventlet_patch.assert_called_once_with()
eventlet_helper.is_patched(), self.test_scheduler.should_patch)
mock_monky_patch.assert_called_once_with()
mock_main_loop.assert_called_once_with() mock_main_loop.assert_called_once_with()
def test_scheduler_should_patch(self):
self.assertEqual(
eventlet_helper.is_patched(), self.test_scheduler.should_patch)

View File

@@ -15,6 +15,7 @@
from unittest import mock from unittest import mock
from watcher.common import service as watcher_service from watcher.common import service as watcher_service
from watcher.decision_engine.audit import continuous as c_handler
from watcher.decision_engine import scheduling from watcher.decision_engine import scheduling
from watcher.decision_engine import service from watcher.decision_engine import service
from watcher.tests import base from watcher.tests import base
@@ -25,9 +26,11 @@ from watcher.tests import base
@mock.patch.object(watcher_service.Service, '__init__', return_value=None) @mock.patch.object(watcher_service.Service, '__init__', return_value=None)
class TestDecisionEngineService(base.TestCase): class TestDecisionEngineService(base.TestCase):
@mock.patch.object(c_handler.ContinuousAuditHandler, 'start')
@mock.patch.object(scheduling.DecisionEngineSchedulingService, 'start') @mock.patch.object(scheduling.DecisionEngineSchedulingService, 'start')
@mock.patch.object(watcher_service.Service, 'start') @mock.patch.object(watcher_service.Service, 'start')
def test_decision_engine_service_start(self, svc_start, sch_start, def test_decision_engine_service_start(self, svc_start, sch_start,
cont_audit_start,
svc_init, sch_init): svc_init, sch_init):
de_service = service.DecisionEngineService() de_service = service.DecisionEngineService()
de_service.start() de_service.start()
@@ -37,6 +40,7 @@ class TestDecisionEngineService(base.TestCase):
svc_start.assert_called() svc_start.assert_called()
sch_start.assert_called() sch_start.assert_called()
cont_audit_start.assert_called()
@mock.patch.object(scheduling.DecisionEngineSchedulingService, 'stop') @mock.patch.object(scheduling.DecisionEngineSchedulingService, 'stop')
@mock.patch.object(watcher_service.Service, 'stop') @mock.patch.object(watcher_service.Service, 'stop')