From f07694ba6c75315dac1407c3751645de35ed14f4 Mon Sep 17 00:00:00 2001 From: Sean Mooney Date: Tue, 19 Nov 2024 01:10:00 +0000 Subject: [PATCH] Fix incompatiablity between apscheduler and eventlet The apscheduler background scheduler spawns a native thread which is not monkey patched which interacts with shared module level objects like the module level LOG instances and sqlachmey engine facades. This is unsafe and leads to mixing patched and unpatched code in the same thread. This manifests in 2 ways: 1.) https://paste.opendev.org/show/bGPgfURx1cZYOsgmtDyw/ sqlalchmey calls can fail due to a time.sleep(0) in oslo.db being invoked using the unpatched time modules in an eventlet greenthrad. 2.) https://paste.opendev.org/show/b5C2Zz4A4BFIGbKLKrQU/ over time that caused the sqlalchmy connection queuepool to fill up preventing backgound tasks form running like reconsiling audits. This change adresses this by overloading the background scheduler _main_loop to monkey patch the main loop if the calling thread was monkey patched. Closes-Bug: #2086710 Change-Id: I672c183274b0a17cb40d7b5ab8c313197760b5a0 --- watcher/common/scheduling.py | 29 ++++++++--- watcher/tests/common/test_scheduling.py | 68 +++++++++++++++++++++++++ 2 files changed, 90 insertions(+), 7 deletions(-) create mode 100644 watcher/tests/common/test_scheduling.py diff --git a/watcher/common/scheduling.py b/watcher/common/scheduling.py index 072dd12fa..f2b1d11ba 100644 --- a/watcher/common/scheduling.py +++ b/watcher/common/scheduling.py @@ -16,16 +16,22 @@ # See the License for the specific language governing permissions and # limitations under the License. +import eventlet + from apscheduler import events -from apscheduler.executors.pool import BasePoolExecutor +from apscheduler.executors import pool as pool_executor from apscheduler.schedulers import background + import futurist + from oslo_service import service +from watcher import eventlet as eventlet_helper + job_events = events -class GreenThreadPoolExecutor(BasePoolExecutor): +class GreenThreadPoolExecutor(pool_executor.BasePoolExecutor): """Green thread pool An executor that runs jobs in a green thread pool. @@ -43,16 +49,25 @@ executors = { } -class BackgroundSchedulerService(service.ServiceBase, - background.BackgroundScheduler): - def __init__(self, gconfig={}, **options): +class BackgroundSchedulerService( + service.ServiceBase, background.BackgroundScheduler): + def __init__(self, gconfig=None, **options): + self.should_patch = eventlet_helper.is_patched() if options is None: options = {'executors': executors} else: if 'executors' not in options.keys(): options['executors'] = executors - super(BackgroundSchedulerService, self).__init__( - gconfig, **options) + super().__init__(gconfig or {}, **options) + + def _main_loop(self): + if self.should_patch: + # NOTE(sean-k-mooney): is_patched and monkey_patch form + # watcher.eventlet check a non thread local variable to early out + # as we do not use eventlet_helper.patch() here to ensure + # eventlet.monkey_patch() is actually called. + eventlet.monkey_patch() + super()._main_loop() def start(self): """Start service.""" diff --git a/watcher/tests/common/test_scheduling.py b/watcher/tests/common/test_scheduling.py new file mode 100644 index 000000000..26de87949 --- /dev/null +++ b/watcher/tests/common/test_scheduling.py @@ -0,0 +1,68 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from unittest import mock + +import eventlet + +from apscheduler.schedulers import background + +from watcher.common import scheduling +from watcher import eventlet as eventlet_helper +from watcher.tests import base + + +class TestSchedulerMonkeyPatching(base.BaseTestCase): + + def setUp(self): + super().setUp() + self.started = False + self.test_scheduler = scheduling.BackgroundSchedulerService() + self.addCleanup(self._cleanup_scheduler) + + def _cleanup_scheduler(self): + if self.started: + self.test_scheduler.shutdown() + self.started = False + + def _start_scheduler(self): + self.test_scheduler.start() + self.started = True + + @mock.patch.object(scheduling.BackgroundSchedulerService, 'start') + def test_scheduler_start(self, mock_start): + self.test_scheduler.start() + mock_start.assert_called_once_with() + + @mock.patch.object(scheduling.BackgroundSchedulerService, 'shutdown') + def test_scheduler_stop(self, mock_shutdown): + self._start_scheduler() + self.test_scheduler.stop() + mock_shutdown.assert_called_once_with() + + @mock.patch.object(scheduling.BackgroundSchedulerService, '_main_loop') + def test_scheduler_main_loop(self, mock_main_loop): + self._start_scheduler() + mock_main_loop.assert_called_once_with() + + @mock.patch.object(background.BackgroundScheduler, '_main_loop') + @mock.patch.object(eventlet, 'monkey_patch') + def test_main_loop_is_monkey_patched( + self, mock_monky_patch, mock_main_loop): + self.test_scheduler._main_loop() + self.assertEqual( + eventlet_helper.is_patched(), self.test_scheduler.should_patch) + mock_monky_patch.assert_called_once_with() + mock_main_loop.assert_called_once_with() + + def test_scheduler_should_patch(self): + self.assertEqual( + eventlet_helper.is_patched(), self.test_scheduler.should_patch)