diff --git a/cinder/volume/drivers/remotefs.py b/cinder/volume/drivers/remotefs.py index 14204bb4558..7b8881a177b 100644 --- a/cinder/volume/drivers/remotefs.py +++ b/cinder/volume/drivers/remotefs.py @@ -26,16 +26,21 @@ import shutil import string import tempfile import time +import typing +from typing import Callable, List, Optional, Tuple, Union # noqa: H301 from castellan import key_manager +from os_brick.remotefs import remotefs as remotefs_brick from oslo_config import cfg from oslo_log import log as logging from oslo_serialization import jsonutils +from oslo_utils import imageutils from oslo_utils.secretutils import md5 from oslo_utils import units import six from cinder import compute +from cinder import context from cinder import coordination from cinder import db from cinder import exception @@ -110,7 +115,7 @@ CONF.register_opts(nas_opts, group=configuration.SHARED_CONF_GROUP) CONF.register_opts(volume_opts, group=configuration.SHARED_CONF_GROUP) -def locked_volume_id_operation(f): +def locked_volume_id_operation(f: Callable) -> Callable: """Lock decorator for volume operations. Takes a named lock prior to executing the operation. The lock is named @@ -154,9 +159,9 @@ class BackingFileTemplate(string.Template): class RemoteFSDriver(driver.BaseVD): """Common base for drivers that work like NFS.""" - driver_volume_type = None + driver_volume_type: Optional[str] = None driver_prefix = 'remotefs' - volume_backend_name = None + volume_backend_name: Optional[str] = None vendor_name = 'Open Source' SHARE_FORMAT_REGEX = r'.+:/.+' @@ -168,7 +173,7 @@ class RemoteFSDriver(driver.BaseVD): def __init__(self, *args, **kwargs): super(RemoteFSDriver, self).__init__(*args, **kwargs) self.shares = {} - self._mounted_shares = [] + self._mounted_shares: List[str] = [] self._execute_as_root = True self._is_voldb_empty_at_startup = kwargs.pop('is_vol_db_empty', None) self._supports_encryption = False @@ -178,12 +183,14 @@ class RemoteFSDriver(driver.BaseVD): self.configuration.append_config_values(nas_opts) self.configuration.append_config_values(volume_opts) - def check_for_setup_error(self): + def check_for_setup_error(self) -> None: """Just to override parent behavior.""" pass @volume_utils.trace - def initialize_connection(self, volume, connector): + def initialize_connection(self, + volume: objects.Volume, + connector: dict) -> dict: """Allow connection to connector and return connection info. :param volume: volume reference @@ -199,7 +206,7 @@ class RemoteFSDriver(driver.BaseVD): 'mount_point_base': self._get_mount_point_base() } - def do_setup(self, context): + def do_setup(self, context: context.RequestContext) -> None: """Any initialization the volume driver does while starting.""" super(RemoteFSDriver, self).do_setup(context) @@ -223,7 +230,7 @@ class RemoteFSDriver(driver.BaseVD): LOG.error(msg) raise exception.InvalidConfigurationValue(msg) - def _get_provisioned_capacity(self): + def _get_provisioned_capacity(self) -> float: """Returns the provisioned capacity. Get the sum of sizes of volumes, snapshots and any other @@ -237,7 +244,7 @@ class RemoteFSDriver(driver.BaseVD): provisioned_size += int(out.split()[0]) return round(provisioned_size / units.Gi, 2) - def _get_mount_point_base(self): + def _get_mount_point_base(self) -> Optional[str]: """Returns the mount point base for the remote fs. This method facilitates returning mount point base @@ -253,10 +260,10 @@ class RemoteFSDriver(driver.BaseVD): return None @staticmethod - def _validate_state(current_state, - acceptable_states, + def _validate_state(current_state: str, + acceptable_states: Union[tuple, list], obj_description='volume', - invalid_exc=exception.InvalidVolume): + invalid_exc=exception.InvalidVolume) -> None: if current_state not in acceptable_states: message = _('Invalid %(obj_description)s state. ' 'Acceptable states for this operation: ' @@ -270,7 +277,7 @@ class RemoteFSDriver(driver.BaseVD): current_state=current_state)) @volume_utils.trace - def create_volume(self, volume): + def create_volume(self, volume: objects.Volume) -> dict: """Creates a volume. :param volume: volume reference @@ -292,7 +299,7 @@ class RemoteFSDriver(driver.BaseVD): return {'provider_location': volume.provider_location} - def _do_create_volume(self, volume): + def _do_create_volume(self, volume: objects.Volume) -> None: """Create a volume on given remote share. :param volume: volume reference @@ -331,9 +338,9 @@ class RemoteFSDriver(driver.BaseVD): with volume.obj_as_admin(): volume.save() - def _ensure_shares_mounted(self): + def _ensure_shares_mounted(self) -> None: """Look for remote shares in the flags and mount them locally.""" - mounted_shares = [] + mounted_shares: List[str] = [] self._load_shares_config(getattr(self.configuration, self.driver_prefix + @@ -351,7 +358,7 @@ class RemoteFSDriver(driver.BaseVD): LOG.debug('Available shares %s', self._mounted_shares) @volume_utils.trace - def delete_volume(self, volume): + def delete_volume(self, volume: objects.Volume) -> None: """Deletes a logical volume. :param volume: volume reference @@ -371,11 +378,16 @@ class RemoteFSDriver(driver.BaseVD): self._delete(mounted_path) - def ensure_export(self, ctx, volume): + def ensure_export(self, + ctx: context.RequestContext, + volume: objects.Volume) -> None: """Synchronously recreates an export for a logical volume.""" self._ensure_share_mounted(volume.provider_location) - def create_export(self, ctx, volume, connector): + def create_export(self, + ctx: context.RequestContext, + volume: objects.Volume, + connector: dict) -> None: """Exports the volume. Can optionally return a dictionary of changes @@ -383,11 +395,13 @@ class RemoteFSDriver(driver.BaseVD): """ pass - def remove_export(self, ctx, volume): + def remove_export(self, + ctx: context.RequestContext, + volume: objects.Volume) -> None: """Removes an export for a logical volume.""" pass - def delete_snapshot(self, snapshot): + def delete_snapshot(self, snapshot: objects.Snapshot) -> None: """Delete snapshot. Do nothing for this driver, but allow manager to handle deletion @@ -395,17 +409,17 @@ class RemoteFSDriver(driver.BaseVD): """ pass - def _delete(self, path): + def _delete(self, path: str) -> None: # Note(lpetrut): this method is needed in order to provide # interoperability with Windows as it will be overridden. self._execute('rm', '-f', path, run_as_root=self._execute_as_root) - def _create_sparsed_file(self, path, size): + def _create_sparsed_file(self, path: str, size: int) -> None: """Creates a sparse file of a given size in GiB.""" self._execute('truncate', '-s', '%sG' % size, path, run_as_root=self._execute_as_root) - def _create_regular_file(self, path, size): + def _create_regular_file(self, path: str, size: int) -> None: """Creates a regular file of given size in GiB.""" block_size_mb = 1 @@ -416,7 +430,7 @@ class RemoteFSDriver(driver.BaseVD): 'count=%d' % block_count, run_as_root=self._execute_as_root) - def _create_qcow2_file(self, path, size_gb): + def _create_qcow2_file(self, path: str, size_gb: int) -> None: """Creates a QCOW2 file of a given size in GiB.""" self._execute('qemu-img', 'create', '-f', 'qcow2', @@ -425,10 +439,10 @@ class RemoteFSDriver(driver.BaseVD): run_as_root=self._execute_as_root) def _create_encrypted_volume_file(self, - path, - size_gb, - encryption, - context): + path: str, + size_gb: int, + encryption: dict, + context: context.RequestContext) -> None: """Create an encrypted volume. This works by creating an encrypted image locally, @@ -465,7 +479,7 @@ class RemoteFSDriver(driver.BaseVD): path, str(size_gb * units.Gi), run_as_root=self._execute_as_root) - def _set_rw_permissions(self, path): + def _set_rw_permissions(self, path: str) -> None: """Sets access permissions for given NFS path. Volume file permissions are set based upon the value of @@ -487,17 +501,17 @@ class RemoteFSDriver(driver.BaseVD): self._execute('chmod', permissions, path, run_as_root=self._execute_as_root) - def _set_rw_permissions_for_all(self, path): + def _set_rw_permissions_for_all(self, path: str) -> None: """Sets 666 permissions for the path.""" self._execute('chmod', 'ugo+rw', path, run_as_root=self._execute_as_root) - def _set_rw_permissions_for_owner(self, path): + def _set_rw_permissions_for_owner(self, path: str) -> None: """Sets read-write permissions to the owner for the path.""" self._execute('chmod', 'u+rw', path, run_as_root=self._execute_as_root) - def local_path(self, volume): + def local_path(self, volume: objects.Volume) -> str: """Get volume path (mounted locally fs path) for given volume. :param volume: volume reference @@ -506,7 +520,11 @@ class RemoteFSDriver(driver.BaseVD): return os.path.join(self._get_mount_point_for_share(remotefs_share), volume.name) - def copy_image_to_volume(self, context, volume, image_service, image_id): + def copy_image_to_volume(self, + context: context.RequestContext, + volume: objects.Volume, + image_service, + image_id: str) -> None: """Fetch the image from image_service and write it to the volume.""" image_utils.fetch_to_raw(context, @@ -536,7 +554,11 @@ class RemoteFSDriver(driver.BaseVD): reason=(_("Expected volume size was %d") % volume.size) + (_(" but size is now %d") % virt_size)) - def copy_volume_to_image(self, context, volume, image_service, image_meta): + def copy_volume_to_image(self, + context: context.RequestContext, + volume: objects.Volume, + image_service, + image_meta: dict) -> None: """Copy the volume to the specified image.""" volume_utils.upload_volume(context, image_service, @@ -545,12 +567,12 @@ class RemoteFSDriver(driver.BaseVD): volume, run_as_root=self._execute_as_root) - def _read_config_file(self, config_file): + def _read_config_file(self, config_file: str) -> List[str]: # Returns list of lines in file with open(config_file) as f: return f.readlines() - def _load_shares_config(self, share_file=None): + def _load_shares_config(self, share_file: Optional[str] = None) -> None: self.shares = {} if all((self.configuration.nas_host, @@ -606,14 +628,17 @@ class RemoteFSDriver(driver.BaseVD): LOG.debug("shares loaded: %s", self.shares) - def _get_mount_point_for_share(self, path): + def _get_mount_point_for_share(self, path: str) -> str: raise NotImplementedError() - def terminate_connection(self, volume, connector, **kwargs): + def terminate_connection(self, + volume: objects.Volume, + connector: dict, + **kwargs) -> None: """Disallow connection from connector.""" pass - def _update_volume_stats(self): + def _update_volume_stats(self) -> None: """Retrieve stats info from volume group.""" data = {} @@ -638,16 +663,16 @@ class RemoteFSDriver(driver.BaseVD): data['QoS_support'] = False self._stats = data - def _get_capacity_info(self, share): + def _get_capacity_info(self, share: str): raise NotImplementedError() - def _find_share(self, volume): + def _find_share(self, volume: objects.Volume): raise NotImplementedError() - def _ensure_share_mounted(self, share): + def _ensure_share_mounted(self, share: str): raise NotImplementedError() - def secure_file_operations_enabled(self): + def secure_file_operations_enabled(self) -> bool: """Determine if driver is operating in Secure File Operations mode. The Cinder Volume driver needs to query if this driver is operating @@ -657,7 +682,7 @@ class RemoteFSDriver(driver.BaseVD): return True return False - def set_nas_security_options(self, is_new_cinder_install): + def set_nas_security_options(self, is_new_cinder_install: bool) -> None: """Determine the setting to use for Secure NAS options. This method must be overridden by child wishing to use secure @@ -680,8 +705,11 @@ class RemoteFSDriver(driver.BaseVD): "information on a secure NFS configuration.", doc_html) - def _determine_nas_security_option_setting(self, nas_option, mount_point, - is_new_cinder_install): + def _determine_nas_security_option_setting( + self, + nas_option: str, + mount_point: str, + is_new_cinder_install: bool) -> str: """Determine NAS security option setting when 'auto' is assigned. This method determines the final 'true'/'false' setting of an NAS @@ -746,7 +774,7 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): _local_volume_dir(self, volume) """ - _VALID_IMAGE_EXTENSIONS = [] + _VALID_IMAGE_EXTENSIONS: List[str] = [] # The following flag may be overridden by the concrete drivers in order # to avoid using temporary volume snapshots when creating volume clones, # when possible. @@ -754,44 +782,44 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): _always_use_temp_snap_when_cloning = True def __init__(self, *args, **kwargs): - self._remotefsclient = None - self.base = None - self._nova = None + self._remotefsclient: remotefs_brick.RemoteFsClient = None + self.base: Optional[str] = None + self._nova: Optional[db.base.Base] = None super(RemoteFSSnapDriverBase, self).__init__(*args, **kwargs) - def do_setup(self, context): + def do_setup(self, context: context.RequestContext) -> None: super(RemoteFSSnapDriverBase, self).do_setup(context) self._nova = compute.API() - def snapshot_revert_use_temp_snapshot(self): + def snapshot_revert_use_temp_snapshot(self) -> bool: # Considering that RemoteFS based drivers use COW images # for storing snapshots, having chains of such images, # creating a backup snapshot when reverting one is not # actutally helpful. return False - def _local_volume_dir(self, volume): + def _local_volume_dir(self, volume: objects.Volume) -> str: share = volume.provider_location local_dir = self._get_mount_point_for_share(share) return local_dir - def _local_path_volume(self, volume): + def _local_path_volume(self, volume: objects.Volume) -> str: path_to_disk = os.path.join( self._local_volume_dir(volume), volume.name) return path_to_disk - def _get_new_snap_path(self, snapshot): + def _get_new_snap_path(self, snapshot: objects.Snapshot) -> str: vol_path = self.local_path(snapshot.volume) snap_path = '%s.%s' % (vol_path, snapshot.id) return snap_path - def _local_path_volume_info(self, volume): + def _local_path_volume_info(self, volume: objects.Volume) -> str: return '%s%s' % (self.local_path(volume), '.info') - def _read_file(self, filename): + def _read_file(self, filename: str) -> str: """This method is to make it easier to stub out code for testing. Returns a string representing the contents of the file. @@ -800,7 +828,7 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): with open(filename, 'r') as f: return f.read() - def _write_info_file(self, info_path, snap_info): + def _write_info_file(self, info_path: str, snap_info: dict) -> None: if 'active' not in snap_info.keys(): msg = _("'active' must be present when writing snap_info.") raise exception.RemoteFSException(msg) @@ -815,10 +843,13 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): with open(info_path, 'w') as f: json.dump(snap_info, f, indent=1, sort_keys=True) - def _qemu_img_info_base(self, path, volume_name, basedir, + def _qemu_img_info_base(self, + path: str, + volume_name: str, + basedir: str, ext_bf_template=None, force_share=False, - run_as_root=False): + run_as_root=False) -> imageutils.QemuImgInfo: """Sanitize image_utils' qemu_img_info. This code expects to deal only with relative filenames. @@ -871,10 +902,13 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): return info - def _qemu_img_info(self, path, volume_name): + def _qemu_img_info(self, path: str, volume_name: str): raise NotImplementedError() - def _img_commit(self, path, passphrase_file=None, backing_file=None): + def _img_commit(self, + path: str, + passphrase_file: Optional[str] = None, + backing_file: Optional[str] = None) -> None: # TODO(eharney): this is not using the correct permissions for # NFS snapshots # It needs to run as root for volumes attached to instances, but @@ -901,8 +935,11 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): self._execute(*cmd, run_as_root=self._execute_as_root) self._delete(path) - def _rebase_img(self, image, backing_file, volume_format, - passphrase_file=None): + def _rebase_img(self, + image: str, + backing_file: str, + volume_format: str, + passphrase_file: Optional[str] = None) -> None: # qemu-img create must run as root, because it reads from the # backing file, which will be owned by qemu:qemu if attached to an # instance. @@ -922,7 +959,9 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): self._execute(*command, run_as_root=self._execute_as_root) - def _read_info_file(self, info_path, empty_if_missing=False): + def _read_info_file(self, + info_path: str, + empty_if_missing: bool = False) -> dict: """Return dict of snapshot information. :param info_path: path to file @@ -935,7 +974,8 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): return json.loads(self._read_file(info_path)) - def _get_higher_image_path(self, snapshot): + def _get_higher_image_path(self, + snapshot: objects.Snapshot) -> Optional[str]: volume = snapshot.volume info_path = self._local_path_volume_info(volume) snap_info = self._read_info_file(info_path) @@ -954,7 +994,9 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): None) return higher_file - def _get_backing_chain_for_path(self, volume, path): + def _get_backing_chain_for_path(self, + volume: objects.Volume, + path: str) -> List[dict]: """Returns list of dicts containing backing-chain information. Includes 'filename', and 'backing-filename' for each @@ -990,7 +1032,7 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): return output - def _get_hash_str(self, base_str): + def _get_hash_str(self, base_str) -> str: """Return a string that represents hash of base_str. Returns string in a hex format. @@ -999,14 +1041,14 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): base_str = base_str.encode('utf-8') return md5(base_str, usedforsecurity=False).hexdigest() - def _get_mount_point_for_share(self, share): + def _get_mount_point_for_share(self, share: str) -> str: """Return mount point for share. :param share: example 172.18.194.100:/var/fs """ return self._remotefsclient.get_mount_point(share) - def _get_available_capacity(self, share): + def _get_available_capacity(self, share: str) -> Tuple[int, int]: """Calculate available space on the share. :param share: example 172.18.194.100:/var/fs @@ -1023,15 +1065,20 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): return available, size - def _get_capacity_info(self, remotefs_share): + def _get_capacity_info(self, + remotefs_share: str) -> Tuple[int, int, int]: available, size = self._get_available_capacity(remotefs_share) return size, available, size - available - def _get_mount_point_base(self): + def _get_mount_point_base(self) -> Optional[str]: return self.base - def _copy_volume_to_image(self, context, volume, image_service, - image_meta, store_id=None): + def _copy_volume_to_image(self, + context: context.RequestContext, + volume: objects.Volume, + image_service, + image_meta: dict, + store_id: Optional[str] = None) -> None: """Copy the volume to the specified image.""" # If snapshots exist, flatten to a temporary image, and upload it @@ -1066,7 +1113,7 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): volume, run_as_root=self._execute_as_root) - def get_active_image_from_info(self, volume): + def get_active_image_from_info(self, volume: objects.Volume) -> str: """Returns filename of the active image from the info file.""" info_file = self._local_path_volume_info(volume) @@ -1080,14 +1127,14 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): return snap_info['active'] - def _local_path_active_image(self, volume): + def _local_path_active_image(self, volume: objects.Volume) -> str: active_fname = self.get_active_image_from_info(volume) vol_dir = self._local_volume_dir(volume) active_fpath = os.path.join(vol_dir, active_fname) return active_fpath - def _get_snapshot_backing_file(self, snapshot): + def _get_snapshot_backing_file(self, snapshot: objects.Snapshot) -> str: info_path = self._local_path_volume_info(snapshot.volume) snap_info = self._read_info_file(info_path) vol_dir = self._local_volume_dir(snapshot.volume) @@ -1097,10 +1144,11 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): # Find the file which backs this file, which represents the point # in which this snapshot was created. - img_info = self._qemu_img_info(forward_path) + # TODO: something is wrong here + img_info = self._qemu_img_info(forward_path) # type: ignore return img_info.backing_file - def _snapshots_exist(self, volume): + def _snapshots_exist(self, volume: objects.Volume) -> bool: if not volume.provider_location: return False @@ -1109,10 +1157,13 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): return not utils.paths_normcase_equal(active_fpath, base_vol_path) - def _is_volume_attached(self, volume): + def _is_volume_attached(self, volume: objects.Volume) -> bool: return volume.attach_status == fields.VolumeAttachStatus.ATTACHED - def _create_cloned_volume(self, volume, src_vref, context): + def _create_cloned_volume(self, + volume: objects.Volume, + src_vref: objects.Volume, + context: context.RequestContext) -> dict: LOG.info('Cloning volume %(src)s to volume %(dst)s', {'src': src_vref.id, 'dst': volume.id}) @@ -1125,9 +1176,12 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): volume_name = CONF.volume_name_template % volume.id # Create fake volume and snapshot objects - vol_attrs = ['provider_location', 'size', 'id', 'name', 'status', - 'volume_type', 'metadata', 'obj_context'] - Volume = collections.namedtuple('Volume', vol_attrs) + Volume = collections.namedtuple('Volume', + ('provider_location', 'size', 'id', + 'name', 'status', + 'volume_type', 'metadata', + 'obj_context')) + volume_info = Volume(provider_location=src_vref.provider_location, size=src_vref.size, id=volume.id, @@ -1185,11 +1239,11 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): volume.save() return {'provider_location': src_vref.provider_location} - def _copy_volume_image(self, src_path, dest_path): + def _copy_volume_image(self, src_path: str, dest_path: str) -> None: shutil.copyfile(src_path, dest_path) self._set_rw_permissions(dest_path) - def _delete_stale_snapshot(self, snapshot): + def _delete_stale_snapshot(self, snapshot: objects.Snapshot) -> None: info_path = self._local_path_volume_info(snapshot.volume) snap_info = self._read_info_file(info_path) @@ -1205,7 +1259,7 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): del(snap_info[snapshot.id]) self._write_info_file(info_path, snap_info) - def _delete_snapshot(self, snapshot): + def _delete_snapshot(self, snapshot: objects.Snapshot) -> None: """Delete a snapshot. If volume status is 'available', delete snapshot here in Cinder @@ -1382,7 +1436,9 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): del(snap_info[snapshot.id]) self._write_info_file(info_path, snap_info) - def _create_volume_from_snapshot(self, volume, snapshot): + def _create_volume_from_snapshot(self, + volume: objects.Volume, + snapshot: objects.Snapshot) -> dict: """Creates a volume from a snapshot. Snapshot must not be the active snapshot. (offline) @@ -1411,13 +1467,19 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): return {'provider_location': volume.provider_location} - def _copy_volume_from_snapshot(self, snapshot, volume, volume_size, - src_encryption_key_id=None, - new_encryption_key_id=None): + def _copy_volume_from_snapshot( + self, + snapshot: objects.Snapshot, + volume: objects.Volume, + volume_size: int, + src_encryption_key_id: Optional[str] = None, + new_encryption_key_id: Optional[str] = None): raise NotImplementedError() - def _do_create_snapshot(self, snapshot, backing_filename, - new_snap_path): + def _do_create_snapshot(self, + snapshot: objects.Snapshot, + backing_filename: str, + new_snap_path: str) -> None: """Create a QCOW2 file backed by another file. :param snapshot: snapshot reference @@ -1485,7 +1547,7 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): cipher_spec = image_utils.decode_cipher(encryption['cipher'], encryption['key_size']) - command = ('qemu-img', 'create', '-f' 'qcow2', + command = ['qemu-img', 'create', '-f' 'qcow2', '-o', 'encrypt.format=luks,encrypt.key-secret=s1,' 'encrypt.cipher-alg=%(cipher_alg)s,' 'encrypt.cipher-mode=%(cipher_mode)s,' @@ -1493,7 +1555,7 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): '-b', 'json:' + file_json, '--object', 'secret,id=s0,file=' + tmp_key.name, '--object', 'secret,id=s1,file=' + tmp_key.name, - new_snap_path) + new_snap_path] self._execute(*command, run_as_root=self._execute_as_root) command_path = 'encrypt.key-secret=s0,file.filename=' @@ -1523,7 +1585,7 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): new_snap_path] self._execute(*command, run_as_root=self._execute_as_root) - def _create_snapshot(self, snapshot): + def _create_snapshot(self, snapshot: objects.Snapshot) -> None: """Create a snapshot. If volume is attached, call to Nova to create snapshot, providing a @@ -1671,8 +1733,10 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): snap_info[snapshot.id] = active self._write_info_file(info_path, snap_info) - def _create_snapshot_online(self, snapshot, backing_filename, - new_snap_path): + def _create_snapshot_online(self, + snapshot: objects.Snapshot, + backing_filename: str, + new_snap_path: str) -> None: # Perform online snapshot via Nova self._do_create_snapshot(snapshot, backing_filename, @@ -1685,6 +1749,7 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): } try: + assert self._nova is not None result = self._nova.create_volume_snapshot( snapshot.obj_context, snapshot.volume_id, @@ -1740,7 +1805,10 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): 'for creation of snapshot %s.') % snapshot.id raise exception.RemoteFSException(msg) - def _delete_snapshot_online(self, context, snapshot, info): + def _delete_snapshot_online(self, + context: context.RequestContext, + snapshot: objects.Snapshot, + info: dict) -> None: # Update info over the course of this method # active file never changes info_path = self._local_path_volume_info(snapshot.volume) @@ -1796,8 +1864,12 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): self._local_volume_dir(snapshot.volume), file_to_delete) self._delete(path_to_delete) - def _nova_assisted_vol_snap_delete(self, context, snapshot, delete_info): + def _nova_assisted_vol_snap_delete(self, + context: context.RequestContext, + snapshot: objects.Snapshot, + delete_info: dict) -> None: try: + assert self._nova is not None self._nova.delete_volume_snapshot( context, snapshot.id, @@ -1843,51 +1915,65 @@ class RemoteFSSnapDriverBase(RemoteFSDriver): {'id': snapshot.id} raise exception.RemoteFSException(msg) - def _extend_volume(self, volume, size_gb): + def _extend_volume(self, volume: objects.Volume, size_gb: int): raise NotImplementedError() - def _revert_to_snapshot(self, context, volume, snapshot): + def _revert_to_snapshot(self, + context: context.RequestContext, + volume: objects.Volume, + snapshot: objects.Snapshot): raise NotImplementedError() class RemoteFSSnapDriver(RemoteFSSnapDriverBase): @locked_volume_id_operation - def create_snapshot(self, snapshot): + def create_snapshot(self, snapshot: objects.Snapshot) -> None: """Apply locking to the create snapshot operation.""" return self._create_snapshot(snapshot) @locked_volume_id_operation - def delete_snapshot(self, snapshot): + def delete_snapshot(self, snapshot: objects.Snapshot) -> None: """Apply locking to the delete snapshot operation.""" return self._delete_snapshot(snapshot) @locked_volume_id_operation - def create_volume_from_snapshot(self, volume, snapshot): + def create_volume_from_snapshot(self, + volume: objects.Volume, + snapshot: objects.Snapshot) -> dict: return self._create_volume_from_snapshot(volume, snapshot) # TODO: should be locking on src_vref id -- bug #1852449 @locked_volume_id_operation - def create_cloned_volume(self, volume, src_vref): + def create_cloned_volume(self, + volume: objects.Volume, + src_vref: objects.Volume) -> dict: """Creates a clone of the specified volume.""" return self._create_cloned_volume(volume, src_vref, src_vref.obj_context) @locked_volume_id_operation - def copy_volume_to_image(self, context, volume, image_service, image_meta): + def copy_volume_to_image(self, + context: context.RequestContext, + volume: objects.Volume, + image_service, + image_meta: dict) -> None: """Copy the volume to the specified image.""" return self._copy_volume_to_image(context, volume, image_service, image_meta) @locked_volume_id_operation - def extend_volume(self, volume, size_gb): + def extend_volume(self, volume: objects.Volume, size_gb: int) -> None: return self._extend_volume(volume, size_gb) @locked_volume_id_operation - def revert_to_snapshot(self, context, volume, snapshot): + def revert_to_snapshot(self, + context: context.RequestContext, + volume: objects.Volume, + snapshot: objects.Snapshot) -> None: """Revert to specified snapshot.""" return self._revert_to_snapshot(context, volume, snapshot) @@ -1895,43 +1981,54 @@ class RemoteFSSnapDriver(RemoteFSSnapDriverBase): class RemoteFSSnapDriverDistributed(RemoteFSSnapDriverBase): @coordination.synchronized('{self.driver_prefix}-{snapshot.volume.id}') - def create_snapshot(self, snapshot): + def create_snapshot(self, snapshot: objects.Snapshot) -> None: """Apply locking to the create snapshot operation.""" return self._create_snapshot(snapshot) @coordination.synchronized('{self.driver_prefix}-{snapshot.volume.id}') - def delete_snapshot(self, snapshot): + def delete_snapshot(self, snapshot: objects.Snapshot) -> None: """Apply locking to the delete snapshot operation.""" return self._delete_snapshot(snapshot) @coordination.synchronized('{self.driver_prefix}-{volume.id}') - def create_volume_from_snapshot(self, volume, snapshot): + def create_volume_from_snapshot(self, + volume: objects.Volume, + snapshot: objects.Snapshot) -> dict: return self._create_volume_from_snapshot(volume, snapshot) # lock the source volume id first @coordination.synchronized('{self.driver_prefix}-{src_vref.id}') @coordination.synchronized('{self.driver_prefix}-{volume.id}') - def create_cloned_volume(self, volume, src_vref): + def create_cloned_volume(self, + volume: objects.Volume, + src_vref: objects.Volume) -> dict: """Creates a clone of the specified volume.""" return self._create_cloned_volume(volume, src_vref, src_vref.obj_context) @coordination.synchronized('{self.driver_prefix}-{volume.id}') - def copy_volume_to_image(self, context, volume, image_service, image_meta): + def copy_volume_to_image(self, + context: context.RequestContext, + volume: objects.Volume, + image_service, + image_meta: dict) -> None: """Copy the volume to the specified image.""" return self._copy_volume_to_image(context, volume, image_service, image_meta) @coordination.synchronized('{self.driver_prefix}-{volume.id}') - def extend_volume(self, volume, size_gb): + def extend_volume(self, volume: objects.Volume, size_gb: int) -> None: return self._extend_volume(volume, size_gb) @coordination.synchronized('{self.driver_prefix}-{volume.id}') - def revert_to_snapshot(self, context, volume, snapshot): + def revert_to_snapshot(self, + context: context.RequestContext, + volume: objects.Volume, + snapshot: objects.Snapshot) -> None: """Revert to specified snapshot.""" return self._revert_to_snapshot(context, volume, snapshot) @@ -1940,24 +2037,26 @@ class RemoteFSSnapDriverDistributed(RemoteFSSnapDriverBase): class RemoteFSPoolMixin(object): """Drivers inheriting this will report each share as a pool.""" - def _find_share(self, volume): + def _find_share(self, volume: objects.Volume) -> Optional[str]: # We let the scheduler choose a pool for us. pool_name = self._get_pool_name_from_volume(volume) share = self._get_share_from_pool_name(pool_name) return share - def _get_pool_name_from_volume(self, volume): + def _get_pool_name_from_volume(self, + volume: objects.Volume) -> Optional[str]: pool_name = volume_utils.extract_host(volume['host'], level='pool') return pool_name - def _get_pool_name_from_share(self, share): + def _get_pool_name_from_share(self, share: str): raise NotImplementedError() - def _get_share_from_pool_name(self, pool_name): + def _get_share_from_pool_name(self, pool_name: Optional[str]): # To be implemented by drivers using pools. raise NotImplementedError() + @typing.no_type_check def _update_volume_stats(self): data = {} pools = [] @@ -2001,6 +2100,7 @@ class RemoteFSPoolMixin(object): class RevertToSnapshotMixin(object): + @typing.no_type_check def _revert_to_snapshot(self, context, volume, snapshot): """Revert a volume to specified snapshot @@ -2048,6 +2148,7 @@ class RemoteFSManageableVolumesMixin(object): _SUPPORTED_IMAGE_FORMATS = ['raw', 'qcow2'] _MANAGEABLE_IMAGE_RE = None + @typing.no_type_check def _get_manageable_vol_location(self, existing_ref): if 'source-name' not in existing_ref: reason = _('The existing volume reference ' @@ -2095,6 +2196,7 @@ class RemoteFSManageableVolumesMixin(object): return os.path.join(volume_location['mountpoint'], volume.name) + @typing.no_type_check def _is_volume_manageable(self, volume_path, already_managed=False): unmanageable_reason = None @@ -2120,6 +2222,7 @@ class RemoteFSManageableVolumesMixin(object): return True, None + @typing.no_type_check def manage_existing(self, volume, existing_ref): LOG.info('Managing volume %(volume_id)s with ref %(ref)s', {'volume_id': volume.id, 'ref': existing_ref}) @@ -2149,6 +2252,7 @@ class RemoteFSManageableVolumesMixin(object): return {'provider_location': vol_location['share']} + @typing.no_type_check def _get_rounded_manageable_image_size(self, image_path): image_size = image_utils.qemu_img_info( image_path, run_as_root=self._execute_as_root).virtual_size @@ -2162,6 +2266,7 @@ class RemoteFSManageableVolumesMixin(object): def unmanage(self, volume): pass + @typing.no_type_check def _get_manageable_volume(self, share, volume_path, managed_volume=None): manageable, unmanageable_reason = self._is_volume_manageable( volume_path, already_managed=managed_volume is not None) @@ -2192,6 +2297,7 @@ class RemoteFSManageableVolumesMixin(object): } return manageable_volume + @typing.no_type_check def _get_share_manageable_volumes(self, share, managed_volumes): manageable_volumes = [] mount_path = self._get_mount_point_for_share(share) @@ -2217,6 +2323,7 @@ class RemoteFSManageableVolumesMixin(object): dict(image_path=img_path, exc=exc)) return manageable_volumes + @typing.no_type_check def get_manageable_volumes(self, cinder_volumes, marker, limit, offset, sort_keys, sort_dirs): manageable_volumes = [] diff --git a/mypy-files.txt b/mypy-files.txt index dfae267dc9e..fc05a3b90fa 100644 --- a/mypy-files.txt +++ b/mypy-files.txt @@ -31,6 +31,7 @@ cinder/utils.py cinder/volume/__init__.py cinder/volume/api.py cinder/volume/drivers/rbd.py +cinder/volume/drivers/remotefs.py cinder/volume/flows/api/create_volume.py cinder/volume/flows/manager/create_volume.py cinder/volume/manager.py