diff --git a/designate/agent/service.py b/designate/agent/service.py index c48945d98..565ff7bbb 100644 --- a/designate/agent/service.py +++ b/designate/agent/service.py @@ -16,11 +16,11 @@ from oslo.config import cfg from oslo_log import log as logging +from designate import utils from designate import dnsutils from designate import service from designate.agent import handler from designate.backend import agent_backend -from designate.i18n import _LI LOG = logging.getLogger(__name__) @@ -28,23 +28,30 @@ CONF = cfg.CONF class Service(service.DNSService): - def __init__(self, *args, **kwargs): - super(Service, self).__init__(cfg.CONF['service:agent'], *args, - **kwargs) + def __init__(self, threads=None): + super(Service, self).__init__(threads=threads) backend_driver = cfg.CONF['service:agent'].backend_driver self.backend = agent_backend.get_backend(backend_driver, self) - # Create an instance of the RequestHandler class - self.application = handler.RequestHandler() + @property + def service_name(self): + return 'agent' - self.application = dnsutils.DNSMiddleware(self.application) + @property + @utils.cache_result + def _dns_application(self): + # Create an instance of the RequestHandler class + application = handler.RequestHandler() + application = dnsutils.DNSMiddleware(application) + + return application def start(self): super(Service, self).start() self.backend.start() - LOG.info(_LI("Started Agent Service")) def stop(self): super(Service, self).stop() - LOG.info(_LI("Stopped Agent Service")) + # TODO(kiall): Shouldn't we be stppping the backend here too? To fix + # in another review. diff --git a/designate/api/service.py b/designate/api/service.py index 7a4eb7bf1..85446283e 100644 --- a/designate/api/service.py +++ b/designate/api/service.py @@ -27,8 +27,15 @@ LOG = logging.getLogger(__name__) class Service(service.WSGIService): - def __init__(self, backlog=128, threads=1000): + def __init__(self, threads=None): + super(Service, self).__init__(threads=threads) + @property + def service_name(self): + return 'api' + + @property + def _wsgi_application(self): api_paste_config = cfg.CONF['service:api'].api_paste_config config_paths = utils.find_config(api_paste_config) @@ -38,11 +45,4 @@ class Service(service.WSGIService): LOG.info(_LI('Using api-paste-config found at: %s') % config_paths[0]) - application = deploy.loadapp("config:%s" % config_paths[0], - name='osapi_dns') - - super(Service, self).__init__(application=application, - host=cfg.CONF['service:api'].api_host, - port=cfg.CONF['service:api'].api_port, - backlog=backlog, - threads=threads) + return deploy.loadapp("config:%s" % config_paths[0], name='osapi_dns') diff --git a/designate/central/service.py b/designate/central/service.py index 4a609c226..073b377ea 100644 --- a/designate/central/service.py +++ b/designate/central/service.py @@ -250,8 +250,8 @@ class Service(service.RPCService): target = messaging.Target(version=RPC_API_VERSION) - def __init__(self, *args, **kwargs): - super(Service, self).__init__(*args, **kwargs) + def __init__(self, threads=None): + super(Service, self).__init__(threads=threads) # Get a storage connection storage_driver = cfg.CONF['service:central'].storage_driver @@ -262,6 +262,10 @@ class Service(service.RPCService): self.network_api = network_api.get_network_api(cfg.CONF.network_api) + @property + def service_name(self): + return 'central' + def start(self): # Check to see if there are any TLDs in the database tlds = self.storage.find_tlds({}) diff --git a/designate/cmd/agent.py b/designate/cmd/agent.py index 7304a4bb7..739afe097 100644 --- a/designate/cmd/agent.py +++ b/designate/cmd/agent.py @@ -31,7 +31,6 @@ def main(): utils.read_config('designate', sys.argv) logging.setup(CONF, 'designate') - server = agent_service.Service.create( - binary='designate-agent') + server = agent_service.Service() service.serve(server, workers=CONF['service:agent'].workers) service.wait() diff --git a/designate/cmd/central.py b/designate/cmd/central.py index 8195590f1..b5bc5a1cc 100644 --- a/designate/cmd/central.py +++ b/designate/cmd/central.py @@ -31,7 +31,6 @@ def main(): utils.read_config('designate', sys.argv) logging.setup(CONF, 'designate') - server = central.Service.create(binary='designate-central', - service_name='central') + server = central.Service() service.serve(server, workers=CONF['service:central'].workers) service.wait() diff --git a/designate/cmd/mdns.py b/designate/cmd/mdns.py index 795ba1fa7..43cc4d5f8 100644 --- a/designate/cmd/mdns.py +++ b/designate/cmd/mdns.py @@ -31,7 +31,6 @@ def main(): utils.read_config('designate', sys.argv) logging.setup(CONF, 'designate') - server = mdns_service.Service.create( - binary='designate-mdns') + server = mdns_service.Service() service.serve(server, workers=CONF['service:mdns'].workers) service.wait() diff --git a/designate/cmd/pool_manager.py b/designate/cmd/pool_manager.py index 9261d2b81..7ed7ff0ed 100644 --- a/designate/cmd/pool_manager.py +++ b/designate/cmd/pool_manager.py @@ -32,7 +32,6 @@ def main(): utils.read_config('designate', sys.argv) logging.setup(CONF, 'designate') - server = pool_manager_service.Service.create( - binary='designate-pool-manager') + server = pool_manager_service.Service() service.serve(server, workers=CONF['service:pool_manager'].workers) service.wait() diff --git a/designate/mdns/service.py b/designate/mdns/service.py index 03322f844..61d8c3926 100644 --- a/designate/mdns/service.py +++ b/designate/mdns/service.py @@ -16,23 +16,20 @@ from oslo.config import cfg from oslo_log import log as logging +from designate import utils from designate import dnsutils from designate import service from designate.mdns import handler from designate.mdns import middleware from designate.mdns import notify -from designate.i18n import _LI LOG = logging.getLogger(__name__) CONF = cfg.CONF class Service(service.RPCService): - def __init__(self, *args, **kwargs): - notify_endpoint = notify.NotifyEndpoint() - kwargs['endpoints'] = [notify_endpoint] - - super(Service, self).__init__(*args, **kwargs) + def __init__(self, threads=None): + super(Service, self).__init__(threads=threads) # Create an instance of the RequestHandler class self.application = handler.RequestHandler() @@ -50,6 +47,15 @@ class Service(service.RPCService): self._sock_udp = dnsutils.bind_udp( CONF['service:mdns'].host, CONF['service:mdns'].port) + @property + def service_name(self): + return 'mdns' + + @property + @utils.cache_result + def _rpc_endpoints(self): + return [notify.NotifyEndpoint()] + def start(self): super(Service, self).start() @@ -59,10 +65,8 @@ class Service(service.RPCService): self.tg.add_thread( dnsutils.handle_udp, self._sock_udp, self.tg, dnsutils.handle, self.application) - LOG.info(_LI("started mdns service")) def stop(self): # When the service is stopped, the threads for _handle_tcp and # _handle_udp are stopped too. super(Service, self).stop() - LOG.info(_LI("stopped mdns service")) diff --git a/designate/openstack/deprecated/__init__.py b/designate/openstack/deprecated/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/designate/openstack/deprecated/exception.py b/designate/openstack/deprecated/exception.py deleted file mode 100644 index d8a5f3a82..000000000 --- a/designate/openstack/deprecated/exception.py +++ /dev/null @@ -1,142 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2011 OpenStack Foundation. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Exceptions common to OpenStack projects -""" - -import logging - -from designate.i18n import _ - -_FATAL_EXCEPTION_FORMAT_ERRORS = False - - -class Error(Exception): - def __init__(self, message=None): - super(Error, self).__init__(message) - - -class ApiError(Error): - def __init__(self, message='Unknown', code='Unknown'): - self.message = message - self.code = code - super(ApiError, self).__init__('%s: %s' % (code, message)) - - -class NotFound(Error): - pass - - -class UnknownScheme(Error): - - msg = "Unknown scheme '%s' found in URI" - - def __init__(self, scheme): - msg = self.__class__.msg % scheme - super(UnknownScheme, self).__init__(msg) - - -class BadStoreUri(Error): - - msg = "The Store URI %s was malformed. Reason: %s" - - def __init__(self, uri, reason): - msg = self.__class__.msg % (uri, reason) - super(BadStoreUri, self).__init__(msg) - - -class Duplicate(Error): - pass - - -class NotAuthorized(Error): - pass - - -class NotEmpty(Error): - pass - - -class Invalid(Error): - pass - - -class BadInputError(Exception): - """Error resulting from a client sending bad input to a server""" - pass - - -class MissingArgumentError(Error): - pass - - -class DatabaseMigrationError(Error): - pass - - -class ClientConnectionError(Exception): - """Error resulting from a client connecting to a server""" - pass - - -def wrap_exception(f): - def _wrap(*args, **kw): - try: - return f(*args, **kw) - except Exception, e: - if not isinstance(e, Error): - # exc_type, exc_value, exc_traceback = sys.exc_info() - logging.exception(_('Uncaught exception')) - # logging.error(traceback.extract_stack(exc_traceback)) - raise Error(str(e)) - raise - _wrap.func_name = f.func_name - return _wrap - - -class OpenstackException(Exception): - """ - Base Exception - - To correctly use this class, inherit from it and define - a 'message' property. That message will get printf'd - with the keyword arguments provided to the constructor. - """ - message = "An unknown exception occurred" - - def __init__(self, **kwargs): - try: - self._error_string = self.message % kwargs - - except Exception as e: - if _FATAL_EXCEPTION_FORMAT_ERRORS: - raise e - else: - # at least get the core message out if something happened - self._error_string = self.message - - def __str__(self): - return self._error_string - - -class MalformedRequestBody(OpenstackException): - message = "Malformed message body: %(reason)s" - - -class InvalidContentType(OpenstackException): - message = "Invalid content type %(content_type)s" diff --git a/designate/openstack/deprecated/wsgi.py b/designate/openstack/deprecated/wsgi.py deleted file mode 100644 index 071c23a16..000000000 --- a/designate/openstack/deprecated/wsgi.py +++ /dev/null @@ -1,798 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2011 OpenStack Foundation. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Utility methods for working with WSGI servers.""" - -import eventlet -eventlet.patcher.monkey_patch(all=False, socket=True) - -import datetime -import errno -import socket -import sys -import time - -import eventlet.wsgi -from oslo.config import cfg -from oslo_log import log as logging -from oslo_log import loggers -from oslo.serialization import jsonutils -import routes -import routes.middleware -import webob.dec -import webob.exc -from xml.dom import minidom -from xml.parsers import expat - -from designate.i18n import _ -from designate.openstack.common import service -from designate.openstack.common import sslutils -from designate.openstack.deprecated import exception -from designate.openstack.deprecated import xmlutils - -socket_opts = [ - cfg.IntOpt('backlog', - default=4096, - help="Number of backlog requests to configure the socket with"), - cfg.IntOpt('tcp_keepidle', - default=600, - help="Sets the value of TCP_KEEPIDLE in seconds for each " - "server socket. Not supported on OS X."), -] - -CONF = cfg.CONF -CONF.register_opts(socket_opts) - -LOG = logging.getLogger(__name__) - - -def run_server(application, port): - """Run a WSGI server with the given application.""" - sock = eventlet.listen(('0.0.0.0', port)) - eventlet.wsgi.server(sock, application) - - -class Service(service.Service): - """ - Provides a Service API for wsgi servers. - - This gives us the ability to launch wsgi servers with the - Launcher classes in service.py. - """ - - def __init__(self, application, port, - host='0.0.0.0', backlog=4096, threads=1000): - self.application = application - self._port = port - self._host = host - self._backlog = backlog if backlog else CONF.backlog - super(Service, self).__init__(threads) - - def _get_socket(self, host, port, backlog): - # TODO(dims): eventlet's green dns/socket module does not actually - # support IPv6 in getaddrinfo(). We need to get around this in the - # future or monitor upstream for a fix - info = socket.getaddrinfo(host, - port, - socket.AF_UNSPEC, - socket.SOCK_STREAM)[0] - family = info[0] - bind_addr = info[-1] - - sock = None - retry_until = time.time() + 30 - while not sock and time.time() < retry_until: - try: - sock = eventlet.listen(bind_addr, - backlog=backlog, - family=family) - if sslutils.is_enabled(): - sock = sslutils.wrap(sock) - - except socket.error, err: - if err.args[0] != errno.EADDRINUSE: - raise - eventlet.sleep(0.1) - if not sock: - raise RuntimeError(_("Could not bind to %(host)s:%(port)s " - "after trying for 30 seconds") % - {'host': host, 'port': port}) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - # sockets can hang around forever without keepalive - sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - - # This option isn't available in the OS X version of eventlet - if hasattr(socket, 'TCP_KEEPIDLE'): - sock.setsockopt(socket.IPPROTO_TCP, - socket.TCP_KEEPIDLE, - CONF.tcp_keepidle) - - return sock - - def start(self): - """Start serving this service using the provided server instance. - - :returns: None - - """ - super(Service, self).start() - self._socket = self._get_socket(self._host, self._port, self._backlog) - self.tg.add_thread(self._run, self.application, self._socket) - - @property - def backlog(self): - return self._backlog - - @property - def host(self): - return self._socket.getsockname()[0] if self._socket else self._host - - @property - def port(self): - return self._socket.getsockname()[1] if self._socket else self._port - - def stop(self): - """Stop serving this API. - - :returns: None - - """ - super(Service, self).stop() - - def _run(self, application, socket): - """Start a WSGI server in a new green thread.""" - logger = logging.getLogger('eventlet.wsgi') - eventlet.wsgi.server(socket, - application, - custom_pool=self.tg.pool, - log=loggers.WritableLogger(logger)) - - -class Middleware(object): - """ - Base WSGI middleware wrapper. These classes require an application to be - initialized that will be called next. By default the middleware will - simply call its wrapped app, or you can override __call__ to customize its - behavior. - """ - - def __init__(self, application): - self.application = application - - def process_request(self, req): - """ - Called on each request. - - If this returns None, the next application down the stack will be - executed. If it returns a response then that response will be returned - and execution will stop here. - """ - return None - - def process_response(self, response): - """Do whatever you'd like to the response.""" - return response - - @webob.dec.wsgify - def __call__(self, req): - response = self.process_request(req) - if response: - return response - response = req.get_response(self.application) - return self.process_response(response) - - -class Debug(Middleware): - """ - Helper class that can be inserted into any WSGI application chain - to get information about the request and response. - """ - - @webob.dec.wsgify - def __call__(self, req): - print ("*" * 40) + " REQUEST ENVIRON" - for key, value in req.environ.items(): - print key, "=", value - print - resp = req.get_response(self.application) - - print ("*" * 40) + " RESPONSE HEADERS" - for (key, value) in resp.headers.iteritems(): - print key, "=", value - print - - resp.app_iter = self.print_generator(resp.app_iter) - - return resp - - @staticmethod - def print_generator(app_iter): - """ - Iterator that prints the contents of a wrapper string iterator - when iterated. - """ - print ("*" * 40) + " BODY" - for part in app_iter: - sys.stdout.write(part) - sys.stdout.flush() - yield part - print - - -class Router(object): - - """ - WSGI middleware that maps incoming requests to WSGI apps. - """ - - def __init__(self, mapper): - """ - Create a router for the given routes.Mapper. - - Each route in `mapper` must specify a 'controller', which is a - WSGI app to call. You'll probably want to specify an 'action' as - well and have your controller be a wsgi.Controller, who will route - the request to the action method. - - Examples: - mapper = routes.Mapper() - sc = ServerController() - - # Explicit mapping of one route to a controller+action - mapper.connect(None, "/svrlist", controller=sc, action="list") - - # Actions are all implicitly defined - mapper.resource("server", "servers", controller=sc) - - # Pointing to an arbitrary WSGI app. You can specify the - # {path_info:.*} parameter so the target app can be handed just that - # section of the URL. - mapper.connect(None, "/v1.0/{path_info:.*}", controller=BlogApp()) - """ - self.map = mapper - self._router = routes.middleware.RoutesMiddleware(self._dispatch, - self.map) - - @webob.dec.wsgify - def __call__(self, req): - """ - Route the incoming request to a controller based on self.map. - If no match, return a 404. - """ - return self._router - - @staticmethod - @webob.dec.wsgify - def _dispatch(req): - """ - Called by self._router after matching the incoming request to a route - and putting the information into req.environ. Either returns 404 - or the routed WSGI app's response. - """ - match = req.environ['wsgiorg.routing_args'][1] - if not match: - return webob.exc.HTTPNotFound() - app = match['controller'] - return app - - -class Request(webob.Request): - """Add some Openstack API-specific logic to the base webob.Request.""" - - default_request_content_types = ('application/json', 'application/xml') - default_accept_types = ('application/json', 'application/xml') - default_accept_type = 'application/json' - - def best_match_content_type(self, supported_content_types=None): - """Determine the requested response content-type. - - Based on the query extension then the Accept header. - Defaults to default_accept_type if we don't find a preference - - """ - supported_content_types = (supported_content_types or - self.default_accept_types) - - parts = self.path.rsplit('.', 1) - if len(parts) > 1: - ctype = 'application/{0}'.format(parts[1]) - if ctype in supported_content_types: - return ctype - - bm = self.accept.best_match(supported_content_types) - return bm or self.default_accept_type - - def get_content_type(self, allowed_content_types=None): - """Determine content type of the request body. - - Does not do any body introspection, only checks header - - """ - if "Content-Type" not in self.headers: - return None - - content_type = self.content_type - allowed_content_types = (allowed_content_types or - self.default_request_content_types) - - if content_type not in allowed_content_types: - raise exception.InvalidContentType(content_type=content_type) - return content_type - - -class Resource(object): - """ - WSGI app that handles (de)serialization and controller dispatch. - - Reads routing information supplied by RoutesMiddleware and calls - the requested action method upon its deserializer, controller, - and serializer. Those three objects may implement any of the basic - controller action methods (create, update, show, index, delete) - along with any that may be specified in the api router. A 'default' - method may also be implemented to be used in place of any - non-implemented actions. Deserializer methods must accept a request - argument and return a dictionary. Controller methods must accept a - request argument. Additionally, they must also accept keyword - arguments that represent the keys returned by the Deserializer. They - may raise a webob.exc exception or return a dict, which will be - serialized by requested content type. - """ - def __init__(self, controller, deserializer=None, serializer=None): - """ - :param controller: object that implement methods created by routes lib - :param deserializer: object that supports webob request deserialization - through controller-like actions - :param serializer: object that supports webob response serialization - through controller-like actions - """ - self.controller = controller - self.serializer = serializer or ResponseSerializer() - self.deserializer = deserializer or RequestDeserializer() - - @webob.dec.wsgify(RequestClass=Request) - def __call__(self, request): - """WSGI method that controls (de)serialization and method dispatch.""" - - try: - action, action_args, accept = self.deserialize_request(request) - except exception.InvalidContentType: - msg = _("Unsupported Content-Type") - return webob.exc.HTTPUnsupportedMediaType(explanation=msg) - except exception.MalformedRequestBody: - msg = _("Malformed request body") - return webob.exc.HTTPBadRequest(explanation=msg) - - action_result = self.execute_action(action, request, **action_args) - try: - return self.serialize_response(action, action_result, accept) - # return unserializable result (typically a webob exc) - except Exception: - return action_result - - def deserialize_request(self, request): - return self.deserializer.deserialize(request) - - def serialize_response(self, action, action_result, accept): - return self.serializer.serialize(action_result, accept, action) - - def execute_action(self, action, request, **action_args): - return self.dispatch(self.controller, action, request, **action_args) - - def dispatch(self, obj, action, *args, **kwargs): - """Find action-specific method on self and call it.""" - try: - method = getattr(obj, action) - except AttributeError: - method = getattr(obj, 'default') - - return method(*args, **kwargs) - - def get_action_args(self, request_environment): - """Parse dictionary created by routes library.""" - try: - args = request_environment['wsgiorg.routing_args'][1].copy() - except Exception: - return {} - - try: - del args['controller'] - except KeyError: - pass - - try: - del args['format'] - except KeyError: - pass - - return args - - -class ActionDispatcher(object): - """Maps method name to local methods through action name.""" - - def dispatch(self, *args, **kwargs): - """Find and call local method.""" - action = kwargs.pop('action', 'default') - action_method = getattr(self, str(action), self.default) - return action_method(*args, **kwargs) - - def default(self, data): - raise NotImplementedError() - - -class DictSerializer(ActionDispatcher): - """Default request body serialization""" - - def serialize(self, data, action='default'): - return self.dispatch(data, action=action) - - def default(self, data): - return "" - - -class JSONDictSerializer(DictSerializer): - """Default JSON request body serialization""" - - def default(self, data): - def sanitizer(obj): - if isinstance(obj, datetime.datetime): - _dtime = obj - datetime.timedelta(microseconds=obj.microsecond) - return _dtime.isoformat() - return unicode(obj) - return jsonutils.dumps(data, default=sanitizer) - - -class XMLDictSerializer(DictSerializer): - - def __init__(self, metadata=None, xmlns=None): - """ - :param metadata: information needed to deserialize xml into - a dictionary. - :param xmlns: XML namespace to include with serialized xml - """ - super(XMLDictSerializer, self).__init__() - self.metadata = metadata or {} - self.xmlns = xmlns - - def default(self, data): - # We expect data to contain a single key which is the XML root. - root_key = data.keys()[0] - doc = minidom.Document() - node = self._to_xml_node(doc, self.metadata, root_key, data[root_key]) - - return self.to_xml_string(node) - - def to_xml_string(self, node, has_atom=False): - self._add_xmlns(node, has_atom) - return node.toprettyxml(indent=' ', encoding='UTF-8') - - # NOTE (ameade): the has_atom should be removed after all of the - # xml serializers and view builders have been updated to the current - # spec that required all responses include the xmlns:atom, the has_atom - # flag is to prevent current tests from breaking - def _add_xmlns(self, node, has_atom=False): - if self.xmlns is not None: - node.setAttribute('xmlns', self.xmlns) - if has_atom: - node.setAttribute('xmlns:atom', "http://www.w3.org/2005/Atom") - - def _to_xml_node(self, doc, metadata, nodename, data): - """Recursive method to convert data members to XML nodes.""" - result = doc.createElement(nodename) - - # Set the xml namespace if one is specified - # TODO(justinsb): We could also use prefixes on the keys - xmlns = metadata.get('xmlns', None) - if xmlns: - result.setAttribute('xmlns', xmlns) - - # TODO(bcwaldon): accomplish this without a type-check - if type(data) is list: - collections = metadata.get('list_collections', {}) - if nodename in collections: - metadata = collections[nodename] - for item in data: - node = doc.createElement(metadata['item_name']) - node.setAttribute(metadata['item_key'], str(item)) - result.appendChild(node) - return result - singular = metadata.get('plurals', {}).get(nodename, None) - if singular is None: - if nodename.endswith('s'): - singular = nodename[:-1] - else: - singular = 'item' - for item in data: - node = self._to_xml_node(doc, metadata, singular, item) - result.appendChild(node) - # TODO(bcwaldon): accomplish this without a type-check - elif type(data) is dict: - collections = metadata.get('dict_collections', {}) - if nodename in collections: - metadata = collections[nodename] - for k, v in data.items(): - node = doc.createElement(metadata['item_name']) - node.setAttribute(metadata['item_key'], str(k)) - text = doc.createTextNode(str(v)) - node.appendChild(text) - result.appendChild(node) - return result - attrs = metadata.get('attributes', {}).get(nodename, {}) - for k, v in data.items(): - if k in attrs: - result.setAttribute(k, str(v)) - else: - node = self._to_xml_node(doc, metadata, k, v) - result.appendChild(node) - else: - # Type is atom - node = doc.createTextNode(str(data)) - result.appendChild(node) - return result - - def _create_link_nodes(self, xml_doc, links): - link_nodes = [] - for link in links: - link_node = xml_doc.createElement('atom:link') - link_node.setAttribute('rel', link['rel']) - link_node.setAttribute('href', link['href']) - if 'type' in link: - link_node.setAttribute('type', link['type']) - link_nodes.append(link_node) - return link_nodes - - -class ResponseHeadersSerializer(ActionDispatcher): - """Default response headers serialization""" - - def serialize(self, response, data, action): - self.dispatch(response, data, action=action) - - def default(self, response, data): - response.status_int = 200 - - -class ResponseSerializer(object): - """Encode the necessary pieces into a response object""" - - def __init__(self, body_serializers=None, headers_serializer=None): - self.body_serializers = { - 'application/xml': XMLDictSerializer(), - 'application/json': JSONDictSerializer(), - } - self.body_serializers.update(body_serializers or {}) - - self.headers_serializer = (headers_serializer or - ResponseHeadersSerializer()) - - def serialize(self, response_data, content_type, action='default'): - """Serialize a dict into a string and wrap in a wsgi.Request object. - - :param response_data: dict produced by the Controller - :param content_type: expected mimetype of serialized response body - - """ - response = webob.Response() - self.serialize_headers(response, response_data, action) - self.serialize_body(response, response_data, content_type, action) - return response - - def serialize_headers(self, response, data, action): - self.headers_serializer.serialize(response, data, action) - - def serialize_body(self, response, data, content_type, action): - response.headers['Content-Type'] = content_type - if data is not None: - serializer = self.get_body_serializer(content_type) - response.body = serializer.serialize(data, action) - - def get_body_serializer(self, content_type): - try: - return self.body_serializers[content_type] - except (KeyError, TypeError): - raise exception.InvalidContentType(content_type=content_type) - - -class RequestHeadersDeserializer(ActionDispatcher): - """Default request headers deserializer""" - - def deserialize(self, request, action): - return self.dispatch(request, action=action) - - def default(self, request): - return {} - - -class RequestDeserializer(object): - """Break up a Request object into more useful pieces.""" - - def __init__(self, body_deserializers=None, headers_deserializer=None, - supported_content_types=None): - - self.supported_content_types = supported_content_types - - self.body_deserializers = { - 'application/xml': XMLDeserializer(), - 'application/json': JSONDeserializer(), - } - self.body_deserializers.update(body_deserializers or {}) - - self.headers_deserializer = (headers_deserializer or - RequestHeadersDeserializer()) - - def deserialize(self, request): - """Extract necessary pieces of the request. - - :param request: Request object - :returns: tuple of (expected controller action name, dictionary of - keyword arguments to pass to the controller, the expected - content type of the response) - - """ - action_args = self.get_action_args(request.environ) - action = action_args.pop('action', None) - - action_args.update(self.deserialize_headers(request, action)) - action_args.update(self.deserialize_body(request, action)) - - accept = self.get_expected_content_type(request) - - return (action, action_args, accept) - - def deserialize_headers(self, request, action): - return self.headers_deserializer.deserialize(request, action) - - def deserialize_body(self, request, action): - if not len(request.body) > 0: - LOG.debug(_("Empty body provided in request")) - return {} - - try: - content_type = request.get_content_type() - except exception.InvalidContentType: - LOG.debug(_("Unrecognized Content-Type provided in request")) - raise - - if content_type is None: - LOG.debug(_("No Content-Type provided in request")) - return {} - - try: - deserializer = self.get_body_deserializer(content_type) - except exception.InvalidContentType: - LOG.debug(_("Unable to deserialize body as provided Content-Type")) - raise - - return deserializer.deserialize(request.body, action) - - def get_body_deserializer(self, content_type): - try: - return self.body_deserializers[content_type] - except (KeyError, TypeError): - raise exception.InvalidContentType(content_type=content_type) - - def get_expected_content_type(self, request): - return request.best_match_content_type(self.supported_content_types) - - def get_action_args(self, request_environment): - """Parse dictionary created by routes library.""" - try: - args = request_environment['wsgiorg.routing_args'][1].copy() - except Exception: - return {} - - try: - del args['controller'] - except KeyError: - pass - - try: - del args['format'] - except KeyError: - pass - - return args - - -class TextDeserializer(ActionDispatcher): - """Default request body deserialization""" - - def deserialize(self, datastring, action='default'): - return self.dispatch(datastring, action=action) - - def default(self, datastring): - return {} - - -class JSONDeserializer(TextDeserializer): - - def _from_json(self, datastring): - try: - return jsonutils.loads(datastring) - except ValueError: - msg = _("cannot understand JSON") - raise exception.MalformedRequestBody(reason=msg) - - def default(self, datastring): - return {'body': self._from_json(datastring)} - - -class XMLDeserializer(TextDeserializer): - - def __init__(self, metadata=None): - """ - :param metadata: information needed to deserialize xml into - a dictionary. - """ - super(XMLDeserializer, self).__init__() - self.metadata = metadata or {} - - def _from_xml(self, datastring): - plurals = set(self.metadata.get('plurals', {})) - - try: - node = xmlutils.safe_minidom_parse_string(datastring).childNodes[0] - return {node.nodeName: self._from_xml_node(node, plurals)} - except expat.ExpatError: - msg = _("cannot understand XML") - raise exception.MalformedRequestBody(reason=msg) - - def _from_xml_node(self, node, listnames): - """Convert a minidom node to a simple Python type. - - :param listnames: list of XML node names whose subnodes should - be considered list items. - - """ - - if len(node.childNodes) == 1 and node.childNodes[0].nodeType == 3: - return node.childNodes[0].nodeValue - elif node.nodeName in listnames: - return [self._from_xml_node(n, listnames) for n in node.childNodes] - else: - result = dict() - for attr in node.attributes.keys(): - result[attr] = node.attributes[attr].nodeValue - for child in node.childNodes: - if child.nodeType != node.TEXT_NODE: - result[child.nodeName] = self._from_xml_node(child, - listnames) - return result - - def find_first_child_named(self, parent, name): - """Search a nodes children for the first child with a given name""" - for node in parent.childNodes: - if node.nodeName == name: - return node - return None - - def find_children_named(self, parent, name): - """Return all of a nodes children who have the given name""" - for node in parent.childNodes: - if node.nodeName == name: - yield node - - def extract_text(self, node): - """Get the text field contained by the given node""" - if len(node.childNodes) == 1: - child = node.childNodes[0] - if child.nodeType == child.TEXT_NODE: - return child.nodeValue - return "" - - def default(self, datastring): - return {'body': self._from_xml(datastring)} diff --git a/designate/openstack/deprecated/xmlutils.py b/designate/openstack/deprecated/xmlutils.py deleted file mode 100644 index 1231a5902..000000000 --- a/designate/openstack/deprecated/xmlutils.py +++ /dev/null @@ -1,72 +0,0 @@ -# Copyright 2013 IBM Corp. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from xml.dom import minidom -from xml.parsers import expat -from xml import sax -from xml.sax import expatreader - - -class ProtectedExpatParser(expatreader.ExpatParser): - """An expat parser which disables DTD's and entities by default.""" - - def __init__(self, forbid_dtd=True, forbid_entities=True, - *args, **kwargs): - # Python 2.x old style class - expatreader.ExpatParser.__init__(self, *args, **kwargs) - self.forbid_dtd = forbid_dtd - self.forbid_entities = forbid_entities - - def start_doctype_decl(self, name, sysid, pubid, has_internal_subset): - raise ValueError("Inline DTD forbidden") - - def entity_decl(self, entityName, is_parameter_entity, value, base, - systemId, publicId, notationName): - raise ValueError(" entity declaration forbidden") - - def unparsed_entity_decl(self, name, base, sysid, pubid, notation_name): - # expat 1.2 - raise ValueError(" unparsed entity forbidden") - - def external_entity_ref(self, context, base, systemId, publicId): - raise ValueError(" external entity forbidden") - - def notation_decl(self, name, base, sysid, pubid): - raise ValueError(" notation forbidden") - - def reset(self): - expatreader.ExpatParser.reset(self) - if self.forbid_dtd: - self._parser.StartDoctypeDeclHandler = self.start_doctype_decl - self._parser.EndDoctypeDeclHandler = None - if self.forbid_entities: - self._parser.EntityDeclHandler = self.entity_decl - self._parser.UnparsedEntityDeclHandler = self.unparsed_entity_decl - self._parser.ExternalEntityRefHandler = self.external_entity_ref - self._parser.NotationDeclHandler = self.notation_decl - try: - self._parser.SkippedEntityHandler = None - except AttributeError: - # some pyexpat versions do not support SkippedEntity - pass - - -def safe_minidom_parse_string(xml_string): - """Parse an XML string using minidom safely. - - """ - try: - return minidom.parseString(xml_string, parser=ProtectedExpatParser()) - except sax.SAXParseException: - raise expat.ExpatError() diff --git a/designate/pool_manager/service.py b/designate/pool_manager/service.py index c00bf7a72..b950def71 100644 --- a/designate/pool_manager/service.py +++ b/designate/pool_manager/service.py @@ -71,13 +71,8 @@ class Service(service.RPCService): target = messaging.Target(version=RPC_API_VERSION) - def __init__(self, host, binary, topic, **kwargs): - - # Modifying the topic so it is pool manager instance specific. - topic = '%s.%s' % (topic, cfg.CONF['service:pool_manager'].pool_id) - LOG.info(_LI('Using topic %(topic)s for this pool manager instance.') - % {'topic': topic}) - super(Service, self).__init__(host, binary, topic, **kwargs) + def __init__(self, threads=None): + super(Service, self).__init__(threads=threads) # Get a pool manager cache connection. cache_driver = cfg.CONF['service:pool_manager'].cache_driver @@ -121,6 +116,21 @@ class Service(service.RPCService): self.enable_sync_timer = \ cfg.CONF['service:pool_manager'].enable_sync_timer + @property + def service_name(self): + return 'pool_manager' + + @property + def _rpc_topic(self): + # Modify the default topic so it's pool manager instance specific. + topic = super(Service, self)._rpc_topic + + topic = '%s.%s' % (topic, cfg.CONF['service:pool_manager'].pool_id) + LOG.info(_LI('Using topic %(topic)s for this pool manager instance.') + % {'topic': topic}) + + return topic + def start(self): for server_backend in self.server_backends: backend_instance = server_backend['backend_instance'] diff --git a/designate/service.py b/designate/service.py index 8d38ed0c5..674fee14c 100644 --- a/designate/service.py +++ b/designate/service.py @@ -1,6 +1,9 @@ # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # Copyright 2011 Justin Santa Barbara +# Copyright 2011 OpenStack Foundation +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -14,34 +17,57 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -import os -import inspect +import abc +import socket +import errno +import time +import six +import eventlet.wsgi from oslo import messaging from oslo.config import cfg from oslo_log import log as logging +from oslo_log import loggers from designate.openstack.common import service -from designate.openstack.deprecated import wsgi +from designate.openstack.common import sslutils from designate.i18n import _ from designate import rpc from designate import policy from designate import version from designate import dnsutils +# TODO(kiall): These options have been cut+paste from the old WSGI code, and +# should be moved into service:api etc.. +wsgi_socket_opts = [ + cfg.IntOpt('backlog', + default=4096, + help="Number of backlog requests to configure the socket with"), + cfg.IntOpt('tcp_keepidle', + default=600, + help="Sets the value of TCP_KEEPIDLE in seconds for each " + "server socket. Not supported on OS X."), +] CONF = cfg.CONF +CONF.register_opts(wsgi_socket_opts) LOG = logging.getLogger(__name__) +@six.add_metaclass(abc.ABCMeta) class Service(service.Service): """ Service class to be shared among the diverse service inside of Designate. """ - def __init__(self, threads=1000): + def __init__(self, threads=None): + threads = threads or 1000 + super(Service, self).__init__(threads) + self._host = CONF.host + self._service_config = CONF['service:%s' % self.service_name] + policy.init() # NOTE(kiall): All services need RPC initialized, as this is used @@ -50,165 +76,187 @@ class Service(service.Service): if not rpc.initialized(): rpc.init(CONF) + @abc.abstractproperty + def service_name(self): + pass + + def start(self): + super(Service, self).start() + + LOG.info(_('Starting %(name)s service (version: %(version)s)') % + {'name': self.service_name, + 'version': version.version_info.version_string()}) + + def stop(self): + LOG.info(_('Stopping %(name)s service') % {'name': self.service_name}) + + super(Service, self).stop() + class RPCService(Service): """ Service class to be shared by all Designate RPC Services """ - def __init__(self, host, binary, topic, service_name=None, endpoints=None): - super(RPCService, self).__init__() + def __init__(self, threads=None): + super(RPCService, self).__init__(threads) - self.host = host - self.binary = binary - self.topic = topic - self.service_name = service_name + LOG.debug(_("Creating RPC Server on topic '%s'") % self._rpc_topic) + self._rpc_server = rpc.get_server( + messaging.Target(topic=self._rpc_topic, server=self._host), + self._rpc_endpoints) - # TODO(ekarlso): change this to be loadable via mod import or - # stevedore? - self.endpoints = endpoints or [self] + @property + def _rpc_endpoints(self): + return [self] + + @property + def _rpc_topic(self): + return self.service_name def start(self): super(RPCService, self).start() - version_string = version.version_info.version_string() - LOG.info(_('Starting %(topic)s node (version %(version_string)s)') % - {'topic': self.topic, 'version_string': version_string}) - - LOG.debug(_("Creating RPC server on topic '%s'") % self.topic) - - target = messaging.Target(topic=self.topic, server=self.host) - self.rpcserver = rpc.get_server(target, self.endpoints) - self.rpcserver.start() + LOG.debug(_("Starting RPC server on topic '%s'") % self._rpc_topic) + self._rpc_server.start() + # TODO(kiall): This probably belongs somewhere else, maybe the base + # Service class? self.notifier = rpc.get_notifier(self.service_name) - for e in self.endpoints: + for e in self._rpc_endpoints: if e != self and hasattr(e, 'start'): e.start() - @classmethod - def create(cls, host=None, binary=None, topic=None, service_name=None, - endpoints=None): - """Instantiates class and passes back application object. - - :param host: defaults to CONF.host - :param binary: defaults to basename of executable - :param topic: defaults to bin_name - 'cinder-' part - """ - if not host: - host = CONF.host - if not binary: - binary = os.path.basename(inspect.stack()[-1][1]) - if not topic: - name = "_".join(binary.split('-')[1:]) + '_topic' - topic = CONF.get(name) - - service_obj = cls(host, binary, topic, service_name=service_name, - endpoints=endpoints) - return service_obj - def stop(self): - for e in self.endpoints: + for e in self._rpc_endpoints: if e != self and hasattr(e, 'stop'): e.stop() # Try to shut the connection down, but if we get any sort of # errors, go ahead and ignore them.. as we're shutting down anyway try: - self.rpcserver.stop() + self._rpc_server.stop() except Exception: pass super(RPCService, self).stop() def wait(self): - for e in self.endpoints: + for e in self._rpc_endpoints: if e != self and hasattr(e, 'wait'): e.wait() super(RPCService, self).wait() -class WSGIService(wsgi.Service, Service): +@six.add_metaclass(abc.ABCMeta) +class WSGIService(Service): """ Service class to be shared by all Designate WSGI Services """ - def __init__(self, application, port, host='0.0.0.0', backlog=4096, - threads=1000): - # NOTE(kiall): We avoid calling super(cls, self) here, as our parent - # classes have different argspecs. Additionally, if we - # manually call both parent's __init__, the openstack - # common Service class's __init__ method will be called - # twice. As a result, we only call the designate base - # Service's __init__ method, and duplicate the - # wsgi.Service's constructor functionality here. - # - Service.__init__(self, threads) + def __init__(self, threads=None): + super(WSGIService, self).__init__(threads) - self.application = application - self._port = port - self._host = host - self._backlog = backlog if backlog else CONF.backlog + @abc.abstractproperty + def _wsgi_application(self): + pass + + def _wsgi_get_socket(self): + # TODO(dims): eventlet's green dns/socket module does not actually + # support IPv6 in getaddrinfo(). We need to get around this in the + # future or monitor upstream for a fix + info = socket.getaddrinfo(self._service_config.api_host, + self._service_config.api_port, + socket.AF_UNSPEC, + socket.SOCK_STREAM)[0] + family = info[0] + bind_addr = info[-1] + + sock = None + retry_until = time.time() + 30 + while not sock and time.time() < retry_until: + try: + # TODO(kiall): Backlog should be a service specific setting, + # rather than a global + sock = eventlet.listen(bind_addr, + backlog=cfg.CONF.backlog, + family=family) + if sslutils.is_enabled(): + sock = sslutils.wrap(sock) + + except socket.error as err: + if err.args[0] != errno.EADDRINUSE: + raise + eventlet.sleep(0.1) + if not sock: + raise RuntimeError(_("Could not bind to %(host)s:%(port)s " + "after trying for 30 seconds") % + {'host': self._service_config.api_host, + 'port': self._service_config.api_port}) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + # sockets can hang around forever without keepalive + sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + + # This option isn't available in the OS X version of eventlet + if hasattr(socket, 'TCP_KEEPIDLE'): + sock.setsockopt(socket.IPPROTO_TCP, + socket.TCP_KEEPIDLE, + CONF.tcp_keepidle) + + return sock + + def start(self): + super(WSGIService, self).start() + + socket = self._wsgi_get_socket() + application = self._wsgi_application + + self.tg.add_thread(self._wsgi_handle, application, socket) + + def _wsgi_handle(self, application, socket): + logger = logging.getLogger('eventlet.wsgi') + eventlet.wsgi.server(socket, + application, + custom_pool=self.tg.pool, + log=loggers.WritableLogger(logger)) +@six.add_metaclass(abc.ABCMeta) class DNSService(Service): """ Service class to be used for a service that only works in TCP """ - def __init__(self, config, host=None, binary=None, service_name=None, - endpoints=None, threads=1000): + def __init__(self, threads=None): super(DNSService, self).__init__(threads) - self.host = host - self.binary = binary - self.service_name = service_name + self._dns_sock_tcp = dnsutils.bind_tcp( + self._service_config.host, + self._service_config.port, + self._service_config.tcp_backlog) - self.endpoints = endpoints or [self] - self.config = config + self._dns_sock_udp = dnsutils.bind_udp( + self._service_config.host, + self._service_config.port) - self._sock_tcp = dnsutils.bind_tcp( - self.config.host, self.config.port, - self.config.tcp_backlog) - - self._sock_udp = dnsutils.bind_udp( - self.config.host, self.config.port) - - @classmethod - def create(cls, host=None, binary=None, service_name=None, - endpoints=None): - """Instantiates class and passes back application object. - - :param host: defaults to CONF.host - :param binary: defaults to basename of executable - """ - if not host: - host = CONF.host - if not binary: - binary = os.path.basename(inspect.stack()[-1][1]) - - service_obj = cls(host, binary, service_name=service_name, - endpoints=endpoints) - return service_obj + @abc.abstractproperty + def _dns_application(self): + pass def start(self): - for e in self.endpoints: - if e != self and hasattr(e, 'start'): - e.start() - - self.tg.add_thread( - dnsutils.handle_tcp, self._sock_tcp, self.tg, dnsutils.handle, - self.application, timeout=self.config.tcp_recv_timeout) - self.tg.add_thread( - dnsutils.handle_udp, self._sock_udp, self.tg, dnsutils.handle, - self.application) - super(DNSService, self).start() - def stop(self): - for e in self.endpoints: - if e != self and hasattr(e, 'stop'): - e.stop() + self.tg.add_thread( + dnsutils.handle_tcp, self._dns_sock_tcp, self.tg, dnsutils.handle, + self._dns_application, self._service_config.tcp_recv_timeout) + self.tg.add_thread( + dnsutils.handle_udp, self._dns_sock_udp, self.tg, dnsutils.handle, + self._dns_application) + + def wait(self): + super(DNSService, self).wait() + + def stop(self): # When the service is stopped, the threads for _handle_tcp and # _handle_udp are stopped too. super(DNSService, self).stop() diff --git a/designate/sink/service.py b/designate/sink/service.py index 42b168198..8cb599111 100644 --- a/designate/sink/service.py +++ b/designate/sink/service.py @@ -28,13 +28,17 @@ LOG = logging.getLogger(__name__) class Service(service.Service): - def __init__(self, *args, **kwargs): - super(Service, self).__init__(*args, **kwargs) + def __init__(self, threads=None): + super(Service, self).__init__(threads=threads) # Initialize extensions self.handlers = self._init_extensions() self.subscribers = self._get_subscribers() + @property + def service_name(self): + return 'sink' + def _init_extensions(self): """Loads and prepares all enabled extensions""" diff --git a/designate/tests/fixtures.py b/designate/tests/fixtures.py index 2ae821023..fc9e4372d 100644 --- a/designate/tests/fixtures.py +++ b/designate/tests/fixtures.py @@ -62,10 +62,10 @@ class RPCFixture(fixtures.Fixture): class ServiceFixture(fixtures.Fixture): - def __init__(self, svc_name, *args, **kw): + def __init__(self, svc_name): cls = importutils.import_class( 'designate.%s.service.Service' % svc_name) - self.svc = cls.create(binary='designate-' + svc_name, *args, **kw) + self.svc = cls() def setUp(self): super(ServiceFixture, self).setUp() diff --git a/designate/tests/test_pool_manager/test_service_sqlalchemy.py b/designate/tests/test_pool_manager/test_service_sqlalchemy.py index 06adbd943..9119fbbfa 100644 --- a/designate/tests/test_pool_manager/test_service_sqlalchemy.py +++ b/designate/tests/test_pool_manager/test_service_sqlalchemy.py @@ -84,7 +84,7 @@ class PoolManagerServiceTest(PoolManagerTestCase): def test_pool_instance_topic(self): self.assertEqual( 'pool_manager.%s' % cfg.CONF['service:pool_manager'].pool_id, - self.service.topic) + self.service._rpc_topic) def test_no_pool_servers_configured(self): self.service.stop() diff --git a/designate/utils.py b/designate/utils.py index fc403b76d..91b1bd7e1 100644 --- a/designate/utils.py +++ b/designate/utils.py @@ -351,3 +351,27 @@ def extract_priority_from_data(recordset_type, record): priority, _, data = record['data'].partition(" ") priority = int(priority) return priority, data + + +def cache_result(function): + """A function decorator to cache the result of the first call, every + additional call will simply return the cached value. + + If we were python3 only, we would have used functools.lru_cache() in place + of this. If there's a python2 backport in a lightweight library, then we + should switch to that. + """ + # NOTE: We're cheating a little here, by using a mutable type (a list), + # we're able to read and update the value from within in inline + # wrapper method. If we used an immutable type, the assignment + # would not work as we want. + cache = [None] + + def wrapper(*args, **kwargs): + if cache[0] is not None: + return cache[0] + else: + result = function(*args, **kwargs) + cache[0] = result + return result + return wrapper