From b1cdba1696f5d4ec71d37a773501bd4f9e0cddb9 Mon Sep 17 00:00:00 2001 From: Carl Baldwin Date: Thu, 5 May 2016 21:07:21 +0000 Subject: [PATCH] Revert "Remove threading before process forking" I think this needs a little more thought. This broke OVN at least and I don't think that's good. We need to figure out how to be compatible with existing plugins, even out of tree ones. This reverts commit 1cafff087194711399ad2a85a9f394f7204d7bdd. Change-Id: Ie087fb11213cc85911483c2d32c463fa9c973e54 --- neutron/db/agentschedulers_db.py | 50 +------- neutron/db/l3_agentschedulers_db.py | 7 - neutron/neutron_plugin_base_v2.py | 13 +- neutron/plugins/ml2/plugin.py | 6 +- neutron/server/rpc_eventlet.py | 12 +- neutron/server/wsgi_eventlet.py | 35 +++-- neutron/service.py | 120 +++++------------- .../services/l3_router/l3_router_plugin.py | 12 +- neutron/services/metering/metering_plugin.py | 5 +- neutron/services/service_base.py | 8 +- neutron/tests/base.py | 2 +- neutron/tests/functional/test_server.py | 16 +-- .../tests/unit/db/test_agentschedulers_db.py | 2 +- neutron/worker.py | 54 +------- neutron/wsgi.py | 14 +- 15 files changed, 96 insertions(+), 260 deletions(-) diff --git a/neutron/db/agentschedulers_db.py b/neutron/db/agentschedulers_db.py index e12b1f399d3..2f90bf9f5fb 100644 --- a/neutron/db/agentschedulers_db.py +++ b/neutron/db/agentschedulers_db.py @@ -35,7 +35,6 @@ from neutron.db.availability_zone import network as network_az from neutron.db import model_base from neutron.extensions import agent as ext_agent from neutron.extensions import dhcpagentscheduler -from neutron import worker as neutron_worker LOG = logging.getLogger(__name__) @@ -83,38 +82,6 @@ class NetworkDhcpAgentBinding(model_base.BASEV2): primary_key=True) -class AgentStatusCheckWorker(neutron_worker.NeutronWorker): - - def __init__(self, check_func, interval, initial_delay): - super(AgentStatusCheckWorker, self).__init__(worker_process_count=0) - - self._check_func = check_func - self._loop = None - self._interval = interval - self._initial_delay = initial_delay - - def start(self): - super(AgentStatusCheckWorker, self).start() - if self._loop is None: - self._loop = loopingcall.FixedIntervalLoopingCall(self._check_func) - self._loop.start(interval=self._interval, - initial_delay=self._initial_delay) - - def wait(self): - if self._loop is not None: - self._loop.wait() - - def stop(self): - if self._loop is not None: - self._loop.stop() - - def reset(self): - if self._loop is not None: - self.stop() - self.wait() - self.start() - - class AgentSchedulerDbMixin(agents_db.AgentDbMixin): """Common class for agent scheduler mixins.""" @@ -154,16 +121,18 @@ class AgentSchedulerDbMixin(agents_db.AgentDbMixin): return result def add_agent_status_check(self, function): + loop = loopingcall.FixedIntervalLoopingCall(function) # TODO(enikanorov): make interval configurable rather than computed interval = max(cfg.CONF.agent_down_time // 2, 1) # add random initial delay to allow agents to check in after the # neutron server first starts. random to offset multiple servers initial_delay = random.randint(interval, interval * 2) + loop.start(interval=interval, initial_delay=initial_delay) - check_worker = AgentStatusCheckWorker(function, interval, - initial_delay) - - self.add_worker(check_worker) + if hasattr(self, 'periodic_agent_loops'): + self.periodic_agent_loops.append(loop) + else: + self.periodic_agent_loops = [loop] def agent_dead_limit_seconds(self): return cfg.CONF.agent_down_time * 2 @@ -198,13 +167,6 @@ class DhcpAgentSchedulerDbMixin(dhcpagentscheduler network_scheduler = None def start_periodic_dhcp_agent_status_check(self): - LOG.warning( - _LW("DEPRECATED method 'start_periodic_dhcp_agent_status_check'. " - "Please use 'add_periodic_dhcp_agent_status_check' instead") - ) - self.add_periodic_dhcp_agent_status_check() - - def add_periodic_dhcp_agent_status_check(self): if not cfg.CONF.allow_automatic_dhcp_failover: LOG.info(_LI("Skipping periodic DHCP agent status check because " "automatic network rescheduling is disabled.")) diff --git a/neutron/db/l3_agentschedulers_db.py b/neutron/db/l3_agentschedulers_db.py index e98e3bf294e..b0516072ee1 100644 --- a/neutron/db/l3_agentschedulers_db.py +++ b/neutron/db/l3_agentschedulers_db.py @@ -83,13 +83,6 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase, router_scheduler = None def start_periodic_l3_agent_status_check(self): - LOG.warning( - _LW("DEPRECATED method 'start_periodic_l3_agent_status_check'. " - "Please use 'add_periodic_l3_agent_status_check' instead") - ) - self.add_periodic_l3_agent_status_check() - - def add_periodic_l3_agent_status_check(self): if not cfg.CONF.allow_automatic_l3agent_failover: LOG.info(_LI("Skipping period L3 agent status check because " "automatic router rescheduling is disabled.")) diff --git a/neutron/neutron_plugin_base_v2.py b/neutron/neutron_plugin_base_v2.py index 513890f7822..12ed5bba1e6 100644 --- a/neutron/neutron_plugin_base_v2.py +++ b/neutron/neutron_plugin_base_v2.py @@ -24,11 +24,9 @@ import abc import six -from neutron import worker as neutron_worker - @six.add_metaclass(abc.ABCMeta) -class NeutronPluginBaseV2(neutron_worker.WorkerSupportServiceMixin): +class NeutronPluginBaseV2(object): @abc.abstractmethod def create_subnet(self, context, subnet): @@ -411,3 +409,12 @@ class NeutronPluginBaseV2(neutron_worker.WorkerSupportServiceMixin): """ return (self.__class__.start_rpc_state_reports_listener != NeutronPluginBaseV2.start_rpc_state_reports_listener) + + def get_workers(self): + """Returns a collection NeutronWorker instances + + If a plugin needs to define worker processes outside of API/RPC workers + then it will override this and return a collection of NeutronWorker + instances + """ + return () diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index 68861178c77..c361475c5ab 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -162,7 +162,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, self._setup_dhcp() self._start_rpc_notifiers() self.add_agent_status_check(self.agent_health_check) - self.add_workers(self.mechanism_manager.get_workers()) self._verify_service_plugins_requirements() LOG.info(_LI("Modular L2 Plugin initialization complete")) @@ -183,7 +182,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, self.network_scheduler = importutils.import_object( cfg.CONF.network_scheduler_driver ) - self.add_periodic_dhcp_agent_status_check() + self.start_periodic_dhcp_agent_status_check() def _verify_service_plugins_requirements(self): for service_plugin in cfg.CONF.service_plugins: @@ -1625,6 +1624,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, return port.id return device + def get_workers(self): + return self.mechanism_manager.get_workers() + def filter_hosts_with_network_access( self, context, network_id, candidate_hosts): segments = db.get_network_segments(context.session, network_id) diff --git a/neutron/server/rpc_eventlet.py b/neutron/server/rpc_eventlet.py index 36118a034f9..9b218cdaeb5 100644 --- a/neutron/server/rpc_eventlet.py +++ b/neutron/server/rpc_eventlet.py @@ -18,9 +18,8 @@ # If ../neutron/__init__.py exists, add ../ to Python search path, so that # it will override what happens to be installed in /usr/(local/)lib/python... -from oslo_config import cfg +import eventlet from oslo_log import log -from oslo_service import service as common_service from neutron._i18n import _LI from neutron import service @@ -29,14 +28,13 @@ LOG = log.getLogger(__name__) def eventlet_rpc_server(): + pool = eventlet.GreenPool() LOG.info(_LI("Eventlet based AMQP RPC server starting...")) - rpc_workers_launcher = common_service.ProcessLauncher( - cfg.CONF, wait_interval=1.0 - ) try: - service.start_rpc_workers(rpc_workers_launcher) + neutron_rpc = service.serve_rpc() except NotImplementedError: LOG.info(_LI("RPC was already started in parent process by " "plugin.")) else: - rpc_workers_launcher.wait() + pool.spawn(neutron_rpc.wait) + pool.waitall() diff --git a/neutron/server/wsgi_eventlet.py b/neutron/server/wsgi_eventlet.py index 7381da8f834..472ea6bf743 100644 --- a/neutron/server/wsgi_eventlet.py +++ b/neutron/server/wsgi_eventlet.py @@ -12,10 +12,7 @@ # under the License. import eventlet - -from oslo_config import cfg from oslo_log import log -from oslo_service import service as common_service from neutron._i18n import _LI from neutron import service @@ -29,24 +26,24 @@ def eventlet_wsgi_server(): def start_api_and_rpc_workers(neutron_api): + pool = eventlet.GreenPool() + + api_thread = pool.spawn(neutron_api.wait) + try: - plugin_workers_launcher = common_service.ProcessLauncher( - cfg.CONF, wait_interval=1.0 - ) - service.start_rpc_workers(plugin_workers_launcher) - - pool = eventlet.GreenPool() - api_thread = pool.spawn(neutron_api.wait) - plugin_workers_thread = pool.spawn(plugin_workers_launcher.wait) - - # api and other workers should die together. When one dies, - # kill the other. - api_thread.link(lambda gt: plugin_workers_thread.kill()) - plugin_workers_thread.link(lambda gt: api_thread.kill()) - - pool.waitall() + neutron_rpc = service.serve_rpc() except NotImplementedError: LOG.info(_LI("RPC was already started in parent process by " "plugin.")) + else: + rpc_thread = pool.spawn(neutron_rpc.wait) - neutron_api.wait() + plugin_workers = service.start_plugin_workers() + for worker in plugin_workers: + pool.spawn(worker.wait) + + # api and rpc should die together. When one dies, kill the other. + rpc_thread.link(lambda gt: api_thread.kill()) + api_thread.link(lambda gt: rpc_thread.kill()) + + pool.waitall() diff --git a/neutron/service.py b/neutron/service.py index 7bd74467738..6510f579667 100644 --- a/neutron/service.py +++ b/neutron/service.py @@ -32,7 +32,7 @@ from neutron.common import rpc as n_rpc from neutron import context from neutron.db import api as session from neutron import manager -from neutron import worker as neutron_worker +from neutron import worker from neutron import wsgi @@ -113,15 +113,25 @@ def serve_wsgi(cls): return service -class RpcWorker(neutron_worker.NeutronWorker): +def start_plugin_workers(): + launchers = [] + # NOTE(twilson) get_service_plugins also returns the core plugin + for plugin in manager.NeutronManager.get_unique_service_plugins(): + # TODO(twilson) Instead of defaulting here, come up with a good way to + # share a common get_workers default between NeutronPluginBaseV2 and + # ServicePluginBase + for plugin_worker in getattr(plugin, 'get_workers', tuple)(): + launcher = common_service.ProcessLauncher(cfg.CONF) + launcher.launch_service(plugin_worker) + launchers.append(launcher) + return launchers + + +class RpcWorker(worker.NeutronWorker): """Wraps a worker to be handled by ProcessLauncher""" start_listeners_method = 'start_rpc_listeners' - def __init__(self, plugins, worker_process_count=1): - super(RpcWorker, self).__init__( - worker_process_count=worker_process_count - ) - + def __init__(self, plugins): self._plugins = plugins self._servers = [] @@ -168,7 +178,7 @@ class RpcReportsWorker(RpcWorker): start_listeners_method = 'start_rpc_state_reports_listener' -def _get_rpc_workers(): +def serve_rpc(): plugin = manager.NeutronManager.get_plugin() service_plugins = ( manager.NeutronManager.get_service_plugins().values()) @@ -188,101 +198,31 @@ def _get_rpc_workers(): cfg.CONF.rpc_workers) raise NotImplementedError() - # passing service plugins only, because core plugin is among them - rpc_workers = [RpcWorker(service_plugins, - worker_process_count=cfg.CONF.rpc_workers)] - - if (cfg.CONF.rpc_state_report_workers > 0 and - plugin.rpc_state_report_workers_supported()): - rpc_workers.append( - RpcReportsWorker( - [plugin], - worker_process_count=cfg.CONF.rpc_state_report_workers - ) - ) - return rpc_workers - - -class AllServicesNeutronWorker(neutron_worker.NeutronWorker): - def __init__(self, services, worker_process_count=1): - super(AllServicesNeutronWorker, self).__init__(worker_process_count) - self._services = services - self._launcher = common_service.Launcher(cfg.CONF) - - def start(self): - for srv in self._services: - self._launcher.launch_service(srv) - super(AllServicesNeutronWorker, self).start() - - def stop(self): - self._launcher.stop() - - def wait(self): - self._launcher.wait() - - def reset(self): - self._launcher.restart() - - -def _start_workers(worker_launcher, workers): - if not workers: - return try: + # passing service plugins only, because core plugin is among them + rpc = RpcWorker(service_plugins) # dispose the whole pool before os.fork, otherwise there will # be shared DB connections in child processes which may cause # DB errors. LOG.debug('using launcher for rpc, workers=%s', cfg.CONF.rpc_workers) session.dispose() + launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0) + launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers) + if (cfg.CONF.rpc_state_report_workers > 0 and + plugin.rpc_state_report_workers_supported()): + rpc_state_rep = RpcReportsWorker([plugin]) + LOG.debug('using launcher for state reports rpc, workers=%s', + cfg.CONF.rpc_state_report_workers) + launcher.launch_service( + rpc_state_rep, workers=cfg.CONF.rpc_state_report_workers) - for worker in workers: - worker_launcher.launch_service(worker, worker.worker_process_count) + return launcher except Exception: with excutils.save_and_reraise_exception(): LOG.exception(_LE('Unrecoverable error: please check log for ' 'details.')) -def start_plugin_workers(worker_launcher): - # NOTE(twilson) get_service_plugins also returns the core plugin - plugins = manager.NeutronManager.get_unique_service_plugins() - - # TODO(twilson) Instead of defaulting here, come up with a good way to - # share a common get_workers default between NeutronPluginBaseV2 and - # ServicePluginBase - plugin_workers = [ - plugin_worker - for plugin in plugins if hasattr(plugin, 'get_workers') - for plugin_worker in plugin.get_workers() - ] - - # we need to fork all necessary processes before spawning extra threads - # to avoid problems with infinite resource locking and other concurrency - # problems - process_plugin_workers = [ - plugin_worker for plugin_worker in plugin_workers - if plugin_worker.worker_process_count > 0 - ] - - thread_plugin_workers = [ - plugin_worker for plugin_worker in plugin_workers - if plugin_worker.worker_process_count < 1 - ] - - # add extra process worker and spawn there all workers with - # worker_process_count == 0 - if thread_plugin_workers: - process_plugin_workers.append( - AllServicesNeutronWorker(thread_plugin_workers) - ) - - _start_workers(worker_launcher, process_plugin_workers) - - -def start_rpc_workers(worker_launcher): - rpc_workers = _get_rpc_workers() - _start_workers(worker_launcher, rpc_workers) - - def _get_api_workers(): workers = cfg.CONF.api_workers if not workers: diff --git a/neutron/services/l3_router/l3_router_plugin.py b/neutron/services/l3_router/l3_router_plugin.py index ef89c626612..c10668cbc11 100644 --- a/neutron/services/l3_router/l3_router_plugin.py +++ b/neutron/services/l3_router/l3_router_plugin.py @@ -32,7 +32,6 @@ from neutron.db import l3_gwmode_db from neutron.db import l3_hamode_db from neutron.plugins.common import constants from neutron.quota import resource_registry -from neutron import service from neutron.services import service_base @@ -63,23 +62,20 @@ class L3RouterPlugin(service_base.ServicePluginBase, def __init__(self): self.router_scheduler = importutils.import_object( cfg.CONF.router_scheduler_driver) - self.add_periodic_l3_agent_status_check() + self.start_periodic_l3_agent_status_check() super(L3RouterPlugin, self).__init__() if 'dvr' in self.supported_extension_aliases: l3_dvrscheduler_db.subscribe() l3_db.subscribe() - self.agent_notifiers.update( - {n_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()}) - - rpc_worker = service.RpcWorker([self], worker_process_count=0) - - self.add_worker(rpc_worker) + self.start_rpc_listeners() @log_helpers.log_method_call def start_rpc_listeners(self): # RPC support self.topic = topics.L3PLUGIN self.conn = n_rpc.create_connection() + self.agent_notifiers.update( + {n_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()}) self.endpoints = [l3_rpc.L3RpcCallback()] self.conn.create_consumer(self.topic, self.endpoints, fanout=False) diff --git a/neutron/services/metering/metering_plugin.py b/neutron/services/metering/metering_plugin.py index f7cb42411e6..11a928642a0 100644 --- a/neutron/services/metering/metering_plugin.py +++ b/neutron/services/metering/metering_plugin.py @@ -17,7 +17,6 @@ from neutron.common import rpc as n_rpc from neutron.common import topics from neutron.db.metering import metering_db from neutron.db.metering import metering_rpc -from neutron import service class MeteringPlugin(metering_db.MeteringDbMixin): @@ -29,9 +28,7 @@ class MeteringPlugin(metering_db.MeteringDbMixin): super(MeteringPlugin, self).__init__() self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI() - rpc_worker = service.RpcWorker([self], worker_process_count=0) - - self.add_worker(rpc_worker) + self.start_rpc_listeners() def start_rpc_listeners(self): self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)] diff --git a/neutron/services/service_base.py b/neutron/services/service_base.py index 33da0286d15..5fa9bcf388d 100644 --- a/neutron/services/service_base.py +++ b/neutron/services/service_base.py @@ -24,14 +24,12 @@ from neutron._i18n import _, _LE, _LI from neutron.api import extensions from neutron.db import servicetype_db as sdb from neutron.services import provider_configuration as pconf -from neutron import worker as neutron_worker LOG = logging.getLogger(__name__) @six.add_metaclass(abc.ABCMeta) -class ServicePluginBase(extensions.PluginInterface, - neutron_worker.WorkerSupportServiceMixin): +class ServicePluginBase(extensions.PluginInterface): """Define base interface for any Advanced Service plugin.""" supported_extension_aliases = [] @@ -48,6 +46,10 @@ class ServicePluginBase(extensions.PluginInterface, """Return string description of the plugin.""" pass + def get_workers(self): + """Returns a collection of NeutronWorkers""" + return () + def load_drivers(service_type, plugin): """Loads drivers for specific service. diff --git a/neutron/tests/base.py b/neutron/tests/base.py index 6488cabc3d3..6e3d8b3a9bc 100644 --- a/neutron/tests/base.py +++ b/neutron/tests/base.py @@ -414,7 +414,7 @@ class PluginFixture(fixtures.Fixture): self.patched_default_svc_plugins = self.default_svc_plugins_p.start() self.dhcp_periodic_p = mock.patch( 'neutron.db.agentschedulers_db.DhcpAgentSchedulerDbMixin.' - 'add_periodic_dhcp_agent_status_check') + 'start_periodic_dhcp_agent_status_check') self.patched_dhcp_periodic = self.dhcp_periodic_p.start() self.agent_health_check_p = mock.patch( 'neutron.db.agentschedulers_db.DhcpAgentSchedulerDbMixin.' diff --git a/neutron/tests/functional/test_server.py b/neutron/tests/functional/test_server.py index 2c7615a5c05..891bdf3276d 100644 --- a/neutron/tests/functional/test_server.py +++ b/neutron/tests/functional/test_server.py @@ -22,13 +22,12 @@ import traceback import httplib2 import mock from oslo_config import cfg -from oslo_service import service as common_service import psutil from neutron.agent.linux import utils from neutron import service from neutron.tests import base -from neutron import worker as neutron_worker +from neutron import worker from neutron import wsgi @@ -245,9 +244,8 @@ class TestRPCServer(TestNeutronServer): # not interested in state report workers specifically CONF.set_override("rpc_state_report_workers", 0) - rpc_workers_launcher = common_service.ProcessLauncher(CONF) - service.start_rpc_workers(rpc_workers_launcher) - rpc_workers_launcher.wait() + launcher = service.serve_rpc() + launcher.wait() def test_restart_rpc_on_sighup_multiple_workers(self): self._test_restart_service_on_sighup(service=self._serve_rpc, @@ -266,12 +264,12 @@ class TestPluginWorker(TestNeutronServer): def _start_plugin(self, workers=1): with mock.patch('neutron.manager.NeutronManager.get_plugin') as gp: gp.return_value = self.plugin - plugin_workers_launcher = common_service.ProcessLauncher(CONF) - service.start_plugin_workers(plugin_workers_launcher) - plugin_workers_launcher.wait() + launchers = service.start_plugin_workers() + for launcher in launchers: + launcher.wait() def test_start(self): - class FakeWorker(neutron_worker.NeutronWorker): + class FakeWorker(worker.NeutronWorker): def start(self): pass diff --git a/neutron/tests/unit/db/test_agentschedulers_db.py b/neutron/tests/unit/db/test_agentschedulers_db.py index 655073846f9..692e1b96de1 100644 --- a/neutron/tests/unit/db/test_agentschedulers_db.py +++ b/neutron/tests/unit/db/test_agentschedulers_db.py @@ -256,7 +256,7 @@ class OvsAgentSchedulerTestCaseBase(test_l3.L3NatTestCaseMixin, self.patched_l3_notify = self.l3_notify_p.start() self.l3_periodic_p = mock.patch('neutron.db.l3_agentschedulers_db.' 'L3AgentSchedulerDbMixin.' - 'add_periodic_l3_agent_status_check') + 'start_periodic_l3_agent_status_check') self.patched_l3_periodic = self.l3_periodic_p.start() self.dhcp_notify_p = mock.patch( 'neutron.extensions.dhcpagentscheduler.notify') diff --git a/neutron/worker.py b/neutron/worker.py index a602d1c01f0..80a16533eba 100644 --- a/neutron/worker.py +++ b/neutron/worker.py @@ -17,38 +17,6 @@ from neutron.callbacks import registry from neutron.callbacks import resources -class WorkerSupportServiceMixin(object): - - @property - def _workers(self): - try: - return self.__workers - except AttributeError: - self.__workers = [] - return self.__workers - - def get_workers(self): - """Returns a collection NeutronWorker instances needed by this service - """ - return list(self._workers) - - def add_worker(self, worker): - """Adds NeutronWorker needed for this service - - If a object needs to define workers thread/processes outside of API/RPC - workers then it will call this method to register worker. Should be - called on initialization stage before running services - """ - self._workers.append(worker) - - def add_workers(self, workers): - """Adds NeutronWorker list needed for this service - - The same as add_worker but adds a list of workers - """ - self._workers.extend(workers) - - class NeutronWorker(service.ServiceBase): """Partial implementation of the ServiceBase ABC @@ -68,25 +36,5 @@ class NeutronWorker(service.ServiceBase): super(MyPluginWorker, self).start() do_sync() """ - - # default class value for case when super().__init__ is not called - _worker_process_count = 1 - - def __init__(self, worker_process_count=_worker_process_count): - """ - Initialize worker - - :param worker_process_count: Defines how many processes to spawn for - worker: - 0 - spawn 1 new worker thread, - 1..N - spawn N new worker processes - """ - self._worker_process_count = worker_process_count - - @property - def worker_process_count(self): - return self._worker_process_count - def start(self): - if self.worker_process_count > 0: - registry.notify(resources.PROCESS, events.AFTER_CREATE, self.start) + registry.notify(resources.PROCESS, events.AFTER_CREATE, self.start) diff --git a/neutron/wsgi.py b/neutron/wsgi.py index f49358579c6..684fb69cb87 100644 --- a/neutron/wsgi.py +++ b/neutron/wsgi.py @@ -44,7 +44,7 @@ from neutron.common import config from neutron.common import exceptions as n_exc from neutron import context from neutron.db import api -from neutron import worker as neutron_worker +from neutron import worker socket_opts = [ cfg.IntOpt('backlog', @@ -74,12 +74,9 @@ def encode_body(body): return encodeutils.to_utf8(body) -class WorkerService(neutron_worker.NeutronWorker): +class WorkerService(worker.NeutronWorker): """Wraps a worker to be handled by ProcessLauncher""" - def __init__(self, service, application, disable_ssl=False, - worker_process_count=0): - super(WorkerService, self).__init__(worker_process_count) - + def __init__(self, service, application, disable_ssl=False): self._service = service self._application = application self._disable_ssl = disable_ssl @@ -191,7 +188,7 @@ class Server(object): self._launch(application, workers) def _launch(self, application, workers=0): - service = WorkerService(self, application, self.disable_ssl, workers) + service = WorkerService(self, application, self.disable_ssl) if workers < 1: # The API service should run in the current process. self._server = service @@ -209,8 +206,7 @@ class Server(object): # wait interval past the default of 0.01s. self._server = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0) - self._server.launch_service(service, - workers=service.worker_process_count) + self._server.launch_service(service, workers=workers) @property def host(self):