Merge "Fix some new pylint "R" warnings"

This commit is contained in:
Zuul 2023-07-28 06:58:46 +00:00 committed by Gerrit Code Review
commit d32c5f8f32
22 changed files with 68 additions and 53 deletions

View File

@ -97,15 +97,8 @@ disable=
too-many-statements,
consider-using-set-comprehension,
useless-object-inheritance,
use-a-generator,
unnecessary-lambda-assignment,
super-with-arguments,
consider-using-max-builtin,
use-dict-literal,
consider-using-generator,
consider-using-in,
use-list-literal,
consider-using-from-import
use-dict-literal
[BASIC]
# Variable names can be 1 to 31 characters long, with lowercase and underscores

View File

@ -609,6 +609,10 @@ class L3NATAgent(ha.AgentMixin,
self._queue.add(update)
def _process_network_update(self, router_id, network_id):
def _port_belongs(p):
return p['network_id'] == network_id
ri = self.router_info.get(router_id)
if not ri:
return
@ -617,8 +621,7 @@ class L3NATAgent(ha.AgentMixin,
ports = list(ri.internal_ports)
if ri.ex_gw_port:
ports.append(ri.ex_gw_port)
port_belongs = lambda p: p['network_id'] == network_id
if any(port_belongs(p) for p in ports):
if any(_port_belongs(p) for p in ports):
update = queue.ResourceUpdate(
ri.router_id, PRIORITY_SYNC_ROUTERS_TASK)
self._resync_router(update)

View File

@ -923,14 +923,14 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
del self.sg_members[sg_id]
def _find_deleted_sg_rules(self, sg_id):
del_rules = list()
del_rules = []
for pre_rule in self.pre_sg_rules.get(sg_id, []):
if pre_rule not in self.sg_rules.get(sg_id, []):
del_rules.append(pre_rule)
return del_rules
def _find_devices_on_security_group(self, sg_id):
device_list = list()
device_list = []
for device in self.filtered_ports.values():
if sg_id in device.get('security_groups', []):
device_list.append(device)

View File

@ -856,11 +856,14 @@ def _get_rules_by_chain(rules):
def _ensure_all_mac_addresses_are_uppercase(rules):
def _to_upper(pat):
return pat.group(0).upper()
new_rules = []
lowercase_mac_regex = re.compile(r"(?:[0-9a-f]{2}[:]){5}(?:[0-9a-f]{2})")
callback = lambda pat: pat.group(0).upper()
for rule in rules:
new_rules.append(re.sub(lowercase_mac_regex, callback, rule))
new_rules.append(re.sub(lowercase_mac_regex, _to_upper, rule))
return new_rules

View File

@ -171,7 +171,7 @@ class OFPort(object):
self.lla_address = str(netutils.get_ipv6_addr_by_EUI64(
lib_const.IPv6_LLA_PREFIX, self.mac))
self.ofport = ovs_port.ofport
self.sec_groups = list()
self.sec_groups = []
self.fixed_ips = port_dict.get('fixed_ips', [])
self.neutron_port_dict = port_dict.copy()
self.allowed_pairs_v4 = self._get_allowed_pairs(port_dict, version=4)
@ -307,7 +307,7 @@ class ConjIdMap(object):
for table in ovs_consts.OVS_FIREWALL_TABLES])
conj_ids = CONJ_ID_REGEX.findall(" | ".join(flows_iter))
try:
conj_id_max = max([int(conj_id) for conj_id in conj_ids])
conj_id_max = max(int(conj_id) for conj_id in conj_ids)
except ValueError:
conj_id_max = 0

View File

