Merge "Update Oslo to 96d1f887dda Part 1"

This commit is contained in:
Jenkins
2013-08-14 02:28:30 +00:00
committed by Gerrit Code Review
23 changed files with 649 additions and 260 deletions

View File

@@ -128,7 +128,14 @@
# Options defined in heat.openstack.common.eventlet_backdoor # Options defined in heat.openstack.common.eventlet_backdoor
# #
# port for eventlet backdoor to listen (integer value) # Enable eventlet backdoor. Acceptable values are 0, <port>,
# and <start>:<end>, where 0 results in listening on a random
# tcp port number; <port> results in listening on the
# specified port number (and not enabling backdoor if that
# port is in use); and <start>:<end> results in listening on
# the smallest unused port number within the specified range
# of port numbers. The chosen port is displayed in the
# service's log file. (string value)
#backdoor_port=<None> #backdoor_port=<None>
@@ -292,12 +299,24 @@
#control_exchange=openstack #control_exchange=openstack
#
# Options defined in heat.openstack.common.rpc.amqp
#
# Use durable queues in amqp. (boolean value)
#amqp_durable_queues=false
# Auto-delete queues in amqp. (boolean value)
#amqp_auto_delete=false
# #
# Options defined in heat.openstack.common.rpc.impl_kombu # Options defined in heat.openstack.common.rpc.impl_kombu
# #
# SSL version to use (valid only if SSL enabled) (string # SSL version to use (valid only if SSL enabled). valid values
# value) # are TLSv1, SSLv23 and SSLv3. SSLv2 may be available on some
# distributions (string value)
#kombu_ssl_version= #kombu_ssl_version=
# SSL key file (valid only if SSL enabled) (string value) # SSL key file (valid only if SSL enabled) (string value)
@@ -346,9 +365,6 @@
# value) # value)
#rabbit_max_retries=0 #rabbit_max_retries=0
# use durable queues in RabbitMQ (boolean value)
#rabbit_durable_queues=false
# use H/A queues in RabbitMQ (x-ha-policy: all).You need to # use H/A queues in RabbitMQ (x-ha-policy: all).You need to
# wipe RabbitMQ database when changing this option. (boolean # wipe RabbitMQ database when changing this option. (boolean
# value) # value)
@@ -431,6 +447,25 @@
#matchmaker_heartbeat_ttl=600 #matchmaker_heartbeat_ttl=600
[ssl]
#
# Options defined in heat.openstack.common.sslutils
#
# CA certificate file to use to verify connecting clients
# (string value)
#ca_file=<None>
# Certificate file to use when starting the server securely
# (string value)
#cert_file=<None>
# Private key file to use when starting the server securely
# (string value)
#key_file=<None>
[paste_deploy] [paste_deploy]
# #
@@ -507,4 +542,3 @@
#ringfile=/etc/oslo/matchmaker_ring.json #ringfile=/etc/oslo/matchmaker_ring.json
# Total option count: 109

35
heat/openstack/common/config/generator.py Executable file → Normal file
View File

@@ -1,4 +1,3 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4 # vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 SINA Corporation # Copyright 2012 SINA Corporation
@@ -16,8 +15,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
# #
# @author: Zhongyue Luo, SINA Corporation.
#
"""Extracts OpenStack config option info from module(s).""" """Extracts OpenStack config option info from module(s)."""
@@ -53,7 +50,6 @@ OPT_TYPES = {
MULTISTROPT: 'multi valued', MULTISTROPT: 'multi valued',
} }
OPTION_COUNT = 0
OPTION_REGEX = re.compile(r"(%s)" % "|".join([STROPT, BOOLOPT, INTOPT, OPTION_REGEX = re.compile(r"(%s)" % "|".join([STROPT, BOOLOPT, INTOPT,
FLOATOPT, LISTOPT, FLOATOPT, LISTOPT,
MULTISTROPT])) MULTISTROPT]))
@@ -100,8 +96,6 @@ def generate(srcfiles):
for group, opts in opts_by_group.items(): for group, opts in opts_by_group.items():
print_group_opts(group, opts) print_group_opts(group, opts)
print("# Total option count: %d" % OPTION_COUNT)
def _import_module(mod_str): def _import_module(mod_str):
try: try:
@@ -166,9 +160,7 @@ def _list_opts(obj):
def print_group_opts(group, opts_by_module): def print_group_opts(group, opts_by_module):
print("[%s]" % group) print("[%s]" % group)
print('') print('')
global OPTION_COUNT
for mod, opts in opts_by_module: for mod, opts in opts_by_module:
OPTION_COUNT += len(opts)
print('#') print('#')
print('# Options defined in %s' % mod) print('# Options defined in %s' % mod)
print('#') print('#')
@@ -189,24 +181,24 @@ def _get_my_ip():
return None return None
def _sanitize_default(s): def _sanitize_default(name, value):
"""Set up a reasonably sensible default for pybasedir, my_ip and host.""" """Set up a reasonably sensible default for pybasedir, my_ip and host."""
if s.startswith(sys.prefix): if value.startswith(sys.prefix):
# NOTE(jd) Don't use os.path.join, because it is likely to think the # NOTE(jd) Don't use os.path.join, because it is likely to think the
# second part is an absolute pathname and therefore drop the first # second part is an absolute pathname and therefore drop the first
# part. # part.
s = os.path.normpath("/usr/" + s[len(sys.prefix):]) value = os.path.normpath("/usr/" + value[len(sys.prefix):])
elif s.startswith(BASEDIR): elif value.startswith(BASEDIR):
return s.replace(BASEDIR, '/usr/lib/python/site-packages') return value.replace(BASEDIR, '/usr/lib/python/site-packages')
elif BASEDIR in s: elif BASEDIR in value:
return s.replace(BASEDIR, '') return value.replace(BASEDIR, '')
elif s == _get_my_ip(): elif value == _get_my_ip():
return '10.0.0.1' return '10.0.0.1'
elif s == socket.gethostname(): elif value == socket.gethostname() and 'host' in name:
return 'heat' return 'heat'
elif s.strip() != s: elif value.strip() != value:
return '"%s"' % s return '"%s"' % value
return s return value
def _print_opt(opt): def _print_opt(opt):
@@ -227,7 +219,8 @@ def _print_opt(opt):
print('#%s=<None>' % opt_name) print('#%s=<None>' % opt_name)
elif opt_type == STROPT: elif opt_type == STROPT:
assert(isinstance(opt_default, basestring)) assert(isinstance(opt_default, basestring))
print('#%s=%s' % (opt_name, _sanitize_default(opt_default))) print('#%s=%s' % (opt_name, _sanitize_default(opt_name,
opt_default)))
elif opt_type == BOOLOPT: elif opt_type == BOOLOPT:
assert(isinstance(opt_default, bool)) assert(isinstance(opt_default, bool))
print('#%s=%s' % (opt_name, str(opt_default).lower())) print('#%s=%s' % (opt_name, str(opt_default).lower()))

View File

