Ensure only one worker creates neutron_pg_drop

Use an OVSDB lock to ensure that only one worker tries to create
the neutron_pg_drop port group. This also waits pre_fork so that if
getting the port group fails, neutron exits instead of continuing
on without the port group being created.

It was previously possible that a server could create the port
group and we wouldn't get the update before trying to create it
ourselves and checking for its existence.

This also modifies the get_port_group method to use the built-in
lookup() which searches by name or uuid and can take advantage of
both indexing and newly added ovsdbapp wait for sync functionality.

Closes-Bug: #1934930
Change-Id: Id870f746ff8e9741a7c211aebdcf13597d31465b
This commit is contained in:
Terry Wilson 2021-07-07 18:21:47 +00:00
parent 65cce351d7
commit 2e6f6c9ec3
4 changed files with 35 additions and 38 deletions

View File

@ -264,20 +264,25 @@ class OVNMechanismDriver(api.MechanismDriver):
"""
idl = ovsdb_monitor.OvnInitPGNbIdl.from_server(
ovn_conf.get_ovn_nb_connection(), self.nb_schema_helper, self)
# Only one server should try to create the port group
idl.set_lock('pg_drop_creation')
with ovsdb_monitor.short_living_ovsdb_api(
impl_idl_ovn.OvsdbNbOvnIdl, idl) as pre_ovn_nb_api:
try:
create_default_drop_port_group(pre_ovn_nb_api)
except RuntimeError as re:
if pre_ovn_nb_api.get_port_group(
ovn_const.OVN_DROP_PORT_GROUP_NAME):
LOG.debug(
"Port Group %(port_group)s already exists, "
"ignoring RuntimeError %(error)s", {
'port_group': ovn_const.OVN_DROP_PORT_GROUP_NAME,
'error': re})
else:
raise
# If we don't get the lock, and the port group didn't exist
# when we tried to create it, it might still have been
# created by another server and we just haven't gotten the
# update yet.
LOG.info("Waiting for Port Group %(pg)s to be created",
{'pg': ovn_const.OVN_DROP_PORT_GROUP_NAME})
if not idl.neutron_pg_drop_event.wait():
LOG.error("Port Group %(pg)s was not created in time",
{'pg': ovn_const.OVN_DROP_PORT_GROUP_NAME})
raise re
LOG.info("Porg Group %(pg)s was created by another server",
{'pg': ovn_const.OVN_DROP_PORT_GROUP_NAME})
@staticmethod
def should_post_fork_initialize(worker_class):
@ -291,7 +296,6 @@ class OVNMechanismDriver(api.MechanismDriver):
return
self._post_fork_event.clear()
self._wait_for_pg_drop_event()
self._ovn_client_inst = None
if worker_class == neutron.wsgi.WorkerService:
@ -343,23 +347,6 @@ class OVNMechanismDriver(api.MechanismDriver):
self.hash_ring_group))
self._maintenance_thread.start()
def _wait_for_pg_drop_event(self):
"""Wait for event that occurs when neutron_pg_drop Port Group exists.
The method creates a short living connection to the Northbound
database. It waits for CREATE event caused by the Port Group.
Such event occurs when:
1) The Port Group doesn't exist and is created by other process.
2) The Port Group already exists and event is emitted when DB copy
is available to the IDL.
"""
idl = ovsdb_monitor.OvnInitPGNbIdl.from_server(
ovn_conf.get_ovn_nb_connection(), self.nb_schema_helper, self,
pg_only=True)
with ovsdb_monitor.short_living_ovsdb_api(
impl_idl_ovn.OvsdbNbOvnIdl, idl) as ovn_nb_api:
ovn_nb_api.idl.neutron_pg_drop_event.wait()
def _create_security_group_precommit(self, resource, event, trigger,
**kwargs):
ovn_revision_numbers_db.create_initial_revision(

View File

@ -749,9 +749,7 @@ class OvsdbNbOvnIdl(nb_impl_idl.OvnNbApiIdlImpl, Backend):
if uuidutils.is_uuid_like(pg_name):
pg_name = utils.ovn_port_group_name(pg_name)
try:
for pg in self._tables['Port_Group'].rows.values():
if pg.name == pg_name:
return pg
return self.lookup('Port_Group', pg_name, default=None)
except KeyError:
# TODO(dalvarez): This except block is added for backwards compat
# with old OVN schemas (<=2.9) where Port Groups are not present.

View File

@ -445,12 +445,12 @@ class FIPAddDeleteEvent(row_event.RowEvent):
class NeutronPgDropPortGroupCreated(row_event.WaitEvent):
"""WaitEvent for neutron_pg_drop Create event."""
def __init__(self):
def __init__(self, timeout=None):
table = 'Port_Group'
events = (self.ROW_CREATE,)
conditions = (('name', '=', ovn_const.OVN_DROP_PORT_GROUP_NAME),)
super(NeutronPgDropPortGroupCreated, self).__init__(
events, table, conditions)
events, table, conditions, timeout=timeout)
self.event_name = 'PortGroupCreated'
@ -711,9 +711,15 @@ class OvnInitPGNbIdl(OvnIdl):
self.cond_change(
'Port_Group',
[['name', '==', ovn_const.OVN_DROP_PORT_GROUP_NAME]])
self.neutron_pg_drop_event = NeutronPgDropPortGroupCreated()
self.neutron_pg_drop_event = NeutronPgDropPortGroupCreated(
timeout=ovn_conf.get_ovn_ovsdb_timeout())
self.notify_handler.watch_event(self.neutron_pg_drop_event)
def notify(self, event, row, updates=None):
# Go ahead and process events even if the lock is contended so we can
# know that some other server has created the drop group
self.notify_handler.notify(event, row, updates)
@classmethod
def from_server(cls, connection_string, helper, driver, pg_only=False):
if pg_only:

View File

@ -148,23 +148,29 @@ class TestOVNMechanismDriver(TestOVNMechanismDriverBase):
def test__create_neutron_pg_drop_created_meanwhile(
self, m_ovsdb_api_con, m_from_server):
m_ovsdb_api = m_ovsdb_api_con.return_value.__enter__.return_value
m_ovsdb_api.get_port_group.side_effect = [None, 'foo']
m_ovsdb_api.get_port_group.return_value = None
m_ovsdb_api.transaction.return_value.__exit__.side_effect = (
RuntimeError())
self.mech_driver._create_neutron_pg_drop()
self.assertEqual(2, m_ovsdb_api.get_port_group.call_count)
idl = m_from_server.return_value
idl.neutron_pg_drop_event.wait.return_value = True
result = self.mech_driver._create_neutron_pg_drop()
idl.neutron_pg_drop_event.wait.assert_called_once()
# If something else creates the port group, just return
self.assertIsNone(result)
@mock.patch.object(ovsdb_monitor.OvnInitPGNbIdl, 'from_server')
@mock.patch.object(ovsdb_monitor, 'short_living_ovsdb_api')
def test__create_neutron_pg_drop_error(
self, m_ovsdb_api_con, m_from_server):
m_ovsdb_api = m_ovsdb_api_con.return_value.__enter__.return_value
m_ovsdb_api.get_port_group.side_effect = [None, None]
m_ovsdb_api.get_port_group.return_value = None
m_ovsdb_api.transaction.return_value.__exit__.side_effect = (
RuntimeError())
idl = m_from_server.return_value
idl.neutron_pg_drop_event.wait.return_value = False
self.assertRaises(RuntimeError,
self.mech_driver._create_neutron_pg_drop)
self.assertEqual(2, m_ovsdb_api.get_port_group.call_count)
idl.neutron_pg_drop_event.wait.assert_called_once()
def test__get_max_tunid_no_key_set(self):
self.mech_driver._nb_ovn.nb_global.options.get.return_value = None