vmware-nsx/vmware_nsx/plugins/nsx_p/plugin.py
Adit Sarfaty 3be8af0c37 NSX-Policy: Skeleton for the new NSX Policy plugin
Change-Id: Ia3195293270ceb3af1f14fa280de43019ca44b7f
2018-09-12 08:52:41 +03:00

685 lines
30 KiB
Python

# Copyright 2018 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log
from oslo_utils import excutils
import webob.exc
from neutron.db import _resource_extend as resource_extend
from neutron.db import agentschedulers_db
from neutron.db import allowedaddresspairs_db as addr_pair_db
from neutron.db import api as db_api
from neutron.db import dns_db
from neutron.db import external_net_db
from neutron.db import extradhcpopt_db
from neutron.db import extraroute_db
from neutron.db import l3_attrs_db
from neutron.db import l3_gwmode_db
from neutron.db.models import l3 as l3_db_models
from neutron.db.models import securitygroup as securitygroup_model # noqa
from neutron.db import models_v2
from neutron.db import portbindings_db
from neutron.db import portsecurity_db
from neutron.db import securitygroups_db
from neutron.db import vlantransparent_db
from neutron.extensions import providernet
from neutron.quota import resource_registry
from neutron_lib.api.definitions import external_net
from neutron_lib.api.definitions import l3 as l3_apidef
from neutron_lib.api.definitions import port_security as psec
from neutron_lib.api import faults
from neutron_lib.api import validators
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants as const
from neutron_lib.db import utils as db_utils
from neutron_lib import exceptions as n_exc
from vmware_nsx._i18n import _
from vmware_nsx.common import config # noqa
from vmware_nsx.common import exceptions as nsx_exc
from vmware_nsx.common import l3_rpc_agent_api
from vmware_nsx.common import locking
from vmware_nsx.common import managers
from vmware_nsx.db import db as nsx_db
from vmware_nsx.db import extended_security_group_rule as extend_sg_rule
from vmware_nsx.db import maclearning as mac_db
from vmware_nsx.extensions import projectpluginmap
from vmware_nsx.plugins.common import plugin as nsx_plugin_common
from vmware_nsx.plugins.nsx_v3 import utils as v3_utils
from vmware_nsxlib.v3 import exceptions as nsx_lib_exc
from vmware_nsxlib.v3 import nsx_constants as nsxlib_consts
from vmware_nsxlib.v3 import utils as nsxlib_utils
LOG = log.getLogger(__name__)
@resource_extend.has_resource_extenders
class NsxPolicyPlugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
addr_pair_db.AllowedAddressPairsMixin,
nsx_plugin_common.NsxPluginBase,
extend_sg_rule.ExtendedSecurityGroupRuleMixin,
securitygroups_db.SecurityGroupDbMixin,
external_net_db.External_net_db_mixin,
extraroute_db.ExtraRoute_db_mixin,
l3_gwmode_db.L3_NAT_db_mixin,
portbindings_db.PortBindingMixin,
portsecurity_db.PortSecurityDbMixin,
extradhcpopt_db.ExtraDhcpOptMixin,
dns_db.DNSDbMixin,
vlantransparent_db.Vlantransparent_db_mixin,
mac_db.MacLearningDbMixin,
l3_attrs_db.ExtraAttributesMixin):
__native_bulk_support = True
__native_pagination_support = True
__native_sorting_support = True
supported_extension_aliases = ["allowed-address-pairs",
"address-scope",
"quotas",
"binding",
"extra_dhcp_opt",
"agent",
"dhcp_agent_scheduler",
"ext-gw-mode",
"security-group",
"secgroup-rule-local-ip-prefix",
"port-security",
"provider",
"external-net",
"extraroute",
"router",
"subnet_allocation",
"port-security-groups-filtering"]
@resource_registry.tracked_resources(
network=models_v2.Network,
port=models_v2.Port,
subnet=models_v2.Subnet,
subnetpool=models_v2.SubnetPool,
security_group=securitygroup_model.SecurityGroup,
security_group_rule=securitygroup_model.SecurityGroupRule,
router=l3_db_models.Router,
floatingip=l3_db_models.FloatingIP)
def __init__(self):
self.fwaas_callbacks = None
self.init_is_complete = False
nsxlib_utils.set_is_attr_callback(validators.is_attr_set)
self._extend_fault_map()
extension_drivers = cfg.CONF.nsx_extension_drivers
self._extension_manager = managers.ExtensionManager(
extension_drivers=extension_drivers)
super(NsxPolicyPlugin, self).__init__()
# Bind the dummy L3 notifications
self.l3_rpc_notifier = l3_rpc_agent_api.L3NotifyAPI()
LOG.info("Starting NsxPolicyPlugin (Experimental only!)")
self._extension_manager.initialize()
self.supported_extension_aliases.extend(
self._extension_manager.extension_aliases())
self.nsxpolicy = v3_utils.get_nsxpolicy_wrapper()
nsxlib_utils.set_inject_headers_callback(v3_utils.inject_headers)
self._validate_nsx_policy_version()
self.cfg_group = 'nsx_p' # group name for nsx_p section in nsx.ini
self._prepare_default_rules()
# subscribe the init complete method last, so it will be called only
# if init was successful
registry.subscribe(self.init_complete,
resources.PROCESS,
events.AFTER_INIT)
def _validate_nsx_policy_version(self):
self._nsx_version = self.nsxpolicy.get_version()
LOG.info("NSX Version: %s", self._nsx_version)
if not self.nsxpolicy.feature_supported(
nsxlib_consts.FEATURE_NSX_POLICY_NETWORKING):
msg = (_("The NSX Policy plugin cannot be used with NSX version "
"%(ver)s") % {'ver': self._nsx_version})
raise nsx_exc.NsxPluginException(err_msg=msg)
def _prepare_default_rules(self):
#TODO(asarfaty): implement
pass
@staticmethod
def plugin_type():
return projectpluginmap.NsxPlugins.NSX_P
@staticmethod
def is_tvd_plugin():
return False
def init_complete(self, resource, event, trigger, payload=None):
with locking.LockManager.get_lock('plugin-init-complete'):
if self.init_is_complete:
# Should be called only once per worker
return
# reinitialize the cluster upon fork for api workers to ensure
# each process has its own keepalive loops + state
self.nsxpolicy.reinitialize_cluster(resource, event, trigger,
payload=payload)
self.init_is_complete = True
def _extend_fault_map(self):
"""Extends the Neutron Fault Map.
Exceptions specific to the NSX Plugin are mapped to standard
HTTP Exceptions.
"""
#TODO(asarfaty): consider reusing the nsx-t code here
faults.FAULT_MAP.update({nsx_lib_exc.ManagerError:
webob.exc.HTTPBadRequest,
nsx_lib_exc.ServiceClusterUnavailable:
webob.exc.HTTPServiceUnavailable,
nsx_lib_exc.ClientCertificateNotTrusted:
webob.exc.HTTPBadRequest,
nsx_exc.SecurityGroupMaximumCapacityReached:
webob.exc.HTTPBadRequest,
nsx_lib_exc.NsxLibInvalidInput:
webob.exc.HTTPBadRequest,
})
def _create_network_at_the_backend(self, context, net_data):
#TODO(asarfaty): implement, using nsx-id the same as the neutron id
pass
def _validate_external_net_create(self, net_data):
#TODO(asarfaty): implement
pass
def create_network(self, context, network):
net_data = network['network']
#TODO(asarfaty): network validation
external = net_data.get(external_net.EXTERNAL)
is_external_net = validators.is_attr_set(external) and external
tenant_id = net_data['tenant_id']
self._ensure_default_security_group(context, tenant_id)
if is_external_net:
self._validate_external_net_create(net_data)
# Create the neutron network
with db_api.context_manager.writer.using(context):
# Create network in Neutron
created_net = super(NsxPolicyPlugin, self).create_network(
context, network)
self._extension_manager.process_create_network(
context, net_data, created_net)
if psec.PORTSECURITY not in net_data:
net_data[psec.PORTSECURITY] = True
self._process_network_port_security_create(
context, net_data, created_net)
self._process_l3_create(context, created_net, net_data)
# Create the backend NSX network
if not is_external_net:
try:
self._create_network_at_the_backend(context, net_data)
except Exception as e:
LOG.exception("Failed to create NSX network network: %s", e)
with excutils.save_and_reraise_exception():
super(NsxPolicyPlugin, self).delete_network(
context, created_net['id'])
# this extra lookup is necessary to get the
# latest db model for the extension functions
net_model = self._get_network(context, created_net['id'])
resource_extend.apply_funcs('networks', created_net, net_model)
return created_net
def delete_network(self, context, network_id):
with db_api.context_manager.writer.using(context):
self._process_l3_delete(context, network_id)
return super(NsxPolicyPlugin, self).delete_network(
context, network_id)
if not self._network_is_external(context, network_id):
# TODO(asarfaty) delete the NSX logical network
pass
def update_network(self, context, id, network):
original_net = super(NsxPolicyPlugin, self).get_network(context, id)
net_data = network['network']
LOG.debug("Updating network %s %s->%s", id, original_net, net_data)
# Neutron does not support changing provider network values
providernet._raise_if_updates_provider_attributes(net_data)
extern_net = self._network_is_external(context, id)
# Do not support changing external/non-external networks
if (external_net.EXTERNAL in net_data and
net_data[external_net.EXTERNAL] != extern_net):
err_msg = _("Cannot change the router:external flag of a network")
raise n_exc.InvalidInput(error_message=err_msg)
updated_net = super(NsxPolicyPlugin, self).update_network(context, id,
network)
self._extension_manager.process_update_network(context, net_data,
updated_net)
self._process_l3_update(context, updated_net, network['network'])
#TODO(asarfaty): update the Policy manager
return updated_net
def get_network(self, context, id, fields=None):
with db_api.context_manager.reader.using(context):
# Get network from Neutron database
network = self._get_network(context, id)
# Don't do field selection here otherwise we won't be able to add
# provider networks fields
net = self._make_network_dict(network, context=context)
return db_utils.resource_fields(net, fields)
def get_networks(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
# Get networks from Neutron database
filters = filters or {}
with db_api.context_manager.reader.using(context):
networks = (
super(NsxPolicyPlugin, self).get_networks(
context, filters, fields, sorts,
limit, marker, page_reverse))
# TODO(asarfaty) Add plugin/provider network fields
return (networks if not fields else
[db_utils.resource_fields(network,
fields) for network in networks])
def create_subnet(self, context, subnet):
self._validate_host_routes_input(subnet)
created_subnet = super(
NsxPolicyPlugin, self).create_subnet(context, subnet)
# TODO(asarfaty): Handle dhcp on the policy manager
return created_subnet
def delete_subnet(self, context, subnet_id):
# TODO(asarfaty): cleanup dhcp on the policy manager
super(NsxPolicyPlugin, self).delete_subnet(context, subnet_id)
def update_subnet(self, context, subnet_id, subnet):
updated_subnet = None
orig = self._get_subnet(context, subnet_id)
self._validate_host_routes_input(subnet,
orig_enable_dhcp=orig['enable_dhcp'],
orig_host_routes=orig['routes'])
# TODO(asarfaty): Handle dhcp updates on the policy manager
updated_subnet = super(NsxPolicyPlugin, self).update_subnet(
context, subnet_id, subnet)
self._extension_manager.process_update_subnet(
context, subnet['subnet'], updated_subnet)
return updated_subnet
def _create_port_at_the_backend(self, context, port_data):
#TODO(asarfaty): implement
pass
def _cleanup_port(self, context, port_id, lport_id):
super(NsxPolicyPlugin, self).delete_port(context, port_id)
#TODO(asarfaty): Delete the NSX logical port
def base_create_port(self, context, port):
neutron_db = super(NsxPolicyPlugin, self).create_port(context, port)
self._extension_manager.process_create_port(
context, port['port'], neutron_db)
return neutron_db
def create_port(self, context, port, l2gw_port_check=False):
port_data = port['port']
self._validate_max_ips_per_port(port_data.get('fixed_ips', []),
port_data.get('device_owner'))
with db_api.context_manager.writer.using(context):
is_external_net = self._network_is_external(
context, port_data['network_id'])
if is_external_net:
self._assert_on_external_net_with_compute(port_data)
neutron_db = self.base_create_port(context, port)
port["port"].update(neutron_db)
if not is_external_net:
try:
self._create_port_at_the_backend(context, port_data)
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.error('Failed to create port %(id)s on NSX '
'backend. Exception: %(e)s',
{'id': neutron_db['id'], 'e': e})
self._cleanup_port(context, neutron_db['id'], None)
# this extra lookup is necessary to get the
# latest db model for the extension functions
port_model = self._get_port(context, port_data['id'])
resource_extend.apply_funcs('ports', port_data, port_model)
kwargs = {'context': context, 'port': neutron_db}
registry.notify(resources.PORT, events.AFTER_CREATE, self, **kwargs)
return neutron_db
def delete_port(self, context, port_id,
l3_port_check=True, l2gw_port_check=True,
force_delete_dhcp=False,
force_delete_vpn=False):
port = self.get_port(context, port_id)
if not self._network_is_external(context, port['network_id']):
#TODO(asarfaty): Delete the NSX logical port
pass
self.disassociate_floatingips(context, port_id)
super(NsxPolicyPlugin, self).delete_port(context, port_id)
def _update_port_on_backend(self, context, lport_id,
original_port, updated_port):
#TODO(asarfaty): implement
pass
def update_port(self, context, id, port):
with db_api.context_manager.writer.using(context):
# get the original port, and keep it honest as it is later used
# for notifications
original_port = super(NsxPolicyPlugin, self).get_port(context, id)
port_data = port['port']
is_external_net = self._network_is_external(
context, original_port['network_id'])
if is_external_net:
self._assert_on_external_net_with_compute(port_data)
device_owner = (port_data['device_owner']
if 'device_owner' in port_data
else original_port.get('device_owner'))
self._validate_max_ips_per_port(
port_data.get('fixed_ips', []), device_owner)
updated_port = super(NsxPolicyPlugin, self).update_port(context,
id, port)
self._extension_manager.process_update_port(context, port_data,
updated_port)
# copy values over - except fixed_ips as
# they've already been processed
port_data.pop('fixed_ips', None)
updated_port.update(port_data)
# update the port in the backend, only if it exists in the DB
# (i.e not external net)
if not is_external_net:
self._update_port_on_backend(context, id,
original_port, updated_port)
# Make sure the port revision is updated
if 'revision_number' in updated_port:
port_model = self._get_port(context, id)
updated_port['revision_number'] = port_model.revision_number
# Notifications must be sent after the above transaction is complete
kwargs = {
'context': context,
'port': updated_port,
'mac_address_updated': False,
'original_port': original_port,
}
registry.notify(resources.PORT, events.AFTER_UPDATE, self, **kwargs)
return updated_port
def get_port(self, context, id, fields=None):
port = super(NsxPolicyPlugin, self).get_port(context, id, fields=None)
if 'id' in port:
port_model = self._get_port(context, port['id'])
resource_extend.apply_funcs('ports', port, port_model)
return db_utils.resource_fields(port, fields)
def get_ports(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
filters = filters or {}
self._update_filters_with_sec_group(context, filters)
with db_api.context_manager.reader.using(context):
ports = (
super(NsxPolicyPlugin, self).get_ports(
context, filters, fields, sorts,
limit, marker, page_reverse))
# Add port extensions
for port in ports[:]:
if 'id' in port:
try:
port_model = self._get_port(context, port['id'])
resource_extend.apply_funcs('ports', port, port_model)
except n_exc.PortNotFound:
# Port might have been deleted by now
LOG.debug("Port %s was deleted during the get_ports "
"process, and is being skipped", port['id'])
ports.remove(port)
continue
return (ports if not fields else
[db_utils.resource_fields(port, fields) for port in ports])
def _update_router_gw_info(self, context, router_id, info):
router = self._get_router(context, router_id)
super(NsxPolicyPlugin, self)._update_router_gw_info(
context, router_id, info, router=router)
#TODO(asarfaty): Update the NSX
def create_router(self, context, router):
r = router['router']
gw_info = self._extract_external_gw(context, router, is_extract=True)
with db_api.context_manager.writer.using(context):
router = super(NsxPolicyPlugin, self).create_router(
context, router)
router_db = self._get_router(context, router['id'])
self._process_extra_attr_router_create(context, router_db, r)
#TODO(asarfaty): Create the NSX logical router and add DB mapping
LOG.debug("Created router %s: %s. GW info %s",
router['id'], r, gw_info)
return self.get_router(context, router['id'])
def delete_router(self, context, router_id):
router = self.get_router(context, router_id)
if router.get(l3_apidef.EXTERNAL_GW_INFO):
self._update_router_gw_info(context, router_id, {})
nsx_router_id = nsx_db.get_nsx_router_id(
context.session, router_id)
ret_val = super(NsxPolicyPlugin, self).delete_router(
context, router_id)
if nsx_router_id:
#TODO(asarfaty): delete the NSX logical router
pass
return ret_val
def update_router(self, context, router_id, router):
gw_info = self._extract_external_gw(context, router, is_extract=False)
router_data = router['router']
LOG.debug("Updating router %s: %s. GW info %s",
router_id, router_data, gw_info)
#TODO(asarfaty) update the NSX logical router & interfaces
return super(NsxPolicyPlugin, self).update_router(
context, router_id, router)
def add_router_interface(self, context, router_id, interface_info):
network_id = self._get_interface_network(context, interface_info)
extern_net = self._network_is_external(context, network_id)
router_db = self._get_router(context, router_id)
gw_network_id = (router_db.gw_port.network_id if router_db.gw_port
else None)
LOG.debug("Adding router %s interface %s with GW %s",
router_id, network_id, gw_network_id)
# A router interface cannot be an external network
if extern_net:
msg = _("An external network cannot be attached as "
"an interface to a router")
raise n_exc.InvalidInput(error_message=msg)
# Update the interface of the neutron router
info = super(NsxPolicyPlugin, self).add_router_interface(
context, router_id, interface_info)
#TODO(asarfaty) Update the NSX logical router ports
return info
def remove_router_interface(self, context, router_id, interface_info):
#TODO(asarfaty) Update the NSX logical router ports
info = super(NsxPolicyPlugin, self).remove_router_interface(
context, router_id, interface_info)
return info
def create_floatingip(self, context, floatingip):
new_fip = super(NsxPolicyPlugin, self).create_floatingip(
context, floatingip, initial_status=(
const.FLOATINGIP_STATUS_ACTIVE
if floatingip['floatingip']['port_id']
else const.FLOATINGIP_STATUS_DOWN))
router_id = new_fip['router_id']
if not router_id:
return new_fip
#TODO(asarfaty): Update the NSX router
return new_fip
def delete_floatingip(self, context, fip_id):
fip = self.get_floatingip(context, fip_id)
router_id = fip['router_id']
port_id = fip['port_id']
LOG.debug("Deleting floating IP %s. Router %s, Port %s",
fip_id, router_id, port_id)
if router_id:
nsx_router_id = nsx_db.get_nsx_router_id(context.session,
router_id)
if nsx_router_id:
#TODO(asarfaty): Update the NSX router
pass
super(NsxPolicyPlugin, self).delete_floatingip(context, fip_id)
def update_floatingip(self, context, fip_id, floatingip):
old_fip = self.get_floatingip(context, fip_id)
old_port_id = old_fip['port_id']
new_status = (const.FLOATINGIP_STATUS_ACTIVE
if floatingip['floatingip'].get('port_id')
else const.FLOATINGIP_STATUS_DOWN)
new_fip = super(NsxPolicyPlugin, self).update_floatingip(
context, fip_id, floatingip)
router_id = new_fip['router_id']
new_port_id = new_fip['port_id']
nsx_router_id = nsx_db.get_nsx_router_id(context.session,
router_id)
if nsx_router_id:
#TODO(asarfaty): Update the NSX router
LOG.debug("Updating floating IP %s. Router %s, Port %s "
"(old port %s)",
fip_id, router_id, new_port_id, old_port_id)
if new_fip['status'] != new_status:
new_fip['status'] = new_status
self.update_floatingip_status(context, fip_id, new_status)
return new_fip
def disassociate_floatingips(self, context, port_id):
fip_qry = context.session.query(l3_db_models.FloatingIP)
fip_dbs = fip_qry.filter_by(fixed_port_id=port_id)
for fip_db in fip_dbs:
if not fip_db.router_id:
continue
nsx_router_id = nsx_db.get_nsx_router_id(context.session,
fip_db.router_id)
if nsx_router_id:
# TODO(asarfaty): Update the NSX logical router
pass
self.update_floatingip_status(context, fip_db.id,
const.FLOATINGIP_STATUS_DOWN)
super(NsxPolicyPlugin, self).disassociate_floatingips(
context, port_id, do_notify=False)
def _create_security_group_backend_resources(self, secgroup):
# TODO(asarfaty): implement
pass
def create_security_group(self, context, security_group, default_sg=False):
secgroup = security_group['security_group']
if not default_sg:
tenant_id = secgroup['tenant_id']
self._ensure_default_security_group(context, tenant_id)
self._create_security_group_backend_resources(secgroup)
with db_api.context_manager.writer.using(context):
secgroup_db = (
super(NsxPolicyPlugin, self).create_security_group(
context, security_group, default_sg))
# TODO(asarfaty) save NSX->Neutron mappings
self._process_security_group_properties_create(context,
secgroup_db,
secgroup,
default_sg)
return secgroup_db
def update_security_group(self, context, id, security_group):
orig_secgroup = self.get_security_group(
context, id, fields=['id', 'name', 'description'])
LOG.debug("Updating SG %s -> %s", orig_secgroup,
security_group['security_group'])
with db_api.context_manager.writer.using(context):
secgroup_res = super(NsxPolicyPlugin, self).update_security_group(
context, id, security_group)
self._process_security_group_properties_update(
context, secgroup_res, security_group['security_group'])
#TODO(asarfaty): Update the NSX backend
return secgroup_res
def delete_security_group(self, context, id):
super(NsxPolicyPlugin, self).delete_security_group(context, id)
#TODO(asarfaty): Update the nSX backend
def create_security_group_rule(self, context, security_group_rule):
bulk_rule = {'security_group_rules': [security_group_rule]}
return self.create_security_group_rule_bulk(context, bulk_rule)[0]
def create_security_group_rule_bulk(self, context, security_group_rules):
sg_rules = security_group_rules['security_group_rules']
for r in sg_rules:
# TODO(asarfaty): create rules at the NSX
pass
with db_api.context_manager.writer.using(context):
rules_db = (super(NsxPolicyPlugin,
self).create_security_group_rule_bulk_native(
context, security_group_rules))
for i, r in enumerate(sg_rules):
self._process_security_group_rule_properties(
context, rules_db[i], r['security_group_rule'])
return rules_db
def delete_security_group_rule(self, context, id):
#TODO(asarfaty): Update the nSX backend
super(NsxPolicyPlugin, self).delete_security_group_rule(context, id)