@ -167,9 +167,12 @@ class ResourceConsumerTracker(object):
def report(self):
"""Output debug information about the consumer versions."""
format = lambda versions: pprint.pformat(dict(versions), indent=4)
debug_dict = {'pushed_versions': format(self._versions),
'consumer_versions': format(self._versions_by_consumer)}
def _format(versions):
return pprint.pformat(dict(versions), indent=4)
debug_dict = {'pushed_versions': _format(self._versions),
'consumer_versions': _format(self._versions_by_consumer)}
if self.last_report != debug_dict:
self.last_report = debug_dict
LOG.debug('Tracked resource versions report:\n'

View File

@ -69,11 +69,13 @@ def filter_fields(f):
except (IndexError, ValueError):
return result
do_filter = lambda d: {k: v for k, v in d.items() if k in fields}
def _do_filter(d):
return {k: v for k, v in d.items() if k in fields}
if isinstance(result, list):
return [do_filter(obj) for obj in result]
return [_do_filter(obj) for obj in result]
else:
return do_filter(result)
return _do_filter(result)
return inner_filter

View File

@ -522,7 +522,7 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
dvr_routers):
related_routers = self._get_other_dvr_router_ids_connected_router(
context, router_id)
return any([r in dvr_routers for r in related_routers])
return any(r in dvr_routers for r in related_routers)
def _dvr_handle_unbound_allowed_addr_pair_add(

View File

@ -16,9 +16,9 @@
from neutron_lib.api.definitions import network_ip_availability as apidef
from neutron_lib.api import extensions as api_extensions
import neutron.api.extensions as extensions
import neutron.api.v2.base as base
import neutron.services.network_ip_availability.plugin as plugin
from neutron.api import extensions
from neutron.api.v2 import base
from neutron.services.network_ip_availability import plugin
class Network_ip_availability(api_extensions.APIExtensionDescriptor):

View File

@ -279,9 +279,12 @@ class PortForwarding(base.NeutronDbObject):
@staticmethod
def _unique_port_forwarding(query):
def _row_one(row):
return row[1]
q = query.order_by(l3.FloatingIP.router_id)
keyfunc = lambda row: row[1]
group_iterator = itertools.groupby(q, keyfunc)
group_iterator = itertools.groupby(q, _row_one)
result = []
for key, value in group_iterator:

View File

@ -92,8 +92,7 @@ class PolicyHook(hooks.PecanHook):
if not controller or utils.is_member_action(controller):
return
collection = state.request.context.get('collection')
needs_prefetch = (state.request.method == 'PUT' or
state.request.method == 'DELETE')
needs_prefetch = state.request.method in ('PUT', 'DELETE')
policy.init()
action = controller.plugin_handlers[

View File

@ -208,7 +208,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
# Stores the port IDs whose binding has been activated
self.activated_bindings = set()
# Stores smartnic ports update/remove
self.updated_smartnic_ports = list()
self.updated_smartnic_ports = []
# Stores integration bridge smartnic ports data
self.current_smartnic_ports_map = {}
@ -223,7 +223,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
# keeps association between ports and ofports to detect ofport change
self.vifname_to_ofport_map = {}
# Stores newly created bridges
self.added_bridges = list()
self.added_bridges = []
self.bridge_mappings = self._parse_bridge_mappings(
ovs_conf.bridge_mappings)
self.rp_bandwidths = place_utils.parse_rp_bandwidths(
@ -2808,7 +2808,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
self.process_smartnic_ports()
updated_smartnic_ports_copy = (
self.updated_smartnic_ports)
self.updated_smartnic_ports = list()
self.updated_smartnic_ports = []
for port_data in updated_smartnic_ports_copy:
self.treat_smartnic_port(port_data)

View File

@ -963,8 +963,8 @@ class DBInconsistenciesPeriodics(SchemaAwarePeriodicsBase):
for route in self._nb_idl.lr_route_list(router.uuid).execute(
check_error=True):
if (route.nexthop == '' and
(route.ip_prefix == n_const.IPv4_ANY or
route.ip_prefix == n_const.IPv6_ANY)):
route.ip_prefix in (n_const.IPv4_ANY,
n_const.IPv6_ANY)):
cmds.append(
self._nb_idl.delete_static_route(
router.name, route.ip_prefix, ''))

View File

@ -1967,8 +1967,8 @@ class OVNClient(object):
if self.is_external_ports_supported():
# If there are no external ports in this network, there's
# no need to check the AZs
if any([p for p in lswitch.ports if
p.type == ovn_const.LSP_TYPE_EXTERNAL]):
if any(p for p in lswitch.ports if
p.type == ovn_const.LSP_TYPE_EXTERNAL):
# Check for changes in the network Availability Zones
ovn_ls_azs = lswitch.external_ids.get(
ovn_const.OVN_AZ_HINTS_EXT_ID_KEY, '')

View File

@ -105,7 +105,7 @@ class ChassisEvent(row_event.RowEvent):
def _get_min_priority_in_hcg(self, ha_chassis_group):
"""Find the next lowest priority number within a HA Chassis Group."""
min_priority = min(
[ch.priority for ch in ha_chassis_group.ha_chassis],
(ch.priority for ch in ha_chassis_group.ha_chassis),
default=ovn_const.HA_CHASSIS_GROUP_HIGHEST_PRIORITY)
return min_priority - 1

View File

@ -146,8 +146,8 @@ def _should_validate_sub_attributes(attribute, sub_attr):
"""Verify that sub-attributes are iterable and should be validated."""
validate = attribute.get('validate')
return (validate and isinstance(sub_attr, abc.Iterable) and
any([k.startswith('type:dict') and
v for (k, v) in validate.items()]))
any(k.startswith('type:dict') and v
for (k, v) in validate.items()))
def _build_subattr_match_rule(attr_name, attr, action, target):
@ -386,11 +386,15 @@ class FieldCheck(policy.Check):
(resource, field, value))
# Value might need conversion - we need help from the attribute map
def _no_conv(x):
return x
try:
attr = attributes.RESOURCES[resource][field]
conv_func = attr['convert_to']
except KeyError:
conv_func = lambda x: x
conv_func = _no_conv
self.field = field
self.resource = resource