@@ -61,7 +61,7 @@ class RequestContext(object):
'request_id': self.request_id} 'request_id': self.request_id}
def get_admin_context(show_deleted="no"): def get_admin_context(show_deleted=False):
context = RequestContext(None, context = RequestContext(None,
tenant=None, tenant=None,
is_admin=True, is_admin=True,

View File

@@ -18,8 +18,11 @@
from __future__ import print_function from __future__ import print_function
import errno
import gc import gc
import os
import pprint import pprint
import socket
import sys import sys
import traceback import traceback
@@ -28,14 +31,34 @@ import eventlet.backdoor
import greenlet import greenlet
from oslo.config import cfg from oslo.config import cfg
from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common import log as logging
help_for_backdoor_port = (
"Acceptable values are 0, <port>, and <start>:<end>, where 0 results "
"in listening on a random tcp port number; <port> results in listening "
"on the specified port number (and not enabling backdoor if that port "
"is in use); and <start>:<end> results in listening on the smallest "
"unused port number within the specified range of port numbers. The "
"chosen port is displayed in the service's log file.")
eventlet_backdoor_opts = [ eventlet_backdoor_opts = [
cfg.IntOpt('backdoor_port', cfg.StrOpt('backdoor_port',
default=None, default=None,
help='port for eventlet backdoor to listen') help="Enable eventlet backdoor. %s" % help_for_backdoor_port)
] ]
CONF = cfg.CONF CONF = cfg.CONF
CONF.register_opts(eventlet_backdoor_opts) CONF.register_opts(eventlet_backdoor_opts)
LOG = logging.getLogger(__name__)
class EventletBackdoorConfigValueError(Exception):
def __init__(self, port_range, help_msg, ex):
msg = ('Invalid backdoor_port configuration %(range)s: %(ex)s. '
'%(help)s' %
{'range': port_range, 'ex': ex, 'help': help_msg})
super(EventletBackdoorConfigValueError, self).__init__(msg)
self.port_range = port_range
def _dont_use_this(): def _dont_use_this():
@@ -60,6 +83,33 @@ def _print_nativethreads():
print() print()
def _parse_port_range(port_range):
if ':' not in port_range:
start, end = port_range, port_range
else:
start, end = port_range.split(':', 1)
try:
start, end = int(start), int(end)
if end < start:
raise ValueError
return start, end
except ValueError as ex:
raise EventletBackdoorConfigValueError(port_range, ex,
help_for_backdoor_port)
def _listen(host, start_port, end_port, listen_func):
try_port = start_port
while True:
try:
return listen_func((host, try_port))
except socket.error as exc:
if (exc.errno != errno.EADDRINUSE or
try_port >= end_port):
raise
try_port += 1
def initialize_if_enabled(): def initialize_if_enabled():
backdoor_locals = { backdoor_locals = {
'exit': _dont_use_this, # So we don't exit the entire process 'exit': _dont_use_this, # So we don't exit the entire process
@@ -72,6 +122,8 @@ def initialize_if_enabled():
if CONF.backdoor_port is None: if CONF.backdoor_port is None:
return None return None
start_port, end_port = _parse_port_range(str(CONF.backdoor_port))
# NOTE(johannes): The standard sys.displayhook will print the value of # NOTE(johannes): The standard sys.displayhook will print the value of
# the last expression and set it to __builtin__._, which overwrites # the last expression and set it to __builtin__._, which overwrites
# the __builtin__._ that gettext sets. Let's switch to using pprint # the __builtin__._ that gettext sets. Let's switch to using pprint
@@ -82,8 +134,13 @@ def initialize_if_enabled():
pprint.pprint(val) pprint.pprint(val)
sys.displayhook = displayhook sys.displayhook = displayhook
sock = eventlet.listen(('localhost', CONF.backdoor_port)) sock = _listen('localhost', start_port, end_port, eventlet.listen)
# In the case of backdoor port being zero, a port number is assigned by
# listen(). In any case, pull the port number out here.
port = sock.getsockname()[1] port = sock.getsockname()[1]
LOG.info(_('Eventlet backdoor listening on %(port)s for process %(pid)d') %
{'port': port, 'pid': os.getpid()})
eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock, eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock,
locals=backdoor_locals) locals=backdoor_locals)
return port return port

View File

@@ -22,7 +22,7 @@ import sys
from eventlet import event from eventlet import event
from eventlet import greenthread from eventlet import greenthread
from heat.openstack.common.gettextutils import _ from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common import log as logging from heat.openstack.common import log as logging
from heat.openstack.common import timeutils from heat.openstack.common import timeutils

View File

@@ -19,10 +19,7 @@
Network-related utilities and helper functions. Network-related utilities and helper functions.
""" """
from heat.openstack.common import log as logging import urlparse
LOG = logging.getLogger(__name__)
def parse_host_port(address, default_port=None): def parse_host_port(address, default_port=None):
@@ -67,3 +64,18 @@ def parse_host_port(address, default_port=None):
port = default_port port = default_port
return (host, None if port is None else int(port)) return (host, None if port is None else int(port))
def urlsplit(url, scheme='', allow_fragments=True):
"""Parse a URL using urlparse.urlsplit(), splitting query and fragments.
This function papers over Python issue9374 when needed.
The parameters are the same as urlparse.urlsplit.
"""
scheme, netloc, path, query, fragment = urlparse.urlsplit(
url, scheme, allow_fragments)
if allow_fragments and '#' in path:
path, fragment = path.split('#', 1)
if '?' in path:
path, query = path.split('?', 1)
return urlparse.SplitResult(scheme, netloc, path, query, fragment)

View File

@@ -157,29 +157,16 @@ def _get_drivers():
if _drivers is None: if _drivers is None:
_drivers = {} _drivers = {}
for notification_driver in CONF.notification_driver: for notification_driver in CONF.notification_driver:
add_driver(notification_driver) try:
driver = importutils.import_module(notification_driver)
_drivers[notification_driver] = driver
except ImportError:
LOG.exception(_("Failed to load notifier %s. "
"These notifications will not be sent.") %
notification_driver)
return _drivers.values() return _drivers.values()
def add_driver(notification_driver):
"""Add a notification driver at runtime."""
# Make sure the driver list is initialized.
_get_drivers()
if isinstance(notification_driver, basestring):
# Load and add
try:
driver = importutils.import_module(notification_driver)
_drivers[notification_driver] = driver
except ImportError:
LOG.exception(_("Failed to load notifier %s. "
"These notifications will not be sent.") %
notification_driver)
else:
# Driver is already loaded; just add the object.
_drivers[notification_driver] = notification_driver
def _reset_drivers(): def _reset_drivers():
"""Used by unit tests to reset the drivers.""" """Used by unit tests to reset the drivers."""
global _drivers global _drivers

View File

@@ -29,7 +29,7 @@ import inspect
from oslo.config import cfg from oslo.config import cfg
from heat.openstack.common.gettextutils import _ from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common import importutils from heat.openstack.common import importutils
from heat.openstack.common import local from heat.openstack.common import local
from heat.openstack.common import log as logging from heat.openstack.common import log as logging

View File

@@ -34,14 +34,28 @@ from eventlet import greenpool
from eventlet import pools from eventlet import pools
from eventlet import queue from eventlet import queue
from eventlet import semaphore from eventlet import semaphore
from oslo.config import cfg
from heat.openstack.common import excutils from heat.openstack.common import excutils
from heat.openstack.common.gettextutils import _ from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common import local from heat.openstack.common import local
from heat.openstack.common import log as logging from heat.openstack.common import log as logging
from heat.openstack.common.rpc import common as rpc_common from heat.openstack.common.rpc import common as rpc_common
amqp_opts = [
cfg.BoolOpt('amqp_durable_queues',
default=False,
deprecated_name='rabbit_durable_queues',
deprecated_group='DEFAULT',
help='Use durable queues in amqp.'),
cfg.BoolOpt('amqp_auto_delete',
default=False,
help='Auto-delete queues in amqp.'),
]
cfg.CONF.register_opts(amqp_opts)
UNIQUE_ID = '_unique_id' UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@@ -151,11 +165,13 @@ class ConnectionContext(rpc_common.Connection):
def create_worker(self, topic, proxy, pool_name): def create_worker(self, topic, proxy, pool_name):
self.connection.create_worker(topic, proxy, pool_name) self.connection.create_worker(topic, proxy, pool_name)
def join_consumer_pool(self, callback, pool_name, topic, exchange_name): def join_consumer_pool(self, callback, pool_name, topic, exchange_name,
ack_on_error=True):
self.connection.join_consumer_pool(callback, self.connection.join_consumer_pool(callback,
pool_name, pool_name,
topic, topic,
exchange_name) exchange_name,
ack_on_error)
def consume_in_thread(self): def consume_in_thread(self):
self.connection.consume_in_thread() self.connection.consume_in_thread()
@@ -219,12 +235,7 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
failure = rpc_common.serialize_remote_exception(failure, failure = rpc_common.serialize_remote_exception(failure,
log_failure) log_failure)
try: msg = {'result': reply, 'failure': failure}
msg = {'result': reply, 'failure': failure}
except TypeError:
msg = {'result': dict((k, repr(v))
for k, v in reply.__dict__.iteritems()),
'failure': failure}
if ending: if ending:
msg['ending'] = True msg['ending'] = True
_add_unique_id(msg) _add_unique_id(msg)

View File

@@ -24,6 +24,7 @@ import traceback
from oslo.config import cfg from oslo.config import cfg
import six import six
from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common import importutils from heat.openstack.common import importutils
from heat.openstack.common import jsonutils from heat.openstack.common import jsonutils
from heat.openstack.common import local from heat.openstack.common import local
@@ -73,14 +74,14 @@ _REMOTE_POSTFIX = '_Remote'
class RPCException(Exception): class RPCException(Exception):
message = _("An unknown RPC related exception occurred.") msg_fmt = _("An unknown RPC related exception occurred.")
def __init__(self, message=None, **kwargs): def __init__(self, message=None, **kwargs):
self.kwargs = kwargs self.kwargs = kwargs
if not message: if not message:
try: try:
message = self.message % kwargs message = self.msg_fmt % kwargs
except Exception: except Exception:
# kwargs doesn't match a variable in the message # kwargs doesn't match a variable in the message
@@ -89,7 +90,7 @@ class RPCException(Exception):
for name, value in kwargs.iteritems(): for name, value in kwargs.iteritems():
LOG.error("%s: %s" % (name, value)) LOG.error("%s: %s" % (name, value))
# at least get the core message out if something happened # at least get the core message out if something happened
message = self.message message = self.msg_fmt
super(RPCException, self).__init__(message) super(RPCException, self).__init__(message)
@@ -103,7 +104,7 @@ class RemoteError(RPCException):
contains all of the relevant info. contains all of the relevant info.
""" """
message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.") msg_fmt = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
def __init__(self, exc_type=None, value=None, traceback=None): def __init__(self, exc_type=None, value=None, traceback=None):
self.exc_type = exc_type self.exc_type = exc_type
@@ -120,7 +121,7 @@ class Timeout(RPCException):
This exception is raised if the rpc_response_timeout is reached while This exception is raised if the rpc_response_timeout is reached while
waiting for a response from the remote side. waiting for a response from the remote side.
""" """
message = _('Timeout while waiting on RPC response - ' msg_fmt = _('Timeout while waiting on RPC response - '
'topic: "%(topic)s", RPC method: "%(method)s" ' 'topic: "%(topic)s", RPC method: "%(method)s" '
'info: "%(info)s"') 'info: "%(info)s"')
@@ -143,25 +144,25 @@ class Timeout(RPCException):
class DuplicateMessageError(RPCException): class DuplicateMessageError(RPCException):
message = _("Found duplicate message(%(msg_id)s). Skipping it.") msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.")
class InvalidRPCConnectionReuse(RPCException): class InvalidRPCConnectionReuse(RPCException):
message = _("Invalid reuse of an RPC connection.") msg_fmt = _("Invalid reuse of an RPC connection.")
class UnsupportedRpcVersion(RPCException): class UnsupportedRpcVersion(RPCException):
message = _("Specified RPC version, %(version)s, not supported by " msg_fmt = _("Specified RPC version, %(version)s, not supported by "
"this endpoint.") "this endpoint.")
class UnsupportedRpcEnvelopeVersion(RPCException): class UnsupportedRpcEnvelopeVersion(RPCException):
message = _("Specified RPC envelope version, %(version)s, " msg_fmt = _("Specified RPC envelope version, %(version)s, "
"not supported by this endpoint.") "not supported by this endpoint.")
class RpcVersionCapError(RPCException): class RpcVersionCapError(RPCException):
message = _("Specified RPC version cap, %(version_cap)s, is too low") msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low")
class Connection(object): class Connection(object):
@@ -260,41 +261,20 @@ class Connection(object):
def _safe_log(log_func, msg, msg_data): def _safe_log(log_func, msg, msg_data):
"""Sanitizes the msg_data field before logging.""" """Sanitizes the msg_data field before logging."""
SANITIZE = {'set_admin_password': [('args', 'new_pass')], SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass']
'run_instance': [('args', 'admin_password')],
'route_message': [('args', 'message', 'args', 'method_info',
'method_kwargs', 'password'),
('args', 'message', 'args', 'method_info',
'method_kwargs', 'admin_password')]}
has_method = 'method' in msg_data and msg_data['method'] in SANITIZE def _fix_passwords(d):
has_context_token = '_context_auth_token' in msg_data """Sanitizes the password fields in the dictionary."""
has_token = 'auth_token' in msg_data for k in d.iterkeys():
if k.lower().find('password') != -1:
d[k] = '<SANITIZED>'
elif k.lower() in SANITIZE:
d[k] = '<SANITIZED>'
elif isinstance(d[k], dict):
_fix_passwords(d[k])
return d
if not any([has_method, has_context_token, has_token]): return log_func(msg, _fix_passwords(copy.deepcopy(msg_data)))
return log_func(msg, msg_data)
msg_data = copy.deepcopy(msg_data)
if has_method:
for arg in SANITIZE.get(msg_data['method'], []):
try:
d = msg_data
for elem in arg[:-1]:
d = d[elem]
d[arg[-1]] = '<SANITIZED>'
except KeyError as e:
LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
{'item': arg,
'err': e})
if has_context_token:
msg_data['_context_auth_token'] = '<SANITIZED>'
if has_token:
msg_data['auth_token'] = '<SANITIZED>'
return log_func(msg, msg_data)
def serialize_remote_exception(failure_info, log_failure=True): def serialize_remote_exception(failure_info, log_failure=True):

View File

@@ -18,7 +18,6 @@ import functools
import itertools import itertools
import socket import socket
import ssl import ssl
import sys
import time import time
import uuid import uuid
@@ -30,15 +29,20 @@ import kombu.entity
import kombu.messaging import kombu.messaging
from oslo.config import cfg from oslo.config import cfg
from heat.openstack.common.gettextutils import _ from heat.openstack.common import excutils
from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common import network_utils from heat.openstack.common import network_utils
from heat.openstack.common.rpc import amqp as rpc_amqp from heat.openstack.common.rpc import amqp as rpc_amqp
from heat.openstack.common.rpc import common as rpc_common from heat.openstack.common.rpc import common as rpc_common
from heat.openstack.common import sslutils
kombu_opts = [ kombu_opts = [
cfg.StrOpt('kombu_ssl_version', cfg.StrOpt('kombu_ssl_version',
default='', default='',
help='SSL version to use (valid only if SSL enabled)'), help='SSL version to use (valid only if SSL enabled). '
'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
'be available on some distributions'
),
cfg.StrOpt('kombu_ssl_keyfile', cfg.StrOpt('kombu_ssl_keyfile',
default='', default='',
help='SSL key file (valid only if SSL enabled)'), help='SSL key file (valid only if SSL enabled)'),
@@ -82,9 +86,6 @@ kombu_opts = [
default=0, default=0,
help='maximum retries with trying to connect to RabbitMQ ' help='maximum retries with trying to connect to RabbitMQ '
'(the default of 0 implies an infinite retry count)'), '(the default of 0 implies an infinite retry count)'),
cfg.BoolOpt('rabbit_durable_queues',
default=False,
help='use durable queues in RabbitMQ'),
cfg.BoolOpt('rabbit_ha_queues', cfg.BoolOpt('rabbit_ha_queues',
default=False, default=False,
help='use H/A queues in RabbitMQ (x-ha-policy: all).' help='use H/A queues in RabbitMQ (x-ha-policy: all).'
@@ -129,6 +130,7 @@ class ConsumerBase(object):
self.tag = str(tag) self.tag = str(tag)
self.kwargs = kwargs self.kwargs = kwargs
self.queue = None self.queue = None
self.ack_on_error = kwargs.get('ack_on_error', True)
self.reconnect(channel) self.reconnect(channel)
def reconnect(self, channel): def reconnect(self, channel):
@@ -138,6 +140,36 @@ class ConsumerBase(object):
self.queue = kombu.entity.Queue(**self.kwargs) self.queue = kombu.entity.Queue(**self.kwargs)
self.queue.declare() self.queue.declare()
def _callback_handler(self, message, callback):
"""Call callback with deserialized message.
Messages that are processed without exception are ack'ed.
If the message processing generates an exception, it will be
ack'ed if ack_on_error=True. Otherwise it will be .reject()'ed.
Rejection is better than waiting for the message to timeout.
Rejected messages are immediately requeued.
"""
ack_msg = False
try:
msg = rpc_common.deserialize_msg(message.payload)
callback(msg)
ack_msg = True
except Exception:
if self.ack_on_error:
ack_msg = True
LOG.exception(_("Failed to process message"
" ... skipping it."))
else:
LOG.exception(_("Failed to process message"
" ... will requeue."))
finally:
if ack_msg:
message.ack()
else:
message.reject()
def consume(self, *args, **kwargs): def consume(self, *args, **kwargs):
"""Actually declare the consumer on the amqp channel. This will """Actually declare the consumer on the amqp channel. This will
start the flow of messages from the queue. Using the start the flow of messages from the queue. Using the
@@ -150,8 +182,6 @@ class ConsumerBase(object):
If kwargs['nowait'] is True, then this call will block until If kwargs['nowait'] is True, then this call will block until
a message is read. a message is read.
Messages will automatically be acked if the callback doesn't
raise an exception
""" """
options = {'consumer_tag': self.tag} options = {'consumer_tag': self.tag}
@@ -162,13 +192,7 @@ class ConsumerBase(object):
def _callback(raw_message): def _callback(raw_message):
message = self.channel.message_to_python(raw_message) message = self.channel.message_to_python(raw_message)
try: self._callback_handler(message, callback)
msg = rpc_common.deserialize_msg(message.payload)
callback(msg)
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
message.ack()
self.queue.consume(*args, callback=_callback, **options) self.queue.consume(*args, callback=_callback, **options)
@@ -233,9 +257,9 @@ class TopicConsumer(ConsumerBase):
Other kombu options may be passed as keyword arguments Other kombu options may be passed as keyword arguments
""" """
# Default options # Default options
options = {'durable': conf.rabbit_durable_queues, options = {'durable': conf.amqp_durable_queues,
'queue_arguments': _get_queue_arguments(conf), 'queue_arguments': _get_queue_arguments(conf),
'auto_delete': False, 'auto_delete': conf.amqp_auto_delete,
'exclusive': False} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
@@ -339,8 +363,8 @@ class TopicPublisher(Publisher):
Kombu options may be passed as keyword args to override defaults Kombu options may be passed as keyword args to override defaults
""" """
options = {'durable': conf.rabbit_durable_queues, options = {'durable': conf.amqp_durable_queues,
'auto_delete': False, 'auto_delete': conf.amqp_auto_delete,
'exclusive': False} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
exchange_name = rpc_amqp.get_control_exchange(conf) exchange_name = rpc_amqp.get_control_exchange(conf)
@@ -370,7 +394,7 @@ class NotifyPublisher(TopicPublisher):
"""Publisher class for 'notify'.""" """Publisher class for 'notify'."""
def __init__(self, conf, channel, topic, **kwargs): def __init__(self, conf, channel, topic, **kwargs):
self.durable = kwargs.pop('durable', conf.rabbit_durable_queues) self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
self.queue_arguments = _get_queue_arguments(conf) self.queue_arguments = _get_queue_arguments(conf)
super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs) super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
@@ -454,7 +478,8 @@ class Connection(object):
# http://docs.python.org/library/ssl.html - ssl.wrap_socket # http://docs.python.org/library/ssl.html - ssl.wrap_socket
if self.conf.kombu_ssl_version: if self.conf.kombu_ssl_version:
ssl_params['ssl_version'] = self.conf.kombu_ssl_version ssl_params['ssl_version'] = sslutils.validate_ssl_version(
self.conf.kombu_ssl_version)
if self.conf.kombu_ssl_keyfile: if self.conf.kombu_ssl_keyfile:
ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
if self.conf.kombu_ssl_certfile: if self.conf.kombu_ssl_certfile:
@@ -465,12 +490,8 @@ class Connection(object):
# future with this? # future with this?
ssl_params['cert_reqs'] = ssl.CERT_REQUIRED ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
if not ssl_params: # Return the extended behavior or just have the default behavior
# Just have the default behavior return ssl_params or True
return True
else:
# Return the extended behavior
return ssl_params
def _connect(self, params): def _connect(self, params):
"""Connect to rabbit. Re-establish any queues that may have """Connect to rabbit. Re-establish any queues that may have
@@ -537,13 +558,11 @@ class Connection(object):
log_info.update(params) log_info.update(params)
if self.max_retries and attempt == self.max_retries: if self.max_retries and attempt == self.max_retries:
LOG.error(_('Unable to connect to AMQP server on ' msg = _('Unable to connect to AMQP server on '
'%(hostname)s:%(port)d after %(max_retries)d ' '%(hostname)s:%(port)d after %(max_retries)d '
'tries: %(err_str)s') % log_info) 'tries: %(err_str)s') % log_info
# NOTE(comstud): Copied from original code. There's LOG.error(msg)
# really no better recourse because if this was a queue we raise rpc_common.RPCException(msg)
# need to consume on, we have no way to consume anymore.
sys.exit(1)
if attempt == 1: if attempt == 1:
sleep_time = self.interval_start or 1 sleep_time = self.interval_start or 1
@@ -635,8 +654,8 @@ class Connection(object):
def _consume(): def _consume():
if info['do_consume']: if info['do_consume']:
queues_head = self.consumers[:-1] queues_head = self.consumers[:-1] # not fanout.
queues_tail = self.consumers[-1] queues_tail = self.consumers[-1] # fanout
for queue in queues_head: for queue in queues_head:
queue.consume(nowait=True) queue.consume(nowait=True)
queues_tail.consume(nowait=False) queues_tail.consume(nowait=False)
@@ -685,11 +704,12 @@ class Connection(object):
self.declare_consumer(DirectConsumer, topic, callback) self.declare_consumer(DirectConsumer, topic, callback)
def declare_topic_consumer(self, topic, callback=None, queue_name=None, def declare_topic_consumer(self, topic, callback=None, queue_name=None,
exchange_name=None): exchange_name=None, ack_on_error=True):
"""Create a 'topic' consumer.""" """Create a 'topic' consumer."""
self.declare_consumer(functools.partial(TopicConsumer, self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name, name=queue_name,
exchange_name=exchange_name, exchange_name=exchange_name,
ack_on_error=ack_on_error,
), ),
topic, callback) topic, callback)
@@ -724,6 +744,7 @@ class Connection(object):
def consume_in_thread(self): def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread.""" """Consumer from all queues/consumers in a greenthread."""
@excutils.forever_retry_uncaught_exceptions
def _consumer_thread(): def _consumer_thread():
try: try:
self.consume() self.consume()
@@ -754,7 +775,7 @@ class Connection(object):
self.declare_topic_consumer(topic, proxy_cb, pool_name) self.declare_topic_consumer(topic, proxy_cb, pool_name)
def join_consumer_pool(self, callback, pool_name, topic, def join_consumer_pool(self, callback, pool_name, topic,
exchange_name=None): exchange_name=None, ack_on_error=True):
"""Register as a member of a group of consumers for a given topic from """Register as a member of a group of consumers for a given topic from
the specified exchange. the specified exchange.
@@ -775,6 +796,7 @@ class Connection(object):
topic=topic, topic=topic,
exchange_name=exchange_name, exchange_name=exchange_name,
callback=callback_wrapper, callback=callback_wrapper,
ack_on_error=ack_on_error,
) )

View File

@@ -24,7 +24,8 @@ import eventlet
import greenlet import greenlet
from oslo.config import cfg from oslo.config import cfg
from heat.openstack.common.gettextutils import _ from heat.openstack.common import excutils
from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common import importutils from heat.openstack.common import importutils
from heat.openstack.common import jsonutils from heat.openstack.common import jsonutils
from heat.openstack.common import log as logging from heat.openstack.common import log as logging
@@ -118,10 +119,17 @@ class ConsumerBase(object):
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
self.reconnect(session) self.connect(session)
def connect(self, session):
"""Declare the reciever on connect."""
self._declare_receiver(session)
def reconnect(self, session): def reconnect(self, session):
"""Re-declare the receiver after a qpid reconnect.""" """Re-declare the receiver after a qpid reconnect."""
self._declare_receiver(session)
def _declare_receiver(self, session):
self.session = session self.session = session
self.receiver = session.receiver(self.address) self.receiver = session.receiver(self.address)
self.receiver.capacity = 1 self.receiver.capacity = 1
@@ -152,11 +160,15 @@ class ConsumerBase(object):
except Exception: except Exception:
LOG.exception(_("Failed to process message... skipping it.")) LOG.exception(_("Failed to process message... skipping it."))
finally: finally:
# TODO(sandy): Need support for optional ack_on_error.
self.session.acknowledge(message) self.session.acknowledge(message)
def get_receiver(self): def get_receiver(self):
return self.receiver return self.receiver
def get_node_name(self):
return self.address.split(';')[0]
class DirectConsumer(ConsumerBase): class DirectConsumer(ConsumerBase):
"""Queue/consumer class for 'direct'.""" """Queue/consumer class for 'direct'."""
@@ -169,11 +181,16 @@ class DirectConsumer(ConsumerBase):
'callback' is the callback to call when messages are received 'callback' is the callback to call when messages are received
""" """
super(DirectConsumer, self).__init__(session, callback, super(DirectConsumer, self).__init__(
"%s/%s" % (msg_id, msg_id), session, callback,
{"type": "direct"}, "%s/%s" % (msg_id, msg_id),
msg_id, {"type": "direct"},
{"exclusive": True}) msg_id,
{
"auto-delete": conf.amqp_auto_delete,
"exclusive": True,
"durable": conf.amqp_durable_queues,
})
class TopicConsumer(ConsumerBase): class TopicConsumer(ConsumerBase):
@@ -191,9 +208,14 @@ class TopicConsumer(ConsumerBase):
""" """
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
super(TopicConsumer, self).__init__(session, callback, super(TopicConsumer, self).__init__(
"%s/%s" % (exchange_name, topic), session, callback,
{}, name or topic, {}) "%s/%s" % (exchange_name, topic),
{}, name or topic,
{
"auto-delete": conf.amqp_auto_delete,
"durable": conf.amqp_durable_queues,
})
class FanoutConsumer(ConsumerBase): class FanoutConsumer(ConsumerBase):
@@ -206,6 +228,7 @@ class FanoutConsumer(ConsumerBase):
'topic' is the topic to listen on 'topic' is the topic to listen on
'callback' is the callback to call when messages are received 'callback' is the callback to call when messages are received
""" """
self.conf = conf
super(FanoutConsumer, self).__init__( super(FanoutConsumer, self).__init__(
session, callback, session, callback,
@@ -214,6 +237,18 @@ class FanoutConsumer(ConsumerBase):
"%s_fanout_%s" % (topic, uuid.uuid4().hex), "%s_fanout_%s" % (topic, uuid.uuid4().hex),
{"exclusive": True}) {"exclusive": True})
def reconnect(self, session):
topic = self.get_node_name().rpartition('_fanout')[0]
params = {
'session': session,
'topic': topic,
'callback': self.callback,
}
self.__init__(conf=self.conf, **params)
super(FanoutConsumer, self).reconnect(session)
class Publisher(object): class Publisher(object):
"""Base Publisher class.""" """Base Publisher class."""
@@ -575,6 +610,7 @@ class Connection(object):
def consume_in_thread(self): def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread.""" """Consumer from all queues/consumers in a greenthread."""
@excutils.forever_retry_uncaught_exceptions
def _consumer_thread(): def _consumer_thread():
try: try:
self.consume() self.consume()
@@ -615,7 +651,7 @@ class Connection(object):
return consumer return consumer
def join_consumer_pool(self, callback, pool_name, topic, def join_consumer_pool(self, callback, pool_name, topic,
exchange_name=None): exchange_name=None, ack_on_error=True):
"""Register as a member of a group of consumers for a given topic from """Register as a member of a group of consumers for a given topic from
the specified exchange. the specified exchange.

View File

@@ -27,7 +27,7 @@ import greenlet
from oslo.config import cfg from oslo.config import cfg
from heat.openstack.common import excutils from heat.openstack.common import excutils
from heat.openstack.common.gettextutils import _ from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common import importutils from heat.openstack.common import importutils
from heat.openstack.common import jsonutils from heat.openstack.common import jsonutils
from heat.openstack.common.rpc import common as rpc_common from heat.openstack.common.rpc import common as rpc_common
@@ -358,7 +358,6 @@ class ZmqBaseReactor(ConsumerBase):
def __init__(self, conf): def __init__(self, conf):
super(ZmqBaseReactor, self).__init__() super(ZmqBaseReactor, self).__init__()
self.mapping = {}
self.proxies = {} self.proxies = {}
self.threads = [] self.threads = []
self.sockets = [] self.sockets = []
@@ -366,9 +365,8 @@ class ZmqBaseReactor(ConsumerBase):
self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size) self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size)
def register(self, proxy, in_addr, zmq_type_in, out_addr=None, def register(self, proxy, in_addr, zmq_type_in,
zmq_type_out=None, in_bind=True, out_bind=True, in_bind=True, subscribe=None):
subscribe=None):
LOG.info(_("Registering reactor")) LOG.info(_("Registering reactor"))
@@ -384,21 +382,6 @@ class ZmqBaseReactor(ConsumerBase):
LOG.info(_("In reactor registered")) LOG.info(_("In reactor registered"))
if not out_addr:
return
if zmq_type_out not in (zmq.PUSH, zmq.PUB):
raise RPCException("Bad output socktype")
# Items push out.
outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind)
self.mapping[inq] = outq
self.mapping[outq] = inq
self.sockets.append(outq)
LOG.info(_("Out reactor registered"))
def consume_in_thread(self): def consume_in_thread(self):
def _consume(sock): def _consume(sock):
LOG.info(_("Consuming socket")) LOG.info(_("Consuming socket"))
@@ -516,8 +499,7 @@ class ZmqProxy(ZmqBaseReactor):
try: try:
self.register(consumption_proxy, self.register(consumption_proxy,
consume_in, consume_in,
zmq.PULL, zmq.PULL)
out_bind=True)
except zmq.ZMQError: except zmq.ZMQError:
if os.access(ipc_dir, os.X_OK): if os.access(ipc_dir, os.X_OK):
with excutils.save_and_reraise_exception(): with excutils.save_and_reraise_exception():
@@ -559,11 +541,6 @@ class ZmqReactor(ZmqBaseReactor):
#TODO(ewindisch): use zero-copy (i.e. references, not copying) #TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv() data = sock.recv()
LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data) LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data)
if sock in self.mapping:
LOG.debug(_("ROUTER RELAY-OUT %(data)s") % {
'data': data})
self.mapping[sock].send(data)
return
proxy = self.proxies[sock] proxy = self.proxies[sock]

