From b02c96974edc3bdf91b4b11afc689569562b6c78 Mon Sep 17 00:00:00 2001 From: Slawek Kaplonski Date: Thu, 29 Jun 2023 12:02:24 +0200 Subject: [PATCH] Add support for default security group rules in SDK Neutron introduced new API for "default security group rules" which are kind of template of the SG rules which are next added to each newly created SG. This patch adds support for that new API in SDK. Depends-On: https://review.opendev.org/c/openstack/neutron/+/884474 Related-bug: #1983053 Change-Id: I82f905590bc4341c79d6aed6db0064763fd7cb0a --- doc/source/user/proxies/network.rst | 9 ++ openstack/network/v2/_proxy.py | 114 ++++++++++++++++++ .../network/v2/default_security_group_rule.py | 89 ++++++++++++++ .../v2/test_default_security_group_rule.py | 83 +++++++++++++ .../v2/test_default_security_group_rule.py | 85 +++++++++++++ 5 files changed, 380 insertions(+) create mode 100644 openstack/network/v2/default_security_group_rule.py create mode 100644 openstack/tests/functional/network/v2/test_default_security_group_rule.py create mode 100644 openstack/tests/unit/network/v2/test_default_security_group_rule.py diff --git a/doc/source/user/proxies/network.rst b/doc/source/user/proxies/network.rst index 4ce20b92b..7eabad4d0 100644 --- a/doc/source/user/proxies/network.rst +++ b/doc/source/user/proxies/network.rst @@ -71,6 +71,15 @@ Auto Allocated Topology Operations :members: delete_auto_allocated_topology, get_auto_allocated_topology, validate_auto_allocated_topology +Default Security Group Rules Operations +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. autoclass:: openstack.network.v2._proxy.Proxy + :noindex: + :members: create_default_security_group_rule, + find_default_security_group_rule, get_default_security_group_rule, + delete_default_security_group_rule, default_security_group_rules + Security Group Operations ^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/openstack/network/v2/_proxy.py b/openstack/network/v2/_proxy.py index d891cf7d8..496c00cdb 100644 --- a/openstack/network/v2/_proxy.py +++ b/openstack/network/v2/_proxy.py @@ -34,6 +34,9 @@ from openstack.network.v2 import ( from openstack.network.v2 import ( bgpvpn_router_association as _bgpvpn_router_association, ) +from openstack.network.v2 import ( + default_security_group_rule as _default_security_group_rule, +) from openstack.network.v2 import extension from openstack.network.v2 import firewall_group as _firewall_group from openstack.network.v2 import firewall_policy as _firewall_policy @@ -118,6 +121,9 @@ class Proxy(proxy.Proxy, Generic[T]): "bgpvpn_router_association": ( _bgpvpn_router_association.BgpVpnRouterAssociation ), + "default_security_group_rule": ( + _default_security_group_rule.DefaultSecurityGroupRule + ), "extension": extension.Extension, "firewall_group": _firewall_group.FirewallGroup, "firewall_policy": _firewall_policy.FirewallPolicy, @@ -4834,6 +4840,114 @@ class Proxy(proxy.Proxy, Generic[T]): """ return self._list(_security_group_rule.SecurityGroupRule, **query) + def create_default_security_group_rule(self, **attrs): + """Create a new default security group rule from attributes + + :param attrs: Keyword arguments which will be used to create a + :class:`~openstack.network.v2.default_security_group_rule. + DefaultSecurityGroupRule`, + comprised of the properties on the DefaultSecurityGroupRule class. + + :returns: The results of default security group rule creation + :rtype: + :class:`~openstack.network.v2.default_security_group_rule. + DefaultSecurityGroupRule` + """ + return self._create( + _default_security_group_rule.DefaultSecurityGroupRule, **attrs + ) + + def delete_default_security_group_rule( + self, + default_security_group_rule, + ignore_missing=True, + ): + """Delete a default security group rule + + :param default_security_group_rule: + The value can be either the ID of a default security group rule + or a + :class:`~openstack.network.v2.default_security_group_rule. + DefaultSecurityGroupRule` instance. + :param bool ignore_missing: When set to ``False`` + :class:`~openstack.exceptions.ResourceNotFound` will be + raised when the defaul security group rule does not exist. + When set to ``True``, no exception will be set when + attempting to delete a nonexistent default security group rule. + + :returns: ``None`` + """ + self._delete( + _default_security_group_rule.DefaultSecurityGroupRule, + default_security_group_rule, + ignore_missing=ignore_missing, + ) + + def find_default_security_group_rule( + self, name_or_id, ignore_missing=True, **query + ): + """Find a single default security group rule + + :param str name_or_id: The ID of a default security group rule. + :param bool ignore_missing: When set to ``False`` + :class:`~openstack.exceptions.ResourceNotFound` will be + raised when the resource does not exist. + When set to ``True``, None will be returned when + attempting to find a nonexistent resource. + :param dict query: Any additional parameters to be passed into + underlying methods. such as query filters. + :returns: One + :class:`~openstack.network.v2.default_security_group_rule. + DefaultSecurityGroupRule` or None + """ + return self._find( + _default_security_group_rule.DefaultSecurityGroupRule, + name_or_id, + ignore_missing=ignore_missing, + **query, + ) + + def get_default_security_group_rule(self, default_security_group_rule): + """Get a single default security group rule + + :param default_security_group_rule: + The value can be the ID of a default security group rule or a + :class:`~openstack.network.v2.default_security_group_rule. + DefaultSecurityGroupRule` instance. + + :returns: + :class:`~openstack.network.v2.default_security_group_rule. + DefaultSecurityGroupRule` + :raises: :class:`~openstack.exceptions.ResourceNotFound` + when no resource can be found. + """ + return self._get( + _default_security_group_rule.DefaultSecurityGroupRule, + default_security_group_rule, + ) + + def default_security_group_rules(self, **query): + """Return a generator of default security group rules + + :param kwargs query: Optional query parameters to be sent to limit + the resources being returned. Available parameters include: + + * ``description``: The default security group rule description + * ``direction``: Default security group rule direction + * ``ether_type``: Must be IPv4 or IPv6, and addresses represented + in CIDR must match the ingress or egress rule. + * ``protocol``: Default security group rule protocol + * ``remote_group_id``: ID of a remote security group + + :returns: A generator of default security group rule objects + :rtype: + :class:`~openstack.network.v2.default_security_group_rule. + DefaultSecurityGroupRule` + """ + return self._list( + _default_security_group_rule.DefaultSecurityGroupRule, **query + ) + def create_segment(self, **attrs): """Create a new segment from attributes diff --git a/openstack/network/v2/default_security_group_rule.py b/openstack/network/v2/default_security_group_rule.py new file mode 100644 index 000000000..0d40649ac --- /dev/null +++ b/openstack/network/v2/default_security_group_rule.py @@ -0,0 +1,89 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from openstack.network.v2 import _base +from openstack import resource + + +class DefaultSecurityGroupRule(_base.NetworkResource): + resource_key = 'default_security_group_rule' + resources_key = 'default_security_group_rules' + base_path = '/default-security-group-rules' + + # capabilities + allow_create = True + allow_fetch = True + allow_commit = False + allow_delete = True + allow_list = True + + _query_mapping = resource.QueryParameters( + 'id', + 'description', + 'remote_group_id', + 'remote_address_group_id', + 'direction', + 'protocol', + 'port_range_min', + 'port_range_max', + 'remote_ip_prefix', + 'used_in_default_sg', + 'used_in_non_default_sg', + 'sort_dir', + 'sort_key', + ether_type='ethertype', + ) + + # Properties + #: The default security group rule description. + description = resource.Body('description') + #: The remote security group ID to be associated with this security + #: group rule created from this template. + #: You can specify either ``remote_group_id`` or #: + #: ``remote_address_group_id`` or ``remote_ip_prefix``. + remote_group_id = resource.Body('remote_group_id') + #: The remote address group ID to be associated with this security + #: group rule created from that template. + #: You can specify either ``remote_group_id`` or + #: ``remote_address_group_id`` or ``remote_ip_prefix``. + remote_address_group_id = resource.Body('remote_address_group_id') + #: ``ingress`` or ``egress``: The direction in which the security group #: + #: rule will be applied. See 'direction' field in the security group rule + #: API. + direction = resource.Body('direction') + #: The protocol that is matched by the security group rule. + #: Valid values are ``null``, ``tcp``, ``udp``, and ``icmp``. + protocol = resource.Body('protocol') + #: The minimum port number in the range that is matched by the + #: security group rule. If the protocol is TCP or UDP, this value + #: must be less than or equal to the value of the port_range_max + #: attribute. If the protocol is ICMP, this value must be an ICMP type. + port_range_min = resource.Body('port_range_min', type=int) + #: The maximum port number in the range that is matched by the + #: security group rule. The port_range_min attribute constrains + #: the port_range_max attribute. If the protocol is ICMP, this + #: value must be an ICMP type. + port_range_max = resource.Body('port_range_max', type=int) + #: The remote IP prefix to be associated with this security group rule. + #: You can specify either ``remote_group_id`` or + #: ``remote_address_group_id`` or ``remote_ip_prefix``. + #: This attribute matches the specified IP prefix as the source or + #: destination IP address of the IP packet depending on direction. + remote_ip_prefix = resource.Body('remote_ip_prefix') + #: Must be IPv4 or IPv6, and addresses represented in CIDR must match + #: the ingress or egress rules. + ether_type = resource.Body('ethertype') + #: Indicate if this template be used to create security group rules in the + #: default security group created automatically for each project. + used_in_default_sg = resource.Body('used_in_default_sg', type=bool) + #: Indicate if this template be used to create security group rules in the + #: custom security groups created in the project by users. + used_in_non_default_sg = resource.Body('used_in_non_default_sg', type=bool) diff --git a/openstack/tests/functional/network/v2/test_default_security_group_rule.py b/openstack/tests/functional/network/v2/test_default_security_group_rule.py new file mode 100644 index 000000000..11bf5cf50 --- /dev/null +++ b/openstack/tests/functional/network/v2/test_default_security_group_rule.py @@ -0,0 +1,83 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import random + +from openstack.network.v2 import default_security_group_rule +from openstack.tests.functional import base + + +class TestDefaultSecurityGroupRule(base.BaseFunctionalTest): + def setUp(self): + super().setUp() + if not self.user_cloud._has_neutron_extension( + "security-groups-default-rules" + ): + self.skipTest( + "Neutron security-groups-default-rules extension " + "is required for this test" + ) + + self.IPV4 = random.choice(["IPv4", "IPv6"]) + self.PROTO = random.choice(["tcp", "udp"]) + self.PORT = random.randint(1, 65535) + self.DIR = random.choice(["ingress", "egress"]) + self.USED_IN_DEFAULT_SG = random.choice([True, False]) + self.USED_IN_NON_DEFAULT_SG = random.choice([True, False]) + + rul = self.operator_cloud.network.create_default_security_group_rule( + direction=self.DIR, + ethertype=self.IPV4, + port_range_max=self.PORT, + port_range_min=self.PORT, + protocol=self.PROTO, + used_in_default_sg=self.USED_IN_DEFAULT_SG, + used_in_non_default_sg=self.USED_IN_NON_DEFAULT_SG, + ) + assert isinstance( + rul, default_security_group_rule.DefaultSecurityGroupRule + ) + self.RULE_ID = rul.id + + def tearDown(self): + sot = self.operator_cloud.network.delete_default_security_group_rule( + self.RULE_ID, ignore_missing=False + ) + self.assertIsNone(sot) + super(TestDefaultSecurityGroupRule, self).tearDown() + + def test_find(self): + sot = self.operator_cloud.network.find_default_security_group_rule( + self.RULE_ID + ) + self.assertEqual(self.RULE_ID, sot.id) + + def test_get(self): + sot = self.operator_cloud.network.get_default_security_group_rule( + self.RULE_ID + ) + self.assertEqual(self.RULE_ID, sot.id) + self.assertEqual(self.DIR, sot.direction) + self.assertEqual(self.PROTO, sot.protocol) + self.assertEqual(self.PORT, sot.port_range_min) + self.assertEqual(self.PORT, sot.port_range_max) + self.assertEqual(self.USED_IN_DEFAULT_SG, sot.used_in_default_sg) + self.assertEqual( + self.USED_IN_NON_DEFAULT_SG, sot.used_in_non_default_sg + ) + + def test_list(self): + ids = [ + o.id + for o in self.operator_cloud.network.default_security_group_rules() + ] + self.assertIn(self.RULE_ID, ids) diff --git a/openstack/tests/unit/network/v2/test_default_security_group_rule.py b/openstack/tests/unit/network/v2/test_default_security_group_rule.py new file mode 100644 index 000000000..f4b0e27d7 --- /dev/null +++ b/openstack/tests/unit/network/v2/test_default_security_group_rule.py @@ -0,0 +1,85 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from openstack.network.v2 import default_security_group_rule +from openstack.tests.unit import base + + +IDENTIFIER = 'IDENTIFIER' +EXAMPLE = { + 'description': '1', + 'direction': '2', + 'ethertype': '3', + 'id': IDENTIFIER, + 'port_range_max': 4, + 'port_range_min': 5, + 'protocol': '6', + 'remote_group_id': '7', + 'remote_ip_prefix': '8', + 'remote_address_group_id': '13', + 'used_in_default_sg': True, + 'used_in_non_default_sg': True, +} + + +class TestDefaultSecurityGroupRule(base.TestCase): + def test_basic(self): + sot = default_security_group_rule.DefaultSecurityGroupRule() + self.assertEqual('default_security_group_rule', sot.resource_key) + self.assertEqual('default_security_group_rules', sot.resources_key) + self.assertEqual('/default-security-group-rules', sot.base_path) + self.assertTrue(sot.allow_create) + self.assertTrue(sot.allow_fetch) + self.assertFalse(sot.allow_commit) + self.assertTrue(sot.allow_delete) + self.assertTrue(sot.allow_list) + + self.assertDictEqual( + { + 'description': 'description', + 'direction': 'direction', + 'id': 'id', + 'ether_type': 'ethertype', + 'limit': 'limit', + 'marker': 'marker', + 'port_range_max': 'port_range_max', + 'port_range_min': 'port_range_min', + 'protocol': 'protocol', + 'remote_group_id': 'remote_group_id', + 'remote_address_group_id': 'remote_address_group_id', + 'remote_ip_prefix': 'remote_ip_prefix', + 'sort_dir': 'sort_dir', + 'sort_key': 'sort_key', + 'used_in_default_sg': 'used_in_default_sg', + 'used_in_non_default_sg': 'used_in_non_default_sg', + }, + sot._query_mapping._mapping, + ) + + def test_make_it(self): + sot = default_security_group_rule.DefaultSecurityGroupRule(**EXAMPLE) + self.assertEqual(EXAMPLE['description'], sot.description) + self.assertEqual(EXAMPLE['direction'], sot.direction) + self.assertEqual(EXAMPLE['ethertype'], sot.ether_type) + self.assertEqual(EXAMPLE['id'], sot.id) + self.assertEqual(EXAMPLE['port_range_max'], sot.port_range_max) + self.assertEqual(EXAMPLE['port_range_min'], sot.port_range_min) + self.assertEqual(EXAMPLE['protocol'], sot.protocol) + self.assertEqual(EXAMPLE['remote_group_id'], sot.remote_group_id) + self.assertEqual( + EXAMPLE['remote_address_group_id'], sot.remote_address_group_id + ) + self.assertEqual(EXAMPLE['remote_ip_prefix'], sot.remote_ip_prefix) + self.assertEqual(EXAMPLE['used_in_default_sg'], sot.used_in_default_sg) + self.assertEqual( + EXAMPLE['used_in_non_default_sg'], sot.used_in_non_default_sg + )