Merge "Windows multiprocess wsgi"

This commit is contained in:
Zuul 2019-03-14 15:25:08 +00:00 committed by Gerrit Code Review
commit ac15dc608a
3 changed files with 293 additions and 126 deletions

View File

@ -62,6 +62,7 @@ from glance import notifier
CONF = cfg.CONF CONF = cfg.CONF
CONF.import_group("profiler", "glance.common.wsgi") CONF.import_group("profiler", "glance.common.wsgi")
logging.register_options(CONF) logging.register_options(CONF)
wsgi.register_cli_opts()
# NOTE(rosmaita): Any new exceptions added should preserve the current # NOTE(rosmaita): Any new exceptions added should preserve the current
# error codes for backward compatibility. The value 99 is returned # error codes for backward compatibility. The value 99 is returned

View File

@ -60,6 +60,7 @@ from glance import notifier
CONF = cfg.CONF CONF = cfg.CONF
CONF.import_group("profiler", "glance.common.wsgi") CONF.import_group("profiler", "glance.common.wsgi")
logging.register_options(CONF) logging.register_options(CONF)
wsgi.register_cli_opts()
def main(): def main():

View File

@ -21,10 +21,14 @@ Utility methods for working with WSGI servers
""" """
from __future__ import print_function from __future__ import print_function
import abc
import errno import errno
import functools import functools
import os import os
import re
import signal import signal
import struct
import subprocess
import sys import sys
import time import time
@ -33,6 +37,7 @@ from eventlet.green import ssl
import eventlet.greenio import eventlet.greenio
import eventlet.wsgi import eventlet.wsgi
import glance_store import glance_store
from os_win import utilsfactory as os_win_utilsfactory
from oslo_concurrency import processutils from oslo_concurrency import processutils
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
@ -318,6 +323,12 @@ store_opts = [
'using comma.')), 'using comma.')),
] ]
# Options that register_cli_opts() registers as command-line options.
cli_opts = [
    cfg.StrOpt(
        'pipe-handle',
        help=('This argument is used internally on Windows. '
              'Glance passes a pipe handle to child processes, '
              'which is then used for inter-process communication.'),
    ),
]
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -340,8 +351,17 @@ except ImportError:
uwsgi = None uwsgi = None
def register_cli_opts():
    """Register the options listed in ``cli_opts`` as CLI options on CONF."""
    conf = CONF
    conf.register_cli_opts(cli_opts)
