mypy: service.py

Change-Id: Iab6db48ae180fb17b5e8ebd7d462c22a7fa2ab49
This commit is contained in:
Eric Harney 2021-04-27 13:16:55 -04:00
parent ee662725a6
commit e6a264e4ad
2 changed files with 59 additions and 34 deletions

View File

@ -24,6 +24,7 @@ import random
import subprocess
import sys
import time
from typing import Optional
from oslo_concurrency import processutils
from oslo_config import cfg
@ -92,7 +93,7 @@ if profiler_opts:
profiler_opts.set_defaults(CONF)
def setup_profiler(binary, host):
def setup_profiler(binary: str, host: str) -> None:
if (osprofiler_initializer is None or
profiler is None or
profiler_opts is None):
@ -129,10 +130,15 @@ class Service(service.Service):
# Make service_id a class attribute so it can be used for clean up
service_id = None
def __init__(self, host, binary, topic, manager, report_interval=None,
periodic_interval=None, periodic_fuzzy_delay=None,
service_name=None, coordination=False, cluster=None, *args,
**kwargs):
def __init__(self, host: str, binary: str, topic: str,
manager: str,
report_interval: Optional[int] = None,
periodic_interval: Optional[int] = None,
periodic_fuzzy_delay: Optional[int] = None,
service_name: Optional[str] = None,
coordination: bool = False,
cluster: Optional[str] = None,
*args, **kwargs):
super(Service, self).__init__()
if not rpc.initialized():
@ -153,7 +159,8 @@ class Service(service.Service):
cluster=self.cluster,
service_name=service_name,
*args, **kwargs)
self.availability_zone = self.manager.availability_zone
self.availability_zone: str = self.manager.availability_zone
self.model_disconnected: bool
# NOTE(geguileo): We need to create the Service DB entry before we
# create the manager, otherwise capped versions for serializer and rpc
@ -197,7 +204,7 @@ class Service(service.Service):
# Service entry Entry didn't exist because it was manually removed
# or it's the first time running, to be on the safe side we say we
# were added if we are clustered.
self.added_to_cluster = bool(cluster)
self.added_to_cluster = bool(cluster) # type: ignore
self.report_interval = report_interval
self.periodic_interval = periodic_interval
@ -206,11 +213,11 @@ class Service(service.Service):
self.saved_args, self.saved_kwargs = args, kwargs
setup_profiler(binary, host)
self.rpcserver = None
self.backend_rpcserver = None
self.cluster_rpcserver = None
self.rpcserver: Optional['messaging.rpc.RPCServer'] = None
self.backend_rpcserver: Optional['messaging.rpc.RPCServer'] = None
self.cluster_rpcserver: Optional['messaging.rpc.RPCServer'] = None
def start(self):
def start(self) -> None:
version_string = version.version_string()
LOG.info('Starting %(topic)s node (version %(version_string)s)',
{'topic': self.topic, 'version_string': version_string})
@ -275,6 +282,7 @@ class Service(service.Service):
initial_delay=self.report_interval)
if self.periodic_interval:
initial_delay: Optional[int]
if self.periodic_fuzzy_delay:
initial_delay = random.randint(0, self.periodic_fuzzy_delay)
else:
@ -282,7 +290,7 @@ class Service(service.Service):
self.tg.add_timer(self.periodic_interval, self.periodic_tasks,
initial_delay=initial_delay)
def basic_config_check(self):
def basic_config_check(self) -> None:
"""Perform basic config checks before starting service."""
# Make sure report interval is less than service down time
if self.report_interval:
@ -299,7 +307,9 @@ class Service(service.Service):
'new_down_time': new_down_time})
CONF.set_override('service_down_time', new_down_time)
def _ensure_cluster_exists(self, context, service):
def _ensure_cluster_exists(self,
context: context.RequestContext,
service: 'Service') -> None:
if self.cluster:
try:
cluster = objects.Cluster.get_by_id(context, None,
@ -335,7 +345,9 @@ class Service(service.Service):
except exception.ClusterExists:
pass
def _create_service_ref(self, context, rpc_version=None):
def _create_service_ref(self,
context: context.RequestContext,
rpc_version: Optional[str] = None) -> None:
kwargs = {
'host': self.host,
'binary': self.binary,
@ -355,15 +367,23 @@ class Service(service.Service):
# the cluster it will be saved.
service_ref.save()
def __getattr__(self, key):
def __getattr__(self, key: str):
manager = self.__dict__.get('manager', None)
return getattr(manager, key)
@classmethod
def create(cls, host=None, binary=None, topic=None, manager=None,
report_interval=None, periodic_interval=None,
periodic_fuzzy_delay=None, service_name=None,
coordination=False, cluster=None, **kwargs):
def create(cls,
host: Optional[str] = None,
binary: Optional[str] = None,
topic: Optional[str] = None,
manager: Optional[str] = None,
report_interval: Optional[int] = None,
periodic_interval: Optional[int] = None,
periodic_fuzzy_delay: Optional[int] = None,
service_name: Optional[str] = None,
coordination: bool = False,
cluster: Optional[str] = None,
**kwargs) -> 'Service':
"""Instantiates class and passes back application object.
:param host: defaults to CONF.host
@ -391,6 +411,9 @@ class Service(service.Service):
periodic_interval = CONF.periodic_interval
if periodic_fuzzy_delay is None:
periodic_fuzzy_delay = CONF.periodic_fuzzy_delay
assert host is not None
assert manager is not None
service_obj = cls(host, binary, topic, manager,
report_interval=report_interval,
periodic_interval=periodic_interval,
@ -401,10 +424,11 @@ class Service(service.Service):
return service_obj
def stop(self):
def stop(self) -> None:
# Try to shut the connection down, but if we get any sort of
# errors, go ahead and ignore them.. as we're shutting down anyway
try:
if self.rpcserver is not None:
self.rpcserver.stop()
if self.backend_rpcserver:
self.backend_rpcserver.stop()
@ -420,7 +444,7 @@ class Service(service.Service):
pass
super(Service, self).stop(graceful=True)
def wait(self):
def wait(self) -> None:
if self.rpcserver:
self.rpcserver.wait()
if self.backend_rpcserver:
@ -429,12 +453,12 @@ class Service(service.Service):
self.cluster_rpcserver.wait()
super(Service, self).wait()
def periodic_tasks(self, raise_on_error=False):
def periodic_tasks(self, raise_on_error: bool = False) -> None:
"""Tasks to be run at a periodic interval."""
ctxt = context.get_admin_context()
self.manager.run_periodic_tasks(ctxt, raise_on_error=raise_on_error)
def report_state(self):
def report_state(self) -> None:
"""Update the state of this service in the datastore."""
if not self.manager.is_working():
# NOTE(dulek): If manager reports a problem we're not sending
@ -486,7 +510,7 @@ class Service(service.Service):
self.model_disconnected = True
LOG.exception('Exception encountered: ')
def reset(self):
def reset(self) -> None:
self.manager.reset()
super(Service, self).reset()
@ -548,7 +572,7 @@ class WSGIService(service.ServiceBase):
manager_class = importutils.import_class(manager_class_name)
return manager_class()
def start(self):
def start(self) -> None:
"""Start serving this service using loaded configuration.
Also, retrieve updated port number in case '0' was passed in, which
@ -562,7 +586,7 @@ class WSGIService(service.ServiceBase):
self.server.start()
self.port = self.server.port
def stop(self):
def stop(self) -> None:
"""Stop serving this API.
:returns: None
@ -570,7 +594,7 @@ class WSGIService(service.ServiceBase):
"""
self.server.stop()
def wait(self):
def wait(self) -> None:
"""Wait for the service to stop serving this API.
:returns: None
@ -578,7 +602,7 @@ class WSGIService(service.ServiceBase):
"""
self.server.wait()
def reset(self):
def reset(self) -> None:
"""Reset server greenpool size to default.
:returns: None
@ -587,7 +611,7 @@ class WSGIService(service.ServiceBase):
self.server.reset()
def process_launcher():
def process_launcher() -> service.ProcessLauncher:
return service.ProcessLauncher(CONF, restart_method='mutate')
@ -606,13 +630,13 @@ def serve(server, workers=None):
restart_method='mutate')
def wait():
def wait() -> None:
CONF.log_opt_values(LOG, logging.DEBUG)
try:
_launcher.wait()
_launcher.wait() # type: ignore
except KeyboardInterrupt:
_launcher.stop()
_launcher.stop() # type: ignore
rpc.cleanup()
@ -622,7 +646,7 @@ class Launcher(object):
self.wait = wait
def get_launcher():
def get_launcher() -> service.ProcessLauncher:
# Note(lpetrut): ProcessLauncher uses green pipes which fail on Windows
# due to missing support of non-blocking I/O pipes. For this reason, the
# service must be spawned differently on Windows, using the ServiceLauncher

View File

@ -27,6 +27,7 @@ cinder/scheduler/weights/chance.py
cinder/scheduler/weights/goodness.py
cinder/scheduler/weights/stochastic.py
cinder/scheduler/weights/volume_number.py
cinder/service.py
cinder/utils.py
cinder/volume/__init__.py
cinder/volume/api.py