mypy: api_utils

Add coverage for cinder/api/api_utils.

Change-Id: I57913aa8669c30a1e60e704ff8e946cb1eef5213
This commit is contained in:
Eric Harney 2022-05-04 10:57:53 -04:00
parent 04b6700f48
commit 4c24a3a54c
2 changed files with 39 additions and 15 deletions

View File

@ -10,6 +10,11 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from __future__ import annotations
import typing
from typing import Any, Generator, Iterable, Optional, Union # noqa: H301
from keystoneauth1 import exceptions as ks_exc from keystoneauth1 import exceptions as ks_exc
from keystoneauth1 import identity from keystoneauth1 import identity
from keystoneauth1 import loading as ka_loading from keystoneauth1 import loading as ka_loading
@ -20,6 +25,9 @@ from oslo_utils import strutils
import webob import webob
from webob import exc from webob import exc
if typing.TYPE_CHECKING:
# conditional import to avoid a circular import problem from cinderlib
from cinder import context
from cinder import exception from cinder import exception
from cinder.i18n import _ from cinder.i18n import _
@ -30,7 +38,7 @@ CONF.import_group('keystone_authtoken',
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
def _parse_is_public(is_public): def _parse_is_public(is_public: Optional[str]) -> Optional[bool]:
"""Parse is_public into something usable. """Parse is_public into something usable.
* True: List public volume types only * True: List public volume types only
@ -51,7 +59,7 @@ def _parse_is_public(is_public):
raise exc.HTTPBadRequest(explanation=msg) raise exc.HTTPBadRequest(explanation=msg)
def is_none_string(val): def is_none_string(val: Any) -> bool:
"""Check if a string represents a None value.""" """Check if a string represents a None value."""
if not isinstance(val, str): if not isinstance(val, str):
return False return False
@ -59,8 +67,10 @@ def is_none_string(val):
return val.lower() == 'none' return val.lower() == 'none'
def remove_invalid_filter_options(context, filters, def remove_invalid_filter_options(
allowed_search_options): context: 'context.RequestContext',
filters: dict,
allowed_search_options: Iterable[str]) -> None:
"""Remove search options that are not valid for non-admin API/context.""" """Remove search options that are not valid for non-admin API/context."""
if context.is_admin: if context.is_admin:
@ -78,7 +88,7 @@ def remove_invalid_filter_options(context, filters,
_visible_admin_metadata_keys = ['readonly', 'attached_mode'] _visible_admin_metadata_keys = ['readonly', 'attached_mode']
def add_visible_admin_metadata(volume): def add_visible_admin_metadata(volume) -> None:
"""Add user-visible admin metadata to regular metadata. """Add user-visible admin metadata to regular metadata.
Extracts the admin metadata keys that are to be made visible to Extracts the admin metadata keys that are to be made visible to
@ -125,13 +135,15 @@ def add_visible_admin_metadata(volume):
volume['metadata'] = visible_admin_meta volume['metadata'] = visible_admin_meta
def validate_integer(value, name, min_value=None, max_value=None): def validate_integer(value: int, name: str,
min_value: Optional[int] = None,
max_value: Optional[int] = None) -> int:
"""Make sure that value is a valid integer, potentially within range. """Make sure that value is a valid integer, potentially within range.
:param value: the value of the integer :param value: the value of the integer
:param name: the name of the integer :param name: the name of the integer
:param min_length: the min_length of the integer :param min_value: the min value of the integer
:param max_length: the max_length of the integer :param max_value: the max value of the integer
:returns: integer :returns: integer
""" """
try: try:
@ -141,7 +153,9 @@ def validate_integer(value, name, min_value=None, max_value=None):
raise webob.exc.HTTPBadRequest(explanation=str(e)) raise webob.exc.HTTPBadRequest(explanation=str(e))
def walk_class_hierarchy(clazz, encountered=None): def walk_class_hierarchy(clazz: type,
encountered: Optional[list[type]] = None) -> \
Generator[type, None, None]:
"""Walk class hierarchy, yielding most derived classes first.""" """Walk class hierarchy, yielding most derived classes first."""
if not encountered: if not encountered:
encountered = [] encountered = []
@ -154,7 +168,8 @@ def walk_class_hierarchy(clazz, encountered=None):
yield subclass yield subclass
def _keystone_client(context, version=(3, 0)): def _keystone_client(context: 'context.RequestContext',
version: tuple[int, int] = (3, 0)) -> client.Client:
"""Creates and returns an instance of a generic keystone client. """Creates and returns an instance of a generic keystone client.
:param context: The request context :param context: The request context
@ -199,8 +214,12 @@ def _keystone_client(context, version=(3, 0)):
class GenericProjectInfo(object): class GenericProjectInfo(object):
"""Abstraction layer for Keystone V2 and V3 project objects""" """Abstraction layer for Keystone V2 and V3 project objects"""
def __init__(self, project_id, project_keystone_api_version, def __init__(self,
domain_id=None, name=None, description=None): project_id: str,
project_keystone_api_version: str,
domain_id: Optional[str] = None,
name: Optional[str] = None,
description: Optional[str] = None):
self.id = project_id self.id = project_id
self.keystone_api_version = project_keystone_api_version self.keystone_api_version = project_keystone_api_version
self.domain_id = domain_id self.domain_id = domain_id
@ -208,7 +227,8 @@ class GenericProjectInfo(object):
self.description = description self.description = description
def get_project(context, project_id): def get_project(context: 'context.RequestContext',
project_id: str) -> GenericProjectInfo:
"""Method to verify project exists in keystone""" """Method to verify project exists in keystone"""
keystone = _keystone_client(context) keystone = _keystone_client(context)
generic_project = GenericProjectInfo(project_id, keystone.version) generic_project = GenericProjectInfo(project_id, keystone.version)
@ -219,8 +239,11 @@ def get_project(context, project_id):
return generic_project return generic_project
def validate_project_and_authorize(context, project_id, policy_check=None, def validate_project_and_authorize(context: 'context.RequestContext',
validate_only=False): project_id: str,
policy_check: str,
validate_only: bool = False) -> None:
target_project: Union[GenericProjectInfo, dict]
try: try:
target_project = get_project(context, project_id) target_project = get_project(context, project_id)
if not validate_only: if not validate_only:

View File

@ -1,3 +1,4 @@
cinder/api/api_utils.py
cinder/api/v3/types.py cinder/api/v3/types.py
cinder/backup/api.py cinder/backup/api.py
cinder/backup/manager.py cinder/backup/manager.py