def get_num_workers(): def get_num_workers():
"""Return the configured number of workers.""" """Return the configured number of workers."""
# Windows only: we're already running on the worker side.
if os.name == 'nt' and getattr(CONF, 'pipe_handle', None):
return 0
if CONF.workers is None: if CONF.workers is None:
# None implies the number of CPUs limited to 8 # None implies the number of CPUs limited to 8
# See Launchpad bug #1748916 and the config help text # See Launchpad bug #1748916 and the config help text
@ -475,7 +495,8 @@ def get_asynchronous_eventlet_pool(size=1000):
return pool return pool
class Server(object): @six.add_metaclass(abc.ABCMeta)
class BaseServer(object):
"""Server class to manage multiple WSGI sockets and applications. """Server class to manage multiple WSGI sockets and applications.
This class requires initialize_glance_store set to True if This class requires initialize_glance_store set to True if
@ -491,24 +512,6 @@ class Server(object):
# NOTE(abhishek): Allows us to only re-initialize glance_store when # NOTE(abhishek): Allows us to only re-initialize glance_store when
# the API's configuration reloads. # the API's configuration reloads.
self.initialize_glance_store = initialize_glance_store self.initialize_glance_store = initialize_glance_store
self.pgid = os.getpid()
try:
# NOTE(flaper87): Make sure this process
# runs in its own process group.
# NOTE(lpetrut): This isn't available on Windows, so we're going
# to use job objects instead.
os.setpgid(self.pgid, self.pgid)
except (OSError, AttributeError):
# NOTE(flaper87): When running glance-control,
# (glance's functional tests, for example)
# setpgid fails with EPERM as glance-control
# creates a fresh session, of which the newly
# launched service becomes the leader (session
# leaders may not change process groups)
#
# Running glance-(api|registry) is safe and
# shouldn't raise any error here.
self.pgid = 0
@staticmethod @staticmethod
def set_signal_handler(signal_name, handler): def set_signal_handler(signal_name, handler):
@ -524,13 +527,20 @@ class Server(object):
self.set_signal_handler("SIGHUP", signal.SIG_IGN) self.set_signal_handler("SIGHUP", signal.SIG_IGN)
raise exception.SIGHUPInterrupt raise exception.SIGHUPInterrupt
@abc.abstractmethod
def kill_children(self, *args): def kill_children(self, *args):
"""Kills the entire process group.""" pass
self.set_signal_handler("SIGTERM", signal.SIG_IGN)
self.set_signal_handler("SIGINT", signal.SIG_IGN) @abc.abstractmethod
self.set_signal_handler("SIGCHLD", signal.SIG_IGN) def wait_on_children(self):
self.running = False pass
os.killpg(self.pgid, signal.SIGTERM)
@abc.abstractmethod
def run_child(self):
pass
def reload(self):
raise NotImplementedError()
def start(self, application, default_port): def start(self, application, default_port):
""" """
@ -562,50 +572,6 @@ class Server(object):
def create_pool(self): def create_pool(self):
return get_asynchronous_eventlet_pool(size=self.threads) return get_asynchronous_eventlet_pool(size=self.threads)
def _remove_children(self, pid):
if pid in self.children:
self.children.remove(pid)
LOG.info(_LI('Removed dead child %s'), pid)
elif pid in self.stale_children:
self.stale_children.remove(pid)
LOG.info(_LI('Removed stale child %s'), pid)
else:
LOG.warn(_LW('Unrecognised child %s') % pid)
def _verify_and_respawn_children(self, pid, status):
if len(self.stale_children) == 0:
LOG.debug('No stale children')
if os.WIFEXITED(status) and os.WEXITSTATUS(status) != 0:
LOG.error(_LE('Not respawning child %d, cannot '
'recover from termination') % pid)
if not self.children and not self.stale_children:
LOG.info(
_LI('All workers have terminated. Exiting'))
self.running = False
else:
if len(self.children) < get_num_workers():
self.run_child()
def wait_on_children(self):
while self.running:
try:
pid, status = os.wait()
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
self._remove_children(pid)
self._verify_and_respawn_children(pid, status)
except OSError as err:
if err.errno not in (errno.EINTR, errno.ECHILD):
raise
except KeyboardInterrupt:
LOG.info(_LI('Caught keyboard interrupt. Exiting.'))
break
except exception.SIGHUPInterrupt:
self.reload()
continue
eventlet.greenio.shutdown_safe(self.sock)
self.sock.close()
LOG.debug('Exited')
def configure(self, old_conf=None, has_changed=None): def configure(self, old_conf=None, has_changed=None):
""" """
Apply configuration settings Apply configuration settings
@ -622,35 +588,6 @@ class Server(object):
else: else:
initialize_glance_store() initialize_glance_store()
def reload(self):
"""
Reload and re-apply configuration settings
Existing child processes are sent a SIGHUP signal
and will exit after completing existing requests.
New child processes, which will have the updated
configuration, are spawned. This allows preventing
interruption to the service.
"""
def _has_changed(old, new, param):
old = old.get(param)
new = getattr(new, param)
return (new != old)
old_conf = utils.stash_conf_values()
has_changed = functools.partial(_has_changed, old_conf, CONF)
CONF.reload_config_files()
os.killpg(self.pgid, signal.SIGHUP)
self.stale_children = self.children
self.children = set()
# Ensure any logging config changes are picked up
logging.setup(CONF, 'glance')
config.set_config_defaults()
self.configure(old_conf, has_changed)
self.start_wsgi()
def wait(self): def wait(self):
"""Wait until all servers have completed running.""" """Wait until all servers have completed running."""
try: try:
@ -661,34 +598,6 @@ class Server(object):
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
def run_child(self):
def child_hup(*args):
"""Shuts down child processes, existing requests are handled."""
self.set_signal_handler("SIGHUP", signal.SIG_IGN)
eventlet.wsgi.is_accepting = False
self.sock.close()
pid = os.fork()
if pid == 0:
self.set_signal_handler("SIGHUP", child_hup)
self.set_signal_handler("SIGTERM", signal.SIG_DFL)
# ignore the interrupt signal to avoid a race whereby
# a child worker receives the signal before the parent
# and is respawned unnecessarily as a result
self.set_signal_handler("SIGINT", signal.SIG_IGN)
# The child has no need to stash the unwrapped
# socket, and the reference prevents a clean
# exit on sighup
self._sock = None
self.run_server()
LOG.info(_LI('Child %d exiting normally'), os.getpid())
# self.pool.waitall() is now called in wsgi's server so
# it's safe to exit here
sys.exit(0)
else:
LOG.info(_LI('Started child %s'), pid)
self.children.add(pid)
def run_server(self): def run_server(self):
"""Run a WSGI server.""" """Run a WSGI server."""
if cfg.CONF.pydev_worker_debug_host: if cfg.CONF.pydev_worker_debug_host:
@ -796,6 +705,262 @@ class Server(object):
self.sock.listen(CONF.backlog) self.sock.listen(CONF.backlog)
class PosixServer(BaseServer):
    """Server implementation that manages workers with fork and signals.

    Worker processes are created with ``os.fork()``, tracked by pid in
    ``self.children`` / ``self.stale_children`` and signalled as a group
    through ``os.killpg()``.
    """

    def __init__(self, *args, **kwargs):
        """Initialize the base server and try to lead a new process group.

        ``self.pgid`` is set to this process' pid, or to 0 when
        ``os.setpgid`` fails with ``OSError``.
        """
        super(PosixServer, self).__init__(*args, **kwargs)

        self.pgid = os.getpid()
        try:
            # NOTE(flaper87): Make sure this process
            # runs in its own process group.
            os.setpgid(self.pgid, self.pgid)
        except OSError:
            # NOTE(flaper87): When running glance-control,
            # (glance's functional tests, for example)
            # setpgid fails with EPERM as glance-control
            # creates a fresh session, of which the newly
            # launched service becomes the leader (session
            # leaders may not change process groups)
            #
            # Running glance-(api|registry) is safe and
            # shouldn't raise any error here.
            self.pgid = 0

    def kill_children(self, *args):
        """Kills the entire process group."""
        # Ignore further termination/child signals in this process before
        # signalling the group, which may include this process itself.
        self.set_signal_handler("SIGTERM", signal.SIG_IGN)
        self.set_signal_handler("SIGINT", signal.SIG_IGN)
        self.set_signal_handler("SIGCHLD", signal.SIG_IGN)
        self.running = False
        os.killpg(self.pgid, signal.SIGTERM)

    def _remove_children(self, pid):
        """Drop ``pid`` from the set of current or stale children.

        :param pid: process id returned by ``os.wait()``
        """
        if pid in self.children:
            self.children.remove(pid)
            LOG.info(_LI('Removed dead child %s'), pid)
        elif pid in self.stale_children:
            self.stale_children.remove(pid)
            LOG.info(_LI('Removed stale child %s'), pid)
        else:
            LOG.warning(_LW('Unrecognised child %s'), pid)

    def _verify_and_respawn_children(self, pid, status):
        """Decide whether a terminated child should be replaced.

        A child that exited with a non-zero exit status is not respawned;
        if no children (current or stale) remain, the server stops
        running. Otherwise a new child is started while fewer than
        ``get_num_workers()`` children are tracked.

        :param pid: process id of the terminated child
        :param status: exit status as returned by ``os.wait()``
        """
        if not self.stale_children:
            LOG.debug('No stale children')
        if os.WIFEXITED(status) and os.WEXITSTATUS(status) != 0:
            LOG.error(_LE('Not respawning child %d, cannot '
                          'recover from termination'), pid)
            if not self.children and not self.stale_children:
                LOG.info(
                    _LI('All workers have terminated. Exiting'))
                self.running = False
        else:
            if len(self.children) < get_num_workers():
                self.run_child()

    def wait_on_children(self):
        """Reap children until the server stops, then close the socket.

        ``OSError`` with ``EINTR`` or ``ECHILD`` is ignored, a keyboard
        interrupt ends the loop and ``SIGHUPInterrupt`` triggers
        ``reload()``.
        """
        while self.running:
            try:
                pid, status = os.wait()
                if os.WIFEXITED(status) or os.WIFSIGNALED(status):
                    self._remove_children(pid)
                    self._verify_and_respawn_children(pid, status)
            except OSError as err:
                if err.errno not in (errno.EINTR, errno.ECHILD):
                    raise
            except KeyboardInterrupt:
                LOG.info(_LI('Caught keyboard interrupt. Exiting.'))
                break
            except exception.SIGHUPInterrupt:
                self.reload()
                continue
        eventlet.greenio.shutdown_safe(self.sock)
        self.sock.close()
        LOG.debug('Exited')

    def run_child(self):
        """Fork a worker; the child runs the server and then exits."""
        def child_hup(*args):
            """Shuts down child processes, existing requests are handled."""
            self.set_signal_handler("SIGHUP", signal.SIG_IGN)
            eventlet.wsgi.is_accepting = False
            self.sock.close()

        pid = os.fork()
        if pid == 0:
            self.set_signal_handler("SIGHUP", child_hup)
            self.set_signal_handler("SIGTERM", signal.SIG_DFL)
            # ignore the interrupt signal to avoid a race whereby
            # a child worker receives the signal before the parent
            # and is respawned unnecessarily as a result
            self.set_signal_handler("SIGINT", signal.SIG_IGN)
            # The child has no need to stash the unwrapped
            # socket, and the reference prevents a clean
            # exit on sighup
            self._sock = None
            self.run_server()
            LOG.info(_LI('Child %d exiting normally'), os.getpid())
            # self.pool.waitall() is now called in wsgi's server so
            # it's safe to exit here
            sys.exit(0)
        else:
            LOG.info(_LI('Started child %s'), pid)
            self.children.add(pid)

    def reload(self):
        """
        Reload and re-apply configuration settings

        Existing child processes are sent a SIGHUP signal
        and will exit after completing existing requests.
        New child processes, which will have the updated
        configuration, are spawned. This allows preventing
        interruption to the service.
        """
        def _has_changed(old, new, param):
            old = old.get(param)
            new = getattr(new, param)
            return (new != old)

        old_conf = utils.stash_conf_values()
        has_changed = functools.partial(_has_changed, old_conf, CONF)
        CONF.reload_config_files()
        os.killpg(self.pgid, signal.SIGHUP)
        # The current workers become stale; they are removed from
        # stale_children by _remove_children() once they are reaped.
        self.stale_children = self.children
        self.children = set()

        # Ensure any logging config changes are picked up
        logging.setup(CONF, 'glance')
        config.set_config_defaults()

        self.configure(old_conf, has_changed)
        self.start_wsgi()
