Merge "Allocation API: allow skipping retries in TaskManager"
This commit is contained in:
commit
b77fe3c427
@ -149,20 +149,16 @@ def require_exclusive_lock(f):
|
|||||||
return wrapper
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
def acquire(context, node_id, shared=False, purpose='unspecified action'):
|
def acquire(context, *args, **kwargs):
|
||||||
"""Shortcut for acquiring a lock on a Node.
|
"""Shortcut for acquiring a lock on a Node.
|
||||||
|
|
||||||
:param context: Request context.
|
:param context: Request context.
|
||||||
:param node_id: ID or UUID of node to lock.
|
|
||||||
:param shared: Boolean indicating whether to take a shared or exclusive
|
|
||||||
lock. Default: False.
|
|
||||||
:param purpose: human-readable purpose to put to debug logs.
|
|
||||||
:returns: An instance of :class:`TaskManager`.
|
:returns: An instance of :class:`TaskManager`.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
# NOTE(lintan): This is a workaround to set the context of periodic tasks.
|
# NOTE(lintan): This is a workaround to set the context of periodic tasks.
|
||||||
context.ensure_thread_contain_context()
|
context.ensure_thread_contain_context()
|
||||||
return TaskManager(context, node_id, shared=shared, purpose=purpose)
|
return TaskManager(context, *args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
class TaskManager(object):
|
class TaskManager(object):
|
||||||
@ -174,7 +170,7 @@ class TaskManager(object):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, context, node_id, shared=False,
|
def __init__(self, context, node_id, shared=False,
|
||||||
purpose='unspecified action'):
|
purpose='unspecified action', retry=True):
|
||||||
"""Create a new TaskManager.
|
"""Create a new TaskManager.
|
||||||
|
|
||||||
Acquire a lock on a node. The lock can be either shared or
|
Acquire a lock on a node. The lock can be either shared or
|
||||||
@ -187,6 +183,7 @@ class TaskManager(object):
|
|||||||
:param shared: Boolean indicating whether to take a shared or exclusive
|
:param shared: Boolean indicating whether to take a shared or exclusive
|
||||||
lock. Default: False.
|
lock. Default: False.
|
||||||
:param purpose: human-readable purpose to put to debug logs.
|
:param purpose: human-readable purpose to put to debug logs.
|
||||||
|
:param retry: whether to retry locking if it fails. Default: True.
|
||||||
:raises: DriverNotFound
|
:raises: DriverNotFound
|
||||||
:raises: InterfaceNotFoundInEntrypoint
|
:raises: InterfaceNotFoundInEntrypoint
|
||||||
:raises: NodeNotFound
|
:raises: NodeNotFound
|
||||||
@ -201,6 +198,7 @@ class TaskManager(object):
|
|||||||
self._node = None
|
self._node = None
|
||||||
self.node_id = node_id
|
self.node_id = node_id
|
||||||
self.shared = shared
|
self.shared = shared
|
||||||
|
self._retry = retry
|
||||||
|
|
||||||
self.fsm = states.machine.copy()
|
self.fsm = states.machine.copy()
|
||||||
self._purpose = purpose
|
self._purpose = purpose
|
||||||
@ -251,12 +249,17 @@ class TaskManager(object):
|
|||||||
def _lock(self):
|
def _lock(self):
|
||||||
self._debug_timer.restart()
|
self._debug_timer.restart()
|
||||||
|
|
||||||
|
if self._retry:
|
||||||
|
attempts = CONF.conductor.node_locked_retry_attempts
|
||||||
|
else:
|
||||||
|
attempts = 1
|
||||||
|
|
||||||
# NodeLocked exceptions can be annoying. Let's try to alleviate
|
# NodeLocked exceptions can be annoying. Let's try to alleviate
|
||||||
# some of that pain by retrying our lock attempts. The retrying
|
# some of that pain by retrying our lock attempts. The retrying
|
||||||
# module expects a wait_fixed value in milliseconds.
|
# module expects a wait_fixed value in milliseconds.
|
||||||
@retrying.retry(
|
@retrying.retry(
|
||||||
retry_on_exception=lambda e: isinstance(e, exception.NodeLocked),
|
retry_on_exception=lambda e: isinstance(e, exception.NodeLocked),
|
||||||
stop_max_attempt_number=CONF.conductor.node_locked_retry_attempts,
|
stop_max_attempt_number=attempts,
|
||||||
wait_fixed=CONF.conductor.node_locked_retry_interval * 1000)
|
wait_fixed=CONF.conductor.node_locked_retry_interval * 1000)
|
||||||
def reserve_node():
|
def reserve_node():
|
||||||
self.node = objects.Node.reserve(self.context, CONF.host,
|
self.node = objects.Node.reserve(self.context, CONF.host,
|
||||||
|
@ -159,6 +159,28 @@ class TaskManagerTestCase(db_base.DbTestCase):
|
|||||||
reserve_mock.assert_has_calls(expected_calls)
|
reserve_mock.assert_has_calls(expected_calls)
|
||||||
self.assertEqual(2, reserve_mock.call_count)
|
self.assertEqual(2, reserve_mock.call_count)
|
||||||
|
|
||||||
|
def test_excl_lock_exception_no_retries(
|
||||||
|
self, get_voltgt_mock, get_volconn_mock, get_portgroups_mock,
|
||||||
|
get_ports_mock, build_driver_mock,
|
||||||
|
reserve_mock, release_mock, node_get_mock):
|
||||||
|
retry_attempts = 3
|
||||||
|
self.config(node_locked_retry_attempts=retry_attempts,
|
||||||
|
group='conductor')
|
||||||
|
|
||||||
|
# Fail on the first lock attempt, succeed on the second.
|
||||||
|
reserve_mock.side_effect = [exception.NodeLocked(node='foo',
|
||||||
|
host='foo'),
|
||||||
|
self.node]
|
||||||
|
|
||||||
|
self.assertRaises(exception.NodeLocked,
|
||||||
|
task_manager.TaskManager,
|
||||||
|
self.context,
|
||||||
|
'fake-node-id',
|
||||||
|
retry=False)
|
||||||
|
|
||||||
|
reserve_mock.assert_called_once_with(self.context, self.host,
|
||||||
|
'fake-node-id')
|
||||||
|
|
||||||
def test_excl_lock_reserve_exception(
|
def test_excl_lock_reserve_exception(
|
||||||
self, get_voltgt_mock, get_volconn_mock, get_portgroups_mock,
|
self, get_voltgt_mock, get_volconn_mock, get_portgroups_mock,
|
||||||
get_ports_mock, build_driver_mock,
|
get_ports_mock, build_driver_mock,
|
||||||
|
Loading…
Reference in New Issue
Block a user