Move to tooz hash ring implementation

This changes the ironic driver to use the hash ring implementation from
tooz, which is nearly identical to ironic.common.hash_ring.

Change-Id: I4200be2035067622604e5aa70e025594bcd0a801
Depends-On: Ic1f8b89b819ace8df9b15c61eaf9bf136ad3166b
This commit is contained in:
Jim Rollenhagen 2016-11-17 14:14:20 +00:00 committed by Julia Kreger
parent 98bdf8791b
commit b08e536831
8 changed files with 29 additions and 313 deletions

View File

@ -122,5 +122,5 @@ driver actions such as take-over or clean-up.
.. _Conductor service: ../api/ironic.conductor.manager.html
.. _DB API: ../api/ironic.db.api.html
.. _diskimage-builder: http://docs.openstack.org/developer/diskimage-builder/
.. _consistent hashing algorithm: ../api/ironic.common.hash_ring.html
.. _consistent hashing algorithm: http://docs.openstack.org/developer/tooz/tutorial/hashring.html
.. _periodic: http://docs.openstack.org/developer/futurist/api.html#futurist.periodics.periodic

View File

@ -13,12 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import bisect
import hashlib
import threading
import time
import six
from tooz import hashring
from ironic.common import exception
from ironic.common.i18n import _
@ -26,113 +24,6 @@ from ironic.conf import CONF
from ironic.db import api as dbapi
class HashRing(object):
"""A stable hash ring.
We map item N to a host Y based on the closest lower hash:
- hash(item) -> partition
- hash(host) -> divider
- closest lower divider is the host to use
- we hash each host many times to spread load more finely
as otherwise adding a host gets (on average) 50% of the load of
just one other host assigned to it.
"""
def __init__(self, hosts, replicas=None):
"""Create a new hash ring across the specified hosts.
:param hosts: an iterable of hosts which will be mapped.
:param replicas: number of hosts to map to each hash partition,
or len(hosts), which ever is lesser.
Default: CONF.hash_distribution_replicas
"""
if replicas is None:
replicas = CONF.hash_distribution_replicas
try:
self.hosts = set(hosts)
self.replicas = replicas if replicas <= len(hosts) else len(hosts)
except TypeError:
raise exception.Invalid(
_("Invalid hosts supplied when building HashRing."))
self._host_hashes = {}
for host in hosts:
key = str(host).encode('utf8')
key_hash = hashlib.md5(key)
for p in range(2 ** CONF.hash_partition_exponent):
key_hash.update(key)
hashed_key = self._hash2int(key_hash)
self._host_hashes[hashed_key] = host
# Gather the (possibly colliding) resulting hashes into a bisectable
# list.
self._partitions = sorted(self._host_hashes.keys())
def _hash2int(self, key_hash):
"""Convert the given hash's digest to a numerical value for the ring.
:returns: An integer equivalent value of the digest.
"""
return int(key_hash.hexdigest(), 16)
def _get_partition(self, data):
try:
if six.PY3 and data is not None:
data = data.encode('utf-8')
key_hash = hashlib.md5(data)
hashed_key = self._hash2int(key_hash)
position = bisect.bisect(self._partitions, hashed_key)
return position if position < len(self._partitions) else 0
except TypeError:
raise exception.Invalid(
_("Invalid data supplied to HashRing.get_hosts."))
def get_hosts(self, data, ignore_hosts=None):
"""Get the list of hosts which the supplied data maps onto.
:param data: A string identifier to be mapped across the ring.
:param ignore_hosts: A list of hosts to skip when performing the hash.
Useful to temporarily skip down hosts without
performing a full rebalance.
Default: None.
:returns: a list of hosts.
The length of this list depends on the number of replicas
this `HashRing` was created with. It may be less than this
if ignore_hosts is not None.
"""
hosts = []
if ignore_hosts is None:
ignore_hosts = set()
else:
ignore_hosts = set(ignore_hosts)
ignore_hosts.intersection_update(self.hosts)
partition = self._get_partition(data)
for replica in range(0, self.replicas):
if len(hosts) + len(ignore_hosts) == len(self.hosts):
# prevent infinite loop - cannot allocate more fallbacks.
break
# Linear probing: partition N, then N+1 etc.
host = self._get_host(partition)
while host in hosts or host in ignore_hosts:
partition += 1
if partition >= len(self._partitions):
partition = 0
host = self._get_host(partition)
hosts.append(host)
return hosts
def _get_host(self, partition):
"""Find what host is serving a partition.
:param partition: The index of the partition in the partition map.
e.g. 0 is the first partition, 1 is the second.
:return: The host object the ring was constructed with.
"""
return self._host_hashes[self._partitions[partition]]
class HashRingManager(object):
_hash_rings = None
_lock = threading.Lock()
@ -161,7 +52,8 @@ class HashRingManager(object):
d2c = self.dbapi.get_active_driver_dict()
for driver_name, hosts in d2c.items():
rings[driver_name] = HashRing(hosts)
rings[driver_name] = hashring.HashRing(
hosts, partitions=2 ** CONF.hash_partition_exponent)
return rings
@classmethod

