Merge "pyupgrade changes for Python3.9+"

This commit is contained in:
Zuul 2024-11-01 12:52:10 +00:00 committed by Gerrit Code Review
commit 5d6a720611
472 changed files with 4448 additions and 4169 deletions

View File

@ -183,7 +183,7 @@ def _catch_timeout(f):
class _CatchTimeoutMetaclass(abc.ABCMeta):
def __init__(cls, name, bases, dct):
super(_CatchTimeoutMetaclass, cls).__init__(name, bases, dct)
super().__init__(name, bases, dct)
for name, method in inspect.getmembers(
# NOTE(ihrachys): we should use isroutine because it will catch
# both unbound methods (python2) and functions (python3)
@ -205,7 +205,7 @@ class DietTestCase(base.BaseTestCase, metaclass=_CatchTimeoutMetaclass):
"""
def setUp(self):
super(DietTestCase, self).setUp()
super().setUp()
# NOTE(slaweq): Make deprecation warnings only happen once.
warnings.simplefilter("once", DeprecationWarning)
@ -295,7 +295,7 @@ class DietTestCase(base.BaseTestCase, metaclass=_CatchTimeoutMetaclass):
testtools.content.TracebackContent(
(ctx.type_, ctx.value, ctx.tb), self))
return super(DietTestCase, self).addOnException(safe_handler)
return super().addOnException(safe_handler)
def check_for_systemexit(self, exc_info):
if isinstance(exc_info[1], SystemExit):
@ -371,7 +371,7 @@ class ProcessMonitorFixture(fixtures.Fixture):
class BaseTestCase(DietTestCase):
def setUp(self):
super(BaseTestCase, self).setUp()
super().setUp()
self.useFixture(lockutils.ExternalLockFixture())
self.useFixture(fixture.APIDefinitionFixture())
@ -485,7 +485,7 @@ class BaseTestCase(DietTestCase):
raise_on_exception=False):
class SimpleThread(threading.Thread):
def __init__(self, q):
super(SimpleThread, self).__init__()
super().__init__()
self.q = q
self.exception = None
@ -533,7 +533,7 @@ class BaseTestCase(DietTestCase):
class PluginFixture(fixtures.Fixture):
def __init__(self, core_plugin=None):
super(PluginFixture, self).__init__()
super().__init__()
self.core_plugin = core_plugin
def _setUp(self):
@ -581,7 +581,7 @@ class Timeout(fixtures.Fixture):
"""
def __init__(self, timeout=None, scaling=1):
super(Timeout, self).__init__()
super().__init__()
if timeout is None:
timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
try:
@ -595,6 +595,6 @@ class Timeout(fixtures.Fixture):
raise ValueError('scaling value must be >= 1')
def setUp(self):
super(Timeout, self).setUp()
super().setUp()
if self.test_timeout > 0:
self.useFixture(fixtures.Timeout(self.test_timeout, gentle=True))

View File

@ -98,7 +98,7 @@ def wait_until_pkt_meter_rule_applied_ovs(bridge, port_vif, port_id,
def _pkt_rate_limit_rule_applied():
port_num = bridge.get_port_ofport(port_vif)
port_vlan = bridge.get_port_tag_by_name(port_vif)
key = "%s_%s_%s" % (type_, port_id, direction)
key = "{}_{}_{}".format(type_, port_id, direction)
meter_id = bridge.get_value_from_other_config(
port_vif, key, value_type=int)

View File

@ -32,7 +32,8 @@ class L3NATAgentForTest(agent.L3NATAgentWithStateReport):
orig_build_ns_name = namespaces.build_ns_name
def build_ns_name(prefix, identifier):
return "%s%s" % (orig_build_ns_name(prefix, identifier), ns_suffix)
return "{}{}".format(
orig_build_ns_name(prefix, identifier), ns_suffix)
build_ns = mock.patch.object(namespaces, 'build_ns_name').start()
build_ns.side_effect = build_ns_name
@ -58,7 +59,7 @@ class L3NATAgentForTest(agent.L3NATAgentWithStateReport):
parse_id = mock.patch.object(namespaces, 'get_id_from_ns_name').start()
parse_id.side_effect = get_id_from_ns_name
super(L3NATAgentForTest, self).__init__(host, conf)
super().__init__(host, conf)
def _create_router(self, router_id, router):
"""Create a router with suffix added to the router namespace name.
@ -67,7 +68,7 @@ class L3NATAgentForTest(agent.L3NATAgentWithStateReport):
on the same node.
"""
router = (
super(L3NATAgentForTest, self)._create_router(router_id, router))
super()._create_router(router_id, router))
router.get_internal_device_name = types.MethodType(
get_internal_device_name, router)
@ -80,7 +81,7 @@ class L3NATAgentForTest(agent.L3NATAgentWithStateReport):
def _append_suffix(dev_name):
# If dev_name = 'xyz123' and the suffix is 'hostB' then the result
# will be 'xy_stB'
return '%s_%s' % (dev_name[:-4], cfg.CONF.test_namespace_suffix[-3:])
return '{}_{}'.format(dev_name[:-4], cfg.CONF.test_namespace_suffix[-3:])
def get_internal_device_name(ri, port_id):

View File

@ -38,7 +38,7 @@ def get_tunnel_name_full(cls, network_type, local_ip, remote_ip):
local_tunnel_hash = encodeutils.to_utf8(local_tunnel_hash)
source_ip_hash = hashlib.sha1(local_tunnel_hash).hexdigest()[:hashlen]
return '%s-%s-%s' % (network_type, source_ip_hash, remote_ip_hash)
return '{}-{}-{}'.format(network_type, source_ip_hash, remote_ip_hash)
ovs_neutron_agent.OVSNeutronAgent.get_tunnel_name = get_tunnel_name_full

View File

@ -23,7 +23,7 @@ from neutron.tests import base
class ConfigDict(base.AttributeDict):
def update(self, other):
self.convert_to_attr_dict(other)
super(ConfigDict, self).update(other)
super().update(other)
def convert_to_attr_dict(self, other):
"""Convert nested dicts to AttributeDict.
@ -46,7 +46,7 @@ class ConfigFileFixture(fixtures.Fixture):
"""
def __init__(self, base_filename, config, temp_dir):
super(ConfigFileFixture, self).__init__()
super().__init__()
self.base_filename = base_filename
self.config = config
self.temp_dir = temp_dir

View File

@ -180,8 +180,8 @@ class ConnectionTester(fixtures.Fixture):
timeout=icmp_timeout)
except RuntimeError:
raise ConnectionTesterException(
"ICMP packets can't get from %s namespace to %s address" % (
src_namespace, ip_address))
"ICMP packets can't get from {} namespace to {} "
"address".format(src_namespace, ip_address))
def _test_arp_connectivity(self, direction, protocol, src_port, dst_port):
src_namespace, ip_address = self._get_namespace_and_address(direction)
@ -205,8 +205,8 @@ class ConnectionTester(fixtures.Fixture):
except ConnectionTesterException:
pass
else:
dst_port_info = str()
src_port_info = str()
dst_port_info = ''
src_port_info = ''
if dst_port is not None:
dst_port_info = " and destination port %d" % dst_port
if src_port is not None:
@ -399,11 +399,11 @@ class OVSConnectionTester(OVSBaseConnectionTester):
"""
def __init__(self, ip_cidr, br_int_cls):
super(OVSConnectionTester, self).__init__(ip_cidr)
super().__init__(ip_cidr)
self.br_int_cls = br_int_cls
def _setUp(self):
super(OVSConnectionTester, self)._setUp()
super()._setUp()
br_name = self.useFixture(
net_helpers.OVSBridgeFixture()).bridge.br_name
self.bridge = self.br_int_cls(br_name)
@ -487,11 +487,11 @@ class OVSTrunkConnectionTester(OVSBaseConnectionTester):
"""
def __init__(self, ip_cidr, br_trunk_name):
super(OVSTrunkConnectionTester, self).__init__(ip_cidr)
super().__init__(ip_cidr)
self._br_trunk_name = br_trunk_name
def _setUp(self):
super(OVSTrunkConnectionTester, self)._setUp()
super()._setUp()
self.bridge = self.useFixture(
net_helpers.OVSBridgeFixture()).bridge
self.br_trunk = self.useFixture(
@ -569,10 +569,10 @@ class LinuxBridgeConnectionTester(ConnectionTester):
def __init__(self, *args, **kwargs):
self.bridge_name = kwargs.pop('bridge_name', None)
super(LinuxBridgeConnectionTester, self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)
def _setUp(self):
super(LinuxBridgeConnectionTester, self)._setUp()
super()._setUp()
bridge_args = {}
if self.bridge_name:
bridge_args = {'prefix': self.bridge_name,
@ -599,4 +599,4 @@ class LinuxBridgeConnectionTester(ConnectionTester):
def flush_arp_tables(self):
self.bridge.neigh.flush(4, 'all')
super(LinuxBridgeConnectionTester, self).flush_arp_tables()
super().flush_arp_tables()

View File

@ -54,9 +54,9 @@ class ExclusiveIPAddress(resource_allocator.ExclusiveResource):
"""
def __init__(self, low, high):
super(ExclusiveIPAddress, self).__init__(
super().__init__(
'ip_addresses', functools.partial(get_random_ip, low, high))
def _setUp(self):
super(ExclusiveIPAddress, self)._setUp()
super()._setUp()
self.address = netaddr.IPAddress(self.resource)

View File

@ -22,7 +22,7 @@ from neutron.tests.common.exclusive_resources import resource_allocator
def _get_random_network(low, high, netmask):
ip = ip_address.get_random_ip(low, high)
return str(netaddr.IPNetwork("%s/%s" % (ip, netmask)).cidr)
return str(netaddr.IPNetwork("{}/{}".format(ip, netmask)).cidr)
class ExclusiveIPNetwork(resource_allocator.ExclusiveResource):
@ -33,13 +33,13 @@ class ExclusiveIPNetwork(resource_allocator.ExclusiveResource):
"""
def __init__(self, low, high, netmask):
super(ExclusiveIPNetwork, self).__init__(
super().__init__(
'ip_networks',
functools.partial(_get_random_network, low, high, netmask),
self.is_valid)
def _setUp(self):
super(ExclusiveIPNetwork, self)._setUp()
super()._setUp()
self.network = netaddr.IPNetwork(self.resource)
def is_valid(self, new_resource, allocated_resources):

View File

@ -26,11 +26,11 @@ class ExclusivePort(resource_allocator.ExclusiveResource):
"""
def __init__(self, protocol, start=1024, end=None):
super(ExclusivePort, self).__init__(
super().__init__(
'ports',
functools.partial(net_helpers.get_free_namespace_port, protocol,
start=start, end=end))
def _setUp(self):
super(ExclusivePort, self)._setUp()
super()._setUp()
self.port = self.resource

View File

@ -35,7 +35,7 @@ class ExclusiveResource(fixtures.Fixture):
self.addCleanup(self.ra.release, self.resource)
class ResourceAllocator(object):
class ResourceAllocator:
"""ResourceAllocator persists cross-process allocations of a resource.
Allocations are persisted to a file determined by the 'resource_name',
@ -103,9 +103,9 @@ class ResourceAllocator(object):
fileutils.ensure_tree(TMP_DIR, mode=0o755)
try:
with open(self._state_file_path, 'r') as allocations_file:
with open(self._state_file_path) as allocations_file:
contents = allocations_file.read()
except IOError:
except OSError:
contents = None
# If the file was empty, we want to return an empty set, not {''}

View File

@ -167,10 +167,12 @@ def _get_l2_agent_dict(host, agent_type, binary, tunnel_types=None,
def register_ovs_agent(host=HOST, agent_type=constants.AGENT_TYPE_OVS,
binary=constants.AGENT_PROCESS_OVS,
tunnel_types=['vxlan'], tunneling_ip='20.0.0.1',
tunnel_types=None, tunneling_ip='20.0.0.1',
interface_mappings=None, bridge_mappings=None,
l2pop_network_types=None, plugin=None, start_flag=True,
integration_bridge=None):
if tunnel_types is None:
tunnel_types = ['vxlan']
agent = _get_l2_agent_dict(host, agent_type, binary, tunnel_types,
tunneling_ip, interface_mappings,
bridge_mappings, l2pop_network_types,
@ -182,9 +184,11 @@ def register_ovs_agent(host=HOST, agent_type=constants.AGENT_TYPE_OVS,
def register_linuxbridge_agent(host=HOST,
agent_type=constants.AGENT_TYPE_LINUXBRIDGE,
binary=constants.AGENT_PROCESS_LINUXBRIDGE,
tunnel_types=['vxlan'], tunneling_ip='20.0.0.1',
tunnel_types=None, tunneling_ip='20.0.0.1',
interface_mappings=None, bridge_mappings=None,
plugin=None):
if tunnel_types is None:
tunnel_types = ['vxlan']
agent = _get_l2_agent_dict(host, agent_type, binary, tunnel_types,
tunneling_ip=tunneling_ip,
interface_mappings=interface_mappings,
@ -242,7 +246,7 @@ class TestTimerTimeout(Exception):
pass
class TestTimer(object):
class TestTimer:
"""Timer context manager class for testing.
This class can be used inside a fixtures._fixtures.timeout.Timeout context.

View File

@ -24,7 +24,7 @@ from neutron.common import ipv6_utils
_uuid = uuidutils.generate_uuid
class FakeDev(object):
class FakeDev:
def __init__(self, name):
self.name = name
@ -39,10 +39,10 @@ def get_ha_interface(ip='169.254.192.1', mac='12:34:56:78:2b:5d'):
'subnet_id': subnet_id}],
'id': _uuid(),
'mac_address': mac,
'name': u'L3 HA Admin port 0',
'name': 'L3 HA Admin port 0',
'mtu': 1500,
'network_id': _uuid(),
'status': u'ACTIVE',
'status': 'ACTIVE',
'subnets': [{'cidr': '169.254.192.0/18',
'gateway_ip': '169.254.255.254',
'id': subnet_id}],

View File

@ -101,14 +101,14 @@ class FakeMachineBase(fixtures.Fixture):
class FakeMachine(FakeMachineBase):
def __init__(self, bridge, ip_cidr, gateway_ip=None, ipv6_cidr=None):
super(FakeMachine, self).__init__()
super().__init__()
self.bridge = bridge
self._ip_cidr = ip_cidr
self._ipv6_cidr = ipv6_cidr
self.gateway_ip = gateway_ip
def _setUp(self):
super(FakeMachine, self)._setUp()
super()._setUp()
self.port = self.useFixture(
net_helpers.PortFixture.get(self.bridge, self.namespace)).port
@ -169,7 +169,7 @@ class PeerMachines(fixtures.Fixture):
CIDR = '192.168.0.1/24'
def __init__(self, bridge, ip_cidr=None, gateway_ip=None, amount=2):
super(PeerMachines, self).__init__()
super().__init__()
self.bridge = bridge
self.ip_cidr = ip_cidr or self.CIDR
self.gateway_ip = gateway_ip

View File

@ -270,7 +270,7 @@ def create_patch_ports(source, destination):
:param destination: Instance of OVSBridge
"""
common = common_utils.get_rand_name(max_length=4, prefix='')
prefix = '%s-%s-' % (PATCH_PREFIX, common)
prefix = '{}-{}-'.format(PATCH_PREFIX, common)
source_name = common_utils.get_rand_device_name(prefix=prefix)
destination_name = common_utils.get_rand_device_name(prefix=prefix)
@ -316,7 +316,7 @@ class RootHelperProcess(subprocess.Popen):
cmd = shlex.split(root_helper) + cmd
self.child_pid = None
LOG.debug("Spawning process %s", cmd)
super(RootHelperProcess, self).__init__(cmd, *args, **kwargs)
super().__init__(cmd, *args, **kwargs)
self._wait_for_child_process()
def kill(self, sig=signal.SIGKILL, skip_errors=None):
@ -391,7 +391,7 @@ class RootHelperProcess(subprocess.Popen):
return self.poll() is None
class Pinger(object):
class Pinger:
"""Class for sending ICMP packets asynchronously
The aim is to keep sending ICMP packets on background while executing other
@ -480,7 +480,7 @@ class Pinger(object):
"first")
class NetcatTester(object):
class NetcatTester:
TCP = n_const.PROTO_NAME_TCP
UDP = n_const.PROTO_NAME_UDP
SCTP = n_const.PROTO_NAME_SCTP
@ -642,7 +642,7 @@ class NamespaceFixture(fixtures.Fixture):
"""
def __init__(self, prefix=NS_PREFIX):
super(NamespaceFixture, self).__init__()
super().__init__()
self.prefix = prefix
def _setUp(self):
@ -713,7 +713,7 @@ class NamedVethFixture(VethFixture):
"""
def __init__(self, veth0_prefix=VETH0_PREFIX, veth1_prefix=VETH1_PREFIX):
super(NamedVethFixture, self).__init__()
super().__init__()
self.veth0_name = self.get_veth_name(veth0_prefix)
self.veth1_name = self.get_veth_name(veth1_prefix)
@ -742,7 +742,7 @@ class MacvtapFixture(fixtures.Fixture):
:type ip_dev: IPDevice
"""
def __init__(self, src_dev=None, mode=None, prefix=MACVTAP_PREFIX):
super(MacvtapFixture, self).__init__()
super().__init__()
self.src_dev = src_dev
self.mode = mode
self.prefix = prefix
@ -773,7 +773,7 @@ class PortFixture(fixtures.Fixture, metaclass=abc.ABCMeta):
"""
def __init__(self, bridge=None, namespace=None, mac=None, port_id=None):
super(PortFixture, self).__init__()
super().__init__()
self.bridge = bridge
self.namespace = namespace
self.mac = (mac or
@ -786,7 +786,7 @@ class PortFixture(fixtures.Fixture, metaclass=abc.ABCMeta):
@abc.abstractmethod
def _setUp(self):
super(PortFixture, self)._setUp()
super()._setUp()
if not self.bridge:
self.bridge = self.useFixture(self._create_bridge_fixture()).bridge
@ -826,7 +826,7 @@ class OVSBridgeFixture(fixtures.Fixture):
"""
def __init__(self, prefix=BR_PREFIX):
super(OVSBridgeFixture, self).__init__()
super().__init__()
self.prefix = prefix
def _setUp(self):
@ -846,7 +846,7 @@ class OVSTrunkBridgeFixture(OVSBridgeFixture):
class OVSTrunkBridgeFixtureTrunkBridge(fixtures.Fixture):
def __init__(self, trunk_id):
super(OVSTrunkBridgeFixtureTrunkBridge, self).__init__()
super().__init__()
self.trunk_id = trunk_id
def _setUp(self):
@ -860,7 +860,7 @@ class OVSPortFixture(PortFixture):
def __init__(self, bridge=None, namespace=None, mac=None, port_id=None,
hybrid_plug=False):
super(OVSPortFixture, self).__init__(bridge, namespace, mac, port_id)
super().__init__(bridge, namespace, mac, port_id)
self.hybrid_plug = hybrid_plug
self.vlan_tag = None
@ -868,7 +868,7 @@ class OVSPortFixture(PortFixture):
return OVSBridgeFixture()
def _setUp(self):
super(OVSPortFixture, self)._setUp()
super()._setUp()
# because in some tests this port can be used to providing connection
# between linuxbridge agents and vlan_id can be also added to this
@ -963,7 +963,7 @@ class LinuxBridgeFixture(fixtures.Fixture):
"""
def __init__(self, prefix=BR_PREFIX, namespace=UNDEFINED,
prefix_is_full_name=False):
super(LinuxBridgeFixture, self).__init__()
super().__init__()
self.prefix = prefix
self.prefix_is_full_name = prefix_is_full_name
self.namespace = namespace
@ -1011,7 +1011,7 @@ class LinuxBridgePortFixture(PortFixture):
"""
def __init__(self, bridge, namespace=None, mac=None, port_id=None):
super(LinuxBridgePortFixture, self).__init__(
super().__init__(
bridge, namespace, mac, port_id)
# we need to override port_id value here because in Port() class it is
# always generated as random. In LinuxBridgePortFixture we need to have
@ -1023,7 +1023,7 @@ class LinuxBridgePortFixture(PortFixture):
return LinuxBridgeFixture()
def _setUp(self):
super(LinuxBridgePortFixture, self)._setUp()
super()._setUp()
br_port_name = self._get_port_name()
if br_port_name:
self.veth_fixture = self.useFixture(
@ -1052,7 +1052,7 @@ class LinuxBridgePortFixture(PortFixture):
return None
class VethBridge(object):
class VethBridge:
def __init__(self, ports):
self.ports = ports
@ -1089,7 +1089,7 @@ class VethPortFixture(PortFixture):
return VethBridgeFixture()
def _setUp(self):
super(VethPortFixture, self)._setUp()
super()._setUp()
self.port = self.bridge.allocate_port()
ns_ip_wrapper = ip_lib.IPWrapper(self.namespace)

View File

@ -33,7 +33,7 @@ OPTS = [
def _get_namespace_name(id_, suffix=None):
suffix = suffix or cfg.CONF.test_namespace_suffix
return "%s%s%s" % (linux_dhcp.NS_PREFIX, id_, suffix)
return "{}{}{}".format(linux_dhcp.NS_PREFIX, id_, suffix)
def NetModel_init(self, d):
@ -62,7 +62,8 @@ def monkeypatch_dhcplocalprocess_init():
def new_init(self, conf, network, process_monitor, version=None,
plugin=None, segment=None):
network_copy = copy.deepcopy(network)
network_copy.id = "%s%s" % (network.id, cfg.CONF.test_namespace_suffix)
network_copy.id = "{}{}".format(
network.id, cfg.CONF.test_namespace_suffix)
original_init(
self, conf, network_copy, process_monitor, version, plugin,
segment)

View File

@ -55,7 +55,7 @@ class BaseFullStackTestCase(testlib_api.MySQLTestCaseMixin,
CLEAN_DB_AFTER_TEST = True
def setUp(self, environment):
super(BaseFullStackTestCase, self).setUp()
super().setUp()
tests_base.setup_test_logging(
cfg.CONF, DEFAULT_LOG_DIR, '%s.txt' % self.get_name())
@ -92,7 +92,7 @@ class BaseFullStackTestCase(testlib_api.MySQLTestCaseMixin,
def get_name(self):
class_name, test_name = self.id().split(".")[-2:]
return "%s.%s" % (class_name, test_name)
return "{}.{}".format(class_name, test_name)
def _wait_until_agent_up(self, agent_id):
def _agent_up():

View File

@ -37,7 +37,7 @@ class ClientFixture(fixtures.Fixture):
"""Manage and cleanup neutron resources."""
def __init__(self, client):
super(ClientFixture, self).__init__()
super().__init__()
self.client = client
def _create_resource(self, resource_type, spec):
@ -76,7 +76,7 @@ class ClientFixture(fixtures.Fixture):
def create_local_ip_association(self, local_ip_id, port_id, fixed_ip=None):
delete = self.delete_local_ip_association
path = '/local_ips/{0}/port_associations'.format(local_ip_id)
path = f'/local_ips/{local_ip_id}/port_associations'
body = {'port_association': {'fixed_port_id': port_id}}
if fixed_ip:
body['port_association']['fixed_ip'] = fixed_ip
@ -86,11 +86,11 @@ class ClientFixture(fixtures.Fixture):
return data
def delete_local_ip(self, local_ip_id):
path = "/local-ips/{0}".format(local_ip_id)
path = f"/local-ips/{local_ip_id}"
self.client.delete(path)
def delete_local_ip_association(self, local_ip_id, port_id):
path = "/local_ips/{0}/port_associations/{1}".format(
path = "/local_ips/{}/port_associations/{}".format(
local_ip_id, port_id)
self.client.delete(path)

View File

@ -50,7 +50,7 @@ class ConfigFixture(config_fixtures.ConfigFileFixture):
is initializing a new instance of the class.
"""
def __init__(self, env_desc, host_desc, temp_dir, base_filename):
super(ConfigFixture, self).__init__(
super().__init__(
base_filename, config_fixtures.ConfigDict(), temp_dir)
self.env_desc = env_desc
self.host_desc = host_desc
@ -63,7 +63,7 @@ class NeutronConfigFixture(ConfigFixture):
def __init__(self, env_desc, host_desc, temp_dir,
connection, rabbitmq_environment, use_local_apipaste=True):
super(NeutronConfigFixture, self).__init__(
super().__init__(
env_desc, host_desc, temp_dir, base_filename='neutron.conf')
self.config.update({
@ -123,7 +123,7 @@ class NeutronConfigFixture(ConfigFixture):
if env_desc.has_placement:
service_plugins = self.config['DEFAULT']['service_plugins']
self.config['DEFAULT']['service_plugins'] = (
'%s,%s' % (service_plugins, 'placement')
'{},{}'.format(service_plugins, 'placement')
)
self.config.update({
'placement': {
@ -150,7 +150,7 @@ class NeutronConfigFixture(ConfigFixture):
start=NEUTRON_SERVER_PORT_START,
end=NEUTRON_SERVER_PORT_END)).port
})
super(NeutronConfigFixture, self)._setUp()
super()._setUp()
def _generate_host(self):
return utils.get_rand_name(prefix='host-')
@ -173,7 +173,7 @@ class NeutronConfigFixture(ConfigFixture):
class ML2ConfigFixture(ConfigFixture):
def __init__(self, env_desc, host_desc, temp_dir, tenant_network_types):
super(ML2ConfigFixture, self).__init__(
super().__init__(
env_desc, host_desc, temp_dir, base_filename='ml2_conf.ini')
mechanism_drivers = self.env_desc.mech_drivers
@ -213,7 +213,7 @@ class ML2ConfigFixture(ConfigFixture):
class OVSConfigFixture(ConfigFixture):
def __init__(self, env_desc, host_desc, temp_dir, local_ip, **kwargs):
super(OVSConfigFixture, self).__init__(
super().__init__(
env_desc, host_desc, temp_dir,
base_filename='openvswitch_agent.ini')
@ -223,7 +223,8 @@ class OVSConfigFixture(ConfigFixture):
'ovs': {
'local_ip': local_ip,
'integration_bridge': self._generate_integration_bridge(),
'bridge_mappings': '%s:%s' % (PHYSICAL_NETWORK_NAME, ext_dev),
'bridge_mappings': '{}:{}'.format(
PHYSICAL_NETWORK_NAME, ext_dev),
'of_inactivity_probe': '0',
'ovsdb_debug': 'True',
},
@ -248,8 +249,8 @@ class OVSConfigFixture(ConfigFixture):
else:
if env_desc.report_bandwidths:
self.config['ovs'][constants.RP_BANDWIDTHS] = \
'%s:%s:%s' % (ext_dev, MINIMUM_BANDWIDTH_EGRESS_KBPS,
MINIMUM_BANDWIDTH_INGRESS_KBPS)
'{}:{}:{}'.format(ext_dev, MINIMUM_BANDWIDTH_EGRESS_KBPS,
MINIMUM_BANDWIDTH_INGRESS_KBPS)
if env_desc.qos:
self.config['agent']['extensions'] = 'qos'
@ -282,7 +283,7 @@ class OVSConfigFixture(ConfigFixture):
start=OVS_OF_PORT_LISTEN_START,
end=OVS_OF_PORT_LISTEN_END)).port
})
super(OVSConfigFixture, self)._setUp()
super()._setUp()
def _generate_integration_bridge(self):
return utils.get_rand_device_name(prefix='br-int')
@ -300,8 +301,8 @@ class OVSConfigFixture(ConfigFixture):
log_dir_path = os.path.join(fullstack_base.DEFAULT_LOG_DIR, test_name)
if not os.path.exists(log_dir_path):
os.mkdir(log_dir_path, 0o755)
return '%s/%s.log' % (log_dir_path,
utils.get_rand_name(prefix="test-sg-"))
return '{}/{}.log'.format(log_dir_path,
utils.get_rand_name(prefix="test-sg-"))
def get_br_int_name(self):
return self.config.ovs.integration_bridge
@ -315,20 +316,19 @@ class OVSConfigFixture(ConfigFixture):
class SRIOVConfigFixture(ConfigFixture):
def __init__(self, env_desc, host_desc, temp_dir, local_ip):
super(SRIOVConfigFixture, self).__init__(
super().__init__(
env_desc, host_desc, temp_dir,
base_filename='sriov_agent.ini')
device1 = utils.get_rand_device_name(prefix='ens5')
device2 = utils.get_rand_device_name(prefix='ens6')
phys_dev_mapping = '%s:%s,%s:%s' % (PHYSICAL_NETWORK_NAME, device1,
PHYSICAL_NETWORK_NAME, device2)
rp_bandwidths = '%s:%s:%s,%s:%s:%s' % (device1,
MINIMUM_BANDWIDTH_EGRESS_KBPS,
MINIMUM_BANDWIDTH_INGRESS_KBPS,
device2,
MINIMUM_BANDWIDTH_EGRESS_KBPS,
MINIMUM_BANDWIDTH_INGRESS_KBPS)
phys_dev_mapping = '{pnn}:{d1},{pnn}:{d2}'.format(
pnn=PHYSICAL_NETWORK_NAME, d1=device1, d2=device2)
rp_bandwidths = '{d1}:{mbek}:{mbik},{d2}:{mbek}:{mbik}'.format(
d1=device1,
d2=device2,
mbek=MINIMUM_BANDWIDTH_EGRESS_KBPS,
mbik=MINIMUM_BANDWIDTH_INGRESS_KBPS)
self.config.update({
'sriov_nic': {
'physical_device_mappings': phys_dev_mapping,
@ -337,13 +337,13 @@ class SRIOVConfigFixture(ConfigFixture):
})
def _setUp(self):
super(SRIOVConfigFixture, self)._setUp()
super()._setUp()
class PlacementConfigFixture(ConfigFixture):
def __init__(self, env_desc, host_desc, temp_dir):
super(PlacementConfigFixture, self).__init__(
super().__init__(
env_desc, host_desc, temp_dir, base_filename='placement.ini')
self.config.update({
'DEFAULT': {
@ -353,14 +353,14 @@ class PlacementConfigFixture(ConfigFixture):
})
def _setUp(self):
super(PlacementConfigFixture, self)._setUp()
super()._setUp()
class LinuxBridgeConfigFixture(ConfigFixture):
def __init__(self, env_desc, host_desc, temp_dir, local_ip,
physical_device_name):
super(LinuxBridgeConfigFixture, self).__init__(
super().__init__(
env_desc, host_desc, temp_dir,
base_filename="linuxbridge_agent.ini"
)
@ -407,14 +407,14 @@ class LinuxBridgeConfigFixture(ConfigFixture):
def _generate_bridge_mappings(self, device_name):
bridge_mappings_extra = ('_lb' if 'segments' in self.service_plugins
else '')
return '%s%s:%s' % (PHYSICAL_NETWORK_NAME, bridge_mappings_extra,
device_name)
return '{}{}:{}'.format(PHYSICAL_NETWORK_NAME, bridge_mappings_extra,
device_name)
class L3ConfigFixture(ConfigFixture):
def __init__(self, env_desc, host_desc, temp_dir, integration_bridge=None):
super(L3ConfigFixture, self).__init__(
super().__init__(
env_desc, host_desc, temp_dir, base_filename='l3_agent.ini')
if host_desc.l2_agent_type == constants.AGENT_TYPE_OVS:
self._prepare_config_with_ovs_agent(integration_bridge)
@ -462,7 +462,7 @@ class L3ConfigFixture(ConfigFixture):
class DhcpConfigFixture(ConfigFixture):
def __init__(self, env_desc, host_desc, temp_dir, integration_bridge=None):
super(DhcpConfigFixture, self).__init__(
super().__init__(
env_desc, host_desc, temp_dir, base_filename='dhcp_agent.ini')
if host_desc.l2_agent_type == constants.AGENT_TYPE_OVS:
@ -483,7 +483,7 @@ class DhcpConfigFixture(ConfigFixture):
})
def _setUp(self):
super(DhcpConfigFixture, self)._setUp()
super()._setUp()
self.addCleanup(self._clean_dhcp_path)
def _prepare_config_with_ovs_agent(self, integration_bridge):

View File

@ -34,7 +34,7 @@ from neutron.tests.fullstack.resources import process
LOG = logging.getLogger(__name__)
class EnvironmentDescription(object):
class EnvironmentDescription:
"""A set of characteristics of an environment setup.
Does the setup, as a whole, support tunneling? How about l2pop?
@ -85,7 +85,7 @@ class EnvironmentDescription(object):
return self.network_type in ('vxlan', 'gre')
class HostDescription(object):
class HostDescription:
"""A set of characteristics of an environment Host.
What agents should the host spawn? What mode should each agent operate
@ -398,7 +398,7 @@ class Environment(fixtures.Fixture):
:param hosts_desc: A list of HostDescription instances.
"""
super(Environment, self).__init__()
super().__init__()
self.env_desc = env_desc
self.hosts_desc = hosts_desc
self.hosts = []

View File

@ -64,7 +64,7 @@ class FakeFullstackMachine(machine_fixtures.FakeMachineBase):
def __init__(self, host, network_id, tenant_id, safe_client,
neutron_port=None, bridge_name=None, use_dhcp=False,
use_dhcp6=False):
super(FakeFullstackMachine, self).__init__()
super().__init__()
self.host = host
self.tenant_id = tenant_id
self.network_id = network_id
@ -76,7 +76,7 @@ class FakeFullstackMachine(machine_fixtures.FakeMachineBase):
self.use_dhcp6 = use_dhcp6
def _setUp(self):
super(FakeFullstackMachine, self)._setUp()
super()._setUp()
self.bridge = self._get_bridge()
@ -104,8 +104,8 @@ class FakeFullstackMachine(machine_fixtures.FakeMachineBase):
self.neutron_port['id'],
{'port': {pbs.HOST_ID: self.host.hostname}})
self.addCleanup(self.safe_client.client.update_port,
self.neutron_port['id'],
{'port': {pbs.HOST_ID: ''}})
self.neutron_port['id'],
{'port': {pbs.HOST_ID: ''}})
def _get_bridge(self):
if self.bridge_name is None:
@ -135,7 +135,7 @@ class FakeFullstackMachine(machine_fixtures.FakeMachineBase):
else:
self._ip = fixed_ip['ip_address']
prefixlen = netaddr.IPNetwork(subnet['subnet']['cidr']).prefixlen
self._ip_cidr = '%s/%s' % (self._ip, prefixlen)
self._ip_cidr = '{}/{}'.format(self._ip, prefixlen)
self.gateway_ip = subnet['subnet']['gateway_ip']
if self.use_dhcp:
@ -159,8 +159,8 @@ class FakeFullstackMachine(machine_fixtures.FakeMachineBase):
def _start_async_dhclient(self, port_id, version=constants.IP_VERSION_4):
cmd = ["dhclient", '-%s' % version,
'-lf',
'%s/%s.lease' % (self.host.neutron_config.temp_dir,
port_id),
'{}/{}.lease'.format(self.host.neutron_config.temp_dir,
port_id),
'-sf', self.NO_RESOLV_CONF_DHCLIENT_SCRIPT_PATH,
'--no-pid', '-d', self.port.name]
self.dhclient_async = async_process.AsyncProcess(
@ -244,7 +244,7 @@ class FakeFullstackMachine(machine_fixtures.FakeMachineBase):
class FakeFullstackTrunkMachine(FakeFullstackMachine):
def __init__(self, trunk, *args, **kwargs):
super(FakeFullstackTrunkMachine, self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)
self.trunk = trunk
def add_vlan_interface(self, mac_address, ip_address, segmentation_id):

View File

@ -38,7 +38,7 @@ CMD_FOLDER = 'agents'
class ProcessFixture(fixtures.Fixture):
def __init__(self, test_name, process_name, exec_name, config_filenames,
namespace=None, slice_name=None):
super(ProcessFixture, self).__init__()
super().__init__()
self.test_name = test_name
self.process_name = process_name
self.exec_name = exec_name
@ -67,7 +67,7 @@ class ProcessFixture(fixtures.Fixture):
fileutils.ensure_tree(log_dir, mode=0o755)
timestamp = datetime.datetime.now().strftime("%Y-%m-%d--%H-%M-%S-%f")
log_file = "%s--%s.log" % (self.process_name, timestamp)
log_file = "{}--{}.log".format(self.process_name, timestamp)
run_as_root = bool(self.namespace)
exec_name = (self.exec_name
if run_as_root
@ -175,7 +175,7 @@ class ProcessFixture(fixtures.Fixture):
class RabbitmqEnvironmentFixture(fixtures.Fixture):
def __init__(self, host="127.0.0.1"):
super(RabbitmqEnvironmentFixture, self).__init__()
super().__init__()
self.host = host
def _setUp(self):
@ -216,7 +216,7 @@ class NeutronServerFixture(ServiceFixture):
def __init__(self, env_desc, host_desc,
test_name, neutron_cfg_fixture, plugin_cfg_fixture,
service_cfg_fixtures=None):
super(NeutronServerFixture, self).__init__()
super().__init__()
self.env_desc = env_desc
self.host_desc = host_desc
self.test_name = test_name
@ -259,7 +259,7 @@ class OVSAgentFixture(ServiceFixture):
def __init__(self, env_desc, host_desc,
test_name, neutron_cfg_fixture, agent_cfg_fixture):
super(OVSAgentFixture, self).__init__()
super().__init__()
self.env_desc = env_desc
self.host_desc = host_desc
self.test_name = test_name
@ -291,7 +291,7 @@ class OVSAgentFixture(ServiceFixture):
class PlacementFixture(ServiceFixture):
def __init__(self, env_desc, host_desc, test_name, placement_cfg_fixture):
super(PlacementFixture, self).__init__()
super().__init__()
self.env_desc = env_desc
self.host_desc = host_desc
self.test_name = test_name
@ -313,7 +313,7 @@ class SRIOVAgentFixture(ServiceFixture):
def __init__(self, env_desc, host_desc,
test_name, neutron_cfg_fixture, agent_cfg_fixture):
super(SRIOVAgentFixture, self).__init__()
super().__init__()
self.env_desc = env_desc
self.host_desc = host_desc
self.test_name = test_name
@ -340,7 +340,7 @@ class LinuxBridgeAgentFixture(ServiceFixture):
def __init__(self, env_desc, host_desc, test_name,
neutron_cfg_fixture, agent_cfg_fixture,
namespace=None):
super(LinuxBridgeAgentFixture, self).__init__()
super().__init__()
self.env_desc = env_desc
self.host_desc = host_desc
self.test_name = test_name
@ -371,7 +371,7 @@ class LinuxBridgeAgentFixture(ServiceFixture):
class NamespaceCleanupFixture(ServiceFixture):
def _setUp(self):
super(NamespaceCleanupFixture, self)._setUp()
super()._setUp()
self.addCleanup(self.clean_namespaces)
def clean_namespaces(self):
@ -398,7 +398,7 @@ class L3AgentFixture(NamespaceCleanupFixture):
def __init__(self, env_desc, host_desc, test_name,
neutron_cfg_fixture, l3_agent_cfg_fixture,
namespace=None):
super(L3AgentFixture, self).__init__()
super().__init__()
self.env_desc = env_desc
self.host_desc = host_desc
self.test_name = test_name
@ -408,7 +408,7 @@ class L3AgentFixture(NamespaceCleanupFixture):
self.hostname = self.neutron_cfg_fixture.config['DEFAULT']['host']
def _setUp(self):
super(L3AgentFixture, self)._setUp()
super()._setUp()
self.plugin_config = self.l3_agent_cfg_fixture.config
@ -446,7 +446,7 @@ class DhcpAgentFixture(NamespaceCleanupFixture):
def __init__(self, env_desc, host_desc, test_name,
neutron_cfg_fixture, agent_cfg_fixture, namespace=None):
super(DhcpAgentFixture, self).__init__()
super().__init__()
self.env_desc = env_desc
self.host_desc = host_desc
self.test_name = test_name
@ -455,7 +455,7 @@ class DhcpAgentFixture(NamespaceCleanupFixture):
self.namespace = namespace
def _setUp(self):
super(DhcpAgentFixture, self)._setUp()
super()._setUp()
self.plugin_config = self.agent_cfg_fixture.config

View File

@ -31,7 +31,7 @@ class AlwaysTheOtherAgentScheduler(base_scheduler.BaseChanceScheduler,
def __init__(self):
self.last_selected_agent_ids = []
super(AlwaysTheOtherAgentScheduler, self).__init__(
super().__init__(
dhcp_agent_scheduler.DhcpFilter())
def select(self, plugin, context, resource_hostable_agents,

View File

@ -34,7 +34,7 @@ placement_opts = [
cfg.CONF.register_opts(placement_opts)
class FakePlacement(object):
class FakePlacement:
rp_template = {
"uuid": None,

View File

@ -44,12 +44,12 @@ def _add_new_device_to_agent_config(l2_agent_config, mapping_key_name,
return
new_mappings = 'physnetnew:%s' % new_dev
new_bw = '%s:%s:%s' % (new_dev,
f_const.MINIMUM_BANDWIDTH_EGRESS_KBPS,
f_const.MINIMUM_BANDWIDTH_INGRESS_KBPS)
l2_agent_config[mapping_key_name] = '%s,%s' % (
new_bw = '{}:{}:{}'.format(new_dev,
f_const.MINIMUM_BANDWIDTH_EGRESS_KBPS,
f_const.MINIMUM_BANDWIDTH_INGRESS_KBPS)
l2_agent_config[mapping_key_name] = '{},{}'.format(
old_mappings, new_mappings)
l2_agent_config[constants.RP_BANDWIDTHS] = '%s,%s' % (
l2_agent_config[constants.RP_BANDWIDTHS] = '{},{}'.format(
old_bw, new_bw)
@ -105,7 +105,7 @@ class TestAgentBandwidthReport(base.BaseFullStackTestCase):
)
env = environment.Environment(env_desc, host_desc)
super(TestAgentBandwidthReport, self).setUp(env)
super().setUp(env)
def _check_agent_configurations(self, agent_id, expected_physnets):
agent = self.client.show_agent(agent_id)['agent']
@ -209,7 +209,7 @@ class TestPlacementBandwidthReport(base.BaseFullStackTestCase):
placement_port=self.placement_port
)
env = environment.Environment(env_desc, host_desc)
super(TestPlacementBandwidthReport, self).setUp(env)
super().setUp(env)
def _check_agent_not_synced(self):
return not self._check_agent_synced()

View File

@ -57,7 +57,7 @@ class BaseConnectivitySameNetworkTest(base.BaseFullStackTestCase):
l2_pop=self.l2_pop,
arp_responder=self.arp_responder),
host_descriptions)
super(BaseConnectivitySameNetworkTest, self).setUp(env)
super().setUp(env)
def _prepare_network(self, tenant_uuid):
net_args = {'network_type': self.network_type}

View File

@ -55,7 +55,7 @@ class BaseDhcpAgentTest(base.BaseFullStackTestCase):
),
host_descriptions)
super(BaseDhcpAgentTest, self).setUp(env)
super().setUp(env)
self.project_id = uuidutils.generate_uuid()
if self.boot_vm_for_test:
self._create_network_subnet_and_vm()
@ -200,7 +200,7 @@ class TestDhcpAgentHARaceCondition(BaseDhcpAgentTest):
'AlwaysTheOtherAgentScheduler')
def setUp(self):
super(TestDhcpAgentHARaceCondition, self).setUp()
super().setUp()
self._create_network_with_multiple_subnets()
def _create_network_with_multiple_subnets(self):

View File

@ -51,7 +51,7 @@ class FirewallMigrationTestCase(base.BaseFullStackTestCase):
env = environment.Environment(
environment.EnvironmentDescription(),
host_descriptions)
super(FirewallMigrationTestCase, self).setUp(env)
super().setUp(env)
# fullstack doesn't separate nodes running ovs agent so iptables rules
# are implemented in root namespace
self.iptables_manager = iptables_manager.IptablesManager()

View File

@ -178,7 +178,7 @@ class TestL3Agent(base.BaseFullStackTestCase):
suffix = agent.get_namespace_suffix()
else:
suffix = self.environment.hosts[0].l3_agent.get_namespace_suffix()
return "%s@%s" % (namespace, suffix)
return "{}@{}".format(namespace, suffix)
def _get_l3_agents_with_ha_state(
self, router_id, ha_state=None):
@ -332,7 +332,7 @@ class TestLegacyL3Agent(TestL3Agent):
network_type='vlan', l2_pop=False,
qos=True),
host_descriptions)
super(TestLegacyL3Agent, self).setUp(env)
super().setUp(env)
def test_mtu_update(self):
tenant_id = uuidutils.generate_uuid()
@ -400,7 +400,7 @@ class TestHAL3Agent(TestL3Agent):
agent_down_time=30,
qos=True),
host_descriptions)
super(TestHAL3Agent, self).setUp(env)
super().setUp(env)
def _is_ha_router_active_on_one_agent(self, router_id):
agents = self.client.list_l3_agent_hosting_routers(router_id)
@ -566,7 +566,7 @@ class TestHAL3Agent(TestL3Agent):
self._test_ha_router_failover('disconnect')
def _get_keepalived_state(self, keepalived_state_file):
with open(keepalived_state_file, "r") as fd:
with open(keepalived_state_file) as fd:
return fd.read()
def _get_state_file_for_primary_agent(self, router_id):

View File

@ -46,7 +46,7 @@ class LocalIPTestCase(base.BaseFullStackTestCase):
mech_drivers='openvswitch',
local_ip_ext=True)
env = environment.Environment(env_desc, host_desc)
super(LocalIPTestCase, self).setUp(env)
super().setUp(env)
self.project_id = uuidutils.generate_uuid()
self.network = self.safe_client.create_network(

View File

@ -37,7 +37,7 @@ class BaseLoggingTestCase(base.BaseFullStackTestCase):
env_desc = environment.EnvironmentDescription(
mech_drivers='openvswitch', log=True)
env = environment.Environment(env_desc, host_desc)
super(BaseLoggingTestCase, self).setUp(env)
super().setUp(env)
self.tenant_id = uuidutils.generate_uuid()
self.network = self.safe_client.create_network(
@ -61,7 +61,7 @@ class BaseLoggingTestCase(base.BaseFullStackTestCase):
flows = vm.bridge.dump_flows_for_table(table)
flows_list = flows.splitlines()
pattern = re.compile(
r"^.* table=%s.* actions=%s" % (table, actions))
r"^.* table={}.* actions={}".format(table, actions))
for flow in flows_list:
if pattern.match(flow.strip()):
return True
@ -74,7 +74,7 @@ class BaseLoggingTestCase(base.BaseFullStackTestCase):
def _is_log_event(log_id, action, regex_str):
regex_p = re.compile(
r"^.*action=%s.* log_resource_ids=\[[^\]]*%s" % (
r"^.*action={}.* log_resource_ids=\[[^\]]*{}".format(
action, log_id) + ".*" + regex_str if regex_str else "")
with open(config.network_log.local_output_log_base) as f:

View File

@ -29,7 +29,7 @@ class MTUNetworkTestSetup(base.BaseFullStackTestCase):
env = environment.Environment(
environment.EnvironmentDescription(),
self.host_desc)
super(MTUNetworkTestSetup, self).setUp(env)
super().setUp(env)
self.tenant_id = uuidutils.generate_uuid()

View File

@ -50,7 +50,7 @@ class TestMultiSegs(base.BaseFullStackTestCase):
),
host_descriptions)
super(TestMultiSegs, self).setUp(env)
super().setUp(env)
self.project_id = uuidutils.generate_uuid()
def _spawn_vm(self, neutron_port=None):

View File

@ -39,7 +39,7 @@ class OvsDHCPExtensionTestCase(base.BaseFullStackTestCase):
mech_drivers='openvswitch',
enable_traditional_dhcp=False)
env = environment.Environment(env_desc, host_desc)
super(OvsDHCPExtensionTestCase, self).setUp(env)
super().setUp(env)
self.tenant_id = uuidutils.generate_uuid()
network = self.safe_client.create_network(

View File

@ -34,7 +34,7 @@ class TestPortsApi(base.BaseFullStackTestCase):
agent_down_time=10,
ml2_extension_drivers=['uplink_status_propagation']),
host_descriptions)
super(TestPortsApi, self).setUp(env)
super().setUp(env)
self.tenant_id = uuidutils.generate_uuid()
self.network = self.safe_client.create_network(self.tenant_id)

View File

@ -36,7 +36,7 @@ class TestPortsBinding(base.BaseFullStackTestCase):
agent_down_time=10),
host_descriptions)
super(TestPortsBinding, self).setUp(env)
super().setUp(env)
self.l2_agent_process = self.environment.hosts[0].l2_agent
self.l2_agent = self.safe_client.client.list_agents(

View File

@ -44,7 +44,7 @@ class TestPortsRebind(base.BaseFullStackTestCase):
agent_down_time=10),
host_descriptions)
super(TestPortsRebind, self).setUp(env)
super().setUp(env)
self.l2_agent_process = self.environment.hosts[0].l2_agent
self.l2_agent = self.safe_client.client.list_agents(
@ -143,7 +143,7 @@ class TestRouterPortRebind(TestPortsRebind):
use_l3_agent = True
def setUp(self):
super(TestRouterPortRebind, self).setUp()
super().setUp()
self.tenant_id = uuidutils.generate_uuid()
self.ext_net = self.safe_client.create_network(

View File

@ -15,7 +15,7 @@
import functools
from neutron_lib import constants
from neutron_lib.plugins.ml2 import ovs_constants as ovs_constants
from neutron_lib.plugins.ml2 import ovs_constants
from neutron_lib.services.qos import constants as qos_consts
from neutronclient.common import exceptions
from oslo_utils import uuidutils
@ -42,7 +42,7 @@ PACKET_RATE_LIMIT = 10000
PACKET_RATE_BURST = 1000
class BaseQoSRuleTestCase(object):
class BaseQoSRuleTestCase:
number_of_hosts = 1
physical_network = None
@ -63,7 +63,7 @@ class BaseQoSRuleTestCase(object):
agent_down_time=10,
qos=True)
env = environment.Environment(env_desc, host_desc)
super(BaseQoSRuleTestCase, self).setUp(env)
super().setUp(env)
self.l2_agent_process = self.environment.hosts[0].l2_agent
self.l2_agent = self.safe_client.client.list_agents(
agent_type=self.l2_agent_type)['agents'][0]
@ -190,9 +190,9 @@ class _TestBwLimitQoS(BaseQoSRuleTestCase):
self.l2_agent_process.start()
self._wait_until_agent_up(self.l2_agent['id'])
all_directions = set([self.direction, self.reverse_direction])
all_directions = {self.direction, self.reverse_direction}
for final_rule in final_rules:
all_directions -= set([final_rule['direction']])
all_directions -= {final_rule['direction']}
self._wait_for_bw_rule_applied(
vm, final_rule.get('limit'),
final_rule.get('burst'), final_rule['direction'])
@ -552,7 +552,7 @@ class TestQoSPolicyIsDefault(base.BaseFullStackTestCase):
host_desc = [] # No need to register agents for this test case
env_desc = environment.EnvironmentDescription(qos=True)
env = environment.Environment(env_desc, host_desc)
super(TestQoSPolicyIsDefault, self).setUp(env)
super().setUp(env)
def _create_qos_policy(self, project_id, is_default):
return self.safe_client.create_qos_policy(

View File

@ -34,7 +34,7 @@ class StatelessRulesNotConfiguredException(Exception):
pass
class OVSVersionChecker(object):
class OVSVersionChecker:
conntrack_supported = None
sanity_check.setup_conf()
@ -61,7 +61,7 @@ class BaseSecurityGroupsSameNetworkTest(base.BaseFullStackTestCase):
network_type=self.network_type,
debug_iptables=debug_iptables),
host_descriptions)
super(BaseSecurityGroupsSameNetworkTest, self).setUp(env)
super().setUp(env)
if (self.firewall_driver == 'openvswitch' and
not OVSVersionChecker.supports_ovsfirewall()):

View File

@ -36,7 +36,7 @@ class BaseSegmentationIdTest(base.BaseFullStackTestCase):
network_type=self.network_type),
host_descriptions)
super(BaseSegmentationIdTest, self).setUp(env)
super().setUp(env)
self.project_id = uuidutils.generate_uuid()
def _create_network(self):

View File

@ -33,7 +33,7 @@ class TestSubnet(base.BaseFullStackTestCase):
environment.EnvironmentDescription(network_type='vlan',
l2_pop=False),
host_descriptions)
super(TestSubnet, self).setUp(env)
super().setUp(env)
self._project_id = uuidutils.generate_uuid()
self._network = self._create_network(self._project_id)

View File

@ -45,14 +45,14 @@ class WaitForPortCreateEvent(event.WaitEvent):
table = 'Port'
events = (self.ROW_CREATE, self.ROW_UPDATE)
conditions = (('name', '=', port_name),)
super(WaitForPortCreateEvent, self).__init__(
super().__init__(
events, table, conditions, timeout=15)
class BaseOVSTestCase(base.BaseSudoTestCase):
def setUp(self):
super(BaseOVSTestCase, self).setUp()
super().setUp()
self.br_name = ('br-' + uuidutils.generate_uuid())[:10]
self.port_id = ('port-' + uuidutils.generate_uuid())[:8]
self.ovs = ovs_lib.OVSBridge(self.br_name)

View File

@ -29,8 +29,8 @@ class SimpleInterfaceMonitorTestCase(base.BaseSudoTestCase):
def _check_port_events(self, monitor, ports_expected=None,
ports_not_expected=None):
ports_expected = ports_expected or set([])
ports_not_expected = ports_not_expected or set([])
ports_expected = ports_expected or set()
ports_not_expected = ports_not_expected or set()
added_events = monitor.get_events().get('added', [])
added_port_names = {port['name'] for port in added_events}
intersection = ports_not_expected & added_port_names

View File

@ -46,7 +46,7 @@ from neutron.tests.common import net_helpers
from neutron.tests.functional.agent.linux import base
class OVSOFControllerHelper(object):
class OVSOFControllerHelper:
"""Helper class that runs os-ken openflow controller."""
def start_of_controller(self, conf):
@ -111,7 +111,7 @@ class OVSOFControllerHelper(object):
class OVSAgentTestFramework(base.BaseOVSLinuxTestCase, OVSOFControllerHelper):
def setUp(self):
super(OVSAgentTestFramework, self).setUp()
super().setUp()
agent_rpc = ('neutron.plugins.ml2.drivers.openvswitch.agent.'
'ovs_neutron_agent.OVSPluginApi')
mock.patch(agent_rpc).start()

View File

@ -63,15 +63,15 @@ class OVSAgentQoSExtensionTestFramework(base.OVSAgentTestFramework):
max_burst_kbps=9)
def setUp(self):
super(OVSAgentQoSExtensionTestFramework, self).setUp()
super().setUp()
self.config.set_override('extensions', ['qos'], 'agent')
self._set_pull_mock()
self.set_test_qos_rules(TEST_POLICY_ID1,
[self.test_bw_limit_rule_1,
self.test_dscp_marking_rule_1])
self.test_dscp_marking_rule_1])
self.set_test_qos_rules(TEST_POLICY_ID2,
[self.test_bw_limit_rule_2,
self.test_dscp_marking_rule_2])
self.test_dscp_marking_rule_2])
def _set_pull_mock(self):
@ -101,15 +101,13 @@ class OVSAgentQoSExtensionTestFramework(base.OVSAgentTestFramework):
self.qos_policies[policy_id] = qos_policy
def _create_test_port_dict(self, policy_id=None):
port_dict = super(OVSAgentQoSExtensionTestFramework,
self)._create_test_port_dict()
port_dict = super()._create_test_port_dict()
port_dict['qos_policy_id'] = policy_id
port_dict['qos_network_policy_id'] = None
return port_dict
def _get_device_details(self, port, network):
dev = super(OVSAgentQoSExtensionTestFramework,
self)._get_device_details(port, network)
dev = super()._get_device_details(port, network)
dev['qos_policy_id'] = port['qos_policy_id']
return dev
@ -184,7 +182,7 @@ class TestOVSAgentQosExtension(OVSAgentQoSExtensionTestFramework):
]
def setUp(self):
super(TestOVSAgentQosExtension, self).setUp()
super().setUp()
self.test_bw_limit_rule_1.direction = self.direction
self.test_bw_limit_rule_2.direction = self.direction

