SDK for Neutron ports
Related-Bug: #1999774 Change-Id: I4f7ab64ad5b6131fd868f2dd5de2410a6015918d
This commit is contained in:
parent
dfc5b4b64e
commit
c7a6407330
@ -1829,8 +1829,10 @@ def subnetpool_delete(request, subnetpool_id):
|
||||
@memoized
|
||||
def port_list(request, **params):
|
||||
LOG.debug("port_list(): params=%s", params)
|
||||
ports = neutronclient(request).list_ports(**params).get('ports')
|
||||
return [Port(p) for p in ports]
|
||||
ports = networkclient(request).ports(**params)
|
||||
if not isinstance(ports, (types.GeneratorType, list)):
|
||||
ports = [ports]
|
||||
return [Port(p.to_dict()) for p in ports]
|
||||
|
||||
|
||||
@profiler.trace
|
||||
@ -1853,7 +1855,7 @@ def port_list_with_trunk_types(request, **params):
|
||||
if not is_extension_supported(request, 'trunk'):
|
||||
return port_list(request, **params)
|
||||
|
||||
ports = neutronclient(request).list_ports(**params)['ports']
|
||||
ports = networkclient(request).ports(**params)
|
||||
trunk_filters = {}
|
||||
if 'tenant_id' in params:
|
||||
trunk_filters['tenant_id'] = params['tenant_id']
|
||||
@ -1878,11 +1880,10 @@ def port_list_with_trunk_types(request, **params):
|
||||
|
||||
|
||||
@profiler.trace
|
||||
def port_get(request, port_id, **params):
|
||||
LOG.debug("port_get(): portid=%(port_id)s, params=%(params)s",
|
||||
{'port_id': port_id, 'params': params})
|
||||
port = neutronclient(request).show_port(port_id, **params).get('port')
|
||||
return Port(port)
|
||||
def port_get(request, port_id):
|
||||
LOG.debug("port_get(): portid=%(port_id)s", {'port_id': port_id})
|
||||
port = networkclient(request).get_port(port_id)
|
||||
return Port(port.to_dict())
|
||||
|
||||
|
||||
def unescape_port_kwargs(**kwargs):
|
||||
@ -1907,18 +1908,17 @@ def port_create(request, network_id, **kwargs):
|
||||
LOG.debug("port_create(): netid=%(network_id)s, kwargs=%(kwargs)s",
|
||||
{'network_id': network_id, 'kwargs': kwargs})
|
||||
kwargs = unescape_port_kwargs(**kwargs)
|
||||
body = {'port': {'network_id': network_id}}
|
||||
if 'tenant_id' not in kwargs:
|
||||
kwargs['tenant_id'] = request.user.project_id
|
||||
body['port'].update(kwargs)
|
||||
port = neutronclient(request).create_port(body=body).get('port')
|
||||
kwargs['network_id'] = network_id
|
||||
port = networkclient(request).create_port(**kwargs).to_dict()
|
||||
return Port(port)
|
||||
|
||||
|
||||
@profiler.trace
|
||||
def port_delete(request, port_id):
|
||||
LOG.debug("port_delete(): portid=%s", port_id)
|
||||
neutronclient(request).delete_port(port_id)
|
||||
networkclient(request).delete_port(port_id)
|
||||
|
||||
|
||||
@profiler.trace
|
||||
@ -1926,8 +1926,7 @@ def port_update(request, port_id, **kwargs):
|
||||
LOG.debug("port_update(): portid=%(port_id)s, kwargs=%(kwargs)s",
|
||||
{'port_id': port_id, 'kwargs': kwargs})
|
||||
kwargs = unescape_port_kwargs(**kwargs)
|
||||
body = {'port': kwargs}
|
||||
port = neutronclient(request).update_port(port_id, body=body).get('port')
|
||||
port = networkclient(request).update_port(port_id, **kwargs).to_dict()
|
||||
return Port(port)
|
||||
|
||||
|
||||
|
@ -194,7 +194,7 @@ class NetworkPortTests(test.BaseAdminViewTests):
|
||||
form_data = {'network_id': port.network_id,
|
||||
'network_name': network.name,
|
||||
'name': port.name,
|
||||
'admin_state': port.admin_state_up,
|
||||
'admin_state': port.is_admin_state_up,
|
||||
'device_id': port.device_id,
|
||||
'device_owner': port.device_owner,
|
||||
'binding__host_id': port.binding__host_id,
|
||||
@ -236,7 +236,7 @@ class NetworkPortTests(test.BaseAdminViewTests):
|
||||
tenant_id=network.tenant_id,
|
||||
network_id=network.id,
|
||||
name=port.name,
|
||||
admin_state_up=port.admin_state_up,
|
||||
admin_state_up=port.is_admin_state_up,
|
||||
device_id=port.device_id,
|
||||
device_owner=port.device_owner,
|
||||
binding__host_id=port.binding__host_id,
|
||||
@ -266,7 +266,7 @@ class NetworkPortTests(test.BaseAdminViewTests):
|
||||
form_data = {'network_id': port.network_id,
|
||||
'network_name': network.name,
|
||||
'name': port.name,
|
||||
'admin_state': port.admin_state_up,
|
||||
'admin_state': port.is_admin_state_up,
|
||||
'device_id': port.device_id,
|
||||
'device_owner': port.device_owner,
|
||||
'binding__host_id': port.binding__host_id,
|
||||
@ -298,7 +298,7 @@ class NetworkPortTests(test.BaseAdminViewTests):
|
||||
tenant_id=network.tenant_id,
|
||||
network_id=network.id,
|
||||
name=port.name,
|
||||
admin_state_up=port.admin_state_up,
|
||||
admin_state_up=port.is_admin_state_up,
|
||||
device_id=port.device_id,
|
||||
device_owner=port.device_owner,
|
||||
binding__host_id=port.binding__host_id,
|
||||
@ -340,7 +340,7 @@ class NetworkPortTests(test.BaseAdminViewTests):
|
||||
form_data = {'network_id': port.network_id,
|
||||
'network_name': network.name,
|
||||
'name': port.name,
|
||||
'admin_state': port.admin_state_up,
|
||||
'admin_state': port.is_admin_state_up,
|
||||
'mac_state': True,
|
||||
'device_id': port.device_id,
|
||||
'device_owner': port.device_owner,
|
||||
@ -386,7 +386,7 @@ class NetworkPortTests(test.BaseAdminViewTests):
|
||||
tenant_id=network.tenant_id,
|
||||
network_id=network.id,
|
||||
name=port.name,
|
||||
admin_state_up=port.admin_state_up,
|
||||
admin_state_up=port.is_admin_state_up,
|
||||
device_id=port.device_id,
|
||||
device_owner=port.device_owner,
|
||||
binding__host_id=port.binding__host_id,
|
||||
@ -460,7 +460,7 @@ class NetworkPortTests(test.BaseAdminViewTests):
|
||||
form_data = {'network_id': port.network_id,
|
||||
'port_id': port.id,
|
||||
'name': port.name,
|
||||
'admin_state': port.admin_state_up,
|
||||
'admin_state': port.is_admin_state_up,
|
||||
'device_id': port.device_id,
|
||||
'device_owner': port.device_owner,
|
||||
'binding__host_id': port.binding__host_id,
|
||||
@ -502,7 +502,7 @@ class NetworkPortTests(test.BaseAdminViewTests):
|
||||
self.mock_port_update.assert_called_once_with(
|
||||
test.IsHttpRequest(), port.id,
|
||||
name=port.name,
|
||||
admin_state_up=port.admin_state_up,
|
||||
admin_state_up=port.is_admin_state_up,
|
||||
device_id=port.device_id,
|
||||
device_owner=port.device_owner,
|
||||
binding__host_id=port.binding__host_id,
|
||||
@ -539,7 +539,7 @@ class NetworkPortTests(test.BaseAdminViewTests):
|
||||
form_data = {'network_id': port.network_id,
|
||||
'port_id': port.id,
|
||||
'name': port.name,
|
||||
'admin_state': port.admin_state_up,
|
||||
'admin_state': port.is_admin_state_up,
|
||||
'device_id': port.device_id,
|
||||
'device_owner': port.device_owner,
|
||||
'binding__host_id': port.binding__host_id,
|
||||
@ -578,7 +578,7 @@ class NetworkPortTests(test.BaseAdminViewTests):
|
||||
self.mock_port_update.assert_called_once_with(
|
||||
test.IsHttpRequest(), port.id,
|
||||
name=port.name,
|
||||
admin_state_up=port.admin_state_up,
|
||||
admin_state_up=port.is_admin_state_up,
|
||||
device_id=port.device_id,
|
||||
device_owner=port.device_owner,
|
||||
binding__host_id=port.binding__host_id,
|
||||
|
@ -144,7 +144,7 @@ class PortsTable(tables.DataTable):
|
||||
status = tables.Column("status",
|
||||
verbose_name=_("Status"),
|
||||
display_choices=STATUS_DISPLAY_CHOICES)
|
||||
admin_state = tables.Column("admin_state",
|
||||
admin_state = tables.Column("is_admin_state_up",
|
||||
verbose_name=_("Admin State"),
|
||||
display_choices=DISPLAY_CHOICES)
|
||||
mac_state = tables.Column("mac_state", empty_value=api.neutron.OFF_STATE,
|
||||
|
@ -60,9 +60,9 @@ class NetworkPortTests(test.TestCase):
|
||||
'security_group_list')})
|
||||
def _test_port_detail(self, mac_learning=False):
|
||||
# Use a port associated with security group
|
||||
port = [p for p in self.ports.list() if p.security_groups][0]
|
||||
port = [p for p in self.ports.list() if p.security_group_ids][0]
|
||||
sgs = [sg for sg in self.security_groups.list()
|
||||
if sg.id in port.security_groups]
|
||||
if sg.id in port.security_group_ids]
|
||||
network_id = self.networks.first().id
|
||||
self.mock_port_get.return_value = port
|
||||
self.mock_security_group_list.return_value = sgs
|
||||
@ -79,7 +79,7 @@ class NetworkPortTests(test.TestCase):
|
||||
port.id)
|
||||
self.mock_security_group_list.assert_called_once_with(
|
||||
test.IsHttpRequest(),
|
||||
id=tuple(sg.id for sg in port.security_groups))
|
||||
id=tuple(port.security_group_ids))
|
||||
self._check_is_extension_supported({'mac-learning': 2,
|
||||
'allowed-address-pairs': 2})
|
||||
self.mock_network_get.assert_called_once_with(test.IsHttpRequest(),
|
||||
@ -168,7 +168,7 @@ class NetworkPortTests(test.TestCase):
|
||||
form_data = {'network_id': port.network_id,
|
||||
'port_id': port.id,
|
||||
'name': port.name,
|
||||
'admin_state': port.admin_state_up}
|
||||
'admin_state': port.is_admin_state_up}
|
||||
if binding:
|
||||
form_data['binding__vnic_type'] = port.binding__vnic_type
|
||||
if mac_learning:
|
||||
@ -196,7 +196,7 @@ class NetworkPortTests(test.TestCase):
|
||||
extension_kwargs.pop('port_security_enabled')
|
||||
self.mock_port_update.assert_called_once_with(
|
||||
test.IsHttpRequest(), port.id, name=port.name,
|
||||
admin_state_up=port.admin_state_up,
|
||||
admin_state_up=port.is_admin_state_up,
|
||||
**extension_kwargs)
|
||||
|
||||
def test_port_update_post_exception(self):
|
||||
@ -229,7 +229,7 @@ class NetworkPortTests(test.TestCase):
|
||||
form_data = {'network_id': port.network_id,
|
||||
'port_id': port.id,
|
||||
'name': port.name,
|
||||
'admin_state': port.admin_state_up}
|
||||
'admin_state': port.is_admin_state_up}
|
||||
if binding:
|
||||
form_data['binding__vnic_type'] = port.binding__vnic_type
|
||||
if mac_learning:
|
||||
@ -265,7 +265,7 @@ class NetworkPortTests(test.TestCase):
|
||||
extension_kwargs.pop('port_security_enabled')
|
||||
self.mock_port_update.assert_called_once_with(
|
||||
test.IsHttpRequest(), port.id, name=port.name,
|
||||
admin_state_up=port.admin_state_up,
|
||||
admin_state_up=port.is_admin_state_up,
|
||||
**extension_kwargs)
|
||||
|
||||
@test.create_mocks({api.neutron: ('port_get',
|
||||
@ -606,7 +606,7 @@ class NetworkPortTests(test.TestCase):
|
||||
form_data = {'network_id': port.network_id,
|
||||
'network_name': network.name,
|
||||
'name': port.name,
|
||||
'admin_state': port.admin_state_up,
|
||||
'admin_state': port.is_admin_state_up,
|
||||
'device_id': port.device_id,
|
||||
'device_owner': port.device_owner,
|
||||
'specify_ip': 'fixed_ip',
|
||||
@ -647,7 +647,7 @@ class NetworkPortTests(test.TestCase):
|
||||
test.IsHttpRequest(),
|
||||
network_id=network.id,
|
||||
name=port.name,
|
||||
admin_state_up=port.admin_state_up,
|
||||
admin_state_up=port.is_admin_state_up,
|
||||
device_id=port.device_id,
|
||||
device_owner=port.device_owner,
|
||||
fixed_ips=fixed_ips,
|
||||
@ -672,7 +672,7 @@ class NetworkPortTests(test.TestCase):
|
||||
form_data = {'network_id': port.network_id,
|
||||
'network_name': network.name,
|
||||
'name': port.name,
|
||||
'admin_state': port.admin_state_up,
|
||||
'admin_state': port.is_admin_state_up,
|
||||
'device_id': port.device_id,
|
||||
'device_owner': port.device_owner,
|
||||
'specify_ip': 'subnet_id',
|
||||
@ -706,7 +706,7 @@ class NetworkPortTests(test.TestCase):
|
||||
form_data = {'network_id': port.network_id,
|
||||
'network_name': network.name,
|
||||
'name': port.name,
|
||||
'admin_state': port.admin_state_up,
|
||||
'admin_state': port.is_admin_state_up,
|
||||
'device_id': port.device_id,
|
||||
'device_owner': port.device_owner,
|
||||
'specify_ip': 'fixed_ip',
|
||||
@ -750,7 +750,7 @@ class NetworkPortTests(test.TestCase):
|
||||
form_data = {'network_id': port.network_id,
|
||||
'network_name': network.name,
|
||||
'name': port.name,
|
||||
'admin_state': port.admin_state_up,
|
||||
'admin_state': port.is_admin_state_up,
|
||||
'mac_state': True,
|
||||
'device_id': port.device_id,
|
||||
'device_owner': port.device_owner,
|
||||
@ -790,7 +790,7 @@ class NetworkPortTests(test.TestCase):
|
||||
test.IsHttpRequest(),
|
||||
network_id=network.id,
|
||||
name=port.name,
|
||||
admin_state_up=port.admin_state_up,
|
||||
admin_state_up=port.is_admin_state_up,
|
||||
device_id=port.device_id,
|
||||
device_owner=port.device_owner,
|
||||
fixed_ips=fixed_ips,
|
||||
|
@ -69,8 +69,8 @@ class DetailView(tabs.TabbedTableView):
|
||||
|
||||
try:
|
||||
port = api.neutron.port_get(self.request, port_id)
|
||||
port.admin_state_label = STATE_DICT.get(port.admin_state,
|
||||
port.admin_state)
|
||||
port.admin_state_label = STATE_DICT.get(port.is_admin_state_up,
|
||||
port.is_admin_state_up)
|
||||
port.status_label = STATUS_DICT.get(port.status,
|
||||
port.status)
|
||||
if port.get('binding__vnic_type'):
|
||||
@ -122,7 +122,7 @@ class DetailView(tabs.TabbedTableView):
|
||||
|
||||
results = futurist_utils.call_functions_parallel(
|
||||
(get_network, [port.network_id]),
|
||||
(get_security_groups, [tuple(port.security_groups)]))
|
||||
(get_security_groups, [tuple(port.security_group_ids)]))
|
||||
network, port.security_groups = results
|
||||
|
||||
port.network_name = network.get('name')
|
||||
@ -173,7 +173,7 @@ class UpdateView(workflows.WorkflowView):
|
||||
'network_id': port['network_id'],
|
||||
'tenant_id': port['tenant_id'],
|
||||
'name': port['name'],
|
||||
'admin_state': port['admin_state_up'],
|
||||
'admin_state': port['is_admin_state_up'],
|
||||
'mac_address': port['mac_address'],
|
||||
"target_tenant_id": self.request.user.project_id}
|
||||
if port.get('binding__vnic_type'):
|
||||
|
@ -15,6 +15,7 @@
|
||||
import copy
|
||||
|
||||
from openstack.network.v2 import network as sdk_net
|
||||
from openstack.network.v2 import port as sdk_port
|
||||
from openstack.network.v2 import subnet as sdk_subnet
|
||||
from openstack.network.v2 import trunk as sdk_trunk
|
||||
from oslo_utils import uuidutils
|
||||
@ -90,6 +91,8 @@ def data(TEST):
|
||||
TEST.api_subnets_sdk = list()
|
||||
TEST.api_tp_trunks_sdk = list()
|
||||
TEST.api_trunks_sdk = list()
|
||||
TEST.api_ports_sdk = list()
|
||||
TEST.api_tp_ports_sdk = list()
|
||||
|
||||
# 1st network.
|
||||
network_dict = {'is_admin_state_up': True,
|
||||
@ -151,7 +154,7 @@ def data(TEST):
|
||||
|
||||
# Ports on 1st network.
|
||||
port_dict = {
|
||||
'admin_state_up': True,
|
||||
'is_admin_state_up': True,
|
||||
'device_id': 'af75c8e5-a1cc-4567-8d04-44fcd6922890',
|
||||
'device_owner': 'network:dhcp',
|
||||
'fixed_ips': [{'ip_address': '10.0.0.3',
|
||||
@ -169,14 +172,15 @@ def data(TEST):
|
||||
'mac_address': 'fa:16:3e:7a:7b:18'}
|
||||
],
|
||||
'port_security_enabled': True,
|
||||
'security_groups': [],
|
||||
'security_group_ids': [],
|
||||
}
|
||||
|
||||
TEST.api_ports.add(port_dict)
|
||||
TEST.api_ports_sdk.append(sdk_port.Port(**port_dict))
|
||||
TEST.ports.add(neutron.Port(port_dict))
|
||||
|
||||
port_dict = {
|
||||
'admin_state_up': True,
|
||||
'is_admin_state_up': True,
|
||||
'device_id': '1',
|
||||
'device_owner': 'compute:nova',
|
||||
'fixed_ips': [{'ip_address': '10.0.0.4',
|
||||
@ -192,7 +196,7 @@ def data(TEST):
|
||||
'binding:vnic_type': 'normal',
|
||||
'binding:host_id': 'host',
|
||||
'port_security_enabled': True,
|
||||
'security_groups': [
|
||||
'security_group_ids': [
|
||||
# sec_group_1 ID below
|
||||
'faad7c80-3b62-4440-967c-13808c37131d',
|
||||
# sec_group_2 ID below
|
||||
@ -200,11 +204,12 @@ def data(TEST):
|
||||
],
|
||||
}
|
||||
TEST.api_ports.add(port_dict)
|
||||
TEST.api_ports_sdk.append(sdk_port.Port(**port_dict))
|
||||
TEST.ports.add(neutron.Port(port_dict))
|
||||
assoc_port = port_dict
|
||||
|
||||
port_dict = {
|
||||
'admin_state_up': True,
|
||||
'is_admin_state_up': True,
|
||||
'device_id': '279989f7-54bb-41d9-ba42-0d61f12fda61',
|
||||
'device_owner': 'network:router_interface',
|
||||
'fixed_ips': [{'ip_address': '10.0.0.1',
|
||||
@ -217,12 +222,13 @@ def data(TEST):
|
||||
'tenant_id': network_dict['tenant_id'],
|
||||
'binding:vnic_type': 'normal',
|
||||
'binding:host_id': 'host',
|
||||
'security_groups': [],
|
||||
'security_group_ids': [],
|
||||
}
|
||||
TEST.api_ports.add(port_dict)
|
||||
TEST.api_ports_sdk.append(sdk_port.Port(**port_dict))
|
||||
TEST.ports.add(neutron.Port(port_dict))
|
||||
port_dict = {
|
||||
'admin_state_up': True,
|
||||
'is_admin_state_up': True,
|
||||
'device_id': '279989f7-54bb-41d9-ba42-0d61f12fda61',
|
||||
'device_owner': 'network:router_interface',
|
||||
'fixed_ips': [{'ip_address': 'fdb6:b88a:488e::1',
|
||||
@ -235,14 +241,15 @@ def data(TEST):
|
||||
'tenant_id': network_dict['tenant_id'],
|
||||
'binding:vnic_type': 'normal',
|
||||
'binding:host_id': 'host',
|
||||
'security_groups': [],
|
||||
'security_group_ids': [],
|
||||
}
|
||||
TEST.api_ports.add(port_dict)
|
||||
TEST.api_ports_sdk.append(sdk_port.Port(**port_dict))
|
||||
TEST.ports.add(neutron.Port(port_dict))
|
||||
|
||||
# unbound port on 1st network
|
||||
port_dict = {
|
||||
'admin_state_up': True,
|
||||
'is_admin_state_up': True,
|
||||
'device_id': '',
|
||||
'device_owner': '',
|
||||
'fixed_ips': [{'ip_address': '10.0.0.5',
|
||||
@ -255,9 +262,10 @@ def data(TEST):
|
||||
'tenant_id': network_dict['tenant_id'],
|
||||
'binding:vnic_type': 'normal',
|
||||
'binding:host_id': '',
|
||||
'security_groups': [],
|
||||
'security_group_ids': [],
|
||||
}
|
||||
TEST.api_ports.add(port_dict)
|
||||
TEST.api_ports_sdk.append(sdk_port.Port(**port_dict))
|
||||
TEST.ports.add(neutron.Port(port_dict))
|
||||
|
||||
# 2nd network.
|
||||
@ -300,7 +308,7 @@ def data(TEST):
|
||||
TEST.subnets.add(subnet)
|
||||
|
||||
port_dict = {
|
||||
'admin_state_up': True,
|
||||
'is_admin_state_up': True,
|
||||
'device_id': '2',
|
||||
'device_owner': 'compute:nova',
|
||||
'fixed_ips': [{'ip_address': '172.16.88.3',
|
||||
@ -313,13 +321,14 @@ def data(TEST):
|
||||
'tenant_id': network_dict['tenant_id'],
|
||||
'binding:vnic_type': 'normal',
|
||||
'binding:host_id': 'host',
|
||||
'security_groups': [
|
||||
'security_group_ids': [
|
||||
# sec_group_1 ID below
|
||||
'faad7c80-3b62-4440-967c-13808c37131d',
|
||||
],
|
||||
}
|
||||
|
||||
TEST.api_ports.add(port_dict)
|
||||
TEST.api_ports_sdk.append(sdk_port.Port(**port_dict))
|
||||
TEST.ports.add(neutron.Port(port_dict))
|
||||
|
||||
# External not shared network.
|
||||
@ -616,7 +625,7 @@ def data(TEST):
|
||||
|
||||
# Set up router data.
|
||||
port_dict = {
|
||||
'admin_state_up': True,
|
||||
'is_admin_state_up': True,
|
||||
'device_id': '7180cede-bcd8-4334-b19f-f7ef2f331f53',
|
||||
'device_owner': 'network:router_gateway',
|
||||
'fixed_ips': [{'ip_address': '10.0.0.3',
|
||||
@ -629,9 +638,10 @@ def data(TEST):
|
||||
'tenant_id': '1',
|
||||
'binding:vnic_type': 'normal',
|
||||
'binding:host_id': 'host',
|
||||
'security_groups': [],
|
||||
'security_group_ids': [],
|
||||
}
|
||||
TEST.api_ports.add(port_dict)
|
||||
TEST.api_ports_sdk.append(sdk_port.Port(**port_dict))
|
||||
TEST.ports.add(neutron.Port(port_dict))
|
||||
|
||||
trunk_dict = {'status': 'UP',
|
||||
@ -1042,7 +1052,7 @@ def data(TEST):
|
||||
|
||||
# ports on 4th network
|
||||
port_dict = {
|
||||
'admin_state_up': True,
|
||||
'is_admin_state_up': True,
|
||||
'device_id': '9872faaa-b2b2-eeee-9911-21332eedaa77',
|
||||
'device_owner': 'network:dhcp',
|
||||
'fixed_ips': [{'ip_address': '11.10.0.3',
|
||||
@ -1056,9 +1066,10 @@ def data(TEST):
|
||||
'tenant_id': TEST.networks.first().tenant_id,
|
||||
'binding:vnic_type': 'normal',
|
||||
'binding:host_id': 'host',
|
||||
'security_groups': [],
|
||||
'security_group_ids': [],
|
||||
}
|
||||
TEST.api_ports.add(port_dict)
|
||||
TEST.api_ports_sdk.append(sdk_port.Port(**port_dict))
|
||||
TEST.ports.add(neutron.Port(port_dict))
|
||||
|
||||
availability = {'network_ip_availability': {
|
||||
@ -1230,7 +1241,7 @@ def data(TEST):
|
||||
|
||||
# port parent
|
||||
parent_port_dict = {
|
||||
'admin_state_up': True,
|
||||
'is_admin_state_up': True,
|
||||
'device_id': tdata['parent']['device_id'],
|
||||
'device_owner': 'compute:nova',
|
||||
'fixed_ips': [{'ip_address': tdata['parent']['ip'],
|
||||
@ -1243,7 +1254,7 @@ def data(TEST):
|
||||
'tenant_id': tdata['tenant_id'],
|
||||
'binding:vnic_type': 'normal',
|
||||
'binding:host_id': 'host',
|
||||
'security_groups': [tdata['security_group']],
|
||||
'security_group_ids': [tdata['security_group']],
|
||||
'trunk_details': {
|
||||
'sub_ports': [{'segmentation_type': 'vlan',
|
||||
'mac_address': tdata['child1']['mac'],
|
||||
@ -1256,11 +1267,12 @@ def data(TEST):
|
||||
'trunk_id': tdata['trunk_id']}
|
||||
}
|
||||
TEST.api_tp_ports.add(parent_port_dict)
|
||||
TEST.api_tp_ports_sdk.append(sdk_port.Port(**parent_port_dict))
|
||||
TEST.tp_ports.add(neutron.PortTrunkParent(parent_port_dict))
|
||||
|
||||
# port child1
|
||||
child1_port_dict = {
|
||||
'admin_state_up': True,
|
||||
'is_admin_state_up': True,
|
||||
'device_id': tdata['child1']['device_id'],
|
||||
'device_owner': 'compute:nova',
|
||||
'fixed_ips': [{'ip_address': tdata['child1']['ip'],
|
||||
@ -1273,9 +1285,10 @@ def data(TEST):
|
||||
'tenant_id': tdata['tenant_id'],
|
||||
'binding:vnic_type': 'normal',
|
||||
'binding:host_id': 'host',
|
||||
'security_groups': [tdata['security_group']]
|
||||
'security_group_ids': [tdata['security_group']]
|
||||
}
|
||||
TEST.api_tp_ports.add(child1_port_dict)
|
||||
TEST.api_tp_ports_sdk.append(sdk_port.Port(**child1_port_dict))
|
||||
TEST.tp_ports.add(neutron.PortTrunkSubport(
|
||||
child1_port_dict,
|
||||
{'trunk_id': tdata['trunk_id'],
|
||||
@ -1284,7 +1297,7 @@ def data(TEST):
|
||||
|
||||
# port plain
|
||||
port_dict = {
|
||||
'admin_state_up': True,
|
||||
'is_admin_state_up': True,
|
||||
'device_id': tdata['plain']['device_id'],
|
||||
'device_owner': 'compute:nova',
|
||||
'fixed_ips': [{'ip_address': tdata['plain']['ip'],
|
||||
@ -1297,16 +1310,17 @@ def data(TEST):
|
||||
'tenant_id': tdata['tenant_id'],
|
||||
'binding:vnic_type': 'normal',
|
||||
'binding:host_id': 'host',
|
||||
'security_groups': [tdata['security_group']]
|
||||
'security_group_ids': [tdata['security_group']]
|
||||
}
|
||||
TEST.api_tp_ports.add(port_dict)
|
||||
TEST.api_tp_ports_sdk.append(sdk_port.Port(**port_dict))
|
||||
TEST.tp_ports.add(neutron.Port(port_dict))
|
||||
|
||||
# network tstalt
|
||||
|
||||
# port child2
|
||||
child2_port_dict = {
|
||||
'admin_state_up': True,
|
||||
'is_admin_state_up': True,
|
||||
'device_id': tdata['child2']['device_id'],
|
||||
'device_owner': 'compute:nova',
|
||||
'fixed_ips': [{'ip_address': tdata['child2']['ip'],
|
||||
@ -1319,9 +1333,10 @@ def data(TEST):
|
||||
'tenant_id': tdata['tenant_id'],
|
||||
'binding:vnic_type': 'normal',
|
||||
'binding:host_id': 'host',
|
||||
'security_groups': [tdata['security_group']]
|
||||
'security_group_ids': [tdata['security_group']]
|
||||
}
|
||||
TEST.api_tp_ports.add(child2_port_dict)
|
||||
TEST.api_tp_ports_sdk.append(sdk_port.Port(**child2_port_dict))
|
||||
TEST.tp_ports.add(neutron.PortTrunkSubport(
|
||||
child2_port_dict,
|
||||
{'trunk_id': tdata['trunk_id'],
|
||||
|
@ -71,7 +71,7 @@ class NetworkApiNeutronTests(test.APIMockTestCase):
|
||||
|
||||
servers = self.servers.list()
|
||||
server_ids = tuple([server.id for server in servers])
|
||||
server_ports = [p for p in self.api_ports.list()
|
||||
server_ports = [p for p in self.api_ports_sdk
|
||||
if p['device_id'] in server_ids]
|
||||
server_port_ids = tuple([p['id'] for p in server_ports])
|
||||
if router_enabled:
|
||||
@ -81,12 +81,15 @@ class NetworkApiNeutronTests(test.APIMockTestCase):
|
||||
server_networks = [net for net in self.api_networks_sdk
|
||||
if net['id'] in server_network_ids]
|
||||
|
||||
list_ports_retvals = [{'ports': server_ports}]
|
||||
self.qclient.list_ports.side_effect = list_ports_retvals
|
||||
list_ports_retvals = server_ports
|
||||
port_w_router = None
|
||||
if router_enabled:
|
||||
self.qclient.list_floatingips.return_value = {'floatingips':
|
||||
assoc_fips}
|
||||
list_ports_retvals.append({'ports': self.api_ports.list()})
|
||||
port_w_router = self.api_ports_sdk
|
||||
self.sdk_net_client.ports.side_effect = [list_ports_retvals,
|
||||
port_w_router]
|
||||
self.sdk_net_client.get_port.side_effect = list_ports_retvals
|
||||
self.sdk_net_client.networks.return_value = server_networks
|
||||
self.sdk_net_client.subnets.return_value = self.api_subnets_sdk
|
||||
|
||||
@ -131,7 +134,7 @@ class NetworkApiNeutronTests(test.APIMockTestCase):
|
||||
expected_list_ports.append(mock.call(tenant_id=tenant_id))
|
||||
else:
|
||||
self.assertEqual(0, self.qclient.list_floatingips.call_count)
|
||||
self.qclient.list_ports.assert_has_calls(expected_list_ports)
|
||||
self.sdk_net_client.ports.assert_has_calls(expected_list_ports)
|
||||
nets_calls = []
|
||||
for server_net_id in server_network_ids:
|
||||
nets_calls.append(mock.call(id=server_net_id))
|
||||
|
@ -17,6 +17,7 @@ from unittest import mock
|
||||
import netaddr
|
||||
from neutronclient.common import exceptions as neutron_exc
|
||||
from openstack import exceptions as sdk_exceptions
|
||||
from openstack.network.v2 import port as sdk_port
|
||||
from openstack.network.v2 import trunk as sdk_trunk
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
@ -1166,35 +1167,32 @@ class NeutronApiTests(test.APIMockTestCase):
|
||||
|
||||
neutronclient.delete_subnetpool.assert_called_once_with(subnetpool_id)
|
||||
|
||||
@mock.patch.object(api.neutron, 'neutronclient')
|
||||
def test_port_list(self, mock_neutronclient):
|
||||
ports = {'ports': self.api_ports.list()}
|
||||
@mock.patch.object(api.neutron, 'networkclient')
|
||||
def test_port_list(self, mock_networkclient):
|
||||
ports = self.api_ports_sdk
|
||||
|
||||
neutronclient = mock_neutronclient.return_value
|
||||
neutronclient.list_ports.return_value = ports
|
||||
network_client = mock_networkclient.return_value
|
||||
network_client.ports.return_value = ports
|
||||
|
||||
ret_val = api.neutron.port_list(self.request)
|
||||
|
||||
for p in ret_val:
|
||||
self.assertIsInstance(p, api.neutron.Port)
|
||||
neutronclient.list_ports.assert_called_once_with()
|
||||
network_client.ports.assert_called_once_with()
|
||||
|
||||
@mock.patch.object(api.neutron, 'is_extension_supported')
|
||||
@mock.patch.object(api.neutron, 'neutronclient')
|
||||
@mock.patch.object(api.neutron, 'networkclient')
|
||||
def test_port_list_with_trunk_types(
|
||||
self, mock_networkclient, mock_neutronclient,
|
||||
mock_is_extension_supported):
|
||||
ports = self.api_tp_ports.list()
|
||||
trunks = self.api_tp_trunks_sdk
|
||||
self, mock_networkclient, mock_is_extension_supported):
|
||||
ports = self.api_tp_ports_sdk
|
||||
trunks = self.api_tp_trunks.list()
|
||||
|
||||
# list_extensions is decorated with memoized_with_request, so
|
||||
# neutronclient() is not called. We need to mock it separately.
|
||||
mock_is_extension_supported.return_value = True # trunk
|
||||
|
||||
neutronclient = mock_neutronclient.return_value
|
||||
network_client = mock_networkclient.return_value
|
||||
neutronclient.list_ports.return_value = {'ports': ports}
|
||||
network_client.ports.return_value = ports
|
||||
network_client.trunks.return_value = trunks
|
||||
|
||||
expected_parent_port_ids = set()
|
||||
@ -1223,21 +1221,21 @@ class NeutronApiTests(test.APIMockTestCase):
|
||||
|
||||
mock_is_extension_supported.assert_called_once_with(
|
||||
test.IsHttpRequest(), 'trunk')
|
||||
neutronclient.list_ports.assert_called_once_with()
|
||||
network_client.ports.assert_called_once_with()
|
||||
network_client.trunks.assert_called_once_with()
|
||||
|
||||
@mock.patch.object(api.neutron, 'is_extension_supported')
|
||||
@mock.patch.object(api.neutron, 'neutronclient')
|
||||
@mock.patch.object(api.neutron, 'networkclient')
|
||||
def test_port_list_with_trunk_types_without_trunk_extension(
|
||||
self, mock_neutronclient, mock_is_extension_supported):
|
||||
ports = self.api_tp_ports.list()
|
||||
self, mock_networkclient, mock_is_extension_supported):
|
||||
ports = self.api_tp_ports_sdk
|
||||
|
||||
# list_extensions is decorated with memoized_with_request,
|
||||
# the simpliest way is to mock it directly.
|
||||
mock_is_extension_supported.return_value = False # trunk
|
||||
|
||||
neutronclient = mock_neutronclient.return_value
|
||||
neutronclient.list_ports.return_value = {'ports': ports}
|
||||
networkclient = mock_networkclient.return_value
|
||||
networkclient.ports.return_value = ports
|
||||
|
||||
ret_val = api.neutron.port_list_with_trunk_types(self.request)
|
||||
|
||||
@ -1250,24 +1248,24 @@ class NeutronApiTests(test.APIMockTestCase):
|
||||
|
||||
mock_is_extension_supported.assert_called_once_with(
|
||||
test.IsHttpRequest(), 'trunk')
|
||||
neutronclient.list_ports.assert_called_once_with()
|
||||
networkclient.ports.assert_called_once_with()
|
||||
|
||||
@mock.patch.object(api.neutron, 'neutronclient')
|
||||
def test_port_get(self, mock_neutronclient):
|
||||
port = {'port': self.api_ports.first()}
|
||||
port_id = self.api_ports.first()['id']
|
||||
@mock.patch.object(api.neutron, 'networkclient')
|
||||
def test_port_get(self, mock_networkclient):
|
||||
port = self.api_ports_sdk[0]
|
||||
port_id = self.api_ports_sdk[0]['id']
|
||||
|
||||
neutronclient = mock_neutronclient.return_value
|
||||
neutronclient.show_port.return_value = port
|
||||
network_client = mock_networkclient.return_value
|
||||
network_client.get_port.return_value = port
|
||||
|
||||
ret_val = api.neutron.port_get(self.request, port_id)
|
||||
|
||||
self.assertIsInstance(ret_val, api.neutron.Port)
|
||||
neutronclient.show_port.assert_called_once_with(port_id)
|
||||
network_client.get_port.assert_called_once_with(port_id)
|
||||
|
||||
@mock.patch.object(api.neutron, 'neutronclient')
|
||||
def test_port_create(self, mock_neutronclient):
|
||||
port = self.api_ports.first()
|
||||
@mock.patch.object(api.neutron, 'networkclient')
|
||||
def test_port_create(self, mock_networkclient):
|
||||
port = self.api_ports_sdk[0]
|
||||
params = {'network_id': port['network_id'],
|
||||
'tenant_id': port['tenant_id'],
|
||||
'name': port['name'],
|
||||
@ -1276,19 +1274,18 @@ class NeutronApiTests(test.APIMockTestCase):
|
||||
params['binding__vnic_type'] = port['binding:vnic_type']
|
||||
api_params['binding:vnic_type'] = port['binding:vnic_type']
|
||||
|
||||
neutronclient = mock_neutronclient.return_value
|
||||
neutronclient.create_port.return_value = {'port': port}
|
||||
network_client = mock_networkclient.return_value
|
||||
network_client.create_port.return_value = port
|
||||
|
||||
ret_val = api.neutron.port_create(self.request, **params)
|
||||
|
||||
self.assertIsInstance(ret_val, api.neutron.Port)
|
||||
self.assertEqual(api.neutron.Port(port).id, ret_val.id)
|
||||
neutronclient.create_port.assert_called_once_with(
|
||||
body={'port': api_params})
|
||||
self.assertEqual(port.id, ret_val.id)
|
||||
network_client.create_port.assert_called_once_with(**api_params)
|
||||
|
||||
@mock.patch.object(api.neutron, 'neutronclient')
|
||||
def test_port_update(self, mock_neutronclient):
|
||||
port_data = self.api_ports.first()
|
||||
@mock.patch.object(api.neutron, 'networkclient')
|
||||
def test_port_update(self, mock_networkclient):
|
||||
port_data = self.api_ports_sdk[0]
|
||||
port_id = port_data['id']
|
||||
params = {'name': port_data['name'],
|
||||
'device_id': port_data['device_id']}
|
||||
@ -1296,26 +1293,26 @@ class NeutronApiTests(test.APIMockTestCase):
|
||||
params['binding__vnic_type'] = port_data['binding:vnic_type']
|
||||
api_params['binding:vnic_type'] = port_data['binding:vnic_type']
|
||||
|
||||
neutronclient = mock_neutronclient.return_value
|
||||
neutronclient.update_port.return_value = {'port': port_data}
|
||||
network_client = mock_networkclient.return_value
|
||||
network_client.update_port.return_value = port_data
|
||||
|
||||
ret_val = api.neutron.port_update(self.request, port_id, **params)
|
||||
|
||||
self.assertIsInstance(ret_val, api.neutron.Port)
|
||||
self.assertEqual(api.neutron.Port(port_data).id, ret_val.id)
|
||||
neutronclient.update_port.assert_called_once_with(
|
||||
port_id, body={'port': api_params})
|
||||
self.assertEqual(port_data.id, ret_val.id)
|
||||
network_client.update_port.assert_called_once_with(
|
||||
port_id, **api_params)
|
||||
|
||||
@mock.patch.object(api.neutron, 'neutronclient')
|
||||
def test_port_delete(self, mock_neutronclient):
|
||||
port_id = self.api_ports.first()['id']
|
||||
@mock.patch.object(api.neutron, 'networkclient')
|
||||
def test_port_delete(self, mock_networkclient):
|
||||
port_id = self.api_ports_sdk[0]['id']
|
||||
|
||||
neutronclient = mock_neutronclient.return_value
|
||||
neutronclient.delete_port.return_value = None
|
||||
network_client = mock_networkclient.return_value
|
||||
network_client.delete_port.return_value = None
|
||||
|
||||
api.neutron.port_delete(self.request, port_id)
|
||||
|
||||
neutronclient.delete_port.assert_called_once_with(port_id)
|
||||
network_client.delete_port.assert_called_once_with(port_id)
|
||||
|
||||
@mock.patch.object(api.neutron, 'networkclient')
|
||||
def test_trunk_list(self, mock_networkclient):
|
||||
@ -1741,8 +1738,8 @@ class NeutronApiTests(test.APIMockTestCase):
|
||||
def test_get_router_ha_permission_without_l3_ha_extension(self):
|
||||
self._test_get_router_ha_permission_with_policy_check(False)
|
||||
|
||||
@mock.patch.object(api.neutron, 'neutronclient')
|
||||
def test_list_resources_with_long_filters(self, mock_neutronclient):
|
||||
@mock.patch.object(api.neutron, 'networkclient')
|
||||
def test_list_resources_with_long_filters(self, mock_networkclient):
|
||||
# In this tests, port_list is called with id=[10 port ID]
|
||||
# filter. It generates about 40*10 char length URI.
|
||||
# Each port ID is converted to "id=<UUID>&" in URI and
|
||||
@ -1752,18 +1749,17 @@ class NeutronApiTests(test.APIMockTestCase):
|
||||
# As a result three API calls with 4, 4, 2 port ID
|
||||
# are expected.
|
||||
|
||||
ports = [{'id': uuidutils.generate_uuid(),
|
||||
'name': 'port%s' % i,
|
||||
'admin_state_up': True}
|
||||
ports = [sdk_port.Port(**{'id': uuidutils.generate_uuid(),
|
||||
'name': 'port%s' % i, 'admin_state_up': True})
|
||||
for i in range(10)]
|
||||
port_ids = tuple([port['id'] for port in ports])
|
||||
|
||||
neutronclient = mock_neutronclient.return_value
|
||||
network_client = mock_networkclient.return_value
|
||||
uri_len_exc = neutron_exc.RequestURITooLong(excess=220)
|
||||
list_ports_retval = [uri_len_exc]
|
||||
for i in range(0, 10, 4):
|
||||
list_ports_retval.append({'ports': ports[i:i + 4]})
|
||||
neutronclient.list_ports.side_effect = list_ports_retval
|
||||
list_ports_retval.append(ports[i:i + 4])
|
||||
network_client.ports.side_effect = list_ports_retval
|
||||
|
||||
ret_val = api.neutron.list_resources_with_long_filters(
|
||||
api.neutron.port_list, 'id', tuple(port_ids),
|
||||
@ -1775,7 +1771,7 @@ class NeutronApiTests(test.APIMockTestCase):
|
||||
expected_calls.append(mock.call(id=tuple(port_ids)))
|
||||
for i in range(0, 10, 4):
|
||||
expected_calls.append(mock.call(id=tuple(port_ids[i:i + 4])))
|
||||
neutronclient.list_ports.assert_has_calls(expected_calls)
|
||||
network_client.ports.assert_has_calls(expected_calls)
|
||||
|
||||
@mock.patch.object(api.neutron, 'neutronclient')
|
||||
def test_qos_policies_list(self, mock_neutronclient):
|
||||
@ -2072,7 +2068,9 @@ class NeutronApiSecurityGroupTests(test.APIMockTestCase):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
neutronclient = mock.patch.object(api.neutron, 'neutronclient').start()
|
||||
networkclient = mock.patch.object(api.neutron, 'networkclient').start()
|
||||
self.qclient = neutronclient.return_value
|
||||
self.netclient = networkclient.return_value
|
||||
self.sg_dict = dict([(sg['id'], sg['name']) for sg
|
||||
in self.api_security_groups.list()])
|
||||
|
||||
@ -2284,7 +2282,7 @@ class NeutronApiSecurityGroupTests(test.APIMockTestCase):
|
||||
sg_rule['id'])
|
||||
|
||||
def _get_instance(self, cur_sg_ids):
|
||||
instance_port = [p for p in self.api_ports.list()
|
||||
instance_port = [p for p in self.api_ports_sdk
|
||||
if p['device_owner'].startswith('compute:')][0]
|
||||
instance_id = instance_port['device_id']
|
||||
# Emulate an instance with two ports
|
||||
@ -2299,14 +2297,14 @@ class NeutronApiSecurityGroupTests(test.APIMockTestCase):
|
||||
def test_server_security_groups(self):
|
||||
cur_sg_ids = [sg['id'] for sg in self.api_security_groups.list()[:2]]
|
||||
instance_id, instance_ports = self._get_instance(cur_sg_ids)
|
||||
self.qclient.list_ports.return_value = {'ports': instance_ports}
|
||||
self.netclient.ports.return_value = instance_ports
|
||||
secgroups = copy.deepcopy(self.api_security_groups.list())
|
||||
self.qclient.list_security_groups.return_value = \
|
||||
{'security_groups': secgroups}
|
||||
|
||||
api.neutron.server_security_groups(self.request, instance_id)
|
||||
|
||||
self.qclient.list_ports.assert_called_once_with(device_id=instance_id)
|
||||
self.netclient.ports.assert_called_once_with(device_id=instance_id)
|
||||
self.qclient.list_security_groups.assert_called_once_with(
|
||||
id=set(cur_sg_ids))
|
||||
|
||||
@ -2315,18 +2313,17 @@ class NeutronApiSecurityGroupTests(test.APIMockTestCase):
|
||||
new_sg_ids = [sg['id'] for sg in self.api_security_groups.list()[:2]]
|
||||
instance_id, instance_ports = self._get_instance(cur_sg_ids)
|
||||
|
||||
self.qclient.list_ports.return_value = {'ports': instance_ports}
|
||||
self.qclient.update_port.side_effect = \
|
||||
[{'port': p} for p in instance_ports]
|
||||
self.netclient.ports.return_value = instance_ports
|
||||
self.netclient.update_port.side_effect = \
|
||||
[p for p in instance_ports]
|
||||
|
||||
api.neutron.server_update_security_groups(
|
||||
self.request, instance_id, new_sg_ids)
|
||||
|
||||
self.qclient.list_ports.assert_called_once_with(device_id=instance_id)
|
||||
body = {'port': {'security_groups': new_sg_ids}}
|
||||
expected_calls = [mock.call(p['id'], body=body)
|
||||
self.netclient.ports.assert_called_once_with(device_id=instance_id)
|
||||
expected_calls = [mock.call(p['id'], security_groups=new_sg_ids)
|
||||
for p in instance_ports]
|
||||
self.qclient.update_port.assert_has_calls(expected_calls)
|
||||
self.netclient.update_port.assert_has_calls(expected_calls)
|
||||
|
||||
|
||||
class NeutronApiFloatingIpPortForwardingTest(test.APIMockTestCase):
|
||||
@ -2438,7 +2435,7 @@ class NeutronApiFloatingIpTests(test.APIMockTestCase):
|
||||
filters = {'tenant_id': self.request.user.tenant_id}
|
||||
|
||||
self.qclient.list_floatingips.return_value = {'floatingips': fips}
|
||||
self.qclient.list_ports.return_value = {'ports': self.api_ports.list()}
|
||||
self.netclient.ports.return_value = self.api_ports_sdk
|
||||
|
||||
rets = api.neutron.tenant_floating_ip_list(self.request)
|
||||
|
||||
@ -2455,17 +2452,17 @@ class NeutronApiFloatingIpTests(test.APIMockTestCase):
|
||||
self.assertIsNone(ret.instance_id)
|
||||
self.assertIsNone(ret.instance_type)
|
||||
self.qclient.list_floatingips.assert_called_once_with(**filters)
|
||||
self.qclient.list_ports.assert_called_once_with(**filters)
|
||||
self.netclient.ports.assert_called_once_with(**filters)
|
||||
|
||||
def test_floating_ip_list_all_tenants(self):
|
||||
fips = self.api_floating_ips.list()
|
||||
self.qclient.list_floatingips.return_value = {'floatingips': fips}
|
||||
self.qclient.list_ports.return_value = {'ports': self.api_ports.list()}
|
||||
self.netclient.ports.return_value = self.api_ports_sdk
|
||||
|
||||
fip_manager = api.neutron.FloatingIpManager(self.request)
|
||||
rets = fip_manager.list(all_tenants=True)
|
||||
|
||||
assoc_port = self.api_ports.list()[1]
|
||||
assoc_port = self.api_ports_sdk[1]
|
||||
self.assertEqual(len(fips), len(rets))
|
||||
for ret, exp in zip(rets, fips):
|
||||
for attr in ['id', 'ip', 'pool', 'fixed_ip', 'port_id']:
|
||||
@ -2478,12 +2475,12 @@ class NeutronApiFloatingIpTests(test.APIMockTestCase):
|
||||
self.assertIsNone(ret.instance_id)
|
||||
self.assertIsNone(ret.instance_type)
|
||||
self.qclient.list_floatingips.assert_called_once_with()
|
||||
self.qclient.list_ports.assert_called_once_with()
|
||||
self.netclient.ports.assert_called_once_with()
|
||||
|
||||
def _test_floating_ip_get_associated(self, assoc_port, exp_instance_type):
|
||||
fip = self.api_floating_ips.list()[1]
|
||||
self.qclient.show_floatingip.return_value = {'floatingip': fip}
|
||||
self.qclient.show_port.return_value = {'port': assoc_port}
|
||||
self.netclient.get_port.return_value = assoc_port
|
||||
|
||||
ret = api.neutron.tenant_floating_ip_get(self.request, fip['id'])
|
||||
|
||||
@ -2492,14 +2489,14 @@ class NeutronApiFloatingIpTests(test.APIMockTestCase):
|
||||
self.assertEqual(assoc_port['device_id'], ret.instance_id)
|
||||
self.assertEqual(exp_instance_type, ret.instance_type)
|
||||
self.qclient.show_floatingip.assert_called_once_with(fip['id'])
|
||||
self.qclient.show_port.assert_called_once_with(assoc_port['id'])
|
||||
self.netclient.get_port.assert_called_once_with(assoc_port['id'])
|
||||
|
||||
def test_floating_ip_get_associated(self):
|
||||
assoc_port = self.api_ports.list()[1]
|
||||
assoc_port = self.api_ports_sdk[1]
|
||||
self._test_floating_ip_get_associated(assoc_port, 'compute')
|
||||
|
||||
def test_floating_ip_get_associated_with_loadbalancer_vip(self):
|
||||
assoc_port = copy.deepcopy(self.api_ports.list()[1])
|
||||
assoc_port = copy.deepcopy(self.api_ports_sdk[1])
|
||||
assoc_port['device_owner'] = 'neutron:LOADBALANCER'
|
||||
assoc_port['device_id'] = uuidutils.generate_uuid()
|
||||
assoc_port['name'] = 'vip-' + uuidutils.generate_uuid()
|
||||
@ -2586,7 +2583,7 @@ class NeutronApiFloatingIpTests(test.APIMockTestCase):
|
||||
)
|
||||
@mock.patch.object(api._nova, 'novaclient')
|
||||
def test_floating_ip_target_list(self, mock_novaclient):
|
||||
ports = self.api_ports.list()
|
||||
ports = self.api_ports_sdk
|
||||
# Port on the first subnet is connected to a router
|
||||
# attached to external network in neutron_data.
|
||||
subnet_id = self.subnets.first().id
|
||||
@ -2607,7 +2604,7 @@ class NeutronApiFloatingIpTests(test.APIMockTestCase):
|
||||
self._get_target_id(p, ip['ip_address']),
|
||||
self._get_target_name(p, ip['ip_address'])))
|
||||
filters = {'tenant_id': self.request.user.tenant_id}
|
||||
self.qclient.list_ports.return_value = {'ports': ports}
|
||||
self.netclient.ports.return_value = ports
|
||||
servers = self.servers.list()
|
||||
novaclient = mock_novaclient.return_value
|
||||
ver = mock.Mock(min_version='2.1', version='2.45')
|
||||
@ -2633,7 +2630,7 @@ class NeutronApiFloatingIpTests(test.APIMockTestCase):
|
||||
self.assertEqual(exp[0], ret.id)
|
||||
self.assertEqual(exp[1], ret.name)
|
||||
|
||||
self.qclient.list_ports.assert_called_once_with(**filters)
|
||||
self.netclient.ports.assert_called_once_with(**filters)
|
||||
novaclient.versions.get_current.assert_called_once_with()
|
||||
novaclient.servers.list.assert_called_once_with(
|
||||
False, {'project_id': self.request.user.tenant_id})
|
||||
@ -2650,12 +2647,12 @@ class NeutronApiFloatingIpTests(test.APIMockTestCase):
|
||||
# list_ports and list_networks are called multiple times,
|
||||
# we prepare a list for return values.
|
||||
list_ports_retvals = []
|
||||
self.qclient.list_ports.side_effect = list_ports_retvals
|
||||
self.netclient.ports.side_effect = list_ports_retvals
|
||||
list_nets_retvals = []
|
||||
self.netclient.networks.side_effect = list_nets_retvals
|
||||
|
||||
# _target_ports_by_instance()
|
||||
list_ports_retvals.append({'ports': candidates})
|
||||
list_ports_retvals.append(candidates)
|
||||
|
||||
# _get_reachable_subnets()
|
||||
ext_nets = [n for n in self.api_networks_sdk
|
||||
@ -2666,7 +2663,7 @@ class NeutronApiFloatingIpTests(test.APIMockTestCase):
|
||||
self.api_routers.list()}]
|
||||
rinfs = [p for p in ports
|
||||
if p['device_owner'] in api.neutron.ROUTER_INTERFACE_OWNERS]
|
||||
list_ports_retvals.append({'ports': rinfs})
|
||||
list_ports_retvals.append(rinfs)
|
||||
shared_nets = [n for n in self.api_networks_sdk if n['is_shared']]
|
||||
list_nets_retvals.append(shared_nets)
|
||||
shared_subnet_ids = [s for n in shared_nets for s in n['subnets']]
|
||||
@ -2683,7 +2680,7 @@ class NeutronApiFloatingIpTests(test.APIMockTestCase):
|
||||
ret_val = api.neutron.floating_ip_target_list_by_instance(self.request,
|
||||
server.id)
|
||||
|
||||
self.qclient.list_ports.assert_has_calls([
|
||||
self.netclient.ports.assert_has_calls([
|
||||
mock.call(device_id=server.id),
|
||||
mock.call(device_owner=api.neutron.ROUTER_INTERFACE_OWNERS),
|
||||
])
|
||||
@ -2700,7 +2697,7 @@ class NeutronApiFloatingIpTests(test.APIMockTestCase):
|
||||
|
||||
def test_target_floating_ip_port_by_instance(self):
|
||||
server = self.servers.first()
|
||||
ports = self.api_ports.list()
|
||||
ports = self.api_ports_sdk
|
||||
candidates = [p for p in ports if p['device_id'] == server.id]
|
||||
|
||||
ret = self._test_target_floating_ip_port_by_instance(server, ports,
|
||||
@ -2713,7 +2710,7 @@ class NeutronApiFloatingIpTests(test.APIMockTestCase):
|
||||
|
||||
def test_target_floating_ip_port_by_instance_with_ipv6(self):
|
||||
server = self.servers.first()
|
||||
ports = self.api_ports.list()
|
||||
ports = self.api_ports_sdk
|
||||
candidates = [p for p in ports if p['device_id'] == server.id]
|
||||
# Move the IPv6 entry first
|
||||
fixed_ips = candidates[0]['fixed_ips']
|
||||
|
Loading…
Reference in New Issue
Block a user