diff --git a/neutron/db/agentschedulers_db.py b/neutron/db/agentschedulers_db.py
index 2f90bf9f5fb..5a60d1ff472 100644
--- a/neutron/db/agentschedulers_db.py
+++ b/neutron/db/agentschedulers_db.py
@@ -17,6 +17,7 @@
 import datetime
 import random
 import time
+import debtcollector
 from neutron_lib import constants
 from oslo_config import cfg
 from oslo_log import log as logging
@@ -35,6 +36,7 @@ from neutron.db.availability_zone import network as network_az
 from neutron.db import model_base
 from neutron.extensions import agent as ext_agent
 from neutron.extensions import dhcpagentscheduler
+from neutron import worker as neutron_worker
 
 LOG = logging.getLogger(__name__)
 
@@ -82,6 +84,38 @@ class NetworkDhcpAgentBinding(model_base.BASEV2):
                          primary_key=True)
 
 
+class AgentStatusCheckWorker(neutron_worker.NeutronWorker):
+
+    def __init__(self, check_func, interval, initial_delay):
+        super(AgentStatusCheckWorker, self).__init__(worker_process_count=0)
+
+        self._check_func = check_func
+        self._loop = None
+        self._interval = interval
+        self._initial_delay = initial_delay
+
+    def start(self):
+        super(AgentStatusCheckWorker, self).start()
+        if self._loop is None:
+            self._loop = loopingcall.FixedIntervalLoopingCall(self._check_func)
+            self._loop.start(interval=self._interval,
+                             initial_delay=self._initial_delay)
+
+    def wait(self):
+        if self._loop is not None:
+            self._loop.wait()
+
+    def stop(self):
+        if self._loop is not None:
+            self._loop.stop()
+
+    def reset(self):
+        if self._loop is not None:
+            self.stop()
+            self.wait()
+            self.start()
+
+
 class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
     """Common class for agent scheduler mixins."""
 
@@ -120,6 +154,22 @@ class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
                                          original_agent['host'])
         return result
 
+    def add_agent_status_check_worker(self, function):
+        # TODO(enikanorov): make interval configurable rather than computed
+        interval = max(cfg.CONF.agent_down_time // 2, 1)
+        # add random initial delay to allow agents to check in after the
+        # neutron server first starts. random to offset multiple servers
+        initial_delay = random.randint(interval, interval * 2)
+
+        check_worker = AgentStatusCheckWorker(function, interval,
+                                              initial_delay)
+
+        self.add_worker(check_worker)
+
+    @debtcollector.removals.remove(
+        message="This will be removed in the N cycle. "
+                "Please use 'add_agent_status_check_worker' instead."
+    )
     def add_agent_status_check(self, function):
         loop = loopingcall.FixedIntervalLoopingCall(function)
         # TODO(enikanorov): make interval configurable rather than computed
@@ -166,6 +216,10 @@ class DhcpAgentSchedulerDbMixin(dhcpagentscheduler
 
     network_scheduler = None
 
+    @debtcollector.removals.remove(
+        message="This will be removed in the N cycle. "
+                "Please use 'add_periodic_dhcp_agent_status_check' instead."
+    )
     def start_periodic_dhcp_agent_status_check(self):
         if not cfg.CONF.allow_automatic_dhcp_failover:
             LOG.info(_LI("Skipping periodic DHCP agent status check because "
@@ -174,6 +228,16 @@ class DhcpAgentSchedulerDbMixin(dhcpagentscheduler
 
         self.add_agent_status_check(self.remove_networks_from_down_agents)
 
+    def add_periodic_dhcp_agent_status_check(self):
+        if not cfg.CONF.allow_automatic_dhcp_failover:
+            LOG.info(_LI("Skipping periodic DHCP agent status check because "
+                         "automatic network rescheduling is disabled."))
+            return
+
+        self.add_agent_status_check_worker(
+            self.remove_networks_from_down_agents
+        )
+
     def is_eligible_agent(self, context, active, agent):
         # eligible agent is active or starting up
         return (AgentSchedulerDbMixin.is_eligible_agent(active, agent) or
diff --git a/neutron/db/l3_agentschedulers_db.py b/neutron/db/l3_agentschedulers_db.py
index 56055cefc15..9681ae70cd6 100644
--- a/neutron/db/l3_agentschedulers_db.py
+++ b/neutron/db/l3_agentschedulers_db.py
@@ -13,6 +13,7 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import debtcollector
 from neutron_lib import constants
 from oslo_config import cfg
 from oslo_db import exception as db_exc
@@ -78,6 +79,10 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
 
     router_scheduler = None
 
+    @debtcollector.removals.remove(
+        message="This will be removed in the N cycle. "
+                "Please use 'add_periodic_l3_agent_status_check' instead."
+    )
     def start_periodic_l3_agent_status_check(self):
         if not cfg.CONF.allow_automatic_l3agent_failover:
             LOG.info(_LI("Skipping period L3 agent status check because "
@@ -87,6 +92,15 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
         self.add_agent_status_check(
             self.reschedule_routers_from_down_agents)
 
+    def add_periodic_l3_agent_status_check(self):
+        if not cfg.CONF.allow_automatic_l3agent_failover:
+            LOG.info(_LI("Skipping periodic L3 agent status check because "
+                         "automatic router rescheduling is disabled."))
+            return
+
+        self.add_agent_status_check_worker(
+            self.reschedule_routers_from_down_agents)
+
     def reschedule_routers_from_down_agents(self):
         """Reschedule routers from down l3 agents if admin state is up."""
         agent_dead_limit = self.agent_dead_limit_seconds()
diff --git a/neutron/neutron_plugin_base_v2.py b/neutron/neutron_plugin_base_v2.py
index 12ed5bba1e6..513890f7822 100644
--- a/neutron/neutron_plugin_base_v2.py
+++ b/neutron/neutron_plugin_base_v2.py
@@ -24,9 +24,11 @@
 import abc
 
 import six
 
+from neutron import worker as neutron_worker
+
 
 @six.add_metaclass(abc.ABCMeta)
-class NeutronPluginBaseV2(object):
+class NeutronPluginBaseV2(neutron_worker.WorkerSupportServiceMixin):
 
     @abc.abstractmethod
     def create_subnet(self, context, subnet):
@@ -409,12 +411,3 @@ class NeutronPluginBaseV2(object):
         """
         return (self.__class__.start_rpc_state_reports_listener !=
                 NeutronPluginBaseV2.start_rpc_state_reports_listener)
-
-    def get_workers(self):
-        """Returns a collection NeutronWorker instances
-
-        If a plugin needs to define worker processes outside of API/RPC workers
-        then it will override this and return a collection of NeutronWorker
-        instances
-        """
-        return ()
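
With WorkerSupportServiceMixin now mixed into NeutronPluginBaseV2 (and, further below, into ServicePluginBase), plugins no longer override get_workers(); they register workers during __init__ and the base class collects them. The following is a minimal sketch of that usage, assuming this patch is applied; ExamplePlugin and example_check are illustrative names, not part of the change.

from neutron.db import agentschedulers_db
from neutron import worker as neutron_worker


class ExamplePlugin(neutron_worker.WorkerSupportServiceMixin):
    """Stands in for a NeutronPluginBaseV2 subclass."""

    def __init__(self):
        # Reuse the AgentStatusCheckWorker added above: run example_check()
        # every 30 seconds, starting 5 seconds after the workers launch.
        self.add_worker(agentschedulers_db.AgentStatusCheckWorker(
            self.example_check, interval=30, initial_delay=5))

    def example_check(self):
        pass


plugin = ExamplePlugin()
assert len(plugin.get_workers()) == 1
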
diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py
index 2b373f52fac..789cb5a366f 100644
--- a/neutron/plugins/ml2/plugin.py
+++ b/neutron/plugins/ml2/plugin.py
@@ -164,7 +164,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
             provisioning_blocks.PROVISIONING_COMPLETE)
         self._setup_dhcp()
         self._start_rpc_notifiers()
-        self.add_agent_status_check(self.agent_health_check)
+        self.add_agent_status_check_worker(self.agent_health_check)
+        self.add_workers(self.mechanism_manager.get_workers())
         self._verify_service_plugins_requirements()
         LOG.info(_LI("Modular L2 Plugin initialization complete"))
 
@@ -185,7 +186,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
         self.network_scheduler = importutils.import_object(
             cfg.CONF.network_scheduler_driver
         )
-        self.start_periodic_dhcp_agent_status_check()
+        self.add_periodic_dhcp_agent_status_check()
 
     def _verify_service_plugins_requirements(self):
         for service_plugin in cfg.CONF.service_plugins:
@@ -1677,9 +1678,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
                 return port.id
         return device
 
-    def get_workers(self):
-        return self.mechanism_manager.get_workers()
-
     def filter_hosts_with_network_access(
             self, context, network_id, candidate_hosts):
         segments = db.get_network_segments(context.session, network_id)
diff --git a/neutron/server/rpc_eventlet.py b/neutron/server/rpc_eventlet.py
index 9b218cdaeb5..e662cee8792 100644
--- a/neutron/server/rpc_eventlet.py
+++ b/neutron/server/rpc_eventlet.py
@@ -18,7 +18,6 @@
 # If ../neutron/__init__.py exists, add ../ to Python search path, so that
 # it will override what happens to be installed in /usr/(local/)lib/python...
 
-import eventlet
 from oslo_log import log
 
 from neutron._i18n import _LI
@@ -28,13 +27,12 @@ LOG = log.getLogger(__name__)
 
 
 def eventlet_rpc_server():
-    pool = eventlet.GreenPool()
     LOG.info(_LI("Eventlet based AMQP RPC server starting..."))
+
     try:
-        neutron_rpc = service.serve_rpc()
+        rpc_workers_launcher = service.start_rpc_workers()
     except NotImplementedError:
         LOG.info(_LI("RPC was already started in parent process by "
                      "plugin."))
     else:
-        pool.spawn(neutron_rpc.wait)
-        pool.waitall()
+        rpc_workers_launcher.wait()
diff --git a/neutron/server/wsgi_eventlet.py b/neutron/server/wsgi_eventlet.py
index 472ea6bf743..e72796b250a 100644
--- a/neutron/server/wsgi_eventlet.py
+++ b/neutron/server/wsgi_eventlet.py
@@ -12,6 +12,7 @@
 #    under the License.
 
 import eventlet
+
 from oslo_log import log
 
 from neutron._i18n import _LI
@@ -26,24 +27,21 @@ def eventlet_wsgi_server():
 
 
 def start_api_and_rpc_workers(neutron_api):
-    pool = eventlet.GreenPool()
-
-    api_thread = pool.spawn(neutron_api.wait)
-
     try:
-        neutron_rpc = service.serve_rpc()
+        worker_launcher = service.start_all_workers()
+
+        pool = eventlet.GreenPool()
+        api_thread = pool.spawn(neutron_api.wait)
+        plugin_workers_thread = pool.spawn(worker_launcher.wait)
+
+        # api and other workers should die together. When one dies,
+        # kill the other.
+        api_thread.link(lambda gt: plugin_workers_thread.kill())
+        plugin_workers_thread.link(lambda gt: api_thread.kill())
+
+        pool.waitall()
     except NotImplementedError:
         LOG.info(_LI("RPC was already started in parent process by "
                      "plugin."))
-    else:
-        rpc_thread = pool.spawn(neutron_rpc.wait)
-        plugin_workers = service.start_plugin_workers()
-        for worker in plugin_workers:
-            pool.spawn(worker.wait)
-
-        # api and rpc should die together. When one dies, kill the other.
-        rpc_thread.link(lambda gt: api_thread.kill())
-        api_thread.link(lambda gt: rpc_thread.kill())
-
-        pool.waitall()
+    neutron_api.wait()
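
The rewritten start_api_and_rpc_workers() above ties the API greenthread and the worker-launcher greenthread together so that whichever exits first takes the other down with it. Below is a standalone sketch of that link/kill pattern in plain eventlet; it is not neutron code, just an illustration of the primitive being used.

import eventlet


def run(name, delay):
    eventlet.sleep(delay)
    print('%s finished' % name)


pool = eventlet.GreenPool()
api_thread = pool.spawn(run, 'api', 0.1)
workers_thread = pool.spawn(run, 'workers', 60)

# Whichever greenthread finishes first kills its peer, so waitall()
# returns as soon as either side stops.
api_thread.link(lambda gt: workers_thread.kill())
workers_thread.link(lambda gt: api_thread.kill())

pool.waitall()  # returns after ~0.1s; 'workers' never prints
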
diff --git a/neutron/service.py b/neutron/service.py
index 72a46cd21e3..52dc58142b8 100644
--- a/neutron/service.py
+++ b/neutron/service.py
@@ -34,7 +34,7 @@ from neutron.conf import service
 from neutron import context
 from neutron.db import api as session
 from neutron import manager
-from neutron import worker
+from neutron import worker as neutron_worker
 from neutron import wsgi
 
 
@@ -90,25 +90,15 @@ def serve_wsgi(cls):
     return service
 
 
-def start_plugin_workers():
-    launchers = []
-    # NOTE(twilson) get_service_plugins also returns the core plugin
-    for plugin in manager.NeutronManager.get_unique_service_plugins():
-        # TODO(twilson) Instead of defaulting here, come up with a good way to
-        # share a common get_workers default between NeutronPluginBaseV2 and
-        # ServicePluginBase
-        for plugin_worker in getattr(plugin, 'get_workers', tuple)():
-            launcher = common_service.ProcessLauncher(cfg.CONF)
-            launcher.launch_service(plugin_worker)
-            launchers.append(launcher)
-    return launchers
-
-
-class RpcWorker(worker.NeutronWorker):
+class RpcWorker(neutron_worker.NeutronWorker):
     """Wraps a worker to be handled by ProcessLauncher"""
     start_listeners_method = 'start_rpc_listeners'
 
-    def __init__(self, plugins):
+    def __init__(self, plugins, worker_process_count=1):
+        super(RpcWorker, self).__init__(
+            worker_process_count=worker_process_count
+        )
+
         self._plugins = plugins
         self._servers = []
 
@@ -155,7 +145,7 @@ class RpcReportsWorker(RpcWorker):
     start_listeners_method = 'start_rpc_state_reports_listener'
 
 
-def serve_rpc():
+def _get_rpc_workers():
     plugin = manager.NeutronManager.get_plugin()
     service_plugins = (
         manager.NeutronManager.get_service_plugins().values())
@@ -175,31 +165,115 @@
             cfg.CONF.rpc_workers)
         raise NotImplementedError()
 
-    try:
-        # passing service plugins only, because core plugin is among them
-        rpc = RpcWorker(service_plugins)
-        # dispose the whole pool before os.fork, otherwise there will
-        # be shared DB connections in child processes which may cause
-        # DB errors.
-        LOG.debug('using launcher for rpc, workers=%s', cfg.CONF.rpc_workers)
-        session.dispose()
-        launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0)
-        launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
-        if (cfg.CONF.rpc_state_report_workers > 0 and
-                plugin.rpc_state_report_workers_supported()):
-            rpc_state_rep = RpcReportsWorker([plugin])
-            LOG.debug('using launcher for state reports rpc, workers=%s',
-                      cfg.CONF.rpc_state_report_workers)
-            launcher.launch_service(
-                rpc_state_rep, workers=cfg.CONF.rpc_state_report_workers)
+    # passing service plugins only, because core plugin is among them
+    rpc_workers = [RpcWorker(service_plugins,
+                             worker_process_count=cfg.CONF.rpc_workers)]
 
-        return launcher
+    if (cfg.CONF.rpc_state_report_workers > 0 and
+            plugin.rpc_state_report_workers_supported()):
+        rpc_workers.append(
+            RpcReportsWorker(
+                [plugin],
+                worker_process_count=cfg.CONF.rpc_state_report_workers
+            )
+        )
+    return rpc_workers
+
+
+def _get_plugins_workers():
+    # NOTE(twilson) get_service_plugins also returns the core plugin
+    plugins = manager.NeutronManager.get_unique_service_plugins()
+
+    # TODO(twilson) Instead of defaulting here, come up with a good way to
+    # share a common get_workers default between NeutronPluginBaseV2 and
+    # ServicePluginBase
+    return [
+        plugin_worker
+        for plugin in plugins if hasattr(plugin, 'get_workers')
+        for plugin_worker in plugin.get_workers()
+    ]
+
+
+class AllServicesNeutronWorker(neutron_worker.NeutronWorker):
+    def __init__(self, services, worker_process_count=1):
+        super(AllServicesNeutronWorker, self).__init__(worker_process_count)
+        self._services = services
+        self._launcher = common_service.Launcher(cfg.CONF)
+
+    def start(self):
+        for srv in self._services:
+            self._launcher.launch_service(srv)
+        super(AllServicesNeutronWorker, self).start()
+
+    def stop(self):
+        self._launcher.stop()
+
+    def wait(self):
+        self._launcher.wait()
+
+    def reset(self):
+        self._launcher.restart()
+
+
+def _start_workers(workers):
+    process_workers = [
+        plugin_worker for plugin_worker in workers
+        if plugin_worker.worker_process_count > 0
+    ]
+
+    try:
+        if process_workers:
+            worker_launcher = common_service.ProcessLauncher(
+                cfg.CONF, wait_interval=1.0
+            )
+
+            # add an extra process worker and spawn all the workers with
+            # worker_process_count == 0 inside it
+            thread_workers = [
+                plugin_worker for plugin_worker in workers
+                if plugin_worker.worker_process_count < 1
+            ]
+            if thread_workers:
+                process_workers.append(
+                    AllServicesNeutronWorker(thread_workers)
+                )
+
+            # dispose the whole pool before os.fork, otherwise there will
+            # be shared DB connections in child processes which may cause
+            # DB errors.
+            session.dispose()
+
+            for worker in process_workers:
+                worker_launcher.launch_service(worker,
+                                               worker.worker_process_count)
+        else:
+            worker_launcher = common_service.ServiceLauncher(cfg.CONF)
+            for worker in workers:
+                worker_launcher.launch_service(worker)
+        return worker_launcher
     except Exception:
         with excutils.save_and_reraise_exception():
             LOG.exception(_LE('Unrecoverable error: please check log for '
                               'details.'))
 
 
+def start_all_workers():
+    workers = _get_rpc_workers() + _get_plugins_workers()
+    return _start_workers(workers)
+
+
+def start_rpc_workers():
+    rpc_workers = _get_rpc_workers()
+
+    LOG.debug('using launcher for rpc, workers=%s', cfg.CONF.rpc_workers)
+    return _start_workers(rpc_workers)
+
+
+def start_plugins_workers():
+    plugins_workers = _get_plugins_workers()
+    return _start_workers(plugins_workers)
+
+
 def _get_api_workers():
     workers = cfg.CONF.api_workers
     if not workers:
diff --git a/neutron/services/l3_router/l3_router_plugin.py b/neutron/services/l3_router/l3_router_plugin.py
index c9e24414f21..d5f26ef8078 100644
--- a/neutron/services/l3_router/l3_router_plugin.py
+++ b/neutron/services/l3_router/l3_router_plugin.py
@@ -32,6 +32,7 @@ from neutron.db import l3_gwmode_db
 from neutron.db import l3_hamode_db
 from neutron.plugins.common import constants
 from neutron.quota import resource_registry
+from neutron import service
 from neutron.services import service_base
 
 
@@ -61,20 +62,23 @@ class L3RouterPlugin(service_base.ServicePluginBase,
     def __init__(self):
         self.router_scheduler = importutils.import_object(
             cfg.CONF.router_scheduler_driver)
-        self.start_periodic_l3_agent_status_check()
+        self.add_periodic_l3_agent_status_check()
         super(L3RouterPlugin, self).__init__()
         if 'dvr' in self.supported_extension_aliases:
             l3_dvrscheduler_db.subscribe()
         l3_db.subscribe()
-        self.start_rpc_listeners()
+        self.agent_notifiers.update(
+            {n_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
+
+        rpc_worker = service.RpcWorker([self], worker_process_count=0)
+
+        self.add_worker(rpc_worker)
 
     @log_helpers.log_method_call
     def start_rpc_listeners(self):
         # RPC support
         self.topic = topics.L3PLUGIN
         self.conn = n_rpc.create_connection()
-        self.agent_notifiers.update(
-            {n_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
         self.endpoints = [l3_rpc.L3RpcCallback()]
         self.conn.create_consumer(self.topic, self.endpoints,
                                   fanout=False)
diff --git a/neutron/services/metering/metering_plugin.py b/neutron/services/metering/metering_plugin.py
index 11a928642a0..f7cb42411e6 100644
--- a/neutron/services/metering/metering_plugin.py
+++ b/neutron/services/metering/metering_plugin.py
@@ -17,6 +17,7 @@ from neutron.common import rpc as n_rpc
 from neutron.common import topics
 from neutron.db.metering import metering_db
 from neutron.db.metering import metering_rpc
+from neutron import service
 
 
 class MeteringPlugin(metering_db.MeteringDbMixin):
@@ -28,7 +29,9 @@ class MeteringPlugin(metering_db.MeteringDbMixin):
         super(MeteringPlugin, self).__init__()
 
         self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI()
-        self.start_rpc_listeners()
+        rpc_worker = service.RpcWorker([self], worker_process_count=0)
+
+        self.add_worker(rpc_worker)
 
     def start_rpc_listeners(self):
         self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)]
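
Both L3RouterPlugin and MeteringPlugin above follow the same recipe: instead of calling start_rpc_listeners() from __init__, the plugin wraps itself in an in-process RpcWorker and lets the server start it alongside the other workers. The sketch below shows the same pattern for a hypothetical plugin; ExampleRpcPlugin and 'example-topic' are illustrative, and real plugins inherit the mixin through ServicePluginBase rather than directly.

from neutron.common import rpc as n_rpc
from neutron import service
from neutron import worker as neutron_worker


class ExampleRpcPlugin(neutron_worker.WorkerSupportServiceMixin):
    """Illustrative service plugin exposing RPC via an in-process worker."""

    def __init__(self):
        # worker_process_count=0: the listeners end up in a thread of the
        # AllServicesNeutronWorker child instead of dedicated processes.
        self.add_worker(service.RpcWorker([self], worker_process_count=0))

    def start_rpc_listeners(self):
        # Invoked later by RpcWorker.start(), once the workers are launched.
        self.conn = n_rpc.create_connection()
        self.conn.create_consumer('example-topic', [], fanout=False)
        return self.conn.consume_in_threads()
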
diff --git a/neutron/services/service_base.py b/neutron/services/service_base.py
index 5fa9bcf388d..33da0286d15 100644
--- a/neutron/services/service_base.py
+++ b/neutron/services/service_base.py
@@ -24,12 +24,14 @@ from neutron._i18n import _, _LE, _LI
 from neutron.api import extensions
 from neutron.db import servicetype_db as sdb
 from neutron.services import provider_configuration as pconf
+from neutron import worker as neutron_worker
 
 LOG = logging.getLogger(__name__)
 
 
 @six.add_metaclass(abc.ABCMeta)
-class ServicePluginBase(extensions.PluginInterface):
+class ServicePluginBase(extensions.PluginInterface,
+                        neutron_worker.WorkerSupportServiceMixin):
     """Define base interface for any Advanced Service plugin."""
     supported_extension_aliases = []
 
@@ -46,10 +48,6 @@ class ServicePluginBase(extensions.PluginInterface):
         """Return string description of the plugin."""
         pass
 
-    def get_workers(self):
-        """Returns a collection of NeutronWorkers"""
-        return ()
-
 
 def load_drivers(service_type, plugin):
     """Loads drivers for specific service.
diff --git a/neutron/tests/base.py b/neutron/tests/base.py
index 6e3d8b3a9bc..49f712b11e1 100644
--- a/neutron/tests/base.py
+++ b/neutron/tests/base.py
@@ -414,11 +414,11 @@ class PluginFixture(fixtures.Fixture):
         self.patched_default_svc_plugins = self.default_svc_plugins_p.start()
         self.dhcp_periodic_p = mock.patch(
             'neutron.db.agentschedulers_db.DhcpAgentSchedulerDbMixin.'
-            'start_periodic_dhcp_agent_status_check')
+            'add_periodic_dhcp_agent_status_check')
         self.patched_dhcp_periodic = self.dhcp_periodic_p.start()
         self.agent_health_check_p = mock.patch(
             'neutron.db.agentschedulers_db.DhcpAgentSchedulerDbMixin.'
-            'add_agent_status_check')
+            'add_agent_status_check_worker')
         self.agent_health_check = self.agent_health_check_p.start()
         # Plugin cleanup should be triggered last so that
         # test-specific cleanup has a chance to release references.
diff --git a/neutron/tests/functional/test_server.py b/neutron/tests/functional/test_server.py
index 891bdf3276d..e1848b3e22e 100644
--- a/neutron/tests/functional/test_server.py
+++ b/neutron/tests/functional/test_server.py
@@ -27,7 +27,7 @@ import psutil
 from neutron.agent.linux import utils
 from neutron import service
 from neutron.tests import base
-from neutron import worker
+from neutron import worker as neutron_worker
 from neutron import wsgi
 
 
@@ -244,8 +244,8 @@ class TestRPCServer(TestNeutronServer):
         # not interested in state report workers specifically
         CONF.set_override("rpc_state_report_workers", 0)
 
-        launcher = service.serve_rpc()
-        launcher.wait()
+        rpc_workers_launcher = service.start_rpc_workers()
+        rpc_workers_launcher.wait()
 
     def test_restart_rpc_on_sighup_multiple_workers(self):
         self._test_restart_service_on_sighup(service=self._serve_rpc,
@@ -264,12 +264,11 @@ class TestPluginWorker(TestNeutronServer):
     def _start_plugin(self, workers=1):
         with mock.patch('neutron.manager.NeutronManager.get_plugin') as gp:
             gp.return_value = self.plugin
-            launchers = service.start_plugin_workers()
-            for launcher in launchers:
-                launcher.wait()
+            plugin_workers_launcher = service.start_plugins_workers()
+            plugin_workers_launcher.wait()
 
     def test_start(self):
-        class FakeWorker(worker.NeutronWorker):
+        class FakeWorker(neutron_worker.NeutronWorker):
             def start(self):
                 pass
diff --git a/neutron/tests/unit/db/test_agentschedulers_db.py b/neutron/tests/unit/db/test_agentschedulers_db.py
index efea84046aa..b68a1f01bba 100644
--- a/neutron/tests/unit/db/test_agentschedulers_db.py
+++ b/neutron/tests/unit/db/test_agentschedulers_db.py
@@ -261,7 +261,7 @@ class OvsAgentSchedulerTestCaseBase(test_l3.L3NatTestCaseMixin,
         self.patched_l3_notify = self.l3_notify_p.start()
         self.l3_periodic_p = mock.patch('neutron.db.l3_agentschedulers_db.'
                                         'L3AgentSchedulerDbMixin.'
-                                        'start_periodic_l3_agent_status_check')
+                                        'add_periodic_l3_agent_status_check')
         self.patched_l3_periodic = self.l3_periodic_p.start()
         self.dhcp_notify_p = mock.patch(
             'neutron.extensions.dhcpagentscheduler.notify')
diff --git a/neutron/worker.py b/neutron/worker.py
index 80a16533eba..a602d1c01f0 100644
--- a/neutron/worker.py
+++ b/neutron/worker.py
@@ -17,6 +17,38 @@ from neutron.callbacks import registry
 from neutron.callbacks import resources
 
 
+class WorkerSupportServiceMixin(object):
+
+    @property
+    def _workers(self):
+        try:
+            return self.__workers
+        except AttributeError:
+            self.__workers = []
+        return self.__workers
+
+    def get_workers(self):
+        """Returns the collection of NeutronWorker instances for this service
+        """
+        return list(self._workers)
+
+    def add_worker(self, worker):
+        """Adds a NeutronWorker needed by this service
+
+        If an object needs to define worker threads/processes outside of
+        the API/RPC workers, it should call this method to register them.
+        Should be called during initialization, before running services.
+        """
+        self._workers.append(worker)
+
+    def add_workers(self, workers):
+        """Adds a list of NeutronWorkers needed by this service
+
+        The same as add_worker, but accepts a list of workers.
+        """
+        self._workers.extend(workers)
+
+
 class NeutronWorker(service.ServiceBase):
     """Partial implementation of the ServiceBase ABC
 
@@ -36,5 +68,25 @@ class NeutronWorker(service.ServiceBase):
                 super(MyPluginWorker, self).start()
                 do_sync()
     """
+
+    # default class value for case when super().__init__ is not called
+    _worker_process_count = 1
+
+    def __init__(self, worker_process_count=_worker_process_count):
+        """
+        Initialize worker
+
+        :param worker_process_count: Defines how many processes to spawn for
+            worker:
+                0 - spawn 1 new worker thread,
+                1..N - spawn N new worker processes
+        """
+        self._worker_process_count = worker_process_count
+
+    @property
+    def worker_process_count(self):
+        return self._worker_process_count
+
     def start(self):
-        registry.notify(resources.PROCESS, events.AFTER_CREATE, self.start)
+        if self.worker_process_count > 0:
+            registry.notify(resources.PROCESS, events.AFTER_CREATE, self.start)
diff --git a/neutron/wsgi.py b/neutron/wsgi.py
index 62be9eaeb51..4040d5d0cc8 100644
--- a/neutron/wsgi.py
+++ b/neutron/wsgi.py
@@ -44,7 +44,7 @@ from neutron.common import config
 from neutron.common import exceptions as n_exc
 from neutron import context
 from neutron.db import api
-from neutron import worker
+from neutron import worker as neutron_worker
 
 socket_opts = [
     cfg.IntOpt('backlog',
@@ -74,9 +74,12 @@ def encode_body(body):
     return encodeutils.to_utf8(body)
 
 
-class WorkerService(worker.NeutronWorker):
+class WorkerService(neutron_worker.NeutronWorker):
     """Wraps a worker to be handled by ProcessLauncher"""
-    def __init__(self, service, application, disable_ssl=False):
+    def __init__(self, service, application, disable_ssl=False,
+                 worker_process_count=0):
+        super(WorkerService, self).__init__(worker_process_count)
+
         self._service = service
         self._application = application
         self._disable_ssl = disable_ssl
@@ -188,7 +191,7 @@ class Server(object):
         self._launch(application, workers)
 
     def _launch(self, application, workers=0):
-        service = WorkerService(self, application, self.disable_ssl)
+        service = WorkerService(self, application, self.disable_ssl, workers)
        if workers < 1:
             # The API service should run in the current process.
             self._server = service
@@ -206,7 +209,8 @@
             # wait interval past the default of 0.01s.
             self._server = common_service.ProcessLauncher(cfg.CONF,
                                                           wait_interval=1.0)
-            self._server.launch_service(service, workers=workers)
+            self._server.launch_service(service,
+                                        workers=service.worker_process_count)
 
     @property
     def host(self):
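
Taken together, the launch path now sorts workers by worker_process_count: a worker asking for one or more processes is forked by the ProcessLauncher, workers with a count of 0 are bundled into a single AllServicesNeutronWorker child, and NeutronWorker.start() only emits the AFTER_CREATE process notification in the forked case. The following is a rough sketch of that contract with illustrative worker names, assuming this patch is applied.

from neutron import worker as neutron_worker


class StatsWorker(neutron_worker.NeutronWorker):
    """Illustrative worker that asks for two dedicated child processes."""

    def __init__(self, worker_process_count=2):
        super(StatsWorker, self).__init__(
            worker_process_count=worker_process_count)

    def start(self):
        super(StatsWorker, self).start()
        # begin the periodic work here

    def stop(self):
        pass

    def wait(self):
        pass

    def reset(self):
        self.stop()
        self.start()


class SyncWorker(StatsWorker):
    """Illustrative worker folded into the shared services child."""

    def __init__(self):
        super(SyncWorker, self).__init__(worker_process_count=0)


# A plugin would register both; service.start_all_workers() then forks two
# processes for StatsWorker and runs SyncWorker inside the shared child.
holder = neutron_worker.WorkerSupportServiceMixin()
holder.add_workers([StatsWorker(), SyncWorker()])
assert [w.worker_process_count for w in holder.get_workers()] == [2, 0]
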