Merge "Zero downtime config reload (socket handling)"

This commit is contained in:
Jenkins 2015-03-27 12:28:48 +00:00 committed by Gerrit Code Review
commit 26d8eeb3a9
4 changed files with 325 additions and 92 deletions

View File

@ -712,3 +712,20 @@ def no_4byte_params(f):
_check_dict(kwargs) _check_dict(kwargs)
return f(*args, **kwargs) return f(*args, **kwargs)
return wrapper return wrapper
def stash_conf_values():
"""
Make a copy of some of the current global CONF's settings.
Allows determining if any of these values have changed
when the config is reloaded.
"""
conf = {}
conf['bind_host'] = CONF.bind_host
conf['bind_port'] = CONF.bind_port
conf['tcp_keepidle'] = CONF.cert_file
conf['backlog'] = CONF.backlog
conf['key_file'] = CONF.key_file
conf['cert_file'] = CONF.cert_file
return conf

View File

@ -22,6 +22,7 @@ Utility methods for working with WSGI servers
from __future__ import print_function from __future__ import print_function
import errno import errno
import functools
import os import os
import signal import signal
import sys import sys
@ -124,6 +125,30 @@ def get_bind_addr(default_port=None):
return (CONF.bind_host, CONF.bind_port or default_port) return (CONF.bind_host, CONF.bind_port or default_port)
def ssl_wrap_socket(sock):
"""
Wrap an existing socket in SSL
:param sock: non-SSL socket to wrap
:returns: An SSL wrapped socket
"""
utils.validate_key_cert(CONF.key_file, CONF.cert_file)
ssl_kwargs = {
'server_side': True,
'certfile': CONF.cert_file,
'keyfile': CONF.key_file,
'cert_reqs': ssl.CERT_NONE,
}
if CONF.ca_file:
ssl_kwargs['ca_certs'] = CONF.ca_file
ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED
return ssl.wrap_socket(sock, **ssl_kwargs)
def get_socket(default_port): def get_socket(default_port):
""" """
Bind socket to bind ip:port in conf Bind socket to bind ip:port in conf
@ -148,43 +173,20 @@ def get_socket(default_port):
if addr[0] in (socket.AF_INET, socket.AF_INET6) if addr[0] in (socket.AF_INET, socket.AF_INET6)
][0] ][0]
cert_file = CONF.cert_file use_ssl = CONF.key_file or CONF.cert_file
key_file = CONF.key_file if use_ssl and (not CONF.key_file or not CONF.cert_file):
use_ssl = cert_file or key_file
if use_ssl and (not cert_file or not key_file):
raise RuntimeError(_("When running server in SSL mode, you must " raise RuntimeError(_("When running server in SSL mode, you must "
"specify both a cert_file and key_file " "specify both a cert_file and key_file "
"option value in your configuration file")) "option value in your configuration file"))
def wrap_ssl(sock):
utils.validate_key_cert(key_file, cert_file)
ssl_kwargs = {
'server_side': True,
'certfile': cert_file,
'keyfile': key_file,
'cert_reqs': ssl.CERT_NONE,
}
if CONF.ca_file:
ssl_kwargs['ca_certs'] = CONF.ca_file
ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED
return ssl.wrap_socket(sock, **ssl_kwargs)
sock = utils.get_test_suite_socket() sock = utils.get_test_suite_socket()
retry_until = time.time() + 30 retry_until = time.time() + 30
if sock and use_ssl:
sock = wrap_ssl(sock)
while not sock and time.time() < retry_until: while not sock and time.time() < retry_until:
try: try:
sock = eventlet.listen(bind_addr, sock = eventlet.listen(bind_addr,
backlog=CONF.backlog, backlog=CONF.backlog,
family=address_family) family=address_family)
if use_ssl:
sock = wrap_ssl(sock)
except socket.error as err: except socket.error as err:
if err.args[0] != errno.EADDRINUSE: if err.args[0] != errno.EADDRINUSE:
raise raise
@ -194,14 +196,6 @@ def get_socket(default_port):
" trying for 30 seconds") % " trying for 30 seconds") %
{'host': bind_addr[0], {'host': bind_addr[0],
'port': bind_addr[1]}) 'port': bind_addr[1]})
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# in my experience, sockets can hang around forever without keepalive
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# This option isn't available in the OS X version of eventlet
if hasattr(socket, 'TCP_KEEPIDLE'):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE,
CONF.tcp_keepidle)
return sock return sock
@ -250,9 +244,10 @@ class Server(object):
This class requires initialize_glance_store set to True if This class requires initialize_glance_store set to True if
glance store needs to be initialized. glance store needs to be initialized.
""" """
def __init__(self, threads=1000, initialize_glance_store=False): def __init__(self, threads=1000, initialize_glance_store=False):
eventlet.wsgi.MAX_HEADER_LINE = CONF.max_header_line os.umask(0o27) # ensure files are created with the correct privileges
self._logger = logging.getLogger("eventlet.wsgi.server")
self._wsgi_logger = loggers.WritableLogger(self._logger)
self.threads = threads self.threads = threads
self.children = set() self.children = set()
self.stale_children = set() self.stale_children = set()
@ -260,22 +255,6 @@ class Server(object):
# NOTE(abhishek): Allows us to only re-initialize glance_store when # NOTE(abhishek): Allows us to only re-initialize glance_store when
# the API's configuration reloads. # the API's configuration reloads.
self.initialize_glance_store = initialize_glance_store self.initialize_glance_store = initialize_glance_store
def hup(self, *args):
"""Reloads configuration files with zero down time."""
signal.signal(signal.SIGHUP, signal.SIG_IGN)
raise exception.SIGHUPInterrupt
def start(self, application, default_port):
"""
Run a WSGI server with the given application.
:param application: The application to be run in the WSGI server
:param default_port: Port to bind to if none is specified in conf
"""
if self.initialize_glance_store:
initialize_glance_store()
self.pgid = os.getpid() self.pgid = os.getpid()
try: try:
# NOTE(flaper87): Make sure this process # NOTE(flaper87): Make sure this process
@ -293,20 +272,33 @@ class Server(object):
# shouldn't raise any error here. # shouldn't raise any error here.
self.pgid = 0 self.pgid = 0
def kill_children(*args): def hup(self, *args):
"""Kills the entire process group.""" """
signal.signal(signal.SIGTERM, signal.SIG_IGN) Reloads configuration files with zero down time
signal.signal(signal.SIGINT, signal.SIG_IGN) """
self.running = False signal.signal(signal.SIGHUP, signal.SIG_IGN)
os.killpg(self.pgid, signal.SIGTERM) raise exception.SIGHUPInterrupt
def kill_children(self, *args):
"""Kills the entire process group."""
signal.signal(signal.SIGTERM, signal.SIG_IGN)
signal.signal(signal.SIGINT, signal.SIG_IGN)
self.running = False
os.killpg(self.pgid, signal.SIGTERM)
def start(self, application, default_port):
"""
Run a WSGI server with the given application.
:param application: The application to be run in the WSGI server
:param default_port: Port to bind to if none is specified in conf
"""
self.application = application self.application = application
self.sock = get_socket(default_port) self.default_port = default_port
self.configure()
os.umask(0o27) # ensure files are created with the correct privileges self.start_wsgi()
self._logger = logging.getLogger("eventlet.wsgi.server")
self._wsgi_logger = loggers.WritableLogger(self._logger)
def start_wsgi(self):
if CONF.workers == 0: if CONF.workers == 0:
# Useful for profiling, test, debug etc. # Useful for profiling, test, debug etc.
self.pool = self.create_pool() self.pool = self.create_pool()
@ -314,8 +306,8 @@ class Server(object):
return return
else: else:
LOG.info(_LI("Starting %d workers") % CONF.workers) LOG.info(_LI("Starting %d workers") % CONF.workers)
signal.signal(signal.SIGTERM, kill_children) signal.signal(signal.SIGTERM, self.kill_children)
signal.signal(signal.SIGINT, kill_children) signal.signal(signal.SIGINT, self.kill_children)
signal.signal(signal.SIGHUP, self.hup) signal.signal(signal.SIGHUP, self.hup)
while len(self.children) < CONF.workers: while len(self.children) < CONF.workers:
self.run_child() self.run_child()
@ -367,17 +359,41 @@ class Server(object):
self.sock.close() self.sock.close()
LOG.debug('Exited') LOG.debug('Exited')
def reload(self): def configure(self, old_conf=None, has_changed=None):
cfg.CONF.reload_config_files() """
Apply configuration settings
:param old_conf: Cached old configuration settings (if any)
:param has changed: callable to determine if a parameter has changed
"""
eventlet.wsgi.MAX_HEADER_LINE = CONF.max_header_line
self.configure_socket(old_conf, has_changed)
if self.initialize_glance_store: if self.initialize_glance_store:
initialize_glance_store() initialize_glance_store()
def reload(self):
"""
Reload and re-apply configuration settings
Existing child processes are sent a SIGHUP signal
and will exit after completing existing requests.
New child processes, which will have the updated
configuration, are spawned. This allows preventing
interruption to the service.
"""
def _has_changed(old, new, param):
old = old.get(param)
new = getattr(new, param)
return (new != old)
old_conf = utils.stash_conf_values()
has_changed = functools.partial(_has_changed, old_conf, CONF)
CONF.reload_config_files()
os.killpg(self.pgid, signal.SIGHUP) os.killpg(self.pgid, signal.SIGHUP)
self.stale_children = self.children self.stale_children = self.children
self.children = set() self.children = set()
while len(self.children) < CONF.workers: self.configure(old_conf, has_changed)
self.run_child() self.start_wsgi()
signal.signal(signal.SIGHUP, self.hup)
def wait(self): def wait(self):
"""Wait until all servers have completed running.""" """Wait until all servers have completed running."""
@ -404,10 +420,14 @@ class Server(object):
# a child worker receives the signal before the parent # a child worker receives the signal before the parent
# and is respawned unnecessarily as a result # and is respawned unnecessarily as a result
signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGINT, signal.SIG_IGN)
# The child has no need to stash the unwrapped
# socket, and the reference prevents a clean
# exit on sighup
self._sock = None
self.run_server() self.run_server()
LOG.info(_LI('Child %d exiting normally') % os.getpid()) LOG.info(_LI('Child %d exiting normally') % os.getpid())
# self.pool.waitall() has been called by run_server, so # self.pool.waitall() is now called in wsgi's server so
# its safe to exit here # it's safe to exit here
sys.exit(0) sys.exit(0)
else: else:
LOG.info(_LI('Started child %s') % pid) LOG.info(_LI('Started child %s') % pid)
@ -445,6 +465,78 @@ class Server(object):
debug=False, debug=False,
keepalive=CONF.http_keepalive) keepalive=CONF.http_keepalive)
def configure_socket(self, old_conf=None, has_changed=None):
"""
Ensure a socket exists and is appropriately configured.
This function is called on start up, and can also be
called in the event of a configuration reload.
When called for the first time a new socket is created.
If reloading and either bind_host or bind port have been
changed the existing socket must be closed and a new
socket opened (laws of physics).
In all other cases (bind_host/bind_port have not changed)
the existing socket is reused.
:param old_conf: Cached old configuration settings (if any)
:param has changed: callable to determine if a parameter has changed
"""
# Do we need a fresh socket?
new_sock = (old_conf is None or (
has_changed('bind_host') or
has_changed('bind_port')))
# Will we be using https?
use_ssl = not (not CONF.cert_file or not CONF.key_file)
# Were we using https before?
old_use_ssl = (old_conf is not None and not (
not old_conf.get('key_file') or
not old_conf.get('cert_file')))
# Do we now need to perform an SSL wrap on the socket?
wrap_sock = use_ssl is True and (old_use_ssl is False or new_sock)
# Do we now need to perform an SSL unwrap on the socket?
unwrap_sock = use_ssl is False and old_use_ssl is True
if new_sock:
self._sock = None
if old_conf is not None:
self.sock.close()
_sock = get_socket(self.default_port)
_sock.setsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR, 1)
# sockets can hang around forever without keepalive
_sock.setsockopt(socket.SOL_SOCKET,
socket.SO_KEEPALIVE, 1)
self._sock = _sock
if wrap_sock:
self.sock = ssl_wrap_socket(self._sock)
if unwrap_sock:
self.sock = self._sock
if new_sock and not use_ssl:
self.sock = self._sock
# Pick up newly deployed certs
if old_conf is not None and use_ssl is True and old_use_ssl is True:
if has_changed('cert_file') or has_changed('key_file'):
utils.validate_key_cert(CONF.key_file, CONF.cert_file)
if has_changed('cert_file'):
self.sock.certfile = CONF.cert_file
if has_changed('key_file'):
self.sock.keyfile = CONF.key_file
if new_sock or (old_conf is not None and has_changed('tcp_keepidle')):
# This option isn't available in the OS X version of eventlet
if hasattr(socket, 'TCP_KEEPIDLE'):
self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE,
CONF.tcp_keepidle)
if old_conf is not None and has_changed('backlog'):
self.sock.listen(CONF.backlog)
class Middleware(object): class Middleware(object):
""" """