class Win32ProcessLauncher(object):
    """Starts worker subprocesses and keeps track of them.

    Each started process is handed to the os-win process utilities'
    ``kill_process_on_job_close``; the returned job handles are kept in
    ``self._worker_job_handles`` alongside the ``Popen`` objects in
    ``self._workers``.
    """

    def __init__(self):
        self._workers = []
        self._worker_job_handles = []
        self._processutils = os_win_utilsfactory.get_processutils()

    def add_process(self, cmd):
        """Launch ``cmd`` as a subprocess and register it.

        :param cmd: argument list passed to ``subprocess.Popen``
        :returns: the ``subprocess.Popen`` object of the new process
        :raises: whatever ``kill_process_on_job_close`` raised, after the
                 new process has been killed
        """
        LOG.info("Starting subprocess: %s", cmd)

        process = subprocess.Popen(cmd, close_fds=False)
        try:
            handle = self._processutils.kill_process_on_job_close(
                process.pid)
        except Exception:
            LOG.exception("Could not associate child process "
                          "with a job, killing it.")
            process.kill()
            raise
        else:
            self._worker_job_handles.append(handle)
            self._workers.append(process)

        return process

    def wait(self):
        """Wait for all registered worker processes, if there are any."""
        worker_pids = []
        for process in self._workers:
            worker_pids.append(process.pid)

        if worker_pids:
            self._processutils.wait_for_multiple_processes(
                worker_pids, wait_all=True)

        # By sleeping here, we allow signal handlers to be executed.
        time.sleep(0)
