f4e2efb58d
This removes some custom Cinder code which handles filtering secret config options in a flaky way. Filtering will now be based on the "secret=True" option flag. Related-Bug: #1750074 Change-Id: I1c404b057d1471c85bd7eaf5c096f5912293460a
695 lines
27 KiB
Python
695 lines
27 KiB
Python
# Copyright 2010 United States Government as represented by the
|
|
# Administrator of the National Aeronautics and Space Administration.
|
|
# Copyright 2011 Justin Santa Barbara
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
"""Generic Node base class for all workers that run on hosts."""
|
|
|
|
|
|
import inspect
|
|
import os
|
|
import random
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
|
|
from oslo_concurrency import processutils
|
|
from oslo_config import cfg
|
|
from oslo_db import exception as db_exc
|
|
from oslo_log import log as logging
|
|
import oslo_messaging as messaging
|
|
from oslo_service import loopingcall
|
|
from oslo_service import service
|
|
from oslo_service import wsgi
|
|
from oslo_utils import importutils
|
|
osprofiler_initializer = importutils.try_import('osprofiler.initializer')
|
|
profiler = importutils.try_import('osprofiler.profiler')
|
|
profiler_opts = importutils.try_import('osprofiler.opts')
|
|
|
|
|
|
from cinder.common import constants
|
|
from cinder import context
|
|
from cinder import coordination
|
|
from cinder import exception
|
|
from cinder.i18n import _
|
|
from cinder import objects
|
|
from cinder.objects import base as objects_base
|
|
from cinder.objects import fields
|
|
from cinder import rpc
|
|
from cinder import version
|
|
from cinder.volume import utils as vol_utils
|
|
|
|
if os.name == 'nt':
|
|
from os_win import utilsfactory as os_win_utilsfactory
|
|
else:
|
|
os_win_utilsfactory = None
|
|
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
service_opts = [
|
|
cfg.IntOpt('report_interval',
|
|
default=10,
|
|
help='Interval, in seconds, between nodes reporting state '
|
|
'to datastore'),
|
|
cfg.IntOpt('periodic_interval',
|
|
default=60,
|
|
help='Interval, in seconds, between running periodic tasks'),
|
|
cfg.IntOpt('periodic_fuzzy_delay',
|
|
default=60,
|
|
help='Range, in seconds, to randomly delay when starting the'
|
|
' periodic task scheduler to reduce stampeding.'
|
|
' (Disable by setting to 0)'),
|
|
cfg.StrOpt('osapi_volume_listen',
|
|
default="0.0.0.0",
|
|
help='IP address on which OpenStack Volume API listens'),
|
|
cfg.PortOpt('osapi_volume_listen_port',
|
|
default=8776,
|
|
help='Port on which OpenStack Volume API listens'),
|
|
cfg.IntOpt('osapi_volume_workers',
|
|
help='Number of workers for OpenStack Volume API service. '
|
|
'The default is equal to the number of CPUs available.'),
|
|
cfg.BoolOpt('osapi_volume_use_ssl',
|
|
default=False,
|
|
help='Wraps the socket in a SSL context if True is set. '
|
|
'A certificate file and key file must be specified.'), ]
|
|
|
|
|
|
CONF = cfg.CONF
|
|
CONF.register_opts(service_opts)
|
|
if profiler_opts:
|
|
profiler_opts.set_defaults(CONF)
|
|
|
|
|
|
def setup_profiler(binary, host):
|
|
if (osprofiler_initializer is None or
|
|
profiler is None or
|
|
profiler_opts is None):
|
|
LOG.debug('osprofiler is not present')
|
|
return
|
|
|
|
if CONF.profiler.enabled:
|
|
osprofiler_initializer.init_from_conf(
|
|
conf=CONF,
|
|
context=context.get_admin_context().to_dict(),
|
|
project="cinder",
|
|
service=binary,
|
|
host=host
|
|
)
|
|
LOG.warning(
|
|
"OSProfiler is enabled.\nIt means that person who knows "
|
|
"any of hmac_keys that are specified in "
|
|
"/etc/cinder/cinder.conf can trace his requests. \n"
|
|
"In real life only operator can read this file so there "
|
|
"is no security issue. Note that even if person can "
|
|
"trigger profiler, only admin user can retrieve trace "
|
|
"information.\n"
|
|
"To disable OSProfiler set in cinder.conf:\n"
|
|
"[profiler]\nenabled=false")
|
|
|
|
|
|
class Service(service.Service):
|
|
"""Service object for binaries running on hosts.
|
|
|
|
A service takes a manager and enables rpc by listening to queues based
|
|
on topic. It also periodically runs tasks on the manager and reports
|
|
it state to the database services table.
|
|
"""
|
|
# Make service_id a class attribute so it can be used for clean up
|
|
service_id = None
|
|
|
|
def __init__(self, host, binary, topic, manager, report_interval=None,
|
|
periodic_interval=None, periodic_fuzzy_delay=None,
|
|
service_name=None, coordination=False, cluster=None, *args,
|
|
**kwargs):
|
|
super(Service, self).__init__()
|
|
|
|
if not rpc.initialized():
|
|
rpc.init(CONF)
|
|
|
|
self.cluster = cluster
|
|
self.host = host
|
|
self.binary = binary
|
|
self.topic = topic
|
|
self.manager_class_name = manager
|
|
self.coordination = coordination
|
|
manager_class = importutils.import_class(self.manager_class_name)
|
|
if CONF.profiler.enabled:
|
|
manager_class = profiler.trace_cls("rpc")(manager_class)
|
|
|
|
self.service = None
|
|
self.manager = manager_class(host=self.host,
|
|
cluster=self.cluster,
|
|
service_name=service_name,
|
|
*args, **kwargs)
|
|
self.availability_zone = self.manager.availability_zone
|
|
|
|
# NOTE(geguileo): We need to create the Service DB entry before we
|
|
# create the manager, otherwise capped versions for serializer and rpc
|
|
# client would use existing DB entries not including us, which could
|
|
# result in us using None (if it's the first time the service is run)
|
|
# or an old version (if this is a normal upgrade of a single service).
|
|
ctxt = context.get_admin_context()
|
|
try:
|
|
service_ref = objects.Service.get_by_args(ctxt, host, binary)
|
|
service_ref.rpc_current_version = manager_class.RPC_API_VERSION
|
|
obj_version = objects_base.OBJ_VERSIONS.get_current()
|
|
service_ref.object_current_version = obj_version
|
|
|
|
# added_to_cluster attribute marks when we consider that we have
|
|
# just added a host to a cluster so we can include resources into
|
|
# that cluster. We consider that we have added the host when we
|
|
# didn't have data in the cluster DB field and our current
|
|
# configuration has a cluster value. We don't want to do anything
|
|
# automatic if the cluster is changed, in those cases we'll want
|
|
# to use cinder manage command and to it manually.
|
|
self.added_to_cluster = (not service_ref.cluster_name and cluster)
|
|
|
|
if service_ref.cluster_name != cluster:
|
|
LOG.info('This service has been moved from cluster '
|
|
'%(cluster_svc)s to %(cluster_cfg)s. Resources '
|
|
'will %(opt_no)sbe moved to the new cluster',
|
|
{'cluster_svc': service_ref.cluster_name,
|
|
'cluster_cfg': cluster,
|
|
'opt_no': '' if self.added_to_cluster else 'NO '})
|
|
|
|
if self.added_to_cluster:
|
|
# We pass copy service's disable status in the cluster if we
|
|
# have to create it.
|
|
self._ensure_cluster_exists(ctxt, service_ref)
|
|
service_ref.cluster_name = cluster
|
|
service_ref.save()
|
|
Service.service_id = service_ref.id
|
|
except exception.NotFound:
|
|
# We don't want to include cluster information on the service or
|
|
# create the cluster entry if we are upgrading.
|
|
self._create_service_ref(ctxt, manager_class.RPC_API_VERSION)
|
|
# We don't want to include resources in the cluster during the
|
|
# start while we are still doing the rolling upgrade.
|
|
self.added_to_cluster = True
|
|
|
|
self.report_interval = report_interval
|
|
self.periodic_interval = periodic_interval
|
|
self.periodic_fuzzy_delay = periodic_fuzzy_delay
|
|
self.basic_config_check()
|
|
self.saved_args, self.saved_kwargs = args, kwargs
|
|
self.timers = []
|
|
|
|
setup_profiler(binary, host)
|
|
self.rpcserver = None
|
|
self.backend_rpcserver = None
|
|
self.cluster_rpcserver = None
|
|
|
|
def start(self):
|
|
version_string = version.version_string()
|
|
LOG.info('Starting %(topic)s node (version %(version_string)s)',
|
|
{'topic': self.topic, 'version_string': version_string})
|
|
self.model_disconnected = False
|
|
|
|
if self.coordination:
|
|
coordination.COORDINATOR.start()
|
|
|
|
self.manager.init_host(added_to_cluster=self.added_to_cluster,
|
|
service_id=Service.service_id)
|
|
|
|
LOG.debug("Creating RPC server for service %s", self.topic)
|
|
|
|
ctxt = context.get_admin_context()
|
|
endpoints = [self.manager]
|
|
endpoints.extend(self.manager.additional_endpoints)
|
|
obj_version_cap = objects.Service.get_minimum_obj_version(ctxt)
|
|
LOG.debug("Pinning object versions for RPC server serializer to %s",
|
|
obj_version_cap)
|
|
serializer = objects_base.CinderObjectSerializer(obj_version_cap)
|
|
|
|
target = messaging.Target(topic=self.topic, server=self.host)
|
|
self.rpcserver = rpc.get_server(target, endpoints, serializer)
|
|
self.rpcserver.start()
|
|
|
|
# NOTE(dulek): Kids, don't do that at home. We're relying here on
|
|
# oslo.messaging implementation details to keep backward compatibility
|
|
# with pre-Ocata services. This will not matter once we drop
|
|
# compatibility with them.
|
|
if self.topic == constants.VOLUME_TOPIC:
|
|
target = messaging.Target(
|
|
topic='%(topic)s.%(host)s' % {'topic': self.topic,
|
|
'host': self.host},
|
|
server=vol_utils.extract_host(self.host, 'host'))
|
|
self.backend_rpcserver = rpc.get_server(target, endpoints,
|
|
serializer)
|
|
self.backend_rpcserver.start()
|
|
|
|
if self.cluster:
|
|
LOG.info('Starting %(topic)s cluster %(cluster)s (version '
|
|
'%(version)s)',
|
|
{'topic': self.topic, 'version': version_string,
|
|
'cluster': self.cluster})
|
|
target = messaging.Target(
|
|
topic='%s.%s' % (self.topic, self.cluster),
|
|
server=vol_utils.extract_host(self.cluster, 'host'))
|
|
serializer = objects_base.CinderObjectSerializer(obj_version_cap)
|
|
self.cluster_rpcserver = rpc.get_server(target, endpoints,
|
|
serializer)
|
|
self.cluster_rpcserver.start()
|
|
|
|
self.manager.init_host_with_rpc()
|
|
|
|
if self.report_interval:
|
|
pulse = loopingcall.FixedIntervalLoopingCall(
|
|
self.report_state)
|
|
pulse.start(interval=self.report_interval,
|
|
initial_delay=self.report_interval)
|
|
self.timers.append(pulse)
|
|
|
|
if self.periodic_interval:
|
|
if self.periodic_fuzzy_delay:
|
|
initial_delay = random.randint(0, self.periodic_fuzzy_delay)
|
|
else:
|
|
initial_delay = None
|
|
|
|
periodic = loopingcall.FixedIntervalLoopingCall(
|
|
self.periodic_tasks)
|
|
periodic.start(interval=self.periodic_interval,
|
|
initial_delay=initial_delay)
|
|
self.timers.append(periodic)
|
|
|
|
def basic_config_check(self):
|
|
"""Perform basic config checks before starting service."""
|
|
# Make sure report interval is less than service down time
|
|
if self.report_interval:
|
|
if CONF.service_down_time <= self.report_interval:
|
|
new_down_time = int(self.report_interval * 2.5)
|
|
LOG.warning(
|
|
"Report interval must be less than service down "
|
|
"time. Current config service_down_time: "
|
|
"%(service_down_time)s, report_interval for this: "
|
|
"service is: %(report_interval)s. Setting global "
|
|
"service_down_time to: %(new_down_time)s",
|
|
{'service_down_time': CONF.service_down_time,
|
|
'report_interval': self.report_interval,
|
|
'new_down_time': new_down_time})
|
|
CONF.set_override('service_down_time', new_down_time)
|
|
|
|
def _ensure_cluster_exists(self, context, service):
|
|
if self.cluster:
|
|
try:
|
|
cluster = objects.Cluster.get_by_id(context, None,
|
|
name=self.cluster,
|
|
binary=self.binary)
|
|
# If the cluster already exists, then the service replication
|
|
# fields must match those of the cluster unless the service
|
|
# is in error status.
|
|
error_states = (fields.ReplicationStatus.ERROR,
|
|
fields.ReplicationStatus.FAILOVER_ERROR)
|
|
if service.replication_status not in error_states:
|
|
for attr in ('replication_status', 'active_backend_id',
|
|
'frozen'):
|
|
if getattr(service, attr) != getattr(cluster, attr):
|
|
setattr(service, attr, getattr(cluster, attr))
|
|
|
|
except exception.ClusterNotFound:
|
|
# Since the cluster didn't exist, we copy replication fields
|
|
# from the service.
|
|
cluster = objects.Cluster(
|
|
context=context,
|
|
name=self.cluster,
|
|
binary=self.binary,
|
|
disabled=service.disabled,
|
|
replication_status=service.replication_status,
|
|
active_backend_id=service.active_backend_id,
|
|
frozen=service.frozen)
|
|
try:
|
|
cluster.create()
|
|
|
|
# Race condition occurred and another service created the
|
|
# cluster, so we can continue as it already exists.
|
|
except exception.ClusterExists:
|
|
pass
|
|
|
|
def _create_service_ref(self, context, rpc_version=None):
|
|
kwargs = {
|
|
'host': self.host,
|
|
'binary': self.binary,
|
|
'topic': self.topic,
|
|
'report_count': 0,
|
|
'availability_zone': self.availability_zone,
|
|
'rpc_current_version': rpc_version or self.manager.RPC_API_VERSION,
|
|
'object_current_version': objects_base.OBJ_VERSIONS.get_current(),
|
|
}
|
|
# If we are upgrading we have to ignore the cluster value
|
|
kwargs['cluster_name'] = self.cluster
|
|
service_ref = objects.Service(context=context, **kwargs)
|
|
service_ref.create()
|
|
Service.service_id = service_ref.id
|
|
self._ensure_cluster_exists(context, service_ref)
|
|
# If we have updated the service_ref with replication data from
|
|
# the cluster it will be saved.
|
|
service_ref.save()
|
|
|
|
def __getattr__(self, key):
|
|
manager = self.__dict__.get('manager', None)
|
|
return getattr(manager, key)
|
|
|
|
@classmethod
|
|
def create(cls, host=None, binary=None, topic=None, manager=None,
|
|
report_interval=None, periodic_interval=None,
|
|
periodic_fuzzy_delay=None, service_name=None,
|
|
coordination=False, cluster=None):
|
|
"""Instantiates class and passes back application object.
|
|
|
|
:param host: defaults to CONF.host
|
|
:param binary: defaults to basename of executable
|
|
:param topic: defaults to bin_name - 'cinder-' part
|
|
:param manager: defaults to CONF.<topic>_manager
|
|
:param report_interval: defaults to CONF.report_interval
|
|
:param periodic_interval: defaults to CONF.periodic_interval
|
|
:param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay
|
|
:param cluster: Defaults to None, as only some services will have it
|
|
|
|
"""
|
|
if not host:
|
|
host = CONF.host
|
|
if not binary:
|
|
binary = os.path.basename(inspect.stack()[-1][1])
|
|
if not topic:
|
|
topic = binary
|
|
if not manager:
|
|
subtopic = topic.rpartition('cinder-')[2]
|
|
manager = CONF.get('%s_manager' % subtopic, None)
|
|
if report_interval is None:
|
|
report_interval = CONF.report_interval
|
|
if periodic_interval is None:
|
|
periodic_interval = CONF.periodic_interval
|
|
if periodic_fuzzy_delay is None:
|
|
periodic_fuzzy_delay = CONF.periodic_fuzzy_delay
|
|
service_obj = cls(host, binary, topic, manager,
|
|
report_interval=report_interval,
|
|
periodic_interval=periodic_interval,
|
|
periodic_fuzzy_delay=periodic_fuzzy_delay,
|
|
service_name=service_name,
|
|
coordination=coordination,
|
|
cluster=cluster)
|
|
|
|
return service_obj
|
|
|
|
def stop(self):
|
|
# Try to shut the connection down, but if we get any sort of
|
|
# errors, go ahead and ignore them.. as we're shutting down anyway
|
|
try:
|
|
self.rpcserver.stop()
|
|
if self.backend_rpcserver:
|
|
self.backend_rpcserver.stop()
|
|
if self.cluster_rpcserver:
|
|
self.cluster_rpcserver.stop()
|
|
except Exception:
|
|
pass
|
|
|
|
self.timers_skip = []
|
|
for x in self.timers:
|
|
try:
|
|
x.stop()
|
|
except Exception:
|
|
self.timers_skip.append(x)
|
|
|
|
if self.coordination:
|
|
try:
|
|
coordination.COORDINATOR.stop()
|
|
except Exception:
|
|
pass
|
|
super(Service, self).stop(graceful=True)
|
|
|
|
def wait(self):
|
|
skip = getattr(self, 'timers_skip', [])
|
|
for x in self.timers:
|
|
if x not in skip:
|
|
try:
|
|
x.wait()
|
|
except Exception:
|
|
pass
|
|
if self.rpcserver:
|
|
self.rpcserver.wait()
|
|
if self.backend_rpcserver:
|
|
self.backend_rpcserver.wait()
|
|
if self.cluster_rpcserver:
|
|
self.cluster_rpcserver.wait()
|
|
super(Service, self).wait()
|
|
|
|
def periodic_tasks(self, raise_on_error=False):
|
|
"""Tasks to be run at a periodic interval."""
|
|
ctxt = context.get_admin_context()
|
|
self.manager.run_periodic_tasks(ctxt, raise_on_error=raise_on_error)
|
|
|
|
def report_state(self):
|
|
"""Update the state of this service in the datastore."""
|
|
if not self.manager.is_working():
|
|
# NOTE(dulek): If manager reports a problem we're not sending
|
|
# heartbeats - to indicate that service is actually down.
|
|
LOG.error('Manager for service %(binary)s %(host)s is '
|
|
'reporting problems, not sending heartbeat. '
|
|
'Service will appear "down".',
|
|
{'binary': self.binary,
|
|
'host': self.host})
|
|
return
|
|
|
|
ctxt = context.get_admin_context()
|
|
try:
|
|
try:
|
|
service_ref = objects.Service.get_by_id(ctxt,
|
|
Service.service_id)
|
|
except exception.NotFound:
|
|
LOG.debug('The service database object disappeared, '
|
|
'recreating it.')
|
|
self._create_service_ref(ctxt)
|
|
service_ref = objects.Service.get_by_id(ctxt,
|
|
Service.service_id)
|
|
|
|
service_ref.report_count += 1
|
|
if self.availability_zone != service_ref.availability_zone:
|
|
service_ref.availability_zone = self.availability_zone
|
|
|
|
service_ref.save()
|
|
|
|
# TODO(termie): make this pattern be more elegant.
|
|
if getattr(self, 'model_disconnected', False):
|
|
self.model_disconnected = False
|
|
LOG.error('Recovered model server connection!')
|
|
|
|
except db_exc.DBConnectionError:
|
|
if not getattr(self, 'model_disconnected', False):
|
|
self.model_disconnected = True
|
|
LOG.exception('model server went away')
|
|
|
|
# NOTE(jsbryant) Other DB errors can happen in HA configurations.
|
|
# such errors shouldn't kill this thread, so we handle them here.
|
|
except db_exc.DBError:
|
|
if not getattr(self, 'model_disconnected', False):
|
|
self.model_disconnected = True
|
|
LOG.exception('DBError encountered: ')
|
|
|
|
except Exception:
|
|
if not getattr(self, 'model_disconnected', False):
|
|
self.model_disconnected = True
|
|
LOG.exception('Exception encountered: ')
|
|
|
|
def reset(self):
|
|
self.manager.reset()
|
|
super(Service, self).reset()
|
|
|
|
|
|
class WSGIService(service.ServiceBase):
|
|
"""Provides ability to launch API from a 'paste' configuration."""
|
|
|
|
def __init__(self, name, loader=None):
|
|
"""Initialize, but do not start the WSGI server.
|
|
|
|
:param name: The name of the WSGI server given to the loader.
|
|
:param loader: Loads the WSGI application using the given name.
|
|
:returns: None
|
|
|
|
"""
|
|
self.name = name
|
|
self.manager = self._get_manager()
|
|
self.loader = loader or wsgi.Loader(CONF)
|
|
self.app = self.loader.load_app(name)
|
|
self.host = getattr(CONF, '%s_listen' % name, "0.0.0.0")
|
|
self.port = getattr(CONF, '%s_listen_port' % name, 0)
|
|
self.use_ssl = getattr(CONF, '%s_use_ssl' % name, False)
|
|
self.workers = (getattr(CONF, '%s_workers' % name, None) or
|
|
processutils.get_worker_count())
|
|
if self.workers and self.workers < 1:
|
|
worker_name = '%s_workers' % name
|
|
msg = (_("%(worker_name)s value of %(workers)d is invalid, "
|
|
"must be greater than 0.") %
|
|
{'worker_name': worker_name,
|
|
'workers': self.workers})
|
|
raise exception.InvalidInput(msg)
|
|
setup_profiler(name, self.host)
|
|
|
|
self.server = wsgi.Server(CONF,
|
|
name,
|
|
self.app,
|
|
host=self.host,
|
|
port=self.port,
|
|
use_ssl=self.use_ssl)
|
|
|
|
def _get_manager(self):
|
|
"""Initialize a Manager object appropriate for this service.
|
|
|
|
Use the service name to look up a Manager subclass from the
|
|
configuration and initialize an instance. If no class name
|
|
is configured, just return None.
|
|
|
|
:returns: a Manager instance, or None.
|
|
|
|
"""
|
|
fl = '%s_manager' % self.name
|
|
if fl not in CONF:
|
|
return None
|
|
|
|
manager_class_name = CONF.get(fl, None)
|
|
if not manager_class_name:
|
|
return None
|
|
|
|
manager_class = importutils.import_class(manager_class_name)
|
|
return manager_class()
|
|
|
|
def start(self):
|
|
"""Start serving this service using loaded configuration.
|
|
|
|
Also, retrieve updated port number in case '0' was passed in, which
|
|
indicates a random port should be used.
|
|
|
|
:returns: None
|
|
|
|
"""
|
|
if self.manager:
|
|
self.manager.init_host()
|
|
self.server.start()
|
|
self.port = self.server.port
|
|
|
|
def stop(self):
|
|
"""Stop serving this API.
|
|
|
|
:returns: None
|
|
|
|
"""
|
|
self.server.stop()
|
|
|
|
def wait(self):
|
|
"""Wait for the service to stop serving this API.
|
|
|
|
:returns: None
|
|
|
|
"""
|
|
self.server.wait()
|
|
|
|
def reset(self):
|
|
"""Reset server greenpool size to default.
|
|
|
|
:returns: None
|
|
|
|
"""
|
|
self.server.reset()
|
|
|
|
|
|
def process_launcher():
|
|
return service.ProcessLauncher(CONF, restart_method='mutate')
|
|
|
|
|
|
# NOTE(vish): the global launcher is to maintain the existing
|
|
# functionality of calling service.serve +
|
|
# service.wait
|
|
_launcher = None
|
|
|
|
|
|
def serve(server, workers=None):
|
|
global _launcher
|
|
if _launcher:
|
|
raise RuntimeError(_('serve() can only be called once'))
|
|
|
|
_launcher = service.launch(CONF, server, workers=workers)
|
|
|
|
|
|
def wait():
|
|
CONF.log_opt_values(LOG, logging.DEBUG)
|
|
|
|
try:
|
|
_launcher.wait()
|
|
except KeyboardInterrupt:
|
|
_launcher.stop()
|
|
rpc.cleanup()
|
|
|
|
|
|
class Launcher(object):
|
|
def __init__(self):
|
|
self.launch_service = serve
|
|
self.wait = wait
|
|
|
|
|
|
def get_launcher():
|
|
# Note(lpetrut): ProcessLauncher uses green pipes which fail on Windows
|
|
# due to missing support of non-blocking I/O pipes. For this reason, the
|
|
# service must be spawned differently on Windows, using the ServiceLauncher
|
|
# class instead.
|
|
if os.name == 'nt':
|
|
return Launcher()
|
|
else:
|
|
return process_launcher()
|
|
|
|
|
|
class WindowsProcessLauncher(object):
|
|
def __init__(self):
|
|
self._processutils = os_win_utilsfactory.get_processutils()
|
|
|
|
self._workers = []
|
|
self._worker_job_handles = []
|
|
self._signal_handler = service.SignalHandler()
|
|
self._add_signal_handlers()
|
|
|
|
def add_process(self, cmd):
|
|
LOG.info("Starting subprocess: %s", cmd)
|
|
|
|
worker = subprocess.Popen(cmd)
|
|
try:
|
|
job_handle = self._processutils.kill_process_on_job_close(
|
|
worker.pid)
|
|
except Exception:
|
|
LOG.exception("Could not associate child process "
|
|
"with a job, killing it.")
|
|
worker.kill()
|
|
raise
|
|
|
|
self._worker_job_handles.append(job_handle)
|
|
self._workers.append(worker)
|
|
|
|
def _add_signal_handlers(self):
|
|
self._signal_handler.add_handler('SIGINT', self._terminate)
|
|
self._signal_handler.add_handler('SIGTERM', self._terminate)
|
|
|
|
def _terminate(self, *args):
|
|
# We've already assigned win32 job objects to child processes,
|
|
# requesting them to stop once all the job handles are closed.
|
|
# When this process dies, so will the child processes.
|
|
LOG.info("Received request to terminate.")
|
|
sys.exit(1)
|
|
|
|
def wait(self):
|
|
pids = [worker.pid for worker in self._workers]
|
|
if pids:
|
|
self._processutils.wait_for_multiple_processes(pids,
|
|
wait_all=True)
|
|
# By sleeping here, we allow signal handlers to be executed.
|
|
time.sleep(0)
|