coordination: use tooz builtin heartbeat feature
Since version 1.38.0, tooz can manage the heartbeat entirely internally. In this patch, this feature is used to simplify Cinder's code. Change-Id: I7fa654caf6620d410d4e297d3d8af2215e27ed12
This commit is contained in:
parent
f6b6b6550b
commit
42dafd2705
@ -16,14 +16,9 @@
|
|||||||
"""Coordination and locking utilities."""
|
"""Coordination and locking utilities."""
|
||||||
|
|
||||||
import inspect
|
import inspect
|
||||||
import random
|
|
||||||
import threading
|
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
import decorator
|
import decorator
|
||||||
import eventlet
|
|
||||||
from eventlet import tpool
|
|
||||||
import itertools
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
from oslo_utils import timeutils
|
from oslo_utils import timeutils
|
||||||
@ -41,16 +36,28 @@ coordination_opts = [
|
|||||||
cfg.FloatOpt('heartbeat',
|
cfg.FloatOpt('heartbeat',
|
||||||
default=1.0,
|
default=1.0,
|
||||||
help='Number of seconds between heartbeats for distributed '
|
help='Number of seconds between heartbeats for distributed '
|
||||||
'coordination.'),
|
'coordination. No longer used since distributed '
|
||||||
|
'coordination manages its heartbeat internally.',
|
||||||
|
deprecated_for_removal=True,
|
||||||
|
deprecated_reason='This option is no longer used.',
|
||||||
|
deprecated_since='11.0.0'),
|
||||||
cfg.FloatOpt('initial_reconnect_backoff',
|
cfg.FloatOpt('initial_reconnect_backoff',
|
||||||
default=0.1,
|
default=0.1,
|
||||||
help='Initial number of seconds to wait after failed '
|
help='Initial number of seconds to wait after failed '
|
||||||
'reconnection.'),
|
'reconnection. No longer used since distributed '
|
||||||
|
'coordination manages its heartbeat internally.',
|
||||||
|
deprecated_for_removal=True,
|
||||||
|
deprecated_reason='This option is no longer used.',
|
||||||
|
deprecated_since='11.0.0'),
|
||||||
cfg.FloatOpt('max_reconnect_backoff',
|
cfg.FloatOpt('max_reconnect_backoff',
|
||||||
default=60.0,
|
default=60.0,
|
||||||
help='Maximum number of seconds between sequential '
|
help='Maximum number of seconds between sequential '
|
||||||
'reconnection retries.'),
|
'reconnection retries. No longer used since '
|
||||||
|
'distributed coordination manages its heartbeat '
|
||||||
|
'internally.',
|
||||||
|
deprecated_for_removal=True,
|
||||||
|
deprecated_reason='This option is no longer used.',
|
||||||
|
deprecated_since='11.0.0'),
|
||||||
]
|
]
|
||||||
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
@ -73,34 +80,22 @@ class Coordinator(object):
|
|||||||
self.agent_id = agent_id or str(uuid.uuid4())
|
self.agent_id = agent_id or str(uuid.uuid4())
|
||||||
self.started = False
|
self.started = False
|
||||||
self.prefix = prefix
|
self.prefix = prefix
|
||||||
self._ev = None
|
|
||||||
self._dead = None
|
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
"""Connect to coordination backend and start heartbeat."""
|
if self.started:
|
||||||
if not self.started:
|
return
|
||||||
try:
|
|
||||||
self._dead = threading.Event()
|
# NOTE(bluex): Tooz expects member_id as a byte string.
|
||||||
self._start()
|
member_id = (self.prefix + self.agent_id).encode('ascii')
|
||||||
self.started = True
|
self.coordinator = coordination.get_coordinator(
|
||||||
# NOTE(bluex): Start heartbeat in separate thread to avoid
|
cfg.CONF.coordination.backend_url, member_id)
|
||||||
# being blocked by long coroutines.
|
self.coordinator.start(start_heart=True)
|
||||||
if self.coordinator and self.coordinator.requires_beating:
|
self.started = True
|
||||||
self._ev = eventlet.spawn(
|
|
||||||
lambda: tpool.execute(self.heartbeat))
|
|
||||||
except coordination.ToozError:
|
|
||||||
LOG.exception('Error starting coordination backend.')
|
|
||||||
raise
|
|
||||||
LOG.info('Coordination backend started successfully.')
|
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
"""Disconnect from coordination backend and stop heartbeat."""
|
"""Disconnect from coordination backend and stop heartbeat."""
|
||||||
if self.started:
|
if self.started:
|
||||||
self.coordinator.stop()
|
self.coordinator.stop()
|
||||||
self._dead.set()
|
|
||||||
if self._ev is not None:
|
|
||||||
self._ev.wait()
|
|
||||||
self._ev = None
|
|
||||||
self.coordinator = None
|
self.coordinator = None
|
||||||
self.started = False
|
self.started = False
|
||||||
|
|
||||||
@ -117,63 +112,6 @@ class Coordinator(object):
|
|||||||
else:
|
else:
|
||||||
raise exception.LockCreationFailed(_('Coordinator uninitialized.'))
|
raise exception.LockCreationFailed(_('Coordinator uninitialized.'))
|
||||||
|
|
||||||
def heartbeat(self):
|
|
||||||
"""Coordinator heartbeat.
|
|
||||||
|
|
||||||
Method that every couple of seconds (config: `coordination.heartbeat`)
|
|
||||||
sends heartbeat to prove that the member is not dead.
|
|
||||||
|
|
||||||
If connection to coordination backend is broken it tries to
|
|
||||||
reconnect every couple of seconds
|
|
||||||
(config: `coordination.initial_reconnect_backoff` up to
|
|
||||||
`coordination.max_reconnect_backoff`)
|
|
||||||
|
|
||||||
"""
|
|
||||||
while self.coordinator is not None and not self._dead.is_set():
|
|
||||||
try:
|
|
||||||
self._heartbeat()
|
|
||||||
except coordination.ToozConnectionError:
|
|
||||||
self._reconnect()
|
|
||||||
else:
|
|
||||||
self._dead.wait(cfg.CONF.coordination.heartbeat)
|
|
||||||
|
|
||||||
def _start(self):
|
|
||||||
# NOTE(bluex): Tooz expects member_id as a byte string.
|
|
||||||
member_id = (self.prefix + self.agent_id).encode('ascii')
|
|
||||||
self.coordinator = coordination.get_coordinator(
|
|
||||||
cfg.CONF.coordination.backend_url, member_id)
|
|
||||||
self.coordinator.start()
|
|
||||||
|
|
||||||
def _heartbeat(self):
|
|
||||||
try:
|
|
||||||
self.coordinator.heartbeat()
|
|
||||||
return True
|
|
||||||
except coordination.ToozConnectionError:
|
|
||||||
LOG.exception('Connection error while sending a heartbeat '
|
|
||||||
'to coordination backend.')
|
|
||||||
raise
|
|
||||||
except coordination.ToozError:
|
|
||||||
LOG.exception('Error sending a heartbeat to coordination '
|
|
||||||
'backend.')
|
|
||||||
return False
|
|
||||||
|
|
||||||
def _reconnect(self):
|
|
||||||
"""Reconnect with jittered exponential backoff increase."""
|
|
||||||
LOG.info('Reconnecting to coordination backend.')
|
|
||||||
cap = cfg.CONF.coordination.max_reconnect_backoff
|
|
||||||
backoff = base = cfg.CONF.coordination.initial_reconnect_backoff
|
|
||||||
for attempt in itertools.count(1):
|
|
||||||
try:
|
|
||||||
self._start()
|
|
||||||
break
|
|
||||||
except coordination.ToozError:
|
|
||||||
backoff = min(cap, random.uniform(base, backoff * 3))
|
|
||||||
msg = ('Reconnect attempt %(attempt)s failed. '
|
|
||||||
'Next try in %(backoff).2fs.')
|
|
||||||
LOG.warning(msg, {'attempt': attempt, 'backoff': backoff})
|
|
||||||
self._dead.wait(backoff)
|
|
||||||
LOG.info('Reconnected to coordination backend.')
|
|
||||||
|
|
||||||
|
|
||||||
COORDINATOR = Coordinator(prefix='cinder-')
|
COORDINATOR = Coordinator(prefix='cinder-')
|
||||||
|
|
||||||
|
@ -43,25 +43,19 @@ class MockToozLock(tooz.locking.Lock):
|
|||||||
self.active_locks.remove(self.name)
|
self.active_locks.remove(self.name)
|
||||||
|
|
||||||
|
|
||||||
@mock.patch('time.sleep', lambda _: None)
|
|
||||||
@mock.patch('eventlet.spawn', lambda f: f())
|
|
||||||
@mock.patch('eventlet.tpool.execute', lambda f: f())
|
|
||||||
@mock.patch.object(coordination.Coordinator, 'heartbeat')
|
|
||||||
@mock.patch('tooz.coordination.get_coordinator')
|
@mock.patch('tooz.coordination.get_coordinator')
|
||||||
@mock.patch('random.uniform', lambda _a, _b: 0)
|
|
||||||
class CoordinatorTestCase(test.TestCase):
|
class CoordinatorTestCase(test.TestCase):
|
||||||
MOCK_TOOZ = False
|
MOCK_TOOZ = False
|
||||||
|
|
||||||
def test_coordinator_start(self, get_coordinator, heartbeat):
|
def test_coordinator_start(self, get_coordinator):
|
||||||
crd = get_coordinator.return_value
|
crd = get_coordinator.return_value
|
||||||
|
|
||||||
agent = coordination.Coordinator()
|
agent = coordination.Coordinator()
|
||||||
agent.start()
|
agent.start()
|
||||||
self.assertTrue(get_coordinator.called)
|
self.assertTrue(get_coordinator.called)
|
||||||
self.assertTrue(heartbeat.called)
|
|
||||||
self.assertTrue(crd.start.called)
|
self.assertTrue(crd.start.called)
|
||||||
|
|
||||||
def test_coordinator_stop(self, get_coordinator, heartbeat):
|
def test_coordinator_stop(self, get_coordinator):
|
||||||
crd = get_coordinator.return_value
|
crd = get_coordinator.return_value
|
||||||
|
|
||||||
agent = coordination.Coordinator()
|
agent = coordination.Coordinator()
|
||||||
@ -71,7 +65,7 @@ class CoordinatorTestCase(test.TestCase):
|
|||||||
self.assertTrue(crd.stop.called)
|
self.assertTrue(crd.stop.called)
|
||||||
self.assertIsNone(agent.coordinator)
|
self.assertIsNone(agent.coordinator)
|
||||||
|
|
||||||
def test_coordinator_lock(self, get_coordinator, heartbeat):
|
def test_coordinator_lock(self, get_coordinator):
|
||||||
crd = get_coordinator.return_value
|
crd = get_coordinator.return_value
|
||||||
crd.get_lock.side_effect = lambda n: MockToozLock(n)
|
crd.get_lock.side_effect = lambda n: MockToozLock(n)
|
||||||
|
|
||||||
@ -90,35 +84,13 @@ class CoordinatorTestCase(test.TestCase):
|
|||||||
self.assertRaises(Locked, agent2.get_lock(lock_name).acquire)
|
self.assertRaises(Locked, agent2.get_lock(lock_name).acquire)
|
||||||
self.assertNotIn(expected_name, MockToozLock.active_locks)
|
self.assertNotIn(expected_name, MockToozLock.active_locks)
|
||||||
|
|
||||||
def test_coordinator_offline(self, get_coordinator, heartbeat):
|
def test_coordinator_offline(self, get_coordinator):
|
||||||
crd = get_coordinator.return_value
|
crd = get_coordinator.return_value
|
||||||
crd.start.side_effect = tooz.coordination.ToozConnectionError('err')
|
crd.start.side_effect = tooz.coordination.ToozConnectionError('err')
|
||||||
|
|
||||||
agent = coordination.Coordinator()
|
agent = coordination.Coordinator()
|
||||||
self.assertRaises(tooz.coordination.ToozError, agent.start)
|
self.assertRaises(tooz.coordination.ToozError, agent.start)
|
||||||
self.assertFalse(agent.started)
|
self.assertFalse(agent.started)
|
||||||
self.assertFalse(heartbeat.called)
|
|
||||||
|
|
||||||
def test_coordinator_reconnect(self, get_coordinator, heartbeat):
|
|
||||||
start_online = iter([True] + [False] * 5 + [True])
|
|
||||||
heartbeat_online = iter((False, True, True))
|
|
||||||
|
|
||||||
def raiser(cond):
|
|
||||||
if not cond:
|
|
||||||
raise tooz.coordination.ToozConnectionError('err')
|
|
||||||
|
|
||||||
crd = get_coordinator.return_value
|
|
||||||
crd.start.side_effect = lambda *_: raiser(next(start_online))
|
|
||||||
crd.heartbeat.side_effect = lambda *_: raiser(next(heartbeat_online))
|
|
||||||
|
|
||||||
agent = coordination.Coordinator()
|
|
||||||
agent.start()
|
|
||||||
self.assertRaises(tooz.coordination.ToozConnectionError,
|
|
||||||
agent._heartbeat)
|
|
||||||
self.assertEqual(1, get_coordinator.call_count)
|
|
||||||
agent._reconnect()
|
|
||||||
self.assertEqual(7, get_coordinator.call_count)
|
|
||||||
agent._heartbeat()
|
|
||||||
|
|
||||||
|
|
||||||
@mock.patch.object(coordination.COORDINATOR, 'get_lock')
|
@mock.patch.object(coordination.COORDINATOR, 'get_lock')
|
||||||
|
@ -0,0 +1,7 @@
|
|||||||
|
---
|
||||||
|
upgrade:
|
||||||
|
- |
|
||||||
|
The coordination system used by Cinder has been simplified to leverage tooz
|
||||||
|
builtin heartbeat feature. Therefore, the configuration options
|
||||||
|
`coordination.heartbeat`, `coordination.initial_reconnect_backoff` and
|
||||||
|
`coordination.max_reconnect_backoff` have been removed.
|
Loading…
Reference in New Issue
Block a user