class Win32Server(BaseServer):
    """Server implementation that spawns workers as separate processes.

    The parent re-runs its own command line with an extra
    ``--pipe-handle`` argument and sends each child a shared copy of the
    listening socket through a pipe.
    """

    # Matches script paths such as "foo.py" or "foo.pyw".
    _py_script_re = re.compile(r'.*\.py\w?$')
    _sock = None

    def __init__(self, *args, **kwargs):
        super(Win32Server, self).__init__(*args, **kwargs)
        self._launcher = Win32ProcessLauncher()
        self._ioutils = os_win_utilsfactory.get_ioutils()

    def run_child(self):
        """Start one worker process and hand it a copy of the socket."""
        # We're passing copies of the socket through pipes.
        rfd, wfd = self._ioutils.create_pipe(inherit_handle=True)
        try:
            cmd = sys.argv + ['--pipe-handle=%s' % int(rfd)]
            # Recent setuptools versions will trim '-script.py' and '.exe'
            # extensions from sys.argv[0].
            if self._py_script_re.match(sys.argv[0]):
                cmd = [sys.executable] + cmd

            try:
                worker = self._launcher.add_process(cmd)
            finally:
                # The parent never reads from the pipe, so its copy of the
                # read end is released whether or not the launch succeeded.
                self._ioutils.close_handle(rfd)

            share_sock_buff = self._sock.share(worker.pid)
            # Length-prefixed message: 4-byte little-endian size, then the
            # serialized socket.
            self._ioutils.write_file(
                wfd,
                struct.pack('<I', len(share_sock_buff)),
                4)
            self._ioutils.write_file(
                wfd, share_sock_buff, len(share_sock_buff))
        finally:
            # Nothing else is written to this pipe; don't leak the handle.
            self._ioutils.close_handle(wfd)

        self.children.add(worker.pid)

    def kill_children(self, *args):
        """Exit the main process; accepts and ignores any arguments."""
        # We're using job objects, the children will exit along with the
        # main process.
        # NOTE: sys.exit() is used rather than the exit() builtin, which is
        # injected by the site module and is not always available.
        sys.exit(0)

    def wait_on_children(self):
        """Block until the launcher reports the workers have finished."""
        self._launcher.wait()

    def _get_sock_from_parent(self):
        """Read the shared socket sent by the parent process.

        :returns: the socket object recreated with ``socket.fromshare``
        :raises GlanceException: if no pipe handle was provided
        """
        # This is supposed to be called exactly once in the child process.
        # We're passing a copy of the socket through a pipe.
        # The option may be present but unset (None), so it must not be
        # passed to int() unconditionally.
        raw_handle = getattr(CONF, 'pipe_handle', None)
        pipe_handle = int(raw_handle) if raw_handle else 0
        if not pipe_handle:
            err_msg = _("Did not receive a pipe handle, which is used when "
                        "communicating with the parent process.")
            raise exception.GlanceException(err_msg)

        # Get the length of the data to be received.
        buff = self._ioutils.get_buffer(4)
        self._ioutils.read_file(pipe_handle, buff, 4)
        socket_buff_sz = struct.unpack('<I', buff)[0]

        # Get the serialized socket object.
        socket_buff = self._ioutils.get_buffer(socket_buff_sz)
        self._ioutils.read_file(pipe_handle, socket_buff, socket_buff_sz)
        self._ioutils.close_handle(pipe_handle)

        # Recreate the socket object. This will only work with
        # Python 3.6 or later.
        return socket.fromshare(bytes(socket_buff[:]))

    def configure_socket(self, old_conf=None, has_changed=None):
        """Set up the listening socket.

        On a fresh start with a pipe handle configured (i.e. in a worker
        process) the socket is received from the parent; in every other
        case the base class implementation is used.
        """
        fresh_start = not (old_conf or has_changed)
        use_ssl = CONF.cert_file or CONF.key_file
        pipe_handle = getattr(CONF, 'pipe_handle', None)

        if not (fresh_start and pipe_handle):
            return super(Win32Server, self).configure_socket(
                old_conf, has_changed)

        self._sock = self._get_sock_from_parent()

        if use_ssl:
            self.sock = ssl_wrap_socket(self._sock)
        else:
            self.sock = self._sock

        if hasattr(socket, 'TCP_KEEPIDLE'):
            # This was introduced in WS 2016 RS3
            self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE,
                                 CONF.tcp_keepidle)
# Pick the server implementation that matches the running platform.
Server = Win32Server if os.name == 'nt' else PosixServer
class Middleware(object): class Middleware(object):
""" """
Base WSGI middleware wrapper. These classes require an application to be Base WSGI middleware wrapper. These classes require an application to be