Merge "Stop memoizing on request argument in neutron api"
This commit is contained in:
commit
1b9e47411b
@ -1178,9 +1178,9 @@ def _server_get_addresses(request, server, ports, floating_ips, network_names):
|
|||||||
def list_extensions(request):
|
def list_extensions(request):
|
||||||
extensions_list = neutronclient(request).list_extensions()
|
extensions_list = neutronclient(request).list_extensions()
|
||||||
if 'extensions' in extensions_list:
|
if 'extensions' in extensions_list:
|
||||||
return extensions_list['extensions']
|
return tuple(extensions_list['extensions'])
|
||||||
else:
|
else:
|
||||||
return {}
|
return ()
|
||||||
|
|
||||||
|
|
||||||
@memoized
|
@memoized
|
||||||
|
@ -182,7 +182,7 @@ class AggregatesViewTests(test.BaseAdminViewTests):
|
|||||||
'availability_zone_list',
|
'availability_zone_list',
|
||||||
'tenant_absolute_limits',),
|
'tenant_absolute_limits',),
|
||||||
api.cinder: ('tenant_absolute_limits',),
|
api.cinder: ('tenant_absolute_limits',),
|
||||||
api.neutron: ('list_extensions',),
|
api.neutron: ('is_extension_supported',),
|
||||||
api.network: ('tenant_floating_ip_list',
|
api.network: ('tenant_floating_ip_list',
|
||||||
'security_group_list'),
|
'security_group_list'),
|
||||||
api.keystone: ('tenant_list',)})
|
api.keystone: ('tenant_list',)})
|
||||||
@ -191,8 +191,9 @@ class AggregatesViewTests(test.BaseAdminViewTests):
|
|||||||
MultipleTimes().AndReturn(self.limits['absolute'])
|
MultipleTimes().AndReturn(self.limits['absolute'])
|
||||||
api.cinder.tenant_absolute_limits(IsA(http.HttpRequest)). \
|
api.cinder.tenant_absolute_limits(IsA(http.HttpRequest)). \
|
||||||
MultipleTimes().AndReturn(self.cinder_limits['absolute'])
|
MultipleTimes().AndReturn(self.cinder_limits['absolute'])
|
||||||
api.neutron.list_extensions(IsA(http.HttpRequest)). \
|
api.neutron.\
|
||||||
AndReturn(self.api_extensions.list())
|
is_extension_supported(IsA(http.HttpRequest), 'security-group'). \
|
||||||
|
MultipleTimes().AndReturn(True)
|
||||||
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
|
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
|
||||||
.AndReturn(self.floating_ips.list())
|
.AndReturn(self.floating_ips.list())
|
||||||
api.network.security_group_list(IsA(http.HttpRequest)) \
|
api.network.security_group_list(IsA(http.HttpRequest)) \
|
||||||
|
@ -361,20 +361,19 @@ class NetworkTests(test.BaseAdminViewTests):
|
|||||||
self.assertItemsEqual(subnets, [self.subnets.first()])
|
self.assertItemsEqual(subnets, [self.subnets.first()])
|
||||||
|
|
||||||
@test.create_stubs({api.neutron: ('profile_list',
|
@test.create_stubs({api.neutron: ('profile_list',
|
||||||
'list_extensions',),
|
'is_extension_supported',),
|
||||||
api.keystone: ('tenant_list',)})
|
api.keystone: ('tenant_list',)})
|
||||||
def test_network_create_get(self,
|
def test_network_create_get(self,
|
||||||
test_with_profile=False):
|
test_with_profile=False):
|
||||||
tenants = self.tenants.list()
|
tenants = self.tenants.list()
|
||||||
extensions = self.api_extensions.list()
|
|
||||||
api.keystone.tenant_list(IsA(
|
api.keystone.tenant_list(IsA(
|
||||||
http.HttpRequest)).AndReturn([tenants, False])
|
http.HttpRequest)).AndReturn([tenants, False])
|
||||||
if test_with_profile:
|
if test_with_profile:
|
||||||
net_profiles = self.net_profiles.list()
|
net_profiles = self.net_profiles.list()
|
||||||
api.neutron.profile_list(IsA(http.HttpRequest),
|
api.neutron.profile_list(IsA(http.HttpRequest),
|
||||||
'network').AndReturn(net_profiles)
|
'network').AndReturn(net_profiles)
|
||||||
api.neutron.list_extensions(
|
api.neutron.is_extension_supported(IsA(http.HttpRequest), 'provider').\
|
||||||
IsA(http.HttpRequest)).AndReturn(extensions)
|
AndReturn(True)
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
|
|
||||||
url = reverse('horizon:admin:networks:create')
|
url = reverse('horizon:admin:networks:create')
|
||||||
@ -389,14 +388,14 @@ class NetworkTests(test.BaseAdminViewTests):
|
|||||||
|
|
||||||
@test.create_stubs({api.neutron: ('network_create',
|
@test.create_stubs({api.neutron: ('network_create',
|
||||||
'profile_list',
|
'profile_list',
|
||||||
'list_extensions',),
|
'is_extension_supported',),
|
||||||
api.keystone: ('tenant_list',)})
|
api.keystone: ('tenant_list',)})
|
||||||
def test_network_create_post(self,
|
def test_network_create_post(self,
|
||||||
test_with_profile=False):
|
test_with_profile=False):
|
||||||
tenants = self.tenants.list()
|
tenants = self.tenants.list()
|
||||||
tenant_id = self.tenants.first().id
|
tenant_id = self.tenants.first().id
|
||||||
network = self.networks.first()
|
network = self.networks.first()
|
||||||
extensions = self.api_extensions.list()
|
|
||||||
api.keystone.tenant_list(IsA(http.HttpRequest))\
|
api.keystone.tenant_list(IsA(http.HttpRequest))\
|
||||||
.AndReturn([tenants, False])
|
.AndReturn([tenants, False])
|
||||||
params = {'name': network.name,
|
params = {'name': network.name,
|
||||||
@ -411,8 +410,8 @@ class NetworkTests(test.BaseAdminViewTests):
|
|||||||
api.neutron.profile_list(IsA(http.HttpRequest),
|
api.neutron.profile_list(IsA(http.HttpRequest),
|
||||||
'network').AndReturn(net_profiles)
|
'network').AndReturn(net_profiles)
|
||||||
params['net_profile_id'] = net_profile_id
|
params['net_profile_id'] = net_profile_id
|
||||||
api.neutron.list_extensions(
|
api.neutron.is_extension_supported(IsA(http.HttpRequest), 'provider').\
|
||||||
IsA(http.HttpRequest)).AndReturn(extensions)
|
MultipleTimes().AndReturn(True)
|
||||||
api.neutron.network_create(IsA(http.HttpRequest), **params)\
|
api.neutron.network_create(IsA(http.HttpRequest), **params)\
|
||||||
.AndReturn(network)
|
.AndReturn(network)
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
@ -438,14 +437,14 @@ class NetworkTests(test.BaseAdminViewTests):
|
|||||||
|
|
||||||
@test.create_stubs({api.neutron: ('network_create',
|
@test.create_stubs({api.neutron: ('network_create',
|
||||||
'profile_list',
|
'profile_list',
|
||||||
'list_extensions',),
|
'is_extension_supported',),
|
||||||
api.keystone: ('tenant_list',)})
|
api.keystone: ('tenant_list',)})
|
||||||
def test_network_create_post_network_exception(self,
|
def test_network_create_post_network_exception(self,
|
||||||
test_with_profile=False):
|
test_with_profile=False):
|
||||||
tenants = self.tenants.list()
|
tenants = self.tenants.list()
|
||||||
tenant_id = self.tenants.first().id
|
tenant_id = self.tenants.first().id
|
||||||
network = self.networks.first()
|
network = self.networks.first()
|
||||||
extensions = self.api_extensions.list()
|
|
||||||
api.keystone.tenant_list(IsA(http.HttpRequest)).AndReturn([tenants,
|
api.keystone.tenant_list(IsA(http.HttpRequest)).AndReturn([tenants,
|
||||||
False])
|
False])
|
||||||
params = {'name': network.name,
|
params = {'name': network.name,
|
||||||
@ -460,8 +459,8 @@ class NetworkTests(test.BaseAdminViewTests):
|
|||||||
api.neutron.profile_list(IsA(http.HttpRequest),
|
api.neutron.profile_list(IsA(http.HttpRequest),
|
||||||
'network').AndReturn(net_profiles)
|
'network').AndReturn(net_profiles)
|
||||||
params['net_profile_id'] = net_profile_id
|
params['net_profile_id'] = net_profile_id
|
||||||
api.neutron.list_extensions(
|
api.neutron.is_extension_supported(IsA(http.HttpRequest), 'provider').\
|
||||||
IsA(http.HttpRequest)).AndReturn(extensions)
|
MultipleTimes().AndReturn(True)
|
||||||
api.neutron.network_create(IsA(http.HttpRequest),
|
api.neutron.network_create(IsA(http.HttpRequest),
|
||||||
**params).AndRaise(self.exceptions.neutron)
|
**params).AndRaise(self.exceptions.neutron)
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
@ -486,18 +485,18 @@ class NetworkTests(test.BaseAdminViewTests):
|
|||||||
self.test_network_create_post_network_exception(
|
self.test_network_create_post_network_exception(
|
||||||
test_with_profile=True)
|
test_with_profile=True)
|
||||||
|
|
||||||
@test.create_stubs({api.neutron: ('list_extensions',),
|
@test.create_stubs({api.neutron: ('is_extension_supported',),
|
||||||
api.keystone: ('tenant_list',)})
|
api.keystone: ('tenant_list',)})
|
||||||
def test_network_create_vlan_segmentation_id_invalid(self):
|
def test_network_create_vlan_segmentation_id_invalid(self):
|
||||||
tenants = self.tenants.list()
|
tenants = self.tenants.list()
|
||||||
tenant_id = self.tenants.first().id
|
tenant_id = self.tenants.first().id
|
||||||
network = self.networks.first()
|
network = self.networks.first()
|
||||||
extensions = self.api_extensions.list()
|
|
||||||
api.keystone.tenant_list(
|
api.keystone.tenant_list(
|
||||||
IsA(http.HttpRequest)
|
IsA(http.HttpRequest)
|
||||||
).MultipleTimes().AndReturn([tenants, False])
|
).MultipleTimes().AndReturn([tenants, False])
|
||||||
api.neutron.list_extensions(
|
api.neutron.is_extension_supported(IsA(http.HttpRequest), 'provider')\
|
||||||
IsA(http.HttpRequest)).AndReturn(extensions)
|
.MultipleTimes().AndReturn(True)
|
||||||
|
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
|
|
||||||
form_data = {'tenant_id': tenant_id,
|
form_data = {'tenant_id': tenant_id,
|
||||||
@ -514,20 +513,19 @@ class NetworkTests(test.BaseAdminViewTests):
|
|||||||
self.assertFormErrors(res, 1)
|
self.assertFormErrors(res, 1)
|
||||||
self.assertContains(res, "1 through 4094")
|
self.assertContains(res, "1 through 4094")
|
||||||
|
|
||||||
@test.create_stubs({api.neutron: ('list_extensions',),
|
@test.create_stubs({api.neutron: ('is_extension_supported',),
|
||||||
api.keystone: ('tenant_list',)})
|
api.keystone: ('tenant_list',)})
|
||||||
def test_network_create_gre_segmentation_id_invalid(self):
|
def test_network_create_gre_segmentation_id_invalid(self):
|
||||||
tenants = self.tenants.list()
|
tenants = self.tenants.list()
|
||||||
tenant_id = self.tenants.first().id
|
tenant_id = self.tenants.first().id
|
||||||
network = self.networks.first()
|
network = self.networks.first()
|
||||||
extensions = self.api_extensions.list()
|
|
||||||
|
|
||||||
api.keystone.tenant_list(
|
api.keystone.tenant_list(
|
||||||
IsA(http.HttpRequest)
|
IsA(http.HttpRequest)
|
||||||
).MultipleTimes().AndReturn([tenants, False])
|
).MultipleTimes().AndReturn([tenants, False])
|
||||||
|
|
||||||
api.neutron.list_extensions(
|
api.neutron.is_extension_supported(IsA(http.HttpRequest), 'provider').\
|
||||||
IsA(http.HttpRequest)).AndReturn(extensions)
|
MultipleTimes().AndReturn(True)
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
|
|
||||||
form_data = {'tenant_id': tenant_id,
|
form_data = {'tenant_id': tenant_id,
|
||||||
@ -544,7 +542,7 @@ class NetworkTests(test.BaseAdminViewTests):
|
|||||||
self.assertFormErrors(res, 1)
|
self.assertFormErrors(res, 1)
|
||||||
self.assertContains(res, "1 through %s" % ((2 ** 32) - 1))
|
self.assertContains(res, "1 through %s" % ((2 ** 32) - 1))
|
||||||
|
|
||||||
@test.create_stubs({api.neutron: ('list_extensions',),
|
@test.create_stubs({api.neutron: ('is_extension_supported',),
|
||||||
api.keystone: ('tenant_list',)})
|
api.keystone: ('tenant_list',)})
|
||||||
@test.update_settings(
|
@test.update_settings(
|
||||||
OPENSTACK_NEUTRON_NETWORK={
|
OPENSTACK_NEUTRON_NETWORK={
|
||||||
@ -553,13 +551,13 @@ class NetworkTests(test.BaseAdminViewTests):
|
|||||||
tenants = self.tenants.list()
|
tenants = self.tenants.list()
|
||||||
tenant_id = self.tenants.first().id
|
tenant_id = self.tenants.first().id
|
||||||
network = self.networks.first()
|
network = self.networks.first()
|
||||||
extensions = self.api_extensions.list()
|
|
||||||
api.keystone.tenant_list(
|
api.keystone.tenant_list(
|
||||||
IsA(http.HttpRequest)
|
IsA(http.HttpRequest)
|
||||||
).MultipleTimes().AndReturn([tenants, False])
|
).MultipleTimes().AndReturn([tenants, False])
|
||||||
|
|
||||||
api.neutron.list_extensions(
|
api.neutron.is_extension_supported(IsA(http.HttpRequest), 'provider')\
|
||||||
IsA(http.HttpRequest)).AndReturn(extensions)
|
.MultipleTimes().AndReturn(True)
|
||||||
|
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
|
|
||||||
form_data = {'tenant_id': tenant_id,
|
form_data = {'tenant_id': tenant_id,
|
||||||
@ -576,18 +574,18 @@ class NetworkTests(test.BaseAdminViewTests):
|
|||||||
self.assertFormErrors(res, 1)
|
self.assertFormErrors(res, 1)
|
||||||
self.assertContains(res, "10 through 20")
|
self.assertContains(res, "10 through 20")
|
||||||
|
|
||||||
@test.create_stubs({api.neutron: ('list_extensions',),
|
@test.create_stubs({api.neutron: ('is_extension_supported',),
|
||||||
api.keystone: ('tenant_list',)})
|
api.keystone: ('tenant_list',)})
|
||||||
@test.update_settings(
|
@test.update_settings(
|
||||||
OPENSTACK_NEUTRON_NETWORK={
|
OPENSTACK_NEUTRON_NETWORK={
|
||||||
'supported_provider_types': []})
|
'supported_provider_types': []})
|
||||||
def test_network_create_no_provider_types(self):
|
def test_network_create_no_provider_types(self):
|
||||||
tenants = self.tenants.list()
|
tenants = self.tenants.list()
|
||||||
extensions = self.api_extensions.list()
|
|
||||||
api.keystone.tenant_list(IsA(http.HttpRequest)).AndReturn([tenants,
|
api.keystone.tenant_list(IsA(http.HttpRequest)).AndReturn([tenants,
|
||||||
False])
|
False])
|
||||||
api.neutron.list_extensions(
|
api.neutron.is_extension_supported(IsA(http.HttpRequest), 'provider').\
|
||||||
IsA(http.HttpRequest)).AndReturn(extensions)
|
AndReturn(True)
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
|
|
||||||
url = reverse('horizon:admin:networks:create')
|
url = reverse('horizon:admin:networks:create')
|
||||||
@ -599,18 +597,17 @@ class NetworkTests(test.BaseAdminViewTests):
|
|||||||
'<input type="hidden" name="network_type" id="id_network_type" />',
|
'<input type="hidden" name="network_type" id="id_network_type" />',
|
||||||
html=True)
|
html=True)
|
||||||
|
|
||||||
@test.create_stubs({api.neutron: ('list_extensions',),
|
@test.create_stubs({api.neutron: ('is_extension_supported',),
|
||||||
api.keystone: ('tenant_list',)})
|
api.keystone: ('tenant_list',)})
|
||||||
@test.update_settings(
|
@test.update_settings(
|
||||||
OPENSTACK_NEUTRON_NETWORK={
|
OPENSTACK_NEUTRON_NETWORK={
|
||||||
'supported_provider_types': ['local', 'flat', 'gre']})
|
'supported_provider_types': ['local', 'flat', 'gre']})
|
||||||
def test_network_create_unsupported_provider_types(self):
|
def test_network_create_unsupported_provider_types(self):
|
||||||
tenants = self.tenants.list()
|
tenants = self.tenants.list()
|
||||||
extensions = self.api_extensions.list()
|
|
||||||
api.keystone.tenant_list(IsA(http.HttpRequest)).AndReturn([tenants,
|
api.keystone.tenant_list(IsA(http.HttpRequest)).AndReturn([tenants,
|
||||||
False])
|
False])
|
||||||
api.neutron.list_extensions(
|
api.neutron.is_extension_supported(IsA(http.HttpRequest), 'provider').\
|
||||||
IsA(http.HttpRequest)).AndReturn(extensions)
|
AndReturn(True)
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
|
|
||||||
url = reverse('horizon:admin:networks:create')
|
url = reverse('horizon:admin:networks:create')
|
||||||
|
@ -40,6 +40,7 @@ class NetworkClientTestCase(test.APITestCase):
|
|||||||
self.assertIsInstance(nc.floating_ips, api.nova.FloatingIpManager)
|
self.assertIsInstance(nc.floating_ips, api.nova.FloatingIpManager)
|
||||||
self.assertIsInstance(nc.secgroups, api.nova.SecurityGroupManager)
|
self.assertIsInstance(nc.secgroups, api.nova.SecurityGroupManager)
|
||||||
|
|
||||||
|
@test.create_stubs({api.neutron: ('is_extension_supported',)})
|
||||||
def test_networkclient_neutron(self):
|
def test_networkclient_neutron(self):
|
||||||
self.mox.StubOutWithMock(api.base, 'is_service_enabled')
|
self.mox.StubOutWithMock(api.base, 'is_service_enabled')
|
||||||
api.base.is_service_enabled(IsA(http.HttpRequest), 'network') \
|
api.base.is_service_enabled(IsA(http.HttpRequest), 'network') \
|
||||||
@ -47,22 +48,24 @@ class NetworkClientTestCase(test.APITestCase):
|
|||||||
api.base.is_service_enabled(IsA(http.HttpRequest), 'compute') \
|
api.base.is_service_enabled(IsA(http.HttpRequest), 'compute') \
|
||||||
.AndReturn(True)
|
.AndReturn(True)
|
||||||
self.neutronclient = self.stub_neutronclient()
|
self.neutronclient = self.stub_neutronclient()
|
||||||
self.neutronclient.list_extensions() \
|
|
||||||
.AndReturn({'extensions': self.api_extensions.list()})
|
api.neutron.is_extension_supported(IsA(http.HttpRequest),
|
||||||
|
'security-group').AndReturn(True)
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
|
|
||||||
nc = api.network.NetworkClient(self.request)
|
nc = api.network.NetworkClient(self.request)
|
||||||
self.assertIsInstance(nc.floating_ips, api.neutron.FloatingIpManager)
|
self.assertIsInstance(nc.floating_ips, api.neutron.FloatingIpManager)
|
||||||
self.assertIsInstance(nc.secgroups, api.neutron.SecurityGroupManager)
|
self.assertIsInstance(nc.secgroups, api.neutron.SecurityGroupManager)
|
||||||
|
|
||||||
|
@test.create_stubs({api.neutron: ('is_extension_supported',)})
|
||||||
def test_networkclient_neutron_with_nova_security_group(self):
|
def test_networkclient_neutron_with_nova_security_group(self):
|
||||||
self.mox.StubOutWithMock(api.base, 'is_service_enabled')
|
self.mox.StubOutWithMock(api.base, 'is_service_enabled')
|
||||||
api.base.is_service_enabled(IsA(http.HttpRequest), 'network') \
|
api.base.is_service_enabled(IsA(http.HttpRequest), 'network') \
|
||||||
.AndReturn(True)
|
.AndReturn(True)
|
||||||
|
api.neutron.is_extension_supported(IsA(http.HttpRequest),
|
||||||
|
'security-group').AndReturn(False)
|
||||||
api.base.is_service_enabled(IsA(http.HttpRequest), 'compute') \
|
api.base.is_service_enabled(IsA(http.HttpRequest), 'compute') \
|
||||||
.AndReturn(True)
|
.AndReturn(True)
|
||||||
self.neutronclient = self.stub_neutronclient()
|
|
||||||
self.neutronclient.list_extensions().AndReturn({'extensions': []})
|
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
|
|
||||||
nc = api.network.NetworkClient(self.request)
|
nc = api.network.NetworkClient(self.request)
|
||||||
@ -382,9 +385,12 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
|
|||||||
for (exprule, retrule) in six.moves.zip(exp_rules, ret_sg.rules):
|
for (exprule, retrule) in six.moves.zip(exp_rules, ret_sg.rules):
|
||||||
self._cmp_sg_rule(exprule, retrule)
|
self._cmp_sg_rule(exprule, retrule)
|
||||||
|
|
||||||
|
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
|
||||||
def test_security_group_list(self):
|
def test_security_group_list(self):
|
||||||
sgs = self.api_q_secgroups.list()
|
sgs = self.api_q_secgroups.list()
|
||||||
tenant_id = self.request.user.tenant_id
|
tenant_id = self.request.user.tenant_id
|
||||||
|
# api.neutron.is_extension_supported(self.request, 'security-group').\
|
||||||
|
# AndReturn(True)
|
||||||
# use deepcopy to ensure self.api_q_secgroups is not modified.
|
# use deepcopy to ensure self.api_q_secgroups is not modified.
|
||||||
self.qclient.list_security_groups(tenant_id=tenant_id) \
|
self.qclient.list_security_groups(tenant_id=tenant_id) \
|
||||||
.AndReturn({'security_groups': copy.deepcopy(sgs)})
|
.AndReturn({'security_groups': copy.deepcopy(sgs)})
|
||||||
@ -395,6 +401,7 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
|
|||||||
for (exp, ret) in six.moves.zip(sgs, rets):
|
for (exp, ret) in six.moves.zip(sgs, rets):
|
||||||
self._cmp_sg(exp, ret)
|
self._cmp_sg(exp, ret)
|
||||||
|
|
||||||
|
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
|
||||||
def test_security_group_get(self):
|
def test_security_group_get(self):
|
||||||
secgroup = self.api_q_secgroups.first()
|
secgroup = self.api_q_secgroups.first()
|
||||||
sg_ids = set([secgroup['id']] +
|
sg_ids = set([secgroup['id']] +
|
||||||
@ -403,6 +410,8 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
|
|||||||
if rule['remote_group_id']])
|
if rule['remote_group_id']])
|
||||||
related_sgs = [sg for sg in self.api_q_secgroups.list()
|
related_sgs = [sg for sg in self.api_q_secgroups.list()
|
||||||
if sg['id'] in sg_ids]
|
if sg['id'] in sg_ids]
|
||||||
|
# api.neutron.is_extension_supported(self.request, 'security-group'). \
|
||||||
|
# AndReturn(True)
|
||||||
# use deepcopy to ensure self.api_q_secgroups is not modified.
|
# use deepcopy to ensure self.api_q_secgroups is not modified.
|
||||||
self.qclient.show_security_group(secgroup['id']) \
|
self.qclient.show_security_group(secgroup['id']) \
|
||||||
.AndReturn({'security_group': copy.deepcopy(secgroup)})
|
.AndReturn({'security_group': copy.deepcopy(secgroup)})
|
||||||
@ -412,12 +421,15 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
|
|||||||
ret = api.network.security_group_get(self.request, secgroup['id'])
|
ret = api.network.security_group_get(self.request, secgroup['id'])
|
||||||
self._cmp_sg(secgroup, ret)
|
self._cmp_sg(secgroup, ret)
|
||||||
|
|
||||||
|
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
|
||||||
def test_security_group_create(self):
|
def test_security_group_create(self):
|
||||||
secgroup = self.api_q_secgroups.list()[1]
|
secgroup = self.api_q_secgroups.list()[1]
|
||||||
body = {'security_group':
|
body = {'security_group':
|
||||||
{'name': secgroup['name'],
|
{'name': secgroup['name'],
|
||||||
'description': secgroup['description'],
|
'description': secgroup['description'],
|
||||||
'tenant_id': self.request.user.project_id}}
|
'tenant_id': self.request.user.project_id}}
|
||||||
|
# api.neutron.is_extension_supported(self.request, 'security-group'). \
|
||||||
|
# AndReturn(True)
|
||||||
self.qclient.create_security_group(body) \
|
self.qclient.create_security_group(body) \
|
||||||
.AndReturn({'security_group': copy.deepcopy(secgroup)})
|
.AndReturn({'security_group': copy.deepcopy(secgroup)})
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
@ -425,6 +437,7 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
|
|||||||
secgroup['description'])
|
secgroup['description'])
|
||||||
self._cmp_sg(secgroup, ret)
|
self._cmp_sg(secgroup, ret)
|
||||||
|
|
||||||
|
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
|
||||||
def test_security_group_update(self):
|
def test_security_group_update(self):
|
||||||
secgroup = self.api_q_secgroups.list()[1]
|
secgroup = self.api_q_secgroups.list()[1]
|
||||||
secgroup = copy.deepcopy(secgroup)
|
secgroup = copy.deepcopy(secgroup)
|
||||||
@ -433,6 +446,8 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
|
|||||||
body = {'security_group':
|
body = {'security_group':
|
||||||
{'name': secgroup['name'],
|
{'name': secgroup['name'],
|
||||||
'description': secgroup['description']}}
|
'description': secgroup['description']}}
|
||||||
|
# api.neutron.is_extension_supported(self.request, 'security-group'). \
|
||||||
|
# AndReturn(True)
|
||||||
self.qclient.update_security_group(secgroup['id'], body) \
|
self.qclient.update_security_group(secgroup['id'], body) \
|
||||||
.AndReturn({'security_group': secgroup})
|
.AndReturn({'security_group': secgroup})
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
@ -442,12 +457,16 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
|
|||||||
secgroup['description'])
|
secgroup['description'])
|
||||||
self._cmp_sg(secgroup, ret)
|
self._cmp_sg(secgroup, ret)
|
||||||
|
|
||||||
|
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
|
||||||
def test_security_group_delete(self):
|
def test_security_group_delete(self):
|
||||||
secgroup = self.api_q_secgroups.first()
|
secgroup = self.api_q_secgroups.first()
|
||||||
|
# api.neutron.is_extension_supported(self.request, 'security-group'). \
|
||||||
|
# AndReturn(True)
|
||||||
self.qclient.delete_security_group(secgroup['id'])
|
self.qclient.delete_security_group(secgroup['id'])
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
api.network.security_group_delete(self.request, secgroup['id'])
|
api.network.security_group_delete(self.request, secgroup['id'])
|
||||||
|
|
||||||
|
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
|
||||||
def test_security_group_rule_create(self):
|
def test_security_group_rule_create(self):
|
||||||
sg_rule = [r for r in self.api_q_secgroup_rules.list()
|
sg_rule = [r for r in self.api_q_secgroup_rules.list()
|
||||||
if r['protocol'] == 'tcp' and r['remote_ip_prefix']][0]
|
if r['protocol'] == 'tcp' and r['remote_ip_prefix']][0]
|
||||||
@ -459,6 +478,8 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
|
|||||||
del post_rule['id']
|
del post_rule['id']
|
||||||
del post_rule['tenant_id']
|
del post_rule['tenant_id']
|
||||||
post_body = {'security_group_rule': post_rule}
|
post_body = {'security_group_rule': post_rule}
|
||||||
|
# api.neutron.is_extension_supported(self.request, 'security-group'). \
|
||||||
|
# AndReturn(True)
|
||||||
self.qclient.create_security_group_rule(post_body) \
|
self.qclient.create_security_group_rule(post_body) \
|
||||||
.AndReturn({'security_group_rule': copy.deepcopy(sg_rule)})
|
.AndReturn({'security_group_rule': copy.deepcopy(sg_rule)})
|
||||||
self.qclient.list_security_groups(id=set([sg_id]),
|
self.qclient.list_security_groups(id=set([sg_id]),
|
||||||
@ -473,8 +494,11 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
|
|||||||
sg_rule['remote_ip_prefix'], sg_rule['remote_group_id'])
|
sg_rule['remote_ip_prefix'], sg_rule['remote_group_id'])
|
||||||
self._cmp_sg_rule(sg_rule, ret)
|
self._cmp_sg_rule(sg_rule, ret)
|
||||||
|
|
||||||
|
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
|
||||||
def test_security_group_rule_delete(self):
|
def test_security_group_rule_delete(self):
|
||||||
sg_rule = self.api_q_secgroup_rules.first()
|
sg_rule = self.api_q_secgroup_rules.first()
|
||||||
|
# api.neutron.is_extension_supported(self.request, 'security-group'). \
|
||||||
|
# AndReturn(True)
|
||||||
self.qclient.delete_security_group_rule(sg_rule['id'])
|
self.qclient.delete_security_group_rule(sg_rule['id'])
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
api.network.security_group_rule_delete(self.request, sg_rule['id'])
|
api.network.security_group_rule_delete(self.request, sg_rule['id'])
|
||||||
@ -495,7 +519,6 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
|
|||||||
def test_server_security_groups(self):
|
def test_server_security_groups(self):
|
||||||
cur_sg_ids = [sg['id'] for sg in self.api_q_secgroups.list()[:2]]
|
cur_sg_ids = [sg['id'] for sg in self.api_q_secgroups.list()[:2]]
|
||||||
instance_id, instance_ports = self._get_instance(cur_sg_ids)
|
instance_id, instance_ports = self._get_instance(cur_sg_ids)
|
||||||
|
|
||||||
self.qclient.list_ports(device_id=instance_id) \
|
self.qclient.list_ports(device_id=instance_id) \
|
||||||
.AndReturn({'ports': instance_ports})
|
.AndReturn({'ports': instance_ports})
|
||||||
secgroups = copy.deepcopy(self.api_q_secgroups.list())
|
secgroups = copy.deepcopy(self.api_q_secgroups.list())
|
||||||
@ -509,7 +532,6 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
|
|||||||
cur_sg_ids = [self.api_q_secgroups.first()['id']]
|
cur_sg_ids = [self.api_q_secgroups.first()['id']]
|
||||||
new_sg_ids = [sg['id'] for sg in self.api_q_secgroups.list()[:2]]
|
new_sg_ids = [sg['id'] for sg in self.api_q_secgroups.list()[:2]]
|
||||||
instance_id, instance_ports = self._get_instance(cur_sg_ids)
|
instance_id, instance_ports = self._get_instance(cur_sg_ids)
|
||||||
|
|
||||||
self.qclient.list_ports(device_id=instance_id) \
|
self.qclient.list_ports(device_id=instance_id) \
|
||||||
.AndReturn({'ports': instance_ports})
|
.AndReturn({'ports': instance_ports})
|
||||||
for p in instance_ports:
|
for p in instance_ports:
|
||||||
@ -544,10 +566,13 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
|
|||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
self.assertFalse(api.network.floating_ip_supported(self.request))
|
self.assertFalse(api.network.floating_ip_supported(self.request))
|
||||||
|
|
||||||
|
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
|
||||||
def test_floating_ip_pools_list(self):
|
def test_floating_ip_pools_list(self):
|
||||||
search_opts = {'router:external': True}
|
search_opts = {'router:external': True}
|
||||||
ext_nets = [n for n in self.api_networks.list()
|
ext_nets = [n for n in self.api_networks.list()
|
||||||
if n['router:external']]
|
if n['router:external']]
|
||||||
|
# api.neutron.is_extension_supported(self.request, 'security-group'). \
|
||||||
|
# AndReturn(True)
|
||||||
self.qclient.list_networks(**search_opts) \
|
self.qclient.list_networks(**search_opts) \
|
||||||
.AndReturn({'networks': ext_nets})
|
.AndReturn({'networks': ext_nets})
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
@ -560,6 +585,7 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
|
|||||||
def test_floating_ip_list(self):
|
def test_floating_ip_list(self):
|
||||||
fips = self.api_q_floating_ips.list()
|
fips = self.api_q_floating_ips.list()
|
||||||
filters = {'tenant_id': self.request.user.tenant_id}
|
filters = {'tenant_id': self.request.user.tenant_id}
|
||||||
|
|
||||||
self.qclient.list_floatingips(**filters) \
|
self.qclient.list_floatingips(**filters) \
|
||||||
.AndReturn({'floatingips': fips})
|
.AndReturn({'floatingips': fips})
|
||||||
self.qclient.list_ports(**filters) \
|
self.qclient.list_ports(**filters) \
|
||||||
@ -611,6 +637,7 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
|
|||||||
|
|
||||||
def _test_floating_ip_get_associated(self, assoc_port, exp_instance_type):
|
def _test_floating_ip_get_associated(self, assoc_port, exp_instance_type):
|
||||||
fip = self.api_q_floating_ips.list()[1]
|
fip = self.api_q_floating_ips.list()[1]
|
||||||
|
|
||||||
self.qclient.show_floatingip(fip['id']).AndReturn({'floatingip': fip})
|
self.qclient.show_floatingip(fip['id']).AndReturn({'floatingip': fip})
|
||||||
self.qclient.show_port(assoc_port['id']) \
|
self.qclient.show_port(assoc_port['id']) \
|
||||||
.AndReturn({'port': assoc_port})
|
.AndReturn({'port': assoc_port})
|
||||||
@ -635,6 +662,7 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
|
|||||||
|
|
||||||
def test_floating_ip_get_unassociated(self):
|
def test_floating_ip_get_unassociated(self):
|
||||||
fip = self.api_q_floating_ips.list()[0]
|
fip = self.api_q_floating_ips.list()[0]
|
||||||
|
|
||||||
self.qclient.show_floatingip(fip['id']).AndReturn({'floatingip': fip})
|
self.qclient.show_floatingip(fip['id']).AndReturn({'floatingip': fip})
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
|
|
||||||
@ -662,8 +690,11 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
|
|||||||
self.assertIsNone(ret.instance_id)
|
self.assertIsNone(ret.instance_id)
|
||||||
self.assertIsNone(ret.instance_type)
|
self.assertIsNone(ret.instance_type)
|
||||||
|
|
||||||
|
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
|
||||||
def test_floating_ip_release(self):
|
def test_floating_ip_release(self):
|
||||||
fip = self.api_q_floating_ips.first()
|
fip = self.api_q_floating_ips.first()
|
||||||
|
# api.neutron.is_extension_supported(self.request, 'security-group'). \
|
||||||
|
# AndReturn(True)
|
||||||
self.qclient.delete_floatingip(fip['id'])
|
self.qclient.delete_floatingip(fip['id'])
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
|
|
||||||
@ -684,6 +715,7 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
|
|||||||
|
|
||||||
def test_floating_ip_disassociate(self):
|
def test_floating_ip_disassociate(self):
|
||||||
fip = self.api_q_floating_ips.list()[1]
|
fip = self.api_q_floating_ips.list()[1]
|
||||||
|
|
||||||
self.qclient.update_floatingip(fip['id'],
|
self.qclient.update_floatingip(fip['id'],
|
||||||
{'floatingip': {'port_id': None}})
|
{'floatingip': {'port_id': None}})
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
@ -709,6 +741,7 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
|
|||||||
'enable_fip_topology_check': True,
|
'enable_fip_topology_check': True,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
|
||||||
def test_floating_ip_target_list(self):
|
def test_floating_ip_target_list(self):
|
||||||
ports = self.api_ports.list()
|
ports = self.api_ports.list()
|
||||||
# Port on the first subnet is connected to a router
|
# Port on the first subnet is connected to a router
|
||||||
@ -723,6 +756,10 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
|
|||||||
(set(shared_subnet_ids) & set(self._subs_from_port(p)))))
|
(set(shared_subnet_ids) & set(self._subs_from_port(p)))))
|
||||||
]
|
]
|
||||||
filters = {'tenant_id': self.request.user.tenant_id}
|
filters = {'tenant_id': self.request.user.tenant_id}
|
||||||
|
# api.neutron.is_extension_supported(self.request, 'security-group'). \
|
||||||
|
# AndReturn(True)
|
||||||
|
# api.neutron.is_extension_supported(self.request, 'lbaas'). \
|
||||||
|
# AndReturn(True)
|
||||||
self.qclient.list_ports(**filters).AndReturn({'ports': ports})
|
self.qclient.list_ports(**filters).AndReturn({'ports': ports})
|
||||||
servers = self.servers.list()
|
servers = self.servers.list()
|
||||||
novaclient = self.stub_novaclient()
|
novaclient = self.stub_novaclient()
|
||||||
@ -752,10 +789,13 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
|
|||||||
self.assertEqual(exp[0], ret.id)
|
self.assertEqual(exp[0], ret.id)
|
||||||
self.assertEqual(exp[1], ret.name)
|
self.assertEqual(exp[1], ret.name)
|
||||||
|
|
||||||
|
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
|
||||||
def test_floating_ip_target_get_by_instance(self):
|
def test_floating_ip_target_get_by_instance(self):
|
||||||
ports = self.api_ports.list()
|
ports = self.api_ports.list()
|
||||||
candidates = [p for p in ports if p['device_id'] == '1']
|
candidates = [p for p in ports if p['device_id'] == '1']
|
||||||
search_opts = {'device_id': '1'}
|
search_opts = {'device_id': '1'}
|
||||||
|
# api.neutron.is_extension_supported(self.request, 'security-group'). \
|
||||||
|
# AndReturn(True)
|
||||||
self.qclient.list_ports(**search_opts).AndReturn({'ports': candidates})
|
self.qclient.list_ports(**search_opts).AndReturn({'ports': candidates})
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
|
|
||||||
@ -774,20 +814,26 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
|
|||||||
self.assertEqual(self._get_target_id(candidates[0]), ret[0])
|
self.assertEqual(self._get_target_id(candidates[0]), ret[0])
|
||||||
self.assertEqual(len(candidates), len(ret))
|
self.assertEqual(len(candidates), len(ret))
|
||||||
|
|
||||||
|
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
|
||||||
def test_floating_ip_target_get_by_instance_with_preloaded_target(self):
|
def test_floating_ip_target_get_by_instance_with_preloaded_target(self):
|
||||||
target_list = [{'name': 'name11', 'id': 'id11', 'instance_id': 'vm1'},
|
target_list = [{'name': 'name11', 'id': 'id11', 'instance_id': 'vm1'},
|
||||||
{'name': 'name21', 'id': 'id21', 'instance_id': 'vm2'},
|
{'name': 'name21', 'id': 'id21', 'instance_id': 'vm2'},
|
||||||
{'name': 'name22', 'id': 'id22', 'instance_id': 'vm2'}]
|
{'name': 'name22', 'id': 'id22', 'instance_id': 'vm2'}]
|
||||||
|
# api.neutron.is_extension_supported(self.request, 'security-group'). \
|
||||||
|
# AndReturn(True)
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
|
|
||||||
ret = api.network.floating_ip_target_get_by_instance(
|
ret = api.network.floating_ip_target_get_by_instance(
|
||||||
self.request, 'vm2', target_list)
|
self.request, 'vm2', target_list)
|
||||||
self.assertEqual('id21', ret)
|
self.assertEqual('id21', ret)
|
||||||
|
|
||||||
|
# @test.create_stubs({api.neutron: ('is_extension_supported', )})
|
||||||
def test_target_floating_ip_port_by_instance_with_preloaded_target(self):
|
def test_target_floating_ip_port_by_instance_with_preloaded_target(self):
|
||||||
target_list = [{'name': 'name11', 'id': 'id11', 'instance_id': 'vm1'},
|
target_list = [{'name': 'name11', 'id': 'id11', 'instance_id': 'vm1'},
|
||||||
{'name': 'name21', 'id': 'id21', 'instance_id': 'vm2'},
|
{'name': 'name21', 'id': 'id21', 'instance_id': 'vm2'},
|
||||||
{'name': 'name22', 'id': 'id22', 'instance_id': 'vm2'}]
|
{'name': 'name22', 'id': 'id22', 'instance_id': 'vm2'}]
|
||||||
|
# api.neutron.is_extension_supported(self.request, 'security-group').\
|
||||||
|
# AndReturn(True)
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
|
|
||||||
ret = api.network.floating_ip_target_list_by_instance(
|
ret = api.network.floating_ip_target_list_by_instance(
|
||||||
|
@ -461,10 +461,12 @@ class NeutronApiTests(test.APITestCase):
|
|||||||
api.neutron.router_remove_interface(
|
api.neutron.router_remove_interface(
|
||||||
self.request, router_id, port_id=fake_port)
|
self.request, router_id, port_id=fake_port)
|
||||||
|
|
||||||
|
@test.create_stubs({api.neutron: ('is_extension_supported',)})
|
||||||
def test_is_extension_supported(self):
|
def test_is_extension_supported(self):
|
||||||
neutronclient = self.stub_neutronclient()
|
api.neutron.is_extension_supported(self.request, "quotas")\
|
||||||
neutronclient.list_extensions().MultipleTimes() \
|
.AndReturn(True)
|
||||||
.AndReturn({'extensions': self.api_extensions.list()})
|
api.neutron.is_extension_supported(self.request, "doesntexist") \
|
||||||
|
.AndReturn(False)
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
|
|
||||||
self.assertTrue(
|
self.assertTrue(
|
||||||
@ -524,13 +526,10 @@ class NeutronApiTests(test.APITestCase):
|
|||||||
@override_settings(OPENSTACK_NEUTRON_NETWORK={'enable_distributed_router':
|
@override_settings(OPENSTACK_NEUTRON_NETWORK={'enable_distributed_router':
|
||||||
True},
|
True},
|
||||||
POLICY_CHECK_FUNCTION=None)
|
POLICY_CHECK_FUNCTION=None)
|
||||||
|
@test.create_stubs({api.neutron: ('is_extension_supported',)})
|
||||||
def _test_get_dvr_permission_dvr_supported(self, dvr_enabled):
|
def _test_get_dvr_permission_dvr_supported(self, dvr_enabled):
|
||||||
neutronclient = self.stub_neutronclient()
|
api.neutron.is_extension_supported(self.request, 'dvr').\
|
||||||
extensions = self.api_extensions.list()
|
AndReturn(dvr_enabled)
|
||||||
if not dvr_enabled:
|
|
||||||
extensions = [ext for ext in extensions if ext['alias'] != 'dvr']
|
|
||||||
neutronclient.list_extensions() \
|
|
||||||
.AndReturn({'extensions': extensions})
|
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
self.assertEqual(dvr_enabled,
|
self.assertEqual(dvr_enabled,
|
||||||
api.neutron.get_feature_permission(self.request,
|
api.neutron.get_feature_permission(self.request,
|
||||||
@ -545,6 +544,7 @@ class NeutronApiTests(test.APITestCase):
|
|||||||
@override_settings(OPENSTACK_NEUTRON_NETWORK={'enable_distributed_router':
|
@override_settings(OPENSTACK_NEUTRON_NETWORK={'enable_distributed_router':
|
||||||
True},
|
True},
|
||||||
POLICY_CHECK_FUNCTION=policy.check)
|
POLICY_CHECK_FUNCTION=policy.check)
|
||||||
|
@test.create_stubs({api.neutron: ('is_extension_supported',)})
|
||||||
def _test_get_dvr_permission_with_policy_check(self, policy_check_allowed,
|
def _test_get_dvr_permission_with_policy_check(self, policy_check_allowed,
|
||||||
operation):
|
operation):
|
||||||
self.mox.StubOutWithMock(policy, 'check')
|
self.mox.StubOutWithMock(policy, 'check')
|
||||||
@ -554,9 +554,8 @@ class NeutronApiTests(test.APITestCase):
|
|||||||
role = (("network", "get_router:distributed"),)
|
role = (("network", "get_router:distributed"),)
|
||||||
policy.check(role, self.request).AndReturn(policy_check_allowed)
|
policy.check(role, self.request).AndReturn(policy_check_allowed)
|
||||||
if policy_check_allowed:
|
if policy_check_allowed:
|
||||||
neutronclient = self.stub_neutronclient()
|
api.neutron.is_extension_supported(self.request, 'dvr').\
|
||||||
neutronclient.list_extensions() \
|
AndReturn(policy_check_allowed)
|
||||||
.AndReturn({'extensions': self.api_extensions.list()})
|
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
self.assertEqual(policy_check_allowed,
|
self.assertEqual(policy_check_allowed,
|
||||||
api.neutron.get_feature_permission(self.request,
|
api.neutron.get_feature_permission(self.request,
|
||||||
@ -603,16 +602,13 @@ class NeutronApiTests(test.APITestCase):
|
|||||||
|
|
||||||
@override_settings(OPENSTACK_NEUTRON_NETWORK={'enable_ha_router': True},
|
@override_settings(OPENSTACK_NEUTRON_NETWORK={'enable_ha_router': True},
|
||||||
POLICY_CHECK_FUNCTION=policy.check)
|
POLICY_CHECK_FUNCTION=policy.check)
|
||||||
|
@test.create_stubs({api.neutron: ('is_extension_supported', )})
|
||||||
def _test_get_router_ha_permission_with_policy_check(self, ha_enabled):
|
def _test_get_router_ha_permission_with_policy_check(self, ha_enabled):
|
||||||
self.mox.StubOutWithMock(policy, 'check')
|
self.mox.StubOutWithMock(policy, 'check')
|
||||||
role = (("network", "create_router:ha"),)
|
role = (("network", "create_router:ha"),)
|
||||||
policy.check(role, self.request).AndReturn(True)
|
policy.check(role, self.request).AndReturn(True)
|
||||||
neutronclient = self.stub_neutronclient()
|
api.neutron.is_extension_supported(self.request, 'l3-ha')\
|
||||||
if ha_enabled:
|
.AndReturn(ha_enabled)
|
||||||
extensions = self.api_extensions.list()
|
|
||||||
else:
|
|
||||||
extensions = {}
|
|
||||||
neutronclient.list_extensions().AndReturn({'extensions': extensions})
|
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
self.assertEqual(ha_enabled,
|
self.assertEqual(ha_enabled,
|
||||||
api.neutron.get_feature_permission(self.request,
|
api.neutron.get_feature_permission(self.request,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user