Sync with oslo stable/grizzly c7862b5239822d701b7fb155faa4607eff602627

Change-Id: Ib7f5dd1404bdac1969e947cc310573052760f3f4
This commit is contained in:
Kiall Mac Innes 2013-06-30 23:37:19 +01:00
parent f2513a09b8
commit f3ce2698dc
84 changed files with 1409 additions and 629 deletions

View File

@ -16,7 +16,7 @@
# under the License.
import sys
import eventlet
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate.openstack.common import service
from designate import utils

View File

@ -16,7 +16,7 @@
# under the License.
import sys
import eventlet
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate.openstack.common import service
from designate import utils

View File

@ -16,7 +16,7 @@
# under the License.
import sys
import eventlet
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate.openstack.common import service
from designate import utils

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 OpenStack, LLC.
# Copyright (c) 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -20,14 +20,17 @@
Filters which commands a service is allowed to run as another user.
To use this with designate, you should set the following in designate.conf:
To use this with designate, you should set the following in
designate.conf:
rootwrap_config=/etc/designate/rootwrap.conf
You also need to let the designate user run designate-rootwrap as root in sudoers:
designate ALL = (root) NOPASSWD: /usr/bin/designate-rootwrap /etc/designate/rootwrap.conf *
You also need to let the designate user run designate-rootwrap
as root in sudoers:
designate ALL = (root) NOPASSWD: /usr/bin/designate-rootwrap
/etc/designate/rootwrap.conf *
Service packaging should deploy .filters files only on nodes where they are
needed, to avoid allowing more than is necessary.
Service packaging should deploy .filters files only on nodes where
they are needed, to avoid allowing more than is necessary.
"""
import ConfigParser
@ -102,8 +105,8 @@ if __name__ == '__main__':
exec_dirs=config.exec_dirs)
if config.use_syslog:
logging.info("(%s > %s) Executing %s (filter match = %s)" % (
os.getlogin(), pwd.getpwuid(os.getuid())[0],
command, filtermatch.name))
os.getlogin(), pwd.getpwuid(os.getuid())[0],
command, filtermatch.name))
obj = subprocess.Popen(command,
stdin=sys.stdin,

53
bin/designate-rpc-zmq-receiver Executable file
View File

@ -0,0 +1,53 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
eventlet.monkey_patch()
import contextlib
import os
import sys
# If ../designate/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir,
os.pardir))
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'designate', '__init__.py')):
sys.path.insert(0, POSSIBLE_TOPDIR)
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate.openstack.common import rpc
from designate.openstack.common.rpc import impl_zmq
CONF = cfg.CONF
CONF.register_opts(rpc.rpc_opts)
CONF.register_opts(impl_zmq.zmq_opts)
def main():
CONF(sys.argv[1:], project='designate')
logging.setup("designate")
with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor:
reactor.consume_in_thread()
reactor.wait()
if __name__ == '__main__':
main()

View File

@ -16,7 +16,7 @@
# under the License.
import sys
import eventlet
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate.openstack.common import service
from designate import utils

View File

@ -15,7 +15,7 @@
# under the License.
import os
import socket
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import rpc
cfg.CONF.register_opts([

View File

@ -13,7 +13,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.openstack.common import cfg
from oslo.config import cfg
cfg.CONF.register_group(cfg.OptGroup(
name='service:agent', title="Configuration for Agent Service"

View File

@ -13,7 +13,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate.openstack.common.rpc import proxy as rpc_proxy

View File

@ -13,7 +13,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate.openstack.common.rpc import service as rpc_service
from designate import backend
@ -40,6 +40,10 @@ class Service(rpc_service.Service):
self.manager.start()
super(Service, self).start()
def wait(self):
super(Service, self).wait()
self.conn.consumer_thread.wait()
def stop(self):
super(Service, self).stop()
self.manager.stop()

View File

@ -14,7 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import flask
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import jsonutils as json
cfg.CONF.register_group(cfg.OptGroup(

View File

@ -14,7 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import flask
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import local
from designate.openstack.common import log as logging
from designate.openstack.common import uuidutils
@ -76,7 +76,7 @@ class KeystoneContextMiddleware(wsgi.Middleware):
roles = headers.get('X-Roles').split(',')
context = DesignateContext(auth_tok=headers.get('X-Auth-Token'),
context = DesignateContext(auth_token=headers.get('X-Auth-Token'),
user=headers.get('X-User-ID'),
tenant=headers.get('X-Tenant-ID'),
roles=roles)

View File

@ -16,7 +16,7 @@
from paste import deploy
from designate.openstack.common import log as logging
from designate.openstack.common import wsgi
from designate.openstack.common import cfg
from oslo.config import cfg
from designate import exceptions
from designate import utils
from designate import policy

View File

@ -21,7 +21,7 @@ from werkzeug import exceptions as wexceptions
from werkzeug import wrappers
from werkzeug.routing import BaseConverter
from werkzeug.routing import ValidationError
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import jsonutils as json
from designate.openstack.common import log as logging
from designate.openstack.common import uuidutils

View File

@ -14,7 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate import utils
from designate.backend import base

View File

@ -16,7 +16,7 @@
import os
import glob
import shutil
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate import utils
from designate.backend import base

View File

@ -16,7 +16,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate import utils
from designate.backend import base

View File

@ -19,7 +19,7 @@ import base64
from sqlalchemy.sql import select
from sqlalchemy.sql.expression import null
from sqlalchemy.orm import exc as sqlalchemy_exceptions
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate import exceptions
from designate.backend import base

View File

@ -13,7 +13,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.openstack.common import cfg
from oslo.config import cfg
# NOTE(kiall): See http://data.iana.org/TLD/tlds-alpha-by-domain.txt
# Version 2013031800.

View File

@ -13,7 +13,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate.openstack.common.rpc import proxy as rpc_proxy

View File

@ -15,7 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import re
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate.openstack.common.rpc import service as rpc_service
from designate import exceptions
@ -56,6 +56,10 @@ class Service(rpc_service.Service):
super(Service, self).start()
def wait(self):
super(Service, self).wait()
self.conn.consumer_thread.wait()
def stop(self):
super(Service, self).stop()

View File

@ -22,11 +22,11 @@ LOG = logging.getLogger(__name__)
class DesignateContext(context.RequestContext):
def __init__(self, auth_tok=None, user=None, tenant=None, is_admin=False,
def __init__(self, auth_token=None, user=None, tenant=None, is_admin=False,
read_only=False, show_deleted=False, request_id=None,
roles=[]):
super(DesignateContext, self).__init__(
auth_tok=auth_tok,
auth_token=auth_token,
user=user,
tenant=tenant,
is_admin=is_admin,

View File

@ -18,7 +18,7 @@ from migrate.exceptions import (DatabaseAlreadyControlledError,
DatabaseNotControlledError)
from migrate.versioning import api as versioning_api
from designate.openstack.common import log as logging
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.manage import base
LOG = logging.getLogger(__name__)

View File

@ -17,7 +17,7 @@ import os
from migrate.exceptions import DatabaseAlreadyControlledError
from migrate.versioning import api as versioning_api
from designate.openstack.common import log as logging
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.manage import base
LOG = logging.getLogger(__name__)

View File

@ -15,7 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate.central import rpcapi as central_rpcapi
from designate.context import DesignateContext

View File

@ -13,7 +13,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate.notification_handler.base import BaseAddressHandler

View File

@ -13,7 +13,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate.notification_handler.base import BaseAddressHandler

View File

@ -220,7 +220,7 @@ log files::
This module also contains a global instance of the ConfigOpts class
in order to support a common usage pattern in OpenStack::
from designate.openstack.common import cfg
from oslo.config import cfg
opts = [
cfg.StrOpt('bind_host', default='0.0.0.0'),

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -37,9 +37,9 @@ class RequestContext(object):
accesses the system, as well as additional request information.
"""
def __init__(self, auth_tok=None, user=None, tenant=None, is_admin=False,
def __init__(self, auth_token=None, user=None, tenant=None, is_admin=False,
read_only=False, show_deleted=False, request_id=None):
self.auth_tok = auth_tok
self.auth_token = auth_token
self.user = user
self.tenant = tenant
self.is_admin = is_admin
@ -55,7 +55,7 @@ class RequestContext(object):
'is_admin': self.is_admin,
'read_only': self.read_only,
'show_deleted': self.show_deleted,
'auth_token': self.auth_tok,
'auth_token': self.auth_token,
'request_id': self.request_id}

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2012 Openstack, LLC.
# Copyright (c) 2012 OpenStack Foundation.
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
@ -24,8 +24,7 @@ import traceback
import eventlet
import eventlet.backdoor
import greenlet
from designate.openstack.common import cfg
from oslo.config import cfg
eventlet_backdoor_opts = [
cfg.IntOpt('backdoor_port',
@ -52,12 +51,20 @@ def _print_greenthreads():
print
def _print_nativethreads():
for threadId, stack in sys._current_frames().items():
print threadId
traceback.print_stack(stack)
print
def initialize_if_enabled():
backdoor_locals = {
'exit': _dont_use_this, # So we don't exit the entire process
'quit': _dont_use_this, # So we don't exit the entire process
'fo': _find_objects,
'pgt': _print_greenthreads,
'pnt': _print_nativethreads,
}
if CONF.backdoor_port is None:

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -23,6 +23,8 @@ import logging
from designate.openstack.common.gettextutils import _
_FATAL_EXCEPTION_FORMAT_ERRORS = False
class Error(Exception):
def __init__(self, message=None):
@ -121,9 +123,12 @@ class OpenstackException(Exception):
try:
self._error_string = self.message % kwargs
except Exception:
# at least get the core message out if something happened
self._error_string = self.message
except Exception as e:
if _FATAL_EXCEPTION_FORMAT_ERRORS:
raise e
else:
# at least get the core message out if something happened
self._error_string = self.message
def __str__(self):
return self._error_string

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -34,6 +34,7 @@ This module provides a few things:
import datetime
import functools
import inspect
import itertools
import json
@ -42,7 +43,8 @@ import xmlrpclib
from designate.openstack.common import timeutils
def to_primitive(value, convert_instances=False, level=0):
def to_primitive(value, convert_instances=False, convert_datetime=True,
level=0, max_depth=3):
"""Convert a complex object into primitives.
Handy for JSON serialization. We can optionally handle instances,
@ -78,12 +80,17 @@ def to_primitive(value, convert_instances=False, level=0):
if getattr(value, '__module__', None) == 'mox':
return 'mock'
if level > 3:
if level > max_depth:
return '?'
# The try block may not be necessary after the class check above,
# but just in case ...
try:
recursive = functools.partial(to_primitive,
convert_instances=convert_instances,
convert_datetime=convert_datetime,
level=level,
max_depth=max_depth)
# It's not clear why xmlrpclib created their own DateTime type, but
# for our purposes, make it a datetime type which is explicitly
# handled
@ -91,33 +98,19 @@ def to_primitive(value, convert_instances=False, level=0):
value = datetime.datetime(*tuple(value.timetuple())[:6])
if isinstance(value, (list, tuple)):
o = []
for v in value:
o.append(to_primitive(v, convert_instances=convert_instances,
level=level))
return o
return [recursive(v) for v in value]
elif isinstance(value, dict):
o = {}
for k, v in value.iteritems():
o[k] = to_primitive(v, convert_instances=convert_instances,
level=level)
return o
elif isinstance(value, datetime.datetime):
return dict((k, recursive(v)) for k, v in value.iteritems())
elif convert_datetime and isinstance(value, datetime.datetime):
return timeutils.strtime(value)
elif hasattr(value, 'iteritems'):
return to_primitive(dict(value.iteritems()),
convert_instances=convert_instances,
level=level + 1)
return recursive(dict(value.iteritems()), level=level + 1)
elif hasattr(value, '__iter__'):
return to_primitive(list(value),
convert_instances=convert_instances,
level=level)
return recursive(list(value))
elif convert_instances and hasattr(value, '__dict__'):
# Likely an instance of something. Watch for cycles.
# Ignore class member vars.
return to_primitive(value.__dict__,
convert_instances=convert_instances,
level=level + 1)
return recursive(value.__dict__, level=level + 1)
else:
return value
except TypeError:

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -26,6 +26,9 @@ class WeakLocal(corolocal.local):
def __getattribute__(self, attr):
rval = corolocal.local.__getattribute__(self, attr)
if rval:
# NOTE(mikal): this bit is confusing. What is stored is a weak
# reference, not the value itself. We therefore need to lookup
# the weak reference and return the inner value here.
rval = rval()
return rval
@ -34,4 +37,12 @@ class WeakLocal(corolocal.local):
return corolocal.local.__setattr__(self, attr, value)
# NOTE(mikal): the name "store" should be deprecated in the future
store = WeakLocal()
# A "weak" store uses weak references and allows an object to fall out of scope
# when it falls out of scope in the code that uses the thread local storage. A
# "strong" store will hold a reference to the object so that it never falls out
# of scope.
weak_store = WeakLocal()
strong_store = corolocal.local

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
@ -40,7 +40,8 @@ import stat
import sys
import traceback
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common.gettextutils import _
from designate.openstack.common import jsonutils
from designate.openstack.common import local
@ -324,16 +325,11 @@ def _create_logging_excepthook(product_name):
def setup(product_name):
"""Setup logging."""
sys.excepthook = _create_logging_excepthook(product_name)
if CONF.log_config:
try:
logging.config.fileConfig(CONF.log_config)
except Exception:
traceback.print_exc()
raise
logging.config.fileConfig(CONF.log_config)
else:
_setup_logging_from_conf(product_name)
_setup_logging_from_conf()
sys.excepthook = _create_logging_excepthook(product_name)
def set_defaults(logging_context_format_string):
@ -366,8 +362,8 @@ def _find_facility_from_conf():
return facility
def _setup_logging_from_conf(product_name):
log_root = getLogger(product_name).logger
def _setup_logging_from_conf():
log_root = getLogger(None).logger
for handler in log_root.handlers:
log_root.removeHandler(handler)
@ -405,7 +401,8 @@ def _setup_logging_from_conf(product_name):
if CONF.log_format:
handler.setFormatter(logging.Formatter(fmt=CONF.log_format,
datefmt=datefmt))
handler.setFormatter(LegacyFormatter(datefmt=datefmt))
else:
handler.setFormatter(LegacyFormatter(datefmt=datefmt))
if CONF.debug:
log_root.setLevel(logging.DEBUG)

View File

@ -46,23 +46,12 @@ class LoopingCallDone(Exception):
self.retvalue = retvalue
class LoopingCallBase(object):
class LoopingCall(object):
def __init__(self, f=None, *args, **kw):
self.args = args
self.kw = kw
self.f = f
self._running = False
self.done = None
def stop(self):
self._running = False
def wait(self):
return self.done.wait()
class FixedIntervalLoopingCall(LoopingCallBase):
"""A fixed interval looping call."""
def start(self, interval, initial_delay=None):
self._running = True
@ -88,7 +77,7 @@ class FixedIntervalLoopingCall(LoopingCallBase):
self.stop()
done.send(e.retvalue)
except Exception:
LOG.exception(_('in fixed duration looping call'))
LOG.exception(_('in looping call'))
done.send_exception(*sys.exc_info())
return
else:
@ -99,49 +88,8 @@ class FixedIntervalLoopingCall(LoopingCallBase):
greenthread.spawn_n(_inner)
return self.done
def stop(self):
self._running = False
# TODO(mikal): this class name is deprecated in Havana and should be removed
# in the I release
LoopingCall = FixedIntervalLoopingCall
class DynamicLoopingCall(LoopingCallBase):
"""A looping call which sleeps until the next known event.
The function called should return how long to sleep for before being
called again.
"""
def start(self, initial_delay=None, periodic_interval_max=None):
self._running = True
done = event.Event()
def _inner():
if initial_delay:
greenthread.sleep(initial_delay)
try:
while self._running:
idle = self.f(*self.args, **self.kw)
if not self._running:
break
if periodic_interval_max is not None:
idle = min(idle, periodic_interval_max)
LOG.debug(_('Dynamic looping call sleeping for %.02f '
'seconds'), idle)
greenthread.sleep(idle)
except LoopingCallDone, e:
self.stop()
done.send(e.retvalue)
except Exception:
LOG.exception(_('in dynamic looping call'))
done.send_exception(*sys.exc_info())
return
else:
done.send(True)
self.done = done
greenthread.spawn(_inner)
return self.done
def wait(self):
return self.done.wait()

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC.
# Copyright 2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,4 @@
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,4 @@
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -15,7 +15,8 @@
import uuid
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import context
from designate.openstack.common.gettextutils import _
from designate.openstack.common import importutils

View File

@ -1,4 +1,4 @@
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -13,8 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from designate.openstack.common import cfg
from designate.openstack.common import jsonutils
from designate.openstack.common import log as logging

View File

@ -1,4 +1,4 @@
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,4 @@
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -13,8 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from designate.openstack.common import cfg
from designate.openstack.common import context as req_context
from designate.openstack.common.gettextutils import _
from designate.openstack.common import log as logging

View File

@ -1,4 +1,4 @@
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -15,7 +15,8 @@
'''messaging based notification driver, with message envelopes'''
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import context as req_context
from designate.openstack.common.gettextutils import _
from designate.openstack.common import log as logging

View File

@ -1,4 +1,4 @@
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -13,72 +13,26 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import time
from designate.openstack.common.gettextutils import _
from designate.openstack.common import cfg
from designate.openstack.common import log as logging
from designate.openstack.common import timeutils
periodic_opts = [
cfg.BoolOpt('run_external_periodic_tasks',
default=True,
help=('Some periodic tasks can be run in a separate process. '
'Should we run them here?')),
]
CONF = cfg.CONF
CONF.register_opts(periodic_opts)
LOG = logging.getLogger(__name__)
DEFAULT_INTERVAL = 60.0
class InvalidPeriodicTaskArg(Exception):
message = _("Unexpected argument for periodic task creation: %(arg)s.")
def periodic_task(*args, **kwargs):
"""Decorator to indicate that a method is a periodic task.
This decorator can be used in two ways:
1. Without arguments '@periodic_task', this will be run on every cycle
1. Without arguments '@periodic_task', this will be run on every tick
of the periodic scheduler.
2. With arguments:
@periodic_task(spacing=N [, run_immediately=[True|False]])
this will be run on approximately every N seconds. If this number is
negative the periodic task will be disabled. If the run_immediately
argument is provided and has a value of 'True', the first run of the
task will be shortly after task scheduler starts. If
run_immediately is omitted or set to 'False', the first time the
task runs will be approximately N seconds after the task scheduler
starts.
2. With arguments, @periodic_task(ticks_between_runs=N), this will be
run on every N ticks of the periodic scheduler.
"""
def decorator(f):
# Test for old style invocation
if 'ticks_between_runs' in kwargs:
raise InvalidPeriodicTaskArg(arg='ticks_between_runs')
# Control if run at all
f._periodic_task = True
f._periodic_external_ok = kwargs.pop('external_process_ok', False)
if f._periodic_external_ok and not CONF.run_external_periodic_tasks:
f._periodic_enabled = False
else:
f._periodic_enabled = kwargs.pop('enabled', True)
# Control frequency
f._periodic_spacing = kwargs.pop('spacing', 0)
f._periodic_immediate = kwargs.pop('run_immediately', False)
if f._periodic_immediate:
f._periodic_last_run = None
else:
f._periodic_last_run = timeutils.utcnow()
f._ticks_between_runs = kwargs.pop('ticks_between_runs', 0)
return f
# NOTE(sirp): The `if` is necessary to allow the decorator to be used with
@ -105,7 +59,7 @@ class _PeriodicTasksMeta(type):
super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_)
# NOTE(sirp): if the attribute is not present then we must be the base
# class, so, go ahead an initialize it. If the attribute is present,
# class, so, go ahead and initialize it. If the attribute is present,
# then we're a subclass so make a copy of it so we don't step on our
# parent's toes.
try:
@ -114,39 +68,20 @@ class _PeriodicTasksMeta(type):
cls._periodic_tasks = []
try:
cls._periodic_last_run = cls._periodic_last_run.copy()
cls._ticks_to_skip = cls._ticks_to_skip.copy()
except AttributeError:
cls._periodic_last_run = {}
try:
cls._periodic_spacing = cls._periodic_spacing.copy()
except AttributeError:
cls._periodic_spacing = {}
cls._ticks_to_skip = {}
# This uses __dict__ instead of
# inspect.getmembers(cls, inspect.ismethod) so only the methods of the
# current class are added when this class is scanned, and base classes
# are not added redundantly.
for value in cls.__dict__.values():
if getattr(value, '_periodic_task', False):
task = value
name = task.__name__
if task._periodic_spacing < 0:
LOG.info(_('Skipping periodic task %(task)s because '
'its interval is negative'),
{'task': name})
continue
if not task._periodic_enabled:
LOG.info(_('Skipping periodic task %(task)s because '
'it is disabled'),
{'task': name})
continue
# A periodic spacing of zero indicates that this task should
# be run every pass
if task._periodic_spacing == 0:
task._periodic_spacing = None
cls._periodic_tasks.append((name, task))
cls._periodic_spacing[name] = task._periodic_spacing
cls._periodic_last_run[name] = task._periodic_last_run
cls._ticks_to_skip[name] = task._ticks_between_runs
class PeriodicTasks(object):
@ -154,34 +89,27 @@ class PeriodicTasks(object):
def run_periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
idle_for = DEFAULT_INTERVAL
for task_name, task in self._periodic_tasks:
full_task_name = '.'.join([self.__class__.__name__, task_name])
now = timeutils.utcnow()
spacing = self._periodic_spacing[task_name]
last_run = self._periodic_last_run[task_name]
ticks_to_skip = self._ticks_to_skip[task_name]
if ticks_to_skip > 0:
LOG.debug(_("Skipping %(full_task_name)s, %(ticks_to_skip)s"
" ticks left until next run"),
dict(full_task_name=full_task_name,
ticks_to_skip=ticks_to_skip))
self._ticks_to_skip[task_name] -= 1
continue
# If a periodic task is _nearly_ due, then we'll run it early
if spacing is not None and last_run is not None:
due = last_run + datetime.timedelta(seconds=spacing)
if not timeutils.is_soon(due, 0.2):
idle_for = min(idle_for, timeutils.delta_seconds(now, due))
continue
if spacing is not None:
idle_for = min(idle_for, spacing)
LOG.debug(_("Running periodic task %(full_task_name)s"), locals())
self._periodic_last_run[task_name] = timeutils.utcnow()
self._ticks_to_skip[task_name] = task._ticks_between_runs
LOG.debug(_("Running periodic task %(full_task_name)s"),
dict(full_task_name=full_task_name))
try:
task(self, context)
except Exception as e:
if raise_on_error:
raise
LOG.exception(_("Error during %(full_task_name)s: %(e)s"),
locals())
time.sleep(0)
return idle_for
LOG.exception(_("Error during %(full_task_name)s:"
" %(e)s"),
dict(e=e, full_task_name=full_task_name))

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2012 OpenStack, LLC.
# Copyright (c) 2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -19,16 +19,14 @@
System-level utilities and helper functions.
"""
import os
import logging
import random
import shlex
import signal
from eventlet.green import subprocess
from eventlet import greenthread
from designate.openstack.common.gettextutils import _
from designate.openstack.common import log as logging
LOG = logging.getLogger(__name__)
@ -42,12 +40,6 @@ class UnknownArgumentError(Exception):
class ProcessExecutionError(Exception):
def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None,
description=None):
self.exit_code = exit_code
self.stderr = stderr
self.stdout = stdout
self.cmd = cmd
self.description = description
if description is None:
description = "Unexpected error while running command."
if exit_code is None:
@ -57,17 +49,6 @@ class ProcessExecutionError(Exception):
super(ProcessExecutionError, self).__init__(message)
class NoRootWrapSpecified(Exception):
def __init__(self, message=None):
super(NoRootWrapSpecified, self).__init__(message)
def _subprocess_setup():
# Python installs a SIGPIPE handler by default. This is usually not what
# non-Python subprocesses expect.
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
def execute(*cmd, **kwargs):
"""
Helper method to shell out and execute a command through subprocess with
@ -77,11 +58,11 @@ def execute(*cmd, **kwargs):
:type cmd: string
:param process_input: Send to opened process.
:type proces_input: string
:param check_exit_code: Single bool, int, or list of allowed exit
codes. Defaults to [0]. Raise
:class:`ProcessExecutionError` unless
program exits with one of these code.
:type check_exit_code: boolean, int, or [int]
:param check_exit_code: Defaults to 0. Will raise
:class:`ProcessExecutionError`
if the command exits without returning this value
as a returncode
:type check_exit_code: int
:param delay_on_retry: True | False. Defaults to True. If set to True,
wait a short amount of time before retrying.
:type delay_on_retry: boolean
@ -91,12 +72,8 @@ def execute(*cmd, **kwargs):
the command is prefixed by the command specified
in the root_helper kwarg.
:type run_as_root: boolean
:param root_helper: command to prefix to commands called with
run_as_root=True
:param root_helper: command to prefix all cmd's with
:type root_helper: string
:param shell: whether or not there should be a shell used to
execute this command. Defaults to false.
:type shell: boolean
:returns: (stdout, stderr) from process execution
:raises: :class:`UnknownArgumentError` on
receiving unknown arguments
@ -104,31 +81,16 @@ def execute(*cmd, **kwargs):
"""
process_input = kwargs.pop('process_input', None)
check_exit_code = kwargs.pop('check_exit_code', [0])
ignore_exit_code = False
check_exit_code = kwargs.pop('check_exit_code', 0)
delay_on_retry = kwargs.pop('delay_on_retry', True)
attempts = kwargs.pop('attempts', 1)
run_as_root = kwargs.pop('run_as_root', False)
root_helper = kwargs.pop('root_helper', '')
shell = kwargs.pop('shell', False)
if isinstance(check_exit_code, bool):
ignore_exit_code = not check_exit_code
check_exit_code = [0]
elif isinstance(check_exit_code, int):
check_exit_code = [check_exit_code]
if len(kwargs):
raise UnknownArgumentError(_('Got unknown keyword args '
'to utils.execute: %r') % kwargs)
if run_as_root and os.geteuid() != 0:
if not root_helper:
raise NoRootWrapSpecified(
message=('Command requested root, but did not specify a root '
'helper.'))
if run_as_root:
cmd = shlex.split(root_helper) + list(cmd)
cmd = map(str, cmd)
while attempts > 0:
@ -136,21 +98,11 @@ def execute(*cmd, **kwargs):
try:
LOG.debug(_('Running cmd (subprocess): %s'), ' '.join(cmd))
_PIPE = subprocess.PIPE # pylint: disable=E1101
if os.name == 'nt':
preexec_fn = None
close_fds = False
else:
preexec_fn = _subprocess_setup
close_fds = True
obj = subprocess.Popen(cmd,
stdin=_PIPE,
stdout=_PIPE,
stderr=_PIPE,
close_fds=close_fds,
preexec_fn=preexec_fn,
shell=shell)
close_fds=True)
result = None
if process_input is not None:
result = obj.communicate(process_input)
@ -160,7 +112,9 @@ def execute(*cmd, **kwargs):
_returncode = obj.returncode # pylint: disable=E1101
if _returncode:
LOG.debug(_('Result was %s') % _returncode)
if not ignore_exit_code and _returncode not in check_exit_code:
if (isinstance(check_exit_code, int) and
not isinstance(check_exit_code, bool) and
_returncode != check_exit_code):
(stdout, stderr) = result
raise ProcessExecutionError(exit_code=_returncode,
stdout=stdout,

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 OpenStack, LLC.
# Copyright (c) 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 OpenStack, LLC.
# Copyright (c) 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 OpenStack, LLC.
# Copyright (c) 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -25,8 +25,17 @@ For some wrappers that add message versioning to rpc, see:
rpc.proxy
"""
from designate.openstack.common import cfg
import inspect
import logging
from oslo.config import cfg
from designate.openstack.common.gettextutils import _
from designate.openstack.common import importutils
from designate.openstack.common import local
LOG = logging.getLogger(__name__)
rpc_opts = [
@ -62,7 +71,8 @@ rpc_opts = [
help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
]
cfg.CONF.register_opts(rpc_opts)
CONF = cfg.CONF
CONF.register_opts(rpc_opts)
def set_defaults(control_exchange):
@ -83,10 +93,27 @@ def create_connection(new=True):
:returns: An instance of openstack.common.rpc.common.Connection
"""
return _get_impl().create_connection(cfg.CONF, new=new)
return _get_impl().create_connection(CONF, new=new)
def call(context, topic, msg, timeout=None):
def _check_for_lock():
if not CONF.debug:
return None
if ((hasattr(local.strong_store, 'locks_held')
and local.strong_store.locks_held)):
stack = ' :: '.join([frame[3] for frame in inspect.stack()])
LOG.warn(_('A RPC is being made while holding a lock. The locks '
'currently held are %(locks)s. This is probably a bug. '
'Please report it. Include the following: [%(stack)s].'),
{'locks': local.strong_store.locks_held,
'stack': stack})
return True
return False
def call(context, topic, msg, timeout=None, check_for_lock=False):
"""Invoke a remote method that returns something.
:param context: Information that identifies the user that has made this
@ -100,13 +127,17 @@ def call(context, topic, msg, timeout=None):
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option.
:param check_for_lock: if True, a warning is emitted if a RPC call is made
with a lock held.
:returns: A dict from the remote method.
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
return _get_impl().call(cfg.CONF, context, topic, msg, timeout)
if check_for_lock:
_check_for_lock()
return _get_impl().call(CONF, context, topic, msg, timeout)
def cast(context, topic, msg):
@ -124,7 +155,7 @@ def cast(context, topic, msg):
:returns: None
"""
return _get_impl().cast(cfg.CONF, context, topic, msg)
return _get_impl().cast(CONF, context, topic, msg)
def fanout_cast(context, topic, msg):
@ -145,10 +176,10 @@ def fanout_cast(context, topic, msg):
:returns: None
"""
return _get_impl().fanout_cast(cfg.CONF, context, topic, msg)
return _get_impl().fanout_cast(CONF, context, topic, msg)
def multicall(context, topic, msg, timeout=None):
def multicall(context, topic, msg, timeout=None, check_for_lock=False):
"""Invoke a remote method and get back an iterator.
In this case, the remote method will be returning multiple values in
@ -166,6 +197,8 @@ def multicall(context, topic, msg, timeout=None):
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option.
:param check_for_lock: if True, a warning is emitted if a RPC call is made
with a lock held.
:returns: An iterator. The iterator will yield a tuple (N, X) where N is
an index that starts at 0 and increases by one for each value
@ -175,7 +208,9 @@ def multicall(context, topic, msg, timeout=None):
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
if check_for_lock:
_check_for_lock()
return _get_impl().multicall(CONF, context, topic, msg, timeout)
def notify(context, topic, msg, envelope=False):
@ -217,7 +252,7 @@ def cast_to_server(context, server_params, topic, msg):
:returns: None
"""
return _get_impl().cast_to_server(cfg.CONF, context, server_params, topic,
return _get_impl().cast_to_server(CONF, context, server_params, topic,
msg)
@ -233,7 +268,7 @@ def fanout_cast_to_server(context, server_params, topic, msg):
:returns: None
"""
return _get_impl().fanout_cast_to_server(cfg.CONF, context, server_params,
return _get_impl().fanout_cast_to_server(CONF, context, server_params,
topic, msg)
@ -263,10 +298,10 @@ def _get_impl():
global _RPCIMPL
if _RPCIMPL is None:
try:
_RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend)
_RPCIMPL = importutils.import_module(CONF.rpc_backend)
except ImportError:
# For backwards compatibility with older nova config.
impl = cfg.CONF.rpc_backend.replace('nova.rpc',
'nova.openstack.common.rpc')
impl = CONF.rpc_backend.replace('nova.rpc',
'nova.openstack.common.rpc')
_RPCIMPL = importutils.import_module(impl)
return _RPCIMPL

View File

@ -25,13 +25,19 @@ Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses
AMQP, but is deprecated and predates this code.
"""
import collections
import inspect
import sys
import uuid
from eventlet import greenpool
from eventlet import pools
from eventlet import queue
from eventlet import semaphore
# TODO(pekowsk): Remove import cfg and below comment in Havana.
# This import should no longer be needed when the amqp_rpc_single_reply_queue
# option is removed.
from oslo.config import cfg
from designate.openstack.common import excutils
from designate.openstack.common.gettextutils import _
@ -40,6 +46,17 @@ from designate.openstack.common import log as logging
from designate.openstack.common.rpc import common as rpc_common
# TODO(pekowski): Remove this option in Havana.
amqp_opts = [
cfg.BoolOpt('amqp_rpc_single_reply_queue',
default=False,
help='Enable a fast single reply queue if using AMQP based '
'RPC like RabbitMQ or Qpid.'),
]
cfg.CONF.register_opts(amqp_opts)
UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__)
@ -51,6 +68,7 @@ class Pool(pools.Pool):
kwargs.setdefault("max_size", self.conf.rpc_conn_pool_size)
kwargs.setdefault("order_as_stack", True)
super(Pool, self).__init__(*args, **kwargs)
self.reply_proxy = None
# TODO(comstud): Timeout connections not used in a while
def create(self):
@ -60,6 +78,16 @@ class Pool(pools.Pool):
def empty(self):
while self.free_items:
self.get().close()
# Force a new connection pool to be created.
# Note that this was added due to failing unit test cases. The issue
# is the above "while loop" gets all the cached connections from the
# pool and closes them, but never returns them to the pool, a pool
# leak. The unit tests hang waiting for an item to be returned to the
# pool. The unit tests get here via the teatDown() method. In the run
# time code, it gets here via cleanup() and only appears in service.py
# just before doing a sys.exit(), so cleanup() only happens once and
# the leakage is not a problem.
self.connection_cls.pool = None
_pool_create_sem = semaphore.Semaphore()
@ -137,12 +165,15 @@ class ConnectionContext(rpc_common.Connection):
def create_worker(self, topic, proxy, pool_name):
self.connection.create_worker(topic, proxy, pool_name)
def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
self.connection.join_consumer_pool(callback,
pool_name,
topic,
exchange_name)
def consume_in_thread(self):
self.connection.consume_in_thread()
def consume_in_thread_group(self, thread_group):
self.connection.consume_in_thread_group(thread_group)
def __getattr__(self, key):
"""Proxy all other calls to the Connection instance"""
if self.connection:
@ -151,8 +182,45 @@ class ConnectionContext(rpc_common.Connection):
raise rpc_common.InvalidRPCConnectionReuse()
def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
ending=False, log_failure=True):
class ReplyProxy(ConnectionContext):
""" Connection class for RPC replies / callbacks """
def __init__(self, conf, connection_pool):
self._call_waiters = {}
self._num_call_waiters = 0
self._num_call_waiters_wrn_threshhold = 10
self._reply_q = 'reply_' + uuid.uuid4().hex
super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
self.declare_direct_consumer(self._reply_q, self._process_data)
self.consume_in_thread()
def _process_data(self, message_data):
msg_id = message_data.pop('_msg_id', None)
waiter = self._call_waiters.get(msg_id)
if not waiter:
LOG.warn(_('no calling threads waiting for msg_id : %s'
', message : %s') % (msg_id, message_data))
else:
waiter.put(message_data)
def add_call_waiter(self, waiter, msg_id):
self._num_call_waiters += 1
if self._num_call_waiters > self._num_call_waiters_wrn_threshhold:
LOG.warn(_('Number of call waiters is greater than warning '
'threshhold: %d. There could be a MulticallProxyWaiter '
'leak.') % self._num_call_waiters_wrn_threshhold)
self._num_call_waiters_wrn_threshhold *= 2
self._call_waiters[msg_id] = waiter
def del_call_waiter(self, msg_id):
self._num_call_waiters -= 1
del self._call_waiters[msg_id]
def get_reply_q(self):
return self._reply_q
def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
failure=None, ending=False, log_failure=True):
"""Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple.
@ -171,13 +239,22 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
'failure': failure}
if ending:
msg['ending'] = True
conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
_add_unique_id(msg)
# If a reply_q exists, add the msg_id to the reply and pass the
# reply_q to direct_send() to use it as the response queue.
# Otherwise use the msg_id for backward compatibilty.
if reply_q:
msg['_msg_id'] = msg_id
conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
else:
conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
class RpcContext(rpc_common.CommonRpcContext):
"""Context that supports replying to a rpc.call"""
def __init__(self, **kwargs):
self.msg_id = kwargs.pop('msg_id', None)
self.reply_q = kwargs.pop('reply_q', None)
self.conf = kwargs.pop('conf')
super(RpcContext, self).__init__(**kwargs)
@ -185,13 +262,14 @@ class RpcContext(rpc_common.CommonRpcContext):
values = self.to_dict()
values['conf'] = self.conf
values['msg_id'] = self.msg_id
values['reply_q'] = self.reply_q
return self.__class__(**values)
def reply(self, reply=None, failure=None, ending=False,
connection_pool=None, log_failure=True):
if self.msg_id:
msg_reply(self.conf, self.msg_id, connection_pool, reply, failure,
ending, log_failure)
msg_reply(self.conf, self.msg_id, self.reply_q, connection_pool,
reply, failure, ending, log_failure)
if ending:
self.msg_id = None
@ -207,6 +285,7 @@ def unpack_context(conf, msg):
value = msg.pop(key)
context_dict[key[9:]] = value
context_dict['msg_id'] = msg.pop('_msg_id', None)
context_dict['reply_q'] = msg.pop('_reply_q', None)
context_dict['conf'] = conf
ctx = RpcContext.from_dict(context_dict)
rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict())
@ -227,15 +306,86 @@ def pack_context(msg, context):
msg.update(context_d)
class ProxyCallback(object):
"""Calls methods on a proxy object based on method and args."""
class _MsgIdCache(object):
"""This class checks any duplicate messages."""
def __init__(self, conf, proxy, connection_pool):
self.proxy = proxy
# NOTE: This value is considered can be a configuration item, but
# it is not necessary to change its value in most cases,
# so let this value as static for now.
DUP_MSG_CHECK_SIZE = 16
def __init__(self, **kwargs):
self.prev_msgids = collections.deque([],
maxlen=self.DUP_MSG_CHECK_SIZE)
def check_duplicate_message(self, message_data):
"""AMQP consumers may read same message twice when exceptions occur
before ack is returned. This method prevents doing it.
"""
if UNIQUE_ID in message_data:
msg_id = message_data[UNIQUE_ID]
if msg_id not in self.prev_msgids:
self.prev_msgids.append(msg_id)
else:
raise rpc_common.DuplicateMessageError(msg_id=msg_id)
def _add_unique_id(msg):
"""Add unique_id for checking duplicate messages."""
unique_id = uuid.uuid4().hex
msg.update({UNIQUE_ID: unique_id})
LOG.debug(_('UNIQUE_ID is %s.') % (unique_id))
class _ThreadPoolWithWait(object):
"""Base class for a delayed invocation manager used by
the Connection class to start up green threads
to handle incoming messages.
"""
def __init__(self, conf, connection_pool):
self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size)
self.connection_pool = connection_pool
self.conf = conf
def wait(self):
"""Wait for all callback threads to exit."""
self.pool.waitall()
class CallbackWrapper(_ThreadPoolWithWait):
"""Wraps a straight callback to allow it to be invoked in a green
thread.
"""
def __init__(self, conf, callback, connection_pool):
"""
:param conf: cfg.CONF instance
:param callback: a callable (probably a function)
:param connection_pool: connection pool as returned by
get_connection_pool()
"""
super(CallbackWrapper, self).__init__(
conf=conf,
connection_pool=connection_pool,
)
self.callback = callback
def __call__(self, message_data):
self.pool.spawn_n(self.callback, message_data)
class ProxyCallback(_ThreadPoolWithWait):
"""Calls methods on a proxy object based on method and args."""
def __init__(self, conf, proxy, connection_pool):
super(ProxyCallback, self).__init__(
conf=conf,
connection_pool=connection_pool,
)
self.proxy = proxy
self.msg_id_cache = _MsgIdCache()
def __call__(self, message_data):
"""Consumer callback to call a method on a proxy object.
@ -254,6 +404,7 @@ class ProxyCallback(object):
if hasattr(local.store, 'context'):
del local.store.context
rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
self.msg_id_cache.check_duplicate_message(message_data)
ctxt = unpack_context(self.conf, message_data)
method = message_data.get('method')
args = message_data.get('args', {})
@ -292,15 +443,73 @@ class ProxyCallback(object):
connection_pool=self.connection_pool,
log_failure=False)
except Exception:
LOG.exception(_('Exception during message handling'))
ctxt.reply(None, sys.exc_info(),
connection_pool=self.connection_pool)
def wait(self):
"""Wait for all callback threads to exit."""
self.pool.waitall()
# sys.exc_info() is deleted by LOG.exception().
exc_info = sys.exc_info()
LOG.error(_('Exception during message handling'),
exc_info=exc_info)
ctxt.reply(None, exc_info, connection_pool=self.connection_pool)
class MulticallProxyWaiter(object):
def __init__(self, conf, msg_id, timeout, connection_pool):
self._msg_id = msg_id
self._timeout = timeout or conf.rpc_response_timeout
self._reply_proxy = connection_pool.reply_proxy
self._done = False
self._got_ending = False
self._conf = conf
self._dataqueue = queue.LightQueue()
# Add this caller to the reply proxy's call_waiters
self._reply_proxy.add_call_waiter(self, self._msg_id)
self.msg_id_cache = _MsgIdCache()
def put(self, data):
self._dataqueue.put(data)
def done(self):
if self._done:
return
self._done = True
# Remove this caller from reply proxy's call_waiters
self._reply_proxy.del_call_waiter(self._msg_id)
def _process_data(self, data):
result = None
self.msg_id_cache.check_duplicate_message(data)
if data['failure']:
failure = data['failure']
result = rpc_common.deserialize_remote_exception(self._conf,
failure)
elif data.get('ending', False):
self._got_ending = True
else:
result = data['result']
return result
def __iter__(self):
"""Return a result until we get a reply with an 'ending" flag"""
if self._done:
raise StopIteration
while True:
try:
data = self._dataqueue.get(timeout=self._timeout)
result = self._process_data(data)
except queue.Empty:
self.done()
raise rpc_common.Timeout()
except Exception:
with excutils.save_and_reraise_exception():
self.done()
if self._got_ending:
self.done()
raise StopIteration
if isinstance(result, Exception):
self.done()
raise result
yield result
#TODO(pekowski): Remove MulticallWaiter() in Havana.
class MulticallWaiter(object):
def __init__(self, conf, connection, timeout):
self._connection = connection
@ -310,6 +519,7 @@ class MulticallWaiter(object):
self._done = False
self._got_ending = False
self._conf = conf
self.msg_id_cache = _MsgIdCache()
def done(self):
if self._done:
@ -321,6 +531,7 @@ class MulticallWaiter(object):
def __call__(self, data):
"""The consume() callback will call this. Store the result."""
self.msg_id_cache.check_duplicate_message(data)
if data['failure']:
failure = data['failure']
self._result = rpc_common.deserialize_remote_exception(self._conf,
@ -356,22 +567,41 @@ def create_connection(conf, new, connection_pool):
return ConnectionContext(conf, connection_pool, pooled=not new)
_reply_proxy_create_sem = semaphore.Semaphore()
def multicall(conf, context, topic, msg, timeout, connection_pool):
"""Make a call that returns multiple times."""
# TODO(pekowski): Remove all these comments in Havana.
# For amqp_rpc_single_reply_queue = False,
# Can't use 'with' for multicall, as it returns an iterator
# that will continue to use the connection. When it's done,
# connection.close() will get called which will put it back into
# the pool
# For amqp_rpc_single_reply_queue = True,
# The 'with' statement is mandatory for closing the connection
LOG.debug(_('Making synchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
_add_unique_id(msg)
pack_context(msg, context)
conn = ConnectionContext(conf, connection_pool)
wait_msg = MulticallWaiter(conf, conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg)
conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
# TODO(pekowski): Remove this flag and the code under the if clause
# in Havana.
if not conf.amqp_rpc_single_reply_queue:
conn = ConnectionContext(conf, connection_pool)
wait_msg = MulticallWaiter(conf, conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg)
conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
else:
with _reply_proxy_create_sem:
if not connection_pool.reply_proxy:
connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
with ConnectionContext(conf, connection_pool) as conn:
conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
return wait_msg
@ -388,6 +618,7 @@ def call(conf, context, topic, msg, timeout, connection_pool):
def cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a topic without waiting for a response."""
LOG.debug(_('Making asynchronous cast on %s...'), topic)
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
conn.topic_send(topic, rpc_common.serialize_msg(msg))
@ -396,6 +627,7 @@ def cast(conf, context, topic, msg, connection_pool):
def fanout_cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a fanout exchange without waiting for a response."""
LOG.debug(_('Making asynchronous fanout cast...'))
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
conn.fanout_send(topic, rpc_common.serialize_msg(msg))
@ -403,6 +635,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool):
def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
"""Sends a message on a topic to a specific server."""
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
@ -412,6 +645,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
def fanout_cast_to_server(conf, context, server_params, topic, msg,
connection_pool):
"""Sends a message on a fanout exchange to a specific server."""
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
@ -423,6 +657,7 @@ def notify(conf, context, topic, msg, connection_pool, envelope):
LOG.debug(_('Sending %(event_type)s on %(topic)s'),
dict(event_type=msg.get('event_type'),
topic=topic))
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
if envelope:

View File

@ -21,7 +21,8 @@ import copy
import sys
import traceback
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common.gettextutils import _
from designate.openstack.common import importutils
from designate.openstack.common import jsonutils
@ -48,8 +49,8 @@ deserialize_msg().
The current message format (version 2.0) is very simple. It is:
{
'designate.version': <RPC Envelope Version as a String>,
'designate.message': <Application Message Payload, JSON encoded>
'oslo.version': <RPC Envelope Version as a String>,
'oslo.message': <Application Message Payload, JSON encoded>
}
Message format version '1.0' is just considered to be the messages we sent
@ -65,8 +66,8 @@ to the messaging libraries as a dict.
'''
_RPC_ENVELOPE_VERSION = '2.0'
_VERSION_KEY = 'designate.version'
_MESSAGE_KEY = 'designate.message'
_VERSION_KEY = 'oslo.version'
_MESSAGE_KEY = 'oslo.message'
# TODO(russellb) Turn this on after Grizzly.
@ -124,6 +125,10 @@ class Timeout(RPCException):
message = _("Timeout while waiting on RPC response.")
class DuplicateMessageError(RPCException):
message = _("Found duplicate message(%(msg_id)s). Skipping it.")
class InvalidRPCConnectionReuse(RPCException):
message = _("Invalid reuse of an RPC connection.")
@ -196,23 +201,30 @@ class Connection(object):
"""
raise NotImplementedError()
def consume_in_thread(self):
"""Spawn a thread to handle incoming messages.
def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
"""Register as a member of a group of consumers for a given topic from
the specified exchange.
Spawn a thread that will be responsible for handling all incoming
messages for consumers that were set up on this connection.
Exactly one member of a given pool will receive each message.
Message dispatching inside of this is expected to be implemented in a
non-blocking manner. An example implementation would be having this
thread pull messages in for all of the consumers, but utilize a thread
pool for dispatching the messages to the proxy objects.
A message will be delivered to multiple pools, if more than
one is created.
:param callback: Callable to be invoked for each message.
:type callback: callable accepting one argument
:param pool_name: The name of the consumer pool.
:type pool_name: str
:param topic: The routing topic for desired messages.
:type topic: str
:param exchange_name: The name of the message exchange where
the client should attach. Defaults to
the configured exchange.
:type exchange_name: str
"""
raise NotImplementedError()
def consume_in_thread_group(self, thread_group):
"""
Spawn a thread to handle incoming messages in the supplied
ThreadGroup.
def consume_in_thread(self):
"""Spawn a thread to handle incoming messages.
Spawn a thread that will be responsible for handling all incoming
messages for consumers that were set up on this connection.
@ -304,7 +316,7 @@ def deserialize_remote_exception(conf, data):
# NOTE(ameade): We DO NOT want to allow just any module to be imported, in
# order to prevent arbitrary code execution.
if not module in conf.allowed_rpc_exception_modules:
if module not in conf.allowed_rpc_exception_modules:
return RemoteError(name, failure.get('message'), trace)
try:
@ -313,7 +325,7 @@ def deserialize_remote_exception(conf, data):
if not issubclass(klass, Exception):
raise TypeError("Can only deserialize Exceptions")
failure = klass(*failure.get('args', []), **failure.get('kwargs', {}))
failure = klass(**failure.get('kwargs', {}))
except (AttributeError, TypeError, ImportError):
return RemoteError(name, failure.get('message'), trace)

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC
# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -119,9 +119,6 @@ class Connection(object):
def consume_in_thread(self):
pass
def consume_in_thread_group(self, thread_group):
pass
def create_connection(conf, new=True):
"""Create a connection"""

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC
# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -28,8 +28,8 @@ import kombu
import kombu.connection
import kombu.entity
import kombu.messaging
from oslo.config import cfg
from designate.openstack.common import cfg
from designate.openstack.common.gettextutils import _
from designate.openstack.common import network_utils
from designate.openstack.common.rpc import amqp as rpc_amqp
@ -66,7 +66,8 @@ kombu_opts = [
help='the RabbitMQ userid'),
cfg.StrOpt('rabbit_password',
default='guest',
help='the RabbitMQ password'),
help='the RabbitMQ password',
secret=True),
cfg.StrOpt('rabbit_virtual_host',
default='/',
help='the RabbitMQ virtual host'),
@ -164,9 +165,10 @@ class ConsumerBase(object):
try:
msg = rpc_common.deserialize_msg(message.payload)
callback(msg)
message.ack()
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
message.ack()
self.queue.consume(*args, callback=_callback, **options)
@ -196,6 +198,7 @@ class DirectConsumer(ConsumerBase):
"""
# Default options
options = {'durable': False,
'queue_arguments': _get_queue_arguments(conf),
'auto_delete': True,
'exclusive': False}
options.update(kwargs)
@ -621,8 +624,8 @@ class Connection(object):
def _error_callback(exc):
if isinstance(exc, socket.timeout):
LOG.exception(_('Timed out waiting for RPC response: %s') %
str(exc))
LOG.debug(_('Timed out waiting for RPC response: %s') %
str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
@ -718,25 +721,17 @@ class Connection(object):
except StopIteration:
return
def _consumer_thread_callback(self):
""" Consumer thread callback used by consume_in_* """
try:
self.consume()
except greenlet.GreenletExit:
return
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread"""
def _consumer_thread():
try:
self.consume()
except greenlet.GreenletExit:
return
if self.consumer_thread is None:
self.consumer_thread = eventlet.spawn(
self._consumer_thread_callback)
self.consumer_thread = eventlet.spawn(_consumer_thread)
return self.consumer_thread
def consume_in_thread_group(self, thread_group):
""" Consume from all queues/consumers in the supplied ThreadGroup"""
thread_group.add_thread(self._consumer_thread_callback)
def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer that calls a method in a proxy object"""
proxy_cb = rpc_amqp.ProxyCallback(
@ -757,6 +752,30 @@ class Connection(object):
self.proxy_callbacks.append(proxy_cb)
self.declare_topic_consumer(topic, proxy_cb, pool_name)
def join_consumer_pool(self, callback, pool_name, topic,
exchange_name=None):
"""Register as a member of a group of consumers for a given topic from
the specified exchange.
Exactly one member of a given pool will receive each message.
A message will be delivered to multiple pools, if more than
one is created.
"""
callback_wrapper = rpc_amqp.CallbackWrapper(
conf=self.conf,
callback=callback,
connection_pool=rpc_amqp.get_connection_pool(self.conf,
Connection),
)
self.proxy_callbacks.append(callback_wrapper)
self.declare_topic_consumer(
queue_name=pool_name,
topic=topic,
exchange_name=exchange_name,
callback=callback_wrapper,
)
def create_connection(conf, new=True):
"""Create a connection"""

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC
# Copyright 2011 OpenStack Foundation
# Copyright 2011 - 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -22,8 +22,8 @@ import uuid
import eventlet
import greenlet
from oslo.config import cfg
from designate.openstack.common import cfg
from designate.openstack.common.gettextutils import _
from designate.openstack.common import importutils
from designate.openstack.common import jsonutils
@ -40,8 +40,8 @@ qpid_opts = [
cfg.StrOpt('qpid_hostname',
default='localhost',
help='Qpid broker hostname'),
cfg.StrOpt('qpid_port',
default='5672',
cfg.IntOpt('qpid_port',
default=5672,
help='Qpid broker port'),
cfg.ListOpt('qpid_hosts',
default=['$qpid_hostname:$qpid_port'],
@ -51,7 +51,8 @@ qpid_opts = [
help='Username for qpid connection'),
cfg.StrOpt('qpid_password',
default='',
help='Password for qpid connection'),
help='Password for qpid connection',
secret=True),
cfg.StrOpt('qpid_sasl_mechanisms',
default='',
help='Space separated list of SASL mechanisms to use for auth'),
@ -68,6 +69,8 @@ qpid_opts = [
cfg.CONF.register_opts(qpid_opts)
JSON_CONTENT_TYPE = 'application/json; charset=utf8'
class ConsumerBase(object):
"""Consumer base class."""
@ -122,10 +125,27 @@ class ConsumerBase(object):
self.receiver = session.receiver(self.address)
self.receiver.capacity = 1
def _unpack_json_msg(self, msg):
"""Load the JSON data in msg if msg.content_type indicates that it
is necessary. Put the loaded data back into msg.content and
update msg.content_type appropriately.
A Qpid Message containing a dict will have a content_type of
'amqp/map', whereas one containing a string that needs to be converted
back from JSON will have a content_type of JSON_CONTENT_TYPE.
:param msg: a Qpid Message object
:returns: None
"""
if msg.content_type == JSON_CONTENT_TYPE:
msg.content = jsonutils.loads(msg.content)
msg.content_type = 'amqp/map'
def consume(self):
"""Fetch the message and pass it to the callback object"""
message = self.receiver.fetch()
try:
self._unpack_json_msg(message)
msg = rpc_common.deserialize_msg(message.content)
self.callback(msg)
except Exception:
@ -330,15 +350,16 @@ class Connection(object):
def reconnect(self):
"""Handles reconnecting and re-establishing sessions and queues"""
if self.connection.opened():
try:
self.connection.close()
except qpid_exceptions.ConnectionError:
pass
attempt = 0
delay = 1
while True:
# Close the session if necessary
if self.connection.opened():
try:
self.connection.close()
except qpid_exceptions.ConnectionError:
pass
broker = self.brokers[attempt % len(self.brokers)]
attempt += 1
@ -414,8 +435,8 @@ class Connection(object):
def _error_callback(exc):
if isinstance(exc, qpid_exceptions.Empty):
LOG.exception(_('Timed out waiting for RPC response: %s') %
str(exc))
LOG.debug(_('Timed out waiting for RPC response: %s') %
str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
@ -509,13 +530,6 @@ class Connection(object):
"""Send a notify message on a topic"""
self.publisher_send(NotifyPublisher, topic, msg)
def _consumer_thread_callback(self):
""" Consumer thread callback used by consume_in_* """
try:
self.consume()
except greenlet.GreenletExit:
return
def consume(self, limit=None):
"""Consume from all queues/consumers"""
it = self.iterconsume(limit=limit)
@ -527,16 +541,15 @@ class Connection(object):
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread"""
def _consumer_thread():
try:
self.consume()
except greenlet.GreenletExit:
return
if self.consumer_thread is None:
self.consumer_thread = eventlet.spawn(
self._consumer_thread_callback)
self.consumer_thread = eventlet.spawn(_consumer_thread)
return self.consumer_thread
def consume_in_thread_group(self, thread_group):
""" Consume from all queues/consumers in the supplied ThreadGroup"""
thread_group.add_thread(self._consumer_thread_callback)
def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer that calls a method in a proxy object"""
proxy_cb = rpc_amqp.ProxyCallback(
@ -567,6 +580,34 @@ class Connection(object):
return consumer
def join_consumer_pool(self, callback, pool_name, topic,
exchange_name=None):
"""Register as a member of a group of consumers for a given topic from
the specified exchange.
Exactly one member of a given pool will receive each message.
A message will be delivered to multiple pools, if more than
one is created.
"""
callback_wrapper = rpc_amqp.CallbackWrapper(
conf=self.conf,
callback=callback,
connection_pool=rpc_amqp.get_connection_pool(self.conf,
Connection),
)
self.proxy_callbacks.append(callback_wrapper)
consumer = TopicConsumer(conf=self.conf,
session=self.session,
topic=topic,
callback=callback_wrapper,
name=pool_name,
exchange_name=exchange_name)
self._register_consumer(consumer)
return consumer
def create_connection(conf, new=True):
"""Create a connection"""

View File

@ -16,16 +16,17 @@
import os
import pprint
import re
import socket
import string
import sys
import types
import uuid
import eventlet
import greenlet
from oslo.config import cfg
from designate.openstack.common import cfg
from designate.openstack.common import excutils
from designate.openstack.common.gettextutils import _
from designate.openstack.common import importutils
from designate.openstack.common import jsonutils
@ -90,10 +91,10 @@ def _serialize(data):
Error if a developer passes us bad data.
"""
try:
return str(jsonutils.dumps(data, ensure_ascii=True))
return jsonutils.dumps(data, ensure_ascii=True)
except TypeError:
LOG.error(_("JSON serialization failed."))
raise
with excutils.save_and_reraise_exception():
LOG.error(_("JSON serialization failed."))
def _deserialize(data):
@ -217,11 +218,18 @@ class ZmqClient(object):
socket_type = zmq.PUSH
self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
if serialize:
data = rpc_common.serialize_msg(data, force_envelope)
self.outq.send([str(msg_id), str(topic), str('cast'),
_serialize(data)])
def cast(self, msg_id, topic, data, envelope=False):
msg_id = msg_id or 0
if not (envelope or rpc_common._SEND_RPC_ENVELOPE):
self.outq.send(map(bytes,
(msg_id, topic, 'cast', _serialize(data))))
return
rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
self.outq.send(map(bytes,
(msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
def close(self):
self.outq.close()
@ -295,13 +303,13 @@ class InternalContext(object):
ctx.replies)
LOG.debug(_("Sending reply"))
cast(CONF, ctx, topic, {
_multi_send(_cast, ctx, topic, {
'method': '-process_reply',
'args': {
'msg_id': msg_id,
'msg_id': msg_id, # Include for Folsom compat.
'response': response
}
})
}, _msg_id=msg_id)
class ConsumerBase(object):
@ -320,22 +328,23 @@ class ConsumerBase(object):
else:
return [result]
def process(self, style, target, proxy, ctx, data):
def process(self, proxy, ctx, data):
data.setdefault('version', None)
data.setdefault('args', {})
# Method starting with - are
# processed internally. (non-valid method name)
method = data['method']
method = data.get('method')
if not method:
LOG.error(_("RPC message did not include method."))
return
# Internal method
# uses internal context for safety.
if data['method'][0] == '-':
# For reply / process_reply
method = method[1:]
if method == 'reply':
self.private_ctx.reply(ctx, proxy, **data['args'])
if method == '-reply':
self.private_ctx.reply(ctx, proxy, **data['args'])
return
data.setdefault('version', None)
data.setdefault('args', {})
proxy.dispatch(ctx, data['version'],
data['method'], **data['args'])
@ -391,24 +400,17 @@ class ZmqBaseReactor(ConsumerBase):
LOG.info(_("Out reactor registered"))
def _consumer_thread_callback(self, sock):
""" Consumer thread callback used by consume_in_* """
LOG.info(_("Consuming socket"))
while True:
self.consume(sock)
def consume_in_thread(self):
def _consume(sock):
LOG.info(_("Consuming socket"))
while True:
self.consume(sock)
for k in self.proxies.keys():
self.threads.append(
self.pool.spawn(self._consumer_thread_callback, k)
self.pool.spawn(_consume, k)
)
def consume_in_thread_group(self, thread_group):
""" Consume from all queues/consumers in the supplied ThreadGroup"""
for k in self.proxies.keys():
thread_group.add_thread(self._consumer_thread_callback, k)
def wait(self):
for t in self.threads:
t.wait()
@ -430,6 +432,8 @@ class ZmqProxy(ZmqBaseReactor):
def __init__(self, conf):
super(ZmqProxy, self).__init__(conf)
pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\'))
self.badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep)))
self.topic_proxy = {}
@ -438,21 +442,15 @@ class ZmqProxy(ZmqBaseReactor):
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
msg_id, topic, style, in_msg = data
topic = topic.split('.', 1)[0]
topic = data[1]
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
# Handle zmq_replies magic
if topic.startswith('fanout~'):
sock_type = zmq.PUB
topic = topic.split('.', 1)[0]
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
inside = rpc_common.deserialize_msg(_deserialize(in_msg))
msg_id = inside[-1]['args']['msg_id']
response = inside[-1]['args']['response']
LOG.debug(_("->response->%s"), response)
data = [str(msg_id), _serialize(response)]
else:
sock_type = zmq.PUSH
@ -461,6 +459,13 @@ class ZmqProxy(ZmqBaseReactor):
LOG.info(_("Creating proxy for topic: %s"), topic)
try:
# The topic is received over the network,
# don't trust this input.
if self.badchars.search(topic) is not None:
emsg = _("Topic contained dangerous characters.")
LOG.warn(emsg)
raise RPCException(emsg)
out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
(ipc_dir, topic),
sock_type, bind=True)
@ -517,9 +522,9 @@ class ZmqProxy(ZmqBaseReactor):
ipc_dir, run_as_root=True)
utils.execute('chmod', '750', ipc_dir, run_as_root=True)
except utils.ProcessExecutionError:
LOG.error(_("Could not create IPC directory %s") %
(ipc_dir, ))
raise
with excutils.save_and_reraise_exception():
LOG.error(_("Could not create IPC directory %s") %
(ipc_dir, ))
try:
self.register(consumption_proxy,
@ -527,13 +532,28 @@ class ZmqProxy(ZmqBaseReactor):
zmq.PULL,
out_bind=True)
except zmq.ZMQError:
LOG.error(_("Could not create ZeroMQ receiver daemon. "
"Socket may already be in use."))
raise
with excutils.save_and_reraise_exception():
LOG.error(_("Could not create ZeroMQ receiver daemon. "
"Socket may already be in use."))
super(ZmqProxy, self).consume_in_thread()
def unflatten_envelope(packenv):
"""Unflattens the RPC envelope.
Takes a list and returns a dictionary.
i.e. [1,2,3,4] => {1: 2, 3: 4}
"""
i = iter(packenv)
h = {}
try:
while True:
k = i.next()
h[k] = i.next()
except StopIteration:
return h
class ZmqReactor(ZmqBaseReactor):
"""
A consumer class implementing a
@ -554,38 +574,53 @@ class ZmqReactor(ZmqBaseReactor):
self.mapping[sock].send(data)
return
msg_id, topic, style, in_msg = data
ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
ctx = RpcContext.unmarshal(ctx)
proxy = self.proxies[sock]
self.pool.spawn_n(self.process, style, topic,
proxy, ctx, request)
if data[2] == 'cast': # Legacy protocol
packenv = data[3]
ctx, msg = _deserialize(packenv)
request = rpc_common.deserialize_msg(msg)
ctx = RpcContext.unmarshal(ctx)
elif data[2] == 'impl_zmq_v2':
packenv = data[4:]
msg = unflatten_envelope(packenv)
request = rpc_common.deserialize_msg(msg)
# Unmarshal only after verifying the message.
ctx = RpcContext.unmarshal(data[3])
else:
LOG.error(_("ZMQ Envelope version unsupported or unknown."))
return
self.pool.spawn_n(self.process, proxy, ctx, request)
class Connection(rpc_common.Connection):
"""Manages connections and threads."""
def __init__(self, conf):
self.topics = []
self.reactor = ZmqReactor(conf)
def create_consumer(self, topic, proxy, fanout=False):
# Only consume on the base topic name.
topic = topic.split('.', 1)[0]
LOG.info(_("Create Consumer for topic (%(topic)s)") %
{'topic': topic})
# Register with matchmaker.
_get_matchmaker().register(topic, CONF.rpc_zmq_host)
# Subscription scenarios
if fanout:
subscribe = ('', fanout)[type(fanout) == str]
sock_type = zmq.SUB
topic = 'fanout~' + topic
subscribe = ('', fanout)[type(fanout) == str]
topic = 'fanout~' + topic.split('.', 1)[0]
else:
sock_type = zmq.PULL
subscribe = None
topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
if topic in self.topics:
LOG.info(_("Skipping topic registration. Already registered."))
return
# Receive messages from (local) proxy
inaddr = "ipc://%s/zmq_topic_%s" % \
@ -596,22 +631,26 @@ class Connection(rpc_common.Connection):
self.reactor.register(proxy, inaddr, sock_type,
subscribe=subscribe, in_bind=False)
self.topics.append(topic)
def close(self):
_get_matchmaker().stop_heartbeat()
for topic in self.topics:
_get_matchmaker().unregister(topic, CONF.rpc_zmq_host)
self.reactor.close()
self.topics = []
def wait(self):
self.reactor.wait()
def consume_in_thread(self):
_get_matchmaker().start_heartbeat()
self.reactor.consume_in_thread()
def consume_in_thread_group(self, thread_group):
self.reactor.consume_in_thread_group(thread_group)
def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
force_envelope=False):
def _cast(addr, context, topic, msg, timeout=None, envelope=False,
_msg_id=None):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
@ -620,7 +659,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
conn = ZmqClient(addr)
# assumes cast can't return an exception
conn.cast(msg_id, topic, payload, serialize, force_envelope)
conn.cast(_msg_id, topic, payload, envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
@ -628,8 +667,8 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
conn.close()
def _call(addr, context, msg_id, topic, msg, timeout=None,
serialize=True, force_envelope=False):
def _call(addr, context, topic, msg, timeout=None,
envelope=False):
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout
@ -659,23 +698,36 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
with Timeout(timeout, exception=rpc_common.Timeout):
try:
msg_waiter = ZmqSocket(
"ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir,
"ipc://%s/zmq_topic_zmq_replies.%s" %
(CONF.rpc_zmq_ipc_dir,
CONF.rpc_zmq_host),
zmq.SUB, subscribe=msg_id, bind=False
)
LOG.debug(_("Sending cast"))
_cast(addr, context, msg_id, topic, payload,
serialize=serialize, force_envelope=force_envelope)
_cast(addr, context, topic, payload, envelope)
LOG.debug(_("Cast sent; Waiting reply"))
# Blocks until receives reply
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
responses = _deserialize(msg[-1])
if msg[2] == 'cast': # Legacy version
raw_msg = _deserialize(msg[-1])[-1]
elif msg[2] == 'impl_zmq_v2':
rpc_envelope = unflatten_envelope(msg[4:])
raw_msg = rpc_common.deserialize_msg(rpc_envelope)
else:
raise rpc_common.UnsupportedRpcEnvelopeVersion(
_("Unsupported or unknown ZMQ envelope returned."))
responses = raw_msg['args']['response']
# ZMQError trumps the Timeout error.
except zmq.ZMQError:
raise RPCException("ZMQ Socket Error")
except (IndexError, KeyError):
raise RPCException(_("RPC Message Invalid."))
finally:
if 'msg_waiter' in vars():
msg_waiter.close()
@ -691,8 +743,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
return responses[-1]
def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
force_envelope=False):
def _multi_send(method, context, topic, msg, timeout=None,
envelope=False, _msg_id=None):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
@ -709,7 +761,7 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
LOG.warn(_("No matchmaker results. Not casting."))
# While not strictly a timeout, callers know how to handle
# this exception and a timeout isn't too big a lie.
raise rpc_common.Timeout, "No match from matchmaker."
raise rpc_common.Timeout(_("No match from matchmaker."))
# This supports brokerless fanout (addresses > 1)
for queue in queues:
@ -718,11 +770,11 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
_topic, _topic, msg, timeout, serialize,
force_envelope)
_topic, msg, timeout, envelope,
_msg_id)
return
return method(_addr, context, _topic, _topic, msg, timeout,
serialize, force_envelope)
return method(_addr, context, _topic, msg, timeout,
envelope)
def create_connection(conf, new=True):
@ -752,7 +804,7 @@ def fanout_cast(conf, context, topic, msg, **kwargs):
_multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
def notify(conf, context, topic, msg, **kwargs):
def notify(conf, context, topic, msg, envelope):
"""
Send notification event.
Notifications are sent to topic-priority.
@ -760,10 +812,8 @@ def notify(conf, context, topic, msg, **kwargs):
"""
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
topic.replace('.', '-')
kwargs['serialize'] = kwargs.pop('envelope')
kwargs['force_envelope'] = True
cast(conf, context, topic, msg, **kwargs)
topic = topic.replace('.', '-')
cast(conf, context, topic, msg, envelope=envelope)
def cleanup():
@ -787,21 +837,9 @@ def _get_ctxt():
return ZMQ_CTX
def _get_matchmaker():
def _get_matchmaker(*args, **kwargs):
global matchmaker
if not matchmaker:
# rpc_zmq_matchmaker should be set to a 'module.Class'
mm_path = CONF.rpc_zmq_matchmaker.split('.')
mm_module = '.'.join(mm_path[:-1])
mm_class = mm_path[-1]
# Only initialize a class.
if mm_path[-1][0] not in string.ascii_uppercase:
LOG.error(_("Matchmaker could not be loaded.\n"
"rpc_zmq_matchmaker is not a class."))
raise RPCException(_("Error loading Matchmaker."))
mm_impl = importutils.import_module(mm_module)
mm_constructor = getattr(mm_impl, mm_class)
matchmaker = mm_constructor()
matchmaker = importutils.import_object(
CONF.rpc_zmq_matchmaker, *args, **kwargs)
return matchmaker

View File

@ -22,7 +22,9 @@ import contextlib
import itertools
import json
from designate.openstack.common import cfg
import eventlet
from oslo.config import cfg
from designate.openstack.common.gettextutils import _
from designate.openstack.common import log as logging
@ -32,6 +34,12 @@ matchmaker_opts = [
cfg.StrOpt('matchmaker_ringfile',
default='/etc/nova/matchmaker_ring.json',
help='Matchmaker ring file (JSON)'),
cfg.IntOpt('matchmaker_heartbeat_freq',
default=300,
help='Heartbeat frequency'),
cfg.IntOpt('matchmaker_heartbeat_ttl',
default=600,
help='Heartbeat time-to-live.'),
]
CONF = cfg.CONF
@ -69,12 +77,73 @@ class Binding(object):
class MatchMakerBase(object):
"""Match Maker Base Class."""
"""
Match Maker Base Class.
Build off HeartbeatMatchMakerBase if building a
heartbeat-capable MatchMaker.
"""
def __init__(self):
# Array of tuples. Index [2] toggles negation, [3] is last-if-true
self.bindings = []
self.no_heartbeat_msg = _('Matchmaker does not implement '
'registration or heartbeat.')
def register(self, key, host):
"""
Register a host on a backend.
Heartbeats, if applicable, may keepalive registration.
"""
pass
def ack_alive(self, key, host):
"""
Acknowledge that a key.host is alive.
Used internally for updating heartbeats,
but may also be used publically to acknowledge
a system is alive (i.e. rpc message successfully
sent to host)
"""
pass
def is_alive(self, topic, host):
"""
Checks if a host is alive.
"""
pass
def expire(self, topic, host):
"""
Explicitly expire a host's registration.
"""
pass
def send_heartbeats(self):
"""
Send all heartbeats.
Use start_heartbeat to spawn a heartbeat greenthread,
which loops this method.
"""
pass
def unregister(self, key, host):
"""
Unregister a topic.
"""
pass
def start_heartbeat(self):
"""
Spawn heartbeat greenthread.
"""
pass
def stop_heartbeat(self):
"""
Destroys the heartbeat greenthread.
"""
pass
def add_binding(self, binding, rule, last=True):
self.bindings.append((binding, rule, False, last))
@ -98,6 +167,103 @@ class MatchMakerBase(object):
return workers
class HeartbeatMatchMakerBase(MatchMakerBase):
"""
Base for a heart-beat capable MatchMaker.
Provides common methods for registering,
unregistering, and maintaining heartbeats.
"""
def __init__(self):
self.hosts = set()
self._heart = None
self.host_topic = {}
super(HeartbeatMatchMakerBase, self).__init__()
def send_heartbeats(self):
"""
Send all heartbeats.
Use start_heartbeat to spawn a heartbeat greenthread,
which loops this method.
"""
for key, host in self.host_topic:
self.ack_alive(key, host)
def ack_alive(self, key, host):
"""
Acknowledge that a host.topic is alive.
Used internally for updating heartbeats,
but may also be used publically to acknowledge
a system is alive (i.e. rpc message successfully
sent to host)
"""
raise NotImplementedError("Must implement ack_alive")
def backend_register(self, key, host):
"""
Implements registration logic.
Called by register(self,key,host)
"""
raise NotImplementedError("Must implement backend_register")
def backend_unregister(self, key, key_host):
"""
Implements de-registration logic.
Called by unregister(self,key,host)
"""
raise NotImplementedError("Must implement backend_unregister")
def register(self, key, host):
"""
Register a host on a backend.
Heartbeats, if applicable, may keepalive registration.
"""
self.hosts.add(host)
self.host_topic[(key, host)] = host
key_host = '.'.join((key, host))
self.backend_register(key, key_host)
self.ack_alive(key, host)
def unregister(self, key, host):
"""
Unregister a topic.
"""
if (key, host) in self.host_topic:
del self.host_topic[(key, host)]
self.hosts.discard(host)
self.backend_unregister(key, '.'.join((key, host)))
LOG.info(_("Matchmaker unregistered: %s, %s" % (key, host)))
def start_heartbeat(self):
"""
Implementation of MatchMakerBase.start_heartbeat
Launches greenthread looping send_heartbeats(),
yielding for CONF.matchmaker_heartbeat_freq seconds
between iterations.
"""
if len(self.hosts) == 0:
raise MatchMakerException(
_("Register before starting heartbeat."))
def do_heartbeat():
while True:
self.send_heartbeats()
eventlet.sleep(CONF.matchmaker_heartbeat_freq)
self._heart = eventlet.spawn(do_heartbeat)
def stop_heartbeat(self):
"""
Destroys the heartbeat greenthread.
"""
if self._heart:
self._heart.kill()
class DirectBinding(Binding):
"""
Specifies a host in the key via a '.' character
@ -201,24 +367,25 @@ class FanoutRingExchange(RingExchange):
class LocalhostExchange(Exchange):
"""Exchange where all direct topics are local."""
def __init__(self):
def __init__(self, host='localhost'):
self.host = host
super(Exchange, self).__init__()
def run(self, key):
return [(key.split('.')[0] + '.localhost', 'localhost')]
return [('.'.join((key.split('.')[0], self.host)), self.host)]
class DirectExchange(Exchange):
"""
Exchange where all topic keys are split, sending to second half.
i.e. "compute.host" sends a message to "compute" running on "host"
i.e. "compute.host" sends a message to "compute.host" running on "host"
"""
def __init__(self):
super(Exchange, self).__init__()
def run(self, key):
b, e = key.split('.', 1)
return [(b, e)]
e = key.split('.', 1)[1]
return [(key, e)]
class MatchMakerRing(MatchMakerBase):
@ -237,11 +404,11 @@ class MatchMakerLocalhost(MatchMakerBase):
Match Maker where all bare topics resolve to localhost.
Useful for testing.
"""
def __init__(self):
def __init__(self, host='localhost'):
super(MatchMakerLocalhost, self).__init__()
self.add_binding(FanoutBinding(), LocalhostExchange())
self.add_binding(FanoutBinding(), LocalhostExchange(host))
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), LocalhostExchange())
self.add_binding(TopicBinding(), LocalhostExchange(host))
class MatchMakerStub(MatchMakerBase):

View File

@ -0,0 +1,149 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The MatchMaker classes should accept a Topic or Fanout exchange key and
return keys for direct exchanges, per (approximate) AMQP parlance.
"""
from oslo.config import cfg
from designate.openstack.common import importutils
from designate.openstack.common import log as logging
from designate.openstack.common.rpc import matchmaker as mm_common
redis = importutils.try_import('redis')
matchmaker_redis_opts = [
cfg.StrOpt('host',
default='127.0.0.1',
help='Host to locate redis'),
cfg.IntOpt('port',
default=6379,
help='Use this port to connect to redis host.'),
cfg.StrOpt('password',
default=None,
help='Password for Redis server. (optional)'),
]
CONF = cfg.CONF
opt_group = cfg.OptGroup(name='matchmaker_redis',
title='Options for Redis-based MatchMaker')
CONF.register_group(opt_group)
CONF.register_opts(matchmaker_redis_opts, opt_group)
LOG = logging.getLogger(__name__)
class RedisExchange(mm_common.Exchange):
def __init__(self, matchmaker):
self.matchmaker = matchmaker
self.redis = matchmaker.redis
super(RedisExchange, self).__init__()
class RedisTopicExchange(RedisExchange):
"""
Exchange where all topic keys are split, sending to second half.
i.e. "compute.host" sends a message to "compute" running on "host"
"""
def run(self, topic):
while True:
member_name = self.redis.srandmember(topic)
if not member_name:
# If this happens, there are no
# longer any members.
break
if not self.matchmaker.is_alive(topic, member_name):
continue
host = member_name.split('.', 1)[1]
return [(member_name, host)]
return []
class RedisFanoutExchange(RedisExchange):
"""
Return a list of all hosts.
"""
def run(self, topic):
topic = topic.split('~', 1)[1]
hosts = self.redis.smembers(topic)
good_hosts = filter(
lambda host: self.matchmaker.is_alive(topic, host), hosts)
return [(x, x.split('.', 1)[1]) for x in good_hosts]
class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
"""
MatchMaker registering and looking-up hosts with a Redis server.
"""
def __init__(self):
super(MatchMakerRedis, self).__init__()
if not redis:
raise ImportError("Failed to import module redis.")
self.redis = redis.StrictRedis(
host=CONF.matchmaker_redis.host,
port=CONF.matchmaker_redis.port,
password=CONF.matchmaker_redis.password)
self.add_binding(mm_common.FanoutBinding(), RedisFanoutExchange(self))
self.add_binding(mm_common.DirectBinding(), mm_common.DirectExchange())
self.add_binding(mm_common.TopicBinding(), RedisTopicExchange(self))
def ack_alive(self, key, host):
topic = "%s.%s" % (key, host)
if not self.redis.expire(topic, CONF.matchmaker_heartbeat_ttl):
# If we could not update the expiration, the key
# might have been pruned. Re-register, creating a new
# key in Redis.
self.register(self.topic_host[host], host)
def is_alive(self, topic, host):
if self.redis.ttl(host) == -1:
self.expire(topic, host)
return False
return True
def expire(self, topic, host):
with self.redis.pipeline() as pipe:
pipe.multi()
pipe.delete(host)
pipe.srem(topic, host)
pipe.execute()
def backend_register(self, key, key_host):
with self.redis.pipeline() as pipe:
pipe.multi()
pipe.sadd(key, key_host)
# No value is needed, we just
# care if it exists. Sets aren't viable
# because only keys can expire.
pipe.set(key_host, '')
pipe.execute()
def backend_unregister(self, key, key_host):
with self.redis.pipeline() as pipe:
pipe.multi()
pipe.srem(key, key_host)
pipe.delete(key_host)
pipe.execute()

View File

@ -63,7 +63,7 @@ class Service(service.Service):
self.manager.initialize_service_hook(self)
# Consume from all consumers in a thread
self.conn.consume_in_thread_group(self.tg)
self.conn.consume_in_thread()
def stop(self):
# Try to shut the connection down, but if we get any sort of

View File

@ -28,8 +28,8 @@ import time
import eventlet
import logging as std_logging
from oslo.config import cfg
from designate.openstack.common import cfg
from designate.openstack.common import eventlet_backdoor
from designate.openstack.common.gettextutils import _
from designate.openstack.common import importutils

View File

@ -0,0 +1,80 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 IBM
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import ssl
from oslo.config import cfg
from designate.openstack.common.gettextutils import _
ssl_opts = [
cfg.StrOpt('ca_file',
default=None,
help="CA certificate file to use to verify "
"connecting clients"),
cfg.StrOpt('cert_file',
default=None,
help="Certificate file to use when starting "
"the server securely"),
cfg.StrOpt('key_file',
default=None,
help="Private key file to use when starting "
"the server securely"),
]
CONF = cfg.CONF
CONF.register_opts(ssl_opts, "ssl")
def is_enabled():
cert_file = CONF.ssl.cert_file
key_file = CONF.ssl.key_file
ca_file = CONF.ssl.ca_file
use_ssl = cert_file or key_file
if cert_file and not os.path.exists(cert_file):
raise RuntimeError(_("Unable to find cert_file : %s") % cert_file)
if ca_file and not os.path.exists(ca_file):
raise RuntimeError(_("Unable to find ca_file : %s") % ca_file)
if key_file and not os.path.exists(key_file):
raise RuntimeError(_("Unable to find key_file : %s") % key_file)
if use_ssl and (not cert_file or not key_file):
raise RuntimeError(_("When running server in SSL mode, you must "
"specify both a cert_file and key_file "
"option value in your configuration file"))
return use_ssl
def wrap(sock):
ssl_kwargs = {
'server_side': True,
'certfile': CONF.ssl.cert_file,
'keyfile': CONF.ssl.key_file,
'cert_reqs': ssl.CERT_NONE,
}
if CONF.ssl.ca_file:
ssl_kwargs['ca_certs'] = CONF.ssl.ca_file
ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED
return ssl.wrap_socket(sock, **ssl_kwargs)

View File

@ -63,7 +63,7 @@ class ThreadGroup(object):
def add_timer(self, interval, callback, initial_delay=None,
*args, **kwargs):
pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs)
pulse = loopingcall.LoopingCall(callback, *args, **kwargs)
pulse.start(interval=interval,
initial_delay=initial_delay)
self.timers.append(pulse)

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -25,18 +25,22 @@ import datetime
import iso8601
TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
PERFECT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
# ISO 8601 extended time format with microseconds
_ISO8601_TIME_FORMAT_SUBSECOND = '%Y-%m-%dT%H:%M:%S.%f'
_ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'
PERFECT_TIME_FORMAT = _ISO8601_TIME_FORMAT_SUBSECOND
def isotime(at=None):
def isotime(at=None, subsecond=False):
"""Stringify time in ISO 8601 format"""
if not at:
at = utcnow()
str = at.strftime(TIME_FORMAT)
st = at.strftime(_ISO8601_TIME_FORMAT
if not subsecond
else _ISO8601_TIME_FORMAT_SUBSECOND)
tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC'
str += ('Z' if tz == 'UTC' else tz)
return str
st += ('Z' if tz == 'UTC' else tz)
return st
def parse_isotime(timestr):
@ -179,4 +183,4 @@ def is_soon(dt, window):
:return: True if expiration is within the given duration
"""
soon = (utcnow() + datetime.timedelta(seconds=window))
return normalize_time(dt) < soon
return normalize_time(dt) <= soon

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -17,15 +17,19 @@
"""Utility methods for working with WSGI servers."""
import datetime
import eventlet
import eventlet.wsgi
eventlet.patcher.monkey_patch(all=False, socket=True)
import datetime
import errno
import socket
import sys
import time
import eventlet.wsgi
from oslo.config import cfg
import routes
import routes.middleware
import sys
import webob.dec
import webob.exc
from xml.dom import minidom
@ -36,7 +40,21 @@ from designate.openstack.common.gettextutils import _
from designate.openstack.common import jsonutils
from designate.openstack.common import log as logging
from designate.openstack.common import service
from designate.openstack.common import sslutils
from designate.openstack.common import xmlutils
socket_opts = [
cfg.IntOpt('backlog',
default=4096,
help="Number of backlog requests to configure the socket with"),
cfg.IntOpt('tcp_keepidle',
default=600,
help="Sets the value of TCP_KEEPIDLE in seconds for each "
"server socket. Not supported on OS X."),
]
CONF = cfg.CONF
CONF.register_opts(socket_opts)
LOG = logging.getLogger(__name__)
@ -56,15 +74,54 @@ class Service(service.Service):
"""
def __init__(self, application, port,
host='0.0.0.0', backlog=128, threads=1000):
host='0.0.0.0', backlog=4096, threads=1000):
self.application = application
self._port = port
self._host = host
self.backlog = backlog
self._socket = eventlet.listen((self._host, self._port),
backlog=self.backlog)
self._backlog = backlog if backlog else CONF.backlog
super(Service, self).__init__(threads)
def _get_socket(self, host, port, backlog):
# TODO(dims): eventlet's green dns/socket module does not actually
# support IPv6 in getaddrinfo(). We need to get around this in the
# future or monitor upstream for a fix
info = socket.getaddrinfo(host,
port,
socket.AF_UNSPEC,
socket.SOCK_STREAM)[0]
family = info[0]
bind_addr = info[-1]
sock = None
retry_until = time.time() + 30
while not sock and time.time() < retry_until:
try:
sock = eventlet.listen(bind_addr,
backlog=backlog,
family=family)
if sslutils.is_enabled():
sock = sslutils.wrap(sock)
except socket.error, err:
if err.args[0] != errno.EADDRINUSE:
raise
eventlet.sleep(0.1)
if not sock:
raise RuntimeError(_("Could not bind to %(host)s:%(port)s "
"after trying for 30 seconds") %
{'host': host, 'port': port})
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# sockets can hang around forever without keepalive
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# This option isn't available in the OS X version of eventlet
if hasattr(socket, 'TCP_KEEPIDLE'):
sock.setsockopt(socket.IPPROTO_TCP,
socket.TCP_KEEPIDLE,
CONF.tcp_keepidle)
return sock
def start(self):
"""Start serving this service using the provided server instance.
@ -72,8 +129,13 @@ class Service(service.Service):
"""
super(Service, self).start()
self._socket = self._get_socket(self._host, self._port, self._backlog)
self.tg.add_thread(self._run, self.application, self._socket)
@property
def backlog(self):
return self._backlog
@property
def host(self):
return self._socket.getsockname()[0] if self._socket else self._host
@ -93,7 +155,9 @@ class Service(service.Service):
def _run(self, application, socket):
"""Start a WSGI server in a new green thread."""
logger = logging.getLogger('eventlet.wsgi')
eventlet.wsgi.server(socket, application, custom_pool=self.tg.pool,
eventlet.wsgi.server(socket,
application,
custom_pool=self.tg.pool,
log=logging.WritableLogger(logger))
@ -680,7 +744,7 @@ class XMLDeserializer(TextDeserializer):
plurals = set(self.metadata.get('plurals', {}))
try:
node = minidom.parseString(datastring).childNodes[0]
node = xmlutils.safe_minidom_parse_string(datastring).childNodes[0]
return {node.nodeName: self._from_xml_node(node, plurals)}
except expat.ExpatError:
msg = _("cannot understand XML")

View File

@ -0,0 +1,74 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 IBM
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from xml.dom import minidom
from xml.parsers import expat
from xml import sax
from xml.sax import expatreader
class ProtectedExpatParser(expatreader.ExpatParser):
"""An expat parser which disables DTD's and entities by default."""
def __init__(self, forbid_dtd=True, forbid_entities=True,
*args, **kwargs):
# Python 2.x old style class
expatreader.ExpatParser.__init__(self, *args, **kwargs)
self.forbid_dtd = forbid_dtd
self.forbid_entities = forbid_entities
def start_doctype_decl(self, name, sysid, pubid, has_internal_subset):
raise ValueError("Inline DTD forbidden")
def entity_decl(self, entityName, is_parameter_entity, value, base,
systemId, publicId, notationName):
raise ValueError("<!ENTITY> entity declaration forbidden")
def unparsed_entity_decl(self, name, base, sysid, pubid, notation_name):
# expat 1.2
raise ValueError("<!ENTITY> unparsed entity forbidden")
def external_entity_ref(self, context, base, systemId, publicId):
raise ValueError("<!ENTITY> external entity forbidden")
def notation_decl(self, name, base, sysid, pubid):
raise ValueError("<!ENTITY> notation forbidden")
def reset(self):
expatreader.ExpatParser.reset(self)
if self.forbid_dtd:
self._parser.StartDoctypeDeclHandler = self.start_doctype_decl
self._parser.EndDoctypeDeclHandler = None
if self.forbid_entities:
self._parser.EntityDeclHandler = self.entity_decl
self._parser.UnparsedEntityDeclHandler = self.unparsed_entity_decl
self._parser.ExternalEntityRefHandler = self.external_entity_ref
self._parser.NotationDeclHandler = self.notation_decl
try:
self._parser.SkippedEntityHandler = None
except AttributeError:
# some pyexpat versions do not support SkippedEntity
pass
def safe_minidom_parse_string(xml_string):
"""Parse an XML string using minidom safely.
"""
try:
return minidom.parseString(xml_string, parser=ProtectedExpatParser())
except sax.SAXParseException:
raise expat.ExpatError()

View File

@ -13,7 +13,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate.openstack.common import policy
from designate import utils

View File

@ -13,7 +13,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate.quota.base import Quota

View File

@ -14,7 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
from designate.openstack.common import cfg
from oslo.config import cfg
from designate import exceptions
from designate.plugin import Plugin

View File

@ -13,7 +13,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.openstack.common import cfg
from oslo.config import cfg
cfg.CONF.register_group(cfg.OptGroup(
name='service:sink', title="Configuration for Sink Service"

View File

@ -13,11 +13,12 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate.openstack.common import rpc
from designate.openstack.common import service
from stevedore.named import NamedExtensionManager
from designate import exceptions
LOG = logging.getLogger(__name__)
@ -49,15 +50,20 @@ class Service(service.Service):
try:
return self.extensions_manager.map(_load_extension)
except RuntimeError:
# No handlers enabled. No problem.
return []
# No handlers enabled. Bail!
raise exceptions.ConfigurationError('No designate-sink handlers '
'enabled')
def start(self):
super(Service, self).start()
# Setup notification subscriptions and start consuming
self._setup_subscriptions()
self.rpc_conn.consume_in_thread_group(self.tg)
self.rpc_conn.consume_in_thread()
def wait(self):
super(Service, self).wait()
self.rpc_conn.consumer_thread.wait()
def stop(self):
# Try to shut the connection down, but if we get any sort of

View File

@ -24,7 +24,7 @@ from sqlalchemy.exc import DisconnectionError, OperationalError
import sqlalchemy.orm
from sqlalchemy.pool import NullPool, StaticPool
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate.openstack.common.gettextutils import _

View File

@ -13,7 +13,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate.storage.base import Storage

View File

@ -16,7 +16,7 @@
import time
from sqlalchemy.orm import exc
from sqlalchemy import distinct, func
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate import exceptions
from designate.storage import base

View File

@ -16,7 +16,7 @@
import copy
import unittest2
import mox
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate.openstack.common.notifier import test_notifier
from designate.openstack.common import policy

View File

@ -129,7 +129,7 @@ class KeystoneContextMiddlewareTest(ApiTestCase):
context = request.environ['context']
self.assertFalse(context.is_admin)
self.assertEqual('AuthToken', context.auth_tok)
self.assertEqual('AuthToken', context.auth_token)
self.assertEqual('UserID', context.user_id)
self.assertEqual('TenantID', context.tenant_id)
self.assertEqual(['admin', 'Member'], context.roles)
@ -159,7 +159,7 @@ class KeystoneContextMiddlewareTest(ApiTestCase):
context = request.environ['context']
self.assertFalse(context.is_admin)
self.assertEqual('AuthToken', context.auth_tok)
self.assertEqual('AuthToken', context.auth_token)
self.assertEqual('UserID', context.user_id)
self.assertEqual('TenantID', context.original_tenant_id)
self.assertEqual('5a993bf8-d521-420a-81e1-192d9cc3d5a0',
@ -183,7 +183,7 @@ class NoAuthContextMiddlewareTest(ApiTestCase):
context = request.environ['context']
self.assertTrue(context.is_admin)
self.assertIsNone(context.auth_tok)
self.assertIsNone(context.auth_token)
self.assertIsNone(context.user_id)
self.assertIsNone(context.tenant_id)
self.assertEqual([], context.roles)

View File

@ -13,7 +13,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate.tests import TestCase
from designate import backend

View File

@ -13,7 +13,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate.tests import TestCase
from designate import quota

View File

@ -1,20 +0,0 @@
# Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.tests import TestCase
class SinkTestCase(TestCase):
__test__ = False

View File

@ -1,33 +0,0 @@
# Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.openstack.common import log as logging
from designate.tests.test_sink import SinkTestCase
LOG = logging.getLogger(__name__)
class SinkServiceTest(SinkTestCase):
__test__ = True
def setUp(self):
super(SinkServiceTest, self).setUp()
self.sink_service = self.get_sink_service()
def test_start_and_stop(self):
# Ensures the start/stop actions don't raise
self.sink_service.start()
self.sink_service.stop()

View File

@ -18,7 +18,7 @@ import pkg_resources
import json
from jinja2 import Template
from designate.openstack.common import log as logging
from designate.openstack.common import cfg
from oslo.config import cfg
from designate.openstack.common import processutils
from designate.openstack.common import timeutils
from designate.openstack.common.notifier import api as notifier_api

View File

@ -21,10 +21,12 @@ module=processutils
module=rootwrap
module=rpc
module=service
module=sslutils
module=threadgroup
module=timeutils
module=uuidutils
module=wsgi
module=xmlutils
# The base module to hold the copy of openstack.common
base=designate

View File

@ -8,6 +8,7 @@ ipaddr
iso8601>=0.1.4
jsonschema>=1.0.0,!=1.4.0,<2
kombu>2.4.7
oslo.config>=1.1.0
paste
pastedeploy>=1.5.0
pbr>=0.5.16,<0.6