Files
Stephen Finucane 2c0a3ba137 typing: Use consistent types
Resolve 'Incompatible types in assignment' errors.

Change-Id: I1ea186ff766e0f72cac384fab22d1c2f82e02ef0
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
2025-03-31 17:48:36 +01:00

421 lines
14 KiB
Python

# Copyright 2016 ZTE Corporation.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""Network trunk and subports action implementations"""
import logging
import typing as ty
from cliff import columns as cliff_columns
from osc_lib.cli import format_columns
from osc_lib.cli import identity as identity_utils
from osc_lib.cli import parseractions
from osc_lib.command import command
from osc_lib import exceptions
from osc_lib import utils as osc_utils
from openstackclient.i18n import _
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# Resource name strings. TRUNK and TRUNKS are not referenced in the visible
# part of this module; SUB_PORTS is the key under which subport lists are
# stored in trunk attributes and read back from the subport listing result.
TRUNK = 'trunk'
TRUNKS = 'trunks'
SUB_PORTS = 'sub_ports'
class AdminStateColumn(cliff_columns.FormattableColumn):
    """Column that renders the wrapped admin-state value as UP or DOWN."""

    def human_readable(self):
        """Return 'UP' when the wrapped value is truthy, otherwise 'DOWN'."""
        if self._value:
            return 'UP'
        return 'DOWN'
class CreateNetworkTrunk(command.ShowOne):
    """Create a network trunk for a given project"""

    def get_parser(self, prog_name):
        # Extend the base parser with: the trunk name, an optional
        # description, the mandatory parent port, repeatable subports, the
        # mutually exclusive admin-state switches and the project options.
        parser = super().get_parser(prog_name)
        parser.add_argument(
            'name',
            metavar='<name>',
            help=_("Name of the trunk to create"),
        )
        parser.add_argument(
            '--description',
            metavar='<description>',
            help=_("A description of the trunk"),
        )
        parser.add_argument(
            '--parent-port',
            metavar='<parent-port>',
            required=True,
            help=_("Parent port belonging to this trunk (name or ID)"),
        )
        parser.add_argument(
            '--subport',
            metavar='<port=,segmentation-type=,segmentation-id=>',
            action=parseractions.MultiKeyValueAction,
            dest='add_subports',
            optional_keys=['segmentation-id', 'segmentation-type'],
            required_keys=['port'],
            help=_(
                "Subport to add. Subport is of form "
                "'port=<name or ID>,segmentation-type=<segmentation-type>,"
                "segmentation-id=<segmentation-ID>' (repeat option "
                "to add multiple subports)"
            ),
        )
        state_group = parser.add_mutually_exclusive_group()
        state_group.add_argument(
            '--enable',
            action='store_true',
            default=True,
            help=_("Enable trunk (default)"),
        )
        state_group.add_argument(
            '--disable',
            action='store_true',
            help=_("Disable trunk"),
        )
        identity_utils.add_project_owner_option_to_parser(parser)
        return parser

    def take_action(self, parsed_args):
        # Turn the parsed options into trunk attributes, create the trunk
        # and hand back its display columns together with their values.
        manager = self.app.client_manager
        trunk = manager.network.create_trunk(
            **_get_attrs_for_trunk(manager, parsed_args)
        )
        display_columns, columns = _get_columns(trunk)
        values = osc_utils.get_dict_properties(
            trunk, columns, formatters=_formatters
        )
        return display_columns, values
class DeleteNetworkTrunk(command.Command):
    """Delete a given network trunk"""

    def get_parser(self, prog_name):
        """Add the positional list of trunks (one or more) to delete."""
        parser = super().get_parser(prog_name)
        parser.add_argument(
            'trunk',
            metavar="<trunk>",
            nargs="+",
            help=_("Trunk(s) to delete (name or ID)"),
        )
        return parser

    def take_action(self, parsed_args):
        """Delete every requested trunk, continuing past individual failures.

        Each failure is logged; if at least one trunk could not be deleted a
        CommandError summarising the failure count is raised at the end.
        """
        client = self.app.client_manager.network
        result = 0
        for trunk in parsed_args.trunk:
            try:
                # ignore_missing=False makes an unknown trunk raise the SDK's
                # not-found error; without it find_trunk() returns None and
                # the logged reason was an opaque AttributeError on ``.id``.
                trunk_id = client.find_trunk(trunk, ignore_missing=False).id
                client.delete_trunk(trunk_id)
            except Exception as e:
                # Deliberately broad: one bad trunk must not prevent the
                # remaining ones from being deleted.
                result += 1
                LOG.error(
                    _(
                        "Failed to delete trunk with name "
                        "or ID '%(trunk)s': %(e)s"
                    ),
                    {'trunk': trunk, 'e': e},
                )
        if result > 0:
            total = len(parsed_args.trunk)
            msg = _("%(result)s of %(total)s trunks failed to delete.") % {
                'result': result,
                'total': total,
            }
            raise exceptions.CommandError(msg)
