Merge "Fix services launcher to handle shutdown properly"

This commit is contained in:
Jenkins 2017-09-08 10:00:41 +00:00 committed by Gerrit Code Review
commit 5aad444d56
2 changed files with 176 additions and 37 deletions

View File

@ -51,60 +51,65 @@ from mistral import version
CONF = cfg.CONF CONF = cfg.CONF
SERVER_THREAD_MANAGER = None
SERVER_PROCESS_MANAGER = None
def launch_thread(server, workers=1):
try:
global SERVER_THREAD_MANAGER
if not SERVER_THREAD_MANAGER:
SERVER_THREAD_MANAGER = service.ServiceLauncher(CONF)
SERVER_THREAD_MANAGER.launch_service(server, workers=workers)
except Exception as e:
sys.stderr.write("ERROR: %s\n" % e)
sys.exit(1)
def launch_process(server, workers=1):
try:
global SERVER_PROCESS_MANAGER
if not SERVER_PROCESS_MANAGER:
SERVER_PROCESS_MANAGER = service.ProcessLauncher(CONF)
SERVER_PROCESS_MANAGER.launch_service(server, workers=workers)
except Exception as e:
sys.stderr.write("ERROR: %s\n" % e)
sys.exit(1)
def launch_executor(): def launch_executor():
try: launch_thread(executor_server.get_oslo_service())
launcher = service.ServiceLauncher(CONF)
launcher.launch_service(executor_server.get_oslo_service())
launcher.wait()
except RuntimeError as e:
sys.stderr.write("ERROR: %s\n" % e)
sys.exit(1)
def launch_engine(): def launch_engine():
try: launch_thread(engine_server.get_oslo_service())
launcher = service.ServiceLauncher(CONF)
launcher.launch_service(engine_server.get_oslo_service())
launcher.wait()
except RuntimeError as e:
sys.stderr.write("ERROR: %s\n" % e)
sys.exit(1)
def launch_event_engine(): def launch_event_engine():
try: launch_thread(event_engine_server.get_oslo_service())
launcher = service.ServiceLauncher(CONF)
launcher.launch_service(event_engine_server.get_oslo_service())
launcher.wait()
except RuntimeError as e:
sys.stderr.write("ERROR: %s\n" % e)
sys.exit(1)
def launch_api(): def launch_api():
launcher = service.ProcessLauncher(cfg.CONF)
server = api_service.WSGIService('mistral_api') server = api_service.WSGIService('mistral_api')
launch_process(server, workers=server.workers)
launcher.launch_service(server, workers=server.workers)
launcher.wait()
def launch_any(options): def launch_any(options):
# Launch the servers on different threads. for option in options:
threads = [eventlet.spawn(LAUNCH_OPTIONS[option]) LAUNCH_OPTIONS[option]()
for option in options]
[thread.wait() for thread in threads] global SERVER_PROCESS_MANAGER
global SERVER_THREAD_MANAGER
if SERVER_PROCESS_MANAGER:
SERVER_PROCESS_MANAGER.wait()
if SERVER_THREAD_MANAGER:
SERVER_THREAD_MANAGER.wait()
# Map cli options to appropriate functions. The cli options are # Map cli options to appropriate functions. The cli options are
@ -203,5 +208,25 @@ def main():
sys.exit(1) sys.exit(1)
# Helper method used in unit tests to reset the service launchers.
def reset_server_managers():
global SERVER_THREAD_MANAGER
global SERVER_PROCESS_MANAGER
SERVER_THREAD_MANAGER = None
SERVER_PROCESS_MANAGER = None
# Helper method used in unit tests to access the service launcher.
def get_server_thread_manager():
global SERVER_THREAD_MANAGER
return SERVER_THREAD_MANAGER
# Helper method used in unit tests to access the process launcher.
def get_server_process_manager():
global SERVER_PROCESS_MANAGER
return SERVER_PROCESS_MANAGER
if __name__ == '__main__': if __name__ == '__main__':
sys.exit(main()) sys.exit(main())

View File

@ -0,0 +1,114 @@
# Copyright 2017 - Brocade Communications Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import eventlet
import mock
import pecan.testing
from oslo_config import cfg
from mistral.api import service as api_service
from mistral.cmd import launch
from mistral.tests.unit import base
class ServiceLauncherTest(base.DbTestCase):
def setUp(self):
super(ServiceLauncherTest, self).setUp()
launch.reset_server_managers()
@mock.patch('mistral.api.app.setup_app')
@mock.patch.object(api_service.wsgi, 'Server')
def test_launch_all(self, wsgi_server, mock_app):
mock_app.return_value = pecan.testing.load_test_app({
'app': {
'root': cfg.CONF.pecan.root,
'modules': cfg.CONF.pecan.modules,
'debug': cfg.CONF.pecan.debug,
'auth_enable': cfg.CONF.pecan.auth_enable,
'disable_cron_trigger_thread': True
}
})
eventlet.spawn(launch.launch_any, launch.LAUNCH_OPTIONS.keys())
for i in range(0, 50):
svr_proc_mgr = launch.get_server_process_manager()
svr_thrd_mgr = launch.get_server_thread_manager()
if svr_proc_mgr and svr_thrd_mgr:
break
eventlet.sleep(0.1)
self.assertIsNotNone(svr_proc_mgr)
self.assertIsNotNone(svr_thrd_mgr)
api_server = api_service.WSGIService('mistral_api')
api_workers = api_server.workers
self.assertEqual(len(svr_proc_mgr.children.keys()), api_workers)
self.assertEqual(len(svr_thrd_mgr.services.services), 3)
@mock.patch('mistral.api.app.setup_app')
@mock.patch.object(api_service.wsgi, 'Server')
def test_launch_process(self, wsgi_server, mock_app):
mock_app.return_value = pecan.testing.load_test_app({
'app': {
'root': cfg.CONF.pecan.root,
'modules': cfg.CONF.pecan.modules,
'debug': cfg.CONF.pecan.debug,
'auth_enable': cfg.CONF.pecan.auth_enable,
'disable_cron_trigger_thread': True
}
})
eventlet.spawn(launch.launch_any, ['api'])
for i in range(0, 50):
svr_proc_mgr = launch.get_server_process_manager()
if svr_proc_mgr:
break
eventlet.sleep(0.1)
svr_thrd_mgr = launch.get_server_thread_manager()
self.assertIsNotNone(svr_proc_mgr)
self.assertIsNone(svr_thrd_mgr)
api_server = api_service.WSGIService('mistral_api')
api_workers = api_server.workers
self.assertEqual(len(svr_proc_mgr.children.keys()), api_workers)
def test_launch_thread(self):
eventlet.spawn(launch.launch_any, ['engine'])
for i in range(0, 50):
svr_thrd_mgr = launch.get_server_thread_manager()
if svr_thrd_mgr:
break
eventlet.sleep(0.1)
svr_proc_mgr = launch.get_server_process_manager()
self.assertIsNone(svr_proc_mgr)
self.assertIsNotNone(svr_thrd_mgr)
self.assertEqual(len(svr_thrd_mgr.services.services), 1)