View File

@@ -23,7 +23,7 @@ import contextlib
import eventlet import eventlet
from oslo.config import cfg from oslo.config import cfg
from heat.openstack.common.gettextutils import _ from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common import log as logging from heat.openstack.common import log as logging
@@ -248,9 +248,7 @@ class DirectBinding(Binding):
that it maps directly to a host, thus direct. that it maps directly to a host, thus direct.
""" """
def test(self, key): def test(self, key):
if '.' in key: return '.' in key
return True
return False
class TopicBinding(Binding): class TopicBinding(Binding):
@@ -262,17 +260,13 @@ class TopicBinding(Binding):
matches that of a direct exchange. matches that of a direct exchange.
""" """
def test(self, key): def test(self, key):
if '.' not in key: return '.' not in key
return True
return False
class FanoutBinding(Binding): class FanoutBinding(Binding):
"""Match on fanout keys, where key starts with 'fanout.' string.""" """Match on fanout keys, where key starts with 'fanout.' string."""
def test(self, key): def test(self, key):
if key.startswith('fanout~'): return key.startswith('fanout~')
return True
return False
class StubExchange(Exchange): class StubExchange(Exchange):

View File

@@ -23,7 +23,7 @@ import json
from oslo.config import cfg from oslo.config import cfg
from heat.openstack.common.gettextutils import _ from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common import log as logging from heat.openstack.common import log as logging
from heat.openstack.common.rpc import matchmaker as mm from heat.openstack.common.rpc import matchmaker as mm
@@ -63,9 +63,7 @@ class RingExchange(mm.Exchange):
self.ring0[k] = itertools.cycle(self.ring[k]) self.ring0[k] = itertools.cycle(self.ring[k])
def _ring_has(self, key): def _ring_has(self, key):
if key in self.ring0: return key in self.ring0
return True
return False
class RoundRobinRingExchange(RingExchange): class RoundRobinRingExchange(RingExchange):