View File

@ -18,10 +18,14 @@ import re
import time import time
import psutil import psutil
import requests
from glance.tests import functional from glance.tests import functional
from glance.tests.utils import execute from glance.tests.utils import execute
TEST_VAR_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
'../', 'var'))
def set_config_value(filepath, key, value): def set_config_value(filepath, key, value):
"""Set 'key = value' in config file""" """Set 'key = value' in config file"""
@ -46,6 +50,27 @@ class TestReload(functional.FunctionalTest):
self.stop_servers() self.stop_servers()
super(TestReload, self).tearDown() super(TestReload, self).tearDown()
def ticker(self, message, seconds=60, tick=0.01):
"""
Allows repeatedly testing for an expected result
for a finite amount of time.
:param message: Message to display on timeout
:param seconds: Time in seconds after which we timeout
:param tick: Time to sleep before rechecking for expected result
:returns: 'True' or fails the test with 'message' on timeout
"""
# We default to allowing 60 seconds timeout but
# typically only a few hundredths of a second
# are needed.
num_ticks = seconds * (1.0 / tick)
count = 0
while count < num_ticks:
count += 1
time.sleep(tick)
yield
self.fail(message)
def _get_children(self, server): def _get_children(self, server):
pid = None pid = None
pid = self._get_parent(server) pid = self._get_parent(server)
@ -67,11 +92,11 @@ class TestReload(functional.FunctionalTest):
conf_filepath = os.path.join(conf_dir, '%s.conf' % service) conf_filepath = os.path.join(conf_dir, '%s.conf' % service)
return conf_filepath return conf_filepath
def test_reload_workers(self): def _url(self, protocol, path):
"""Test SIGHUP picks up new workers value. return '%s://127.0.0.1:%d%s' % (protocol, self.api_port, path)
This test requires around 2 minutes time for execution. def test_reload(self):
""" """Test SIGHUP picks up new config values"""
def check_pids(pre, post=None, workers=2): def check_pids(pre, post=None, workers=2):
if post is None: if post is None:
if len(pre) == workers: if len(pre) == workers:
@ -90,28 +115,126 @@ class TestReload(functional.FunctionalTest):
pre_pids = {} pre_pids = {}
post_pids = {} post_pids = {}
for _ in range(6000): # Test changing the workers value creates all new children
# This recycles the existing socket
msg = 'Start timeout'
for _ in self.ticker(msg):
for server in ('api', 'registry'): for server in ('api', 'registry'):
pre_pids[server] = self._get_children(server) pre_pids[server] = self._get_children(server)
if check_pids(pre_pids['api'], workers=1): if check_pids(pre_pids['api'], workers=1):
if check_pids(pre_pids['registry'], workers=1): if check_pids(pre_pids['registry'], workers=1):
break break
time.sleep(0.01)
for server in ('api', 'registry'): for server in ('api', 'registry'):
self.assertTrue(check_pids(pre_pids[server], workers=1))
# Labour costs have fallen # Labour costs have fallen
set_config_value(self._conffile(server), 'workers', '2') set_config_value(self._conffile(server), 'workers', '2')
cmd = "kill -HUP %s" % self._get_parent(server) cmd = "kill -HUP %s" % self._get_parent(server)
execute(cmd, raise_error=True) execute(cmd, raise_error=True)
for _ in range(6000): msg = 'Worker change timeout'
for _ in self.ticker(msg):
for server in ('api', 'registry'): for server in ('api', 'registry'):
post_pids[server] = self._get_children(server) post_pids[server] = self._get_children(server)
if check_pids(pre_pids['registry'], post_pids['registry']): if check_pids(pre_pids['registry'], post_pids['registry']):
if check_pids(pre_pids['api'], post_pids['api']): if check_pids(pre_pids['api'], post_pids['api']):
break break
time.sleep(0.01)
for server in ('api', 'registry'): # Test changing from http to https
self.assertTrue(check_pids(pre_pids[server], post_pids[server])) # This recycles the existing socket
path = self._url('http', '/')
response = requests.get(path)
self.assertEqual(300, response.status_code)
del response # close socket so that process audit is reliable
pre_pids['api'] = self._get_children('api')
key_file = os.path.join(TEST_VAR_DIR, 'privatekey.key')
set_config_value(self._conffile('api'), 'key_file', key_file)
cert_file = os.path.join(TEST_VAR_DIR, 'certificate.crt')
set_config_value(self._conffile('api'), 'cert_file', cert_file)
cmd = "kill -HUP %s" % self._get_parent('api')
execute(cmd, raise_error=True)
msg = 'http to https timeout'
for _ in self.ticker(msg):
post_pids['api'] = self._get_children('api')
if check_pids(pre_pids['api'], post_pids['api']):
break
ca_file = os.path.join(TEST_VAR_DIR, 'ca.crt')
path = self._url('https', '/')
response = requests.get(path, verify=ca_file)
self.assertEqual(300, response.status_code)
del response
# Test https restart
# This recycles the existing socket
pre_pids['api'] = self._get_children('api')
cmd = "kill -HUP %s" % self._get_parent('api')
execute(cmd, raise_error=True)
msg = 'https restart timeout'
for _ in self.ticker(msg):
post_pids['api'] = self._get_children('api')
if check_pids(pre_pids['api'], post_pids['api']):
break
ca_file = os.path.join(TEST_VAR_DIR, 'ca.crt')
path = self._url('https', '/')
response = requests.get(path, verify=ca_file)
self.assertEqual(300, response.status_code)
del response
# Test changing the https bind_host
# This requires a new socket
pre_pids['api'] = self._get_children('api')
set_config_value(self._conffile('api'), 'bind_host', '127.0.0.1')
cmd = "kill -HUP %s" % self._get_parent('api')
execute(cmd, raise_error=True)
msg = 'https bind_host timeout'
for _ in self.ticker(msg):
post_pids['api'] = self._get_children('api')
if check_pids(pre_pids['api'], post_pids['api']):
break
path = self._url('https', '/')
response = requests.get(path, verify=ca_file)
self.assertEqual(300, response.status_code)
del response
# Test https -> http
# This recycles the existing socket
pre_pids['api'] = self._get_children('api')
set_config_value(self._conffile('api'), 'key_file', '')
set_config_value(self._conffile('api'), 'cert_file', '')
cmd = "kill -HUP %s" % self._get_parent('api')
execute(cmd, raise_error=True)
msg = 'https to http timeout'
for _ in self.ticker(msg):
post_pids['api'] = self._get_children('api')
if check_pids(pre_pids['api'], post_pids['api']):
break
path = self._url('http', '/')
response = requests.get(path)
self.assertEqual(300, response.status_code)
del response
# Test changing the http bind_host
# This requires a new socket
pre_pids['api'] = self._get_children('api')
set_config_value(self._conffile('api'), 'bind_host', '127.0.0.1')
cmd = "kill -HUP %s" % self._get_parent('api')
execute(cmd, raise_error=True)
msg = 'http bind_host timeout'
for _ in self.ticker(msg):
post_pids['api'] = self._get_children('api')
if check_pids(pre_pids['api'], post_pids['api']):
break
path = self._url('http', '/')
response = requests.get(path)
self.assertEqual(300, response.status_code)
del response

