diff --git a/devstack/ovn-local.conf.sample b/devstack/ovn-local.conf.sample index 0d96894039f..42ba24bd88a 100644 --- a/devstack/ovn-local.conf.sample +++ b/devstack/ovn-local.conf.sample @@ -44,6 +44,7 @@ enable_service q-dns enable_service q-port-forwarding enable_service q-qos enable_service neutron-segments +enable_service q-log # Enable neutron tempest plugin tests enable_plugin neutron-tempest-plugin https://opendev.org/openstack/neutron-tempest-plugin diff --git a/neutron/common/ovn/constants.py b/neutron/common/ovn/constants.py index 031cf065fcc..fb21664c625 100644 --- a/neutron/common/ovn/constants.py +++ b/neutron/common/ovn/constants.py @@ -77,6 +77,7 @@ ACL_PRIORITY_ALLOW = 1002 ACL_PRIORITY_DROP = 1001 ACL_ACTION_DROP = 'drop' +ACL_ACTION_REJECT = 'reject' ACL_ACTION_ALLOW_RELATED = 'allow-related' ACL_ACTION_ALLOW = 'allow' diff --git a/neutron/common/ovn/extensions.py b/neutron/common/ovn/extensions.py index 9f2e8955d6f..acdcc964a93 100644 --- a/neutron/common/ovn/extensions.py +++ b/neutron/common/ovn/extensions.py @@ -27,6 +27,7 @@ from neutron_lib.api.definitions import fip_port_details from neutron_lib.api.definitions import floating_ip_port_forwarding from neutron_lib.api.definitions import l3 from neutron_lib.api.definitions import l3_ext_gw_mode +from neutron_lib.api.definitions import logging from neutron_lib.api.definitions import multiprovidernet from neutron_lib.api.definitions import network_availability_zone from neutron_lib.api.definitions import network_ip_availability @@ -114,5 +115,6 @@ ML2_SUPPORTED_API_EXTENSIONS = [ expose_port_forwarding_in_fip.ALIAS, fip_pf_description.ALIAS, floating_ip_port_forwarding.ALIAS, - vlantransparent.ALIAS + vlantransparent.ALIAS, + logging.ALIAS, ] diff --git a/neutron/plugins/ml2/drivers/ovn/mech_driver/mech_driver.py b/neutron/plugins/ml2/drivers/ovn/mech_driver/mech_driver.py index 5fe370aa140..38be2e817f4 100644 --- a/neutron/plugins/ml2/drivers/ovn/mech_driver/mech_driver.py +++ b/neutron/plugins/ml2/drivers/ovn/mech_driver/mech_driver.py @@ -56,6 +56,7 @@ from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_client from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_db_sync from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovsdb_monitor from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import worker +from neutron.services.logapi.drivers.ovn import driver as log_driver from neutron.services.qos.drivers.ovn import driver as qos_driver from neutron.services.segments import db as segment_service_db from neutron.services.trunk.drivers.ovn import trunk_driver @@ -125,6 +126,7 @@ class OVNMechanismDriver(api.MechanismDriver): self.subscribe() self.qos_driver = qos_driver.OVNQosDriver.create(self) self.trunk_driver = trunk_driver.OVNTrunkDriver.create(self) + self.log_driver = log_driver.register(self) @property def nb_schema_helper(self): diff --git a/neutron/services/logapi/drivers/ovn/__init__.py b/neutron/services/logapi/drivers/ovn/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/neutron/services/logapi/drivers/ovn/driver.py b/neutron/services/logapi/drivers/ovn/driver.py new file mode 100644 index 00000000000..1a5ce941964 --- /dev/null +++ b/neutron/services/logapi/drivers/ovn/driver.py @@ -0,0 +1,363 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from collections import namedtuple + +from neutron_lib.api.definitions import portbindings +from neutron_lib.callbacks import resources +from neutron_lib import exceptions as n_exceptions +from neutron_lib.plugins import constants as plugin_constants +from neutron_lib.plugins import directory +from neutron_lib.services.logapi import constants as log_const +from oslo_config import cfg +from oslo_log import log as logging +from oslo_utils import importutils +from ovsdbapp.backend.ovs_idl import idlutils + +from neutron._i18n import _ +from neutron.common.ovn import constants as ovn_const +from neutron.common.ovn import utils +from neutron.conf.services import logging as log_cfg +from neutron.services.logapi.common import db_api +from neutron.services.logapi.common import sg_callback +from neutron.services.logapi.drivers import base +from neutron.services.logapi.drivers import manager + +LOG = logging.getLogger(__name__) + +DRIVER = None + +log_cfg.register_log_driver_opts() + +SUPPORTED_LOGGING_TYPES = [log_const.SECURITY_GROUP] + + +class LoggingNotSupported(n_exceptions.NeutronException): + message = _("The current OVN version does not offer support " + "for neutron network log functionality.") + + +class OVNDriver(base.DriverBase): + + def __init__(self): + super().__init__( + name="ovn", + vif_types=[portbindings.VIF_TYPE_OVS, + portbindings.VIF_TYPE_VHOST_USER], + vnic_types=[portbindings.VNIC_NORMAL], + supported_logging_types=SUPPORTED_LOGGING_TYPES, + requires_rpc=False) + self._log_plugin_property = None + self.meter_name = ( + cfg.CONF.network_log.local_output_log_base or "acl_log_meter") + + @staticmethod + def network_logging_supported(ovn_nb): + columns = list(ovn_nb._tables["Meter"].columns) + return ("fair" in columns) + + @classmethod + def create(cls, plugin_driver): + cls.plugin_driver = plugin_driver + return OVNDriver() + + @property + def _log_plugin(self): + if self._log_plugin_property is None: + self._log_plugin_property = directory.get_plugin( + plugin_constants.LOG_API) + return self._log_plugin_property + + @staticmethod + def _log_dict_to_obj(log_dict): + cls = namedtuple('Log_obj', log_dict) + cls.__new__.__defaults__ = tuple(log_dict.values()) + return cls() + + def _get_logs(self, context): + log_objs = self._log_plugin.get_logs(context) + return [self._log_dict_to_obj(lo) for lo in log_objs] + + @property + def ovn_nb(self): + return self.plugin_driver.nb_ovn + + def _create_ovn_fair_meter(self, ovn_txn): + """Create row in Meter table with fair attribute set to True. + + Create a row in OVN's NB Meter table based on well-known name. This + method uses the network_log configuration to specify the attributes + of the meter. Current implementation needs only one 'fair' meter row + which is then referred by multiple ACL rows. + + :param ovn_txn: ovn nortbound idl transaction. + + """ + meter = self.ovn_nb.db_find_rows( + "Meter", ("name", "=", self.meter_name)).execute(check_error=True) + if meter: + meter = meter[0] + try: + meter_band = self.ovn_nb.lookup("Meter_Band", + meter.bands[0].uuid) + if all((meter.unit == "pktps", + meter.fair[0], + meter_band.rate == cfg.CONF.network_log.rate_limit, + meter_band.burst_size == + cfg.CONF.network_log.burst_limit)): + # Meter (and its meter-band) unchanged: noop. + return + except idlutils.RowNotFound: + pass + # Re-create meter (and its meter-band) with the new attributes. + # This is supposed to happen only if configuration changed, so + # doing updates is an overkill: better to leverage the ovsdbapp + # library to avoid the complexity. + ovn_txn.add(self.ovn_nb.meter_del(meter.uuid)) + # Create meter + LOG.info("Creating network log fair meter %s", self.meter_name) + ovn_txn.add(self.ovn_nb.meter_add( + name=self.meter_name, + unit="pktps", + rate=cfg.CONF.network_log.rate_limit, + fair=True, + burst_size=cfg.CONF.network_log.burst_limit, + may_exist=False, + external_ids={ovn_const.OVN_DEVICE_OWNER_EXT_ID_KEY: + log_const.LOGGING_PLUGIN})) + + @staticmethod + def _acl_actions_enabled(log_obj): + if not log_obj.enabled: + return set() + if log_obj.event == log_const.ACCEPT_EVENT: + return {ovn_const.ACL_ACTION_ALLOW_RELATED, + ovn_const.ACL_ACTION_ALLOW} + if log_obj.event == log_const.DROP_EVENT: + return {ovn_const.ACL_ACTION_DROP, + ovn_const.ACL_ACTION_REJECT} + # Fall through case: log_const.ALL_EVENT + return {ovn_const.ACL_ACTION_DROP, + ovn_const.ACL_ACTION_REJECT, + ovn_const.ACL_ACTION_ALLOW_RELATED, + ovn_const.ACL_ACTION_ALLOW} + + def _remove_acls_log(self, pgs, ovn_txn, log_name=None): + acl_changes, acl_visits = 0, 0 + for pg in pgs: + for acl_uuid in pg["acls"]: + acl_visits += 1 + # skip acls used by a different network log + if log_name: + acl = self.ovn_nb.lookup("ACL", acl_uuid) + if acl.name and acl.name[0] != log_name: + continue + ovn_txn.add(self.ovn_nb.db_set( + "ACL", acl_uuid, + ("log", False), + ("meter", []), + ("name", []), + ("severity", []) + )) + acl_changes += 1 + msg = "Cleared %d (out of %d visited) ACLs" + if log_name: + msg += " for network log {}".format(log_name) + LOG.info(msg, acl_changes, acl_visits) + + def _set_acls_log(self, pgs, ovn_txn, actions_enabled, log_name): + acl_changes, acl_visits = 0, 0 + for pg in pgs: + for acl_uuid in pg["acls"]: + acl_visits += 1 + acl = self.ovn_nb.lookup("ACL", acl_uuid) + # skip acls used by a different network log + if acl.name and acl.name[0] != log_name: + continue + ovn_txn.add(self.ovn_nb.db_set( + "ACL", acl_uuid, + ("log", acl.action in actions_enabled), + ("meter", self.meter_name), + ("name", log_name), + ("severity", "info") + )) + acl_changes += 1 + LOG.info("Set %d (out of %d visited) ACLs for network log %s", + acl_changes, acl_visits, log_name) + + def _update_log_objs(self, context, ovn_txn, log_objs): + for log_obj in log_objs: + pgs = self._pgs_from_log_obj(context, log_obj) + actions_enabled = self._acl_actions_enabled(log_obj) + self._set_acls_log(pgs, ovn_txn, actions_enabled, + utils.ovn_name(log_obj.id)) + + def _pgs_all(self): + return self.ovn_nb.db_list( + "Port_Group", columns=["name", "acls"]).execute(check_error=True) + + def _pgs_from_log_obj(self, context, log_obj): + """Map Neutron log_obj into affected port groups in OVN. + + :param context: current running context information + :param log_obj: a log_object to be analyzed. + + """ + if not log_obj.resource_id and not log_obj.target_id: + # No sg, no port: return all pgs + return self._pgs_all() + + pgs = [] + # include special pg_drop to log DROP and ALL actions + if not log_obj.event or log_obj.event in (log_const.DROP_EVENT, + log_const.ALL_EVENT): + try: + pg = self.ovn_nb.lookup("Port_Group", + ovn_const.OVN_DROP_PORT_GROUP_NAME) + pgs.append({"name": pg.name, + "acls": [r.uuid for r in pg.acls]}) + except idlutils.RowNotFound: + pass + + if log_obj.resource_id: + try: + pg = self.ovn_nb.lookup("Port_Group", + utils.ovn_port_group_name( + log_obj.resource_id)) + pgs.append({"name": pg.name, + "acls": [r.uuid for r in pg.acls]}) + except idlutils.RowNotFound: + pass + # Note: when sg is provided, it is redundant to get sgs from port, + # because model will ensure that sg is associated with neutron port + elif log_obj.target_id: + sg_ids = db_api._get_sgs_attached_to_port(context, + log_obj.target_id) + for sg_id in sg_ids: + try: + pg = self.ovn_nb.lookup("Port_Group", + utils.ovn_port_group_name(sg_id)) + pgs.append({"name": pg.name, + "acls": [r.uuid for r in pg.acls]}) + except idlutils.RowNotFound: + pass + return pgs + + def create_log(self, context, log_obj): + """Create a log_obj invocation. + + :param context: current running context information + :param log_obj: a log objects being created + """ + LOG.debug("Create_log %s", log_obj) + + pgs = self._pgs_from_log_obj(context, log_obj) + actions_enabled = self._acl_actions_enabled(log_obj) + with self.ovn_nb.transaction(check_error=True) as ovn_txn: + self._create_ovn_fair_meter(ovn_txn) + self._set_acls_log(pgs, ovn_txn, actions_enabled, + utils.ovn_name(log_obj.id)) + + def create_log_precommit(self, context, log_obj): + """Create a log_obj precommit. + + :param context: current running context information + :param log_obj: a log object being created + """ + LOG.debug("Create_log_precommit %s", log_obj) + + if not self.network_logging_supported(self.ovn_nb): + raise LoggingNotSupported() + + def update_log(self, context, log_obj): + """Update a log_obj invocation. + + :param context: current running context information + :param log_obj: a log object being updated + + """ + LOG.debug("Update_log %s", log_obj) + + pgs = self._pgs_from_log_obj(context, log_obj) + actions_enabled = self._acl_actions_enabled(log_obj) + with self.ovn_nb.transaction(check_error=True) as ovn_txn: + self._set_acls_log(pgs, ovn_txn, actions_enabled, + utils.ovn_name(log_obj.id)) + + def delete_log(self, context, log_obj): + """Delete a log_obj invocation. + + :param context: current running context information + :param log_obj: a log_object being deleted + + """ + LOG.debug("Delete_log %s", log_obj) + + # If we are removing the last log_obj, let's clear log from all acls. + # This is a simple way of ensuring that no acl logs are left behind! + log_objs = self._get_logs(context) + if not log_objs or ( + len(log_objs) == 1 and log_objs[0].id == log_obj.id): + pgs = self._pgs_all() + with self.ovn_nb.transaction(check_error=True) as ovn_txn: + self._remove_acls_log(pgs, ovn_txn) + ovn_txn.add(self.ovn_nb.meter_del(self.meter_name, + if_exists=True)) + LOG.info("All ACL logs cleared after deletion of log_obj %s", + log_obj.id) + return + + # Remove log_obj and revisit all remaining ones, since the acls that + # were serving the removed log_obj may be usable by the remaining + # log_objs. + pgs = self._pgs_from_log_obj(context, log_obj) + with self.ovn_nb.transaction(check_error=True) as ovn_txn: + self._remove_acls_log(pgs, ovn_txn, utils.ovn_name(log_obj.id)) + + # TODO(flaviof): We needed to break this second part into a separate + # transaction because logic that determines the value of the 'freed up' + # acl rows will not see the modified rows unless it was inside an an + # idl command. + with self.ovn_nb.transaction(check_error=True) as ovn_txn: + self._update_log_objs(context, ovn_txn, [lo for lo in log_objs + if lo.id != log_obj.id]) + + def resource_update(self, context, log_objs): + """Tell the agent when resources related to log_objects are + being updated + + :param context: current running context information + :param log_objs: a list of log_objects, whose related resources are + being updated. + """ + LOG.debug("Resource_update %s", log_objs) + + with self.ovn_nb.transaction(check_error=True) as ovn_txn: + self._update_log_objs(context, ovn_txn, log_objs) + + +def register(plugin_driver): + """Register the driver.""" + global DRIVER + if not DRIVER: + DRIVER = OVNDriver.create(plugin_driver) + + # Trigger decorator + importutils.import_module( + "neutron.services.logapi.common.sg_validate" + ) + # Register resource callback handler + manager.register( + resources.SECURITY_GROUP_RULE, sg_callback.SecurityGroupRuleCallBack) + + LOG.info("OVN logging driver registered") + return DRIVER diff --git a/neutron/tests/functional/base.py b/neutron/tests/functional/base.py index 8b79d0975e6..4e436fc5ab5 100644 --- a/neutron/tests/functional/base.py +++ b/neutron/tests/functional/base.py @@ -197,6 +197,15 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase, self.pf_plugin = manager.NeutronManager.load_class_for_provider( 'neutron.service_plugins', 'port_forwarding')() self.pf_plugin._rpc_notifications_required = False + self.log_plugin = directory.get_plugin(constants.LOG_API) + if not self.log_plugin: + self.log_plugin = manager.NeutronManager.load_class_for_provider( + 'neutron.service_plugins', 'log')() + directory.add_plugin(constants.LOG_API, self.log_plugin) + self.log_plugin.driver_manager.register_driver( + self.mech_driver.log_driver) + self.mech_driver.log_driver.plugin_driver = self.mech_driver + self.mech_driver.log_driver._log_plugin_property = None self.ovn_northd_mgr = None self.maintenance_worker = maintenance_worker mock.patch( diff --git a/neutron/tests/functional/services/logapi/drivers/__init__.py b/neutron/tests/functional/services/logapi/drivers/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/neutron/tests/functional/services/logapi/drivers/ovn/__init__.py b/neutron/tests/functional/services/logapi/drivers/ovn/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/neutron/tests/functional/services/logapi/drivers/ovn/test_driver.py b/neutron/tests/functional/services/logapi/drivers/ovn/test_driver.py new file mode 100644 index 00000000000..3cf503e3d71 --- /dev/null +++ b/neutron/tests/functional/services/logapi/drivers/ovn/test_driver.py @@ -0,0 +1,279 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from unittest import mock + +from neutron_lib import constants as n_const +from neutron_lib import context +from neutron_lib import exceptions +from neutron_lib.services.logapi import constants as log_const + +from neutron.common.ovn import constants as ovn_const +from neutron.common.ovn import utils +from neutron.services.logapi.common import exceptions as log_exc +from neutron.tests.functional import base as functional_base + + +class LogApiTestCaseBase(functional_base.TestOVNFunctionalBase): + def setUp(self): + super().setUp() + self.log_driver = self.mech_driver.log_driver + self._check_is_supported() + self.ctxt = context.Context('admin', 'fake_tenant') + + def _check_is_supported(self): + if not self.log_driver.network_logging_supported(self.nb_api): + self.skipTest("The current OVN version does not offer support " + "for neutron network log functionality.") + self.assertIsNotNone(self.log_plugin) + + def _log_data(self, sg_id=None, port_id=None, enabled=True): + log_data = {'project_id': self.ctxt.project_id, + 'resource_type': 'security_group', + 'description': 'test net log', + 'name': 'logme', + 'enabled': enabled} + if sg_id: + log_data['resource_id'] = sg_id + if port_id: + log_data['target_id'] = port_id + return {'log': log_data} + + +class LogApiTestCaseSimple(LogApiTestCaseBase): + def test_basic_get(self): + log_obj = self.log_plugin.create_log(self.ctxt, self._log_data()) + self.assertIsNotNone(log_obj) + log_obj_get = self.log_plugin.get_log(self.ctxt, log_obj['id']) + self.assertEqual(log_obj, log_obj_get) + log_obj2 = self.log_plugin.create_log(self.ctxt, self._log_data()) + self.assertIsNotNone(log_obj2) + log_objs_get = self.log_plugin.get_logs(self.ctxt) + log_objs_ids = {x['id'] for x in log_objs_get} + self.assertEqual({log_obj['id'], log_obj2['id']}, log_objs_ids) + + def test_log_ovn_unsupported(self): + with mock.patch.object(self.log_driver, 'network_logging_supported', + return_value=False) as supported_mock: + log_data = {'log': {'resource_type': 'security_group', + 'enabled': True}} + self.assertRaises(exceptions.DriverCallError, + self.log_plugin.create_log, + self.ctxt, log_data) + supported_mock.assert_called_once() + + +class LogApiTestCaseComplex(LogApiTestCaseBase): + def setUp(self): + super().setUp() + self._prepare_env() + + def _prepare_env(self): + self.net = self._create_network( + self.fmt, 'private', admin_state_up=True).json['network']['id'] + self.subnet = self._create_subnet( + self.fmt, self.net, '10.0.0.0/24', enable_dhcp=False).json[ + 'subnet']['id'] + + self.sg1 = self._create_security_group('test_sg1_ssh') + self.sg2 = self._create_security_group('test_sg2_http') + self.sg3 = self._create_security_group('test_sg3_telnet_ssh') + self.sg1rs = [self._create_security_group_rule(self.sg1, 22)] + self.sg2rs = [self._create_security_group_rule(self.sg2, 80)] + self.sg3rs = [self._create_security_group_rule(self.sg3, 23), + self._create_security_group_rule(self.sg3, 22)] + self.sgs = [self.sg1, self.sg2, self.sg3] + self.sgrs = self.sg1rs + self.sg2rs + self.sg3rs + + self.port1_sgs = [self.sg1] + self.port1_sgrs = self.sg1rs + self.port1 = self._create_port(self.fmt, self.net, + security_groups=self.port1_sgs) + self.port2_sgs = [self.sg2, self.sg3] + self.port2_sgrs = self.sg2rs + self.sg3rs + self.port2 = self._create_port(self.fmt, self.net, + security_groups=self.port2_sgs) + self.port3_sgs = [self.sg1, self.sg3] + self.port3_sgrs = self.sg1rs + self.sg3rs + self.port3 = self._create_port(self.fmt, self.net, + security_groups=self.port3_sgs) + + def _create_port(self, name, net_id, security_groups): + data = {'port': {'name': name, + 'tenant_id': self.ctxt.project_id, + 'network_id': net_id, + 'security_groups': security_groups}} + req = self.new_create_request('ports', data, self.fmt) + res = req.get_response(self.api) + return self.deserialize(self.fmt, res)['port']['id'] + + def _create_security_group(self, name): + data = {'security_group': {'name': name, + 'tenant_id': self.ctxt.project_id}} + req = self.new_create_request('security-groups', data, self.fmt) + res = req.get_response(self.api) + return self.deserialize(self.fmt, res)['security_group']['id'] + + def _create_security_group_rule(self, sg_id, tcp_port): + data = {'security_group_rule': {'security_group_id': sg_id, + 'direction': 'ingress', + 'protocol': n_const.PROTO_NAME_TCP, + 'ethertype': n_const.IPv4, + 'port_range_min': tcp_port, + 'port_range_max': tcp_port, + 'tenant_id': self.ctxt.project_id}} + req = self.new_create_request('security-group-rules', data, self.fmt) + res = req.get_response(self.api) + return self.deserialize(self.fmt, res)['security_group_rule']['id'] + + def _find_security_group_row_by_id(self, sg_id): + for row in self.nb_api._tables['Port_Group'].rows.values(): + if row.name == utils.ovn_port_group_name(sg_id): + return row + + def _find_security_group_rule_row_by_id(self, sgr_id): + for row in self.nb_api._tables['ACL'].rows.values(): + if (row.external_ids.get( + ovn_const.OVN_SG_RULE_EXT_ID_KEY) == sgr_id): + return row + + def _check_acl_log(self, sgr, is_enabled=True): + acl = self._find_security_group_rule_row_by_id(sgr) + self.assertIsNotNone(acl) + self.assertEqual(is_enabled, acl.log) + return acl + + def _check_sgrs(self, sgrs=None, is_enabled=True): + if not sgrs: + sgrs = self.sgrs + for sgr in sgrs: + self._check_acl_log(sgr, is_enabled) + + def test_add_and_remove(self): + self._check_sgrs(is_enabled=False) + self.assertEqual([], + self.nb_api.meter_list().execute(check_error=True)) + + log_obj = self.log_plugin.create_log(self.ctxt, self._log_data()) + for sgr in self.sgrs: + acl = self._check_acl_log(sgr) + self.assertEqual(utils.ovn_name(log_obj['id']), acl.name[0]) + meter = self.nb_api.meter_get(acl.meter[0]).execute( + check_error=True) + self.assertEqual([True], meter.fair) + self.assertEqual('pktps', meter.unit) + self.assertEqual(1, len(meter.bands)) + self.assertEqual({ovn_const.OVN_DEVICE_OWNER_EXT_ID_KEY: + log_const.LOGGING_PLUGIN}, meter.external_ids) + + self.log_plugin.delete_log(self.ctxt, log_obj['id']) + self._check_sgrs(is_enabled=False) + self.assertEqual([], + self.nb_api.meter_list().execute(check_error=True)) + + log_objs = [] + for sg in self.sgs: + log_data = self._log_data(sg_id=sg) + log_objs.append(self.log_plugin.create_log(self.ctxt, log_data)) + self.assertEqual(len(log_objs), + len(self.log_plugin.get_logs(self.ctxt))) + self._check_sgrs(is_enabled=True) + + # Attempt to delete non-existing row + self.assertRaises(log_exc.LogResourceNotFound, + self.log_plugin.delete_log, + self.ctxt, log_obj['id']) + + self.log_plugin.delete_log(self.ctxt, log_objs[1]['id']) + self._check_sgrs(sgrs=self.sg1rs, is_enabled=True) + self._check_sgrs(sgrs=self.sg2rs, is_enabled=False) + self._check_sgrs(sgrs=self.sg3rs, is_enabled=True) + + self.log_plugin.delete_log(self.ctxt, log_objs[2]['id']) + self._check_sgrs(sgrs=self.sg1rs, is_enabled=True) + self._check_sgrs(sgrs=self.sg2rs, is_enabled=False) + self._check_sgrs(sgrs=self.sg3rs, is_enabled=False) + + self.log_plugin.delete_log(self.ctxt, log_objs[0]['id']) + self.assertEqual([], self.log_plugin.get_logs(self.ctxt)) + self._check_sgrs(is_enabled=False) + + # Attempt to delete from empty table + self.assertRaises(log_exc.LogResourceNotFound, + self.log_plugin.delete_log, + self.ctxt, log_objs[0]['id']) + + def test_update_all(self): + # Note: only these fields are supported for update: + # openstack network log set [-h] [--description ] + # [--enable | --disable] [--name ] + + log_data = self._log_data() + log_obj = self.log_plugin.create_log(self.ctxt, log_data) + self._check_sgrs() + + log_data['log']['name'] = 'logme-nay' + log_data['log']['enabled'] = False + self.log_plugin.update_log(self.ctxt, log_obj['id'], log_data) + self._check_sgrs(is_enabled=False) + + log_data['log']['name'] = 'logme-yay' + log_data['log']['description'] = 'logs are a beautiful thing' + log_data['log']['enabled'] = True + self.log_plugin.update_log(self.ctxt, log_obj['id'], log_data) + self._check_sgrs() + + def test_update_one_sg(self): + log_data = self._log_data(sg_id=self.sg2, enabled=False) + log_obj = self.log_plugin.create_log(self.ctxt, log_data) + self._check_sgrs(is_enabled=False) + + log_data['log']['enabled'] = True + self.log_plugin.update_log(self.ctxt, log_obj['id'], log_data) + self._check_sgrs(sgrs=self.sg1rs, is_enabled=False) + self._check_sgrs(sgrs=self.sg2rs, is_enabled=True) + self._check_sgrs(sgrs=self.sg3rs, is_enabled=False) + + def test_overlap_net_logs(self): + log_data1 = self._log_data(sg_id=self.sg3, port_id=self.port3) + log_obj1 = self.log_plugin.create_log(self.ctxt, log_data1) + self._check_sgrs(sgrs=self.sg1rs, is_enabled=False) + self._check_sgrs(sgrs=self.sg2rs, is_enabled=False) + self._check_sgrs(sgrs=self.sg3rs, is_enabled=True) + + log_data2 = self._log_data(port_id=self.port2) + log_obj2 = self.log_plugin.create_log(self.ctxt, log_data2) + self._check_sgrs(sgrs=self.sg1rs, is_enabled=False) + + # port 2 uses sg2 and sg3. However, sg3 is in use by log_obj1 + # so only acls for 2 would be associated with log_obj2 + for sgr in self.sg2rs: + acl = self._check_acl_log(sgr) + self.assertEqual(utils.ovn_name(log_obj2['id']), acl.name[0]) + for sgr in self.sg3rs: + acl = self._check_acl_log(sgr) + self.assertEqual(utils.ovn_name(log_obj1['id']), acl.name[0]) + + # Next, delete log_obj1 and make sure that lob_obj2 gets to + # claim what it could not use before + self.log_plugin.delete_log(self.ctxt, log_obj1['id']) + self._check_sgrs(sgrs=self.sg1rs, is_enabled=False) + for sgr in self.sg2rs + self.sg3rs: + acl = self._check_acl_log(sgr) + self.assertEqual(utils.ovn_name(log_obj2['id']), acl.name[0]) + + # Delete log_obj2 and ensure that logs are off and meter is no + # longer used + self.log_plugin.delete_log(self.ctxt, log_obj2['id']) + self._check_sgrs(is_enabled=False) + self.assertEqual([], + self.nb_api.meter_list().execute(check_error=True)) diff --git a/neutron/tests/unit/fake_resources.py b/neutron/tests/unit/fake_resources.py index 6ab13d71e56..867f19e499d 100644 --- a/neutron/tests/unit/fake_resources.py +++ b/neutron/tests/unit/fake_resources.py @@ -149,6 +149,8 @@ class FakeOvsdbNbOvnIdl(object): self.check_liveness = mock.Mock() self.ha_chassis_group_get = mock.Mock() self.qos_del_ext_ids = mock.Mock() + self.meter_add = mock.Mock() + self.meter_del = mock.Mock() class FakeOvsdbSbOvnIdl(object): diff --git a/neutron/tests/unit/services/logapi/drivers/ovn/__init__.py b/neutron/tests/unit/services/logapi/drivers/ovn/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/neutron/tests/unit/services/logapi/drivers/ovn/test_driver.py b/neutron/tests/unit/services/logapi/drivers/ovn/test_driver.py new file mode 100644 index 00000000000..cf0431e09dd --- /dev/null +++ b/neutron/tests/unit/services/logapi/drivers/ovn/test_driver.py @@ -0,0 +1,323 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from unittest import mock + +from neutron_lib.plugins import constants as plugin_constants +from neutron_lib.services.logapi import constants as log_const +from oslo_utils import uuidutils +from ovsdbapp.backend.ovs_idl import idlutils + +from neutron.common import utils as neutron_utils + +from neutron.common.ovn import constants as ovn_const +from neutron.common.ovn import utils as ovn_utils +from neutron.services.logapi.drivers.ovn import driver as ovn_driver +from neutron.tests import base +from neutron.tests.unit import fake_resources + +FAKE_CFG_RATE = 123 +FAKE_CFG_BURST = 321 + + +class TestOVNDriver(base.BaseTestCase): + + def setUp(self): + super().setUp() + + self.context = mock.Mock() + self.plugin_driver = mock.Mock() + self.plugin_driver.nb_ovn = fake_resources.FakeOvsdbNbOvnIdl() + + self.log_plugin = mock.Mock() + get_mock_log_plugin = lambda alias: self.log_plugin if ( + alias == plugin_constants.LOG_API) else None + self.fake_get_dir_object = mock.patch( + "neutron_lib.plugins.directory.get_plugin", + side_effect=get_mock_log_plugin).start() + + self.fake_get_sgs_attached_to_port = mock.patch( + "neutron.services.logapi.common.db_api._get_sgs_attached_to_port", + return_value=[]).start() + + self.fake_cfg_network_log = mock.patch( + "oslo_config.cfg.CONF.network_log").start() + self.fake_cfg_network_log.local_output_log_base = None + self.fake_cfg_network_log.rate_limit = FAKE_CFG_RATE + self.fake_cfg_network_log.burst_limit = FAKE_CFG_BURST + + self._log_driver_property = None + + @property + def _nb_ovn(self): + return self.plugin_driver.nb_ovn + + @property + def _log_driver(self): + if self._log_driver_property is None: + self._log_driver_property = ovn_driver.OVNDriver.create( + self.plugin_driver) + return self._log_driver_property + + def _log_driver_reinit(self): + self._log_driver_property = None + return self._log_driver + + def _fake_meter(self, **kwargs): + meter_defaults_dict = { + 'uuid': uuidutils.generate_uuid(), + 'bands': [mock.Mock(uuid='test_band')], + 'unit': 'pktps', + 'fair': [True], + } + meter_obj_dict = {**meter_defaults_dict, **kwargs} + return mock.Mock(**meter_obj_dict) + + def _fake_meter_band(self, **kwargs): + meter_band_defaults_dict = { + 'uuid': 'test_band', + 'rate': self.fake_cfg_network_log.rate_limit, + 'burst_size': self.fake_cfg_network_log.burst_limit, + } + meter_band_obj_dict = {**meter_band_defaults_dict, **kwargs} + return mock.Mock(**meter_band_obj_dict) + + def test_create(self): + driver = self._log_driver + self.assertEqual(self.log_plugin, driver._log_plugin) + self.assertEqual(self.plugin_driver, driver.plugin_driver) + self.assertEqual(self.plugin_driver.nb_ovn, driver.ovn_nb) + + def test_create_meter_name(self): + driver = self._log_driver + self.assertEqual("acl_log_meter", driver.meter_name) + + test_log_base = neutron_utils.get_rand_name() + self.fake_cfg_network_log.local_output_log_base = test_log_base + driver2 = self._log_driver_reinit() + self.assertEqual(test_log_base, driver2.meter_name) + + def test__create_ovn_fair_meter(self): + mock_find_rows = mock.Mock() + mock_find_rows.execute.return_value = None + self._nb_ovn.db_find_rows.return_value = mock_find_rows + self._log_driver._create_ovn_fair_meter(self._nb_ovn.transaction) + self.assertFalse(self._nb_ovn.meter_del.called) + self.assertTrue(self._nb_ovn.meter_add.called) + self.assertFalse( + self._nb_ovn.transaction.return_value.__enter__.called) + self._nb_ovn.meter_add.assert_called_once_with( + name="acl_log_meter", + unit="pktps", + rate=FAKE_CFG_RATE, + fair=True, + burst_size=FAKE_CFG_BURST, + may_exist=False, + external_ids={ovn_const.OVN_DEVICE_OWNER_EXT_ID_KEY: + log_const.LOGGING_PLUGIN}) + + def test__create_ovn_fair_meter_unchanged(self): + mock_find_rows = mock.Mock() + mock_find_rows.execute.return_value = [self._fake_meter()] + self._nb_ovn.db_find_rows.return_value = mock_find_rows + self._nb_ovn.lookup.side_effect = lambda table, key: ( + self._fake_meter_band() if key == "test_band" else None) + self._log_driver._create_ovn_fair_meter(self._nb_ovn.transaction) + self.assertFalse(self._nb_ovn.meter_del.called) + self.assertFalse(self._nb_ovn.meter_add.called) + + def test__create_ovn_fair_meter_changed(self): + mock_find_rows = mock.Mock() + mock_find_rows.execute.return_value = [self._fake_meter(fair=[False])] + self._nb_ovn.db_find_rows.return_value = mock_find_rows + self._nb_ovn.lookup.return_value = self._fake_meter_band() + self._log_driver._create_ovn_fair_meter(self._nb_ovn.transaction) + self.assertTrue(self._nb_ovn.meter_del.called) + self.assertTrue(self._nb_ovn.meter_add.called) + + def test__create_ovn_fair_meter_band_changed(self): + mock_find_rows = mock.Mock() + mock_find_rows.execute.return_value = [self._fake_meter()] + self._nb_ovn.db_find_rows.return_value = mock_find_rows + self._nb_ovn.lookup.return_value = self._fake_meter_band(rate=666) + self._log_driver._create_ovn_fair_meter(self._nb_ovn.transaction) + self.assertTrue(self._nb_ovn.meter_del.called) + self.assertTrue(self._nb_ovn.meter_add.called) + + def test__create_ovn_fair_meter_band_missing(self): + mock_find_rows = mock.Mock() + mock_find_rows.execute.return_value = [self._fake_meter()] + self._nb_ovn.db_find_rows.return_value = mock_find_rows + self._nb_ovn.lookup.side_effect = idlutils.RowNotFound + self._log_driver._create_ovn_fair_meter(self._nb_ovn.transaction) + self.assertTrue(self._nb_ovn.meter_del.called) + self.assertTrue(self._nb_ovn.meter_add.called) + + class _fake_acl(): + def __init__(self, name=None, **acl_dict): + acl_defaults_dict = { + "name": [name] if name else [], + "action": ovn_const.ACL_ACTION_ALLOW_RELATED, + } + self.__dict__ = {**acl_defaults_dict, **acl_dict} + + def _fake_pg_dict(self, **kwargs): + pg_defaults_dict = { + "name": ovn_utils.ovn_port_group_name(uuidutils.generate_uuid()), + "acls": [] + } + return {**pg_defaults_dict, **kwargs} + + def _fake_pg(self, **kwargs): + pg_defaults_dict = { + "name": ovn_utils.ovn_port_group_name(uuidutils.generate_uuid()), + "acls": [] + } + pg_dict = {**pg_defaults_dict, **kwargs} + return mock.Mock(**pg_dict) + + def _fake_log_obj(self, **kwargs): + log_obj_defaults_dict = { + 'uuid': uuidutils.generate_uuid(), + 'resource_id': None, + 'target_id': None, + 'event': log_const.ALL_EVENT, + } + log_obj_obj_dict = {**log_obj_defaults_dict, **kwargs} + return mock.Mock(**log_obj_obj_dict) + + def test__pgs_from_log_obj_pg_all(self): + expected_pgs = [self._fake_pg()] + with mock.patch.object(self._log_driver, '_pgs_all', + return_value=expected_pgs) as mock_pgs_all: + log_obj = self._fake_log_obj() + pgs = self._log_driver._pgs_from_log_obj(self.context, log_obj) + mock_pgs_all.assert_called_once() + self.assertEqual(expected_pgs, pgs) + + def test__pgs_from_log_obj_empty(self): + with mock.patch.object(self._log_driver, '_pgs_all', + return_value=[]) as mock_pgs_all: + self._nb_ovn.lookup.side_effect = idlutils.RowNotFound + log_obj = self._fake_log_obj(target_id='target_id') + pgs = self._log_driver._pgs_from_log_obj(self.context, log_obj) + mock_pgs_all.assert_not_called() + self._nb_ovn.lookup.assert_called_once_with( + "Port_Group", ovn_const.OVN_DROP_PORT_GROUP_NAME) + self.fake_get_sgs_attached_to_port.assert_called_once_with( + self.context, 'target_id') + self.assertEqual([], pgs) + + def test__pgs_from_log_obj_pg_drop(self): + with mock.patch.object(self._log_driver, '_pgs_all', + return_value=[]) as mock_pgs_all: + pg = self._fake_pg() + + def _mock_lookup(_pg_table, pg_name): + if pg_name == ovn_const.OVN_DROP_PORT_GROUP_NAME: + return pg + raise idlutils.RowNotFound + + self._nb_ovn.lookup.side_effect = _mock_lookup + log_obj = self._fake_log_obj(resource_id='resource_id') + pgs = self._log_driver._pgs_from_log_obj(self.context, log_obj) + mock_pgs_all.assert_not_called() + self.assertEqual(2, self._nb_ovn.lookup.call_count) + self.assertEqual([{'acls': [], 'name': pg.name}], pgs) + + def test__pgs_from_log_obj_pg(self): + with mock.patch.object(self._log_driver, '_pgs_all', + return_value=[]) as mock_pgs_all: + pg = self._fake_pg() + self._nb_ovn.lookup.return_value = pg + log_obj = self._fake_log_obj(resource_id='resource_id', + target_id='target_id', + event=log_const.ACCEPT_EVENT) + pgs = self._log_driver._pgs_from_log_obj(self.context, log_obj) + mock_pgs_all.assert_not_called() + self._nb_ovn.lookup.assert_called_once_with( + "Port_Group", ovn_utils.ovn_port_group_name('resource_id')) + self.assertEqual([{'acls': [], 'name': pg.name}], pgs) + + def test__pgs_from_log_obj_port(self): + with mock.patch.object(self._log_driver, '_pgs_all', + return_value=[]) as mock_pgs_all: + sg_id = uuidutils.generate_uuid() + pg_name = ovn_utils.ovn_port_group_name(sg_id) + pg = self._fake_pg(name=pg_name) + self._nb_ovn.lookup.return_value = pg + log_obj = self._fake_log_obj(target_id='target_id', + event=log_const.ACCEPT_EVENT) + self.fake_get_sgs_attached_to_port.return_value = [sg_id] + pgs = self._log_driver._pgs_from_log_obj(self.context, log_obj) + mock_pgs_all.assert_not_called() + self._nb_ovn.lookup.assert_called_once_with("Port_Group", pg_name) + self.fake_get_sgs_attached_to_port.assert_called_once_with( + self.context, 'target_id') + self.assertEqual([{'acls': [], 'name': pg.name}], pgs) + + @mock.patch.object(ovn_driver.LOG, 'info') + def test__remove_acls_log(self, m_info): + pg_dict = self._fake_pg_dict(acls=['acl1', 'acl2']) + self._log_driver._remove_acls_log([pg_dict], self._nb_ovn.transaction) + info_args, _info_kwargs = m_info.call_args_list[0] + self.assertIn('Cleared %d (out of %d visited) ACLs', info_args[0]) + self._nb_ovn.lookup.assert_not_called() + self.assertEqual(len(pg_dict["acls"]), info_args[1]) + self.assertEqual(len(pg_dict["acls"]), info_args[2]) + self.assertEqual(len(pg_dict["acls"]), self._nb_ovn.db_set.call_count) + + @mock.patch.object(ovn_driver.LOG, 'info') + def test__remove_acls_log_with_log_name(self, m_info): + pg_dict = self._fake_pg_dict(acls=['acl1', 'acl2', 'acl3', 'acl4']) + log_name = 'test_obj_name' + used_name = 'test_used_name' + + def _mock_lookup(_pg_table, acl_uuid): + if acl_uuid == 'acl2': + return self._fake_acl(name=used_name) + return self._fake_acl(name=log_name) + + self._nb_ovn.lookup.side_effect = _mock_lookup + self._log_driver._remove_acls_log([pg_dict], self._nb_ovn.transaction, + log_name) + info_args, _info_kwargs = m_info.call_args_list[0] + self.assertIn('Cleared %d (out of %d visited) ACLs', info_args[0]) + self.assertIn('for network log {}'.format(log_name), info_args[0]) + self.assertEqual(len(pg_dict["acls"]) - 1, info_args[1]) + self.assertEqual(len(pg_dict["acls"]), info_args[2]) + self.assertEqual(len(pg_dict["acls"]) - 1, + self._nb_ovn.db_set.call_count) + + @mock.patch.object(ovn_driver.LOG, 'info') + def test__set_acls_log(self, m_info): + pg_dict = self._fake_pg_dict(acls=['acl1', 'acl2', 'acl3', 'acl4']) + log_name = 'test_obj_name' + used_name = 'test_used_name' + + def _mock_lookup(_pg_table, acl_uuid): + if acl_uuid == 'acl3': + return self._fake_acl() + return self._fake_acl(name=used_name) + + self._nb_ovn.lookup.side_effect = _mock_lookup + actions_enabled = self._log_driver._acl_actions_enabled( + self._fake_log_obj(event=log_const.ALL_EVENT)) + self._log_driver._set_acls_log([pg_dict], self._nb_ovn.transaction, + actions_enabled, log_name) + info_args, _info_kwargs = m_info.call_args_list[0] + self.assertIn('Set %d (out of %d visited) ACLs for network log %s', + info_args[0]) + self.assertEqual(1, info_args[1]) + self.assertEqual(len(pg_dict["acls"]), info_args[2]) + self.assertEqual(log_name, info_args[3]) + self.assertEqual(1, self._nb_ovn.db_set.call_count) diff --git a/releasenotes/notes/add-sg-logging-ovn-83cc121a657a1d14.yaml b/releasenotes/notes/add-sg-logging-ovn-83cc121a657a1d14.yaml new file mode 100644 index 00000000000..6f0dd402e4b --- /dev/null +++ b/releasenotes/notes/add-sg-logging-ovn-83cc121a657a1d14.yaml @@ -0,0 +1,7 @@ +--- +features: + - | + Support for network logging based on security groups added to + OVN backend. + For more information see bug + `1914757 `_.