View File

@@ -69,7 +69,7 @@ class RpcProxy(object):
v = vers if vers else self.default_version v = vers if vers else self.default_version
if (self.version_cap and not if (self.version_cap and not
rpc_common.version_is_compatible(self.version_cap, v)): rpc_common.version_is_compatible(self.version_cap, v)):
raise rpc_common.RpcVersionCapError(version=self.version_cap) raise rpc_common.RpcVersionCapError(version_cap=self.version_cap)
msg['version'] = v msg['version'] = v
def _get_topic(self, topic): def _get_topic(self, topic):

View File

@@ -17,7 +17,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from heat.openstack.common.gettextutils import _ from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common import log as logging from heat.openstack.common import log as logging
from heat.openstack.common import rpc from heat.openstack.common import rpc
from heat.openstack.common.rpc import dispatcher as rpc_dispatcher from heat.openstack.common.rpc import dispatcher as rpc_dispatcher
@@ -32,10 +32,11 @@ class Service(service.Service):
A service enables rpc by listening to queues based on topic and host. A service enables rpc by listening to queues based on topic and host.
""" """
def __init__(self, host, topic, manager=None): def __init__(self, host, topic, manager=None, serializer=None):
super(Service, self).__init__() super(Service, self).__init__()
self.host = host self.host = host
self.topic = topic self.topic = topic
self.serializer = serializer
if manager is None: if manager is None:
self.manager = self self.manager = self
else: else:
@@ -48,7 +49,8 @@ class Service(service.Service):
LOG.debug(_("Creating Consumer connection for Service %s") % LOG.debug(_("Creating Consumer connection for Service %s") %
self.topic) self.topic)
dispatcher = rpc_dispatcher.RpcDispatcher([self.manager]) dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],
self.serializer)
# Share this same connection for these Consumers # Share this same connection for these Consumers
self.conn.create_consumer(self.topic, dispatcher, fanout=False) self.conn.create_consumer(self.topic, dispatcher, fanout=False)

