Merge "mypy: service.py"
This commit is contained in:
commit
a2deecef89
@ -24,6 +24,7 @@ import random
|
|||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
from oslo_concurrency import processutils
|
from oslo_concurrency import processutils
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
@ -92,7 +93,7 @@ if profiler_opts:
|
|||||||
profiler_opts.set_defaults(CONF)
|
profiler_opts.set_defaults(CONF)
|
||||||
|
|
||||||
|
|
||||||
def setup_profiler(binary, host):
|
def setup_profiler(binary: str, host: str) -> None:
|
||||||
if (osprofiler_initializer is None or
|
if (osprofiler_initializer is None or
|
||||||
profiler is None or
|
profiler is None or
|
||||||
profiler_opts is None):
|
profiler_opts is None):
|
||||||
@ -129,10 +130,15 @@ class Service(service.Service):
|
|||||||
# Make service_id a class attribute so it can be used for clean up
|
# Make service_id a class attribute so it can be used for clean up
|
||||||
service_id = None
|
service_id = None
|
||||||
|
|
||||||
def __init__(self, host, binary, topic, manager, report_interval=None,
|
def __init__(self, host: str, binary: str, topic: str,
|
||||||
periodic_interval=None, periodic_fuzzy_delay=None,
|
manager: str,
|
||||||
service_name=None, coordination=False, cluster=None, *args,
|
report_interval: Optional[int] = None,
|
||||||
**kwargs):
|
periodic_interval: Optional[int] = None,
|
||||||
|
periodic_fuzzy_delay: Optional[int] = None,
|
||||||
|
service_name: Optional[str] = None,
|
||||||
|
coordination: bool = False,
|
||||||
|
cluster: Optional[str] = None,
|
||||||
|
*args, **kwargs):
|
||||||
super(Service, self).__init__()
|
super(Service, self).__init__()
|
||||||
|
|
||||||
if not rpc.initialized():
|
if not rpc.initialized():
|
||||||
@ -153,7 +159,8 @@ class Service(service.Service):
|
|||||||
cluster=self.cluster,
|
cluster=self.cluster,
|
||||||
service_name=service_name,
|
service_name=service_name,
|
||||||
*args, **kwargs)
|
*args, **kwargs)
|
||||||
self.availability_zone = self.manager.availability_zone
|
self.availability_zone: str = self.manager.availability_zone
|
||||||
|
self.model_disconnected: bool
|
||||||
|
|
||||||
# NOTE(geguileo): We need to create the Service DB entry before we
|
# NOTE(geguileo): We need to create the Service DB entry before we
|
||||||
# create the manager, otherwise capped versions for serializer and rpc
|
# create the manager, otherwise capped versions for serializer and rpc
|
||||||
@ -197,7 +204,7 @@ class Service(service.Service):
|
|||||||
# Service entry Entry didn't exist because it was manually removed
|
# Service entry Entry didn't exist because it was manually removed
|
||||||
# or it's the first time running, to be on the safe side we say we
|
# or it's the first time running, to be on the safe side we say we
|
||||||
# were added if we are clustered.
|
# were added if we are clustered.
|
||||||
self.added_to_cluster = bool(cluster)
|
self.added_to_cluster = bool(cluster) # type: ignore
|
||||||
|
|
||||||
self.report_interval = report_interval
|
self.report_interval = report_interval
|
||||||
self.periodic_interval = periodic_interval
|
self.periodic_interval = periodic_interval
|
||||||
@ -206,11 +213,11 @@ class Service(service.Service):
|
|||||||
self.saved_args, self.saved_kwargs = args, kwargs
|
self.saved_args, self.saved_kwargs = args, kwargs
|
||||||
|
|
||||||
setup_profiler(binary, host)
|
setup_profiler(binary, host)
|
||||||
self.rpcserver = None
|
self.rpcserver: Optional['messaging.rpc.RPCServer'] = None
|
||||||
self.backend_rpcserver = None
|
self.backend_rpcserver: Optional['messaging.rpc.RPCServer'] = None
|
||||||
self.cluster_rpcserver = None
|
self.cluster_rpcserver: Optional['messaging.rpc.RPCServer'] = None
|
||||||
|
|
||||||
def start(self):
|
def start(self) -> None:
|
||||||
version_string = version.version_string()
|
version_string = version.version_string()
|
||||||
LOG.info('Starting %(topic)s node (version %(version_string)s)',
|
LOG.info('Starting %(topic)s node (version %(version_string)s)',
|
||||||
{'topic': self.topic, 'version_string': version_string})
|
{'topic': self.topic, 'version_string': version_string})
|
||||||
@ -275,6 +282,7 @@ class Service(service.Service):
|
|||||||
initial_delay=self.report_interval)
|
initial_delay=self.report_interval)
|
||||||
|
|
||||||
if self.periodic_interval:
|
if self.periodic_interval:
|
||||||
|
initial_delay: Optional[int]
|
||||||
if self.periodic_fuzzy_delay:
|
if self.periodic_fuzzy_delay:
|
||||||
initial_delay = random.randint(0, self.periodic_fuzzy_delay)
|
initial_delay = random.randint(0, self.periodic_fuzzy_delay)
|
||||||
else:
|
else:
|
||||||
@ -282,7 +290,7 @@ class Service(service.Service):
|
|||||||
self.tg.add_timer(self.periodic_interval, self.periodic_tasks,
|
self.tg.add_timer(self.periodic_interval, self.periodic_tasks,
|
||||||
initial_delay=initial_delay)
|
initial_delay=initial_delay)
|
||||||
|
|
||||||
def basic_config_check(self):
|
def basic_config_check(self) -> None:
|
||||||
"""Perform basic config checks before starting service."""
|
"""Perform basic config checks before starting service."""
|
||||||
# Make sure report interval is less than service down time
|
# Make sure report interval is less than service down time
|
||||||
if self.report_interval:
|
if self.report_interval:
|
||||||
@ -299,7 +307,9 @@ class Service(service.Service):
|
|||||||
'new_down_time': new_down_time})
|
'new_down_time': new_down_time})
|
||||||
CONF.set_override('service_down_time', new_down_time)
|
CONF.set_override('service_down_time', new_down_time)
|
||||||
|
|
||||||
def _ensure_cluster_exists(self, context, service):
|
def _ensure_cluster_exists(self,
|
||||||
|
context: context.RequestContext,
|
||||||
|
service: 'Service') -> None:
|
||||||
if self.cluster:
|
if self.cluster:
|
||||||
try:
|
try:
|
||||||
cluster = objects.Cluster.get_by_id(context, None,
|
cluster = objects.Cluster.get_by_id(context, None,
|
||||||
@ -335,7 +345,9 @@ class Service(service.Service):
|
|||||||
except exception.ClusterExists:
|
except exception.ClusterExists:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def _create_service_ref(self, context, rpc_version=None):
|
def _create_service_ref(self,
|
||||||
|
context: context.RequestContext,
|
||||||
|
rpc_version: Optional[str] = None) -> None:
|
||||||
kwargs = {
|
kwargs = {
|
||||||
'host': self.host,
|
'host': self.host,
|
||||||
'binary': self.binary,
|
'binary': self.binary,
|
||||||
@ -355,15 +367,23 @@ class Service(service.Service):
|
|||||||
# the cluster it will be saved.
|
# the cluster it will be saved.
|
||||||
service_ref.save()
|
service_ref.save()
|
||||||
|
|
||||||
def __getattr__(self, key):
|
def __getattr__(self, key: str):
|
||||||
manager = self.__dict__.get('manager', None)
|
manager = self.__dict__.get('manager', None)
|
||||||
return getattr(manager, key)
|
return getattr(manager, key)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def create(cls, host=None, binary=None, topic=None, manager=None,
|
def create(cls,
|
||||||
report_interval=None, periodic_interval=None,
|
host: Optional[str] = None,
|
||||||
periodic_fuzzy_delay=None, service_name=None,
|
binary: Optional[str] = None,
|
||||||
coordination=False, cluster=None, **kwargs):
|
topic: Optional[str] = None,
|
||||||
|
manager: Optional[str] = None,
|
||||||
|
report_interval: Optional[int] = None,
|
||||||
|
periodic_interval: Optional[int] = None,
|
||||||
|
periodic_fuzzy_delay: Optional[int] = None,
|
||||||
|
service_name: Optional[str] = None,
|
||||||
|
coordination: bool = False,
|
||||||
|
cluster: Optional[str] = None,
|
||||||
|
**kwargs) -> 'Service':
|
||||||
"""Instantiates class and passes back application object.
|
"""Instantiates class and passes back application object.
|
||||||
|
|
||||||
:param host: defaults to CONF.host
|
:param host: defaults to CONF.host
|
||||||
@ -391,6 +411,9 @@ class Service(service.Service):
|
|||||||
periodic_interval = CONF.periodic_interval
|
periodic_interval = CONF.periodic_interval
|
||||||
if periodic_fuzzy_delay is None:
|
if periodic_fuzzy_delay is None:
|
||||||
periodic_fuzzy_delay = CONF.periodic_fuzzy_delay
|
periodic_fuzzy_delay = CONF.periodic_fuzzy_delay
|
||||||
|
|
||||||
|
assert host is not None
|
||||||
|
assert manager is not None
|
||||||
service_obj = cls(host, binary, topic, manager,
|
service_obj = cls(host, binary, topic, manager,
|
||||||
report_interval=report_interval,
|
report_interval=report_interval,
|
||||||
periodic_interval=periodic_interval,
|
periodic_interval=periodic_interval,
|
||||||
@ -401,11 +424,12 @@ class Service(service.Service):
|
|||||||
|
|
||||||
return service_obj
|
return service_obj
|
||||||
|
|
||||||
def stop(self):
|
def stop(self) -> None:
|
||||||
# Try to shut the connection down, but if we get any sort of
|
# Try to shut the connection down, but if we get any sort of
|
||||||
# errors, go ahead and ignore them.. as we're shutting down anyway
|
# errors, go ahead and ignore them.. as we're shutting down anyway
|
||||||
try:
|
try:
|
||||||
self.rpcserver.stop()
|
if self.rpcserver is not None:
|
||||||
|
self.rpcserver.stop()
|
||||||
if self.backend_rpcserver:
|
if self.backend_rpcserver:
|
||||||
self.backend_rpcserver.stop()
|
self.backend_rpcserver.stop()
|
||||||
if self.cluster_rpcserver:
|
if self.cluster_rpcserver:
|
||||||
@ -420,7 +444,7 @@ class Service(service.Service):
|
|||||||
pass
|
pass
|
||||||
super(Service, self).stop(graceful=True)
|
super(Service, self).stop(graceful=True)
|
||||||
|
|
||||||
def wait(self):
|
def wait(self) -> None:
|
||||||
if self.rpcserver:
|
if self.rpcserver:
|
||||||
self.rpcserver.wait()
|
self.rpcserver.wait()
|
||||||
if self.backend_rpcserver:
|
if self.backend_rpcserver:
|
||||||
@ -429,12 +453,12 @@ class Service(service.Service):
|
|||||||
self.cluster_rpcserver.wait()
|
self.cluster_rpcserver.wait()
|
||||||
super(Service, self).wait()
|
super(Service, self).wait()
|
||||||
|
|
||||||
def periodic_tasks(self, raise_on_error=False):
|
def periodic_tasks(self, raise_on_error: bool = False) -> None:
|
||||||
"""Tasks to be run at a periodic interval."""
|
"""Tasks to be run at a periodic interval."""
|
||||||
ctxt = context.get_admin_context()
|
ctxt = context.get_admin_context()
|
||||||
self.manager.run_periodic_tasks(ctxt, raise_on_error=raise_on_error)
|
self.manager.run_periodic_tasks(ctxt, raise_on_error=raise_on_error)
|
||||||
|
|
||||||
def report_state(self):
|
def report_state(self) -> None:
|
||||||
"""Update the state of this service in the datastore."""
|
"""Update the state of this service in the datastore."""
|
||||||
if not self.manager.is_working():
|
if not self.manager.is_working():
|
||||||
# NOTE(dulek): If manager reports a problem we're not sending
|
# NOTE(dulek): If manager reports a problem we're not sending
|
||||||
@ -486,7 +510,7 @@ class Service(service.Service):
|
|||||||
self.model_disconnected = True
|
self.model_disconnected = True
|
||||||
LOG.exception('Exception encountered: ')
|
LOG.exception('Exception encountered: ')
|
||||||
|
|
||||||
def reset(self):
|
def reset(self) -> None:
|
||||||
self.manager.reset()
|
self.manager.reset()
|
||||||
super(Service, self).reset()
|
super(Service, self).reset()
|
||||||
|
|
||||||
@ -548,7 +572,7 @@ class WSGIService(service.ServiceBase):
|
|||||||
manager_class = importutils.import_class(manager_class_name)
|
manager_class = importutils.import_class(manager_class_name)
|
||||||
return manager_class()
|
return manager_class()
|
||||||
|
|
||||||
def start(self):
|
def start(self) -> None:
|
||||||
"""Start serving this service using loaded configuration.
|
"""Start serving this service using loaded configuration.
|
||||||
|
|
||||||
Also, retrieve updated port number in case '0' was passed in, which
|
Also, retrieve updated port number in case '0' was passed in, which
|
||||||
@ -562,7 +586,7 @@ class WSGIService(service.ServiceBase):
|
|||||||
self.server.start()
|
self.server.start()
|
||||||
self.port = self.server.port
|
self.port = self.server.port
|
||||||
|
|
||||||
def stop(self):
|
def stop(self) -> None:
|
||||||
"""Stop serving this API.
|
"""Stop serving this API.
|
||||||
|
|
||||||
:returns: None
|
:returns: None
|
||||||
@ -570,7 +594,7 @@ class WSGIService(service.ServiceBase):
|
|||||||
"""
|
"""
|
||||||
self.server.stop()
|
self.server.stop()
|
||||||
|
|
||||||
def wait(self):
|
def wait(self) -> None:
|
||||||
"""Wait for the service to stop serving this API.
|
"""Wait for the service to stop serving this API.
|
||||||
|
|
||||||
:returns: None
|
:returns: None
|
||||||
@ -578,7 +602,7 @@ class WSGIService(service.ServiceBase):
|
|||||||
"""
|
"""
|
||||||
self.server.wait()
|
self.server.wait()
|
||||||
|
|
||||||
def reset(self):
|
def reset(self) -> None:
|
||||||
"""Reset server greenpool size to default.
|
"""Reset server greenpool size to default.
|
||||||
|
|
||||||
:returns: None
|
:returns: None
|
||||||
@ -587,7 +611,7 @@ class WSGIService(service.ServiceBase):
|
|||||||
self.server.reset()
|
self.server.reset()
|
||||||
|
|
||||||
|
|
||||||
def process_launcher():
|
def process_launcher() -> service.ProcessLauncher:
|
||||||
return service.ProcessLauncher(CONF, restart_method='mutate')
|
return service.ProcessLauncher(CONF, restart_method='mutate')
|
||||||
|
|
||||||
|
|
||||||
@ -606,13 +630,13 @@ def serve(server, workers=None):
|
|||||||
restart_method='mutate')
|
restart_method='mutate')
|
||||||
|
|
||||||
|
|
||||||
def wait():
|
def wait() -> None:
|
||||||
CONF.log_opt_values(LOG, logging.DEBUG)
|
CONF.log_opt_values(LOG, logging.DEBUG)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
_launcher.wait()
|
_launcher.wait() # type: ignore
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
_launcher.stop()
|
_launcher.stop() # type: ignore
|
||||||
rpc.cleanup()
|
rpc.cleanup()
|
||||||
|
|
||||||
|
|
||||||
@ -622,7 +646,7 @@ class Launcher(object):
|
|||||||
self.wait = wait
|
self.wait = wait
|
||||||
|
|
||||||
|
|
||||||
def get_launcher():
|
def get_launcher() -> service.ProcessLauncher:
|
||||||
# Note(lpetrut): ProcessLauncher uses green pipes which fail on Windows
|
# Note(lpetrut): ProcessLauncher uses green pipes which fail on Windows
|
||||||
# due to missing support of non-blocking I/O pipes. For this reason, the
|
# due to missing support of non-blocking I/O pipes. For this reason, the
|
||||||
# service must be spawned differently on Windows, using the ServiceLauncher
|
# service must be spawned differently on Windows, using the ServiceLauncher
|
||||||
|
@ -30,6 +30,7 @@ cinder/scheduler/weights/chance.py
|
|||||||
cinder/scheduler/weights/goodness.py
|
cinder/scheduler/weights/goodness.py
|
||||||
cinder/scheduler/weights/stochastic.py
|
cinder/scheduler/weights/stochastic.py
|
||||||
cinder/scheduler/weights/volume_number.py
|
cinder/scheduler/weights/volume_number.py
|
||||||
|
cinder/service.py
|
||||||
cinder/utils.py
|
cinder/utils.py
|
||||||
cinder/volume/__init__.py
|
cinder/volume/__init__.py
|
||||||
cinder/volume/api.py
|
cinder/volume/api.py
|
||||||
|
Loading…
Reference in New Issue
Block a user