Update latest OSLO.
Changes include: 1. Removing the 'extras' in the pip-requires 2. Fixes for fake implementations for RPC calls 3. Version updates due to common version update Change-Id: Iefd32b3f7d529943b078e6d927d06043286ff94e
This commit is contained in:
parent
25a1279f1c
commit
07cde19d07
quantum/openstack/common
tools
@ -1735,11 +1735,13 @@ class CommonConfigOpts(ConfigOpts):
|
|||||||
BoolOpt('debug',
|
BoolOpt('debug',
|
||||||
short='d',
|
short='d',
|
||||||
default=False,
|
default=False,
|
||||||
help='Print debugging output'),
|
help='Print debugging output (set logging level to '
|
||||||
|
'DEBUG instead of default WARNING level).'),
|
||||||
BoolOpt('verbose',
|
BoolOpt('verbose',
|
||||||
short='v',
|
short='v',
|
||||||
default=False,
|
default=False,
|
||||||
help='Print more verbose output'),
|
help='Print more verbose output (set logging level to '
|
||||||
|
'INFO instead of default WARNING level).'),
|
||||||
]
|
]
|
||||||
|
|
||||||
logging_cli_opts = [
|
logging_cli_opts = [
|
||||||
|
@ -57,3 +57,11 @@ def import_module(import_str):
|
|||||||
"""Import a module."""
|
"""Import a module."""
|
||||||
__import__(import_str)
|
__import__(import_str)
|
||||||
return sys.modules[import_str]
|
return sys.modules[import_str]
|
||||||
|
|
||||||
|
|
||||||
|
def try_import(import_str, default=None):
|
||||||
|
"""Try to import a module and if it fails return default."""
|
||||||
|
try:
|
||||||
|
return import_module(import_str)
|
||||||
|
except ImportError:
|
||||||
|
return default
|
||||||
|
@ -54,7 +54,7 @@ class BaseParser(object):
|
|||||||
|
|
||||||
value = value.strip()
|
value = value.strip()
|
||||||
if ((value and value[0] == value[-1]) and
|
if ((value and value[0] == value[-1]) and
|
||||||
(value[0] == "\"" or value[0] == "'")):
|
(value[0] == "\"" or value[0] == "'")):
|
||||||
value = value[1:-1]
|
value = value[1:-1]
|
||||||
return key.strip(), [value]
|
return key.strip(), [value]
|
||||||
|
|
||||||
|
@ -220,6 +220,11 @@ def synchronized(name, lock_file_prefix, external=False, lock_path=None):
|
|||||||
'method': f.__name__})
|
'method': f.__name__})
|
||||||
retval = f(*args, **kwargs)
|
retval = f(*args, **kwargs)
|
||||||
finally:
|
finally:
|
||||||
|
LOG.debug(_('Released file lock "%(lock)s" at %(path)s'
|
||||||
|
' for method "%(method)s"...'),
|
||||||
|
{'lock': name,
|
||||||
|
'path': lock_file_path,
|
||||||
|
'method': f.__name__})
|
||||||
# NOTE(vish): This removes the tempdir if we needed
|
# NOTE(vish): This removes the tempdir if we needed
|
||||||
# to create one. This is used to cleanup
|
# to create one. This is used to cleanup
|
||||||
# the locks left behind by unit tests.
|
# the locks left behind by unit tests.
|
||||||
|
@ -49,19 +49,19 @@ from quantum.openstack.common import notifier
|
|||||||
|
|
||||||
log_opts = [
|
log_opts = [
|
||||||
cfg.StrOpt('logging_context_format_string',
|
cfg.StrOpt('logging_context_format_string',
|
||||||
default='%(asctime)s.%(msecs)d %(levelname)s %(name)s '
|
default='%(asctime)s.%(msecs)03d %(levelname)s %(name)s '
|
||||||
'[%(request_id)s %(user)s %(tenant)s] %(instance)s'
|
'[%(request_id)s %(user)s %(tenant)s] %(instance)s'
|
||||||
'%(message)s',
|
'%(message)s',
|
||||||
help='format string to use for log messages with context'),
|
help='format string to use for log messages with context'),
|
||||||
cfg.StrOpt('logging_default_format_string',
|
cfg.StrOpt('logging_default_format_string',
|
||||||
default='%(asctime)s.%(msecs)d %(process)d %(levelname)s '
|
default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
|
||||||
'%(name)s [-] %(instance)s%(message)s',
|
'%(name)s [-] %(instance)s%(message)s',
|
||||||
help='format string to use for log messages without context'),
|
help='format string to use for log messages without context'),
|
||||||
cfg.StrOpt('logging_debug_format_suffix',
|
cfg.StrOpt('logging_debug_format_suffix',
|
||||||
default='%(funcName)s %(pathname)s:%(lineno)d',
|
default='%(funcName)s %(pathname)s:%(lineno)d',
|
||||||
help='data to append to log format when level is DEBUG'),
|
help='data to append to log format when level is DEBUG'),
|
||||||
cfg.StrOpt('logging_exception_prefix',
|
cfg.StrOpt('logging_exception_prefix',
|
||||||
default='%(asctime)s.%(msecs)d %(process)d TRACE %(name)s '
|
default='%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s '
|
||||||
'%(instance)s',
|
'%(instance)s',
|
||||||
help='prefix each line of exception output with this format'),
|
help='prefix each line of exception output with this format'),
|
||||||
cfg.ListOpt('default_log_levels',
|
cfg.ListOpt('default_log_levels',
|
||||||
@ -259,7 +259,7 @@ class JSONFormatter(logging.Formatter):
|
|||||||
class PublishErrorsHandler(logging.Handler):
|
class PublishErrorsHandler(logging.Handler):
|
||||||
def emit(self, record):
|
def emit(self, record):
|
||||||
if ('quantum.openstack.common.notifier.log_notifier' in
|
if ('quantum.openstack.common.notifier.log_notifier' in
|
||||||
CONF.notification_driver):
|
CONF.notification_driver):
|
||||||
return
|
return
|
||||||
notifier.api.notify(None, 'error.publisher',
|
notifier.api.notify(None, 'error.publisher',
|
||||||
'error_notification',
|
'error_notification',
|
||||||
@ -361,10 +361,12 @@ def _setup_logging_from_conf(product_name):
|
|||||||
datefmt=datefmt))
|
datefmt=datefmt))
|
||||||
handler.setFormatter(LegacyFormatter(datefmt=datefmt))
|
handler.setFormatter(LegacyFormatter(datefmt=datefmt))
|
||||||
|
|
||||||
if CONF.verbose or CONF.debug:
|
if CONF.debug:
|
||||||
log_root.setLevel(logging.DEBUG)
|
log_root.setLevel(logging.DEBUG)
|
||||||
else:
|
elif CONF.verbose:
|
||||||
log_root.setLevel(logging.INFO)
|
log_root.setLevel(logging.INFO)
|
||||||
|
else:
|
||||||
|
log_root.setLevel(logging.WARNING)
|
||||||
|
|
||||||
level = logging.NOTSET
|
level = logging.NOTSET
|
||||||
for pair in CONF.default_log_levels:
|
for pair in CONF.default_log_levels:
|
||||||
@ -425,7 +427,7 @@ class LegacyFormatter(logging.Formatter):
|
|||||||
self._fmt = CONF.logging_default_format_string
|
self._fmt = CONF.logging_default_format_string
|
||||||
|
|
||||||
if (record.levelno == logging.DEBUG and
|
if (record.levelno == logging.DEBUG and
|
||||||
CONF.logging_debug_format_suffix):
|
CONF.logging_debug_format_suffix):
|
||||||
self._fmt += " " + CONF.logging_debug_format_suffix
|
self._fmt += " " + CONF.logging_debug_format_suffix
|
||||||
|
|
||||||
# Cache this on the record, Logger will respect our formated copy
|
# Cache this on the record, Logger will respect our formated copy
|
||||||
|
@ -574,19 +574,19 @@ class ParseState(object):
|
|||||||
|
|
||||||
for reduction, methname in self.reducers:
|
for reduction, methname in self.reducers:
|
||||||
if (len(self.tokens) >= len(reduction) and
|
if (len(self.tokens) >= len(reduction) and
|
||||||
self.tokens[-len(reduction):] == reduction):
|
self.tokens[-len(reduction):] == reduction):
|
||||||
# Get the reduction method
|
# Get the reduction method
|
||||||
meth = getattr(self, methname)
|
meth = getattr(self, methname)
|
||||||
|
|
||||||
# Reduce the token stream
|
# Reduce the token stream
|
||||||
results = meth(*self.values[-len(reduction):])
|
results = meth(*self.values[-len(reduction):])
|
||||||
|
|
||||||
# Update the tokens and values
|
# Update the tokens and values
|
||||||
self.tokens[-len(reduction):] = [r[0] for r in results]
|
self.tokens[-len(reduction):] = [r[0] for r in results]
|
||||||
self.values[-len(reduction):] = [r[1] for r in results]
|
self.values[-len(reduction):] = [r[1] for r in results]
|
||||||
|
|
||||||
# Check for any more reductions
|
# Check for any more reductions
|
||||||
return self.reduce()
|
return self.reduce()
|
||||||
|
|
||||||
def shift(self, tok, value):
|
def shift(self, tok, value):
|
||||||
"""Adds one more token to the state. Calls reduce()."""
|
"""Adds one more token to the state. Calls reduce()."""
|
||||||
|
@ -167,7 +167,7 @@ def cast(conf, context, topic, msg):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def notify(conf, context, topic, msg):
|
def notify(conf, context, topic, msg, envelope):
|
||||||
check_serialize(msg)
|
check_serialize(msg)
|
||||||
|
|
||||||
|
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import pprint
|
import pprint
|
||||||
|
import os
|
||||||
import socket
|
import socket
|
||||||
import string
|
import string
|
||||||
import sys
|
import sys
|
||||||
@ -29,6 +30,7 @@ from quantum.openstack.common import cfg
|
|||||||
from quantum.openstack.common.gettextutils import _
|
from quantum.openstack.common.gettextutils import _
|
||||||
from quantum.openstack.common import importutils
|
from quantum.openstack.common import importutils
|
||||||
from quantum.openstack.common import jsonutils
|
from quantum.openstack.common import jsonutils
|
||||||
|
from quantum.openstack.common import processutils as utils
|
||||||
from quantum.openstack.common.rpc import common as rpc_common
|
from quantum.openstack.common.rpc import common as rpc_common
|
||||||
|
|
||||||
|
|
||||||
@ -61,6 +63,10 @@ zmq_opts = [
|
|||||||
cfg.IntOpt('rpc_zmq_contexts', default=1,
|
cfg.IntOpt('rpc_zmq_contexts', default=1,
|
||||||
help='Number of ZeroMQ contexts, defaults to 1'),
|
help='Number of ZeroMQ contexts, defaults to 1'),
|
||||||
|
|
||||||
|
cfg.IntOpt('rpc_zmq_topic_backlog', default=None,
|
||||||
|
help='Maximum number of ingress messages to locally buffer '
|
||||||
|
'per topic. Default is unlimited.'),
|
||||||
|
|
||||||
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
|
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
|
||||||
help='Directory for holding IPC sockets'),
|
help='Directory for holding IPC sockets'),
|
||||||
|
|
||||||
@ -413,12 +419,6 @@ class ZmqProxy(ZmqBaseReactor):
|
|||||||
super(ZmqProxy, self).__init__(conf)
|
super(ZmqProxy, self).__init__(conf)
|
||||||
|
|
||||||
self.topic_proxy = {}
|
self.topic_proxy = {}
|
||||||
ipc_dir = CONF.rpc_zmq_ipc_dir
|
|
||||||
|
|
||||||
self.topic_proxy['zmq_replies'] = \
|
|
||||||
ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
|
|
||||||
zmq.PUB, bind=True)
|
|
||||||
self.sockets.append(self.topic_proxy['zmq_replies'])
|
|
||||||
|
|
||||||
def consume(self, sock):
|
def consume(self, sock):
|
||||||
ipc_dir = CONF.rpc_zmq_ipc_dir
|
ipc_dir = CONF.rpc_zmq_ipc_dir
|
||||||
@ -444,20 +444,81 @@ class ZmqProxy(ZmqBaseReactor):
|
|||||||
sock_type = zmq.PUSH
|
sock_type = zmq.PUSH
|
||||||
|
|
||||||
if not topic in self.topic_proxy:
|
if not topic in self.topic_proxy:
|
||||||
outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
|
def publisher(waiter):
|
||||||
sock_type, bind=True)
|
LOG.info(_("Creating proxy for topic: %s"), topic)
|
||||||
self.topic_proxy[topic] = outq
|
|
||||||
self.sockets.append(outq)
|
|
||||||
LOG.info(_("Created topic proxy: %s"), topic)
|
|
||||||
|
|
||||||
# It takes some time for a pub socket to open,
|
try:
|
||||||
# before we can have any faith in doing a send() to it.
|
out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
|
||||||
if sock_type == zmq.PUB:
|
(ipc_dir, topic),
|
||||||
eventlet.sleep(.5)
|
sock_type, bind=True)
|
||||||
|
except RPCException:
|
||||||
|
waiter.send_exception(*sys.exc_info())
|
||||||
|
return
|
||||||
|
|
||||||
LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
|
self.topic_proxy[topic] = eventlet.queue.LightQueue(
|
||||||
self.topic_proxy[topic].send(data)
|
CONF.rpc_zmq_topic_backlog)
|
||||||
LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
|
self.sockets.append(out_sock)
|
||||||
|
|
||||||
|
# It takes some time for a pub socket to open,
|
||||||
|
# before we can have any faith in doing a send() to it.
|
||||||
|
if sock_type == zmq.PUB:
|
||||||
|
eventlet.sleep(.5)
|
||||||
|
|
||||||
|
waiter.send(True)
|
||||||
|
|
||||||
|
while(True):
|
||||||
|
data = self.topic_proxy[topic].get()
|
||||||
|
out_sock.send(data)
|
||||||
|
LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
|
||||||
|
{'data': data})
|
||||||
|
|
||||||
|
wait_sock_creation = eventlet.event.Event()
|
||||||
|
eventlet.spawn(publisher, wait_sock_creation)
|
||||||
|
|
||||||
|
try:
|
||||||
|
wait_sock_creation.wait()
|
||||||
|
except RPCException:
|
||||||
|
LOG.error(_("Topic socket file creation failed."))
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.topic_proxy[topic].put_nowait(data)
|
||||||
|
LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
|
||||||
|
{'data': data})
|
||||||
|
except eventlet.queue.Full:
|
||||||
|
LOG.error(_("Local per-topic backlog buffer full for topic "
|
||||||
|
"%(topic)s. Dropping message.") % {'topic': topic})
|
||||||
|
|
||||||
|
def consume_in_thread(self):
|
||||||
|
"""Runs the ZmqProxy service"""
|
||||||
|
ipc_dir = CONF.rpc_zmq_ipc_dir
|
||||||
|
consume_in = "tcp://%s:%s" % \
|
||||||
|
(CONF.rpc_zmq_bind_address,
|
||||||
|
CONF.rpc_zmq_port)
|
||||||
|
consumption_proxy = InternalContext(None)
|
||||||
|
|
||||||
|
if not os.path.isdir(ipc_dir):
|
||||||
|
try:
|
||||||
|
utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
|
||||||
|
utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
|
||||||
|
ipc_dir, run_as_root=True)
|
||||||
|
utils.execute('chmod', '750', ipc_dir, run_as_root=True)
|
||||||
|
except utils.ProcessExecutionError:
|
||||||
|
LOG.error(_("Could not create IPC directory %s") %
|
||||||
|
(ipc_dir, ))
|
||||||
|
raise
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.register(consumption_proxy,
|
||||||
|
consume_in,
|
||||||
|
zmq.PULL,
|
||||||
|
out_bind=True)
|
||||||
|
except zmq.ZMQError:
|
||||||
|
LOG.error(_("Could not create ZeroMQ receiver daemon. "
|
||||||
|
"Socket may already be in use."))
|
||||||
|
raise
|
||||||
|
|
||||||
|
super(ZmqProxy, self).consume_in_thread()
|
||||||
|
|
||||||
|
|
||||||
class ZmqReactor(ZmqBaseReactor):
|
class ZmqReactor(ZmqBaseReactor):
|
||||||
@ -551,7 +612,8 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
|
|||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
def _call(addr, context, msg_id, topic, msg, timeout=None):
|
def _call(addr, context, msg_id, topic, msg, timeout=None,
|
||||||
|
serialize=True, force_envelope=False):
|
||||||
# timeout_response is how long we wait for a response
|
# timeout_response is how long we wait for a response
|
||||||
timeout = timeout or CONF.rpc_response_timeout
|
timeout = timeout or CONF.rpc_response_timeout
|
||||||
|
|
||||||
@ -586,7 +648,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
|
|||||||
)
|
)
|
||||||
|
|
||||||
LOG.debug(_("Sending cast"))
|
LOG.debug(_("Sending cast"))
|
||||||
_cast(addr, context, msg_id, topic, payload)
|
_cast(addr, context, msg_id, topic, payload,
|
||||||
|
serialize=serialize, force_envelope=force_envelope)
|
||||||
|
|
||||||
LOG.debug(_("Cast sent; Waiting reply"))
|
LOG.debug(_("Cast sent; Waiting reply"))
|
||||||
# Blocks until receives reply
|
# Blocks until receives reply
|
||||||
@ -642,7 +705,8 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
|
|||||||
_topic, _topic, msg, timeout, serialize,
|
_topic, _topic, msg, timeout, serialize,
|
||||||
force_envelope)
|
force_envelope)
|
||||||
return
|
return
|
||||||
return method(_addr, context, _topic, _topic, msg, timeout)
|
return method(_addr, context, _topic, _topic, msg, timeout,
|
||||||
|
serialize, force_envelope)
|
||||||
|
|
||||||
|
|
||||||
def create_connection(conf, new=True):
|
def create_connection(conf, new=True):
|
||||||
|
@ -27,17 +27,17 @@ import sys
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
import eventlet
|
import eventlet
|
||||||
import extras
|
|
||||||
import logging as std_logging
|
import logging as std_logging
|
||||||
|
|
||||||
from quantum.openstack.common import cfg
|
from quantum.openstack.common import cfg
|
||||||
from quantum.openstack.common import eventlet_backdoor
|
from quantum.openstack.common import eventlet_backdoor
|
||||||
from quantum.openstack.common.gettextutils import _
|
from quantum.openstack.common.gettextutils import _
|
||||||
|
from quantum.openstack.common import importutils
|
||||||
from quantum.openstack.common import log as logging
|
from quantum.openstack.common import log as logging
|
||||||
from quantum.openstack.common import threadgroup
|
from quantum.openstack.common import threadgroup
|
||||||
|
|
||||||
|
|
||||||
rpc = extras.try_import('quantum.openstack.common.rpc')
|
rpc = importutils.try_import('quantum.openstack.common.rpc')
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -5,7 +5,6 @@ amqplib==0.6.1
|
|||||||
anyjson>=0.2.4
|
anyjson>=0.2.4
|
||||||
argparse
|
argparse
|
||||||
eventlet>=0.9.17
|
eventlet>=0.9.17
|
||||||
extras
|
|
||||||
greenlet>=0.3.1
|
greenlet>=0.3.1
|
||||||
httplib2
|
httplib2
|
||||||
iso8601>=0.1.4
|
iso8601>=0.1.4
|
||||||
|
Loading…
x
Reference in New Issue
Block a user