Service Class Cleanup - Part 1/3

Here, we rework the various Service base classes to have a standard
constructor format, in preparation for turning the RPC/DNS/WSGI
Service classes into mixins, allowing MiniDNS to be both an
RPC and a DNS Service - sharing code with Central etc. (RPCService) and
the Agent (DNSService).

Part two will turn each Service sub-class into a mixin.
Part three will move MiniDNS to using the DNSService mixin.

Change-Id: I1647f2036232d1bedfa3b3f79837dc0c808a2700
This commit is contained in:
Kiall Mac Innes 2015-02-28 19:04:41 +00:00 committed by Kiall Mac Innes
parent cb37ee959d
commit a537517499
18 changed files with 253 additions and 1168 deletions
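
For context, the constructor and property contract this series standardizes on looks roughly like the sketch below. It is illustrative only: the mixin composition shown for MiniDNS is the stated goal of parts two and three, not something this change implements, and the exact base-class combination is an assumption.

from designate import dnsutils
from designate import service
from designate.mdns import handler
from designate.mdns import notify


class Service(service.RPCService, service.DNSService):
    # Hypothetical shape of MiniDNS once the mixins land: every service
    # shares the same minimal constructor introduced by this change.
    def __init__(self, threads=None):
        super(Service, self).__init__(threads=threads)

    @property
    def service_name(self):
        # Names the 'service:<name>' config group, the RPC topic and logs.
        return 'mdns'

    @property
    def _rpc_endpoints(self):
        # RPC endpoints exposed by the RPCService side of the class.
        return [notify.NotifyEndpoint()]

    @property
    def _dns_application(self):
        # DNS request handler consumed by the DNSService side of the class.
        return dnsutils.DNSMiddleware(handler.RequestHandler())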

View File

@ -16,11 +16,11 @@
from oslo.config import cfg
from oslo_log import log as logging
from designate import utils
from designate import dnsutils
from designate import service
from designate.agent import handler
from designate.backend import agent_backend
from designate.i18n import _LI
LOG = logging.getLogger(__name__)
@ -28,23 +28,30 @@ CONF = cfg.CONF
class Service(service.DNSService):
def __init__(self, *args, **kwargs):
super(Service, self).__init__(cfg.CONF['service:agent'], *args,
**kwargs)
def __init__(self, threads=None):
super(Service, self).__init__(threads=threads)
backend_driver = cfg.CONF['service:agent'].backend_driver
self.backend = agent_backend.get_backend(backend_driver, self)
# Create an instance of the RequestHandler class
self.application = handler.RequestHandler()
@property
def service_name(self):
return 'agent'
self.application = dnsutils.DNSMiddleware(self.application)
@property
@utils.cache_result
def _dns_application(self):
# Create an instance of the RequestHandler class
application = handler.RequestHandler()
application = dnsutils.DNSMiddleware(application)
return application
def start(self):
super(Service, self).start()
self.backend.start()
LOG.info(_LI("Started Agent Service"))
def stop(self):
super(Service, self).stop()
LOG.info(_LI("Stopped Agent Service"))
# TODO(kiall): Shouldn't we be stopping the backend here too? To fix
# in another review.

View File

@ -27,8 +27,15 @@ LOG = logging.getLogger(__name__)
class Service(service.WSGIService):
def __init__(self, backlog=128, threads=1000):
def __init__(self, threads=None):
super(Service, self).__init__(threads=threads)
@property
def service_name(self):
return 'api'
@property
def _wsgi_application(self):
api_paste_config = cfg.CONF['service:api'].api_paste_config
config_paths = utils.find_config(api_paste_config)
@ -38,11 +45,4 @@ class Service(service.WSGIService):
LOG.info(_LI('Using api-paste-config found at: %s') % config_paths[0])
application = deploy.loadapp("config:%s" % config_paths[0],
name='osapi_dns')
super(Service, self).__init__(application=application,
host=cfg.CONF['service:api'].api_host,
port=cfg.CONF['service:api'].api_port,
backlog=backlog,
threads=threads)
return deploy.loadapp("config:%s" % config_paths[0], name='osapi_dns')

View File

@ -250,8 +250,8 @@ class Service(service.RPCService):
target = messaging.Target(version=RPC_API_VERSION)
def __init__(self, *args, **kwargs):
super(Service, self).__init__(*args, **kwargs)
def __init__(self, threads=None):
super(Service, self).__init__(threads=threads)
# Get a storage connection
storage_driver = cfg.CONF['service:central'].storage_driver
@ -262,6 +262,10 @@ class Service(service.RPCService):
self.network_api = network_api.get_network_api(cfg.CONF.network_api)
@property
def service_name(self):
return 'central'
def start(self):
# Check to see if there are any TLDs in the database
tlds = self.storage.find_tlds({})

View File

@ -31,7 +31,6 @@ def main():
utils.read_config('designate', sys.argv)
logging.setup(CONF, 'designate')
server = agent_service.Service.create(
binary='designate-agent')
server = agent_service.Service()
service.serve(server, workers=CONF['service:agent'].workers)
service.wait()

View File

@ -31,7 +31,6 @@ def main():
utils.read_config('designate', sys.argv)
logging.setup(CONF, 'designate')
server = central.Service.create(binary='designate-central',
service_name='central')
server = central.Service()
service.serve(server, workers=CONF['service:central'].workers)
service.wait()

View File

@ -31,7 +31,6 @@ def main():
utils.read_config('designate', sys.argv)
logging.setup(CONF, 'designate')
server = mdns_service.Service.create(
binary='designate-mdns')
server = mdns_service.Service()
service.serve(server, workers=CONF['service:mdns'].workers)
service.wait()

View File

@ -32,7 +32,6 @@ def main():
utils.read_config('designate', sys.argv)
logging.setup(CONF, 'designate')
server = pool_manager_service.Service.create(
binary='designate-pool-manager')
server = pool_manager_service.Service()
service.serve(server, workers=CONF['service:pool_manager'].workers)
service.wait()

View File

@ -16,23 +16,20 @@
from oslo.config import cfg
from oslo_log import log as logging
from designate import utils
from designate import dnsutils
from designate import service
from designate.mdns import handler
from designate.mdns import middleware
from designate.mdns import notify
from designate.i18n import _LI
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class Service(service.RPCService):
def __init__(self, *args, **kwargs):
notify_endpoint = notify.NotifyEndpoint()
kwargs['endpoints'] = [notify_endpoint]
super(Service, self).__init__(*args, **kwargs)
def __init__(self, threads=None):
super(Service, self).__init__(threads=threads)
# Create an instance of the RequestHandler class
self.application = handler.RequestHandler()
@ -50,6 +47,15 @@ class Service(service.RPCService):
self._sock_udp = dnsutils.bind_udp(
CONF['service:mdns'].host, CONF['service:mdns'].port)
@property
def service_name(self):
return 'mdns'
@property
@utils.cache_result
def _rpc_endpoints(self):
return [notify.NotifyEndpoint()]
def start(self):
super(Service, self).start()
@ -59,10 +65,8 @@ class Service(service.RPCService):
self.tg.add_thread(
dnsutils.handle_udp, self._sock_udp, self.tg, dnsutils.handle,
self.application)
LOG.info(_LI("started mdns service"))
def stop(self):
# When the service is stopped, the threads for _handle_tcp and
# _handle_udp are stopped too.
super(Service, self).stop()
LOG.info(_LI("stopped mdns service"))

View File

@ -1,142 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Exceptions common to OpenStack projects
"""
import logging
from designate.i18n import _
_FATAL_EXCEPTION_FORMAT_ERRORS = False
class Error(Exception):
def __init__(self, message=None):
super(Error, self).__init__(message)
class ApiError(Error):
def __init__(self, message='Unknown', code='Unknown'):
self.message = message
self.code = code
super(ApiError, self).__init__('%s: %s' % (code, message))
class NotFound(Error):
pass
class UnknownScheme(Error):
msg = "Unknown scheme '%s' found in URI"
def __init__(self, scheme):
msg = self.__class__.msg % scheme
super(UnknownScheme, self).__init__(msg)
class BadStoreUri(Error):
msg = "The Store URI %s was malformed. Reason: %s"
def __init__(self, uri, reason):
msg = self.__class__.msg % (uri, reason)
super(BadStoreUri, self).__init__(msg)
class Duplicate(Error):
pass
class NotAuthorized(Error):
pass
class NotEmpty(Error):
pass
class Invalid(Error):
pass
class BadInputError(Exception):
"""Error resulting from a client sending bad input to a server"""
pass
class MissingArgumentError(Error):
pass
class DatabaseMigrationError(Error):
pass
class ClientConnectionError(Exception):
"""Error resulting from a client connecting to a server"""
pass
def wrap_exception(f):
def _wrap(*args, **kw):
try:
return f(*args, **kw)
except Exception, e:
if not isinstance(e, Error):
# exc_type, exc_value, exc_traceback = sys.exc_info()
logging.exception(_('Uncaught exception'))
# logging.error(traceback.extract_stack(exc_traceback))
raise Error(str(e))
raise
_wrap.func_name = f.func_name
return _wrap
class OpenstackException(Exception):
"""
Base Exception
To correctly use this class, inherit from it and define
a 'message' property. That message will get printf'd
with the keyword arguments provided to the constructor.
"""
message = "An unknown exception occurred"
def __init__(self, **kwargs):
try:
self._error_string = self.message % kwargs
except Exception as e:
if _FATAL_EXCEPTION_FORMAT_ERRORS:
raise e
else:
# at least get the core message out if something happened
self._error_string = self.message
def __str__(self):
return self._error_string
class MalformedRequestBody(OpenstackException):
message = "Malformed message body: %(reason)s"
class InvalidContentType(OpenstackException):
message = "Invalid content type %(content_type)s"

View File

@ -1,798 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Utility methods for working with WSGI servers."""
import eventlet
eventlet.patcher.monkey_patch(all=False, socket=True)
import datetime
import errno
import socket
import sys
import time
import eventlet.wsgi
from oslo.config import cfg
from oslo_log import log as logging
from oslo_log import loggers
from oslo.serialization import jsonutils
import routes
import routes.middleware
import webob.dec
import webob.exc
from xml.dom import minidom
from xml.parsers import expat
from designate.i18n import _
from designate.openstack.common import service
from designate.openstack.common import sslutils
from designate.openstack.deprecated import exception
from designate.openstack.deprecated import xmlutils
socket_opts = [
cfg.IntOpt('backlog',
default=4096,
help="Number of backlog requests to configure the socket with"),
cfg.IntOpt('tcp_keepidle',
default=600,
help="Sets the value of TCP_KEEPIDLE in seconds for each "
"server socket. Not supported on OS X."),
]
CONF = cfg.CONF
CONF.register_opts(socket_opts)
LOG = logging.getLogger(__name__)
def run_server(application, port):
"""Run a WSGI server with the given application."""
sock = eventlet.listen(('0.0.0.0', port))
eventlet.wsgi.server(sock, application)
class Service(service.Service):
"""
Provides a Service API for wsgi servers.
This gives us the ability to launch wsgi servers with the
Launcher classes in service.py.
"""
def __init__(self, application, port,
host='0.0.0.0', backlog=4096, threads=1000):
self.application = application
self._port = port
self._host = host
self._backlog = backlog if backlog else CONF.backlog
super(Service, self).__init__(threads)
def _get_socket(self, host, port, backlog):
# TODO(dims): eventlet's green dns/socket module does not actually
# support IPv6 in getaddrinfo(). We need to get around this in the
# future or monitor upstream for a fix
info = socket.getaddrinfo(host,
port,
socket.AF_UNSPEC,
socket.SOCK_STREAM)[0]
family = info[0]
bind_addr = info[-1]
sock = None
retry_until = time.time() + 30
while not sock and time.time() < retry_until:
try:
sock = eventlet.listen(bind_addr,
backlog=backlog,
family=family)
if sslutils.is_enabled():
sock = sslutils.wrap(sock)
except socket.error, err:
if err.args[0] != errno.EADDRINUSE:
raise
eventlet.sleep(0.1)
if not sock:
raise RuntimeError(_("Could not bind to %(host)s:%(port)s "
"after trying for 30 seconds") %
{'host': host, 'port': port})
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# sockets can hang around forever without keepalive
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# This option isn't available in the OS X version of eventlet
if hasattr(socket, 'TCP_KEEPIDLE'):
sock.setsockopt(socket.IPPROTO_TCP,
socket.TCP_KEEPIDLE,
CONF.tcp_keepidle)
return sock
def start(self):
"""Start serving this service using the provided server instance.
:returns: None
"""
super(Service, self).start()
self._socket = self._get_socket(self._host, self._port, self._backlog)
self.tg.add_thread(self._run, self.application, self._socket)
@property
def backlog(self):
return self._backlog
@property
def host(self):
return self._socket.getsockname()[0] if self._socket else self._host
@property
def port(self):
return self._socket.getsockname()[1] if self._socket else self._port
def stop(self):
"""Stop serving this API.
:returns: None
"""
super(Service, self).stop()
def _run(self, application, socket):
"""Start a WSGI server in a new green thread."""
logger = logging.getLogger('eventlet.wsgi')
eventlet.wsgi.server(socket,
application,
custom_pool=self.tg.pool,
log=loggers.WritableLogger(logger))
class Middleware(object):
"""
Base WSGI middleware wrapper. These classes require an application to be
initialized that will be called next. By default the middleware will
simply call its wrapped app, or you can override __call__ to customize its
behavior.
"""
def __init__(self, application):
self.application = application
def process_request(self, req):
"""
Called on each request.
If this returns None, the next application down the stack will be
executed. If it returns a response then that response will be returned
and execution will stop here.
"""
return None
def process_response(self, response):
"""Do whatever you'd like to the response."""
return response
@webob.dec.wsgify
def __call__(self, req):
response = self.process_request(req)
if response:
return response
response = req.get_response(self.application)
return self.process_response(response)
class Debug(Middleware):
"""
Helper class that can be inserted into any WSGI application chain
to get information about the request and response.
"""
@webob.dec.wsgify
def __call__(self, req):
print ("*" * 40) + " REQUEST ENVIRON"
for key, value in req.environ.items():
print key, "=", value
print
resp = req.get_response(self.application)
print ("*" * 40) + " RESPONSE HEADERS"
for (key, value) in resp.headers.iteritems():
print key, "=", value
print
resp.app_iter = self.print_generator(resp.app_iter)
return resp
@staticmethod
def print_generator(app_iter):
"""
Iterator that prints the contents of a wrapper string iterator
when iterated.
"""
print ("*" * 40) + " BODY"
for part in app_iter:
sys.stdout.write(part)
sys.stdout.flush()
yield part
print
class Router(object):
"""
WSGI middleware that maps incoming requests to WSGI apps.
"""
def __init__(self, mapper):
"""
Create a router for the given routes.Mapper.
Each route in `mapper` must specify a 'controller', which is a
WSGI app to call. You'll probably want to specify an 'action' as
well and have your controller be a wsgi.Controller, who will route
the request to the action method.
Examples:
mapper = routes.Mapper()
sc = ServerController()
# Explicit mapping of one route to a controller+action
mapper.connect(None, "/svrlist", controller=sc, action="list")
# Actions are all implicitly defined
mapper.resource("server", "servers", controller=sc)
# Pointing to an arbitrary WSGI app. You can specify the
# {path_info:.*} parameter so the target app can be handed just that
# section of the URL.
mapper.connect(None, "/v1.0/{path_info:.*}", controller=BlogApp())
"""
self.map = mapper
self._router = routes.middleware.RoutesMiddleware(self._dispatch,
self.map)
@webob.dec.wsgify
def __call__(self, req):
"""
Route the incoming request to a controller based on self.map.
If no match, return a 404.
"""
return self._router
@staticmethod
@webob.dec.wsgify
def _dispatch(req):
"""
Called by self._router after matching the incoming request to a route
and putting the information into req.environ. Either returns 404
or the routed WSGI app's response.
"""
match = req.environ['wsgiorg.routing_args'][1]
if not match:
return webob.exc.HTTPNotFound()
app = match['controller']
return app
class Request(webob.Request):
"""Add some Openstack API-specific logic to the base webob.Request."""
default_request_content_types = ('application/json', 'application/xml')
default_accept_types = ('application/json', 'application/xml')
default_accept_type = 'application/json'
def best_match_content_type(self, supported_content_types=None):
"""Determine the requested response content-type.
Based on the query extension then the Accept header.
Defaults to default_accept_type if we don't find a preference
"""
supported_content_types = (supported_content_types or
self.default_accept_types)
parts = self.path.rsplit('.', 1)
if len(parts) > 1:
ctype = 'application/{0}'.format(parts[1])
if ctype in supported_content_types:
return ctype
bm = self.accept.best_match(supported_content_types)
return bm or self.default_accept_type
def get_content_type(self, allowed_content_types=None):
"""Determine content type of the request body.
Does not do any body introspection, only checks header
"""
if "Content-Type" not in self.headers:
return None
content_type = self.content_type
allowed_content_types = (allowed_content_types or
self.default_request_content_types)
if content_type not in allowed_content_types:
raise exception.InvalidContentType(content_type=content_type)
return content_type
class Resource(object):
"""
WSGI app that handles (de)serialization and controller dispatch.
Reads routing information supplied by RoutesMiddleware and calls
the requested action method upon its deserializer, controller,
and serializer. Those three objects may implement any of the basic
controller action methods (create, update, show, index, delete)
along with any that may be specified in the api router. A 'default'
method may also be implemented to be used in place of any
non-implemented actions. Deserializer methods must accept a request
argument and return a dictionary. Controller methods must accept a
request argument. Additionally, they must also accept keyword
arguments that represent the keys returned by the Deserializer. They
may raise a webob.exc exception or return a dict, which will be
serialized by requested content type.
"""
def __init__(self, controller, deserializer=None, serializer=None):
"""
:param controller: object that implement methods created by routes lib
:param deserializer: object that supports webob request deserialization
through controller-like actions
:param serializer: object that supports webob response serialization
through controller-like actions
"""
self.controller = controller
self.serializer = serializer or ResponseSerializer()
self.deserializer = deserializer or RequestDeserializer()
@webob.dec.wsgify(RequestClass=Request)
def __call__(self, request):
"""WSGI method that controls (de)serialization and method dispatch."""
try:
action, action_args, accept = self.deserialize_request(request)
except exception.InvalidContentType:
msg = _("Unsupported Content-Type")
return webob.exc.HTTPUnsupportedMediaType(explanation=msg)
except exception.MalformedRequestBody:
msg = _("Malformed request body")
return webob.exc.HTTPBadRequest(explanation=msg)
action_result = self.execute_action(action, request, **action_args)
try:
return self.serialize_response(action, action_result, accept)
# return unserializable result (typically a webob exc)
except Exception:
return action_result
def deserialize_request(self, request):
return self.deserializer.deserialize(request)
def serialize_response(self, action, action_result, accept):
return self.serializer.serialize(action_result, accept, action)
def execute_action(self, action, request, **action_args):
return self.dispatch(self.controller, action, request, **action_args)
def dispatch(self, obj, action, *args, **kwargs):
"""Find action-specific method on self and call it."""
try:
method = getattr(obj, action)
except AttributeError:
method = getattr(obj, 'default')
return method(*args, **kwargs)
def get_action_args(self, request_environment):
"""Parse dictionary created by routes library."""
try:
args = request_environment['wsgiorg.routing_args'][1].copy()
except Exception:
return {}
try:
del args['controller']
except KeyError:
pass
try:
del args['format']
except KeyError:
pass
return args
class ActionDispatcher(object):
"""Maps method name to local methods through action name."""
def dispatch(self, *args, **kwargs):
"""Find and call local method."""
action = kwargs.pop('action', 'default')
action_method = getattr(self, str(action), self.default)
return action_method(*args, **kwargs)
def default(self, data):
raise NotImplementedError()
class DictSerializer(ActionDispatcher):
"""Default request body serialization"""
def serialize(self, data, action='default'):
return self.dispatch(data, action=action)
def default(self, data):
return ""
class JSONDictSerializer(DictSerializer):
"""Default JSON request body serialization"""
def default(self, data):
def sanitizer(obj):
if isinstance(obj, datetime.datetime):
_dtime = obj - datetime.timedelta(microseconds=obj.microsecond)
return _dtime.isoformat()
return unicode(obj)
return jsonutils.dumps(data, default=sanitizer)
class XMLDictSerializer(DictSerializer):
def __init__(self, metadata=None, xmlns=None):
"""
:param metadata: information needed to deserialize xml into
a dictionary.
:param xmlns: XML namespace to include with serialized xml
"""
super(XMLDictSerializer, self).__init__()
self.metadata = metadata or {}
self.xmlns = xmlns
def default(self, data):
# We expect data to contain a single key which is the XML root.
root_key = data.keys()[0]
doc = minidom.Document()
node = self._to_xml_node(doc, self.metadata, root_key, data[root_key])
return self.to_xml_string(node)
def to_xml_string(self, node, has_atom=False):
self._add_xmlns(node, has_atom)
return node.toprettyxml(indent=' ', encoding='UTF-8')
# NOTE (ameade): the has_atom should be removed after all of the
# xml serializers and view builders have been updated to the current
# spec that required all responses include the xmlns:atom, the has_atom
# flag is to prevent current tests from breaking
def _add_xmlns(self, node, has_atom=False):
if self.xmlns is not None:
node.setAttribute('xmlns', self.xmlns)
if has_atom:
node.setAttribute('xmlns:atom', "http://www.w3.org/2005/Atom")
def _to_xml_node(self, doc, metadata, nodename, data):
"""Recursive method to convert data members to XML nodes."""
result = doc.createElement(nodename)
# Set the xml namespace if one is specified
# TODO(justinsb): We could also use prefixes on the keys
xmlns = metadata.get('xmlns', None)
if xmlns:
result.setAttribute('xmlns', xmlns)
# TODO(bcwaldon): accomplish this without a type-check
if type(data) is list:
collections = metadata.get('list_collections', {})
if nodename in collections:
metadata = collections[nodename]
for item in data:
node = doc.createElement(metadata['item_name'])
node.setAttribute(metadata['item_key'], str(item))
result.appendChild(node)
return result
singular = metadata.get('plurals', {}).get(nodename, None)
if singular is None:
if nodename.endswith('s'):
singular = nodename[:-1]
else:
singular = 'item'
for item in data:
node = self._to_xml_node(doc, metadata, singular, item)
result.appendChild(node)
# TODO(bcwaldon): accomplish this without a type-check
elif type(data) is dict:
collections = metadata.get('dict_collections', {})
if nodename in collections:
metadata = collections[nodename]
for k, v in data.items():
node = doc.createElement(metadata['item_name'])
node.setAttribute(metadata['item_key'], str(k))
text = doc.createTextNode(str(v))
node.appendChild(text)
result.appendChild(node)
return result
attrs = metadata.get('attributes', {}).get(nodename, {})
for k, v in data.items():
if k in attrs:
result.setAttribute(k, str(v))
else:
node = self._to_xml_node(doc, metadata, k, v)
result.appendChild(node)
else:
# Type is atom
node = doc.createTextNode(str(data))
result.appendChild(node)
return result
def _create_link_nodes(self, xml_doc, links):
link_nodes = []
for link in links:
link_node = xml_doc.createElement('atom:link')
link_node.setAttribute('rel', link['rel'])
link_node.setAttribute('href', link['href'])
if 'type' in link:
link_node.setAttribute('type', link['type'])
link_nodes.append(link_node)
return link_nodes
class ResponseHeadersSerializer(ActionDispatcher):
"""Default response headers serialization"""
def serialize(self, response, data, action):
self.dispatch(response, data, action=action)
def default(self, response, data):
response.status_int = 200
class ResponseSerializer(object):
"""Encode the necessary pieces into a response object"""
def __init__(self, body_serializers=None, headers_serializer=None):
self.body_serializers = {
'application/xml': XMLDictSerializer(),
'application/json': JSONDictSerializer(),
}
self.body_serializers.update(body_serializers or {})
self.headers_serializer = (headers_serializer or
ResponseHeadersSerializer())
def serialize(self, response_data, content_type, action='default'):
"""Serialize a dict into a string and wrap in a wsgi.Request object.
:param response_data: dict produced by the Controller
:param content_type: expected mimetype of serialized response body
"""
response = webob.Response()
self.serialize_headers(response, response_data, action)
self.serialize_body(response, response_data, content_type, action)
return response
def serialize_headers(self, response, data, action):
self.headers_serializer.serialize(response, data, action)
def serialize_body(self, response, data, content_type, action):
response.headers['Content-Type'] = content_type
if data is not None:
serializer = self.get_body_serializer(content_type)
response.body = serializer.serialize(data, action)
def get_body_serializer(self, content_type):
try:
return self.body_serializers[content_type]
except (KeyError, TypeError):
raise exception.InvalidContentType(content_type=content_type)
class RequestHeadersDeserializer(ActionDispatcher):
"""Default request headers deserializer"""
def deserialize(self, request, action):
return self.dispatch(request, action=action)
def default(self, request):
return {}
class RequestDeserializer(object):
"""Break up a Request object into more useful pieces."""
def __init__(self, body_deserializers=None, headers_deserializer=None,
supported_content_types=None):
self.supported_content_types = supported_content_types
self.body_deserializers = {
'application/xml': XMLDeserializer(),
'application/json': JSONDeserializer(),
}
self.body_deserializers.update(body_deserializers or {})
self.headers_deserializer = (headers_deserializer or
RequestHeadersDeserializer())
def deserialize(self, request):
"""Extract necessary pieces of the request.
:param request: Request object
:returns: tuple of (expected controller action name, dictionary of
keyword arguments to pass to the controller, the expected
content type of the response)
"""
action_args = self.get_action_args(request.environ)
action = action_args.pop('action', None)
action_args.update(self.deserialize_headers(request, action))
action_args.update(self.deserialize_body(request, action))
accept = self.get_expected_content_type(request)
return (action, action_args, accept)
def deserialize_headers(self, request, action):
return self.headers_deserializer.deserialize(request, action)
def deserialize_body(self, request, action):
if not len(request.body) > 0:
LOG.debug(_("Empty body provided in request"))
return {}
try:
content_type = request.get_content_type()
except exception.InvalidContentType:
LOG.debug(_("Unrecognized Content-Type provided in request"))
raise
if content_type is None:
LOG.debug(_("No Content-Type provided in request"))
return {}
try:
deserializer = self.get_body_deserializer(content_type)
except exception.InvalidContentType:
LOG.debug(_("Unable to deserialize body as provided Content-Type"))
raise
return deserializer.deserialize(request.body, action)
def get_body_deserializer(self, content_type):
try:
return self.body_deserializers[content_type]
except (KeyError, TypeError):
raise exception.InvalidContentType(content_type=content_type)
def get_expected_content_type(self, request):
return request.best_match_content_type(self.supported_content_types)
def get_action_args(self, request_environment):
"""Parse dictionary created by routes library."""
try:
args = request_environment['wsgiorg.routing_args'][1].copy()
except Exception:
return {}
try:
del args['controller']
except KeyError:
pass
try:
del args['format']
except KeyError:
pass
return args
class TextDeserializer(ActionDispatcher):
"""Default request body deserialization"""
def deserialize(self, datastring, action='default'):
return self.dispatch(datastring, action=action)
def default(self, datastring):
return {}
class JSONDeserializer(TextDeserializer):
def _from_json(self, datastring):
try:
return jsonutils.loads(datastring)
except ValueError:
msg = _("cannot understand JSON")
raise exception.MalformedRequestBody(reason=msg)
def default(self, datastring):
return {'body': self._from_json(datastring)}
class XMLDeserializer(TextDeserializer):
def __init__(self, metadata=None):
"""
:param metadata: information needed to deserialize xml into
a dictionary.
"""
super(XMLDeserializer, self).__init__()
self.metadata = metadata or {}
def _from_xml(self, datastring):
plurals = set(self.metadata.get('plurals', {}))
try:
node = xmlutils.safe_minidom_parse_string(datastring).childNodes[0]
return {node.nodeName: self._from_xml_node(node, plurals)}
except expat.ExpatError:
msg = _("cannot understand XML")
raise exception.MalformedRequestBody(reason=msg)
def _from_xml_node(self, node, listnames):
"""Convert a minidom node to a simple Python type.
:param listnames: list of XML node names whose subnodes should
be considered list items.
"""
if len(node.childNodes) == 1 and node.childNodes[0].nodeType == 3:
return node.childNodes[0].nodeValue
elif node.nodeName in listnames:
return [self._from_xml_node(n, listnames) for n in node.childNodes]
else:
result = dict()
for attr in node.attributes.keys():
result[attr] = node.attributes[attr].nodeValue
for child in node.childNodes:
if child.nodeType != node.TEXT_NODE:
result[child.nodeName] = self._from_xml_node(child,
listnames)
return result
def find_first_child_named(self, parent, name):
"""Search a nodes children for the first child with a given name"""
for node in parent.childNodes:
if node.nodeName == name:
return node
return None
def find_children_named(self, parent, name):
"""Return all of a nodes children who have the given name"""
for node in parent.childNodes:
if node.nodeName == name:
yield node
def extract_text(self, node):
"""Get the text field contained by the given node"""
if len(node.childNodes) == 1:
child = node.childNodes[0]
if child.nodeType == child.TEXT_NODE:
return child.nodeValue
return ""
def default(self, datastring):
return {'body': self._from_xml(datastring)}

View File

@ -1,72 +0,0 @@
# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from xml.dom import minidom
from xml.parsers import expat
from xml import sax
from xml.sax import expatreader
class ProtectedExpatParser(expatreader.ExpatParser):
"""An expat parser which disables DTD's and entities by default."""
def __init__(self, forbid_dtd=True, forbid_entities=True,
*args, **kwargs):
# Python 2.x old style class
expatreader.ExpatParser.__init__(self, *args, **kwargs)
self.forbid_dtd = forbid_dtd
self.forbid_entities = forbid_entities
def start_doctype_decl(self, name, sysid, pubid, has_internal_subset):
raise ValueError("Inline DTD forbidden")
def entity_decl(self, entityName, is_parameter_entity, value, base,
systemId, publicId, notationName):
raise ValueError("<!ENTITY> entity declaration forbidden")
def unparsed_entity_decl(self, name, base, sysid, pubid, notation_name):
# expat 1.2
raise ValueError("<!ENTITY> unparsed entity forbidden")
def external_entity_ref(self, context, base, systemId, publicId):
raise ValueError("<!ENTITY> external entity forbidden")
def notation_decl(self, name, base, sysid, pubid):
raise ValueError("<!ENTITY> notation forbidden")
def reset(self):
expatreader.ExpatParser.reset(self)
if self.forbid_dtd:
self._parser.StartDoctypeDeclHandler = self.start_doctype_decl
self._parser.EndDoctypeDeclHandler = None
if self.forbid_entities:
self._parser.EntityDeclHandler = self.entity_decl
self._parser.UnparsedEntityDeclHandler = self.unparsed_entity_decl
self._parser.ExternalEntityRefHandler = self.external_entity_ref
self._parser.NotationDeclHandler = self.notation_decl
try:
self._parser.SkippedEntityHandler = None
except AttributeError:
# some pyexpat versions do not support SkippedEntity
pass
def safe_minidom_parse_string(xml_string):
"""Parse an XML string using minidom safely.
"""
try:
return minidom.parseString(xml_string, parser=ProtectedExpatParser())
except sax.SAXParseException:
raise expat.ExpatError()

View File

@ -71,13 +71,8 @@ class Service(service.RPCService):
target = messaging.Target(version=RPC_API_VERSION)
def __init__(self, host, binary, topic, **kwargs):
# Modifying the topic so it is pool manager instance specific.
topic = '%s.%s' % (topic, cfg.CONF['service:pool_manager'].pool_id)
LOG.info(_LI('Using topic %(topic)s for this pool manager instance.')
% {'topic': topic})
super(Service, self).__init__(host, binary, topic, **kwargs)
def __init__(self, threads=None):
super(Service, self).__init__(threads=threads)
# Get a pool manager cache connection.
cache_driver = cfg.CONF['service:pool_manager'].cache_driver
@ -121,6 +116,21 @@ class Service(service.RPCService):
self.enable_sync_timer = \
cfg.CONF['service:pool_manager'].enable_sync_timer
@property
def service_name(self):
return 'pool_manager'
@property
def _rpc_topic(self):
# Modify the default topic so it's pool manager instance specific.
topic = super(Service, self)._rpc_topic
topic = '%s.%s' % (topic, cfg.CONF['service:pool_manager'].pool_id)
LOG.info(_LI('Using topic %(topic)s for this pool manager instance.')
% {'topic': topic})
return topic
def start(self):
for server_backend in self.server_backends:
backend_instance = server_backend['backend_instance']

View File

@ -1,6 +1,9 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
# Copyright 2011 OpenStack Foundation
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -14,34 +17,57 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import inspect
import abc
import socket
import errno
import time
import six
import eventlet.wsgi
from oslo import messaging
from oslo.config import cfg
from oslo_log import log as logging
from oslo_log import loggers
from designate.openstack.common import service
from designate.openstack.deprecated import wsgi
from designate.openstack.common import sslutils
from designate.i18n import _
from designate import rpc
from designate import policy
from designate import version
from designate import dnsutils
# TODO(kiall): These options have been cut+paste from the old WSGI code, and
# should be moved into service:api etc..
wsgi_socket_opts = [
cfg.IntOpt('backlog',
default=4096,
help="Number of backlog requests to configure the socket with"),
cfg.IntOpt('tcp_keepidle',
default=600,
help="Sets the value of TCP_KEEPIDLE in seconds for each "
"server socket. Not supported on OS X."),
]
CONF = cfg.CONF
CONF.register_opts(wsgi_socket_opts)
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class Service(service.Service):
"""
Service class to be shared among the diverse services inside of Designate.
"""
def __init__(self, threads=1000):
def __init__(self, threads=None):
threads = threads or 1000
super(Service, self).__init__(threads)
self._host = CONF.host
self._service_config = CONF['service:%s' % self.service_name]
policy.init()
# NOTE(kiall): All services need RPC initialized, as this is used
@ -50,165 +76,187 @@ class Service(service.Service):
if not rpc.initialized():
rpc.init(CONF)
@abc.abstractproperty
def service_name(self):
pass
def start(self):
super(Service, self).start()
LOG.info(_('Starting %(name)s service (version: %(version)s)') %
{'name': self.service_name,
'version': version.version_info.version_string()})
def stop(self):
LOG.info(_('Stopping %(name)s service') % {'name': self.service_name})
super(Service, self).stop()
class RPCService(Service):
"""
Service class to be shared by all Designate RPC Services
"""
def __init__(self, host, binary, topic, service_name=None, endpoints=None):
super(RPCService, self).__init__()
def __init__(self, threads=None):
super(RPCService, self).__init__(threads)
self.host = host
self.binary = binary
self.topic = topic
self.service_name = service_name
LOG.debug(_("Creating RPC Server on topic '%s'") % self._rpc_topic)
self._rpc_server = rpc.get_server(
messaging.Target(topic=self._rpc_topic, server=self._host),
self._rpc_endpoints)
# TODO(ekarlso): change this to be loadable via mod import or
# stevedore?
self.endpoints = endpoints or [self]
@property
def _rpc_endpoints(self):
return [self]
@property
def _rpc_topic(self):
return self.service_name
def start(self):
super(RPCService, self).start()
version_string = version.version_info.version_string()
LOG.info(_('Starting %(topic)s node (version %(version_string)s)') %
{'topic': self.topic, 'version_string': version_string})
LOG.debug(_("Creating RPC server on topic '%s'") % self.topic)
target = messaging.Target(topic=self.topic, server=self.host)
self.rpcserver = rpc.get_server(target, self.endpoints)
self.rpcserver.start()
LOG.debug(_("Starting RPC server on topic '%s'") % self._rpc_topic)
self._rpc_server.start()
# TODO(kiall): This probably belongs somewhere else, maybe the base
# Service class?
self.notifier = rpc.get_notifier(self.service_name)
for e in self.endpoints:
for e in self._rpc_endpoints:
if e != self and hasattr(e, 'start'):
e.start()
@classmethod
def create(cls, host=None, binary=None, topic=None, service_name=None,
endpoints=None):
"""Instantiates class and passes back application object.
:param host: defaults to CONF.host
:param binary: defaults to basename of executable
:param topic: defaults to bin_name - 'cinder-' part
"""
if not host:
host = CONF.host
if not binary:
binary = os.path.basename(inspect.stack()[-1][1])
if not topic:
name = "_".join(binary.split('-')[1:]) + '_topic'
topic = CONF.get(name)
service_obj = cls(host, binary, topic, service_name=service_name,
endpoints=endpoints)
return service_obj
def stop(self):
for e in self.endpoints:
for e in self._rpc_endpoints:
if e != self and hasattr(e, 'stop'):
e.stop()
# Try to shut the connection down, but if we get any sort of
# errors, go ahead and ignore them.. as we're shutting down anyway
try:
self.rpcserver.stop()
self._rpc_server.stop()
except Exception:
pass
super(RPCService, self).stop()
def wait(self):
for e in self.endpoints:
for e in self._rpc_endpoints:
if e != self and hasattr(e, 'wait'):
e.wait()
super(RPCService, self).wait()
class WSGIService(wsgi.Service, Service):
@six.add_metaclass(abc.ABCMeta)
class WSGIService(Service):
"""
Service class to be shared by all Designate WSGI Services
"""
def __init__(self, application, port, host='0.0.0.0', backlog=4096,
threads=1000):
# NOTE(kiall): We avoid calling super(cls, self) here, as our parent
# classes have different argspecs. Additionally, if we
# manually call both parent's __init__, the openstack
# common Service class's __init__ method will be called
# twice. As a result, we only call the designate base
# Service's __init__ method, and duplicate the
# wsgi.Service's constructor functionality here.
#
Service.__init__(self, threads)
def __init__(self, threads=None):
super(WSGIService, self).__init__(threads)
self.application = application
self._port = port
self._host = host
self._backlog = backlog if backlog else CONF.backlog
@abc.abstractproperty
def _wsgi_application(self):
pass
def _wsgi_get_socket(self):
# TODO(dims): eventlet's green dns/socket module does not actually
# support IPv6 in getaddrinfo(). We need to get around this in the
# future or monitor upstream for a fix
info = socket.getaddrinfo(self._service_config.api_host,
self._service_config.api_port,
socket.AF_UNSPEC,
socket.SOCK_STREAM)[0]
family = info[0]
bind_addr = info[-1]
sock = None
retry_until = time.time() + 30
while not sock and time.time() < retry_until:
try:
# TODO(kiall): Backlog should be a service specific setting,
# rather than a global
sock = eventlet.listen(bind_addr,
backlog=cfg.CONF.backlog,
family=family)
if sslutils.is_enabled():
sock = sslutils.wrap(sock)
except socket.error as err:
if err.args[0] != errno.EADDRINUSE:
raise
eventlet.sleep(0.1)
if not sock:
raise RuntimeError(_("Could not bind to %(host)s:%(port)s "
"after trying for 30 seconds") %
{'host': self._service_config.api_host,
'port': self._service_config.api_port})
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# sockets can hang around forever without keepalive
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# This option isn't available in the OS X version of eventlet
if hasattr(socket, 'TCP_KEEPIDLE'):
sock.setsockopt(socket.IPPROTO_TCP,
socket.TCP_KEEPIDLE,
CONF.tcp_keepidle)
return sock
def start(self):
super(WSGIService, self).start()
socket = self._wsgi_get_socket()
application = self._wsgi_application
self.tg.add_thread(self._wsgi_handle, application, socket)
def _wsgi_handle(self, application, socket):
logger = logging.getLogger('eventlet.wsgi')
eventlet.wsgi.server(socket,
application,
custom_pool=self.tg.pool,
log=loggers.WritableLogger(logger))
@six.add_metaclass(abc.ABCMeta)
class DNSService(Service):
"""
Service class to be used for a service that only works in TCP
"""
def __init__(self, config, host=None, binary=None, service_name=None,
endpoints=None, threads=1000):
def __init__(self, threads=None):
super(DNSService, self).__init__(threads)
self.host = host
self.binary = binary
self.service_name = service_name
self._dns_sock_tcp = dnsutils.bind_tcp(
self._service_config.host,
self._service_config.port,
self._service_config.tcp_backlog)
self.endpoints = endpoints or [self]
self.config = config
self._dns_sock_udp = dnsutils.bind_udp(
self._service_config.host,
self._service_config.port)
self._sock_tcp = dnsutils.bind_tcp(
self.config.host, self.config.port,
self.config.tcp_backlog)
self._sock_udp = dnsutils.bind_udp(
self.config.host, self.config.port)
@classmethod
def create(cls, host=None, binary=None, service_name=None,
endpoints=None):
"""Instantiates class and passes back application object.
:param host: defaults to CONF.host
:param binary: defaults to basename of executable
"""
if not host:
host = CONF.host
if not binary:
binary = os.path.basename(inspect.stack()[-1][1])
service_obj = cls(host, binary, service_name=service_name,
endpoints=endpoints)
return service_obj
@abc.abstractproperty
def _dns_application(self):
pass
def start(self):
for e in self.endpoints:
if e != self and hasattr(e, 'start'):
e.start()
self.tg.add_thread(
dnsutils.handle_tcp, self._sock_tcp, self.tg, dnsutils.handle,
self.application, timeout=self.config.tcp_recv_timeout)
self.tg.add_thread(
dnsutils.handle_udp, self._sock_udp, self.tg, dnsutils.handle,
self.application)
super(DNSService, self).start()
def stop(self):
for e in self.endpoints:
if e != self and hasattr(e, 'stop'):
e.stop()
self.tg.add_thread(
dnsutils.handle_tcp, self._dns_sock_tcp, self.tg, dnsutils.handle,
self._dns_application, self._service_config.tcp_recv_timeout)
self.tg.add_thread(
dnsutils.handle_udp, self._dns_sock_udp, self.tg, dnsutils.handle,
self._dns_application)
def wait(self):
super(DNSService, self).wait()
def stop(self):
# When the service is stopped, the threads for _handle_tcp and
# _handle_udp are stopped too.
super(DNSService, self).stop()

View File

@ -28,13 +28,17 @@ LOG = logging.getLogger(__name__)
class Service(service.Service):
def __init__(self, *args, **kwargs):
super(Service, self).__init__(*args, **kwargs)
def __init__(self, threads=None):
super(Service, self).__init__(threads=threads)
# Initialize extensions
self.handlers = self._init_extensions()
self.subscribers = self._get_subscribers()
@property
def service_name(self):
return 'sink'
def _init_extensions(self):
"""Loads and prepares all enabled extensions"""

View File

@ -62,10 +62,10 @@ class RPCFixture(fixtures.Fixture):
class ServiceFixture(fixtures.Fixture):
def __init__(self, svc_name, *args, **kw):
def __init__(self, svc_name):
cls = importutils.import_class(
'designate.%s.service.Service' % svc_name)
self.svc = cls.create(binary='designate-' + svc_name, *args, **kw)
self.svc = cls()
def setUp(self):
super(ServiceFixture, self).setUp()

View File

@ -84,7 +84,7 @@ class PoolManagerServiceTest(PoolManagerTestCase):
def test_pool_instance_topic(self):
self.assertEqual(
'pool_manager.%s' % cfg.CONF['service:pool_manager'].pool_id,
self.service.topic)
self.service._rpc_topic)
def test_no_pool_servers_configured(self):
self.service.stop()

View File

@ -351,3 +351,27 @@ def extract_priority_from_data(recordset_type, record):
priority, _, data = record['data'].partition(" ")
priority = int(priority)
return priority, data
def cache_result(function):
"""A function decorator to cache the result of the first call; every
additional call will simply return the cached value.
If we were python3 only, we would have used functools.lru_cache() in place
of this. If there's a python2 backport in a lightweight library, then we
should switch to that.
"""
# NOTE: We're cheating a little here: by using a mutable type (a list),
# we're able to read and update the value from within the inline
# wrapper method. If we used an immutable type, the assignment
# would not work as we want.
cache = [None]
def wrapper(*args, **kwargs):
if cache[0] is not None:
return cache[0]
else:
result = function(*args, **kwargs)
cache[0] = result
return result
return wrapper
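
A brief usage sketch (the Example class below is hypothetical, not part of the change): the new service code stacks cache_result under @property, so the decorated method runs once and later attribute accesses return the memoized value. Note that the cache lives on the decorated function itself rather than on the instance, so every instance of the class shares the single cached result - which is fine for Designate's one-service-object-per-process usage.

from designate import utils


class Example(object):
    @property
    @utils.cache_result
    def application(self):
        print('building application')  # runs only on the very first access
        return object()


e = Example()
first = e.application    # prints 'building application'
second = e.application   # cached; nothing is printed
assert first is second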