Merge "Change process name of neutron-server to match worker role"
This commit is contained in:
commit
683b81e089
@ -110,6 +110,13 @@ core_opts = [
|
|||||||
cfg.IntOpt('send_events_interval', default=2,
|
cfg.IntOpt('send_events_interval', default=2,
|
||||||
help=_('Number of seconds between sending events to nova if '
|
help=_('Number of seconds between sending events to nova if '
|
||||||
'there are any events to send.')),
|
'there are any events to send.')),
|
||||||
|
cfg.StrOpt('setproctitle', default='on',
|
||||||
|
help=_("Set process name to match child worker role. "
|
||||||
|
"Available options are: 'off' - retains the previous "
|
||||||
|
"behavior; 'on' - renames processes to "
|
||||||
|
"'neutron-server: role (original string)'; "
|
||||||
|
"'brief' - renames the same as 'on', but without the "
|
||||||
|
"original string, such as 'neutron-server: role'.")),
|
||||||
cfg.StrOpt('ipam_driver', default='internal',
|
cfg.StrOpt('ipam_driver', default='internal',
|
||||||
help=_("Neutron IPAM (IP address management) driver to use. "
|
help=_("Neutron IPAM (IP address management) driver to use. "
|
||||||
"By default, the reference implementation of the "
|
"By default, the reference implementation of the "
|
||||||
|
@ -24,7 +24,6 @@ from neutron_lib import context
|
|||||||
from neutron_lib.db import api as session
|
from neutron_lib.db import api as session
|
||||||
from neutron_lib.plugins import directory
|
from neutron_lib.plugins import directory
|
||||||
from neutron_lib import rpc as n_rpc
|
from neutron_lib import rpc as n_rpc
|
||||||
from neutron_lib import worker as neutron_worker
|
|
||||||
from oslo_concurrency import processutils
|
from oslo_concurrency import processutils
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
@ -38,6 +37,7 @@ import psutil
|
|||||||
from neutron.common import config
|
from neutron.common import config
|
||||||
from neutron.common import profiler
|
from neutron.common import profiler
|
||||||
from neutron.conf import service
|
from neutron.conf import service
|
||||||
|
from neutron import worker as neutron_worker
|
||||||
from neutron import wsgi
|
from neutron import wsgi
|
||||||
|
|
||||||
|
|
||||||
@ -95,7 +95,7 @@ def serve_wsgi(cls):
|
|||||||
return service
|
return service
|
||||||
|
|
||||||
|
|
||||||
class RpcWorker(neutron_worker.BaseWorker):
|
class RpcWorker(neutron_worker.NeutronBaseWorker):
|
||||||
"""Wraps a worker to be handled by ProcessLauncher"""
|
"""Wraps a worker to be handled by ProcessLauncher"""
|
||||||
start_listeners_method = 'start_rpc_listeners'
|
start_listeners_method = 'start_rpc_listeners'
|
||||||
|
|
||||||
@ -108,7 +108,7 @@ class RpcWorker(neutron_worker.BaseWorker):
|
|||||||
self._servers = []
|
self._servers = []
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
super(RpcWorker, self).start()
|
super(RpcWorker, self).start(desc="rpc worker")
|
||||||
for plugin in self._plugins:
|
for plugin in self._plugins:
|
||||||
if hasattr(plugin, self.start_listeners_method):
|
if hasattr(plugin, self.start_listeners_method):
|
||||||
try:
|
try:
|
||||||
@ -221,7 +221,7 @@ def _get_plugins_workers():
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
class AllServicesNeutronWorker(neutron_worker.BaseWorker):
|
class AllServicesNeutronWorker(neutron_worker.NeutronBaseWorker):
|
||||||
def __init__(self, services, worker_process_count=1):
|
def __init__(self, services, worker_process_count=1):
|
||||||
super(AllServicesNeutronWorker, self).__init__(worker_process_count)
|
super(AllServicesNeutronWorker, self).__init__(worker_process_count)
|
||||||
self._services = services
|
self._services = services
|
||||||
@ -231,7 +231,7 @@ class AllServicesNeutronWorker(neutron_worker.BaseWorker):
|
|||||||
def start(self):
|
def start(self):
|
||||||
for srv in self._services:
|
for srv in self._services:
|
||||||
self._launcher.launch_service(srv)
|
self._launcher.launch_service(srv)
|
||||||
super(AllServicesNeutronWorker, self).start()
|
super(AllServicesNeutronWorker, self).start(desc="services worker")
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self._launcher.stop()
|
self._launcher.stop()
|
||||||
@ -323,7 +323,7 @@ def _run_wsgi(app_name):
|
|||||||
def run_wsgi_app(app):
|
def run_wsgi_app(app):
|
||||||
server = wsgi.Server("Neutron")
|
server = wsgi.Server("Neutron")
|
||||||
server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host,
|
server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host,
|
||||||
workers=_get_api_workers())
|
workers=_get_api_workers(), desc="api worker")
|
||||||
LOG.info("Neutron service started, listening on %(host)s:%(port)s",
|
LOG.info("Neutron service started, listening on %(host)s:%(port)s",
|
||||||
{'host': cfg.CONF.bind_host, 'port': cfg.CONF.bind_port})
|
{'host': cfg.CONF.bind_host, 'port': cfg.CONF.bind_port})
|
||||||
return server
|
return server
|
||||||
|
@ -80,7 +80,8 @@ class TestRunWsgiApp(base.BaseTestCase):
|
|||||||
service.run_wsgi_app(mock.sentinel.app)
|
service.run_wsgi_app(mock.sentinel.app)
|
||||||
start_call = mock_server.return_value.start.call_args
|
start_call = mock_server.return_value.start.call_args
|
||||||
expected_call = mock.call(
|
expected_call = mock.call(
|
||||||
mock.ANY, mock.ANY, mock.ANY, workers=expected_passed_value)
|
mock.ANY, mock.ANY, mock.ANY, desc='api worker',
|
||||||
|
workers=expected_passed_value)
|
||||||
self.assertEqual(expected_call, start_call)
|
self.assertEqual(expected_call, start_call)
|
||||||
|
|
||||||
def test_api_workers_zero(self):
|
def test_api_workers_zero(self):
|
||||||
|
@ -74,7 +74,7 @@ class TestWorkerService(TestServiceBase):
|
|||||||
_service.pool.spawn.return_value = None
|
_service.pool.spawn.return_value = None
|
||||||
|
|
||||||
_app = mock.Mock()
|
_app = mock.Mock()
|
||||||
workerservice = wsgi.WorkerService(_service, _app)
|
workerservice = wsgi.WorkerService(_service, _app, "on")
|
||||||
workerservice.start()
|
workerservice.start()
|
||||||
self.assertFalse(apimock.called)
|
self.assertFalse(apimock.called)
|
||||||
|
|
||||||
@ -82,7 +82,7 @@ class TestWorkerService(TestServiceBase):
|
|||||||
_service = mock.Mock()
|
_service = mock.Mock()
|
||||||
_app = mock.Mock()
|
_app = mock.Mock()
|
||||||
|
|
||||||
worker_service = wsgi.WorkerService(_service, _app)
|
worker_service = wsgi.WorkerService(_service, _app, "on")
|
||||||
self._test_reset(worker_service)
|
self._test_reset(worker_service)
|
||||||
|
|
||||||
|
|
||||||
|
@ -11,10 +11,24 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
from neutron_lib import worker
|
from neutron_lib import worker
|
||||||
|
from oslo_config import cfg
|
||||||
from oslo_service import loopingcall
|
from oslo_service import loopingcall
|
||||||
|
|
||||||
|
|
||||||
class PeriodicWorker(worker.BaseWorker):
|
class NeutronBaseWorker(worker.BaseWorker):
|
||||||
|
|
||||||
|
def __init__(self, worker_process_count=1, set_proctitle=None):
|
||||||
|
set_proctitle = set_proctitle or cfg.CONF.setproctitle
|
||||||
|
super(NeutronBaseWorker, self).__init__(
|
||||||
|
worker_process_count=worker_process_count,
|
||||||
|
set_proctitle=set_proctitle
|
||||||
|
)
|
||||||
|
|
||||||
|
def start(self, name="neutron-server", desc=None):
|
||||||
|
super(NeutronBaseWorker, self).start(name=name, desc=desc)
|
||||||
|
|
||||||
|
|
||||||
|
class PeriodicWorker(NeutronBaseWorker):
|
||||||
"""A worker that runs a function at a fixed interval."""
|
"""A worker that runs a function at a fixed interval."""
|
||||||
|
|
||||||
def __init__(self, check_func, interval, initial_delay):
|
def __init__(self, check_func, interval, initial_delay):
|
||||||
@ -26,7 +40,7 @@ class PeriodicWorker(worker.BaseWorker):
|
|||||||
self._initial_delay = initial_delay
|
self._initial_delay = initial_delay
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
super(PeriodicWorker, self).start()
|
super(PeriodicWorker, self).start(desc="periodic worker")
|
||||||
if self._loop is None:
|
if self._loop is None:
|
||||||
self._loop = loopingcall.FixedIntervalLoopingCall(self._check_func)
|
self._loop = loopingcall.FixedIntervalLoopingCall(self._check_func)
|
||||||
self._loop.start(interval=self._interval,
|
self._loop.start(interval=self._interval,
|
||||||
|
@ -25,7 +25,6 @@ import eventlet.wsgi
|
|||||||
from neutron_lib import context
|
from neutron_lib import context
|
||||||
from neutron_lib.db import api as db_api
|
from neutron_lib.db import api as db_api
|
||||||
from neutron_lib import exceptions as exception
|
from neutron_lib import exceptions as exception
|
||||||
from neutron_lib import worker as neutron_worker
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
import oslo_i18n
|
import oslo_i18n
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
@ -43,6 +42,7 @@ import webob.exc
|
|||||||
from neutron._i18n import _
|
from neutron._i18n import _
|
||||||
from neutron.common import config
|
from neutron.common import config
|
||||||
from neutron.conf import wsgi as wsgi_config
|
from neutron.conf import wsgi as wsgi_config
|
||||||
|
from neutron import worker as neutron_worker
|
||||||
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
wsgi_config.register_socket_opts()
|
wsgi_config.register_socket_opts()
|
||||||
@ -58,19 +58,20 @@ def encode_body(body):
|
|||||||
return encodeutils.to_utf8(body)
|
return encodeutils.to_utf8(body)
|
||||||
|
|
||||||
|
|
||||||
class WorkerService(neutron_worker.BaseWorker):
|
class WorkerService(neutron_worker.NeutronBaseWorker):
|
||||||
"""Wraps a worker to be handled by ProcessLauncher"""
|
"""Wraps a worker to be handled by ProcessLauncher"""
|
||||||
def __init__(self, service, application, disable_ssl=False,
|
def __init__(self, service, application, set_proctitle, disable_ssl=False,
|
||||||
worker_process_count=0):
|
worker_process_count=0):
|
||||||
super(WorkerService, self).__init__(worker_process_count)
|
super(WorkerService, self).__init__(worker_process_count,
|
||||||
|
set_proctitle)
|
||||||
|
|
||||||
self._service = service
|
self._service = service
|
||||||
self._application = application
|
self._application = application
|
||||||
self._disable_ssl = disable_ssl
|
self._disable_ssl = disable_ssl
|
||||||
self._server = None
|
self._server = None
|
||||||
|
|
||||||
def start(self):
|
def start(self, desc=None):
|
||||||
super(WorkerService, self).start()
|
super(WorkerService, self).start(desc=desc)
|
||||||
# When api worker is stopped it kills the eventlet wsgi server which
|
# When api worker is stopped it kills the eventlet wsgi server which
|
||||||
# internally closes the wsgi server socket object. This server socket
|
# internally closes the wsgi server socket object. This server socket
|
||||||
# object becomes not usable which leads to "Bad file descriptor"
|
# object becomes not usable which leads to "Bad file descriptor"
|
||||||
@ -162,7 +163,7 @@ class Server(object):
|
|||||||
|
|
||||||
return sock
|
return sock
|
||||||
|
|
||||||
def start(self, application, port, host='0.0.0.0', workers=0):
|
def start(self, application, port, host='0.0.0.0', workers=0, desc=None):
|
||||||
"""Run a WSGI server with the given application."""
|
"""Run a WSGI server with the given application."""
|
||||||
self._host = host
|
self._host = host
|
||||||
self._port = port
|
self._port = port
|
||||||
@ -174,14 +175,16 @@ class Server(object):
|
|||||||
|
|
||||||
self._launch(application, workers)
|
self._launch(application, workers)
|
||||||
|
|
||||||
def _launch(self, application, workers=0):
|
def _launch(self, application, workers=0, desc=None):
|
||||||
service = WorkerService(self, application, self.disable_ssl, workers)
|
set_proctitle = "off" if desc is None else CONF.setproctitle
|
||||||
|
service = WorkerService(self, application, set_proctitle,
|
||||||
|
self.disable_ssl, workers)
|
||||||
if workers < 1:
|
if workers < 1:
|
||||||
# The API service should run in the current process.
|
# The API service should run in the current process.
|
||||||
self._server = service
|
self._server = service
|
||||||
# Dump the initial option values
|
# Dump the initial option values
|
||||||
cfg.CONF.log_opt_values(LOG, logging.DEBUG)
|
cfg.CONF.log_opt_values(LOG, logging.DEBUG)
|
||||||
service.start()
|
service.start(desc=desc)
|
||||||
systemd.notify_once()
|
systemd.notify_once()
|
||||||
else:
|
else:
|
||||||
# dispose the whole pool before os.fork, otherwise there will
|
# dispose the whole pool before os.fork, otherwise there will
|
||||||
|
@ -0,0 +1,20 @@
|
|||||||
|
features:
|
||||||
|
- Neutron child processes now set their process titles
|
||||||
|
to match their roles ('api worker', 'rpc worker',
|
||||||
|
'periodic worker', 'services worker', or any other defined
|
||||||
|
by workers from out-of-tree plugins.) This behavior can be
|
||||||
|
disabled by setting the ``setproctitle`` config option in the
|
||||||
|
``[default]`` section in neutron.conf to ``off``. The original
|
||||||
|
process string is also appended to the end, to help with
|
||||||
|
scripting that is looking for the old strings. There is also an
|
||||||
|
option called ``brief``, which results in much shorter and easier
|
||||||
|
to read process names. The default setting for this
|
||||||
|
option is ``on``, for a combination of backwards compatibility
|
||||||
|
and identifying different processes easily. The recommended
|
||||||
|
setting is ``brief``, once the deployer has verified that none
|
||||||
|
of their tooling depends on the older strings.
|
||||||
|
upgrade:
|
||||||
|
- The change to the process title happens by default with the new
|
||||||
|
``setproctitle`` config option. The old string is still part of
|
||||||
|
the new process title, but any scripts looking for exact string
|
||||||
|
matches of the old string may need to be modified.
|
Loading…
Reference in New Issue
Block a user