Merge "Add support for deleting ml2/ovn agents"
This commit is contained in:
commit
04694a3490
neutron
agent/ovn/metadata
plugins/ml2/drivers/ovn
tests
functional/plugins/ml2/drivers/ovn/mech_driver
unit/plugins/ml2/drivers/ovn/mech_driver
releasenotes/notes
@ -15,11 +15,11 @@
|
||||
import collections
|
||||
import functools
|
||||
import re
|
||||
import uuid
|
||||
|
||||
from neutron_lib import constants as n_const
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_log import log
|
||||
from oslo_utils import uuidutils
|
||||
from ovsdbapp.backend.ovs_idl import event as row_event
|
||||
from ovsdbapp.backend.ovs_idl import vlog
|
||||
import tenacity
|
||||
@ -261,8 +261,10 @@ class MetadataAgent(object):
|
||||
# NOTE(lucasagomes): db_add() will not overwrite the UUID if
|
||||
# it's already set.
|
||||
table = ('Chassis_Private' if self.has_chassis_private else 'Chassis')
|
||||
ext_ids = {
|
||||
ovn_const.OVN_AGENT_METADATA_ID_KEY: uuidutils.generate_uuid()}
|
||||
chassis_id = uuid.UUID(self._get_own_chassis_name())
|
||||
# Generate unique, but consistent metadata id for chassis name
|
||||
agent_id = uuid.uuid5(chassis_id, 'metadata_agent')
|
||||
ext_ids = {ovn_const.OVN_AGENT_METADATA_ID_KEY: str(agent_id)}
|
||||
self.sb_idl.db_add(table, self.chassis, 'external_ids',
|
||||
ext_ids).execute(check_error=True)
|
||||
|
||||
|
@ -14,10 +14,13 @@
|
||||
|
||||
import abc
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from neutron._i18n import _
|
||||
from neutron.common.ovn import constants as ovn_const
|
||||
from neutron.common.ovn import utils as ovn_utils
|
||||
from neutron.common import utils
|
||||
|
||||
|
||||
class NeutronAgent(abc.ABC):
|
||||
@ -27,26 +30,26 @@ class NeutronAgent(abc.ABC):
|
||||
# Register the subclasses to be looked up by their type
|
||||
NeutronAgent.types[cls.agent_type] = cls
|
||||
|
||||
def __init__(self, chassis_private):
|
||||
self.chassis_private = chassis_private
|
||||
self.chassis = self.get_chassis(chassis_private)
|
||||
def __init__(self, chassis_private, driver, updated_at=None):
|
||||
self.driver = driver
|
||||
self.set_down = False
|
||||
self.update(chassis_private, updated_at)
|
||||
|
||||
@staticmethod
|
||||
def get_chassis(chassis_private):
|
||||
try:
|
||||
return chassis_private.chassis[0]
|
||||
except (AttributeError, IndexError):
|
||||
# No Chassis_Private support, just use Chassis
|
||||
return chassis_private
|
||||
def update(self, chassis_private, updated_at=None, clear_down=False):
|
||||
self.chassis_private = chassis_private
|
||||
self.updated_at = updated_at or timeutils.utcnow(with_timezone=True)
|
||||
if clear_down:
|
||||
self.set_down = False
|
||||
|
||||
@property
|
||||
def updated_at(self):
|
||||
def chassis(self):
|
||||
try:
|
||||
return timeutils.parse_isotime(self.chassis.external_ids[self.key])
|
||||
except KeyError:
|
||||
return timeutils.utcnow(with_timezone=True)
|
||||
return self.chassis_private.chassis[0]
|
||||
except (AttributeError, IndexError):
|
||||
# No Chassis_Private support, just use Chassis
|
||||
return self.chassis_private
|
||||
|
||||
def as_dict(self, alive):
|
||||
def as_dict(self):
|
||||
return {
|
||||
'binary': self.binary,
|
||||
'host': self.chassis.hostname,
|
||||
@ -62,39 +65,62 @@ class NeutronAgent(abc.ABC):
|
||||
'start_flag': True,
|
||||
'agent_type': self.agent_type,
|
||||
'id': self.agent_id,
|
||||
'alive': alive,
|
||||
'alive': self.alive,
|
||||
'admin_state_up': True}
|
||||
|
||||
@classmethod
|
||||
def from_type(cls, _type, chassis_private):
|
||||
return cls.types[_type](chassis_private)
|
||||
|
||||
@staticmethod
|
||||
def matches_chassis(chassis):
|
||||
"""Is this Agent type found on the passed in chassis?"""
|
||||
return True
|
||||
@property
|
||||
def alive(self):
|
||||
if self.set_down:
|
||||
return False
|
||||
# TODO(twilson) Determine if we can go back to just checking:
|
||||
# if self.driver._nb_ovn.nb_global.nb_cfg == self.nb_cfg:
|
||||
if self.driver._nb_ovn.nb_global.nb_cfg - self.nb_cfg <= 1:
|
||||
return True
|
||||
now = timeutils.utcnow(with_timezone=True)
|
||||
if (now - self.updated_at).total_seconds() < cfg.CONF.agent_down_time:
|
||||
# down, but not yet timed out
|
||||
return True
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def agents_from_chassis(cls, chassis_private):
|
||||
return [AgentCls(chassis_private)
|
||||
for AgentCls in cls.types.values()
|
||||
if AgentCls.matches_chassis(cls.get_chassis(chassis_private))]
|
||||
def from_type(cls, _type, chassis_private, driver, updated_at=None):
|
||||
return cls.types[_type](chassis_private, driver, updated_at)
|
||||
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def agent_type(self):
|
||||
pass
|
||||
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def binary(self):
|
||||
pass
|
||||
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def nb_cfg(self):
|
||||
pass
|
||||
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def agent_id(self):
|
||||
pass
|
||||
|
||||
|
||||
class ControllerAgent(NeutronAgent):
|
||||
agent_type = ovn_const.OVN_CONTROLLER_AGENT
|
||||
binary = 'ovn-controller'
|
||||
key = ovn_const.OVN_LIVENESS_CHECK_EXT_ID_KEY
|
||||
|
||||
@staticmethod # it is by default, but this makes pep8 happy
|
||||
def __new__(cls, chassis_private, driver, updated_at=None):
|
||||
if ('enable-chassis-as-gw' in
|
||||
chassis_private.external_ids.get('ovn-cms-options', [])):
|
||||
cls = ControllerGatewayAgent
|
||||
return super().__new__(cls)
|
||||
|
||||
@staticmethod
|
||||
def matches_chassis(chassis):
|
||||
return ('enable-chassis-as-gw' not in
|
||||
chassis.external_ids.get('ovn-cms-options', []))
|
||||
def id_from_chassis_private(chassis_private):
|
||||
return chassis_private.name
|
||||
|
||||
@property
|
||||
def nb_cfg(self):
|
||||
@ -102,7 +128,7 @@ class ControllerAgent(NeutronAgent):
|
||||
|
||||
@property
|
||||
def agent_id(self):
|
||||
return self.chassis_private.name
|
||||
return self.id_from_chassis_private(self.chassis_private)
|
||||
|
||||
@property
|
||||
def description(self):
|
||||
@ -113,28 +139,76 @@ class ControllerAgent(NeutronAgent):
|
||||
class ControllerGatewayAgent(ControllerAgent):
|
||||
agent_type = ovn_const.OVN_CONTROLLER_GW_AGENT
|
||||
|
||||
@staticmethod
|
||||
def matches_chassis(chassis):
|
||||
return ('enable-chassis-as-gw' in
|
||||
chassis.external_ids.get('ovn-cms-options', []))
|
||||
|
||||
|
||||
class MetadataAgent(NeutronAgent):
|
||||
agent_type = ovn_const.OVN_METADATA_AGENT
|
||||
binary = 'neutron-ovn-metadata-agent'
|
||||
key = ovn_const.METADATA_LIVENESS_CHECK_EXT_ID_KEY
|
||||
|
||||
@property
|
||||
def alive(self):
|
||||
# If ovn-controller is down, then metadata agent is down even
|
||||
# if the metadata-agent binary is updating external_ids.
|
||||
try:
|
||||
if not AgentCache()[self.chassis_private.name].alive:
|
||||
return False
|
||||
except KeyError:
|
||||
return False
|
||||
return super().alive
|
||||
|
||||
@property
|
||||
def nb_cfg(self):
|
||||
return int(self.chassis_private.external_ids.get(
|
||||
ovn_const.OVN_AGENT_METADATA_SB_CFG_KEY, 0))
|
||||
|
||||
@staticmethod
|
||||
def id_from_chassis_private(chassis_private):
|
||||
return chassis_private.external_ids.get(
|
||||
ovn_const.OVN_AGENT_METADATA_ID_KEY)
|
||||
|
||||
@property
|
||||
def agent_id(self):
|
||||
return self.chassis_private.external_ids.get(
|
||||
ovn_const.OVN_AGENT_METADATA_ID_KEY)
|
||||
return self.id_from_chassis_private(self.chassis_private)
|
||||
|
||||
@property
|
||||
def description(self):
|
||||
return self.chassis_private.external_ids.get(
|
||||
ovn_const.OVN_AGENT_METADATA_DESC_KEY, '')
|
||||
|
||||
|
||||
@utils.SingletonDecorator
|
||||
class AgentCache:
|
||||
def __init__(self, driver=None):
|
||||
# This is just to make pylint happy because it doesn't like calls to
|
||||
# AgentCache() with no arguments, despite init only being called the
|
||||
# first time--and we do really want a driver passed in.
|
||||
if driver is None:
|
||||
raise ValueError(_("driver cannot be None"))
|
||||
self.agents = {}
|
||||
self.driver = driver
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self.agents.values())
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.agents[key]
|
||||
|
||||
def update(self, agent_type, row, updated_at=None, clear_down=False):
|
||||
cls = NeutronAgent.types[agent_type]
|
||||
try:
|
||||
agent = self.agents[cls.id_from_chassis_private(row)]
|
||||
agent.update(row, updated_at=updated_at, clear_down=clear_down)
|
||||
except KeyError:
|
||||
agent = NeutronAgent.from_type(agent_type, row, self.driver,
|
||||
updated_at=updated_at)
|
||||
self.agents[agent.agent_id] = agent
|
||||
return agent
|
||||
|
||||
def __delitem__(self, agent_id):
|
||||
del self.agents[agent_id]
|
||||
|
||||
def agents_by_chassis_private(self, chassis_private):
|
||||
# Get unique agent ids based on the chassis_private
|
||||
agent_ids = {cls.id_from_chassis_private(chassis_private)
|
||||
for cls in NeutronAgent.types.values()}
|
||||
# Return the cached agents of agent_ids whose keys are in the cache
|
||||
return (self.agents[id_] for id_ in agent_ids & self.agents.keys())
|
||||
|
@ -36,7 +36,6 @@ from oslo_config import cfg
|
||||
from oslo_db import exception as os_db_exc
|
||||
from oslo_log import log
|
||||
from oslo_utils import timeutils
|
||||
from ovsdbapp.backend.ovs_idl import idlutils
|
||||
|
||||
from neutron._i18n import _
|
||||
from neutron.common.ovn import acl as ovn_acl
|
||||
@ -64,7 +63,6 @@ import neutron.wsgi
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
METADATA_READY_WAIT_TIMEOUT = 15
|
||||
AGENTS = {}
|
||||
|
||||
|
||||
class MetadataServiceReadyWaitTimeoutException(Exception):
|
||||
@ -277,15 +275,12 @@ class OVNMechanismDriver(api.MechanismDriver):
|
||||
self.node_uuid = ovn_hash_ring_db.add_node(admin_context,
|
||||
self.hash_ring_group)
|
||||
|
||||
n_agent.AgentCache(self) # Initialize singleton agent cache
|
||||
self._nb_ovn, self._sb_ovn = impl_idl_ovn.get_ovn_idls(self, trigger)
|
||||
|
||||
if self._sb_ovn.is_table_present('Chassis_Private'):
|
||||
self.agent_chassis_table = 'Chassis_Private'
|
||||
|
||||
# AGENTS must be populated after fork so if ovn-controller is stopped
|
||||
# before a worker handles a get_agents request, we still show agents
|
||||
populate_agents(self)
|
||||
|
||||
# Override agents API methods
|
||||
self.patch_plugin_merge("get_agents", get_agents)
|
||||
self.patch_plugin_choose("get_agent", get_agent)
|
||||
@ -1132,38 +1127,6 @@ class OVNMechanismDriver(api.MechanismDriver):
|
||||
" neutron-ovn-metadata-agent status/logs.",
|
||||
port_id)
|
||||
|
||||
def agent_alive(self, agent, update_db):
|
||||
# Allow a maximum of 1 difference between expected and read values
|
||||
# to avoid false positives.
|
||||
if self._nb_ovn.nb_global.nb_cfg - agent.nb_cfg <= 1:
|
||||
if update_db:
|
||||
self.mark_agent_alive(agent)
|
||||
return True
|
||||
|
||||
now = timeutils.utcnow(with_timezone=True)
|
||||
if (now - agent.updated_at).total_seconds() < cfg.CONF.agent_down_time:
|
||||
# down, but not yet timed out
|
||||
return True
|
||||
return False
|
||||
|
||||
def mark_agent_alive(self, agent):
|
||||
# Update the time of our successful check
|
||||
value = timeutils.utcnow(with_timezone=True).isoformat()
|
||||
self._sb_ovn.db_set(
|
||||
self.agent_chassis_table, agent.chassis_private.uuid,
|
||||
('external_ids', {agent.key: value})).execute(check_error=True)
|
||||
|
||||
def agents_from_chassis(self, chassis_private, update_db=True):
|
||||
agent_dict = {}
|
||||
# For each Chassis there will possibly be a Metadata agent and either
|
||||
# a Controller or Controller Gateway agent.
|
||||
for agent in n_agent.NeutronAgent.agents_from_chassis(chassis_private):
|
||||
if not agent.agent_id:
|
||||
continue
|
||||
alive = self.agent_alive(agent, update_db)
|
||||
agent_dict[agent.agent_id] = agent.as_dict(alive)
|
||||
return agent_dict
|
||||
|
||||
def patch_plugin_merge(self, method_name, new_fn, op=operator.add):
|
||||
old_method = getattr(self._plugin, method_name)
|
||||
|
||||
@ -1230,42 +1193,22 @@ class OVNMechanismDriver(api.MechanismDriver):
|
||||
return azs
|
||||
|
||||
|
||||
def populate_agents(driver):
|
||||
for ch in driver._sb_ovn.tables[driver.agent_chassis_table].rows.values():
|
||||
# update the cache, rows are hashed on uuid but it is the name that
|
||||
# stays consistent across ovn-controller restarts
|
||||
AGENTS.update({ch.name: ch})
|
||||
|
||||
|
||||
def get_agents(self, context, filters=None, fields=None, _driver=None):
|
||||
update_db = _driver.ping_all_chassis()
|
||||
_driver.ping_all_chassis()
|
||||
filters = filters or {}
|
||||
agent_list = []
|
||||
populate_agents(_driver)
|
||||
for ch in AGENTS.values():
|
||||
for agent in _driver.agents_from_chassis(ch, update_db).values():
|
||||
if all(agent[k] in v for k, v in filters.items()):
|
||||
agent_list.append(agent)
|
||||
for agent in n_agent.AgentCache():
|
||||
agent_dict = agent.as_dict()
|
||||
if all(agent_dict[k] in v for k, v in filters.items()):
|
||||
agent_list.append(agent_dict)
|
||||
return agent_list
|
||||
|
||||
|
||||
def get_agent(self, context, id, fields=None, _driver=None):
|
||||
chassis = None
|
||||
try:
|
||||
# look up Chassis by *name*, which the id attribute is
|
||||
chassis = _driver._sb_ovn.lookup(_driver.agent_chassis_table, id)
|
||||
except idlutils.RowNotFound:
|
||||
# If the UUID is not found, check for the metadata agent ID
|
||||
for ch in _driver._sb_ovn.tables[
|
||||
_driver.agent_chassis_table].rows.values():
|
||||
metadata_agent_id = ch.external_ids.get(
|
||||
ovn_const.OVN_AGENT_METADATA_ID_KEY)
|
||||
if id == metadata_agent_id:
|
||||
chassis = ch
|
||||
break
|
||||
else:
|
||||
raise n_exc.agent.AgentNotFound(id=id)
|
||||
return _driver.agents_from_chassis(chassis)[id]
|
||||
return n_agent.AgentCache()[id].as_dict()
|
||||
except KeyError:
|
||||
raise n_exc.agent.AgentNotFound(id=id)
|
||||
|
||||
|
||||
def update_agent(self, context, id, agent, _driver=None):
|
||||
@ -1291,9 +1234,28 @@ def update_agent(self, context, id, agent, _driver=None):
|
||||
|
||||
|
||||
def delete_agent(self, context, id, _driver=None):
|
||||
get_agent(self, None, id, _driver=_driver)
|
||||
raise n_exc.BadRequest(resource='agent',
|
||||
msg='OVN agents cannot be deleted')
|
||||
# raise AgentNotFound if this isn't an ml2/ovn-related agent
|
||||
agent = get_agent(self, None, id, _driver=_driver)
|
||||
|
||||
# NOTE(twilson) According to the API docs, an agent must be disabled
|
||||
# before deletion. Otherwise, behavior seems to be undefined. We could
|
||||
# check that alive=False before allowing deletion, but depending on the
|
||||
# agent_down_time setting, that could take quite a while.
|
||||
# If ovn-controller is up, the Chassis will be recreated and so the agent
|
||||
# will still show as up. The recreated Chassis will cause all kinds of
|
||||
# events to fire. But again, undefined behavior.
|
||||
chassis_name = agent['configurations']['chassis_name']
|
||||
_driver._sb_ovn.chassis_del(chassis_name, if_exists=True).execute(
|
||||
check_error=True)
|
||||
# Send a specific event that all API workers can get to delete the agent
|
||||
# from their caches. Ideally we could send a single transaction that both
|
||||
# created and deleted the key, but alas python-ovs is too "smart"
|
||||
_driver._sb_ovn.db_set(
|
||||
'SB_Global', '.', ('external_ids', {'delete_agent': str(id)})).execute(
|
||||
check_error=True)
|
||||
_driver._sb_ovn.db_remove(
|
||||
'SB_Global', '.', 'external_ids', delete_agent=str(id),
|
||||
if_exists=True).execute(check_error=True)
|
||||
|
||||
|
||||
def create_default_drop_port_group(nb_idl):
|
||||
|
@ -34,6 +34,7 @@ from neutron.common.ovn import hash_ring_manager
|
||||
from neutron.common.ovn import utils
|
||||
from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf
|
||||
from neutron.db import ovn_hash_ring_db
|
||||
from neutron.plugins.ml2.drivers.ovn.agent import neutron_agent as n_agent
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
@ -213,6 +214,92 @@ class PortBindingChassisUpdateEvent(row_event.RowEvent):
|
||||
self.driver.set_port_status_up(row.logical_port)
|
||||
|
||||
|
||||
class ChassisAgentEvent(BaseEvent):
|
||||
GLOBAL = True
|
||||
|
||||
# NOTE (twilson) Do not run new transactions out of a GLOBAL Event since
|
||||
# it will be running on every single process, and you almost certainly
|
||||
# don't want to insert/update/delete something a bajillion times.
|
||||
def __init__(self, driver):
|
||||
self.driver = driver
|
||||
super().__init__()
|
||||
|
||||
@property
|
||||
def table(self):
|
||||
# It probably doesn't matter, but since agent_chassis_table changes
|
||||
# in post_fork_initialize(), resolve this at runtime
|
||||
return self.driver.agent_chassis_table
|
||||
|
||||
@table.setter
|
||||
def table(self, value):
|
||||
pass
|
||||
|
||||
|
||||
class ChassisAgentDownEvent(ChassisAgentEvent):
|
||||
events = (BaseEvent.ROW_DELETE,)
|
||||
|
||||
def run(self, event, row, old):
|
||||
for agent in n_agent.AgentCache().agents_by_chassis_private(row):
|
||||
agent.set_down = True
|
||||
|
||||
def match_fn(self, event, row, old=None):
|
||||
return True
|
||||
|
||||
|
||||
class ChassisAgentDeleteEvent(ChassisAgentEvent):
|
||||
events = (BaseEvent.ROW_UPDATE,)
|
||||
table = 'SB_Global'
|
||||
|
||||
def match_fn(self, event, row, old=None):
|
||||
try:
|
||||
return (old.external_ids.get('delete_agent') !=
|
||||
row.external_ids['delete_agent'])
|
||||
except (AttributeError, KeyError):
|
||||
return False
|
||||
|
||||
def run(self, event, row, old):
|
||||
del n_agent.AgentCache()[row.external_ids['delete_agent']]
|
||||
|
||||
|
||||
class ChassisAgentWriteEvent(ChassisAgentEvent):
|
||||
events = (BaseEvent.ROW_CREATE, BaseEvent.ROW_UPDATE)
|
||||
|
||||
def match_fn(self, event, row, old=None):
|
||||
return event == self.ROW_CREATE or getattr(old, 'nb_cfg', False)
|
||||
|
||||
def run(self, event, row, old):
|
||||
n_agent.AgentCache().update(ovn_const.OVN_CONTROLLER_AGENT, row,
|
||||
clear_down=event == self.ROW_CREATE)
|
||||
|
||||
|
||||
class ChassisMetadataAgentWriteEvent(ChassisAgentEvent):
|
||||
events = (BaseEvent.ROW_CREATE, BaseEvent.ROW_UPDATE)
|
||||
|
||||
@staticmethod
|
||||
def _metadata_nb_cfg(row):
|
||||
return int(
|
||||
row.external_ids.get(ovn_const.OVN_AGENT_METADATA_SB_CFG_KEY, -1))
|
||||
|
||||
@staticmethod
|
||||
def agent_id(row):
|
||||
return row.external_ids.get(ovn_const.OVN_AGENT_METADATA_ID_KEY)
|
||||
|
||||
def match_fn(self, event, row, old=None):
|
||||
if not self.agent_id(row):
|
||||
# Don't create a cached object with an agent_id of 'None'
|
||||
return False
|
||||
if event == self.ROW_CREATE:
|
||||
return True
|
||||
try:
|
||||
return self._metadata_nb_cfg(row) != self._metadata_nb_cfg(old)
|
||||
except (AttributeError, KeyError):
|
||||
return False
|
||||
|
||||
def run(self, event, row, old):
|
||||
n_agent.AgentCache().update(ovn_const.OVN_METADATA_AGENT, row,
|
||||
clear_down=True)
|
||||
|
||||
|
||||
class PortBindingChassisEvent(row_event.RowEvent):
|
||||
"""Port_Binding update event - set chassis for chassisredirect port.
|
||||
|
||||
@ -359,8 +446,24 @@ class NeutronPgDropPortGroupCreated(row_event.WaitEvent):
|
||||
|
||||
class OvnDbNotifyHandler(row_event.RowEventHandler):
|
||||
def __init__(self, driver):
|
||||
super(OvnDbNotifyHandler, self).__init__()
|
||||
self.driver = driver
|
||||
super(OvnDbNotifyHandler, self).__init__()
|
||||
try:
|
||||
self._lock = self._RowEventHandler__lock
|
||||
self._watched_events = self._RowEventHandler__watched_events
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
def notify(self, event, row, updates=None, global_=False):
|
||||
matching = self.matching_events(event, row, updates, global_)
|
||||
for match in matching:
|
||||
self.notifications.put((match, event, row, updates))
|
||||
|
||||
def matching_events(self, event, row, updates, global_=False):
|
||||
with self._lock:
|
||||
return tuple(t for t in self._watched_events
|
||||
if getattr(t, 'GLOBAL', False) == global_ and
|
||||
self.match(t, event, row, updates))
|
||||
|
||||
|
||||
class Ml2OvnIdlBase(connection.OvsdbIdl):
|
||||
@ -448,12 +551,12 @@ class OvnIdlDistributedLock(BaseOvnIdl):
|
||||
self._last_touch = None
|
||||
|
||||
def notify(self, event, row, updates=None):
|
||||
self.notify_handler.notify(event, row, updates, global_=True)
|
||||
try:
|
||||
target_node = self._hash_ring.get_node(str(row.uuid))
|
||||
except exceptions.HashRingIsEmpty as e:
|
||||
LOG.error('HashRing is empty, error: %s', e)
|
||||
return
|
||||
|
||||
if target_node != self._node_uuid:
|
||||
return
|
||||
|
||||
@ -530,6 +633,14 @@ class OvnNbIdl(OvnIdlDistributedLock):
|
||||
|
||||
class OvnSbIdl(OvnIdlDistributedLock):
|
||||
|
||||
def __init__(self, driver, remote, schema):
|
||||
super(OvnSbIdl, self).__init__(driver, remote, schema)
|
||||
self.notify_handler.watch_events([
|
||||
ChassisAgentDeleteEvent(self.driver),
|
||||
ChassisAgentDownEvent(self.driver),
|
||||
ChassisAgentWriteEvent(self.driver),
|
||||
ChassisMetadataAgentWriteEvent(self.driver)])
|
||||
|
||||
@classmethod
|
||||
def from_server(cls, connection_string, schema_name, driver):
|
||||
_check_and_set_ssl_files(schema_name)
|
||||
@ -541,6 +652,7 @@ class OvnSbIdl(OvnIdlDistributedLock):
|
||||
helper.register_table('Port_Binding')
|
||||
helper.register_table('Datapath_Binding')
|
||||
helper.register_table('MAC_Binding')
|
||||
helper.register_columns('SB_Global', ['external_ids'])
|
||||
return cls(driver, connection_string, helper)
|
||||
|
||||
def post_connect(self):
|
||||
|
@ -15,7 +15,6 @@
|
||||
from unittest import mock
|
||||
|
||||
import fixtures as og_fixtures
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from neutron.common.ovn import constants as ovn_const
|
||||
@ -70,6 +69,10 @@ class DistributedLockTestEvent(event.WaitEvent):
|
||||
self.event.set()
|
||||
|
||||
|
||||
class GlobalTestEvent(DistributedLockTestEvent):
|
||||
GLOBAL = True
|
||||
|
||||
|
||||
class TestNBDbMonitor(base.TestOVNFunctionalBase):
|
||||
|
||||
def setUp(self):
|
||||
@ -198,15 +201,12 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
|
||||
self._test_port_binding_and_status(port['id'], 'bind', 'ACTIVE')
|
||||
self._test_port_binding_and_status(port['id'], 'unbind', 'DOWN')
|
||||
|
||||
def test_distributed_lock(self):
|
||||
api_workers = 11
|
||||
cfg.CONF.set_override('api_workers', api_workers)
|
||||
row_event = DistributedLockTestEvent()
|
||||
def _create_workers(self, row_event, worker_num):
|
||||
self.mech_driver._nb_ovn.idl.notify_handler.watch_event(row_event)
|
||||
worker_list = [self.mech_driver._nb_ovn, ]
|
||||
worker_list = [self.mech_driver._nb_ovn]
|
||||
|
||||
# Create 10 fake workers
|
||||
for _ in range(api_workers - len(worker_list)):
|
||||
for _ in range(worker_num):
|
||||
node_uuid = uuidutils.generate_uuid()
|
||||
db_hash_ring.add_node(
|
||||
self.context, ovn_const.HASH_RING_ML2_GROUP, node_uuid)
|
||||
@ -228,11 +228,17 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
|
||||
|
||||
# Assert we have 11 active workers in the ring
|
||||
self.assertEqual(
|
||||
11, len(db_hash_ring.get_active_nodes(
|
||||
self.context,
|
||||
interval=ovn_const.HASH_RING_NODES_TIMEOUT,
|
||||
group_name=ovn_const.HASH_RING_ML2_GROUP)))
|
||||
worker_num + 1,
|
||||
len(db_hash_ring.get_active_nodes(
|
||||
self.context,
|
||||
interval=ovn_const.HASH_RING_NODES_TIMEOUT,
|
||||
group_name=ovn_const.HASH_RING_ML2_GROUP)))
|
||||
|
||||
return worker_list
|
||||
|
||||
def test_distributed_lock(self):
|
||||
row_event = DistributedLockTestEvent()
|
||||
self._create_workers(row_event, worker_num=10)
|
||||
# Trigger the event
|
||||
self.create_port()
|
||||
|
||||
@ -242,6 +248,30 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
|
||||
# Assert that only one worker handled the event
|
||||
self.assertEqual(1, row_event.COUNTER)
|
||||
|
||||
def test_global_events(self):
|
||||
worker_num = 10
|
||||
distributed_event = DistributedLockTestEvent()
|
||||
global_event = GlobalTestEvent()
|
||||
worker_list = self._create_workers(distributed_event, worker_num)
|
||||
for worker in worker_list:
|
||||
worker.idl.notify_handler.watch_event(global_event)
|
||||
|
||||
# This should generate one distributed even handled by a single worker
|
||||
# and one global event, that should be handled by all workers
|
||||
self.create_port()
|
||||
|
||||
# Wait for the distributed event to complete
|
||||
self.assertTrue(distributed_event.wait())
|
||||
|
||||
# Assert that only one worker handled the distributed event
|
||||
self.assertEqual(1, distributed_event.COUNTER)
|
||||
|
||||
n_utils.wait_until_true(
|
||||
lambda: global_event.COUNTER == worker_num + 1,
|
||||
exception=Exception(
|
||||
"Fanout event didn't get handled expected %d times" %
|
||||
(worker_num + 1)))
|
||||
|
||||
|
||||
class TestNBDbMonitorOverTcp(TestNBDbMonitor):
|
||||
def get_ovsdb_server_protocol(self):
|
||||
|
@ -17,8 +17,10 @@ from unittest import mock
|
||||
|
||||
from neutron_lib.api.definitions import portbindings
|
||||
from neutron_lib import constants
|
||||
from neutron_lib.exceptions import agent as agent_exc
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import uuidutils
|
||||
from ovsdbapp.backend.ovs_idl import event
|
||||
from ovsdbapp.tests.functional import base as ovs_base
|
||||
|
||||
from neutron.common.ovn import constants as ovn_const
|
||||
@ -744,23 +746,75 @@ class TestProvnetPorts(base.TestOVNFunctionalBase):
|
||||
self.assertIsNone(ovn_localnetport)
|
||||
|
||||
|
||||
class AgentWaitEvent(event.WaitEvent):
|
||||
"""Wait for a list of Chassis to be created"""
|
||||
|
||||
ONETIME = False
|
||||
|
||||
def __init__(self, driver, chassis_names):
|
||||
table = driver.agent_chassis_table
|
||||
events = (self.ROW_CREATE,)
|
||||
self.chassis_names = chassis_names
|
||||
super().__init__(events, table, None)
|
||||
self.event_name = "AgentWaitEvent"
|
||||
|
||||
def match_fn(self, event, row, old):
|
||||
return row.name in self.chassis_names
|
||||
|
||||
def run(self, event, row, old):
|
||||
self.chassis_names.remove(row.name)
|
||||
if not self.chassis_names:
|
||||
self.event.set()
|
||||
|
||||
|
||||
class TestAgentApi(base.TestOVNFunctionalBase):
|
||||
TEST_AGENT = 'test'
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.host = 'test-host'
|
||||
self.controller_agent = self.add_fake_chassis(self.host)
|
||||
self.host = n_utils.get_rand_name(prefix='testhost-')
|
||||
self.plugin = self.mech_driver._plugin
|
||||
agent = {'agent_type': 'test', 'binary': '/bin/test',
|
||||
'host': self.host, 'topic': 'test_topic'}
|
||||
_, status = self.plugin.create_or_update_agent(self.context, agent)
|
||||
self.test_agent = status['id']
|
||||
mock.patch.object(self.mech_driver, 'ping_all_chassis',
|
||||
return_value=False).start()
|
||||
|
||||
def test_agent_show_non_ovn(self):
|
||||
self.assertTrue(self.plugin.get_agent(self.context, self.test_agent))
|
||||
metadata_agent_id = uuidutils.generate_uuid()
|
||||
# To be *mostly* sure the agent cache has been updated, we need to
|
||||
# wait for the Chassis events to run. So add a new event that should
|
||||
# run afterthey do and wait for it. I've only had to do this when
|
||||
# adding *a bunch* of Chassis at a time, but better safe than sorry.
|
||||
chassis_name = uuidutils.generate_uuid()
|
||||
agent_event = AgentWaitEvent(self.mech_driver, [chassis_name])
|
||||
self.sb_api.idl.notify_handler.watch_event(agent_event)
|
||||
|
||||
def test_agent_show_ovn_controller(self):
|
||||
self.assertTrue(self.plugin.get_agent(self.context,
|
||||
self.controller_agent))
|
||||
self.chassis = self.add_fake_chassis(self.host, name=chassis_name,
|
||||
external_ids={
|
||||
ovn_const.OVN_AGENT_METADATA_ID_KEY: metadata_agent_id})
|
||||
|
||||
self.assertTrue(agent_event.wait())
|
||||
|
||||
self.agent_types = {
|
||||
self.TEST_AGENT: self._create_test_agent(),
|
||||
ovn_const.OVN_CONTROLLER_AGENT: self.chassis,
|
||||
ovn_const.OVN_METADATA_AGENT: metadata_agent_id,
|
||||
}
|
||||
|
||||
def _create_test_agent(self):
|
||||
agent = {'agent_type': self.TEST_AGENT, 'binary': '/bin/test',
|
||||
'host': self.host, 'topic': 'test_topic'}
|
||||
_, status = self.plugin.create_or_update_agent(self.context, agent)
|
||||
return status['id']
|
||||
|
||||
def test_agent_show(self):
|
||||
for agent_id in self.agent_types.values():
|
||||
self.assertTrue(self.plugin.get_agent(self.context, agent_id))
|
||||
|
||||
def test_agent_list(self):
|
||||
agent_ids = [a['id'] for a in self.plugin.get_agents(
|
||||
self.context, filters={'host': self.host})]
|
||||
self.assertCountEqual(list(self.agent_types.values()), agent_ids)
|
||||
|
||||
def test_agent_delete(self):
|
||||
for agent_id in self.agent_types.values():
|
||||
self.plugin.delete_agent(self.context, agent_id)
|
||||
self.assertRaises(agent_exc.AgentNotFound, self.plugin.get_agent,
|
||||
self.context, agent_id)
|
||||
|
@ -217,13 +217,18 @@ class TestOvnIdlDistributedLock(base.BaseTestCase):
|
||||
hash_ring_manager.HashRingManager,
|
||||
'get_node', return_value=self.node_uuid).start()
|
||||
|
||||
def _assert_has_notify_calls(self):
|
||||
self.idl.notify_handler.notify.assert_has_calls([
|
||||
mock.call(self.fake_event, self.fake_row, None, global_=True),
|
||||
mock.call(self.fake_event, self.fake_row, None)])
|
||||
self.assertEqual(2, len(self.idl.notify_handler.mock_calls))
|
||||
|
||||
@mock.patch.object(ovn_hash_ring_db, 'touch_node')
|
||||
def test_notify(self, mock_touch_node):
|
||||
self.idl.notify(self.fake_event, self.fake_row)
|
||||
|
||||
mock_touch_node.assert_called_once_with(mock.ANY, self.node_uuid)
|
||||
self.idl.notify_handler.notify.assert_called_once_with(
|
||||
self.fake_event, self.fake_row, None)
|
||||
self._assert_has_notify_calls()
|
||||
|
||||
@mock.patch.object(ovn_hash_ring_db, 'touch_node')
|
||||
def test_notify_skip_touch_node(self, mock_touch_node):
|
||||
@ -233,8 +238,7 @@ class TestOvnIdlDistributedLock(base.BaseTestCase):
|
||||
|
||||
# Assert that touch_node() wasn't called
|
||||
self.assertFalse(mock_touch_node.called)
|
||||
self.idl.notify_handler.notify.assert_called_once_with(
|
||||
self.fake_event, self.fake_row, None)
|
||||
self._assert_has_notify_calls()
|
||||
|
||||
@mock.patch.object(ovn_hash_ring_db, 'touch_node')
|
||||
def test_notify_last_touch_expired(self, mock_touch_node):
|
||||
@ -250,8 +254,7 @@ class TestOvnIdlDistributedLock(base.BaseTestCase):
|
||||
|
||||
# Assert that touch_node() was invoked
|
||||
mock_touch_node.assert_called_once_with(mock.ANY, self.node_uuid)
|
||||
self.idl.notify_handler.notify.assert_called_once_with(
|
||||
self.fake_event, self.fake_row, None)
|
||||
self._assert_has_notify_calls()
|
||||
|
||||
@mock.patch.object(ovsdb_monitor.LOG, 'exception')
|
||||
@mock.patch.object(ovn_hash_ring_db, 'touch_node')
|
||||
@ -264,14 +267,14 @@ class TestOvnIdlDistributedLock(base.BaseTestCase):
|
||||
mock_touch_node.assert_called_once_with(mock.ANY, self.node_uuid)
|
||||
# Assert we are logging the exception
|
||||
self.assertTrue(mock_log.called)
|
||||
self.idl.notify_handler.notify.assert_called_once_with(
|
||||
self.fake_event, self.fake_row, None)
|
||||
self._assert_has_notify_calls()
|
||||
|
||||
def test_notify_different_node(self):
|
||||
self.mock_get_node.return_value = 'different-node-uuid'
|
||||
self.idl.notify('fake-event', self.fake_row)
|
||||
# Assert that notify() wasn't called for a different node uuid
|
||||
self.assertFalse(self.idl.notify_handler.notify.called)
|
||||
self.idl.notify_handler.notify.assert_called_once_with(
|
||||
self.fake_event, self.fake_row, None, global_=True)
|
||||
|
||||
|
||||
class TestPortBindingChassisUpdateEvent(base.BaseTestCase):
|
||||
@ -420,8 +423,9 @@ class TestOvnNbIdlNotifyHandler(test_mech_driver.OVNMechanismDriverTestCase):
|
||||
self.idl.notify_handler.notify = mock.Mock()
|
||||
self.idl.notify("create", row)
|
||||
# Assert that if the target_node returned by the ring is different
|
||||
# than this driver's node_uuid, notify() won't be called
|
||||
self.assertFalse(self.idl.notify_handler.notify.called)
|
||||
# than this driver's node_uuid, only global notify() won't be called
|
||||
self.idl.notify_handler.notify.assert_called_once_with(
|
||||
"create", row, None, global_=True)
|
||||
|
||||
|
||||
class TestOvnSbIdlNotifyHandler(test_mech_driver.OVNMechanismDriverTestCase):
|
||||
@ -432,6 +436,7 @@ class TestOvnSbIdlNotifyHandler(test_mech_driver.OVNMechanismDriverTestCase):
|
||||
super(TestOvnSbIdlNotifyHandler, self).setUp()
|
||||
sb_helper = ovs_idl.SchemaHelper(schema_json=OVN_SB_SCHEMA)
|
||||
sb_helper.register_table('Chassis')
|
||||
self.driver.agent_chassis_table = 'Chassis'
|
||||
self.sb_idl = ovsdb_monitor.OvnSbIdl(self.driver, "remote", sb_helper)
|
||||
self.sb_idl.post_connect()
|
||||
self.chassis_table = self.sb_idl.tables.get('Chassis')
|
||||
|
@ -87,6 +87,11 @@ class TestOVNMechanismDriver(test_plugin.Ml2PluginV2TestCase):
|
||||
super(TestOVNMechanismDriver, self).setUp()
|
||||
mm = directory.get_plugin().mechanism_manager
|
||||
self.mech_driver = mm.mech_drivers['ovn'].obj
|
||||
neutron_agent.AgentCache(self.mech_driver)
|
||||
# Because AgentCache is a singleton and we get a new mech_driver each
|
||||
# setUp(), override the AgentCache driver.
|
||||
neutron_agent.AgentCache().driver = self.mech_driver
|
||||
|
||||
self.mech_driver._nb_ovn = fakes.FakeOvsdbNbOvnIdl()
|
||||
self.mech_driver._sb_ovn = fakes.FakeOvsdbSbOvnIdl()
|
||||
self.mech_driver._ovn_client._qos_driver = mock.Mock()
|
||||
@ -1724,73 +1729,75 @@ class TestOVNMechanismDriver(test_plugin.Ml2PluginV2TestCase):
|
||||
self.plugin.update_port_status.assert_called_once_with(
|
||||
fake_context, fake_port['id'], const.PORT_STATUS_ACTIVE)
|
||||
|
||||
def _add_chassis_agent(self, nb_cfg, agent_type, updated_at=None):
|
||||
updated_at = updated_at or datetime.datetime.utcnow()
|
||||
def _add_chassis(self, nb_cfg):
|
||||
chassis_private = mock.Mock()
|
||||
chassis_private.nb_cfg = nb_cfg
|
||||
chassis_private.uuid = uuid.uuid4()
|
||||
chassis_private.name = str(uuid.uuid4())
|
||||
return chassis_private
|
||||
|
||||
def _add_chassis_agent(self, nb_cfg, agent_type, chassis_private=None,
|
||||
updated_at=None):
|
||||
updated_at = updated_at or timeutils.utcnow(with_timezone=True)
|
||||
chassis_private = chassis_private or self._add_chassis(nb_cfg)
|
||||
chassis_private.external_ids = {
|
||||
ovn_const.OVN_LIVENESS_CHECK_EXT_ID_KEY:
|
||||
datetime.datetime.isoformat(updated_at)}
|
||||
if agent_type == ovn_const.OVN_METADATA_AGENT:
|
||||
chassis_private.external_ids.update({
|
||||
ovn_const.OVN_AGENT_METADATA_SB_CFG_KEY: nb_cfg,
|
||||
ovn_const.METADATA_LIVENESS_CHECK_EXT_ID_KEY:
|
||||
datetime.datetime.isoformat(updated_at)})
|
||||
ovn_const.OVN_AGENT_METADATA_ID_KEY: str(uuid.uuid4())})
|
||||
chassis_private.chassis = [chassis_private]
|
||||
|
||||
return neutron_agent.NeutronAgent.from_type(
|
||||
agent_type, chassis_private)
|
||||
return neutron_agent.AgentCache().update(agent_type, chassis_private,
|
||||
updated_at)
|
||||
|
||||
def test_agent_alive_true(self):
|
||||
chassis_private = self._add_chassis(5)
|
||||
for agent_type in (ovn_const.OVN_CONTROLLER_AGENT,
|
||||
ovn_const.OVN_METADATA_AGENT):
|
||||
self.mech_driver._nb_ovn.nb_global.nb_cfg = 5
|
||||
agent = self._add_chassis_agent(5, agent_type)
|
||||
self.assertTrue(self.mech_driver.agent_alive(agent,
|
||||
update_db=True))
|
||||
# Assert that each Chassis has been updated in the SB database
|
||||
self.assertEqual(2, self.sb_ovn.db_set.call_count)
|
||||
agent = self._add_chassis_agent(5, agent_type, chassis_private)
|
||||
self.assertTrue(agent.alive, "Agent of type %s alive=%s" %
|
||||
(agent.agent_type, agent.alive))
|
||||
|
||||
def test_agent_alive_true_one_diff(self):
|
||||
# Agent should be reported as alive when the nb_cfg delta is 1
|
||||
# even if the last update time was old enough.
|
||||
nb_cfg = 5
|
||||
chassis_private = self._add_chassis(nb_cfg)
|
||||
for agent_type in (ovn_const.OVN_CONTROLLER_AGENT,
|
||||
ovn_const.OVN_METADATA_AGENT):
|
||||
self.mech_driver._nb_ovn.nb_global.nb_cfg = 5
|
||||
self.mech_driver._nb_ovn.nb_global.nb_cfg = nb_cfg + 1
|
||||
now = timeutils.utcnow()
|
||||
updated_at = now - datetime.timedelta(cfg.CONF.agent_down_time + 1)
|
||||
agent = self._add_chassis_agent(4, agent_type, updated_at)
|
||||
self.assertTrue(self.mech_driver.agent_alive(agent,
|
||||
update_db=True))
|
||||
agent = self._add_chassis_agent(nb_cfg, agent_type,
|
||||
chassis_private, updated_at)
|
||||
self.assertTrue(agent.alive, "Agent of type %s alive=%s" %
|
||||
(agent.agent_type, agent.alive))
|
||||
|
||||
def test_agent_alive_not_timed_out(self):
|
||||
nb_cfg = 3
|
||||
chassis_private = self._add_chassis(nb_cfg)
|
||||
for agent_type in (ovn_const.OVN_CONTROLLER_AGENT,
|
||||
ovn_const.OVN_METADATA_AGENT):
|
||||
self.mech_driver._nb_ovn.nb_global.nb_cfg = 5
|
||||
agent = self._add_chassis_agent(3, agent_type)
|
||||
self.assertTrue(self.mech_driver.agent_alive(
|
||||
agent, update_db=True),
|
||||
"Agent type %s is not alive" % agent_type)
|
||||
self.mech_driver._nb_ovn.nb_global.nb_cfg = nb_cfg + 2
|
||||
agent = self._add_chassis_agent(nb_cfg, agent_type,
|
||||
chassis_private)
|
||||
self.assertTrue(agent.alive, "Agent of type %s alive=%s" %
|
||||
(agent.agent_type, agent.alive))
|
||||
|
||||
def test_agent_alive_timed_out(self):
|
||||
nb_cfg = 3
|
||||
chassis_private = self._add_chassis(nb_cfg)
|
||||
for agent_type in (ovn_const.OVN_CONTROLLER_AGENT,
|
||||
ovn_const.OVN_METADATA_AGENT):
|
||||
self.mech_driver._nb_ovn.nb_global.nb_cfg = 5
|
||||
now = timeutils.utcnow()
|
||||
self.mech_driver._nb_ovn.nb_global.nb_cfg = nb_cfg + 2
|
||||
now = timeutils.utcnow(with_timezone=True)
|
||||
updated_at = now - datetime.timedelta(cfg.CONF.agent_down_time + 1)
|
||||
agent = self._add_chassis_agent(3, agent_type, updated_at)
|
||||
self.assertFalse(self.mech_driver.agent_alive(agent,
|
||||
update_db=True))
|
||||
|
||||
def test_agent_alive_true_skip_db_update(self):
|
||||
for agent_type in (ovn_const.OVN_CONTROLLER_AGENT,
|
||||
ovn_const.OVN_METADATA_AGENT):
|
||||
self.mech_driver._nb_ovn.nb_global.nb_cfg = 5
|
||||
agent = self._add_chassis_agent(5, agent_type)
|
||||
self.assertTrue(self.mech_driver.agent_alive(agent,
|
||||
update_db=False))
|
||||
self.sb_ovn.db_set.assert_not_called()
|
||||
agent = self._add_chassis_agent(nb_cfg, agent_type,
|
||||
chassis_private, updated_at)
|
||||
self.assertFalse(agent.alive, "Agent of type %s alive=%s" %
|
||||
(agent.agent_type, agent.alive))
|
||||
|
||||
def _test__update_dnat_entry_if_needed(self, up=True):
|
||||
ovn_conf.cfg.CONF.set_override(
|
||||
|
@ -0,0 +1,7 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Add support for deleting ML2/OVN agents. Previously, deleting an agent
|
||||
would return a Bad Request error. In addition to deleting the agent,
|
||||
this change also drastically improves the scalability of the ML2/OVN
|
||||
agent handling code.
|
Loading…
x
Reference in New Issue
Block a user