View File

@ -489,14 +489,13 @@ class ServerTest(test_utils.BaseTestCase):
actual = wsgi.Server(threads=1).create_pool() actual = wsgi.Server(threads=1).create_pool()
self.assertIsInstance(actual, eventlet.greenpool.GreenPool) self.assertIsInstance(actual, eventlet.greenpool.GreenPool)
@mock.patch.object(wsgi, 'get_socket') @mock.patch.object(wsgi.Server, 'configure_socket')
def test_http_keepalive(self, mock_get_socket): def test_http_keepalive(self, mock_configure_socket):
fake_socket = 'fake_socket'
mock_get_socket.return_value = 'fake_socket'
self.config(http_keepalive=False) self.config(http_keepalive=False)
self.config(workers=0) self.config(workers=0)
server = wsgi.Server(threads=1) server = wsgi.Server(threads=1)
server.sock = 'fake_socket'
# mocking eventlet.wsgi server method to check it is called with # mocking eventlet.wsgi server method to check it is called with
# configured 'http_keepalive' value. # configured 'http_keepalive' value.
with mock.patch.object(eventlet.wsgi, with mock.patch.object(eventlet.wsgi,
@ -504,7 +503,7 @@ class ServerTest(test_utils.BaseTestCase):
fake_application = "fake-application" fake_application = "fake-application"
server.start(fake_application, 0) server.start(fake_application, 0)
server.wait() server.wait()
mock_server.assert_called_once_with(fake_socket, mock_server.assert_called_once_with('fake_socket',
fake_application, fake_application,
log=server._wsgi_logger, log=server._wsgi_logger,
debug=False, debug=False,
@ -582,20 +581,22 @@ class GetSocketTestCase(test_utils.BaseTestCase):
wsgi.CONF.ca_file = '/etc/ssl/ca_cert' wsgi.CONF.ca_file = '/etc/ssl/ca_cert'
wsgi.CONF.tcp_keepidle = 600 wsgi.CONF.tcp_keepidle = 600
def test_correct_get_socket(self): def test_correct_configure_socket(self):
mock_socket = mock.Mock() mock_socket = mock.Mock()
self.useFixture(fixtures.MonkeyPatch( self.useFixture(fixtures.MonkeyPatch(
'glance.common.wsgi.ssl.wrap_socket', 'glance.common.wsgi.ssl.wrap_socket',
mock_socket)) mock_socket))
self.useFixture(fixtures.MonkeyPatch( self.useFixture(fixtures.MonkeyPatch(
'glance.common.wsgi.eventlet.listen', 'glance.common.wsgi.eventlet.listen',
lambda *x, **y: None)) lambda *x, **y: mock_socket))
wsgi.get_socket(1234) server = wsgi.Server()
self.assertIn(mock.call().setsockopt( server.default_port = 1234
server.configure_socket()
self.assertIn(mock.call.setsockopt(
socket.SOL_SOCKET, socket.SOL_SOCKET,
socket.SO_REUSEADDR, socket.SO_REUSEADDR,
1), mock_socket.mock_calls) 1), mock_socket.mock_calls)
self.assertIn(mock.call().setsockopt( self.assertIn(mock.call.setsockopt(
socket.SOL_SOCKET, socket.SOL_SOCKET,
socket.SO_KEEPALIVE, socket.SO_KEEPALIVE,
1), mock_socket.mock_calls) 1), mock_socket.mock_calls)