class ListNetworkTrunk(command.Lister):
    """List all network trunks"""

    def get_parser(self, prog_name):
        # Only one extra switch: --long widens the listing.
        parser = super().get_parser(prog_name)
        parser.add_argument(
            '--long',
            action='store_true',
            default=False,
            help=_("List additional fields in output"),
        )
        return parser

    def take_action(self, parsed_args):
        # Fetch the trunks and pick the column set according to --long; the
        # rows are produced lazily as the caller iterates over them.
        trunks = self.app.client_manager.network.trunks()
        headers: tuple[str, ...]
        columns: tuple[str, ...]
        if parsed_args.long:
            headers = (
                'ID',
                'Name',
                'Parent Port',
                'Description',
                'Status',
                'State',
                'Created At',
                'Updated At',
            )
            columns = (
                'id',
                'name',
                'port_id',
                'description',
                'status',
                'admin_state_up',
                'created_at',
                'updated_at',
            )
        else:
            headers = ('ID', 'Name', 'Parent Port', 'Description')
            columns = ('id', 'name', 'port_id', 'description')
        rows = (
            osc_utils.get_item_properties(
                trunk, columns, formatters=_formatters
            )
            for trunk in trunks
        )
        return headers, rows
class SetNetworkTrunk(command.Command):
    """Set network trunk properties"""

    def get_parser(self, prog_name):
        """Add the trunk selector plus the name/description/subport/state
        options that can be changed on an existing trunk.
        """
        parser = super().get_parser(prog_name)
        parser.add_argument(
            'trunk', metavar="<trunk>", help=_("Trunk to modify (name or ID)")
        )
        parser.add_argument(
            '--name', metavar="<name>", help=_("Set trunk name")
        )
        parser.add_argument(
            '--description',
            metavar='<description>',
            help=_("A description of the trunk"),
        )
        parser.add_argument(
            '--subport',
            metavar='<port=,segmentation-type=,segmentation-id=>',
            action=parseractions.MultiKeyValueAction,
            dest='set_subports',
            optional_keys=['segmentation-id', 'segmentation-type'],
            required_keys=['port'],
            help=_(
                "Subport to add. Subport is of form "
                "'port=<name or ID>,segmentation-type=<segmentation-type>,"
                "segmentation-id=<segmentation-ID>' (repeat option "
                "to add multiple subports)"
            ),
        )
        admin_group = parser.add_mutually_exclusive_group()
        admin_group.add_argument(
            '--enable', action='store_true', help=_("Enable trunk")
        )
        admin_group.add_argument(
            '--disable', action='store_true', help=_("Disable trunk")
        )
        return parser

    def take_action(self, parsed_args):
        """Update the trunk's attributes, then add any requested subports.

        Raises CommandError (chained to the underlying error) when either
        the attribute update or the subport addition fails.
        """
        client = self.app.client_manager.network
        # ignore_missing=False: fail right here with the SDK's not-found
        # error instead of passing None on to update_trunk().
        trunk = client.find_trunk(parsed_args.trunk, ignore_missing=False)
        attrs = _get_attrs_for_trunk(self.app.client_manager, parsed_args)
        try:
            client.update_trunk(trunk, **attrs)
        except Exception as e:
            msg = _("Failed to set trunk '%(t)s': %(e)s") % {
                't': parsed_args.trunk,
                'e': e,
            }
            # Chain the cause so the original traceback is not lost.
            raise exceptions.CommandError(msg) from e
        if parsed_args.set_subports:
            subport_attrs = _get_attrs_for_subports(
                self.app.client_manager, parsed_args
            )
            try:
                client.add_trunk_subports(trunk, subport_attrs)
            except Exception as e:
                msg = _("Failed to add subports to trunk '%(t)s': %(e)s") % {
                    't': parsed_args.trunk,
                    'e': e,
                }
                raise exceptions.CommandError(msg) from e
