From ee537e33deeb98b7dc26e7b6d20509ce8ddd91dd Mon Sep 17 00:00:00 2001 From: Gary Kotton Date: Mon, 30 Dec 2013 00:48:15 -0800 Subject: [PATCH] Update RPC code from oslo The common RPC code has been updated to include the following: 8575d87af49ea276341908f83c8c51db13afca44 8b2b0b743e84ceed7841cf470afed6a5da8e1d07 23f602940c64ba408d77ceb8f5ba0f67ee4a18ef 6d0a6c3083218cdac52758a8b6aac6b03402c658 7cac1ac1bd9df36d4e5183afac3b643df10b1d4d 8159efddabb09dd9b7c99963ff7c9de0a6c62b62 Updated to include the following in modules in openstack-common.conf: py3kcompat, sslutils, and versionutils. The update also includes imports from the RPC code Change-Id: I84c5b8e2b17da0018dd69ecb354d123a609afe98 --- neutron/api/v2/resource.py | 2 +- neutron/api/versions.py | 2 +- neutron/debug/commands.py | 10 +- neutron/openstack/common/eventlet_backdoor.py | 67 ++- neutron/openstack/common/excutils.py | 74 ++- neutron/openstack/common/fileutils.py | 45 +- neutron/openstack/common/gettextutils.py | 420 ++++++++++-------- neutron/openstack/common/importutils.py | 2 - neutron/openstack/common/jsonutils.py | 25 +- neutron/openstack/common/local.py | 2 - neutron/openstack/common/log.py | 76 +++- neutron/openstack/common/log_handler.py | 6 +- neutron/openstack/common/loopingcall.py | 2 - neutron/openstack/common/network_utils.py | 22 +- neutron/openstack/common/periodic_task.py | 16 +- neutron/openstack/common/processutils.py | 30 +- .../openstack/common/py3kcompat/__init__.py | 0 .../openstack/common/py3kcompat/urlutils.py | 65 +++ neutron/openstack/common/rpc/__init__.py | 9 +- neutron/openstack/common/rpc/amqp.py | 70 ++- neutron/openstack/common/rpc/common.py | 84 ++-- neutron/openstack/common/rpc/dispatcher.py | 6 +- neutron/openstack/common/rpc/impl_fake.py | 7 +- neutron/openstack/common/rpc/impl_kombu.py | 87 ++-- neutron/openstack/common/rpc/impl_qpid.py | 32 +- neutron/openstack/common/rpc/impl_zmq.py | 42 +- neutron/openstack/common/rpc/matchmaker.py | 18 +- .../openstack/common/rpc/matchmaker_redis.py | 4 +- .../openstack/common/rpc/matchmaker_ring.py | 6 +- neutron/openstack/common/rpc/proxy.py | 9 +- neutron/openstack/common/rpc/serializer.py | 4 +- neutron/openstack/common/rpc/service.py | 8 +- neutron/openstack/common/rpc/zmq_receiver.py | 3 - neutron/openstack/common/service.py | 298 ++++++++++--- neutron/openstack/common/sslutils.py | 98 ++++ neutron/openstack/common/threadgroup.py | 22 +- neutron/openstack/common/timeutils.py | 43 +- neutron/openstack/common/uuidutils.py | 2 - neutron/openstack/common/versionutils.py | 148 ++++++ neutron/tests/unit/test_api_v2_resource.py | 4 +- neutron/wsgi.py | 2 +- openstack-common.conf | 3 + 42 files changed, 1308 insertions(+), 567 deletions(-) create mode 100644 neutron/openstack/common/py3kcompat/__init__.py create mode 100644 neutron/openstack/common/py3kcompat/urlutils.py mode change 100755 => 100644 neutron/openstack/common/rpc/zmq_receiver.py create mode 100644 neutron/openstack/common/sslutils.py create mode 100644 neutron/openstack/common/versionutils.py diff --git a/neutron/api/v2/resource.py b/neutron/api/v2/resource.py index 459390724b..a6e9765e38 100644 --- a/neutron/api/v2/resource.py +++ b/neutron/api/v2/resource.py @@ -149,7 +149,7 @@ def translate(translatable, locale): :returns: the translated object, or the object as-is if it was not translated """ - localize = gettextutils.get_localized_message + localize = gettextutils.translate if isinstance(translatable, exceptions.NeutronException): translatable.msg = localize(translatable.msg, locale) elif isinstance(translatable, webob.exc.HTTPError): diff --git a/neutron/api/versions.py b/neutron/api/versions.py index 5707b34873..671a56ec30 100644 --- a/neutron/api/versions.py +++ b/neutron/api/versions.py @@ -45,7 +45,7 @@ class Versions(object): if req.path != '/': language = req.best_match_language() msg = _('Unknown API version specified') - msg = gettextutils.get_localized_message(msg, language) + msg = gettextutils.translate(msg, language) return webob.exc.HTTPNotFound(explanation=msg) builder = versions_view.get_view_builder(req) diff --git a/neutron/debug/commands.py b/neutron/debug/commands.py index 829ed22cc2..59f820391e 100644 --- a/neutron/debug/commands.py +++ b/neutron/debug/commands.py @@ -30,8 +30,8 @@ class ProbeCommand(NeutronCommand): return self.app.debug_agent def run(self, parsed_args): - self.log.debug('run(%s)' % parsed_args) - self.app.stdout.write(_('Unimplemented commands') + '\n') + self.log.debug('run(%s)', parsed_args) + self.log.info(_('Unimplemented commands')) class CreateProbe(ProbeCommand): @@ -55,7 +55,7 @@ class CreateProbe(ProbeCommand): debug_agent = self.get_debug_agent() port = debug_agent.create_probe(parsed_args.id, parsed_args.device_owner) - self.app.stdout.write(_('Probe created : %s ') % port.id + '\n') + self.log.info(_('Probe created : %s '), port.id) class DeleteProbe(ProbeCommand): @@ -74,7 +74,7 @@ class DeleteProbe(ProbeCommand): self.log.debug('run(%s)' % parsed_args) debug_agent = self.get_debug_agent() debug_agent.delete_probe(parsed_args.id) - self.app.stdout.write(_('Probe %s deleted') % parsed_args.id + '\n') + self.log.info(_('Probe %s deleted'), parsed_args.id) class ListProbe(NeutronCommand, lister.Lister): @@ -105,7 +105,7 @@ class ClearProbe(ProbeCommand): self.log.debug('run(%s)' % parsed_args) debug_agent = self.get_debug_agent() debug_agent.clear_probe() - self.app.stdout.write(_('All Probes deleted ') + '\n') + self.log.info(_('All Probes deleted ')) class ExecProbe(ProbeCommand): diff --git a/neutron/openstack/common/eventlet_backdoor.py b/neutron/openstack/common/eventlet_backdoor.py index 57b89ae914..b55b0ceb3b 100644 --- a/neutron/openstack/common/eventlet_backdoor.py +++ b/neutron/openstack/common/eventlet_backdoor.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright (c) 2012 OpenStack Foundation. # Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. @@ -18,8 +16,11 @@ from __future__ import print_function +import errno import gc +import os import pprint +import socket import sys import traceback @@ -28,14 +29,34 @@ import eventlet.backdoor import greenlet from oslo.config import cfg +from neutron.openstack.common.gettextutils import _ +from neutron.openstack.common import log as logging + +help_for_backdoor_port = ( + "Acceptable values are 0, , and :, where 0 results " + "in listening on a random tcp port number; results in listening " + "on the specified port number (and not enabling backdoor if that port " + "is in use); and : results in listening on the smallest " + "unused port number within the specified range of port numbers. The " + "chosen port is displayed in the service's log file.") eventlet_backdoor_opts = [ - cfg.IntOpt('backdoor_port', + cfg.StrOpt('backdoor_port', default=None, - help='port for eventlet backdoor to listen') + help="Enable eventlet backdoor. %s" % help_for_backdoor_port) ] CONF = cfg.CONF CONF.register_opts(eventlet_backdoor_opts) +LOG = logging.getLogger(__name__) + + +class EventletBackdoorConfigValueError(Exception): + def __init__(self, port_range, help_msg, ex): + msg = ('Invalid backdoor_port configuration %(range)s: %(ex)s. ' + '%(help)s' % + {'range': port_range, 'ex': ex, 'help': help_msg}) + super(EventletBackdoorConfigValueError, self).__init__(msg) + self.port_range = port_range def _dont_use_this(): @@ -43,7 +64,7 @@ def _dont_use_this(): def _find_objects(t): - return filter(lambda o: isinstance(o, t), gc.get_objects()) + return [o for o in gc.get_objects() if isinstance(o, t)] def _print_greenthreads(): @@ -60,6 +81,33 @@ def _print_nativethreads(): print() +def _parse_port_range(port_range): + if ':' not in port_range: + start, end = port_range, port_range + else: + start, end = port_range.split(':', 1) + try: + start, end = int(start), int(end) + if end < start: + raise ValueError + return start, end + except ValueError as ex: + raise EventletBackdoorConfigValueError(port_range, ex, + help_for_backdoor_port) + + +def _listen(host, start_port, end_port, listen_func): + try_port = start_port + while True: + try: + return listen_func((host, try_port)) + except socket.error as exc: + if (exc.errno != errno.EADDRINUSE or + try_port >= end_port): + raise + try_port += 1 + + def initialize_if_enabled(): backdoor_locals = { 'exit': _dont_use_this, # So we don't exit the entire process @@ -72,6 +120,8 @@ def initialize_if_enabled(): if CONF.backdoor_port is None: return None + start_port, end_port = _parse_port_range(str(CONF.backdoor_port)) + # NOTE(johannes): The standard sys.displayhook will print the value of # the last expression and set it to __builtin__._, which overwrites # the __builtin__._ that gettext sets. Let's switch to using pprint @@ -82,8 +132,13 @@ def initialize_if_enabled(): pprint.pprint(val) sys.displayhook = displayhook - sock = eventlet.listen(('localhost', CONF.backdoor_port)) + sock = _listen('localhost', start_port, end_port, eventlet.listen) + + # In the case of backdoor port being zero, a port number is assigned by + # listen(). In any case, pull the port number out here. port = sock.getsockname()[1] + LOG.info(_('Eventlet backdoor listening on %(port)s for process %(pid)d') % + {'port': port, 'pid': os.getpid()}) eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock, locals=backdoor_locals) return port diff --git a/neutron/openstack/common/excutils.py b/neutron/openstack/common/excutils.py index 676baaeae4..b7c762890e 100644 --- a/neutron/openstack/common/excutils.py +++ b/neutron/openstack/common/excutils.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2011 OpenStack Foundation. # Copyright 2012, Red Hat, Inc. # @@ -19,16 +17,17 @@ Exception related utilities. """ -import contextlib import logging import sys +import time import traceback +import six + from neutron.openstack.common.gettextutils import _ -@contextlib.contextmanager -def save_and_reraise_exception(): +class save_and_reraise_exception(object): """Save current exception, run some code and then re-raise. In some cases the exception context can be cleared, resulting in None @@ -40,12 +39,61 @@ def save_and_reraise_exception(): To work around this, we save the exception state, run handler code, and then re-raise the original exception. If another exception occurs, the saved exception is logged and the new exception is re-raised. + + In some cases the caller may not want to re-raise the exception, and + for those circumstances this context provides a reraise flag that + can be used to suppress the exception. For example:: + + except Exception: + with save_and_reraise_exception() as ctxt: + decide_if_need_reraise() + if not should_be_reraised: + ctxt.reraise = False """ - type_, value, tb = sys.exc_info() - try: - yield - except Exception: - logging.error(_('Original exception being dropped: %s'), - traceback.format_exception(type_, value, tb)) - raise - raise type_, value, tb + def __init__(self): + self.reraise = True + + def __enter__(self): + self.type_, self.value, self.tb, = sys.exc_info() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_type is not None: + logging.error(_('Original exception being dropped: %s'), + traceback.format_exception(self.type_, + self.value, + self.tb)) + return False + if self.reraise: + six.reraise(self.type_, self.value, self.tb) + + +def forever_retry_uncaught_exceptions(infunc): + def inner_func(*args, **kwargs): + last_log_time = 0 + last_exc_message = None + exc_count = 0 + while True: + try: + return infunc(*args, **kwargs) + except Exception as exc: + this_exc_message = six.u(str(exc)) + if this_exc_message == last_exc_message: + exc_count += 1 + else: + exc_count = 1 + # Do not log any more frequently than once a minute unless + # the exception message changes + cur_time = int(time.time()) + if (cur_time - last_log_time > 60 or + this_exc_message != last_exc_message): + logging.exception( + _('Unexpected exception occurred %d time(s)... ' + 'retrying.') % exc_count) + last_log_time = cur_time + last_exc_message = this_exc_message + exc_count = 0 + # This should be a very rare event. In case it isn't, do + # a sleep. + time.sleep(1) + return inner_func diff --git a/neutron/openstack/common/fileutils.py b/neutron/openstack/common/fileutils.py index 48aaee895a..704af09623 100644 --- a/neutron/openstack/common/fileutils.py +++ b/neutron/openstack/common/fileutils.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2011 OpenStack Foundation. # All Rights Reserved. # @@ -19,6 +17,7 @@ import contextlib import errno import os +import tempfile from neutron.openstack.common import excutils from neutron.openstack.common.gettextutils import _ @@ -69,33 +68,34 @@ def read_cached_file(filename, force_reload=False): return (reloaded, cache_info['data']) -def delete_if_exists(path): +def delete_if_exists(path, remove=os.unlink): """Delete a file, but ignore file not found error. :param path: File to delete + :param remove: Optional function to remove passed path """ try: - os.unlink(path) + remove(path) except OSError as e: - if e.errno == errno.ENOENT: - return - else: + if e.errno != errno.ENOENT: raise @contextlib.contextmanager -def remove_path_on_error(path): +def remove_path_on_error(path, remove=delete_if_exists): """Protect code that wants to operate on PATH atomically. Any exception will cause PATH to be removed. :param path: File to work with + :param remove: Optional function to remove passed path """ + try: yield except Exception: with excutils.save_and_reraise_exception(): - delete_if_exists(path) + remove(path) def file_open(*args, **kwargs): @@ -108,3 +108,30 @@ def file_open(*args, **kwargs): state at all (for unit tests) """ return file(*args, **kwargs) + + +def write_to_tempfile(content, path=None, suffix='', prefix='tmp'): + """Create temporary file or use existing file. + + This util is needed for creating temporary file with + specified content, suffix and prefix. If path is not None, + it will be used for writing content. If the path doesn't + exist it'll be created. + + :param content: content for temporary file. + :param path: same as parameter 'dir' for mkstemp + :param suffix: same as parameter 'suffix' for mkstemp + :param prefix: same as parameter 'prefix' for mkstemp + + For example: it can be used in database tests for creating + configuration files. + """ + if path: + ensure_tree(path) + + (fd, path) = tempfile.mkstemp(suffix=suffix, dir=path, prefix=prefix) + try: + os.write(fd, content) + finally: + os.close(fd) + return path diff --git a/neutron/openstack/common/gettextutils.py b/neutron/openstack/common/gettextutils.py index bc1afd17ac..b060c417d6 100644 --- a/neutron/openstack/common/gettextutils.py +++ b/neutron/openstack/common/gettextutils.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2012 Red Hat, Inc. # Copyright 2013 IBM Corp. # All Rights Reserved. @@ -26,13 +24,10 @@ Usual usage in an openstack.common module: import copy import gettext -import logging +import locale +from logging import handlers import os import re -try: - import UserString as _userString -except ImportError: - import collections as _userString from babel import localedata import six @@ -58,7 +53,7 @@ def enable_lazy(): def _(msg): if USE_LAZY: - return Message(msg, 'neutron') + return Message(msg, domain='neutron') else: if six.PY3: return _t.gettext(msg) @@ -90,11 +85,6 @@ def install(domain, lazy=False): # messages in OpenStack. We override the standard _() function # and % (format string) operation to build Message objects that can # later be translated when we have more information. - # - # Also included below is an example LocaleHandler that translates - # Messages to an associated locale, effectively allowing many logs, - # each with their own locale. - def _lazy_gettext(msg): """Create and return a Message object. @@ -105,7 +95,7 @@ def install(domain, lazy=False): Message encapsulates a string so that we can translate it later when needed. """ - return Message(msg, domain) + return Message(msg, domain=domain) from six import moves moves.builtins.__dict__['_'] = _lazy_gettext @@ -120,182 +110,158 @@ def install(domain, lazy=False): unicode=True) -class Message(_userString.UserString, object): - """Class used to encapsulate translatable messages.""" - def __init__(self, msg, domain): - # _msg is the gettext msgid and should never change - self._msg = msg - self._left_extra_msg = '' - self._right_extra_msg = '' - self._locale = None - self.params = None - self.domain = domain +class Message(six.text_type): + """A Message object is a unicode object that can be translated. - @property - def data(self): - # NOTE(mrodden): this should always resolve to a unicode string - # that best represents the state of the message currently + Translation of Message is done explicitly using the translate() method. + For all non-translation intents and purposes, a Message is simply unicode, + and can be treated as such. + """ - localedir = os.environ.get(self.domain.upper() + '_LOCALEDIR') - if self.locale: - lang = gettext.translation(self.domain, - localedir=localedir, - languages=[self.locale], - fallback=True) - else: - # use system locale for translations - lang = gettext.translation(self.domain, - localedir=localedir, - fallback=True) + def __new__(cls, msgid, msgtext=None, params=None, domain='neutron', *args): + """Create a new Message object. + In order for translation to work gettext requires a message ID, this + msgid will be used as the base unicode text. It is also possible + for the msgid and the base unicode text to be different by passing + the msgtext parameter. + """ + # If the base msgtext is not given, we use the default translation + # of the msgid (which is in English) just in case the system locale is + # not English, so that the base text will be in that locale by default. + if not msgtext: + msgtext = Message._translate_msgid(msgid, domain) + # We want to initialize the parent unicode with the actual object that + # would have been plain unicode if 'Message' was not enabled. + msg = super(Message, cls).__new__(cls, msgtext) + msg.msgid = msgid + msg.domain = domain + msg.params = params + return msg + + def translate(self, desired_locale=None): + """Translate this message to the desired locale. + + :param desired_locale: The desired locale to translate the message to, + if no locale is provided the message will be + translated to the system's default locale. + + :returns: the translated message in unicode + """ + + translated_message = Message._translate_msgid(self.msgid, + self.domain, + desired_locale) + if self.params is None: + # No need for more translation + return translated_message + + # This Message object may have been formatted with one or more + # Message objects as substitution arguments, given either as a single + # argument, part of a tuple, or as one or more values in a dictionary. + # When translating this Message we need to translate those Messages too + translated_params = _translate_args(self.params, desired_locale) + + translated_message = translated_message % translated_params + + return translated_message + + @staticmethod + def _translate_msgid(msgid, domain, desired_locale=None): + if not desired_locale: + system_locale = locale.getdefaultlocale() + # If the system locale is not available to the runtime use English + if not system_locale[0]: + desired_locale = 'en_US' + else: + desired_locale = system_locale[0] + + locale_dir = os.environ.get(domain.upper() + '_LOCALEDIR') + lang = gettext.translation(domain, + localedir=locale_dir, + languages=[desired_locale], + fallback=True) if six.PY3: - ugettext = lang.gettext + translator = lang.gettext else: - ugettext = lang.ugettext + translator = lang.ugettext - full_msg = (self._left_extra_msg + - ugettext(self._msg) + - self._right_extra_msg) + translated_message = translator(msgid) + return translated_message - if self.params is not None: - full_msg = full_msg % self.params + def __mod__(self, other): + # When we mod a Message we want the actual operation to be performed + # by the parent class (i.e. unicode()), the only thing we do here is + # save the original msgid and the parameters in case of a translation + unicode_mod = super(Message, self).__mod__(other) + modded = Message(self.msgid, + msgtext=unicode_mod, + params=self._sanitize_mod_params(other), + domain=self.domain) + return modded - return six.text_type(full_msg) + def _sanitize_mod_params(self, other): + """Sanitize the object being modded with this Message. - @property - def locale(self): - return self._locale + - Add support for modding 'None' so translation supports it + - Trim the modded object, which can be a large dictionary, to only + those keys that would actually be used in a translation + - Snapshot the object being modded, in case the message is + translated, it will be used as it was when the Message was created + """ + if other is None: + params = (other,) + elif isinstance(other, dict): + params = self._trim_dictionary_parameters(other) + else: + params = self._copy_param(other) + return params - @locale.setter - def locale(self, value): - self._locale = value - if not self.params: - return + def _trim_dictionary_parameters(self, dict_param): + """Return a dict that only has matching entries in the msgid.""" + # NOTE(luisg): Here we trim down the dictionary passed as parameters + # to avoid carrying a lot of unnecessary weight around in the message + # object, for example if someone passes in Message() % locals() but + # only some params are used, and additionally we prevent errors for + # non-deepcopyable objects by unicoding() them. - # This Message object may have been constructed with one or more - # Message objects as substitution parameters, given as a single - # Message, or a tuple or Map containing some, so when setting the - # locale for this Message we need to set it for those Messages too. - if isinstance(self.params, Message): - self.params.locale = value - return - if isinstance(self.params, tuple): - for param in self.params: - if isinstance(param, Message): - param.locale = value - return - if isinstance(self.params, dict): - for param in self.params.values(): - if isinstance(param, Message): - param.locale = value + # Look for %(param) keys in msgid; + # Skip %% and deal with the case where % is first character on the line + keys = re.findall('(?:[^%]|^)?%\((\w*)\)[a-z]', self.msgid) - def _save_dictionary_parameter(self, dict_param): - full_msg = self.data - # look for %(blah) fields in string; - # ignore %% and deal with the - # case where % is first character on the line - keys = re.findall('(?:[^%]|^)?%\((\w*)\)[a-z]', full_msg) - - # if we don't find any %(blah) blocks but have a %s - if not keys and re.findall('(?:[^%]|^)%[a-z]', full_msg): - # apparently the full dictionary is the parameter - params = copy.deepcopy(dict_param) + # If we don't find any %(param) keys but have a %s + if not keys and re.findall('(?:[^%]|^)%[a-z]', self.msgid): + # Apparently the full dictionary is the parameter + params = self._copy_param(dict_param) else: params = {} for key in keys: - try: - params[key] = copy.deepcopy(dict_param[key]) - except TypeError: - # cast uncopyable thing to unicode string - params[key] = six.text_type(dict_param[key]) + params[key] = self._copy_param(dict_param[key]) return params - def _save_parameters(self, other): - # we check for None later to see if - # we actually have parameters to inject, - # so encapsulate if our parameter is actually None - if other is None: - self.params = (other, ) - elif isinstance(other, dict): - self.params = self._save_dictionary_parameter(other) - else: - # fallback to casting to unicode, - # this will handle the problematic python code-like - # objects that cannot be deep-copied - try: - self.params = copy.deepcopy(other) - except TypeError: - self.params = six.text_type(other) + def _copy_param(self, param): + try: + return copy.deepcopy(param) + except TypeError: + # Fallback to casting to unicode this will handle the + # python code-like objects that can't be deep-copied + return six.text_type(param) - return self - - # overrides to be more string-like - def __unicode__(self): - return self.data - - def __str__(self): - if six.PY3: - return self.__unicode__() - return self.data.encode('utf-8') - - def __getstate__(self): - to_copy = ['_msg', '_right_extra_msg', '_left_extra_msg', - 'domain', 'params', '_locale'] - new_dict = self.__dict__.fromkeys(to_copy) - for attr in to_copy: - new_dict[attr] = copy.deepcopy(self.__dict__[attr]) - - return new_dict - - def __setstate__(self, state): - for (k, v) in state.items(): - setattr(self, k, v) - - # operator overloads def __add__(self, other): - copied = copy.deepcopy(self) - copied._right_extra_msg += other.__str__() - return copied + msg = _('Message objects do not support addition.') + raise TypeError(msg) def __radd__(self, other): - copied = copy.deepcopy(self) - copied._left_extra_msg += other.__str__() - return copied + return self.__add__(other) - def __mod__(self, other): - # do a format string to catch and raise - # any possible KeyErrors from missing parameters - self.data % other - copied = copy.deepcopy(self) - return copied._save_parameters(other) - - def __mul__(self, other): - return self.data * other - - def __rmul__(self, other): - return other * self.data - - def __getitem__(self, key): - return self.data[key] - - def __getslice__(self, start, end): - return self.data.__getslice__(start, end) - - def __getattribute__(self, name): - # NOTE(mrodden): handle lossy operations that we can't deal with yet - # These override the UserString implementation, since UserString - # uses our __class__ attribute to try and build a new message - # after running the inner data string through the operation. - # At that point, we have lost the gettext message id and can just - # safely resolve to a string instead. - ops = ['capitalize', 'center', 'decode', 'encode', - 'expandtabs', 'ljust', 'lstrip', 'replace', 'rjust', 'rstrip', - 'strip', 'swapcase', 'title', 'translate', 'upper', 'zfill'] - if name in ops: - return getattr(self.data, name) - else: - return _userString.UserString.__getattribute__(self, name) + def __str__(self): + # NOTE(luisg): Logging in python 2.6 tries to str() log records, + # and it expects specifically a UnicodeError in order to proceed. + msg = _('Message objects do not support str() because they may ' + 'contain non-ascii characters. ' + 'Please use unicode() or translate() instead.') + raise UnicodeError(msg) def get_available_languages(domain): @@ -317,7 +283,7 @@ def get_available_languages(domain): # NOTE(luisg): Babel <1.0 used a function called list(), which was # renamed to locale_identifiers() in >=1.0, the requirements master list # requires >=0.9.6, uncapped, so defensively work with both. We can remove - # this check when the master list updates to >=1.0, and all projects udpate + # this check when the master list updates to >=1.0, and update all projects list_identifiers = (getattr(localedata, 'list', None) or getattr(localedata, 'locale_identifiers')) locale_identifiers = list_identifiers() @@ -328,38 +294,118 @@ def get_available_languages(domain): return copy.copy(language_list) -def get_localized_message(message, user_locale): - """Gets a localized version of the given message in the given locale.""" +def translate(obj, desired_locale=None): + """Gets the translated unicode representation of the given object. + + If the object is not translatable it is returned as-is. + If the locale is None the object is translated to the system locale. + + :param obj: the object to translate + :param desired_locale: the locale to translate the message to, if None the + default system locale will be used + :returns: the translated object in unicode, or the original object if + it could not be translated + """ + message = obj + if not isinstance(message, Message): + # If the object to translate is not already translatable, + # let's first get its unicode representation + message = six.text_type(obj) if isinstance(message, Message): - if user_locale: - message.locale = user_locale - return six.text_type(message) - else: - return message + # Even after unicoding() we still need to check if we are + # running with translatable unicode before translating + return message.translate(desired_locale) + return obj -class LocaleHandler(logging.Handler): - """Handler that can have a locale associated to translate Messages. +def _translate_args(args, desired_locale=None): + """Translates all the translatable elements of the given arguments object. - A quick example of how to utilize the Message class above. - LocaleHandler takes a locale and a target logging.Handler object - to forward LogRecord objects to after translating the internal Message. + This method is used for translating the translatable values in method + arguments which include values of tuples or dictionaries. + If the object is not a tuple or a dictionary the object itself is + translated if it is translatable. + + If the locale is None the object is translated to the system locale. + + :param args: the args to translate + :param desired_locale: the locale to translate the args to, if None the + default system locale will be used + :returns: a new args object with the translated contents of the original + """ + if isinstance(args, tuple): + return tuple(translate(v, desired_locale) for v in args) + if isinstance(args, dict): + translated_dict = {} + for (k, v) in six.iteritems(args): + translated_v = translate(v, desired_locale) + translated_dict[k] = translated_v + return translated_dict + return translate(args, desired_locale) + + +class TranslationHandler(handlers.MemoryHandler): + """Handler that translates records before logging them. + + The TranslationHandler takes a locale and a target logging.Handler object + to forward LogRecord objects to after translating them. This handler + depends on Message objects being logged, instead of regular strings. + + The handler can be configured declaratively in the logging.conf as follows: + + [handlers] + keys = translatedlog, translator + + [handler_translatedlog] + class = handlers.WatchedFileHandler + args = ('/var/log/api-localized.log',) + formatter = context + + [handler_translator] + class = openstack.common.log.TranslationHandler + target = translatedlog + args = ('zh_CN',) + + If the specified locale is not available in the system, the handler will + log in the default locale. """ - def __init__(self, locale, target): - """Initialize a LocaleHandler + def __init__(self, locale=None, target=None): + """Initialize a TranslationHandler :param locale: locale to use for translating messages :param target: logging.Handler object to forward LogRecord objects to after translation """ - logging.Handler.__init__(self) + # NOTE(luisg): In order to allow this handler to be a wrapper for + # other handlers, such as a FileHandler, and still be able to + # configure it using logging.conf, this handler has to extend + # MemoryHandler because only the MemoryHandlers' logging.conf + # parsing is implemented such that it accepts a target handler. + handlers.MemoryHandler.__init__(self, capacity=0, target=target) self.locale = locale - self.target = target + + def setFormatter(self, fmt): + self.target.setFormatter(fmt) def emit(self, record): - if isinstance(record.msg, Message): - # set the locale and resolve to a string - record.msg.locale = self.locale + # We save the message from the original record to restore it + # after translation, so other handlers are not affected by this + original_msg = record.msg + original_args = record.args + + try: + self._translate_and_log_record(record) + finally: + record.msg = original_msg + record.args = original_args + + def _translate_and_log_record(self, record): + record.msg = translate(record.msg, self.locale) + + # In addition to translating the message, we also need to translate + # arguments that were passed to the log method that were not part + # of the main message e.g., log.info(_('Some message %s'), this_one)) + record.args = _translate_args(record.args, self.locale) self.target.emit(record) diff --git a/neutron/openstack/common/importutils.py b/neutron/openstack/common/importutils.py index 7a303f93f2..4fd9ae2bc2 100644 --- a/neutron/openstack/common/importutils.py +++ b/neutron/openstack/common/importutils.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2011 OpenStack Foundation. # All Rights Reserved. # diff --git a/neutron/openstack/common/jsonutils.py b/neutron/openstack/common/jsonutils.py index 2833876792..d4a764b02d 100644 --- a/neutron/openstack/common/jsonutils.py +++ b/neutron/openstack/common/jsonutils.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # Copyright 2011 Justin Santa Barbara @@ -38,13 +36,23 @@ import functools import inspect import itertools import json -import types -import xmlrpclib +try: + import xmlrpclib +except ImportError: + # NOTE(jaypipes): xmlrpclib was renamed to xmlrpc.client in Python3 + # however the function and object call signatures + # remained the same. This whole try/except block should + # be removed and replaced with a call to six.moves once + # six 1.4.2 is released. See http://bit.ly/1bqrVzu + import xmlrpc.client as xmlrpclib import six +from neutron.openstack.common import gettextutils +from neutron.openstack.common import importutils from neutron.openstack.common import timeutils +netaddr = importutils.try_import("netaddr") _nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod, inspect.isfunction, inspect.isgeneratorfunction, @@ -52,7 +60,8 @@ _nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod, inspect.iscode, inspect.isbuiltin, inspect.isroutine, inspect.isabstract] -_simple_types = (types.NoneType, int, basestring, bool, float, long) +_simple_types = (six.string_types + six.integer_types + + (type(None), bool, float)) def to_primitive(value, convert_instances=False, convert_datetime=True, @@ -117,7 +126,7 @@ def to_primitive(value, convert_instances=False, convert_datetime=True, level=level, max_depth=max_depth) if isinstance(value, dict): - return dict((k, recursive(v)) for k, v in value.iteritems()) + return dict((k, recursive(v)) for k, v in six.iteritems(value)) elif isinstance(value, (list, tuple)): return [recursive(lv) for lv in value] @@ -129,6 +138,8 @@ def to_primitive(value, convert_instances=False, convert_datetime=True, if convert_datetime and isinstance(value, datetime.datetime): return timeutils.strtime(value) + elif isinstance(value, gettextutils.Message): + return value.data elif hasattr(value, 'iteritems'): return recursive(dict(value.iteritems()), level=level + 1) elif hasattr(value, '__iter__'): @@ -137,6 +148,8 @@ def to_primitive(value, convert_instances=False, convert_datetime=True, # Likely an instance of something. Watch for cycles. # Ignore class member vars. return recursive(value.__dict__, level=level + 1) + elif netaddr and isinstance(value, netaddr.IPAddress): + return six.text_type(value) else: if any(test(value) for test in _nasty_type_tests): return six.text_type(value) diff --git a/neutron/openstack/common/local.py b/neutron/openstack/common/local.py index e82f17d0f3..0819d5b97c 100644 --- a/neutron/openstack/common/local.py +++ b/neutron/openstack/common/local.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2011 OpenStack Foundation. # All Rights Reserved. # diff --git a/neutron/openstack/common/log.py b/neutron/openstack/common/log.py index 854212bff1..427921c4ab 100644 --- a/neutron/openstack/common/log.py +++ b/neutron/openstack/common/log.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2011 OpenStack Foundation. # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. @@ -35,6 +33,7 @@ import logging import logging.config import logging.handlers import os +import re import sys import traceback @@ -42,7 +41,7 @@ from oslo.config import cfg import six from six import moves -from neutron.openstack.common.gettextutils import _ # noqa +from neutron.openstack.common.gettextutils import _ from neutron.openstack.common import importutils from neutron.openstack.common import jsonutils from neutron.openstack.common import local @@ -50,6 +49,24 @@ from neutron.openstack.common import local _DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S" +_SANITIZE_KEYS = ['adminPass', 'admin_pass', 'password', 'admin_password'] + +# NOTE(ldbragst): Let's build a list of regex objects using the list of +# _SANITIZE_KEYS we already have. This way, we only have to add the new key +# to the list of _SANITIZE_KEYS and we can generate regular expressions +# for XML and JSON automatically. +_SANITIZE_PATTERNS = [] +_FORMAT_PATTERNS = [r'(%(key)s\s*[=]\s*[\"\']).*?([\"\'])', + r'(<%(key)s>).*?()', + r'([\"\']%(key)s[\"\']\s*:\s*[\"\']).*?([\"\'])', + r'([\'"].*?%(key)s[\'"]\s*:\s*u?[\'"]).*?([\'"])'] + +for key in _SANITIZE_KEYS: + for pattern in _FORMAT_PATTERNS: + reg_ex = re.compile(pattern % {'key': key}, re.DOTALL) + _SANITIZE_PATTERNS.append(reg_ex) + + common_cli_opts = [ cfg.BoolOpt('debug', short='d', @@ -113,7 +130,7 @@ generic_log_opts = [ log_opts = [ cfg.StrOpt('logging_context_format_string', default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s ' - '%(name)s [%(request_id)s %(user)s %(tenant)s] ' + '%(name)s [%(request_id)s %(user_identity)s] ' '%(instance)s%(message)s', help='format string to use for log messages with context'), cfg.StrOpt('logging_default_format_string', @@ -132,7 +149,6 @@ log_opts = [ 'amqp=WARN', 'amqplib=WARN', 'boto=WARN', - 'keystone=INFO', 'qpid=WARN', 'sqlalchemy=WARN', 'suds=INFO', @@ -215,6 +231,40 @@ def _get_log_file_path(binary=None): return None +def mask_password(message, secret="***"): + """Replace password with 'secret' in message. + + :param message: The string which includes security information. + :param secret: value with which to replace passwords. + :returns: The unicode value of message with the password fields masked. + + For example: + + >>> mask_password("'adminPass' : 'aaaaa'") + "'adminPass' : '***'" + >>> mask_password("'admin_pass' : 'aaaaa'") + "'admin_pass' : '***'" + >>> mask_password('"password" : "aaaaa"') + '"password" : "***"' + >>> mask_password("'original_password' : 'aaaaa'") + "'original_password' : '***'" + >>> mask_password("u'original_password' : u'aaaaa'") + "u'original_password' : u'***'" + """ + message = six.text_type(message) + + # NOTE(ldbragst): Check to see if anything in message contains any key + # specified in _SANITIZE_KEYS, if not then just return the message since + # we don't have to mask any passwords. + if not any(key in message for key in _SANITIZE_KEYS): + return message + + secret = r'\g<1>' + secret + r'\g<2>' + for pattern in _SANITIZE_PATTERNS: + message = re.sub(pattern, secret, message) + return message + + class BaseLoggerAdapter(logging.LoggerAdapter): def audit(self, msg, *args, **kwargs): @@ -282,10 +332,12 @@ class ContextAdapter(BaseLoggerAdapter): elif instance_uuid: instance_extra = (CONF.instance_uuid_format % {'uuid': instance_uuid}) - extra.update({'instance': instance_extra}) + extra['instance'] = instance_extra - extra.update({"project": self.project}) - extra.update({"version": self.version}) + extra.setdefault('user_identity', kwargs.pop('user_identity', None)) + + extra['project'] = self.project + extra['version'] = self.version extra['extra'] = extra.copy() return msg, kwargs @@ -299,7 +351,7 @@ class JSONFormatter(logging.Formatter): def formatException(self, ei, strip_newlines=True): lines = traceback.format_exception(*ei) if strip_newlines: - lines = [itertools.ifilter( + lines = [moves.filter( lambda x: x, line.rstrip().splitlines()) for line in lines] lines = list(itertools.chain(*lines)) @@ -337,10 +389,10 @@ class JSONFormatter(logging.Formatter): def _create_logging_excepthook(product_name): - def logging_excepthook(type, value, tb): + def logging_excepthook(exc_type, value, tb): extra = {} if CONF.verbose: - extra['exc_info'] = (type, value, tb) + extra['exc_info'] = (exc_type, value, tb) getLogger(product_name).critical(str(value), **extra) return logging_excepthook @@ -425,7 +477,7 @@ def _setup_logging_from_conf(): streamlog = ColorHandler() log_root.addHandler(streamlog) - elif not CONF.log_file: + elif not logpath: # pass sys.stdout as a positional argument # python2.6 calls the argument strm, in 2.7 it's stream streamlog = logging.StreamHandler(sys.stdout) diff --git a/neutron/openstack/common/log_handler.py b/neutron/openstack/common/log_handler.py index a8586debcd..8156f37e19 100644 --- a/neutron/openstack/common/log_handler.py +++ b/neutron/openstack/common/log_handler.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2013 IBM Corp. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -15,10 +13,10 @@ # under the License. import logging -from neutron.openstack.common import notifier - from oslo.config import cfg +from neutron.openstack.common import notifier + class PublishErrorsHandler(logging.Handler): def emit(self, record): diff --git a/neutron/openstack/common/loopingcall.py b/neutron/openstack/common/loopingcall.py index f82dc7f3ae..e588c8309b 100644 --- a/neutron/openstack/common/loopingcall.py +++ b/neutron/openstack/common/loopingcall.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # Copyright 2011 Justin Santa Barbara diff --git a/neutron/openstack/common/network_utils.py b/neutron/openstack/common/network_utils.py index 6ddb441d74..29061519ab 100644 --- a/neutron/openstack/common/network_utils.py +++ b/neutron/openstack/common/network_utils.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2012 OpenStack Foundation. # All Rights Reserved. # @@ -19,10 +17,7 @@ Network-related utilities and helper functions. """ -from neutron.openstack.common import log as logging - - -LOG = logging.getLogger(__name__) +from neutron.openstack.common.py3kcompat import urlutils def parse_host_port(address, default_port=None): @@ -67,3 +62,18 @@ def parse_host_port(address, default_port=None): port = default_port return (host, None if port is None else int(port)) + + +def urlsplit(url, scheme='', allow_fragments=True): + """Parse a URL using urlparse.urlsplit(), splitting query and fragments. + This function papers over Python issue9374 when needed. + + The parameters are the same as urlparse.urlsplit. + """ + scheme, netloc, path, query, fragment = urlutils.urlsplit( + url, scheme, allow_fragments) + if allow_fragments and '#' in path: + path, fragment = path.split('#', 1) + if '?' in path: + path, query = path.split('?', 1) + return urlutils.SplitResult(scheme, netloc, path, query, fragment) diff --git a/neutron/openstack/common/periodic_task.py b/neutron/openstack/common/periodic_task.py index 02d0048c6c..cf2985e13a 100644 --- a/neutron/openstack/common/periodic_task.py +++ b/neutron/openstack/common/periodic_task.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -17,6 +15,7 @@ import datetime import time from oslo.config import cfg +import six from neutron.openstack.common.gettextutils import _ from neutron.openstack.common import log as logging @@ -83,14 +82,14 @@ def periodic_task(*args, **kwargs): return f # NOTE(sirp): The `if` is necessary to allow the decorator to be used with - # and without parens. + # and without parents. # - # In the 'with-parens' case (with kwargs present), this function needs to + # In the 'with-parents' case (with kwargs present), this function needs to # return a decorator function since the interpreter will invoke it like: # # periodic_task(*args, **kwargs)(f) # - # In the 'without-parens' case, the original function will be passed + # In the 'without-parents' case, the original function will be passed # in as the first argument, like: # # periodic_task(f) @@ -150,8 +149,8 @@ class _PeriodicTasksMeta(type): cls._periodic_last_run[name] = task._periodic_last_run +@six.add_metaclass(_PeriodicTasksMeta) class PeriodicTasks(object): - __metaclass__ = _PeriodicTasksMeta def run_periodic_tasks(self, context, raise_on_error=False): """Tasks to be run at a periodic interval.""" @@ -173,7 +172,8 @@ class PeriodicTasks(object): if spacing is not None: idle_for = min(idle_for, spacing) - LOG.debug(_("Running periodic task %(full_task_name)s"), locals()) + LOG.debug(_("Running periodic task %(full_task_name)s"), + {"full_task_name": full_task_name}) self._periodic_last_run[task_name] = timeutils.utcnow() try: @@ -182,7 +182,7 @@ class PeriodicTasks(object): if raise_on_error: raise LOG.exception(_("Error during %(full_task_name)s: %(e)s"), - locals()) + {"full_task_name": full_task_name, "e": e}) time.sleep(0) return idle_for diff --git a/neutron/openstack/common/processutils.py b/neutron/openstack/common/processutils.py index 73c214e8b2..039b9ad467 100644 --- a/neutron/openstack/common/processutils.py +++ b/neutron/openstack/common/processutils.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2011 OpenStack Foundation. # All Rights Reserved. # @@ -19,6 +17,7 @@ System-level utilities and helper functions. """ +import logging as stdlib_logging import os import random import shlex @@ -81,7 +80,7 @@ def execute(*cmd, **kwargs): :param cmd: Passed to subprocess.Popen. :type cmd: string :param process_input: Send to opened process. - :type proces_input: string + :type process_input: string :param check_exit_code: Single bool, int, or list of allowed exit codes. Defaults to [0]. Raise :class:`ProcessExecutionError` unless @@ -102,6 +101,9 @@ def execute(*cmd, **kwargs): :param shell: whether or not there should be a shell used to execute this command. Defaults to false. :type shell: boolean + :param loglevel: log level for execute commands. + :type loglevel: int. (Should be stdlib_logging.DEBUG or + stdlib_logging.INFO) :returns: (stdout, stderr) from process execution :raises: :class:`UnknownArgumentError` on receiving unknown arguments @@ -116,6 +118,7 @@ def execute(*cmd, **kwargs): run_as_root = kwargs.pop('run_as_root', False) root_helper = kwargs.pop('root_helper', '') shell = kwargs.pop('shell', False) + loglevel = kwargs.pop('loglevel', stdlib_logging.DEBUG) if isinstance(check_exit_code, bool): ignore_exit_code = not check_exit_code @@ -127,7 +130,7 @@ def execute(*cmd, **kwargs): raise UnknownArgumentError(_('Got unknown keyword args ' 'to utils.execute: %r') % kwargs) - if run_as_root and os.geteuid() != 0: + if run_as_root and hasattr(os, 'geteuid') and os.geteuid() != 0: if not root_helper: raise NoRootWrapSpecified( message=('Command requested root, but did not specify a root ' @@ -139,7 +142,7 @@ def execute(*cmd, **kwargs): while attempts > 0: attempts -= 1 try: - LOG.debug(_('Running cmd (subprocess): %s'), ' '.join(cmd)) + LOG.log(loglevel, _('Running cmd (subprocess): %s'), ' '.join(cmd)) _PIPE = subprocess.PIPE # pylint: disable=E1101 if os.name == 'nt': @@ -163,20 +166,19 @@ def execute(*cmd, **kwargs): result = obj.communicate() obj.stdin.close() # pylint: disable=E1101 _returncode = obj.returncode # pylint: disable=E1101 - if _returncode: - LOG.debug(_('Result was %s') % _returncode) - if not ignore_exit_code and _returncode not in check_exit_code: - (stdout, stderr) = result - raise ProcessExecutionError(exit_code=_returncode, - stdout=stdout, - stderr=stderr, - cmd=' '.join(cmd)) + LOG.log(loglevel, _('Result was %s') % _returncode) + if not ignore_exit_code and _returncode not in check_exit_code: + (stdout, stderr) = result + raise ProcessExecutionError(exit_code=_returncode, + stdout=stdout, + stderr=stderr, + cmd=' '.join(cmd)) return result except ProcessExecutionError: if not attempts: raise else: - LOG.debug(_('%r failed. Retrying.'), cmd) + LOG.log(loglevel, _('%r failed. Retrying.'), cmd) if delay_on_retry: greenthread.sleep(random.randint(20, 200) / 100.0) finally: diff --git a/neutron/openstack/common/py3kcompat/__init__.py b/neutron/openstack/common/py3kcompat/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/neutron/openstack/common/py3kcompat/urlutils.py b/neutron/openstack/common/py3kcompat/urlutils.py new file mode 100644 index 0000000000..6200271f3f --- /dev/null +++ b/neutron/openstack/common/py3kcompat/urlutils.py @@ -0,0 +1,65 @@ +# +# Copyright 2013 Canonical Ltd. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +""" +Python2/Python3 compatibility layer for OpenStack +""" + +import six + +if six.PY3: + # python3 + import urllib.error + import urllib.parse + import urllib.request + + urlencode = urllib.parse.urlencode + urljoin = urllib.parse.urljoin + quote = urllib.parse.quote + parse_qsl = urllib.parse.parse_qsl + unquote = urllib.parse.unquote + unquote_plus = urllib.parse.unquote_plus + urlparse = urllib.parse.urlparse + urlsplit = urllib.parse.urlsplit + urlunsplit = urllib.parse.urlunsplit + SplitResult = urllib.parse.SplitResult + + urlopen = urllib.request.urlopen + URLError = urllib.error.URLError + pathname2url = urllib.request.pathname2url +else: + # python2 + import urllib + import urllib2 + import urlparse + + urlencode = urllib.urlencode + quote = urllib.quote + unquote = urllib.unquote + unquote_plus = urllib.unquote_plus + + parse = urlparse + parse_qsl = parse.parse_qsl + urljoin = parse.urljoin + urlparse = parse.urlparse + urlsplit = parse.urlsplit + urlunsplit = parse.urlunsplit + SplitResult = parse.SplitResult + + urlopen = urllib2.urlopen + URLError = urllib2.URLError + pathname2url = urllib.pathname2url diff --git a/neutron/openstack/common/rpc/__init__.py b/neutron/openstack/common/rpc/__init__.py index e20d0b2889..046b35272b 100644 --- a/neutron/openstack/common/rpc/__init__.py +++ b/neutron/openstack/common/rpc/__init__.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. @@ -56,13 +54,12 @@ rpc_opts = [ help='Seconds to wait before a cast expires (TTL). ' 'Only supported by impl_zmq.'), cfg.ListOpt('allowed_rpc_exception_modules', - default=['neutron.openstack.common.exception', - 'nova.exception', + default=['nova.exception', 'cinder.exception', 'exceptions', ], help='Modules of exceptions that are permitted to be recreated' - 'upon receiving exception data from an rpc call.'), + ' upon receiving exception data from an rpc call.'), cfg.BoolOpt('fake_rabbit', default=False, help='If passed, use a fake RabbitMQ provider'), @@ -228,7 +225,7 @@ def notify(context, topic, msg, envelope=False): def cleanup(): - """Clean up resoruces in use by implementation. + """Clean up resources in use by implementation. Clean up any resources that have been allocated by the RPC implementation. This is typically open connections to a messaging service. This function diff --git a/neutron/openstack/common/rpc/amqp.py b/neutron/openstack/common/rpc/amqp.py index f055696c5c..b74fad0598 100644 --- a/neutron/openstack/common/rpc/amqp.py +++ b/neutron/openstack/common/rpc/amqp.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. @@ -20,9 +18,9 @@ """ Shared code between AMQP based openstack.common.rpc implementations. -The code in this module is shared between the rpc implemenations based on AMQP. -Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses -AMQP, but is deprecated and predates this code. +The code in this module is shared between the rpc implementations based on +AMQP. Specifically, this includes impl_kombu and impl_qpid. impl_carrot also +uses AMQP, but is deprecated and predates this code. """ import collections @@ -35,6 +33,8 @@ from eventlet import pools from eventlet import queue from eventlet import semaphore from oslo.config import cfg +import six + from neutron.openstack.common import excutils from neutron.openstack.common.gettextutils import _ @@ -165,11 +165,13 @@ class ConnectionContext(rpc_common.Connection): def create_worker(self, topic, proxy, pool_name): self.connection.create_worker(topic, proxy, pool_name) - def join_consumer_pool(self, callback, pool_name, topic, exchange_name): + def join_consumer_pool(self, callback, pool_name, topic, exchange_name, + ack_on_error=True): self.connection.join_consumer_pool(callback, pool_name, topic, - exchange_name) + exchange_name, + ack_on_error) def consume_in_thread(self): self.connection.consume_in_thread() @@ -187,7 +189,7 @@ class ReplyProxy(ConnectionContext): def __init__(self, conf, connection_pool): self._call_waiters = {} self._num_call_waiters = 0 - self._num_call_waiters_wrn_threshhold = 10 + self._num_call_waiters_wrn_threshold = 10 self._reply_q = 'reply_' + uuid.uuid4().hex super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False) self.declare_direct_consumer(self._reply_q, self._process_data) @@ -206,11 +208,11 @@ class ReplyProxy(ConnectionContext): def add_call_waiter(self, waiter, msg_id): self._num_call_waiters += 1 - if self._num_call_waiters > self._num_call_waiters_wrn_threshhold: + if self._num_call_waiters > self._num_call_waiters_wrn_threshold: LOG.warn(_('Number of call waiters is greater than warning ' - 'threshhold: %d. There could be a MulticallProxyWaiter ' - 'leak.') % self._num_call_waiters_wrn_threshhold) - self._num_call_waiters_wrn_threshhold *= 2 + 'threshold: %d. There could be a MulticallProxyWaiter ' + 'leak.') % self._num_call_waiters_wrn_threshold) + self._num_call_waiters_wrn_threshold *= 2 self._call_waiters[msg_id] = waiter def del_call_waiter(self, msg_id): @@ -233,18 +235,13 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None, failure = rpc_common.serialize_remote_exception(failure, log_failure) - try: - msg = {'result': reply, 'failure': failure} - except TypeError: - msg = {'result': dict((k, repr(v)) - for k, v in reply.__dict__.iteritems()), - 'failure': failure} + msg = {'result': reply, 'failure': failure} if ending: msg['ending'] = True _add_unique_id(msg) # If a reply_q exists, add the msg_id to the reply and pass the # reply_q to direct_send() to use it as the response queue. - # Otherwise use the msg_id for backward compatibilty. + # Otherwise use the msg_id for backward compatibility. if reply_q: msg['_msg_id'] = msg_id conn.direct_send(reply_q, rpc_common.serialize_msg(msg)) @@ -303,8 +300,14 @@ def pack_context(msg, context): for args at some point. """ - context_d = dict([('_context_%s' % key, value) - for (key, value) in context.to_dict().iteritems()]) + if isinstance(context, dict): + context_d = dict([('_context_%s' % key, value) + for (key, value) in six.iteritems(context)]) + else: + context_d = dict([('_context_%s' % key, value) + for (key, value) in + six.iteritems(context.to_dict())]) + msg.update(context_d) @@ -362,22 +365,43 @@ class CallbackWrapper(_ThreadPoolWithWait): Allows it to be invoked in a green thread. """ - def __init__(self, conf, callback, connection_pool): + def __init__(self, conf, callback, connection_pool, + wait_for_consumers=False): """Initiates CallbackWrapper object. :param conf: cfg.CONF instance :param callback: a callable (probably a function) :param connection_pool: connection pool as returned by get_connection_pool() + :param wait_for_consumers: wait for all green threads to + complete and raise the last + caught exception, if any. + """ super(CallbackWrapper, self).__init__( conf=conf, connection_pool=connection_pool, ) self.callback = callback + self.wait_for_consumers = wait_for_consumers + self.exc_info = None + + def _wrap(self, message_data, **kwargs): + """Wrap the callback invocation to catch exceptions. + """ + try: + self.callback(message_data, **kwargs) + except Exception: + self.exc_info = sys.exc_info() def __call__(self, message_data): - self.pool.spawn_n(self.callback, message_data) + self.exc_info = None + self.pool.spawn_n(self._wrap, message_data) + + if self.wait_for_consumers: + self.pool.waitall() + if self.exc_info: + six.reraise(self.exc_info[1], None, self.exc_info[2]) class ProxyCallback(_ThreadPoolWithWait): diff --git a/neutron/openstack/common/rpc/common.py b/neutron/openstack/common/rpc/common.py index d240597afb..aa772fb95b 100644 --- a/neutron/openstack/common/rpc/common.py +++ b/neutron/openstack/common/rpc/common.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. @@ -29,12 +27,14 @@ from neutron.openstack.common import importutils from neutron.openstack.common import jsonutils from neutron.openstack.common import local from neutron.openstack.common import log as logging +from neutron.openstack.common import versionutils CONF = cfg.CONF LOG = logging.getLogger(__name__) +_RPC_ENVELOPE_VERSION = '2.0' '''RPC Envelope Version. This version number applies to the top level structure of messages sent out. @@ -47,7 +47,7 @@ This version number applies to the message envelope that is used in the serialization done inside the rpc layer. See serialize_msg() and deserialize_msg(). -The current message format (version 2.0) is very simple. It is: +The current message format (version 2.0) is very simple. It is:: { 'oslo.version': , @@ -65,7 +65,6 @@ We will JSON encode the application message payload. The message envelope, which includes the JSON encoded application message body, will be passed down to the messaging libraries as a dict. ''' -_RPC_ENVELOPE_VERSION = '2.0' _VERSION_KEY = 'oslo.version' _MESSAGE_KEY = 'oslo.message' @@ -74,23 +73,23 @@ _REMOTE_POSTFIX = '_Remote' class RPCException(Exception): - message = _("An unknown RPC related exception occurred.") + msg_fmt = _("An unknown RPC related exception occurred.") def __init__(self, message=None, **kwargs): self.kwargs = kwargs if not message: try: - message = self.message % kwargs + message = self.msg_fmt % kwargs except Exception: # kwargs doesn't match a variable in the message # log the issue and the kwargs LOG.exception(_('Exception in string format operation')) - for name, value in kwargs.iteritems(): + for name, value in six.iteritems(kwargs): LOG.error("%s: %s" % (name, value)) # at least get the core message out if something happened - message = self.message + message = self.msg_fmt super(RPCException, self).__init__(message) @@ -104,7 +103,7 @@ class RemoteError(RPCException): contains all of the relevant info. """ - message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.") + msg_fmt = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.") def __init__(self, exc_type=None, value=None, traceback=None): self.exc_type = exc_type @@ -121,7 +120,7 @@ class Timeout(RPCException): This exception is raised if the rpc_response_timeout is reached while waiting for a response from the remote side. """ - message = _('Timeout while waiting on RPC response - ' + msg_fmt = _('Timeout while waiting on RPC response - ' 'topic: "%(topic)s", RPC method: "%(method)s" ' 'info: "%(info)s"') @@ -144,25 +143,25 @@ class Timeout(RPCException): class DuplicateMessageError(RPCException): - message = _("Found duplicate message(%(msg_id)s). Skipping it.") + msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.") class InvalidRPCConnectionReuse(RPCException): - message = _("Invalid reuse of an RPC connection.") + msg_fmt = _("Invalid reuse of an RPC connection.") class UnsupportedRpcVersion(RPCException): - message = _("Specified RPC version, %(version)s, not supported by " + msg_fmt = _("Specified RPC version, %(version)s, not supported by " "this endpoint.") class UnsupportedRpcEnvelopeVersion(RPCException): - message = _("Specified RPC envelope version, %(version)s, " + msg_fmt = _("Specified RPC envelope version, %(version)s, " "not supported by this endpoint.") class RpcVersionCapError(RPCException): - message = _("Specified RPC version cap, %(version_cap)s, is too low") + msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low") class Connection(object): @@ -261,41 +260,20 @@ class Connection(object): def _safe_log(log_func, msg, msg_data): """Sanitizes the msg_data field before logging.""" - SANITIZE = {'set_admin_password': [('args', 'new_pass')], - 'run_instance': [('args', 'admin_password')], - 'route_message': [('args', 'message', 'args', 'method_info', - 'method_kwargs', 'password'), - ('args', 'message', 'args', 'method_info', - 'method_kwargs', 'admin_password')]} + SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass'] - has_method = 'method' in msg_data and msg_data['method'] in SANITIZE - has_context_token = '_context_auth_token' in msg_data - has_token = 'auth_token' in msg_data + def _fix_passwords(d): + """Sanitizes the password fields in the dictionary.""" + for k in six.iterkeys(d): + if k.lower().find('password') != -1: + d[k] = '' + elif k.lower() in SANITIZE: + d[k] = '' + elif isinstance(d[k], dict): + _fix_passwords(d[k]) + return d - if not any([has_method, has_context_token, has_token]): - return log_func(msg, msg_data) - - msg_data = copy.deepcopy(msg_data) - - if has_method: - for arg in SANITIZE.get(msg_data['method'], []): - try: - d = msg_data - for elem in arg[:-1]: - d = d[elem] - d[arg[-1]] = '' - except KeyError as e: - LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'), - {'item': arg, - 'err': e}) - - if has_context_token: - msg_data['_context_auth_token'] = '' - - if has_token: - msg_data['auth_token'] = '' - - return log_func(msg, msg_data) + return log_func(msg, _fix_passwords(copy.deepcopy(msg_data))) def serialize_remote_exception(failure_info, log_failure=True): @@ -462,19 +440,15 @@ def client_exceptions(*exceptions): return outer +# TODO(sirp): we should deprecate this in favor of +# using `versionutils.is_compatible` directly def version_is_compatible(imp_version, version): """Determine whether versions are compatible. :param imp_version: The version implemented :param version: The version requested by an incoming message. """ - version_parts = version.split('.') - imp_version_parts = imp_version.split('.') - if int(version_parts[0]) != int(imp_version_parts[0]): # Major - return False - if int(version_parts[1]) > int(imp_version_parts[1]): # Minor - return False - return True + return versionutils.is_compatible(version, imp_version) def serialize_msg(raw_msg): diff --git a/neutron/openstack/common/rpc/dispatcher.py b/neutron/openstack/common/rpc/dispatcher.py index d734e32a6b..f266469d77 100644 --- a/neutron/openstack/common/rpc/dispatcher.py +++ b/neutron/openstack/common/rpc/dispatcher.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2012 Red Hat, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -83,6 +81,8 @@ On the client side, the same changes should be made as in example 1. The minimum version that supports the new parameter should be specified. """ +import six + from neutron.openstack.common.rpc import common as rpc_common from neutron.openstack.common.rpc import serializer as rpc_serializer @@ -121,7 +121,7 @@ class RpcDispatcher(object): :returns: A new set of deserialized args """ new_kwargs = dict() - for argname, arg in kwargs.iteritems(): + for argname, arg in six.iteritems(kwargs): new_kwargs[argname] = self.serializer.deserialize_entity(context, arg) return new_kwargs diff --git a/neutron/openstack/common/rpc/impl_fake.py b/neutron/openstack/common/rpc/impl_fake.py index e24db8a6c7..f6eea936d2 100644 --- a/neutron/openstack/common/rpc/impl_fake.py +++ b/neutron/openstack/common/rpc/impl_fake.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2011 OpenStack Foundation # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -26,6 +24,7 @@ import json import time import eventlet +import six from neutron.openstack.common.rpc import common as rpc_common @@ -69,7 +68,7 @@ class Consumer(object): # Caller might have called ctxt.reply() manually for (reply, failure) in ctxt._response: if failure: - raise failure[0], failure[1], failure[2] + six.reraise(failure[0], failure[1], failure[2]) res.append(reply) # if ending not 'sent'...we might have more data to # return from the function itself @@ -146,7 +145,7 @@ def multicall(conf, context, topic, msg, timeout=None): try: consumer = CONSUMERS[topic][0] except (KeyError, IndexError): - return iter([None]) + raise rpc_common.Timeout("No consumers available") else: return consumer.call(context, version, method, namespace, args, timeout) diff --git a/neutron/openstack/common/rpc/impl_kombu.py b/neutron/openstack/common/rpc/impl_kombu.py index fd73564d55..b0cb70f9ea 100644 --- a/neutron/openstack/common/rpc/impl_kombu.py +++ b/neutron/openstack/common/rpc/impl_kombu.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2011 OpenStack Foundation # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -18,7 +16,6 @@ import functools import itertools import socket import ssl -import sys import time import uuid @@ -29,16 +26,22 @@ import kombu.connection import kombu.entity import kombu.messaging from oslo.config import cfg +import six +from neutron.openstack.common import excutils from neutron.openstack.common.gettextutils import _ from neutron.openstack.common import network_utils from neutron.openstack.common.rpc import amqp as rpc_amqp from neutron.openstack.common.rpc import common as rpc_common +from neutron.openstack.common import sslutils kombu_opts = [ cfg.StrOpt('kombu_ssl_version', default='', - help='SSL version to use (valid only if SSL enabled)'), + help='SSL version to use (valid only if SSL enabled). ' + 'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may ' + 'be available on some distributions' + ), cfg.StrOpt('kombu_ssl_keyfile', default='', help='SSL key file (valid only if SSL enabled)'), @@ -126,6 +129,7 @@ class ConsumerBase(object): self.tag = str(tag) self.kwargs = kwargs self.queue = None + self.ack_on_error = kwargs.get('ack_on_error', True) self.reconnect(channel) def reconnect(self, channel): @@ -135,6 +139,30 @@ class ConsumerBase(object): self.queue = kombu.entity.Queue(**self.kwargs) self.queue.declare() + def _callback_handler(self, message, callback): + """Call callback with deserialized message. + + Messages that are processed without exception are ack'ed. + + If the message processing generates an exception, it will be + ack'ed if ack_on_error=True. Otherwise it will be .requeue()'ed. + """ + + try: + msg = rpc_common.deserialize_msg(message.payload) + callback(msg) + except Exception: + if self.ack_on_error: + LOG.exception(_("Failed to process message" + " ... skipping it.")) + message.ack() + else: + LOG.exception(_("Failed to process message" + " ... will requeue.")) + message.requeue() + else: + message.ack() + def consume(self, *args, **kwargs): """Actually declare the consumer on the amqp channel. This will start the flow of messages from the queue. Using the @@ -147,8 +175,6 @@ class ConsumerBase(object): If kwargs['nowait'] is True, then this call will block until a message is read. - Messages will automatically be acked if the callback doesn't - raise an exception """ options = {'consumer_tag': self.tag} @@ -159,13 +185,7 @@ class ConsumerBase(object): def _callback(raw_message): message = self.channel.message_to_python(raw_message) - try: - msg = rpc_common.deserialize_msg(message.payload) - callback(msg) - except Exception: - LOG.exception(_("Failed to process message... skipping it.")) - finally: - message.ack() + self._callback_handler(message, callback) self.queue.consume(*args, callback=_callback, **options) @@ -425,7 +445,7 @@ class Connection(object): 'virtual_host': self.conf.rabbit_virtual_host, } - for sp_key, value in server_params.iteritems(): + for sp_key, value in six.iteritems(server_params): p_key = server_params_to_kombu_params.get(sp_key, sp_key) params[p_key] = value @@ -451,7 +471,8 @@ class Connection(object): # http://docs.python.org/library/ssl.html - ssl.wrap_socket if self.conf.kombu_ssl_version: - ssl_params['ssl_version'] = self.conf.kombu_ssl_version + ssl_params['ssl_version'] = sslutils.validate_ssl_version( + self.conf.kombu_ssl_version) if self.conf.kombu_ssl_keyfile: ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile if self.conf.kombu_ssl_certfile: @@ -462,12 +483,8 @@ class Connection(object): # future with this? ssl_params['cert_reqs'] = ssl.CERT_REQUIRED - if not ssl_params: - # Just have the default behavior - return True - else: - # Return the extended behavior - return ssl_params + # Return the extended behavior or just have the default behavior + return ssl_params or True def _connect(self, params): """Connect to rabbit. Re-establish any queues that may have @@ -534,13 +551,11 @@ class Connection(object): log_info.update(params) if self.max_retries and attempt == self.max_retries: - LOG.error(_('Unable to connect to AMQP server on ' - '%(hostname)s:%(port)d after %(max_retries)d ' - 'tries: %(err_str)s') % log_info) - # NOTE(comstud): Copied from original code. There's - # really no better recourse because if this was a queue we - # need to consume on, we have no way to consume anymore. - sys.exit(1) + msg = _('Unable to connect to AMQP server on ' + '%(hostname)s:%(port)d after %(max_retries)d ' + 'tries: %(err_str)s') % log_info + LOG.error(msg) + raise rpc_common.RPCException(msg) if attempt == 1: sleep_time = self.interval_start or 1 @@ -609,7 +624,7 @@ class Connection(object): def _declare_consumer(): consumer = consumer_cls(self.conf, self.channel, topic, callback, - self.consumer_num.next()) + six.next(self.consumer_num)) self.consumers.append(consumer) return consumer @@ -632,8 +647,8 @@ class Connection(object): def _consume(): if info['do_consume']: - queues_head = self.consumers[:-1] - queues_tail = self.consumers[-1] + queues_head = self.consumers[:-1] # not fanout. + queues_tail = self.consumers[-1] # fanout for queue in queues_head: queue.consume(nowait=True) queues_tail.consume(nowait=False) @@ -682,11 +697,12 @@ class Connection(object): self.declare_consumer(DirectConsumer, topic, callback) def declare_topic_consumer(self, topic, callback=None, queue_name=None, - exchange_name=None): + exchange_name=None, ack_on_error=True): """Create a 'topic' consumer.""" self.declare_consumer(functools.partial(TopicConsumer, name=queue_name, exchange_name=exchange_name, + ack_on_error=ack_on_error, ), topic, callback) @@ -715,12 +731,13 @@ class Connection(object): it = self.iterconsume(limit=limit) while True: try: - it.next() + six.next(it) except StopIteration: return def consume_in_thread(self): """Consumer from all queues/consumers in a greenthread.""" + @excutils.forever_retry_uncaught_exceptions def _consumer_thread(): try: self.consume() @@ -751,7 +768,7 @@ class Connection(object): self.declare_topic_consumer(topic, proxy_cb, pool_name) def join_consumer_pool(self, callback, pool_name, topic, - exchange_name=None): + exchange_name=None, ack_on_error=True): """Register as a member of a group of consumers for a given topic from the specified exchange. @@ -765,6 +782,7 @@ class Connection(object): callback=callback, connection_pool=rpc_amqp.get_connection_pool(self.conf, Connection), + wait_for_consumers=not ack_on_error ) self.proxy_callbacks.append(callback_wrapper) self.declare_topic_consumer( @@ -772,6 +790,7 @@ class Connection(object): topic=topic, exchange_name=exchange_name, callback=callback_wrapper, + ack_on_error=ack_on_error, ) diff --git a/neutron/openstack/common/rpc/impl_qpid.py b/neutron/openstack/common/rpc/impl_qpid.py index 67e0f9c628..03b12e5d4d 100644 --- a/neutron/openstack/common/rpc/impl_qpid.py +++ b/neutron/openstack/common/rpc/impl_qpid.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2011 OpenStack Foundation # Copyright 2011 - 2012, Red Hat, Inc. # @@ -22,7 +20,9 @@ import time import eventlet import greenlet from oslo.config import cfg +import six +from neutron.openstack.common import excutils from neutron.openstack.common.gettextutils import _ from neutron.openstack.common import importutils from neutron.openstack.common import jsonutils @@ -149,10 +149,17 @@ class ConsumerBase(object): self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) - self.reconnect(session) + self.connect(session) + + def connect(self, session): + """Declare the receiver on connect.""" + self._declare_receiver(session) def reconnect(self, session): """Re-declare the receiver after a qpid reconnect.""" + self._declare_receiver(session) + + def _declare_receiver(self, session): self.session = session self.receiver = session.receiver(self.address) self.receiver.capacity = 1 @@ -183,11 +190,15 @@ class ConsumerBase(object): except Exception: LOG.exception(_("Failed to process message... skipping it.")) finally: + # TODO(sandy): Need support for optional ack_on_error. self.session.acknowledge(message) def get_receiver(self): return self.receiver + def get_node_name(self): + return self.address.split(';')[0] + class DirectConsumer(ConsumerBase): """Queue/consumer class for 'direct'.""" @@ -263,6 +274,7 @@ class FanoutConsumer(ConsumerBase): 'topic' is the topic to listen on 'callback' is the callback to call when messages are received """ + self.conf = conf link_opts = {"exclusive": True} @@ -371,7 +383,7 @@ class DirectPublisher(Publisher): class TopicPublisher(Publisher): """Publisher class for 'topic'.""" def __init__(self, conf, session, topic): - """init a 'topic' publisher. + """Init a 'topic' publisher. """ exchange_name = rpc_amqp.get_control_exchange(conf) @@ -388,7 +400,7 @@ class TopicPublisher(Publisher): class FanoutPublisher(Publisher): """Publisher class for 'fanout'.""" def __init__(self, conf, session, topic): - """init a 'fanout' publisher. + """Init a 'fanout' publisher. """ if conf.qpid_topology_version == 1: @@ -407,7 +419,7 @@ class FanoutPublisher(Publisher): class NotifyPublisher(Publisher): """Publisher class for notifications.""" def __init__(self, conf, session, topic): - """init a 'topic' publisher. + """Init a 'topic' publisher. """ exchange_name = rpc_amqp.get_control_exchange(conf) node_opts = {"durable": True} @@ -515,7 +527,7 @@ class Connection(object): consumers = self.consumers self.consumers = {} - for consumer in consumers.itervalues(): + for consumer in six.itervalues(consumers): consumer.reconnect(self.session) self._register_consumer(consumer) @@ -673,12 +685,13 @@ class Connection(object): it = self.iterconsume(limit=limit) while True: try: - it.next() + six.next(it) except StopIteration: return def consume_in_thread(self): """Consumer from all queues/consumers in a greenthread.""" + @excutils.forever_retry_uncaught_exceptions def _consumer_thread(): try: self.consume() @@ -719,7 +732,7 @@ class Connection(object): return consumer def join_consumer_pool(self, callback, pool_name, topic, - exchange_name=None): + exchange_name=None, ack_on_error=True): """Register as a member of a group of consumers for a given topic from the specified exchange. @@ -733,6 +746,7 @@ class Connection(object): callback=callback, connection_pool=rpc_amqp.get_connection_pool(self.conf, Connection), + wait_for_consumers=not ack_on_error ) self.proxy_callbacks.append(callback_wrapper) diff --git a/neutron/openstack/common/rpc/impl_zmq.py b/neutron/openstack/common/rpc/impl_zmq.py index 2d9adf5647..33fa95dd02 100644 --- a/neutron/openstack/common/rpc/impl_zmq.py +++ b/neutron/openstack/common/rpc/impl_zmq.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2011 Cloudscaling Group, Inc # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -25,6 +23,8 @@ import uuid import eventlet import greenlet from oslo.config import cfg +import six +from six import moves from neutron.openstack.common import excutils from neutron.openstack.common.gettextutils import _ @@ -192,7 +192,7 @@ class ZmqSocket(object): # it would be much worse if some of the code calling this # were to fail. For now, lets log, and later evaluate # if we can safely raise here. - LOG.error("ZeroMQ socket could not be closed.") + LOG.error(_("ZeroMQ socket could not be closed.")) self.sock = None def recv(self, **kwargs): @@ -221,7 +221,7 @@ class ZmqClient(object): return rpc_envelope = rpc_common.serialize_msg(data[1], envelope) - zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items()) + zmq_msg = moves.reduce(lambda x, y: x + y, rpc_envelope.items()) self.outq.send(map(bytes, (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg)) @@ -358,7 +358,6 @@ class ZmqBaseReactor(ConsumerBase): def __init__(self, conf): super(ZmqBaseReactor, self).__init__() - self.mapping = {} self.proxies = {} self.threads = [] self.sockets = [] @@ -366,9 +365,8 @@ class ZmqBaseReactor(ConsumerBase): self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size) - def register(self, proxy, in_addr, zmq_type_in, out_addr=None, - zmq_type_out=None, in_bind=True, out_bind=True, - subscribe=None): + def register(self, proxy, in_addr, zmq_type_in, + in_bind=True, subscribe=None): LOG.info(_("Registering reactor")) @@ -384,22 +382,8 @@ class ZmqBaseReactor(ConsumerBase): LOG.info(_("In reactor registered")) - if not out_addr: - return - - if zmq_type_out not in (zmq.PUSH, zmq.PUB): - raise RPCException("Bad output socktype") - - # Items push out. - outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind) - - self.mapping[inq] = outq - self.mapping[outq] = inq - self.sockets.append(outq) - - LOG.info(_("Out reactor registered")) - def consume_in_thread(self): + @excutils.forever_retry_uncaught_exceptions def _consume(sock): LOG.info(_("Consuming socket")) while True: @@ -516,8 +500,7 @@ class ZmqProxy(ZmqBaseReactor): try: self.register(consumption_proxy, consume_in, - zmq.PULL, - out_bind=True) + zmq.PULL) except zmq.ZMQError: if os.access(ipc_dir, os.X_OK): with excutils.save_and_reraise_exception(): @@ -540,8 +523,8 @@ def unflatten_envelope(packenv): h = {} try: while True: - k = i.next() - h[k] = i.next() + k = six.next(i) + h[k] = six.next(i) except StopIteration: return h @@ -559,11 +542,6 @@ class ZmqReactor(ZmqBaseReactor): #TODO(ewindisch): use zero-copy (i.e. references, not copying) data = sock.recv() LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data) - if sock in self.mapping: - LOG.debug(_("ROUTER RELAY-OUT %(data)s") % { - 'data': data}) - self.mapping[sock].send(data) - return proxy = self.proxies[sock] diff --git a/neutron/openstack/common/rpc/matchmaker.py b/neutron/openstack/common/rpc/matchmaker.py index 71507f5700..26b6b9f76f 100644 --- a/neutron/openstack/common/rpc/matchmaker.py +++ b/neutron/openstack/common/rpc/matchmaker.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2011 Cloudscaling Group, Inc # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -92,7 +90,7 @@ class MatchMakerBase(object): """Acknowledge that a key.host is alive. Used internally for updating heartbeats, but may also be used - publically to acknowledge a system is alive (i.e. rpc message + publicly to acknowledge a system is alive (i.e. rpc message successfully sent to host) """ pass @@ -174,7 +172,7 @@ class HeartbeatMatchMakerBase(MatchMakerBase): """Acknowledge that a host.topic is alive. Used internally for updating heartbeats, but may also be used - publically to acknowledge a system is alive (i.e. rpc message + publicly to acknowledge a system is alive (i.e. rpc message successfully sent to host) """ raise NotImplementedError("Must implement ack_alive") @@ -248,9 +246,7 @@ class DirectBinding(Binding): that it maps directly to a host, thus direct. """ def test(self, key): - if '.' in key: - return True - return False + return '.' in key class TopicBinding(Binding): @@ -262,17 +258,13 @@ class TopicBinding(Binding): matches that of a direct exchange. """ def test(self, key): - if '.' not in key: - return True - return False + return '.' not in key class FanoutBinding(Binding): """Match on fanout keys, where key starts with 'fanout.' string.""" def test(self, key): - if key.startswith('fanout~'): - return True - return False + return key.startswith('fanout~') class StubExchange(Exchange): diff --git a/neutron/openstack/common/rpc/matchmaker_redis.py b/neutron/openstack/common/rpc/matchmaker_redis.py index 5b53e5d9a3..0458060bf3 100644 --- a/neutron/openstack/common/rpc/matchmaker_redis.py +++ b/neutron/openstack/common/rpc/matchmaker_redis.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2013 Cloudscaling Group, Inc # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -95,7 +93,7 @@ class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase): if not redis: raise ImportError("Failed to import module redis.") - self.redis = redis.StrictRedis( + self.redis = redis.Redis( host=CONF.matchmaker_redis.host, port=CONF.matchmaker_redis.port, password=CONF.matchmaker_redis.password) diff --git a/neutron/openstack/common/rpc/matchmaker_ring.py b/neutron/openstack/common/rpc/matchmaker_ring.py index 15a4971d85..831741922e 100644 --- a/neutron/openstack/common/rpc/matchmaker_ring.py +++ b/neutron/openstack/common/rpc/matchmaker_ring.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2011-2013 Cloudscaling Group, Inc # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -63,9 +61,7 @@ class RingExchange(mm.Exchange): self.ring0[k] = itertools.cycle(self.ring[k]) def _ring_has(self, key): - if key in self.ring0: - return True - return False + return key in self.ring0 class RoundRobinRingExchange(RingExchange): diff --git a/neutron/openstack/common/rpc/proxy.py b/neutron/openstack/common/rpc/proxy.py index e180cdec79..5cb435fbf2 100644 --- a/neutron/openstack/common/rpc/proxy.py +++ b/neutron/openstack/common/rpc/proxy.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2012-2013 Red Hat, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -21,6 +19,7 @@ For more information about rpc API version numbers, see: rpc/dispatcher.py """ +import six from neutron.openstack.common import rpc from neutron.openstack.common.rpc import common as rpc_common @@ -36,7 +35,7 @@ class RpcProxy(object): rpc API. """ - # The default namespace, which can be overriden in a subclass. + # The default namespace, which can be overridden in a subclass. RPC_API_NAMESPACE = None def __init__(self, topic, default_version, version_cap=None, @@ -69,7 +68,7 @@ class RpcProxy(object): v = vers if vers else self.default_version if (self.version_cap and not rpc_common.version_is_compatible(self.version_cap, v)): - raise rpc_common.RpcVersionCapError(version=self.version_cap) + raise rpc_common.RpcVersionCapError(version_cap=self.version_cap) msg['version'] = v def _get_topic(self, topic): @@ -100,7 +99,7 @@ class RpcProxy(object): :returns: A new set of serialized arguments """ new_kwargs = dict() - for argname, arg in kwargs.iteritems(): + for argname, arg in six.iteritems(kwargs): new_kwargs[argname] = self.serializer.serialize_entity(context, arg) return new_kwargs diff --git a/neutron/openstack/common/rpc/serializer.py b/neutron/openstack/common/rpc/serializer.py index 76c6831033..9bc6e2a3a0 100644 --- a/neutron/openstack/common/rpc/serializer.py +++ b/neutron/openstack/common/rpc/serializer.py @@ -16,10 +16,12 @@ import abc +import six + +@six.add_metaclass(abc.ABCMeta) class Serializer(object): """Generic (de-)serialization definition base class.""" - __metaclass__ = abc.ABCMeta @abc.abstractmethod def serialize_entity(self, context, entity): diff --git a/neutron/openstack/common/rpc/service.py b/neutron/openstack/common/rpc/service.py index 39a98a2edc..4479dcc3ac 100644 --- a/neutron/openstack/common/rpc/service.py +++ b/neutron/openstack/common/rpc/service.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. @@ -32,10 +30,11 @@ class Service(service.Service): A service enables rpc by listening to queues based on topic and host. """ - def __init__(self, host, topic, manager=None): + def __init__(self, host, topic, manager=None, serializer=None): super(Service, self).__init__() self.host = host self.topic = topic + self.serializer = serializer if manager is None: self.manager = self else: @@ -48,7 +47,8 @@ class Service(service.Service): LOG.debug(_("Creating Consumer connection for Service %s") % self.topic) - dispatcher = rpc_dispatcher.RpcDispatcher([self.manager]) + dispatcher = rpc_dispatcher.RpcDispatcher([self.manager], + self.serializer) # Share this same connection for these Consumers self.conn.create_consumer(self.topic, dispatcher, fanout=False) diff --git a/neutron/openstack/common/rpc/zmq_receiver.py b/neutron/openstack/common/rpc/zmq_receiver.py old mode 100755 new mode 100644 index ca0e28e2ec..bd32f0e997 --- a/neutron/openstack/common/rpc/zmq_receiver.py +++ b/neutron/openstack/common/rpc/zmq_receiver.py @@ -1,6 +1,3 @@ -#!/usr/bin/env python -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2011 OpenStack Foundation # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/neutron/openstack/common/service.py b/neutron/openstack/common/service.py index b76d355ec0..b8144bb3a7 100644 --- a/neutron/openstack/common/service.py +++ b/neutron/openstack/common/service.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # Copyright 2011 Justin Santa Barbara @@ -20,14 +18,23 @@ """Generic Node base class for all workers that run on hosts.""" import errno +import logging as std_logging import os import random import signal import sys import time +try: + # Importing just the symbol here because the io module does not + # exist in Python 2.6. + from io import UnsupportedOperation # noqa +except ImportError: + # Python 2.6 + UnsupportedOperation = None + import eventlet -import logging as std_logging +from eventlet import event from oslo.config import cfg from neutron.openstack.common import eventlet_backdoor @@ -42,6 +49,53 @@ CONF = cfg.CONF LOG = logging.getLogger(__name__) +def _sighup_supported(): + return hasattr(signal, 'SIGHUP') + + +def _is_daemon(): + # The process group for a foreground process will match the + # process group of the controlling terminal. If those values do + # not match, or ioctl() fails on the stdout file handle, we assume + # the process is running in the background as a daemon. + # http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics + try: + is_daemon = os.getpgrp() != os.tcgetpgrp(sys.stdout.fileno()) + except OSError as err: + if err.errno == errno.ENOTTY: + # Assume we are a daemon because there is no terminal. + is_daemon = True + else: + raise + except UnsupportedOperation: + # Could not get the fileno for stdout, so we must be a daemon. + is_daemon = True + return is_daemon + + +def _is_sighup_and_daemon(signo): + if not (_sighup_supported() and signo == signal.SIGHUP): + # Avoid checking if we are a daemon, because the signal isn't + # SIGHUP. + return False + return _is_daemon() + + +def _signo_to_signame(signo): + signals = {signal.SIGTERM: 'SIGTERM', + signal.SIGINT: 'SIGINT'} + if _sighup_supported(): + signals[signal.SIGHUP] = 'SIGHUP' + return signals[signo] + + +def _set_signals_handler(handler): + signal.signal(signal.SIGTERM, handler) + signal.signal(signal.SIGINT, handler) + if _sighup_supported(): + signal.signal(signal.SIGHUP, handler) + + class Launcher(object): """Launch one or more services and wait for them to complete.""" @@ -51,20 +105,9 @@ class Launcher(object): :returns: None """ - self._services = threadgroup.ThreadGroup() + self.services = Services() self.backdoor_port = eventlet_backdoor.initialize_if_enabled() - @staticmethod - def run_service(service): - """Start and wait for a service to finish. - - :param service: service to run and wait for. - :returns: None - - """ - service.start() - service.wait() - def launch_service(self, service): """Load and start the given service. @@ -73,7 +116,7 @@ class Launcher(object): """ service.backdoor_port = self.backdoor_port - self._services.add_thread(self.run_service, service) + self.services.add(service) def stop(self): """Stop all services which are currently running. @@ -81,7 +124,7 @@ class Launcher(object): :returns: None """ - self._services.stop() + self.services.stop() def wait(self): """Waits until all services have been stopped, and then returns. @@ -89,7 +132,16 @@ class Launcher(object): :returns: None """ - self._services.wait() + self.services.wait() + + def restart(self): + """Reload config files and restart service. + + :returns: None + + """ + cfg.CONF.reload_config_files() + self.services.restart() class SignalExit(SystemExit): @@ -101,33 +153,48 @@ class SignalExit(SystemExit): class ServiceLauncher(Launcher): def _handle_signal(self, signo, frame): # Allow the process to be killed again and die from natural causes - signal.signal(signal.SIGTERM, signal.SIG_DFL) - signal.signal(signal.SIGINT, signal.SIG_DFL) - + _set_signals_handler(signal.SIG_DFL) raise SignalExit(signo) - def wait(self): - signal.signal(signal.SIGTERM, self._handle_signal) - signal.signal(signal.SIGINT, self._handle_signal) + def handle_signal(self): + _set_signals_handler(self._handle_signal) + + def _wait_for_exit_or_signal(self, ready_callback=None): + status = None + signo = 0 LOG.debug(_('Full set of CONF:')) CONF.log_opt_values(LOG, std_logging.DEBUG) - status = None try: + if ready_callback: + ready_callback() super(ServiceLauncher, self).wait() except SignalExit as exc: - signame = {signal.SIGTERM: 'SIGTERM', - signal.SIGINT: 'SIGINT'}[exc.signo] + signame = _signo_to_signame(exc.signo) LOG.info(_('Caught %s, exiting'), signame) status = exc.code + signo = exc.signo except SystemExit as exc: status = exc.code finally: - if rpc: - rpc.cleanup() self.stop() - return status + if rpc: + try: + rpc.cleanup() + except Exception: + # We're shutting down, so it doesn't matter at this point. + LOG.exception(_('Exception during rpc cleanup.')) + + return status, signo + + def wait(self, ready_callback=None): + while True: + self.handle_signal() + status, signo = self._wait_for_exit_or_signal(ready_callback) + if not _is_sighup_and_daemon(signo): + return status + self.restart() class ServiceWrapper(object): @@ -139,23 +206,29 @@ class ServiceWrapper(object): class ProcessLauncher(object): - def __init__(self): + def __init__(self, wait_interval=0.01): + """Constructor. + + :param wait_interval: The interval to sleep for between checks + of child process exit. + """ self.children = {} self.sigcaught = None self.running = True + self.wait_interval = wait_interval rfd, self.writepipe = os.pipe() self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r') + self.handle_signal() - signal.signal(signal.SIGTERM, self._handle_signal) - signal.signal(signal.SIGINT, self._handle_signal) + def handle_signal(self): + _set_signals_handler(self._handle_signal) def _handle_signal(self, signo, frame): self.sigcaught = signo self.running = False # Allow the process to be killed again and die from natural causes - signal.signal(signal.SIGTERM, signal.SIG_DFL) - signal.signal(signal.SIGINT, signal.SIG_DFL) + _set_signals_handler(signal.SIG_DFL) def _pipe_watcher(self): # This will block until the write end is closed when the parent @@ -166,16 +239,49 @@ class ProcessLauncher(object): sys.exit(1) - def _child_process(self, service): + def _child_process_handle_signal(self): # Setup child signal handlers differently def _sigterm(*args): signal.signal(signal.SIGTERM, signal.SIG_DFL) raise SignalExit(signal.SIGTERM) + def _sighup(*args): + signal.signal(signal.SIGHUP, signal.SIG_DFL) + raise SignalExit(signal.SIGHUP) + signal.signal(signal.SIGTERM, _sigterm) + if _sighup_supported(): + signal.signal(signal.SIGHUP, _sighup) # Block SIGINT and let the parent send us a SIGTERM signal.signal(signal.SIGINT, signal.SIG_IGN) + def _child_wait_for_exit_or_signal(self, launcher): + status = 0 + signo = 0 + + # NOTE(johannes): All exceptions are caught to ensure this + # doesn't fallback into the loop spawning children. It would + # be bad for a child to spawn more children. + try: + launcher.wait() + except SignalExit as exc: + signame = _signo_to_signame(exc.signo) + LOG.info(_('Caught %s, exiting'), signame) + status = exc.code + signo = exc.signo + except SystemExit as exc: + status = exc.code + except BaseException: + LOG.exception(_('Unhandled exception')) + status = 2 + finally: + launcher.stop() + + return status, signo + + def _child_process(self, service): + self._child_process_handle_signal() + # Reopen the eventlet hub to make sure we don't share an epoll # fd with parent and/or siblings, which would be bad eventlet.hubs.use_hub() @@ -189,7 +295,8 @@ class ProcessLauncher(object): random.seed() launcher = Launcher() - launcher.run_service(service) + launcher.launch_service(service) + return launcher def _start_child(self, wrap): if len(wrap.forktimes) > wrap.workers: @@ -207,24 +314,13 @@ class ProcessLauncher(object): pid = os.fork() if pid == 0: - # NOTE(johannes): All exceptions are caught to ensure this - # doesn't fallback into the loop spawning children. It would - # be bad for a child to spawn more children. - status = 0 - try: - self._child_process(wrap.service) - except SignalExit as exc: - signame = {signal.SIGTERM: 'SIGTERM', - signal.SIGINT: 'SIGINT'}[exc.signo] - LOG.info(_('Caught %s, exiting'), signame) - status = exc.code - except SystemExit as exc: - status = exc.code - except BaseException: - LOG.exception(_('Unhandled exception')) - status = 2 - finally: - wrap.service.stop() + launcher = self._child_process(wrap.service) + while True: + self._child_process_handle_signal() + status, signo = self._child_wait_for_exit_or_signal(launcher) + if not _is_sighup_and_daemon(signo): + break + launcher.restart() os._exit(status) @@ -270,28 +366,37 @@ class ProcessLauncher(object): wrap.children.remove(pid) return wrap - def wait(self): - """Loop waiting on children to die and respawning as necessary.""" - - LOG.debug(_('Full set of CONF:')) - CONF.log_opt_values(LOG, std_logging.DEBUG) - + def _respawn_children(self): while self.running: wrap = self._wait_child() if not wrap: # Yield to other threads if no children have exited # Sleep for a short time to avoid excessive CPU usage # (see bug #1095346) - eventlet.greenthread.sleep(.01) + eventlet.greenthread.sleep(self.wait_interval) continue - while self.running and len(wrap.children) < wrap.workers: self._start_child(wrap) - if self.sigcaught: - signame = {signal.SIGTERM: 'SIGTERM', - signal.SIGINT: 'SIGINT'}[self.sigcaught] - LOG.info(_('Caught %s, stopping children'), signame) + def wait(self): + """Loop waiting on children to die and respawning as necessary.""" + + LOG.debug(_('Full set of CONF:')) + CONF.log_opt_values(LOG, std_logging.DEBUG) + + while True: + self.handle_signal() + self._respawn_children() + if self.sigcaught: + signame = _signo_to_signame(self.sigcaught) + LOG.info(_('Caught %s, stopping children'), signame) + if not _is_sighup_and_daemon(self.sigcaught): + break + + for pid in self.children: + os.kill(pid, signal.SIGHUP) + self.running = True + self.sigcaught = None for pid in self.children: try: @@ -313,15 +418,74 @@ class Service(object): def __init__(self, threads=1000): self.tg = threadgroup.ThreadGroup(threads) + # signal that the service is done shutting itself down: + self._done = event.Event() + + def reset(self): + # NOTE(Fengqian): docs for Event.reset() recommend against using it + self._done = event.Event() + def start(self): pass def stop(self): self.tg.stop() + self.tg.wait() + # Signal that service cleanup is done: + if not self._done.ready(): + self._done.send() + + def wait(self): + self._done.wait() + + +class Services(object): + + def __init__(self): + self.services = [] + self.tg = threadgroup.ThreadGroup() + self.done = event.Event() + + def add(self, service): + self.services.append(service) + self.tg.add_thread(self.run_service, service, self.done) + + def stop(self): + # wait for graceful shutdown of services: + for service in self.services: + service.stop() + service.wait() + + # Each service has performed cleanup, now signal that the run_service + # wrapper threads can now die: + if not self.done.ready(): + self.done.send() + + # reap threads: + self.tg.stop() def wait(self): self.tg.wait() + def restart(self): + self.stop() + self.done = event.Event() + for restart_service in self.services: + restart_service.reset() + self.tg.add_thread(self.run_service, restart_service, self.done) + + @staticmethod + def run_service(service, done): + """Service start wrapper. + + :param service: service to run + :param done: event to wait on until a shutdown is triggered + :returns: None + + """ + service.start() + done.wait() + def launch(service, workers=None): if workers: diff --git a/neutron/openstack/common/sslutils.py b/neutron/openstack/common/sslutils.py new file mode 100644 index 0000000000..1d07937949 --- /dev/null +++ b/neutron/openstack/common/sslutils.py @@ -0,0 +1,98 @@ +# Copyright 2013 IBM Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import ssl + +from oslo.config import cfg + +from neutron.openstack.common.gettextutils import _ + + +ssl_opts = [ + cfg.StrOpt('ca_file', + default=None, + help="CA certificate file to use to verify " + "connecting clients"), + cfg.StrOpt('cert_file', + default=None, + help="Certificate file to use when starting " + "the server securely"), + cfg.StrOpt('key_file', + default=None, + help="Private key file to use when starting " + "the server securely"), +] + + +CONF = cfg.CONF +CONF.register_opts(ssl_opts, "ssl") + + +def is_enabled(): + cert_file = CONF.ssl.cert_file + key_file = CONF.ssl.key_file + ca_file = CONF.ssl.ca_file + use_ssl = cert_file or key_file + + if cert_file and not os.path.exists(cert_file): + raise RuntimeError(_("Unable to find cert_file : %s") % cert_file) + + if ca_file and not os.path.exists(ca_file): + raise RuntimeError(_("Unable to find ca_file : %s") % ca_file) + + if key_file and not os.path.exists(key_file): + raise RuntimeError(_("Unable to find key_file : %s") % key_file) + + if use_ssl and (not cert_file or not key_file): + raise RuntimeError(_("When running server in SSL mode, you must " + "specify both a cert_file and key_file " + "option value in your configuration file")) + + return use_ssl + + +def wrap(sock): + ssl_kwargs = { + 'server_side': True, + 'certfile': CONF.ssl.cert_file, + 'keyfile': CONF.ssl.key_file, + 'cert_reqs': ssl.CERT_NONE, + } + + if CONF.ssl.ca_file: + ssl_kwargs['ca_certs'] = CONF.ssl.ca_file + ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED + + return ssl.wrap_socket(sock, **ssl_kwargs) + + +_SSL_PROTOCOLS = { + "tlsv1": ssl.PROTOCOL_TLSv1, + "sslv23": ssl.PROTOCOL_SSLv23, + "sslv3": ssl.PROTOCOL_SSLv3 +} + +try: + _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2 +except AttributeError: + pass + + +def validate_ssl_version(version): + key = version.lower() + try: + return _SSL_PROTOCOLS[key] + except KeyError: + raise RuntimeError(_("Invalid SSL version : %s") % version) diff --git a/neutron/openstack/common/threadgroup.py b/neutron/openstack/common/threadgroup.py index 2705690422..5cfd59c94d 100644 --- a/neutron/openstack/common/threadgroup.py +++ b/neutron/openstack/common/threadgroup.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2012 Red Hat, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -14,7 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. -from eventlet import greenlet +import eventlet from eventlet import greenpool from eventlet import greenthread @@ -48,6 +46,9 @@ class Thread(object): def wait(self): return self.thread.wait() + def link(self, func, *args, **kwargs): + self.thread.link(func, *args, **kwargs) + class ThreadGroup(object): """The point of the ThreadGroup classis to: @@ -79,13 +80,17 @@ class ThreadGroup(object): gt = self.pool.spawn(callback, *args, **kwargs) th = Thread(gt, self) self.threads.append(th) + return th def thread_done(self, thread): self.threads.remove(thread) def stop(self): current = greenthread.getcurrent() - for x in self.threads: + + # Iterate over a copy of self.threads so thread_done doesn't + # modify the list while we're iterating + for x in self.threads[:]: if x is current: # don't kill the current thread. continue @@ -105,17 +110,20 @@ class ThreadGroup(object): for x in self.timers: try: x.wait() - except greenlet.GreenletExit: + except eventlet.greenlet.GreenletExit: pass except Exception as ex: LOG.exception(ex) current = greenthread.getcurrent() - for x in self.threads: + + # Iterate over a copy of self.threads so thread_done doesn't + # modify the list while we're iterating + for x in self.threads[:]: if x is current: continue try: x.wait() - except greenlet.GreenletExit: + except eventlet.greenlet.GreenletExit: pass except Exception as ex: LOG.exception(ex) diff --git a/neutron/openstack/common/timeutils.py b/neutron/openstack/common/timeutils.py index ac2441bcb4..d5ed81d3e3 100644 --- a/neutron/openstack/common/timeutils.py +++ b/neutron/openstack/common/timeutils.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2011 OpenStack Foundation. # All Rights Reserved. # @@ -21,8 +19,10 @@ Time related utilities and helper functions. import calendar import datetime +import time import iso8601 +import six # ISO 8601 extended time format with microseconds @@ -48,9 +48,9 @@ def parse_isotime(timestr): try: return iso8601.parse_date(timestr) except iso8601.ParseError as e: - raise ValueError(e.message) + raise ValueError(six.text_type(e)) except TypeError as e: - raise ValueError(e.message) + raise ValueError(six.text_type(e)) def strtime(at=None, fmt=PERFECT_TIME_FORMAT): @@ -75,20 +75,31 @@ def normalize_time(timestamp): def is_older_than(before, seconds): """Return True if before is older than seconds.""" - if isinstance(before, basestring): + if isinstance(before, six.string_types): before = parse_strtime(before).replace(tzinfo=None) + else: + before = before.replace(tzinfo=None) + return utcnow() - before > datetime.timedelta(seconds=seconds) def is_newer_than(after, seconds): """Return True if after is newer than seconds.""" - if isinstance(after, basestring): + if isinstance(after, six.string_types): after = parse_strtime(after).replace(tzinfo=None) + else: + after = after.replace(tzinfo=None) + return after - utcnow() > datetime.timedelta(seconds=seconds) def utcnow_ts(): """Timestamp version of our utcnow function.""" + if utcnow.override_time is None: + # NOTE(kgriffs): This is several times faster + # than going through calendar.timegm(...) + return int(time.time()) + return calendar.timegm(utcnow().timetuple()) @@ -110,12 +121,15 @@ def iso8601_from_timestamp(timestamp): utcnow.override_time = None -def set_time_override(override_time=datetime.datetime.utcnow()): +def set_time_override(override_time=None): """Overrides utils.utcnow. Make it return a constant time or a list thereof, one at a time. + + :param override_time: datetime instance or list thereof. If not + given, defaults to the current UTC time. """ - utcnow.override_time = override_time + utcnow.override_time = override_time or datetime.datetime.utcnow() def advance_time_delta(timedelta): @@ -168,6 +182,15 @@ def delta_seconds(before, after): datetime objects (as a float, to microsecond resolution). """ delta = after - before + return total_seconds(delta) + + +def total_seconds(delta): + """Return the total seconds of datetime.timedelta object. + + Compute total seconds of datetime.timedelta, datetime.timedelta + doesn't have method total_seconds in Python2.6, calculate it manually. + """ try: return delta.total_seconds() except AttributeError: @@ -178,8 +201,8 @@ def delta_seconds(before, after): def is_soon(dt, window): """Determines if time is going to happen in the next window seconds. - :params dt: the time - :params window: minimum seconds to remain to consider the time not soon + :param dt: the time + :param window: minimum seconds to remain to consider the time not soon :return: True if expiration is within the given duration """ diff --git a/neutron/openstack/common/uuidutils.py b/neutron/openstack/common/uuidutils.py index 7608acb942..234b880c99 100644 --- a/neutron/openstack/common/uuidutils.py +++ b/neutron/openstack/common/uuidutils.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright (c) 2012 Intel Corporation. # All Rights Reserved. # diff --git a/neutron/openstack/common/versionutils.py b/neutron/openstack/common/versionutils.py new file mode 100644 index 0000000000..04472a8671 --- /dev/null +++ b/neutron/openstack/common/versionutils.py @@ -0,0 +1,148 @@ +# Copyright (c) 2013 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Helpers for comparing version strings. +""" + +import functools +import pkg_resources + +from neutron.openstack.common.gettextutils import _ +from neutron.openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + + +class deprecated(object): + """A decorator to mark callables as deprecated. + + This decorator logs a deprecation message when the callable it decorates is + used. The message will include the release where the callable was + deprecated, the release where it may be removed and possibly an optional + replacement. + + Examples: + + 1. Specifying the required deprecated release + + >>> @deprecated(as_of=deprecated.ICEHOUSE) + ... def a(): pass + + 2. Specifying a replacement: + + >>> @deprecated(as_of=deprecated.ICEHOUSE, in_favor_of='f()') + ... def b(): pass + + 3. Specifying the release where the functionality may be removed: + + >>> @deprecated(as_of=deprecated.ICEHOUSE, remove_in=+1) + ... def c(): pass + + """ + + FOLSOM = 'F' + GRIZZLY = 'G' + HAVANA = 'H' + ICEHOUSE = 'I' + + _RELEASES = { + 'F': 'Folsom', + 'G': 'Grizzly', + 'H': 'Havana', + 'I': 'Icehouse', + } + + _deprecated_msg_with_alternative = _( + '%(what)s is deprecated as of %(as_of)s in favor of ' + '%(in_favor_of)s and may be removed in %(remove_in)s.') + + _deprecated_msg_no_alternative = _( + '%(what)s is deprecated as of %(as_of)s and may be ' + 'removed in %(remove_in)s. It will not be superseded.') + + def __init__(self, as_of, in_favor_of=None, remove_in=2, what=None): + """Initialize decorator + + :param as_of: the release deprecating the callable. Constants + are define in this class for convenience. + :param in_favor_of: the replacement for the callable (optional) + :param remove_in: an integer specifying how many releases to wait + before removing (default: 2) + :param what: name of the thing being deprecated (default: the + callable's name) + + """ + self.as_of = as_of + self.in_favor_of = in_favor_of + self.remove_in = remove_in + self.what = what + + def __call__(self, func): + if not self.what: + self.what = func.__name__ + '()' + + @functools.wraps(func) + def wrapped(*args, **kwargs): + msg, details = self._build_message() + LOG.deprecated(msg, details) + return func(*args, **kwargs) + return wrapped + + def _get_safe_to_remove_release(self, release): + # TODO(dstanek): this method will have to be reimplemented once + # when we get to the X release because once we get to the Y + # release, what is Y+2? + new_release = chr(ord(release) + self.remove_in) + if new_release in self._RELEASES: + return self._RELEASES[new_release] + else: + return new_release + + def _build_message(self): + details = dict(what=self.what, + as_of=self._RELEASES[self.as_of], + remove_in=self._get_safe_to_remove_release(self.as_of)) + + if self.in_favor_of: + details['in_favor_of'] = self.in_favor_of + msg = self._deprecated_msg_with_alternative + else: + msg = self._deprecated_msg_no_alternative + return msg, details + + +def is_compatible(requested_version, current_version, same_major=True): + """Determine whether `requested_version` is satisfied by + `current_version`; in other words, `current_version` is >= + `requested_version`. + + :param requested_version: version to check for compatibility + :param current_version: version to check against + :param same_major: if True, the major version must be identical between + `requested_version` and `current_version`. This is used when a + major-version difference indicates incompatibility between the two + versions. Since this is the common-case in practice, the default is + True. + :returns: True if compatible, False if not + """ + requested_parts = pkg_resources.parse_version(requested_version) + current_parts = pkg_resources.parse_version(current_version) + + if same_major and (requested_parts[0] != current_parts[0]): + return False + + return current_parts >= requested_parts diff --git a/neutron/tests/unit/test_api_v2_resource.py b/neutron/tests/unit/test_api_v2_resource.py index 83e8bfa3d3..ed3ea33fc6 100644 --- a/neutron/tests/unit/test_api_v2_resource.py +++ b/neutron/tests/unit/test_api_v2_resource.py @@ -168,7 +168,7 @@ class ResourceTestCase(base.BaseTestCase): self.assertEqual(wsgi.XMLDeserializer().deserialize(res.body), expected_res) - @mock.patch('neutron.openstack.common.gettextutils.get_localized_message') + @mock.patch('neutron.openstack.common.gettextutils.translate') def test_unmapped_neutron_error_localized(self, mock_translation): gettextutils.install('blaa', lazy=True) msg_translation = 'Translated error' @@ -238,7 +238,7 @@ class ResourceTestCase(base.BaseTestCase): self.assertEqual(wsgi.XMLDeserializer().deserialize(res.body), expected_res) - @mock.patch('neutron.openstack.common.gettextutils.get_localized_message') + @mock.patch('neutron.openstack.common.gettextutils.translate') def test_mapped_neutron_error_localized(self, mock_translation): gettextutils.install('blaa', lazy=True) msg_translation = 'Translated error' diff --git a/neutron/wsgi.py b/neutron/wsgi.py index f19881ce47..9e7964b458 100644 --- a/neutron/wsgi.py +++ b/neutron/wsgi.py @@ -1020,7 +1020,7 @@ class Router(object): if not match: language = req.best_match_language() msg = _('The resource could not be found.') - msg = gettextutils.get_localized_message(msg, language) + msg = gettextutils.translate(msg, language) return webob.exc.HTTPNotFound(explanation=msg) app = match['controller'] return app diff --git a/openstack-common.conf b/openstack-common.conf index e548fd774f..8e578748aa 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -21,12 +21,15 @@ module=notifier module=periodic_task module=policy module=processutils +module=py3kcompat module=rpc module=service +module=sslutils module=rootwrap module=threadgroup module=timeutils module=uuidutils +module=versionutils # The base module to hold the copy of openstack.common base=neutron