From 80dc6c91581bbf7e8d0c0b0af49dee0f7cfdd451 Mon Sep 17 00:00:00 2001 From: Eric Harney Date: Tue, 19 May 2020 13:06:40 -0400 Subject: [PATCH] mypy: annotate volume/api.py Change-Id: I19a47c463dfb8794ebbeda78f39c962e47ad0d93 --- cinder/volume/api.py | 551 +++++++++++++++++++++++++++++++------------ mypy-files.txt | 1 + 2 files changed, 406 insertions(+), 146 deletions(-) diff --git a/cinder/volume/api.py b/cinder/volume/api.py index f2923812877..64ea4777234 100644 --- a/cinder/volume/api.py +++ b/cinder/volume/api.py @@ -19,6 +19,8 @@ import ast import collections import datetime +from typing import (Any, DefaultDict, Dict, Iterable, List, # noqa: H301 + Optional, Tuple, Union) from castellan import key_manager from oslo_config import cfg @@ -113,7 +115,9 @@ class API(base.Base): self.message = message_api.API() super().__init__() - def list_availability_zones(self, enable_cache=False, refresh_cache=False): + def list_availability_zones(self, + enable_cache: bool = False, + refresh_cache: bool = False) -> tuple: """Describe the known availability zones :param enable_cache: Enable az cache @@ -135,7 +139,7 @@ class API(base.Base): services = objects.ServiceList.get_all_by_topic(ctxt, topic) az_data = [(s.availability_zone, s.disabled) for s in services] - disabled_map = {} + disabled_map: Dict[str, bool] = {} for (az_name, disabled) in az_data: tracked_disabled = disabled_map.get(az_name, True) disabled_map[az_name] = tracked_disabled and disabled @@ -153,8 +157,10 @@ class API(base.Base): LOG.info("Availability Zones retrieved successfully.") return tuple(azs) - def _retype_is_possible(self, context, - source_type, target_type): + def _retype_is_possible(self, + context: context.RequestContext, + source_type: objects.VolumeType, + target_type: objects.VolumeType) -> bool: elevated = context.elevated() # If encryptions are different, it is not allowed # to create volume from source volume or snapshot. @@ -183,7 +189,7 @@ class API(base.Base): return True return False - def _is_volume_migrating(self, volume): + def _is_volume_migrating(self, volume: objects.Volume) -> bool: # The migration status 'none' means no migration has ever been done # before. The migration status 'error' means the previous migration # failed. The migration status 'success' means the previous migration @@ -194,24 +200,40 @@ class API(base.Base): return (volume['migration_status'] not in self.AVAILABLE_MIGRATION_STATUS) - def _is_multiattach(self, volume_type): + def _is_multiattach(self, + volume_type: objects.VolumeType) -> bool: + # TODO: getattr not needed if using obj? specs = getattr(volume_type, 'extra_specs', {}) return specs.get('multiattach', 'False') == ' True' - def _is_encrypted(self, volume_type): + def _is_encrypted(self, + volume_type: objects.volume_type.VolumeType) -> bool: specs = volume_type.get('extra_specs', {}) if 'encryption' not in specs: return False return specs.get('encryption', {}) is not {} - def create(self, context, size, name, description, snapshot=None, - image_id=None, volume_type=None, metadata=None, - availability_zone=None, source_volume=None, + def create(self, + context: context.RequestContext, + size: Optional[Union[str, int]], + name: Optional[str], + description: Optional[str], + snapshot: Optional[objects.Snapshot] = None, + image_id: str = None, + volume_type: Optional[objects.VolumeType] = None, + metadata: Optional[dict] = None, + availability_zone: str = None, + source_volume: Optional[objects.Volume] = None, scheduler_hints=None, - source_replica=None, consistencygroup=None, - cgsnapshot=None, multiattach=False, source_cg=None, - group=None, group_snapshot=None, source_group=None, - backup=None): + source_replica=None, + consistencygroup: Optional[objects.ConsistencyGroup] = None, + cgsnapshot: Optional[objects.CGSnapshot] = None, + multiattach: Optional[bool] = False, + source_cg=None, + group: Optional[objects.Group] = None, + group_snapshot=None, + source_group=None, + backup: Optional[objects.Backup] = None): if image_id: context.authorize(vol_policy.CREATE_FROM_IMAGE_POLICY) @@ -361,7 +383,10 @@ class API(base.Base): self.list_availability_zones(enable_cache=True, refresh_cache=True) - def revert_to_snapshot(self, context, volume, snapshot): + def revert_to_snapshot(self, + context: context.RequestContext, + volume: objects.Volume, + snapshot: objects.Snapshot) -> None: """revert a volume to a snapshot""" context.authorize(vol_action_policy.REVERT_POLICY, target_obj=volume) @@ -385,10 +410,12 @@ class API(base.Base): self.volume_rpcapi.revert_to_snapshot(context, volume, snapshot) - def delete(self, context, volume, - force=False, - unmanage_only=False, - cascade=False): + def delete(self, + context: context.RequestContext, + volume: objects.Volume, + force: bool = False, + unmanage_only: bool = False, + cascade: bool = False) -> None: context.authorize(vol_policy.DELETE_POLICY, target_obj=volume) if context.is_admin and context.project_id != volume.project_id: project_id = volume.project_id @@ -523,8 +550,9 @@ class API(base.Base): volume.update({'status': 'error_deleting'}) volume.save() if hasattr(e, 'msg'): + # ignore type (Exception has no attr "msg") error msg = _("Unable to delete encryption key for " - "volume: %s") % (e.msg) + "volume: %s") % (e.msg) # type: ignore else: msg = _("Unable to delete encryption key for volume.") LOG.error(msg) @@ -538,7 +566,10 @@ class API(base.Base): LOG.info("Delete volume request issued successfully.", resource=volume) - def update(self, context, volume, fields): + def update(self, + context: context.RequestContext, + volume: objects.Volume, + fields: dict) -> None: context.authorize(vol_policy.UPDATE_POLICY, target_obj=volume) # TODO(karthikp): Making sure volume is always oslo-versioned # If not we convert it at the start of update method. This check @@ -559,7 +590,10 @@ class API(base.Base): volume.save() LOG.info("Volume updated successfully.", resource=volume) - def get(self, context, volume_id, viewable_admin_meta=False): + def get(self, + context: context.RequestContext, + volume_id: str, + viewable_admin_meta: bool = False) -> objects.Volume: volume = objects.Volume.get_by_id(context, volume_id) try: @@ -579,7 +613,10 @@ class API(base.Base): LOG.info("Volume info retrieved successfully.", resource=volume) return volume - def calculate_resource_count(self, context, resource_type, filters): + def calculate_resource_count(self, + context: context.RequestContext, + resource_type: str, + filters: Optional[dict]) -> int: filters = filters if filters else {} allTenants = utils.get_bool_param('all_tenants', filters) if context.is_admin and allTenants: @@ -588,9 +625,15 @@ class API(base.Base): filters['project_id'] = context.project_id return db.calculate_resource_count(context, resource_type, filters) - def get_all(self, context, marker=None, limit=None, sort_keys=None, - sort_dirs=None, filters=None, viewable_admin_meta=False, - offset=None): + def get_all(self, + context: context.RequestContext, + marker: Optional[str] = None, + limit: Optional[int] = None, + sort_keys: Optional[Iterable[str]] = None, + sort_dirs: Optional[Iterable[str]] = None, + filters: Optional[dict] = None, + viewable_admin_meta: bool = False, + offset: Optional[int] = None) -> objects.VolumeList: context.authorize(vol_policy.GET_ALL_POLICY) if filters is None: @@ -637,7 +680,10 @@ class API(base.Base): LOG.info("Get all volumes completed successfully.") return volumes - def get_volume_summary(self, context, filters=None): + def get_volume_summary( + self, + context: context.RequestContext, + filters: Optional[dict] = None) -> objects.VolumeList: context.authorize(vol_policy.GET_ALL_POLICY) if filters is None: @@ -651,7 +697,9 @@ class API(base.Base): LOG.info("Get summary completed successfully.") return volumes - def get_snapshot(self, context, snapshot_id): + def get_snapshot(self, + context: context.RequestContext, + snapshot_id: str) -> objects.Snapshot: snapshot = objects.Snapshot.get_by_id(context, snapshot_id) context.authorize(snapshot_policy.GET_POLICY, target_obj=snapshot) @@ -662,15 +710,23 @@ class API(base.Base): 'id': snapshot.id}) return snapshot - def get_volume(self, context, volume_id): + def get_volume(self, + context: context.RequestContext, + volume_id: str) -> objects.Volume: volume = objects.Volume.get_by_id(context, volume_id) context.authorize(vol_policy.GET_POLICY, target_obj=volume) LOG.info("Volume retrieved successfully.", resource=volume) return volume - def get_all_snapshots(self, context, search_opts=None, marker=None, - limit=None, sort_keys=None, sort_dirs=None, - offset=None): + def get_all_snapshots( + self, + context: context.RequestContext, + search_opts: Optional[dict] = None, + marker: Optional[str] = None, + limit: Optional[int] = None, + sort_keys: Optional[List[str]] = None, + sort_dirs: Optional[List[str]] = None, + offset: Optional[int] = None) -> objects.SnapshotList: context.authorize(snapshot_policy.GET_ALL_POLICY) search_opts = search_opts or {} @@ -690,7 +746,9 @@ class API(base.Base): LOG.info("Get all snapshots completed successfully.") return snapshots - def reserve_volume(self, context, volume): + def reserve_volume(self, + context: context.RequestContext, + volume: objects.Volume) -> None: context.authorize(vol_action_policy.RESERVE_POLICY, target_obj=volume) expected = {'multiattach': volume.multiattach, 'status': (('available', 'in-use') if volume.multiattach @@ -709,7 +767,9 @@ class API(base.Base): LOG.info("Reserve volume completed successfully.", resource=volume) - def unreserve_volume(self, context, volume): + def unreserve_volume(self, + context: context.RequestContext, + volume: objects.Volume) -> None: context.authorize(vol_action_policy.UNRESERVE_POLICY, target_obj=volume) expected = {'status': 'attaching'} @@ -728,7 +788,9 @@ class API(base.Base): LOG.info("Unreserve volume completed successfully.", resource=volume) - def begin_detaching(self, context, volume): + def begin_detaching(self, + context: context.RequestContext, + volume: objects.Volume) -> None: context.authorize(vol_action_policy.BEGIN_DETACHING_POLICY, target_obj=volume) # If we are in the middle of a volume migration, we don't want the @@ -749,7 +811,9 @@ class API(base.Base): LOG.info("Begin detaching volume completed successfully.", resource=volume) - def roll_detaching(self, context, volume): + def roll_detaching(self, + context: context.RequestContext, + volume: objects.Volume) -> None: context.authorize(vol_action_policy.ROLL_DETACHING_POLICY, target_obj=volume) volume.conditional_update({'status': 'in-use'}, @@ -757,8 +821,13 @@ class API(base.Base): LOG.info("Roll detaching of volume completed successfully.", resource=volume) - def attach(self, context, volume, instance_uuid, host_name, - mountpoint, mode): + def attach(self, + context: context.RequestContext, + volume: objects.Volume, + instance_uuid: str, + host_name: str, + mountpoint: str, + mode: str) -> objects.VolumeAttachment: context.authorize(vol_action_policy.ATTACH_POLICY, target_obj=volume) if volume.status == 'maintenance': @@ -786,7 +855,10 @@ class API(base.Base): resource=volume) return attach_results - def detach(self, context, volume, attachment_id): + def detach(self, + context: context.RequestContext, + volume: objects.Volume, + attachment_id: str) -> None: context.authorize(vol_action_policy.DETACH_POLICY, target_obj=volume) if volume['status'] == 'maintenance': @@ -800,7 +872,10 @@ class API(base.Base): resource=volume) return detach_results - def initialize_connection(self, context, volume, connector): + def initialize_connection(self, + context: context.RequestContext, + volume: objects.Volume, + connector: dict) -> dict: context.authorize(vol_action_policy.INITIALIZE_POLICY, target_obj=volume) if volume.status == 'maintenance': @@ -817,7 +892,11 @@ class API(base.Base): resource=volume) return init_results - def terminate_connection(self, context, volume, connector, force=False): + def terminate_connection(self, + context: context.RequestContext, + volume: objects.Volume, + connector: dict, + force: bool = False) -> None: context.authorize(vol_action_policy.TERMINATE_POLICY, target_obj=volume) self.volume_rpcapi.terminate_connection(context, @@ -828,8 +907,12 @@ class API(base.Base): resource=volume) self.unreserve_volume(context, volume) - def accept_transfer(self, context, volume, new_user, new_project, - no_snapshots=False): + def accept_transfer(self, + context: context.RequestContext, + volume: objects.Volume, + new_user: str, + new_project: str, + no_snapshots: bool = False) -> dict: context.authorize(vol_transfer_policy.ACCEPT_POLICY, target_obj=volume) if volume['status'] == 'maintenance': @@ -846,12 +929,17 @@ class API(base.Base): resource=volume) return results - def _create_snapshot(self, context, - volume, name, description, - force=False, metadata=None, - cgsnapshot_id=None, - group_snapshot_id=None, - allow_in_use=False): + def _create_snapshot( + self, + context: context.RequestContext, + volume: objects.Volume, + name: str, + description: str, + force: bool = False, + metadata: Optional[dict] = None, + cgsnapshot_id: Optional[str] = None, + group_snapshot_id: Optional[str] = None, + allow_in_use: bool = False) -> objects.Snapshot: volume.assert_not_frozen() snapshot = self.create_snapshot_in_db( context, volume, name, @@ -868,17 +956,23 @@ class API(base.Base): objects.RequestSpec(**kwargs)) return snapshot - def create_snapshot_in_db(self, context, - volume, name, description, - force, metadata, - cgsnapshot_id, - commit_quota=True, - group_snapshot_id=None, - allow_in_use=False): + def create_snapshot_in_db( + self, + context: context.RequestContext, + volume: objects.Volume, + name: Optional[str], + description: Optional[str], + force: bool, + metadata: Optional[dict], + cgsnapshot_id: Optional[str], + commit_quota: bool = True, + group_snapshot_id: Optional[str] = None, + allow_in_use: bool = False) -> objects.Snapshot: self._create_snapshot_in_db_validate(context, volume) utils.check_metadata_properties(metadata) + valid_status: Tuple[str, ...] valid_status = ('available',) if force or allow_in_use: valid_status = ('available', 'in-use') @@ -943,6 +1037,7 @@ class API(base.Base): except Exception: with excutils.save_and_reraise_exception(): try: + assert snapshot is not None if snapshot.obj_attr_is_set('id'): snapshot.destroy() finally: @@ -951,11 +1046,14 @@ class API(base.Base): return snapshot - def create_snapshots_in_db(self, context, - volume_list, - name, description, - cgsnapshot_id, - group_snapshot_id=None): + def create_snapshots_in_db( + self, + context: context.RequestContext, + volume_list: list, + name: str, + description: str, + cgsnapshot_id: str, + group_snapshot_id: Optional[str] = None) -> list: snapshot_list = [] for volume in volume_list: self._create_snapshot_in_db_validate(context, volume) @@ -987,7 +1085,9 @@ class API(base.Base): return snapshot_list - def _create_snapshot_in_db_validate(self, context, volume): + def _create_snapshot_in_db_validate(self, + context: context.RequestContext, + volume: objects.Volume) -> None: context.authorize(snapshot_policy.CREATE_POLICY, target_obj=volume) if not volume.host: @@ -1014,9 +1114,12 @@ class API(base.Base): msg = _("Snapshot of secondary replica is not allowed.") raise exception.InvalidVolume(reason=msg) - def _create_snapshots_in_db_reserve(self, context, volume_list): + def _create_snapshots_in_db_reserve( + self, + context: context.RequestContext, + volume_list: objects.VolumeList) -> list: reserve_opts_list = [] - total_reserve_opts = {} + total_reserve_opts: Dict[str, int] = {} try: for volume in volume_list: if CONF.no_snapshot_gb_quota: @@ -1046,10 +1149,14 @@ class API(base.Base): return reservations - def _create_snapshot_in_db_options(self, context, volume, - name, description, - cgsnapshot_id, - group_snapshot_id=None): + def _create_snapshot_in_db_options( + self, + context: context.RequestContext, + volume: objects.Volume, + name: str, + description: str, + cgsnapshot_id: str, + group_snapshot_id: str = None) -> Dict[str, Any]: options = {'volume_id': volume['id'], 'cgsnapshot_id': cgsnapshot_id, 'group_snapshot_id': group_snapshot_id, @@ -1064,11 +1171,16 @@ class API(base.Base): 'encryption_key_id': volume['encryption_key_id']} return options - def create_snapshot(self, context, - volume, name, description, - metadata=None, cgsnapshot_id=None, - group_snapshot_id=None, - allow_in_use=False): + def create_snapshot( + self, + context: context.RequestContext, + volume: objects.Volume, + name: str, + description: str, + metadata: Dict[str, Any] = None, + cgsnapshot_id: Optional[str] = None, + group_snapshot_id: Optional[str] = None, + allow_in_use: bool = False) -> objects.Snapshot: result = self._create_snapshot(context, volume, name, description, False, metadata, cgsnapshot_id, group_snapshot_id, allow_in_use) @@ -1076,25 +1188,32 @@ class API(base.Base): resource=result) return result - def create_snapshot_force(self, context, - volume, name, - description, metadata=None): + def create_snapshot_force( + self, + context: context.RequestContext, + volume: objects.Volume, + name: str, + description: str, + metadata: Dict[str, Any] = None) -> objects.Snapshot: result = self._create_snapshot(context, volume, name, description, True, metadata) LOG.info("Snapshot force create request issued successfully.", resource=result) return result - def delete_snapshot(self, context, snapshot, force=False, - unmanage_only=False): + def delete_snapshot(self, + context: context.RequestContext, + snapshot: objects.Snapshot, + force: bool = False, + unmanage_only: bool = False) -> None: context.authorize(snapshot_policy.DELETE_POLICY, target_obj=snapshot) if not unmanage_only: snapshot.assert_not_frozen() # Build required conditions for conditional update - expected = {'cgsnapshot_id': None, - 'group_snapshot_id': None} + expected: Dict[str, Any] = {'cgsnapshot_id': None, + 'group_snapshot_id': None} # If not force deleting we have status conditions if not force: expected['status'] = (fields.SnapshotStatus.AVAILABLE, @@ -1116,13 +1235,18 @@ class API(base.Base): LOG.info("Snapshot delete request issued successfully.", resource=snapshot) - def update_snapshot(self, context, snapshot, fields): + def update_snapshot(self, + context: context.RequestContext, + snapshot: objects.Snapshot, + fields: Dict[str, Any]) -> None: context.authorize(snapshot_policy.UPDATE_POLICY, target_obj=snapshot) snapshot.update(fields) snapshot.save() - def get_volume_metadata(self, context, volume): + def get_volume_metadata(self, + context: context.RequestContext, + volume: objects.Volume) -> dict: """Get all metadata associated with a volume.""" context.authorize(vol_meta_policy.GET_POLICY, target_obj=volume) rv = self.db.volume_metadata_get(context, volume['id']) @@ -1130,7 +1254,10 @@ class API(base.Base): resource=volume) return dict(rv) - def create_volume_metadata(self, context, volume, metadata): + def create_volume_metadata(self, + context: context.RequestContext, + volume: objects.Volume, + metadata: Dict[str, Any]) -> dict: """Creates volume metadata.""" context.authorize(vol_meta_policy.CREATE_POLICY, target_obj=volume) db_meta = self._update_volume_metadata(context, volume, metadata) @@ -1139,8 +1266,11 @@ class API(base.Base): resource=volume) return db_meta - def delete_volume_metadata(self, context, volume, - key, meta_type=common.METADATA_TYPES.user): + def delete_volume_metadata(self, + context: context.RequestContext, + volume: objects.Volume, + key: str, + meta_type=common.METADATA_TYPES.user) -> None: """Delete the given metadata item from a volume.""" context.authorize(vol_meta_policy.DELETE_POLICY, target_obj=volume) if volume.status in ('maintenance', 'uploading'): @@ -1152,8 +1282,12 @@ class API(base.Base): LOG.info("Delete volume metadata completed successfully.", resource=volume) - def _update_volume_metadata(self, context, volume, metadata, delete=False, - meta_type=common.METADATA_TYPES.user): + def _update_volume_metadata(self, + context: context.RequestContext, + volume: objects.Volume, + metadata: Dict[str, Any], + delete: bool = False, + meta_type=common.METADATA_TYPES.user) -> dict: if volume['status'] in ('maintenance', 'uploading'): msg = _('Updating volume metadata is not allowed for volumes in ' '%s status.') % volume['status'] @@ -1162,8 +1296,12 @@ class API(base.Base): return self.db.volume_metadata_update(context, volume['id'], metadata, delete, meta_type) - def update_volume_metadata(self, context, volume, metadata, delete=False, - meta_type=common.METADATA_TYPES.user): + def update_volume_metadata(self, + context: context.RequestContext, + volume: objects.Volume, + metadata: Dict[str, Any], + delete: bool = False, + meta_type=common.METADATA_TYPES.user) -> dict: """Updates volume metadata. If delete is True, metadata items that are not specified in the @@ -1180,15 +1318,22 @@ class API(base.Base): resource=volume) return db_meta - def get_volume_admin_metadata(self, context, volume): + def get_volume_admin_metadata(self, + context: context.RequestContext, + volume: objects.Volume) -> dict: """Get all administration metadata associated with a volume.""" rv = self.db.volume_admin_metadata_get(context, volume['id']) LOG.info("Get volume admin metadata completed successfully.", resource=volume) return dict(rv) - def update_volume_admin_metadata(self, context, volume, metadata, - delete=False, add=True, update=True): + def update_volume_admin_metadata(self, + context: context.RequestContext, + volume: objects.Volume, + metadata: Dict[str, Any], + delete: Optional[bool] = False, + add: Optional[bool] = True, + update: Optional[bool] = True) -> dict: """Updates or creates volume administration metadata. If delete is True, metadata items that are not specified in the @@ -1210,7 +1355,9 @@ class API(base.Base): resource=volume) return volume.admin_metadata - def get_snapshot_metadata(self, context, snapshot): + def get_snapshot_metadata(self, + context: context.RequestContext, + snapshot: objects.Snapshot) -> dict: """Get all metadata associated with a snapshot.""" context.authorize(s_meta_policy.GET_POLICY, target_obj=snapshot) @@ -1218,7 +1365,10 @@ class API(base.Base): resource=snapshot) return snapshot.metadata - def delete_snapshot_metadata(self, context, snapshot, key): + def delete_snapshot_metadata(self, + context: context.RequestContext, + snapshot: objects.Snapshot, + key: str) -> None: """Delete the given metadata item from a snapshot.""" context.authorize(s_meta_policy.DELETE_POLICY, target_obj=snapshot) @@ -1226,9 +1376,11 @@ class API(base.Base): LOG.info("Delete snapshot metadata completed successfully.", resource=snapshot) - def update_snapshot_metadata(self, context, - snapshot, metadata, - delete=False): + def update_snapshot_metadata(self, + context: context.RequestContext, + snapshot: objects.Snapshot, + metadata: Dict[str, Any], + delete: bool = False) -> dict: """Updates or creates snapshot metadata. If delete is True, metadata items that are not specified in the @@ -1255,32 +1407,42 @@ class API(base.Base): resource=snapshot) return snapshot.metadata - def get_volumes_image_metadata(self, context): + def get_volumes_image_metadata( + self, + context: context.RequestContext) -> collections.defaultdict: context.authorize(vol_meta_policy.GET_POLICY) db_data = self.db.volume_glance_metadata_get_all(context) - results = collections.defaultdict(dict) + results: collections.defaultdict = collections.defaultdict(dict) for meta_entry in db_data: results[meta_entry['volume_id']].update({meta_entry['key']: meta_entry['value']}) return results - def get_volume_image_metadata(self, context, volume): + def get_volume_image_metadata(self, + context: context.RequestContext, + volume: objects.Volume) -> Dict[str, str]: context.authorize(vol_meta_policy.GET_POLICY, target_obj=volume) db_data = self.db.volume_glance_metadata_get(context, volume['id']) LOG.info("Get volume image-metadata completed successfully.", resource=volume) return {meta_entry.key: meta_entry.value for meta_entry in db_data} - def get_list_volumes_image_metadata(self, context, volume_id_list): + def get_list_volumes_image_metadata( + self, + context: context.RequestContext, + volume_id_list: List[str]) -> DefaultDict[str, str]: db_data = self.db.volume_glance_metadata_list_get(context, volume_id_list) - results = collections.defaultdict(dict) + results: collections.defaultdict = collections.defaultdict(dict) for meta_entry in db_data: results[meta_entry['volume_id']].update({meta_entry['key']: meta_entry['value']}) return results - def _merge_volume_image_meta(self, context, volume, image_meta): + def _merge_volume_image_meta(self, + context: context.RequestContext, + volume: objects.Volume, + image_meta: dict) -> None: """Merges the image metadata from volume into image_meta""" glance_core_props = CONF.glance_core_properties if glance_core_props: @@ -1303,7 +1465,11 @@ class API(base.Base): # volume glance metadata table pass - def copy_volume_to_image(self, context, volume, metadata, force): + def copy_volume_to_image(self, + context: context.RequestContext, + volume: objects.Volume, + metadata: Dict[str, str], + force: bool) -> Dict[str, Optional[str]]: """Create a new image from the specified volume.""" if not CONF.enable_force_upload and force: LOG.info("Force upload to image is disabled, " @@ -1367,16 +1533,21 @@ class API(base.Base): resource=volume) return response - def _extend(self, context, volume, new_size, attached=False): + def _extend(self, + context: context.RequestContext, + volume: objects.Volume, + new_size: int, + attached: bool = False) -> None: value = {'status': 'extending', 'previous_status': volume.status} + expected: dict if attached: expected = {'status': 'in-use'} else: expected = {'status': 'available'} orig_status = {'status': volume.status} - def _roll_back_status(): + def _roll_back_status() -> None: status = orig_status['status'] msg = _('Could not return volume %(id)s to %(status)s.') try: @@ -1473,7 +1644,10 @@ class API(base.Base): LOG.info("Extend volume request issued successfully.", resource=volume) - def extend(self, context, volume, new_size): + def extend(self, + context: context.RequestContext, + volume: objects.Volume, + new_size: int) -> None: context.authorize(vol_action_policy.EXTEND_POLICY, target_obj=volume) self._extend(context, volume, new_size, attached=False) @@ -1481,13 +1655,21 @@ class API(base.Base): # NOTE(tommylikehu): New method is added here so that administrator # can enable/disable this ability by editing the policy file if the # cloud environment doesn't allow this operation. - def extend_attached_volume(self, context, volume, new_size): + def extend_attached_volume(self, + context: context.RequestContext, + volume: objects.Volume, + new_size: int) -> None: context.authorize(vol_action_policy.EXTEND_ATTACHED_POLICY, target_obj=volume) self._extend(context, volume, new_size, attached=True) - def migrate_volume(self, context, volume, host, cluster_name, force_copy, - lock_volume): + def migrate_volume(self, + context: context.RequestContext, + volume: objects.Volume, + host: str, + cluster_name: str, + force_copy: bool, + lock_volume: bool) -> None: """Migrate the volume to the specified host or cluster.""" elevated = context.elevated() context.authorize(vol_action_policy.MIGRATE_POLICY, @@ -1576,7 +1758,11 @@ class API(base.Base): LOG.info("Migrate volume request issued successfully.", resource=volume) - def migrate_volume_completion(self, context, volume, new_volume, error): + def migrate_volume_completion(self, + context: context.RequestContext, + volume: objects.Volume, + new_volume: objects.Volume, + error: bool) -> str: context.authorize(vol_action_policy.MIGRATE_COMPLETE_POLICY, target_obj=volume) if not (volume.migration_status or new_volume.migration_status): @@ -1616,7 +1802,10 @@ class API(base.Base): return self.volume_rpcapi.migrate_volume_completion(context, volume, new_volume, error) - def update_readonly_flag(self, context, volume, flag): + def update_readonly_flag(self, + context: context.RequestContext, + volume: objects.Volume, + flag) -> None: context.authorize(vol_action_policy.UPDATE_READONLY_POLICY, target_obj=volume) if volume['status'] != 'available': @@ -1631,7 +1820,11 @@ class API(base.Base): "completed successfully.", resource=volume) - def retype(self, context, volume, new_type, migration_policy=None): + def retype(self, + context: context.RequestContext, + volume: objects.Volume, + new_type: Union[str, objects.VolumeType], + migration_policy: Optional[str] = None) -> None: """Attempt to modify the type associated with an existing volume.""" context.authorize(vol_action_policy.RETYPE_POLICY, target_obj=volume) @@ -1750,8 +1943,12 @@ class API(base.Base): LOG.info("Retype volume request issued successfully.", resource=volume) - def _get_service_by_host_cluster(self, context, host, cluster_name, - resource='volume'): + def _get_service_by_host_cluster( + self, + context: context.RequestContext, + host: str, + cluster_name: Optional[str], + resource: str = 'volume') -> objects.Service: elevated = context.elevated() svc_cluster = cluster_name and volume_utils.extract_host(cluster_name, @@ -1786,9 +1983,17 @@ class API(base.Base): return service - def manage_existing(self, context, host, cluster_name, ref, name=None, - description=None, volume_type=None, metadata=None, - availability_zone=None, bootable=False): + def manage_existing(self, + context: context.RequestContext, + host: str, + cluster_name: Optional[str], + ref: dict, + name: Optional[str] = None, + description: Optional[str] = None, + volume_type: Optional[objects.VolumeType] = None, + metadata: Optional[dict] = None, + availability_zone: Optional[str] = None, + bootable: Optional[bool] = False) -> objects.Volume: if 'source-name' in ref: vol_id = volume_utils.extract_id_from_volume_name( @@ -1850,18 +2055,29 @@ class API(base.Base): resource=vol_ref) return vol_ref - def get_manageable_volumes(self, context, host, cluster_name, marker=None, - limit=None, offset=None, sort_keys=None, - sort_dirs=None): + def get_manageable_volumes(self, + context: context.RequestContext, + host: str, + cluster_name, + marker: Optional[str] = None, + limit: Optional[int] = None, + offset: Optional[int] = None, + sort_keys: Optional[List[str]] = None, + sort_dirs: Optional[List[str]] = None): svc = self._get_service_by_host_cluster(context, host, cluster_name) return self.volume_rpcapi.get_manageable_volumes(context, svc, marker, limit, offset, sort_keys, sort_dirs) - def manage_existing_snapshot(self, context, ref, volume, - name=None, description=None, - metadata=None): + def manage_existing_snapshot( + self, + context: context.RequestContext, + ref: dict, + volume: objects.Volume, + name: Optional[str] = None, + description: Optional[str] = None, + metadata: Optional[dict] = None) -> objects.Snapshot: # Ensure the service is up and not disabled. self._get_service_by_host_cluster(context, volume.host, volume.cluster_name, @@ -1879,9 +2095,16 @@ class API(base.Base): request_spec=objects.RequestSpec(**kwargs)) return snapshot_object - def get_manageable_snapshots(self, context, host, cluster_name, - marker=None, limit=None, offset=None, - sort_keys=None, sort_dirs=None): + def get_manageable_snapshots( + self, + context: context.RequestContext, + host: str, + cluster_name: Optional[str], + marker: Optional[str] = None, + limit: Optional[int] = None, + offset: Optional[int] = None, + sort_keys: Optional[List[str]] = None, + sort_dirs: Optional[List[str]] = None) -> List[dict]: svc = self._get_service_by_host_cluster(context, host, cluster_name, 'snapshot') return self.volume_rpcapi.get_manageable_snapshots(context, svc, @@ -1889,8 +2112,11 @@ class API(base.Base): offset, sort_keys, sort_dirs) - def _get_cluster_and_services_for_replication(self, ctxt, host, - cluster_name): + def _get_cluster_and_services_for_replication( + self, + ctxt: context.RequestContext, + host: str, + cluster_name: str) -> tuple: services = objects.ServiceList.get_all( ctxt, filters={'host': host, 'cluster_name': cluster_name, 'binary': constants.VOLUME_BINARY}) @@ -1923,9 +2149,15 @@ class API(base.Base): return cluster, services - def _replication_db_change(self, ctxt, field, expected_value, new_value, - host, cluster_name, check_up=False): - def _error_msg(service): + def _replication_db_change(self, + ctxt: context.RequestContext, + field: str, + expected_value: Union[bool, list], + new_value: Union[bool, str], + host: str, + cluster_name: str, + check_up: bool = False) -> tuple: + def _error_msg(service) -> str: expected = utils.build_or_str(str(expected_value)) up_msg = 'and must be up ' if check_up else '' msg = (_('%(field)s in %(service)s must be %(expected)s ' @@ -1973,7 +2205,11 @@ class API(base.Base): return cluster, services - def failover(self, ctxt, host, cluster_name, secondary_id=None): + def failover(self, + ctxt: context.RequestContext, + host: str, + cluster_name: str, + secondary_id: Optional[str] = None) -> None: ctxt.authorize(svr_policy.FAILOVER_POLICY) ctxt = ctxt if ctxt.is_admin else ctxt.elevated() @@ -1994,7 +2230,10 @@ class API(base.Base): self.volume_rpcapi.failover(ctxt, services[0], secondary_id) - def freeze_host(self, ctxt, host, cluster_name): + def freeze_host(self, + ctxt: context.RequestContext, + host: str, + cluster_name: str) -> None: ctxt.authorize(svr_policy.FREEZE_POLICY) ctxt = ctxt if ctxt.is_admin else ctxt.elevated() @@ -2009,7 +2248,10 @@ class API(base.Base): # `cinder service-disable reason=freeze` self.volume_rpcapi.freeze_host(ctxt, services[0]) - def thaw_host(self, ctxt, host, cluster_name): + def thaw_host(self, + ctxt: context.RequestContext, + host: str, + cluster_name: str) -> Optional[str]: ctxt.authorize(svr_policy.THAW_POLICY) ctxt = ctxt if ctxt.is_admin else ctxt.elevated() @@ -2022,7 +2264,11 @@ class API(base.Base): if not self.volume_rpcapi.thaw_host(ctxt, services[0]): return "Backend reported error during thaw_host operation." - def check_volume_filters(self, filters, strict=False): + return None + + def check_volume_filters(self, + filters: dict, + strict: bool = False) -> None: """Sets the user filter value to accepted format""" booleans = self.db.get_booleans_for_table('volume') @@ -2051,7 +2297,10 @@ class API(base.Base): except (ValueError, SyntaxError): LOG.debug('Could not evaluate value %s, assuming string', val) - def _check_boolean_filter_value(self, key, val, strict=False): + def _check_boolean_filter_value(self, + key: str, + val: str, + strict: bool = False) -> bool: """Boolean filter values in Volume GET. Before VOLUME_LIST_BOOTABLE, all values other than 'False', 'false', @@ -2088,7 +2337,11 @@ class API(base.Base): else: return bool(val) - def _attachment_reserve(self, ctxt, vref, instance_uuid=None): + def _attachment_reserve( + self, + ctxt: context.RequestContext, + vref: objects.Volume, + instance_uuid: Optional[str] = None) -> objects.VolumeAttachment: # NOTE(jdg): Reserved is a special case, we're avoiding allowing # creation of other new reserves/attachments while in this state # so we avoid contention issues with shared connections @@ -2147,12 +2400,13 @@ class API(base.Base): db_ref = self.db.volume_attach(ctxt.elevated(), values) return objects.VolumeAttachment.get_by_id(ctxt, db_ref['id']) - def attachment_create(self, - ctxt, - volume_ref, - instance_uuid, - connector=None, - attach_mode='null'): + def attachment_create( + self, + ctxt: context.RequestContext, + volume_ref: objects.Volume, + instance_uuid: str, + connector: Optional[dict] = None, + attach_mode: Optional[str] = 'null') -> objects.VolumeAttachment: """Create an attachment record for the specified volume.""" ctxt.authorize(attachment_policy.CREATE_POLICY, target_obj=volume_ref) connection_info = {} @@ -2197,7 +2451,10 @@ class API(base.Base): @coordination.synchronized( '{f_name}-{attachment_ref.volume_id}-{connector[host]}') - def attachment_update(self, ctxt, attachment_ref, connector): + def attachment_update(self, + ctxt: context.RequestContext, + attachment_ref: objects.VolumeAttachment, + connector) -> objects.VolumeAttachment: """Update an existing attachment record.""" # Valid items to update (connector includes mode and mountpoint): # 1. connector (required) @@ -2255,7 +2512,9 @@ class API(base.Base): attachment_ref.save() return attachment_ref - def attachment_delete(self, ctxt, attachment): + def attachment_delete(self, + ctxt: context.RequestContext, + attachment) -> objects.VolumeAttachmentList: ctxt.authorize(attachment_policy.DELETE_POLICY, target_obj=attachment) volume = attachment.volume diff --git a/mypy-files.txt b/mypy-files.txt index 134b5228cf4..0241292dc9d 100644 --- a/mypy-files.txt +++ b/mypy-files.txt @@ -10,6 +10,7 @@ cinder/manager.py cinder/scheduler/manager.py cinder/utils.py cinder/volume/__init__.py +cinder/volume/api.py cinder/volume/flows/api/create_volume.py cinder/volume/flows/manager/create_volume.py cinder/volume/manager.py