From 07e0156c20db4d0cdfd4cce02858e3254589f638 Mon Sep 17 00:00:00 2001 From: Eric Harney Date: Tue, 27 Apr 2021 13:08:05 -0400 Subject: [PATCH] mypy: ceph backup driver Change-Id: I81da9ae38f03012d2589f8352135652fed338e84 --- cinder/backup/drivers/ceph.py | 193 +++++++++++++++++++++++----------- mypy-files.txt | 1 + 2 files changed, 132 insertions(+), 62 deletions(-) diff --git a/cinder/backup/drivers/ceph.py b/cinder/backup/drivers/ceph.py index 21a2a3a98c0..6173d9260e8 100644 --- a/cinder/backup/drivers/ceph.py +++ b/cinder/backup/drivers/ceph.py @@ -48,6 +48,7 @@ import os import re import subprocess import time +from typing import Dict, List, Optional, Tuple # noqa: H301 import eventlet from os_brick.initiator import linuxrbd @@ -60,6 +61,7 @@ from cinder.backup import driver from cinder import exception from cinder.i18n import _ from cinder import interface +from cinder import objects from cinder import utils import cinder.volume.drivers.rbd as rbd_driver @@ -102,21 +104,21 @@ CONF.register_opts(service_opts) class VolumeMetadataBackup(object): - def __init__(self, client, backup_id): - self._client = client - self._backup_id = backup_id + def __init__(self, client: 'rados.Rados', backup_id: str): + self._client: 'rados.Rados' = client + self._backup_id: str = backup_id @property - def name(self): + def name(self) -> str: return utils.convert_str("backup.%s.meta" % self._backup_id) @property - def exists(self): + def exists(self) -> bool: meta_obj = eventlet.tpool.Proxy(rados.Object(self._client.ioctx, self.name)) return self._exists(meta_obj) - def _exists(self, obj): + def _exists(self, obj) -> bool: try: obj.stat() except rados.ObjectNotFound: @@ -124,7 +126,7 @@ class VolumeMetadataBackup(object): else: return True - def set(self, json_meta): + def set(self, json_meta: str) -> None: """Write JSON metadata to a new object. This should only be called once per backup. Raises @@ -138,7 +140,7 @@ class VolumeMetadataBackup(object): meta_obj.write(json_meta.encode('utf-8')) - def get(self): + def get(self) -> Optional[str]: """Get metadata backup object. Returns None if the object does not exist. @@ -151,7 +153,7 @@ class VolumeMetadataBackup(object): return meta_obj.read().decode('utf-8') - def remove_if_exists(self): + def remove_if_exists(self) -> None: meta_obj = eventlet.tpool.Proxy(rados.Object(self._client.ioctx, self.name)) try: @@ -195,14 +197,15 @@ class CephBackupDriver(driver.BackupDriver): self._ceph_backup_conf = utils.convert_str(CONF.backup_ceph_conf) @staticmethod - def get_driver_options(): + def get_driver_options() -> list: return service_opts - def _validate_string_args(self, *args): + def _validate_string_args(self, *args: str) -> bool: """Ensure all args are non-None and non-empty.""" return all(args) - def _ceph_args(self, user, conf=None, pool=None): + def _ceph_args(self, user: str, conf: Optional[str] = None, + pool: Optional[str] = None) -> List[str]: """Create default ceph args for executing rbd commands. If no --conf is provided, rbd will look in the default locations e.g. @@ -224,31 +227,31 @@ class CephBackupDriver(driver.BackupDriver): return args @property - def _supports_layering(self): + def _supports_layering(self) -> bool: """Determine if copy-on-write is supported by our version of librbd.""" return hasattr(self.rbd, 'RBD_FEATURE_LAYERING') @property - def _supports_stripingv2(self): + def _supports_stripingv2(self) -> bool: """Determine if striping is supported by our version of librbd.""" return hasattr(self.rbd, 'RBD_FEATURE_STRIPINGV2') @property - def _supports_exclusive_lock(self): + def _supports_exclusive_lock(self) -> bool: """Determine if exclusive-lock is supported by librbd.""" return hasattr(self.rbd, 'RBD_FEATURE_EXCLUSIVE_LOCK') @property - def _supports_journaling(self): + def _supports_journaling(self) -> bool: """Determine if journaling is supported by our version of librbd.""" return hasattr(self.rbd, 'RBD_FEATURE_JOURNALING') @property - def _supports_fast_diff(self): + def _supports_fast_diff(self) -> bool: """Determine if fast-diff is supported by our version of librbd.""" return hasattr(self.rbd, 'RBD_FEATURE_FAST_DIFF') - def _get_rbd_support(self): + def _get_rbd_support(self) -> Tuple[bool, int]: """Determine RBD features supported by our version of librbd.""" old_format = True features = 0 @@ -278,7 +281,7 @@ class CephBackupDriver(driver.BackupDriver): return (old_format, features) - def check_for_setup_error(self): + def check_for_setup_error(self) -> None: """Returns an error if prerequisites aren't met.""" if rados is None or rbd is None: msg = _('rados and rbd python libraries not found') @@ -306,7 +309,9 @@ class CephBackupDriver(driver.BackupDriver): "not support journaling") ) - def _connect_to_rados(self, pool=None): + def _connect_to_rados(self, + pool: Optional[str] = None) -> Tuple['rados.Rados', + 'rados.Ioctx']: """Establish connection to the backup Ceph cluster.""" client = eventlet.tpool.Proxy(self.rados.Rados( rados_id=self._ceph_backup_user, @@ -321,17 +326,22 @@ class CephBackupDriver(driver.BackupDriver): client.shutdown() raise - def _disconnect_from_rados(self, client, ioctx): + def _disconnect_from_rados(self, + client: 'rados.Rados', + ioctx: 'rados.Ioctx') -> None: """Terminate connection with the backup Ceph cluster.""" # closing an ioctx cannot raise an exception ioctx.close() client.shutdown() - def _format_base_name(self, service_metadata): + def _format_base_name(self, service_metadata: str) -> str: base_name = json.loads(service_metadata)["base"] return utils.convert_str(base_name) - def _get_backup_base_name(self, volume_id, backup=None): + def _get_backup_base_name( + self, + volume_id: str, + backup: Optional['objects.Backup'] = None) -> str: """Return name of base image used for backup. Incremental backups use a new base name so we support old and new style @@ -361,7 +371,10 @@ class CephBackupDriver(driver.BackupDriver): return utils.convert_str("volume-%s.backup.%s" % (volume_id, backup.id)) - def _discard_bytes(self, volume, offset, length): + def _discard_bytes(self, + volume: linuxrbd.RBDVolumeIOWrapper, + offset: int, + length: int) -> None: """Trim length bytes from offset. If the volume is an rbd do a discard() otherwise assume it is a file @@ -394,7 +407,12 @@ class CephBackupDriver(driver.BackupDriver): volume.write(zeroes) volume.flush() - def _transfer_data(self, src, src_name, dest, dest_name, length): + def _transfer_data(self, + src: linuxrbd.RBDVolumeIOWrapper, + src_name: str, + dest: linuxrbd.RBDVolumeIOWrapper, + dest_name: str, + length: int) -> None: """Transfer data between files (Python IO objects).""" LOG.debug("Transferring data between '%(src)s' and '%(dest)s'", {'src': src_name, 'dest': dest_name}) @@ -436,7 +454,10 @@ class CephBackupDriver(driver.BackupDriver): dest.write(data) dest.flush() - def _create_base_image(self, name, size, rados_client): + def _create_base_image(self, + name: str, + size: int, + rados_client: 'rados.Rados') -> None: """Create a base backup image. This will be the base image used for storing differential exports. @@ -452,7 +473,10 @@ class CephBackupDriver(driver.BackupDriver): stripe_unit=self.rbd_stripe_unit, stripe_count=self.rbd_stripe_count) - def _delete_backup_snapshot(self, rados_client, base_name, backup_id): + def _delete_backup_snapshot(self, + rados_client: 'rados.Rados', + base_name: Optional[str], + backup_id: str) -> Tuple[Optional[str], int]: """Delete snapshot associated with this backup if one exists. A backup should have at most ONE associated snapshot. @@ -484,7 +508,9 @@ class CephBackupDriver(driver.BackupDriver): return snap_name, remaining_snaps - def _try_delete_base_image(self, backup, base_name=None): + def _try_delete_base_image(self, + backup: 'objects.Backup', + base_name: Optional[str] = None) -> None: """Try to delete backup RBD image. If the rbd image is a base image for incremental backups, it may have @@ -580,7 +606,7 @@ class CephBackupDriver(driver.BackupDriver): finally: src_rbd.close() - def _piped_execute(self, cmd1, cmd2): + def _piped_execute(self, cmd1: list, cmd2: list) -> Tuple[int, bytes]: """Pipe output of cmd1 into cmd2.""" LOG.debug("Piping cmd1='%s' into...", ' '.join(cmd1)) LOG.debug("cmd2='%s'", ' '.join(cmd2)) @@ -596,6 +622,7 @@ class CephBackupDriver(driver.BackupDriver): # NOTE(dosaboy): ensure that the pipe is blocking. This is to work # around the case where evenlet.green.subprocess is used which seems to # use a non-blocking pipe. + assert p1.stdout is not None flags = fcntl.fcntl(p1.stdout, fcntl.F_GETFL) & (~os.O_NONBLOCK) fcntl.fcntl(p1.stdout, fcntl.F_SETFL, flags) @@ -612,9 +639,12 @@ class CephBackupDriver(driver.BackupDriver): stdout, stderr = p2.communicate() return p2.returncode, stderr - def _rbd_diff_transfer(self, src_name, src_pool, dest_name, dest_pool, - src_user, src_conf, dest_user, dest_conf, - src_snap=None, from_snap=None): + def _rbd_diff_transfer(self, src_name: str, src_pool: str, + dest_name: str, dest_pool: str, + src_user: str, src_conf: Optional[str], + dest_user: str, dest_conf: Optional[str], + src_snap: Optional[str] = None, + from_snap: Optional[str] = None) -> None: """Copy only extents changed between two points. If no snapshot is provided, the diff extents will be all those changed @@ -653,8 +683,10 @@ class CephBackupDriver(driver.BackupDriver): LOG.info(msg) raise exception.BackupRBDOperationFailed(msg) - def _rbd_image_exists(self, name, volume_id, client, - try_diff_format=False): + def _rbd_image_exists( + self, name: str, volume_id: str, + client: 'rados.Rados', + try_diff_format: Optional[bool] = False) -> Tuple[bool, str]: """Return tuple (exists, name).""" rbds = eventlet.tpool.Proxy(self.rbd.RBD()).list(client.ioctx) if name not in rbds: @@ -669,7 +701,10 @@ class CephBackupDriver(driver.BackupDriver): return True, name - def _snap_exists(self, base_name, snap_name, client): + def _snap_exists(self, + base_name: str, + snap_name: str, + client: 'rados.Rados') -> bool: """Return True if snapshot exists in base image.""" base_rbd = eventlet.tpool.Proxy(self.rbd.Image(client.ioctx, base_name, read_only=True)) @@ -687,7 +722,10 @@ class CephBackupDriver(driver.BackupDriver): return False - def _full_rbd_backup(self, container, base_name, length): + def _full_rbd_backup(self, + container: str, + base_name: str, + length: int) -> Tuple[Optional[str], bool]: """Create the base_image for a full RBD backup.""" with eventlet.tpool.Proxy(rbd_driver.RADOSClient(self, container)) as client: @@ -697,8 +735,10 @@ class CephBackupDriver(driver.BackupDriver): # base image. return None, True - def _incremental_rbd_backup(self, backup, base_name, length, - source_rbd_image, volume_id): + def _incremental_rbd_backup( + self, backup: 'objects.Backup', + base_name: str, length: int, + source_rbd_image, volume_id: str) -> Tuple[Optional[str], bool]: """Select the last snapshot for a RBD incremental backup.""" container = backup.container @@ -735,7 +775,10 @@ class CephBackupDriver(driver.BackupDriver): return from_snap, False - def _backup_rbd(self, backup, volume_file, volume_name, length): + def _backup_rbd(self, + backup: 'objects.Backup', + volume_file: linuxrbd.RBDVolumeIOWrapper, + volume_name: str, length: int) -> Dict[str, str]: """Create an incremental or full backup from an RBD image.""" rbd_user = volume_file.rbd_user rbd_pool = volume_file.rbd_pool @@ -806,11 +849,13 @@ class CephBackupDriver(driver.BackupDriver): return {'service_metadata': '{"base": "%s"}' % base_name} - def _file_is_rbd(self, volume_file): + def _file_is_rbd(self, volume_file: linuxrbd.RBDVolumeIOWrapper) -> bool: """Returns True if the volume_file is actually an RBD image.""" return hasattr(volume_file, 'rbd_image') - def _full_backup(self, backup, src_volume, src_name, length): + def _full_backup(self, backup: 'objects.Backup', + src_volume: linuxrbd.RBDVolumeIOWrapper, + src_name: str, length: int) -> None: """Perform a full backup of src volume. First creates a base backup image in our backup location then performs @@ -855,7 +900,7 @@ class CephBackupDriver(driver.BackupDriver): dest_rbd.close() @staticmethod - def backup_snapshot_name_pattern(): + def backup_snapshot_name_pattern() -> str: """Returns the pattern used to match backup snapshots. It is essential that snapshots created for purposes other than backups @@ -864,7 +909,8 @@ class CephBackupDriver(driver.BackupDriver): return r"^backup\.([a-z0-9\-]+?)\.snap\.(.+)$" @classmethod - def get_backup_snaps(cls, rbd_image, sort=False): + def get_backup_snaps(cls, rbd_image: 'rbd.Image', + sort: bool = False) -> List[dict]: """Get all backup snapshots for the given rbd image. NOTE: this call is made public since these snapshots must be deleted @@ -887,11 +933,12 @@ class CephBackupDriver(driver.BackupDriver): return backup_snaps - def _get_new_snap_name(self, backup_id): + def _get_new_snap_name(self, backup_id: str) -> str: return utils.convert_str("backup.%s.snap.%s" % (backup_id, time.time())) - def _get_backup_snap_name(self, rbd_image, name, backup_id): + def _get_backup_snap_name(self, rbd_image: 'rbd.Image', + name: Optional[str], backup_id: str): """Return the name of the snapshot associated with backup_id. The rbd image provided must be the base image used for an incremental @@ -924,7 +971,7 @@ class CephBackupDriver(driver.BackupDriver): LOG.debug("Found snapshot '%s'", snaps[0]) return snaps[0] - def _get_volume_size_bytes(self, volume): + def _get_volume_size_bytes(self, volume: 'objects.Volume') -> int: """Return the size in bytes of the given volume. Raises exception.InvalidParameterValue if volume size is 0. @@ -935,7 +982,7 @@ class CephBackupDriver(driver.BackupDriver): return int(volume['size']) * units.Gi - def _backup_metadata(self, backup): + def _backup_metadata(self, backup: 'objects.Backup') -> None: """Backup volume metadata. NOTE(dosaboy): the metadata we are backing up is obtained from a @@ -958,7 +1005,9 @@ class CephBackupDriver(driver.BackupDriver): msg = (_("Failed to backup volume metadata - %s") % e) raise exception.BackupOperationError(msg) - def backup(self, backup, volume_file, backup_metadata=True): + def backup(self, backup: 'objects.Backup', + volume_file: linuxrbd.RBDVolumeIOWrapper, + backup_metadata: bool = True) -> dict: """Backup volume and metadata (if available) to Ceph object store. If the source volume is an RBD we will attempt to do an @@ -1017,8 +1066,11 @@ class CephBackupDriver(driver.BackupDriver): return updates - def _full_restore(self, backup, dest_file, dest_name, length, - src_snap=None): + def _full_restore(self, backup: 'objects.Backup', + dest_file, + dest_name: str, + length: int, + src_snap=None) -> None: """Restore volume using full copy i.e. all extents. This will result in all extents being copied from source to @@ -1050,8 +1102,9 @@ class CephBackupDriver(driver.BackupDriver): finally: src_rbd.close() - def _check_restore_vol_size(self, backup, restore_vol, restore_length, - src_pool): + def _check_restore_vol_size(self, backup: 'objects.Backup', + restore_vol, restore_length: int, + src_pool) -> None: """Ensure that the restore volume is the correct size. If the restore volume was bigger than the backup, the diff restore will @@ -1077,8 +1130,11 @@ class CephBackupDriver(driver.BackupDriver): LOG.debug("Adjusting restore vol size") restore_vol.rbd_image.resize(adjust_size) - def _diff_restore_rbd(self, backup, restore_file, restore_name, - restore_point, restore_length): + def _diff_restore_rbd(self, backup: 'objects.Backup', + restore_file, + restore_name: str, + restore_point: Optional[str], + restore_length: int) -> None: """Attempt restore rbd volume from backup using diff transfer.""" rbd_user = restore_file.rbd_user rbd_pool = restore_file.rbd_pool @@ -1111,7 +1167,9 @@ class CephBackupDriver(driver.BackupDriver): LOG.debug("Restore transfer completed in %.4fs", (time.time() - before)) - def _get_restore_point(self, base_name, backup_id): + def _get_restore_point(self, + base_name: str, + backup_id: str) -> Optional[str]: """Get restore point snapshot name for incremental backup. If the backup was not incremental (determined by the fact that the @@ -1130,7 +1188,7 @@ class CephBackupDriver(driver.BackupDriver): return restore_point - def _rbd_has_extents(self, rbd_volume): + def _rbd_has_extents(self, rbd_volume) -> bool: """Check whether the given rbd volume has extents. Return True if has extents, otherwise False. @@ -1149,8 +1207,11 @@ class CephBackupDriver(driver.BackupDriver): return False - def _diff_restore_allowed(self, base_name, backup, volume, volume_file, - rados_client): + def _diff_restore_allowed(self, base_name: str, backup: 'objects.Backup', + volume: 'objects.Volume', + volume_file: linuxrbd.RBDVolumeIOWrapper, + rados_client: 'rados.Rados' + ) -> Tuple[bool, Optional[str]]: """Determine if differential restore is possible and restore point. Determine whether a differential restore is possible/allowed, @@ -1209,7 +1270,10 @@ class CephBackupDriver(driver.BackupDriver): 'volume': backup.volume_id}) return False, restore_point - def _restore_volume(self, backup, volume, volume_file): + def _restore_volume(self, + backup: 'objects.Backup', + volume: 'objects.Volume', + volume_file: linuxrbd.RBDVolumeIOWrapper) -> None: """Restore volume from backup using diff transfer if possible. Attempts a differential restore and reverts to full copy if diff fails. @@ -1245,7 +1309,9 @@ class CephBackupDriver(driver.BackupDriver): self._full_restore(backup, volume_file, volume.name, length, src_snap=restore_point) - def _restore_metadata(self, backup, volume_id): + def _restore_metadata(self, + backup: 'objects.Backup', + volume_id: str) -> None: """Restore volume metadata from backup. If this backup has associated metadata, save it to the restore target @@ -1265,7 +1331,10 @@ class CephBackupDriver(driver.BackupDriver): LOG.error(msg) raise exception.BackupOperationError(msg) - def restore(self, backup, volume_id, volume_file): + def restore(self, + backup: 'objects.Backup', + volume_id: str, + volume_file: linuxrbd.RBDVolumeIOWrapper) -> None: """Restore volume from backup in Ceph object store. If volume metadata is available this will also be restored. @@ -1296,7 +1365,7 @@ class CephBackupDriver(driver.BackupDriver): '%(error)s.', {'error': e, 'volume': volume_id}) raise - def delete_backup(self, backup): + def delete_backup(self, backup: 'objects.Backup') -> None: """Delete the given backup from Ceph object store.""" LOG.debug('Delete started for backup=%s', backup.id) diff --git a/mypy-files.txt b/mypy-files.txt index 5ec1c0d76ef..27587898a7b 100644 --- a/mypy-files.txt +++ b/mypy-files.txt @@ -1,5 +1,6 @@ cinder/backup/api.py cinder/backup/manager.py +cinder/backup/drivers/ceph.py cinder/common/constants.py cinder/context.py cinder/coordination.py