From 5179e4f6bf49795d2aa4c8d0807a502ee1561f60 Mon Sep 17 00:00:00 2001 From: Eric Harney Date: Fri, 29 Apr 2022 10:14:32 -0400 Subject: [PATCH] Use modern type annotation format for collections This works in Python 3.7 or greater and is cleaner looking. See PEP-585 for more info. https://peps.python.org/pep-0585/ Change-Id: I4c9da881cea1a3638da504c4b79ca8db13851b06 --- cinder/backup/api.py | 10 +- cinder/context.py | 6 +- cinder/flow_utils.py | 10 +- cinder/image/glance.py | 27 +++-- cinder/image/image_utils.py | 26 ++-- cinder/rpc.py | 6 +- cinder/scheduler/base_weight.py | 14 ++- cinder/scheduler/evaluator/evaluator.py | 6 +- cinder/scheduler/filter_scheduler.py | 14 ++- cinder/scheduler/flows/create_volume.py | 8 +- cinder/scheduler/host_manager.py | 23 ++-- cinder/utils.py | 22 ++-- cinder/volume/api.py | 51 ++++---- cinder/volume/drivers/rbd.py | 120 ++++++++++--------- cinder/volume/flows/api/create_volume.py | 25 ++-- cinder/volume/flows/manager/create_volume.py | 22 ++-- cinder/volume/manager.py | 42 +++---- cinder/volume/rpcapi.py | 6 +- cinder/volume/volume_types.py | 10 +- cinder/volume/volume_utils.py | 28 +++-- 20 files changed, 255 insertions(+), 221 deletions(-) diff --git a/cinder/backup/api.py b/cinder/backup/api.py index 52df30c60ca..7325fb60880 100644 --- a/cinder/backup/api.py +++ b/cinder/backup/api.py @@ -17,9 +17,11 @@ """Handles all requests relating to the volume backups service.""" +from __future__ import annotations + from datetime import datetime import random -from typing import List, Optional # noqa: H301 +from typing import Optional from eventlet import greenthread from oslo_config import cfg @@ -123,8 +125,8 @@ class API(base.Base): marker: Optional[str] = None, limit: Optional[int] = None, offset: Optional[int] = None, - sort_keys: Optional[List[str]] = None, - sort_dirs: Optional[List[str]] = None) -> 'objects.BackupList': + sort_keys: Optional[list[str]] = None, + sort_dirs: Optional[list[str]] = None) -> 'objects.BackupList': context.authorize(policy.GET_ALL_POLICY) search_opts = search_opts or {} @@ -200,7 +202,7 @@ class API(base.Base): raise exception.ServiceNotFound(service_id='cinder-backup') return backup_host - def _list_backup_services(self) -> List['objects.Service']: + def _list_backup_services(self) -> list['objects.Service']: """List all enabled backup services. :returns: list -- hosts for services that are enabled for backup. diff --git a/cinder/context.py b/cinder/context.py index 594c8606831..6cb7dda0acd 100644 --- a/cinder/context.py +++ b/cinder/context.py @@ -17,8 +17,10 @@ """RequestContext: context for requests that persist through all of cinder.""" +from __future__ import annotations + import copy -from typing import Any, Dict, Optional # noqa: H301 +from typing import Any, Optional # noqa: H301 from keystoneauth1.access import service_catalog as ksa_service_catalog from keystoneauth1 import plugin @@ -162,7 +164,7 @@ class RequestContext(context.RequestContext): read_deleted = property(_get_read_deleted, _set_read_deleted, _del_read_deleted) - def to_dict(self) -> Dict[str, Any]: + def to_dict(self) -> dict[str, Any]: result = super(RequestContext, self).to_dict() result['user_id'] = self.user_id result['project_id'] = self.project_id diff --git a/cinder/flow_utils.py b/cinder/flow_utils.py index 576f8c5d379..17ea75bd389 100644 --- a/cinder/flow_utils.py +++ b/cinder/flow_utils.py @@ -10,8 +10,10 @@ # License for the specific language governing permissions and limitations # under the License. 
+from __future__ import annotations + import os -from typing import Any, List # noqa: H301 +from typing import Any from oslo_log import log as logging # For more information please visit: https://wiki.openstack.org/wiki/TaskFlow @@ -25,7 +27,7 @@ from cinder import exception LOG = logging.getLogger(__name__) -def _make_task_name(cls, addons: List[str] = None) -> str: +def _make_task_name(cls, addons: list[str] = None) -> str: """Makes a pretty name for a task class.""" base_name = ".".join([cls.__module__, cls.__name__]) extra = '' @@ -41,11 +43,11 @@ class CinderTask(task.Task): implement the given task as the task name. """ - def __init__(self, addons: List[str] = None, **kwargs: Any) -> None: + def __init__(self, addons: list[str] = None, **kwargs: Any) -> None: super(CinderTask, self).__init__(self.make_name(addons), **kwargs) @classmethod - def make_name(cls, addons: List[str] = None) -> str: + def make_name(cls, addons: list[str] = None) -> str: return _make_task_name(cls, addons) diff --git a/cinder/image/glance.py b/cinder/image/glance.py index e762ed11dcb..cf32516f0ca 100644 --- a/cinder/image/glance.py +++ b/cinder/image/glance.py @@ -16,6 +16,8 @@ """Implementation of an image service that uses Glance as the backend""" +from __future__ import annotations # Remove when only supporting python 3.9+ + import copy import itertools import random @@ -23,8 +25,7 @@ import shutil import sys import textwrap import time -from typing import (Any, Callable, Dict, Iterable, List, # noqa: H301 - NoReturn, Optional, Tuple) # noqa: H301 +from typing import (Any, Callable, Iterable, NoReturn, Optional) # noqa: H301 import urllib import urllib.parse @@ -93,7 +94,7 @@ _SESSION = None LOG = logging.getLogger(__name__) -def _parse_image_ref(image_href: str) -> Tuple[str, str, bool]: +def _parse_image_ref(image_href: str) -> tuple[str, str, bool]: """Parse an image href into composite parts. :param image_href: href of an image @@ -283,7 +284,7 @@ class GlanceImageService(object): def detail(self, context: context.RequestContext, - **kwargs: str) -> List[dict]: + **kwargs: str) -> list[dict]: """Calls out to Glance for a list of detailed image information.""" params = self._extract_query_params(kwargs) try: @@ -298,7 +299,7 @@ class GlanceImageService(object): return _images - def _extract_query_params(self, params: dict) -> Dict[str, Any]: + def _extract_query_params(self, params: dict) -> dict[str, Any]: _params = {} accepted_params = ('filters', 'marker', 'limit', 'sort_key', 'sort_dir') @@ -310,7 +311,7 @@ class GlanceImageService(object): def list_members(self, context: context.RequestContext, - image_id: str) -> List[dict]: + image_id: str) -> list[dict]: """Returns a list of dicts with image member data.""" try: return self._client.call(context, @@ -330,7 +331,7 @@ class GlanceImageService(object): def show(self, context: context.RequestContext, - image_id: str) -> Dict[str, Any]: + image_id: str) -> dict[str, Any]: """Returns a dict with image data for the given opaque image id.""" try: image = self._client.call(context, 'get', image_id) @@ -345,7 +346,7 @@ class GlanceImageService(object): def get_location(self, context: context.RequestContext, - image_id: str) -> Tuple[Optional[str], Any]: + image_id: str) -> tuple[Optional[str], Any]: """Get backend storage location url. 
Returns a tuple containing the direct url and locations representing @@ -421,8 +422,8 @@ class GlanceImageService(object): def create(self, context: context.RequestContext, - image_meta: Dict[str, Any], - data=None) -> Dict[str, Any]: + image_meta: dict[str, Any], + data=None) -> dict[str, Any]: """Store the image data and return the new image object.""" sent_service_image_meta = self._translate_to_glance(image_meta) @@ -497,7 +498,7 @@ class GlanceImageService(object): def _translate_from_glance(self, context: context.RequestContext, - image: Dict[str, Any]) -> dict: + image: dict[str, Any]) -> dict: """Get image metadata from glance image. Extract metadata from image and convert it's properties @@ -536,7 +537,7 @@ class GlanceImageService(object): return image_meta @staticmethod - def _translate_to_glance(image_meta: Dict[str, Any]) -> Dict[str, Any]: + def _translate_to_glance(image_meta: dict[str, Any]) -> dict[str, Any]: image_meta = _convert_to_string(image_meta) image_meta = _remove_read_only(image_meta) @@ -684,7 +685,7 @@ def _translate_plain_exception(exc_value: BaseException) -> BaseException: def get_remote_image_service(context: context.RequestContext, - image_href) -> Tuple[GlanceImageService, str]: + image_href) -> tuple[GlanceImageService, str]: """Create an image_service and parse the id from the given image_href. The image_href param can be an href of the form diff --git a/cinder/image/image_utils.py b/cinder/image/image_utils.py index 6a2ea0704e2..b2ae795b02e 100644 --- a/cinder/image/image_utils.py +++ b/cinder/image/image_utils.py @@ -23,6 +23,7 @@ Some slight modifications, but at some point we should look at maybe pushing this up to Oslo """ +from __future__ import annotations # Remove when only supporting python 3.9+ import contextlib import errno @@ -31,8 +32,7 @@ import math import os import re import tempfile -from typing import (ContextManager, Dict, Generator, # noqa: H301 - List, Optional, Tuple) +from typing import ContextManager, Generator, Optional # noqa: H301 import cryptography from cursive import exception as cursive_exception @@ -152,7 +152,7 @@ def qemu_img_info(path: str, return info -def get_qemu_img_version() -> Optional[List[int]]: +def get_qemu_img_version() -> Optional[list[int]]: """The qemu-img version will be cached until the process is restarted.""" global QEMU_IMG_VERSION @@ -187,8 +187,7 @@ def _get_qemu_convert_luks_cmd(src: str, cipher_spec: Optional[dict] = None, passphrase_file: Optional[str] = None, src_passphrase_file: Optional[str] = None) \ - -> List[str]: - + -> list[str]: cmd = ['qemu-img', 'convert'] if prefix: @@ -223,8 +222,7 @@ def _get_qemu_convert_cmd(src: str, passphrase_file: Optional[str] = None, compress: bool = False, src_passphrase_file: Optional[str] = None) \ - -> List[str]: - + -> list[str]: if src_passphrase_file is not None: if passphrase_file is None: message = _("Can't create unencrypted volume %(format)s " @@ -290,7 +288,7 @@ def _get_qemu_convert_cmd(src: str, return cmd -def _get_version_from_string(version_string: str) -> List[int]: +def _get_version_from_string(version_string: str) -> list[int]: return [int(x) for x in version_string.split('.')] @@ -451,7 +449,7 @@ def resize_image(source: str, run_as_root: bool = False, file_format: Optional[str] = None) -> None: """Changes the virtual size of the image.""" - cmd: Tuple[str, ...] + cmd: tuple[str, ...] 
if file_format: cmd = ('qemu-img', 'resize', '-f', file_format, source, '%sG' % size) else: @@ -922,7 +920,7 @@ def extract_targz(archive_name: str, target: str) -> None: utils.execute('tar', '-xzf', archive_name, '-C', target) -def fix_vhd_chain(vhd_chain: List[str]) -> None: +def fix_vhd_chain(vhd_chain: list[str]) -> None: for child, parent in zip(vhd_chain[:-1], vhd_chain[1:]): set_vhd_parent(child, parent) @@ -991,7 +989,7 @@ def temporary_dir() -> ContextManager[str]: return utils.tempdir(dir=CONF.image_conversion_dir) -def coalesce_chain(vhd_chain: List[str]) -> str: +def coalesce_chain(vhd_chain: list[str]) -> str: for child, parent in zip(vhd_chain[:-1], vhd_chain[1:]): with temporary_dir() as directory_for_journal: size = get_vhd_size(child) @@ -1003,7 +1001,7 @@ def coalesce_chain(vhd_chain: List[str]) -> str: return vhd_chain[-1] -def discover_vhd_chain(directory: str) -> List[str]: +def discover_vhd_chain(directory: str) -> list[str]: counter = 0 chain = [] @@ -1028,7 +1026,7 @@ def replace_xenserver_image_with_coalesced_vhd(image_file: str) -> None: os.rename(coalesced, image_file) -def decode_cipher(cipher_spec: str, key_size: int) -> Dict[str, str]: +def decode_cipher(cipher_spec: str, key_size: int) -> dict[str, str]: """Decode a dm-crypt style cipher specification string The assumed format being cipher-chainmode-ivmode, similar to that @@ -1060,7 +1058,7 @@ class TemporaryImages(object): """ def __init__(self, image_service: glance.GlanceImageService): - self.temporary_images: Dict[str, dict] = {} + self.temporary_images: dict[str, dict] = {} self.image_service = image_service image_service.temp_images = self diff --git a/cinder/rpc.py b/cinder/rpc.py index f499f9d5089..d5db4675eb8 100644 --- a/cinder/rpc.py +++ b/cinder/rpc.py @@ -12,6 +12,8 @@ # License for the specific language governing permissions and limitations # under the License. +from __future__ import annotations + __all__ = [ 'init', 'cleanup', @@ -26,7 +28,7 @@ __all__ = [ ] import functools -from typing import Tuple, Union # noqa: H301 +from typing import Union from oslo_config import cfg from oslo_log import log as logging @@ -230,7 +232,7 @@ class RPCAPI(object): return versions[-1] def _get_cctxt(self, - version: Union[str, Tuple[str, ...]] = None, + version: Union[str, tuple[str, ...]] = None, **kwargs): """Prepare client context diff --git a/cinder/scheduler/base_weight.py b/cinder/scheduler/base_weight.py index 519cff0e089..7ae44c2eec5 100644 --- a/cinder/scheduler/base_weight.py +++ b/cinder/scheduler/base_weight.py @@ -17,8 +17,10 @@ Pluggable Weighing support """ +from __future__ import annotations + import abc -from typing import Iterable, List, Optional # noqa: H301 +from typing import Iterable, Optional # noqa: H301 from oslo_log import log as logging @@ -28,7 +30,7 @@ from cinder.scheduler import base_handler LOG = logging.getLogger(__name__) -def normalize(weight_list: List[float], +def normalize(weight_list: list[float], minval: Optional[float] = None, maxval: Optional[float] = None) -> Iterable[float]: """Normalize the values in a list between 0 and 1.0. @@ -95,8 +97,8 @@ class BaseWeigher(object, metaclass=abc.ABCMeta): """Override in a subclass to specify a weight for a specific object.""" def weigh_objects(self, - weighed_obj_list: List[WeighedObject], - weight_properties: dict) -> List[float]: + weighed_obj_list: list[WeighedObject], + weight_properties: dict) -> list[float]: """Weigh multiple objects. 
Override in a subclass if you need access to all objects in order @@ -130,8 +132,8 @@ class BaseWeightHandler(base_handler.BaseHandler): def get_weighed_objects(self, weigher_classes: list, - obj_list: List[WeighedObject], - weighing_properties: dict) -> List[WeighedObject]: + obj_list: list[WeighedObject], + weighing_properties: dict) -> list[WeighedObject]: """Return a sorted (descending), normalized list of WeighedObjects.""" if not obj_list: diff --git a/cinder/scheduler/evaluator/evaluator.py b/cinder/scheduler/evaluator/evaluator.py index 39c8cad0aa2..47db14e1715 100644 --- a/cinder/scheduler/evaluator/evaluator.py +++ b/cinder/scheduler/evaluator/evaluator.py @@ -13,9 +13,11 @@ # License for the specific language governing permissions and limitations # under the License. +from __future__ import annotations # Remove when only supporting python 3.9+ + import operator import re -from typing import Callable, Dict # noqa: H301 +from typing import Callable import pyparsing @@ -168,7 +170,7 @@ class EvalTernaryOp(object): class EvalFunction(object): - functions: Dict[str, Callable] = { + functions: dict[str, Callable] = { "abs": abs, "max": max, "min": min, diff --git a/cinder/scheduler/filter_scheduler.py b/cinder/scheduler/filter_scheduler.py index 57acac874e4..32be969f8d8 100644 --- a/cinder/scheduler/filter_scheduler.py +++ b/cinder/scheduler/filter_scheduler.py @@ -20,7 +20,9 @@ You can customize this scheduler by specifying your own volume Filters and Weighing Functions. """ -from typing import List, Optional # noqa: H301 +from __future__ import annotations + +from typing import Optional from oslo_config import cfg from oslo_log import log as logging @@ -375,9 +377,9 @@ class FilterScheduler(driver.Scheduler): def _get_weighted_candidates_generic_group( self, context: context.RequestContext, - group_spec: dict, request_spec_list: List[dict], + group_spec: dict, request_spec_list: list[dict], group_filter_properties: Optional[dict] = None, - filter_properties_list: Optional[List[dict]] = None) -> list: + filter_properties_list: Optional[list[dict]] = None) -> list: """Finds backends that supports the group. Returns a list of backends that meet the required specs, @@ -486,7 +488,7 @@ class FilterScheduler(driver.Scheduler): def _get_weighted_candidates_by_group_type( self, context: context.RequestContext, group_spec: dict, group_filter_properties: Optional[dict] = None) \ - -> List[WeighedHost]: + -> list[WeighedHost]: """Finds backends that supports the group type. 
Returns a list of backends that meet the required specs, @@ -598,7 +600,7 @@ class FilterScheduler(driver.Scheduler): return self._choose_top_backend_generic_group(weighed_backends) def _choose_top_backend(self, - weighed_backends: List[WeighedHost], + weighed_backends: list[WeighedHost], request_spec: dict): top_backend = weighed_backends[0] backend_state = top_backend.obj @@ -609,7 +611,7 @@ class FilterScheduler(driver.Scheduler): def _choose_top_backend_generic_group( self, - weighed_backends: List[WeighedHost]) -> WeighedHost: + weighed_backends: list[WeighedHost]) -> WeighedHost: top_backend = weighed_backends[0] backend_state = top_backend.obj LOG.debug("Choosing %s", backend_state.backend_id) diff --git a/cinder/scheduler/flows/create_volume.py b/cinder/scheduler/flows/create_volume.py index 54eaa61a57f..a9f967a69b7 100644 --- a/cinder/scheduler/flows/create_volume.py +++ b/cinder/scheduler/flows/create_volume.py @@ -10,7 +10,9 @@ # License for the specific language governing permissions and limitations # under the License. -from typing import Any, Dict, Optional # noqa: H301 +from __future__ import annotations # Remove when only supporting python 3.9+ + +from typing import Any, Optional # noqa: H301 from oslo_log import log as logging from oslo_utils import excutils @@ -49,7 +51,7 @@ class ExtractSchedulerSpecTask(flow_utils.CinderTask): volume: objects.Volume, snapshot_id: Optional[str], image_id: Optional[str], - backup_id: Optional[str]) -> Dict[str, Any]: + backup_id: Optional[str]) -> dict[str, Any]: # Create the full request spec using the volume object. # # NOTE(dulek): At this point, a volume can be deleted before it gets @@ -77,7 +79,7 @@ class ExtractSchedulerSpecTask(flow_utils.CinderTask): volume: objects.Volume, snapshot_id: Optional[str], image_id: Optional[str], - backup_id: Optional[str]) -> Dict[str, Any]: + backup_id: Optional[str]) -> dict[str, Any]: # For RPC version < 1.2 backward compatibility if request_spec is None: request_spec = self._populate_request_spec(volume, diff --git a/cinder/scheduler/host_manager.py b/cinder/scheduler/host_manager.py index 52979010ffb..021181286eb 100644 --- a/cinder/scheduler/host_manager.py +++ b/cinder/scheduler/host_manager.py @@ -15,11 +15,12 @@ """Manage backends in the current zone.""" +from __future__ import annotations + from collections import abc import random import typing -from typing import (Any, Dict, Iterable, List, # noqa: H301 - Optional, Type, Union) +from typing import (Any, Iterable, Optional, Type, Union) # noqa: H301 from oslo_config import cfg from oslo_log import log as logging @@ -163,7 +164,7 @@ class BackendState(object): self.service = ReadOnlyDict(service) def update_from_volume_capability(self, - capability: Dict[str, Any], + capability: dict[str, Any], service=None) -> None: """Update information about a host from its volume_node info. @@ -289,7 +290,7 @@ class BackendState(object): 'host': self.host}) del self.pools[pool] - def _append_backend_info(self, pool_cap: Dict[str, Any]) -> None: + def _append_backend_info(self, pool_cap: dict[str, Any]) -> None: # Fill backend level info to pool if needed. 
if not pool_cap.get('volume_backend_name', None): pool_cap['volume_backend_name'] = self.volume_backend_name @@ -407,7 +408,7 @@ class PoolState(BackendState): self.pools: dict = {} def update_from_volume_capability(self, - capability: Dict[str, Any], + capability: dict[str, Any], service=None) -> None: """Update information about a pool from its volume_node info.""" LOG.debug("Updating capabilities for %s: %s", self.host, capability) @@ -470,7 +471,7 @@ class HostManager(object): def __init__(self): self.service_states = {} # { : {: {cap k : v}}} - self.backend_state_map: Dict[str, BackendState] = {} + self.backend_state_map: dict[str, BackendState] = {} self.backup_service_states = {} self.filter_handler = filters.BackendFilterHandler('cinder.scheduler.' 'filters') @@ -513,7 +514,7 @@ class HostManager(object): def _choose_backend_weighers( self, - weight_cls_names: Optional[List[str]]) -> list: + weight_cls_names: Optional[list[str]]) -> list: """Return a list of available weigher names. This function checks input weigher names against a predefined set @@ -795,7 +796,7 @@ class HostManager(object): def get_pools(self, context: cinder_context.RequestContext, - filters: Optional[dict] = None) -> List[dict]: + filters: Optional[dict] = None) -> list[dict]: """Returns a dict of all pools on all hosts HostManager knows about.""" self._update_backend_state_map(context) @@ -858,7 +859,7 @@ class HostManager(object): capa_new: dict, updated_pools: Iterable[dict], host: str, - timestamp) -> List[dict]: + timestamp) -> list[dict]: pools = capa_new.get('pools') usage = [] if pools and isinstance(pools, list): @@ -888,7 +889,7 @@ class HostManager(object): def _get_pool_usage(self, pool: dict, - host: str, timestamp) -> Dict[str, Any]: + host: str, timestamp) -> dict[str, Any]: total = pool["total_capacity_gb"] free = pool["free_capacity_gb"] @@ -967,7 +968,7 @@ class HostManager(object): def _notify_capacity_usage(self, context: cinder_context.RequestContext, - usage: List[dict]) -> None: + usage: list[dict]) -> None: if usage: for u in usage: volume_utils.notify_about_capacity_usage( diff --git a/cinder/utils.py b/cinder/utils.py index 4b2182d5ef0..fbc41e27b6c 100644 --- a/cinder/utils.py +++ b/cinder/utils.py @@ -25,6 +25,8 @@ should be placed in volume_utils instead. """ +from __future__ import annotations # Remove when only supporting python 3.9+ + from collections import OrderedDict import contextlib import datetime @@ -42,8 +44,8 @@ import stat import sys import tempfile import typing -from typing import Callable, Dict, Iterable, Iterator, List # noqa: H301 -from typing import Optional, Tuple, Type, Union # noqa: H301 +from typing import Callable, Iterable, Iterator # noqa: H301 +from typing import Optional, Type, Union # noqa: H301 import eventlet from eventlet import tpool @@ -165,15 +167,15 @@ def check_exclusive_options( raise exception.InvalidInput(reason=msg) -def execute(*cmd: str, **kwargs: Union[bool, str]) -> Tuple[str, str]: +def execute(*cmd: str, **kwargs: Union[bool, str]) -> tuple[str, str]: """Convenience wrapper around oslo's execute() method.""" if 'run_as_root' in kwargs and 'root_helper' not in kwargs: kwargs['root_helper'] = get_root_helper() return processutils.execute(*cmd, **kwargs) -def check_ssh_injection(cmd_list: List[str]) -> None: - ssh_injection_pattern: Tuple[str, ...] = ('`', '$', '|', '||', ';', '&', +def check_ssh_injection(cmd_list: list[str]) -> None: + ssh_injection_pattern: tuple[str, ...] 
= ('`', '$', '|', '||', ';', '&', '&&', '>', '>>', '<') # Check whether injection attacks exist @@ -208,7 +210,7 @@ def check_ssh_injection(cmd_list: List[str]) -> None: def check_metadata_properties( - metadata: Optional[Dict[str, str]]) -> None: + metadata: Optional[dict[str, str]]) -> None: """Checks that the volume metadata properties are valid.""" if not metadata: @@ -235,7 +237,7 @@ def check_metadata_properties( def last_completed_audit_period(unit: Optional[str] = None) -> \ - Tuple[Union[datetime.datetime, datetime.timedelta], + tuple[Union[datetime.datetime, datetime.timedelta], Union[datetime.datetime, datetime.timedelta]]: """This method gives you the most recently *completed* audit period. @@ -496,7 +498,7 @@ def get_file_size(path: str) -> int: def _get_disk_of_partition( devpath: str, - st: Optional[os.stat_result] = None) -> Tuple[str, os.stat_result]: + st: Optional[os.stat_result] = None) -> tuple[str, os.stat_result]: """Gets a disk device path and status from partition path. Returns a disk device path from a partition device path, and stat for @@ -633,9 +635,9 @@ class retry_if_exit_code(tenacity.retry_if_exception): def retry(retry_param: Union[None, Type[Exception], - Tuple[Type[Exception], ...], + tuple[Type[Exception], ...], int, - Tuple[int, ...]], + tuple[int, ...]], interval: float = 1, retries: int = 3, backoff_rate: float = 2, diff --git a/cinder/volume/api.py b/cinder/volume/api.py index 98b5c8d899d..b18e8a110cf 100644 --- a/cinder/volume/api.py +++ b/cinder/volume/api.py @@ -16,11 +16,12 @@ """Handles all requests relating to volumes.""" +from __future__ import annotations + import ast import collections import datetime -from typing import (Any, DefaultDict, Dict, Iterable, List, # noqa: H301 - Optional, Tuple, Union) +from typing import (Any, DefaultDict, Iterable, Optional, Union) # noqa: H301 from castellan import key_manager from oslo_config import cfg @@ -139,7 +140,7 @@ class API(base.Base): services = objects.ServiceList.get_all_by_topic(ctxt, topic) az_data = [(s.availability_zone, s.disabled) for s in services] - disabled_map: Dict[str, bool] = {} + disabled_map: dict[str, bool] = {} for (az_name, disabled) in az_data: tracked_disabled = disabled_map.get(az_name, True) disabled_map[az_name] = tracked_disabled and disabled @@ -725,8 +726,8 @@ class API(base.Base): search_opts: Optional[dict] = None, marker: Optional[str] = None, limit: Optional[int] = None, - sort_keys: Optional[List[str]] = None, - sort_dirs: Optional[List[str]] = None, + sort_keys: Optional[list[str]] = None, + sort_dirs: Optional[list[str]] = None, offset: Optional[int] = None) -> objects.SnapshotList: context.authorize(snapshot_policy.GET_ALL_POLICY) @@ -971,7 +972,7 @@ class API(base.Base): utils.check_metadata_properties(metadata) - valid_status: Tuple[str, ...] + valid_status: tuple[str, ...] 
valid_status = ('available',) if force or allow_in_use: valid_status = ('available', 'in-use') @@ -1118,7 +1119,7 @@ class API(base.Base): context: context.RequestContext, volume_list: objects.VolumeList) -> list: reserve_opts_list = [] - total_reserve_opts: Dict[str, int] = {} + total_reserve_opts: dict[str, int] = {} try: for volume in volume_list: if CONF.no_snapshot_gb_quota: @@ -1155,7 +1156,7 @@ class API(base.Base): name: str, description: str, cgsnapshot_id: str, - group_snapshot_id: Optional[str] = None) -> Dict[str, Any]: + group_snapshot_id: Optional[str] = None) -> dict[str, Any]: options = {'volume_id': volume['id'], 'cgsnapshot_id': cgsnapshot_id, 'group_snapshot_id': group_snapshot_id, @@ -1176,7 +1177,7 @@ class API(base.Base): volume: objects.Volume, name: str, description: str, - metadata: Optional[Dict[str, Any]] = None, + metadata: Optional[dict[str, Any]] = None, cgsnapshot_id: Optional[str] = None, group_snapshot_id: Optional[str] = None, allow_in_use: bool = False) -> objects.Snapshot: @@ -1193,7 +1194,7 @@ class API(base.Base): volume: objects.Volume, name: str, description: str, - metadata: Optional[Dict[str, Any]] = None) -> objects.Snapshot: + metadata: Optional[dict[str, Any]] = None) -> objects.Snapshot: result = self._create_snapshot(context, volume, name, description, True, metadata) LOG.info("Snapshot force create request issued successfully.", @@ -1211,7 +1212,7 @@ class API(base.Base): snapshot.assert_not_frozen() # Build required conditions for conditional update - expected: Dict[str, Any] = {'cgsnapshot_id': None, + expected: dict[str, Any] = {'cgsnapshot_id': None, 'group_snapshot_id': None} # If not force deleting we have status conditions if not force: @@ -1237,7 +1238,7 @@ class API(base.Base): def update_snapshot(self, context: context.RequestContext, snapshot: objects.Snapshot, - fields: Dict[str, Any]) -> None: + fields: dict[str, Any]) -> None: context.authorize(snapshot_policy.UPDATE_POLICY, target_obj=snapshot) snapshot.update(fields) @@ -1256,7 +1257,7 @@ class API(base.Base): def create_volume_metadata(self, context: context.RequestContext, volume: objects.Volume, - metadata: Dict[str, Any]) -> dict: + metadata: dict[str, Any]) -> dict: """Creates volume metadata.""" context.authorize(vol_meta_policy.CREATE_POLICY, target_obj=volume) db_meta = self._update_volume_metadata(context, volume, metadata) @@ -1284,7 +1285,7 @@ class API(base.Base): def _update_volume_metadata(self, context: context.RequestContext, volume: objects.Volume, - metadata: Dict[str, Any], + metadata: dict[str, Any], delete: bool = False, meta_type=common.METADATA_TYPES.user) -> dict: if volume['status'] in ('maintenance', 'uploading'): @@ -1298,7 +1299,7 @@ class API(base.Base): def update_volume_metadata(self, context: context.RequestContext, volume: objects.Volume, - metadata: Dict[str, Any], + metadata: dict[str, Any], delete: bool = False, meta_type=common.METADATA_TYPES.user) -> dict: """Updates volume metadata. @@ -1320,7 +1321,7 @@ class API(base.Base): def update_volume_admin_metadata(self, context: context.RequestContext, volume: objects.Volume, - metadata: Dict[str, Any], + metadata: dict[str, Any], delete: Optional[bool] = False, add: Optional[bool] = True, update: Optional[bool] = True) -> dict: @@ -1369,7 +1370,7 @@ class API(base.Base): def update_snapshot_metadata(self, context: context.RequestContext, snapshot: objects.Snapshot, - metadata: Dict[str, Any], + metadata: dict[str, Any], delete: bool = False) -> dict: """Updates or creates snapshot metadata. 
@@ -1410,7 +1411,7 @@ class API(base.Base): def get_volume_image_metadata(self, context: context.RequestContext, - volume: objects.Volume) -> Dict[str, str]: + volume: objects.Volume) -> dict[str, str]: context.authorize(vol_meta_policy.GET_POLICY, target_obj=volume) db_data = self.db.volume_glance_metadata_get(context, volume['id']) LOG.info("Get volume image-metadata completed successfully.", @@ -1420,7 +1421,7 @@ class API(base.Base): def get_list_volumes_image_metadata( self, context: context.RequestContext, - volume_id_list: List[str]) -> DefaultDict[str, str]: + volume_id_list: list[str]) -> DefaultDict[str, str]: db_data = self.db.volume_glance_metadata_list_get(context, volume_id_list) results: collections.defaultdict = collections.defaultdict(dict) @@ -1458,8 +1459,8 @@ class API(base.Base): def copy_volume_to_image(self, context: context.RequestContext, volume: objects.Volume, - metadata: Dict[str, str], - force: bool) -> Dict[str, Optional[str]]: + metadata: dict[str, str], + force: bool) -> dict[str, Optional[str]]: """Create a new image from the specified volume.""" if not CONF.enable_force_upload and force: LOG.info("Force upload to image is disabled, " @@ -2069,8 +2070,8 @@ class API(base.Base): marker: Optional[str] = None, limit: Optional[int] = None, offset: Optional[int] = None, - sort_keys: Optional[List[str]] = None, - sort_dirs: Optional[List[str]] = None): + sort_keys: Optional[list[str]] = None, + sort_dirs: Optional[list[str]] = None): svc = self._get_service_by_host_cluster(context, host, cluster_name) return self.volume_rpcapi.get_manageable_volumes(context, svc, marker, limit, @@ -2110,8 +2111,8 @@ class API(base.Base): marker: Optional[str] = None, limit: Optional[int] = None, offset: Optional[int] = None, - sort_keys: Optional[List[str]] = None, - sort_dirs: Optional[List[str]] = None) -> List[dict]: + sort_keys: Optional[list[str]] = None, + sort_dirs: Optional[list[str]] = None) -> list[dict]: svc = self._get_service_by_host_cluster(context, host, cluster_name, 'snapshot') return self.volume_rpcapi.get_manageable_snapshots(context, svc, diff --git a/cinder/volume/drivers/rbd.py b/cinder/volume/drivers/rbd.py index 601b416ba4a..71777f6aa3e 100644 --- a/cinder/volume/drivers/rbd.py +++ b/cinder/volume/drivers/rbd.py @@ -13,6 +13,8 @@ # under the License. 
"""RADOS Block Device Driver""" +from __future__ import annotations + import binascii import errno import json @@ -20,7 +22,7 @@ import math import os import tempfile import typing -from typing import Any, Dict, List, Optional, Tuple, Union # noqa: H301 +from typing import Any, Optional, Union # noqa: H301 import urllib.parse from castellan import key_manager @@ -163,7 +165,7 @@ class RBDVolumeProxy(object): pool: Optional[str] = None, snapshot: Optional[str] = None, read_only: bool = False, - remote: Optional[Dict[str, str]] = None, + remote: Optional[dict[str, str]] = None, timeout: Optional[int] = None, client: 'rados.Rados' = None, ioctx: 'rados.Ioctx' = None): @@ -254,7 +256,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, **kwargs) -> None: super(RBDDriver, self).__init__(*args, **kwargs) self.configuration.append_config_values(RBD_OPTS) - self._stats: Dict[str, Union[str, bool]] = {} + self._stats: dict[str, Union[str, bool]] = {} # allow overrides for testing self.rados = kwargs.get('rados', rados) self.rbd = kwargs.get('rbd', rbd) @@ -270,10 +272,10 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, self._backend_name = (self.configuration.volume_backend_name or self.__class__.__name__) self._active_backend_id: Optional[str] = active_backend_id - self._active_config: Dict[str, str] = {} + self._active_config: dict[str, str] = {} self._is_replication_enabled = False self._replication_targets: list = [] - self._target_names: List[str] = [] + self._target_names: list[str] = [] self._clone_v2_api_checked: bool = False if self.rbd is not None: @@ -341,7 +343,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, ' performance, fewer deletion issues') def _get_target_config(self, - target_id: Optional[str]) -> Dict[str, + target_id: Optional[str]) -> dict[str, str]: """Get a replication target from known replication targets.""" for target in self._replication_targets: @@ -371,7 +373,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, self._target_names.append('default') def _parse_replication_configs(self, - replication_devices: List[dict]) -> None: + replication_devices: list[dict]) -> None: for replication_device in replication_devices: if 'backend_id' not in replication_device: msg = _('Missing backend_id in replication_device ' @@ -396,8 +398,8 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, def _get_config_tuple( self, - remote: Optional[Dict[str, str]] = None) \ - -> Tuple[Optional[str], Optional[str], + remote: Optional[dict[str, str]] = None) \ + -> tuple[Optional[str], Optional[str], Optional[str], Optional[str]]: if not remote: remote = self._active_config @@ -486,7 +488,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, def RBDProxy(self) -> tpool.Proxy: return tpool.Proxy(self.rbd.RBD()) - def _ceph_args(self) -> List[str]: + def _ceph_args(self) -> list[str]: args = [] name, conf, user, secret_uuid = self._get_config_tuple() @@ -504,13 +506,13 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, pool: Optional[str] = None, remote: Optional[dict] = None, timeout: Optional[int] = None) -> \ - Tuple['rados.Rados', 'rados.Ioctx']: + tuple['rados.Rados', 'rados.Ioctx']: @utils.retry(exception.VolumeBackendAPIException, self.configuration.rados_connection_interval, self.configuration.rados_connection_retries) def _do_conn(pool: Optional[str], remote: Optional[dict], - timeout: Optional[int]) -> Tuple['rados.Rados', + timeout: Optional[int]) -> tuple['rados.Rados', 'rados.Ioctx']: name, conf, user, 
secret_uuid = self._get_config_tuple(remote) @@ -557,7 +559,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, client.shutdown() @staticmethod - def _get_backup_snaps(rbd_image) -> List: + def _get_backup_snaps(rbd_image) -> list: """Get list of any backup snapshots that exist on this volume. There should only ever be one but accept all since they need to be @@ -570,7 +572,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, from cinder.backup.drivers import ceph return ceph.CephBackupDriver.get_backup_snaps(rbd_image) - def _get_mon_addrs(self) -> Tuple[List[str], List[str]]: + def _get_mon_addrs(self) -> tuple[list[str], list[str]]: args = ['ceph', 'mon', 'dump', '--format=json'] args.extend(self._ceph_args()) out, _ = self._execute(*args) @@ -578,7 +580,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, if lines[0].startswith('dumped monmap epoch'): lines = lines[1:] monmap = json.loads('\n'.join(lines)) - addrs: List[str] = [mon['addr'] for mon in monmap['mons']] + addrs: list[str] = [mon['addr'] for mon in monmap['mons']] hosts = [] ports = [] for addr in addrs: @@ -614,8 +616,8 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, total_provisioned = math.ceil(float(total_provisioned) / units.Gi) return total_provisioned - def _get_pool_stats(self) -> Union[Tuple[str, str], - Tuple[float, float]]: + def _get_pool_stats(self) -> Union[tuple[str, str], + tuple[float, float]]: """Gets pool free and total capacity in GiB. Calculate free and total capacity of the pool based on the pool's @@ -753,7 +755,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, def create_cloned_volume( self, volume: Volume, - src_vref: Volume) -> Optional[Dict[str, Optional[str]]]: + src_vref: Volume) -> Optional[dict[str, Optional[str]]]: """Create a cloned volume from another volume. Since we are cloning from a volume and not a snapshot, we must first @@ -860,7 +862,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, LOG.debug("clone created successfully") return volume_update - def _enable_replication(self, volume: Volume) -> Dict[str, str]: + def _enable_replication(self, volume: Volume) -> dict[str, str]: """Enable replication for a volume. Returns required volume update. 
@@ -884,7 +886,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, return {'replication_status': fields.ReplicationStatus.ENABLED, 'replication_driver_data': driver_data} - def _enable_multiattach(self, volume: Volume) -> Dict[str, str]: + def _enable_multiattach(self, volume: Volume) -> dict[str, str]: vol_name = utils.convert_str(volume.name) with RBDVolumeProxy(self, vol_name) as image: image_features = image.features() @@ -894,7 +896,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, return {'provider_location': self._dumps({'saved_features': image_features})} - def _disable_multiattach(self, volume: Volume) -> Dict[str, None]: + def _disable_multiattach(self, volume: Volume) -> dict[str, None]: vol_name = utils.convert_str(volume.name) with RBDVolumeProxy(self, vol_name) as image: try: @@ -932,7 +934,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, def _setup_volume( self, volume: Volume, - volume_type: Optional[VolumeType] = None) -> Dict[str, + volume_type: Optional[VolumeType] = None) -> dict[str, Optional[str]]: if volume_type: @@ -1030,7 +1032,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, cmd.extend(self._ceph_args()) self._execute(*cmd) - def create_volume(self, volume: Volume) -> Dict[str, Any]: + def create_volume(self, volume: Volume) -> dict[str, Any]: """Creates a logical volume.""" if volume.encryption_key_id: @@ -1092,7 +1094,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, volume: Volume, src_pool: str, src_image: str, - src_snap: str) -> Dict[str, Optional[str]]: + src_snap: str) -> dict[str, Optional[str]]: LOG.debug('cloning %(pool)s/%(img)s@%(snap)s to %(dst)s', dict(pool=src_pool, img=src_image, snap=src_snap, dst=volume.name)) @@ -1178,8 +1180,8 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, self, volume: 'rbd.Image', volume_name: str, - snap: Optional[str] = None) -> Union[Tuple[str, str, str], - Tuple[None, None, None]]: + snap: Optional[str] = None) -> Union[tuple[str, str, str], + tuple[None, None, None]]: """If volume is a clone, return its parent info. Returns a tuple of (pool, parent, snap). A snapshot may optionally be @@ -1207,7 +1209,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, def _get_children_info(self, volume: 'rbd.Image', - snap: Optional[str]) -> List[tuple]: + snap: Optional[str]) -> list[tuple]: """List children for the given snapshot of a volume(image). Returns a list of (pool, image). 
@@ -1429,7 +1431,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, with RBDVolumeProxy(self, volume.name) as image: image.rollback_to_snap(snapshot.name) - def _disable_replication(self, volume: Volume) -> Dict[str, Optional[str]]: + def _disable_replication(self, volume: Volume) -> dict[str, Optional[str]]: """Disable replication on the given volume.""" vol_name = utils.convert_str(volume.name) with RBDVolumeProxy(self, vol_name) as image: @@ -1450,18 +1452,18 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, context: context.RequestContext, volume: Volume, new_type: VolumeType, - diff: Union[Dict[str, Dict[str, str]], Dict[str, Dict], None], - host: Optional[Dict[str, str]]) -> Tuple[bool, dict]: + diff: Union[dict[str, dict[str, str]], dict[str, dict], None], + host: Optional[dict[str, str]]) -> tuple[bool, dict]: """Retype from one volume type to another on the same backend.""" return True, self._setup_volume(volume, new_type) @staticmethod - def _dumps(obj: Dict[str, Union[bool, int]]) -> str: + def _dumps(obj: dict[str, Union[bool, int]]) -> str: return json.dumps(obj, separators=(',', ':'), sort_keys=True) def _exec_on_volume(self, volume_name: str, - remote: Dict[str, str], + remote: dict[str, str], operation: str, *args: Any, **kwargs: Any): @@ -1477,9 +1479,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, def _failover_volume(self, volume: Volume, - remote: Dict[str, str], + remote: dict[str, str], is_demoted: bool, - replication_status: str) -> Dict[str, Any]: + replication_status: str) -> dict[str, Any]: """Process failover for a volume. There are 2 different cases that will return different update values @@ -1519,8 +1521,8 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, return error_result def _demote_volumes(self, - volumes: List[Volume], - until_failure: bool = True) -> List[bool]: + volumes: list[Volume], + until_failure: bool = True) -> list[bool]: """Try to demote volumes on the current primary cluster.""" result = [] try_demoting = True @@ -1542,7 +1544,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, def _get_failover_target_config( self, - secondary_id: Optional[str] = None) -> Tuple[str, dict]: + secondary_id: Optional[str] = None) -> tuple[str, dict]: if not secondary_id: # In auto mode exclude failback and active candidates = set(self._target_names).difference( @@ -1557,7 +1559,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, context: context.RequestContext, volumes: list, secondary_id: Optional[str] = None, - groups=None) -> Tuple[str, list, list]: + groups=None) -> tuple[str, list, list]: """Failover replicated volumes.""" LOG.info('RBD driver failover started.') if not self._is_replication_enabled: @@ -1595,11 +1597,11 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, def failover_host(self, context: context.RequestContext, - volumes: List[Volume], + volumes: list[Volume], secondary_id: Optional[str] = None, - groups: Optional[List] = None) -> Tuple[str, - List[Volume], - List]: + groups: Optional[list] = None) -> tuple[str, + list[Volume], + list]: """Failover to replication target. 
This function combines calls to failover() and failover_completed() to @@ -1631,7 +1633,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, def initialize_connection(self, volume: Volume, - connector: dict) -> Dict[str, Any]: + connector: dict) -> dict[str, Any]: hosts, ports = self._get_mon_addrs() name, conf, user, secret_uuid = self._get_config_tuple() data = { @@ -1662,7 +1664,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, pass @staticmethod - def _parse_location(location: str) -> List[str]: + def _parse_location(location: str) -> list[str]: prefix = 'rbd://' if not location.startswith(prefix): reason = _('Not stored in rbd') @@ -1729,7 +1731,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, volume: Volume, image_location: Optional[list], image_meta: dict, - image_service) -> Tuple[dict, bool]: + image_service) -> tuple[dict, bool]: if image_location: # Note: image_location[0] is glance image direct_url. # image_location[1] contains the list of all locations (including @@ -1885,7 +1887,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, {'old_size': old_size, 'new_size': new_size}) def manage_existing(self, - volume: Volume, existing_ref: Dict[str, str]) -> None: + volume: Volume, existing_ref: dict[str, str]) -> None: """Manages an existing image. Renames the image name to match the expected name for the volume. @@ -1906,7 +1908,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, def manage_existing_get_size(self, volume: Volume, - existing_ref: Dict[str, str]) -> int: + existing_ref: dict[str, str]) -> int: """Return size of an existing image for manage_existing. :param volume: @@ -1963,12 +1965,12 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, return json.loads(out) def get_manageable_volumes(self, - cinder_volumes: List[Dict[str, str]], + cinder_volumes: list[dict[str, str]], marker: Optional[Any], limit: int, offset: int, - sort_keys: List[str], - sort_dirs: List[str]) -> List[Dict[str, Any]]: + sort_keys: list[str], + sort_dirs: list[str]) -> list[dict[str, Any]]: manageable_volumes = [] cinder_ids = [resource['id'] for resource in cinder_volumes] @@ -2016,11 +2018,11 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, pass def update_migrated_volume(self, - ctxt: Dict, + ctxt: dict, volume: Volume, new_volume: Volume, original_volume_status: str) -> \ - Union[Dict[str, None], Dict[str, Optional[str]]]: + Union[dict[str, None], dict[str, Optional[str]]]: """Return model update from RBD for migrated volume. This method should rename the back-end volume name(id) on the @@ -2064,7 +2066,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, def migrate_volume(self, context: context.RequestContext, volume: Volume, - host: Dict[str, Dict[str, str]]) -> Tuple[bool, None]: + host: dict[str, dict[str, str]]) -> tuple[bool, None]: refuse_to_migrate = (False, None) @@ -2146,7 +2148,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, def manage_existing_snapshot_get_size(self, snapshot: Snapshot, - existing_ref: Dict[str, Any]) -> int: + existing_ref: dict[str, Any]) -> int: """Return size of an existing image for manage_existing. :param snapshot: @@ -2197,7 +2199,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, def manage_existing_snapshot(self, snapshot: Snapshot, - existing_ref: Dict[str, Any]) -> None: + existing_ref: dict[str, Any]) -> None: """Manages an existing snapshot. Renames the snapshot name to match the expected name for the snapshot. 
@@ -2220,12 +2222,12 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, volume.protect_snap(snapshot.name) def get_manageable_snapshots(self, - cinder_snapshots: List[Dict[str, str]], + cinder_snapshots: list[dict[str, str]], marker: Optional[Any], limit: int, offset: int, - sort_keys: List[str], - sort_dirs: List[str]) -> List[Dict[str, Any]]: + sort_keys: list[str], + sort_dirs: list[str]) -> list[dict[str, Any]]: """List manageable snapshots on RBD backend.""" manageable_snapshots = [] cinder_snapshot_ids = [resource['id'] for resource in cinder_snapshots] @@ -2286,7 +2288,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, def get_backup_device(self, context: context.RequestContext, - backup: Backup) -> Tuple[Volume, bool]: + backup: Backup) -> tuple[Volume, bool]: """Get a backup device from an existing volume. To support incremental backups on Ceph to Ceph we don't clone diff --git a/cinder/volume/flows/api/create_volume.py b/cinder/volume/flows/api/create_volume.py index 40f861fa1eb..c96841f7c0e 100644 --- a/cinder/volume/flows/api/create_volume.py +++ b/cinder/volume/flows/api/create_volume.py @@ -10,7 +10,10 @@ # License for the specific language governing permissions and limitations # under the License. -from typing import Any, Dict, List, Optional, Tuple, Type, Union # noqa: H301 + +from __future__ import annotations # Remove when only supporting python 3.9+ + +from typing import Any, Optional, Type, Union # noqa: H301 from oslo_config import cfg from oslo_log import log as logging @@ -82,10 +85,10 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask): @staticmethod def _extract_resource(resource: Optional[dict], - allowed_vals: Tuple[Tuple[str, ...]], + allowed_vals: tuple[tuple[str, ...]], exc: Type[exception.CinderException], resource_name: str, - props: Tuple[str] = ('status',)) -> Optional[str]: + props: tuple[str] = ('status',)) -> Optional[str]: """Extracts the resource id from the provided resource. This method validates the input resource dict and checks that the @@ -233,7 +236,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask): def _get_image_metadata(self, context: context.RequestContext, image_id: Optional[str], - size: int) -> Optional[Dict[str, Any]]: + size: int) -> Optional[dict[str, Any]]: """Checks image existence and validates the image metadata. Returns: image metadata or None @@ -257,7 +260,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask): snapshot, source_volume, group: Optional[dict], - volume_type: Optional[Dict[str, Any]] = None) -> Tuple[List[str], + volume_type: Optional[dict[str, Any]] = None) -> tuple[list[str], bool]: """Extracts and returns a validated availability zone list. 
@@ -358,7 +361,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask): volume_type_id: str, snapshot: Optional[objects.Snapshot], source_volume: Optional[objects.Volume], - image_metadata: Optional[Dict[str, Any]]) -> Optional[str]: + image_metadata: Optional[dict[str, Any]]) -> Optional[str]: if volume_types.is_encrypted(context, volume_type_id): encryption_key_id = None @@ -442,7 +445,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask): group, group_snapshot, backup: Optional[dict], - multiattach: bool = False) -> Dict[str, Any]: + multiattach: bool = False) -> dict[str, Any]: utils.check_exclusive_options(snapshot=snapshot, imageRef=image_id, @@ -502,7 +505,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask): if multiattach: context.authorize(policy.MULTIATTACH_POLICY) - specs: Optional[Dict] = {} + specs: Optional[dict] = {} if volume_type_id: qos_specs = volume_types.get_volume_type_qos_specs(volume_type_id) if qos_specs['qos_specs']: @@ -560,7 +563,7 @@ class EntryCreateTask(flow_utils.CinderTask): def execute(self, context: context.RequestContext, optional_args: dict, - **kwargs) -> Dict[str, Any]: + **kwargs) -> dict[str, Any]: """Creates a database entry for the given inputs and returns details. Accesses the database and creates a new entry for the to be created @@ -809,8 +812,8 @@ class VolumeCastTask(flow_utils.CinderTask): def _cast_create_volume(self, context: context.RequestContext, - request_spec: Dict[str, Any], - filter_properties: Dict) -> None: + request_spec: dict[str, Any], + filter_properties: dict) -> None: source_volid = request_spec['source_volid'] volume = request_spec['volume'] snapshot_id = request_spec['snapshot_id'] diff --git a/cinder/volume/flows/manager/create_volume.py b/cinder/volume/flows/manager/create_volume.py index 0c3a53d7fc8..3f251289055 100644 --- a/cinder/volume/flows/manager/create_volume.py +++ b/cinder/volume/flows/manager/create_volume.py @@ -10,10 +10,12 @@ # License for the specific language governing permissions and limitations # under the License. +from __future__ import annotations + import binascii import traceback import typing -from typing import Any, Dict, List, Optional, Tuple # noqa: H301 +from typing import Any, Optional # noqa: H301 from castellan import key_manager import os_brick.initiator.connectors @@ -676,7 +678,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask): volume_metadata) @staticmethod - def _extract_cinder_ids(urls: List[str]) -> List[str]: + def _extract_cinder_ids(urls: list[str]) -> list[str]: """Process a list of location URIs from glance :param urls: list of glance location URIs @@ -707,7 +709,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask): context: cinder_context.RequestContext, volume: objects.Volume, image_location, - image_meta: Dict[str, Any]) -> Tuple[None, bool]: + image_meta: dict[str, Any]) -> tuple[None, bool]: """Create a volume efficiently from an existing image. Returns a dict of volume properties eg. provider_location, @@ -768,7 +770,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask): context: cinder_context.RequestContext, volume: objects.Volume, image_location, - image_meta: Dict[str, Any], + image_meta: dict[str, Any], image_service) -> dict: # TODO(harlowja): what needs to be rolled back in the clone if this # volume create fails?? 
Likely this should be a subflow or broken @@ -804,7 +806,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask): internal_context: cinder_context.RequestContext, volume: objects.Volume, image_id: str, - image_meta: Dict[str, Any]) -> Tuple[None, bool]: + image_meta: dict[str, Any]) -> tuple[None, bool]: """Attempt to create the volume using the image cache. Best case this will simply clone the existing volume in the cache. @@ -853,8 +855,8 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask): volume: objects.Volume, image_location: str, image_id: str, - image_meta: Dict[str, Any], - image_service) -> Tuple[Optional[dict], + image_meta: dict[str, Any], + image_service) -> tuple[Optional[dict], bool]: assert self.image_volume_cache is not None internal_context = cinder_context.get_internal_tenant_context() @@ -895,7 +897,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask): volume: objects.Volume, image_location, image_id: str, - image_meta: Dict[str, Any], + image_meta: dict[str, Any], image_service, update_cache: bool = False) -> Optional[dict]: # NOTE(e0ne): check for free space in image_conversion_dir before @@ -1045,7 +1047,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask): volume: objects.Volume, image_location, image_id: str, - image_meta: Dict[str, Any], + image_meta: dict[str, Any], image_service, **kwargs: Any) -> Optional[dict]: LOG.debug("Cloning %(volume_id)s from image %(image_id)s " @@ -1120,7 +1122,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask): context: cinder_context.RequestContext, volume: objects.Volume, backup_id: str, - **kwargs) -> Tuple[Dict, bool]: + **kwargs) -> tuple[dict, bool]: LOG.info("Creating volume %(volume_id)s from backup %(backup_id)s.", {'volume_id': volume.id, 'backup_id': backup_id}) diff --git a/cinder/volume/manager.py b/cinder/volume/manager.py index 88d325d2916..ded7428b0f9 100644 --- a/cinder/volume/manager.py +++ b/cinder/volume/manager.py @@ -35,10 +35,12 @@ intact. """ +from __future__ import annotations # Remove when only supporting Python 3.9+ + import functools import time import typing -from typing import Any, Dict, List, Optional, Set, Tuple, Union # noqa: H301 +from typing import Any, Optional, Union # noqa: H301 from castellan import key_manager from oslo_config import cfg @@ -539,7 +541,7 @@ class VolumeManager(manager.CleanableManager, num_vols: int = 0 num_snaps: int = 0 max_objs_num: int = 0 - req_range: Union[List[int], range] = [0] + req_range: Union[list[int], range] = [0] req_limit = CONF.init_host_max_objects_retrieval or 0 use_batch_objects_retrieval: bool = req_limit > 0 @@ -1601,7 +1603,7 @@ class VolumeManager(manager.CleanableManager, reservations = QUOTAS.reserve(ctx, **reserve_opts) # NOTE(yikun): Skip 'snapshot_id', 'source_volid' keys to avoid # creating tmp img vol from wrong snapshot or wrong source vol. 
- skip: Set[str] = {'snapshot_id', 'source_volid'} + skip: set[str] = {'snapshot_id', 'source_volid'} skip.update(self._VOLUME_CLONE_SKIP_PROPERTIES) try: new_vol_values = {k: volume[k] for k in set(volume.keys()) - skip} @@ -3199,7 +3201,7 @@ class VolumeManager(manager.CleanableManager, return vol_ref - def _get_cluster_or_host_filters(self) -> Dict[str, Any]: + def _get_cluster_or_host_filters(self) -> dict[str, Any]: if self.cluster: filters = {'cluster_name': self.cluster} else: @@ -3500,12 +3502,12 @@ class VolumeManager(manager.CleanableManager, self, context: context.RequestContext, group: objects.Group, - volumes: List[objects.Volume], + volumes: list[objects.Volume], group_snapshot: Optional[objects.GroupSnapshot] = None, - snapshots: Optional[List[objects.Snapshot]] = None, + snapshots: Optional[list[objects.Snapshot]] = None, source_group: Optional[objects.Group] = None, - source_vols: Optional[List[objects.Volume]] = None) \ - -> Tuple[Dict[str, str], List[Dict[str, str]]]: + source_vols: Optional[list[objects.Volume]] = None) \ + -> tuple[dict[str, str], list[dict[str, str]]]: """Creates a group from source. :param context: the context of the caller. @@ -3518,7 +3520,7 @@ class VolumeManager(manager.CleanableManager, :returns: model_update, volumes_model_update """ model_update = {'status': 'available'} - volumes_model_update: List[dict] = [] + volumes_model_update: list[dict] = [] for vol in volumes: if snapshots: for snapshot in snapshots: @@ -3819,7 +3821,7 @@ class VolumeManager(manager.CleanableManager, def _convert_group_to_cg( self, group: objects.Group, - volumes: objects.VolumeList) -> Tuple[objects.Group, + volumes: objects.VolumeList) -> tuple[objects.Group, objects.VolumeList]: if not group: return None, None @@ -3832,7 +3834,7 @@ class VolumeManager(manager.CleanableManager, return cg, volumes def _remove_consistencygroup_id_from_volumes( - self, volumes: Optional[List[objects.Volume]]) -> None: + self, volumes: Optional[list[objects.Volume]]) -> None: if not volumes: return for vol in volumes: @@ -3843,7 +3845,7 @@ class VolumeManager(manager.CleanableManager, self, group_snapshot: objects.GroupSnapshot, snapshots: objects.SnapshotList, - ctxt) -> Tuple[objects.CGSnapshot, + ctxt) -> tuple[objects.CGSnapshot, objects.SnapshotList]: if not group_snapshot: return None, None @@ -3881,7 +3883,7 @@ class VolumeManager(manager.CleanableManager, def _delete_group_generic(self, context: context.RequestContext, group: objects.Group, - volumes) -> Tuple: + volumes) -> tuple: """Deletes a group and volumes in the group.""" model_update = {'status': group.status} volume_model_updates = [] @@ -3903,7 +3905,7 @@ class VolumeManager(manager.CleanableManager, def _update_group_generic( self, context: context.RequestContext, group, add_volumes=None, - remove_volumes=None) -> Tuple[None, None, None]: + remove_volumes=None) -> tuple[None, None, None]: """Updates a group.""" # NOTE(xyang): The volume manager adds/removes the volume to/from the # group in the database. This default implementation does not do @@ -3916,7 +3918,7 @@ class VolumeManager(manager.CleanableManager, group, volumes: Optional[str], add: bool = True) -> list: - valid_status: Tuple[str, ...] + valid_status: tuple[str, ...] 
if add: valid_status = VALID_ADD_VOL_TO_GROUP_STATUS else: @@ -4182,7 +4184,7 @@ class VolumeManager(manager.CleanableManager, self, context: context.RequestContext, group_snapshot: objects.GroupSnapshot, - snapshots: list) -> Tuple[dict, List[dict]]: + snapshots: list) -> tuple[dict, list[dict]]: """Creates a group_snapshot.""" model_update = {'status': 'available'} snapshot_model_updates = [] @@ -4208,7 +4210,7 @@ class VolumeManager(manager.CleanableManager, self, context: context.RequestContext, group_snapshot: objects.GroupSnapshot, - snapshots: list) -> Tuple[dict, List[dict]]: + snapshots: list) -> tuple[dict, list[dict]]: """Deletes a group_snapshot.""" model_update = {'status': group_snapshot.status} snapshot_model_updates = [] @@ -4772,7 +4774,7 @@ class VolumeManager(manager.CleanableManager, ctxt: context.RequestContext, volume: objects.Volume, attachment: objects.VolumeAttachment, - connector: dict) -> Dict[str, Any]: + connector: dict) -> dict[str, Any]: try: self.driver.validate_connector(connector) except exception.InvalidConnectorException as err: @@ -4830,7 +4832,7 @@ class VolumeManager(manager.CleanableManager, context: context.RequestContext, vref: objects.Volume, connector: dict, - attachment_id: str) -> Dict[str, Any]: + attachment_id: str) -> dict[str, Any]: """Update/Finalize an attachment. This call updates a valid attachment record to associate with a volume @@ -5236,7 +5238,7 @@ class VolumeManager(manager.CleanableManager, def list_replication_targets(self, ctxt: context.RequestContext, - group: objects.Group) -> Dict[str, list]: + group: objects.Group) -> dict[str, list]: """Provide a means to obtain replication targets for a group. This method is used to find the replication_device config diff --git a/cinder/volume/rpcapi.py b/cinder/volume/rpcapi.py index 7cdeccc4430..097dc16acdb 100644 --- a/cinder/volume/rpcapi.py +++ b/cinder/volume/rpcapi.py @@ -12,7 +12,9 @@ # License for the specific language governing permissions and limitations # under the License. -from typing import Optional, Tuple, Union # noqa: H301 +from __future__ import annotations + +from typing import Optional, Union # noqa: H301 from cinder.common import constants from cinder import context @@ -147,7 +149,7 @@ class VolumeAPI(rpc.RPCAPI): def _get_cctxt(self, host: Optional[str] = None, - version: Optional[Union[str, Tuple[str, ...]]] = None, + version: Optional[Union[str, tuple[str, ...]]] = None, **kwargs) -> rpc.RPCAPI: if host: server = volume_utils.extract_host(host) diff --git a/cinder/volume/volume_types.py b/cinder/volume/volume_types.py index aeec7a47a4d..2f5993d2adc 100644 --- a/cinder/volume/volume_types.py +++ b/cinder/volume/volume_types.py @@ -19,7 +19,9 @@ """Built-in volume type properties.""" -import typing as ty +from __future__ import annotations + +from typing import Optional from oslo_config import cfg from oslo_db import exception as db_exc @@ -330,7 +332,7 @@ def get_volume_type_qos_specs(volume_type_id): def volume_types_diff(context: context.RequestContext, vol_type_id1, - vol_type_id2) -> ty.Tuple[dict, bool]: + vol_type_id2) -> tuple[dict, bool]: """Returns a 'diff' of two volume types and whether they are equal. 
Returns a tuple of (diff, equal), where 'equal' is a boolean indicating @@ -370,8 +372,8 @@ def volume_types_diff(context: context.RequestContext, encryption.pop(param, None) return encryption - def _dict_diff(dict1: ty.Optional[dict], - dict2: ty.Optional[dict]) -> ty.Tuple[dict, bool]: + def _dict_diff(dict1: Optional[dict], + dict2: Optional[dict]) -> tuple[dict, bool]: res = {} equal = True if dict1 is None: diff --git a/cinder/volume/volume_utils.py b/cinder/volume/volume_utils.py index 1384fe455b9..01c138206ae 100644 --- a/cinder/volume/volume_utils.py +++ b/cinder/volume/volume_utils.py @@ -14,6 +14,8 @@ """Volume-related Utilities and helpers.""" +from __future__ import annotations # Remove when only supporting python 3.9+ + import abc import ast import functools @@ -31,8 +33,8 @@ import tempfile import time import types import typing -from typing import Any, BinaryIO, Callable, Dict, IO # noqa: H301 -from typing import List, Optional, Tuple, Union # noqa: H301 +from typing import Any, BinaryIO, Callable, IO # noqa: H301 +from typing import Optional, Union # noqa: H301 import uuid from castellan.common.credentials import keystone_password @@ -242,8 +244,8 @@ def notify_about_snapshot_usage(context: context.RequestContext, usage_info) -def _usage_from_capacity(capacity: Dict[str, Any], - **extra_usage_info) -> Dict[str, Any]: +def _usage_from_capacity(capacity: dict[str, Any], + **extra_usage_info) -> dict[str, Any]: capacity_info = { 'name_to_id': capacity['name_to_id'], @@ -686,7 +688,7 @@ def get_all_volume_groups(vg_name=None) -> list: def extract_availability_zones_from_volume_type( volume_type: Union['objects.VolumeType', dict]) \ - -> Optional[List[str]]: + -> Optional[list[str]]: if not volume_type: return None extra_specs = volume_type.get('extra_specs', {}) @@ -705,7 +707,7 @@ DEFAULT_PASSWORD_SYMBOLS = ('23456789', # Removed: 0,1 def generate_password( length: int = 16, - symbolgroups: Tuple[str, ...] = DEFAULT_PASSWORD_SYMBOLS) -> str: + symbolgroups: tuple[str, ...] = DEFAULT_PASSWORD_SYMBOLS) -> str: """Generate a random password from the supplied symbol groups. At least one symbol from each group will be included. Unpredictable @@ -744,7 +746,7 @@ def generate_password( def generate_username( length: int = 20, - symbolgroups: Tuple[str, ...] = DEFAULT_PASSWORD_SYMBOLS) -> str: + symbolgroups: tuple[str, ...] = DEFAULT_PASSWORD_SYMBOLS) -> str: # Use the same implementation as the password generation. return generate_password(length, symbolgroups) @@ -864,12 +866,12 @@ def extract_id_from_snapshot_name(snap_name: str) -> Optional[str]: return match.group('uuid') if match else None -def paginate_entries_list(entries: List[Dict], +def paginate_entries_list(entries: list[dict], marker: Optional[Union[dict, str]], limit: int, offset: Optional[int], - sort_keys: List[str], - sort_dirs: List[str]) -> list: + sort_keys: list[str], + sort_dirs: list[str]) -> list: """Paginate a list of entries. 
:param entries: list of dictionaries @@ -1096,7 +1098,7 @@ def get_max_over_subscription_ratio( return mosr -def check_image_metadata(image_meta: Dict[str, Union[str, int]], +def check_image_metadata(image_meta: dict[str, Union[str, int]], vol_size: int) -> None: """Validates the image metadata.""" # Check whether image is active @@ -1136,7 +1138,7 @@ def enable_bootable_flag(volume: 'objects.Volume') -> None: def get_volume_image_metadata(image_id: str, - image_meta: Dict[str, Any]) -> dict: + image_meta: dict[str, Any]) -> dict: # Save some base attributes into the volume metadata base_metadata = { @@ -1172,7 +1174,7 @@ def copy_image_to_volume(driver, context: context.RequestContext, volume: 'objects.Volume', image_meta: dict, - image_location: Union[str, Tuple[Optional[str], Any]], + image_location: Union[str, tuple[Optional[str], Any]], image_service) -> None: """Downloads Glance image to the specified volume.""" image_id = image_meta['id']
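For reviewers less familiar with PEP 585, the sketch below illustrates the annotation style this series converts to. It is a minimal, self-contained example, not part of the patch; the module and function names are hypothetical and do not come from Cinder.

    # pep585_example.py -- illustrative only, not taken from this change.
    #
    # With postponed evaluation of annotations (PEP 563), builtin generics
    # such as dict[str, Any] are accepted in annotations on Python 3.7/3.8
    # as well, because annotations are stored as strings rather than
    # evaluated at import time.
    from __future__ import annotations

    from typing import Any, Optional  # Dict/List/Tuple/Set imports no longer needed

    # Old style:  def get_filters(host: Optional[str]) -> Dict[str, Any]:
    def get_filters(host: Optional[str] = None) -> dict[str, Any]:
        """Return a filter dict keyed by field name."""
        filters: dict[str, Any] = {}
        if host is not None:
            filters['host'] = host
        return filters

    # Old style:  def as_tuple(keys: List[str]) -> Tuple[str, ...]:
    def as_tuple(keys: list[str]) -> tuple[str, ...]:
        """Return the keys as an immutable tuple."""
        return tuple(keys)

Note that the future import only affects annotations: evaluating dict[str, Any] at runtime on Python 3.7/3.8 (for example in a type alias assignment) still raises a TypeError, which is why the patch keeps the import and touches annotations only.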