View File

@ -26,7 +26,7 @@ from oslo_utils import excutils
from ironic.common import context as ironic_context
from ironic.common import driver_factory
from ironic.common import exception
from ironic.common import hash_ring as hash
from ironic.common import hash_ring
from ironic.common.i18n import _, _LC, _LE, _LI, _LW
from ironic.common import rpc
from ironic.common import states
@ -78,7 +78,7 @@ class BaseConductorManager(object):
check_and_reject=rejection_func)
"""Executor for performing tasks async."""
self.ring_manager = hash.HashRingManager()
self.ring_manager = hash_ring.HashRingManager()
"""Consistent hash ring which maps drivers to conductors."""
# NOTE(deva): these calls may raise DriverLoadError or DriverNotFound
@ -302,7 +302,9 @@ class BaseConductorManager(object):
except exception.DriverNotFound:
return False
return self.host in ring.get_hosts(node_uuid)
return self.host in ring.get_nodes(
node_uuid.encode('utf-8'),
replicas=CONF.hash_distribution_replicas)
def _fail_if_in_state(self, context, filters, provision_state,
sort_key, callback_method=None,

View File

@ -34,11 +34,12 @@ locked by each conductor when performing actions which change the state of that
node; these locks are represented by the
:py:class:`ironic.conductor.task_manager.TaskManager` class.
A :py:class:`ironic.common.hash_ring.HashRing` is used to distribute nodes
across the set of active conductors which support each node's driver.
Rebalancing this ring can trigger various actions by each conductor, such as
building or tearing down the TFTP environment for a node, notifying Neutron of
a change, etc.
A `tooz.hashring.HashRing
<https://git.openstack.org/cgit/openstack/tooz/tree/tooz/hashring.py>`_
is used to distribute nodes across the set of active conductors which support
each node's driver. Rebalancing this ring can trigger various actions by each
conductor, such as building or tearing down the TFTP environment for a node,
notifying Neutron of a change, etc.
"""
import collections

View File

@ -27,6 +27,7 @@ from ironic.common import hash_ring
from ironic.common.i18n import _
from ironic.common import rpc
from ironic.conductor import manager
from ironic.conf import CONF
from ironic.objects import base as objects_base
@ -119,8 +120,9 @@ class ConductorAPI(object):
try:
ring = self.ring_manager[node.driver]
dest = ring.get_hosts(node.uuid)
return self.topic + "." + dest[0]
dest = ring.get_nodes(node.uuid.encode('utf-8'),
replicas=CONF.hash_distribution_replicas)
return '%s.%s' % (self.topic, dest.pop())
except exception.DriverNotFound:
reason = (_('No conductor service registered which supports '
'driver %s.') % node.driver)
@ -140,8 +142,8 @@ class ConductorAPI(object):
"""
self.ring_manager.reset()
hash_ring = self.ring_manager[driver_name]
host = random.choice(list(hash_ring.hosts))
ring = self.ring_manager[driver_name]
host = random.choice(list(ring.nodes))
return self.topic + "." + host
def create_node(self, context, node_obj, topic=None):

View File

@ -13,205 +13,17 @@
# License for the specific language governing permissions and limitations
# under the License.
import hashlib
import time
import mock
from oslo_config import cfg
from testtools import matchers
from ironic.common import exception
from ironic.common import hash_ring
from ironic.tests import base
from ironic.tests.unit.db import base as db_base
CONF = cfg.CONF
class HashRingTestCase(base.TestCase):
# NOTE(deva): the mapping used in these tests is as follows:
# if hosts = [foo, bar]:
# fake -> foo, bar
# if hosts = [foo, bar, baz]:
# fake -> foo, bar, baz
# fake-again -> bar, baz, foo
@mock.patch.object(hashlib, 'md5', autospec=True)
def test__hash2int_returns_int(self, mock_md5):
CONF.set_override('hash_partition_exponent', 0)
r1 = 32 * 'a'
r2 = 32 * 'b'
mock_md5.return_value.hexdigest.side_effect = [r1, r2]
hosts = ['foo', 'bar']
replicas = 1
ring = hash_ring.HashRing(hosts, replicas=replicas)
self.assertIn(int(r1, 16), ring._host_hashes)
self.assertIn(int(r2, 16), ring._host_hashes)
def test_create_ring(self):
hosts = ['foo', 'bar']
replicas = 2
ring = hash_ring.HashRing(hosts, replicas=replicas)
self.assertEqual(set(hosts), ring.hosts)
self.assertEqual(replicas, ring.replicas)
def test_create_with_different_partition_counts(self):
hosts = ['foo', 'bar']
CONF.set_override('hash_partition_exponent', 2)
ring = hash_ring.HashRing(hosts)
self.assertEqual(2 ** 2 * 2, len(ring._partitions))
CONF.set_override('hash_partition_exponent', 8)
ring = hash_ring.HashRing(hosts)
self.assertEqual(2 ** 8 * 2, len(ring._partitions))
CONF.set_override('hash_partition_exponent', 16)
ring = hash_ring.HashRing(hosts)
self.assertEqual(2 ** 16 * 2, len(ring._partitions))
def test_distribution_one_replica(self):
hosts = ['foo', 'bar', 'baz']
ring = hash_ring.HashRing(hosts, replicas=1)
fake_1_hosts = ring.get_hosts('fake')
fake_2_hosts = ring.get_hosts('fake-again')
# We should have one hosts for each thing
self.assertThat(fake_1_hosts, matchers.HasLength(1))
self.assertThat(fake_2_hosts, matchers.HasLength(1))
# And they must not be the same answers even on this simple data.
self.assertNotEqual(fake_1_hosts, fake_2_hosts)
def test_distribution_two_replicas(self):
hosts = ['foo', 'bar', 'baz']
ring = hash_ring.HashRing(hosts, replicas=2)
fake_1_hosts = ring.get_hosts('fake')
fake_2_hosts = ring.get_hosts('fake-again')
# We should have two hosts for each thing
self.assertThat(fake_1_hosts, matchers.HasLength(2))
self.assertThat(fake_2_hosts, matchers.HasLength(2))
# And they must not be the same answers even on this simple data
# because if they were we'd be making the active replica a hot spot.
self.assertNotEqual(fake_1_hosts, fake_2_hosts)
def test_distribution_three_replicas(self):
hosts = ['foo', 'bar', 'baz']
ring = hash_ring.HashRing(hosts, replicas=3)
fake_1_hosts = ring.get_hosts('fake')
fake_2_hosts = ring.get_hosts('fake-again')
# We should have two hosts for each thing
self.assertThat(fake_1_hosts, matchers.HasLength(3))
self.assertThat(fake_2_hosts, matchers.HasLength(3))
# And they must not be the same answers even on this simple data
# because if they were we'd be making the active replica a hot spot.
self.assertNotEqual(fake_1_hosts, fake_2_hosts)
self.assertNotEqual(fake_1_hosts[0], fake_2_hosts[0])
def test_ignore_hosts(self):
hosts = ['foo', 'bar', 'baz']
ring = hash_ring.HashRing(hosts, replicas=1)
equals_bar_or_baz = matchers.MatchesAny(
matchers.Equals(['bar']),
matchers.Equals(['baz']))
self.assertThat(
ring.get_hosts('fake', ignore_hosts=['foo']),
equals_bar_or_baz)
self.assertThat(
ring.get_hosts('fake', ignore_hosts=['foo', 'bar']),
equals_bar_or_baz)
self.assertEqual([], ring.get_hosts('fake', ignore_hosts=hosts))
def test_ignore_hosts_with_replicas(self):
hosts = ['foo', 'bar', 'baz']
ring = hash_ring.HashRing(hosts, replicas=2)
self.assertEqual(
set(['bar', 'baz']),
set(ring.get_hosts('fake', ignore_hosts=['foo'])))
self.assertEqual(
set(['baz']),
set(ring.get_hosts('fake', ignore_hosts=['foo', 'bar'])))
self.assertEqual(
set(['baz', 'foo']),
set(ring.get_hosts('fake-again', ignore_hosts=['bar'])))
self.assertEqual(
set(['foo']),
set(ring.get_hosts('fake-again', ignore_hosts=['bar', 'baz'])))
self.assertEqual([], ring.get_hosts('fake', ignore_hosts=hosts))
def _compare_rings(self, nodes, conductors, ring,
new_conductors, new_ring):
delta = {}
mapping = dict((node, ring.get_hosts(node)[0]) for node in nodes)
new_mapping = dict(
(node, new_ring.get_hosts(node)[0]) for node in nodes)
for key, old in mapping.items():
new = new_mapping.get(key, None)
if new != old:
delta[key] = (old, new)
return delta
def test_rebalance_stability_join(self):
num_conductors = 10
num_nodes = 10000
# Adding 1 conductor to a set of N should move 1/(N+1) of all nodes
# Eg, for a cluster of 10 nodes, adding one should move 1/11, or 9%
# We allow for 1/N to allow for rounding in tests.
redistribution_factor = 1.0 / num_conductors
nodes = [str(x) for x in range(num_nodes)]
conductors = [str(x) for x in range(num_conductors)]
new_conductors = conductors + ['new']
delta = self._compare_rings(
nodes, conductors, hash_ring.HashRing(conductors),
new_conductors, hash_ring.HashRing(new_conductors))
self.assertLess(len(delta), num_nodes * redistribution_factor)
def test_rebalance_stability_leave(self):
num_conductors = 10
num_nodes = 10000
# Removing 1 conductor from a set of N should move 1/(N) of all nodes
# Eg, for a cluster of 10 nodes, removing one should move 1/10, or 10%
# We allow for 1/(N-1) to allow for rounding in tests.
redistribution_factor = 1.0 / (num_conductors - 1)
nodes = [str(x) for x in range(num_nodes)]
conductors = [str(x) for x in range(num_conductors)]
new_conductors = conductors[:]
new_conductors.pop()
delta = self._compare_rings(
nodes, conductors, hash_ring.HashRing(conductors),
new_conductors, hash_ring.HashRing(new_conductors))
self.assertLess(len(delta), num_nodes * redistribution_factor)
def test_more_replicas_than_hosts(self):
hosts = ['foo', 'bar']
ring = hash_ring.HashRing(hosts, replicas=10)
self.assertEqual(set(hosts), set(ring.get_hosts('fake')))
def test_ignore_non_existent_host(self):
hosts = ['foo', 'bar']
ring = hash_ring.HashRing(hosts, replicas=1)
self.assertEqual(['foo'], ring.get_hosts('fake',
ignore_hosts=['baz']))
def test_create_ring_invalid_data(self):
hosts = None
self.assertRaises(exception.Invalid,
hash_ring.HashRing,
hosts)
def test_get_hosts_invalid_data(self):
hosts = ['foo', 'bar']
ring = hash_ring.HashRing(hosts)
self.assertRaises(exception.Invalid,
ring.get_hosts,
None)
class HashRingManagerTestCase(db_base.DbTestCase):
def setUp(self):
@ -231,7 +43,7 @@ class HashRingManagerTestCase(db_base.DbTestCase):
def test_hash_ring_manager_get_ring_success(self):
self.register_conductors()
ring = self.ring_manager['driver1']
self.assertEqual(sorted(['host1', 'host2']), sorted(ring.hosts))
self.assertEqual(sorted(['host1', 'host2']), sorted(ring.nodes))
def test_hash_ring_manager_driver_not_found(self):
self.register_conductors()

View File

@ -0,0 +1,6 @@
---
upgrade:
- |
Adds a new dependency on the
`tooz library <https://pypi.python.org/pypi/tooz>`_, as the consistent
hash ring code was moved out of ironic and into tooz.

View File

@ -41,3 +41,4 @@ oslo.versionedobjects>=1.17.0 # Apache-2.0
jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT
psutil<2.0.0,>=1.1.1 # BSD
futurist!=0.15.0,>=0.11.0 # Apache-2.0
tooz>=1.47.0 # Apache-2.0