Merge "mypy: RBD driver"
commit e8ad14365d
@@ -19,6 +19,8 @@ import json
import math
import os
import tempfile
import typing
from typing import Any, Dict, List, Optional, Tuple, Union  # noqa: H301

from castellan import key_manager
from eventlet import tpool
@@ -36,15 +38,19 @@ try:
except ImportError:
    rados = None
    rbd = None
import six
from six.moves import urllib

from cinder import context
from cinder import exception
from cinder.i18n import _
from cinder.image import image_utils
from cinder import interface
from cinder import objects
from cinder.objects.backup import Backup
from cinder.objects import fields
from cinder.objects.snapshot import Snapshot
from cinder.objects.volume import Volume
from cinder.objects.volume_type import VolumeType
from cinder import utils
from cinder.volume import configuration
from cinder.volume import driver
@@ -150,9 +156,16 @@ class RBDVolumeProxy(object):
    The underlying librados client and ioctx can be accessed as the attributes
    'client' and 'ioctx'.
    """
    def __init__(self, driver, name, pool=None, snapshot=None,
                 read_only=False, remote=None, timeout=None,
                 client=None, ioctx=None):
    def __init__(self,
                 driver: 'RBDDriver',
                 name: str,
                 pool: str = None,
                 snapshot: str = None,
                 read_only: bool = False,
                 remote: Optional[Dict[str, str]] = None,
                 timeout: int = None,
                 client: 'rados.Rados' = None,
                 ioctx: 'rados.Ioctx' = None):
        self._close_conn = not (client and ioctx)
        rados_client, rados_ioctx = driver._connect_to_rados(
            pool, remote, timeout) if self._close_conn else (client, ioctx)
@@ -173,34 +186,37 @@ class RBDVolumeProxy(object):
        self.client = rados_client
        self.ioctx = rados_ioctx

    def __enter__(self):
    def __enter__(self) -> 'RBDVolumeProxy':
        return self

    def __exit__(self, type_, value, traceback):
    def __exit__(self,
                 type_: Optional[Any],
                 value: Optional[Any],
                 traceback: Optional[Any]) -> None:
        try:
            self.volume.close()
        finally:
            if self._close_conn:
                self.driver._disconnect_from_rados(self.client, self.ioctx)

    def __getattr__(self, attrib):
    def __getattr__(self, attrib: str):
        return getattr(self.volume, attrib)


class RADOSClient(object):
    """Context manager to simplify error handling for connecting to ceph."""
    def __init__(self, driver, pool=None):
    def __init__(self, driver: 'RBDDriver', pool: str = None) -> None:
        self.driver = driver
        self.cluster, self.ioctx = driver._connect_to_rados(pool)

    def __enter__(self):
    def __enter__(self) -> 'RADOSClient':
        return self

    def __exit__(self, type_, value, traceback):
    def __exit__(self, type_, value, traceback) -> None:
        self.driver._disconnect_from_rados(self.cluster, self.ioctx)

    @property
    def features(self):
    def features(self) -> int:
        features = self.cluster.conf_get('rbd_default_features')
        if ((features is None) or (int(features) == 0)):
            features = self.driver.RBD_FEATURE_LAYERING
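A rough, self-contained sketch of the annotation pattern applied above to RBDVolumeProxy and RADOSClient: a context manager whose __enter__ returns its own type through a string forward reference, with Optional keyword arguments. The ExampleDriver/ExampleProxy names below are hypothetical and not part of the driver.

from typing import Any, Optional


class ExampleDriver(object):
    def _connect(self, pool: Optional[str]) -> str:
        # Stand-in for a real connection call; returns a fake handle string.
        return 'handle-for-%s' % (pool or 'default')


class ExampleProxy(object):
    """Context manager wrapping a connection, annotated like RBDVolumeProxy."""

    def __init__(self,
                 driver: 'ExampleDriver',
                 pool: Optional[str] = None) -> None:
        self.driver = driver
        self.handle: Optional[str] = driver._connect(pool)

    def __enter__(self) -> 'ExampleProxy':
        # Returning the proxy's own type lets mypy check attribute access on
        # the object bound by the "with" statement.
        return self

    def __exit__(self,
                 type_: Optional[Any],
                 value: Optional[Any],
                 traceback: Optional[Any]) -> None:
        self.handle = None


with ExampleProxy(ExampleDriver(), pool='volumes') as proxy:
    print(proxy.handle)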
@@ -229,10 +245,13 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
    RBD_FEATURE_JOURNALING = 64
    STORAGE_PROTOCOL = 'ceph'

    def __init__(self, active_backend_id=None, *args, **kwargs):
    def __init__(self,
                 active_backend_id: str = None,
                 *args,
                 **kwargs) -> None:
        super(RBDDriver, self).__init__(*args, **kwargs)
        self.configuration.append_config_values(RBD_OPTS)
        self._stats = {}
        self._stats: Dict[str, Union[str, bool]] = {}
        # allow overrides for testing
        self.rados = kwargs.get('rados', rados)
        self.rbd = kwargs.get('rbd', rbd)
@@ -247,12 +266,12 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,

        self._backend_name = (self.configuration.volume_backend_name or
                              self.__class__.__name__)
        self._active_backend_id = active_backend_id
        self._active_config = {}
        self._active_backend_id: Optional[str] = active_backend_id
        self._active_config: Dict[str, str] = {}
        self._is_replication_enabled = False
        self._replication_targets = []
        self._target_names = []
        self._clone_v2_api_checked = False
        self._replication_targets: list = []
        self._target_names: List[str] = []
        self._clone_v2_api_checked: bool = False

        if self.rbd is not None:
            self.RBD_FEATURE_LAYERING = self.rbd.RBD_FEATURE_LAYERING
@@ -268,9 +287,10 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
                self.RBD_FEATURE_OBJECT_MAP |
                self.RBD_FEATURE_EXCLUSIVE_LOCK)

        self.keyring_data: Optional[str] = None
        self._set_keyring_attributes()

    def _set_keyring_attributes(self):
    def _set_keyring_attributes(self) -> None:
        # The rbd_keyring_conf option is not available for OpenStack usage
        # for security reasons (OSSN-0085) and in OpenStack we use
        # rbd_secret_uuid or make sure that the keyring files are present on
@@ -281,8 +301,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
        # file even if it's there (because we have removed the conf option
        # definition), but cinderlib will find it because it sets the option
        # directly as an attribute.
        self.keyring_file = getattr(self.configuration, 'rbd_keyring_conf',
                                    None)
        self.keyring_file: Optional[str] = getattr(self.configuration,
                                                   'rbd_keyring_conf',
                                                   None)

        self.keyring_data = None
        try:
@@ -293,13 +314,13 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
            LOG.debug('Cannot read RBD keyring file: %s.', self.keyring_file)

    @classmethod
    def get_driver_options(cls):
    def get_driver_options(cls) -> list:
        additional_opts = cls._get_oslo_driver_opts(
            'replication_device', 'reserved_percentage',
            'max_over_subscription_ratio', 'volume_dd_blocksize')
        return RBD_OPTS + additional_opts

    def _show_msg_check_clone_v2_api(self, volume_name):
    def _show_msg_check_clone_v2_api(self, volume_name: str) -> None:
        if not self._clone_v2_api_checked:
            self._clone_v2_api_checked = True
            with RBDVolumeProxy(self, volume_name, read_only=True) as volume:
@@ -315,7 +336,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
                            ' compat version to mimic for better'
                            ' performance, fewer deletion issues')

    def _get_target_config(self, target_id):
    def _get_target_config(self,
                           target_id: Optional[str]) -> Dict[str,
                                                             str]:
        """Get a replication target from known replication targets."""
        for target in self._replication_targets:
            if target['name'] == target_id:
@@ -330,12 +353,12 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
        raise exception.InvalidReplicationTarget(
            reason=_('RBD: Unknown failover target host %s.') % target_id)

    def do_setup(self, context):
    def do_setup(self, context: context.RequestContext) -> None:
        """Performs initialization steps that could raise exceptions."""
        self._do_setup_replication()
        self._active_config = self._get_target_config(self._active_backend_id)

    def _do_setup_replication(self):
    def _do_setup_replication(self) -> None:
        replication_devices = self.configuration.safe_get(
            'replication_device')
        if replication_devices:
@@ -343,7 +366,8 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
            self._is_replication_enabled = True
            self._target_names.append('default')

    def _parse_replication_configs(self, replication_devices):
    def _parse_replication_configs(self,
                                   replication_devices: List[dict]) -> None:
        for replication_device in replication_devices:
            if 'backend_id' not in replication_device:
                msg = _('Missing backend_id in replication_device '
@@ -366,13 +390,17 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
            self._replication_targets.append(replication_target)
            self._target_names.append(name)

    def _get_config_tuple(self, remote=None):
    def _get_config_tuple(
            self,
            remote: Optional[Dict[str, str]] = None) \
            -> Tuple[Optional[str], Optional[str],
                     Optional[str], Optional[str]]:
        if not remote:
            remote = self._active_config
        return (remote.get('name'), remote.get('conf'), remote.get('user'),
                remote.get('secret_uuid', None))

    def _trash_purge(self):
    def _trash_purge(self) -> None:
        LOG.info("Purging trash for backend '%s'", self._backend_name)

        def _err(vol_name: str, backend_name: str) -> None:
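The _get_config_tuple() annotation above is typical of how this change types helpers that return a fixed-size tuple of possibly-missing strings. A minimal hedged sketch of the same idea, using made-up configuration data rather than the driver's real configuration object:

from typing import Dict, Optional, Tuple

# Hypothetical stand-in for the driver's active replication target config.
ACTIVE_CONFIG: Dict[str, str] = {'name': 'ceph',
                                 'conf': '/etc/ceph/ceph.conf',
                                 'user': 'cinder'}


def get_config_tuple(
        remote: Optional[Dict[str, str]] = None) \
        -> Tuple[Optional[str], Optional[str],
                 Optional[str], Optional[str]]:
    # Every slot may be missing, so each element is Optional[str]; the tuple
    # itself always has exactly four elements.
    cfg = remote or ACTIVE_CONFIG
    return (cfg.get('name'), cfg.get('conf'), cfg.get('user'),
            cfg.get('secret_uuid'))


name, conf, user, secret_uuid = get_config_tuple()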
@@ -405,7 +433,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
                     vol.get('name'),
                     self._backend_name)

    def _start_periodic_tasks(self):
    def _start_periodic_tasks(self) -> None:
        if self.configuration.enable_deferred_deletion:
            LOG.info("Starting periodic trash purge for backend '%s'",
                     self._backend_name)
@@ -414,7 +442,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
            deferred_deletion_ptask.start(
                interval=self.configuration.deferred_deletion_purge_interval)

    def check_for_setup_error(self):
    def check_for_setup_error(self) -> None:
        """Returns an error if prerequisites aren't met."""
        if rados is None:
            msg = _('rados and rbd python libraries not found')
@@ -451,10 +479,10 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,

        self._start_periodic_tasks()

    def RBDProxy(self):
    def RBDProxy(self) -> tpool.Proxy:
        return tpool.Proxy(self.rbd.RBD())

    def _ceph_args(self):
    def _ceph_args(self) -> List[str]:
        args = []

        name, conf, user, secret_uuid = self._get_config_tuple()
@@ -468,11 +496,18 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,

        return args

    def _connect_to_rados(self, pool=None, remote=None, timeout=None):
    def _connect_to_rados(self,
                          pool: Optional[str] = None,
                          remote: Optional[dict] = None,
                          timeout: Optional[int] = None) -> \
            Tuple['rados.Rados', 'rados.Ioctx']:
        @utils.retry(exception.VolumeBackendAPIException,
                     self.configuration.rados_connection_interval,
                     self.configuration.rados_connection_retries)
        def _do_conn(pool, remote, timeout):
        def _do_conn(pool: Optional[str],
                     remote: Optional[dict],
                     timeout: Optional[int]) -> Tuple['rados.Rados',
                                                      'rados.Ioctx']:
            name, conf, user, secret_uuid = self._get_config_tuple(remote)

            if pool is not None:
@@ -494,10 +529,10 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,

            try:
                if timeout >= 0:
                    timeout = six.text_type(timeout)
                    client.conf_set('rados_osd_op_timeout', timeout)
                    client.conf_set('rados_mon_op_timeout', timeout)
                    client.conf_set('client_mount_timeout', timeout)
                    t = str(timeout)
                    client.conf_set('rados_osd_op_timeout', t)
                    client.conf_set('rados_mon_op_timeout', t)
                    client.conf_set('client_mount_timeout', t)

                client.connect()
                ioctx = client.open_ioctx(pool)
@@ -510,12 +545,14 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,

        return _do_conn(pool, remote, timeout)

    def _disconnect_from_rados(self, client, ioctx):
    def _disconnect_from_rados(self,
                               client: 'rados.Rados',
                               ioctx: 'rados.Ioctx') -> None:
        # closing an ioctx cannot raise an exception
        ioctx.close()
        client.shutdown()

    def _get_backup_snaps(self, rbd_image):
    def _get_backup_snaps(self, rbd_image) -> List:
        """Get list of any backup snapshots that exist on this volume.

        There should only ever be one but accept all since they need to be
@@ -528,7 +565,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
        from cinder.backup.drivers import ceph
        return ceph.CephBackupDriver.get_backup_snaps(rbd_image)

    def _get_mon_addrs(self):
    def _get_mon_addrs(self) -> Tuple[List[str], List[str]]:
        args = ['ceph', 'mon', 'dump', '--format=json']
        args.extend(self._ceph_args())
        out, _ = self._execute(*args)
@@ -536,7 +573,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
        if lines[0].startswith('dumped monmap epoch'):
            lines = lines[1:]
        monmap = json.loads('\n'.join(lines))
        addrs = [mon['addr'] for mon in monmap['mons']]
        addrs: List[str] = [mon['addr'] for mon in monmap['mons']]
        hosts = []
        ports = []
        for addr in addrs:
@@ -546,7 +583,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
            ports.append(port)
        return hosts, ports

    def _get_usage_info(self):
    def _get_usage_info(self) -> int:
        """Calculate provisioned volume space in GiB.

        Stats report should send provisioned size of volumes (snapshot must not
@@ -572,7 +609,8 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
        total_provisioned = math.ceil(float(total_provisioned) / units.Gi)
        return total_provisioned

    def _get_pool_stats(self):
    def _get_pool_stats(self) -> Union[Tuple[str, str],
                                       Tuple[float, float]]:
        """Gets pool free and total capacity in GiB.

        Calculate free and total capacity of the pool based on the pool's
@@ -602,6 +640,8 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
        pool_stats = [pool for pool in df_data['pools']
                      if pool['name'] == pool_name][0]['stats']

        total_capacity: float
        free_capacity: float
        quota_outbuf = encodeutils.safe_decode(quota_outbuf)
        bytes_quota = json.loads(quota_outbuf)['quota_max_bytes']
        # With quota the total is the quota limit and free is quota - used
@@ -624,7 +664,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,

        return free_capacity, total_capacity

    def _update_volume_stats(self):
    def _update_volume_stats(self) -> None:
        location_info = '%s:%s:%s:%s:%s' % (
            self.configuration.rbd_cluster_name,
            self.configuration.rbd_ceph_conf,
@@ -673,7 +713,10 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
            LOG.exception('error refreshing volume stats')
        self._stats = stats

    def _get_clone_depth(self, client, volume_name, depth=0):
    def _get_clone_depth(self,
                         client: 'rados.Rados',
                         volume_name: str,
                         depth: int = 0) -> int:
        """Returns the number of ancestral clones of the given volume."""
        parent_volume = self.rbd.Image(client.ioctx,
                                       volume_name,
@@ -689,7 +732,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,

        return self._get_clone_depth(client, parent, depth + 1)

    def _extend_if_required(self, volume, src_vref):
    def _extend_if_required(self, volume: Volume, src_vref: Volume) -> None:
        """Extends a volume if required

        In case src_vref size is smaller than the size if the requested
@@ -702,7 +745,10 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
                       'dst_size': volume.size})
            self._resize(volume)

    def create_cloned_volume(self, volume, src_vref):
    def create_cloned_volume(
            self,
            volume: Volume,
            src_vref: Volume) -> Optional[Dict[str, Optional[str]]]:
        """Create a cloned volume from another volume.

        Since we are cloning from a volume and not a snapshot, we must first
@@ -722,7 +768,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
            with RBDVolumeProxy(self, src_name, read_only=True) as vol:
                vol.copy(vol.ioctx, dest_name)
                self._extend_if_required(volume, src_vref)
            return
            return None

        # Otherwise do COW clone.
        with RADOSClient(self) as client:
@@ -808,7 +854,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
        LOG.debug("clone created successfully")
        return volume_update

    def _enable_replication(self, volume):
    def _enable_replication(self, volume: Volume) -> Dict[str, str]:
        """Enable replication for a volume.

        Returns required volume update.
@@ -832,7 +878,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
        return {'replication_status': fields.ReplicationStatus.ENABLED,
                'replication_driver_data': driver_data}

    def _enable_multiattach(self, volume):
    def _enable_multiattach(self, volume: Volume) -> Dict[str, str]:
        vol_name = utils.convert_str(volume.name)
        with RBDVolumeProxy(self, vol_name) as image:
            image_features = image.features()
@@ -842,7 +888,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
        return {'provider_location':
                self._dumps({'saved_features': image_features})}

    def _disable_multiattach(self, volume):
    def _disable_multiattach(self, volume: Volume) -> Dict[str, None]:
        vol_name = utils.convert_str(volume.name)
        with RBDVolumeProxy(self, vol_name) as image:
            try:
@@ -859,7 +905,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,

        return {'provider_location': None}

    def _is_replicated_type(self, volume_type):
    def _is_replicated_type(self, volume_type: VolumeType) -> bool:
        try:
            extra_specs = volume_type.extra_specs
            LOG.debug('extra_specs: %s', extra_specs)
@@ -868,7 +914,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
            LOG.debug('Unable to retrieve extra specs info')
            return False

    def _is_multiattach_type(self, volume_type):
    def _is_multiattach_type(self, volume_type: VolumeType) -> bool:
        try:
            extra_specs = volume_type.extra_specs
            LOG.debug('extra_specs: %s', extra_specs)
@@ -877,7 +923,11 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
            LOG.debug('Unable to retrieve extra specs info')
            return False

    def _setup_volume(self, volume, volume_type=None):
    def _setup_volume(
            self,
            volume: Volume,
            volume_type: Optional[VolumeType] = None) -> Dict[str,
                                                              Optional[str]]:

        if volume_type:
            had_replication = self._is_replicated_type(volume.volume_type)
@@ -894,7 +944,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
            msg = _('Replication and Multiattach are mutually exclusive.')
            raise RBDDriverException(reason=msg)

        volume_update = dict()
        volume_update: dict = dict()

        if want_replication:
            if had_multiattach:
@@ -924,7 +974,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,

        return volume_update

    def _create_encrypted_volume(self, volume, context):
    def _create_encrypted_volume(self,
                                 volume: Volume,
                                 context: context.RequestContext) -> None:
        """Create an encrypted volume.

        This works by creating an encrypted image locally,
@@ -972,11 +1024,12 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
        cmd.extend(self._ceph_args())
        self._execute(*cmd)

    def create_volume(self, volume):
    def create_volume(self, volume: Volume) -> Dict[str, Any]:
        """Creates a logical volume."""

        if volume.encryption_key_id:
            return self._create_encrypted_volume(volume, volume.obj_context)
            self._create_encrypted_volume(volume, volume.obj_context)
            return {}

        size = int(volume.size) * units.Gi

@@ -1004,13 +1057,13 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,

        return volume_update

    def _flatten(self, pool, volume_name):
    def _flatten(self, pool: str, volume_name: str) -> None:
        LOG.debug('flattening %(pool)s/%(img)s',
                  dict(pool=pool, img=volume_name))
        with RBDVolumeProxy(self, volume_name, pool) as vol:
            vol.flatten()

    def _get_stripe_unit(self, ioctx, volume_name):
    def _get_stripe_unit(self, ioctx: 'rados.Ioctx', volume_name: str) -> int:
        """Return the correct stripe unit for a cloned volume.

        A cloned volume must be created with a stripe unit at least as large
@@ -1029,7 +1082,11 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,

        return max(image_stripe_unit, default_stripe_unit)

    def _clone(self, volume, src_pool, src_image, src_snap):
    def _clone(self,
               volume: Volume,
               src_pool: str,
               src_image: str,
               src_snap: str) -> Dict[str, Optional[str]]:
        LOG.debug('cloning %(pool)s/%(img)s@%(snap)s to %(dst)s',
                  dict(pool=src_pool, img=src_image, snap=src_snap,
                       dst=volume.name))
@@ -1056,7 +1113,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
                volume_id=volume.id)
        return volume_update or {}

    def _resize(self, volume, **kwargs):
    def _resize(self, volume: Volume, **kwargs: Any) -> None:
        size = kwargs.get('size', None)
        if not size:
            size = int(volume.size) * units.Gi
@@ -1064,14 +1121,17 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
        with RBDVolumeProxy(self, volume.name) as vol:
            vol.resize(size)

    def _calculate_new_size(self, size_diff, volume_name):
    def _calculate_new_size(self, size_diff: int, volume_name: str) -> int:
        with RBDVolumeProxy(self, volume_name) as vol:
            current_size_bytes = vol.volume.size()
        size_diff_bytes = size_diff * units.Gi
        new_size_bytes = current_size_bytes + size_diff_bytes
        return new_size_bytes

    def create_volume_from_snapshot(self, volume, snapshot):
    def create_volume_from_snapshot(
            self,
            volume: Volume,
            snapshot: Snapshot) -> dict:
        """Creates a volume from a snapshot."""
        volume_update = self._clone(volume, self.configuration.rbd_pool,
                                    snapshot.volume_name, snapshot.name)
@@ -1098,7 +1158,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
            self._show_msg_check_clone_v2_api(snapshot.volume_name)
        return volume_update

    def _delete_backup_snaps(self, rbd_image):
    def _delete_backup_snaps(self, rbd_image: 'rbd.Image') -> None:
        backup_snaps = self._get_backup_snaps(rbd_image)
        if backup_snaps:
            for snap in backup_snaps:
@@ -1106,7 +1166,12 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
        else:
            LOG.debug("volume has no backup snaps")

    def _get_clone_info(self, volume, volume_name, snap=None):
    def _get_clone_info(
            self,
            volume: 'rbd.Image',
            volume_name: str,
            snap: Optional[str] = None) -> Union[Tuple[str, str, str],
                                                 Tuple[None, None, None]]:
        """If volume is a clone, return its parent info.

        Returns a tuple of (pool, parent, snap). A snapshot may optionally be
@@ -1132,7 +1197,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,

        return (None, None, None)

    def _get_children_info(self, volume, snap):
    def _get_children_info(self,
                           volume: 'rbd.Image',
                           snap: Optional[str]) -> List[tuple]:
        """List children for the given snapshot of a volume(image).

        Returns a list of (pool, image).
@@ -1147,7 +1214,10 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,

        return children_list

    def _delete_clone_parent_refs(self, client, parent_name, parent_snap):
    def _delete_clone_parent_refs(self,
                                  client: RADOSClient,
                                  parent_name: str,
                                  parent_snap: str) -> None:
        """Walk back up the clone chain and delete references.

        Deletes references i.e. deleted parent volumes and snapshots.
@@ -1183,9 +1253,10 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,

        # Now move up to grandparent if there is one
        if g_parent:
            g_parent_snap = typing.cast(str, g_parent_snap)
            self._delete_clone_parent_refs(client, g_parent, g_parent_snap)

    def delete_volume(self, volume):
    def delete_volume(self, volume: Volume) -> None:
        """Deletes a logical volume."""
        # NOTE(dosaboy): this was broken by commit cbe1d5f. Ensure names are
        # utf-8 otherwise librbd will barf.
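The typing.cast(str, g_parent_snap) call above, and the similar cast added in delete_volume below, narrow an Optional[str] to str at a point where a related check already guarantees the value is set; cast() has no runtime effect. A minimal hedged sketch of that pattern with hypothetical names:

import typing
from typing import Optional, Tuple


def parent_info(name: str) -> Tuple[Optional[str], Optional[str]]:
    # Hypothetical lookup; both values are set together or not at all.
    if name.endswith('.clone'):
        return 'base-volume', 'base-snap'
    return None, None


def describe(name: str) -> str:
    parent, parent_snap = parent_info(name)
    if parent:
        # mypy only narrows "parent" here; it cannot know that "parent_snap"
        # is also non-None, so cast() documents that invariant for the type
        # checker without changing runtime behaviour.
        parent_snap = typing.cast(str, parent_snap)
        return 'clone of %s@%s' % (parent, parent_snap)
    return 'not a clone'


print(describe('volume-1.clone'))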
@@ -1226,7 +1297,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
        @utils.retry(self.rbd.ImageBusy,
                     self.configuration.rados_connection_interval,
                     self.configuration.rados_connection_retries)
        def _try_remove_volume(client, volume_name):
        def _try_remove_volume(client: Any, volume_name: str) -> None:
            if self.configuration.enable_deferred_deletion:
                delay = self.configuration.deferred_deletion_delay
            else:
@@ -1269,6 +1340,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
            # references.
            if parent:
                LOG.debug("volume is a clone so cleaning references")
                parent_snap = typing.cast(str, parent_snap)
                self._delete_clone_parent_refs(client, parent, parent_snap)
            else:
                # If the volume has copy-on-write clones we will not be able to
@@ -1277,14 +1349,14 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
                new_name = "%s.deleted" % (volume_name)
                self.RBDProxy().rename(client.ioctx, volume_name, new_name)

    def create_snapshot(self, snapshot):
    def create_snapshot(self, snapshot: Snapshot) -> None:
        """Creates an rbd snapshot."""
        with RBDVolumeProxy(self, snapshot.volume_name) as volume:
            snap = utils.convert_str(snapshot.name)
            volume.create_snap(snap)
            volume.protect_snap(snap)

    def delete_snapshot(self, snapshot):
    def delete_snapshot(self, snapshot: Snapshot) -> None:
        """Deletes an rbd snapshot."""
        # NOTE(dosaboy): this was broken by commit cbe1d5f. Ensure names are
        # utf-8 otherwise librbd will barf.
@@ -1320,11 +1392,14 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
                LOG.info("Snapshot %s does not exist in backend.",
                         snap_name)

    def snapshot_revert_use_temp_snapshot(self):
    def snapshot_revert_use_temp_snapshot(self) -> bool:
        """Disable the use of a temporary snapshot on revert."""
        return False

    def revert_to_snapshot(self, context, volume, snapshot):
    def revert_to_snapshot(self,
                           context: context.RequestContext,
                           volume: Volume,
                           snapshot: Snapshot) -> None:
        """Revert a volume to a given snapshot."""
        # NOTE(rosmaita): The Ceph documentation notes that this operation is
        # inefficient on the backend for large volumes, and that the preferred
@@ -1346,7 +1421,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
        with RBDVolumeProxy(self, volume.name) as image:
            image.rollback_to_snap(snapshot.name)

    def _disable_replication(self, volume):
    def _disable_replication(self, volume: Volume) -> Dict[str, Optional[str]]:
        """Disable replication on the given volume."""
        vol_name = utils.convert_str(volume.name)
        with RBDVolumeProxy(self, vol_name) as image:
@@ -1363,14 +1438,24 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
        return {'replication_status': fields.ReplicationStatus.DISABLED,
                'replication_driver_data': None}

    def retype(self, context, volume, new_type, diff, host):
    def retype(self,
               context: context.RequestContext,
               volume: Volume,
               new_type: VolumeType,
               diff: Union[Dict[str, Dict[str, str]], Dict[str, Dict], None],
               host: Optional[Dict[str, str]]) -> Tuple[bool, dict]:
        """Retype from one volume type to another on the same backend."""
        return True, self._setup_volume(volume, new_type)

    def _dumps(self, obj):
    def _dumps(self, obj: Dict[str, Union[bool, int]]) -> str:
        return json.dumps(obj, separators=(',', ':'), sort_keys=True)

    def _exec_on_volume(self, volume_name, remote, operation, *args, **kwargs):
    def _exec_on_volume(self,
                        volume_name: str,
                        remote: Dict[str, str],
                        operation: str,
                        *args: Any,
                        **kwargs: Any):
        @utils.retry(rbd.ImageBusy,
                     self.configuration.rados_connection_interval,
                     self.configuration.rados_connection_retries)
@@ -1381,7 +1466,11 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
            return getattr(rbd_image, operation)(*args, **kwargs)
        return _do_exec()

    def _failover_volume(self, volume, remote, is_demoted, replication_status):
    def _failover_volume(self,
                         volume: Volume,
                         remote: Dict[str, str],
                         is_demoted: bool,
                         replication_status: str) -> Dict[str, Any]:
        """Process failover for a volume.

        There are 2 different cases that will return different update values
@@ -1420,7 +1509,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,

        return error_result

    def _demote_volumes(self, volumes, until_failure=True):
    def _demote_volumes(self,
                        volumes: List[Volume],
                        until_failure: bool = True) -> List[bool]:
        """Try to demote volumes on the current primary cluster."""
        result = []
        try_demoting = True
@@ -1440,7 +1531,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
            result.append(demoted)
        return result

    def _get_failover_target_config(self, secondary_id=None):
    def _get_failover_target_config(self,
                                    secondary_id: str = None) -> Tuple[str,
                                                                       dict]:
        if not secondary_id:
            # In auto mode exclude failback and active
            candidates = set(self._target_names).difference(
@@ -1451,7 +1544,11 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
            secondary_id = candidates.pop()
        return secondary_id, self._get_target_config(secondary_id)

    def failover(self, context, volumes, secondary_id=None, groups=None):
    def failover(self,
                 context: context.RequestContext,
                 volumes: list,
                 secondary_id: Optional[str] = None,
                 groups=None) -> Tuple[str, list, list]:
        """Failover replicated volumes."""
        LOG.info('RBD driver failover started.')
        if not self._is_replication_enabled:
@@ -1476,7 +1573,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
        LOG.info('RBD driver failover completed.')
        return secondary_id, updates, []

    def failover_completed(self, context, secondary_id=None):
    def failover_completed(self,
                           context: context.RequestContext,
                           secondary_id: Optional[str] = None) -> None:
        """Failover to replication target."""
        LOG.info('RBD driver failover completion started.')
        secondary_id, remote = self._get_failover_target_config(secondary_id)
@@ -1485,7 +1584,13 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
        self._active_config = remote
        LOG.info('RBD driver failover completion completed.')

    def failover_host(self, context, volumes, secondary_id=None, groups=None):
    def failover_host(self,
                      context: context.RequestContext,
                      volumes: List[Volume],
                      secondary_id: str = None,
                      groups: Optional[List] = None) -> Tuple[str,
                                                              List[Volume],
                                                              List]:
        """Failover to replication target.

        This function combines calls to failover() and failover_completed() to
@@ -1496,19 +1601,28 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
        self.failover_completed(context, secondary_id)
        return active_backend_id, volume_update_list, group_update_list

    def ensure_export(self, context, volume):
    def ensure_export(self,
                      context: context.RequestContext,
                      volume: Volume):
        """Synchronously recreates an export for a logical volume."""
        pass

    def create_export(self, context, volume, connector):
    def create_export(self,
                      context: context.RequestContext,
                      volume: Volume,
                      connector: dict):
        """Exports the volume."""
        pass

    def remove_export(self, context, volume):
    def remove_export(self,
                      context: context.RequestContext,
                      volume: Volume):
        """Removes an export for a logical volume."""
        pass

    def initialize_connection(self, volume, connector):
    def initialize_connection(self,
                              volume: Volume,
                              connector: dict) -> Dict[str, Any]:
        hosts, ports = self._get_mon_addrs()
        name, conf, user, secret_uuid = self._get_config_tuple()
        data = {
@@ -1528,14 +1642,17 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
            }
        }
        if self.keyring_data:
            data['data']['keyring'] = self.keyring_data
            data['data']['keyring'] = self.keyring_data  # type: ignore
        LOG.debug('connection data: %s', data)
        return data

    def terminate_connection(self, volume, connector, **kwargs):
    def terminate_connection(self,
                             volume: Volume,
                             connector: dict,
                             **kwargs) -> None:
        pass

    def _parse_location(self, location):
    def _parse_location(self, location: str) -> List[str]:
        prefix = 'rbd://'
        if not location.startswith(prefix):
            reason = _('Not stored in rbd')
@@ -1550,7 +1667,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
            raise exception.ImageUnacceptable(image_id=location, reason=reason)
        return pieces

    def _get_fsid(self):
    def _get_fsid(self) -> str:
        with RADOSClient(self) as client:
            # Librados's get_fsid is represented as binary
            # in py3 instead of str as it is in py2.
@@ -1567,7 +1684,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
            # https://tracker.ceph.com/issues/38381
            return encodeutils.safe_decode(client.cluster.get_fsid())

    def _is_cloneable(self, image_location, image_meta):
    def _is_cloneable(self, image_location: str, image_meta: dict) -> bool:
        try:
            fsid, pool, image, snapshot = self._parse_location(image_location)
        except exception.ImageUnacceptable as e:
@@ -1597,9 +1714,12 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
                      dict(loc=image_location, err=e))
            return False

    def clone_image(self, context, volume,
                    image_location, image_meta,
                    image_service):
    def clone_image(self,
                    context: context.RequestContext,
                    volume: Volume,
                    image_location: Optional[list],
                    image_meta: dict,
                    image_service) -> Tuple[dict, bool]:
        if image_location:
            # Note: image_location[0] is glance image direct_url.
            # image_location[1] contains the list of all locations (including
@@ -1623,16 +1743,29 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
                return volume_update, True
        return ({}, False)

    def copy_image_to_encrypted_volume(self, context, volume, image_service,
                                       image_id):
    def copy_image_to_encrypted_volume(self,
                                       context: context.RequestContext,
                                       volume: Volume,
                                       image_service,
                                       image_id: str) -> None:
        self._copy_image_to_volume(context, volume, image_service, image_id,
                                   encrypted=True)

    def copy_image_to_volume(self, context, volume, image_service, image_id):
    def copy_image_to_volume(self,
                             context: context.RequestContext,
                             volume: Volume,
                             image_service,
                             image_id: str) -> None:
        self._copy_image_to_volume(context, volume, image_service, image_id)

    def _encrypt_image(self, context, volume, tmp_dir, src_image_path):
        encryption = volume_utils.check_encryption_provider(volume, context)
    def _encrypt_image(self,
                       context: context.RequestContext,
                       volume: Volume,
                       tmp_dir: str,
                       src_image_path: Any) -> None:
        encryption = volume_utils.check_encryption_provider(
            volume,
            context)

        # Fetch the key associated with the volume and decode the passphrase
        keymgr = key_manager.API(CONF)
@@ -1663,8 +1796,12 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
        finally:
            fileutils.delete_if_exists(dest_image_path)

    def _copy_image_to_volume(self, context, volume, image_service, image_id,
                              encrypted=False):
    def _copy_image_to_volume(self,
                              context: context.RequestContext,
                              volume: Volume,
                              image_service: Any,
                              image_id: str,
                              encrypted: bool = False) -> None:

        tmp_dir = volume_utils.image_conversion_dir()

@@ -1680,7 +1817,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
        @utils.retry(exception.VolumeIsBusy,
                     self.configuration.rados_connection_interval,
                     self.configuration.rados_connection_retries)
        def _delete_volume(volume):
        def _delete_volume(volume: Volume) -> None:
            self.delete_volume(volume)

        _delete_volume(volume)
@@ -1721,7 +1858,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
                                     volume)
        os.unlink(tmp_file)

    def extend_volume(self, volume, new_size):
    def extend_volume(self, volume: Volume, new_size: str) -> None:
        """Extend an existing volume."""
        old_size = volume.size

@@ -1737,7 +1874,8 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
        LOG.debug("Extend volume from %(old_size)s GB to %(new_size)s GB.",
                  {'old_size': old_size, 'new_size': new_size})

    def manage_existing(self, volume, existing_ref):
    def manage_existing(self,
                        volume: Volume, existing_ref: Dict[str, str]) -> None:
        """Manages an existing image.

        Renames the image name to match the expected name for the volume.
@@ -1756,7 +1894,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
                                    utils.convert_str(rbd_name),
                                    utils.convert_str(volume.name))

    def manage_existing_get_size(self, volume, existing_ref):
    def manage_existing_get_size(self,
                                 volume: Volume,
                                 existing_ref: Dict[str, str]) -> int:
        """Return size of an existing image for manage_existing.

        :param volume:
@@ -1812,8 +1952,13 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
        out, _ = self._execute(*args)
        return json.loads(out)

    def get_manageable_volumes(self, cinder_volumes, marker, limit, offset,
                               sort_keys, sort_dirs):
    def get_manageable_volumes(self,
                               cinder_volumes: List[Dict[str, str]],
                               marker: Optional[Any],
                               limit: int,
                               offset: int,
                               sort_keys: List[str],
                               sort_dirs: List[str]) -> List[Dict[str, Any]]:
        manageable_volumes = []
        cinder_ids = [resource['id'] for resource in cinder_volumes]

@@ -1860,8 +2005,12 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
    def unmanage(self, volume):
        pass

    def update_migrated_volume(self, ctxt, volume, new_volume,
                               original_volume_status):
    def update_migrated_volume(self,
                               ctxt: Dict,
                               volume: Volume,
                               new_volume: Volume,
                               original_volume_status: str) -> \
            Union[Dict[str, None], Dict[str, Optional[str]]]:
        """Return model update from RBD for migrated volume.

        This method should rename the back-end volume name(id) on the
@@ -1902,7 +2051,10 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
        return {'_name_id': name_id,
                'provider_location': provider_location}

    def migrate_volume(self, context, volume, host):
    def migrate_volume(self,
                       context: context.RequestContext,
                       volume: Volume,
                       host: Dict[str, Dict[str, str]]) -> Tuple[bool, None]:

        refuse_to_migrate = (False, None)

@@ -1982,7 +2134,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,

        return (True, None)

    def manage_existing_snapshot_get_size(self, snapshot, existing_ref):
    def manage_existing_snapshot_get_size(self,
                                          snapshot: Snapshot,
                                          existing_ref: Dict[str, Any]) -> int:
        """Return size of an existing image for manage_existing.

        :param snapshot:
@@ -2031,7 +2185,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
            raise exception.VolumeBackendAPIException(
                data=exception_message)

    def manage_existing_snapshot(self, snapshot, existing_ref):
    def manage_existing_snapshot(self,
                                 snapshot: Snapshot,
                                 existing_ref: Dict[str, Any]) -> None:
        """Manages an existing snapshot.

        Renames the snapshot name to match the expected name for the snapshot.
@@ -2053,8 +2209,13 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
            if not volume.is_protected_snap(snapshot.name):
                volume.protect_snap(snapshot.name)

    def get_manageable_snapshots(self, cinder_snapshots, marker, limit, offset,
                                 sort_keys, sort_dirs):
    def get_manageable_snapshots(self,
                                 cinder_snapshots: List[Dict[str, str]],
                                 marker: Optional[Any],
                                 limit: int,
                                 offset: int,
                                 sort_keys: List[str],
                                 sort_dirs: List[str]) -> List[Dict[str, Any]]:
        """List manageable snapshots on RBD backend."""
        manageable_snapshots = []
        cinder_snapshot_ids = [resource['id'] for resource in cinder_snapshots]
@@ -2104,7 +2265,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
        return volume_utils.paginate_entries_list(
            manageable_snapshots, marker, limit, offset, sort_keys, sort_dirs)

    def unmanage_snapshot(self, snapshot):
    def unmanage_snapshot(self, snapshot: Snapshot) -> None:
        """Removes the specified snapshot from Cinder management."""
        with RBDVolumeProxy(self, snapshot.volume_name) as volume:
            volume.set_snap(snapshot.name)
@@ -2113,7 +2274,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
            if not children and volume.is_protected_snap(snapshot.name):
                volume.unprotect_snap(snapshot.name)

    def get_backup_device(self, context, backup):
    def get_backup_device(self,
                          context: context.RequestContext,
                          backup: Backup) -> Tuple[Volume, bool]:
        """Get a backup device from an existing volume.

        To support incremental backups on Ceph to Ceph we don't clone

@@ -28,6 +28,7 @@ cinder/scheduler/weights/volume_number.py
cinder/utils.py
cinder/volume/__init__.py
cinder/volume/api.py
cinder/volume/drivers/rbd.py
cinder/volume/flows/api/create_volume.py
cinder/volume/flows/manager/create_volume.py
cinder/volume/manager.py
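The last hunk adds cinder/volume/drivers/rbd.py to what appears to be the repository's list of mypy-checked modules (the file's name is not shown in this view). As a hedged illustration, assuming mypy is installed and the path exists in your checkout, the module can be type-checked from Python via mypy's programmatic API:

# Minimal sketch: type-check the newly annotated driver module with mypy's
# Python API. The path below is assumed to be run from a cinder checkout.
from mypy import api

stdout, stderr, exit_status = api.run(['cinder/volume/drivers/rbd.py'])
print(stdout)
print('mypy exit status:', exit_status)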