From f879b10b05fdbd67ba2e5e4890fc0235403dd7d8 Mon Sep 17 00:00:00 2001 From: Douglas Viroel Date: Tue, 10 Jun 2025 10:39:17 -0300 Subject: [PATCH] Extend decision engine to support threading mode With the events of eventlet removal, Watcher will need to be adapted to support both modes, eventlet and threading, for a couple of releases before removing all eventlet code. This patch adds methods and classes that allow decision engine modules to create futurist thread pools instead of green thread pools, based on a environment variable that can be enabled by service. It moves continuous audit handler instance to decison engine service, so it can be started together with the main decision engine service. Adds an environment variable that allows the user to disable eventlet monkey patching and to use oslo.service threading backend. Change-Id: I8a8be0a7cebdc44005fd77ec960543828c7da318 Signed-off-by: Douglas Viroel --- .zuul.yaml | 8 +++ doc/source/contributor/concurrency.rst | 31 +++++++++-- ...ngine-threading-mode-26fc8066dcd499a2.yaml | 12 +++++ requirements.txt | 2 +- watcher/cmd/eventlet/__init__.py | 7 +++ watcher/common/executor.py | 51 +++++++++++++++++++ watcher/common/oslo_service_helper.py | 33 ++++++++++++ watcher/common/scheduling.py | 33 +++--------- .../messaging/audit_endpoint.py | 10 ++-- watcher/decision_engine/service.py | 9 ++++ watcher/decision_engine/threading.py | 7 +-- watcher/eventlet.py | 19 ++++--- watcher/tests/common/test_executor.py | 38 ++++++++++++++ .../tests/common/test_oslo_service_helper.py | 50 ++++++++++++++++++ watcher/tests/common/test_scheduling.py | 14 ++--- watcher/tests/decision_engine/test_service.py | 4 ++ 16 files changed, 270 insertions(+), 58 deletions(-) create mode 100644 releasenotes/notes/decision-engine-threading-mode-26fc8066dcd499a2.yaml create mode 100644 watcher/common/executor.py create mode 100644 watcher/common/oslo_service_helper.py create mode 100644 watcher/tests/common/test_executor.py create mode 100644 watcher/tests/common/test_oslo_service_helper.py diff --git a/.zuul.yaml b/.zuul.yaml index b71946390..039601715 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -271,6 +271,13 @@ devstack_localrc: NODE_EXPORTER_COLLECTOR_EXCLUDE: "" +- job: + name: watcher-prometheus-integration-threading + parent: watcher-sg-core-tempest-base + vars: + devstack_localrc: + 'SYSTEMD_ENV_VARS["watcher-decision-engine"]': OS_WATCHER_DISABLE_EVENTLET_PATCHING=true + - project: queue: watcher templates: @@ -290,6 +297,7 @@ - ^watcher/api/* - watcher-tempest-functional-ipv6-only - watcher-prometheus-integration + - watcher-prometheus-integration-threading gate: jobs: - watcher-tempest-functional diff --git a/doc/source/contributor/concurrency.rst b/doc/source/contributor/concurrency.rst index 7db51cac2..2a001288d 100644 --- a/doc/source/contributor/concurrency.rst +++ b/doc/source/contributor/concurrency.rst @@ -52,18 +52,43 @@ types of concurrency used in various services of Watcher. .. _wait_for_any: https://docs.openstack.org/futurist/latest/reference/index.html#waiters +Concurrency modes +################# + +Evenlet has been the main concurrency library within the OpenStack community +for the last 10 years since the removal of twisted. Over the last few years, +the maintenance of eventlet has decreased and the efforts to remove the GIL +from Python (PEP 703), have fundamentally changed how concurrency is making +eventlet no longer viable. While transitioning to a new native thread +solution, Watcher services will be supporting both modes, with the usage of +native threading mode initially classified as ``experimental``. + +It is possible to enable the new native threading mode by setting the following +environment variable in the corresponding service configuration: + +.. code:: bash + + OS_WATCHER_DISABLE_EVENTLET_PATCHING=true + +.. note:: + + The only service that supports two different concurrency modes is the + ``decision engine``. + Decision engine concurrency *************************** The concurrency in the decision engine is governed by two independent -threadpools. Both of these threadpools are GreenThreadPoolExecutor_ from the -futurist_ library. One of these is used automatically and most contributors +threadpools. These threadpools can be configured as GreenThreadPoolExecutor_ +or ThreadPoolExecutor_, both from the futurist_ library, depending on the +service configuration. One of these is used automatically and most contributors will not interact with it while developing new features. The other threadpool can frequently be used while developing new features or updating existing ones. It is known as the DecisionEngineThreadpool and allows to achieve performance improvements in network or I/O bound operations. -.. _GreenThreadPoolExecutor: https://docs.openstack.org/futurist/latest/reference/index.html#executors +.. _GreenThreadPoolExecutor: https://docs.openstack.org/futurist/latest/reference/index.html#futurist.GreenThreadPoolExecutor +.. _ThreadPoolExecutor: https://docs.openstack.org/futurist/latest/reference/index.html#futurist.ThreadPoolExecutor AuditEndpoint ############# diff --git a/releasenotes/notes/decision-engine-threading-mode-26fc8066dcd499a2.yaml b/releasenotes/notes/decision-engine-threading-mode-26fc8066dcd499a2.yaml new file mode 100644 index 000000000..9c58eeef2 --- /dev/null +++ b/releasenotes/notes/decision-engine-threading-mode-26fc8066dcd499a2.yaml @@ -0,0 +1,12 @@ +--- +features: + - | + The Decision Engine service now supports running with ``native threading`` + mode enabled as opposed to the use of the Eventlet library. + Note that the use of ``native threading`` is still ``experimental``, + and is disabled by default. It should not be used in production. To + switch from Eventlet to native threading mode, the environment variable + ``OS_WATCHER_DISABLE_EVENTLET_PATCHING=true`` needs to be added to + the decision engine service configuration. For more information, + please check `eventlet removal + `__ documentation. diff --git a/requirements.txt b/requirements.txt index bdf5fc803..a35fe0945 100644 --- a/requirements.txt +++ b/requirements.txt @@ -22,7 +22,7 @@ oslo.messaging>=14.1.0 # Apache-2.0 oslo.policy>=4.5.0 # Apache-2.0 oslo.reports>=1.27.0 # Apache-2.0 oslo.serialization>=2.25.0 # Apache-2.0 -oslo.service>=1.30.0 # Apache-2.0 +oslo.service[threading]>=4.2.1 # Apache-2.0 oslo.upgradecheck>=1.3.0 # Apache-2.0 oslo.utils>=7.0.0 # Apache-2.0 oslo.versionedobjects>=1.32.0 # Apache-2.0 diff --git a/watcher/cmd/eventlet/__init__.py b/watcher/cmd/eventlet/__init__.py index fe21c7a40..e3e409eb9 100644 --- a/watcher/cmd/eventlet/__init__.py +++ b/watcher/cmd/eventlet/__init__.py @@ -25,5 +25,12 @@ # That is problematic and can lead to errors on python 3.12+. # The maas support added asyncio to the codebase which is unsafe to mix # with eventlets by default. +from watcher.common import oslo_service_helper as helper from watcher import eventlet eventlet.patch() + +# NOTE(dviroel): oslo service backend needs to be initialize +# as soon as possible, before importing oslo service. If eventlet +# patching is enabled, it should be patched before calling this +# function +helper.init_oslo_service_backend() diff --git a/watcher/common/executor.py b/watcher/common/executor.py new file mode 100644 index 000000000..2892a1451 --- /dev/null +++ b/watcher/common/executor.py @@ -0,0 +1,51 @@ +# Copyright 2025 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from apscheduler.executors import pool as pool_executor + +import futurist + +from watcher import eventlet as eventlet_helper + + +def get_futurist_pool_executor(max_workers=10): + """Returns a futurist pool executor + + :param max_workers: the maximum number of spawned threads + :return: a futurist pool executor + :rtype: futurist.ThreadPoolExecutor or futurist.GreenThreadPoolExecutor + depending if eventlet patching is enabled or not + """ + if eventlet_helper.is_patched(): + return futurist.GreenThreadPoolExecutor(max_workers) + else: + return futurist.ThreadPoolExecutor(max_workers) + + +class APSchedulerThreadPoolExecutor(pool_executor.BasePoolExecutor): + """Thread pool executor for APScheduler based classes + + This will return an executor for APScheduler based class which + will be constructed using the futurist.ThreadPoolExecutor or + futurist.GreenThreadPoolExecutor as pool, depending if eventlet + patching is enabled or not. + + :param max_workers: the maximum number of spawned threads + :return: a thread pool executor + :rtype: an APScheduler pool executor object + """ + + def __init__(self, max_workers=10): + pool = get_futurist_pool_executor(max_workers) + super(APSchedulerThreadPoolExecutor, self).__init__(pool) diff --git a/watcher/common/oslo_service_helper.py b/watcher/common/oslo_service_helper.py new file mode 100644 index 000000000..8da661563 --- /dev/null +++ b/watcher/common/oslo_service_helper.py @@ -0,0 +1,33 @@ + +# Copyright 2025 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import log +from oslo_service import backend + +from watcher import eventlet as eventlet_helper + +LOG = log.getLogger(__name__) + + +def init_oslo_service_backend(): + if eventlet_helper.is_patched(): + backend.init_backend(backend.BackendType.EVENTLET) + LOG.warning( + "Service is starting with Eventlet based service backend.") + else: + backend.init_backend(backend.BackendType.THREADING) + LOG.warning( + "Service is starting with Threading based service backend. " + "This is an experimental feature, do not use it in production.") diff --git a/watcher/common/scheduling.py b/watcher/common/scheduling.py index f2b1d11ba..c02b9e9eb 100644 --- a/watcher/common/scheduling.py +++ b/watcher/common/scheduling.py @@ -16,43 +16,24 @@ # See the License for the specific language governing permissions and # limitations under the License. -import eventlet - from apscheduler import events -from apscheduler.executors import pool as pool_executor from apscheduler.schedulers import background -import futurist - from oslo_service import service +from watcher.common import executor from watcher import eventlet as eventlet_helper job_events = events - -class GreenThreadPoolExecutor(pool_executor.BasePoolExecutor): - """Green thread pool - - An executor that runs jobs in a green thread pool. - Plugin alias: ``threadpool`` - :param max_workers: the maximum number of spawned threads. - """ - - def __init__(self, max_workers=10): - pool = futurist.GreenThreadPoolExecutor(int(max_workers)) - super(GreenThreadPoolExecutor, self).__init__(pool) - - executors = { - 'default': GreenThreadPoolExecutor(), + 'default': executor.APSchedulerThreadPoolExecutor(), } class BackgroundSchedulerService( service.ServiceBase, background.BackgroundScheduler): def __init__(self, gconfig=None, **options): - self.should_patch = eventlet_helper.is_patched() if options is None: options = {'executors': executors} else: @@ -61,12 +42,10 @@ class BackgroundSchedulerService( super().__init__(gconfig or {}, **options) def _main_loop(self): - if self.should_patch: - # NOTE(sean-k-mooney): is_patched and monkey_patch form - # watcher.eventlet check a non thread local variable to early out - # as we do not use eventlet_helper.patch() here to ensure - # eventlet.monkey_patch() is actually called. - eventlet.monkey_patch() + # NOTE(dviroel): to make sure that we monkey patch when needed. + # helper patch() now checks a environment variable to see if + # the service should or not be patched. + eventlet_helper.patch() super()._main_loop() def start(self): diff --git a/watcher/decision_engine/messaging/audit_endpoint.py b/watcher/decision_engine/messaging/audit_endpoint.py index 7689117c3..c3947273c 100644 --- a/watcher/decision_engine/messaging/audit_endpoint.py +++ b/watcher/decision_engine/messaging/audit_endpoint.py @@ -16,12 +16,10 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import futurist - from oslo_config import cfg from oslo_log import log -from watcher.decision_engine.audit import continuous as c_handler +from watcher.common import executor from watcher.decision_engine.audit import event as e_handler from watcher.decision_engine.audit import oneshot as o_handler @@ -35,10 +33,10 @@ class AuditEndpoint(object): def __init__(self, messaging): self._messaging = messaging - self._executor = futurist.GreenThreadPoolExecutor( - max_workers=CONF.watcher_decision_engine.max_audit_workers) + self.amount_workers = CONF.watcher_decision_engine.max_audit_workers + self._executor = ( + executor.get_futurist_pool_executor(self.amount_workers)) self._oneshot_handler = o_handler.OneShotAuditHandler() - self._continuous_handler = c_handler.ContinuousAuditHandler().start() self._event_handler = e_handler.EventAuditHandler() @property diff --git a/watcher/decision_engine/service.py b/watcher/decision_engine/service.py index 2896a0953..36083a58c 100644 --- a/watcher/decision_engine/service.py +++ b/watcher/decision_engine/service.py @@ -13,6 +13,7 @@ # under the License. from watcher.common import service as watcher_service +from watcher.decision_engine.audit import continuous as c_handler from watcher.decision_engine import manager from watcher.decision_engine import scheduling @@ -31,6 +32,7 @@ class DecisionEngineService(watcher_service.Service): # task, an one shot task to cancel ongoing audits and a periodic # check for expired action plans self._bg_scheduler = None + self._continuous_handler = None @property def bg_scheduler(self): @@ -38,10 +40,17 @@ class DecisionEngineService(watcher_service.Service): self._bg_scheduler = scheduling.DecisionEngineSchedulingService() return self._bg_scheduler + @property + def continuous_handler(self): + if self._continuous_handler is None: + self._continuous_handler = c_handler.ContinuousAuditHandler() + return self._continuous_handler + def start(self): """Start service.""" super().start() self.bg_scheduler.start() + self.continuous_handler.start() def stop(self): """Stop service.""" diff --git a/watcher/decision_engine/threading.py b/watcher/decision_engine/threading.py index 8b9453a97..c9c85017c 100644 --- a/watcher/decision_engine/threading.py +++ b/watcher/decision_engine/threading.py @@ -17,13 +17,14 @@ # limitations under the License. import copy -import futurist from futurist import waiters from oslo_config import cfg from oslo_log import log from oslo_service import service +from watcher.common import executor + CONF = cfg.CONF LOG = log.getLogger(__name__) @@ -33,8 +34,8 @@ class DecisionEngineThreadPool(object, metaclass=service.Singleton): def __init__(self): self.amount_workers = CONF.watcher_decision_engine.max_general_workers - self._threadpool = futurist.GreenThreadPoolExecutor( - max_workers=self.amount_workers) + self._threadpool = ( + executor.get_futurist_pool_executor(self.amount_workers)) def submit(self, fn, *args, **kwargs): """Will submit the job to the underlying threadpool diff --git a/watcher/eventlet.py b/watcher/eventlet.py index c2d985c6c..921de3ef5 100644 --- a/watcher/eventlet.py +++ b/watcher/eventlet.py @@ -22,20 +22,25 @@ def is_patched(): def _monkey_patch(): - if is_patched(): - return # Anything imported here will not be monkey patched. It is # important to take care not to import anything here which requires monkey # patching. eventlet processes environment variables at import-time. # as such any eventlet configuration should happen here if needed. import eventlet eventlet.monkey_patch() + global MONKEY_PATCHED + MONKEY_PATCHED = True + + +def _is_patching_enabled(): + if (os.environ.get('OS_WATCHER_DISABLE_EVENTLET_PATCHING', '').lower() + not in ('1', 'true', 'yes', 'y')): + return True + return False def patch(): - # This is only for debugging, this should not be used in production. - if (os.environ.get('OS_WATCHER_DISABLE_EVENTLET_PATCHING', '').lower() - not in ('1', 'true', 'yes', 'y')): + # NOTE(dviroel): monkey_patch when called, even if is already patched. + # Ignore if the control flag is disabling patching + if _is_patching_enabled(): _monkey_patch() - global MONKEY_PATCHED - MONKEY_PATCHED = True diff --git a/watcher/tests/common/test_executor.py b/watcher/tests/common/test_executor.py new file mode 100644 index 000000000..27c3ceb06 --- /dev/null +++ b/watcher/tests/common/test_executor.py @@ -0,0 +1,38 @@ +# Copyright 2025 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import futurist +from unittest import mock + +from watcher.common import executor +from watcher import eventlet as eventlet_helper +from watcher.tests import base + + +@mock.patch.object(eventlet_helper, 'is_patched') +class TestFuturistPoolExecutor(base.TestCase): + + def test_get_futurist_pool_executor_eventlet(self, eventlet_patched_mock): + eventlet_patched_mock.return_value = True + + pool_executor = executor.get_futurist_pool_executor(max_workers=1) + + self.assertIsInstance(pool_executor, futurist.GreenThreadPoolExecutor) + + def test_get_futurist_pool_executor_threading(self, eventlet_patched_mock): + eventlet_patched_mock.return_value = False + + pool_executor = executor.get_futurist_pool_executor(max_workers=1) + + self.assertIsInstance(pool_executor, futurist.ThreadPoolExecutor) diff --git a/watcher/tests/common/test_oslo_service_helper.py b/watcher/tests/common/test_oslo_service_helper.py new file mode 100644 index 000000000..6933fc678 --- /dev/null +++ b/watcher/tests/common/test_oslo_service_helper.py @@ -0,0 +1,50 @@ +# Copyright 2025 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from unittest import mock + +from oslo_service import backend + +from watcher.common import oslo_service_helper +from watcher import eventlet as eventlet_helper +from watcher.tests import base + + +class TestOsloServiceHelper(base.TestCase): + + @mock.patch.object(eventlet_helper, 'is_patched') + @mock.patch.object(backend, 'init_backend') + def test_init_oslo_backend_eventlet(self, mock_init_backend, + mock_is_patched): + + mock_is_patched.return_value = True + + oslo_service_helper.init_oslo_service_backend() + + mock_init_backend.assert_called_once_with( + backend.BackendType.EVENTLET + ) + + @mock.patch.object(eventlet_helper, 'is_patched') + @mock.patch.object(backend, 'init_backend') + def test_init_oslo_backend_threading(self, mock_init_backend, + mock_is_patched): + + mock_is_patched.return_value = False + + oslo_service_helper.init_oslo_service_backend() + + mock_init_backend.assert_called_once_with( + backend.BackendType.THREADING + ) diff --git a/watcher/tests/common/test_scheduling.py b/watcher/tests/common/test_scheduling.py index 26de87949..88e3eb380 100644 --- a/watcher/tests/common/test_scheduling.py +++ b/watcher/tests/common/test_scheduling.py @@ -11,8 +11,6 @@ # under the License. from unittest import mock -import eventlet - from apscheduler.schedulers import background from watcher.common import scheduling @@ -54,15 +52,9 @@ class TestSchedulerMonkeyPatching(base.BaseTestCase): mock_main_loop.assert_called_once_with() @mock.patch.object(background.BackgroundScheduler, '_main_loop') - @mock.patch.object(eventlet, 'monkey_patch') + @mock.patch.object(eventlet_helper, 'patch') def test_main_loop_is_monkey_patched( - self, mock_monky_patch, mock_main_loop): + self, mock_eventlet_patch, mock_main_loop): self.test_scheduler._main_loop() - self.assertEqual( - eventlet_helper.is_patched(), self.test_scheduler.should_patch) - mock_monky_patch.assert_called_once_with() + mock_eventlet_patch.assert_called_once_with() mock_main_loop.assert_called_once_with() - - def test_scheduler_should_patch(self): - self.assertEqual( - eventlet_helper.is_patched(), self.test_scheduler.should_patch) diff --git a/watcher/tests/decision_engine/test_service.py b/watcher/tests/decision_engine/test_service.py index e605dde4a..0790e61fc 100644 --- a/watcher/tests/decision_engine/test_service.py +++ b/watcher/tests/decision_engine/test_service.py @@ -15,6 +15,7 @@ from unittest import mock from watcher.common import service as watcher_service +from watcher.decision_engine.audit import continuous as c_handler from watcher.decision_engine import scheduling from watcher.decision_engine import service from watcher.tests import base @@ -25,9 +26,11 @@ from watcher.tests import base @mock.patch.object(watcher_service.Service, '__init__', return_value=None) class TestDecisionEngineService(base.TestCase): + @mock.patch.object(c_handler.ContinuousAuditHandler, 'start') @mock.patch.object(scheduling.DecisionEngineSchedulingService, 'start') @mock.patch.object(watcher_service.Service, 'start') def test_decision_engine_service_start(self, svc_start, sch_start, + cont_audit_start, svc_init, sch_init): de_service = service.DecisionEngineService() de_service.start() @@ -37,6 +40,7 @@ class TestDecisionEngineService(base.TestCase): svc_start.assert_called() sch_start.assert_called() + cont_audit_start.assert_called() @mock.patch.object(scheduling.DecisionEngineSchedulingService, 'stop') @mock.patch.object(watcher_service.Service, 'stop')