Merge "Adding resources callback handler"
This commit is contained in:
commit
f4422ee121
34
neutron/services/logapi/common/sg_callback.py
Normal file
34
neutron/services/logapi/common/sg_callback.py
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
# Copyright (c) 2018 Fujitsu Limited
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from neutron.services.logapi.common import constants as log_const
|
||||||
|
from neutron.services.logapi.common import db_api
|
||||||
|
from neutron.services.logapi.drivers import manager
|
||||||
|
|
||||||
|
|
||||||
|
class SecurityGroupRuleCallBack(manager.ResourceCallBackBase):
|
||||||
|
|
||||||
|
def handle_event(self, resource, event, trigger, **kwargs):
|
||||||
|
context = kwargs.get("context")
|
||||||
|
sg_rule = kwargs.get('security_group_rule')
|
||||||
|
if sg_rule:
|
||||||
|
sg_id = sg_rule.get('security_group_id')
|
||||||
|
else:
|
||||||
|
sg_id = kwargs.get('security_group_id')
|
||||||
|
|
||||||
|
log_resources = db_api.get_logs_bound_sg(context, sg_id)
|
||||||
|
if log_resources:
|
||||||
|
self.resource_push_api(
|
||||||
|
log_const.RESOURCE_UPDATE, context, log_resources)
|
@ -15,17 +15,23 @@
|
|||||||
|
|
||||||
from neutron_lib.callbacks import events
|
from neutron_lib.callbacks import events
|
||||||
from neutron_lib.callbacks import registry
|
from neutron_lib.callbacks import registry
|
||||||
from neutron_lib.callbacks import resources
|
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
|
|
||||||
from neutron.common import exceptions
|
from neutron.common import exceptions
|
||||||
from neutron.services.logapi.common import constants as log_const
|
from neutron.services.logapi.common import constants as log_const
|
||||||
from neutron.services.logapi.common import db_api
|
|
||||||
from neutron.services.logapi.common import exceptions as log_exc
|
from neutron.services.logapi.common import exceptions as log_exc
|
||||||
from neutron.services.logapi.rpc import server as server_rpc
|
from neutron.services.logapi.rpc import server as server_rpc
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
RESOURCE_CB_CLASS_MAP = {}
|
||||||
|
|
||||||
|
|
||||||
|
# This function should be called by log_driver
|
||||||
|
def register(resource_type, obj_class):
|
||||||
|
if resource_type not in RESOURCE_CB_CLASS_MAP:
|
||||||
|
RESOURCE_CB_CLASS_MAP[resource_type] = obj_class
|
||||||
|
|
||||||
|
|
||||||
def _get_param(args, kwargs, name, index):
|
def _get_param(args, kwargs, name, index):
|
||||||
try:
|
try:
|
||||||
@ -38,7 +44,24 @@ def _get_param(args, kwargs, name, index):
|
|||||||
raise log_exc.LogapiDriverException(exception_msg=msg)
|
raise log_exc.LogapiDriverException(exception_msg=msg)
|
||||||
|
|
||||||
|
|
||||||
@registry.has_registry_receivers
|
class ResourceCallBackBase(object):
|
||||||
|
|
||||||
|
def __new__(cls, *args, **kwargs):
|
||||||
|
if not hasattr(cls, '_instance'):
|
||||||
|
cls._instance = super(ResourceCallBackBase, cls).__new__(cls)
|
||||||
|
return cls._instance
|
||||||
|
|
||||||
|
def __init__(self, resource, push_api):
|
||||||
|
self.resource_push_api = push_api
|
||||||
|
for event in (events.AFTER_CREATE, events.AFTER_UPDATE,
|
||||||
|
events.AFTER_DELETE):
|
||||||
|
registry.subscribe(self.handle_event, resource, event)
|
||||||
|
|
||||||
|
def handle_event(self, resource, event, trigger, **kwargs):
|
||||||
|
"""Handle resource callback event"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class LoggingServiceDriverManager(object):
|
class LoggingServiceDriverManager(object):
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
@ -62,6 +85,12 @@ class LoggingServiceDriverManager(object):
|
|||||||
self._drivers.add(driver)
|
self._drivers.add(driver)
|
||||||
self.rpc_required |= driver.requires_rpc
|
self.rpc_required |= driver.requires_rpc
|
||||||
|
|
||||||
|
# Handle callback event AFTER_UPDATE, AFTER_DELETE, AFTER_CREATE of
|
||||||
|
# resources which related to log object. For example: when a sg_rule
|
||||||
|
# is added or deleted from security group, if this rule is bounded by a
|
||||||
|
# log_resources, then it should tell to agent to trigger log_drivers.
|
||||||
|
self._setup_resources_cb_handle()
|
||||||
|
|
||||||
def _start_rpc_listeners(self):
|
def _start_rpc_listeners(self):
|
||||||
self._skeleton = server_rpc.LoggingApiSkeleton()
|
self._skeleton = server_rpc.LoggingApiSkeleton()
|
||||||
return self._skeleton.conn.consume_in_threads()
|
return self._skeleton.conn.consume_in_threads()
|
||||||
@ -107,23 +136,6 @@ class LoggingServiceDriverManager(object):
|
|||||||
return
|
return
|
||||||
rpc_method(context, log_obj)
|
rpc_method(context, log_obj)
|
||||||
|
|
||||||
@registry.receives(resources.SECURITY_GROUP_RULE,
|
def _setup_resources_cb_handle(self):
|
||||||
[events.AFTER_CREATE, events.AFTER_DELETE])
|
for res, obj_class in RESOURCE_CB_CLASS_MAP.items():
|
||||||
def _handle_sg_rule_callback(self, resource, event, trigger, **kwargs):
|
obj_class(res, self.call)
|
||||||
"""Handle sg_rule create/delete events
|
|
||||||
|
|
||||||
This method handles sg_rule events, if sg_rule bound by log_resources,
|
|
||||||
it should tell to agent to update log_drivers.
|
|
||||||
|
|
||||||
"""
|
|
||||||
context = kwargs['context']
|
|
||||||
sg_rules = kwargs.get('security_group_rule')
|
|
||||||
if sg_rules:
|
|
||||||
sg_id = sg_rules.get('security_group_id')
|
|
||||||
else:
|
|
||||||
sg_id = kwargs.get('security_group_id')
|
|
||||||
|
|
||||||
log_resources = db_api.get_logs_bound_sg(context, sg_id)
|
|
||||||
if log_resources:
|
|
||||||
self.call(
|
|
||||||
log_const.RESOURCE_UPDATE, context, log_resources)
|
|
||||||
|
@ -19,7 +19,9 @@ from oslo_log import log as logging
|
|||||||
from oslo_utils import importutils
|
from oslo_utils import importutils
|
||||||
|
|
||||||
from neutron.services.logapi.common import constants as log_const
|
from neutron.services.logapi.common import constants as log_const
|
||||||
|
from neutron.services.logapi.common import sg_callback
|
||||||
from neutron.services.logapi.drivers import base
|
from neutron.services.logapi.drivers import base
|
||||||
|
from neutron.services.logapi.drivers import manager
|
||||||
from neutron.services.logapi.rpc import server as server_rpc
|
from neutron.services.logapi.rpc import server as server_rpc
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
@ -60,4 +62,8 @@ def register():
|
|||||||
importutils.import_module(
|
importutils.import_module(
|
||||||
'neutron.services.logapi.common.sg_validate'
|
'neutron.services.logapi.common.sg_validate'
|
||||||
)
|
)
|
||||||
|
# Register resource callback handler
|
||||||
|
manager.register(
|
||||||
|
resources.SECURITY_GROUP_RULE, sg_callback.SecurityGroupRuleCallBack)
|
||||||
|
|
||||||
LOG.debug('Open vSwitch logging driver registered')
|
LOG.debug('Open vSwitch logging driver registered')
|
||||||
|
@ -0,0 +1,67 @@
|
|||||||
|
# Copyright (c) 2018 Fujitsu Limited
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import mock
|
||||||
|
from neutron_lib.callbacks import events
|
||||||
|
from neutron_lib.callbacks import registry
|
||||||
|
from neutron_lib.callbacks import resources
|
||||||
|
|
||||||
|
from neutron.services.logapi.common import sg_callback
|
||||||
|
from neutron.services.logapi.drivers import base as log_driver_base
|
||||||
|
from neutron.services.logapi.drivers import manager as driver_mgr
|
||||||
|
from neutron.tests import base
|
||||||
|
|
||||||
|
FAKE_DRIVER = None
|
||||||
|
|
||||||
|
|
||||||
|
class FakeDriver(log_driver_base.DriverBase):
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def create():
|
||||||
|
return FakeDriver(
|
||||||
|
name='fake_driver',
|
||||||
|
vif_types=[],
|
||||||
|
vnic_types=[],
|
||||||
|
supported_logging_types=['security_group'],
|
||||||
|
requires_rpc=True
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def fake_register():
|
||||||
|
global FAKE_DRIVER
|
||||||
|
if not FAKE_DRIVER:
|
||||||
|
FAKE_DRIVER = FakeDriver.create()
|
||||||
|
driver_mgr.register(resources.SECURITY_GROUP_RULE,
|
||||||
|
sg_callback.SecurityGroupRuleCallBack)
|
||||||
|
|
||||||
|
|
||||||
|
class TestSecurityGroupRuleCallback(base.BaseTestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestSecurityGroupRuleCallback, self).setUp()
|
||||||
|
self.driver_manager = driver_mgr.LoggingServiceDriverManager()
|
||||||
|
|
||||||
|
@mock.patch.object(sg_callback.SecurityGroupRuleCallBack, 'handle_event')
|
||||||
|
def test_handle_event(self, mock_sg_cb):
|
||||||
|
fake_register()
|
||||||
|
self.driver_manager.register_driver(FAKE_DRIVER)
|
||||||
|
|
||||||
|
registry.notify(
|
||||||
|
resources.SECURITY_GROUP_RULE, events.AFTER_CREATE, mock.ANY)
|
||||||
|
mock_sg_cb.assert_called_once_with(
|
||||||
|
resources.SECURITY_GROUP_RULE, events.AFTER_CREATE, mock.ANY)
|
||||||
|
mock_sg_cb.reset_mock()
|
||||||
|
registry.notify('fake_resource', events.AFTER_DELETE, mock.ANY)
|
||||||
|
mock_sg_cb.assert_not_called()
|
@ -14,12 +14,15 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import mock
|
import mock
|
||||||
|
from neutron_lib.callbacks import events
|
||||||
|
from neutron_lib import fixture
|
||||||
|
|
||||||
from neutron.common import exceptions
|
from neutron.common import exceptions
|
||||||
from neutron.services.logapi.common import constants as log_const
|
from neutron.services.logapi.common import constants as log_const
|
||||||
from neutron.services.logapi.common import exceptions as log_exc
|
from neutron.services.logapi.common import exceptions as log_exc
|
||||||
from neutron.services.logapi.drivers import base as log_driver_base
|
from neutron.services.logapi.drivers import base as log_driver_base
|
||||||
from neutron.services.logapi.drivers import manager as driver_mgr
|
from neutron.services.logapi.drivers import manager as driver_mgr
|
||||||
|
from neutron.tests import tools
|
||||||
from neutron.tests.unit.services.logapi import base
|
from neutron.tests.unit.services.logapi import base
|
||||||
|
|
||||||
|
|
||||||
@ -51,7 +54,6 @@ class TestLogDriversManagerBase(base.BaseLogTestCase):
|
|||||||
@staticmethod
|
@staticmethod
|
||||||
def _create_manager_with_drivers(drivers_details):
|
def _create_manager_with_drivers(drivers_details):
|
||||||
for name, driver_details in drivers_details.items():
|
for name, driver_details in drivers_details.items():
|
||||||
|
|
||||||
class LogDriver(log_driver_base.DriverBase):
|
class LogDriver(log_driver_base.DriverBase):
|
||||||
@property
|
@property
|
||||||
def is_loaded(self):
|
def is_loaded(self):
|
||||||
@ -126,3 +128,61 @@ class TestLogDriversCalls(TestLogDriversManagerBase):
|
|||||||
log_obj = mock.sentinel.log_obj
|
log_obj = mock.sentinel.log_obj
|
||||||
self.assertRaises(exceptions.DriverCallError, self.driver_manager.call,
|
self.assertRaises(exceptions.DriverCallError, self.driver_manager.call,
|
||||||
'wrong_method', context=context, log_objs=[log_obj])
|
'wrong_method', context=context, log_objs=[log_obj])
|
||||||
|
|
||||||
|
|
||||||
|
class TestHandleResourceCallback(TestLogDriversManagerBase):
|
||||||
|
"""Test handle resource callback"""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestHandleResourceCallback, self).setUp()
|
||||||
|
self._cb_mgr = mock.Mock()
|
||||||
|
self.useFixture(fixture.CallbackRegistryFixture(
|
||||||
|
callback_manager=self._cb_mgr))
|
||||||
|
self.driver_manager = driver_mgr.LoggingServiceDriverManager()
|
||||||
|
|
||||||
|
def test_subscribe_resources_cb(self):
|
||||||
|
|
||||||
|
class FakeResourceCB1(driver_mgr.ResourceCallBackBase):
|
||||||
|
def handle_event(self, resource, event, trigger, **kwargs):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class FakeResourceCB2(driver_mgr.ResourceCallBackBase):
|
||||||
|
def handle_event(self, resource, event, trigger, **kwargs):
|
||||||
|
pass
|
||||||
|
|
||||||
|
driver_mgr.RESOURCE_CB_CLASS_MAP = {'fake_resource1': FakeResourceCB1,
|
||||||
|
'fake_resource2': FakeResourceCB2}
|
||||||
|
|
||||||
|
self.driver_manager._setup_resources_cb_handle()
|
||||||
|
|
||||||
|
fake_resource_cb1 = FakeResourceCB1(
|
||||||
|
'fake_resource1', self.driver_manager.call)
|
||||||
|
fake_resource_cb2 = FakeResourceCB2(
|
||||||
|
'fake_resource2', self.driver_manager.call)
|
||||||
|
assert_calls = [
|
||||||
|
mock.call(
|
||||||
|
*tools.get_subscribe_args(
|
||||||
|
fake_resource_cb1.handle_event,
|
||||||
|
'fake_resource1', events.AFTER_CREATE)),
|
||||||
|
mock.call(
|
||||||
|
*tools.get_subscribe_args(
|
||||||
|
fake_resource_cb1.handle_event,
|
||||||
|
'fake_resource1', events.AFTER_UPDATE)),
|
||||||
|
mock.call(
|
||||||
|
*tools.get_subscribe_args(
|
||||||
|
fake_resource_cb1.handle_event,
|
||||||
|
'fake_resource1', events.AFTER_DELETE)),
|
||||||
|
mock.call(
|
||||||
|
*tools.get_subscribe_args(
|
||||||
|
fake_resource_cb2.handle_event,
|
||||||
|
'fake_resource2', events.AFTER_CREATE)),
|
||||||
|
mock.call(
|
||||||
|
*tools.get_subscribe_args(
|
||||||
|
fake_resource_cb2.handle_event,
|
||||||
|
'fake_resource2', events.AFTER_UPDATE)),
|
||||||
|
mock.call(
|
||||||
|
*tools.get_subscribe_args(
|
||||||
|
fake_resource_cb2.handle_event,
|
||||||
|
'fake_resource2', events.AFTER_DELETE)),
|
||||||
|
]
|
||||||
|
self._cb_mgr.subscribe.assert_has_calls(assert_calls)
|
||||||
|
Loading…
Reference in New Issue
Block a user