Merge "[log]: Add validator to logging api"
This commit is contained in:
commit
90098180fa
@ -18,3 +18,6 @@ DROP_EVENT = 'DROP'
|
|||||||
ALL_EVENT = 'ALL'
|
ALL_EVENT = 'ALL'
|
||||||
LOG_EVENTS = [ACCEPT_EVENT, DROP_EVENT, ALL_EVENT]
|
LOG_EVENTS = [ACCEPT_EVENT, DROP_EVENT, ALL_EVENT]
|
||||||
LOGGING_PLUGIN = 'logging-plugin'
|
LOGGING_PLUGIN = 'logging-plugin'
|
||||||
|
|
||||||
|
# supported logging types
|
||||||
|
SECURITY_GROUP = 'security_group'
|
||||||
|
@ -23,3 +23,22 @@ class LogResourceNotFound(n_exc.NotFound):
|
|||||||
|
|
||||||
class InvalidLogResourceType(n_exc.InvalidInput):
|
class InvalidLogResourceType(n_exc.InvalidInput):
|
||||||
message = _("Invalid log resource_type: %(resource_type)s.")
|
message = _("Invalid log resource_type: %(resource_type)s.")
|
||||||
|
|
||||||
|
|
||||||
|
class LoggingTypeNotSupported(n_exc.Conflict):
|
||||||
|
message = _("Logging type %(log_type)s is not supported on "
|
||||||
|
"port %(port_id)s.")
|
||||||
|
|
||||||
|
|
||||||
|
class TargetResourceNotFound(n_exc.NotFound):
|
||||||
|
message = _("Target resource %(target_id)s could not be found.")
|
||||||
|
|
||||||
|
|
||||||
|
class ResourceNotFound(n_exc.NotFound):
|
||||||
|
message = _("Resource %(resource_id)s could not be found.")
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidResourceConstraint(n_exc.InvalidInput):
|
||||||
|
message = _("Invalid resource constraint between resource "
|
||||||
|
"(%(resource)s %(resource_id)s) and target resource "
|
||||||
|
"(%(target_resource)s %(target_id)s).")
|
||||||
|
137
neutron/services/logapi/common/validators.py
Normal file
137
neutron/services/logapi/common/validators.py
Normal file
@ -0,0 +1,137 @@
|
|||||||
|
# Copyright (c) 2017 Fujitsu Limited
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from neutron_lib.api.definitions import portbindings
|
||||||
|
from neutron_lib.plugins import directory
|
||||||
|
from oslo_log import log as logging
|
||||||
|
from sqlalchemy.orm import exc as orm_exc
|
||||||
|
|
||||||
|
from neutron.db import _utils as db_utils
|
||||||
|
from neutron.db.models import securitygroup as sg_db
|
||||||
|
from neutron.objects import ports
|
||||||
|
from neutron.objects import securitygroup as sg_object
|
||||||
|
from neutron.plugins.common import constants
|
||||||
|
from neutron.services.logapi.common import constants as log_const
|
||||||
|
from neutron.services.logapi.common import exceptions as log_exc
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
SKIPPED_VIF_TYPES = [
|
||||||
|
portbindings.VIF_TYPE_UNBOUND,
|
||||||
|
portbindings.VIF_TYPE_BINDING_FAILED,
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def _check_port_bound_sg(context, sg_id, port_id):
|
||||||
|
try:
|
||||||
|
db_utils.model_query(context, sg_db.SecurityGroupPortBinding)\
|
||||||
|
.filter_by(security_group_id=sg_id, port_id=port_id).one()
|
||||||
|
except orm_exc.NoResultFound:
|
||||||
|
raise log_exc.InvalidResourceConstraint(resource='security_group',
|
||||||
|
resource_id=sg_id,
|
||||||
|
target_resource='port',
|
||||||
|
target_id=port_id)
|
||||||
|
|
||||||
|
|
||||||
|
def _check_secgroup_exists(context, sg_id):
|
||||||
|
number_of_matching = sg_object.SecurityGroup.count(context, id=sg_id)
|
||||||
|
if number_of_matching < 1:
|
||||||
|
raise log_exc.ResourceNotFound(resource_id=sg_id)
|
||||||
|
|
||||||
|
|
||||||
|
def _get_port(context, port_id):
|
||||||
|
port = ports.Port.get_object(context, id=port_id)
|
||||||
|
if not port:
|
||||||
|
raise log_exc.TargetResourceNotFound(target_id=port_id)
|
||||||
|
return port
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_vnic_type(driver, vnic_type, port_id):
|
||||||
|
if driver.is_vnic_compatible(vnic_type):
|
||||||
|
return True
|
||||||
|
LOG.debug("vnic_type %(vnic_type)s of port %(port_id)s "
|
||||||
|
"is not compatible with logging driver %(driver)s",
|
||||||
|
{'vnic_type': vnic_type,
|
||||||
|
'port_id': port_id,
|
||||||
|
'driver': driver.name})
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_vif_type(driver, vif_type, port_id):
|
||||||
|
if driver.is_vif_type_compatible(vif_type):
|
||||||
|
return True
|
||||||
|
LOG.debug("vif_type %(vif_type)s of port %(port_id)s "
|
||||||
|
"is not compatible with logging driver %(driver)s",
|
||||||
|
{'vif_type': vif_type,
|
||||||
|
'port_id': port_id,
|
||||||
|
'driver': driver.name})
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def validate_log_type_for_port(log_type, port):
|
||||||
|
"""Validate a specific logging type on a specific port
|
||||||
|
|
||||||
|
This method checks whether or not existing a log_driver which supports for
|
||||||
|
the logging type on the port.
|
||||||
|
|
||||||
|
:param log_type: a logging type (e.g security_group)
|
||||||
|
:param port: a port object
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
log_plugin = directory.get_plugin(alias=constants.LOG_API)
|
||||||
|
drivers = log_plugin.driver_manager.drivers
|
||||||
|
for driver in drivers:
|
||||||
|
vif_type = port.binding.vif_type
|
||||||
|
if vif_type not in SKIPPED_VIF_TYPES:
|
||||||
|
if not _validate_vif_type(driver, vif_type, port['id']):
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
vnic_type = port.binding.vnic_type
|
||||||
|
if not _validate_vnic_type(driver, vnic_type, port['id']):
|
||||||
|
continue
|
||||||
|
|
||||||
|
if driver.is_logging_type_supported(log_type):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def validate_request(context, log_data):
|
||||||
|
"""Validate a log request
|
||||||
|
|
||||||
|
This method validates log request is satisfied or not. A ResourceNotFound
|
||||||
|
will be raised if resource_id in log_data not exists or a
|
||||||
|
TargetResourceNotFound will be raised if target_id in log_data not exists.
|
||||||
|
This method will also raise a LoggingTypeNotSupported, if there is no
|
||||||
|
log_driver supporting for resource_type in log_data.
|
||||||
|
|
||||||
|
In addition, if log_data specify both resource_id and target_id. A
|
||||||
|
InvalidResourceConstraint will be raised if there is no constraint
|
||||||
|
between resource_id and target_id.
|
||||||
|
|
||||||
|
"""
|
||||||
|
resource_id = log_data.get('resource_id')
|
||||||
|
target_id = log_data.get('target_id')
|
||||||
|
resource_type = log_data.get('resource_type')
|
||||||
|
if resource_type == log_const.SECURITY_GROUP:
|
||||||
|
if resource_id:
|
||||||
|
_check_secgroup_exists(context, resource_id)
|
||||||
|
if target_id:
|
||||||
|
port = _get_port(context, target_id)
|
||||||
|
if not validate_log_type_for_port(resource_type, port):
|
||||||
|
raise log_exc.LoggingTypeNotSupported(log_type=resource_type,
|
||||||
|
port_id=target_id)
|
||||||
|
if resource_id and target_id:
|
||||||
|
_check_port_bound_sg(context, resource_id, target_id)
|
@ -19,6 +19,7 @@ from neutron.extensions import logging as log_ext
|
|||||||
from neutron.objects import base as base_obj
|
from neutron.objects import base as base_obj
|
||||||
from neutron.objects.logapi import logging_resource as log_object
|
from neutron.objects.logapi import logging_resource as log_object
|
||||||
from neutron.services.logapi.common import exceptions as log_exc
|
from neutron.services.logapi.common import exceptions as log_exc
|
||||||
|
from neutron.services.logapi.common import validators
|
||||||
from neutron.services.logapi.drivers import manager as driver_mgr
|
from neutron.services.logapi.drivers import manager as driver_mgr
|
||||||
|
|
||||||
|
|
||||||
@ -68,6 +69,7 @@ class LoggingPlugin(log_ext.LoggingPluginBase):
|
|||||||
if resource_type not in self.supported_logging_types:
|
if resource_type not in self.supported_logging_types:
|
||||||
raise log_exc.InvalidLogResourceType(
|
raise log_exc.InvalidLogResourceType(
|
||||||
resource_type=resource_type)
|
resource_type=resource_type)
|
||||||
|
validators.validate_request(context, log_data)
|
||||||
with db_api.context_manager.writer.using(context):
|
with db_api.context_manager.writer.using(context):
|
||||||
# body 'log' contains both tenant_id and project_id
|
# body 'log' contains both tenant_id and project_id
|
||||||
# but only latter needs to be used to create Log object.
|
# but only latter needs to be used to create Log object.
|
||||||
|
165
neutron/tests/unit/services/logapi/common/test_validators.py
Normal file
165
neutron/tests/unit/services/logapi/common/test_validators.py
Normal file
@ -0,0 +1,165 @@
|
|||||||
|
# Copyright (c) 2017 Fujitsu Limited
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import mock
|
||||||
|
from neutron_lib.api.definitions import portbindings
|
||||||
|
from neutron_lib import context
|
||||||
|
from neutron_lib.plugins import directory
|
||||||
|
from oslo_utils import uuidutils
|
||||||
|
from sqlalchemy.orm import exc as orm_exc
|
||||||
|
|
||||||
|
from neutron.objects import ports
|
||||||
|
from neutron.objects import securitygroup as sg_object
|
||||||
|
from neutron.plugins.common import constants
|
||||||
|
from neutron.services.logapi.common import exceptions as log_exc
|
||||||
|
from neutron.services.logapi.common import validators
|
||||||
|
from neutron.tests import base
|
||||||
|
from neutron.tests.unit.services.logapi.drivers import (
|
||||||
|
test_manager as drv_mgr)
|
||||||
|
|
||||||
|
|
||||||
|
class TestRequestValidations(base.BaseTestCase):
|
||||||
|
"""Test validation for a request"""
|
||||||
|
|
||||||
|
def test_validate_request_resource_sg_not_exists(self):
|
||||||
|
log_data = {'resource_type': 'security_group',
|
||||||
|
'resource_id': 'fake_sg_id'}
|
||||||
|
with mock.patch.object(sg_object.SecurityGroup, 'count',
|
||||||
|
return_value=0):
|
||||||
|
self.assertRaises(log_exc.ResourceNotFound,
|
||||||
|
validators.validate_request,
|
||||||
|
mock.ANY,
|
||||||
|
log_data)
|
||||||
|
|
||||||
|
def test_validate_request_target_resource_port_not_exists(self):
|
||||||
|
log_data = {'resource_type': 'security_group',
|
||||||
|
'target_id': 'fake_port_id'}
|
||||||
|
with mock.patch.object(ports.Port, 'get_object', return_value=None):
|
||||||
|
self.assertRaises(log_exc.TargetResourceNotFound,
|
||||||
|
validators.validate_request,
|
||||||
|
mock.ANY,
|
||||||
|
log_data)
|
||||||
|
|
||||||
|
def test_validate_request_log_type_not_supported_on_port(self):
|
||||||
|
log_data = {'resource_type': 'security_group',
|
||||||
|
'target_id': 'fake_port_id'}
|
||||||
|
with mock.patch.object(ports.Port, 'get_object',
|
||||||
|
return_value=mock.ANY):
|
||||||
|
with mock.patch.object(validators, 'validate_log_type_for_port',
|
||||||
|
return_value=False):
|
||||||
|
self.assertRaises(log_exc.LoggingTypeNotSupported,
|
||||||
|
validators.validate_request,
|
||||||
|
mock.ANY,
|
||||||
|
log_data)
|
||||||
|
|
||||||
|
def test_validate_request_invalid_resource_constraint(self):
|
||||||
|
log_data = {'resource_type': 'security_group',
|
||||||
|
'resource_id': 'fake_sg_id',
|
||||||
|
'target_id': 'fake_port_id'}
|
||||||
|
|
||||||
|
class FakeFiltered(object):
|
||||||
|
def one(self):
|
||||||
|
raise orm_exc.NoResultFound
|
||||||
|
|
||||||
|
class FakeSGPortBinding(object):
|
||||||
|
def filter_by(self, security_group_id, port_id):
|
||||||
|
return FakeFiltered()
|
||||||
|
|
||||||
|
with mock.patch.object(
|
||||||
|
sg_object.SecurityGroup, 'count', return_value=1):
|
||||||
|
with mock.patch.object(
|
||||||
|
ports.Port, 'get_object', return_value=mock.ANY):
|
||||||
|
with mock.patch.object(validators,
|
||||||
|
'validate_log_type_for_port',
|
||||||
|
return_value=True):
|
||||||
|
with mock.patch('neutron.db._utils.model_query',
|
||||||
|
return_value=FakeSGPortBinding()):
|
||||||
|
self.assertRaises(
|
||||||
|
log_exc.InvalidResourceConstraint,
|
||||||
|
validators.validate_request,
|
||||||
|
mock.ANY,
|
||||||
|
log_data)
|
||||||
|
|
||||||
|
|
||||||
|
class TestLogDriversLoggingTypeValidations(drv_mgr.TestLogDriversManagerBase):
|
||||||
|
"""Test validation of logging type for a port"""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestLogDriversLoggingTypeValidations, self).setUp()
|
||||||
|
self.ctxt = context.Context('fake_user', 'fake_tenant')
|
||||||
|
|
||||||
|
def _get_port(self, vif_type, vnic_type):
|
||||||
|
port_id = uuidutils.generate_uuid()
|
||||||
|
port_binding = ports.PortBinding(
|
||||||
|
self.ctxt, port_id=port_id, vif_type=vif_type, vnic_type=vnic_type)
|
||||||
|
return ports.Port(
|
||||||
|
self.ctxt, id=uuidutils.generate_uuid(), binding=port_binding)
|
||||||
|
|
||||||
|
def _test_validate_log_type_for_port(self, port, expected_result):
|
||||||
|
driver_manager = self._create_manager_with_drivers({
|
||||||
|
'driver-A': {
|
||||||
|
'is_loaded': True,
|
||||||
|
'supported_logging_types': ['security_group'],
|
||||||
|
'vif_types': [portbindings.VIF_TYPE_OVS],
|
||||||
|
'vnic_types': [portbindings.VNIC_NORMAL]
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
is_log_type_supported_mock = mock.Mock()
|
||||||
|
if expected_result:
|
||||||
|
is_log_type_supported_mock.return_value = expected_result
|
||||||
|
log_driver = list(driver_manager.drivers)[0]
|
||||||
|
log_driver.is_logging_type_supported = (
|
||||||
|
is_log_type_supported_mock
|
||||||
|
)
|
||||||
|
|
||||||
|
class FakeLoggingPlugin(object):
|
||||||
|
def __init__(self):
|
||||||
|
self.driver_manager = driver_manager
|
||||||
|
|
||||||
|
directory.add_plugin(constants.LOG_API, FakeLoggingPlugin())
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
expected_result,
|
||||||
|
validators.validate_log_type_for_port('security_group', port))
|
||||||
|
if expected_result:
|
||||||
|
is_log_type_supported_mock.assert_called_once_with(
|
||||||
|
'security_group')
|
||||||
|
else:
|
||||||
|
is_log_type_supported_mock.assert_not_called()
|
||||||
|
|
||||||
|
def test_validate_log_type_for_port_vif_type_supported(self):
|
||||||
|
port = self._get_port(
|
||||||
|
portbindings.VIF_TYPE_OVS, portbindings.VNIC_NORMAL)
|
||||||
|
self._test_validate_log_type_for_port(
|
||||||
|
port, expected_result=True)
|
||||||
|
|
||||||
|
def test_validate_log_type_for_port_vif_type_not_supported(self):
|
||||||
|
port = self._get_port(
|
||||||
|
portbindings.VIF_TYPE_OTHER, portbindings.VNIC_NORMAL)
|
||||||
|
self._test_validate_log_type_for_port(
|
||||||
|
port, expected_result=False)
|
||||||
|
|
||||||
|
def test_validate_log_type_for_port_unbound_vnic_type_supported(self):
|
||||||
|
port = self._get_port(
|
||||||
|
portbindings.VIF_TYPE_UNBOUND, portbindings.VNIC_NORMAL)
|
||||||
|
self._test_validate_log_type_for_port(
|
||||||
|
port, expected_result=True)
|
||||||
|
|
||||||
|
def test_validate_log_type_for_port_unbound_vnic_type_not_supported(self):
|
||||||
|
port = self._get_port(
|
||||||
|
portbindings.VIF_TYPE_UNBOUND, portbindings.VNIC_BAREMETAL)
|
||||||
|
self._test_validate_log_type_for_port(
|
||||||
|
port, expected_result=False)
|
@ -94,20 +94,22 @@ class TestLoggingPlugin(base.BaseLogTestCase):
|
|||||||
'enabled': True,
|
'enabled': True,
|
||||||
'resource_id': uuidutils.generate_uuid(),
|
'resource_id': uuidutils.generate_uuid(),
|
||||||
'target_id': uuidutils.generate_uuid()}}
|
'target_id': uuidutils.generate_uuid()}}
|
||||||
sg = mock.Mock()
|
|
||||||
port = mock.Mock()
|
port = mock.Mock()
|
||||||
new_log = mock.Mock()
|
new_log = mock.Mock()
|
||||||
with mock.patch.object(sg_object.SecurityGroup, 'get_object',
|
with mock.patch.object(sg_object.SecurityGroup, 'count',
|
||||||
return_value=sg):
|
return_value=1):
|
||||||
with mock.patch.object(ports.Port, 'get_object',
|
with mock.patch.object(ports.Port, 'get_object',
|
||||||
return_value=port):
|
return_value=port):
|
||||||
with mock.patch('neutron.objects.logapi.'
|
with mock.patch('neutron.services.logapi.common.'
|
||||||
'logging_resource.Log',
|
'validators.validate_log_type_for_port',
|
||||||
return_value=new_log) as init_log_mock:
|
return_value=True):
|
||||||
self.log_plugin.create_log(self.ctxt, log)
|
with mock.patch('neutron.objects.logapi.'
|
||||||
init_log_mock.assert_called_once_with(context=self.ctxt,
|
'logging_resource.Log',
|
||||||
**log['log'])
|
return_value=new_log) as init_log_mock:
|
||||||
self.assertTrue(new_log.create.called)
|
self.log_plugin.create_log(self.ctxt, log)
|
||||||
|
init_log_mock.assert_called_once_with(
|
||||||
|
context=self.ctxt, **log['log'])
|
||||||
|
self.assertTrue(new_log.create.called)
|
||||||
|
|
||||||
def test_create_log_without_sg_resource(self):
|
def test_create_log_without_sg_resource(self):
|
||||||
log = {'log': {'resource_type': 'security_group',
|
log = {'log': {'resource_type': 'security_group',
|
||||||
@ -117,12 +119,15 @@ class TestLoggingPlugin(base.BaseLogTestCase):
|
|||||||
new_log.enabled = True
|
new_log.enabled = True
|
||||||
port = mock.Mock()
|
port = mock.Mock()
|
||||||
with mock.patch.object(ports.Port, 'get_object', return_value=port):
|
with mock.patch.object(ports.Port, 'get_object', return_value=port):
|
||||||
with mock.patch('neutron.objects.logapi.logging_resource.Log',
|
with mock.patch('neutron.services.logapi.common.'
|
||||||
return_value=new_log) as init_log_mock:
|
'validators.validate_log_type_for_port',
|
||||||
self.log_plugin.create_log(self.ctxt, log)
|
return_value=True):
|
||||||
init_log_mock.assert_called_once_with(
|
with mock.patch('neutron.objects.logapi.logging_resource.Log',
|
||||||
context=self.ctxt, **log['log'])
|
return_value=new_log) as init_log_mock:
|
||||||
self.assertTrue(new_log.create.called)
|
self.log_plugin.create_log(self.ctxt, log)
|
||||||
|
init_log_mock.assert_called_once_with(
|
||||||
|
context=self.ctxt, **log['log'])
|
||||||
|
self.assertTrue(new_log.create.called)
|
||||||
|
|
||||||
def test_create_log_without_parent_resource(self):
|
def test_create_log_without_parent_resource(self):
|
||||||
log = {'log': {'resource_type': 'security_group',
|
log = {'log': {'resource_type': 'security_group',
|
||||||
@ -130,9 +135,8 @@ class TestLoggingPlugin(base.BaseLogTestCase):
|
|||||||
'resource_id': uuidutils.generate_uuid()}}
|
'resource_id': uuidutils.generate_uuid()}}
|
||||||
new_log = mock.Mock()
|
new_log = mock.Mock()
|
||||||
new_log.enabled = True
|
new_log.enabled = True
|
||||||
sg = mock.Mock()
|
with mock.patch.object(sg_object.SecurityGroup, 'count',
|
||||||
with mock.patch.object(sg_object.SecurityGroup, 'get_object',
|
return_value=1):
|
||||||
return_value=sg):
|
|
||||||
with mock.patch('neutron.objects.logapi.logging_resource.Log',
|
with mock.patch('neutron.objects.logapi.logging_resource.Log',
|
||||||
return_value=new_log) as init_log_mock:
|
return_value=new_log) as init_log_mock:
|
||||||
self.log_plugin.create_log(self.ctxt, log)
|
self.log_plugin.create_log(self.ctxt, log)
|
||||||
@ -153,6 +157,49 @@ class TestLoggingPlugin(base.BaseLogTestCase):
|
|||||||
**log['log'])
|
**log['log'])
|
||||||
self.assertTrue(new_log.create.called)
|
self.assertTrue(new_log.create.called)
|
||||||
|
|
||||||
|
def test_create_log_nonexistent_sg_resource(self):
|
||||||
|
log = {'log': {'resource_type': 'security_group',
|
||||||
|
'enabled': True,
|
||||||
|
'resource_id': uuidutils.generate_uuid()}}
|
||||||
|
with mock.patch.object(sg_object.SecurityGroup, 'count',
|
||||||
|
return_value=0):
|
||||||
|
self.assertRaises(
|
||||||
|
log_exc.ResourceNotFound,
|
||||||
|
self.log_plugin.create_log,
|
||||||
|
self.ctxt,
|
||||||
|
log)
|
||||||
|
|
||||||
|
def test_create_log_nonexistent_target(self):
|
||||||
|
log = {'log': {'resource_type': 'security_group',
|
||||||
|
'enabled': True,
|
||||||
|
'target_id': uuidutils.generate_uuid()}}
|
||||||
|
with mock.patch.object(ports.Port, 'get_object',
|
||||||
|
return_value=None):
|
||||||
|
self.assertRaises(
|
||||||
|
log_exc.TargetResourceNotFound,
|
||||||
|
self.log_plugin.create_log,
|
||||||
|
self.ctxt,
|
||||||
|
log)
|
||||||
|
|
||||||
|
def test_create_log_not_bound_port(self):
|
||||||
|
log = {'log': {'resource_type': 'security_group',
|
||||||
|
'enabled': True,
|
||||||
|
'resource_id': uuidutils.generate_uuid(),
|
||||||
|
'target_id': uuidutils.generate_uuid()}}
|
||||||
|
port = mock.Mock()
|
||||||
|
with mock.patch.object(sg_object.SecurityGroup, 'count',
|
||||||
|
return_value=1):
|
||||||
|
with mock.patch.object(ports.Port, 'get_object',
|
||||||
|
return_value=port):
|
||||||
|
with mock.patch('neutron.services.logapi.common.'
|
||||||
|
'validators.validate_log_type_for_port',
|
||||||
|
return_value=True):
|
||||||
|
self.assertRaises(
|
||||||
|
log_exc.InvalidResourceConstraint,
|
||||||
|
self.log_plugin.create_log,
|
||||||
|
self.ctxt,
|
||||||
|
log)
|
||||||
|
|
||||||
def test_create_log_disabled(self):
|
def test_create_log_disabled(self):
|
||||||
log_data = {'log': {'resource_type': 'security_group',
|
log_data = {'log': {'resource_type': 'security_group',
|
||||||
'enabled': False}}
|
'enabled': False}}
|
||||||
@ -175,6 +222,24 @@ class TestLoggingPlugin(base.BaseLogTestCase):
|
|||||||
self.ctxt,
|
self.ctxt,
|
||||||
log)
|
log)
|
||||||
|
|
||||||
|
def test_create_log_with_unsupported_logging_type_on_port(self):
|
||||||
|
log = {'log': {'resource_type': 'security_group',
|
||||||
|
'enabled': True,
|
||||||
|
'target_id': uuidutils.generate_uuid()}}
|
||||||
|
|
||||||
|
port = mock.Mock()
|
||||||
|
port.id = log['log']['target_id']
|
||||||
|
with mock.patch.object(ports.Port, 'get_object',
|
||||||
|
return_value=port):
|
||||||
|
with mock.patch('neutron.services.logapi.common.'
|
||||||
|
'validators.validate_log_type_for_port',
|
||||||
|
return_value=False):
|
||||||
|
self.assertRaises(
|
||||||
|
log_exc.LoggingTypeNotSupported,
|
||||||
|
self.log_plugin.create_log,
|
||||||
|
self.ctxt,
|
||||||
|
log)
|
||||||
|
|
||||||
def test_update_log(self):
|
def test_update_log(self):
|
||||||
log_data = {'log': {'enabled': True}}
|
log_data = {'log': {'enabled': True}}
|
||||||
new_log = mock.Mock()
|
new_log = mock.Mock()
|
||||||
|
Loading…
Reference in New Issue
Block a user