View File

@ -66,7 +66,7 @@ class L3AgentFipQoSExtensionTestFramework(framework.L3AgentTestFramework):
direction=constants.EGRESS_DIRECTION)
def setUp(self):
super(L3AgentFipQoSExtensionTestFramework, self).setUp()
super().setUp()
self.conf.set_override('extensions', ['fip_qos'], 'agent')
self.agent = neutron_l3_agent.L3NATAgentWithStateReport('agent1',
self.conf)

View File

@ -33,7 +33,7 @@ class L3AgentConntrackHelperExtensionTestFramework(
framework.L3AgentTestFramework):
def setUp(self):
super(L3AgentConntrackHelperExtensionTestFramework, self).setUp()
super().setUp()
self.conf.set_override('extensions', ['conntrack_helper'], 'agent')
self.agent = neutron_l3_agent.L3NATAgentWithStateReport('agent1',
self.conf)

View File

@ -66,7 +66,7 @@ class RouterGatewayIPQosAgentExtensionTestFramework(
direction=constants.EGRESS_DIRECTION)
def setUp(self):
super(RouterGatewayIPQosAgentExtensionTestFramework, self).setUp()
super().setUp()
self.conf.set_override('extensions', ['gateway_ip_qos'], 'agent')
self.agent = neutron_l3_agent.L3NATAgentWithStateReport('agent1',
self.conf)

