Merge "Test helpers to facilitate testing BGP dynamic routing."

This commit is contained in:
Jenkins 2016-02-09 22:21:24 +00:00 committed by Gerrit Code Review
commit dbb26a9777
5 changed files with 64 additions and 8 deletions
neutron/tests

@ -426,10 +426,10 @@ class BaseNetworkTest(test.BaseTestCase):
@classmethod
def create_subnetpool(cls, name, is_admin=False, **kwargs):
if is_admin:
body = cls.admin_client.create_subnetpool(name=name, **kwargs)
body = cls.admin_client.create_subnetpool(name, **kwargs)
cls.admin_subnetpools.append(body['subnetpool'])
else:
body = cls.client.create_subnetpool(name=name, **kwargs)
body = cls.client.create_subnetpool(name, **kwargs)
cls.subnetpools.append(body['subnetpool'])
return body['subnetpool']

@ -180,9 +180,9 @@ class SubnetPoolsNegativeTestJSON(test_subnetpools.SubnetPoolsTestBase):
name=data_utils.rand_name('smoke-address-scope'), is_admin=True,
ip_version=4)
address_scope_id = address_scope['id']
created_subbnetpool = self._create_subnetpool(self.client)
created_subnetpool = self._create_subnetpool(self.client)
self.assertRaises(lib_exc.NotFound, self.client.update_subnetpool,
created_subbnetpool['id'],
created_subnetpool['id'],
address_scope_id=address_scope_id)
def _test_update_subnetpool_prefix_intersect_helper(

@ -170,6 +170,56 @@ class NetworkClientJSON(service_client.ServiceClient):
return method_functors[index](name[prefix_len:])
raise AttributeError(name)
# Subnetpool methods
def create_subnetpool(self, name, **kwargs):
subnetpool_data = {'name': name}
for arg in kwargs:
subnetpool_data[arg] = kwargs[arg]
post_data = {'subnetpool': subnetpool_data}
body = self.serialize_list(post_data, "subnetpools", "subnetpool")
uri = self.get_uri("subnetpools")
resp, body = self.post(uri, body)
body = {'subnetpool': self.deserialize_list(body)}
self.expected_success(201, resp.status)
return service_client.ResponseBody(resp, body)
def get_subnetpool(self, id):
uri = self.get_uri("subnetpools")
subnetpool_uri = '%s/%s' % (uri, id)
resp, body = self.get(subnetpool_uri)
body = {'subnetpool': self.deserialize_list(body)}
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
def delete_subnetpool(self, id):
uri = self.get_uri("subnetpools")
subnetpool_uri = '%s/%s' % (uri, id)
resp, body = self.delete(subnetpool_uri)
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def list_subnetpools(self):
uri = self.get_uri("subnetpools")
resp, body = self.get(uri)
body = {'subnetpools': self.deserialize_list(body)}
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
def update_subnetpool(self, id, **kwargs):
subnetpool_data = {}
for arg in kwargs:
subnetpool_data[arg] = kwargs[arg]
post_data = {'subnetpool': subnetpool_data}
body = self.serialize_list(post_data, "subnetpools", "subnetpool")
uri = self.get_uri("subnetpools")
subnetpool_uri = '%s/%s' % (uri, id)
resp, body = self.put(subnetpool_uri, body)
body = {'subnetpool': self.deserialize_list(body)}
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
# Common methods that are hard to automate
def create_bulk_network(self, names, shared=False):
network_list = [{'name': name, 'shared': shared} for name in names]

@ -321,7 +321,9 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
'cidr': cidr,
'ip_version': 4,
'tenant_id': self._tenant_id}}
for arg in ('ip_version', 'tenant_id',
if cidr:
data['subnet']['cidr'] = cidr
for arg in ('ip_version', 'tenant_id', 'subnetpool_id', 'prefixlen',
'enable_dhcp', 'allocation_pools',
'dns_nameservers', 'host_routes',
'shared', 'ipv6_ra_mode', 'ipv6_address_mode'):
@ -443,7 +445,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)
def _make_subnet(self, fmt, network, gateway, cidr,
def _make_subnet(self, fmt, network, gateway, cidr, subnetpool_id=None,
allocation_pools=None, ip_version=4, enable_dhcp=True,
dns_nameservers=None, host_routes=None, shared=None,
ipv6_ra_mode=None, ipv6_address_mode=None,
@ -451,6 +453,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
res = self._create_subnet(fmt,
net_id=network['network']['id'],
cidr=cidr,
subnetpool_id=subnetpool_id,
gateway_ip=gateway,
tenant_id=(tenant_id or
network['network']['tenant_id']),
@ -588,6 +591,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
def subnet(self, network=None,
gateway_ip=attributes.ATTR_NOT_SPECIFIED,
cidr='10.0.0.0/24',
subnetpool_id=None,
fmt=None,
ip_version=4,
allocation_pools=None,
@ -606,6 +610,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
network_to_use,
gateway_ip,
cidr,
subnetpool_id,
allocation_pools,
ip_version,
enable_dhcp,

@ -463,8 +463,9 @@ class L3NatTestCaseMixin(object):
@contextlib.contextmanager
def floatingip_with_assoc(self, port_id=None, fmt=None, fixed_ip=None,
set_context=False, tenant_id=None):
with self.subnet(cidr='11.0.0.0/24',
public_cidr='11.0.0.0/24', set_context=False,
tenant_id=None):
with self.subnet(cidr=public_cidr,
set_context=set_context,
tenant_id=tenant_id) as public_sub:
self._set_net_external(public_sub['subnet']['network_id'])