Force catch_errors into pipeline
This commit adds a hook for WSGI applications (e.g. proxy.server.Application) to modify their WSGI pipelines. This is currently used by the proxy server to ensure that catch_errors is present; if it is missing, it is inserted as the first middleware in the pipeline. This lets us write new, mandatory middlewares for Swift without breaking existing deployments on upgrade. Change-Id: Ibed0f2edb6f80c25be182b3d4544e6a67c5050ad
This commit is contained in:
parent
1bb6563a19
commit
a49fd3d8de
@ -16,6 +16,7 @@
|
||||
"""WSGI tools for use with swift."""
|
||||
|
||||
import errno
|
||||
import inspect
|
||||
import os
|
||||
import signal
|
||||
import time
|
||||
@ -109,7 +110,6 @@ def wrap_conf_type(f):
|
||||
|
||||
|
||||
appconfig = wrap_conf_type(loadwsgi.appconfig)
|
||||
loadapp = wrap_conf_type(loadwsgi.loadapp)
|
||||
|
||||
|
||||
def monkey_patch_mimetools():
|
||||
@ -197,6 +197,111 @@ class RestrictedGreenPool(GreenPool):
|
||||
self.waitall()
|
||||
|
||||
|
||||
class PipelineWrapper(object):
|
||||
"""
|
||||
This class provides a number of utility methods for
|
||||
modifying the composition of a wsgi pipeline.
|
||||
"""
|
||||
|
||||
def __init__(self, context):
|
||||
self.context = context
|
||||
|
||||
def __contains__(self, entry_point_name):
|
||||
try:
|
||||
self.index(entry_point_name)
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
def _format_for_display(self, ctx):
|
||||
if ctx.entry_point_name:
|
||||
return ctx.entry_point_name
|
||||
elif inspect.isfunction(ctx.object):
|
||||
# ctx.object is a reference to the actual filter_factory
|
||||
# function, so we pretty-print that. It's not the nice short
|
||||
# entry point, but it beats "<unknown>".
|
||||
#
|
||||
# These happen when, instead of something like
|
||||
#
|
||||
# use = egg:swift#healthcheck
|
||||
#
|
||||
# you have something like this:
|
||||
#
|
||||
# paste.filter_factory = \
|
||||
# swift.common.middleware.healthcheck:filter_factory
|
||||
return "%s:%s" % (inspect.getmodule(ctx.object).__name__,
|
||||
ctx.object.__name__)
|
||||
else:
|
||||
# No idea what this is
|
||||
return "<unknown context>"
|
||||
|
||||
def __str__(self):
|
||||
parts = [self._format_for_display(ctx)
|
||||
for ctx in self.context.filter_contexts]
|
||||
parts.append(self._format_for_display(self.context.app_context))
|
||||
return " ".join(parts)
|
||||
|
||||
def create_filter(self, entry_point_name):
|
||||
"""
|
||||
Creates a context for a filter that can subsequently be added
|
||||
to a pipeline context.
|
||||
|
||||
:param entry_point_name: entry point of the middleware (Swift only)
|
||||
|
||||
:returns: a filter context
|
||||
"""
|
||||
spec = 'egg:swift#' + entry_point_name
|
||||
ctx = loadwsgi.loadcontext(loadwsgi.FILTER, spec,
|
||||
global_conf=self.context.global_conf)
|
||||
ctx.protocol = 'paste.filter_factory'
|
||||
return ctx
|
||||
|
||||
def index(self, entry_point_name):
|
||||
"""
|
||||
Returns the first index of the given entry point name in the pipeline.
|
||||
|
||||
Raises ValueError if the given module is not in the pipeline.
|
||||
"""
|
||||
for i, ctx in enumerate(self.context.filter_contexts):
|
||||
if ctx.entry_point_name == entry_point_name:
|
||||
return i
|
||||
raise ValueError("%s is not in pipeline" % (entry_point_name,))
|
||||
|
||||
def insert_filter(self, ctx, index=0):
|
||||
"""
|
||||
Inserts a filter module into the pipeline context.
|
||||
|
||||
:param ctx: the context to be inserted
|
||||
:param index: (optional) index at which filter should be
|
||||
inserted in the list of pipeline filters. Default
|
||||
is 0, which means the start of the pipeline.
|
||||
"""
|
||||
self.context.filter_contexts.insert(index, ctx)
|
||||
|
||||
|
||||
def loadcontext(object_type, uri, name=None, relative_to=None,
|
||||
global_conf=None):
|
||||
add_conf_type = wrap_conf_type(lambda x: x)
|
||||
return loadwsgi.loadcontext(object_type, add_conf_type(uri), name=name,
|
||||
relative_to=relative_to,
|
||||
global_conf=global_conf)
|
||||
|
||||
|
||||
def loadapp(conf_file, global_conf):
|
||||
"""
|
||||
Loads a context from a config file, and if the context is a pipeline
|
||||
then presents the app with the opportunity to modify the pipeline.
|
||||
"""
|
||||
ctx = loadcontext(loadwsgi.APP, conf_file, global_conf=global_conf)
|
||||
if ctx.object_type.name == 'pipeline':
|
||||
# give app the opportunity to modify the pipeline context
|
||||
app = ctx.app_context.create()
|
||||
func = getattr(app, 'modify_wsgi_pipeline', None)
|
||||
if func:
|
||||
func(PipelineWrapper(ctx))
|
||||
return ctx.create()
|
||||
|
||||
|
||||
def run_server(conf, logger, sock, global_conf=None):
|
||||
# Ensure TZ environment variable exists to avoid stat('/etc/localtime') on
|
||||
# some platforms. This locks in reported times to the timezone in which
|
||||
|
@ -38,6 +38,22 @@ from swift.common.swob import HTTPBadRequest, HTTPForbidden, \
|
||||
HTTPServerError, HTTPException, Request
|
||||
|
||||
|
||||
# List of entry points for mandatory middlewares.
|
||||
#
|
||||
# Fields:
|
||||
#
|
||||
# "name" (required) is the entry point name from setup.py.
|
||||
#
|
||||
# "after" (optional) is a list of middlewares that this middleware should come
|
||||
# after. Default is for the middleware to go at the start of the pipeline. Any
|
||||
# middlewares in the "after" list that are not present in the pipeline will be
|
||||
# ignored, so you can safely name optional middlewares to come after. For
|
||||
# example, 'after: ["catch_errors", "bulk"]' would install this middleware
|
||||
# after catch_errors and bulk if both were present, but if bulk were absent,
|
||||
# would just install it after catch_errors.
|
||||
required_filters = [{'name': 'catch_errors'}]
|
||||
|
||||
|
||||
class Application(object):
|
||||
"""WSGI application for the proxy server."""
|
||||
|
||||
@ -478,6 +494,37 @@ class Application(object):
|
||||
{'type': typ, 'ip': node['ip'], 'port': node['port'],
|
||||
'device': node['device'], 'info': additional_info})
|
||||
|
||||
def modify_wsgi_pipeline(self, pipe):
|
||||
"""
|
||||
Called during WSGI pipeline creation. Modifies the WSGI pipeline
|
||||
context to ensure that mandatory middleware is present in the pipeline.
|
||||
|
||||
:param pipe: A PipelineWrapper object
|
||||
"""
|
||||
pipeline_was_modified = False
|
||||
for filter_spec in reversed(required_filters):
|
||||
filter_name = filter_spec['name']
|
||||
if filter_name not in pipe:
|
||||
afters = filter_spec.get('after', [])
|
||||
insert_at = 0
|
||||
for after in afters:
|
||||
try:
|
||||
insert_at = max(insert_at, pipe.index(after) + 1)
|
||||
except ValueError: # not in pipeline; ignore it
|
||||
pass
|
||||
self.logger.info(
|
||||
'Adding required filter %s to pipeline at position %d' %
|
||||
(filter_name, insert_at))
|
||||
ctx = pipe.create_filter(filter_name)
|
||||
pipe.insert_filter(ctx, index=insert_at)
|
||||
pipeline_was_modified = True
|
||||
|
||||
if pipeline_was_modified:
|
||||
self.logger.info("Pipeline was modified. New pipeline is \"%s\".",
|
||||
pipe)
|
||||
else:
|
||||
self.logger.debug("Pipeline is \"%s\"", pipe)
|
||||
|
||||
|
||||
def app_factory(global_conf, **local_conf):
|
||||
"""paste.deploy app factory for creating WSGI proxy apps."""
|
||||
|
@ -42,7 +42,7 @@ from swift.common import wsgi, utils, ring
|
||||
|
||||
from test.unit import temptree
|
||||
|
||||
from mock import patch
|
||||
from paste.deploy import loadwsgi
|
||||
|
||||
|
||||
def _fake_rings(tmpdir):
|
||||
@ -126,14 +126,11 @@ class TestWSGI(unittest.TestCase):
|
||||
swift_dir = TEMPDIR
|
||||
|
||||
[pipeline:main]
|
||||
pipeline = catch_errors proxy-server
|
||||
pipeline = proxy-server
|
||||
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
conn_timeout = 0.2
|
||||
|
||||
[filter:catch_errors]
|
||||
use = egg:swift#catch_errors
|
||||
"""
|
||||
contents = dedent(config)
|
||||
with temptree(['proxy-server.conf']) as t:
|
||||
@ -143,7 +140,7 @@ class TestWSGI(unittest.TestCase):
|
||||
_fake_rings(t)
|
||||
app, conf, logger, log_name = wsgi.init_request_processor(
|
||||
conf_file, 'proxy-server')
|
||||
# verify pipeline is catch_errors -> proxy-servery
|
||||
# verify pipeline is catch_errors -> proxy-server
|
||||
expected = swift.common.middleware.catch_errors.CatchErrorMiddleware
|
||||
self.assert_(isinstance(app, expected))
|
||||
self.assert_(isinstance(app.app, swift.proxy.server.Application))
|
||||
@ -179,14 +176,15 @@ class TestWSGI(unittest.TestCase):
|
||||
}
|
||||
# strip indent from test config contents
|
||||
config_dir = dict((f, dedent(c)) for (f, c) in config_dir.items())
|
||||
with temptree(*zip(*config_dir.items())) as conf_root:
|
||||
conf_dir = os.path.join(conf_root, 'proxy-server.conf.d')
|
||||
with open(os.path.join(conf_dir, 'swift.conf'), 'w') as f:
|
||||
f.write('[DEFAULT]\nswift_dir = %s' % conf_root)
|
||||
_fake_rings(conf_root)
|
||||
app, conf, logger, log_name = wsgi.init_request_processor(
|
||||
conf_dir, 'proxy-server')
|
||||
# verify pipeline is catch_errors -> proxy-servery
|
||||
with mock.patch('swift.proxy.server.Application.modify_wsgi_pipeline'):
|
||||
with temptree(*zip(*config_dir.items())) as conf_root:
|
||||
conf_dir = os.path.join(conf_root, 'proxy-server.conf.d')
|
||||
with open(os.path.join(conf_dir, 'swift.conf'), 'w') as f:
|
||||
f.write('[DEFAULT]\nswift_dir = %s' % conf_root)
|
||||
_fake_rings(conf_root)
|
||||
app, conf, logger, log_name = wsgi.init_request_processor(
|
||||
conf_dir, 'proxy-server')
|
||||
# verify pipeline is catch_errors -> proxy-server
|
||||
expected = swift.common.middleware.catch_errors.CatchErrorMiddleware
|
||||
self.assert_(isinstance(app, expected))
|
||||
self.assert_(isinstance(app.app, swift.proxy.server.Application))
|
||||
@ -333,12 +331,14 @@ class TestWSGI(unittest.TestCase):
|
||||
with open(conf_file, 'w') as f:
|
||||
f.write(contents.replace('TEMPDIR', t))
|
||||
_fake_rings(t)
|
||||
with patch('swift.common.wsgi.wsgi') as _wsgi:
|
||||
with patch('swift.common.wsgi.eventlet') as _eventlet:
|
||||
conf = wsgi.appconfig(conf_file)
|
||||
logger = logging.getLogger('test')
|
||||
sock = listen(('localhost', 0))
|
||||
wsgi.run_server(conf, logger, sock)
|
||||
with mock.patch('swift.proxy.server.Application.'
|
||||
'modify_wsgi_pipeline'):
|
||||
with mock.patch('swift.common.wsgi.wsgi') as _wsgi:
|
||||
with mock.patch('swift.common.wsgi.eventlet') as _eventlet:
|
||||
conf = wsgi.appconfig(conf_file)
|
||||
logger = logging.getLogger('test')
|
||||
sock = listen(('localhost', 0))
|
||||
wsgi.run_server(conf, logger, sock)
|
||||
self.assertEquals('HTTP/1.0',
|
||||
_wsgi.HttpProtocol.default_request_version)
|
||||
self.assertEquals(30, _wsgi.WRITE_TIMEOUT)
|
||||
@ -379,14 +379,16 @@ class TestWSGI(unittest.TestCase):
|
||||
with open(os.path.join(conf_dir, 'swift.conf'), 'w') as f:
|
||||
f.write('[DEFAULT]\nswift_dir = %s' % conf_root)
|
||||
_fake_rings(conf_root)
|
||||
with patch('swift.common.wsgi.wsgi') as _wsgi:
|
||||
with patch('swift.common.wsgi.eventlet') as _eventlet:
|
||||
with patch.dict('os.environ', {'TZ': ''}):
|
||||
conf = wsgi.appconfig(conf_dir)
|
||||
logger = logging.getLogger('test')
|
||||
sock = listen(('localhost', 0))
|
||||
wsgi.run_server(conf, logger, sock)
|
||||
self.assert_(os.environ['TZ'] is not '')
|
||||
with mock.patch('swift.proxy.server.Application.'
|
||||
'modify_wsgi_pipeline'):
|
||||
with mock.patch('swift.common.wsgi.wsgi') as _wsgi:
|
||||
with mock.patch('swift.common.wsgi.eventlet') as _eventlet:
|
||||
with mock.patch.dict('os.environ', {'TZ': ''}):
|
||||
conf = wsgi.appconfig(conf_dir)
|
||||
logger = logging.getLogger('test')
|
||||
sock = listen(('localhost', 0))
|
||||
wsgi.run_server(conf, logger, sock)
|
||||
self.assert_(os.environ['TZ'] is not '')
|
||||
|
||||
self.assertEquals('HTTP/1.0',
|
||||
_wsgi.HttpProtocol.default_request_version)
|
||||
@ -667,5 +669,229 @@ class TestWSGIContext(unittest.TestCase):
|
||||
self.assertRaises(StopIteration, iterator.next)
|
||||
|
||||
|
||||
class TestPipelineWrapper(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
config = """
|
||||
[DEFAULT]
|
||||
swift_dir = TEMPDIR
|
||||
|
||||
[pipeline:main]
|
||||
pipeline = healthcheck catch_errors tempurl proxy-server
|
||||
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
conn_timeout = 0.2
|
||||
|
||||
[filter:catch_errors]
|
||||
use = egg:swift#catch_errors
|
||||
|
||||
[filter:healthcheck]
|
||||
use = egg:swift#healthcheck
|
||||
|
||||
[filter:tempurl]
|
||||
paste.filter_factory = swift.common.middleware.tempurl:filter_factory
|
||||
"""
|
||||
|
||||
contents = dedent(config)
|
||||
with temptree(['proxy-server.conf']) as t:
|
||||
conf_file = os.path.join(t, 'proxy-server.conf')
|
||||
with open(conf_file, 'w') as f:
|
||||
f.write(contents.replace('TEMPDIR', t))
|
||||
ctx = wsgi.loadcontext(loadwsgi.APP, conf_file, global_conf={})
|
||||
self.pipe = wsgi.PipelineWrapper(ctx)
|
||||
|
||||
def _entry_point_names(self):
|
||||
# Helper method to return a list of the entry point names for the
|
||||
# filters in the pipeline.
|
||||
return [c.entry_point_name for c in self.pipe.context.filter_contexts]
|
||||
|
||||
def test_insert_filter(self):
|
||||
original_modules = ['healthcheck', 'catch_errors', None]
|
||||
self.assertEqual(self._entry_point_names(), original_modules)
|
||||
|
||||
self.pipe.insert_filter(self.pipe.create_filter('catch_errors'))
|
||||
expected_modules = ['catch_errors', 'healthcheck',
|
||||
'catch_errors', None]
|
||||
self.assertEqual(self._entry_point_names(), expected_modules)
|
||||
|
||||
def test_str(self):
|
||||
self.assertEqual(
|
||||
str(self.pipe),
|
||||
"healthcheck catch_errors " +
|
||||
"swift.common.middleware.tempurl:filter_factory proxy")
|
||||
|
||||
def test_str_unknown_filter(self):
|
||||
self.pipe.context.filter_contexts[0].entry_point_name = None
|
||||
self.pipe.context.filter_contexts[0].object = 'mysterious'
|
||||
self.assertEqual(
|
||||
str(self.pipe),
|
||||
"<unknown context> catch_errors " +
|
||||
"swift.common.middleware.tempurl:filter_factory proxy")
|
||||
|
||||
|
||||
class TestPipelineModification(unittest.TestCase):
|
||||
def pipeline_modules(self, app):
|
||||
# This is rather brittle; it'll break if a middleware stores its app
|
||||
# anywhere other than an attribute named "app", but it works for now.
|
||||
pipe = []
|
||||
for _ in xrange(1000):
|
||||
pipe.append(app.__class__.__module__)
|
||||
if not hasattr(app, 'app'):
|
||||
break
|
||||
app = app.app
|
||||
return pipe
|
||||
|
||||
def test_load_app(self):
|
||||
config = """
|
||||
[DEFAULT]
|
||||
swift_dir = TEMPDIR
|
||||
|
||||
[pipeline:main]
|
||||
pipeline = healthcheck proxy-server
|
||||
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
conn_timeout = 0.2
|
||||
|
||||
[filter:catch_errors]
|
||||
use = egg:swift#catch_errors
|
||||
|
||||
[filter:healthcheck]
|
||||
use = egg:swift#healthcheck
|
||||
"""
|
||||
|
||||
def modify_func(app, pipe):
|
||||
new = pipe.create_filter('catch_errors')
|
||||
pipe.insert_filter(new)
|
||||
|
||||
contents = dedent(config)
|
||||
with temptree(['proxy-server.conf']) as t:
|
||||
conf_file = os.path.join(t, 'proxy-server.conf')
|
||||
with open(conf_file, 'w') as f:
|
||||
f.write(contents.replace('TEMPDIR', t))
|
||||
_fake_rings(t)
|
||||
with mock.patch(
|
||||
'swift.proxy.server.Application.modify_wsgi_pipeline',
|
||||
modify_func):
|
||||
app = wsgi.loadapp(conf_file, global_conf={})
|
||||
exp = swift.common.middleware.catch_errors.CatchErrorMiddleware
|
||||
self.assertTrue(isinstance(app, exp), app)
|
||||
exp = swift.common.middleware.healthcheck.HealthCheckMiddleware
|
||||
self.assertTrue(isinstance(app.app, exp), app.app)
|
||||
exp = swift.proxy.server.Application
|
||||
self.assertTrue(isinstance(app.app.app, exp), app.app.app)
|
||||
|
||||
def test_proxy_unmodified_wsgi_pipeline(self):
|
||||
# Make sure things are sane even when we modify nothing
|
||||
config = """
|
||||
[DEFAULT]
|
||||
swift_dir = TEMPDIR
|
||||
|
||||
[pipeline:main]
|
||||
pipeline = catch_errors proxy-server
|
||||
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
conn_timeout = 0.2
|
||||
|
||||
[filter:catch_errors]
|
||||
use = egg:swift#catch_errors
|
||||
"""
|
||||
|
||||
contents = dedent(config)
|
||||
with temptree(['proxy-server.conf']) as t:
|
||||
conf_file = os.path.join(t, 'proxy-server.conf')
|
||||
with open(conf_file, 'w') as f:
|
||||
f.write(contents.replace('TEMPDIR', t))
|
||||
_fake_rings(t)
|
||||
app = wsgi.loadapp(conf_file, global_conf={})
|
||||
|
||||
self.assertEqual(self.pipeline_modules(app),
|
||||
['swift.common.middleware.catch_errors',
|
||||
'swift.proxy.server'])
|
||||
|
||||
def test_proxy_modify_wsgi_pipeline(self):
|
||||
config = """
|
||||
[DEFAULT]
|
||||
swift_dir = TEMPDIR
|
||||
|
||||
[pipeline:main]
|
||||
pipeline = healthcheck proxy-server
|
||||
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
conn_timeout = 0.2
|
||||
|
||||
[filter:healthcheck]
|
||||
use = egg:swift#healthcheck
|
||||
"""
|
||||
|
||||
contents = dedent(config)
|
||||
with temptree(['proxy-server.conf']) as t:
|
||||
conf_file = os.path.join(t, 'proxy-server.conf')
|
||||
with open(conf_file, 'w') as f:
|
||||
f.write(contents.replace('TEMPDIR', t))
|
||||
_fake_rings(t)
|
||||
app = wsgi.loadapp(conf_file, global_conf={})
|
||||
|
||||
self.assertEqual(self.pipeline_modules(app)[0],
|
||||
'swift.common.middleware.catch_errors')
|
||||
|
||||
def test_proxy_modify_wsgi_pipeline_ordering(self):
|
||||
config = """
|
||||
[DEFAULT]
|
||||
swift_dir = TEMPDIR
|
||||
|
||||
[pipeline:main]
|
||||
pipeline = healthcheck proxy-logging bulk tempurl proxy-server
|
||||
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
conn_timeout = 0.2
|
||||
|
||||
[filter:healthcheck]
|
||||
use = egg:swift#healthcheck
|
||||
|
||||
[filter:proxy-logging]
|
||||
use = egg:swift#proxy_logging
|
||||
|
||||
[filter:bulk]
|
||||
use = egg:swift#bulk
|
||||
|
||||
[filter:tempurl]
|
||||
use = egg:swift#tempurl
|
||||
"""
|
||||
|
||||
new_req_filters = [
|
||||
# not in pipeline, no afters
|
||||
{'name': 'catch_errors'},
|
||||
# already in pipeline
|
||||
{'name': 'proxy_logging',
|
||||
'after': ['catch_errors']},
|
||||
# not in pipeline, comes after more than one thing
|
||||
{'name': 'container_quotas',
|
||||
'after': ['catch_errors', 'bulk']}]
|
||||
|
||||
contents = dedent(config)
|
||||
with temptree(['proxy-server.conf']) as t:
|
||||
conf_file = os.path.join(t, 'proxy-server.conf')
|
||||
with open(conf_file, 'w') as f:
|
||||
f.write(contents.replace('TEMPDIR', t))
|
||||
_fake_rings(t)
|
||||
with mock.patch.object(swift.proxy.server, 'required_filters',
|
||||
new_req_filters):
|
||||
app = wsgi.loadapp(conf_file, global_conf={})
|
||||
|
||||
self.assertEqual(self.pipeline_modules(app), [
|
||||
'swift.common.middleware.catch_errors',
|
||||
'swift.common.middleware.healthcheck',
|
||||
'swift.common.middleware.proxy_logging',
|
||||
'swift.common.middleware.bulk',
|
||||
'swift.common.middleware.container_quotas',
|
||||
'swift.common.middleware.tempurl',
|
||||
'swift.proxy.server'])
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
Loading…
x
Reference in New Issue
Block a user