diff --git a/cinder/service.py b/cinder/service.py index 848debb5e53..a45d48f679b 100644 --- a/cinder/service.py +++ b/cinder/service.py @@ -24,6 +24,7 @@ import random import subprocess import sys import time +from typing import Optional from oslo_concurrency import processutils from oslo_config import cfg @@ -92,7 +93,7 @@ if profiler_opts: profiler_opts.set_defaults(CONF) -def setup_profiler(binary, host): +def setup_profiler(binary: str, host: str) -> None: if (osprofiler_initializer is None or profiler is None or profiler_opts is None): @@ -129,10 +130,15 @@ class Service(service.Service): # Make service_id a class attribute so it can be used for clean up service_id = None - def __init__(self, host, binary, topic, manager, report_interval=None, - periodic_interval=None, periodic_fuzzy_delay=None, - service_name=None, coordination=False, cluster=None, *args, - **kwargs): + def __init__(self, host: str, binary: str, topic: str, + manager: str, + report_interval: Optional[int] = None, + periodic_interval: Optional[int] = None, + periodic_fuzzy_delay: Optional[int] = None, + service_name: Optional[str] = None, + coordination: bool = False, + cluster: Optional[str] = None, + *args, **kwargs): super(Service, self).__init__() if not rpc.initialized(): @@ -153,7 +159,8 @@ class Service(service.Service): cluster=self.cluster, service_name=service_name, *args, **kwargs) - self.availability_zone = self.manager.availability_zone + self.availability_zone: str = self.manager.availability_zone + self.model_disconnected: bool # NOTE(geguileo): We need to create the Service DB entry before we # create the manager, otherwise capped versions for serializer and rpc @@ -197,7 +204,7 @@ class Service(service.Service): # Service entry Entry didn't exist because it was manually removed # or it's the first time running, to be on the safe side we say we # were added if we are clustered. - self.added_to_cluster = bool(cluster) + self.added_to_cluster = bool(cluster) # type: ignore self.report_interval = report_interval self.periodic_interval = periodic_interval @@ -206,11 +213,11 @@ class Service(service.Service): self.saved_args, self.saved_kwargs = args, kwargs setup_profiler(binary, host) - self.rpcserver = None - self.backend_rpcserver = None - self.cluster_rpcserver = None + self.rpcserver: Optional['messaging.rpc.RPCServer'] = None + self.backend_rpcserver: Optional['messaging.rpc.RPCServer'] = None + self.cluster_rpcserver: Optional['messaging.rpc.RPCServer'] = None - def start(self): + def start(self) -> None: version_string = version.version_string() LOG.info('Starting %(topic)s node (version %(version_string)s)', {'topic': self.topic, 'version_string': version_string}) @@ -275,6 +282,7 @@ class Service(service.Service): initial_delay=self.report_interval) if self.periodic_interval: + initial_delay: Optional[int] if self.periodic_fuzzy_delay: initial_delay = random.randint(0, self.periodic_fuzzy_delay) else: @@ -282,7 +290,7 @@ class Service(service.Service): self.tg.add_timer(self.periodic_interval, self.periodic_tasks, initial_delay=initial_delay) - def basic_config_check(self): + def basic_config_check(self) -> None: """Perform basic config checks before starting service.""" # Make sure report interval is less than service down time if self.report_interval: @@ -299,7 +307,9 @@ class Service(service.Service): 'new_down_time': new_down_time}) CONF.set_override('service_down_time', new_down_time) - def _ensure_cluster_exists(self, context, service): + def _ensure_cluster_exists(self, + context: context.RequestContext, + service: 'Service') -> None: if self.cluster: try: cluster = objects.Cluster.get_by_id(context, None, @@ -335,7 +345,9 @@ class Service(service.Service): except exception.ClusterExists: pass - def _create_service_ref(self, context, rpc_version=None): + def _create_service_ref(self, + context: context.RequestContext, + rpc_version: Optional[str] = None) -> None: kwargs = { 'host': self.host, 'binary': self.binary, @@ -355,15 +367,23 @@ class Service(service.Service): # the cluster it will be saved. service_ref.save() - def __getattr__(self, key): + def __getattr__(self, key: str): manager = self.__dict__.get('manager', None) return getattr(manager, key) @classmethod - def create(cls, host=None, binary=None, topic=None, manager=None, - report_interval=None, periodic_interval=None, - periodic_fuzzy_delay=None, service_name=None, - coordination=False, cluster=None, **kwargs): + def create(cls, + host: Optional[str] = None, + binary: Optional[str] = None, + topic: Optional[str] = None, + manager: Optional[str] = None, + report_interval: Optional[int] = None, + periodic_interval: Optional[int] = None, + periodic_fuzzy_delay: Optional[int] = None, + service_name: Optional[str] = None, + coordination: bool = False, + cluster: Optional[str] = None, + **kwargs) -> 'Service': """Instantiates class and passes back application object. :param host: defaults to CONF.host @@ -391,6 +411,9 @@ class Service(service.Service): periodic_interval = CONF.periodic_interval if periodic_fuzzy_delay is None: periodic_fuzzy_delay = CONF.periodic_fuzzy_delay + + assert host is not None + assert manager is not None service_obj = cls(host, binary, topic, manager, report_interval=report_interval, periodic_interval=periodic_interval, @@ -401,11 +424,12 @@ class Service(service.Service): return service_obj - def stop(self): + def stop(self) -> None: # Try to shut the connection down, but if we get any sort of # errors, go ahead and ignore them.. as we're shutting down anyway try: - self.rpcserver.stop() + if self.rpcserver is not None: + self.rpcserver.stop() if self.backend_rpcserver: self.backend_rpcserver.stop() if self.cluster_rpcserver: @@ -420,7 +444,7 @@ class Service(service.Service): pass super(Service, self).stop(graceful=True) - def wait(self): + def wait(self) -> None: if self.rpcserver: self.rpcserver.wait() if self.backend_rpcserver: @@ -429,12 +453,12 @@ class Service(service.Service): self.cluster_rpcserver.wait() super(Service, self).wait() - def periodic_tasks(self, raise_on_error=False): + def periodic_tasks(self, raise_on_error: bool = False) -> None: """Tasks to be run at a periodic interval.""" ctxt = context.get_admin_context() self.manager.run_periodic_tasks(ctxt, raise_on_error=raise_on_error) - def report_state(self): + def report_state(self) -> None: """Update the state of this service in the datastore.""" if not self.manager.is_working(): # NOTE(dulek): If manager reports a problem we're not sending @@ -486,7 +510,7 @@ class Service(service.Service): self.model_disconnected = True LOG.exception('Exception encountered: ') - def reset(self): + def reset(self) -> None: self.manager.reset() super(Service, self).reset() @@ -548,7 +572,7 @@ class WSGIService(service.ServiceBase): manager_class = importutils.import_class(manager_class_name) return manager_class() - def start(self): + def start(self) -> None: """Start serving this service using loaded configuration. Also, retrieve updated port number in case '0' was passed in, which @@ -562,7 +586,7 @@ class WSGIService(service.ServiceBase): self.server.start() self.port = self.server.port - def stop(self): + def stop(self) -> None: """Stop serving this API. :returns: None @@ -570,7 +594,7 @@ class WSGIService(service.ServiceBase): """ self.server.stop() - def wait(self): + def wait(self) -> None: """Wait for the service to stop serving this API. :returns: None @@ -578,7 +602,7 @@ class WSGIService(service.ServiceBase): """ self.server.wait() - def reset(self): + def reset(self) -> None: """Reset server greenpool size to default. :returns: None @@ -587,7 +611,7 @@ class WSGIService(service.ServiceBase): self.server.reset() -def process_launcher(): +def process_launcher() -> service.ProcessLauncher: return service.ProcessLauncher(CONF, restart_method='mutate') @@ -606,13 +630,13 @@ def serve(server, workers=None): restart_method='mutate') -def wait(): +def wait() -> None: CONF.log_opt_values(LOG, logging.DEBUG) try: - _launcher.wait() + _launcher.wait() # type: ignore except KeyboardInterrupt: - _launcher.stop() + _launcher.stop() # type: ignore rpc.cleanup() @@ -622,7 +646,7 @@ class Launcher(object): self.wait = wait -def get_launcher(): +def get_launcher() -> service.ProcessLauncher: # Note(lpetrut): ProcessLauncher uses green pipes which fail on Windows # due to missing support of non-blocking I/O pipes. For this reason, the # service must be spawned differently on Windows, using the ServiceLauncher diff --git a/mypy-files.txt b/mypy-files.txt index e089a7014c1..9bf7a2314ad 100644 --- a/mypy-files.txt +++ b/mypy-files.txt @@ -30,6 +30,7 @@ cinder/scheduler/weights/chance.py cinder/scheduler/weights/goodness.py cinder/scheduler/weights/stochastic.py cinder/scheduler/weights/volume_number.py +cinder/service.py cinder/utils.py cinder/volume/__init__.py cinder/volume/api.py