typing: Correct type for missing attributes

Change-Id: I55652220ecd663fa024937646dfef92595e1cd0f
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
This commit is contained in:
Stephen Finucane
2025-02-19 20:15:44 +00:00
parent 9435ef825a
commit e28046cc19
12 changed files with 36 additions and 29 deletions
openstackclient

@@ -36,7 +36,7 @@ def ask_user_yesno(msg):
:return bool: User choice :return bool: User choice
""" """
while True: while True:
answer = getpass._raw_input('{} [{}]: '.format(msg, 'y/n')) answer = getpass.getpass('{} [{}]: '.format(msg, 'y/n'))
if answer in ('y', 'Y', 'yes'): if answer in ('y', 'Y', 'yes'):
return True return True
elif answer in ('n', 'N', 'no'): elif answer in ('n', 'N', 'no'):

@@ -18,6 +18,7 @@ import argparse
import itertools import itertools
import logging import logging
import sys import sys
import typing as ty
from openstack import exceptions as sdk_exceptions from openstack import exceptions as sdk_exceptions
from osc_lib.command import command from osc_lib.command import command
@@ -199,7 +200,7 @@ def get_network_quotas(
# #
# so we need to make conversion to have data in same format from # so we need to make conversion to have data in same format from
# all of the services # all of the services
result = {"usage": {}, "reservation": {}} result: dict[str, ty.Any] = {"usage": {}, "reservation": {}}
for key, values in dict_quota.items(): for key, values in dict_quota.items():
if values is None: if values is None:
continue continue

@@ -126,7 +126,6 @@ class CreateKeypair(command.ShowOne):
kwargs = {'name': parsed_args.name} kwargs = {'name': parsed_args.name}
if parsed_args.public_key: if parsed_args.public_key:
generated_keypair = None
try: try:
with open(os.path.expanduser(parsed_args.public_key)) as p: with open(os.path.expanduser(parsed_args.public_key)) as p:
public_key = p.read() public_key = p.read()

@@ -1970,7 +1970,7 @@ class CreateServer(command.ShowOne):
# convert from the novaclient-derived "NIC" view to the actual # convert from the novaclient-derived "NIC" view to the actual
# "network" view # "network" view
network = {} network: dict[str, str] = {}
if nic['net-id']: if nic['net-id']:
network['uuid'] = nic['net-id'] network['uuid'] = nic['net-id']
@@ -1986,7 +1986,7 @@ class CreateServer(command.ShowOne):
if nic.get('tag'): # tags are optional if nic.get('tag'): # tags are optional
network['tag'] = nic['tag'] network['tag'] = nic['tag']
networks.append(network) networks.append(network) # type: ignore[union-attr]
if not parsed_args.nics and sdk_utils.supports_microversion( if not parsed_args.nics and sdk_utils.supports_microversion(
compute_client, '2.37' compute_client, '2.37'

@@ -249,10 +249,7 @@ class ListUser(command.Lister):
data = identity_client.users.list(tenant_id=project) data = identity_client.users.list(tenant_id=project)
if parsed_args.project: if parsed_args.project:
d = {} data = {s.id: s for s in data}.values()
for s in data:
d[s.id] = s
data = d.values()
if parsed_args.long: if parsed_args.long:
# FIXME(dtroyer): Sometimes user objects have 'tenant_id' instead # FIXME(dtroyer): Sometimes user objects have 'tenant_id' instead

@@ -17,6 +17,7 @@
import copy import copy
import logging import logging
import typing as ty
from openstack import exceptions as sdk_exc from openstack import exceptions as sdk_exc
from osc_lib.command import command from osc_lib.command import command
@@ -58,7 +59,7 @@ def _format_user(user):
def _get_options_for_user(identity_client, parsed_args): def _get_options_for_user(identity_client, parsed_args):
options = {} options: dict[str, ty.Any] = {}
if parsed_args.ignore_lockout_failure_attempts: if parsed_args.ignore_lockout_failure_attempts:
options['ignore_lockout_failure_attempts'] = True options['ignore_lockout_failure_attempts'] = True
if parsed_args.no_ignore_lockout_failure_attempts: if parsed_args.no_ignore_lockout_failure_attempts:

@@ -428,7 +428,7 @@ class CreateImage(command.ShowOne):
# Build an attribute dict from the parsed args, only include # Build an attribute dict from the parsed args, only include
# attributes that were actually set on the command line # attributes that were actually set on the command line
kwargs = {'allow_duplicates': True} kwargs: dict[str, ty.Any] = {'allow_duplicates': True}
copy_attrs = ( copy_attrs = (
'name', 'name',
'id', 'id',
@@ -611,7 +611,7 @@ class CreateImage(command.ShowOne):
volume_client.volumes, volume_client.volumes,
parsed_args.volume, parsed_args.volume,
) )
kwargs = { kwargs: dict[str, ty.Any] = {
'visibility': None, 'visibility': None,
'protected': None, 'protected': None,
} }
@@ -1575,7 +1575,7 @@ class StageImage(command.Command):
else: else:
fp = get_data_from_stdin() fp = get_data_from_stdin()
kwargs = {} kwargs: dict[str, ty.Any] = {}
if parsed_args.progress and parsed_args.filename: if parsed_args.progress and parsed_args.filename:
# NOTE(stephenfin): we only show a progress bar if the user # NOTE(stephenfin): we only show a progress bar if the user

@@ -12,10 +12,12 @@
# #
import abc import abc
import argparse
import contextlib import contextlib
import logging import logging
import typing as ty import typing as ty
import cliff.app
import openstack.exceptions import openstack.exceptions
from osc_lib.cli import parseractions from osc_lib.cli import parseractions
from osc_lib.command import command from osc_lib.command import command
@@ -66,6 +68,8 @@ class NetDetectionMixin(metaclass=abc.ABCMeta):
present the options for both network types, often qualified accordingly. present the options for both network types, often qualified accordingly.
""" """
app: cliff.app.App
@property @property
def _network_type(self): def _network_type(self):
"""Discover whether the running cloud is using neutron or nova-network. """Discover whether the running cloud is using neutron or nova-network.
@@ -132,9 +136,9 @@ class NetDetectionMixin(metaclass=abc.ABCMeta):
) )
) )
def get_parser(self, prog_name): def get_parser(self, prog_name: str) -> argparse.ArgumentParser:
LOG.debug('get_parser(%s)', prog_name) LOG.debug('get_parser(%s)', prog_name)
parser = super().get_parser(prog_name) parser = super().get_parser(prog_name) # type: ignore
parser = self.update_parser_common(parser) parser = self.update_parser_common(parser)
LOG.debug('common parser: %s', parser) LOG.debug('common parser: %s', parser)
if self.is_neutron or self.is_docs_build: if self.is_neutron or self.is_docs_build:

