Merge "Deal with TODO related to Security Groups RPC API's classes"
This commit is contained in:
commit
f47ed81063
@ -29,7 +29,7 @@ running on the compute nodes, and modifying the IPTables rules on each hyperviso
|
||||
|
||||
* `Plugin RPC classes <https://github.com/openstack/neutron/blob/master/neutron/db/securitygroups_rpc_base.py>`_
|
||||
|
||||
* `SecurityGroupServerRpcCallbackMixin <https://github.com/openstack/neutron/blob/master/neutron/db/securitygroups_rpc_base.py#L126>`_ - defines the RPC API that the plugin uses to communicate with the agents running on the compute nodes
|
||||
* `SecurityGroupServerRpcMixin <https://github.com/openstack/neutron/blob/master/neutron/db/securitygroups_rpc_base.py#39>`_ - defines the RPC API that the plugin uses to communicate with the agents running on the compute nodes
|
||||
* SecurityGroupServerRpcMixin - Defines the API methods used to fetch data from the database, in order to return responses to agents via the RPC API
|
||||
|
||||
* `Agent RPC classes <https://github.com/openstack/neutron/blob/master/neutron/agent/securitygroups_rpc.py>`_
|
||||
|
@ -22,15 +22,11 @@ import oslo_messaging
|
||||
from oslo_utils import importutils
|
||||
|
||||
from neutron.agent import firewall
|
||||
from neutron.common import constants
|
||||
from neutron.common import rpc as n_rpc
|
||||
from neutron.common import topics
|
||||
from neutron.api.rpc.handlers import securitygroups_rpc
|
||||
from neutron.i18n import _LI, _LW
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
# history
|
||||
# 1.1 Support Security Group RPC
|
||||
SG_RPC_VERSION = "1.1"
|
||||
|
||||
|
||||
security_group_opts = [
|
||||
cfg.StrOpt(
|
||||
@ -85,81 +81,6 @@ def disable_security_group_extension_by_config(aliases):
|
||||
_disable_extension('allowed-address-pairs', aliases)
|
||||
|
||||
|
||||
class SecurityGroupServerRpcApi(object):
|
||||
"""RPC client for security group methods in the plugin.
|
||||
|
||||
This class implements the client side of an rpc interface. This interface
|
||||
is used by agents to call security group related methods implemented on the
|
||||
plugin side. The other side of this interface can be found in
|
||||
neutron.api.rpc.handlers.SecurityGroupServerRpcCallback. For more
|
||||
information about changing rpc interfaces, see
|
||||
doc/source/devref/rpc_api.rst.
|
||||
"""
|
||||
def __init__(self, topic):
|
||||
target = oslo_messaging.Target(
|
||||
topic=topic, version='1.0',
|
||||
namespace=constants.RPC_NAMESPACE_SECGROUP)
|
||||
self.client = n_rpc.get_client(target)
|
||||
|
||||
def security_group_rules_for_devices(self, context, devices):
|
||||
LOG.debug("Get security group rules "
|
||||
"for devices via rpc %r", devices)
|
||||
cctxt = self.client.prepare(version='1.1')
|
||||
return cctxt.call(context, 'security_group_rules_for_devices',
|
||||
devices=devices)
|
||||
|
||||
def security_group_info_for_devices(self, context, devices):
|
||||
LOG.debug("Get security group information for devices via rpc %r",
|
||||
devices)
|
||||
cctxt = self.client.prepare(version='1.2')
|
||||
return cctxt.call(context, 'security_group_info_for_devices',
|
||||
devices=devices)
|
||||
|
||||
|
||||
class SecurityGroupAgentRpcCallbackMixin(object):
|
||||
"""A mix-in that enable SecurityGroup agent
|
||||
support in agent implementations.
|
||||
"""
|
||||
#mix-in object should be have sg_agent
|
||||
sg_agent = None
|
||||
|
||||
def _security_groups_agent_not_set(self):
|
||||
LOG.warning(_LW("Security group agent binding currently not set. "
|
||||
"This should be set by the end of the init "
|
||||
"process."))
|
||||
|
||||
def security_groups_rule_updated(self, context, **kwargs):
|
||||
"""Callback for security group rule update.
|
||||
|
||||
:param security_groups: list of updated security_groups
|
||||
"""
|
||||
security_groups = kwargs.get('security_groups', [])
|
||||
LOG.debug("Security group rule updated on remote: %s",
|
||||
security_groups)
|
||||
if not self.sg_agent:
|
||||
return self._security_groups_agent_not_set()
|
||||
self.sg_agent.security_groups_rule_updated(security_groups)
|
||||
|
||||
def security_groups_member_updated(self, context, **kwargs):
|
||||
"""Callback for security group member update.
|
||||
|
||||
:param security_groups: list of updated security_groups
|
||||
"""
|
||||
security_groups = kwargs.get('security_groups', [])
|
||||
LOG.debug("Security group member updated on remote: %s",
|
||||
security_groups)
|
||||
if not self.sg_agent:
|
||||
return self._security_groups_agent_not_set()
|
||||
self.sg_agent.security_groups_member_updated(security_groups)
|
||||
|
||||
def security_groups_provider_updated(self, context, **kwargs):
|
||||
"""Callback for security group provider update."""
|
||||
LOG.debug("Provider rule updated")
|
||||
if not self.sg_agent:
|
||||
return self._security_groups_agent_not_set()
|
||||
self.sg_agent.security_groups_provider_updated()
|
||||
|
||||
|
||||
class SecurityGroupAgentRpc(object):
|
||||
"""Enables SecurityGroup agent support in agent implementations."""
|
||||
|
||||
@ -375,36 +296,16 @@ class SecurityGroupAgentRpc(object):
|
||||
self.refresh_firewall(updated_devices)
|
||||
|
||||
|
||||
class SecurityGroupAgentRpcApiMixin(object):
|
||||
|
||||
def _get_security_group_topic(self):
|
||||
return topics.get_topic_name(self.topic,
|
||||
topics.SECURITY_GROUP,
|
||||
topics.UPDATE)
|
||||
|
||||
def security_groups_rule_updated(self, context, security_groups):
|
||||
"""Notify rule updated security groups."""
|
||||
if not security_groups:
|
||||
return
|
||||
cctxt = self.client.prepare(version=SG_RPC_VERSION,
|
||||
topic=self._get_security_group_topic(),
|
||||
fanout=True)
|
||||
cctxt.cast(context, 'security_groups_rule_updated',
|
||||
security_groups=security_groups)
|
||||
|
||||
def security_groups_member_updated(self, context, security_groups):
|
||||
"""Notify member updated security groups."""
|
||||
if not security_groups:
|
||||
return
|
||||
cctxt = self.client.prepare(version=SG_RPC_VERSION,
|
||||
topic=self._get_security_group_topic(),
|
||||
fanout=True)
|
||||
cctxt.cast(context, 'security_groups_member_updated',
|
||||
security_groups=security_groups)
|
||||
|
||||
def security_groups_provider_updated(self, context):
|
||||
"""Notify provider updated security groups."""
|
||||
cctxt = self.client.prepare(version=SG_RPC_VERSION,
|
||||
topic=self._get_security_group_topic(),
|
||||
fanout=True)
|
||||
cctxt.cast(context, 'security_groups_provider_updated')
|
||||
# TODO(armax): for bw compat with external dependencies; to be dropped in M.
|
||||
SG_RPC_VERSION = (
|
||||
securitygroups_rpc.SecurityGroupAgentRpcApiMixin.SG_RPC_VERSION
|
||||
)
|
||||
SecurityGroupServerRpcApi = (
|
||||
securitygroups_rpc.SecurityGroupServerRpcApi
|
||||
)
|
||||
SecurityGroupAgentRpcApiMixin = (
|
||||
securitygroups_rpc.SecurityGroupAgentRpcApiMixin
|
||||
)
|
||||
SecurityGroupAgentRpcCallbackMixin = (
|
||||
securitygroups_rpc.SecurityGroupAgentRpcCallbackMixin
|
||||
)
|
||||
|
@ -14,21 +14,53 @@
|
||||
|
||||
import oslo_messaging
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from neutron.common import constants
|
||||
from neutron.common import rpc as n_rpc
|
||||
from neutron.common import topics
|
||||
from neutron.i18n import _LW
|
||||
from neutron import manager
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
# TODO(amotoki): Move security group RPC API and agent callback
|
||||
# from neutron/agent/securitygroups_rpc.py.
|
||||
|
||||
class SecurityGroupServerRpcApi(object):
|
||||
"""RPC client for security group methods in the plugin.
|
||||
|
||||
This class implements the client side of an rpc interface. This interface
|
||||
is used by agents to call security group related methods implemented on the
|
||||
plugin side. The other side of this interface is defined in
|
||||
SecurityGroupServerRpcCallback. For more information about changing rpc
|
||||
interfaces, see doc/source/devref/rpc_api.rst.
|
||||
"""
|
||||
def __init__(self, topic):
|
||||
target = oslo_messaging.Target(
|
||||
topic=topic, version='1.0',
|
||||
namespace=constants.RPC_NAMESPACE_SECGROUP)
|
||||
self.client = n_rpc.get_client(target)
|
||||
|
||||
def security_group_rules_for_devices(self, context, devices):
|
||||
LOG.debug("Get security group rules "
|
||||
"for devices via rpc %r", devices)
|
||||
cctxt = self.client.prepare(version='1.1')
|
||||
return cctxt.call(context, 'security_group_rules_for_devices',
|
||||
devices=devices)
|
||||
|
||||
def security_group_info_for_devices(self, context, devices):
|
||||
LOG.debug("Get security group information for devices via rpc %r",
|
||||
devices)
|
||||
cctxt = self.client.prepare(version='1.2')
|
||||
return cctxt.call(context, 'security_group_info_for_devices',
|
||||
devices=devices)
|
||||
|
||||
|
||||
class SecurityGroupServerRpcCallback(object):
|
||||
"""Callback for SecurityGroup agent RPC in plugin implementations.
|
||||
|
||||
This class implements the server side of an rpc interface. The client side
|
||||
can be found in neutron.agent.securitygroups_rpc.SecurityGroupServerRpcApi.
|
||||
For more information on changing rpc interfaces, see
|
||||
doc/source/devref/rpc_api.rst.
|
||||
can be found in SecurityGroupServerRpcApi. For more information on changing
|
||||
rpc interfaces, see doc/source/devref/rpc_api.rst.
|
||||
"""
|
||||
|
||||
# API version history:
|
||||
@ -80,3 +112,99 @@ class SecurityGroupServerRpcCallback(object):
|
||||
devices_info = kwargs.get('devices')
|
||||
ports = self._get_devices_info(devices_info)
|
||||
return self.plugin.security_group_info_for_ports(context, ports)
|
||||
|
||||
|
||||
class SecurityGroupAgentRpcApiMixin(object):
|
||||
"""RPC client for security group methods to the agent.
|
||||
|
||||
This class implements the client side of an rpc interface. This interface
|
||||
is used by plugins to call security group methods implemented on the
|
||||
agent side. The other side of this interface can be found in
|
||||
SecurityGroupAgentRpcCallbackMixin. For more information about changing
|
||||
rpc interfaces, see doc/source/devref/rpc_api.rst.
|
||||
"""
|
||||
|
||||
# history
|
||||
# 1.1 Support Security Group RPC
|
||||
SG_RPC_VERSION = "1.1"
|
||||
|
||||
def _get_security_group_topic(self):
|
||||
return topics.get_topic_name(self.topic,
|
||||
topics.SECURITY_GROUP,
|
||||
topics.UPDATE)
|
||||
|
||||
def security_groups_rule_updated(self, context, security_groups):
|
||||
"""Notify rule updated security groups."""
|
||||
if not security_groups:
|
||||
return
|
||||
cctxt = self.client.prepare(version=self.SG_RPC_VERSION,
|
||||
topic=self._get_security_group_topic(),
|
||||
fanout=True)
|
||||
cctxt.cast(context, 'security_groups_rule_updated',
|
||||
security_groups=security_groups)
|
||||
|
||||
def security_groups_member_updated(self, context, security_groups):
|
||||
"""Notify member updated security groups."""
|
||||
if not security_groups:
|
||||
return
|
||||
cctxt = self.client.prepare(version=self.SG_RPC_VERSION,
|
||||
topic=self._get_security_group_topic(),
|
||||
fanout=True)
|
||||
cctxt.cast(context, 'security_groups_member_updated',
|
||||
security_groups=security_groups)
|
||||
|
||||
def security_groups_provider_updated(self, context):
|
||||
"""Notify provider updated security groups."""
|
||||
cctxt = self.client.prepare(version=self.SG_RPC_VERSION,
|
||||
topic=self._get_security_group_topic(),
|
||||
fanout=True)
|
||||
cctxt.cast(context, 'security_groups_provider_updated')
|
||||
|
||||
|
||||
class SecurityGroupAgentRpcCallbackMixin(object):
|
||||
"""A mix-in that enable SecurityGroup support in agent implementations.
|
||||
|
||||
This class implements the server side of an rpc interface. The client side
|
||||
can be found in SecurityGroupServerRpcApi. For more information on changing
|
||||
rpc interfaces, see doc/source/devref/rpc_api.rst.
|
||||
|
||||
The sg_agent reference implementation is available in neutron/agent
|
||||
"""
|
||||
# mix-in object should be have sg_agent
|
||||
sg_agent = None
|
||||
|
||||
def _security_groups_agent_not_set(self):
|
||||
LOG.warning(_LW("Security group agent binding currently not set. "
|
||||
"This should be set by the end of the init "
|
||||
"process."))
|
||||
|
||||
def security_groups_rule_updated(self, context, **kwargs):
|
||||
"""Callback for security group rule update.
|
||||
|
||||
:param security_groups: list of updated security_groups
|
||||
"""
|
||||
security_groups = kwargs.get('security_groups', [])
|
||||
LOG.debug("Security group rule updated on remote: %s",
|
||||
security_groups)
|
||||
if not self.sg_agent:
|
||||
return self._security_groups_agent_not_set()
|
||||
self.sg_agent.security_groups_rule_updated(security_groups)
|
||||
|
||||
def security_groups_member_updated(self, context, **kwargs):
|
||||
"""Callback for security group member update.
|
||||
|
||||
:param security_groups: list of updated security_groups
|
||||
"""
|
||||
security_groups = kwargs.get('security_groups', [])
|
||||
LOG.debug("Security group member updated on remote: %s",
|
||||
security_groups)
|
||||
if not self.sg_agent:
|
||||
return self._security_groups_agent_not_set()
|
||||
self.sg_agent.security_groups_member_updated(security_groups)
|
||||
|
||||
def security_groups_provider_updated(self, context, **kwargs):
|
||||
"""Callback for security group provider update."""
|
||||
LOG.debug("Provider rule updated")
|
||||
if not self.sg_agent:
|
||||
return self._security_groups_agent_not_set()
|
||||
self.sg_agent.security_groups_provider_updated()
|
||||
|
@ -17,8 +17,8 @@ from oslo_log import log
|
||||
import oslo_messaging
|
||||
from sqlalchemy.orm import exc
|
||||
|
||||
from neutron.agent import securitygroups_rpc as sg_rpc
|
||||
from neutron.api.rpc.handlers import dvr_rpc
|
||||
from neutron.api.rpc.handlers import securitygroups_rpc as sg_rpc
|
||||
from neutron.callbacks import events
|
||||
from neutron.callbacks import registry
|
||||
from neutron.callbacks import resources
|
||||
|
@ -1084,30 +1084,6 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase):
|
||||
self._delete('ports', port_id2)
|
||||
|
||||
|
||||
class SGAgentRpcCallBackMixinTestCase(base.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(SGAgentRpcCallBackMixinTestCase, self).setUp()
|
||||
self.rpc = sg_rpc.SecurityGroupAgentRpcCallbackMixin()
|
||||
self.rpc.sg_agent = mock.Mock()
|
||||
|
||||
def test_security_groups_rule_updated(self):
|
||||
self.rpc.security_groups_rule_updated(None,
|
||||
security_groups=['fake_sgid'])
|
||||
self.rpc.sg_agent.assert_has_calls(
|
||||
[mock.call.security_groups_rule_updated(['fake_sgid'])])
|
||||
|
||||
def test_security_groups_member_updated(self):
|
||||
self.rpc.security_groups_member_updated(None,
|
||||
security_groups=['fake_sgid'])
|
||||
self.rpc.sg_agent.assert_has_calls(
|
||||
[mock.call.security_groups_member_updated(['fake_sgid'])])
|
||||
|
||||
def test_security_groups_provider_updated(self):
|
||||
self.rpc.security_groups_provider_updated(None)
|
||||
self.rpc.sg_agent.assert_has_calls(
|
||||
[mock.call.security_groups_provider_updated()])
|
||||
|
||||
|
||||
class SecurityGroupAgentRpcTestCaseForNoneDriver(base.BaseTestCase):
|
||||
def test_init_firewall_with_none_driver(self):
|
||||
set_enable_security_groups(False)
|
||||
@ -1598,25 +1574,6 @@ class SecurityGroupAgentRpcWithDeferredRefreshTestCase(
|
||||
self.assertFalse(self.agent.prepare_devices_filter.called)
|
||||
|
||||
|
||||
class SecurityGroupServerRpcApiTestCase(base.BaseTestCase):
|
||||
def test_security_group_rules_for_devices(self):
|
||||
rpcapi = sg_rpc.SecurityGroupServerRpcApi('fake_topic')
|
||||
|
||||
with contextlib.nested(
|
||||
mock.patch.object(rpcapi.client, 'call'),
|
||||
mock.patch.object(rpcapi.client, 'prepare'),
|
||||
) as (
|
||||
rpc_mock, prepare_mock
|
||||
):
|
||||
prepare_mock.return_value = rpcapi.client
|
||||
rpcapi.security_group_rules_for_devices('context', ['fake_device'])
|
||||
|
||||
rpc_mock.assert_called_once_with(
|
||||
'context',
|
||||
'security_group_rules_for_devices',
|
||||
devices=['fake_device'])
|
||||
|
||||
|
||||
class FakeSGNotifierAPI(sg_rpc.SecurityGroupAgentRpcApiMixin):
|
||||
def __init__(self):
|
||||
self.topic = 'fake'
|
||||
|
@ -0,0 +1,64 @@
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import mock
|
||||
|
||||
from neutron.api.rpc.handlers import securitygroups_rpc
|
||||
from neutron.tests import base
|
||||
|
||||
|
||||
class SecurityGroupServerRpcApiTestCase(base.BaseTestCase):
|
||||
|
||||
def test_security_group_rules_for_devices(self):
|
||||
rpcapi = securitygroups_rpc.SecurityGroupServerRpcApi('fake_topic')
|
||||
|
||||
with contextlib.nested(
|
||||
mock.patch.object(rpcapi.client, 'call'),
|
||||
mock.patch.object(rpcapi.client, 'prepare'),
|
||||
) as (
|
||||
rpc_mock, prepare_mock
|
||||
):
|
||||
prepare_mock.return_value = rpcapi.client
|
||||
rpcapi.security_group_rules_for_devices('context', ['fake_device'])
|
||||
|
||||
rpc_mock.assert_called_once_with(
|
||||
'context',
|
||||
'security_group_rules_for_devices',
|
||||
devices=['fake_device'])
|
||||
|
||||
|
||||
class SGAgentRpcCallBackMixinTestCase(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(SGAgentRpcCallBackMixinTestCase, self).setUp()
|
||||
self.rpc = securitygroups_rpc.SecurityGroupAgentRpcCallbackMixin()
|
||||
self.rpc.sg_agent = mock.Mock()
|
||||
|
||||
def test_security_groups_rule_updated(self):
|
||||
self.rpc.security_groups_rule_updated(None,
|
||||
security_groups=['fake_sgid'])
|
||||
self.rpc.sg_agent.assert_has_calls(
|
||||
[mock.call.security_groups_rule_updated(['fake_sgid'])])
|
||||
|
||||
def test_security_groups_member_updated(self):
|
||||
self.rpc.security_groups_member_updated(None,
|
||||
security_groups=['fake_sgid'])
|
||||
self.rpc.sg_agent.assert_has_calls(
|
||||
[mock.call.security_groups_member_updated(['fake_sgid'])])
|
||||
|
||||
def test_security_groups_provider_updated(self):
|
||||
self.rpc.security_groups_provider_updated(None)
|
||||
self.rpc.sg_agent.assert_has_calls(
|
||||
[mock.call.security_groups_provider_updated()])
|
Loading…
Reference in New Issue
Block a user