class ShowNetworkTrunk(command.ShowOne):
    """Show information of a given network trunk"""

    def get_parser(self, prog_name):
        """Add the positional trunk (name or ID) to display."""
        parser = super().get_parser(prog_name)
        parser.add_argument(
            'trunk', metavar="<trunk>", help=_("Trunk to display (name or ID)")
        )
        return parser

    def take_action(self, parsed_args):
        """Resolve the trunk, fetch it by ID and return its display data."""
        client = self.app.client_manager.network
        # ignore_missing=False: an unknown trunk raises the SDK's not-found
        # error rather than returning None and failing on ``.id``.
        trunk_id = client.find_trunk(
            parsed_args.trunk, ignore_missing=False
        ).id
        obj = client.get_trunk(trunk_id)
        display_columns, columns = _get_columns(obj)
        data = osc_utils.get_dict_properties(
            obj, columns, formatters=_formatters
        )
        return display_columns, data
class ListNetworkSubport(command.Lister):
    """List all subports for a given network trunk"""

    def get_parser(self, prog_name):
        """Add the mandatory --trunk option selecting the trunk to inspect."""
        parser = super().get_parser(prog_name)
        parser.add_argument(
            '--trunk',
            required=True,
            metavar="<trunk>",
            help=_("List subports belonging to this trunk (name or ID)"),
        )
        return parser

    def take_action(self, parsed_args):
        """Return the subport headers and a lazy iterable of subport rows."""
        client = self.app.client_manager.network
        # ignore_missing=False: an unknown trunk raises the SDK's not-found
        # error instead of None being passed to get_trunk_subports().
        trunk = client.find_trunk(parsed_args.trunk, ignore_missing=False)
        data = client.get_trunk_subports(trunk)
        headers: tuple[str, ...] = (
            'Port',
            'Segmentation Type',
            'Segmentation ID',
        )
        columns: tuple[str, ...] = (
            'port_id',
            'segmentation_type',
            'segmentation_id',
        )
        return (
            headers,
            (
                osc_utils.get_dict_properties(
                    s,
                    columns,
                )
                # The subport entries live under the SUB_PORTS key.
                for s in data[SUB_PORTS]
            ),
        )
class UnsetNetworkTrunk(command.Command):
    """Unset subports from a given network trunk"""

    def get_parser(self, prog_name):
        """Add the trunk selector and the repeatable, mandatory --subport."""
        parser = super().get_parser(prog_name)
        parser.add_argument(
            'trunk',
            metavar="<trunk>",
            help=_("Unset subports from this trunk (name or ID)"),
        )
        parser.add_argument(
            '--subport',
            metavar="<subport>",
            required=True,
            action='append',
            dest='unset_subports',
            help=_(
                "Subport to unset (name or ID of the port) "
                "(repeat option to unset multiple subports)"
            ),
        )
        return parser

    def take_action(self, parsed_args):
        """Resolve the subports and the trunk, then remove the subports."""
        client = self.app.client_manager.network
        attrs = _get_attrs_for_subports(self.app.client_manager, parsed_args)
        # ignore_missing=False: an unknown trunk raises the SDK's not-found
        # error instead of None being passed to delete_trunk_subports().
        trunk = client.find_trunk(parsed_args.trunk, ignore_missing=False)
        client.delete_trunk_subports(trunk, attrs)
# Formatter classes handed to the osc_utils property helpers, keyed by the
# column name they apply to.
_formatters = dict(
    admin_state_up=AdminStateColumn,
    sub_ports=format_columns.ListDictColumn,
)
def _get_columns(item):
    """Return the show-column information for an SDK resource.

    Delegates to osc_utils.get_osc_show_columns_for_sdk_resource with an
    empty mapping as second argument and with 'location' and 'tenant_id'
    listed as hidden columns. Callers in this module unpack the result as
    ``(display_columns, columns)``.
    """
    return osc_utils.get_osc_show_columns_for_sdk_resource(
        item, {}, ['location', 'tenant_id']
    )
