From 25a694c0984a472cab988e34062a0bce92100a60 Mon Sep 17 00:00:00 2001 From: Miguel Lavalle Date: Tue, 20 Oct 2020 17:37:22 -0500 Subject: [PATCH] Agent side push notifications for address groups Adds agent side code to enable the OVS agent to receive address groups from the push notifications cache. Change-Id: I1f27eccb2a69c553631fdc12d34e9025925844c5 Partial-Bug: #1592028 --- neutron/agent/rpc.py | 3 +- neutron/agent/securitygroups_rpc.py | 10 +++++ .../api/rpc/handlers/securitygroups_rpc.py | 21 ++++++++++ .../rpc/handlers/test_securitygroups_rpc.py | 42 ++++++++++++++++++- 4 files changed, 74 insertions(+), 2 deletions(-) diff --git a/neutron/agent/rpc.py b/neutron/agent/rpc.py index 247a0b26d9d..ebfb53e815f 100644 --- a/neutron/agent/rpc.py +++ b/neutron/agent/rpc.py @@ -208,7 +208,8 @@ class CacheBackedPluginApi(PluginApi): resources.SECURITYGROUP, resources.SECURITYGROUPRULE, resources.NETWORK, - resources.SUBNET] + resources.SUBNET, + resources.ADDRESSGROUP] def __init__(self, *args, **kwargs): super(CacheBackedPluginApi, self).__init__(*args, **kwargs) diff --git a/neutron/agent/securitygroups_rpc.py b/neutron/agent/securitygroups_rpc.py index 15a18652311..a99d2a0dee1 100644 --- a/neutron/agent/securitygroups_rpc.py +++ b/neutron/agent/securitygroups_rpc.py @@ -212,6 +212,16 @@ class SecurityGroupAgentRpc(object): else: self.refresh_firewall(devices) + def address_group_updated(self, address_group_id): + LOG.info("Address group updated %r", address_group_id) + # TODO(mlavalle) A follow up patch in the address groups implementation + # series will add more code here + + def address_group_deleted(self, address_group_id): + LOG.info("Address group deleted %r", address_group_id) + # TODO(mlavalle) A follow up patch in the address groups implementation + # series will add more code here + def remove_devices_filter(self, device_ids): if not device_ids: return diff --git a/neutron/api/rpc/handlers/securitygroups_rpc.py b/neutron/api/rpc/handlers/securitygroups_rpc.py index e122a5f03de..b1864723df4 100644 --- a/neutron/api/rpc/handlers/securitygroups_rpc.py +++ b/neutron/api/rpc/handlers/securitygroups_rpc.py @@ -228,6 +228,12 @@ class SecurityGroupServerAPIShim(sg_rpc_base.SecurityGroupInfoAPIMixin): 'Port', events.AFTER_DELETE) registry.subscribe(self._handle_sg_member_update, 'Port', events.AFTER_UPDATE) + self._register_legacy_ag_notification_callbacks(sg_agent) + + def _register_legacy_ag_notification_callbacks(self, sg_agent): + for event in (events.AFTER_UPDATE, events.AFTER_DELETE): + registry.subscribe(self._handle_address_group_event, + resources.ADDRESSGROUP, event) def security_group_info_for_devices(self, context, devices): ports = self._get_devices_info(context, devices) @@ -240,6 +246,14 @@ class SecurityGroupServerAPIShim(sg_rpc_base.SecurityGroupInfoAPIMixin): # error. raise NotImplementedError() + def get_address_group_details(self, address_group_id): + ag_obj = self.rcache.get_resource_by_id(resources.ADDRESSGROUP, + address_group_id) + if not ag_obj: + LOG.debug("Address group %s does not exist in cache.", + address_group_id) + return ag_obj + def _add_child_sg_rules(self, rtype, event, trigger, context, updated, **kwargs): # whenever we receive a full security group, add all child rules @@ -292,6 +306,13 @@ class SecurityGroupServerAPIShim(sg_rpc_base.SecurityGroupInfoAPIMixin): if sgs: self._sg_agent.security_groups_member_updated(sgs) + def _handle_address_group_event(self, rtype, event, trigger, context, + resource_id, **kwargs): + if event == events.AFTER_UPDATE: + self._sg_agent.address_group_updated(resource_id) + else: + self._sg_agent.address_group_deleted(resource_id) + def _get_devices_info(self, context, devices): # NOTE(kevinbenton): this format is required by the sg code, it is # defined in get_port_from_device and mimics diff --git a/neutron/tests/unit/api/rpc/handlers/test_securitygroups_rpc.py b/neutron/tests/unit/api/rpc/handlers/test_securitygroups_rpc.py index 4e694fab1ac..6cf5908d466 100644 --- a/neutron/tests/unit/api/rpc/handlers/test_securitygroups_rpc.py +++ b/neutron/tests/unit/api/rpc/handlers/test_securitygroups_rpc.py @@ -22,6 +22,7 @@ from neutron.agent import resource_cache from neutron.api.rpc.callbacks import resources from neutron.api.rpc.handlers import securitygroups_rpc from neutron import objects +from neutron.objects import address_group from neutron.objects.port.extensions import port_security as psec from neutron.objects import ports from neutron.objects import securitygroup @@ -70,7 +71,7 @@ class SecurityGroupServerAPIShimTestCase(base.BaseTestCase): super(SecurityGroupServerAPIShimTestCase, self).setUp() objects.register_objects() resource_types = [resources.PORT, resources.SECURITYGROUP, - resources.SECURITYGROUPRULE] + resources.SECURITYGROUPRULE, resources.ADDRESSGROUP] self.rcache = resource_cache.RemoteResourceCache(resource_types) # prevent any server lookup attempts mock.patch.object(self.rcache, '_flood_cache_for_query').start() @@ -93,6 +94,26 @@ class SecurityGroupServerAPIShimTestCase(base.BaseTestCase): self.rcache.record_resource_update(self.ctx, 'Port', p) return p + def _make_address_group_ovo(self): + id = uuidutils.generate_uuid() + address_associations = [ + address_group.AddressAssociation( + self.ctx, + address=netaddr.IPNetwork('10.0.0.1/32'), + address_group_id=id), + address_group.AddressAssociation( + self.ctx, + address=netaddr.IPNetwork('2001:db8::/32'), + address_group_id=id) + ] + ag = address_group.AddressGroup(self.ctx, id=id, + name='an-address-group', + description='An address group', + addresses=address_associations) + self.rcache.record_resource_update(self.ctx, resources.ADDRESSGROUP, + ag) + return ag + @mock.patch.object(securitygroup.SecurityGroup, 'is_shared_with_tenant', return_value=False) def _make_security_group_ovo(self, *args, **kwargs): @@ -172,3 +193,22 @@ class SecurityGroupServerAPIShimTestCase(base.BaseTestCase): self.rcache.record_resource_delete(self.ctx, 'Port', p1.id) self.sg_agent.security_groups_member_updated.assert_called_with( {s1.id}) + + def test_get_address_group_details(self): + ag = self._make_address_group_ovo() + retrieved_ag = self.shim.get_address_group_details(ag.id) + self.assertEqual(ag.id, retrieved_ag.id) + self.assertEqual(ag.name, retrieved_ag.name) + self.assertEqual(ag.description, retrieved_ag.description) + self.assertEqual(ag.addresses[0].address, + retrieved_ag.addresses[0].address) + self.assertEqual(ag.addresses[1].address, + retrieved_ag.addresses[1].address) + + def test_address_group_update_events(self): + ag = self._make_address_group_ovo() + self.sg_agent.address_group_updated.assert_called_with(ag.id) + self.sg_agent.address_group_updated.reset_mock() + self.rcache.record_resource_delete(self.ctx, resources.ADDRESSGROUP, + ag.id) + self.sg_agent.address_group_deleted.assert_called_with(ag.id)