View File

@ -38,7 +38,7 @@ HOSTNAME = 'agent1'
class L3AgentNDPProxyTestFramework(framework.L3AgentTestFramework):
def setUp(self):
super(L3AgentNDPProxyTestFramework, self).setUp()
super().setUp()
# TODO(slaweq): Investigate why those tests are failing with enabled
# debug_iptables_rules config option, but for now lets just disable it
cfg.CONF.set_override('debug_iptables_rules', False, group='AGENT')
@ -176,8 +176,8 @@ class L3AgentNDPProxyTestFramework(framework.L3AgentTestFramework):
expected_iptable_rules = []
expected_proxy_address = []
for ndp_proxy in self.ndp_proxies:
rule = '-i %s --destination %s -j ACCEPT' % (interface_name,
ndp_proxy.ip_address)
rule = '-i {} --destination {} -j ACCEPT'.format(
interface_name, ndp_proxy.ip_address)
rule_obj = iptable_mng.IptablesRule('NDP', rule, True, True,
iptables_manager.wrap_name)
expected_iptable_rules.append(rule_obj)
@ -270,9 +270,7 @@ class TestL3AgentNDPProxyExtensionDVR(test_dvr_router.TestDvrRouter,
self.agent._process_updated_router(ri.router)
self._assert_ndp_proxy_state_iptable_rules_is_set(
ri, iptables_manager, interface_name)
super(
TestL3AgentNDPProxyExtensionDVR,
self)._assect_ndp_proxy_rules_is_set(
super()._assect_ndp_proxy_rules_is_set(
ip_wrapper, iptables_manager,
interface_name, namespace)
ri.router['enable_ndp_proxy'] = False

View File

@ -37,7 +37,7 @@ class L3AgentFipPortForwardingExtensionTestFramework(
framework.L3AgentTestFramework):
def setUp(self):
super(L3AgentFipPortForwardingExtensionTestFramework, self).setUp()
super().setUp()
self.conf.set_override('extensions', ['port_forwarding'], 'agent')
self.agent = neutron_l3_agent.L3NATAgentWithStateReport('agent1',
self.conf)
@ -142,7 +142,7 @@ class L3AgentFipPortForwardingExtensionTestFramework(
conf_path = os.path.join(keepalived_pm.pids_path, keepalived_pm.uuid,
'keepalived.conf')
regex = "%s dev %s" % (fip_pf, interface_name)
regex = "{} dev {}".format(fip_pf, interface_name)
pattern = re.compile(regex)
def check_harouter_fip_is_set():

View File

@ -94,7 +94,7 @@ class L3AgentTestFramework(base.BaseSudoTestCase):
NESTED_NAMESPACE_SEPARATOR = '@'
def setUp(self):
super(L3AgentTestFramework, self).setUp()
super().setUp()
self.mock_plugin_api = mock.patch(
'neutron.agent.l3.agent.L3PluginApi').start().return_value
mock.patch('neutron.agent.rpc.PluginReportStateAPI').start()
@ -238,9 +238,9 @@ class L3AgentTestFramework(base.BaseSudoTestCase):
self.assertTrue(rpc.called)
# Assert that every defined FIP is updated via RPC
expected_fips = set([
expected_fips = {
(fip['id'], constants.FLOATINGIP_STATUS_ACTIVE) for fip in
router.router[constants.FLOATINGIP_KEY]])
router.router[constants.FLOATINGIP_KEY]}
call = [args[0] for args in rpc.call_args_list][0]
actual_fips = set(list(call[2].items()))
self.assertEqual(expected_fips, actual_fips)
@ -344,8 +344,8 @@ class L3AgentTestFramework(base.BaseSudoTestCase):
slaac = constants.IPV6_SLAAC
slaac_mode = {'ra_mode': slaac, 'address_mode': slaac}
subnet_modes = [slaac_mode] * 2
self._add_internal_interface_by_subnet(router.router,
count=2, ip_version=constants.IP_VERSION_6,
self._add_internal_interface_by_subnet(
router.router, count=2, ip_version=constants.IP_VERSION_6,
ipv6_subnet_modes=subnet_modes)
router.process()
@ -587,7 +587,7 @@ class L3AgentTestFramework(base.BaseSudoTestCase):
for ip_version in ip_versions:
_routes = ip_lib.list_ip_routes(ns_name, ip_version)
routes.extend(_routes)
routes = set(route['cidr'] for route in routes)
routes = {route['cidr'] for route in routes}
ex_gw_port = router.get_ex_gw_port()
if not ex_gw_port:
if not enable_gw:
@ -614,29 +614,29 @@ class L3AgentTestFramework(base.BaseSudoTestCase):
def _create_router(self, router_info, agent):
ns_name = "%s%s%s" % (
ns_name = "{}{}{}".format(
'qrouter-' + router_info['id'],
self.NESTED_NAMESPACE_SEPARATOR, agent.host)
ext_name = "qg-%s-%s" % (agent.host, _uuid()[-4:])
int_name = "qr-%s-%s" % (agent.host, _uuid()[-4:])
ext_name = "qg-{}-{}".format(agent.host, _uuid()[-4:])
int_name = "qr-{}-{}".format(agent.host, _uuid()[-4:])
get_ns_name = mock.patch.object(
namespaces.RouterNamespace, '_get_ns_name').start()
get_ns_name.return_value = ns_name
get_ext_name = mock.patch.object(l3_router_info.RouterInfo,
'get_external_device_name').start()
'get_external_device_name').start()
get_ext_name.return_value = ext_name
get_int_name = mock.patch.object(l3_router_info.RouterInfo,
'get_internal_device_name').start()
'get_internal_device_name').start()
get_int_name.return_value = int_name
router = self.manage_router(agent, router_info)
router_ext_name = mock.patch.object(router,
'get_external_device_name').start()
'get_external_device_name').start()
router_ext_name.return_value = get_ext_name.return_value
router_int_name = mock.patch.object(router,
'get_internal_device_name').start()
'get_internal_device_name').start()
router_int_name.return_value = get_int_name.return_value
return router

View File

@ -191,7 +191,7 @@ class TestDvrRouter(DvrRouterTestFramework, framework.L3AgentTestFramework):
_safe_fipnamespace_delete_on_ext_net,
router['gw_port']['network_id'])
return super(TestDvrRouter, self).manage_router(agent, router)
return super().manage_router(agent, router)
def test_dvr_update_floatingip_statuses(self):
self.agent.conf.agent_mode = 'dvr'
@ -1282,7 +1282,7 @@ class TestDvrRouter(DvrRouterTestFramework, framework.L3AgentTestFramework):
snat_ns = router_updated.snat_namespace.name
fixed_ip_cent = centralized_floatingip['fixed_ip_address']
router_updated.get_centralized_fip_cidr_set = mock.Mock(
return_value=set(["19.4.4.3/32"]))
return_value={"19.4.4.3/32"})
self.assertTrue(self._assert_iptables_rules_not_exist(
router_updated.snat_iptables_manager, 'nat', expected_rules))
port = router_updated.get_ex_gw_port()
@ -1303,9 +1303,9 @@ class TestDvrRouter(DvrRouterTestFramework, framework.L3AgentTestFramework):
router = self.agent.router_info[router_info['id']]
centralized_fips = router.get_centralized_fip_cidr_set()
if expected_result_empty:
self.assertEqual(set([]), centralized_fips)
self.assertEqual(set(), centralized_fips)
else:
self.assertNotEqual(set([]), centralized_fips)
self.assertNotEqual(set(), centralized_fips)
def test_get_centralized_fip_cidr_set(self):
router_info = self.generate_dvr_router_info(
@ -1815,8 +1815,8 @@ class TestDvrRouter(DvrRouterTestFramework, framework.L3AgentTestFramework):
# When floatingip are associated, make sure that the
# corresponding rules and routes in route table are created
# for the router.
expected_rule = {u'from': lib_constants.IPv4_ANY,
u'iif': fip_ns_int_name,
expected_rule = {'from': lib_constants.IPv4_ANY,
'iif': fip_ns_int_name,
'priority': str(router_fip_table_idx),
'table': str(router_fip_table_idx),
'type': 'unicast'}
@ -2264,9 +2264,9 @@ class TestDvrRouter(DvrRouterTestFramework, framework.L3AgentTestFramework):
routes = ip_device.route.list_onlink_routes(lib_constants.IP_VERSION_4)
self.assertGreater(len(routes), 0)
self.assertEqual(len(fip_agent_gw_port['extra_subnets']), len(routes))
extra_subnet_cidr = set(extra_subnet['cidr'] for extra_subnet
in fip_agent_gw_port['extra_subnets'])
routes_cidr = set(route['cidr'] for route in routes)
extra_subnet_cidr = {extra_subnet['cidr'] for extra_subnet
in fip_agent_gw_port['extra_subnets']}
routes_cidr = {route['cidr'] for route in routes}
self.assertEqual(extra_subnet_cidr, routes_cidr)
def test_dvr_router_update_ecmp_routes(self):

View File

@ -249,8 +249,8 @@ class L3HATestCase(framework.L3AgentTestFramework):
_check_lla_status(router1, False)
def test_ha_router_process_ipv6_subnets_to_existing_port(self):
router_info = self.generate_router_info(enable_ha=True,
ip_version=constants.IP_VERSION_6)
router_info = self.generate_router_info(
enable_ha=True, ip_version=constants.IP_VERSION_6)
router = self.manage_router(self.agent, router_info)
def verify_ip_in_keepalived_config(router, iface):
@ -264,10 +264,9 @@ class L3HATestCase(framework.L3AgentTestFramework):
slaac_mode = {'ra_mode': slaac, 'address_mode': slaac}
# Add a second IPv6 subnet to the router internal interface.
self._add_internal_interface_by_subnet(router.router, count=1,
ip_version=constants.IP_VERSION_6,
ipv6_subnet_modes=[slaac_mode],
interface_id=interface_id)
self._add_internal_interface_by_subnet(
router.router, count=1, ip_version=constants.IP_VERSION_6,
ipv6_subnet_modes=[slaac_mode], interface_id=interface_id)
router.process()
self.wait_until_ha_router_has_state(router, 'primary')
@ -487,7 +486,7 @@ class L3HATestCase(framework.L3AgentTestFramework):
class L3HATestFailover(framework.L3AgentTestFramework):
def setUp(self):
super(L3HATestFailover, self).setUp()
super().setUp()
conf = self._configure_agent('agent2')
self.failover_agent = neutron_l3_agent.L3NATAgentWithStateReport(
'agent2', conf)

View File

@ -37,7 +37,7 @@ def has_expected_arp_entry(device_name, namespace, ip, mac):
class TestMonitorDaemon(base.BaseLoggingTestCase):
def setUp(self):
super(TestMonitorDaemon, self).setUp()
super().setUp()
self.conf_dir = self.get_default_temp_dir().path
self.pid_file = os.path.join(self.conf_dir, 'pid_file')
self.log_file = os.path.join(self.conf_dir, 'log_file')
@ -103,11 +103,13 @@ class TestMonitorDaemon(base.BaseLoggingTestCase):
namespace=self.router.namespace).get_devices():
devices[dev.name] = [addr['cidr'] for addr in dev.addr.list()]
# NOTE: we need to read here the content of the file.
self.fail(
'Text not found in file %(file_name)s: "%(text)s".\nDevice '
'addresses: %(devices)s.\nFile content:\n%(file_content)s' %
{'file_name': file_name, 'text': text, 'devices': devices,
'file_content': open(file_name).read()})
with open(file_name) as fn:
self.fail(
'Text not found in file %(file_name)s: "%(text)s".\n '
'Device addresses: %(devices)s.\nFile content:\n'
'%(file_content)s' %
{'file_name': file_name, 'text': text, 'devices': devices,
'file_content': fn.read()})
def test_read_queue_change_state(self):
self._run_monitor()
@ -121,13 +123,15 @@ class TestMonitorDaemon(base.BaseLoggingTestCase):
# No tracked IP (self.cidr) is configured in the monitored interface
# (self.router.port)
self._run_monitor()
msg = 'Initial status of router %s is %s' % (self.router_id, 'backup')
msg = 'Initial status of router {} is {}'.format(
self.router_id, 'backup')
self._search_in_file(self.log_file, msg)
def test_handle_initial_state_primary(self):
self.router.port.addr.add(self.cidr)
self._run_monitor()
msg = 'Initial status of router %s is %s' % (self.router_id, 'primary')
msg = 'Initial status of router {} is {}'.format(
self.router_id, 'primary')
self._search_in_file(self.log_file, msg)
def test_handle_initial_state_backup_error_reading_initial_status(self):
@ -146,5 +150,6 @@ class TestMonitorDaemon(base.BaseLoggingTestCase):
msg = ('Timeout reading the initial status of router %s' %
self.router_id)
self._search_in_file(self.log_file, msg)
msg = 'Initial status of router %s is %s' % (self.router_id, 'backup')
msg = 'Initial status of router {} is {}'.format(
self.router_id, 'backup')
self._search_in_file(self.log_file, msg)

View File

@ -36,7 +36,7 @@ METADATA_REQUEST_SLEEP = 5
TOO_MANY_REQUESTS_CODE = '429'
class MetadataFakeProxyHandler(object):
class MetadataFakeProxyHandler:
def __init__(self, status):
self.status = status
@ -274,7 +274,7 @@ class UnprivilegedUserMetadataL3AgentTestCase(MetadataL3AgentTestCase):
SOCKET_MODE = 0o664
def setUp(self):
super(UnprivilegedUserMetadataL3AgentTestCase, self).setUp()
super().setUp()
self.agent.conf.set_override('metadata_proxy_user', '65534')
@ -290,6 +290,6 @@ class UnprivilegedUserGroupMetadataL3AgentTestCase(MetadataL3AgentTestCase):
SOCKET_MODE = 0o666
def setUp(self):
super(UnprivilegedUserGroupMetadataL3AgentTestCase, self).setUp()
super().setUp()
self.agent.conf.set_override('metadata_proxy_user', '65534')
self.agent.conf.set_override('metadata_proxy_group', '65534')

View File

@ -30,7 +30,7 @@ _uuid = uuidutils.generate_uuid
class NamespaceManagerTestFramework(base.BaseSudoTestCase):
def setUp(self):
super(NamespaceManagerTestFramework, self).setUp()
super().setUp()
self.agent_conf = cfg.CONF
self.metadata_driver_mock = mock.Mock()
self.namespace_manager = namespace_manager.NamespaceManager(

View File

@ -26,7 +26,7 @@ class RecursivePermDirFixture(fixtures.Fixture):
"""Ensure at least perms permissions on directory and ancestors."""
def __init__(self, directory, perms):
super(RecursivePermDirFixture, self).__init__()
super().__init__()
self.directory = directory
self.least_perms = perms
@ -45,7 +45,7 @@ class AdminDirFixture(fixtures.Fixture):
"""Handle directory create/delete with admin permissions required"""
def __init__(self, directory):
super(AdminDirFixture, self).__init__()
super().__init__()
self.directory = directory
def _setUp(self):
@ -64,7 +64,7 @@ class SleepyProcessFixture(fixtures.Fixture):
"""Process fixture to perform time.sleep for a given number of seconds."""
def __init__(self, timeout=60):
super(SleepyProcessFixture, self).__init__()
super().__init__()
self.timeout = timeout
@staticmethod

View File

@ -23,7 +23,7 @@ from neutron.tests.functional import base
class TestGetTagFromOtherConfig(base.BaseSudoTestCase):
def setUp(self):
super(TestGetTagFromOtherConfig, self).setUp()
super().setUp()
self.bridge = self.useFixture(net_helpers.OVSBridgeFixture()).bridge
def set_port_tag(self, port_name, tag):

View File

@ -27,7 +27,7 @@ from neutron.tests.functional import base
class TestHelper(base.BaseSudoTestCase):
def setUp(self):
super(TestHelper, self).setUp()
super().setUp()
self.bridge = self.useFixture(net_helpers.OVSBridgeFixture()).bridge
self.namespace = self.useFixture(net_helpers.NamespaceFixture()).name
self.iptables_firewall = (
@ -79,8 +79,8 @@ class TestHelper(base.BaseSudoTestCase):
self.iptables_firewall.iptables.get_rules_for_table('filter'))
for line in iptables_rules:
if tap_name in line:
raise Exception("port %s still has iptables rules in %s" % (
tap_name, line))
raise Exception("port {} still has iptables rules "
"in {}".format(tap_name, line))
def test_migration(self):
sg_rules = [{'ethertype': constants.IPv4,

View File

@ -24,7 +24,7 @@ from neutron.tests.functional import base
class AsyncProcessTestFramework(base.BaseLoggingTestCase):
def setUp(self):
super(AsyncProcessTestFramework, self).setUp()
super().setUp()
self.test_file_path = self.get_temp_file_path('test_async_process.tmp')
self.data = [str(x) for x in range(4)]
with open(self.test_file_path, 'w') as f:

View File

@ -32,7 +32,7 @@ MAC_ALL_NODES_ADDRESS = '33:33:00:00:00:01'
class BridgeLibTestCase(base.BaseSudoTestCase):
def setUp(self):
super(BridgeLibTestCase, self).setUp()
super().setUp()
self.bridge, self.port_fixture = self.create_bridge_port_fixture()
def create_bridge_port_fixture(self):
@ -96,12 +96,12 @@ class BridgeLibTestCase(base.BaseSudoTestCase):
self.bridge.name)
# first, make sure it's enabled
with open(sysfs_path, 'r') as sysfs_disable_ipv6_file:
with open(sysfs_path) as sysfs_disable_ipv6_file:
sysfs_disable_ipv6 = sysfs_disable_ipv6_file.read()
self.assertEqual("0\n", sysfs_disable_ipv6)
self.assertEqual(0, self.bridge.disable_ipv6())
with open(sysfs_path, 'r') as sysfs_disable_ipv6_file:
with open(sysfs_path) as sysfs_disable_ipv6_file:
sysfs_disable_ipv6 = sysfs_disable_ipv6_file.read()
self.assertEqual("1\n", sysfs_disable_ipv6)
@ -140,7 +140,7 @@ class FdbInterfaceTestCase(testscenarios.WithScenarios, base.BaseSudoTestCase):
]
def setUp(self):
super(FdbInterfaceTestCase, self).setUp()
super().setUp()
_uuid = uuidutils.generate_uuid()
self.device = ('int_' + _uuid)[:constants.DEVICE_NAME_MAX_LEN]
self.device_vxlan = ('vxlan_' + _uuid)[:constants.DEVICE_NAME_MAX_LEN]
@ -198,7 +198,7 @@ class FdbInterfaceTestCase(testscenarios.WithScenarios, base.BaseSudoTestCase):
dev=device, namespace=self.namespace).items():
self.assertEqual(device, _device)
for _ in (fdb for fdb in fdbs if fdb['mac'] == mac_address and
fdb['dst_ip'] == ip_address):
fdb['dst_ip'] == ip_address):
return
self.fail(msg)

