Merge "Add provisioning blocks to status ACTIVE transition"

This commit is contained in:
Jenkins 2016-05-17 22:26:28 +00:00 committed by Gerrit Code Review
commit 42a607f0db
17 changed files with 808 additions and 44 deletions

View File

@ -76,6 +76,7 @@ Neutron Internals
openvswitch_firewall
network_ip_availability
tag
provisioning_blocks
Testing
-------

View File

@ -0,0 +1,159 @@
..
Licensed under the Apache License, Version 2.0 (the "License"); you may
not use this file except in compliance with the License. You may obtain
a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
Convention for heading levels in Neutron devref:
======= Heading 0 (reserved for the title in a document)
------- Heading 1
~~~~~~~ Heading 2
+++++++ Heading 3
''''''' Heading 4
(Avoid deeper levels because they do not render well.)
Composite Object Status via Provisioning Blocks
===============================================
We use the STATUS field on objects to indicate when a resource is ready
by setting it to ACTIVE so external systems know when it's safe to use
that resource. Knowing when to set the status to ACTIVE is simple when
there is only one entity responsible for provisioning a given object.
When that entity has finishing provisioning, we just update the STATUS
directly to active. However, there are resources in Neutron that require
provisioning by multiple asynchronous entities before they are ready to
be used so managing the transition to the ACTIVE status becomes more
complex. To handle these cases, Neutron has the provisioning_blocks
module to track the entities that are still provisioning a resource.
The main example of this is with ML2, the L2 agents and the DHCP agents.
When a port is created and bound to a host, it's placed in the DOWN
status. The L2 agent now has to setup flows, security group rules, etc
for the port and the DHCP agent has to setup a DHCP reservation for
the port's IP and MAC. Before the transition to ACTIVE, both agents
must complete their work or the port user (e.g. Nova) may attempt to
use the port and not have connectivity. To solve this, the
provisioning_blocks module is used to track the provisioning state
of each agent and the status is only updated when both complete.
High Level View
---------------
To make use of the provisioning_blocks module, provisioning components
should be added whenever there is work to be done by another entity
before an object's status can transition to ACTIVE. This is
accomplished by calling the add_provisioning_component method for
each entity. Then as each entity finishes provisioning the object,
the provisioning_complete must be called to lift the provisioning
block.
When the last provisioning block is removed, the provisioning_blocks
module will trigger a callback notification containing the object ID
for the object's resource type with the event PROVISIONING_COMPLETE.
A subscriber to this event can now update the status of this object
to ACTIVE or perform any other necessary actions.
A normal state transition will look something like the following:
1. Request comes in to create an object
2. Logic on the Neutron server determines which entities are required
to provision the object and adds a provisioning component for each
entity for that object.
3. A notification is emitted to the entities so they start their work.
4. Object is returned to the API caller in the DOWN (or BUILD) state.
5. Each entity tells the server when it has finished provisioning the
object. The server calls provisioning_complete for each entity that
finishes.
6. When provisioning_complete is called on the last remaining entity,
the provisioning_blocks module will emit an event indicating that
provisioning has completed for that object.
7. A subscriber to this event on the server will then update the status
of the object to ACTIVE to indicate that it is fully provisioned.
For a more concrete example, see the section below.
ML2, L2 agents, and DHCP agents
-------------------------------
ML2 makes use of the provisioning_blocks module to prevent the status
of ports from being transitioned to ACTIVE until both the L2 agent and
the DHCP agent have finished wiring a port.
When a port is created or updated, the following happens to register
the DHCP agent's provisioning blocks:
1. The subnet_ids are extracted from the fixed_ips field of the port
and then ML2 checks to see if DHCP is enabled on any of the subnets.
2. The configuration for the DHCP agents hosting the network are looked
up to ensure that at least one of them is new enough to report back
that it has finished setting up the port reservation.
3. If either of the preconditions above fail, a provisioning block for
the DHCP agent is not added and any existing DHCP agent blocks for
that port are cleared to ensure the port isn't blocked waiting for an
event that will never happen.
4. If the preconditions pass, a provisioning block is added for the port
under the 'DHCP' entity.
When a port is created or updated, the following happens to register the
L2 agent's provisioning blocks:
1. If the port is not bound, nothing happens because we don't know yet
if an L2 agent is involved so we have to wait until a port update that
binds it.
2. Once the port is bound, the agent based mechanism drivers will check
if they have an agent on the bound host and if the VNIC type belongs
to the mechanism driver, a provisioning block is added for the port
under the 'L2 Agent' entity.
Once the DHCP agent has finished setting up the reservation, it calls
dhcp_ready_on_ports via the RPC API with the port ID. The DHCP RPC
handler receives this and calls 'provisioning_complete' in the
provisioning module with the port ID and the 'DHCP' entity to remove
the provisioning block.
Once the L2 agent has finished setting up the reservation, it calls
the normal update_device_list (or update_device_up) via the RPC API.
The RPC callbacks handler calls 'provisioning_complete' with the
port ID and the 'L2 Agent' entity to remove the provisioning block.
On the 'provisioning_complete' call that removes the last record,
the provisioning_blocks module emits a callback PROVISIONING_COMPLETE
event with the port ID. A function subscribed to this in ML2 then calls
update_port_status to set the port to ACTIVE.
At this point the normal notification is emitted to Nova allowing the
VM to be unpaused.
In the event that the DHCP or L2 agent is down, the port will not
transition to the ACTIVE status (as is the case now if the L2 agent
is down). Agents must account for this by telling the server that
wiring has been completed after configuring everything during
startup. This ensures that ports created on offline agents (or agents
that crash and restart) eventually become active.
To account for server instability, the notifications about port wiring
be complete must use RPC calls so the agent gets a positive
acknowledgement from the server and it must keep retrying until either
the port is deleted or it is successful.
If an ML2 driver immediately places a bound port in the ACTIVE state
(e.g. after calling a backend in update_port_postcommit), this patch
will not have any impact on that process.
References
----------
.. [#] Provisioning Blocks Module: http://git.openstack.org/cgit/openstack/neutron/tree/neutron/db/provisioning_blocks.py

View File

@ -54,6 +54,7 @@ class DhcpAgent(manager.Manager):
def __init__(self, host=None, conf=None):
super(DhcpAgent, self).__init__(host=host)
self.needs_resync_reasons = collections.defaultdict(list)
self.dhcp_ready_ports = set()
self.conf = conf or cfg.CONF
self.cache = NetworkCache()
self.dhcp_driver_cls = importutils.import_class(self.conf.dhcp_driver)
@ -97,6 +98,7 @@ class DhcpAgent(manager.Manager):
"""Activate the DHCP agent."""
self.sync_state()
self.periodic_resync()
self.start_ready_ports_loop()
def call_driver(self, action, network, **action_kwargs):
"""Invoke an action on a DHCP driver instance."""
@ -169,6 +171,9 @@ class DhcpAgent(manager.Manager):
network.id in only_nets): # specific network to sync
pool.spawn(self.safe_configure_dhcp_for_network, network)
pool.waitall()
# we notify all ports in case some were created while the agent
# was down
self.dhcp_ready_ports |= set(self.cache.get_port_ids())
LOG.info(_LI('Synchronizing state complete'))
except Exception as e:
@ -179,6 +184,37 @@ class DhcpAgent(manager.Manager):
self.schedule_resync(e)
LOG.exception(_LE('Unable to sync network state.'))
def _dhcp_ready_ports_loop(self):
"""Notifies the server of any ports that had reservations setup."""
while True:
# this is just watching a set so we can do it really frequently
eventlet.sleep(0.1)
if self.dhcp_ready_ports:
ports_to_send = self.dhcp_ready_ports
self.dhcp_ready_ports = set()
try:
self.plugin_rpc.dhcp_ready_on_ports(ports_to_send)
continue
except oslo_messaging.MessagingTimeout:
LOG.error(_LE("Timeout notifying server of ports ready. "
"Retrying..."))
except Exception as e:
if (isinstance(e, oslo_messaging.RemoteError)
and e.exc_type == 'NoSuchMethod'):
LOG.info(_LI("Server does not support port ready "
"notifications. Waiting for 5 minutes "
"before retrying."))
eventlet.sleep(300)
continue
LOG.exception(_LE("Failure notifying DHCP server of "
"ready DHCP ports. Will retry on next "
"iteration."))
self.dhcp_ready_ports |= ports_to_send
def start_ready_ports_loop(self):
"""Spawn a thread to push changed ports to server."""
eventlet.spawn(self._dhcp_ready_ports_loop)
@utils.exception_logger()
def _periodic_resync_helper(self):
"""Resync the dhcp state at the configured interval."""
@ -348,6 +384,7 @@ class DhcpAgent(manager.Manager):
driver_action = 'restart'
self.cache.put_port(updated_port)
self.call_driver(driver_action, network)
self.dhcp_ready_ports.add(updated_port.id)
def _is_port_on_this_agent(self, port):
thishost = utils.get_dhcp_agent_device_id(
@ -421,6 +458,7 @@ class DhcpPluginApi(object):
1.0 - Initial version.
1.1 - Added get_active_networks_info, create_dhcp_port,
and update_dhcp_port methods.
1.5 - Added dhcp_ready_on_ports
"""
@ -471,6 +509,12 @@ class DhcpPluginApi(object):
network_id=network_id, device_id=device_id,
host=self.host)
def dhcp_ready_on_ports(self, port_ids):
"""Notify the server that DHCP is configured for the port."""
cctxt = self.client.prepare(version='1.5')
return cctxt.call(self.context, 'dhcp_ready_on_ports',
port_ids=port_ids)
class NetworkCache(object):
"""Agent cache of the current network state."""
@ -479,6 +523,9 @@ class NetworkCache(object):
self.subnet_lookup = {}
self.port_lookup = {}
def get_port_ids(self):
return self.port_lookup.keys()
def get_network_ids(self):
return self.cache.keys()
@ -563,6 +610,7 @@ class DhcpAgentWithStateReport(DhcpAgent):
'availability_zone': self.conf.AGENT.availability_zone,
'topic': topics.DHCP_AGENT,
'configurations': {
'notifies_port_ready': True,
'dhcp_driver': self.conf.dhcp_driver,
'dhcp_lease_duration': self.conf.dhcp_lease_duration,
'log_agent_heartbeats': self.conf.AGENT.log_agent_heartbeats},

View File

@ -26,10 +26,12 @@ import oslo_messaging
from oslo_utils import excutils
from neutron._i18n import _, _LW
from neutron.callbacks import resources
from neutron.common import constants as n_const
from neutron.common import exceptions as n_exc
from neutron.common import utils
from neutron.db import api as db_api
from neutron.db import provisioning_blocks
from neutron.extensions import portbindings
from neutron import manager
from neutron.plugins.common import utils as p_utils
@ -64,9 +66,10 @@ class DhcpRpcCallback(object):
# 1.4 - Removed update_lease_expiration. It's not used by reference
# DHCP agent since Juno, so similar rationale for not bumping the
# major version as above applies here too.
# 1.5 - Added dhcp_ready_on_ports.
target = oslo_messaging.Target(
namespace=n_const.RPC_NAMESPACE_DHCP_PLUGIN,
version='1.4')
version='1.5')
def _get_active_networks(self, context, **kwargs):
"""Retrieve and return a list of the active networks."""
@ -225,3 +228,9 @@ class DhcpRpcCallback(object):
{'port': port,
'host': host})
return self._port_action(plugin, context, port, 'update_port')
def dhcp_ready_on_ports(self, context, port_ids):
for port_id in port_ids:
provisioning_blocks.provisioning_complete(
context, port_id, resources.PORT,
provisioning_blocks.DHCP_ENTITY)

View File

@ -1 +1 @@
d3435b514502
30107ab6a3ee

View File

@ -0,0 +1,39 @@
# Copyright 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""provisioning_blocks.py
Revision ID: 30107ab6a3ee
Revises: d3435b514502
Create Date: 2016-04-15 05:59:59.000001
"""
# revision identifiers, used by Alembic.
revision = '30107ab6a3ee'
down_revision = 'd3435b514502'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table(
'provisioningblocks',
sa.Column('standard_attr_id', sa.BigInteger(),
sa.ForeignKey('standardattributes.id', ondelete='CASCADE'),
nullable=False, primary_key=True),
sa.Column('entity', sa.String(length=255), nullable=False,
primary_key=True),
)

View File

@ -42,6 +42,7 @@ from neutron.db import model_base
from neutron.db import models_v2 # noqa
from neutron.db import portbindings_db # noqa
from neutron.db import portsecurity_db # noqa
from neutron.db import provisioning_blocks # noqa
from neutron.db.qos import models as qos_models # noqa
from neutron.db.quota import models # noqa
from neutron.db import rbac_db_models # noqa

View File

@ -0,0 +1,168 @@
# Copyright 2016 Mirantis, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from oslo_db import exception as db_exc
from oslo_log import log as logging
import sqlalchemy as sa
from neutron._i18n import _LE
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.db import api as db_api
from neutron.db import model_base
from neutron.db import models_v2
LOG = logging.getLogger(__name__)
PROVISIONING_COMPLETE = 'provisioning_complete'
# identifiers for the various entities that participate in provisioning
DHCP_ENTITY = 'DHCP'
L2_AGENT_ENTITY = 'L2'
_RESOURCE_TO_MODEL_MAP = {resources.PORT: models_v2.Port}
class ProvisioningBlock(model_base.BASEV2):
# the standard attr id of the thing we want to block
standard_attr_id = (
sa.Column(sa.BigInteger().with_variant(sa.Integer(), 'sqlite'),
sa.ForeignKey(model_base.StandardAttribute.id,
ondelete="CASCADE"),
primary_key=True))
# the entity that wants to block the status change (e.g. L2 Agent)
entity = sa.Column(sa.String(255), nullable=False, primary_key=True)
def add_model_for_resource(resource, model):
"""Adds a mapping between a callback resource and a DB model."""
_RESOURCE_TO_MODEL_MAP[resource] = model
def add_provisioning_component(context, object_id, object_type, entity):
"""Adds a provisioning block by an entity to a given object.
Adds a provisioning block to the DB for object_id with an identifier
of the entity that is doing the provisioning. While an object has these
provisioning blocks present, this module will not emit any callback events
indicating that provisioning has completed. Any logic that depends on
multiple disjoint components use these blocks and subscribe to the
PROVISIONING_COMPLETE event to know when all components have completed.
:param context: neutron api request context
:param object_id: ID of object that has been provisioned
:param object_type: callback resource type of the object
:param entity: The entity that has provisioned the object
"""
log_dict = {'entity': entity, 'oid': object_id, 'otype': object_type}
# we get an object's ID, so we need to convert that into a standard attr id
standard_attr_id = _get_standard_attr_id(context, object_id, object_type)
if not standard_attr_id:
return
try:
with db_api.autonested_transaction(context.session):
record = ProvisioningBlock(standard_attr_id=standard_attr_id,
entity=entity)
context.session.add(record)
except db_exc.DBDuplicateEntry:
# an entry could be leftover from a previous transition that hasn't
# yet been provisioned. (e.g. multiple updates in a short period)
LOG.debug("Ignored duplicate provisioning block setup for %(otype)s "
"%(oid)s by entity %(entity)s.", log_dict)
return
LOG.debug("Transition to ACTIVE for %(otype)s object %(oid)s "
"will not be triggered until provisioned by entity %(entity)s.",
log_dict)
def remove_provisioning_component(context, object_id, object_type, entity,
standard_attr_id=None):
"""Removes a provisioning block for an object with triggering a callback.
Removes a provisioning block without triggering a callback. A user of this
module should call this when a block is no longer correct. If the block has
been satisfied, the 'provisioning_complete' method should be called.
:param context: neutron api request context
:param object_id: ID of object that has been provisioned
:param object_type: callback resource type of the object
:param entity: The entity that has provisioned the object
:param standard_attr_id: Optional ID to pass to the function to avoid the
extra DB lookup to translate the object_id into
the standard_attr_id.
:return: boolean indicating whether or not a record was deleted
"""
with context.session.begin(subtransactions=True):
standard_attr_id = standard_attr_id or _get_standard_attr_id(
context, object_id, object_type)
if not standard_attr_id:
return False
record = context.session.query(ProvisioningBlock).filter_by(
standard_attr_id=standard_attr_id, entity=entity).first()
if record:
context.session.delete(record)
return True
return False
def provisioning_complete(context, object_id, object_type, entity):
"""Mark that the provisioning for object_id has been completed by entity.
Marks that an entity has finished provisioning an object. If there are
no remaining provisioning components, a callback will be triggered
indicating that provisioning has been completed for the object. Subscribers
to this callback must be idempotent because it may be called multiple
times in high availability deployments.
:param context: neutron api request context
:param object_id: ID of object that has been provisioned
:param object_type: callback resource type of the object
:param entity: The entity that has provisioned the object
"""
log_dict = {'oid': object_id, 'entity': entity, 'otype': object_type}
# this can't be called in a transaction to avoid REPEATABLE READ
# tricking us into thinking there are remaining provisioning components
if context.session.is_active:
raise RuntimeError(_LE("Must not be called in a transaction"))
standard_attr_id = _get_standard_attr_id(context, object_id,
object_type)
if not standard_attr_id:
return
if remove_provisioning_component(context, object_id, object_type, entity,
standard_attr_id):
LOG.debug("Provisioning for %(otype)s %(oid)s completed by entity "
"%(entity)s.", log_dict)
# now with that committed, check if any records are left. if None, emit
# an event that provisioning is complete.
records = context.session.query(ProvisioningBlock).filter_by(
standard_attr_id=standard_attr_id).count()
if not records:
LOG.debug("Provisioning complete for %(otype)s %(oid)s", log_dict)
registry.notify(object_type, PROVISIONING_COMPLETE,
'neutron.db.provisioning_blocks',
context=context, object_id=object_id)
def _get_standard_attr_id(context, object_id, object_type):
model = _RESOURCE_TO_MODEL_MAP.get(object_type)
if not model:
raise RuntimeError(_LE("Could not find model for %s. If you are "
"adding provisioning blocks for a new resource "
"you must call add_model_for_resource during "
"initialization for your type.") % object_type)
obj = (context.session.query(model).enable_eagerloads(False).
filter_by(id=object_id).first())
if not obj:
# concurrent delete
LOG.debug("Could not find standard attr ID for object %s.", object_id)
return
return obj.standard_attr_id

View File

@ -283,3 +283,11 @@ def get_dvr_port_bindings(session, port_id):
if not bindings:
LOG.debug("No bindings for DVR port %s", port_id)
return bindings
def is_dhcp_active_on_any_subnet(context, subnet_ids):
if not subnet_ids:
return False
return bool(context.session.query(models_v2.Subnet).
enable_eagerloads(False).filter_by(enable_dhcp=True).
filter(models_v2.Subnet.id.in_(subnet_ids)).count())

View File

@ -19,6 +19,9 @@ from oslo_log import log
import six
from neutron._i18n import _LW
from neutron.callbacks import resources
from neutron.common import constants
from neutron.db import provisioning_blocks
from neutron.extensions import portbindings
from neutron.plugins.common import constants as p_constants
from neutron.plugins.ml2 import driver_api as api
@ -53,6 +56,32 @@ class AgentMechanismDriverBase(api.MechanismDriver):
def initialize(self):
pass
def create_port_precommit(self, context):
self._insert_provisioning_block(context)
def update_port_precommit(self, context):
if context.host == context.original_host:
return
self._insert_provisioning_block(context)
def _insert_provisioning_block(self, context):
# we insert a status barrier to prevent the port from transitioning
# to active until the agent reports back that the wiring is done
port = context.current
if not context.host or port['status'] == constants.PORT_STATUS_ACTIVE:
# no point in putting in a block if the status is already ACTIVE
return
vnic_type = context.current.get(portbindings.VNIC_TYPE,
portbindings.VNIC_NORMAL)
if vnic_type not in self.supported_vnic_types:
# we check the VNIC type because there could be multiple agents
# on a single host with different VNIC types
return
if context.host_agents(self.agent_type):
provisioning_blocks.add_provisioning_component(
context._plugin_context, port['id'], resources.PORT,
provisioning_blocks.L2_AGENT_ENTITY)
def bind_port(self, context):
LOG.debug("Attempting to bind port %(port)s on "
"network %(network)s",

View File

@ -59,6 +59,7 @@ from neutron.db import external_net_db
from neutron.db import extradhcpopt_db
from neutron.db import models_v2
from neutron.db import netmtu_db
from neutron.db import provisioning_blocks
from neutron.db.quota import driver # noqa
from neutron.db import securitygroups_db
from neutron.db import securitygroups_rpc_base as sg_db_rpc
@ -159,6 +160,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
self.type_manager.initialize()
self.extension_manager.initialize()
self.mechanism_manager.initialize()
registry.subscribe(self._port_provisioned, resources.PORT,
provisioning_blocks.PROVISIONING_COMPLETE)
self._setup_dhcp()
self._start_rpc_notifiers()
self.add_agent_status_check(self.agent_health_check)
@ -195,6 +198,23 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
driver=extension_driver, service_plugin=service_plugin
)
def _port_provisioned(self, rtype, event, trigger, context, object_id,
**kwargs):
port_id = object_id
port = db.get_port(context.session, port_id)
if not port:
LOG.debug("Port %s was deleted so its status cannot be updated.",
port_id)
return
if port.port_binding.vif_type in (portbindings.VIF_TYPE_BINDING_FAILED,
portbindings.VIF_TYPE_UNBOUND):
# NOTE(kevinbenton): we hit here when a port is created without
# a host ID and the dhcp agent notifies that its wiring is done
LOG.debug("Port %s cannot update to ACTIVE because it "
"is not bound.", port_id)
return
self.update_port_status(context, port_id, const.PORT_STATUS_ACTIVE)
@property
def supported_qos_rule_types(self):
return self.mechanism_manager.supported_qos_rule_types
@ -1056,6 +1076,24 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
elif self._check_update_has_security_groups(port):
raise psec.PortSecurityAndIPRequiredForSecurityGroups()
def _setup_dhcp_agent_provisioning_component(self, context, port):
subnet_ids = [f['subnet_id'] for f in port['fixed_ips']]
if (db.is_dhcp_active_on_any_subnet(context, subnet_ids) and
any(self.get_configuration_dict(a).get('notifies_port_ready')
for a in self.get_dhcp_agents_hosting_networks(
context, [port['network_id']]))):
# at least one of the agents will tell us when the dhcp config
# is ready so we setup a provisioning component to prevent the
# port from going ACTIVE until a dhcp_ready_on_port
# notification is received.
provisioning_blocks.add_provisioning_component(
context, port['id'], resources.PORT,
provisioning_blocks.DHCP_ENTITY)
else:
provisioning_blocks.remove_provisioning_component(
context, port['id'], resources.PORT,
provisioning_blocks.DHCP_ENTITY)
def _create_port_db(self, context, port):
attrs = port[attributes.PORT]
if not attrs.get('status'):
@ -1086,6 +1124,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
self._process_port_create_extra_dhcp_opts(context, result,
dhcp_opts)
self.mechanism_manager.create_port_precommit(mech_context)
self._setup_dhcp_agent_provisioning_component(context, result)
self._apply_dict_extend_functions('ports', result, port_db)
return result, mech_context
@ -1271,6 +1310,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
bound_mech_contexts.append(dvr_mech_context)
else:
self.mechanism_manager.update_port_precommit(mech_context)
self._setup_dhcp_agent_provisioning_component(
context, updated_port)
bound_mech_contexts.append(mech_context)
# Notifications must be sent after the above transaction is complete
@ -1572,6 +1613,16 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
if updated:
self.mechanism_manager.update_port_postcommit(mech_context)
kwargs = {'context': context, 'port': mech_context.current,
'original_port': original_port}
if status == const.PORT_STATUS_ACTIVE:
# NOTE(kevinbenton): this kwarg was carried over from
# the RPC handler that used to call this. it's not clear
# who uses it so maybe it can be removed. added in commit
# 3f3874717c07e2b469ea6c6fd52bcb4da7b380c7
kwargs['update_device_up'] = True
registry.notify(resources.PORT, events.AFTER_UPDATE, self,
**kwargs)
if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
db.delete_dvr_port_binding_if_stale(session, binding)
@ -1579,20 +1630,22 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
return port['id']
def port_bound_to_host(self, context, port_id, host):
if not host:
return
port = db.get_port(context.session, port_id)
if not port:
LOG.debug("No Port match for: %s", port_id)
return False
return
if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
bindings = db.get_dvr_port_bindings(context.session, port_id)
for b in bindings:
if b.host == host:
return True
return port
LOG.debug("No binding found for DVR port %s", port['id'])
return False
return
else:
port_host = db.get_port_binding_host(context.session, port_id)
return (port_host == host)
return port if (port_host == host) else None
def get_ports_from_devices(self, context, devices):
port_ids_to_devices = dict(

View File

@ -14,7 +14,6 @@
# under the License.
from neutron_lib import constants as n_const
from neutron_lib import exceptions
from oslo_log import log
import oslo_messaging
from sqlalchemy.orm import exc
@ -22,14 +21,14 @@ from sqlalchemy.orm import exc
from neutron._i18n import _LE, _LW
from neutron.api.rpc.handlers import dvr_rpc
from neutron.api.rpc.handlers import securitygroups_rpc as sg_rpc
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.db import provisioning_blocks
from neutron.extensions import portbindings
from neutron.extensions import portsecurity as psec
from neutron import manager
from neutron.plugins.ml2 import db as ml2_db
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2.drivers import type_tunnel
from neutron.services.qos import qos_consts
@ -205,31 +204,30 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
{'device': device, 'agent_id': agent_id})
plugin = manager.NeutronManager.get_plugin()
port_id = plugin._device_to_port_id(rpc_context, device)
if (host and not plugin.port_bound_to_host(rpc_context,
port_id, host)):
port = plugin.port_bound_to_host(rpc_context, port_id, host)
if host and not port:
LOG.debug("Device %(device)s not bound to the"
" agent host %(host)s",
{'device': device, 'host': host})
return
port_id = plugin.update_port_status(rpc_context, port_id,
n_const.PORT_STATUS_ACTIVE,
host)
try:
# NOTE(armax): it's best to remove all objects from the
# session, before we try to retrieve the new port object
rpc_context.session.expunge_all()
port = plugin._get_port(rpc_context, port_id)
except exceptions.PortNotFound:
LOG.debug('Port %s not found during update', port_id)
if port and port['device_owner'] == n_const.DEVICE_OWNER_DVR_INTERFACE:
# NOTE(kevinbenton): we have to special case DVR ports because of
# the special multi-binding status update logic they have that
# depends on the host
plugin.update_port_status(rpc_context, port_id,
n_const.PORT_STATUS_ACTIVE, host)
else:
kwargs = {
'context': rpc_context,
'port': port,
'update_device_up': True
}
registry.notify(
resources.PORT, events.AFTER_UPDATE, plugin, **kwargs)
# _device_to_port_id may have returned a truncated UUID if the
# agent did not provide a full one (e.g. Linux Bridge case). We
# need to look up the full one before calling provisioning_complete
if not port:
port = ml2_db.get_port(rpc_context.session, port_id)
if not port:
# port doesn't exist, no need to add a provisioning block
return
provisioning_blocks.provisioning_complete(
rpc_context, port['id'], resources.PORT,
provisioning_blocks.L2_AGENT_ENTITY)
def update_device_list(self, rpc_context, **kwargs):
devices_up = []

View File

@ -245,6 +245,9 @@ class TestDhcpAgent(base.BaseTestCase):
state_rpc_str = 'neutron.agent.rpc.PluginReportStateAPI'
# sync_state is needed for this test
cfg.CONF.set_override('report_interval', 1, 'AGENT')
mock_start_ready = mock.patch.object(
dhcp_agent.DhcpAgentWithStateReport, 'start_ready_ports_loop',
autospec=True).start()
with mock.patch.object(dhcp_agent.DhcpAgentWithStateReport,
'sync_state',
autospec=True) as mock_sync_state:
@ -267,6 +270,7 @@ class TestDhcpAgent(base.BaseTestCase):
agent_mgr.after_start()
mock_sync_state.assert_called_once_with(agent_mgr)
mock_periodic_resync.assert_called_once_with(agent_mgr)
mock_start_ready.assert_called_once_with(agent_mgr)
state_rpc.assert_has_calls(
[mock.call(mock.ANY),
mock.call().report_state(mock.ANY, mock.ANY,
@ -277,11 +281,13 @@ class TestDhcpAgent(base.BaseTestCase):
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
attrs_to_mock = dict(
[(a, mock.DEFAULT) for a in
['sync_state', 'periodic_resync']])
['sync_state', 'periodic_resync',
'start_ready_ports_loop']])
with mock.patch.multiple(dhcp, **attrs_to_mock) as mocks:
dhcp.run()
mocks['sync_state'].assert_called_once_with()
mocks['periodic_resync'].assert_called_once_with()
mocks['start_ready_ports_loop'].assert_called_once_with()
def test_call_driver(self):
network = mock.Mock()
@ -350,12 +356,14 @@ class TestDhcpAgent(base.BaseTestCase):
with mock.patch.multiple(dhcp, **attrs_to_mock) as mocks:
mocks['cache'].get_network_ids.return_value = known_net_ids
mocks['cache'].get_port_ids.return_value = range(4)
dhcp.sync_state()
diff = set(known_net_ids) - set(active_net_ids)
exp_disable = [mock.call(net_id) for net_id in diff]
mocks['cache'].assert_has_calls([mock.call.get_network_ids()])
mocks['disable_dhcp_helper'].assert_has_calls(exp_disable)
self.assertEqual(set(range(4)), dhcp.dhcp_ready_ports)
def test_sync_state_initial(self):
self._test_sync_state_helper([], ['a'])
@ -410,6 +418,55 @@ class TestDhcpAgent(base.BaseTestCase):
dhcp.periodic_resync()
spawn.assert_called_once_with(dhcp._periodic_resync_helper)
def test_start_ready_ports_loop(self):
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
with mock.patch.object(dhcp_agent.eventlet, 'spawn') as spawn:
dhcp.start_ready_ports_loop()
spawn.assert_called_once_with(dhcp._dhcp_ready_ports_loop)
def test__dhcp_ready_ports_doesnt_log_exception_on_timeout(self):
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
dhcp.dhcp_ready_ports = set(range(4))
with mock.patch.object(dhcp.plugin_rpc, 'dhcp_ready_on_ports',
side_effect=oslo_messaging.MessagingTimeout):
# exit after 2 iterations
with mock.patch.object(dhcp_agent.eventlet, 'sleep',
side_effect=[0, 0, RuntimeError]):
with mock.patch.object(dhcp_agent.LOG, 'exception') as lex:
with testtools.ExpectedException(RuntimeError):
dhcp._dhcp_ready_ports_loop()
self.assertFalse(lex.called)
def test__dhcp_ready_ports_disables_on_incompatible_server(self):
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
dhcp.agent_state = dict(configurations=dict(notifies_port_ready=True))
dhcp.dhcp_ready_ports = set(range(4))
side_effect = oslo_messaging.RemoteError(exc_type='NoSuchMethod')
with mock.patch.object(dhcp.plugin_rpc, 'dhcp_ready_on_ports',
side_effect=side_effect):
with mock.patch.object(dhcp_agent.eventlet, 'sleep',
side_effect=[None, RuntimeError]) as sleep:
with testtools.ExpectedException(RuntimeError):
dhcp._dhcp_ready_ports_loop()
# should have slept for 5 minutes
sleep.assert_called_with(300)
def test__dhcp_ready_ports_loop(self):
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
dhcp.dhcp_ready_ports = set(range(4))
with mock.patch.object(dhcp.plugin_rpc, 'dhcp_ready_on_ports',
side_effect=[RuntimeError, 0]) as ready:
# exit after 2 iterations
with mock.patch.object(dhcp_agent.eventlet, 'sleep',
side_effect=[0, 0, RuntimeError]):
with testtools.ExpectedException(RuntimeError):
dhcp._dhcp_ready_ports_loop()
# should have been called with all ports again after the failure
ready.assert_has_calls([mock.call(set(range(4)))] * 2)
def test_report_state_revival_logic(self):
dhcp = dhcp_agent.DhcpAgentWithStateReport(HOSTNAME)
with mock.patch.object(dhcp.state_rpc,
@ -1123,6 +1180,18 @@ class TestNetworkCache(base.BaseTestCase):
self.assertEqual(nc.get_network_by_port_id(fake_port1.id),
fake_network)
def test_get_port_ids(self):
fake_net = dhcp.NetModel(
dict(id='12345678-1234-5678-1234567890ab',
tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
subnets=[fake_subnet1],
ports=[fake_port1]))
nc = dhcp_agent.NetworkCache()
nc.put(fake_net)
nc.put_port(fake_port2)
self.assertEqual(set([fake_port1['id'], fake_port2['id']]),
set(nc.get_port_ids()))
def test_put_port(self):
fake_net = dhcp.NetModel(
dict(id='12345678-1234-5678-1234567890ab',

View File

@ -19,9 +19,11 @@ from neutron_lib import exceptions as n_exc
from oslo_db import exception as db_exc
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.callbacks import resources
from neutron.common import constants as n_const
from neutron.common import exceptions
from neutron.common import utils
from neutron.db import provisioning_blocks
from neutron.extensions import portbindings
from neutron.tests import base
@ -251,3 +253,14 @@ class TestDhcpRpcCallback(base.BaseTestCase):
self.plugin.assert_has_calls([
mock.call.delete_ports_by_device_id(mock.ANY, 'devid', 'netid')])
def test_dhcp_ready_on_ports(self):
context = mock.Mock()
port_ids = range(10)
with mock.patch.object(provisioning_blocks,
'provisioning_complete') as pc:
self.callbacks.dhcp_ready_on_ports(context, port_ids)
calls = [mock.call(context, port_id, resources.PORT,
provisioning_blocks.DHCP_ENTITY)
for port_id in port_ids]
pc.assert_has_calls(calls)

View File

@ -0,0 +1,130 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import testtools
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron import context as n_ctx
from neutron.db import models_v2
from neutron.db import provisioning_blocks as pb
from neutron.tests.unit import testlib_api
class TestStatusBarriers(testlib_api.SqlTestCase):
def setUp(self):
super(TestStatusBarriers, self).setUp()
self.ctx = n_ctx.get_admin_context()
self.provisioned = mock.Mock()
self.port = self._make_port()
registry.subscribe(self.provisioned, resources.PORT,
pb.PROVISIONING_COMPLETE)
def _make_net(self):
with self.ctx.session.begin():
net = models_v2.Network(name='net_net', status='ACTIVE',
tenant_id='1', admin_state_up=True)
self.ctx.session.add(net)
return net
def _make_port(self):
net = self._make_net()
with self.ctx.session.begin():
port = models_v2.Port(networks=net, mac_address='1', tenant_id='1',
admin_state_up=True, status='DOWN',
device_id='2', device_owner='3')
self.ctx.session.add(port)
return port
def test_no_callback_on_missing_object(self):
pb.provisioning_complete(self.ctx, 'someid', resources.PORT, 'entity')
self.assertFalse(self.provisioned.called)
def test_provisioned_with_no_components(self):
pb.provisioning_complete(self.ctx, self.port.id, resources.PORT,
'entity')
self.assertTrue(self.provisioned.called)
def test_provisioned_after_component_finishes(self):
pb.add_provisioning_component(self.ctx, self.port.id, resources.PORT,
'entity')
pb.provisioning_complete(self.ctx, self.port.id, resources.PORT,
'entity')
self.assertTrue(self.provisioned.called)
def test_not_provisioned_until_final_component_complete(self):
pb.add_provisioning_component(self.ctx, self.port.id, resources.PORT,
'entity1')
pb.add_provisioning_component(self.ctx, self.port.id, resources.PORT,
'entity2')
pb.provisioning_complete(self.ctx, self.port.id, resources.PORT,
'entity1')
self.assertFalse(self.provisioned.called)
pb.provisioning_complete(self.ctx, self.port.id, resources.PORT,
'entity2')
self.assertTrue(self.provisioned.called)
def test_provisioning_of_correct_item(self):
port2 = self._make_port()
pb.add_provisioning_component(self.ctx, self.port.id, resources.PORT,
'entity1')
pb.provisioning_complete(self.ctx, port2.id,
resources.PORT, 'entity1')
self.provisioned.assert_called_once_with(
resources.PORT, pb.PROVISIONING_COMPLETE, mock.ANY,
context=self.ctx, object_id=port2.id)
def test_not_provisioned_when_wrong_component_reports(self):
pb.add_provisioning_component(self.ctx, self.port.id, resources.PORT,
'entity1')
pb.provisioning_complete(self.ctx, self.port.id,
resources.PORT, 'entity2')
self.assertFalse(self.provisioned.called)
def test_remove_provisioning_component(self):
pb.add_provisioning_component(self.ctx, self.port.id, resources.PORT,
'e1')
pb.add_provisioning_component(self.ctx, self.port.id, resources.PORT,
'e2')
self.assertTrue(pb.remove_provisioning_component(
self.ctx, self.port.id, resources.PORT, 'e1'))
self.assertFalse(self.provisioned.called)
pb.provisioning_complete(self.ctx, self.port.id,
resources.PORT, 'other')
self.assertFalse(self.provisioned.called)
pb.provisioning_complete(self.ctx, self.port.id,
resources.PORT, 'e2')
self.assertTrue(self.provisioned.called)
def test_adding_component_idempotent(self):
for i in range(5):
pb.add_provisioning_component(self.ctx, self.port.id,
resources.PORT, 'entity1')
pb.provisioning_complete(self.ctx, self.port.id,
resources.PORT, 'entity1')
self.assertTrue(self.provisioned.called)
def test_adding_component_for_new_resource_type(self):
provisioned = mock.Mock()
registry.subscribe(provisioned, 'NETWORK', pb.PROVISIONING_COMPLETE)
net = self._make_net()
# expect failed because the model was not registered for the type
with testtools.ExpectedException(RuntimeError):
pb.add_provisioning_component(self.ctx, net.id, 'NETWORK', 'ent')
pb.add_model_for_resource('NETWORK', models_v2.Network)
pb.add_provisioning_component(self.ctx, net.id, 'NETWORK', 'ent')
pb.provisioning_complete(self.ctx, net.id, 'NETWORK', 'ent')
self.assertTrue(provisioned.called)

View File

@ -39,6 +39,7 @@ from neutron.db import api as db_api
from neutron.db import db_base_plugin_v2 as base_plugin
from neutron.db import l3_db
from neutron.db import models_v2
from neutron.db import provisioning_blocks
from neutron.extensions import availability_zone as az_ext
from neutron.extensions import external_net
from neutron.extensions import multiprovidernet as mpnet
@ -556,6 +557,45 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
plugin.update_port_status(ctx, short_id, 'UP')
mock_gbl.assert_called_once_with(mock.ANY, port_id, mock.ANY)
def _add_fake_dhcp_agent(self):
agent = mock.Mock(configurations='{"notifies_port_ready": true}')
plugin = manager.NeutronManager.get_plugin()
self.get_dhcp_mock = mock.patch.object(
plugin, 'get_dhcp_agents_hosting_networks',
return_value=[agent]).start()
def test_dhcp_provisioning_blocks_inserted_on_create_with_agents(self):
self._add_fake_dhcp_agent()
with mock.patch.object(provisioning_blocks,
'add_provisioning_component') as ap:
with self.port():
self.assertTrue(ap.called)
def test_dhcp_provisioning_blocks_skipped_on_create_with_no_dhcp(self):
self._add_fake_dhcp_agent()
with self.subnet(enable_dhcp=False) as subnet:
with mock.patch.object(provisioning_blocks,
'add_provisioning_component') as ap:
with self.port(subnet=subnet):
self.assertFalse(ap.called)
def test_dhcp_provisioning_blocks_inserted_on_update(self):
ctx = context.get_admin_context()
plugin = manager.NeutronManager.get_plugin()
self._add_fake_dhcp_agent()
with self.port() as port:
with mock.patch.object(provisioning_blocks,
'add_provisioning_component') as ap:
port['port']['binding:host_id'] = 'newhost'
plugin.update_port(ctx, port['port']['id'], port)
self.assertTrue(ap.called)
def test_dhcp_provisioning_blocks_removed_without_dhcp_agents(self):
with mock.patch.object(provisioning_blocks,
'remove_provisioning_component') as cp:
with self.port():
self.assertTrue(cp.called)
def test_update_port_fixed_ip_changed(self):
ctx = context.get_admin_context()
plugin = manager.NeutronManager.get_plugin()

View File

@ -21,14 +21,15 @@ import collections
import mock
from neutron_lib import constants
from neutron_lib import exceptions
from oslo_config import cfg
from oslo_context import context as oslo_context
import oslo_messaging
from sqlalchemy.orm import exc
from neutron.agent import rpc as agent_rpc
from neutron.callbacks import resources
from neutron.common import topics
from neutron.db import provisioning_blocks
from neutron.plugins.ml2.drivers import type_tunnel
from neutron.plugins.ml2 import managers
from neutron.plugins.ml2 import rpc as plugin_rpc
@ -51,29 +52,27 @@ class RpcCallbacksTestCase(base.BaseTestCase):
plugin_rpc.manager, 'NeutronManager').start()
self.plugin = self.manager.get_plugin()
def _test_update_device_up(self):
def _test_update_device_up(self, host=None):
kwargs = {
'agent_id': 'foo_agent',
'device': 'foo_device'
'device': 'foo_device',
'host': host
}
with mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin'
'._device_to_port_id'):
with mock.patch('neutron.callbacks.registry.notify') as notify:
with mock.patch('neutron.db.provisioning_blocks.'
'provisioning_complete') as pc:
self.callbacks.update_device_up(mock.Mock(), **kwargs)
return notify
return pc
def test_update_device_up_notify(self):
notify = self._test_update_device_up()
kwargs = {
'context': mock.ANY, 'port': mock.ANY, 'update_device_up': True
}
notify.assert_called_once_with(
'port', 'after_update', self.plugin, **kwargs)
notify.assert_called_once_with(mock.ANY, mock.ANY, resources.PORT,
provisioning_blocks.L2_AGENT_ENTITY)
def test_update_device_up_notify_not_sent_with_port_not_found(self):
self.plugin._get_port.side_effect = (
exceptions.PortNotFound(port_id='foo_port_id'))
notify = self._test_update_device_up()
self.plugin.port_bound_to_host.return_value = False
notify = self._test_update_device_up('host')
self.assertFalse(notify.call_count)
def test_get_device_details_without_port_context(self):
@ -93,7 +92,7 @@ class RpcCallbacksTestCase(base.BaseTestCase):
def test_get_device_details_port_status_equal_new_status(self):
port = collections.defaultdict(lambda: 'fake')
self.plugin.get_bound_port_context().current = port
self.plugin.port_bound_to_host = mock.MagicMock(return_value=True)
self.plugin.port_bound_to_host = port
for admin_state_up in (True, False):
new_status = (constants.PORT_STATUS_BUILD if admin_state_up
else constants.PORT_STATUS_DOWN)