Revert "Revert "Remove threading before process forking""
This reverts commit b1cdba1696
Original patch was reverted because it broke neutron plugin's
backward compatibility and needed more work.
This patch fixes that problems:
1) original behaviour of add_agent_status_check,
start_periodic_l3_agent_status_check and
start_periodic_dhcp_agent_status_check methods is deprecated but kept
for using in third part plugins for backward compatibility
2) new add_agent_status_check_worker, add_periodic_l3_agent_status_check
and add_periodic_dhcp_agent_status_check method are implemented
instead and are used for implementing plugins in neutron codebase
Closes-Bug: #1569404
Change-Id: I3a32a95489831f0d862930384309eefdc881d8f6
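
For third-party plugins the migration is mechanical. A minimal sketch of both
styles, assuming a hypothetical MyPlugin class and check_agents callback
(add_agent_status_check and add_agent_status_check_worker are the real
methods touched by this patch):

    from neutron.db import agentschedulers_db


    class MyPlugin(agentschedulers_db.AgentSchedulerDbMixin):

        def check_agents(self):
            # hypothetical periodic health-check callback
            pass

        def setup_deprecated(self):
            # Old behaviour (kept, but deprecated): the looping call is
            # spawned immediately, i.e. before the server forks workers.
            self.add_agent_status_check(self.check_agents)

        def setup_worker_based(self):
            # New behaviour: registers an AgentStatusCheckWorker with
            # worker_process_count=0, so the loop starts only after forking.
            self.add_agent_status_check_worker(self.check_agents)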
commit 483c5982c0
parent 2e1c671bb3
@@ -17,6 +17,7 @@ import datetime
 import random
 import time
 
+import debtcollector
 from neutron_lib import constants
 from oslo_config import cfg
 from oslo_log import log as logging
@@ -35,6 +36,7 @@ from neutron.db.availability_zone import network as network_az
 from neutron.db import model_base
 from neutron.extensions import agent as ext_agent
 from neutron.extensions import dhcpagentscheduler
+from neutron import worker as neutron_worker
 
 
 LOG = logging.getLogger(__name__)
@@ -82,6 +84,38 @@ class NetworkDhcpAgentBinding(model_base.BASEV2):
                               primary_key=True)
 
 
+class AgentStatusCheckWorker(neutron_worker.NeutronWorker):
+
+    def __init__(self, check_func, interval, initial_delay):
+        super(AgentStatusCheckWorker, self).__init__(worker_process_count=0)
+
+        self._check_func = check_func
+        self._loop = None
+        self._interval = interval
+        self._initial_delay = initial_delay
+
+    def start(self):
+        super(AgentStatusCheckWorker, self).start()
+        if self._loop is None:
+            self._loop = loopingcall.FixedIntervalLoopingCall(self._check_func)
+            self._loop.start(interval=self._interval,
+                             initial_delay=self._initial_delay)
+
+    def wait(self):
+        if self._loop is not None:
+            self._loop.wait()
+
+    def stop(self):
+        if self._loop is not None:
+            self._loop.stop()
+
+    def reset(self):
+        if self._loop is not None:
+            self.stop()
+            self.wait()
+            self.start()
+
+
 class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
     """Common class for agent scheduler mixins."""
 
@@ -120,6 +154,22 @@ class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
                                              original_agent['host'])
         return result
 
+    def add_agent_status_check_worker(self, function):
+        # TODO(enikanorov): make interval configurable rather than computed
+        interval = max(cfg.CONF.agent_down_time // 2, 1)
+        # add random initial delay to allow agents to check in after the
+        # neutron server first starts. random to offset multiple servers
+        initial_delay = random.randint(interval, interval * 2)
+
+        check_worker = AgentStatusCheckWorker(function, interval,
+                                              initial_delay)
+
+        self.add_worker(check_worker)
+
+    @debtcollector.removals.remove(
+        message="This will be removed in the N cycle. "
+                "Please use 'add_agent_status_check_worker' instead."
+    )
     def add_agent_status_check(self, function):
         loop = loopingcall.FixedIntervalLoopingCall(function)
         # TODO(enikanorov): make interval configurable rather than computed
@@ -166,6 +216,10 @@ class DhcpAgentSchedulerDbMixin(dhcpagentscheduler
 
     network_scheduler = None
 
+    @debtcollector.removals.remove(
+        message="This will be removed in the N cycle. "
+                "Please use 'add_periodic_dhcp_agent_status_check' instead."
+    )
     def start_periodic_dhcp_agent_status_check(self):
         if not cfg.CONF.allow_automatic_dhcp_failover:
             LOG.info(_LI("Skipping periodic DHCP agent status check because "
@@ -174,6 +228,16 @@ class DhcpAgentSchedulerDbMixin(dhcpagentscheduler
 
         self.add_agent_status_check(self.remove_networks_from_down_agents)
 
+    def add_periodic_dhcp_agent_status_check(self):
+        if not cfg.CONF.allow_automatic_dhcp_failover:
+            LOG.info(_LI("Skipping periodic DHCP agent status check because "
+                         "automatic network rescheduling is disabled."))
+            return
+
+        self.add_agent_status_check_worker(
+            self.remove_networks_from_down_agents
+        )
+
     def is_eligible_agent(self, context, active, agent):
         # eligible agent is active or starting up
         return (AgentSchedulerDbMixin.is_eligible_agent(active, agent) or
@@ -13,6 +13,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import debtcollector
 from neutron_lib import constants
 from oslo_config import cfg
 from oslo_db import exception as db_exc
@@ -78,6 +79,10 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
 
     router_scheduler = None
 
+    @debtcollector.removals.remove(
+        message="This will be removed in the N cycle. "
+                "Please use 'add_periodic_l3_agent_status_check' instead."
+    )
     def start_periodic_l3_agent_status_check(self):
         if not cfg.CONF.allow_automatic_l3agent_failover:
             LOG.info(_LI("Skipping period L3 agent status check because "
@@ -87,6 +92,15 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
         self.add_agent_status_check(
             self.reschedule_routers_from_down_agents)
 
+    def add_periodic_l3_agent_status_check(self):
+        if not cfg.CONF.allow_automatic_l3agent_failover:
+            LOG.info(_LI("Skipping period L3 agent status check because "
+                         "automatic router rescheduling is disabled."))
+            return
+
+        self.add_agent_status_check_worker(
+            self.reschedule_routers_from_down_agents)
+
     def reschedule_routers_from_down_agents(self):
         """Reschedule routers from down l3 agents if admin state is up."""
         agent_dead_limit = self.agent_dead_limit_seconds()
@@ -24,9 +24,11 @@ import abc
 
 import six
 
+from neutron import worker as neutron_worker
+
 
 @six.add_metaclass(abc.ABCMeta)
-class NeutronPluginBaseV2(object):
+class NeutronPluginBaseV2(neutron_worker.WorkerSupportServiceMixin):
 
     @abc.abstractmethod
     def create_subnet(self, context, subnet):
@@ -409,12 +411,3 @@ class NeutronPluginBaseV2(object):
         """
         return (self.__class__.start_rpc_state_reports_listener !=
                 NeutronPluginBaseV2.start_rpc_state_reports_listener)
-
-    def get_workers(self):
-        """Returns a collection NeutronWorker instances
-
-        If a plugin needs to define worker processes outside of API/RPC workers
-        then it will override this and return a collection of NeutronWorker
-        instances
-        """
-        return ()
@@ -164,7 +164,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
                            provisioning_blocks.PROVISIONING_COMPLETE)
         self._setup_dhcp()
         self._start_rpc_notifiers()
-        self.add_agent_status_check(self.agent_health_check)
+        self.add_agent_status_check_worker(self.agent_health_check)
+        self.add_workers(self.mechanism_manager.get_workers())
         self._verify_service_plugins_requirements()
         LOG.info(_LI("Modular L2 Plugin initialization complete"))
 
@@ -185,7 +186,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
         self.network_scheduler = importutils.import_object(
             cfg.CONF.network_scheduler_driver
         )
-        self.start_periodic_dhcp_agent_status_check()
+        self.add_periodic_dhcp_agent_status_check()
 
     def _verify_service_plugins_requirements(self):
         for service_plugin in cfg.CONF.service_plugins:
@@ -1677,9 +1678,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
                 return port.id
         return device
 
-    def get_workers(self):
-        return self.mechanism_manager.get_workers()
-
     def filter_hosts_with_network_access(
             self, context, network_id, candidate_hosts):
         segments = db.get_network_segments(context.session, network_id)
@@ -18,7 +18,6 @@
 # If ../neutron/__init__.py exists, add ../ to Python search path, so that
 # it will override what happens to be installed in /usr/(local/)lib/python...
 
-import eventlet
 from oslo_log import log
 
 from neutron._i18n import _LI
@@ -28,13 +27,12 @@ LOG = log.getLogger(__name__)
 
 
 def eventlet_rpc_server():
-    pool = eventlet.GreenPool()
     LOG.info(_LI("Eventlet based AMQP RPC server starting..."))
 
     try:
-        neutron_rpc = service.serve_rpc()
+        rpc_workers_launcher = service.start_rpc_workers()
     except NotImplementedError:
         LOG.info(_LI("RPC was already started in parent process by "
                      "plugin."))
     else:
-        pool.spawn(neutron_rpc.wait)
-        pool.waitall()
+        rpc_workers_launcher.wait()
@@ -12,6 +12,7 @@
 # under the License.
 
 import eventlet
+
 from oslo_log import log
 
 from neutron._i18n import _LI
@@ -26,24 +27,21 @@ def eventlet_wsgi_server():
 
 
 def start_api_and_rpc_workers(neutron_api):
-    pool = eventlet.GreenPool()
-
-    api_thread = pool.spawn(neutron_api.wait)
-
     try:
-        neutron_rpc = service.serve_rpc()
+        worker_launcher = service.start_all_workers()
+
+        pool = eventlet.GreenPool()
+        api_thread = pool.spawn(neutron_api.wait)
+        plugin_workers_thread = pool.spawn(worker_launcher.wait)
+
+        # api and other workers should die together. When one dies,
+        # kill the other.
+        api_thread.link(lambda gt: plugin_workers_thread.kill())
+        plugin_workers_thread.link(lambda gt: api_thread.kill())
+
+        pool.waitall()
     except NotImplementedError:
         LOG.info(_LI("RPC was already started in parent process by "
                      "plugin."))
-    else:
-        rpc_thread = pool.spawn(neutron_rpc.wait)
 
-        plugin_workers = service.start_plugin_workers()
-        for worker in plugin_workers:
-            pool.spawn(worker.wait)
-
-        # api and rpc should die together. When one dies, kill the other.
-        rpc_thread.link(lambda gt: api_thread.kill())
-        api_thread.link(lambda gt: rpc_thread.kill())
-
-    pool.waitall()
+        neutron_api.wait()
@@ -34,7 +34,7 @@ from neutron.conf import service
 from neutron import context
 from neutron.db import api as session
 from neutron import manager
-from neutron import worker
+from neutron import worker as neutron_worker
 from neutron import wsgi
 
 
@@ -90,25 +90,15 @@ def serve_wsgi(cls):
     return service
 
 
-def start_plugin_workers():
-    launchers = []
-    # NOTE(twilson) get_service_plugins also returns the core plugin
-    for plugin in manager.NeutronManager.get_unique_service_plugins():
-        # TODO(twilson) Instead of defaulting here, come up with a good way to
-        # share a common get_workers default between NeutronPluginBaseV2 and
-        # ServicePluginBase
-        for plugin_worker in getattr(plugin, 'get_workers', tuple)():
-            launcher = common_service.ProcessLauncher(cfg.CONF)
-            launcher.launch_service(plugin_worker)
-            launchers.append(launcher)
-    return launchers
-
-
-class RpcWorker(worker.NeutronWorker):
+class RpcWorker(neutron_worker.NeutronWorker):
     """Wraps a worker to be handled by ProcessLauncher"""
     start_listeners_method = 'start_rpc_listeners'
 
-    def __init__(self, plugins):
+    def __init__(self, plugins, worker_process_count=1):
+        super(RpcWorker, self).__init__(
+            worker_process_count=worker_process_count
+        )
+
         self._plugins = plugins
         self._servers = []
 
@@ -155,7 +145,7 @@ class RpcReportsWorker(RpcWorker):
     start_listeners_method = 'start_rpc_state_reports_listener'
 
 
-def serve_rpc():
+def _get_rpc_workers():
     plugin = manager.NeutronManager.get_plugin()
     service_plugins = (
         manager.NeutronManager.get_service_plugins().values())
|
|||||||
cfg.CONF.rpc_workers)
|
cfg.CONF.rpc_workers)
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
try:
|
# passing service plugins only, because core plugin is among them
|
||||||
# passing service plugins only, because core plugin is among them
|
rpc_workers = [RpcWorker(service_plugins,
|
||||||
rpc = RpcWorker(service_plugins)
|
worker_process_count=cfg.CONF.rpc_workers)]
|
||||||
# dispose the whole pool before os.fork, otherwise there will
|
|
||||||
# be shared DB connections in child processes which may cause
|
|
||||||
# DB errors.
|
|
||||||
LOG.debug('using launcher for rpc, workers=%s', cfg.CONF.rpc_workers)
|
|
||||||
session.dispose()
|
|
||||||
launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0)
|
|
||||||
launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
|
|
||||||
if (cfg.CONF.rpc_state_report_workers > 0 and
|
|
||||||
plugin.rpc_state_report_workers_supported()):
|
|
||||||
rpc_state_rep = RpcReportsWorker([plugin])
|
|
||||||
LOG.debug('using launcher for state reports rpc, workers=%s',
|
|
||||||
cfg.CONF.rpc_state_report_workers)
|
|
||||||
launcher.launch_service(
|
|
||||||
rpc_state_rep, workers=cfg.CONF.rpc_state_report_workers)
|
|
||||||
|
|
||||||
return launcher
|
if (cfg.CONF.rpc_state_report_workers > 0 and
|
||||||
|
plugin.rpc_state_report_workers_supported()):
|
||||||
|
rpc_workers.append(
|
||||||
|
RpcReportsWorker(
|
||||||
|
[plugin],
|
||||||
|
worker_process_count=cfg.CONF.rpc_state_report_workers
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return rpc_workers
|
||||||
|
|
||||||
|
|
||||||
|
def _get_plugins_workers():
|
||||||
|
# NOTE(twilson) get_service_plugins also returns the core plugin
|
||||||
|
plugins = manager.NeutronManager.get_unique_service_plugins()
|
||||||
|
|
||||||
|
# TODO(twilson) Instead of defaulting here, come up with a good way to
|
||||||
|
# share a common get_workers default between NeutronPluginBaseV2 and
|
||||||
|
# ServicePluginBase
|
||||||
|
return [
|
||||||
|
plugin_worker
|
||||||
|
for plugin in plugins if hasattr(plugin, 'get_workers')
|
||||||
|
for plugin_worker in plugin.get_workers()
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class AllServicesNeutronWorker(neutron_worker.NeutronWorker):
|
||||||
|
def __init__(self, services, worker_process_count=1):
|
||||||
|
super(AllServicesNeutronWorker, self).__init__(worker_process_count)
|
||||||
|
self._services = services
|
||||||
|
self._launcher = common_service.Launcher(cfg.CONF)
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
for srv in self._services:
|
||||||
|
self._launcher.launch_service(srv)
|
||||||
|
super(AllServicesNeutronWorker, self).start()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self._launcher.stop()
|
||||||
|
|
||||||
|
def wait(self):
|
||||||
|
self._launcher.wait()
|
||||||
|
|
||||||
|
def reset(self):
|
||||||
|
self._launcher.restart()
|
||||||
|
|
||||||
|
|
||||||
|
def _start_workers(workers):
|
||||||
|
process_workers = [
|
||||||
|
plugin_worker for plugin_worker in workers
|
||||||
|
if plugin_worker.worker_process_count > 0
|
||||||
|
]
|
||||||
|
|
||||||
|
try:
|
||||||
|
if process_workers:
|
||||||
|
worker_launcher = common_service.ProcessLauncher(
|
||||||
|
cfg.CONF, wait_interval=1.0
|
||||||
|
)
|
||||||
|
|
||||||
|
# add extra process worker and spawn there all workers with
|
||||||
|
# worker_process_count == 0
|
||||||
|
thread_workers = [
|
||||||
|
plugin_worker for plugin_worker in workers
|
||||||
|
if plugin_worker.worker_process_count < 1
|
||||||
|
]
|
||||||
|
if thread_workers:
|
||||||
|
process_workers.append(
|
||||||
|
AllServicesNeutronWorker(thread_workers)
|
||||||
|
)
|
||||||
|
|
||||||
|
# dispose the whole pool before os.fork, otherwise there will
|
||||||
|
# be shared DB connections in child processes which may cause
|
||||||
|
# DB errors.
|
||||||
|
session.dispose()
|
||||||
|
|
||||||
|
for worker in process_workers:
|
||||||
|
worker_launcher.launch_service(worker,
|
||||||
|
worker.worker_process_count)
|
||||||
|
else:
|
||||||
|
worker_launcher = common_service.ServiceLauncher(cfg.CONF)
|
||||||
|
for worker in workers:
|
||||||
|
worker_launcher.launch_service(worker)
|
||||||
|
return worker_launcher
|
||||||
except Exception:
|
except Exception:
|
||||||
with excutils.save_and_reraise_exception():
|
with excutils.save_and_reraise_exception():
|
||||||
LOG.exception(_LE('Unrecoverable error: please check log for '
|
LOG.exception(_LE('Unrecoverable error: please check log for '
|
||||||
'details.'))
|
'details.'))
|
||||||
|
|
||||||
|
|
||||||
|
def start_all_workers():
|
||||||
|
workers = _get_rpc_workers() + _get_plugins_workers()
|
||||||
|
return _start_workers(workers)
|
||||||
|
|
||||||
|
|
||||||
|
def start_rpc_workers():
|
||||||
|
rpc_workers = _get_rpc_workers()
|
||||||
|
|
||||||
|
LOG.debug('using launcher for rpc, workers=%s', cfg.CONF.rpc_workers)
|
||||||
|
return _start_workers(rpc_workers)
|
||||||
|
|
||||||
|
|
||||||
|
def start_plugins_workers():
|
||||||
|
plugins_workers = _get_plugins_workers()
|
||||||
|
return _start_workers(plugins_workers)
|
||||||
|
|
||||||
|
|
||||||
def _get_api_workers():
|
def _get_api_workers():
|
||||||
workers = cfg.CONF.api_workers
|
workers = cfg.CONF.api_workers
|
||||||
if not workers:
|
if not workers:
|
||||||
|
@@ -32,6 +32,7 @@ from neutron.db import l3_gwmode_db
 from neutron.db import l3_hamode_db
 from neutron.plugins.common import constants
 from neutron.quota import resource_registry
+from neutron import service
 from neutron.services import service_base
 
 
@@ -61,20 +62,23 @@ class L3RouterPlugin(service_base.ServicePluginBase,
     def __init__(self):
         self.router_scheduler = importutils.import_object(
             cfg.CONF.router_scheduler_driver)
-        self.start_periodic_l3_agent_status_check()
+        self.add_periodic_l3_agent_status_check()
         super(L3RouterPlugin, self).__init__()
         if 'dvr' in self.supported_extension_aliases:
             l3_dvrscheduler_db.subscribe()
         l3_db.subscribe()
-        self.start_rpc_listeners()
+        self.agent_notifiers.update(
+            {n_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
+
+        rpc_worker = service.RpcWorker([self], worker_process_count=0)
+
+        self.add_worker(rpc_worker)
 
     @log_helpers.log_method_call
     def start_rpc_listeners(self):
         # RPC support
         self.topic = topics.L3PLUGIN
         self.conn = n_rpc.create_connection()
-        self.agent_notifiers.update(
-            {n_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
         self.endpoints = [l3_rpc.L3RpcCallback()]
         self.conn.create_consumer(self.topic, self.endpoints,
                                   fanout=False)
@@ -17,6 +17,7 @@ from neutron.common import rpc as n_rpc
 from neutron.common import topics
 from neutron.db.metering import metering_db
 from neutron.db.metering import metering_rpc
+from neutron import service
 
 
 class MeteringPlugin(metering_db.MeteringDbMixin):
@@ -28,7 +29,9 @@ class MeteringPlugin(metering_db.MeteringDbMixin):
         super(MeteringPlugin, self).__init__()
 
         self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI()
-        self.start_rpc_listeners()
+        rpc_worker = service.RpcWorker([self], worker_process_count=0)
+
+        self.add_worker(rpc_worker)
 
     def start_rpc_listeners(self):
         self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)]
@@ -24,12 +24,14 @@ from neutron._i18n import _, _LE, _LI
 from neutron.api import extensions
 from neutron.db import servicetype_db as sdb
 from neutron.services import provider_configuration as pconf
+from neutron import worker as neutron_worker
 
 LOG = logging.getLogger(__name__)
 
 
 @six.add_metaclass(abc.ABCMeta)
-class ServicePluginBase(extensions.PluginInterface):
+class ServicePluginBase(extensions.PluginInterface,
+                        neutron_worker.WorkerSupportServiceMixin):
     """Define base interface for any Advanced Service plugin."""
     supported_extension_aliases = []
 
@@ -46,10 +48,6 @@ class ServicePluginBase(extensions.PluginInterface):
         """Return string description of the plugin."""
         pass
 
-    def get_workers(self):
-        """Returns a collection of NeutronWorkers"""
-        return ()
-
 
 def load_drivers(service_type, plugin):
     """Loads drivers for specific service.
@@ -414,11 +414,11 @@ class PluginFixture(fixtures.Fixture):
         self.patched_default_svc_plugins = self.default_svc_plugins_p.start()
         self.dhcp_periodic_p = mock.patch(
             'neutron.db.agentschedulers_db.DhcpAgentSchedulerDbMixin.'
-            'start_periodic_dhcp_agent_status_check')
+            'add_periodic_dhcp_agent_status_check')
         self.patched_dhcp_periodic = self.dhcp_periodic_p.start()
         self.agent_health_check_p = mock.patch(
             'neutron.db.agentschedulers_db.DhcpAgentSchedulerDbMixin.'
-            'add_agent_status_check')
+            'add_agent_status_check_worker')
         self.agent_health_check = self.agent_health_check_p.start()
         # Plugin cleanup should be triggered last so that
         # test-specific cleanup has a chance to release references.
@@ -27,7 +27,7 @@ import psutil
 from neutron.agent.linux import utils
 from neutron import service
 from neutron.tests import base
-from neutron import worker
+from neutron import worker as neutron_worker
 from neutron import wsgi
 
 
@@ -244,8 +244,8 @@ class TestRPCServer(TestNeutronServer):
         # not interested in state report workers specifically
         CONF.set_override("rpc_state_report_workers", 0)
 
-        launcher = service.serve_rpc()
-        launcher.wait()
+        rpc_workers_launcher = service.start_rpc_workers()
+        rpc_workers_launcher.wait()
 
     def test_restart_rpc_on_sighup_multiple_workers(self):
         self._test_restart_service_on_sighup(service=self._serve_rpc,
@@ -264,12 +264,11 @@ class TestPluginWorker(TestNeutronServer):
     def _start_plugin(self, workers=1):
         with mock.patch('neutron.manager.NeutronManager.get_plugin') as gp:
             gp.return_value = self.plugin
-            launchers = service.start_plugin_workers()
-            for launcher in launchers:
-                launcher.wait()
+            plugin_workers_launcher = service.start_plugins_workers()
+            plugin_workers_launcher.wait()
 
     def test_start(self):
-        class FakeWorker(worker.NeutronWorker):
+        class FakeWorker(neutron_worker.NeutronWorker):
             def start(self):
                 pass
 
@@ -261,7 +261,7 @@ class OvsAgentSchedulerTestCaseBase(test_l3.L3NatTestCaseMixin,
         self.patched_l3_notify = self.l3_notify_p.start()
         self.l3_periodic_p = mock.patch('neutron.db.l3_agentschedulers_db.'
                                         'L3AgentSchedulerDbMixin.'
-                                        'start_periodic_l3_agent_status_check')
+                                        'add_periodic_l3_agent_status_check')
         self.patched_l3_periodic = self.l3_periodic_p.start()
         self.dhcp_notify_p = mock.patch(
             'neutron.extensions.dhcpagentscheduler.notify')
@@ -17,6 +17,38 @@ from neutron.callbacks import registry
 from neutron.callbacks import resources
 
 
+class WorkerSupportServiceMixin(object):
+
+    @property
+    def _workers(self):
+        try:
+            return self.__workers
+        except AttributeError:
+            self.__workers = []
+        return self.__workers
+
+    def get_workers(self):
+        """Returns a collection NeutronWorker instances needed by this service
+        """
+        return list(self._workers)
+
+    def add_worker(self, worker):
+        """Adds NeutronWorker needed for this service
+
+        If a object needs to define workers thread/processes outside of API/RPC
+        workers then it will call this method to register worker. Should be
+        called on initialization stage before running services
+        """
+        self._workers.append(worker)
+
+    def add_workers(self, workers):
+        """Adds NeutronWorker list needed for this service
+
+        The same as add_worker but adds a list of workers
+        """
+        self._workers.extend(workers)
+
+
 class NeutronWorker(service.ServiceBase):
     """Partial implementation of the ServiceBase ABC
 
@@ -36,5 +68,25 @@ class NeutronWorker(service.ServiceBase):
             super(MyPluginWorker, self).start()
             do_sync()
     """
+
+    # default class value for case when super().__init__ is not called
+    _worker_process_count = 1
+
+    def __init__(self, worker_process_count=_worker_process_count):
+        """
+        Initialize worker
+
+        :param worker_process_count: Defines how many processes to spawn for
+            worker:
+                0 - spawn 1 new worker thread,
+                1..N - spawn N new worker processes
+        """
+        self._worker_process_count = worker_process_count
+
+    @property
+    def worker_process_count(self):
+        return self._worker_process_count
+
     def start(self):
-        registry.notify(resources.PROCESS, events.AFTER_CREATE, self.start)
+        if self.worker_process_count > 0:
+            registry.notify(resources.PROCESS, events.AFTER_CREATE, self.start)
@@ -44,7 +44,7 @@ from neutron.common import config
 from neutron.common import exceptions as n_exc
 from neutron import context
 from neutron.db import api
-from neutron import worker
+from neutron import worker as neutron_worker
 
 socket_opts = [
     cfg.IntOpt('backlog',
@@ -74,9 +74,12 @@ def encode_body(body):
     return encodeutils.to_utf8(body)
 
 
-class WorkerService(worker.NeutronWorker):
+class WorkerService(neutron_worker.NeutronWorker):
     """Wraps a worker to be handled by ProcessLauncher"""
-    def __init__(self, service, application, disable_ssl=False):
+    def __init__(self, service, application, disable_ssl=False,
+                 worker_process_count=0):
+        super(WorkerService, self).__init__(worker_process_count)
+
         self._service = service
         self._application = application
         self._disable_ssl = disable_ssl
@@ -188,7 +191,7 @@ class Server(object):
         self._launch(application, workers)
 
     def _launch(self, application, workers=0):
-        service = WorkerService(self, application, self.disable_ssl)
+        service = WorkerService(self, application, self.disable_ssl, workers)
         if workers < 1:
             # The API service should run in the current process.
             self._server = service
|
|||||||
# wait interval past the default of 0.01s.
|
# wait interval past the default of 0.01s.
|
||||||
self._server = common_service.ProcessLauncher(cfg.CONF,
|
self._server = common_service.ProcessLauncher(cfg.CONF,
|
||||||
wait_interval=1.0)
|
wait_interval=1.0)
|
||||||
self._server.launch_service(service, workers=workers)
|
self._server.launch_service(service,
|
||||||
|
workers=service.worker_process_count)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def host(self):
|
def host(self):
|
||||||
|
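
Taken together, the launch flow becomes: a plugin registers NeutronWorker
instances through WorkerSupportServiceMixin.add_worker(), then
service.start_all_workers() collects them via _get_rpc_workers() and
_get_plugins_workers(), and _start_workers() launches each according to its
worker_process_count (0 runs the worker as a thread inside the shared
AllServicesNeutronWorker process; N > 0 forks N processes, with the DB pool
disposed before os.fork). A minimal sketch of a custom worker under this
contract, assuming a hypothetical SyncWorker class:

    from neutron import worker as neutron_worker


    class SyncWorker(neutron_worker.NeutronWorker):
        """Hypothetical plugin worker running in one forked process."""

        def __init__(self):
            super(SyncWorker, self).__init__(worker_process_count=1)

        def start(self):
            # AFTER_CREATE is notified only when worker_process_count > 0
            super(SyncWorker, self).start()
            # ... periodic or one-shot plugin work goes here ...

        def stop(self):
            pass

        def wait(self):
            pass

        def reset(self):
            pass

    # A plugin mixing in WorkerSupportServiceMixin would register it with
    # self.add_worker(SyncWorker()) during __init__, before the service
    # launcher calls service.start_all_workers().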