@@ -14,6 +14,7 @@
"""Meter Rule Implementations""" """Meter Rule Implementations"""
import logging import logging
import typing as ty
from osc_lib.command import command from osc_lib.command import command
from osc_lib import exceptions from osc_lib import exceptions
@@ -34,7 +35,7 @@ def _get_columns(item):
def _get_attrs(client_manager, parsed_args): def _get_attrs(client_manager, parsed_args):
attrs = {} attrs: dict[str, ty.Any] = {}
if parsed_args.exclude: if parsed_args.exclude:
attrs['excluded'] = True attrs['excluded'] = True

@@ -18,6 +18,7 @@ import collections
import copy import copy
import json import json
import logging import logging
import typing as ty
from cliff import columns as cliff_columns from cliff import columns as cliff_columns
from osc_lib.cli import format_columns from osc_lib.cli import format_columns
@@ -104,21 +105,21 @@ def _passed_multiple_gateways(extension_supported, external_gateways):
def _get_external_gateway_attrs(client_manager, parsed_args): def _get_external_gateway_attrs(client_manager, parsed_args):
attrs = {} attrs: dict[str, ty.Any] = {}
if parsed_args.external_gateways: if parsed_args.external_gateways:
external_gateways: collections.defaultdict[str, list[dict]] = ( external_gateways: collections.defaultdict[str, list[dict]] = (
collections.defaultdict(list) collections.defaultdict(list)
) )
n_client = client_manager.network n_client = client_manager.network
first_network_id = None first_network_id = ''
for gw_net_name_or_id in parsed_args.external_gateways: for gw_net_name_or_id in parsed_args.external_gateways:
gateway_info = {} gateway_info = {}
gw_net = n_client.find_network( gw_net = n_client.find_network(
gw_net_name_or_id, ignore_missing=False gw_net_name_or_id, ignore_missing=False
) )
if first_network_id is None: if not first_network_id:
first_network_id = gw_net.id first_network_id = gw_net.id
gateway_info['network_id'] = gw_net.id gateway_info['network_id'] = gw_net.id
if 'disable_snat' in parsed_args and parsed_args.disable_snat: if 'disable_snat' in parsed_args and parsed_args.disable_snat:
@@ -146,7 +147,7 @@ def _get_external_gateway_attrs(client_manager, parsed_args):
for ip_spec in parsed_args.fixed_ips: for ip_spec in parsed_args.fixed_ips:
# If there is only one gateway, this value will represent the # If there is only one gateway, this value will represent the
# network ID for it, otherwise it will be overridden. # network ID for it, otherwise it will be overridden.
ip_net_id = first_network_id ip_net_id: str = first_network_id
if ip_spec.get('subnet', False): if ip_spec.get('subnet', False):
subnet_name_id = ip_spec.pop('subnet') subnet_name_id = ip_spec.pop('subnet')