View File

@ -177,8 +177,7 @@ def _get_rpc_workers(plugin=None):
if workers is None:
# By default, half as many rpc workers as api workers
workers = int(_get_api_workers() / 2)
if workers < 1:
workers = 1
workers = max(workers, 1)
# If workers > 0 then start_rpc_listeners would be called in a
# subprocess and we cannot simply catch the NotImplementedError. It is

View File

@ -175,10 +175,13 @@ def get_logs_bound_port(context, port_id):
project_id=project_id,
resource_type=constants.SECURITY_GROUP,
enabled=True)
is_bound = lambda log: (log.resource_id in port.security_group_ids or
log.target_id == port.id or
(not log.target_id and not log.resource_id))
return [log for log in logs if is_bound(log)]
def _is_bound(log):
return (log.resource_id in port.security_group_ids or
log.target_id == port.id or
(not log.target_id and not log.resource_id))
return [log for log in logs if _is_bound(log)]
def get_logs_bound_sg(context, sg_id=None, project_id=None, port_id=None,

View File

@ -67,7 +67,7 @@ def setup_logging():
def find_deleted_sg_rules(old_port, new_ports):
del_rules = list()
del_rules = []
for port in new_ports:
if old_port.id == port.id:
for rule in old_port.secgroup_rules:

View File

@ -17,8 +17,8 @@ from neutron_lib.api.definitions import network_ip_availability
from neutron_lib.db import utils as db_utils
from neutron_lib import exceptions
import neutron.db.db_base_plugin_v2 as db_base_plugin_v2
import neutron.db.network_ip_availability_db as ip_availability_db
from neutron.db import db_base_plugin_v2
from neutron.db import network_ip_availability_db as ip_availability_db
class NetworkIPAvailabilityPlugin(ip_availability_db.IpAvailabilityMixin,

View File

@ -94,12 +94,15 @@ def bridge_has_port(bridge, is_port_predicate):
return any(iface for iface in ifaces if is_port_predicate(iface))
def _is_instance_port(port_name):
return not is_trunk_service_port(port_name)
def bridge_has_instance_port(bridge):
"""True if there is an OVS port that doesn't have bridge or patch ports
prefix.
"""
is_instance_port = lambda p: not is_trunk_service_port(p)
return bridge_has_port(bridge, is_instance_port)
return bridge_has_port(bridge, _is_instance_port)
def bridge_has_service_port(bridge):

View File

@ -123,7 +123,7 @@ class TrunkPlugin(service_base.ServicePluginBase):
def check_driver_compatibility(self):
"""Fail to load if no compatible driver is found."""
if not any([driver.is_loaded for driver in self._drivers]):
if not any(driver.is_loaded for driver in self._drivers):
raise trunk_exc.IncompatibleTrunkPluginConfiguration()
def check_segmentation_compatibility(self):