Merge "NSX|v service insertion handle upgrade"

This commit is contained in:
Jenkins 2016-07-19 20:05:51 +00:00 committed by Gerrit Code Review
commit c2084fa43b
6 changed files with 160 additions and 8 deletions

View File

@ -599,6 +599,10 @@ nsxv_opts = [
help=_("(Optional) The profile id of the redirect firewall "
"rules that will be used for the Service Insertion "
"feature.")),
cfg.BoolOpt('service_insertion_redirect_all', default=False,
help=_("(Optional) If set to True, the plugin will create "
"a redirect rule to send all the traffic to the "
"security partner")),
]
# Register the configuration options

View File

@ -227,11 +227,48 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
self.metadata_proxy_handler = (
nsx_v_md_proxy.NsxVMetadataProxyHandler(self))
# Service insertion driver register
self._si_handler = fc_utils.NsxvServiceInsertionHandler(self)
registry.subscribe(self.add_vms_to_service_insertion,
fc_utils.SERVICE_INSERTION_RESOURCE,
events.AFTER_CREATE)
def init_complete(self, resource, event, trigger, **kwargs):
self.init_is_complete = True
def add_vms_to_service_insertion(self, sg_id):
def _add_vms_to_service_insertion(*args, **kwargs):
"""Adding existing VMs to the service insertion security group
Adding all current compute ports with port security to the service
insertion security group in order to classify their traffic by the
security redirect rules
"""
sg_id = args[0]
context = n_context.get_admin_context()
filters = {'device_owner': ['compute:None']}
ports = self.get_ports(context, filters=filters)
for port in ports:
# Only add compute ports with device-id, vnic & port security
if (validators.is_attr_set(port.get(ext_vnic_idx.VNIC_INDEX))
and validators.is_attr_set(port.get('device_id'))
and port[psec.PORTSECURITY]):
try:
vnic_idx = port[ext_vnic_idx.VNIC_INDEX]
device_id = port['device_id']
vnic_id = self._get_port_vnic_id(vnic_idx, device_id)
self._add_member_to_security_group(sg_id, vnic_id)
except Exception as e:
LOG.info(_LI('Could not add port %(port)s to service '
'insertion security group. Exception '
'%(err)s'),
{'port': port['id'], 'err': e})
# Doing this in a separate thread to not slow down the init process
# in case there are many compute ports
c_utils.spawn_n(_add_vms_to_service_insertion, sg_id)
def _start_rpc_listeners(self):
self.conn = n_rpc.create_connection()
qos_topic = resources_rpc.resource_type_versioned_topic(

View File

@ -16,8 +16,14 @@
import xml.etree.ElementTree as et
from networking_sfc.extensions import flowclassifier
from networking_sfc.services.flowclassifier.common import exceptions as exc
from networking_sfc.services.flowclassifier.drivers import base as fc_driver
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron import context as n_context
from neutron import manager
from oslo_config import cfg
from oslo_log import helpers as log_helpers
from oslo_log import log as logging
@ -26,6 +32,7 @@ from vmware_nsx._i18n import _, _LE
from vmware_nsx.common import config # noqa
from vmware_nsx.common import exceptions as nsx_exc
from vmware_nsx.common import locking
from vmware_nsx.common import nsxv_constants
from vmware_nsx.plugins.nsx_v.vshield import vcns as nsxv_api
from vmware_nsx.plugins.nsx_v.vshield import vcns_driver
from vmware_nsx.services.flowclassifier.nsx_v import utils as fc_utils
@ -47,9 +54,15 @@ class NsxvFlowClassifierDriver(fc_driver.FlowClassifierDriverBase):
self.init_security_group()
self.init_security_group_in_profile()
#TODO(asarfaty) - Add a new config for any->any redirect:
# create any->any flow classifier entry (and backed rule)
# if not exist yet
# register an event to the end of the init to handle the first upgrade
# TODO(asarfaty): This registry will call the callback on each
# spawned thread, but we need it to be called only once.
# So it should be replaces with events.BEFORE_SPAWN after approved
# in neutron
if self._is_new_security_group:
registry.subscribe(self.init_complete,
resources.PROCESS,
events.AFTER_INIT)
def init_profile_id(self):
"""Init the service insertion profile ID
@ -78,6 +91,7 @@ class NsxvFlowClassifierDriver(fc_driver.FlowClassifierDriverBase):
# check if this group exist, and create it if not.
sg_name = fc_utils.SERVICE_INSERTION_SG_NAME
sg_id = self._nsxv.vcns.get_security_group_id(sg_name)
self._is_new_security_group = False
if not sg_id:
description = ("OpenStack Service Insertion Security Group, "
"managed by Neutron nsx-v plugin.")
@ -85,10 +99,7 @@ class NsxvFlowClassifierDriver(fc_driver.FlowClassifierDriverBase):
"description": description}}
h, sg_id = (
self._nsxv.vcns.create_security_group(sg))
# TODO(asarfaty) - if the security group was just created
# also add all the current compute ports with port-security
# to this security group (for upgrades scenarios)
self._is_new_security_group = True
self._security_group_id = sg_id
@ -110,6 +121,55 @@ class NsxvFlowClassifierDriver(fc_driver.FlowClassifierDriverBase):
self._profile_id,
et.tostring(profile_binding, encoding="us-ascii"))
def init_complete(self, resource, event, trigger, **kwargs):
# This callback is called for each process.
# Until fixing it, lock is used to keep things in order
with locking.LockManager.get_lock('service_insertion_init_complete'):
if self._is_new_security_group:
# add existing VMs to the new security group
# This code must run after init is done
core_plugin = manager.NeutronManager.get_plugin()
core_plugin.add_vms_to_service_insertion(
self._security_group_id)
# Add the first flow classifier entry
if cfg.CONF.nsxv.service_insertion_redirect_all:
self.add_any_any_redirect_rule()
def add_any_any_redirect_rule(self):
"""Add an any->any flow classifier entry
Add 1 flow classifier entry that will redirect all the traffic to the
security partner
The user will be able to delete/change it later
"""
context = n_context.get_admin_context()
fc_plugin = manager.NeutronManager.get_service_plugins().get(
flowclassifier.FLOW_CLASSIFIER_EXT)
# first check that there is no other flow classifier entry defined:
fcs = fc_plugin.get_flow_classifiers(context)
if len(fcs) > 0:
return
# Create any->any rule
fc = {'name': 'redirect_all',
'description': 'Redirect all traffic',
'tenant_id': nsxv_constants.INTERNAL_TENANT_ID,
'l7_parameters': {},
'ethertype': 'IPv4',
'protocol': None,
'source_port_range_min': None,
'source_port_range_max': None,
'destination_port_range_min': None,
'destination_port_range_max': None,
'source_ip_prefix': None,
'destination_ip_prefix': None,
'logical_source_port': None,
'logical_destination_port': None
}
fc_plugin.create_flow_classifier(context, {'flow_classifier': fc})
def get_redirect_fw_section_id(self):
if not self._redirect_section_id:
# try to find it

View File

@ -20,6 +20,7 @@ from oslo_log import log as logging
LOG = logging.getLogger(__name__)
SERVICE_INSERTION_SG_NAME = 'Service Insertion Security Group'
SERVICE_INSERTION_RESOURCE = 'Service Insertion'
class NsxvServiceInsertionHandler(object):

View File

@ -3870,6 +3870,33 @@ class TestNSXPortSecurity(test_psec.TestPortSecurity,
self.fc2.remove_member_from_security_group.assert_any_call(
p._si_handler.sg_id, vnic_index)
def test_service_insertion_notify(self):
# create a compute ports with/without port security
device_id = _uuid()
# create 2 compute ports with port security
port1 = self._create_compute_port('net1', device_id, True)
self._add_vnic_to_port(port1['port']['id'], False, 1)
port2 = self._create_compute_port('net2', device_id, True)
self._add_vnic_to_port(port2['port']['id'], False, 2)
# create 1 compute port without port security
port3 = self._create_compute_port('net3', device_id, False)
self._add_vnic_to_port(port3['port']['id'], True, 3)
# init the plugin mocks
p = manager.NeutronManager.get_plugin()
self.fc2.add_member_to_security_group = (
mock.Mock().add_member_to_security_group)
# call the function (that should be called from the flow classifier
# driver) and verify it adds all relevant ports to the group
# Since it uses spawn_n, we should mock it.
orig_spawn = c_utils.spawn_n
c_utils.spawn_n = mock.Mock(side_effect=lambda f, x: f(x, None))
p.add_vms_to_service_insertion(sg_id='aaa')
# back to normal
c_utils.spawn_n = orig_spawn
self.assertEqual(2, self.fc2.add_member_to_security_group.call_count)
class TestSharedRouterTestCase(L3NatTest, L3NatTestCaseBase,
test_l3_plugin.L3NatTestCaseMixin,

View File

@ -24,6 +24,7 @@ from neutron.api import extensions as api_ext
from neutron.common import config
from neutron import context
from neutron.extensions import portbindings
from neutron import manager
from networking_sfc.db import flowclassifier_db as fdb
from networking_sfc.extensions import flowclassifier
@ -84,6 +85,9 @@ class TestNsxvFlowClassifierDriver(
self._profile_id = 'serviceprofile-1'
cfg.CONF.set_override('service_insertion_profile_id',
self._profile_id, 'nsxv')
cfg.CONF.set_override('service_insertion_redirect_all',
True, 'nsxv')
self.driver = nsx_v_driver.NsxvFlowClassifierDriver()
self.driver.initialize()
@ -110,9 +114,28 @@ class TestNsxvFlowClassifierDriver(
super(TestNsxvFlowClassifierDriver, self).tearDown()
def test_driver_init(self):
self.assertEqual(self.driver._profile_id, self._profile_id)
self.assertEqual(self._profile_id, self.driver._profile_id)
self.assertEqual(self.driver._security_group_id, '0')
mock_nsxv_plugin = mock.Mock()
fc_plugin = manager.NeutronManager.get_service_plugins().get(
flowclassifier.FLOW_CLASSIFIER_EXT)
with mock.patch.object(manager.NeutronManager, 'get_plugin',
return_value=mock_nsxv_plugin):
with mock.patch.object(
mock_nsxv_plugin,
'add_vms_to_service_insertion') as fake_add:
with mock.patch.object(
fc_plugin,
'create_flow_classifier') as fake_create:
self.driver.init_complete(None, None, {})
# check that the plugin was called to add vms to the
# security group
self.assertTrue(fake_add.called)
# check that redirect_all flow classifier entry
# was created
self.assertTrue(fake_create.called)
def test_create_flow_classifier_precommit(self):
with self.flow_classifier(flow_classifier=self._fc) as fc:
fc_context = fc_ctx.FlowClassifierContext(