def _get_attrs_for_trunk(client_manager, parsed_args):
    """Build the trunk attribute dict from parsed command-line options.

    :param client_manager: object exposing ``network`` (and, when a project
        is given, ``identity``) clients
    :param parsed_args: parsed options; ``name``, ``description``,
        ``enable`` and ``disable`` must be present, while ``parent_port``,
        ``add_subports`` and ``project`` are optional
    :returns: dict of attributes for creating or updating a trunk
    """
    attrs: dict[str, ty.Any] = {}
    if parsed_args.name is not None:
        attrs['name'] = str(parsed_args.name)
    if parsed_args.description is not None:
        attrs['description'] = str(parsed_args.description)
    # --disable is checked last so it wins over an --enable that is only
    # set because of its default value.
    if parsed_args.enable:
        attrs['admin_state_up'] = True
    if parsed_args.disable:
        attrs['admin_state_up'] = False
    if 'parent_port' in parsed_args and parsed_args.parent_port is not None:
        # ignore_missing=False: an unknown port raises the SDK's not-found
        # error instead of returning None, which failed on ``['id']`` with
        # an opaque TypeError.
        port = client_manager.network.find_port(
            parsed_args.parent_port, ignore_missing=False
        )
        attrs['port_id'] = port['id']
    if 'add_subports' in parsed_args and parsed_args.add_subports is not None:
        attrs[SUB_PORTS] = _format_subports(
            client_manager, parsed_args.add_subports
        )
    # "trunk set" command doesn't support setting project.
    if 'project' in parsed_args and parsed_args.project is not None:
        identity_client = client_manager.identity
        project_id = identity_utils.find_project(
            identity_client,
            parsed_args.project,
            parsed_args.project_domain,
        ).id
        attrs['tenant_id'] = project_id
    return attrs
def _format_subports(client_manager, subports):
    """Convert parsed ``--subport`` dicts into subport attribute dicts.

    :param client_manager: object exposing a ``network`` client
    :param subports: iterable of dicts with the CLI keys ``port``,
        ``segmentation-id`` and ``segmentation-type``
    :returns: list of dicts with ``port_id``, ``segmentation_id`` (int) and
        ``segmentation_type``; keys whose CLI value is missing or empty are
        left out
    :raises exceptions.CommandError: if a segmentation ID is not an integer
    """
    attrs = []
    for subport in subports:
        subport_attrs = {}
        if subport.get('port'):
            # ignore_missing=False: an unknown port raises the SDK's
            # not-found error instead of returning None, which failed on
            # ``['id']`` with an opaque TypeError.
            port = client_manager.network.find_port(
                subport['port'], ignore_missing=False
            )
            subport_attrs['port_id'] = port['id']
        if subport.get('segmentation-id'):
            try:
                subport_attrs['segmentation_id'] = int(
                    subport['segmentation-id']
                )
            except ValueError as e:
                msg = (
                    _("Segmentation-id '%s' is not an integer")
                    % subport['segmentation-id']
                )
                # Chain the cause so the original error is not lost.
                raise exceptions.CommandError(msg) from e
        if subport.get('segmentation-type'):
            subport_attrs['segmentation_type'] = subport['segmentation-type']
        attrs.append(subport_attrs)
    return attrs
def _get_attrs_for_subports(client_manager, parsed_args):
    """Build the subport list for adding to or removing from a trunk.

    :param client_manager: object exposing a ``network`` client
    :param parsed_args: parsed options; may carry ``set_subports`` (list of
        subport dicts) and/or ``unset_subports`` (list of port names or IDs)
    :returns: list of subport attribute dicts; when ``unset_subports`` is
        given it takes precedence and each entry holds only ``port_id``;
        an empty list when neither option is present
    """
    attrs = []
    if 'set_subports' in parsed_args and parsed_args.set_subports is not None:
        attrs = _format_subports(client_manager, parsed_args.set_subports)
    if (
        'unset_subports' in parsed_args
        and parsed_args.unset_subports is not None
    ):
        # ignore_missing=False: an unknown port raises the SDK's not-found
        # error instead of returning None, which failed on ``['id']`` with
        # an opaque TypeError.
        attrs = [
            {
                'port_id': client_manager.network.find_port(
                    subport, ignore_missing=False
                )['id']
            }
            for subport in parsed_args.unset_subports
        ]
    return attrs
def _get_id(client, id_or_name, resource):
    """Look up ``resource`` by name or ID via ``client`` and return its ID.

    The identifier is converted to ``str`` before being passed to
    ``client.find_resource``; the ``'id'`` item of the result is returned.
    """
    found = client.find_resource(resource, str(id_or_name))
    return found['id']