Merge "[OVS] Fix live-migration connection disruption"

Zuul 2021-01-18 13:39:24 +00:00 committed by Gerrit Code Review
commit c7bc883061
9 changed files with 134 additions and 21 deletions
neutron/
  agent/
  conf/
  db/
  notifiers/
  plugins/ml2/
    drivers/openvswitch/agent/
    plugin.py
  tests/unit/
    agent/
    plugins/ml2/drivers/openvswitch/agent/
zuul.d/

@@ -25,8 +25,10 @@ from neutron_lib.callbacks import resources as callback_resources
 from neutron_lib import constants
 from neutron_lib.plugins import utils
 from neutron_lib import rpc as lib_rpc
+from oslo_config import cfg
 from oslo_log import log as logging
 import oslo_messaging
+from oslo_serialization import jsonutils
 from oslo_utils import uuidutils

 from neutron.agent import resource_cache
@@ -329,8 +331,10 @@ class CacheBackedPluginApi(PluginApi):
         binding = utils.get_port_binding_by_status_and_host(
             port_obj.bindings, constants.ACTIVE, raise_if_not_found=True,
             port_id=port_obj.id)
-        if (port_obj.device_owner.startswith(
-                constants.DEVICE_OWNER_COMPUTE_PREFIX) and
+        migrating_to = migrating_to_host(port_obj.bindings)
+        if (not (migrating_to and cfg.CONF.nova.live_migration_events) and
+                port_obj.device_owner.startswith(
+                    constants.DEVICE_OWNER_COMPUTE_PREFIX) and
                 binding[pb_ext.HOST] != host):
             LOG.debug("Device %s has no active binding in this host",
                       port_obj)
@@ -366,7 +370,8 @@ class CacheBackedPluginApi(PluginApi):
             'profile': binding.profile,
             'vif_type': binding.vif_type,
             'vnic_type': binding.vnic_type,
-            'security_groups': list(port_obj.security_group_ids)
+            'security_groups': list(port_obj.security_group_ids),
+            'migrating_to': migrating_to,
         }
         LOG.debug("Returning: %s", entry)
         return entry
@@ -381,3 +386,30 @@ class CacheBackedPluginApi(PluginApi):
         rcache = resource_cache.RemoteResourceCache(self.RESOURCE_TYPES)
         rcache.start_watcher()
         self.remote_resource_cache = rcache
+
+
+# TODO(ralonsoh): move this method to neutron_lib.plugins.utils
+def migrating_to_host(bindings, host=None):
+    """Return the host the port is being migrated to.
+
+    If ``host`` is passed, the "migrating_to" value found in a port binding
+    profile (the destination host of the migration) is compared to it. If no
+    value is passed, this method reports whether the port is being migrated
+    at all ("migrating_to" is present in any active port binding profile).
+
+    The function returns None or the matching destination host.
+    """
+    for binding in (binding for binding in bindings if
+                    binding[pb_ext.STATUS] == constants.ACTIVE):
+        profile = binding.get('profile')
+        if not profile:
+            continue
+        profile = (jsonutils.loads(profile) if isinstance(profile, str) else
+                   profile)
+        migrating_to = profile.get('migrating_to')
+        if migrating_to:
+            if not host:  # Only report whether the port is being migrated.
+                return migrating_to
+            if migrating_to == host:
+                return migrating_to
+    return None
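The helper above checks every ACTIVE binding for a "migrating_to" key in its profile, decoding the profile when it is still a JSON string. A minimal sketch of the intended semantics, using plain dicts as hypothetical stand-ins for port binding objects:

    from neutron.agent import rpc as agent_rpc

    # Profiles can arrive as JSON strings from the DB; the helper decodes
    # them before looking up "migrating_to".
    bindings = [{'status': 'ACTIVE',
                 'profile': '{"migrating_to": "host2"}'}]

    agent_rpc.migrating_to_host(bindings)                # 'host2'
    agent_rpc.migrating_to_host(bindings, host='host2')  # 'host2' (match)
    agent_rpc.migrating_to_host(bindings, host='host1')  # None (no match)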

@@ -174,6 +174,24 @@ nova_opts = [
                help=_('Type of the nova endpoint to use. This endpoint will'
                       ' be looked up in the keystone catalog and should be'
                       ' one of public, internal or admin.')),
+    cfg.BoolOpt('live_migration_events', default=False,
+                help=_('When this option is enabled, during a live '
+                       'migration the OVS agent will only send the '
+                       '"vif-plugged-event" once the destination host '
+                       'interface is bound. It also prevents any other '
+                       'agent (like DHCP) from sending this event to Nova '
+                       'when the port is provisioned. '
+                       'This option should only be enabled if the Nova patch '
+                       'https://review.opendev.org/c/openstack/nova/+/767368 '
+                       'is in place. '
+                       'This option is temporary and will be removed in Y; '
+                       'the behavior will then always be "True".'),
+                deprecated_for_removal=True,
+                deprecated_reason=(
+                    'In Y the Nova patch '
+                    'https://review.opendev.org/c/openstack/nova/+/767368 '
+                    'will be in the code even when running a Nova server in '
+                    'X.')),
 ]
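Operators opting in would set the new flag in the [nova] section of neutron.conf, exactly as the devstack override in zuul.d at the end of this change does; the option is only safe once the referenced Nova patch is deployed:

    [nova]
    live_migration_events = True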

@@ -138,8 +138,7 @@ def provisioning_complete(context, object_id, object_type, entity):
                 context, standard_attr_id=standard_attr_id):
             LOG.debug("Provisioning complete for %(otype)s %(oid)s triggered by "
                       "entity %(entity)s.", log_dict)
-            registry.publish(object_type, PROVISIONING_COMPLETE,
-                             'neutron.db.provisioning_blocks',
+            registry.publish(object_type, PROVISIONING_COMPLETE, entity,
                              payload=events.DBEventPayload(
                                  context, resource_id=object_id))
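Subscribers to PROVISIONING_COMPLETE now receive the entity that cleared its provisioning block (for example provisioning_blocks.L2_AGENT_ENTITY) as the trigger argument, instead of the fixed 'neutron.db.provisioning_blocks' string. A minimal, illustrative subscriber that branches on it (not part of this change):

    from neutron_lib.callbacks import registry
    from neutron_lib.callbacks import resources

    from neutron.db import provisioning_blocks

    def _on_provisioning_complete(resource, event, trigger, payload=None):
        # ``trigger`` now identifies who finished provisioning the port.
        if trigger == provisioning_blocks.L2_AGENT_ENTITY:
            pass  # e.g. only the L2 agent's completion should reach Nova

    registry.subscribe(_on_provisioning_complete, resources.PORT,
                       provisioning_blocks.PROVISIONING_COMPLETE)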

@@ -13,6 +13,8 @@
 # License for the specific language governing permissions and limitations
 # under the License.

+import contextlib
+
 from keystoneauth1 import loading as ks_loading
 from neutron_lib.callbacks import events
 from neutron_lib.callbacks import registry
@@ -66,6 +68,16 @@ class Notifier(object):
                           if ext.name == "server_external_events"]
         self.batch_notifier = batch_notifier.BatchNotifier(
             cfg.CONF.send_events_interval, self.send_events)
+        self._enabled = True
+
+    @contextlib.contextmanager
+    def context_enabled(self, enabled):
+        stored_enabled = self._enabled
+        try:
+            self._enabled = enabled
+            yield
+        finally:
+            self._enabled = stored_enabled

     def _get_nova_client(self):
         global_id = common_context.generate_request_id()
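The new context manager gives callers a scoped way to mute Nova notifications; the ML2 plugin change below uses it during live migration. A hedged sketch of the pattern, with hypothetical names:

    def _update_status_quietly(plugin, notifier, context, port_id, status):
        # Hypothetical helper: suppress Nova events for this update only.
        # The previous enabled/disabled state is restored even on error.
        with notifier.context_enabled(False):
            plugin.update_port_status(context, port_id, status)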
@@ -164,6 +176,10 @@ class Notifier(object):
             return self._get_network_changed_event(port)

     def _can_notify(self, port):
+        if not self._enabled:
+            LOG.debug("Nova notifier disabled")
+            return False
+
         if not port.id:
             LOG.warning("Port ID not set! Nova will not be notified of "
                         "port status change.")
@@ -181,8 +181,8 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
                       "DVR and tunneling are enabled, setting to True.")
             self.arp_responder_enabled = True

-        host = self.conf.host
-        self.agent_id = 'ovs-agent-%s' % host
+        self.host = self.conf.host
+        self.agent_id = 'ovs-agent-%s' % self.host

         # Validate agent configurations
         self._check_agent_configurations()
@@ -267,7 +267,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
             self.phys_ofports,
             self.patch_int_ofport,
             self.patch_tun_ofport,
-            host,
+            self.host,
             self.enable_tunneling,
             self.enable_distributed_routing)
@@ -313,7 +313,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
         # or which are used by specific extensions.
         self.agent_state = {
             'binary': n_const.AGENT_PROCESS_OVS,
-            'host': host,
+            'host': self.host,
             'topic': n_const.L2_AGENT_TOPIC,
             'configurations': {'bridge_mappings': self.bridge_mappings,
                                n_const.RP_BANDWIDTHS: self.rp_bandwidths,
@@ -1896,6 +1896,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
         skipped_devices = []
         need_binding_devices = []
         binding_no_activated_devices = set()
+        migrating_devices = set()
         agent_restarted = self.iter_num == 0
         devices_details_list = (
             self.plugin_rpc.get_devices_details_list_and_failed_devices(
@@ -1925,6 +1926,12 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
             if not port.ofport or port.ofport == ovs_lib.INVALID_OFPORT:
                 devices_not_in_datapath.add(device)

+            migrating_to = details.get('migrating_to')
+            if migrating_to and migrating_to != self.host:
+                LOG.info('Port %(device)s is being migrated to host %(host)s.',
+                         {'device': device, 'host': migrating_to})
+                migrating_devices.add(device)
+
             if 'port_id' in details:
                 LOG.info("Port %(device)s updated. Details: %(details)s",
                          {'device': device, 'details': details})
@@ -1962,7 +1969,8 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
             if (port and port.ofport != -1):
                 self.port_dead(port)
         return (skipped_devices, binding_no_activated_devices,
-                need_binding_devices, failed_devices, devices_not_in_datapath)
+                need_binding_devices, failed_devices, devices_not_in_datapath,
+                migrating_devices)

     def _update_port_network(self, port_id, network_id):
         self._clean_network_ports(port_id)
@@ -2057,11 +2065,12 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
         skipped_devices = set()
         binding_no_activated_devices = set()
         devices_not_in_datapath = set()
+        migrating_devices = set()
         start = time.time()
         if devices_added_updated:
             (skipped_devices, binding_no_activated_devices,
              need_binding_devices, failed_devices['added'],
-             devices_not_in_datapath) = (
+             devices_not_in_datapath, migrating_devices) = (
                 self.treat_devices_added_or_updated(
                     devices_added_updated, provisioning_needed, re_added))
             LOG.info("process_network_ports - iteration:%(iter_num)d - "
@@ -2084,7 +2093,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
             # TODO(salv-orlando): Optimize avoiding applying filters
             # unnecessarily, (eg: when there are no IP address changes)
             added_ports = (port_info.get('added', set()) - skipped_devices -
-                           binding_no_activated_devices)
+                           binding_no_activated_devices - migrating_devices)
             self._add_port_tag_info(need_binding_devices)
             self.process_install_ports_egress_flows(need_binding_devices)
             added_to_datapath = added_ports - devices_not_in_datapath
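The net effect in process_network_ports(): a device whose port is migrating to another host is subtracted from the "added" set, so the source-side agent does not run provisioning for it again. A tiny illustration with hypothetical device IDs:

    added = {'dev-a', 'dev-b', 'dev-c'}
    skipped_devices, binding_no_activated_devices = set(), set()
    migrating_devices = {'dev-c'}

    added_ports = (added - skipped_devices - binding_no_activated_devices -
                   migrating_devices)
    assert added_ports == {'dev-a', 'dev-b'}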

@@ -91,6 +91,7 @@ from sqlalchemy import or_
 from sqlalchemy.orm import exc as sa_exc

 from neutron._i18n import _
+from neutron.agent import rpc as agent_rpc
 from neutron.agent import securitygroups_rpc as sg_rpc
 from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
 from neutron.api.rpc.handlers import dhcp_rpc
@@ -342,8 +343,19 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
             LOG.debug("Port %s is administratively disabled so it will "
                       "not transition to active.", port_id)
             return
-        self.update_port_status(
-            payload.context, port_id, const.PORT_STATUS_ACTIVE)
+
+        host_migrating = agent_rpc.migrating_to_host(
+            getattr(port, 'port_bindings', []))
+        if (host_migrating and cfg.CONF.nova.live_migration_events and
+                self.nova_notifier):
+            send_nova_event = bool(trigger ==
+                                   provisioning_blocks.L2_AGENT_ENTITY)
+            with self.nova_notifier.context_enabled(send_nova_event):
+                self.update_port_status(payload.context, port_id,
+                                        const.PORT_STATUS_ACTIVE)
+        else:
+            self.update_port_status(payload.context, port_id,
+                                    const.PORT_STATUS_ACTIVE)

     @log_helpers.log_method_call
     def _start_rpc_notifiers(self):
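In short: while a port is live-migrating and live_migration_events is enabled, only a provisioning completion reported by the L2 agent may reach Nova; completions from other entities (such as DHCP) update the port status with the notifier muted. Condensed as a hypothetical predicate:

    from neutron.db import provisioning_blocks

    def _should_notify_nova(trigger):
        # Only the L2 agent's completion may emit the vif-plugged event.
        return trigger == provisioning_blocks.L2_AGENT_ENTITY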

@@ -22,7 +22,9 @@ from neutron_lib.callbacks import events
 from neutron_lib.callbacks import resources
 from neutron_lib import constants
 from neutron_lib import rpc as n_rpc
+from oslo_config import cfg
 from oslo_context import context as oslo_context
+from oslo_serialization import jsonutils
 from oslo_utils import uuidutils

 from neutron.agent import rpc
@@ -305,6 +307,7 @@ class TestCacheBackedPluginApi(base.BaseTestCase):
         self.assertEqual(self._port_id, entry['port_id'])
         self.assertEqual(self._network_id, entry['network_id'])
         self.assertNotIn(constants.NO_ACTIVE_BINDING, entry)
+        self.assertIsNone(entry['migrating_to'])

     def test_get_device_details_binding_not_in_host(self):
         self._api.remote_resource_cache.get_resource_by_id.side_effect = [
@@ -314,8 +317,26 @@ class TestCacheBackedPluginApi(base.BaseTestCase):
         self.assertEqual(self._port_id, entry['device'])
         self.assertNotIn('port_id', entry)
         self.assertNotIn('network_id', entry)
+        self.assertNotIn('migrating_to', entry)
         self.assertIn(constants.NO_ACTIVE_BINDING, entry)

+    def test_get_device_details_migrating_to_host(self):
+        for live_migration_events, migrating_to in ((True, 'host2'),
+                                                    (False, 'irrelevant')):
+            cfg.CONF.set_override('live_migration_events',
+                                  live_migration_events, group='nova')
+            profile = jsonutils.dumps({'migrating_to': migrating_to})
+            self._port.bindings[0].profile = profile
+            self._api.remote_resource_cache.get_resource_by_id.side_effect = [
+                self._port, self._network]
+            entry = self._api.get_device_details(mock.ANY, self._port_id,
+                                                 mock.ANY, 'host2')
+            if live_migration_events:
+                self.assertEqual('host2', entry['migrating_to'])
+            else:
+                self.assertTrue(entry[constants.NO_ACTIVE_BINDING])
+                self.assertNotIn('migrating_to', entry)
+
     @mock.patch('neutron.agent.resource_cache.RemoteResourceCache')
     def test_initialization_with_default_resources(self, rcache_class):
         rcache_obj = mock.MagicMock()

@@ -884,7 +884,7 @@ class TestOvsNeutronAgent(object):
                                    'get_port_tag_dict',
                                    return_value={}),\
                 mock.patch.object(self.agent, func_name) as func:
-            skip_devs, _, need_bound_devices, _, _ = (
+            skip_devs, _, need_bound_devices, _, _, _ = (
                 self.agent.treat_devices_added_or_updated([], False, set()))
         # The function should not raise
         self.assertFalse(skip_devs)
@@ -902,7 +902,7 @@ class TestOvsNeutronAgent(object):
                                   'get_vifs_by_ids',
                                   return_value={details['device']: port}),\
                 mock.patch.object(self.agent, 'port_dead') as func:
-            skip_devs, binding_no_activated_devices, _, _, _ = (
+            skip_devs, binding_no_activated_devices, _, _, _, _ = (
                 self.agent.treat_devices_added_or_updated([], False, set()))
             self.assertFalse(skip_devs)
             self.assertTrue(func.called)
@@ -979,8 +979,9 @@ class TestOvsNeutronAgent(object):
                 [], False, set())
             # The function should return False for resync and no device
             # processed
-            self.assertEqual((['the_skipped_one'], set(), [], set(), set()),
-                             skip_devs)
+            self.assertEqual(
+                (['the_skipped_one'], set(), [], set(), set(), set()),
+                skip_devs)
             ext_mgr_delete_port.assert_called_once_with(
                 self.agent.context, {'port_id': 'the_skipped_one'})
             treat_vif_port.assert_not_called()
@@ -997,7 +998,7 @@ class TestOvsNeutronAgent(object):
                 mock.patch.object(self.agent,
                                   'treat_vif_port') as treat_vif_port:
             failed_devices = {'added': set(), 'removed': set()}
-            (_, _, _, failed_devices['added'], _) = (
+            (_, _, _, failed_devices['added'], _, _) = (
                 self.agent.treat_devices_added_or_updated([], False, set()))
             # The function should return False for resync and no device
             # processed
@@ -1028,7 +1029,7 @@ class TestOvsNeutronAgent(object):
                                   return_value={}),\
                 mock.patch.object(self.agent,
                                   'treat_vif_port') as treat_vif_port:
-            skip_devs, _, need_bound_devices, _, _ = (
+            skip_devs, _, need_bound_devices, _, _, _ = (
                 self.agent.treat_devices_added_or_updated([], False, set()))
             # The function should return False for resync
             self.assertFalse(skip_devs)
@@ -1138,7 +1139,7 @@ class TestOvsNeutronAgent(object):
                            return_value=(
                                skipped_devices, binding_no_activated_devices, [],
                                failed_devices['added'],
-                               set())) as device_added_updated,\
+                               set(), set())) as device_added_updated,\
                 mock.patch.object(self.agent.int_br, "get_ports_attributes",
                                   return_value=[]),\
                 mock.patch.object(self.agent,

@@ -136,6 +136,11 @@
             s-container: false
             s-object: false
             s-proxy: false
+          devstack_local_conf:
+            post-config:
+              $NEUTRON_CONF:
+                nova:
+                  live_migration_events: True
     group-vars:
       subnode:
         devstack_services: