From 698ab86a74442a13713902dfb7bf0bc4ec7d628c Mon Sep 17 00:00:00 2001 From: Eric Harney Date: Tue, 19 May 2020 10:42:54 -0400 Subject: [PATCH] mypy: annotate volume manager This is just a first pass to get things rolling. Change-Id: I67ac3df632a34729646e7b037205327696818a22 --- cinder/context.py | 2 +- cinder/image/glance.py | 5 +- cinder/manager.py | 23 +- cinder/volume/manager.py | 414 +++++++++++++++++++++++----------- cinder/volume/volume_types.py | 8 +- mypy-files.txt | 3 + 6 files changed, 306 insertions(+), 149 deletions(-) diff --git a/cinder/context.py b/cinder/context.py index 9bd33dbe2fe..9118312ef78 100644 --- a/cinder/context.py +++ b/cinder/context.py @@ -122,7 +122,7 @@ class RequestContext(context.RequestContext): # We need to have RequestContext attributes defined # when policy.check_is_admin invokes request logging # to make it loggable. - if self.is_admin is None: + if self.is_admin is None: # type: ignore self.is_admin = policy.check_is_admin(self) elif self.is_admin and 'admin' not in self.roles: self.roles.append('admin') diff --git a/cinder/image/glance.py b/cinder/image/glance.py index 37ba4167ae3..59f3451974d 100644 --- a/cinder/image/glance.py +++ b/cinder/image/glance.py @@ -23,6 +23,7 @@ import shutil import sys import textwrap import time +import typing as ty import urllib import glanceclient.exc @@ -32,6 +33,7 @@ from oslo_log import log as logging from oslo_serialization import jsonutils from oslo_utils import timeutils +from cinder import context from cinder import exception from cinder.i18n import _ from cinder import service_auth @@ -651,7 +653,8 @@ def _translate_plain_exception(exc_value): return exc_value -def get_remote_image_service(context, image_href): +def get_remote_image_service(context: context.RequestContext, + image_href) -> ty.Tuple[GlanceImageService, str]: """Create an image_service and parse the id from the given image_href. The image_href param can be an href of the form diff --git a/cinder/manager.py b/cinder/manager.py index 08940ceed14..6689b3d0c34 100644 --- a/cinder/manager.py +++ b/cinder/manager.py @@ -54,6 +54,7 @@ This module provides Manager, a base class for managers. from eventlet import greenpool from eventlet import tpool from oslo_config import cfg +import oslo_config.types from oslo_log import log as logging import oslo_messaging as messaging from oslo_service import periodic_task @@ -83,16 +84,20 @@ class Manager(base.Base, PeriodicTasks): target = messaging.Target(version=RPC_API_VERSION) - def __init__(self, host=None, db_driver=None, cluster=None, **kwargs): + def __init__(self, + host: oslo_config.types.HostAddress = None, + db_driver=None, + cluster=None, + **_kwargs): if not host: host = CONF.host - self.host = host + self.host: oslo_config.types.HostAddress = host self.cluster = cluster - self.additional_endpoints = [] + self.additional_endpoints: list = [] self.availability_zone = CONF.storage_availability_zone - super(Manager, self).__init__(db_driver) + super(Manager, self).__init__(db_driver) # type: ignore - def _set_tpool_size(self, nthreads): + def _set_tpool_size(self, nthreads: int) -> None: # NOTE(geguileo): Until PR #472 is merged we have to be very careful # not to call "tpool.execute" before calling this method. tpool.set_num_threads(nthreads) @@ -217,14 +222,14 @@ class SchedulerDependentManager(ThreadPoolManager): class CleanableManager(object): - def do_cleanup(self, context, cleanup_request): + def do_cleanup(self, context, cleanup_request) -> None: LOG.info('Initiating service %s cleanup', cleanup_request.service_id) # If the 'until' field in the cleanup request is not set, we default to # this very moment. until = cleanup_request.until or timeutils.utcnow() - keep_entry = False + keep_entry: bool = False to_clean = db.worker_get_all( context, @@ -300,10 +305,10 @@ class CleanableManager(object): LOG.info('Service %s cleanup completed.', cleanup_request.service_id) - def _do_cleanup(self, ctxt, vo_resource): + def _do_cleanup(self, ctxt, vo_resource) -> bool: return False - def init_host(self, service_id, **kwargs): + def init_host(self, service_id, **kwargs) -> None: ctxt = context.get_admin_context() self.service_id = service_id # TODO(geguileo): Once we don't support MySQL 5.5 anymore we can remove diff --git a/cinder/volume/manager.py b/cinder/volume/manager.py index 8cd2d40d040..b1e38782a65 100644 --- a/cinder/volume/manager.py +++ b/cinder/volume/manager.py @@ -36,6 +36,7 @@ intact. """ import time +import typing as ty from castellan import key_manager from oslo_config import cfg @@ -48,6 +49,7 @@ from oslo_utils import importutils from oslo_utils import timeutils from oslo_utils import units from oslo_utils import uuidutils +from oslo_versionedobjects import fields as ovo_fields profiler = importutils.try_import('osprofiler.profiler') import requests from taskflow import exceptions as tfe @@ -200,27 +202,32 @@ class VolumeManager(manager.CleanableManager, 'attach_status', 'migration_status', 'volume_type', 'consistencygroup', 'volume_attachment', 'group'} - def _get_service(self, host=None, binary=constants.VOLUME_BINARY): + def _get_service(self, + host: str = None, + binary: str = constants.VOLUME_BINARY) -> objects.Service: host = host or self.host ctxt = context.get_admin_context() svc_host = volume_utils.extract_host(host, 'backend') return objects.Service.get_by_args(ctxt, svc_host, binary) - def __init__(self, volume_driver=None, service_name=None, + def __init__(self, volume_driver=None, service_name: str = None, *args, **kwargs): """Load the driver from the one specified in args, or from flags.""" # update_service_capabilities needs service_name to be volume - super(VolumeManager, self).__init__(service_name='volume', - *args, **kwargs) + super(VolumeManager, self).__init__( # type: ignore + service_name='volume', + *args, **kwargs) # NOTE(dulek): service_name=None means we're running in unit tests. service_name = service_name or 'backend_defaults' self.configuration = config.Configuration(volume_backend_opts, config_group=service_name) self._set_tpool_size( self.configuration.backend_native_threads_pool_size) - self.stats = {} + self.stats: dict = {} self.service_uuid = None + self.cluster: str + if not volume_driver: # Get from configuration, which will get the default # if its not using the multi backend @@ -312,7 +319,8 @@ class VolumeManager(manager.CleanableManager, {'host': self.host}) self.image_volume_cache = None - def _count_allocated_capacity(self, ctxt, volume): + def _count_allocated_capacity(self, ctxt: context.RequestContext, + volume: objects.Volume) -> None: pool = volume_utils.extract_host(volume['host'], 'pool') if pool is None: # No pool name encoded in host, so this is a legacy @@ -352,7 +360,9 @@ class VolumeManager(manager.CleanableManager, self.stats['pools'][pool]['allocated_capacity_gb'] = pool_sum self.stats['allocated_capacity_gb'] += volume['size'] - def _set_voldb_empty_at_startup_indicator(self, ctxt): + def _set_voldb_empty_at_startup_indicator( + self, + ctxt: context.RequestContext) -> bool: """Determine if the Cinder volume DB is empty. A check of the volume DB is done to determine whether it is empty or @@ -369,7 +379,7 @@ class VolumeManager(manager.CleanableManager, LOG.info("Determined volume DB was not empty at startup.") return False - def _sync_provider_info(self, ctxt, volumes, snapshots): + def _sync_provider_info(self, ctxt, volumes, snapshots) -> None: # NOTE(jdg): For now this just updates provider_id, we can add more # items to the update if they're relevant but we need to be safe in # what we allow and add a list of allowed keys. Things that make sense @@ -378,6 +388,7 @@ class VolumeManager(manager.CleanableManager, updates, snapshot_updates = self.driver.update_provider_info( volumes, snapshots) + update: ty.Any if updates: for volume in volumes: # NOTE(JDG): Make sure returned item is in this hosts volumes @@ -404,7 +415,7 @@ class VolumeManager(manager.CleanableManager, update['id'], {'provider_id': update['provider_id']}) - def _include_resources_in_cluster(self, ctxt): + def _include_resources_in_cluster(self, ctxt) -> None: LOG.info('Including all resources from host %(host)s in cluster ' '%(cluster)s.', @@ -425,7 +436,9 @@ class VolumeManager(manager.CleanableManager, 'host': self.host, 'cluster': self.cluster, 'num_cache': num_cache}) - def init_host(self, added_to_cluster=None, **kwargs): + def init_host(self, # type: ignore + added_to_cluster=None, + **kwargs) -> None: """Perform any required initialization.""" if not self.driver.supported: utils.log_unsupported_driver_warning(self.driver) @@ -450,7 +463,7 @@ class VolumeManager(manager.CleanableManager, return reinit_count += 1 - def _init_host(self, added_to_cluster=None, **kwargs): + def _init_host(self, added_to_cluster=None, **kwargs) -> None: ctxt = context.get_admin_context() # If we have just added this host to a cluster we have to include all @@ -481,9 +494,12 @@ class VolumeManager(manager.CleanableManager, # Batch retrieval volumes and snapshots - num_vols, num_snaps, max_objs_num, req_range = None, None, None, [0] - req_limit = CONF.init_host_max_objects_retrieval - use_batch_objects_retrieval = req_limit > 0 + num_vols: int = 0 + num_snaps: int = 0 + max_objs_num: int = 0 + req_range: ty.Union[ty.List[int], range] = [0] + req_limit = CONF.init_host_max_objects_retrieval or 0 + use_batch_objects_retrieval: bool = req_limit > 0 if use_batch_objects_retrieval: # Get total number of volumes @@ -492,16 +508,18 @@ class VolumeManager(manager.CleanableManager, num_snaps, __ = self._get_my_snapshots_summary(ctxt) # Calculate highest number of the objects (volumes or snapshots) max_objs_num = max(num_vols, num_snaps) + max_objs_num = ty.cast(int, max_objs_num) # Make batch request loop counter req_range = range(0, max_objs_num, req_limit) volumes_to_migrate = volume_migration.VolumeMigrationList() + req_offset: int for req_offset in req_range: # Retrieve 'req_limit' number of objects starting from # 'req_offset' position - volumes, snapshots = None, None + volumes, snapshots = [], [] if use_batch_objects_retrieval: if req_offset < num_vols: volumes = self._get_my_volumes(ctxt, @@ -579,7 +597,7 @@ class VolumeManager(manager.CleanableManager, super(VolumeManager, self).init_host(added_to_cluster=added_to_cluster, **kwargs) - def init_host_with_rpc(self): + def init_host_with_rpc(self) -> None: LOG.info("Initializing RPC dependent components of volume " "driver %(driver_name)s (%(version)s)", {'driver_name': self.driver.__class__.__name__, @@ -624,7 +642,7 @@ class VolumeManager(manager.CleanableManager, resource={'type': 'driver', 'id': self.driver.__class__.__name__}) - def _do_cleanup(self, ctxt, vo_resource): + def _do_cleanup(self, ctxt, vo_resource) -> bool: if isinstance(vo_resource, objects.Volume): if vo_resource.status == 'downloading': self.driver.clear_download(ctxt, vo_resource) @@ -655,7 +673,9 @@ class VolumeManager(manager.CleanableManager, vo_resource.status = 'error' vo_resource.save() - def is_working(self): + return False + + def is_working(self) -> bool: """Return if Manager is ready to accept requests. This is to inform Service class that in case of volume driver @@ -664,7 +684,7 @@ class VolumeManager(manager.CleanableManager, """ return self.driver.initialized - def _set_resource_host(self, resource): + def _set_resource_host(self, resource) -> None: """Set the host field on the DB to our own when we are clustered.""" if (resource.is_clustered and not volume_utils.hosts_are_equivalent(resource.host, @@ -675,7 +695,8 @@ class VolumeManager(manager.CleanableManager, @objects.Volume.set_workers def create_volume(self, context, volume, request_spec=None, - filter_properties=None, allow_reschedule=True): + filter_properties=None, + allow_reschedule=True) -> ovo_fields.UUIDField: """Creates the volume.""" # Log about unsupported drivers utils.log_unsupported_driver_warning(self.driver) @@ -721,6 +742,7 @@ class VolumeManager(manager.CleanableManager, snapshot_id = request_spec.get('snapshot_id') source_volid = request_spec.get('source_volid') + locked_action: ty.Optional[str] if snapshot_id is not None: # Make sure the snapshot is not deleted until we are done with it. locked_action = "%s-%s" % (snapshot_id, 'delete_snapshot') @@ -730,7 +752,7 @@ class VolumeManager(manager.CleanableManager, else: locked_action = None - def _run_flow(): + def _run_flow() -> None: # This code executes create volume flow. If something goes wrong, # flow reverts all job that was done and reraises an exception. # Otherwise, all data that was generated by flow becomes available @@ -782,7 +804,7 @@ class VolumeManager(manager.CleanableManager, LOG.info("Created volume successfully.", resource=volume) return volume.id - def _check_is_our_resource(self, resource): + def _check_is_our_resource(self, resource) -> None: if resource.host: res_backend = volume_utils.extract_host( resource.service_topic_queue) @@ -796,8 +818,11 @@ class VolumeManager(manager.CleanableManager, @coordination.synchronized('{volume.id}-{f_name}') @objects.Volume.set_workers - def delete_volume(self, context, volume, unmanage_only=False, - cascade=False): + def delete_volume(self, + context: context.RequestContext, + volume: objects.volume.Volume, + unmanage_only=False, + cascade=False) -> None: """Deletes and unexports volume. 1. Delete a volume(normal case) @@ -955,7 +980,7 @@ class VolumeManager(manager.CleanableManager, msg = "Unmanaged volume successfully." LOG.info(msg, resource=volume) - def _clear_db(self, is_migrating_dest, volume_ref, status): + def _clear_db(self, is_migrating_dest, volume_ref, status) -> None: # This method is called when driver.unmanage() or # driver.delete_volume() fails in delete_volume(), so it is already # in the exception handling part. @@ -968,7 +993,10 @@ class VolumeManager(manager.CleanableManager, volume_ref.status = status volume_ref.save() - def _revert_to_snapshot_generic(self, ctxt, volume, snapshot): + def _revert_to_snapshot_generic(self, + ctxt: context.RequestContext, + volume, + snapshot) -> None: """Generic way to revert volume to a snapshot. the framework will use the generic way to implement the revert @@ -1001,7 +1029,7 @@ class VolumeManager(manager.CleanableManager, self.driver.delete_volume(temp_vol) temp_vol.destroy() - def _revert_to_snapshot(self, context, volume, snapshot): + def _revert_to_snapshot(self, context, volume, snapshot) -> None: """Use driver or generic method to rollback volume.""" try: @@ -1011,7 +1039,7 @@ class VolumeManager(manager.CleanableManager, "Try to use copy-snapshot-to-volume method.") self._revert_to_snapshot_generic(context, volume, snapshot) - def _create_backup_snapshot(self, context, volume): + def _create_backup_snapshot(self, context, volume) -> objects.Snapshot: kwargs = { 'volume_id': volume.id, 'user_id': context.user_id, @@ -1033,7 +1061,7 @@ class VolumeManager(manager.CleanableManager, self.create_snapshot(context, snapshot) return snapshot - def revert_to_snapshot(self, context, volume, snapshot): + def revert_to_snapshot(self, context, volume, snapshot) -> None: """Revert a volume to a snapshot. The process of reverting to snapshot consists of several steps: @@ -1121,7 +1149,7 @@ class VolumeManager(manager.CleanableManager, self._notify_about_snapshot_usage(context, snapshot, "revert.end") @objects.Snapshot.set_workers - def create_snapshot(self, context, snapshot): + def create_snapshot(self, context, snapshot) -> ovo_fields.UUIDField: """Creates and exports the snapshot.""" context = context.elevated() @@ -1197,8 +1225,11 @@ class VolumeManager(manager.CleanableManager, return snapshot.id @coordination.synchronized('{snapshot.id}-{f_name}') - def delete_snapshot(self, context, snapshot, - unmanage_only=False, handle_quota=True): + def delete_snapshot(self, + context: context.RequestContext, + snapshot: objects.Snapshot, + unmanage_only: bool = False, + handle_quota: bool = True) -> None: """Deletes and unexports snapshot.""" context = context.elevated() snapshot._context = context @@ -1283,7 +1314,8 @@ class VolumeManager(manager.CleanableManager, @coordination.synchronized('{volume_id}') def attach_volume(self, context, volume_id, instance_uuid, host_name, - mountpoint, mode, volume=None): + mountpoint, mode, + volume=None) -> objects.VolumeAttachment: """Updates db to show volume is attached.""" # FIXME(lixiaoy1): Remove this in v4.0 of RPC API. if volume is None: @@ -1379,7 +1411,7 @@ class VolumeManager(manager.CleanableManager, @coordination.synchronized('{volume_id}-{f_name}') def detach_volume(self, context, volume_id, attachment_id=None, - volume=None): + volume=None) -> None: """Updates db to show volume is detached.""" # TODO(vish): refactor this into a more general "unreserve" # FIXME(lixiaoy1): Remove this in v4.0 of RPC API. @@ -1471,7 +1503,7 @@ class VolumeManager(manager.CleanableManager, LOG.info("Detach volume completed successfully.", resource=volume) def _create_image_cache_volume_entry(self, ctx, volume_ref, - image_id, image_meta): + image_id, image_meta) -> None: """Create a new image-volume and cache entry for it. This assumes that the image has already been downloaded and stored @@ -1517,14 +1549,19 @@ class VolumeManager(manager.CleanableManager, if image_volume: self.delete_volume(ctx, image_volume) - def _clone_image_volume(self, ctx, volume, image_meta): - volume_type_id = volume.get('volume_type_id') - reserve_opts = {'volumes': 1, 'gigabytes': volume.size} + def _clone_image_volume(self, + ctx: context.RequestContext, + volume, + image_meta: dict) -> ty.Union[None, + objects.Volume]: + # TODO: should this return None? + volume_type_id: str = volume.get('volume_type_id') + reserve_opts: dict = {'volumes': 1, 'gigabytes': volume.size} QUOTAS.add_volume_type_opts(ctx, reserve_opts, volume_type_id) reservations = QUOTAS.reserve(ctx, **reserve_opts) # NOTE(yikun): Skip 'snapshot_id', 'source_volid' keys to avoid # creating tmp img vol from wrong snapshot or wrong source vol. - skip = {'snapshot_id', 'source_volid'} + skip: ty.Set[str] = {'snapshot_id', 'source_volid'} skip.update(self._VOLUME_CLONE_SKIP_PROPERTIES) try: new_vol_values = {k: volume[k] for k in set(volume.keys()) - skip} @@ -1547,7 +1584,7 @@ class VolumeManager(manager.CleanableManager, 'image_id': image_meta['id'], 'except': ex}) QUOTAS.rollback(ctx, reservations) - return + return None QUOTAS.commit(ctx, reservations, project_id=new_vol_values['project_id']) @@ -1573,10 +1610,10 @@ class VolumeManager(manager.CleanableManager, except exception.CinderException: LOG.exception('Could not delete the image volume %(id)s.', {'id': volume.id}) - return + return None def _clone_image_volume_and_add_location(self, ctx, volume, image_service, - image_meta): + image_meta) -> bool: """Create a cloned volume and register its location to the image.""" if (image_meta['disk_format'] != 'raw' or image_meta['container_format'] != 'bare'): @@ -1638,14 +1675,17 @@ class VolumeManager(manager.CleanableManager, False) return True - def copy_volume_to_image(self, context, volume_id, image_meta): + def copy_volume_to_image(self, + context: context.RequestContext, + volume_id: str, + image_meta: dict) -> None: """Uploads the specified volume to Glance. image_meta is a dictionary containing the following keys: 'id', 'container_format', 'disk_format' """ - payload = {'volume_id': volume_id, 'image_id': image_meta['id']} + payload: dict = {'volume_id': volume_id, 'image_id': image_meta['id']} image_service = None try: volume = objects.Volume.get_by_id(context, volume_id) @@ -1692,7 +1732,7 @@ class VolumeManager(manager.CleanableManager, LOG.info("Copy volume to image completed successfully.", resource=volume) - def _delete_image(self, context, image_id, image_service): + def _delete_image(self, context, image_id, image_service) -> None: """Deletes an image stuck in queued or saving state.""" try: image_meta = image_service.show(context, image_id) @@ -1708,7 +1748,8 @@ class VolumeManager(manager.CleanableManager, exc_info=True, resource={'type': 'image', 'id': image_id}) - def _parse_connection_options(self, context, volume, conn_info): + def _parse_connection_options(self, context, volume: objects.Volume, + conn_info: dict) -> dict: # Add qos_specs to connection info typeid = volume.volume_type_id specs = None @@ -1779,7 +1820,10 @@ class VolumeManager(manager.CleanableManager, return conn_info - def initialize_connection(self, context, volume, connector): + def initialize_connection(self, + context, + volume: objects.Volume, + connector: dict) -> dict: """Prepare volume for connection from host represented by connector. This method calls the driver initialize_connection and returns @@ -1874,7 +1918,10 @@ class VolumeManager(manager.CleanableManager, resource=volume) return conn_info - def initialize_connection_snapshot(self, ctxt, snapshot_id, connector): + def initialize_connection_snapshot(self, + ctxt, + snapshot_id: ovo_fields.UUIDField, + connector: dict) -> dict: utils.require_driver_initialized(self.driver) snapshot = objects.Snapshot.get_by_id(ctxt, snapshot_id) try: @@ -1932,7 +1979,11 @@ class VolumeManager(manager.CleanableManager, resource=snapshot) return conn - def terminate_connection(self, context, volume_id, connector, force=False): + def terminate_connection(self, + context, + volume_id: ovo_fields.UUIDField, + connector: dict, + force=False) -> None: """Cleanup connection from host represented by connector. The format of connector is the same as for initialize_connection. @@ -1954,8 +2005,11 @@ class VolumeManager(manager.CleanableManager, LOG.info("Terminate volume connection completed successfully.", resource=volume_ref) - def terminate_connection_snapshot(self, ctxt, snapshot_id, - connector, force=False): + def terminate_connection_snapshot(self, + ctxt, + snapshot_id: ovo_fields.UUIDField, + connector: dict, + force=False) -> None: utils.require_driver_initialized(self.driver) snapshot = objects.Snapshot.get_by_id(ctxt, snapshot_id) @@ -1970,7 +2024,7 @@ class VolumeManager(manager.CleanableManager, LOG.info("Terminate snapshot connection completed successfully.", resource=snapshot) - def remove_export(self, context, volume_id): + def remove_export(self, context, volume_id: ovo_fields.UUIDField) -> None: """Removes an export for a volume.""" utils.require_driver_initialized(self.driver) volume_ref = self.db.volume_get(context, volume_id) @@ -1984,7 +2038,9 @@ class VolumeManager(manager.CleanableManager, LOG.info("Remove volume export completed successfully.", resource=volume_ref) - def remove_export_snapshot(self, ctxt, snapshot_id): + def remove_export_snapshot(self, + ctxt, + snapshot_id: ovo_fields.UUIDField) -> None: """Removes an export for a snapshot.""" utils.require_driver_initialized(self.driver) snapshot = objects.Snapshot.get_by_id(ctxt, snapshot_id) @@ -1999,7 +2055,7 @@ class VolumeManager(manager.CleanableManager, resource=snapshot) def accept_transfer(self, context, volume_id, new_user, new_project, - no_snapshots=False): + no_snapshots=False) -> dict: # NOTE(flaper87): Verify the driver is enabled # before going forward. The exception will be caught # and the volume status updated. @@ -2034,7 +2090,7 @@ class VolumeManager(manager.CleanableManager, resource=volume_ref) return model_update - def _connect_device(self, conn): + def _connect_device(self, conn: dict) -> dict: use_multipath = self.configuration.use_multipath_for_image_xfer device_scan_attempts = self.configuration.num_volume_device_scan_tries protocol = conn['driver_volume_type'] @@ -2063,7 +2119,12 @@ class VolumeManager(manager.CleanableManager, return {'conn': conn, 'device': vol_handle, 'connector': connector} def _attach_volume(self, ctxt, volume, properties, remote=False, - attach_encryptor=False): + attach_encryptor=False) -> dict: + """Attach a volume. + + Returns a dict of attachment info or raises an exception. + """ + status = volume['status'] if remote: @@ -2097,11 +2158,13 @@ class VolumeManager(manager.CleanableManager, " %(vol)s.", {'vol': volume['id']}) self._detach_volume(ctxt, attach_info, volume, properties, force=True, remote=remote) + + attach_info = ty.cast(dict, attach_info) return attach_info def _detach_volume(self, ctxt, attach_info, volume, properties, force=False, remote=False, - attach_encryptor=False): + attach_encryptor=False) -> None: if attach_info: connector = attach_info['connector'] if attach_encryptor and ( @@ -2129,7 +2192,7 @@ class VolumeManager(manager.CleanableManager, LOG.error('Unable to terminate volume connection: ' '%(err)s.', {'err': err}) - def _copy_volume_data(self, ctxt, src_vol, dest_vol, remote=None): + def _copy_volume_data(self, ctxt, src_vol, dest_vol, remote=None) -> None: """Copy data from src_vol to dest_vol.""" LOG.debug('_copy_volume_data %(src)s -> %(dest)s.', @@ -2199,7 +2262,11 @@ class VolumeManager(manager.CleanableManager, remote=src_remote, attach_encryptor=attach_encryptor) - def _migrate_volume_generic(self, ctxt, volume, backend, new_type_id): + def _migrate_volume_generic(self, + ctxt: context.RequestContext, + volume, + backend, + new_type_id) -> None: rpcapi = volume_rpcapi.VolumeAPI() # Create new volume on remote host @@ -2296,7 +2363,7 @@ class VolumeManager(manager.CleanableManager, new_volume) def _clean_temporary_volume(self, ctxt, volume, new_volume, - clean_db_only=False): + clean_db_only=False) -> None: # If we're in the migrating phase, we need to cleanup # destination volume because source volume is remaining if volume.migration_status == 'migrating': @@ -2334,7 +2401,11 @@ class VolumeManager(manager.CleanableManager, "source volume may have been deleted.", {'vol': new_volume.id}) - def migrate_volume_completion(self, ctxt, volume, new_volume, error=False): + def migrate_volume_completion(self, + ctxt: context.RequestContext, + volume, + new_volume, + error=False) -> ovo_fields.UUIDField: try: # NOTE(flaper87): Verify the driver is enabled # before going forward. The exception will be caught @@ -2479,8 +2550,12 @@ class VolumeManager(manager.CleanableManager, resource=volume) return volume.id - def migrate_volume(self, ctxt, volume, host, force_host_copy=False, - new_type_id=None): + def migrate_volume(self, + ctxt: context.RequestContext, + volume, + host, + force_host_copy: bool = False, + new_type_id=None) -> None: """Migrate the volume to the specified host (called on source host).""" try: # NOTE(flaper87): Verify the driver is enabled @@ -2542,7 +2617,7 @@ class VolumeManager(manager.CleanableManager, LOG.info("Migrate volume completed successfully.", resource=volume) - def _report_driver_status(self, context): + def _report_driver_status(self, context: context.RequestContext) -> None: # It's possible during live db migration that the self.service_uuid # value isn't set (we didn't restart services), so we'll go ahead # and make this a part of the service periodic @@ -2639,7 +2714,7 @@ class VolumeManager(manager.CleanableManager, # queue it to be sent to the Schedulers. self.update_service_capabilities(volume_stats) - def _append_volume_stats(self, vol_stats): + def _append_volume_stats(self, vol_stats) -> None: pools = vol_stats.get('pools', None) if pools: if isinstance(pools, list): @@ -2667,7 +2742,7 @@ class VolumeManager(manager.CleanableManager, vol_stats.update(self.stats) vol_stats.pop('pools', None) - def _append_filter_goodness_functions(self, volume_stats): + def _append_filter_goodness_functions(self, volume_stats: dict) -> dict: """Returns volume_stats updated as needed.""" # Append filter_function if needed @@ -2683,16 +2758,17 @@ class VolumeManager(manager.CleanableManager, return volume_stats @periodic_task.periodic_task(spacing=CONF.backend_stats_polling_interval) - def publish_service_capabilities(self, context): + def publish_service_capabilities(self, + context: context.RequestContext) -> None: """Collect driver status and then publish.""" self._report_driver_status(context) self._publish_service_capabilities(context) def _notify_about_volume_usage(self, - context, + context: context.RequestContext, volume, event_suffix, - extra_usage_info=None): + extra_usage_info=None) -> None: volume_utils.notify_about_volume_usage( context, volume, event_suffix, extra_usage_info=extra_usage_info, host=self.host) @@ -2701,7 +2777,7 @@ class VolumeManager(manager.CleanableManager, context, snapshot, event_suffix, - extra_usage_info=None): + extra_usage_info=None) -> None: volume_utils.notify_about_snapshot_usage( context, snapshot, event_suffix, extra_usage_info=extra_usage_info, host=self.host) @@ -2711,7 +2787,7 @@ class VolumeManager(manager.CleanableManager, group, event_suffix, volumes=None, - extra_usage_info=None): + extra_usage_info=None) -> None: volume_utils.notify_about_group_usage( context, group, event_suffix, extra_usage_info=extra_usage_info, host=self.host) @@ -2730,7 +2806,7 @@ class VolumeManager(manager.CleanableManager, group_snapshot, event_suffix, snapshots=None, - extra_usage_info=None): + extra_usage_info=None) -> None: volume_utils.notify_about_group_snapshot_usage( context, group_snapshot, event_suffix, extra_usage_info=extra_usage_info, host=self.host) @@ -2744,7 +2820,11 @@ class VolumeManager(manager.CleanableManager, context, snapshot, event_suffix, extra_usage_info=extra_usage_info, host=self.host) - def extend_volume(self, context, volume, new_size, reservations): + def extend_volume(self, + context, + volume: objects.Volume, + new_size: int, + reservations) -> None: try: # NOTE(flaper87): Verify the driver is enabled # before going forward. The exception will be caught @@ -2817,19 +2897,24 @@ class VolumeManager(manager.CleanableManager, LOG.info("Extend volume completed successfully.", resource=volume) - def _is_our_backend(self, host, cluster_name): + def _is_our_backend(self, host: str, cluster_name: str): return ((not cluster_name and volume_utils.hosts_are_equivalent(self.driver.host, host)) or (cluster_name and volume_utils.hosts_are_equivalent(self.driver.cluster_name, cluster_name))) - def retype(self, context, volume, new_type_id, host, - migration_policy='never', reservations=None, - old_reservations=None): + def retype(self, + context: context.RequestContext, + volume: objects.Volume, + new_type_id: str, + host, + migration_policy: str = 'never', + reservations=None, + old_reservations=None) -> None: def _retype_error(context, volume, old_reservations, - new_reservations, status_update): + new_reservations, status_update) -> None: try: volume.update(status_update) volume.save() @@ -2974,7 +3059,7 @@ class VolumeManager(manager.CleanableManager, resource=volume) @staticmethod - def _set_replication_status(diff, model_update): + def _set_replication_status(diff, model_update: dict) -> None: """Update replication_status in model_update if it has changed.""" if not diff or model_update.get('replication_status'): return @@ -2990,7 +3075,7 @@ class VolumeManager(manager.CleanableManager, replication_status = fields.ReplicationStatus.DISABLED model_update['replication_status'] = replication_status - def manage_existing(self, ctxt, volume, ref=None): + def manage_existing(self, ctxt, volume, ref=None) -> ovo_fields.UUIDField: vol_ref = self._run_manage_existing_flow_engine( ctxt, volume, ref) @@ -3000,7 +3085,8 @@ class VolumeManager(manager.CleanableManager, resource=vol_ref) return vol_ref.id - def _update_stats_for_managed(self, volume_reference): + def _update_stats_for_managed(self, + volume_reference: objects.Volume) -> None: # Update volume stats pool = volume_utils.extract_host(volume_reference.host, 'pool') if pool is None: @@ -3016,7 +3102,10 @@ class VolumeManager(manager.CleanableManager, self.stats['pools'][pool] = dict( allocated_capacity_gb=volume_reference.size) - def _run_manage_existing_flow_engine(self, ctxt, volume, ref): + def _run_manage_existing_flow_engine(self, + ctxt, + volume: objects.Volume, + ref): try: flow_engine = manage_existing.get_flow( ctxt, @@ -3039,14 +3128,16 @@ class VolumeManager(manager.CleanableManager, return vol_ref - def _get_cluster_or_host_filters(self): + def _get_cluster_or_host_filters(self) -> ty.Dict[str, ty.Any]: if self.cluster: filters = {'cluster_name': self.cluster} else: filters = {'host': self.host} return filters - def _get_my_volumes_summary(self, ctxt): + def _get_my_volumes_summary( + self, + ctxt: context.RequestContext): filters = self._get_cluster_or_host_filters() return objects.VolumeList.get_volume_summary(ctxt, False, filters) @@ -3060,7 +3151,8 @@ class VolumeManager(manager.CleanableManager, limit=limit, offset=offset) - def _get_my_volumes(self, ctxt, limit=None, offset=None): + def _get_my_volumes(self, + ctxt, limit=None, offset=None) -> objects.VolumeList: return self._get_my_resources(ctxt, objects.VolumeList, limit, offset) @@ -3093,7 +3185,9 @@ class VolumeManager(manager.CleanableManager, "to driver error.") return driver_entries - def create_group(self, context, group): + def create_group(self, + context: context.RequestContext, + group) -> objects.Group: """Creates the group.""" context = context.elevated() @@ -3152,7 +3246,8 @@ class VolumeManager(manager.CleanableManager, return group def create_group_from_src(self, context, group, - group_snapshot=None, source_group=None): + group_snapshot=None, + source_group=None) -> objects.Group: """Creates the group from source. The source can be a group snapshot or a source group. @@ -3310,9 +3405,12 @@ class VolumeManager(manager.CleanableManager, 'id': group.id}) return group - def _create_group_from_src_generic(self, context, group, volumes, - group_snapshot=None, snapshots=None, - source_group=None, source_vols=None): + def _create_group_from_src_generic( + self, context, group, volumes, + group_snapshot=None, snapshots=None, + source_group=None, + source_vols=None) -> ty.Tuple[ty.Dict[str, str], + ty.List[ty.Dict[str, str]]]: """Creates a group from source. :param context: the context of the caller. @@ -3325,7 +3423,7 @@ class VolumeManager(manager.CleanableManager, :returns: model_update, volumes_model_update """ model_update = {'status': 'available'} - volumes_model_update = [] + volumes_model_update: list = [] for vol in volumes: if snapshots: for snapshot in snapshots: @@ -3365,7 +3463,7 @@ class VolumeManager(manager.CleanableManager, return model_update, volumes_model_update - def _sort_snapshots(self, volumes, snapshots): + def _sort_snapshots(self, volumes, snapshots) -> list: # Sort source snapshots so that they are in the same order as their # corresponding target volumes. Each source snapshot in the snapshots # list should have a corresponding target volume in the volumes list. @@ -3374,7 +3472,7 @@ class VolumeManager(manager.CleanableManager, LOG.error(msg) raise exception.InvalidInput(reason=msg) - sorted_snapshots = [] + sorted_snapshots: list = [] for vol in volumes: found_snaps = [snap for snap in snapshots if snap['id'] == vol['snapshot_id']] @@ -3388,7 +3486,7 @@ class VolumeManager(manager.CleanableManager, return sorted_snapshots - def _sort_source_vols(self, volumes, source_vols): + def _sort_source_vols(self, volumes, source_vols) -> list: # Sort source volumes so that they are in the same order as their # corresponding target volumes. Each source volume in the source_vols # list should have a corresponding target volume in the volumes list. @@ -3397,7 +3495,7 @@ class VolumeManager(manager.CleanableManager, LOG.error(msg) raise exception.InvalidInput(reason=msg) - sorted_source_vols = [] + sorted_source_vols: list = [] for vol in volumes: found_source_vols = [source_vol for source_vol in source_vols if source_vol['id'] == vol['source_volid']] @@ -3411,7 +3509,8 @@ class VolumeManager(manager.CleanableManager, return sorted_source_vols - def _update_volume_from_src(self, context, vol, update, group=None): + def _update_volume_from_src(self, + context, vol, update, group=None) -> None: try: snapshot_id = vol.get('snapshot_id') source_volid = vol.get('source_volid') @@ -3466,7 +3565,10 @@ class VolumeManager(manager.CleanableManager, self.db.volume_update(context, vol['id'], update) - def _update_allocated_capacity(self, vol, decrement=False, host=None): + def _update_allocated_capacity(self, + vol, + decrement=False, + host: str = None) -> None: # Update allocated capacity in volume stats host = host or vol['host'] pool = volume_utils.extract_host(host, 'pool') @@ -3484,7 +3586,7 @@ class VolumeManager(manager.CleanableManager, self.stats['pools'][pool] = dict( allocated_capacity_gb=max(vol_size, 0)) - def delete_group(self, context, group): + def delete_group(self, context, group: objects.Group) -> None: """Deletes group and the volumes in the group.""" context = context.elevated() project_id = group.project_id @@ -3612,7 +3714,11 @@ class VolumeManager(manager.CleanableManager, resource={'type': 'group', 'id': group.id}) - def _convert_group_to_cg(self, group, volumes): + def _convert_group_to_cg( + self, + group: objects.Group, + volumes: objects.VolumeList) -> ty.Tuple[objects.Group, + objects.VolumeList]: if not group: return None, None cg = consistencygroup.ConsistencyGroup() @@ -3623,15 +3729,19 @@ class VolumeManager(manager.CleanableManager, return cg, volumes - def _remove_consistencygroup_id_from_volumes(self, volumes): + def _remove_consistencygroup_id_from_volumes(self, volumes) -> None: if not volumes: return for vol in volumes: vol.consistencygroup_id = None vol.consistencygroup = None - def _convert_group_snapshot_to_cgsnapshot(self, group_snapshot, snapshots, - ctxt): + def _convert_group_snapshot_to_cgsnapshot( + self, + group_snapshot: objects.GroupSnapshot, + snapshots: objects.SnapshotList, + ctxt) -> ty.Tuple[objects.CGSnapshot, + objects.SnapshotList]: if not group_snapshot: return None, None cgsnap = cgsnapshot.CGSnapshot() @@ -3648,21 +3758,21 @@ class VolumeManager(manager.CleanableManager, return cgsnap, snapshots - def _remove_cgsnapshot_id_from_snapshots(self, snapshots): + def _remove_cgsnapshot_id_from_snapshots(self, snapshots) -> None: if not snapshots: return for snap in snapshots: snap.cgsnapshot_id = None snap.cgsnapshot = None - def _create_group_generic(self, context, group): + def _create_group_generic(self, context, group) -> dict: """Creates a group.""" # A group entry is already created in db. Just returns a status here. model_update = {'status': fields.GroupStatus.AVAILABLE, 'created_at': timeutils.utcnow()} return model_update - def _delete_group_generic(self, context, group, volumes): + def _delete_group_generic(self, context, group, volumes) -> ty.Tuple: """Deletes a group and volumes in the group.""" model_update = {'status': group.status} volume_model_updates = [] @@ -3681,20 +3791,24 @@ class VolumeManager(manager.CleanableManager, return model_update, volume_model_updates - def _update_group_generic(self, context, group, - add_volumes=None, remove_volumes=None): + def _update_group_generic( + self, context, group, + add_volumes=None, + remove_volumes=None) -> ty.Tuple[None, None, None]: """Updates a group.""" # NOTE(xyang): The volume manager adds/removes the volume to/from the # group in the database. This default implementation does not do # anything in the backend storage. return None, None, None - def _collect_volumes_for_group(self, context, group, volumes, add=True): + def _collect_volumes_for_group( + self, context, group, volumes, add=True) -> list: + valid_status: ty.Tuple[str, ...] if add: valid_status = VALID_ADD_VOL_TO_GROUP_STATUS else: valid_status = VALID_REMOVE_VOL_FROM_GROUP_STATUS - volumes_ref = [] + volumes_ref: list = [] if not volumes: return volumes_ref for add_vol in volumes.split(','): @@ -3725,7 +3839,7 @@ class VolumeManager(manager.CleanableManager, return volumes_ref def update_group(self, context, group, - add_volumes=None, remove_volumes=None): + add_volumes=None, remove_volumes=None) -> None: """Updates group. Update group by adding volumes to the group, @@ -3771,7 +3885,7 @@ class VolumeManager(manager.CleanableManager, self._remove_consistencygroup_id_from_volumes( remove_volumes_ref) - volumes_to_update = [] + volumes_to_update: list = [] if add_volumes_update: volumes_to_update.extend(add_volumes_update) if remove_volumes_update: @@ -3824,7 +3938,10 @@ class VolumeManager(manager.CleanableManager, resource={'type': 'group', 'id': group.id}) - def create_group_snapshot(self, context, group_snapshot): + def create_group_snapshot( + self, + context, + group_snapshot: objects.GroupSnapshot) -> objects.GroupSnapshot: """Creates the group_snapshot.""" caller_context = context context = context.elevated() @@ -3945,8 +4062,9 @@ class VolumeManager(manager.CleanableManager, context, group_snapshot, "create.end") return group_snapshot - def _create_group_snapshot_generic(self, context, group_snapshot, - snapshots): + def _create_group_snapshot_generic( + self, context, group_snapshot, + snapshots) -> ty.Tuple[dict, ty.List[dict]]: """Creates a group_snapshot.""" model_update = {'status': 'available'} snapshot_model_updates = [] @@ -3969,7 +4087,8 @@ class VolumeManager(manager.CleanableManager, return model_update, snapshot_model_updates def _delete_group_snapshot_generic(self, context, group_snapshot, - snapshots): + snapshots) -> ty.Tuple[dict, + ty.List[dict]]: """Deletes a group_snapshot.""" model_update = {'status': group_snapshot.status} snapshot_model_updates = [] @@ -3990,7 +4109,7 @@ class VolumeManager(manager.CleanableManager, return model_update, snapshot_model_updates - def delete_group_snapshot(self, context, group_snapshot): + def delete_group_snapshot(self, context, group_snapshot) -> None: """Deletes group_snapshot.""" caller_context = context context = context.elevated() @@ -4155,7 +4274,9 @@ class VolumeManager(manager.CleanableManager, volume.save() # Replication V2.1 and a/a method - def failover(self, context, secondary_backend_id=None): + def failover(self, + context: context.RequestContext, + secondary_backend_id=None) -> None: """Failover a backend to a secondary replication target. Instructs a replication capable/configured backend to failover @@ -4316,7 +4437,7 @@ class VolumeManager(manager.CleanableManager, # TODO(geguileo): In P - remove this failover_host = failover - def finish_failover(self, context, service, updates): + def finish_failover(self, context, service, updates) -> None: """Completion of the failover locally or via RPC.""" # If the service is clustered, broadcast the service changes to all # volume services, including this one. @@ -4333,7 +4454,7 @@ class VolumeManager(manager.CleanableManager, service.update(updates) service.save() - def failover_completed(self, context, updates): + def failover_completed(self, context, updates) -> None: """Finalize failover of this backend. When a service is clustered and replicated the failover has 2 stages, @@ -4358,7 +4479,7 @@ class VolumeManager(manager.CleanableManager, fields.ReplicationStatus.ERROR) service.save() - def freeze_host(self, context): + def freeze_host(self, context) -> bool: """Freeze management plane on this backend. Basically puts the control/management plane into a @@ -4388,7 +4509,7 @@ class VolumeManager(manager.CleanableManager, LOG.info("Set backend status to frozen successfully.") return True - def thaw_host(self, context): + def thaw_host(self, context) -> bool: """UnFreeze management plane on this backend. Basically puts the control/management plane back into @@ -4417,7 +4538,10 @@ class VolumeManager(manager.CleanableManager, LOG.info("Thawed backend successfully.") return True - def manage_existing_snapshot(self, ctxt, snapshot, ref=None): + def manage_existing_snapshot(self, + ctxt, + snapshot, + ref=None) -> ovo_fields.UUIDField: LOG.debug('manage_existing_snapshot: managing %s.', ref) try: flow_engine = manage_existing_snapshot.get_flow( @@ -4485,11 +4609,17 @@ class VolumeManager(manager.CleanableManager, ctxt) if want_objects else backup_device_dict) - def secure_file_operations_enabled(self, ctxt, volume): + def secure_file_operations_enabled(self, + ctxt: context.RequestContext, + volume): secure_enabled = self.driver.secure_file_operations_enabled() return secure_enabled - def _connection_create(self, ctxt, volume, attachment, connector): + def _connection_create(self, + ctxt: context.RequestContext, + volume, + attachment, + connector) -> dict: try: self.driver.validate_connector(connector) except exception.InvalidConnectorException as err: @@ -4541,10 +4671,10 @@ class VolumeManager(manager.CleanableManager, return connection_info def attachment_update(self, - context, + context: context.RequestContext, vref, - connector, - attachment_id): + connector: dict, + attachment_id: str) -> dict: """Update/Finalize an attachment. This call updates a valid attachment record to associate with a volume @@ -4607,8 +4737,11 @@ class VolumeManager(manager.CleanableManager, resource=vref) return connection_info - def _connection_terminate(self, context, volume, - attachment, force=False): + def _connection_terminate(self, + context: context.RequestContext, + volume, + attachment, + force: bool = False) -> ty.Union[None, bool]: """Remove a volume connection, but leave attachment. Exits early if the attachment does not have a connector and returns @@ -4648,7 +4781,10 @@ class VolumeManager(manager.CleanableManager, # going on here. return shared_connections - def attachment_delete(self, context, attachment_id, vref): + def attachment_delete(self, + context: context.RequestContext, + attachment_id, + vref) -> None: """Delete/Detach the specified attachment. Notifies the backend device that we're detaching the specified @@ -4668,7 +4804,10 @@ class VolumeManager(manager.CleanableManager, else: self._do_attachment_delete(context, vref, attachment_ref) - def _do_attachment_delete(self, context, vref, attachment): + def _do_attachment_delete(self, + context: context.RequestContext, + vref, + attachment: objects.VolumeAttachment) -> None: utils.require_driver_initialized(self.driver) self._notify_about_volume_usage(context, vref, "detach.start") has_shared_connection = self._connection_terminate(context, @@ -4699,7 +4838,9 @@ class VolumeManager(manager.CleanableManager, self._notify_about_volume_usage(context, vref, "detach.end") # Replication group API (Tiramisu) - def enable_replication(self, ctxt, group): + def enable_replication(self, + ctxt: context.RequestContext, + group: objects.Group) -> None: """Enable replication.""" group.refresh() if group.replication_status != fields.ReplicationStatus.ENABLING: @@ -4782,7 +4923,7 @@ class VolumeManager(manager.CleanableManager, 'id': group.id}) # Replication group API (Tiramisu) - def disable_replication(self, ctxt, group): + def disable_replication(self, ctxt: context.RequestContext, group) -> None: """Disable replication.""" group.refresh() if group.replication_status != fields.ReplicationStatus.DISABLING: @@ -4866,8 +5007,9 @@ class VolumeManager(manager.CleanableManager, 'id': group.id}) # Replication group API (Tiramisu) - def failover_replication(self, ctxt, group, allow_attached_volume=False, - secondary_backend_id=None): + def failover_replication(self, ctxt: context.RequestContext, + group, allow_attached_volume=False, + secondary_backend_id=None) -> None: """Failover replication.""" group.refresh() if group.replication_status != fields.ReplicationStatus.FAILING_OVER: @@ -4963,7 +5105,7 @@ class VolumeManager(manager.CleanableManager, resource={'type': 'group', 'id': group.id}) - def list_replication_targets(self, ctxt, group): + def list_replication_targets(self, ctxt, group) -> ty.Dict[str, list]: """Provide a means to obtain replication targets for a group. This method is used to find the replication_device config diff --git a/cinder/volume/volume_types.py b/cinder/volume/volume_types.py index 14c028cfb0b..7eb94b716b4 100644 --- a/cinder/volume/volume_types.py +++ b/cinder/volume/volume_types.py @@ -19,6 +19,7 @@ """Built-in volume type properties.""" +import typing as ty from oslo_config import cfg from oslo_db import exception as db_exc @@ -331,7 +332,9 @@ def get_volume_type_qos_specs(volume_type_id): return res -def volume_types_diff(context, vol_type_id1, vol_type_id2): +def volume_types_diff(context: context.RequestContext, + vol_type_id1, + vol_type_id2) -> ty.Tuple[dict, bool]: """Returns a 'diff' of two volume types and whether they are equal. Returns a tuple of (diff, equal), where 'equal' is a boolean indicating @@ -371,7 +374,8 @@ def volume_types_diff(context, vol_type_id1, vol_type_id2): encryption.pop(param, None) return encryption - def _dict_diff(dict1, dict2): + def _dict_diff(dict1: ty.Optional[dict], + dict2: ty.Optional[dict]) -> ty.Tuple[dict, bool]: res = {} equal = True if dict1 is None: diff --git a/mypy-files.txt b/mypy-files.txt index 272d7908218..61c423ece8c 100644 --- a/mypy-files.txt +++ b/mypy-files.txt @@ -1,3 +1,6 @@ +cinder/context.py cinder/i18n.py cinder/manager.py cinder/volume/__init__.py +cinder/volume/manager.py +cinder/volume/volume_types.py