From 8a4fe7f146446201d88237804e007c8987814622 Mon Sep 17 00:00:00 2001 From: Winson Chan Date: Thu, 31 Aug 2017 21:24:49 +0000 Subject: [PATCH] Fix services launcher to handle shutdown properly When Mistral services are configured with systemd, the services time out on shutdown. The root cause is that each of the server thread is assigned to a different instance of the oslo.service ServiceLauncher. The ServiceLauncher can manage multiple server threads. When the server threads are added to the same instance of the ServiceLauncher, SIGTERM is handled properly for each server threads. Change-Id: I8b739844f6073890324189aee028c0d7da3cc76e Closes-Bug: #1714351 --- mistral/cmd/launch.py | 99 +++++++++++++++--------- mistral/tests/unit/test_launcher.py | 114 ++++++++++++++++++++++++++++ 2 files changed, 176 insertions(+), 37 deletions(-) create mode 100644 mistral/tests/unit/test_launcher.py diff --git a/mistral/cmd/launch.py b/mistral/cmd/launch.py index de1e1d35c..d9954f407 100644 --- a/mistral/cmd/launch.py +++ b/mistral/cmd/launch.py @@ -51,60 +51,65 @@ from mistral import version CONF = cfg.CONF +SERVER_THREAD_MANAGER = None +SERVER_PROCESS_MANAGER = None + + +def launch_thread(server, workers=1): + try: + global SERVER_THREAD_MANAGER + + if not SERVER_THREAD_MANAGER: + SERVER_THREAD_MANAGER = service.ServiceLauncher(CONF) + + SERVER_THREAD_MANAGER.launch_service(server, workers=workers) + except Exception as e: + sys.stderr.write("ERROR: %s\n" % e) + sys.exit(1) + + +def launch_process(server, workers=1): + try: + global SERVER_PROCESS_MANAGER + + if not SERVER_PROCESS_MANAGER: + SERVER_PROCESS_MANAGER = service.ProcessLauncher(CONF) + + SERVER_PROCESS_MANAGER.launch_service(server, workers=workers) + except Exception as e: + sys.stderr.write("ERROR: %s\n" % e) + sys.exit(1) def launch_executor(): - try: - launcher = service.ServiceLauncher(CONF) - - launcher.launch_service(executor_server.get_oslo_service()) - - launcher.wait() - except RuntimeError as e: - sys.stderr.write("ERROR: %s\n" % e) - sys.exit(1) + launch_thread(executor_server.get_oslo_service()) def launch_engine(): - try: - launcher = service.ServiceLauncher(CONF) - - launcher.launch_service(engine_server.get_oslo_service()) - - launcher.wait() - except RuntimeError as e: - sys.stderr.write("ERROR: %s\n" % e) - sys.exit(1) + launch_thread(engine_server.get_oslo_service()) def launch_event_engine(): - try: - launcher = service.ServiceLauncher(CONF) - - launcher.launch_service(event_engine_server.get_oslo_service()) - - launcher.wait() - except RuntimeError as e: - sys.stderr.write("ERROR: %s\n" % e) - sys.exit(1) + launch_thread(event_engine_server.get_oslo_service()) def launch_api(): - launcher = service.ProcessLauncher(cfg.CONF) - server = api_service.WSGIService('mistral_api') - - launcher.launch_service(server, workers=server.workers) - - launcher.wait() + launch_process(server, workers=server.workers) def launch_any(options): - # Launch the servers on different threads. - threads = [eventlet.spawn(LAUNCH_OPTIONS[option]) - for option in options] + for option in options: + LAUNCH_OPTIONS[option]() - [thread.wait() for thread in threads] + global SERVER_PROCESS_MANAGER + global SERVER_THREAD_MANAGER + + if SERVER_PROCESS_MANAGER: + SERVER_PROCESS_MANAGER.wait() + + if SERVER_THREAD_MANAGER: + SERVER_THREAD_MANAGER.wait() # Map cli options to appropriate functions. The cli options are @@ -203,5 +208,25 @@ def main(): sys.exit(1) +# Helper method used in unit tests to reset the service launchers. +def reset_server_managers(): + global SERVER_THREAD_MANAGER + global SERVER_PROCESS_MANAGER + SERVER_THREAD_MANAGER = None + SERVER_PROCESS_MANAGER = None + + +# Helper method used in unit tests to access the service launcher. +def get_server_thread_manager(): + global SERVER_THREAD_MANAGER + return SERVER_THREAD_MANAGER + + +# Helper method used in unit tests to access the process launcher. +def get_server_process_manager(): + global SERVER_PROCESS_MANAGER + return SERVER_PROCESS_MANAGER + + if __name__ == '__main__': sys.exit(main()) diff --git a/mistral/tests/unit/test_launcher.py b/mistral/tests/unit/test_launcher.py new file mode 100644 index 000000000..8b2e6aead --- /dev/null +++ b/mistral/tests/unit/test_launcher.py @@ -0,0 +1,114 @@ +# Copyright 2017 - Brocade Communications Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import eventlet +import mock +import pecan.testing + +from oslo_config import cfg + +from mistral.api import service as api_service +from mistral.cmd import launch +from mistral.tests.unit import base + + +class ServiceLauncherTest(base.DbTestCase): + + def setUp(self): + super(ServiceLauncherTest, self).setUp() + launch.reset_server_managers() + + @mock.patch('mistral.api.app.setup_app') + @mock.patch.object(api_service.wsgi, 'Server') + def test_launch_all(self, wsgi_server, mock_app): + mock_app.return_value = pecan.testing.load_test_app({ + 'app': { + 'root': cfg.CONF.pecan.root, + 'modules': cfg.CONF.pecan.modules, + 'debug': cfg.CONF.pecan.debug, + 'auth_enable': cfg.CONF.pecan.auth_enable, + 'disable_cron_trigger_thread': True + } + }) + + eventlet.spawn(launch.launch_any, launch.LAUNCH_OPTIONS.keys()) + + for i in range(0, 50): + svr_proc_mgr = launch.get_server_process_manager() + svr_thrd_mgr = launch.get_server_thread_manager() + + if svr_proc_mgr and svr_thrd_mgr: + break + + eventlet.sleep(0.1) + + self.assertIsNotNone(svr_proc_mgr) + self.assertIsNotNone(svr_thrd_mgr) + + api_server = api_service.WSGIService('mistral_api') + api_workers = api_server.workers + + self.assertEqual(len(svr_proc_mgr.children.keys()), api_workers) + self.assertEqual(len(svr_thrd_mgr.services.services), 3) + + @mock.patch('mistral.api.app.setup_app') + @mock.patch.object(api_service.wsgi, 'Server') + def test_launch_process(self, wsgi_server, mock_app): + mock_app.return_value = pecan.testing.load_test_app({ + 'app': { + 'root': cfg.CONF.pecan.root, + 'modules': cfg.CONF.pecan.modules, + 'debug': cfg.CONF.pecan.debug, + 'auth_enable': cfg.CONF.pecan.auth_enable, + 'disable_cron_trigger_thread': True + } + }) + + eventlet.spawn(launch.launch_any, ['api']) + + for i in range(0, 50): + svr_proc_mgr = launch.get_server_process_manager() + + if svr_proc_mgr: + break + + eventlet.sleep(0.1) + + svr_thrd_mgr = launch.get_server_thread_manager() + + self.assertIsNotNone(svr_proc_mgr) + self.assertIsNone(svr_thrd_mgr) + + api_server = api_service.WSGIService('mistral_api') + api_workers = api_server.workers + + self.assertEqual(len(svr_proc_mgr.children.keys()), api_workers) + + def test_launch_thread(self): + eventlet.spawn(launch.launch_any, ['engine']) + + for i in range(0, 50): + svr_thrd_mgr = launch.get_server_thread_manager() + + if svr_thrd_mgr: + break + + eventlet.sleep(0.1) + + svr_proc_mgr = launch.get_server_process_manager() + + self.assertIsNone(svr_proc_mgr) + self.assertIsNotNone(svr_thrd_mgr) + + self.assertEqual(len(svr_thrd_mgr.services.services), 1)