View File

@@ -27,11 +27,12 @@ import sys
import time import time
import eventlet import eventlet
from eventlet import event
import logging as std_logging import logging as std_logging
from oslo.config import cfg from oslo.config import cfg
from heat.openstack.common import eventlet_backdoor from heat.openstack.common import eventlet_backdoor
from heat.openstack.common.gettextutils import _ from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common import importutils from heat.openstack.common import importutils
from heat.openstack.common import log as logging from heat.openstack.common import log as logging
from heat.openstack.common import threadgroup from heat.openstack.common import threadgroup
@@ -51,20 +52,9 @@ class Launcher(object):
:returns: None :returns: None
""" """
self._services = threadgroup.ThreadGroup() self.services = Services()
self.backdoor_port = eventlet_backdoor.initialize_if_enabled() self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
@staticmethod
def run_service(service):
"""Start and wait for a service to finish.
:param service: service to run and wait for.
:returns: None
"""
service.start()
service.wait()
def launch_service(self, service): def launch_service(self, service):
"""Load and start the given service. """Load and start the given service.
@@ -73,7 +63,7 @@ class Launcher(object):
""" """
service.backdoor_port = self.backdoor_port service.backdoor_port = self.backdoor_port
self._services.add_thread(self.run_service, service) self.services.add(service)
def stop(self): def stop(self):
"""Stop all services which are currently running. """Stop all services which are currently running.
@@ -81,7 +71,7 @@ class Launcher(object):
:returns: None :returns: None
""" """
self._services.stop() self.services.stop()
def wait(self): def wait(self):
"""Waits until all services have been stopped, and then returns. """Waits until all services have been stopped, and then returns.
@@ -89,7 +79,16 @@ class Launcher(object):
:returns: None :returns: None
""" """
self._services.wait() self.services.wait()
def restart(self):
"""Reload config files and restart service.
:returns: None
"""
cfg.CONF.reload_config_files()
self.services.restart()
class SignalExit(SystemExit): class SignalExit(SystemExit):
@@ -103,31 +102,51 @@ class ServiceLauncher(Launcher):
# Allow the process to be killed again and die from natural causes # Allow the process to be killed again and die from natural causes
signal.signal(signal.SIGTERM, signal.SIG_DFL) signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL) signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGHUP, signal.SIG_DFL)
raise SignalExit(signo) raise SignalExit(signo)
def wait(self): def handle_signal(self):
signal.signal(signal.SIGTERM, self._handle_signal) signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal) signal.signal(signal.SIGINT, self._handle_signal)
signal.signal(signal.SIGHUP, self._handle_signal)
def _wait_for_exit_or_signal(self):
status = None
signo = 0
LOG.debug(_('Full set of CONF:')) LOG.debug(_('Full set of CONF:'))
CONF.log_opt_values(LOG, std_logging.DEBUG) CONF.log_opt_values(LOG, std_logging.DEBUG)
status = None
try: try:
super(ServiceLauncher, self).wait() super(ServiceLauncher, self).wait()
except SignalExit as exc: except SignalExit as exc:
signame = {signal.SIGTERM: 'SIGTERM', signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT'}[exc.signo] signal.SIGINT: 'SIGINT',
signal.SIGHUP: 'SIGHUP'}[exc.signo]
LOG.info(_('Caught %s, exiting'), signame) LOG.info(_('Caught %s, exiting'), signame)
status = exc.code status = exc.code
signo = exc.signo
except SystemExit as exc: except SystemExit as exc:
status = exc.code status = exc.code
finally: finally:
if rpc:
rpc.cleanup()
self.stop() self.stop()
return status if rpc:
try:
rpc.cleanup()
except Exception:
# We're shutting down, so it doesn't matter at this point.
LOG.exception(_('Exception during rpc cleanup.'))
return status, signo
def wait(self):
while True:
self.handle_signal()
status, signo = self._wait_for_exit_or_signal()
if signo != signal.SIGHUP:
return status
self.restart()
class ServiceWrapper(object): class ServiceWrapper(object):
@@ -145,9 +164,12 @@ class ProcessLauncher(object):
self.running = True self.running = True
rfd, self.writepipe = os.pipe() rfd, self.writepipe = os.pipe()
self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r') self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
self.handle_signal()
def handle_signal(self):
signal.signal(signal.SIGTERM, self._handle_signal) signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal) signal.signal(signal.SIGINT, self._handle_signal)
signal.signal(signal.SIGHUP, self._handle_signal)
def _handle_signal(self, signo, frame): def _handle_signal(self, signo, frame):
self.sigcaught = signo self.sigcaught = signo
@@ -156,6 +178,7 @@ class ProcessLauncher(object):
# Allow the process to be killed again and die from natural causes # Allow the process to be killed again and die from natural causes
signal.signal(signal.SIGTERM, signal.SIG_DFL) signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL) signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGHUP, signal.SIG_DFL)
def _pipe_watcher(self): def _pipe_watcher(self):
# This will block until the write end is closed when the parent # This will block until the write end is closed when the parent
@@ -166,16 +189,47 @@ class ProcessLauncher(object):
sys.exit(1) sys.exit(1)
def _child_process(self, service): def _child_process_handle_signal(self):
# Setup child signal handlers differently # Setup child signal handlers differently
def _sigterm(*args): def _sigterm(*args):
signal.signal(signal.SIGTERM, signal.SIG_DFL) signal.signal(signal.SIGTERM, signal.SIG_DFL)
raise SignalExit(signal.SIGTERM) raise SignalExit(signal.SIGTERM)
def _sighup(*args):
signal.signal(signal.SIGHUP, signal.SIG_DFL)
raise SignalExit(signal.SIGHUP)
signal.signal(signal.SIGTERM, _sigterm) signal.signal(signal.SIGTERM, _sigterm)
signal.signal(signal.SIGHUP, _sighup)
# Block SIGINT and let the parent send us a SIGTERM # Block SIGINT and let the parent send us a SIGTERM
signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGINT, signal.SIG_IGN)
def _child_wait_for_exit_or_signal(self, launcher):
status = None
signo = 0
try:
launcher.wait()
except SignalExit as exc:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT',
signal.SIGHUP: 'SIGHUP'}[exc.signo]
LOG.info(_('Caught %s, exiting'), signame)
status = exc.code
signo = exc.signo
except SystemExit as exc:
status = exc.code
except BaseException:
LOG.exception(_('Unhandled exception'))
status = 2
finally:
launcher.stop()
return status, signo
def _child_process(self, service):
self._child_process_handle_signal()
# Reopen the eventlet hub to make sure we don't share an epoll # Reopen the eventlet hub to make sure we don't share an epoll
# fd with parent and/or siblings, which would be bad # fd with parent and/or siblings, which would be bad
eventlet.hubs.use_hub() eventlet.hubs.use_hub()
@@ -189,7 +243,8 @@ class ProcessLauncher(object):
random.seed() random.seed()
launcher = Launcher() launcher = Launcher()
launcher.run_service(service) launcher.launch_service(service)
return launcher
def _start_child(self, wrap): def _start_child(self, wrap):
if len(wrap.forktimes) > wrap.workers: if len(wrap.forktimes) > wrap.workers:
@@ -210,21 +265,13 @@ class ProcessLauncher(object):
# NOTE(johannes): All exceptions are caught to ensure this # NOTE(johannes): All exceptions are caught to ensure this
# doesn't fallback into the loop spawning children. It would # doesn't fallback into the loop spawning children. It would
# be bad for a child to spawn more children. # be bad for a child to spawn more children.
status = 0 launcher = self._child_process(wrap.service)
try: while True:
self._child_process(wrap.service) self._child_process_handle_signal()
except SignalExit as exc: status, signo = self._child_wait_for_exit_or_signal(launcher)
signame = {signal.SIGTERM: 'SIGTERM', if signo != signal.SIGHUP:
signal.SIGINT: 'SIGINT'}[exc.signo] break
LOG.info(_('Caught %s, exiting'), signame) launcher.restart()
status = exc.code
except SystemExit as exc:
status = exc.code
except BaseException:
LOG.exception(_('Unhandled exception'))
status = 2
finally:
wrap.service.stop()
os._exit(status) os._exit(status)
@@ -270,12 +317,7 @@ class ProcessLauncher(object):
wrap.children.remove(pid) wrap.children.remove(pid)
return wrap return wrap
def wait(self): def _respawn_children(self):
"""Loop waiting on children to die and respawning as necessary."""
LOG.debug(_('Full set of CONF:'))
CONF.log_opt_values(LOG, std_logging.DEBUG)
while self.running: while self.running:
wrap = self._wait_child() wrap = self._wait_child()
if not wrap: if not wrap:
@@ -284,14 +326,30 @@ class ProcessLauncher(object):
# (see bug #1095346) # (see bug #1095346)
eventlet.greenthread.sleep(.01) eventlet.greenthread.sleep(.01)
continue continue
while self.running and len(wrap.children) < wrap.workers: while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap) self._start_child(wrap)
if self.sigcaught: def wait(self):
signame = {signal.SIGTERM: 'SIGTERM', """Loop waiting on children to die and respawning as necessary."""
signal.SIGINT: 'SIGINT'}[self.sigcaught]
LOG.info(_('Caught %s, stopping children'), signame) LOG.debug(_('Full set of CONF:'))
CONF.log_opt_values(LOG, std_logging.DEBUG)
while True:
self.handle_signal()
self._respawn_children()
if self.sigcaught:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT',
signal.SIGHUP: 'SIGHUP'}[self.sigcaught]
LOG.info(_('Caught %s, stopping children'), signame)
if self.sigcaught != signal.SIGHUP:
break
for pid in self.children:
os.kill(pid, signal.SIGHUP)
self.running = True
self.sigcaught = None
for pid in self.children: for pid in self.children:
try: try:
@@ -313,15 +371,74 @@ class Service(object):
def __init__(self, threads=1000): def __init__(self, threads=1000):
self.tg = threadgroup.ThreadGroup(threads) self.tg = threadgroup.ThreadGroup(threads)
# signal that the service is done shutting itself down:
self._done = event.Event()
def reset(self):
# NOTE(Fengqian): docs for Event.reset() recommend against using it
self._done = event.Event()
def start(self): def start(self):
pass pass
def stop(self): def stop(self):
self.tg.stop() self.tg.stop()
self.tg.wait()
# Signal that service cleanup is done:
if not self._done.ready():
self._done.send()
def wait(self):
self._done.wait()
class Services(object):
def __init__(self):
self.services = []
self.tg = threadgroup.ThreadGroup()
self.done = event.Event()
def add(self, service):
self.services.append(service)
self.tg.add_thread(self.run_service, service, self.done)
def stop(self):
# wait for graceful shutdown of services:
for service in self.services:
service.stop()
service.wait()
# Each service has performed cleanup, now signal that the run_service
# wrapper threads can now die:
if not self.done.ready():
self.done.send()
# reap threads:
self.tg.stop()
def wait(self): def wait(self):
self.tg.wait() self.tg.wait()
def restart(self):
self.stop()
self.done = event.Event()
for restart_service in self.services:
restart_service.reset()
self.tg.add_thread(self.run_service, restart_service, self.done)
@staticmethod
def run_service(service, done):
"""Service start wrapper.
:param service: service to run
:param done: event to wait on until a shutdown is triggered
:returns: None
"""
service.start()
done.wait()
def launch(service, workers=None): def launch(service, workers=None):
if workers: if workers:

