diff --git a/cinder/coordination.py b/cinder/coordination.py new file mode 100644 index 00000000000..a4797c02706 --- /dev/null +++ b/cinder/coordination.py @@ -0,0 +1,286 @@ +# Copyright 2015 Intel +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Coordination and locking utilities.""" + +import inspect +import random +import threading +import uuid + +import eventlet +from eventlet import tpool +import itertools +from oslo_config import cfg +from oslo_log import log +import six +from tooz import coordination +from tooz import locking + +from cinder import exception +from cinder.i18n import _, _LE, _LI, _LW + +LOG = log.getLogger(__name__) + +coordination_opts = [ + cfg.StrOpt('backend_url', + default='file://$state_path', + help='The backend URL to use for distributed coordination.'), + cfg.FloatOpt('heartbeat', + default=1.0, + help='Number of seconds between heartbeats for distributed ' + 'coordination.'), + cfg.FloatOpt('initial_reconnect_backoff', + default=0.1, + help='Initial number of seconds to wait after failed ' + 'reconnection.'), + cfg.FloatOpt('max_reconnect_backoff', + default=60.0, + help='Maximum number of seconds between sequential ' + 'reconnection retries.'), + +] + +CONF = cfg.CONF +CONF.register_opts(coordination_opts, group='coordination') + + +class Coordinator(object): + """Tooz coordination wrapper. + + Coordination member id is created from concatenated + `prefix` and `agent_id` parameters. + + :param str agent_id: Agent identifier + :param str prefix: Used to provide member identifier with a + meaningful prefix. + """ + + def __init__(self, agent_id=None, prefix=''): + self.coordinator = None + self.agent_id = agent_id or str(uuid.uuid4()) + self.started = False + self.prefix = prefix + self._ev = None + self._dead = None + + def is_active(self): + return self.coordinator is not None + + def start(self): + """Connect to coordination backend and start heartbeat.""" + if not self.started: + try: + self._dead = threading.Event() + self._start() + self.started = True + # NOTE(bluex): Start heartbeat in separate thread to avoid + # being blocked by long coroutines. + if self.coordinator and self.coordinator.requires_beating: + self._ev = eventlet.spawn( + lambda: tpool.execute(self.heartbeat)) + except coordination.ToozError: + LOG.exception(_LE('Error starting coordination backend.')) + raise + LOG.info(_LI('Coordination backend started successfully.')) + + def stop(self): + """Disconnect from coordination backend and stop heartbeat.""" + if self.started: + self.coordinator.stop() + self._dead.set() + if self._ev is not None: + self._ev.wait() + self._ev = None + self.coordinator = None + self.started = False + + def get_lock(self, name): + """Return a Tooz backend lock. + + :param str name: The lock name that is used to identify it + across all nodes. + """ + if self.coordinator is not None: + return self.coordinator.get_lock(self.prefix + name) + else: + raise exception.LockCreationFailed(_('Coordinator uninitialized.')) + + def heartbeat(self): + """Coordinator heartbeat. + + Method that every couple of seconds (config: `coordination.heartbeat`) + sends heartbeat to prove that the member is not dead. + + If connection to coordination backend is broken it tries to + reconnect every couple of seconds + (config: `coordination.initial_reconnect_backoff` up to + `coordination.max_reconnect_backoff`) + + """ + while self.coordinator is not None and not self._dead.is_set(): + try: + self._heartbeat() + except coordination.ToozConnectionError: + self._reconnect() + else: + self._dead.wait(cfg.CONF.coordination.heartbeat) + + def _start(self): + member_id = self.prefix + self.agent_id + self.coordinator = coordination.get_coordinator( + cfg.CONF.coordination.backend_url, member_id) + self.coordinator.start() + + def _heartbeat(self): + try: + self.coordinator.heartbeat() + return True + except coordination.ToozConnectionError: + LOG.exception(_LE('Connection error while sending a heartbeat ' + 'to coordination backend.')) + raise + except coordination.ToozError: + LOG.exception(_LE('Error sending a heartbeat to coordination ' + 'backend.')) + return False + + def _reconnect(self): + """Reconnect with jittered exponential backoff increase.""" + LOG.info(_LI('Reconnecting to coordination backend.')) + cap = cfg.CONF.coordination.max_reconnect_backoff + backoff = base = cfg.CONF.coordination.initial_reconnect_backoff + for attempt in itertools.count(1): + try: + self._start() + break + except coordination.ToozError: + backoff = min(cap, random.uniform(base, backoff * 3)) + msg = _LW('Reconnect attempt %(attempt)s failed. ' + 'Next try in %(backoff).2fs.') + LOG.warning(msg, {'attempt': attempt, 'backoff': backoff}) + self._dead.wait(backoff) + LOG.info(_LI('Reconnected to coordination backend.')) + + +COORDINATOR = Coordinator(prefix='cinder-') + + +class Lock(locking.Lock): + """Lock with dynamic name. + + :param str lock_name: Lock name. + :param dict lock_data: Data for lock name formatting. + :param coordinator: Coordinator class to use when creating lock. + Defaults to the global coordinator. + + Using it like so:: + + with Lock('mylock'): + ... + + ensures that only one process at a time will execute code in context. + Lock name can be formatted using Python format string syntax:: + + Lock('foo-{volume.id}, {'volume': ...,}) + + Available field names are keys of lock_data. + """ + def __init__(self, lock_name, lock_data=None, coordinator=None): + super(Lock, self).__init__(str(id(self))) + lock_data = lock_data or {} + self.coordinator = coordinator or COORDINATOR + self.blocking = True + self.lock = self._prepare_lock(lock_name, lock_data) + + def _prepare_lock(self, lock_name, lock_data): + if not isinstance(lock_name, six.string_types): + raise ValueError(_('Not a valid string: %s') % lock_name) + return self.coordinator.get_lock(lock_name.format(**lock_data)) + + def acquire(self, blocking=None): + """Attempts to acquire lock. + + :param blocking: If True, blocks until the lock is acquired. If False, + returns right away. Otherwise, the value is used as a timeout + value and the call returns maximum after this number of seconds. + :return: returns true if acquired (false if not) + :rtype: bool + """ + blocking = self.blocking if blocking is None else blocking + return self.lock.acquire(blocking=blocking) + + def release(self): + """Attempts to release lock. + + The behavior of releasing a lock which was not acquired in the first + place is undefined. + :return: returns true if released (false if not) + :rtype: bool + """ + self.lock.release() + + +def synchronized(lock_name, blocking=True, coordinator=None): + """Synchronization decorator. + + :param str lock_name: Lock name. + :param blocking: If True, blocks until the lock is acquired. + If False, raises exception when not acquired. Otherwise, + the value is used as a timeout value and if lock is not acquired + after this number of seconds exception is raised. + :param coordinator: Coordinator class to use when creating lock. + Defaults to the global coordinator. + :raises tooz.coordination.LockAcquireFailed: if lock is not acquired + + Decorating a method like so:: + + @synchronized('mylock') + def foo(self, *args): + ... + + ensures that only one process will execute the foo method at a time. + + Different methods can share the same lock:: + + @synchronized('mylock') + def foo(self, *args): + ... + + @synchronized('mylock') + def bar(self, *args): + ... + + This way only one of either foo or bar can be executing at a time. + + Lock name can be formatted using Python format string syntax:: + + @synchronized('{f_name}-{vol.id}-{snap[name]}') + def foo(self, vol, snap): + ... + + Available field names are: decorated function parameters and + `f_name` as a decorated function name. + """ + def wrap(f): + @six.wraps(f) + def wrapped(*a, **k): + call_args = inspect.getcallargs(f, *a, **k) + call_args['f_name'] = f.__name__ + lock = Lock(lock_name, call_args, coordinator) + with lock(blocking): + return f(*a, **k) + return wrapped + return wrap diff --git a/cinder/exception.py b/cinder/exception.py index b5344537591..b70661ba687 100644 --- a/cinder/exception.py +++ b/cinder/exception.py @@ -702,6 +702,14 @@ class EvaluatorParseException(Exception): message = _("Error during evaluator parsing: %(reason)s") +class LockCreationFailed(CinderException): + message = _('Unable to create lock. Coordination backend not started.') + + +class LockingFailed(CinderException): + message = _('Lock acquisition failed.') + + UnsupportedObjectError = obj_exc.UnsupportedObjectError OrphanedObjectError = obj_exc.OrphanedObjectError IncompatibleObjectVersion = obj_exc.IncompatibleObjectVersion diff --git a/cinder/opts.py b/cinder/opts.py index 314ebd1d2fc..a5760434ffc 100644 --- a/cinder/opts.py +++ b/cinder/opts.py @@ -34,6 +34,7 @@ from cinder.common import config as cinder_common_config import cinder.compute from cinder.compute import nova as cinder_compute_nova from cinder import context as cinder_context +from cinder import coordination as cinder_coordination from cinder.db import api as cinder_db_api from cinder.db import base as cinder_db_base from cinder import exception as cinder_exception @@ -332,6 +333,10 @@ def list_opts(): cinder_zonemanager_drivers_brocade_brcdfabricopts. brcd_zone_opts, )), + ('COORDINATION', + itertools.chain( + cinder_coordination.coordination_opts, + )), ('BACKEND', itertools.chain( [cinder_cmd_volume.host_opt], diff --git a/cinder/tests/unit/test_coordination.py b/cinder/tests/unit/test_coordination.py new file mode 100644 index 00000000000..7d31e51917d --- /dev/null +++ b/cinder/tests/unit/test_coordination.py @@ -0,0 +1,131 @@ +# Copyright 2015 Intel +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +import tooz.coordination +import tooz.locking + +from cinder import coordination +from cinder import test + + +class Locked(Exception): + pass + + +class MockToozLock(tooz.locking.Lock): + active_locks = set() + + def acquire(self, blocking=True): + if self.name not in self.active_locks: + self.active_locks.add(self.name) + return True + elif not blocking: + return False + else: + raise Locked + + def release(self): + self.active_locks.remove(self.name) + + +@mock.patch('time.sleep', lambda _: None) +@mock.patch('eventlet.spawn', lambda f: f()) +@mock.patch('eventlet.tpool.execute', lambda f: f()) +@mock.patch.object(coordination.Coordinator, 'heartbeat') +@mock.patch('tooz.coordination.get_coordinator') +class CoordinatorTestCase(test.TestCase): + def test_coordinator_start(self, get_coordinator, heartbeat): + crd = get_coordinator.return_value + + agent = coordination.Coordinator() + agent.start() + self.assertTrue(get_coordinator.called) + self.assertTrue(heartbeat.called) + self.assertTrue(crd.start.called) + + def test_coordinator_stop(self, get_coordinator, heartbeat): + crd = get_coordinator.return_value + + agent = coordination.Coordinator() + agent.start() + self.assertIsNotNone(agent.coordinator) + agent.stop() + self.assertTrue(crd.stop.called) + self.assertIsNone(agent.coordinator) + + def test_coordinator_lock(self, get_coordinator, heartbeat): + crd = get_coordinator.return_value + crd.get_lock.side_effect = lambda n: MockToozLock(n) + + agent1 = coordination.Coordinator() + agent1.start() + agent2 = coordination.Coordinator() + agent2.start() + self.assertNotIn('lock', MockToozLock.active_locks) + with agent1.get_lock('lock'): + self.assertIn('lock', MockToozLock.active_locks) + self.assertRaises(Locked, agent1.get_lock('lock').acquire) + self.assertRaises(Locked, agent2.get_lock('lock').acquire) + self.assertNotIn('lock', MockToozLock.active_locks) + + def test_coordinator_offline(self, get_coordinator, heartbeat): + crd = get_coordinator.return_value + crd.start.side_effect = tooz.coordination.ToozConnectionError('err') + + agent = coordination.Coordinator() + self.assertRaises(tooz.coordination.ToozError, agent.start) + self.assertFalse(agent.started) + self.assertFalse(heartbeat.called) + + def test_coordinator_reconnect(self, get_coordinator, heartbeat): + start_online = iter([True] + [False] * 5 + [True]) + heartbeat_online = iter((False, True, True)) + + def raiser(cond): + if not cond: + raise tooz.coordination.ToozConnectionError('err') + + crd = get_coordinator.return_value + crd.start.side_effect = lambda *_: raiser(next(start_online)) + crd.heartbeat.side_effect = lambda *_: raiser(next(heartbeat_online)) + + agent = coordination.Coordinator() + agent.start() + self.assertRaises(tooz.coordination.ToozConnectionError, + agent._heartbeat) + self.assertEqual(1, get_coordinator.call_count) + agent._reconnect() + self.assertEqual(7, get_coordinator.call_count) + agent._heartbeat() + + +@mock.patch.object(coordination.COORDINATOR, 'get_lock') +class CoordinationTestCase(test.TestCase): + def test_lock(self, get_lock): + with coordination.Lock('lock'): + self.assertTrue(get_lock.called) + + def test_synchronized(self, get_lock): + @coordination.synchronized('lock-{f_name}-{foo.val}-{bar[val]}') + def func(foo, bar): + pass + + foo = mock.Mock() + foo.val = 7 + bar = mock.MagicMock() + bar.__getitem__.return_value = 8 + func(foo, bar) + get_lock.assert_called_with('lock-func-7-8') diff --git a/releasenotes/notes/tooz-locks-0f9f2cc15f8dad5a.yaml b/releasenotes/notes/tooz-locks-0f9f2cc15f8dad5a.yaml new file mode 100644 index 00000000000..1fb77dc7a29 --- /dev/null +++ b/releasenotes/notes/tooz-locks-0f9f2cc15f8dad5a.yaml @@ -0,0 +1,4 @@ +--- +features: + - Locks may use Tooz as abstraction layer now, to support distributed lock + managers and prepare Cinder to better support HA configurations. diff --git a/requirements.txt b/requirements.txt index c5710f19d32..3f249476c31 100644 --- a/requirements.txt +++ b/requirements.txt @@ -52,3 +52,4 @@ oslo.i18n>=1.5.0 # Apache-2.0 oslo.vmware>=1.16.0 # Apache-2.0 os-brick>=0.4.0 # Apache-2.0 os-win>=0.0.7 # Apache-2.0 +tooz>=1.28.0 # Apache-2.0