Fix incompatiablity between apscheduler and eventlet

The apscheduler background scheduler spawns a native thread
which is not monkey patched which interacts with shared module
level objects like the module level LOG instances and sqlachmey
engine facades.

This is unsafe and leads to mixing patched and unpatched
code in the same thread.

This manifests in 2 ways:
1.) https://paste.opendev.org/show/bGPgfURx1cZYOsgmtDyw/
sqlalchmey calls can fail due to a time.sleep(0) in oslo.db being invoked
using the unpatched time modules in an eventlet greenthrad.
2.) https://paste.opendev.org/show/b5C2Zz4A4BFIGbKLKrQU/
over time that caused the sqlalchmy connection queuepool to fill up preventing
backgound tasks form running like reconsiling audits.

This change adresses this by overloading the background scheduler _main_loop
to monkey patch the main loop if the calling thread was monkey patched.

Closes-Bug: #2086710
Change-Id: I672c183274b0a17cb40d7b5ab8c313197760b5a0
This commit is contained in:
Sean Mooney 2024-11-19 01:10:00 +00:00
parent 9abec18c8b
commit f07694ba6c
2 changed files with 90 additions and 7 deletions

View File

@ -16,16 +16,22 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import eventlet
from apscheduler import events from apscheduler import events
from apscheduler.executors.pool import BasePoolExecutor from apscheduler.executors import pool as pool_executor
from apscheduler.schedulers import background from apscheduler.schedulers import background
import futurist import futurist
from oslo_service import service from oslo_service import service
from watcher import eventlet as eventlet_helper
job_events = events job_events = events
class GreenThreadPoolExecutor(BasePoolExecutor): class GreenThreadPoolExecutor(pool_executor.BasePoolExecutor):
"""Green thread pool """Green thread pool
An executor that runs jobs in a green thread pool. An executor that runs jobs in a green thread pool.
@ -43,16 +49,25 @@ executors = {
} }
class BackgroundSchedulerService(service.ServiceBase, class BackgroundSchedulerService(
background.BackgroundScheduler): service.ServiceBase, background.BackgroundScheduler):
def __init__(self, gconfig={}, **options): def __init__(self, gconfig=None, **options):
self.should_patch = eventlet_helper.is_patched()
if options is None: if options is None:
options = {'executors': executors} options = {'executors': executors}
else: else:
if 'executors' not in options.keys(): if 'executors' not in options.keys():
options['executors'] = executors options['executors'] = executors
super(BackgroundSchedulerService, self).__init__( super().__init__(gconfig or {}, **options)
gconfig, **options)
def _main_loop(self):
if self.should_patch:
# NOTE(sean-k-mooney): is_patched and monkey_patch form
# watcher.eventlet check a non thread local variable to early out
# as we do not use eventlet_helper.patch() here to ensure
# eventlet.monkey_patch() is actually called.
eventlet.monkey_patch()
super()._main_loop()
def start(self): def start(self):
"""Start service.""" """Start service."""

View File

@ -0,0 +1,68 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import eventlet
from apscheduler.schedulers import background
from watcher.common import scheduling
from watcher import eventlet as eventlet_helper
from watcher.tests import base
class TestSchedulerMonkeyPatching(base.BaseTestCase):
def setUp(self):
super().setUp()
self.started = False
self.test_scheduler = scheduling.BackgroundSchedulerService()
self.addCleanup(self._cleanup_scheduler)
def _cleanup_scheduler(self):
if self.started:
self.test_scheduler.shutdown()
self.started = False
def _start_scheduler(self):
self.test_scheduler.start()
self.started = True
@mock.patch.object(scheduling.BackgroundSchedulerService, 'start')
def test_scheduler_start(self, mock_start):
self.test_scheduler.start()
mock_start.assert_called_once_with()
@mock.patch.object(scheduling.BackgroundSchedulerService, 'shutdown')
def test_scheduler_stop(self, mock_shutdown):
self._start_scheduler()
self.test_scheduler.stop()
mock_shutdown.assert_called_once_with()
@mock.patch.object(scheduling.BackgroundSchedulerService, '_main_loop')
def test_scheduler_main_loop(self, mock_main_loop):
self._start_scheduler()
mock_main_loop.assert_called_once_with()
@mock.patch.object(background.BackgroundScheduler, '_main_loop')
@mock.patch.object(eventlet, 'monkey_patch')
def test_main_loop_is_monkey_patched(
self, mock_monky_patch, mock_main_loop):
self.test_scheduler._main_loop()
self.assertEqual(
eventlet_helper.is_patched(), self.test_scheduler.should_patch)
mock_monky_patch.assert_called_once_with()
mock_main_loop.assert_called_once_with()
def test_scheduler_should_patch(self):
self.assertEqual(
eventlet_helper.is_patched(), self.test_scheduler.should_patch)