View File

@@ -0,0 +1,100 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import ssl
from oslo.config import cfg
from heat.openstack.common.gettextutils import _ # noqa
ssl_opts = [
cfg.StrOpt('ca_file',
default=None,
help="CA certificate file to use to verify "
"connecting clients"),
cfg.StrOpt('cert_file',
default=None,
help="Certificate file to use when starting "
"the server securely"),
cfg.StrOpt('key_file',
default=None,
help="Private key file to use when starting "
"the server securely"),
]
CONF = cfg.CONF
CONF.register_opts(ssl_opts, "ssl")
def is_enabled():
cert_file = CONF.ssl.cert_file
key_file = CONF.ssl.key_file
ca_file = CONF.ssl.ca_file
use_ssl = cert_file or key_file
if cert_file and not os.path.exists(cert_file):
raise RuntimeError(_("Unable to find cert_file : %s") % cert_file)
if ca_file and not os.path.exists(ca_file):
raise RuntimeError(_("Unable to find ca_file : %s") % ca_file)
if key_file and not os.path.exists(key_file):
raise RuntimeError(_("Unable to find key_file : %s") % key_file)
if use_ssl and (not cert_file or not key_file):
raise RuntimeError(_("When running server in SSL mode, you must "
"specify both a cert_file and key_file "
"option value in your configuration file"))
return use_ssl
def wrap(sock):
ssl_kwargs = {
'server_side': True,
'certfile': CONF.ssl.cert_file,
'keyfile': CONF.ssl.key_file,
'cert_reqs': ssl.CERT_NONE,
}
if CONF.ssl.ca_file:
ssl_kwargs['ca_certs'] = CONF.ssl.ca_file
ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED
return ssl.wrap_socket(sock, **ssl_kwargs)
_SSL_PROTOCOLS = {
"tlsv1": ssl.PROTOCOL_TLSv1,
"sslv23": ssl.PROTOCOL_SSLv23,
"sslv3": ssl.PROTOCOL_SSLv3
}
try:
_SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2
except AttributeError:
pass
def validate_ssl_version(version):
key = version.lower()
try:
return _SSL_PROTOCOLS[key]
except KeyError:
raise RuntimeError(_("Invalid SSL version : %s") % version)

