Implement engine graceful shutdown
* Graceful engine shutdown is now possible thanks to correct calculation of the "graceful" flag in the engine server's stop() method. Unfortunately, the Oslo Service framework doesn't pass this flag correctly; it simply ignores it in the call chain. So the only way to tell whether the shutdown is graceful is to peek at the configuration property "graceful_shutdown_timeout" provided by Oslo Service. If it's greater than zero then we can treat the shutdown as graceful. * Oslo Service handles only four OS signals: SIGTERM, SIGINT, SIGHUP and SIGALRM. Only sending SIGTERM to the process leads to a graceful shutdown. For example, SIGINT (which is what pressing Ctrl+C in a Unix shell sends) interrupts the process immediately. So the only way to do a graceful shutdown of an engine instance using a Unix shell is to run the "kill <PID>" command. This needs to be taken into account when using it. * The patch also changes the order in which the engine server stops its inner services so that the underlying RPC server (currently Oslo Messaging based or Kombu based) stops first. This is needed to make sure that, first of all, no new RPC calls can arrive, thereby letting all active DB transactions finish normally without starting new ones. Stopping the RPC server may be a heavy operation if there are already lots of RPC messages waiting for processing that have been polled from the queue. So, to a great extent, the entire functionality of graceful shutdown will depend on whether the underlying RPC server implements the corresponding functionality properly, i.e. after stop(graceful=True) is called it stops receiving new calls and waits until all buffered RPC messages are processed normally. * The maximum time given to graceful shutdown is controlled via the "graceful_shutdown_timeout" configuration option, which is 60 seconds by default. * Minor refactoring Implements blueprint: mistral-graceful-scale-in Change-Id: I6d1234dfa21b1e3420ec9ca2c5235dee973748ee
This commit is contained in:
parent
7b58b9c267
commit
f61929a3c8
@ -65,7 +65,9 @@ def launch_thread(server, workers=1):
|
||||
|
||||
if not SERVER_THREAD_MANAGER:
|
||||
SERVER_THREAD_MANAGER = service.ServiceLauncher(
|
||||
CONF, restart_method='mutate')
|
||||
CONF,
|
||||
restart_method='mutate'
|
||||
)
|
||||
|
||||
SERVER_THREAD_MANAGER.launch_service(server, workers=workers)
|
||||
except Exception as e:
|
||||
@ -79,7 +81,9 @@ def launch_process(server, workers=1):
|
||||
|
||||
if not SERVER_PROCESS_MANAGER:
|
||||
SERVER_PROCESS_MANAGER = service.ProcessLauncher(
|
||||
CONF, restart_method='mutate')
|
||||
CONF,
|
||||
restart_method='mutate'
|
||||
)
|
||||
|
||||
SERVER_PROCESS_MANAGER.launch_service(server, workers=workers)
|
||||
except Exception as e:
|
||||
@ -116,12 +120,12 @@ def launch_any(options):
|
||||
global SERVER_PROCESS_MANAGER
|
||||
global SERVER_THREAD_MANAGER
|
||||
|
||||
if SERVER_PROCESS_MANAGER:
|
||||
SERVER_PROCESS_MANAGER.wait()
|
||||
|
||||
if SERVER_THREAD_MANAGER:
|
||||
SERVER_THREAD_MANAGER.wait()
|
||||
|
||||
if SERVER_PROCESS_MANAGER:
|
||||
SERVER_PROCESS_MANAGER.wait()
|
||||
|
||||
|
||||
# Map cli options to appropriate functions. The cli options are
|
||||
# registered in mistral's config.py.
|
||||
|
@ -28,6 +28,8 @@ from mistral_lib import utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class EngineServer(service_base.MistralService):
|
||||
"""Engine server.
|
||||
@ -64,27 +66,50 @@ class EngineServer(service_base.MistralService):
|
||||
# in the middle of executing an action then one of the remaining
|
||||
# engine instances will expire the action in a configured period
|
||||
# of time.
|
||||
if cfg.CONF.executor.type == 'local':
|
||||
if CONF.executor.type == 'local':
|
||||
action_heartbeat_sender.start()
|
||||
|
||||
if self._setup_profiler:
|
||||
profiler_utils.setup('mistral-engine', cfg.CONF.engine.host)
|
||||
profiler_utils.setup('mistral-engine', CONF.engine.host)
|
||||
|
||||
# Initialize and start RPC server.
|
||||
|
||||
self._rpc_server = rpc.get_rpc_server_driver()(cfg.CONF.engine)
|
||||
self._rpc_server = rpc.get_rpc_server_driver()(CONF.engine)
|
||||
self._rpc_server.register_endpoint(self)
|
||||
|
||||
self._rpc_server.run(executor=cfg.CONF.oslo_rpc_executor)
|
||||
self._rpc_server.run(executor=CONF.oslo_rpc_executor)
|
||||
|
||||
self._notify_started('Engine server started.')
|
||||
|
||||
def stop(self, graceful=False):
|
||||
# NOTE(rakhmerov): Unfortunately, oslo.service doesn't pass the
|
||||
# 'graceful' parameter with a correct value. It's simply ignored
|
||||
# in the corresponding call chain leading to a concrete service.
|
||||
# The only workaround for now is to check 'graceful_shutdown_timeout'
|
||||
# configuration option. If it's not empty (not None or 0) then we
|
||||
# should treat it as a graceful shutdown.
|
||||
graceful = bool(CONF.graceful_shutdown_timeout)
|
||||
|
||||
LOG.info(
|
||||
'Stopping an engine server [graceful=%s, timeout=%s]',
|
||||
graceful,
|
||||
CONF.graceful_shutdown_timeout
|
||||
)
|
||||
|
||||
super(EngineServer, self).stop(graceful)
|
||||
|
||||
# The rpc server needs to be stopped first so that the engine
|
||||
# server stops receiving new RPC calls. Under load, this operation
|
||||
# may take much time in case of graceful shutdown because there
|
||||
# still may be RPC messages already polled from the queue and
|
||||
# waiting for processing. So an underlying RPC server has to wait
|
||||
# until they are processed.
|
||||
if self._rpc_server:
|
||||
self._rpc_server.stop(graceful)
|
||||
|
||||
action_heartbeat_checker.stop(graceful)
|
||||
|
||||
if cfg.CONF.executor.type == 'local':
|
||||
if CONF.executor.type == 'local':
|
||||
action_heartbeat_sender.stop(graceful)
|
||||
|
||||
if self._scheduler:
|
||||
@ -95,8 +120,8 @@ class EngineServer(service_base.MistralService):
|
||||
if self._expiration_policy_tg:
|
||||
self._expiration_policy_tg.stop(graceful)
|
||||
|
||||
if self._rpc_server:
|
||||
self._rpc_server.stop(graceful)
|
||||
def wait(self):
|
||||
LOG.info("Waiting for an engine server to exit...")
|
||||
|
||||
def start_workflow(self, rpc_ctx, wf_identifier, wf_namespace,
|
||||
wf_ex_id, wf_input, description, params):
|
||||
|
@ -44,6 +44,7 @@ class OsloRPCServer(rpc.RPCServer):
|
||||
# TODO(rakhmerov): rpc.get_transport() should be in oslo.messaging
|
||||
# related module.
|
||||
access_policy = dispatcher.DefaultRPCAccessPolicy
|
||||
|
||||
self.oslo_server = messaging.get_rpc_server(
|
||||
rpc.get_transport(),
|
||||
target,
|
||||
|
Loading…
x
Reference in New Issue
Block a user