View File

@ -32,7 +32,7 @@ from neutron.tests.functional import base as functional_base
class TestDhcp(functional_base.BaseSudoTestCase):
def setUp(self):
super(TestDhcp, self).setUp()
super().setUp()
conf = cfg.ConfigOpts()
config.register_interface_driver_opts_helper(conf)
config.register_interface_opts(conf)

View File

@ -29,7 +29,7 @@ from neutron.tests.functional.agent.linux import base as linux_base
from neutron.tests.functional import base
class InterfaceDriverTestCaseMixin(object):
class InterfaceDriverTestCaseMixin:
def _test_mtu_set_after_action(self, device_name, br_name, namespace,
action=None):
mac_address = net.get_random_mac('fa:16:3e:00:00:00'.split(':'))
@ -89,7 +89,7 @@ class InterfaceDriverTestCaseMixin(object):
class OVSInterfaceDriverTestCase(linux_base.BaseOVSLinuxTestCase,
InterfaceDriverTestCaseMixin):
def setUp(self):
super(OVSInterfaceDriverTestCase, self).setUp()
super().setUp()
conf = cfg.ConfigOpts()
config.register_interface_opts(conf)
self.interface = interface.OVSInterfaceDriver(conf)
@ -145,7 +145,7 @@ class OVSInterfaceDriverTestCase(linux_base.BaseOVSLinuxTestCase,
class BridgeInterfaceDriverTestCase(base.BaseSudoTestCase,
InterfaceDriverTestCaseMixin):
def setUp(self):
super(BridgeInterfaceDriverTestCase, self).setUp()
super().setUp()
conf = cfg.ConfigOpts()
config.register_interface_opts(conf)
self.interface = interface.BridgeInterfaceDriver(conf)

View File

@ -57,7 +57,7 @@ TEST_IP_NUD_STATES = ((TEST_IP_NEIGH, 'permanent'),
class IpLibTestFramework(functional_base.BaseSudoTestCase):
def setUp(self):
super(IpLibTestFramework, self).setUp()
super().setUp()
self._configure()
def _configure(self):
@ -271,7 +271,7 @@ class IpLibTestCase(IpLibTestFramework):
self.addCleanup(ip.netns.delete, attr.namespace)
self.assertFalse(ip_lib.vxlan_in_use(9999, namespace=attr.namespace))
device = ip.add_vxlan('test_vxlan_device', 9999, local='fd00::1',
group=TEST_IP6_VXLAN_GROUP, dev='test_device')
group=TEST_IP6_VXLAN_GROUP, dev='test_device')
self.addCleanup(self._safe_delete_device, device)
self.assertTrue(ip_lib.vxlan_in_use(9999, namespace=attr.namespace))
device.link.delete()
@ -740,7 +740,7 @@ class TestSetIpNonlocalBind(functional_base.BaseSudoTestCase):
class NamespaceTestCase(functional_base.BaseSudoTestCase):
def setUp(self):
super(NamespaceTestCase, self).setUp()
super().setUp()
self.namespace = 'test_ns_' + uuidutils.generate_uuid()
ip_lib.create_network_namespace(self.namespace)
self.addCleanup(self._delete_namespace)
@ -772,7 +772,7 @@ class NamespaceTestCase(functional_base.BaseSudoTestCase):
class IpMonitorTestCase(functional_base.BaseLoggingTestCase):
def setUp(self):
super(IpMonitorTestCase, self).setUp()
super().setUp()
self.addCleanup(self._cleanup)
self.namespace = 'ns_' + uuidutils.generate_uuid()
priv_ip_lib.create_netns(self.namespace)
@ -805,7 +805,7 @@ class IpMonitorTestCase(functional_base.BaseLoggingTestCase):
def _read_file(self, ip_addresses):
try:
registers = []
with open(self.temp_file, 'r') as f:
with open(self.temp_file) as f:
data = f.read()
for line in data.splitlines():
register = jsonutils.loads(line)
@ -816,7 +816,7 @@ class IpMonitorTestCase(functional_base.BaseLoggingTestCase):
if ip_address not in registers:
return False
return True
except (OSError, IOError, ValueError):
except (OSError, ValueError):
return False
def _check_read_file(self, ip_addresses):
@ -824,7 +824,7 @@ class IpMonitorTestCase(functional_base.BaseLoggingTestCase):
utils.wait_until_true(lambda: self._read_file(ip_addresses),
timeout=30)
except utils.WaitTimeout:
with open(self.temp_file, 'r') as f:
with open(self.temp_file) as f:
registers = f.read()
self.fail('Defined IP addresses: %s, IP addresses registered: %s' %
(ip_addresses, registers))
@ -917,7 +917,7 @@ class IpMonitorTestCase(functional_base.BaseLoggingTestCase):
class IpRouteCommandTestCase(functional_base.BaseSudoTestCase):
def setUp(self):
super(IpRouteCommandTestCase, self).setUp()
super().setUp()
self.namespace = self.useFixture(net_helpers.NamespaceFixture()).name
ip_lib.IPWrapper(self.namespace).add_dummy('test_device')
self.device = ip_lib.IPDevice('test_device', namespace=self.namespace)
@ -1072,7 +1072,7 @@ class IpRouteCommandTestCase(functional_base.BaseSudoTestCase):
class IpAddrCommandTestCase(functional_base.BaseSudoTestCase):
def setUp(self):
super(IpAddrCommandTestCase, self).setUp()
super().setUp()
self.namespace = self.useFixture(net_helpers.NamespaceFixture()).name
ip_lib.IPWrapper(self.namespace).add_dummy('test_device')
self.device = ip_lib.IPDevice('test_device', namespace=self.namespace)

View File

@ -30,7 +30,7 @@ UNRELATED_IP = '1.1.1.1'
class IpsetBase(functional_base.BaseSudoTestCase):
def setUp(self):
super(IpsetBase, self).setUp()
super().setUp()
bridge = self.useFixture(net_helpers.VethBridgeFixture()).bridge
self.source, self.destination = self.useFixture(

View File

@ -35,7 +35,7 @@ class IptablesManagerTestCase(functional_base.BaseSudoTestCase):
'--dport %(port)d -j DROP')
def setUp(self):
super(IptablesManagerTestCase, self).setUp()
super().setUp()
bridge = self.useFixture(net_helpers.VethBridgeFixture()).bridge
self.client, self.server = self.useFixture(
@ -85,7 +85,7 @@ class IptablesManagerTestCase(functional_base.BaseSudoTestCase):
self.client.namespace, self.server.namespace,
self.server.ip, self.port, protocol)
self.addCleanup(netcat.stop_processes)
filter_params = 'direction %s, port %s and protocol %s' % (
filter_params = 'direction {}, port {} and protocol {}'.format(
direction, port, protocol)
self.assertTrue(netcat.test_connectivity(),
'Failed connectivity check before applying a filter '

View File

@ -33,7 +33,7 @@ class KeepalivedManagerTestCase(base.BaseSudoTestCase,
test_keepalived.KeepalivedConfBaseMixin):
def setUp(self):
super(KeepalivedManagerTestCase, self).setUp()
super().setUp()
l3_config.register_l3_agent_config_opts(l3_config.OPTS, cfg.CONF)
cfg.CONF.set_override('check_child_processes_interval', 1, 'AGENT')

View File

@ -28,7 +28,7 @@ arping = net_helpers.assert_arping
class LinuxBridgeARPSpoofTestCase(functional_base.BaseSudoTestCase):
def setUp(self):
super(LinuxBridgeARPSpoofTestCase, self).setUp()
super().setUp()
lbfixture = self.useFixture(net_helpers.LinuxBridgeFixture())
self.addCleanup(setattr, arp_protect, 'NAMESPACE', None)

View File

@ -24,7 +24,7 @@ class OFMonitorTestCase(functional_base.BaseSudoTestCase):
DEFAULT_FLOW = {'table': 0, 'cookie': '0', 'actions': 'NORMAL'}
def setUp(self):
super(OFMonitorTestCase, self).setUp()
super().setUp()
self.bridge = self.useFixture(net_helpers.OVSBridgeFixture()).bridge
self.of_monitor = of_monitor.OFMonitor(self.bridge.br_name,
start=False)
@ -66,8 +66,8 @@ class OFMonitorTestCase(functional_base.BaseSudoTestCase):
try:
utils.wait_until_true(_read_and_check, timeout=5)
except utils.WaitTimeout:
self.fail('Flow "%s" with action %s not found' % (reference_flow,
event_type))
self.fail('Flow "{}" with action {} not found'.format(
reference_flow, event_type))
event = events_container.pop()
self.assertEqual(event_type, event.event_type)
self.assertEqual(self._format_flow(reference_flow, event_type),

View File

@ -36,7 +36,7 @@ from neutron.tests.functional.agent.linux import base as linux_base
class BaseMonitorTest(linux_base.BaseOVSLinuxTestCase):
def setUp(self):
super(BaseMonitorTest, self).setUp()
super().setUp()
rootwrap_not_configured = (cfg.CONF.AGENT.root_helper == base.SUDO_CMD)
if rootwrap_not_configured:
@ -60,7 +60,7 @@ class BaseMonitorTest(linux_base.BaseOVSLinuxTestCase):
class TestOvsdbMonitor(BaseMonitorTest):
def setUp(self):
super(TestOvsdbMonitor, self).setUp()
super().setUp()
self.monitor = ovsdb_monitor.OvsdbMonitor('Bridge')
self.addCleanup(self.monitor.stop)
@ -81,7 +81,7 @@ class TestOvsdbMonitor(BaseMonitorTest):
class TestSimpleInterfaceMonitor(BaseMonitorTest):
def setUp(self):
super(TestSimpleInterfaceMonitor, self).setUp()
super().setUp()
self.monitor = ovsdb_monitor.SimpleInterfaceMonitor()
self.addCleanup(self.monitor.stop)

View File

@ -30,7 +30,7 @@ SERVICE_NAME = "service"
class BaseTestProcessMonitor(base.BaseLoggingTestCase):
def setUp(self):
super(BaseTestProcessMonitor, self).setUp()
super().setUp()
cfg.CONF.set_override('check_child_processes_interval', 1, 'AGENT')
self._child_processes = []
self._process_monitor = None

View File

@ -256,7 +256,7 @@ class TcFiltersTestCase(functional_base.BaseSudoTestCase):
self.vxlan_id, namespace=self.ns[0])
tc_classes = tc_lib.list_tc_policy_class(self.device[0],
namespace=self.ns[0])
namespace=self.ns[0])
for tc_class in (c for c in tc_classes if
c['classid'] == chosen_class_id):
bytes = tc_class['stats']['bytes']

View File

@ -95,7 +95,7 @@ class TestGetRootHelperChildPid(functional_base.BaseSudoTestCase):
"get_root_helper_child_pid is expected to return the pid of the "
"bash process")
self._addcleanup_sleep_process(child_pid)
with open('/proc/%s/cmdline' % child_pid, 'r') as f_proc_cmdline:
with open('/proc/%s/cmdline' % child_pid) as f_proc_cmdline:
cmdline = f_proc_cmdline.readline().split('\0')[0]
self.assertIn('bash', cmdline)
@ -174,7 +174,7 @@ class TestFindChildPids(functional_base.BaseSudoTestCase):
self.assertIn(_pid, child_pids_recursive_after)
def test_find_non_existing_process(self):
with open('/proc/sys/kernel/pid_max', 'r') as fd:
with open('/proc/sys/kernel/pid_max') as fd:
pid_max = int(fd.readline().strip())
self.assertEqual([], utils.find_child_pids(pid_max))
@ -208,8 +208,8 @@ line 4
self.assertEqual(file, content)
def test_read_if_exists_no_file(self):
temp_dir = tempfile.TemporaryDirectory()
content = utils.read_if_exists(
os.path.join(temp_dir.name, 'non-existing-file'),
run_as_root=self.run_as_root)
with tempfile.TemporaryDirectory() as temp_dir:
content = utils.read_if_exists(
os.path.join(temp_dir, 'non-existing-file'),
run_as_root=self.run_as_root)
self.assertEqual('', content)

View File

@ -50,14 +50,14 @@ class MetadataAgentHealthEvent(event.WaitEvent):
def __init__(self, chassis, sb_cfg, timeout=5):
self.chassis = chassis
self.sb_cfg = sb_cfg
super(MetadataAgentHealthEvent, self).__init__(
super().__init__(
(self.ROW_UPDATE,),
AGENT_CHASSIS_TABLE,
(('name', '=', self.chassis),),
timeout=timeout)
def matches(self, event, row, old=None):
if not super(MetadataAgentHealthEvent, self).matches(event, row, old):
if not super().matches(event, row, old):
return False
return int(row.external_ids.get(
ovn_const.OVN_AGENT_METADATA_SB_CFG_KEY, 0)) >= self.sb_cfg
@ -70,7 +70,7 @@ class MetadataPortCreateEvent(event.WaitEvent):
table = 'Port_Binding'
events = (self.ROW_CREATE,)
conditions = (('logical_port', '=', metadata_port),)
super(MetadataPortCreateEvent, self).__init__(
super().__init__(
events, table, conditions, timeout=timeout
)
@ -80,7 +80,7 @@ class TestMetadataAgent(base.TestOVNFunctionalBase):
FAKE_CHASSIS_HOST = 'ovn-host-fake'
def setUp(self):
super(TestMetadataAgent, self).setUp()
super().setUp()
self.handler = self.sb_api.idl.notify_handler
# We only have OVN NB and OVN SB running for functional tests
self.mock_ovsdb_idl = mock.Mock()
@ -478,7 +478,8 @@ class TestMetadataAgent(base.TestOVNFunctionalBase):
agent.MetadataAgent, 'provision_datapath') as m_provision:
# Update the additional_chassis
self.sb_api.db_set('Port_Binding', pb.uuid,
self.sb_api.db_set(
'Port_Binding', pb.uuid,
additional_chassis=[agent_chassis.uuid]).execute(
check_error=True, log_errors=True)
@ -513,7 +514,8 @@ class TestMetadataAgent(base.TestOVNFunctionalBase):
# Update the additional_chassis, the agent should not see the
# notification because it has only its own chassis row locally and
# does not see other chassis
self.sb_api.db_set('Port_Binding', pb.uuid,
self.sb_api.db_set(
'Port_Binding', pb.uuid,
additional_chassis=[other_chassis.uuid]).execute(
check_error=True, log_errors=True)
@ -544,7 +546,8 @@ class TestMetadataAgent(base.TestOVNFunctionalBase):
agent.MetadataAgent, 'provision_datapath') as m_provision:
# Update the additional_chassis
self.sb_api.db_set('Port_Binding', pb.uuid,
self.sb_api.db_set(
'Port_Binding', pb.uuid,
additional_chassis=[agent_chassis.uuid]).execute(
check_error=True, log_errors=True)
@ -559,7 +562,7 @@ class TestMetadataAgent(base.TestOVNFunctionalBase):
# Remove the additional_chassis but keep the chassis. This is
# simulates the live migration has failed
self.sb_api.db_set('Port_Binding', pb.uuid,
additional_chassis=[]).execute(
additional_chassis=[]).execute(
check_error=True, log_errors=True)
n_utils.wait_until_true(
@ -588,7 +591,8 @@ class TestMetadataAgent(base.TestOVNFunctionalBase):
agent.MetadataAgent, 'provision_datapath') as m_provision:
# Update the additional_chassis
self.sb_api.db_set('Port_Binding', pb.uuid,
self.sb_api.db_set(
'Port_Binding', pb.uuid,
additional_chassis=[agent_chassis.uuid]).execute(
check_error=True, log_errors=True)
@ -600,7 +604,8 @@ class TestMetadataAgent(base.TestOVNFunctionalBase):
m_provision.reset_mock()
self.sb_api.db_set('Port_Binding', pb.uuid,
self.sb_api.db_set(
'Port_Binding', pb.uuid,
additional_chassis=[], chassis=agent_chassis.uuid).execute(
check_error=True, log_errors=True)
@ -630,7 +635,8 @@ class TestMetadataAgent(base.TestOVNFunctionalBase):
agent.MetadataAgent, 'provision_datapath') as m_provision:
# Update the additional_chassis
self.sb_api.db_set('Port_Binding', pb.uuid,
self.sb_api.db_set(
'Port_Binding', pb.uuid,
additional_chassis=[agent_chassis.uuid]).execute(
check_error=True, log_errors=True)
@ -642,7 +648,8 @@ class TestMetadataAgent(base.TestOVNFunctionalBase):
m_provision.reset_mock()
self.sb_api.db_set('Port_Binding', pb.uuid,
self.sb_api.db_set(
'Port_Binding', pb.uuid,
additional_chassis=[], chassis=new_chassis_uuid).execute(
check_error=True, log_errors=True)
@ -681,7 +688,7 @@ class TestMetadataAgent(base.TestOVNFunctionalBase):
agent.MetadataAgent, 'provision_datapath') as m_provision:
self.sb_api.db_add('Port_Binding', pb.uuid,
'external_ids', {'foo': 'bar'}).execute(
'external_ids', {'foo': 'bar'}).execute(
check_error=True, log_errors=True)
with testtools.ExpectedException(NoDatapathProvision):

View File

@ -29,7 +29,7 @@ class WaitForBridgesEvent(event.RowEvent):
self.bridges_not_seen = set(bridges)
self.timeout = timeout
self.event = threading.Event()
super(WaitForBridgesEvent, self).__init__(
super().__init__(
(self.ROW_CREATE,), 'Bridge', None)
def matches(self, event, row, old=None):

View File

@ -60,7 +60,7 @@ class DHCPAgentOVSTestFramework(base.BaseSudoTestCase):
'gateway': '2001:db8:0:1::c0a8:a01'}, }
def setUp(self):
super(DHCPAgentOVSTestFramework, self).setUp()
super().setUp()
config.setup_logging()
self.conf_fixture = self.useFixture(fixture_config.Config())
self.conf = self.conf_fixture.conf
@ -135,7 +135,7 @@ class DHCPAgentOVSTestFramework(base.BaseSudoTestCase):
def create_port_dict(self, network_id, subnet_id, mac_address,
ip_version=lib_const.IP_VERSION_4, ip_address=None):
ip_address = (self._IP_ADDRS[ip_version]['addr']
if not ip_address else ip_address)
if not ip_address else ip_address)
port_dict = dhcp.DictModel(id=uuidutils.generate_uuid(),
name="foo",
mac_address=mac_address,

View File

@ -97,7 +97,7 @@ class BaseFirewallTestCase(linux_base.BaseOVSLinuxTestCase):
def setUp(self):
security_config.register_securitygroups_opts()
self.net_id = uuidutils.generate_uuid()
super(BaseFirewallTestCase, self).setUp()
super().setUp()
self.tester, self.firewall = getattr(self, self.initialize)()
if self.firewall_name == "openvswitch":
self.assign_vlan_to_peers()
@ -274,8 +274,9 @@ class FirewallTestCase(BaseFirewallTestCase):
self._assert_sg_out_tcp_rules_appear_in_order(sg_rules)
def _assert_sg_out_tcp_rules_appear_in_order(self, sg_rules):
outgoing_rule_pref = '-A %s-o%s' % (self.firewall.iptables.wrap_name,
self.src_port_desc['device'][3:13])
outgoing_rule_pref = '-A {}-o{}'.format(
self.firewall.iptables.wrap_name,
self.src_port_desc['device'][3:13])
rules = [
r for r in self.firewall.iptables.get_rules_for_table('filter')
if r.startswith(outgoing_rule_pref)

View File

@ -31,7 +31,7 @@ lba = linuxbridge_neutron_agent
class LinuxBridgeAgentTests(test_ip_lib.IpLibTestFramework):
def setUp(self):
super(LinuxBridgeAgentTests, self).setUp()
super().setUp()
agent_rpc = ('neutron.agent.rpc.PluginApi')
mock.patch(agent_rpc).start()
mock.patch('neutron.agent.rpc.PluginReportStateAPI').start()

View File

@ -80,12 +80,10 @@ class TestOVSAgent(base.OVSAgentTestFramework):
# verify no stale drop flows
self.assertEqual(0,
num_ports_with_drop_flows(
ofports,
self.agent.int_br.dump_flows(
ovs_constants.LOCAL_SWITCHING
)
))
num_ports_with_drop_flows(
ofports,
self.agent.int_br.dump_flows(
ovs_constants.LOCAL_SWITCHING)))
def _check_datapath_type_netdev(self, expected, default=False):
if not default:
@ -351,7 +349,7 @@ class TestOVSAgent(base.OVSAgentTestFramework):
class TestOVSAgentExtensionConfig(base.OVSAgentTestFramework):
def setUp(self):
super(TestOVSAgentExtensionConfig, self).setUp()
super().setUp()
self.config.set_override('extensions', ['qos'], 'agent')
self.agent = self.create_agent(create_tunnels=False)

View File

@ -48,7 +48,7 @@ class OVSAgentTestBase(test_ovs_lib.OVSBridgeTestBase,
l2_base.OVSOFControllerHelper):
def setUp(self):
super(OVSAgentTestBase, self).setUp()
super().setUp()
self.br = self.useFixture(net_helpers.OVSBridgeFixture()).bridge
self.start_of_controller(cfg.CONF)
self.br_int = self.br_int_cls(self.br.br_name)
@ -70,7 +70,7 @@ class OVSAgentTestBase(test_ovs_lib.OVSBridgeTestBase,
trace[l] = r
for k in required_keys:
if k not in trace:
self.fail("%s not found in trace %s" % (k, trace_lines))
self.fail("{} not found in trace {}".format(k, trace_lines))
return trace
@ -80,7 +80,7 @@ class ARPSpoofTestCase(OVSAgentTestBase):
# NOTE(kevinbenton): it would be way cooler to use scapy for
# these but scapy requires the python process to be running as
# root to bind to the ports.
super(ARPSpoofTestCase, self).setUp()
super().setUp()
self.skip_without_arp_support()
self.src_addr = '192.168.0.1'
self.dst_addr = '192.168.0.2'
@ -97,10 +97,10 @@ class ARPSpoofTestCase(OVSAgentTestBase):
self.addOnException(self.collect_flows_and_ports)
def collect_flows_and_ports(self, exc_info):
nicevif = lambda x: ['%s=%s' % (k, getattr(x, k))
nicevif = lambda x: ['{}={}'.format(k, getattr(x, k))
for k in ['ofport', 'port_name', 'switch',
'vif_id', 'vif_mac']]
nicedev = lambda x: ['%s=%s' % (k, getattr(x, k))
nicedev = lambda x: ['{}={}'.format(k, getattr(x, k))
for k in ['name', 'namespace']] + x.addr.list()
details = {'flows': self.br.dump_all_flows(),
'vifs': map(nicevif, self.br.get_vif_ports()),
@ -321,7 +321,7 @@ class OVSFlowTestCase(OVSAgentTestBase):
cfg.CONF.set_override('enable_distributed_routing',
dvr_enabled,
group='AGENT')
super(OVSFlowTestCase, self).setUp()
super().setUp()
self.phys_br = self.useFixture(net_helpers.OVSBridgeFixture()).bridge
self.br_phys = self.br_phys_cls(self.phys_br.br_name)
self.br_phys.set_secure_mode()

View File

@ -31,7 +31,7 @@ class OVSBridgeTestBase(base.BaseOVSLinuxTestCase):
# TODO(twilson) So far, only ovsdb-related tests are written. It would be
# good to also add the openflow-related functions
def setUp(self):
super(OVSBridgeTestBase, self).setUp()
super().setUp()
self.ovs = ovs_lib.BaseOVS()
self.br = self.useFixture(net_helpers.OVSBridgeFixture()).bridge
@ -204,8 +204,8 @@ class OVSBridgeTestCase(OVSBridgeTestBase):
br_other_config = self.ovs.ovsdb.db_find(
'Bridge', ('name', '=', self.br.br_name), columns=['other_config']
).execute()[0]['other_config']
expected_flood_value = ('false' if
cfg.CONF.OVS.igmp_flood_unregistered else 'true')
expected_flood_value = (
'false' if cfg.CONF.OVS.igmp_flood_unregistered else 'true')
self.assertEqual(
expected_flood_value,
br_other_config.get(
@ -293,7 +293,7 @@ class OVSBridgeTestCase(OVSBridgeTestBase):
# Nothing seems to use this function?
(port_name, ofport) = self.create_ovs_port()
stats = set(self.br.get_port_stats(port_name).keys())
self.assertTrue(set(['rx_packets', 'tx_packets']).issubset(stats))
self.assertTrue({'rx_packets', 'tx_packets'}.issubset(stats))
def test_get_vif_ports(self):
for i in range(2):
@ -326,7 +326,7 @@ class OVSBridgeTestCase(OVSBridgeTestBase):
self.create_ovs_port()
vif_ports = [self.create_ovs_vif_port() for i in range(2)]
ports = self.br.get_vif_port_set()
expected = set([x.vif_id for x in vif_ports])
expected = {x.vif_id for x in vif_ports}
self.assertEqual(expected, ports)
def test_get_vif_port_set_with_missing_port(self):
@ -339,7 +339,7 @@ class OVSBridgeTestCase(OVSBridgeTestBase):
mock.patch.object(self.br, 'get_port_name_list',
new=new_port_name_list).start()
ports = self.br.get_vif_port_set()
expected = set([vif_ports[0].vif_id])
expected = {vif_ports[0].vif_id}
self.assertEqual(expected, ports)
def test_get_vif_port_set_on_empty_bridge_returns_empty_set(self):
@ -500,7 +500,7 @@ class OVSBridgeTestCase(OVSBridgeTestBase):
class OVSLibTestCase(base.BaseOVSLinuxTestCase):
def setUp(self):
super(OVSLibTestCase, self).setUp()
super().setUp()
self.ovs = ovs_lib.BaseOVS()
def test_bridge_lifecycle_baseovs(self):

View File

@ -40,7 +40,7 @@ class APIPolicyTestCase(base.BaseLoggingTestCase):
api_version = "2.0"
def setUp(self):
super(APIPolicyTestCase, self).setUp()
super().setUp()
self.useFixture(fixture.APIDefinitionFixture())
self.extension_path = os.path.abspath(os.path.join(
TEST_PATH, "../../../extensions"))

View File

@ -103,7 +103,7 @@ class BaseSudoTestCase(BaseLoggingTestCase):
"""
def setUp(self):
super(BaseSudoTestCase, self).setUp()
super().setUp()
if not base.bool_from_env('OS_SUDO_TESTING'):
self.skipTest('Testing with sudo is not enabled')
self.setup_rootwrap()
@ -150,14 +150,14 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
def setUp(self, maintenance_worker=False, service_plugins=None):
ml2_config.cfg.CONF.set_override('extension_drivers',
self._extension_drivers,
group='ml2')
self._extension_drivers,
group='ml2')
ml2_config.cfg.CONF.set_override('tenant_network_types',
['geneve'],
group='ml2')
['geneve'],
group='ml2')
ml2_config.cfg.CONF.set_override('vni_ranges',
['1:65536'],
group='ml2_type_geneve')
['1:65536'],
group='ml2_type_geneve')
# ensure viable minimum is set for OVN's Geneve
ml2_config.cfg.CONF.set_override('max_header_size', 38,
group='ml2_type_geneve')
@ -170,7 +170,7 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
self.addCleanup(exts.PluginAwareExtensionManager.clear_instance)
self.ovsdb_server_mgr = None
self._service_plugins = service_plugins
super(TestOVNFunctionalBase, self).setUp()
super().setUp()
self.test_log_dir = os.path.join(DEFAULT_LOG_DIR, self.id())
base.setup_test_logging(
cfg.CONF, self.test_log_dir, "testrun.txt")
@ -234,7 +234,7 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
errno.ENOENT, os.strerror(errno.ENOENT), msg)
def get_additional_service_plugins(self):
p = super(TestOVNFunctionalBase, self).get_additional_service_plugins()
p = super().get_additional_service_plugins()
p.update({'revision_plugin_name': 'revisions',
'segments': 'neutron.services.segments.plugin.Plugin'})
if self._service_plugins:
@ -350,15 +350,15 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
timestamp = datetime.now().strftime('%y-%m-%d_%H-%M-%S')
for database in ("nb", "sb"):
for file_suffix in ("log", "db"):
src_filename = "ovn_%(db)s.%(suffix)s" % {
'db': database,
'suffix': file_suffix
}
dst_filename = "ovn_%(db)s-%(timestamp)s.%(suffix)s" % {
'db': database,
'suffix': file_suffix,
'timestamp': timestamp,
}
src_filename = "ovn_{db}.{suffix}".format(
db=database,
suffix=file_suffix
)
dst_filename = "ovn_{db}-{timestamp}.{suffix}".format(
db=database,
suffix=file_suffix,
timestamp=timestamp,
)
try:
self._copy_log_file(src_filename, dst_filename)
except FileNotFoundError:
@ -369,10 +369,10 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
# Copy northd logs
northd_log = "ovn_northd"
dst_northd = "%(northd)s-%(timestamp)s.log" % {
"northd": northd_log,
"timestamp": timestamp,
}
dst_northd = "{northd}-{timestamp}.log".format(
northd=northd_log,
timestamp=timestamp,
)
self._copy_log_file("%s.log" % northd_log, dst_northd)
def _copy_log_file(self, src_filename, dst_filename):
@ -427,7 +427,7 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
if enable_chassis_as_extport:
append_cms_options(other_config, 'enable-chassis-as-extport-host')
bridge_mapping = ",".join(["%s:br-provider%s" % (phys_net, i)
bridge_mapping = ",".join(["{}:br-provider{}".format(phys_net, i)
for i, phys_net in enumerate(physical_nets)])
if name is None:
name = uuidutils.generate_uuid()

View File

@ -56,10 +56,10 @@ OPTS = [
},
required=False),
cfg.BoolOpt('parent_listen',
short='pl',
default=True,
help='Parent process must listen too',
required=False),
short='pl',
default=True,
help='Parent process must listen too',
required=False),
cfg.BoolOpt('ignore_sigterm',
short='i',
default=False,
@ -102,7 +102,7 @@ class ProcessSpawn(daemon.Daemon):
self.parent_must_listen = parent_must_listen
self.child_pids = []
super(ProcessSpawn, self).__init__(pidfile)
super().__init__(pidfile)
def start_listening(self):
socket_family = self.DCT_FAMILY[self.family]
@ -126,7 +126,7 @@ class ProcessSpawn(daemon.Daemon):
# Pick a non privileged port
port = random.randint(1024, 65535)
self.listen_socket.bind(('', port))
except socket.error:
except OSError:
retries += 1
else:
if n_const.PROTO_NAME_TCP in self.proto:
@ -174,10 +174,10 @@ def main():
cfg.CONF.register_cli_opts(OPTS)
cfg.CONF(project='neutron', default_config_files=[])
proc_spawn = ProcessSpawn(num_children=cfg.CONF.num_children,
family=cfg.CONF.family,
proto=cfg.CONF.proto,
parent_must_listen=cfg.CONF.parent_listen,
ignore_sigterm=cfg.CONF.ignore_sigterm)
family=cfg.CONF.family,
proto=cfg.CONF.proto,
parent_must_listen=cfg.CONF.parent_listen,
ignore_sigterm=cfg.CONF.ignore_sigterm)
proc_spawn.start()

View File

@ -26,7 +26,7 @@ from neutron.tests.functional import base
class TestDestroyPatchPorts(base.BaseSudoTestCase):
def setUp(self):
super(TestDestroyPatchPorts, self).setUp()
super().setUp()
self.int_br = self.useFixture(net_helpers.OVSBridgeFixture()).bridge
bridge_mappings = {}
self.bridges = []
@ -43,7 +43,7 @@ class TestDestroyPatchPorts(base.BaseSudoTestCase):
config.set_override('integration_bridge', self.int_br.br_name, "OVS")
config.set_override(
'bridge_mappings',
','.join(["%s:%s" % (net, br)
','.join(["{}:{}".format(net, br)
for net, br in bridge_mappings.items()]),
"OVS")

View File

@ -20,7 +20,7 @@ class TestIPSetCLIConfig(base.BaseLoggingTestCase):
def setup_config(self, args=None):
self.conf = ipset_cleanup.setup_conf()
super(TestIPSetCLIConfig, self).setup_config(args=args)
super().setup_config(args=args)
def test_ipset_opts_registration(self):
self.assertFalse(self.conf.allsets)

View File

@ -39,7 +39,7 @@ NUM_SUBPROCESSES = 6
class NetnsCleanupTest(base.BaseSudoTestCase):
def setUp(self):
super(NetnsCleanupTest, self).setUp()
super().setUp()
self.get_namespaces_p = mock.patch(GET_NAMESPACES)
self.get_namespaces = self.get_namespaces_p.start()
@ -79,7 +79,7 @@ class NetnsCleanupTest(base.BaseSudoTestCase):
timeout=15)
except eventlet.Timeout:
num_spawned_procs = self._get_num_spawned_procs()
err_str = ("Expected number/spawned number: {0}/{1}\nProcess "
err_str = ("Expected number/spawned number: {}/{}\nProcess "
"information:\n".format(num_spawned_procs,
procs_launched))
cmd = ['ps', '-f', '-u', 'root']
@ -159,7 +159,7 @@ class TestNETNSCLIConfig(basetest.BaseTestCase):
def setup_config(self, args=None):
self.conf = netns_cleanup.setup_conf()
super(TestNETNSCLIConfig, self).setup_config(args=args)
super().setup_config(args=args)
def test_netns_opts_registration(self):
self.assertFalse(self.conf.force)

View File

@ -25,7 +25,7 @@ class TestOVSCLIConfig(base.BaseOVSLinuxTestCase):
def setup_config(self, args=None):
self.conf = ovs_cleanup.setup_conf()
super(TestOVSCLIConfig, self).setup_config(args=args)
super().setup_config(args=args)
def test_ovs_opts_registration(self):
self.assertFalse(self.conf.ovs_all_ports)

View File

@ -20,7 +20,7 @@ from neutron.db.migration.alembic_migrations.versions.yoga.expand import \
from neutron.tests.functional.db import test_migrations
class TestAddIndexesToRbacsMixin(object):
class TestAddIndexesToRbacsMixin:
"""Validates binding_index for NetworkDhcpAgentBinding migration."""
@staticmethod

View File

@ -21,7 +21,7 @@ from oslo_utils import uuidutils
from neutron.tests.functional.db import test_migrations
class NetworkDhcpAgentBindingMigrationMixin(object):
class NetworkDhcpAgentBindingMigrationMixin:
"""Validates binding_index for NetworkDhcpAgentBinding migration."""
def _create_so(self, o_type, values):

View File

@ -38,7 +38,7 @@ load_tests = testlib_api.module_load_tests
class IpamTestCase(testlib_api.SqlTestCase):
"""Base class for tests that aim to test ip allocation."""
def setUp(self):
super(IpamTestCase, self).setUp()
super().setUp()
cfg.CONF.set_override('notify_nova_on_port_status_changes', False)
DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
self.setup_coreplugin(DB_PLUGIN_KLASS)
@ -57,7 +57,7 @@ class IpamTestCase(testlib_api.SqlTestCase):
def result_set_to_dicts(self, resultset, keys):
dicts = []
for item in resultset:
item_dict = dict((x, item[x]) for x in keys)
item_dict = {x: item[x] for x in keys}
dicts.append(item_dict)
return dicts

View File

@ -155,7 +155,7 @@ class _TestModelsMigrations(test_migrations.ModelsMigrationsSync):
def setUp(self):
config.register_common_config_options()
super(_TestModelsMigrations, self).setUp()
super().setUp()
self.cfg = self.useFixture(config_fixture.Config())
self.cfg.config(core_plugin='ml2')
self.alembic_config = migration.get_neutron_config()
@ -178,7 +178,7 @@ class _TestModelsMigrations(test_migrations.ModelsMigrationsSync):
name in external.TABLES):
return False
return super(_TestModelsMigrations, self).include_object(
return super().include_object(
object_, name, type_, reflected, compare_to)
def filter_metadata_diff(self, diff):
@ -379,7 +379,7 @@ class TestModelsMigrationsMySQL(testlib_api.MySQLTestCaseMixin,
self.assertEqual(0, len(res), "%s non InnoDB tables created" % res)
def test_models_sync(self):
super(TestModelsMigrationsMySQL, self).test_models_sync()
super().test_models_sync()
class TestModelsMigrationsPostgreSQL(testlib_api.PostgreSQLTestCaseMixin,
@ -392,7 +392,7 @@ class TestSanityCheck(testlib_api.SqlTestCaseLight):
BUILD_SCHEMA = False
def setUp(self):
super(TestSanityCheck, self).setUp()
super().setUp()
self.alembic_config = migration.get_neutron_config()
self.alembic_config.neutron_config = cfg.CONF
@ -507,7 +507,7 @@ class TestSanityCheck(testlib_api.SqlTestCaseLight):
class TestWalkDowngrade(oslotest_base.BaseTestCase):
def setUp(self):
super(TestWalkDowngrade, self).setUp()
super().setUp()
self.alembic_config = migration.get_neutron_config()
self.alembic_config.neutron_config = cfg.CONF
@ -525,7 +525,7 @@ class TestWalkDowngrade(oslotest_base.BaseTestCase):
return True
class _TestWalkMigrations(object):
class _TestWalkMigrations:
'''This will add framework for testing schema migration
for different backends.
@ -606,7 +606,7 @@ class TestWalkMigrationsMySQL(testlib_api.MySQLTestCaseMixin,
# timeout is required only when for testing with 'mysql' backend.
@test_base.set_timeout(600)
def test_walk_versions(self):
super(TestWalkMigrationsMySQL, self).test_walk_versions()
super().test_walk_versions()
class TestWalkMigrationsPostgreSQL(testlib_api.PostgreSQLTestCaseMixin,

View File

@ -27,7 +27,7 @@ class TestDBCreation(base.BaseLoggingTestCase):
"""
def setUp(self):
super(TestDBCreation, self).setUp()
super().setUp()
self.engine = sqlalchemy.create_engine('sqlite://')
def _test_creation(self, module):

View File

@ -31,7 +31,7 @@ class NetworkRBACTestCase(testlib_api.SqlTestCase):
"""Base class to test network RBAC policies"""
def setUp(self):
super(NetworkRBACTestCase, self).setUp()
super().setUp()
cfg.CONF.set_override('notify_nova_on_port_status_changes', False)
DB_PLUGIN_KLASS = 'neutron.plugins.ml2.plugin.Ml2Plugin'
self.setup_coreplugin(DB_PLUGIN_KLASS)

View File

@ -68,7 +68,7 @@ class _SegmentAllocation(testlib_api.SqlTestCase):
self._create_segments(self.NUM_SEGIDS, self.PHYSNETS)
for _ in range(len(self.segments)):
unalloc = m_get(self.context)
segment = dict((k, unalloc[k]) for k in self.primary_keys)
segment = {k: unalloc[k] for k in self.primary_keys}
m_alloc(self.context, **segment)
if self.is_vlan:
self.segments.remove((unalloc['physical_network'],

View File

@ -58,7 +58,7 @@ class TestRootController(test_functional.PecanFunctionalTest):
base_url = '/'
def setUp(self):
super(TestRootController, self).setUp()
super().setUp()
self.setup_service_plugin()
self.plugin = directory.get_plugin()
self.ctx = context.get_admin_context()
@ -154,7 +154,7 @@ class TestExtensionsController(TestRootController):
def test_get(self):
# Fetch any extension supported by plugins
test_alias = self._get_supported_extensions().pop()
response = self.app.get('%s/%s' % (self.base_url, test_alias))
response = self.app.get('{}/{}'.format(self.base_url, test_alias))
self.assertEqual(response.status_int, 200)
json_body = jsonutils.loads(response.body)
self.assertEqual(test_alias, json_body['extension']['alias'])
@ -296,7 +296,7 @@ class TestQuotasController(test_functional.PecanFunctionalTest):
def test_get_project_info(self):
for key in ('project', 'tenant'):
response = self.app.get('%s/%s.json' % (self.base_url, key),
response = self.app.get('{}/{}.json'.format(self.base_url, key),
headers={'X-Project-Id': 'admin',
'X-Roles': 'admin'})
self.assertEqual(200, response.status_int)
@ -311,7 +311,7 @@ class TestResourceController(TestRootController):
base_url = '/v2.0'
def setUp(self):
super(TestResourceController, self).setUp()
super().setUp()
policy.init()
self.addCleanup(policy.reset)
self._gen_port()
@ -343,7 +343,7 @@ class TestResourceController(TestRootController):
query_params = ['fields=%s' % field for field in fields]
url = '/v2.0/ports.json'
if query_params:
url = '%s?%s' % (url, '&'.join(query_params))
url = '{}?{}'.format(url, '&'.join(query_params))
list_resp = self.app.get(url, headers={'X-Project-Id': 'tenid'})
self.assertEqual(200, list_resp.status_int)
for item in jsonutils.loads(list_resp.body).get('ports', []):
@ -487,11 +487,11 @@ class TestResourceController(TestRootController):
response = self.app.post_json(
'/v2.0/ports.json',
params={'ports': [{'network_id': self.port['network_id'],
'admin_state_up': True,
'project_id': 'tenid'},
{'network_id': self.port['network_id'],
'admin_state_up': True,
'project_id': 'tenid'},
{'network_id': self.port['network_id'],
'admin_state_up': True,
'project_id': 'tenid'}]
'project_id': 'tenid'}]
},
headers={'X-Project-Id': 'tenid'})
self.assertEqual(201, response.status_int)
@ -517,13 +517,13 @@ class TestResourceController(TestRootController):
port_response = self.app.post_json(
'/v2.0/ports.json',
params={'ports': [{'network_id': self.port['network_id'],
'admin_state_up': True,
'security_groups': [sg_id],
'project_id': 'tenid'},
{'network_id': self.port['network_id'],
'admin_state_up': True,
'security_groups': [sg_id],
'project_id': 'tenid'},
{'network_id': self.port['network_id'],
'admin_state_up': True,
'security_groups': [sg_id],
'project_id': 'tenid'}]
'security_groups': [sg_id],
'project_id': 'tenid'}]
},
headers={'X-Project-Id': 'tenid'})
self.assertEqual(201, port_response.status_int)
@ -539,11 +539,11 @@ class TestResourceController(TestRootController):
response = self.app.post_json(
'/v2.0/ports.json',
params={'ports': [{'network_id': self.port['network_id'],
'admin_state_up': True,
'project_id': 'tenid'},
{'network_id': self.port['network_id'],
'admin_state_up': True,
'project_id': 'tenid'},
{'network_id': self.port['network_id'],
'admin_state_up': True,
'project_id': 'tenid'}]
'project_id': 'tenid'}]
},
headers={'X-Project-Id': 'tenid'})
self.assertEqual(response.status_int, 201)
@ -556,14 +556,14 @@ class TestResourceController(TestRootController):
response = self.app.post_json(
'/v2.0/ports.json',
params={'ports': [{'network_id': self.port['network_id'],
'admin_state_up': True,
'project_id': 'tenid'},
{'network_id': self.port['network_id'],
'admin_state_up': True,
'project_id': 'tenid'},
{'network_id': self.port['network_id'],
'admin_state_up': True,
'project_id': 'tenid'},
{'network_id': 'bad_net_id',
'project_id': 'tenid'},
{'network_id': 'bad_net_id',
'admin_state_up': True,
'project_id': 'tenid'}]
'project_id': 'tenid'}]
},
headers={'X-Project-Id': 'tenid'},
expect_errors=True)
@ -594,7 +594,7 @@ class TestPaginationAndSorting(test_functional.PecanFunctionalTest):
RESOURCE_COUNT = 6
def setUp(self):
super(TestPaginationAndSorting, self).setUp()
super().setUp()
policy.init()
self.addCleanup(policy.reset)
self.plugin = directory.get_plugin()
@ -631,10 +631,11 @@ class TestPaginationAndSorting(test_functional.PecanFunctionalTest):
query_params.append('sort_key=%s' % sort_key)
if sort_dir:
query_params.append('sort_dir=%s' % sort_dir)
query_params.extend(['%s%s' % ('fields=', field) for field in fields])
query_params.extend(['{}{}'.format('fields=', field)
for field in fields])
url = '/v2.0/%s.json' % collection
if query_params:
url = '%s?%s' % (url, '&'.join(query_params))
url = '{}?{}'.format(url, '&'.join(query_params))
list_resp = self.app.get(url,
headers={'X-Project-Id': self._project_id})
self.assertEqual(200, list_resp.status_int)
@ -716,7 +717,7 @@ class TestPaginationAndSorting(test_functional.PecanFunctionalTest):
class TestRequestProcessing(TestRootController):
def setUp(self):
super(TestRequestProcessing, self).setUp()
super().setUp()
mock.patch('neutron.pecan_wsgi.hooks.notifier.registry').start()
# request.context is thread-local storage so it has to be accessed by
# the controller. We can capture it into a list here to assert on after
@ -850,7 +851,7 @@ class TestRouterController(TestResourceController):
'service_plugins',
['neutron.services.l3_router.l3_router_plugin.L3RouterPlugin',
'neutron.services.flavors.flavors_plugin.FlavorsPlugin'])
super(TestRouterController, self).setUp()
super().setUp()
policy.init()
self.addCleanup(policy.reset)
plugin = directory.get_plugin()
@ -893,7 +894,7 @@ class TestRouterController(TestResourceController):
class TestDHCPAgentShimControllers(test_functional.PecanFunctionalTest):
def setUp(self):
super(TestDHCPAgentShimControllers, self).setUp()
super().setUp()
policy.init()
policy._ENFORCER.set_rules(
oslo_policy.Rules.from_dict(
@ -932,8 +933,8 @@ class TestDHCPAgentShimControllers(test_functional.PecanFunctionalTest):
headers=headers)
self.assertIn(self.agent.id,
[a['id'] for a in response.json['agents']])
self.app.delete('/v2.0/agents/%(a)s/dhcp-networks/%(n)s.json' % {
'a': self.agent.id, 'n': self.network['id']}, headers=headers)
self.app.delete('/v2.0/agents/{a}/dhcp-networks/{n}.json'.format(
a=self.agent.id, n=self.network['id']), headers=headers)
response = self.app.get(
'/v2.0/networks/%s/dhcp-agents.json' % self.network['id'],
headers=headers)
@ -948,7 +949,7 @@ class TestL3AgentShimControllers(test_functional.PecanFunctionalTest):
'service_plugins',
['neutron.services.l3_router.l3_router_plugin.L3RouterPlugin',
'neutron.services.flavors.flavors_plugin.FlavorsPlugin'])
super(TestL3AgentShimControllers, self).setUp()
super().setUp()
policy.init()
policy._ENFORCER.set_rules(
oslo_policy.Rules.from_dict(
@ -987,8 +988,8 @@ class TestL3AgentShimControllers(test_functional.PecanFunctionalTest):
self.assertIn(self.agent.id,
[a['id'] for a in response.json['agents']])
response = self.app.delete(
'/v2.0/agents/%(a)s/l3-routers/%(n)s.json' % {
'a': self.agent.id, 'n': self.router['id']}, headers=headers)
'/v2.0/agents/{a}/l3-routers/{n}.json'.format(
a=self.agent.id, n=self.router['id']), headers=headers)
self.assertEqual(204, response.status_int)
self.assertFalse(response.body)
response = self.app.get(
@ -1005,7 +1006,7 @@ class TestShimControllers(test_functional.PecanFunctionalTest):
fake_plugin = pecan_utils.FakePlugin()
plugins = {pecan_utils.FakePlugin.PLUGIN_TYPE: fake_plugin}
new_extensions = {fake_ext.get_alias(): fake_ext}
super(TestShimControllers, self).setUp(
super().setUp(
service_plugins=plugins, extensions=new_extensions)
policy.init()
policy._ENFORCER.set_rules(
@ -1020,7 +1021,7 @@ class TestShimControllers(test_functional.PecanFunctionalTest):
collection = pecan_utils.FakeExtension.HYPHENATED_COLLECTION.replace(
'_', '-')
resource = pecan_utils.FakeExtension.HYPHENATED_RESOURCE
url = '/v2.0/{}/something.json'.format(collection)
url = f'/v2.0/{collection}/something.json'
resp = self.app.get(url)
self.assertEqual(200, resp.status_int)
self.assertEqual({resource: {'fake': 'something'}}, resp.json)
@ -1028,7 +1029,7 @@ class TestShimControllers(test_functional.PecanFunctionalTest):
def test_hyphenated_collection_controller_not_shimmed(self):
body_collection = pecan_utils.FakeExtension.HYPHENATED_COLLECTION
uri_collection = body_collection.replace('_', '-')
url = '/v2.0/{}.json'.format(uri_collection)
url = f'/v2.0/{uri_collection}.json'
resp = self.app.get(url)
self.assertEqual(200, resp.status_int)
self.assertEqual({body_collection: [{'fake': 'fake'}]}, resp.json)
@ -1040,7 +1041,7 @@ class TestShimControllers(test_functional.PecanFunctionalTest):
sub_resource_collection = (
pecan_utils.FakeExtension.FAKE_SUB_RESOURCE_COLLECTION)
temp_id = uuidutils.generate_uuid()
url = '/v2.0/{0}/{1}/{2}'.format(
url = '/v2.0/{}/{}/{}'.format(
uri_collection,
temp_id,
sub_resource_collection.replace('_', '-'))
@ -1056,19 +1057,19 @@ class TestMemberActionController(test_functional.PecanFunctionalTest):
fake_plugin = pecan_utils.FakePlugin()
plugins = {pecan_utils.FakePlugin.PLUGIN_TYPE: fake_plugin}
new_extensions = {fake_ext.get_alias(): fake_ext}
super(TestMemberActionController, self).setUp(
super().setUp(
service_plugins=plugins, extensions=new_extensions)
hyphen_collection = pecan_utils.FakeExtension.HYPHENATED_COLLECTION
self.collection = hyphen_collection.replace('_', '-')
def test_get_member_action_controller(self):
url = '/v2.0/{}/something/boo_meh.json'.format(self.collection)
url = f'/v2.0/{self.collection}/something/boo_meh.json'
resp = self.app.get(url)
self.assertEqual(200, resp.status_int)
self.assertEqual({'boo_yah': 'something'}, resp.json)
def test_put_member_action_controller(self):
url = '/v2.0/{}/something/put_meh.json'.format(self.collection)
url = f'/v2.0/{self.collection}/something/put_meh.json'
resp = self.app.put_json(url, params={'it_matters_not': 'ok'})
self.assertEqual(200, resp.status_int)
self.assertEqual({'poo_yah': 'something'}, resp.json)
@ -1087,13 +1088,13 @@ class TestMemberActionController(test_functional.PecanFunctionalTest):
self.assertEqual(404, resp.status_int)
def test_put_on_get_member_action(self):
url = '/v2.0/{}/something/boo_meh.json'.format(self.collection)
url = f'/v2.0/{self.collection}/something/boo_meh.json'
resp = self.app.put_json(url, params={'it_matters_not': 'ok'},
expect_errors=True)
self.assertEqual(405, resp.status_int)
def test_get_on_put_member_action(self):
url = '/v2.0/{}/something/put_meh.json'.format(self.collection)
url = f'/v2.0/{self.collection}/something/put_meh.json'
resp = self.app.get(url, expect_errors=True)
self.assertEqual(405, resp.status_int)
@ -1104,7 +1105,7 @@ class TestParentSubresourceController(test_functional.PecanFunctionalTest):
fake_plugin = pecan_utils.FakePlugin()
plugins = {pecan_utils.FakePlugin.PLUGIN_TYPE: fake_plugin}
new_extensions = {fake_ext.get_alias(): fake_ext}
super(TestParentSubresourceController, self).setUp(
super().setUp(
service_plugins=plugins, extensions=new_extensions)
policy.init()
policy._ENFORCER.set_rules(
@ -1119,21 +1120,21 @@ class TestParentSubresourceController(test_functional.PecanFunctionalTest):
FAKE_PARENT_SUBRESOURCE_COLLECTION)
def test_get_duplicate_parent_resource(self):
url = '/v2.0/{}'.format(self.fake_collection)
url = f'/v2.0/{self.fake_collection}'
resp = self.app.get(url)
self.assertEqual(200, resp.status_int)
self.assertEqual({'fake_duplicates': [{'fake': 'fakeduplicates'}]},
resp.json)
def test_get_duplicate_parent_resource_item(self):
url = '/v2.0/{}/something'.format(self.fake_collection)
url = f'/v2.0/{self.fake_collection}/something'
resp = self.app.get(url)
self.assertEqual(200, resp.status_int)
self.assertEqual({'fake_duplicate': {'fake': 'something'}}, resp.json)
def test_get_parent_resource_and_duplicate_subresources(self):
url = '/v2.0/{0}/something/{1}'.format(self.collection,
self.fake_collection)
url = '/v2.0/{}/something/{}'.format(self.collection,
self.fake_collection)
resp = self.app.get(url)
self.assertEqual(200, resp.status_int)
self.assertEqual({'fake_duplicates': [{'fake': 'something'}]},
@ -1147,8 +1148,8 @@ class TestParentSubresourceController(test_functional.PecanFunctionalTest):
{'get_meh_meh_fake_duplicate': ''}
)
)
url = '/v2.0/{0}/something/{1}'.format(self.collection,
self.fake_collection)
url = '/v2.0/{}/something/{}'.format(self.collection,
self.fake_collection)
resp = self.app.get(url)
self.assertEqual(200, resp.status_int)
self.assertEqual({'fake_duplicates': [{'fake': 'something'}]},
@ -1158,7 +1159,7 @@ class TestParentSubresourceController(test_functional.PecanFunctionalTest):
class TestExcludeAttributePolicy(test_functional.PecanFunctionalTest):
def setUp(self):
super(TestExcludeAttributePolicy, self).setUp()
super().setUp()
policy.init()
self.addCleanup(policy.reset)
plugin = directory.get_plugin()
@ -1169,7 +1170,7 @@ class TestExcludeAttributePolicy(test_functional.PecanFunctionalTest):
def test_get_networks(self):
response = self.app.get('/v2.0/networks/%s.json' % self.network_id,
headers={'X-Project-Id': 'tenid'})
headers={'X-Project-Id': 'tenid'})
json_body = jsonutils.loads(response.body)
self.assertEqual(response.status_int, 200)
self.assertEqual('tenid', json_body['network']['project_id'])

View File

@ -76,7 +76,7 @@ class PecanFunctionalTest(testlib_api.SqlTestCase,
def setUp(self, service_plugins=None, extensions=None):
self.setup_coreplugin('ml2', load_plugins=False)
super(PecanFunctionalTest, self).setUp()
super().setUp()
self.addCleanup(exts.PluginAwareExtensionManager.clear_instance)
self.set_config_overrides()
manager.init()

View File

@ -68,7 +68,7 @@ class TestQueryParametersHookWithRevision(test_functional.PecanFunctionalTest):
def setUp(self):
cfg.CONF.set_override('service_plugins', ['revisions'])
super(TestQueryParametersHookWithRevision, self).setUp()
super().setUp()
def test_if_match_on_update(self):
net_response = jsonutils.loads(self.app.post_json(
@ -169,7 +169,7 @@ class TestPolicyEnforcementHook(test_functional.PecanFunctionalTest):
# independent from the evolution of the API (so if one changes the API
# or the default policies there won't be any risk of breaking these
# tests, or at least I hope so)
super(TestPolicyEnforcementHook, self).setUp()
super().setUp()
self.mock_plugin = mock.Mock()
attributes.RESOURCES.update(self.FAKE_RESOURCE)
manager.NeutronManager.set_plugin_for_resource('mehs',
@ -332,7 +332,7 @@ class TestMetricsNotifierHook(test_functional.PecanFunctionalTest):
patcher = mock.patch('neutron.pecan_wsgi.hooks.notifier.NotifierHook.'
'_notifier')
self.mock_notifier = patcher.start().info
super(TestMetricsNotifierHook, self).setUp()
super().setUp()
def test_post_put_delete_triggers_notification(self):
req_headers = {'X-Project-Id': 'tenid', 'X-Roles': 'admin'}
@ -451,7 +451,7 @@ class TestMetricsNotifierHook(test_functional.PecanFunctionalTest):
class TestCallbackRegistryNotifier(test_functional.PecanFunctionalTest):
def setUp(self):
super(TestCallbackRegistryNotifier, self).setUp()
super().setUp()
patcher = mock.patch('neutron.pecan_wsgi.hooks.notifier.registry')
self.mock_notifier = patcher.start().publish

View File

@ -58,7 +58,7 @@ class FakeSingularCollectionExtension(api_extensions.ExtensionDescriptor):
return [pecan_utils.PecanResourceExtension(self.RESOURCE, ctrllr)]
class FakeSingularCollectionPlugin(object):
class FakeSingularCollectionPlugin:
supported_extension_aliases = ['fake-sc']
@ -204,7 +204,7 @@ class FakeExtension(api_extensions.ExtensionDescriptor):
return {}
class FakePlugin(object):
class FakePlugin:
PLUGIN_TYPE = 'fake-ext-plugin'
supported_extension_aliases = ['fake-ext']

View File

@ -25,7 +25,7 @@ from neutron.tests.functional import base as functional_base
class MacvtapAgentTestCase(functional_base.BaseSudoTestCase):
def setUp(self):
super(MacvtapAgentTestCase, self).setUp()
super().setUp()
self.mgr = macvtap_neutron_agent.MacvtapManager({})
def test_get_all_devices(self):
@ -50,9 +50,9 @@ class MacvtapAgentTestCase(functional_base.BaseSudoTestCase):
lambda: {macvtap.link.address} == self.mgr.get_all_devices(),
timeout=10)
except common_utils.WaitTimeout:
msg = 'MacVTap address: %s, read devices: %s\n' % (
msg = 'MacVTap address: {}, read devices: {}\n'.format(
macvtap.link.address, self.mgr.get_all_devices())
for device in ip_lib.IPWrapper().get_devices():
msg += ' Device %s, MAC: %s' % (device.name,
device.link.address)
msg += ' Device {}, MAC: {}'.format(device.name,
device.link.address)
self.fail(msg)

View File

@ -118,7 +118,7 @@ class TestOVNClientQosExtensionBase(base.TestOVNFunctionalBase):
class TestOVNClientQosExtension(TestOVNClientQosExtensionBase):
def setUp(self, maintenance_worker=False):
super(TestOVNClientQosExtension, self).setUp(
super().setUp(
maintenance_worker=maintenance_worker)
self._add_logical_switch()
self.qos_driver = qos_extension.OVNClientQosExtension(
@ -197,7 +197,7 @@ class TestOVNClientQosExtension(TestOVNClientQosExtensionBase):
class TestOVNClientQosExtensionEndToEnd(TestOVNClientQosExtensionBase):
def setUp(self, maintenance_worker=False):
super(TestOVNClientQosExtensionEndToEnd, self).setUp(
super().setUp(
maintenance_worker=maintenance_worker)
self.qos_driver = self.l3_plugin._ovn_client._qos_driver
self._mock_qos_rules = mock.patch.object(self.qos_driver, '_qos_rules')

Some files were not shown because too many files have changed in this diff Show More