Merge "ovn-metadata: Refactor events"
This commit is contained in:
commit
1daa0dd5bf
@ -48,7 +48,8 @@ CHASSIS_METADATA_LOCK = 'chassis_metadata_lock'
|
||||
|
||||
NS_PREFIX = 'ovnmeta-'
|
||||
MAC_PATTERN = re.compile(r'([0-9A-F]{2}[:-]){5}([0-9A-F]{2})', re.I)
|
||||
OVN_VIF_PORT_TYPES = ("", "external", ovn_const.LSP_TYPE_LOCALPORT, )
|
||||
OVN_VIF_PORT_TYPES = (
|
||||
"", ovn_const.LSP_TYPE_EXTERNAL, ovn_const.LSP_TYPE_LOCALPORT)
|
||||
|
||||
MetadataPortInfo = collections.namedtuple('MetadataPortInfo', ['mac',
|
||||
'ip_addresses',
|
||||
@ -74,39 +75,31 @@ class ConfigException(Exception):
|
||||
"""
|
||||
|
||||
|
||||
class PortBindingChassisEvent(row_event.RowEvent):
|
||||
def __init__(self, metadata_agent, events):
|
||||
class PortBindingEvent(row_event.RowEvent):
|
||||
def __init__(self, metadata_agent):
|
||||
self.agent = metadata_agent
|
||||
table = 'Port_Binding'
|
||||
super(PortBindingChassisEvent, self).__init__(
|
||||
events, table, None)
|
||||
super().__init__((self.__class__.EVENT,), table, None)
|
||||
self.event_name = self.__class__.__name__
|
||||
self._log_msg = (
|
||||
"PortBindingEvent matched for logical port %s and network %s")
|
||||
|
||||
def log_row(self, row):
|
||||
net_name = ovn_utils.get_network_name_from_datapath(
|
||||
row.datapath)
|
||||
LOG.info(self._log_msg, row.logical_port, net_name)
|
||||
|
||||
def match_fn(self, event, row, old):
|
||||
return row.type in OVN_VIF_PORT_TYPES
|
||||
|
||||
def run(self, event, row, old):
|
||||
# Check if the port has been bound/unbound to our chassis and update
|
||||
# the metadata namespace accordingly.
|
||||
resync = False
|
||||
if row.type not in OVN_VIF_PORT_TYPES:
|
||||
return
|
||||
if row.type == ovn_const.LSP_TYPE_LOCALPORT:
|
||||
new_ext_ids = row.external_ids
|
||||
old_ext_ids = old.external_ids
|
||||
device_id = row.external_ids.get(
|
||||
ovn_const.OVN_DEVID_EXT_ID_KEY, "")
|
||||
if not device_id.startswith(NS_PREFIX):
|
||||
return
|
||||
new_cidrs = new_ext_ids.get(ovn_const.OVN_CIDRS_EXT_ID_KEY, "")
|
||||
old_cidrs = old_ext_ids.get(ovn_const.OVN_CIDRS_EXT_ID_KEY, "")
|
||||
# If old_cidrs is "", it is create event,
|
||||
# nothing needs to be done.
|
||||
# If old_cidrs equals new_cidrs, the ip does not change.
|
||||
if old_cidrs in ("", new_cidrs, ):
|
||||
return
|
||||
|
||||
with _SYNC_STATE_LOCK.read_lock():
|
||||
self.log_row(row)
|
||||
try:
|
||||
net_name = ovn_utils.get_network_name_from_datapath(
|
||||
row.datapath)
|
||||
LOG.info(self.LOG_MSG, row.logical_port, net_name)
|
||||
self.agent.provision_datapath(row.datapath)
|
||||
except ConfigException:
|
||||
# We're now in the reader lock mode, we need to exit the
|
||||
@ -116,65 +109,86 @@ class PortBindingChassisEvent(row_event.RowEvent):
|
||||
self.agent.resync()
|
||||
|
||||
|
||||
class PortBindingMetaPortUpdatedEvent(PortBindingChassisEvent):
|
||||
LOG_MSG = "Metadata Port %s in datapath %s updated."
|
||||
class PortBindingUpdatedEvent(PortBindingEvent):
|
||||
EVENT = PortBindingEvent.ROW_UPDATE
|
||||
|
||||
def __init__(self, metadata_agent):
|
||||
events = (self.ROW_UPDATE,)
|
||||
super(PortBindingMetaPortUpdatedEvent, self).__init__(
|
||||
metadata_agent, events)
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self._match_checks = [
|
||||
self._is_localport_ext_ids_update,
|
||||
self._is_new_chassis_set,
|
||||
self._is_chassis_removed,
|
||||
]
|
||||
|
||||
def match_fn(self, event, row, old):
|
||||
if row.type == ovn_const.LSP_TYPE_LOCALPORT:
|
||||
if hasattr(row, 'external_ids') and hasattr(old, 'external_ids'):
|
||||
device_id = row.external_ids.get(
|
||||
ovn_const.OVN_DEVID_EXT_ID_KEY, "")
|
||||
if device_id.startswith(NS_PREFIX):
|
||||
return True
|
||||
if not super().match_fn(event, row, old):
|
||||
return False
|
||||
# if any of the check functions is true, the event should be triggered
|
||||
return any(check(row, old) for check in self._match_checks)
|
||||
|
||||
def _is_localport_ext_ids_update(self, row, old):
|
||||
if row.type != ovn_const.LSP_TYPE_LOCALPORT:
|
||||
return False
|
||||
|
||||
if not hasattr(old, 'external_ids'):
|
||||
return False
|
||||
|
||||
device_id = row.external_ids.get(
|
||||
ovn_const.OVN_DEVID_EXT_ID_KEY, "")
|
||||
if not device_id.startswith(NS_PREFIX):
|
||||
return False
|
||||
|
||||
new_cidrs = row.external_ids.get(
|
||||
ovn_const.OVN_CIDRS_EXT_ID_KEY, "")
|
||||
old_cidrs = old.external_ids.get(
|
||||
ovn_const.OVN_CIDRS_EXT_ID_KEY, "")
|
||||
# If old_cidrs is "", it is create event,
|
||||
# nothing needs to be done.
|
||||
# If old_cidrs equals new_cidrs, the ip does not change.
|
||||
if old_cidrs not in ("", new_cidrs):
|
||||
self._log_msg = (
|
||||
"Metadata Port %s in datapath %s updated")
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class PortBindingChassisCreatedEvent(PortBindingChassisEvent):
|
||||
LOG_MSG = "Port %s in datapath %s bound to our chassis"
|
||||
|
||||
def __init__(self, metadata_agent):
|
||||
events = (self.ROW_UPDATE,)
|
||||
super(PortBindingChassisCreatedEvent, self).__init__(
|
||||
metadata_agent, events)
|
||||
|
||||
def match_fn(self, event, row, old):
|
||||
def _is_new_chassis_set(self, row, old):
|
||||
self._log_msg = "Port %s in datapath %s bound to our chassis"
|
||||
try:
|
||||
return (row.chassis[0].name == self.agent.chassis and
|
||||
not old.chassis)
|
||||
except (IndexError, AttributeError):
|
||||
return False
|
||||
|
||||
|
||||
class PortBindingChassisDeletedEvent(PortBindingChassisEvent):
|
||||
LOG_MSG = "Port %s in datapath %s unbound from our chassis"
|
||||
|
||||
def __init__(self, metadata_agent):
|
||||
events = (self.ROW_UPDATE, self.ROW_DELETE)
|
||||
super(PortBindingChassisDeletedEvent, self).__init__(
|
||||
metadata_agent, events)
|
||||
|
||||
def match_fn(self, event, row, old):
|
||||
def _is_chassis_removed(self, row, old):
|
||||
self._log_msg = "Port %s in datapath %s unbound from our chassis"
|
||||
try:
|
||||
if event == self.ROW_UPDATE:
|
||||
return (old.chassis[0].name == self.agent.chassis and
|
||||
not row.chassis)
|
||||
else:
|
||||
if row.chassis[0].name == self.agent.chassis:
|
||||
if row.type != "external":
|
||||
LOG.warning(
|
||||
'Removing non-external type port %(port_id)s with '
|
||||
'type "%(type)s"',
|
||||
{"port_id": row.uuid, "type": row.type})
|
||||
return True
|
||||
return (old.chassis[0].name == self.agent.chassis and
|
||||
not row.chassis)
|
||||
except (IndexError, AttributeError):
|
||||
return False
|
||||
|
||||
|
||||
class PortBindingDeletedEvent(PortBindingEvent):
|
||||
EVENT = PortBindingEvent.ROW_DELETE
|
||||
|
||||
def match_fn(self, event, row, old):
|
||||
if not super().match_fn(event, row, old):
|
||||
return False
|
||||
try:
|
||||
if row.chassis[0].name != self.agent.chassis:
|
||||
return False
|
||||
except (IndexError, AttributeError):
|
||||
return False
|
||||
if row.type != ovn_const.LSP_TYPE_EXTERNAL:
|
||||
LOG.warning(
|
||||
'Removing non-external type port %(port_id)s with '
|
||||
'type "%(type)s"',
|
||||
{"port_id": row.uuid, "type": row.type})
|
||||
self._log_msg = (
|
||||
"Port %s in datapath %s unbound from our chassis")
|
||||
return True
|
||||
|
||||
|
||||
class ChassisCreateEventBase(row_event.RowEvent):
|
||||
"""Row create event - Chassis name == our_chassis.
|
||||
|
||||
@ -301,10 +315,10 @@ class MetadataAgent(object):
|
||||
|
||||
tables = ('Encap', 'Port_Binding', 'Datapath_Binding', 'SB_Global',
|
||||
'Chassis')
|
||||
events = (PortBindingChassisCreatedEvent(self),
|
||||
PortBindingChassisDeletedEvent(self),
|
||||
events = (PortBindingUpdatedEvent(self),
|
||||
PortBindingDeletedEvent(self),
|
||||
SbGlobalUpdateEvent(self),
|
||||
PortBindingMetaPortUpdatedEvent(self))
|
||||
)
|
||||
|
||||
# TODO(lucasagomes): Remove this in the future. Try to register
|
||||
# the Chassis_Private table, if not present, fallback to the normal
|
||||
|
@ -223,115 +223,66 @@ class TestMetadataAgent(base.TestOVNFunctionalBase):
|
||||
timeout=10,
|
||||
exception=exc)
|
||||
|
||||
def _test_agent_events(self, delete, type_=None, update=False):
|
||||
m_pb_created = mock.patch.object(
|
||||
agent.PortBindingChassisCreatedEvent, 'run').start()
|
||||
m_pb_deleted = mock.patch.object(
|
||||
agent.PortBindingChassisDeletedEvent, 'run').start()
|
||||
m_pb_updated = mock.patch.object(
|
||||
agent.PortBindingMetaPortUpdatedEvent, 'run').start()
|
||||
|
||||
def _test_agent_events_prepare(self, lsp_type=None):
|
||||
lswitchport_name, lswitch_name = self._create_logical_switch_port(
|
||||
type_)
|
||||
self.sb_api.lsp_bind(lswitchport_name, self.chassis_name).execute(
|
||||
check_error=True, log_errors=True)
|
||||
if update and type_ == ovn_const.LSP_TYPE_LOCALPORT:
|
||||
with self.nb_api.transaction(
|
||||
check_error=True, log_errors=True) as txn:
|
||||
mdt_port_name = 'ovn-mdt-' + uuidutils.generate_uuid()
|
||||
metadata_port_create_event = MetadataPortCreateEvent(
|
||||
mdt_port_name)
|
||||
self.agent.sb_idl.idl.notify_handler.watch_event(
|
||||
metadata_port_create_event)
|
||||
self._create_metadata_port(txn, lswitch_name, mdt_port_name)
|
||||
self.assertTrue(metadata_port_create_event.wait())
|
||||
|
||||
self.sb_api.lsp_bind(mdt_port_name, self.chassis_name).execute(
|
||||
lsp_type)
|
||||
with mock.patch.object(
|
||||
agent.MetadataAgent, 'provision_datapath') as m_provision:
|
||||
self.sb_api.lsp_bind(lswitchport_name, self.chassis_name).execute(
|
||||
check_error=True, log_errors=True)
|
||||
self._update_metadata_port_ip(mdt_port_name)
|
||||
|
||||
def pb_created():
|
||||
if m_pb_created.call_count < 1:
|
||||
return False
|
||||
args = m_pb_created.call_args[0]
|
||||
self.assertEqual('update', args[0])
|
||||
self.assertEqual(self.chassis_name, args[1].chassis[0].name)
|
||||
self.assertFalse(args[2].chassis)
|
||||
return True
|
||||
|
||||
n_utils.wait_until_true(
|
||||
pb_created,
|
||||
timeout=10,
|
||||
exception=Exception(
|
||||
"PortBindingChassisCreatedEvent didn't happen on port "
|
||||
"binding."))
|
||||
|
||||
def pb_updated():
|
||||
if m_pb_updated.call_count < 1:
|
||||
return False
|
||||
args = m_pb_updated.call_args[0]
|
||||
self.assertEqual('update', args[0])
|
||||
self.assertTrue(args[1].external_ids)
|
||||
self.assertTrue(args[2].external_ids)
|
||||
device_id = args[1].external_ids.get(
|
||||
ovn_const.OVN_DEVID_EXT_ID_KEY, "")
|
||||
self.assertTrue(device_id.startswith("ovnmeta-"))
|
||||
new_cidrs = args[1].external_ids.get(
|
||||
ovn_const.OVN_CIDRS_EXT_ID_KEY, "")
|
||||
old_cidrs = args[2].external_ids.get(
|
||||
ovn_const.OVN_CIDRS_EXT_ID_KEY, "")
|
||||
self.assertNotEqual(new_cidrs, old_cidrs)
|
||||
self.assertNotEqual(old_cidrs, "")
|
||||
return True
|
||||
if update and type_ == ovn_const.LSP_TYPE_LOCALPORT:
|
||||
# Wait until port is bound
|
||||
n_utils.wait_until_true(
|
||||
pb_updated,
|
||||
lambda: m_provision.called,
|
||||
timeout=10,
|
||||
exception=Exception(
|
||||
"PortBindingMetaPortUpdatedEvent didn't happen on "
|
||||
"metadata port ip address updated."))
|
||||
"Datapath provisioning did not happen on port binding"))
|
||||
|
||||
if delete:
|
||||
self.nb_api.delete_lswitch_port(
|
||||
lswitchport_name, lswitch_name).execute(
|
||||
check_error=True, log_errors=True)
|
||||
else:
|
||||
return lswitchport_name, lswitch_name
|
||||
|
||||
def test_agent_unbind_port(self):
|
||||
lswitchport_name, lswitch_name = self._test_agent_events_prepare()
|
||||
|
||||
with mock.patch.object(
|
||||
agent.MetadataAgent, 'provision_datapath') as m_provision:
|
||||
self.sb_api.lsp_unbind(lswitchport_name).execute(
|
||||
check_error=True, log_errors=True)
|
||||
|
||||
def pb_deleted():
|
||||
if m_pb_deleted.call_count < 1:
|
||||
return False
|
||||
args = m_pb_deleted.call_args[0]
|
||||
if delete:
|
||||
self.assertEqual('delete', args[0])
|
||||
self.assertTrue(args[1].chassis)
|
||||
self.assertEqual(self.chassis_name, args[1].chassis[0].name)
|
||||
n_utils.wait_until_true(
|
||||
lambda: m_provision.called,
|
||||
timeout=10,
|
||||
exception=Exception(
|
||||
"Datapath teardown did not happen after the port was "
|
||||
"unbound"))
|
||||
|
||||
def _test_agent_delete_bound_external_port(self, lsp_type=None):
|
||||
lswitchport_name, lswitch_name = self._test_agent_events_prepare(
|
||||
lsp_type)
|
||||
|
||||
with mock.patch.object(
|
||||
agent.MetadataAgent, 'provision_datapath') as m_provision,\
|
||||
mock.patch.object(agent.LOG, 'warning') as m_log_warn:
|
||||
self.nb_api.delete_lswitch_port(
|
||||
lswitchport_name, lswitch_name).execute(
|
||||
check_error=True, log_errors=True)
|
||||
|
||||
n_utils.wait_until_true(
|
||||
lambda: m_provision.called,
|
||||
timeout=10,
|
||||
exception=Exception(
|
||||
"Datapath teardown did not happen after external port was "
|
||||
"deleted"))
|
||||
if lsp_type == ovn_const.LSP_TYPE_EXTERNAL:
|
||||
m_log_warn.assert_not_called()
|
||||
else:
|
||||
self.assertEqual('update', args[0])
|
||||
self.assertFalse(args[1].chassis)
|
||||
self.assertEqual(self.chassis_name, args[2].chassis[0].name)
|
||||
return True
|
||||
|
||||
n_utils.wait_until_true(
|
||||
pb_deleted,
|
||||
timeout=10,
|
||||
exception=Exception(
|
||||
"PortBindingChassisDeletedEvent didn't happen on port "
|
||||
"unbind or delete."))
|
||||
|
||||
self.assertEqual(1, m_pb_deleted.call_count)
|
||||
|
||||
def test_agent_unbind_port(self):
|
||||
self._test_agent_events(delete=False)
|
||||
m_log_warn.assert_called()
|
||||
|
||||
def test_agent_delete_bound_external_port(self):
|
||||
self._test_agent_events(delete=True, type_='external')
|
||||
self._test_agent_delete_bound_external_port(
|
||||
lsp_type=ovn_const.LSP_TYPE_EXTERNAL)
|
||||
|
||||
def test_agent_delete_bound_nonexternal_port(self):
|
||||
with mock.patch.object(agent.LOG, 'warning') as m_warn:
|
||||
self._test_agent_events(delete=True)
|
||||
self.assertTrue(m_warn.called)
|
||||
self._test_agent_delete_bound_external_port()
|
||||
|
||||
def test_agent_registration_at_chassis_create_event(self):
|
||||
def check_for_metadata():
|
||||
@ -363,8 +314,42 @@ class TestMetadataAgent(base.TestOVNFunctionalBase):
|
||||
exception=exc)
|
||||
|
||||
def test_agent_metadata_port_ip_update_event(self):
|
||||
self._test_agent_events(
|
||||
delete=False, type_=ovn_const.LSP_TYPE_LOCALPORT, update=True)
|
||||
lswitch_name = 'ovn-' + uuidutils.generate_uuid()
|
||||
mdt_port_name = 'ovn-mdt-' + uuidutils.generate_uuid()
|
||||
|
||||
mdt_pb_event = test_event.WaitForPortBindingEvent(mdt_port_name)
|
||||
self.handler.watch_event(mdt_pb_event)
|
||||
|
||||
with self.nb_api.transaction(
|
||||
check_error=True, log_errors=True) as txn:
|
||||
txn.add(
|
||||
self.nb_api.ls_add(lswitch_name))
|
||||
self._create_metadata_port(txn, lswitch_name, mdt_port_name)
|
||||
|
||||
self.assertTrue(mdt_pb_event.wait())
|
||||
|
||||
with mock.patch.object(
|
||||
agent.MetadataAgent, 'provision_datapath') as m_provision:
|
||||
self.sb_api.lsp_bind(mdt_port_name, self.chassis_name).execute(
|
||||
check_error=True, log_errors=True)
|
||||
|
||||
# Wait until port is bound
|
||||
n_utils.wait_until_true(
|
||||
lambda: m_provision.called,
|
||||
timeout=10,
|
||||
exception=Exception(
|
||||
"Datapath provisioning did not happen on port binding"))
|
||||
|
||||
m_provision.reset_mock()
|
||||
|
||||
self._update_metadata_port_ip(mdt_port_name)
|
||||
|
||||
n_utils.wait_until_true(
|
||||
lambda: m_provision.called,
|
||||
timeout=10,
|
||||
exception=Exception(
|
||||
"Datapath provisioning not called after external ids was "
|
||||
"changed"))
|
||||
|
||||
def test_metadata_agent_only_monitors_own_chassis(self):
|
||||
# We already have the fake chassis which we should be monitoring, so
|
||||
|
Loading…
Reference in New Issue
Block a user