@@ -33,7 +33,7 @@ class NetworkTests(base.TestCase):
class NetworkTagTests(NetworkTests): class NetworkTagTests(NetworkTests):
"""Functional tests with tag operation""" """Functional tests with tag operation"""
base_command = None base_command: str
def test_tag_operation(self): def test_tag_operation(self):
# Get project IDs # Get project IDs
@@ -56,7 +56,7 @@ class NetworkTagTests(NetworkTests):
'set', name1, '--tag red --tag green', ['red', 'green'] 'set', name1, '--tag red --tag green', ['red', 'green']
) )
list_expected: tuple[tuple[str, list[str]]] = ( list_expected: tuple[tuple[str, list[str]], ...] = (
(name1, ['red', 'green']), (name1, ['red', 'green']),
(name2, ['red', 'blue']), (name2, ['red', 'blue']),
(name3, []), (name3, []),
@@ -93,7 +93,11 @@ class NetworkTagTests(NetworkTests):
parse_output=True, parse_output=True,
) )
def _create_resource_and_tag_check(self, args, expected): def _create_resource_and_tag_check(
self,
args: str,
expected: list[str],
) -> str:
name = uuid.uuid4().hex name = uuid.uuid4().hex
cmd_output = self._create_resource_for_tag_test(name, args) cmd_output = self._create_resource_for_tag_test(name, args)
self.addCleanup(self.openstack, f'{self.base_command} delete {name}') self.addCleanup(self.openstack, f'{self.base_command} delete {name}')

@@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from io import StringIO
from unittest import mock from unittest import mock
from openstackclient.common import project_cleanup from openstackclient.common import project_cleanup
@@ -70,7 +69,7 @@ class TestProjectCleanup(test_utils.TestCommand):
parsed_args = self.check_parser(self.cmd, arglist, verifylist) parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = None result = None
with mock.patch('sys.stdin', StringIO('y')): with mock.patch('getpass.getpass', return_value='y'):
result = self.cmd.take_action(parsed_args) result = self.cmd.take_action(parsed_args)
self.sdk_connect_as_project_mock.assert_called_with(self.project) self.sdk_connect_as_project_mock.assert_called_with(self.project)
@@ -143,7 +142,7 @@ class TestProjectCleanup(test_utils.TestCommand):
parsed_args = self.check_parser(self.cmd, arglist, verifylist) parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = None result = None
with mock.patch('sys.stdin', StringIO('y')): with mock.patch('getpass.getpass', return_value='y'):
result = self.cmd.take_action(parsed_args) result = self.cmd.take_action(parsed_args)
self.sdk_connect_as_project_mock.assert_called_with(self.project) self.sdk_connect_as_project_mock.assert_called_with(self.project)
@@ -178,7 +177,7 @@ class TestProjectCleanup(test_utils.TestCommand):
parsed_args = self.check_parser(self.cmd, arglist, verifylist) parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = None result = None
with mock.patch('sys.stdin', StringIO('n')): with mock.patch('getpass.getpass', return_value='y'):
result = self.cmd.take_action(parsed_args) result = self.cmd.take_action(parsed_args)
self.sdk_connect_as_project_mock.assert_called_with(self.project) self.sdk_connect_as_project_mock.assert_called_with(self.project)
@@ -234,7 +233,7 @@ class TestProjectCleanup(test_utils.TestCommand):
parsed_args = self.check_parser(self.cmd, arglist, verifylist) parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = None result = None
with mock.patch('sys.stdin', StringIO('y')): with mock.patch('getpass.getpass', return_value='y'):
result = self.cmd.take_action(parsed_args) result = self.cmd.take_action(parsed_args)
self.sdk_connect_as_project_mock.assert_not_called() self.sdk_connect_as_project_mock.assert_not_called()
@@ -268,7 +267,7 @@ class TestProjectCleanup(test_utils.TestCommand):
parsed_args = self.check_parser(self.cmd, arglist, verifylist) parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = None result = None
with mock.patch('sys.stdin', StringIO('y')): with mock.patch('getpass.getpass', return_value='y'):
result = self.cmd.take_action(parsed_args) result = self.cmd.take_action(parsed_args)
self.sdk_connect_as_project_mock.assert_called_with(self.project) self.sdk_connect_as_project_mock.assert_called_with(self.project)