From 4c590d7cff82b2978284ce0b0771b8949d9c6423 Mon Sep 17 00:00:00 2001 From: Federico Ressi Date: Wed, 10 Oct 2018 14:01:08 +0200 Subject: [PATCH] Improve security groups management. - Add create_security_group_rule method to base class to make easier creating rules for IPv6 test cases. - Add delete_security_group method. - Make sure segurity groups are deleted using the client that has been used to create them. - Improve security group client handling. Change-Id: I50858d5198d8a70a2bb9fb278786c433d7cb12ca --- neutron_tempest_plugin/api/base.py | 88 ++++++++++++++++++---- neutron_tempest_plugin/common/constants.py | 8 ++ neutron_tempest_plugin/scenario/base.py | 31 ++++---- 3 files changed, 95 insertions(+), 32 deletions(-) diff --git a/neutron_tempest_plugin/api/base.py b/neutron_tempest_plugin/api/base.py index 1b8239b8..bcafe039 100644 --- a/neutron_tempest_plugin/api/base.py +++ b/neutron_tempest_plugin/api/base.py @@ -189,15 +189,15 @@ class BaseNetworkTest(test.BaseTestCase): network['id']) # Clean up security groups - for secgroup in cls.security_groups: - cls._try_delete_resource(cls.client.delete_security_group, - secgroup['id']) + for security_group in cls.security_groups: + cls._try_delete_resource(cls.delete_security_group, + security_group) # Clean up admin security groups - for secgroup in cls.admin_security_groups: - cls._try_delete_resource( - cls.admin_client.delete_security_group, - secgroup['id']) + for security_group in cls.admin_security_groups: + cls._try_delete_resource(cls.delete_security_group, + security_group, + client=cls.admin_client) for subnetpool in cls.subnetpools: cls._try_delete_resource(cls.client.delete_subnetpool, @@ -717,18 +717,78 @@ class BaseNetworkTest(test.BaseTestCase): description=test_description)['project'] cls.projects.append(project) # Create a project will create a default security group. - # We make these security groups into admin_security_groups. sgs_list = cls.admin_client.list_security_groups( tenant_id=project['id'])['security_groups'] - for sg in sgs_list: - cls.admin_security_groups.append(sg) + for security_group in sgs_list: + # Make sure delete_security_group method will use + # the admin client for this group + security_group['client'] = cls.admin_client + cls.security_groups.append(security_group) return project @classmethod - def create_security_group(cls, name, **kwargs): - body = cls.client.create_security_group(name=name, **kwargs) - cls.security_groups.append(body['security_group']) - return body['security_group'] + def create_security_group(cls, name=None, project=None, client=None, + **kwargs): + if project: + client = client or cls.admin_client + project_id = kwargs.setdefault('project_id', project['id']) + tenant_id = kwargs.setdefault('tenant_id', project['id']) + if project_id != project['id'] or tenant_id != project['id']: + raise ValueError('Project ID specified multiple times') + else: + client = client or cls.client + + name = name or data_utils.rand_name(cls.__name__) + security_group = client.create_security_group(name=name, **kwargs)[ + 'security_group'] + security_group['client'] = client + cls.security_groups.append(security_group) + return security_group + + @classmethod + def delete_security_group(cls, security_group, client=None): + client = client or security_group.get('client') or cls.client + client.delete_security_group(security_group['id']) + + @classmethod + def create_security_group_rule(cls, security_group=None, project=None, + client=None, ip_version=None, **kwargs): + if project: + client = client or cls.admin_client + project_id = kwargs.setdefault('project_id', project['id']) + tenant_id = kwargs.setdefault('tenant_id', project['id']) + if project_id != project['id'] or tenant_id != project['id']: + raise ValueError('Project ID specified multiple times') + + if 'security_group_id' not in kwargs: + security_group = (security_group or + cls.get_security_group(client=client)) + + if security_group: + client = client or security_group.get('client') + security_group_id = kwargs.setdefault('security_group_id', + security_group['id']) + if security_group_id != security_group['id']: + raise ValueError('Security group ID specified multiple times.') + + ip_version = ip_version or cls._ip_version + default_params = ( + constants.DEFAULT_SECURITY_GROUP_RULE_PARAMS[ip_version]) + for key, value in default_params.items(): + kwargs.setdefault(key, value) + + client = client or cls.client + return client.create_security_group_rule(**kwargs)[ + 'security_group_rule'] + + @classmethod + def get_security_group(cls, name='default', client=None): + client = client or cls.client + security_groups = client.list_security_groups()['security_groups'] + for security_group in security_groups: + if security_group['name'] == name: + return security_group + raise ValueError("No such security group named {!r}".format(name)) @classmethod def create_keypair(cls, client=None, name=None, **kwargs): diff --git a/neutron_tempest_plugin/common/constants.py b/neutron_tempest_plugin/common/constants.py index 4dc78446..f695f6ce 100644 --- a/neutron_tempest_plugin/common/constants.py +++ b/neutron_tempest_plugin/common/constants.py @@ -171,3 +171,11 @@ VALID_FLOATINGIP_STATUS = (lib_constants.FLOATINGIP_STATUS_ACTIVE, # Possible types of values (e.g. in QoS rule types) VALUES_TYPE_CHOICES = "choices" VALUES_TYPE_RANGE = "range" + +# Security group parameters values mapped by IP version +DEFAULT_SECURITY_GROUP_RULE_PARAMS = { + lib_constants.IP_VERSION_4: {'ethertype': lib_constants.IPv4, + 'remote_ip_prefix': lib_constants.IPv4_ANY}, + lib_constants.IP_VERSION_6: {'ethertype': lib_constants.IPv6, + 'remote_ip_prefix': lib_constants.IPv6_ANY}, +} diff --git a/neutron_tempest_plugin/scenario/base.py b/neutron_tempest_plugin/scenario/base.py index 32c5db85..cc1ca4c2 100644 --- a/neutron_tempest_plugin/scenario/base.py +++ b/neutron_tempest_plugin/scenario/base.py @@ -122,29 +122,24 @@ class BaseTempestTestCase(base_api.BaseNetworkTest): Setting a group_id would only permit traffic from ports belonging to the same security group. """ - - rule_list = [{'protocol': 'tcp', - 'direction': 'ingress', - 'port_range_min': 22, - 'port_range_max': 22, - 'remote_ip_prefix': '0.0.0.0/0'}] - client = client or cls.os_primary.network_client - cls.create_secgroup_rules(rule_list, client=client, - secgroup_id=secgroup_id) + return cls.create_security_group_rule( + security_group_id=secgroup_id, + client=client, + protocol=neutron_lib_constants.PROTO_NAME_TCP, + direction=neutron_lib_constants.INGRESS_DIRECTION, + port_range_min=22, + port_range_max=22) @classmethod def create_pingable_secgroup_rule(cls, secgroup_id=None, client=None): - """This rule is intended to permit inbound ping""" + """This rule is intended to permit inbound ping - rule_list = [{'protocol': 'icmp', - 'direction': 'ingress', - 'port_range_min': 8, # type - 'port_range_max': 0, # code - 'remote_ip_prefix': '0.0.0.0/0'}] - client = client or cls.os_primary.network_client - cls.create_secgroup_rules(rule_list, client=client, - secgroup_id=secgroup_id) + """ + return cls.create_security_group_rule( + security_group_id=secgroup_id, client=client, + protocol=neutron_lib_constants.PROTO_NAME_ICMP, + direction=neutron_lib_constants.INGRESS_DIRECTION) @classmethod def create_router_by_client(cls, is_admin=False, **kwargs):