Drop eventlet wsgi implementations
Change-Id: Ied7597a2beaa6df429088330666b77a418556c2a Signed-off-by: rabi <ramishra@redhat.com>
This commit is contained in:
@@ -1,96 +0,0 @@
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Heat All Server.
|
||||
|
||||
An OpenStack Heat server that can run all services.
|
||||
"""
|
||||
|
||||
# flake8: noqa: E402
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch(os=False)
|
||||
|
||||
import sys
|
||||
import warnings
|
||||
|
||||
from oslo_config import cfg
|
||||
import oslo_i18n as i18n
|
||||
from oslo_log import log as logging
|
||||
from oslo_service import systemd
|
||||
|
||||
from heat.cmd import api
|
||||
from heat.cmd import api_cfn
|
||||
from heat.cmd import engine
|
||||
from heat.common import config
|
||||
from heat.common import messaging
|
||||
from heat import version
|
||||
|
||||
|
||||
i18n.enable_lazy()
|
||||
|
||||
API_LAUNCH_OPTS = {'setup_logging': False}
|
||||
|
||||
LAUNCH_SERVICES = {
|
||||
'engine': [engine.launch_engine, {'setup_logging': False}],
|
||||
'api': [api.launch_api, API_LAUNCH_OPTS],
|
||||
'api_cfn': [api_cfn.launch_cfn_api, API_LAUNCH_OPTS],
|
||||
}
|
||||
|
||||
services_opt = cfg.ListOpt(
|
||||
'enabled_services',
|
||||
default=['engine', 'api', 'api_cfn'],
|
||||
deprecated_for_removal=True,
|
||||
deprecated_reason='The heat-all console script has been deprecated.',
|
||||
deprecated_since='24.0.0',
|
||||
help='Specifies the heat services that are enabled when running heat-all. '
|
||||
'Valid options are all or any combination of '
|
||||
'api, engine or api_cfn.'
|
||||
)
|
||||
|
||||
cfg.CONF.register_opt(services_opt, group='heat_all')
|
||||
|
||||
|
||||
def _start_service_threads(services):
|
||||
threads = []
|
||||
for option in services:
|
||||
launch_func = LAUNCH_SERVICES[option][0]
|
||||
kwargs = LAUNCH_SERVICES[option][1]
|
||||
threads.append(eventlet.spawn(launch_func, **kwargs))
|
||||
return threads
|
||||
|
||||
|
||||
def launch_all(setup_logging=True):
|
||||
if setup_logging:
|
||||
logging.register_options(cfg.CONF)
|
||||
cfg.CONF(project='heat', prog='heat-all',
|
||||
version=version.version_info.version_string())
|
||||
if setup_logging:
|
||||
logging.setup(cfg.CONF, 'heat-all')
|
||||
config.set_config_defaults()
|
||||
messaging.setup()
|
||||
return _start_service_threads(set(cfg.CONF.heat_all.enabled_services))
|
||||
|
||||
|
||||
def main():
|
||||
warnings.warn("The heat-all script has been deprecated and will be "
|
||||
"removed in the future.",
|
||||
DeprecationWarning)
|
||||
try:
|
||||
threads = launch_all()
|
||||
services = [thread.wait() for thread in threads]
|
||||
systemd.notify_once()
|
||||
[service.wait() for service in services]
|
||||
except RuntimeError as e:
|
||||
msg = str(e)
|
||||
sys.exit("ERROR: %s" % msg)
|
@@ -1,81 +0,0 @@
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Heat API Server.
|
||||
|
||||
An OpenStack ReST API to Heat.
|
||||
"""
|
||||
|
||||
# flake8: noqa: E402
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch(os=False)
|
||||
|
||||
import sys
|
||||
import warnings
|
||||
|
||||
from oslo_config import cfg
|
||||
import oslo_i18n as i18n
|
||||
from oslo_log import log as logging
|
||||
from oslo_reports import guru_meditation_report as gmr
|
||||
from oslo_reports import opts as gmr_opts
|
||||
from oslo_service import systemd
|
||||
|
||||
from heat.common import config
|
||||
from heat.common import messaging
|
||||
from heat.common import profiler
|
||||
from heat.common import wsgi
|
||||
from heat import version
|
||||
|
||||
|
||||
i18n.enable_lazy()
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
def launch_api(setup_logging=True):
|
||||
if setup_logging:
|
||||
logging.register_options(CONF)
|
||||
CONF(project='heat', prog='heat-api',
|
||||
version=version.version_info.version_string())
|
||||
if setup_logging:
|
||||
logging.setup(CONF, CONF.prog)
|
||||
LOG = logging.getLogger(CONF.prog)
|
||||
config.set_config_defaults()
|
||||
messaging.setup()
|
||||
|
||||
app = config.load_paste_app()
|
||||
|
||||
port = CONF.heat_api.bind_port
|
||||
host = CONF.heat_api.bind_host
|
||||
LOG.info('Starting Heat REST API on %(host)s:%(port)s',
|
||||
{'host': host, 'port': port})
|
||||
profiler.setup(CONF.prog, host)
|
||||
gmr_opts.set_defaults(CONF)
|
||||
gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)
|
||||
server = wsgi.Server(CONF.prog, CONF.heat_api)
|
||||
server.start(app, default_port=port)
|
||||
return server
|
||||
|
||||
|
||||
def main():
|
||||
warnings.warn("The heat-api script has been deprecated and will be "
|
||||
"removed in the future.",
|
||||
DeprecationWarning)
|
||||
try:
|
||||
server = launch_api()
|
||||
systemd.notify_once()
|
||||
server.wait()
|
||||
except RuntimeError as e:
|
||||
msg = str(e)
|
||||
sys.exit("ERROR: %s" % msg)
|
@@ -1,85 +0,0 @@
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Heat API Server.
|
||||
|
||||
This implements an approximation of the Amazon CloudFormation API and
|
||||
translates it into a native representation. It then calls the heat-engine via
|
||||
AMQP RPC to implement them.
|
||||
"""
|
||||
|
||||
# flake8: noqa: E402
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch(os=False)
|
||||
|
||||
import sys
|
||||
import warnings
|
||||
|
||||
from oslo_config import cfg
|
||||
import oslo_i18n as i18n
|
||||
from oslo_log import log as logging
|
||||
from oslo_reports import guru_meditation_report as gmr
|
||||
from oslo_reports import opts as gmr_opts
|
||||
from oslo_service import systemd
|
||||
|
||||
from heat.common import config
|
||||
from heat.common import messaging
|
||||
from heat.common import profiler
|
||||
from heat.common import wsgi
|
||||
from heat import version
|
||||
|
||||
|
||||
i18n.enable_lazy()
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
def launch_cfn_api(setup_logging=True):
|
||||
if setup_logging:
|
||||
logging.register_options(CONF)
|
||||
CONF(project='heat',
|
||||
prog='heat-api-cfn',
|
||||
version=version.version_info.version_string())
|
||||
if setup_logging:
|
||||
logging.setup(CONF, CONF.prog)
|
||||
logging.set_defaults()
|
||||
LOG = logging.getLogger(CONF.prog)
|
||||
config.set_config_defaults()
|
||||
messaging.setup()
|
||||
|
||||
app = config.load_paste_app()
|
||||
|
||||
port = CONF.heat_api_cfn.bind_port
|
||||
host = CONF.heat_api_cfn.bind_host
|
||||
LOG.info('Starting Heat API on %(host)s:%(port)s',
|
||||
{'host': host, 'port': port})
|
||||
profiler.setup(CONF.prog, host)
|
||||
gmr_opts.set_defaults(CONF)
|
||||
gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)
|
||||
server = wsgi.Server(CONF.prog, CONF.heat_api_cfn)
|
||||
server.start(app, default_port=port)
|
||||
return server
|
||||
|
||||
|
||||
def main():
|
||||
warnings.warn("The heat-api script has been deprecated and will be "
|
||||
"removed in the future.",
|
||||
DeprecationWarning)
|
||||
try:
|
||||
server = launch_cfn_api()
|
||||
systemd.notify_once()
|
||||
server.wait()
|
||||
except RuntimeError as e:
|
||||
msg = str(e)
|
||||
sys.exit("ERROR: %s" % msg)
|
@@ -13,8 +13,8 @@
|
||||
|
||||
"""Routines for configuring Heat."""
|
||||
import os
|
||||
import socket
|
||||
|
||||
from eventlet.green import socket
|
||||
from oslo_config import cfg
|
||||
from oslo_db import options as oslo_db_ops
|
||||
from oslo_log import log as logging
|
||||
|
@@ -19,18 +19,8 @@
|
||||
"""Utility methods for working with WSGI servers."""
|
||||
|
||||
import abc
|
||||
import errno
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
import time
|
||||
|
||||
from eventlet.green import socket
|
||||
from eventlet.green import ssl
|
||||
import eventlet.greenio
|
||||
import eventlet.wsgi
|
||||
import functools
|
||||
from oslo_concurrency import processutils
|
||||
from oslo_config import cfg
|
||||
import oslo_i18n as i18n
|
||||
from oslo_log import log as logging
|
||||
@@ -58,32 +48,6 @@ api_opts = [
|
||||
cfg.PortOpt('bind_port', default=8004,
|
||||
help=_('The port on which the server will listen.'),
|
||||
deprecated_group='DEFAULT'),
|
||||
cfg.IntOpt('backlog', default=4096,
|
||||
help=_("Number of backlog requests "
|
||||
"to configure the socket with."),
|
||||
deprecated_group='DEFAULT'),
|
||||
cfg.StrOpt('cert_file',
|
||||
help=_("Location of the SSL certificate file "
|
||||
"to use for SSL mode."),
|
||||
deprecated_group='DEFAULT'),
|
||||
cfg.StrOpt('key_file',
|
||||
help=_("Location of the SSL key file to use "
|
||||
"for enabling SSL mode."),
|
||||
deprecated_group='DEFAULT'),
|
||||
cfg.IntOpt('workers', min=0, default=0,
|
||||
help=_("Number of workers for Heat service. "
|
||||
"Default value 0 means, that service will start number "
|
||||
"of workers equal number of cores on server."),
|
||||
deprecated_group='DEFAULT'),
|
||||
cfg.IntOpt('max_header_line', default=16384,
|
||||
help=_('Maximum line size of message headers to be accepted. '
|
||||
'max_header_line may need to be increased when using '
|
||||
'large tokens (typically those generated by the '
|
||||
'Keystone v3 API with big service catalogs).')),
|
||||
cfg.IntOpt('tcp_keepidle', default=600,
|
||||
help=_('The value for the socket option TCP_KEEPIDLE. This is '
|
||||
'the time in seconds that the connection must be idle '
|
||||
'before TCP starts sending keepalive probes.')),
|
||||
]
|
||||
api_group = cfg.OptGroup('heat_api')
|
||||
cfg.CONF.register_group(api_group)
|
||||
@@ -98,51 +62,12 @@ api_cfn_opts = [
|
||||
cfg.PortOpt('bind_port', default=8000,
|
||||
help=_('The port on which the server will listen.'),
|
||||
deprecated_group='DEFAULT'),
|
||||
cfg.IntOpt('backlog', default=4096,
|
||||
help=_("Number of backlog requests "
|
||||
"to configure the socket with."),
|
||||
deprecated_group='DEFAULT'),
|
||||
cfg.StrOpt('cert_file',
|
||||
help=_("Location of the SSL certificate file "
|
||||
"to use for SSL mode."),
|
||||
deprecated_group='DEFAULT'),
|
||||
cfg.StrOpt('key_file',
|
||||
help=_("Location of the SSL key file to use "
|
||||
"for enabling SSL mode."),
|
||||
deprecated_group='DEFAULT'),
|
||||
cfg.IntOpt('workers', min=0, default=1,
|
||||
help=_("Number of workers for Heat service."),
|
||||
deprecated_group='DEFAULT'),
|
||||
cfg.IntOpt('max_header_line', default=16384,
|
||||
help=_('Maximum line size of message headers to be accepted. '
|
||||
'max_header_line may need to be increased when using '
|
||||
'large tokens (typically those generated by the '
|
||||
'Keystone v3 API with big service catalogs).')),
|
||||
cfg.IntOpt('tcp_keepidle', default=600,
|
||||
help=_('The value for the socket option TCP_KEEPIDLE. This is '
|
||||
'the time in seconds that the connection must be idle '
|
||||
'before TCP starts sending keepalive probes.')),
|
||||
]
|
||||
api_cfn_group = cfg.OptGroup('heat_api_cfn')
|
||||
cfg.CONF.register_group(api_cfn_group)
|
||||
cfg.CONF.register_opts(api_cfn_opts,
|
||||
group=api_cfn_group)
|
||||
|
||||
wsgi_elt_opts = [
|
||||
cfg.BoolOpt('wsgi_keep_alive',
|
||||
default=True,
|
||||
help=_("If False, closes the client socket connection "
|
||||
"explicitly.")),
|
||||
cfg.IntOpt('client_socket_timeout', default=900,
|
||||
help=_("Timeout for client connections' socket operations. "
|
||||
"If an incoming connection is idle for this number of "
|
||||
"seconds it will be closed. A value of '0' means "
|
||||
"wait forever.")),
|
||||
]
|
||||
wsgi_elt_group = cfg.OptGroup('eventlet_opts')
|
||||
cfg.CONF.register_group(wsgi_elt_group)
|
||||
cfg.CONF.register_opts(wsgi_elt_opts,
|
||||
group=wsgi_elt_group)
|
||||
json_size_opt = cfg.IntOpt('max_json_body_size',
|
||||
default=1048576,
|
||||
help=_('Maximum raw byte size of JSON request body.'
|
||||
@@ -154,7 +79,6 @@ def list_opts():
|
||||
yield None, [json_size_opt]
|
||||
yield 'heat_api', api_opts
|
||||
yield 'heat_api_cfn', api_cfn_opts
|
||||
yield 'eventlet_opts', wsgi_elt_opts
|
||||
|
||||
|
||||
def get_bind_addr(conf, default_port=None):
|
||||
@@ -162,377 +86,6 @@ def get_bind_addr(conf, default_port=None):
|
||||
return (conf.bind_host, conf.bind_port or default_port)
|
||||
|
||||
|
||||
def get_socket(conf, default_port):
|
||||
"""Bind socket to bind ip:port in conf.
|
||||
|
||||
Note: Mostly comes from Swift with a few small changes...
|
||||
|
||||
:param conf: a cfg.ConfigOpts object
|
||||
:param default_port: port to bind to if none is specified in conf
|
||||
|
||||
:returns: a socket object as returned from socket.listen or
|
||||
ssl.SSLContext.wrap_socket if conf specifies cert_file
|
||||
"""
|
||||
bind_addr = get_bind_addr(conf, default_port)
|
||||
|
||||
# TODO(jaypipes): eventlet's greened socket module does not actually
|
||||
# support IPv6 in getaddrinfo(). We need to get around this in the
|
||||
# future or monitor upstream for a fix
|
||||
address_family = [addr[0] for addr in socket.getaddrinfo(bind_addr[0],
|
||||
bind_addr[1], socket.AF_UNSPEC, socket.SOCK_STREAM)
|
||||
if addr[0] in (socket.AF_INET, socket.AF_INET6)][0]
|
||||
|
||||
cert_file = conf.cert_file
|
||||
key_file = conf.key_file
|
||||
use_ssl = cert_file or key_file
|
||||
if use_ssl and (not cert_file or not key_file):
|
||||
raise RuntimeError(_("When running server in SSL mode, you must "
|
||||
"specify both a cert_file and key_file "
|
||||
"option value in your configuration file"))
|
||||
|
||||
sock = None
|
||||
retry_until = time.time() + 30
|
||||
while not sock and time.time() < retry_until:
|
||||
try:
|
||||
sock = eventlet.listen(bind_addr,
|
||||
backlog=conf.backlog,
|
||||
family=address_family)
|
||||
except socket.error as err:
|
||||
if err.errno != errno.EADDRINUSE:
|
||||
raise
|
||||
time.sleep(0.1)
|
||||
if not sock:
|
||||
raise RuntimeError(_("Could not bind to %(bind_addr)s "
|
||||
"after trying for 30 seconds")
|
||||
% {'bind_addr': bind_addr})
|
||||
|
||||
return sock
|
||||
|
||||
|
||||
class Server(object):
|
||||
"""Server class to manage multiple WSGI sockets and applications."""
|
||||
|
||||
def __init__(self, name, conf, threads=1000):
|
||||
os.umask(0o27) # ensure files are created with the correct privileges
|
||||
self._logger = logging.getLogger("eventlet.wsgi.server")
|
||||
self.name = name
|
||||
self.threads = threads
|
||||
self.children = set()
|
||||
self.stale_children = set()
|
||||
self.running = True
|
||||
self.pgid = os.getpid()
|
||||
self.conf = conf
|
||||
try:
|
||||
os.setpgid(self.pgid, self.pgid)
|
||||
except OSError:
|
||||
self.pgid = 0
|
||||
|
||||
def kill_children(self, *args):
|
||||
"""Kills the entire process group."""
|
||||
LOG.error('SIGTERM received')
|
||||
signal.signal(signal.SIGTERM, signal.SIG_IGN)
|
||||
signal.signal(signal.SIGINT, signal.SIG_IGN)
|
||||
self.running = False
|
||||
os.killpg(0, signal.SIGTERM)
|
||||
|
||||
def hup(self, *args):
|
||||
"""Reloads configuration files with zero down time."""
|
||||
LOG.error('SIGHUP received')
|
||||
signal.signal(signal.SIGHUP, signal.SIG_IGN)
|
||||
raise exception.SIGHUPInterrupt
|
||||
|
||||
def start(self, application, default_port):
|
||||
"""Run a WSGI server with the given application.
|
||||
|
||||
:param application: The application to run in the WSGI server
|
||||
:param default_port: Port to bind to if none is specified in conf
|
||||
"""
|
||||
|
||||
eventlet.wsgi.MAX_HEADER_LINE = self.conf.max_header_line
|
||||
self.application = application
|
||||
self.default_port = default_port
|
||||
self.configure_socket()
|
||||
self.start_wsgi()
|
||||
|
||||
def start_wsgi(self):
|
||||
workers = self.conf.workers
|
||||
# childs == num of cores
|
||||
if workers == 0:
|
||||
childs_num = processutils.get_worker_count()
|
||||
# launch only one GreenPool without childs
|
||||
elif workers == 1:
|
||||
# Useful for profiling, test, debug etc.
|
||||
self.pool = eventlet.GreenPool(size=self.threads)
|
||||
self.pool.spawn_n(self._single_run, self.application, self.sock)
|
||||
return
|
||||
# childs equal specified value of workers
|
||||
else:
|
||||
childs_num = workers
|
||||
|
||||
LOG.info("Starting %d workers", workers)
|
||||
signal.signal(signal.SIGTERM, self.kill_children)
|
||||
signal.signal(signal.SIGINT, self.kill_children)
|
||||
signal.signal(signal.SIGHUP, self.hup)
|
||||
|
||||
rfd, self.writepipe = os.pipe()
|
||||
self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
|
||||
|
||||
while len(self.children) < childs_num:
|
||||
self.run_child()
|
||||
|
||||
def wait_on_children(self):
|
||||
while self.running:
|
||||
try:
|
||||
pid, status = os.wait()
|
||||
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
|
||||
self._remove_children(pid)
|
||||
self._verify_and_respawn_children(pid, status)
|
||||
except OSError as err:
|
||||
if err.errno not in (errno.EINTR, errno.ECHILD):
|
||||
raise
|
||||
except KeyboardInterrupt:
|
||||
LOG.info('Caught keyboard interrupt. Exiting.')
|
||||
os.killpg(0, signal.SIGTERM)
|
||||
break
|
||||
except exception.SIGHUPInterrupt:
|
||||
self.reload()
|
||||
continue
|
||||
eventlet.greenio.shutdown_safe(self.sock)
|
||||
self.sock.close()
|
||||
LOG.debug('Exited')
|
||||
|
||||
def configure_socket(self, old_conf=None, has_changed=None):
|
||||
"""Ensure a socket exists and is appropriately configured.
|
||||
|
||||
This function is called on start up, and can also be
|
||||
called in the event of a configuration reload.
|
||||
|
||||
When called for the first time a new socket is created.
|
||||
If reloading and either bind_host or bind port have been
|
||||
changed the existing socket must be closed and a new
|
||||
socket opened (laws of physics).
|
||||
|
||||
In all other cases (bind_host/bind_port have not changed)
|
||||
the existing socket is reused.
|
||||
|
||||
:param old_conf: Cached old configuration settings (if any)
|
||||
:param has changed: callable to determine if a parameter has changed
|
||||
"""
|
||||
# Do we need a fresh socket?
|
||||
new_sock = (old_conf is None or (
|
||||
has_changed('bind_host') or
|
||||
has_changed('bind_port')))
|
||||
# Will we be using https?
|
||||
use_ssl = not (not self.conf.cert_file or not self.conf.key_file)
|
||||
# Were we using https before?
|
||||
old_use_ssl = (old_conf is not None and not (
|
||||
not old_conf.get('key_file') or
|
||||
not old_conf.get('cert_file')))
|
||||
# Do we now need to perform an SSL wrap on the socket?
|
||||
wrap_sock = use_ssl is True and (old_use_ssl is False or new_sock)
|
||||
# Do we now need to perform an SSL unwrap on the socket?
|
||||
unwrap_sock = use_ssl is False and old_use_ssl is True
|
||||
|
||||
if new_sock:
|
||||
self._sock = None
|
||||
if old_conf is not None:
|
||||
self.sock.close()
|
||||
_sock = get_socket(self.conf, self.default_port)
|
||||
_sock.setsockopt(socket.SOL_SOCKET,
|
||||
socket.SO_REUSEADDR, 1)
|
||||
# sockets can hang around forever without keepalive
|
||||
_sock.setsockopt(socket.SOL_SOCKET,
|
||||
socket.SO_KEEPALIVE, 1)
|
||||
self._sock = _sock
|
||||
|
||||
if wrap_sock:
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
context.load_cert_chain(self.conf.cert_file, self.conf.key_file)
|
||||
self.sock = context.wrap_socket(self._sock)
|
||||
|
||||
if unwrap_sock:
|
||||
self.sock = self._sock
|
||||
|
||||
if new_sock and not use_ssl:
|
||||
self.sock = self._sock
|
||||
|
||||
# Pick up newly deployed certs
|
||||
if old_conf is not None and use_ssl is True and old_use_ssl is True:
|
||||
if has_changed('cert_file'):
|
||||
self.sock.certfile = self.conf.cert_file
|
||||
if has_changed('key_file'):
|
||||
self.sock.keyfile = self.conf.key_file
|
||||
|
||||
if new_sock or (old_conf is not None and has_changed('tcp_keepidle')):
|
||||
# This option isn't available in the OS X version of eventlet
|
||||
if hasattr(socket, 'TCP_KEEPIDLE'):
|
||||
self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE,
|
||||
self.conf.tcp_keepidle)
|
||||
|
||||
if old_conf is not None and has_changed('backlog'):
|
||||
self.sock.listen(self.conf.backlog)
|
||||
|
||||
def _remove_children(self, pid):
|
||||
if pid in self.children:
|
||||
self.children.remove(pid)
|
||||
LOG.info('Removed dead child %s', pid)
|
||||
elif pid in self.stale_children:
|
||||
self.stale_children.remove(pid)
|
||||
LOG.info('Removed stale child %s', pid)
|
||||
else:
|
||||
LOG.warning('Unrecognised child %s', pid)
|
||||
|
||||
def _verify_and_respawn_children(self, pid, status):
|
||||
if len(self.stale_children) == 0:
|
||||
LOG.debug('No stale children')
|
||||
if os.WIFEXITED(status) and os.WEXITSTATUS(status) != 0:
|
||||
LOG.error('Not respawning child %d, cannot '
|
||||
'recover from termination', pid)
|
||||
if not self.children and not self.stale_children:
|
||||
LOG.info(
|
||||
'All workers have terminated. Exiting')
|
||||
self.running = False
|
||||
else:
|
||||
if len(self.children) < self.conf.workers:
|
||||
self.run_child()
|
||||
|
||||
def stash_conf_values(self):
|
||||
"""Make a copy of some of the current global CONF's settings.
|
||||
|
||||
Allows determining if any of these values have changed when the config
|
||||
is reloaded.
|
||||
"""
|
||||
conf = {}
|
||||
conf['bind_host'] = self.conf.bind_host
|
||||
conf['bind_port'] = self.conf.bind_port
|
||||
conf['backlog'] = self.conf.backlog
|
||||
conf['key_file'] = self.conf.key_file
|
||||
conf['cert_file'] = self.conf.cert_file
|
||||
return conf
|
||||
|
||||
def reload(self):
|
||||
"""Reload and re-apply configuration settings.
|
||||
|
||||
Existing child processes are sent a SIGHUP signal
|
||||
and will exit after completing existing requests.
|
||||
New child processes, which will have the updated
|
||||
configuration, are spawned. This allows preventing
|
||||
interruption to the service.
|
||||
"""
|
||||
def _has_changed(old, new, param):
|
||||
old = old.get(param)
|
||||
new = getattr(new, param)
|
||||
return (new != old)
|
||||
|
||||
old_conf = self.stash_conf_values()
|
||||
has_changed = functools.partial(_has_changed, old_conf, self.conf)
|
||||
cfg.CONF.reload_config_files()
|
||||
os.killpg(self.pgid, signal.SIGHUP)
|
||||
self.stale_children = self.children
|
||||
self.children = set()
|
||||
|
||||
# Ensure any logging config changes are picked up
|
||||
logging.setup(cfg.CONF, self.name)
|
||||
|
||||
self.configure_socket(old_conf, has_changed)
|
||||
self.start_wsgi()
|
||||
|
||||
def wait(self):
|
||||
"""Wait until all servers have completed running."""
|
||||
try:
|
||||
if self.children:
|
||||
self.wait_on_children()
|
||||
else:
|
||||
self.pool.waitall()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
def run_child(self):
|
||||
def child_hup(*args):
|
||||
"""Shuts down child processes, existing requests are handled."""
|
||||
signal.signal(signal.SIGHUP, signal.SIG_IGN)
|
||||
eventlet.wsgi.is_accepting = False
|
||||
self.sock.close()
|
||||
|
||||
pid = os.fork()
|
||||
if pid == 0:
|
||||
signal.signal(signal.SIGHUP, child_hup)
|
||||
signal.signal(signal.SIGTERM, signal.SIG_DFL)
|
||||
# ignore the interrupt signal to avoid a race whereby
|
||||
# a child worker receives the signal before the parent
|
||||
# and is respawned unnecessarily as a result
|
||||
signal.signal(signal.SIGINT, signal.SIG_IGN)
|
||||
# The child has no need to stash the unwrapped
|
||||
# socket, and the reference prevents a clean
|
||||
# exit on sighup
|
||||
self._sock = None
|
||||
self.run_server()
|
||||
LOG.info('Child %d exiting normally', os.getpid())
|
||||
# self.pool.waitall() is now called in wsgi's server so
|
||||
# it's safe to exit here
|
||||
sys.exit(0)
|
||||
else:
|
||||
LOG.info('Started child %s', pid)
|
||||
self.children.add(pid)
|
||||
|
||||
def _pipe_watcher(self):
|
||||
def _on_timeout_exit(*args):
|
||||
LOG.info('Graceful shutdown timeout exceeded, '
|
||||
'instantaneous exiting')
|
||||
os._exit(1)
|
||||
|
||||
# This will block until the write end is closed when the parent
|
||||
# dies unexpectedly
|
||||
|
||||
self.readpipe.read(1)
|
||||
LOG.info('Parent process has died unexpectedly, exiting')
|
||||
|
||||
# allow up to 1 second for sys.exit to gracefully shutdown
|
||||
signal.signal(signal.SIGALRM, _on_timeout_exit)
|
||||
signal.alarm(1)
|
||||
# do the same as child_hup
|
||||
eventlet.wsgi.is_accepting = False
|
||||
self.sock.close()
|
||||
sys.exit(1)
|
||||
|
||||
def run_server(self):
|
||||
"""Run a WSGI server."""
|
||||
eventlet.wsgi.HttpProtocol.default_request_version = "HTTP/1.0"
|
||||
eventlet.hubs.use_hub('poll')
|
||||
eventlet.patcher.monkey_patch(all=False, socket=True)
|
||||
self.pool = eventlet.GreenPool(size=self.threads)
|
||||
socket_timeout = cfg.CONF.eventlet_opts.client_socket_timeout or None
|
||||
|
||||
# Close write to ensure only parent has it open
|
||||
os.close(self.writepipe)
|
||||
# Create greenthread to watch for parent to close pipe
|
||||
eventlet.spawn_n(self._pipe_watcher)
|
||||
|
||||
try:
|
||||
eventlet.wsgi.server(
|
||||
self.sock,
|
||||
self.application,
|
||||
custom_pool=self.pool,
|
||||
url_length_limit=URL_LENGTH_LIMIT,
|
||||
log=self._logger,
|
||||
debug=cfg.CONF.debug,
|
||||
keepalive=cfg.CONF.eventlet_opts.wsgi_keep_alive,
|
||||
socket_timeout=socket_timeout)
|
||||
except socket.error as err:
|
||||
if err.errno != errno.EINVAL:
|
||||
raise
|
||||
self.pool.waitall()
|
||||
|
||||
def _single_run(self, application, sock):
|
||||
"""Start a WSGI server in a new green thread."""
|
||||
LOG.info("Starting single process server")
|
||||
eventlet.wsgi.server(sock, application,
|
||||
custom_pool=self.pool,
|
||||
url_length_limit=URL_LENGTH_LIMIT,
|
||||
log=self._logger,
|
||||
debug=cfg.CONF.debug)
|
||||
|
||||
|
||||
class Middleware(object):
|
||||
"""Base WSGI middleware wrapper.
|
||||
|
||||
|
@@ -17,10 +17,8 @@
|
||||
|
||||
from unittest import mock
|
||||
|
||||
import fixtures
|
||||
import json
|
||||
from oslo_config import cfg
|
||||
import socket
|
||||
import webob
|
||||
|
||||
from heat.api.aws import exception as aws_exception
|
||||
@@ -29,77 +27,6 @@ from heat.common import wsgi
|
||||
from heat.tests import common
|
||||
|
||||
|
||||
class RequestTest(common.HeatTestCase):
|
||||
|
||||
def test_content_type_missing(self):
|
||||
request = wsgi.Request.blank('/tests/123')
|
||||
self.assertRaises(exception.InvalidContentType,
|
||||
request.get_content_type, ('application/xml'))
|
||||
|
||||
def test_content_type_unsupported(self):
|
||||
request = wsgi.Request.blank('/tests/123')
|
||||
request.headers["Content-Type"] = "text/html"
|
||||
self.assertRaises(exception.InvalidContentType,
|
||||
request.get_content_type, ('application/xml'))
|
||||
|
||||
def test_content_type_with_charset(self):
|
||||
request = wsgi.Request.blank('/tests/123')
|
||||
request.headers["Content-Type"] = "application/json; charset=UTF-8"
|
||||
result = request.get_content_type(('application/json'))
|
||||
self.assertEqual("application/json", result)
|
||||
|
||||
def test_content_type_from_accept_xml(self):
|
||||
request = wsgi.Request.blank('/tests/123')
|
||||
request.headers["Accept"] = "application/xml"
|
||||
result = request.best_match_content_type()
|
||||
self.assertEqual("application/json", result)
|
||||
|
||||
def test_content_type_from_accept_json(self):
|
||||
request = wsgi.Request.blank('/tests/123')
|
||||
request.headers["Accept"] = "application/json"
|
||||
result = request.best_match_content_type()
|
||||
self.assertEqual("application/json", result)
|
||||
|
||||
def test_content_type_from_accept_xml_json(self):
|
||||
request = wsgi.Request.blank('/tests/123')
|
||||
request.headers["Accept"] = "application/xml, application/json"
|
||||
result = request.best_match_content_type()
|
||||
self.assertEqual("application/json", result)
|
||||
|
||||
def test_content_type_from_accept_json_xml_quality(self):
|
||||
request = wsgi.Request.blank('/tests/123')
|
||||
request.headers["Accept"] = ("application/json; q=0.3, "
|
||||
"application/xml; q=0.9")
|
||||
result = request.best_match_content_type()
|
||||
self.assertEqual("application/json", result)
|
||||
|
||||
def test_content_type_accept_default(self):
|
||||
request = wsgi.Request.blank('/tests/123.unsupported')
|
||||
request.headers["Accept"] = "application/unsupported1"
|
||||
result = request.best_match_content_type()
|
||||
self.assertEqual("application/json", result)
|
||||
|
||||
def test_best_match_language(self):
|
||||
# Test that we are actually invoking language negotiation by webop
|
||||
request = wsgi.Request.blank('/')
|
||||
accepted = 'unknown-lang'
|
||||
request.headers = {'Accept-Language': accepted}
|
||||
|
||||
def fake_best_match(self, offers, default_match=None):
|
||||
# Best match on an unknown locale returns None
|
||||
return None
|
||||
with mock.patch.object(request.accept_language,
|
||||
'best_match') as mock_match:
|
||||
mock_match.side_effect = fake_best_match
|
||||
self.assertIsNone(request.best_match_language())
|
||||
|
||||
# If Accept-Language is missing or empty, match should be None
|
||||
request.headers = {'Accept-Language': ''}
|
||||
self.assertIsNone(request.best_match_language())
|
||||
request.headers.pop('Accept-Language')
|
||||
self.assertIsNone(request.best_match_language())
|
||||
|
||||
|
||||
class ResourceTest(common.HeatTestCase):
|
||||
|
||||
def test_get_action_args(self):
|
||||
@@ -387,77 +314,3 @@ class JSONRequestDeserializerTest(common.HeatTestCase):
|
||||
'(%s bytes) exceeds maximum allowed size (%s bytes).' % (
|
||||
len(body), cfg.CONF.max_json_body_size))
|
||||
self.assertEqual(msg, str(error))
|
||||
|
||||
|
||||
class GetSocketTestCase(common.HeatTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(GetSocketTestCase, self).setUp()
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
"heat.common.wsgi.get_bind_addr",
|
||||
lambda x, y: ('192.168.0.13', 1234)))
|
||||
addr_info_list = [(2, 1, 6, '', ('192.168.0.13', 80)),
|
||||
(2, 2, 17, '', ('192.168.0.13', 80)),
|
||||
(2, 3, 0, '', ('192.168.0.13', 80))]
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
"heat.common.wsgi.socket.getaddrinfo",
|
||||
lambda *x: addr_info_list))
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
"heat.common.wsgi.time.time",
|
||||
mock.Mock(side_effect=[0, 1, 5, 10, 20, 35])))
|
||||
wsgi.cfg.CONF.heat_api.cert_file = '/etc/ssl/cert'
|
||||
wsgi.cfg.CONF.heat_api.key_file = '/etc/ssl/key'
|
||||
wsgi.cfg.CONF.heat_api.ca_file = '/etc/ssl/ca_cert'
|
||||
wsgi.cfg.CONF.heat_api.tcp_keepidle = 600
|
||||
|
||||
def test_correct_configure_socket(self):
|
||||
mock_socket = mock.Mock()
|
||||
mock_load_cert_chain = mock.Mock()
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
'heat.common.wsgi.ssl.SSLContext.load_cert_chain',
|
||||
mock_load_cert_chain))
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
'heat.common.wsgi.ssl.SSLContext.wrap_socket',
|
||||
mock_socket))
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
'heat.common.wsgi.eventlet.listen',
|
||||
lambda *x, **y: mock_socket))
|
||||
server = wsgi.Server(name='heat-api', conf=cfg.CONF.heat_api)
|
||||
server.default_port = 1234
|
||||
server.configure_socket()
|
||||
self.assertIn(mock.call.setsockopt(
|
||||
socket.SOL_SOCKET,
|
||||
socket.SO_REUSEADDR,
|
||||
1), mock_socket.mock_calls)
|
||||
self.assertIn(mock.call.setsockopt(
|
||||
socket.SOL_SOCKET,
|
||||
socket.SO_KEEPALIVE,
|
||||
1), mock_socket.mock_calls)
|
||||
if hasattr(socket, 'TCP_KEEPIDLE'):
|
||||
self.assertIn(mock.call().setsockopt(
|
||||
socket.IPPROTO_TCP,
|
||||
socket.TCP_KEEPIDLE,
|
||||
wsgi.cfg.CONF.heat_api.tcp_keepidle), mock_socket.mock_calls)
|
||||
|
||||
def test_get_socket_without_all_ssl_reqs(self):
|
||||
wsgi.cfg.CONF.heat_api.key_file = None
|
||||
self.assertRaises(RuntimeError, wsgi.get_socket,
|
||||
wsgi.cfg.CONF.heat_api, 1234)
|
||||
|
||||
def test_get_socket_with_bind_problems(self):
|
||||
err = wsgi.socket.error(
|
||||
socket.errno.EADDRINUSE, 'Address already in use')
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
'heat.common.wsgi.eventlet.listen',
|
||||
mock.Mock(side_effect=(
|
||||
[err] * 3 + [None]))))
|
||||
|
||||
self.assertRaises(RuntimeError, wsgi.get_socket,
|
||||
wsgi.cfg.CONF.heat_api, 1234)
|
||||
|
||||
def test_get_socket_with_unexpected_socket_errno(self):
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
'heat.common.wsgi.eventlet.listen',
|
||||
mock.Mock(side_effect=wsgi.socket.error(socket.errno.ENOMEM))))
|
||||
self.assertRaises(wsgi.socket.error, wsgi.get_socket,
|
||||
wsgi.cfg.CONF.heat_api, 1234)
|
||||
|
@@ -0,0 +1,6 @@
|
||||
---
|
||||
upgrade:
|
||||
- |
|
||||
All wsgi application implementations using eventlet for
|
||||
heat-api, heat-api-cfn and heat-all are removed as a precusror
|
||||
to dropping eventlet usage from heat.
|
Reference in New Issue
Block a user