View File

@@ -14,7 +14,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from eventlet import greenlet import eventlet
from eventlet import greenpool from eventlet import greenpool
from eventlet import greenthread from eventlet import greenthread
@@ -105,7 +105,7 @@ class ThreadGroup(object):
for x in self.timers: for x in self.timers:
try: try:
x.wait() x.wait()
except greenlet.GreenletExit: except eventlet.greenlet.GreenletExit:
pass pass
except Exception as ex: except Exception as ex:
LOG.exception(ex) LOG.exception(ex)
@@ -115,7 +115,7 @@ class ThreadGroup(object):
continue continue
try: try:
x.wait() x.wait()
except greenlet.GreenletExit: except eventlet.greenlet.GreenletExit:
pass pass
except Exception as ex: except Exception as ex:
LOG.exception(ex) LOG.exception(ex)

View File

@@ -49,9 +49,9 @@ def parse_isotime(timestr):
try: try:
return iso8601.parse_date(timestr) return iso8601.parse_date(timestr)
except iso8601.ParseError as e: except iso8601.ParseError as e:
raise ValueError(e.message) raise ValueError(unicode(e))
except TypeError as e: except TypeError as e:
raise ValueError(e.message) raise ValueError(unicode(e))
def strtime(at=None, fmt=PERFECT_TIME_FORMAT): def strtime(at=None, fmt=PERFECT_TIME_FORMAT):

