Revert "Remove threading before process forking"
I think this needs a little more thought. This broke OVN at least and
I don't think that's good. We need to figure out how to be compatible
with existing plugins, even out of tree ones.
This reverts commit 1cafff0871
.
Change-Id: Ie087fb11213cc85911483c2d32c463fa9c973e54
This commit is contained in:
parent
1cafff0871
commit
b1cdba1696
@ -35,7 +35,6 @@ from neutron.db.availability_zone import network as network_az
|
|||||||
from neutron.db import model_base
|
from neutron.db import model_base
|
||||||
from neutron.extensions import agent as ext_agent
|
from neutron.extensions import agent as ext_agent
|
||||||
from neutron.extensions import dhcpagentscheduler
|
from neutron.extensions import dhcpagentscheduler
|
||||||
from neutron import worker as neutron_worker
|
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
@ -83,38 +82,6 @@ class NetworkDhcpAgentBinding(model_base.BASEV2):
|
|||||||
primary_key=True)
|
primary_key=True)
|
||||||
|
|
||||||
|
|
||||||
class AgentStatusCheckWorker(neutron_worker.NeutronWorker):
|
|
||||||
|
|
||||||
def __init__(self, check_func, interval, initial_delay):
|
|
||||||
super(AgentStatusCheckWorker, self).__init__(worker_process_count=0)
|
|
||||||
|
|
||||||
self._check_func = check_func
|
|
||||||
self._loop = None
|
|
||||||
self._interval = interval
|
|
||||||
self._initial_delay = initial_delay
|
|
||||||
|
|
||||||
def start(self):
|
|
||||||
super(AgentStatusCheckWorker, self).start()
|
|
||||||
if self._loop is None:
|
|
||||||
self._loop = loopingcall.FixedIntervalLoopingCall(self._check_func)
|
|
||||||
self._loop.start(interval=self._interval,
|
|
||||||
initial_delay=self._initial_delay)
|
|
||||||
|
|
||||||
def wait(self):
|
|
||||||
if self._loop is not None:
|
|
||||||
self._loop.wait()
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
if self._loop is not None:
|
|
||||||
self._loop.stop()
|
|
||||||
|
|
||||||
def reset(self):
|
|
||||||
if self._loop is not None:
|
|
||||||
self.stop()
|
|
||||||
self.wait()
|
|
||||||
self.start()
|
|
||||||
|
|
||||||
|
|
||||||
class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
|
class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
|
||||||
"""Common class for agent scheduler mixins."""
|
"""Common class for agent scheduler mixins."""
|
||||||
|
|
||||||
@ -154,16 +121,18 @@ class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
|
|||||||
return result
|
return result
|
||||||
|
|
||||||
def add_agent_status_check(self, function):
|
def add_agent_status_check(self, function):
|
||||||
|
loop = loopingcall.FixedIntervalLoopingCall(function)
|
||||||
# TODO(enikanorov): make interval configurable rather than computed
|
# TODO(enikanorov): make interval configurable rather than computed
|
||||||
interval = max(cfg.CONF.agent_down_time // 2, 1)
|
interval = max(cfg.CONF.agent_down_time // 2, 1)
|
||||||
# add random initial delay to allow agents to check in after the
|
# add random initial delay to allow agents to check in after the
|
||||||
# neutron server first starts. random to offset multiple servers
|
# neutron server first starts. random to offset multiple servers
|
||||||
initial_delay = random.randint(interval, interval * 2)
|
initial_delay = random.randint(interval, interval * 2)
|
||||||
|
loop.start(interval=interval, initial_delay=initial_delay)
|
||||||
|
|
||||||
check_worker = AgentStatusCheckWorker(function, interval,
|
if hasattr(self, 'periodic_agent_loops'):
|
||||||
initial_delay)
|
self.periodic_agent_loops.append(loop)
|
||||||
|
else:
|
||||||
self.add_worker(check_worker)
|
self.periodic_agent_loops = [loop]
|
||||||
|
|
||||||
def agent_dead_limit_seconds(self):
|
def agent_dead_limit_seconds(self):
|
||||||
return cfg.CONF.agent_down_time * 2
|
return cfg.CONF.agent_down_time * 2
|
||||||
@ -198,13 +167,6 @@ class DhcpAgentSchedulerDbMixin(dhcpagentscheduler
|
|||||||
network_scheduler = None
|
network_scheduler = None
|
||||||
|
|
||||||
def start_periodic_dhcp_agent_status_check(self):
|
def start_periodic_dhcp_agent_status_check(self):
|
||||||
LOG.warning(
|
|
||||||
_LW("DEPRECATED method 'start_periodic_dhcp_agent_status_check'. "
|
|
||||||
"Please use 'add_periodic_dhcp_agent_status_check' instead")
|
|
||||||
)
|
|
||||||
self.add_periodic_dhcp_agent_status_check()
|
|
||||||
|
|
||||||
def add_periodic_dhcp_agent_status_check(self):
|
|
||||||
if not cfg.CONF.allow_automatic_dhcp_failover:
|
if not cfg.CONF.allow_automatic_dhcp_failover:
|
||||||
LOG.info(_LI("Skipping periodic DHCP agent status check because "
|
LOG.info(_LI("Skipping periodic DHCP agent status check because "
|
||||||
"automatic network rescheduling is disabled."))
|
"automatic network rescheduling is disabled."))
|
||||||
|
@ -83,13 +83,6 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
|
|||||||
router_scheduler = None
|
router_scheduler = None
|
||||||
|
|
||||||
def start_periodic_l3_agent_status_check(self):
|
def start_periodic_l3_agent_status_check(self):
|
||||||
LOG.warning(
|
|
||||||
_LW("DEPRECATED method 'start_periodic_l3_agent_status_check'. "
|
|
||||||
"Please use 'add_periodic_l3_agent_status_check' instead")
|
|
||||||
)
|
|
||||||
self.add_periodic_l3_agent_status_check()
|
|
||||||
|
|
||||||
def add_periodic_l3_agent_status_check(self):
|
|
||||||
if not cfg.CONF.allow_automatic_l3agent_failover:
|
if not cfg.CONF.allow_automatic_l3agent_failover:
|
||||||
LOG.info(_LI("Skipping period L3 agent status check because "
|
LOG.info(_LI("Skipping period L3 agent status check because "
|
||||||
"automatic router rescheduling is disabled."))
|
"automatic router rescheduling is disabled."))
|
||||||
|
@ -24,11 +24,9 @@ import abc
|
|||||||
|
|
||||||
import six
|
import six
|
||||||
|
|
||||||
from neutron import worker as neutron_worker
|
|
||||||
|
|
||||||
|
|
||||||
@six.add_metaclass(abc.ABCMeta)
|
@six.add_metaclass(abc.ABCMeta)
|
||||||
class NeutronPluginBaseV2(neutron_worker.WorkerSupportServiceMixin):
|
class NeutronPluginBaseV2(object):
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def create_subnet(self, context, subnet):
|
def create_subnet(self, context, subnet):
|
||||||
@ -411,3 +409,12 @@ class NeutronPluginBaseV2(neutron_worker.WorkerSupportServiceMixin):
|
|||||||
"""
|
"""
|
||||||
return (self.__class__.start_rpc_state_reports_listener !=
|
return (self.__class__.start_rpc_state_reports_listener !=
|
||||||
NeutronPluginBaseV2.start_rpc_state_reports_listener)
|
NeutronPluginBaseV2.start_rpc_state_reports_listener)
|
||||||
|
|
||||||
|
def get_workers(self):
|
||||||
|
"""Returns a collection NeutronWorker instances
|
||||||
|
|
||||||
|
If a plugin needs to define worker processes outside of API/RPC workers
|
||||||
|
then it will override this and return a collection of NeutronWorker
|
||||||
|
instances
|
||||||
|
"""
|
||||||
|
return ()
|
||||||
|
@ -162,7 +162,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||||||
self._setup_dhcp()
|
self._setup_dhcp()
|
||||||
self._start_rpc_notifiers()
|
self._start_rpc_notifiers()
|
||||||
self.add_agent_status_check(self.agent_health_check)
|
self.add_agent_status_check(self.agent_health_check)
|
||||||
self.add_workers(self.mechanism_manager.get_workers())
|
|
||||||
self._verify_service_plugins_requirements()
|
self._verify_service_plugins_requirements()
|
||||||
LOG.info(_LI("Modular L2 Plugin initialization complete"))
|
LOG.info(_LI("Modular L2 Plugin initialization complete"))
|
||||||
|
|
||||||
@ -183,7 +182,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||||||
self.network_scheduler = importutils.import_object(
|
self.network_scheduler = importutils.import_object(
|
||||||
cfg.CONF.network_scheduler_driver
|
cfg.CONF.network_scheduler_driver
|
||||||
)
|
)
|
||||||
self.add_periodic_dhcp_agent_status_check()
|
self.start_periodic_dhcp_agent_status_check()
|
||||||
|
|
||||||
def _verify_service_plugins_requirements(self):
|
def _verify_service_plugins_requirements(self):
|
||||||
for service_plugin in cfg.CONF.service_plugins:
|
for service_plugin in cfg.CONF.service_plugins:
|
||||||
@ -1625,6 +1624,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||||||
return port.id
|
return port.id
|
||||||
return device
|
return device
|
||||||
|
|
||||||
|
def get_workers(self):
|
||||||
|
return self.mechanism_manager.get_workers()
|
||||||
|
|
||||||
def filter_hosts_with_network_access(
|
def filter_hosts_with_network_access(
|
||||||
self, context, network_id, candidate_hosts):
|
self, context, network_id, candidate_hosts):
|
||||||
segments = db.get_network_segments(context.session, network_id)
|
segments = db.get_network_segments(context.session, network_id)
|
||||||
|
@ -18,9 +18,8 @@
|
|||||||
# If ../neutron/__init__.py exists, add ../ to Python search path, so that
|
# If ../neutron/__init__.py exists, add ../ to Python search path, so that
|
||||||
# it will override what happens to be installed in /usr/(local/)lib/python...
|
# it will override what happens to be installed in /usr/(local/)lib/python...
|
||||||
|
|
||||||
from oslo_config import cfg
|
import eventlet
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
from oslo_service import service as common_service
|
|
||||||
|
|
||||||
from neutron._i18n import _LI
|
from neutron._i18n import _LI
|
||||||
from neutron import service
|
from neutron import service
|
||||||
@ -29,14 +28,13 @@ LOG = log.getLogger(__name__)
|
|||||||
|
|
||||||
|
|
||||||
def eventlet_rpc_server():
|
def eventlet_rpc_server():
|
||||||
|
pool = eventlet.GreenPool()
|
||||||
LOG.info(_LI("Eventlet based AMQP RPC server starting..."))
|
LOG.info(_LI("Eventlet based AMQP RPC server starting..."))
|
||||||
rpc_workers_launcher = common_service.ProcessLauncher(
|
|
||||||
cfg.CONF, wait_interval=1.0
|
|
||||||
)
|
|
||||||
try:
|
try:
|
||||||
service.start_rpc_workers(rpc_workers_launcher)
|
neutron_rpc = service.serve_rpc()
|
||||||
except NotImplementedError:
|
except NotImplementedError:
|
||||||
LOG.info(_LI("RPC was already started in parent process by "
|
LOG.info(_LI("RPC was already started in parent process by "
|
||||||
"plugin."))
|
"plugin."))
|
||||||
else:
|
else:
|
||||||
rpc_workers_launcher.wait()
|
pool.spawn(neutron_rpc.wait)
|
||||||
|
pool.waitall()
|
||||||
|
@ -12,10 +12,7 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import eventlet
|
import eventlet
|
||||||
|
|
||||||
from oslo_config import cfg
|
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
from oslo_service import service as common_service
|
|
||||||
|
|
||||||
from neutron._i18n import _LI
|
from neutron._i18n import _LI
|
||||||
from neutron import service
|
from neutron import service
|
||||||
@ -29,24 +26,24 @@ def eventlet_wsgi_server():
|
|||||||
|
|
||||||
|
|
||||||
def start_api_and_rpc_workers(neutron_api):
|
def start_api_and_rpc_workers(neutron_api):
|
||||||
|
pool = eventlet.GreenPool()
|
||||||
|
|
||||||
|
api_thread = pool.spawn(neutron_api.wait)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
plugin_workers_launcher = common_service.ProcessLauncher(
|
neutron_rpc = service.serve_rpc()
|
||||||
cfg.CONF, wait_interval=1.0
|
|
||||||
)
|
|
||||||
service.start_rpc_workers(plugin_workers_launcher)
|
|
||||||
|
|
||||||
pool = eventlet.GreenPool()
|
|
||||||
api_thread = pool.spawn(neutron_api.wait)
|
|
||||||
plugin_workers_thread = pool.spawn(plugin_workers_launcher.wait)
|
|
||||||
|
|
||||||
# api and other workers should die together. When one dies,
|
|
||||||
# kill the other.
|
|
||||||
api_thread.link(lambda gt: plugin_workers_thread.kill())
|
|
||||||
plugin_workers_thread.link(lambda gt: api_thread.kill())
|
|
||||||
|
|
||||||
pool.waitall()
|
|
||||||
except NotImplementedError:
|
except NotImplementedError:
|
||||||
LOG.info(_LI("RPC was already started in parent process by "
|
LOG.info(_LI("RPC was already started in parent process by "
|
||||||
"plugin."))
|
"plugin."))
|
||||||
|
else:
|
||||||
|
rpc_thread = pool.spawn(neutron_rpc.wait)
|
||||||
|
|
||||||
neutron_api.wait()
|
plugin_workers = service.start_plugin_workers()
|
||||||
|
for worker in plugin_workers:
|
||||||
|
pool.spawn(worker.wait)
|
||||||
|
|
||||||
|
# api and rpc should die together. When one dies, kill the other.
|
||||||
|
rpc_thread.link(lambda gt: api_thread.kill())
|
||||||
|
api_thread.link(lambda gt: rpc_thread.kill())
|
||||||
|
|
||||||
|
pool.waitall()
|
||||||
|
@ -32,7 +32,7 @@ from neutron.common import rpc as n_rpc
|
|||||||
from neutron import context
|
from neutron import context
|
||||||
from neutron.db import api as session
|
from neutron.db import api as session
|
||||||
from neutron import manager
|
from neutron import manager
|
||||||
from neutron import worker as neutron_worker
|
from neutron import worker
|
||||||
from neutron import wsgi
|
from neutron import wsgi
|
||||||
|
|
||||||
|
|
||||||
@ -113,15 +113,25 @@ def serve_wsgi(cls):
|
|||||||
return service
|
return service
|
||||||
|
|
||||||
|
|
||||||
class RpcWorker(neutron_worker.NeutronWorker):
|
def start_plugin_workers():
|
||||||
|
launchers = []
|
||||||
|
# NOTE(twilson) get_service_plugins also returns the core plugin
|
||||||
|
for plugin in manager.NeutronManager.get_unique_service_plugins():
|
||||||
|
# TODO(twilson) Instead of defaulting here, come up with a good way to
|
||||||
|
# share a common get_workers default between NeutronPluginBaseV2 and
|
||||||
|
# ServicePluginBase
|
||||||
|
for plugin_worker in getattr(plugin, 'get_workers', tuple)():
|
||||||
|
launcher = common_service.ProcessLauncher(cfg.CONF)
|
||||||
|
launcher.launch_service(plugin_worker)
|
||||||
|
launchers.append(launcher)
|
||||||
|
return launchers
|
||||||
|
|
||||||
|
|
||||||
|
class RpcWorker(worker.NeutronWorker):
|
||||||
"""Wraps a worker to be handled by ProcessLauncher"""
|
"""Wraps a worker to be handled by ProcessLauncher"""
|
||||||
start_listeners_method = 'start_rpc_listeners'
|
start_listeners_method = 'start_rpc_listeners'
|
||||||
|
|
||||||
def __init__(self, plugins, worker_process_count=1):
|
def __init__(self, plugins):
|
||||||
super(RpcWorker, self).__init__(
|
|
||||||
worker_process_count=worker_process_count
|
|
||||||
)
|
|
||||||
|
|
||||||
self._plugins = plugins
|
self._plugins = plugins
|
||||||
self._servers = []
|
self._servers = []
|
||||||
|
|
||||||
@ -168,7 +178,7 @@ class RpcReportsWorker(RpcWorker):
|
|||||||
start_listeners_method = 'start_rpc_state_reports_listener'
|
start_listeners_method = 'start_rpc_state_reports_listener'
|
||||||
|
|
||||||
|
|
||||||
def _get_rpc_workers():
|
def serve_rpc():
|
||||||
plugin = manager.NeutronManager.get_plugin()
|
plugin = manager.NeutronManager.get_plugin()
|
||||||
service_plugins = (
|
service_plugins = (
|
||||||
manager.NeutronManager.get_service_plugins().values())
|
manager.NeutronManager.get_service_plugins().values())
|
||||||
@ -188,101 +198,31 @@ def _get_rpc_workers():
|
|||||||
cfg.CONF.rpc_workers)
|
cfg.CONF.rpc_workers)
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
# passing service plugins only, because core plugin is among them
|
|
||||||
rpc_workers = [RpcWorker(service_plugins,
|
|
||||||
worker_process_count=cfg.CONF.rpc_workers)]
|
|
||||||
|
|
||||||
if (cfg.CONF.rpc_state_report_workers > 0 and
|
|
||||||
plugin.rpc_state_report_workers_supported()):
|
|
||||||
rpc_workers.append(
|
|
||||||
RpcReportsWorker(
|
|
||||||
[plugin],
|
|
||||||
worker_process_count=cfg.CONF.rpc_state_report_workers
|
|
||||||
)
|
|
||||||
)
|
|
||||||
return rpc_workers
|
|
||||||
|
|
||||||
|
|
||||||
class AllServicesNeutronWorker(neutron_worker.NeutronWorker):
|
|
||||||
def __init__(self, services, worker_process_count=1):
|
|
||||||
super(AllServicesNeutronWorker, self).__init__(worker_process_count)
|
|
||||||
self._services = services
|
|
||||||
self._launcher = common_service.Launcher(cfg.CONF)
|
|
||||||
|
|
||||||
def start(self):
|
|
||||||
for srv in self._services:
|
|
||||||
self._launcher.launch_service(srv)
|
|
||||||
super(AllServicesNeutronWorker, self).start()
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
self._launcher.stop()
|
|
||||||
|
|
||||||
def wait(self):
|
|
||||||
self._launcher.wait()
|
|
||||||
|
|
||||||
def reset(self):
|
|
||||||
self._launcher.restart()
|
|
||||||
|
|
||||||
|
|
||||||
def _start_workers(worker_launcher, workers):
|
|
||||||
if not workers:
|
|
||||||
return
|
|
||||||
try:
|
try:
|
||||||
|
# passing service plugins only, because core plugin is among them
|
||||||
|
rpc = RpcWorker(service_plugins)
|
||||||
# dispose the whole pool before os.fork, otherwise there will
|
# dispose the whole pool before os.fork, otherwise there will
|
||||||
# be shared DB connections in child processes which may cause
|
# be shared DB connections in child processes which may cause
|
||||||
# DB errors.
|
# DB errors.
|
||||||
LOG.debug('using launcher for rpc, workers=%s', cfg.CONF.rpc_workers)
|
LOG.debug('using launcher for rpc, workers=%s', cfg.CONF.rpc_workers)
|
||||||
session.dispose()
|
session.dispose()
|
||||||
|
launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0)
|
||||||
|
launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
|
||||||
|
if (cfg.CONF.rpc_state_report_workers > 0 and
|
||||||
|
plugin.rpc_state_report_workers_supported()):
|
||||||
|
rpc_state_rep = RpcReportsWorker([plugin])
|
||||||
|
LOG.debug('using launcher for state reports rpc, workers=%s',
|
||||||
|
cfg.CONF.rpc_state_report_workers)
|
||||||
|
launcher.launch_service(
|
||||||
|
rpc_state_rep, workers=cfg.CONF.rpc_state_report_workers)
|
||||||
|
|
||||||
for worker in workers:
|
return launcher
|
||||||
worker_launcher.launch_service(worker, worker.worker_process_count)
|
|
||||||
except Exception:
|
except Exception:
|
||||||
with excutils.save_and_reraise_exception():
|
with excutils.save_and_reraise_exception():
|
||||||
LOG.exception(_LE('Unrecoverable error: please check log for '
|
LOG.exception(_LE('Unrecoverable error: please check log for '
|
||||||
'details.'))
|
'details.'))
|
||||||
|
|
||||||
|
|
||||||
def start_plugin_workers(worker_launcher):
|
|
||||||
# NOTE(twilson) get_service_plugins also returns the core plugin
|
|
||||||
plugins = manager.NeutronManager.get_unique_service_plugins()
|
|
||||||
|
|
||||||
# TODO(twilson) Instead of defaulting here, come up with a good way to
|
|
||||||
# share a common get_workers default between NeutronPluginBaseV2 and
|
|
||||||
# ServicePluginBase
|
|
||||||
plugin_workers = [
|
|
||||||
plugin_worker
|
|
||||||
for plugin in plugins if hasattr(plugin, 'get_workers')
|
|
||||||
for plugin_worker in plugin.get_workers()
|
|
||||||
]
|
|
||||||
|
|
||||||
# we need to fork all necessary processes before spawning extra threads
|
|
||||||
# to avoid problems with infinite resource locking and other concurrency
|
|
||||||
# problems
|
|
||||||
process_plugin_workers = [
|
|
||||||
plugin_worker for plugin_worker in plugin_workers
|
|
||||||
if plugin_worker.worker_process_count > 0
|
|
||||||
]
|
|
||||||
|
|
||||||
thread_plugin_workers = [
|
|
||||||
plugin_worker for plugin_worker in plugin_workers
|
|
||||||
if plugin_worker.worker_process_count < 1
|
|
||||||
]
|
|
||||||
|
|
||||||
# add extra process worker and spawn there all workers with
|
|
||||||
# worker_process_count == 0
|
|
||||||
if thread_plugin_workers:
|
|
||||||
process_plugin_workers.append(
|
|
||||||
AllServicesNeutronWorker(thread_plugin_workers)
|
|
||||||
)
|
|
||||||
|
|
||||||
_start_workers(worker_launcher, process_plugin_workers)
|
|
||||||
|
|
||||||
|
|
||||||
def start_rpc_workers(worker_launcher):
|
|
||||||
rpc_workers = _get_rpc_workers()
|
|
||||||
_start_workers(worker_launcher, rpc_workers)
|
|
||||||
|
|
||||||
|
|
||||||
def _get_api_workers():
|
def _get_api_workers():
|
||||||
workers = cfg.CONF.api_workers
|
workers = cfg.CONF.api_workers
|
||||||
if not workers:
|
if not workers:
|
||||||
|
@ -32,7 +32,6 @@ from neutron.db import l3_gwmode_db
|
|||||||
from neutron.db import l3_hamode_db
|
from neutron.db import l3_hamode_db
|
||||||
from neutron.plugins.common import constants
|
from neutron.plugins.common import constants
|
||||||
from neutron.quota import resource_registry
|
from neutron.quota import resource_registry
|
||||||
from neutron import service
|
|
||||||
from neutron.services import service_base
|
from neutron.services import service_base
|
||||||
|
|
||||||
|
|
||||||
@ -63,23 +62,20 @@ class L3RouterPlugin(service_base.ServicePluginBase,
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.router_scheduler = importutils.import_object(
|
self.router_scheduler = importutils.import_object(
|
||||||
cfg.CONF.router_scheduler_driver)
|
cfg.CONF.router_scheduler_driver)
|
||||||
self.add_periodic_l3_agent_status_check()
|
self.start_periodic_l3_agent_status_check()
|
||||||
super(L3RouterPlugin, self).__init__()
|
super(L3RouterPlugin, self).__init__()
|
||||||
if 'dvr' in self.supported_extension_aliases:
|
if 'dvr' in self.supported_extension_aliases:
|
||||||
l3_dvrscheduler_db.subscribe()
|
l3_dvrscheduler_db.subscribe()
|
||||||
l3_db.subscribe()
|
l3_db.subscribe()
|
||||||
self.agent_notifiers.update(
|
self.start_rpc_listeners()
|
||||||
{n_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
|
|
||||||
|
|
||||||
rpc_worker = service.RpcWorker([self], worker_process_count=0)
|
|
||||||
|
|
||||||
self.add_worker(rpc_worker)
|
|
||||||
|
|
||||||
@log_helpers.log_method_call
|
@log_helpers.log_method_call
|
||||||
def start_rpc_listeners(self):
|
def start_rpc_listeners(self):
|
||||||
# RPC support
|
# RPC support
|
||||||
self.topic = topics.L3PLUGIN
|
self.topic = topics.L3PLUGIN
|
||||||
self.conn = n_rpc.create_connection()
|
self.conn = n_rpc.create_connection()
|
||||||
|
self.agent_notifiers.update(
|
||||||
|
{n_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
|
||||||
self.endpoints = [l3_rpc.L3RpcCallback()]
|
self.endpoints = [l3_rpc.L3RpcCallback()]
|
||||||
self.conn.create_consumer(self.topic, self.endpoints,
|
self.conn.create_consumer(self.topic, self.endpoints,
|
||||||
fanout=False)
|
fanout=False)
|
||||||
|
@ -17,7 +17,6 @@ from neutron.common import rpc as n_rpc
|
|||||||
from neutron.common import topics
|
from neutron.common import topics
|
||||||
from neutron.db.metering import metering_db
|
from neutron.db.metering import metering_db
|
||||||
from neutron.db.metering import metering_rpc
|
from neutron.db.metering import metering_rpc
|
||||||
from neutron import service
|
|
||||||
|
|
||||||
|
|
||||||
class MeteringPlugin(metering_db.MeteringDbMixin):
|
class MeteringPlugin(metering_db.MeteringDbMixin):
|
||||||
@ -29,9 +28,7 @@ class MeteringPlugin(metering_db.MeteringDbMixin):
|
|||||||
super(MeteringPlugin, self).__init__()
|
super(MeteringPlugin, self).__init__()
|
||||||
|
|
||||||
self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI()
|
self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI()
|
||||||
rpc_worker = service.RpcWorker([self], worker_process_count=0)
|
self.start_rpc_listeners()
|
||||||
|
|
||||||
self.add_worker(rpc_worker)
|
|
||||||
|
|
||||||
def start_rpc_listeners(self):
|
def start_rpc_listeners(self):
|
||||||
self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)]
|
self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)]
|
||||||
|
@ -24,14 +24,12 @@ from neutron._i18n import _, _LE, _LI
|
|||||||
from neutron.api import extensions
|
from neutron.api import extensions
|
||||||
from neutron.db import servicetype_db as sdb
|
from neutron.db import servicetype_db as sdb
|
||||||
from neutron.services import provider_configuration as pconf
|
from neutron.services import provider_configuration as pconf
|
||||||
from neutron import worker as neutron_worker
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@six.add_metaclass(abc.ABCMeta)
|
@six.add_metaclass(abc.ABCMeta)
|
||||||
class ServicePluginBase(extensions.PluginInterface,
|
class ServicePluginBase(extensions.PluginInterface):
|
||||||
neutron_worker.WorkerSupportServiceMixin):
|
|
||||||
"""Define base interface for any Advanced Service plugin."""
|
"""Define base interface for any Advanced Service plugin."""
|
||||||
supported_extension_aliases = []
|
supported_extension_aliases = []
|
||||||
|
|
||||||
@ -48,6 +46,10 @@ class ServicePluginBase(extensions.PluginInterface,
|
|||||||
"""Return string description of the plugin."""
|
"""Return string description of the plugin."""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def get_workers(self):
|
||||||
|
"""Returns a collection of NeutronWorkers"""
|
||||||
|
return ()
|
||||||
|
|
||||||
|
|
||||||
def load_drivers(service_type, plugin):
|
def load_drivers(service_type, plugin):
|
||||||
"""Loads drivers for specific service.
|
"""Loads drivers for specific service.
|
||||||
|
@ -414,7 +414,7 @@ class PluginFixture(fixtures.Fixture):
|
|||||||
self.patched_default_svc_plugins = self.default_svc_plugins_p.start()
|
self.patched_default_svc_plugins = self.default_svc_plugins_p.start()
|
||||||
self.dhcp_periodic_p = mock.patch(
|
self.dhcp_periodic_p = mock.patch(
|
||||||
'neutron.db.agentschedulers_db.DhcpAgentSchedulerDbMixin.'
|
'neutron.db.agentschedulers_db.DhcpAgentSchedulerDbMixin.'
|
||||||
'add_periodic_dhcp_agent_status_check')
|
'start_periodic_dhcp_agent_status_check')
|
||||||
self.patched_dhcp_periodic = self.dhcp_periodic_p.start()
|
self.patched_dhcp_periodic = self.dhcp_periodic_p.start()
|
||||||
self.agent_health_check_p = mock.patch(
|
self.agent_health_check_p = mock.patch(
|
||||||
'neutron.db.agentschedulers_db.DhcpAgentSchedulerDbMixin.'
|
'neutron.db.agentschedulers_db.DhcpAgentSchedulerDbMixin.'
|
||||||
|
@ -22,13 +22,12 @@ import traceback
|
|||||||
import httplib2
|
import httplib2
|
||||||
import mock
|
import mock
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_service import service as common_service
|
|
||||||
import psutil
|
import psutil
|
||||||
|
|
||||||
from neutron.agent.linux import utils
|
from neutron.agent.linux import utils
|
||||||
from neutron import service
|
from neutron import service
|
||||||
from neutron.tests import base
|
from neutron.tests import base
|
||||||
from neutron import worker as neutron_worker
|
from neutron import worker
|
||||||
from neutron import wsgi
|
from neutron import wsgi
|
||||||
|
|
||||||
|
|
||||||
@ -245,9 +244,8 @@ class TestRPCServer(TestNeutronServer):
|
|||||||
# not interested in state report workers specifically
|
# not interested in state report workers specifically
|
||||||
CONF.set_override("rpc_state_report_workers", 0)
|
CONF.set_override("rpc_state_report_workers", 0)
|
||||||
|
|
||||||
rpc_workers_launcher = common_service.ProcessLauncher(CONF)
|
launcher = service.serve_rpc()
|
||||||
service.start_rpc_workers(rpc_workers_launcher)
|
launcher.wait()
|
||||||
rpc_workers_launcher.wait()
|
|
||||||
|
|
||||||
def test_restart_rpc_on_sighup_multiple_workers(self):
|
def test_restart_rpc_on_sighup_multiple_workers(self):
|
||||||
self._test_restart_service_on_sighup(service=self._serve_rpc,
|
self._test_restart_service_on_sighup(service=self._serve_rpc,
|
||||||
@ -266,12 +264,12 @@ class TestPluginWorker(TestNeutronServer):
|
|||||||
def _start_plugin(self, workers=1):
|
def _start_plugin(self, workers=1):
|
||||||
with mock.patch('neutron.manager.NeutronManager.get_plugin') as gp:
|
with mock.patch('neutron.manager.NeutronManager.get_plugin') as gp:
|
||||||
gp.return_value = self.plugin
|
gp.return_value = self.plugin
|
||||||
plugin_workers_launcher = common_service.ProcessLauncher(CONF)
|
launchers = service.start_plugin_workers()
|
||||||
service.start_plugin_workers(plugin_workers_launcher)
|
for launcher in launchers:
|
||||||
plugin_workers_launcher.wait()
|
launcher.wait()
|
||||||
|
|
||||||
def test_start(self):
|
def test_start(self):
|
||||||
class FakeWorker(neutron_worker.NeutronWorker):
|
class FakeWorker(worker.NeutronWorker):
|
||||||
def start(self):
|
def start(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -256,7 +256,7 @@ class OvsAgentSchedulerTestCaseBase(test_l3.L3NatTestCaseMixin,
|
|||||||
self.patched_l3_notify = self.l3_notify_p.start()
|
self.patched_l3_notify = self.l3_notify_p.start()
|
||||||
self.l3_periodic_p = mock.patch('neutron.db.l3_agentschedulers_db.'
|
self.l3_periodic_p = mock.patch('neutron.db.l3_agentschedulers_db.'
|
||||||
'L3AgentSchedulerDbMixin.'
|
'L3AgentSchedulerDbMixin.'
|
||||||
'add_periodic_l3_agent_status_check')
|
'start_periodic_l3_agent_status_check')
|
||||||
self.patched_l3_periodic = self.l3_periodic_p.start()
|
self.patched_l3_periodic = self.l3_periodic_p.start()
|
||||||
self.dhcp_notify_p = mock.patch(
|
self.dhcp_notify_p = mock.patch(
|
||||||
'neutron.extensions.dhcpagentscheduler.notify')
|
'neutron.extensions.dhcpagentscheduler.notify')
|
||||||
|
@ -17,38 +17,6 @@ from neutron.callbacks import registry
|
|||||||
from neutron.callbacks import resources
|
from neutron.callbacks import resources
|
||||||
|
|
||||||
|
|
||||||
class WorkerSupportServiceMixin(object):
|
|
||||||
|
|
||||||
@property
|
|
||||||
def _workers(self):
|
|
||||||
try:
|
|
||||||
return self.__workers
|
|
||||||
except AttributeError:
|
|
||||||
self.__workers = []
|
|
||||||
return self.__workers
|
|
||||||
|
|
||||||
def get_workers(self):
|
|
||||||
"""Returns a collection NeutronWorker instances needed by this service
|
|
||||||
"""
|
|
||||||
return list(self._workers)
|
|
||||||
|
|
||||||
def add_worker(self, worker):
|
|
||||||
"""Adds NeutronWorker needed for this service
|
|
||||||
|
|
||||||
If a object needs to define workers thread/processes outside of API/RPC
|
|
||||||
workers then it will call this method to register worker. Should be
|
|
||||||
called on initialization stage before running services
|
|
||||||
"""
|
|
||||||
self._workers.append(worker)
|
|
||||||
|
|
||||||
def add_workers(self, workers):
|
|
||||||
"""Adds NeutronWorker list needed for this service
|
|
||||||
|
|
||||||
The same as add_worker but adds a list of workers
|
|
||||||
"""
|
|
||||||
self._workers.extend(workers)
|
|
||||||
|
|
||||||
|
|
||||||
class NeutronWorker(service.ServiceBase):
|
class NeutronWorker(service.ServiceBase):
|
||||||
"""Partial implementation of the ServiceBase ABC
|
"""Partial implementation of the ServiceBase ABC
|
||||||
|
|
||||||
@ -68,25 +36,5 @@ class NeutronWorker(service.ServiceBase):
|
|||||||
super(MyPluginWorker, self).start()
|
super(MyPluginWorker, self).start()
|
||||||
do_sync()
|
do_sync()
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# default class value for case when super().__init__ is not called
|
|
||||||
_worker_process_count = 1
|
|
||||||
|
|
||||||
def __init__(self, worker_process_count=_worker_process_count):
|
|
||||||
"""
|
|
||||||
Initialize worker
|
|
||||||
|
|
||||||
:param worker_process_count: Defines how many processes to spawn for
|
|
||||||
worker:
|
|
||||||
0 - spawn 1 new worker thread,
|
|
||||||
1..N - spawn N new worker processes
|
|
||||||
"""
|
|
||||||
self._worker_process_count = worker_process_count
|
|
||||||
|
|
||||||
@property
|
|
||||||
def worker_process_count(self):
|
|
||||||
return self._worker_process_count
|
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
if self.worker_process_count > 0:
|
registry.notify(resources.PROCESS, events.AFTER_CREATE, self.start)
|
||||||
registry.notify(resources.PROCESS, events.AFTER_CREATE, self.start)
|
|
||||||
|
@ -44,7 +44,7 @@ from neutron.common import config
|
|||||||
from neutron.common import exceptions as n_exc
|
from neutron.common import exceptions as n_exc
|
||||||
from neutron import context
|
from neutron import context
|
||||||
from neutron.db import api
|
from neutron.db import api
|
||||||
from neutron import worker as neutron_worker
|
from neutron import worker
|
||||||
|
|
||||||
socket_opts = [
|
socket_opts = [
|
||||||
cfg.IntOpt('backlog',
|
cfg.IntOpt('backlog',
|
||||||
@ -74,12 +74,9 @@ def encode_body(body):
|
|||||||
return encodeutils.to_utf8(body)
|
return encodeutils.to_utf8(body)
|
||||||
|
|
||||||
|
|
||||||
class WorkerService(neutron_worker.NeutronWorker):
|
class WorkerService(worker.NeutronWorker):
|
||||||
"""Wraps a worker to be handled by ProcessLauncher"""
|
"""Wraps a worker to be handled by ProcessLauncher"""
|
||||||
def __init__(self, service, application, disable_ssl=False,
|
def __init__(self, service, application, disable_ssl=False):
|
||||||
worker_process_count=0):
|
|
||||||
super(WorkerService, self).__init__(worker_process_count)
|
|
||||||
|
|
||||||
self._service = service
|
self._service = service
|
||||||
self._application = application
|
self._application = application
|
||||||
self._disable_ssl = disable_ssl
|
self._disable_ssl = disable_ssl
|
||||||
@ -191,7 +188,7 @@ class Server(object):
|
|||||||
self._launch(application, workers)
|
self._launch(application, workers)
|
||||||
|
|
||||||
def _launch(self, application, workers=0):
|
def _launch(self, application, workers=0):
|
||||||
service = WorkerService(self, application, self.disable_ssl, workers)
|
service = WorkerService(self, application, self.disable_ssl)
|
||||||
if workers < 1:
|
if workers < 1:
|
||||||
# The API service should run in the current process.
|
# The API service should run in the current process.
|
||||||
self._server = service
|
self._server = service
|
||||||
@ -209,8 +206,7 @@ class Server(object):
|
|||||||
# wait interval past the default of 0.01s.
|
# wait interval past the default of 0.01s.
|
||||||
self._server = common_service.ProcessLauncher(cfg.CONF,
|
self._server = common_service.ProcessLauncher(cfg.CONF,
|
||||||
wait_interval=1.0)
|
wait_interval=1.0)
|
||||||
self._server.launch_service(service,
|
self._server.launch_service(service, workers=workers)
|
||||||
workers=service.worker_process_count)
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def host(self):
|
def host(self):
|
||||||
|
Loading…
Reference in New Issue
Block a user