Merge "[OVN] Create a deterministic hash ring node UUID generator"
This commit is contained in:
@@ -105,6 +105,25 @@ The Neutron API consists of the following executables:
|
||||
mechanism driver.
|
||||
|
||||
|
||||
ML2/OVN
|
||||
~~~~~~~
|
||||
|
||||
The mechanism driver ML2/OVN requires a synchronization method between all
|
||||
nodes (controllers) and workers. The OVN database events will be received by
|
||||
all workers in all nodes; however, only one worker should process this event.
|
||||
The ``HashRingManager``, locally instantiated in each worker, is in charge of
|
||||
hashing the event received and decide what worker will process the event.
|
||||
|
||||
The ``HashRingManager`` uses the information stored in the Neutron database to
|
||||
determine how many workers are alive at this time. Each worker will register
|
||||
itself in the Neutron database, creating a register in the table
|
||||
``ovn_hash_ring``. The UUID of each register is created using a deterministic
|
||||
method that depends on (1) the hash ring group (always "mechanism_driver" for
|
||||
the API workers), (2) the host name and (3) the worker ID. If the worker is
|
||||
restarted, this method will provide the same register UUID and the previous
|
||||
register (if present in the database) will be overwritten.
|
||||
|
||||
|
||||
.. note::
|
||||
|
||||
Right now, only the API server is running without eventlet.
|
||||
|
@@ -13,6 +13,8 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import typing
|
||||
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from neutron.common import utils
|
||||
@@ -44,3 +46,13 @@ def get_start_time(default=None, current_time=False):
|
||||
return int(start_time.decode(encoding='utf-8'))
|
||||
except ImportError:
|
||||
return default
|
||||
|
||||
|
||||
def get_api_worker_id() -> typing.Union[int, None]:
|
||||
"""Return the worker ID number provided by uWSGI"""
|
||||
try:
|
||||
# pylint: disable=import-outside-toplevel
|
||||
import uwsgi
|
||||
return uwsgi.worker_id()
|
||||
except ImportError:
|
||||
return None
|
||||
|
@@ -14,26 +14,33 @@
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
import uuid
|
||||
|
||||
from neutron_lib.db import api as db_api
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
from oslo_utils import timeutils
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from neutron.db.models import ovn as ovn_models
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
OVN_HASHRING_UUID_NAMESPACE = uuid.UUID('551ea055-1982-4295-8bc7-207a0d3c9231')
|
||||
|
||||
|
||||
def get_node_uuid(
|
||||
group_name: str,
|
||||
host: str,
|
||||
worker_id: int) -> str:
|
||||
node_str = '%s%s%s' % (group_name, host, str(worker_id))
|
||||
return uuid.uuid5(OVN_HASHRING_UUID_NAMESPACE, node_str).hex
|
||||
|
||||
|
||||
# NOTE(ralonsoh): this was migrated from networking-ovn to neutron and should
|
||||
# be refactored to be integrated in a OVO.
|
||||
@db_api.retry_if_session_inactive()
|
||||
def add_node(context, group_name, node_uuid=None, created_at=None):
|
||||
if node_uuid is None:
|
||||
node_uuid = uuidutils.generate_uuid()
|
||||
|
||||
def add_node(context, group_name, node_uuid, created_at=None):
|
||||
with db_api.CONTEXT_WRITER.using(context):
|
||||
kwargs = {'node_uuid': node_uuid,
|
||||
'hostname': CONF.host,
|
||||
|
@@ -43,6 +43,7 @@ from oslo_db import exception as os_db_exc
|
||||
from oslo_log import log
|
||||
from oslo_service import service as oslo_service
|
||||
from oslo_utils import timeutils
|
||||
from oslo_utils import uuidutils
|
||||
from ovsdbapp.backend.ovs_idl import idlutils
|
||||
|
||||
from neutron._i18n import _
|
||||
@@ -124,7 +125,7 @@ class OVNMechanismDriver(api.MechanismDriver):
|
||||
self._maintenance_thread = None
|
||||
self._hash_ring_thread = None
|
||||
self._hash_ring_probe_event = multiprocessing.Event()
|
||||
self.node_uuid = None
|
||||
self._node_uuid = None
|
||||
self.hash_ring_group = ovn_const.HASH_RING_ML2_GROUP
|
||||
self.sg_enabled = ovn_acl.is_sg_enabled()
|
||||
ovn_conf.register_opts()
|
||||
@@ -203,6 +204,28 @@ class OVNMechanismDriver(api.MechanismDriver):
|
||||
|
||||
return self._start_time
|
||||
|
||||
@property
|
||||
def node_uuid(self):
|
||||
if self._node_uuid:
|
||||
return self._node_uuid
|
||||
|
||||
worker_id = wsgi_utils.get_api_worker_id()
|
||||
if worker_id is None:
|
||||
# NOTE(ralonsoh): the hash ring node UUID should be based on the
|
||||
# Neutron API worker ID. Right now only uWSGI mode is supported.
|
||||
# The worker ID is provided via ``uwsgi`` library. If other loader
|
||||
# is used, a random node UUID will be provided.
|
||||
LOG.warning('uWSGI is the only supported loader for the Neutron '
|
||||
'API; it provides, via ``uwsgi`` library, the worker '
|
||||
'ID. If other loader is used, a random hash ring node '
|
||||
'UUID will be provided')
|
||||
self._node_uuid = uuidutils.generate_uuid()
|
||||
else:
|
||||
self._node_uuid = ovn_hash_ring_db.get_node_uuid(
|
||||
self.hash_ring_group, cfg.CONF.host, worker_id)
|
||||
|
||||
return self._node_uuid
|
||||
|
||||
def get_supported_vif_types(self):
|
||||
vif_types = set()
|
||||
for ch in self.sb_ovn.chassis_list().execute(check_error=True):
|
||||
@@ -372,8 +395,9 @@ class OVNMechanismDriver(api.MechanismDriver):
|
||||
created_at = n_utils.ts_to_datetime(self.start_time)
|
||||
ovn_hash_ring_db.remove_nodes_from_host(
|
||||
context, self.hash_ring_group, created_at=created_at)
|
||||
self.node_uuid = ovn_hash_ring_db.add_node(
|
||||
context, self.hash_ring_group, created_at=created_at)
|
||||
ovn_hash_ring_db.add_node(
|
||||
context, self.hash_ring_group, self.node_uuid,
|
||||
created_at=created_at)
|
||||
newer_nodes = ovn_hash_ring_db.get_nodes(
|
||||
context, self.hash_ring_group, created_at=created_at)
|
||||
LOG.debug('Hash Ring setup, this worker has detected %s OVN hash '
|
||||
|
@@ -72,10 +72,14 @@ class TestOVNMechanismDriver(base.TestOVNFunctionalBase):
|
||||
|
||||
# Create several OVN hash registers left by a previous execution.
|
||||
created_at = timeutils.utcnow() - datetime.timedelta(1)
|
||||
node_uuids = [uuidutils.generate_uuid(),
|
||||
uuidutils.generate_uuid(),
|
||||
uuidutils.generate_uuid()]
|
||||
with db_api.CONTEXT_WRITER.using(self.context):
|
||||
for _ in range(3):
|
||||
self.node_uuid = ovn_hash_ring_db.add_node(
|
||||
self.context, ring_group, created_at=created_at)
|
||||
for idx in range(3):
|
||||
ovn_hash_ring_db.add_node(
|
||||
self.context, ring_group,
|
||||
node_uuids[idx], created_at=created_at)
|
||||
|
||||
# Check the existing OVN hash ring registers.
|
||||
ovn_hrs = ovn_hash_ring_db.get_nodes(self.context, ring_group)
|
||||
@@ -83,7 +87,8 @@ class TestOVNMechanismDriver(base.TestOVNFunctionalBase):
|
||||
|
||||
start_time = timeutils.utcnow()
|
||||
self.mech_driver._start_time = int(start_time.timestamp())
|
||||
for _ in range(3):
|
||||
for idx in range(3):
|
||||
self.mech_driver._node_uuid = node_uuids[idx]
|
||||
self.mech_driver._init_hash_ring(self.context)
|
||||
|
||||
ovn_hrs = ovn_hash_ring_db.get_nodes(self.context, ring_group)
|
||||
|
@@ -63,7 +63,9 @@ class TestHashRing(testlib_api.SqlTestCaseLight):
|
||||
group_name=HASH_RING_TEST_GROUP):
|
||||
nodes = []
|
||||
for i in range(count):
|
||||
node_uuid = ovn_hash_ring_db.add_node(self.admin_ctx, group_name)
|
||||
node_uuid = uuidutils.generate_uuid()
|
||||
ovn_hash_ring_db.add_node(self.admin_ctx, group_name,
|
||||
node_uuid)
|
||||
self.assertIsNotNone(self._get_node_row(node_uuid))
|
||||
nodes.append(node_uuid)
|
||||
return nodes
|
||||
@@ -311,3 +313,9 @@ class TestHashRing(testlib_api.SqlTestCaseLight):
|
||||
# Assert we only have 3 node entries after the clean up
|
||||
self.assertEqual(3, ovn_hash_ring_db.count_nodes_from_host(
|
||||
self.admin_ctx, HASH_RING_TEST_GROUP))
|
||||
|
||||
def test_get_node_uuid(self):
|
||||
# Test get_node_uuid is idempotent
|
||||
res1 = ovn_hash_ring_db.get_node_uuid('group1', 'host1', 1)
|
||||
res2 = ovn_hash_ring_db.get_node_uuid('group1', 'host1', 1)
|
||||
self.assertEqual(res1, res2)
|
||||
|
@@ -55,9 +55,11 @@ from neutron.common.ovn import constants as ovn_const
|
||||
from neutron.common.ovn import exceptions as ovn_exceptions
|
||||
from neutron.common.ovn import hash_ring_manager
|
||||
from neutron.common.ovn import utils as ovn_utils
|
||||
from neutron.common import wsgi_utils
|
||||
from neutron.conf.agent import ovs_conf
|
||||
from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf
|
||||
from neutron.db import db_base_plugin_v2
|
||||
from neutron.db import ovn_hash_ring_db
|
||||
from neutron.db import ovn_revision_numbers_db
|
||||
from neutron.db import provisioning_blocks
|
||||
from neutron.db import securitygroups_db
|
||||
@@ -3171,6 +3173,18 @@ class TestOVNMechanismDriver(TestOVNMechanismDriverBase):
|
||||
tag=new_vlan_tag, if_exists=True)
|
||||
self.nb_ovn.set_lswitch_port.assert_has_calls([expected_call])
|
||||
|
||||
@mock.patch.object(wsgi_utils, 'get_api_worker_id', return_value=1)
|
||||
def test_node_uuid_worker_id(self, *args):
|
||||
cfg.CONF.set_override('host', 'host1')
|
||||
node_uuid = ovn_hash_ring_db.get_node_uuid(
|
||||
self.mech_driver.hash_ring_group, 'host1', 1)
|
||||
self.assertEqual(node_uuid, self.mech_driver.node_uuid)
|
||||
|
||||
@mock.patch.object(wsgi_utils, 'get_api_worker_id', return_value=None)
|
||||
@mock.patch.object(uuidutils, 'generate_uuid', return_value=123456789)
|
||||
def test_node_uuid_no_worker_id(self, *args):
|
||||
self.assertEqual(123456789, self.mech_driver.node_uuid)
|
||||
|
||||
|
||||
class OVNMechanismDriverTestCase(MechDriverSetupBase,
|
||||
test_plugin.Ml2PluginV2TestCase):
|
||||
|
Reference in New Issue
Block a user