69
tools/config/generate_sample.sh Executable file
View File

@@ -0,0 +1,69 @@
#!/usr/bin/env bash
print_hint() {
echo "Try \`${0##*/} --help' for more information." >&2
}
PARSED_OPTIONS=$(getopt -n "${0##*/}" -o hb:p:o: \
--long help,base-dir:,package-name:,output-dir: -- "$@")
if [ $? != 0 ] ; then print_hint ; exit 1 ; fi
eval set -- "$PARSED_OPTIONS"
while true; do
case "$1" in
-h|--help)
echo "${0##*/} [options]"
echo ""
echo "options:"
echo "-h, --help show brief help"
echo "-b, --base-dir=DIR Project base directory (required)"
echo "-p, --package-name=NAME Project package name"
echo "-o, --output-dir=DIR File output directory"
exit 0
;;
-b|--base-dir)
shift
BASEDIR=`echo $1 | sed -e 's/\/*$//g'`
shift
;;
-p|--package-name)
shift
PACKAGENAME=`echo $1`
shift
;;
-o|--output-dir)
shift
OUTPUTDIR=`echo $1 | sed -e 's/\/*$//g'`
shift
;;
--)
break
;;
esac
done
if [ -z $BASEDIR ] || ! [ -d $BASEDIR ]
then
echo "${0##*/}: missing project base directory" >&2 ; print_hint ; exit 1
fi
PACKAGENAME=${PACKAGENAME:-${BASEDIR##*/}}
OUTPUTDIR=${OUTPUTDIR:-$BASEDIR/etc}
if ! [ -d $OUTPUTDIR ]
then
echo "${0##*/}: cannot access \`$OUTPUTDIR': No such file or directory" >&2
exit 1
fi
BASEDIRESC=`echo $BASEDIR | sed -e 's/\//\\\\\//g'`
FILES=$(find $BASEDIR/$PACKAGENAME -type f -name "*.py" ! -path "*/tests/*" \
-exec grep -l "Opt(" {} + | sed -e "s/^$BASEDIRESC\///g" | sort -u)
export EVENTLET_NO_GREENDNS=yes
MODULEPATH=heat.openstack.common.config.generator
OUTPUTFILE=$OUTPUTDIR/$PACKAGENAME.conf.sample
python -m $MODULEPATH $FILES > $OUTPUTFILE

View File

@@ -17,7 +17,7 @@
import os import os
import sys import sys
import install_venv_common as install_venv import install_venv_common as install_venv # noqa
def first_file(file_list): def first_file(file_list):