Windows multiprocess wsgi
This change allows glance-api to use multiple workers on Windows by not using fork, which is unavailable. Instead, we're duplicating sockets and passing the handles through pipes. Also, instead of using process groups, we have to go with Windows job objects. Note that this doesn't change the workflow for other platforms. A subsequent change will allow the tests to run on Windows. blueprint windows-support Change-Id: Ic786199844e1d804962172286905036d93a4ed14
This commit is contained in:
parent
5759ec0b1c
commit
f0dc2454da
@ -62,6 +62,7 @@ from glance import notifier
|
||||
CONF = cfg.CONF
|
||||
CONF.import_group("profiler", "glance.common.wsgi")
|
||||
logging.register_options(CONF)
|
||||
wsgi.register_cli_opts()
|
||||
|
||||
# NOTE(rosmaita): Any new exceptions added should preserve the current
|
||||
# error codes for backward compatibility. The value 99 is returned
|
||||
|
@ -60,6 +60,7 @@ from glance import notifier
|
||||
CONF = cfg.CONF
|
||||
CONF.import_group("profiler", "glance.common.wsgi")
|
||||
logging.register_options(CONF)
|
||||
wsgi.register_cli_opts()
|
||||
|
||||
|
||||
def main():
|
||||
|
@ -21,10 +21,14 @@ Utility methods for working with WSGI servers
|
||||
"""
|
||||
from __future__ import print_function
|
||||
|
||||
import abc
|
||||
import errno
|
||||
import functools
|
||||
import os
|
||||
import re
|
||||
import signal
|
||||
import struct
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
|
||||
@ -33,6 +37,7 @@ from eventlet.green import ssl
|
||||
import eventlet.greenio
|
||||
import eventlet.wsgi
|
||||
import glance_store
|
||||
from os_win import utilsfactory as os_win_utilsfactory
|
||||
from oslo_concurrency import processutils
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
@ -318,6 +323,12 @@ store_opts = [
|
||||
'using comma.')),
|
||||
]
|
||||
|
||||
cli_opts = [
|
||||
cfg.StrOpt('pipe-handle',
|
||||
help='This argument is used internally on Windows. Glance '
|
||||
'passes a pipe handle to child processes, which is then '
|
||||
'used for inter-process communication.'),
|
||||
]
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -340,8 +351,17 @@ except ImportError:
|
||||
uwsgi = None
|
||||
|
||||
|
||||
def register_cli_opts():
|
||||
CONF.register_cli_opts(cli_opts)
|
||||
|
||||
|
||||
def get_num_workers():
|
||||
"""Return the configured number of workers."""
|
||||
|
||||
# Windows only: we're already running on the worker side.
|
||||
if os.name == 'nt' and getattr(CONF, 'pipe_handle', None):
|
||||
return 0
|
||||
|
||||
if CONF.workers is None:
|
||||
# None implies the number of CPUs limited to 8
|
||||
# See Launchpad bug #1748916 and the config help text
|
||||
@ -475,7 +495,8 @@ def get_asynchronous_eventlet_pool(size=1000):
|
||||
return pool
|
||||
|
||||
|
||||
class Server(object):
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class BaseServer(object):
|
||||
"""Server class to manage multiple WSGI sockets and applications.
|
||||
|
||||
This class requires initialize_glance_store set to True if
|
||||
@ -491,24 +512,6 @@ class Server(object):
|
||||
# NOTE(abhishek): Allows us to only re-initialize glance_store when
|
||||
# the API's configuration reloads.
|
||||
self.initialize_glance_store = initialize_glance_store
|
||||
self.pgid = os.getpid()
|
||||
try:
|
||||
# NOTE(flaper87): Make sure this process
|
||||
# runs in its own process group.
|
||||
# NOTE(lpetrut): This isn't available on Windows, so we're going
|
||||
# to use job objects instead.
|
||||
os.setpgid(self.pgid, self.pgid)
|
||||
except (OSError, AttributeError):
|
||||
# NOTE(flaper87): When running glance-control,
|
||||
# (glance's functional tests, for example)
|
||||
# setpgid fails with EPERM as glance-control
|
||||
# creates a fresh session, of which the newly
|
||||
# launched service becomes the leader (session
|
||||
# leaders may not change process groups)
|
||||
#
|
||||
# Running glance-(api|registry) is safe and
|
||||
# shouldn't raise any error here.
|
||||
self.pgid = 0
|
||||
|
||||
@staticmethod
|
||||
def set_signal_handler(signal_name, handler):
|
||||
@ -524,13 +527,20 @@ class Server(object):
|
||||
self.set_signal_handler("SIGHUP", signal.SIG_IGN)
|
||||
raise exception.SIGHUPInterrupt
|
||||
|
||||
@abc.abstractmethod
|
||||
def kill_children(self, *args):
|
||||
"""Kills the entire process group."""
|
||||
self.set_signal_handler("SIGTERM", signal.SIG_IGN)
|
||||
self.set_signal_handler("SIGINT", signal.SIG_IGN)
|
||||
self.set_signal_handler("SIGCHLD", signal.SIG_IGN)
|
||||
self.running = False
|
||||
os.killpg(self.pgid, signal.SIGTERM)
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def wait_on_children(self):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def run_child(self):
|
||||
pass
|
||||
|
||||
def reload(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
def start(self, application, default_port):
|
||||
"""
|
||||
@ -562,50 +572,6 @@ class Server(object):
|
||||
def create_pool(self):
|
||||
return get_asynchronous_eventlet_pool(size=self.threads)
|
||||
|
||||
def _remove_children(self, pid):
|
||||
if pid in self.children:
|
||||
self.children.remove(pid)
|
||||
LOG.info(_LI('Removed dead child %s'), pid)
|
||||
elif pid in self.stale_children:
|
||||
self.stale_children.remove(pid)
|
||||
LOG.info(_LI('Removed stale child %s'), pid)
|
||||
else:
|
||||
LOG.warn(_LW('Unrecognised child %s') % pid)
|
||||
|
||||
def _verify_and_respawn_children(self, pid, status):
|
||||
if len(self.stale_children) == 0:
|
||||
LOG.debug('No stale children')
|
||||
if os.WIFEXITED(status) and os.WEXITSTATUS(status) != 0:
|
||||
LOG.error(_LE('Not respawning child %d, cannot '
|
||||
'recover from termination') % pid)
|
||||
if not self.children and not self.stale_children:
|
||||
LOG.info(
|
||||
_LI('All workers have terminated. Exiting'))
|
||||
self.running = False
|
||||
else:
|
||||
if len(self.children) < get_num_workers():
|
||||
self.run_child()
|
||||
|
||||
def wait_on_children(self):
|
||||
while self.running:
|
||||
try:
|
||||
pid, status = os.wait()
|
||||
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
|
||||
self._remove_children(pid)
|
||||
self._verify_and_respawn_children(pid, status)
|
||||
except OSError as err:
|
||||
if err.errno not in (errno.EINTR, errno.ECHILD):
|
||||
raise
|
||||
except KeyboardInterrupt:
|
||||
LOG.info(_LI('Caught keyboard interrupt. Exiting.'))
|
||||
break
|
||||
except exception.SIGHUPInterrupt:
|
||||
self.reload()
|
||||
continue
|
||||
eventlet.greenio.shutdown_safe(self.sock)
|
||||
self.sock.close()
|
||||
LOG.debug('Exited')
|
||||
|
||||
def configure(self, old_conf=None, has_changed=None):
|
||||
"""
|
||||
Apply configuration settings
|
||||
@ -622,35 +588,6 @@ class Server(object):
|
||||
else:
|
||||
initialize_glance_store()
|
||||
|
||||
def reload(self):
|
||||
"""
|
||||
Reload and re-apply configuration settings
|
||||
|
||||
Existing child processes are sent a SIGHUP signal
|
||||
and will exit after completing existing requests.
|
||||
New child processes, which will have the updated
|
||||
configuration, are spawned. This allows preventing
|
||||
interruption to the service.
|
||||
"""
|
||||
def _has_changed(old, new, param):
|
||||
old = old.get(param)
|
||||
new = getattr(new, param)
|
||||
return (new != old)
|
||||
|
||||
old_conf = utils.stash_conf_values()
|
||||
has_changed = functools.partial(_has_changed, old_conf, CONF)
|
||||
CONF.reload_config_files()
|
||||
os.killpg(self.pgid, signal.SIGHUP)
|
||||
self.stale_children = self.children
|
||||
self.children = set()
|
||||
|
||||
# Ensure any logging config changes are picked up
|
||||
logging.setup(CONF, 'glance')
|
||||
config.set_config_defaults()
|
||||
|
||||
self.configure(old_conf, has_changed)
|
||||
self.start_wsgi()
|
||||
|
||||
def wait(self):
|
||||
"""Wait until all servers have completed running."""
|
||||
try:
|
||||
@ -661,34 +598,6 @@ class Server(object):
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
def run_child(self):
|
||||
def child_hup(*args):
|
||||
"""Shuts down child processes, existing requests are handled."""
|
||||
self.set_signal_handler("SIGHUP", signal.SIG_IGN)
|
||||
eventlet.wsgi.is_accepting = False
|
||||
self.sock.close()
|
||||
|
||||
pid = os.fork()
|
||||
if pid == 0:
|
||||
self.set_signal_handler("SIGHUP", child_hup)
|
||||
self.set_signal_handler("SIGTERM", signal.SIG_DFL)
|
||||
# ignore the interrupt signal to avoid a race whereby
|
||||
# a child worker receives the signal before the parent
|
||||
# and is respawned unnecessarily as a result
|
||||
self.set_signal_handler("SIGINT", signal.SIG_IGN)
|
||||
# The child has no need to stash the unwrapped
|
||||
# socket, and the reference prevents a clean
|
||||
# exit on sighup
|
||||
self._sock = None
|
||||
self.run_server()
|
||||
LOG.info(_LI('Child %d exiting normally'), os.getpid())
|
||||
# self.pool.waitall() is now called in wsgi's server so
|
||||
# it's safe to exit here
|
||||
sys.exit(0)
|
||||
else:
|
||||
LOG.info(_LI('Started child %s'), pid)
|
||||
self.children.add(pid)
|
||||
|
||||
def run_server(self):
|
||||
"""Run a WSGI server."""
|
||||
if cfg.CONF.pydev_worker_debug_host:
|
||||
@ -796,6 +705,262 @@ class Server(object):
|
||||
self.sock.listen(CONF.backlog)
|
||||
|
||||
|
||||
class PosixServer(BaseServer):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(PosixServer, self).__init__(*args, **kwargs)
|
||||
|
||||
self.pgid = os.getpid()
|
||||
try:
|
||||
# NOTE(flaper87): Make sure this process
|
||||
# runs in its own process group.
|
||||
os.setpgid(self.pgid, self.pgid)
|
||||
except OSError:
|
||||
# NOTE(flaper87): When running glance-control,
|
||||
# (glance's functional tests, for example)
|
||||
# setpgid fails with EPERM as glance-control
|
||||
# creates a fresh session, of which the newly
|
||||
# launched service becomes the leader (session
|
||||
# leaders may not change process groups)
|
||||
#
|
||||
# Running glance-(api|registry) is safe and
|
||||
# shouldn't raise any error here.
|
||||
self.pgid = 0
|
||||
|
||||
def kill_children(self, *args):
|
||||
"""Kills the entire process group."""
|
||||
self.set_signal_handler("SIGTERM", signal.SIG_IGN)
|
||||
self.set_signal_handler("SIGINT", signal.SIG_IGN)
|
||||
self.set_signal_handler("SIGCHLD", signal.SIG_IGN)
|
||||
self.running = False
|
||||
os.killpg(self.pgid, signal.SIGTERM)
|
||||
|
||||
def _remove_children(self, pid):
|
||||
if pid in self.children:
|
||||
self.children.remove(pid)
|
||||
LOG.info(_LI('Removed dead child %s'), pid)
|
||||
elif pid in self.stale_children:
|
||||
self.stale_children.remove(pid)
|
||||
LOG.info(_LI('Removed stale child %s'), pid)
|
||||
else:
|
||||
LOG.warn(_LW('Unrecognised child %s') % pid)
|
||||
|
||||
def _verify_and_respawn_children(self, pid, status):
|
||||
if len(self.stale_children) == 0:
|
||||
LOG.debug('No stale children')
|
||||
if os.WIFEXITED(status) and os.WEXITSTATUS(status) != 0:
|
||||
LOG.error(_LE('Not respawning child %d, cannot '
|
||||
'recover from termination') % pid)
|
||||
if not self.children and not self.stale_children:
|
||||
LOG.info(
|
||||
_LI('All workers have terminated. Exiting'))
|
||||
self.running = False
|
||||
else:
|
||||
if len(self.children) < get_num_workers():
|
||||
self.run_child()
|
||||
|
||||
def wait_on_children(self):
|
||||
while self.running:
|
||||
try:
|
||||
pid, status = os.wait()
|
||||
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
|
||||
self._remove_children(pid)
|
||||
self._verify_and_respawn_children(pid, status)
|
||||
except OSError as err:
|
||||
if err.errno not in (errno.EINTR, errno.ECHILD):
|
||||
raise
|
||||
except KeyboardInterrupt:
|
||||
LOG.info(_LI('Caught keyboard interrupt. Exiting.'))
|
||||
break
|
||||
except exception.SIGHUPInterrupt:
|
||||
self.reload()
|
||||
continue
|
||||
eventlet.greenio.shutdown_safe(self.sock)
|
||||
self.sock.close()
|
||||
LOG.debug('Exited')
|
||||
|
||||
def run_child(self):
|
||||
def child_hup(*args):
|
||||
"""Shuts down child processes, existing requests are handled."""
|
||||
self.set_signal_handler("SIGHUP", signal.SIG_IGN)
|
||||
eventlet.wsgi.is_accepting = False
|
||||
self.sock.close()
|
||||
|
||||
pid = os.fork()
|
||||
if pid == 0:
|
||||
self.set_signal_handler("SIGHUP", child_hup)
|
||||
self.set_signal_handler("SIGTERM", signal.SIG_DFL)
|
||||
# ignore the interrupt signal to avoid a race whereby
|
||||
# a child worker receives the signal before the parent
|
||||
# and is respawned unnecessarily as a result
|
||||
self.set_signal_handler("SIGINT", signal.SIG_IGN)
|
||||
# The child has no need to stash the unwrapped
|
||||
# socket, and the reference prevents a clean
|
||||
# exit on sighup
|
||||
self._sock = None
|
||||
self.run_server()
|
||||
LOG.info(_LI('Child %d exiting normally'), os.getpid())
|
||||
# self.pool.waitall() is now called in wsgi's server so
|
||||
# it's safe to exit here
|
||||
sys.exit(0)
|
||||
else:
|
||||
LOG.info(_LI('Started child %s'), pid)
|
||||
self.children.add(pid)
|
||||
|
||||
def reload(self):
|
||||
"""
|
||||
Reload and re-apply configuration settings
|
||||
|
||||
Existing child processes are sent a SIGHUP signal
|
||||
and will exit after completing existing requests.
|
||||
New child processes, which will have the updated
|
||||
configuration, are spawned. This allows preventing
|
||||
interruption to the service.
|
||||
"""
|
||||
def _has_changed(old, new, param):
|
||||
old = old.get(param)
|
||||
new = getattr(new, param)
|
||||
return (new != old)
|
||||
|
||||
old_conf = utils.stash_conf_values()
|
||||
has_changed = functools.partial(_has_changed, old_conf, CONF)
|
||||
CONF.reload_config_files()
|
||||
os.killpg(self.pgid, signal.SIGHUP)
|
||||
self.stale_children = self.children
|
||||
self.children = set()
|
||||
|
||||
# Ensure any logging config changes are picked up
|
||||
logging.setup(CONF, 'glance')
|
||||
config.set_config_defaults()
|
||||
|
||||
self.configure(old_conf, has_changed)
|
||||
self.start_wsgi()
|
||||
|
||||
|
||||
class Win32ProcessLauncher(object):
|
||||
def __init__(self):
|
||||
self._processutils = os_win_utilsfactory.get_processutils()
|
||||
|
||||
self._workers = []
|
||||
self._worker_job_handles = []
|
||||
|
||||
def add_process(self, cmd):
|
||||
LOG.info("Starting subprocess: %s", cmd)
|
||||
|
||||
worker = subprocess.Popen(cmd, close_fds=False)
|
||||
try:
|
||||
job_handle = self._processutils.kill_process_on_job_close(
|
||||
worker.pid)
|
||||
except Exception:
|
||||
LOG.exception("Could not associate child process "
|
||||
"with a job, killing it.")
|
||||
worker.kill()
|
||||
raise
|
||||
|
||||
self._worker_job_handles.append(job_handle)
|
||||
self._workers.append(worker)
|
||||
|
||||
return worker
|
||||
|
||||
def wait(self):
|
||||
pids = [worker.pid for worker in self._workers]
|
||||
if pids:
|
||||
self._processutils.wait_for_multiple_processes(pids,
|
||||
wait_all=True)
|
||||
# By sleeping here, we allow signal handlers to be executed.
|
||||
time.sleep(0)
|
||||
|
||||
|
||||
class Win32Server(BaseServer):
|
||||
_py_script_re = re.compile(r'.*\.py\w?$')
|
||||
_sock = None
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(Win32Server, self).__init__(*args, **kwargs)
|
||||
self._launcher = Win32ProcessLauncher()
|
||||
self._ioutils = os_win_utilsfactory.get_ioutils()
|
||||
|
||||
def run_child(self):
|
||||
# We're passing copies of the socket through pipes.
|
||||
rfd, wfd = self._ioutils.create_pipe(inherit_handle=True)
|
||||
|
||||
cmd = sys.argv + ['--pipe-handle=%s' % int(rfd)]
|
||||
# Recent setuptools versions will trim '-script.py' and '.exe'
|
||||
# extensions from sys.argv[0].
|
||||
if self._py_script_re.match(sys.argv[0]):
|
||||
cmd = [sys.executable] + cmd
|
||||
|
||||
worker = self._launcher.add_process(cmd)
|
||||
self._ioutils.close_handle(rfd)
|
||||
|
||||
share_sock_buff = self._sock.share(worker.pid)
|
||||
self._ioutils.write_file(
|
||||
wfd,
|
||||
struct.pack('<I', len(share_sock_buff)),
|
||||
4)
|
||||
self._ioutils.write_file(
|
||||
wfd, share_sock_buff, len(share_sock_buff))
|
||||
|
||||
self.children.add(worker.pid)
|
||||
|
||||
def kill_children(self, *args):
|
||||
# We're using job objects, the children will exit along with the
|
||||
# main process.
|
||||
exit(0)
|
||||
|
||||
def wait_on_children(self):
|
||||
self._launcher.wait()
|
||||
|
||||
def _get_sock_from_parent(self):
|
||||
# This is supposed to be called exactly once in the child process.
|
||||
# We're passing a copy of the socket through a pipe.
|
||||
pipe_handle = int(getattr(CONF, 'pipe_handle', 0))
|
||||
if not pipe_handle:
|
||||
err_msg = _("Did not receive a pipe handle, which is used when "
|
||||
"communicating with the parent process.")
|
||||
raise exception.GlanceException(err_msg)
|
||||
|
||||
# Get the length of the data to be received.
|
||||
buff = self._ioutils.get_buffer(4)
|
||||
self._ioutils.read_file(pipe_handle, buff, 4)
|
||||
socket_buff_sz = struct.unpack('<I', buff)[0]
|
||||
|
||||
# Get the serialized socket object.
|
||||
socket_buff = self._ioutils.get_buffer(socket_buff_sz)
|
||||
self._ioutils.read_file(pipe_handle, socket_buff, socket_buff_sz)
|
||||
self._ioutils.close_handle(pipe_handle)
|
||||
|
||||
# Recreate the socket object. This will only work with
|
||||
# Python 3.6 or later.
|
||||
return socket.fromshare(bytes(socket_buff[:]))
|
||||
|
||||
def configure_socket(self, old_conf=None, has_changed=None):
|
||||
fresh_start = not (old_conf or has_changed)
|
||||
use_ssl = CONF.cert_file or CONF.key_file
|
||||
pipe_handle = getattr(CONF, 'pipe_handle', None)
|
||||
|
||||
if not (fresh_start and pipe_handle):
|
||||
return super(Win32Server, self).configure_socket(
|
||||
old_conf, has_changed)
|
||||
|
||||
self._sock = self._get_sock_from_parent()
|
||||
|
||||
if use_ssl:
|
||||
self.sock = ssl_wrap_socket(self._sock)
|
||||
else:
|
||||
self.sock = self._sock
|
||||
|
||||
if hasattr(socket, 'TCP_KEEPIDLE'):
|
||||
# This was introduced in WS 2016 RS3
|
||||
self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE,
|
||||
CONF.tcp_keepidle)
|
||||
|
||||
|
||||
if os.name == 'nt':
|
||||
Server = Win32Server
|
||||
else:
|
||||
Server = PosixServer
|
||||
|
||||
|
||||
class Middleware(object):
|
||||
"""
|
||||
Base WSGI middleware wrapper. These classes require an application to be
|
||||
|
Loading…
x
Reference in New Issue
Block a user