Improve security groups management.
- Add create_security_group_rule method to base class to make easier creating rules for IPv6 test cases. - Add delete_security_group method. - Make sure segurity groups are deleted using the client that has been used to create them. - Improve security group client handling. Change-Id: I50858d5198d8a70a2bb9fb278786c433d7cb12ca
This commit is contained in:
parent
6f9bb77d98
commit
4c590d7cff
@ -189,15 +189,15 @@ class BaseNetworkTest(test.BaseTestCase):
|
||||
network['id'])
|
||||
|
||||
# Clean up security groups
|
||||
for secgroup in cls.security_groups:
|
||||
cls._try_delete_resource(cls.client.delete_security_group,
|
||||
secgroup['id'])
|
||||
for security_group in cls.security_groups:
|
||||
cls._try_delete_resource(cls.delete_security_group,
|
||||
security_group)
|
||||
|
||||
# Clean up admin security groups
|
||||
for secgroup in cls.admin_security_groups:
|
||||
cls._try_delete_resource(
|
||||
cls.admin_client.delete_security_group,
|
||||
secgroup['id'])
|
||||
for security_group in cls.admin_security_groups:
|
||||
cls._try_delete_resource(cls.delete_security_group,
|
||||
security_group,
|
||||
client=cls.admin_client)
|
||||
|
||||
for subnetpool in cls.subnetpools:
|
||||
cls._try_delete_resource(cls.client.delete_subnetpool,
|
||||
@ -717,18 +717,78 @@ class BaseNetworkTest(test.BaseTestCase):
|
||||
description=test_description)['project']
|
||||
cls.projects.append(project)
|
||||
# Create a project will create a default security group.
|
||||
# We make these security groups into admin_security_groups.
|
||||
sgs_list = cls.admin_client.list_security_groups(
|
||||
tenant_id=project['id'])['security_groups']
|
||||
for sg in sgs_list:
|
||||
cls.admin_security_groups.append(sg)
|
||||
for security_group in sgs_list:
|
||||
# Make sure delete_security_group method will use
|
||||
# the admin client for this group
|
||||
security_group['client'] = cls.admin_client
|
||||
cls.security_groups.append(security_group)
|
||||
return project
|
||||
|
||||
@classmethod
|
||||
def create_security_group(cls, name, **kwargs):
|
||||
body = cls.client.create_security_group(name=name, **kwargs)
|
||||
cls.security_groups.append(body['security_group'])
|
||||
return body['security_group']
|
||||
def create_security_group(cls, name=None, project=None, client=None,
|
||||
**kwargs):
|
||||
if project:
|
||||
client = client or cls.admin_client
|
||||
project_id = kwargs.setdefault('project_id', project['id'])
|
||||
tenant_id = kwargs.setdefault('tenant_id', project['id'])
|
||||
if project_id != project['id'] or tenant_id != project['id']:
|
||||
raise ValueError('Project ID specified multiple times')
|
||||
else:
|
||||
client = client or cls.client
|
||||
|
||||
name = name or data_utils.rand_name(cls.__name__)
|
||||
security_group = client.create_security_group(name=name, **kwargs)[
|
||||
'security_group']
|
||||
security_group['client'] = client
|
||||
cls.security_groups.append(security_group)
|
||||
return security_group
|
||||
|
||||
@classmethod
|
||||
def delete_security_group(cls, security_group, client=None):
|
||||
client = client or security_group.get('client') or cls.client
|
||||
client.delete_security_group(security_group['id'])
|
||||
|
||||
@classmethod
|
||||
def create_security_group_rule(cls, security_group=None, project=None,
|
||||
client=None, ip_version=None, **kwargs):
|
||||
if project:
|
||||
client = client or cls.admin_client
|
||||
project_id = kwargs.setdefault('project_id', project['id'])
|
||||
tenant_id = kwargs.setdefault('tenant_id', project['id'])
|
||||
if project_id != project['id'] or tenant_id != project['id']:
|
||||
raise ValueError('Project ID specified multiple times')
|
||||
|
||||
if 'security_group_id' not in kwargs:
|
||||
security_group = (security_group or
|
||||
cls.get_security_group(client=client))
|
||||
|
||||
if security_group:
|
||||
client = client or security_group.get('client')
|
||||
security_group_id = kwargs.setdefault('security_group_id',
|
||||
security_group['id'])
|
||||
if security_group_id != security_group['id']:
|
||||
raise ValueError('Security group ID specified multiple times.')
|
||||
|
||||
ip_version = ip_version or cls._ip_version
|
||||
default_params = (
|
||||
constants.DEFAULT_SECURITY_GROUP_RULE_PARAMS[ip_version])
|
||||
for key, value in default_params.items():
|
||||
kwargs.setdefault(key, value)
|
||||
|
||||
client = client or cls.client
|
||||
return client.create_security_group_rule(**kwargs)[
|
||||
'security_group_rule']
|
||||
|
||||
@classmethod
|
||||
def get_security_group(cls, name='default', client=None):
|
||||
client = client or cls.client
|
||||
security_groups = client.list_security_groups()['security_groups']
|
||||
for security_group in security_groups:
|
||||
if security_group['name'] == name:
|
||||
return security_group
|
||||
raise ValueError("No such security group named {!r}".format(name))
|
||||
|
||||
@classmethod
|
||||
def create_keypair(cls, client=None, name=None, **kwargs):
|
||||
|
@ -171,3 +171,11 @@ VALID_FLOATINGIP_STATUS = (lib_constants.FLOATINGIP_STATUS_ACTIVE,
|
||||
# Possible types of values (e.g. in QoS rule types)
|
||||
VALUES_TYPE_CHOICES = "choices"
|
||||
VALUES_TYPE_RANGE = "range"
|
||||
|
||||
# Security group parameters values mapped by IP version
|
||||
DEFAULT_SECURITY_GROUP_RULE_PARAMS = {
|
||||
lib_constants.IP_VERSION_4: {'ethertype': lib_constants.IPv4,
|
||||
'remote_ip_prefix': lib_constants.IPv4_ANY},
|
||||
lib_constants.IP_VERSION_6: {'ethertype': lib_constants.IPv6,
|
||||
'remote_ip_prefix': lib_constants.IPv6_ANY},
|
||||
}
|
||||
|
@ -122,29 +122,24 @@ class BaseTempestTestCase(base_api.BaseNetworkTest):
|
||||
Setting a group_id would only permit traffic from ports
|
||||
belonging to the same security group.
|
||||
"""
|
||||
|
||||
rule_list = [{'protocol': 'tcp',
|
||||
'direction': 'ingress',
|
||||
'port_range_min': 22,
|
||||
'port_range_max': 22,
|
||||
'remote_ip_prefix': '0.0.0.0/0'}]
|
||||
client = client or cls.os_primary.network_client
|
||||
cls.create_secgroup_rules(rule_list, client=client,
|
||||
secgroup_id=secgroup_id)
|
||||
return cls.create_security_group_rule(
|
||||
security_group_id=secgroup_id,
|
||||
client=client,
|
||||
protocol=neutron_lib_constants.PROTO_NAME_TCP,
|
||||
direction=neutron_lib_constants.INGRESS_DIRECTION,
|
||||
port_range_min=22,
|
||||
port_range_max=22)
|
||||
|
||||
@classmethod
|
||||
def create_pingable_secgroup_rule(cls, secgroup_id=None,
|
||||
client=None):
|
||||
"""This rule is intended to permit inbound ping"""
|
||||
"""This rule is intended to permit inbound ping
|
||||
|
||||
rule_list = [{'protocol': 'icmp',
|
||||
'direction': 'ingress',
|
||||
'port_range_min': 8, # type
|
||||
'port_range_max': 0, # code
|
||||
'remote_ip_prefix': '0.0.0.0/0'}]
|
||||
client = client or cls.os_primary.network_client
|
||||
cls.create_secgroup_rules(rule_list, client=client,
|
||||
secgroup_id=secgroup_id)
|
||||
"""
|
||||
return cls.create_security_group_rule(
|
||||
security_group_id=secgroup_id, client=client,
|
||||
protocol=neutron_lib_constants.PROTO_NAME_ICMP,
|
||||
direction=neutron_lib_constants.INGRESS_DIRECTION)
|
||||
|
||||
@classmethod
|
||||
def create_router_by_client(cls, is_admin=False, **kwargs):
|
||||
|
Loading…
Reference in New Issue
Block a user