Lock nodes to avoid simultaneous introspection requests
Now a NodeInfo object has a lock instance attached, which is associated with a node UUID. New calls acquire_lock and release_lock are added. NodeInfo.find_node now always acquire a lock, get_node does it optionally. NodeInfo.finished now always release a lock. Note that this change does not make NodeInfo thread-safe. Rather it's preventing simultaneous operations on the same node. Change-Id: I0c0bd2f4e0622fd7086660b0874c5b8dca82b4d2 Implements: blueprint node-state
This commit is contained in:
parent
f5671a4376
commit
2b8cc3a93f
@ -117,6 +117,26 @@ def introspect(uuid, new_ipmi_credentials=None, token=None):
|
|||||||
def _background_introspect(ironic, node_info):
|
def _background_introspect(ironic, node_info):
|
||||||
global _LAST_INTROSPECTION_TIME
|
global _LAST_INTROSPECTION_TIME
|
||||||
|
|
||||||
|
if not node_info.options.get('new_ipmi_credentials'):
|
||||||
|
if re.match(CONF.introspection_delay_drivers, node_info.node().driver):
|
||||||
|
LOG.debug('Attempting to acquire lock on last introspection time')
|
||||||
|
with _LAST_INTROSPECTION_LOCK:
|
||||||
|
delay = (_LAST_INTROSPECTION_TIME - time.time()
|
||||||
|
+ CONF.introspection_delay)
|
||||||
|
if delay > 0:
|
||||||
|
LOG.debug('Waiting %d seconds before sending the next '
|
||||||
|
'node on introspection', delay)
|
||||||
|
time.sleep(delay)
|
||||||
|
_LAST_INTROSPECTION_TIME = time.time()
|
||||||
|
|
||||||
|
node_info.acquire_lock()
|
||||||
|
try:
|
||||||
|
_background_introspect_locked(ironic, node_info)
|
||||||
|
finally:
|
||||||
|
node_info.release_lock()
|
||||||
|
|
||||||
|
|
||||||
|
def _background_introspect_locked(ironic, node_info):
|
||||||
# TODO(dtantsur): pagination
|
# TODO(dtantsur): pagination
|
||||||
macs = list(node_info.ports())
|
macs = list(node_info.ports())
|
||||||
if macs:
|
if macs:
|
||||||
@ -146,17 +166,6 @@ def _background_introspect(ironic, node_info):
|
|||||||
' node %(node)s: %(exc)s') %
|
' node %(node)s: %(exc)s') %
|
||||||
{'node': node_info.uuid, 'exc': exc})
|
{'node': node_info.uuid, 'exc': exc})
|
||||||
|
|
||||||
if re.match(CONF.introspection_delay_drivers, node_info.node().driver):
|
|
||||||
LOG.debug('Attempting to acquire lock on last introspection time')
|
|
||||||
with _LAST_INTROSPECTION_LOCK:
|
|
||||||
delay = (_LAST_INTROSPECTION_TIME - time.time()
|
|
||||||
+ CONF.introspection_delay)
|
|
||||||
if delay > 0:
|
|
||||||
LOG.debug('Waiting %d seconds before sending the next '
|
|
||||||
'node on introspection', delay)
|
|
||||||
time.sleep(delay)
|
|
||||||
_LAST_INTROSPECTION_TIME = time.time()
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
ironic.node.set_power_state(node_info.uuid, 'reboot')
|
ironic.node.set_power_state(node_info.uuid, 'reboot')
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
|
@ -18,9 +18,11 @@ import json
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
from ironicclient import exceptions
|
from ironicclient import exceptions
|
||||||
|
from oslo_concurrency import lockutils
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_db import exception as db_exc
|
from oslo_db import exception as db_exc
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
|
from oslo_utils import excutils
|
||||||
from sqlalchemy import text
|
from sqlalchemy import text
|
||||||
|
|
||||||
from ironic_inspector import db
|
from ironic_inspector import db
|
||||||
@ -34,13 +36,31 @@ LOG = log.getLogger("ironic_inspector.node_cache")
|
|||||||
|
|
||||||
|
|
||||||
MACS_ATTRIBUTE = 'mac'
|
MACS_ATTRIBUTE = 'mac'
|
||||||
|
_LOCK_TEMPLATE = 'node-%s'
|
||||||
|
_SEMAPHORES = lockutils.Semaphores()
|
||||||
|
|
||||||
|
|
||||||
|
def _get_lock(uuid):
|
||||||
|
"""Get lock object for a given node UUID."""
|
||||||
|
return lockutils.internal_lock(_LOCK_TEMPLATE % uuid,
|
||||||
|
semaphores=_SEMAPHORES)
|
||||||
|
|
||||||
|
|
||||||
|
def _get_lock_ctx(uuid):
|
||||||
|
"""Get context manager yielding a lock object for a given node UUID."""
|
||||||
|
return lockutils.lock(_LOCK_TEMPLATE % uuid, semaphores=_SEMAPHORES)
|
||||||
|
|
||||||
|
|
||||||
class NodeInfo(object):
|
class NodeInfo(object):
|
||||||
"""Record about a node in the cache."""
|
"""Record about a node in the cache.
|
||||||
|
|
||||||
def __init__(self, uuid, started_at, finished_at=None, error=None,
|
This class optionally allows to acquire a lock on a node. Note that the
|
||||||
node=None, ports=None, ironic=None):
|
class instance itself is NOT thread-safe, you need to create a new instance
|
||||||
|
for every thread.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, uuid, started_at=None, finished_at=None, error=None,
|
||||||
|
node=None, ports=None, ironic=None, lock=None):
|
||||||
self.uuid = uuid
|
self.uuid = uuid
|
||||||
self.started_at = started_at
|
self.started_at = started_at
|
||||||
self.finished_at = finished_at
|
self.finished_at = finished_at
|
||||||
@ -52,6 +72,48 @@ class NodeInfo(object):
|
|||||||
self._ports = ports
|
self._ports = ports
|
||||||
self._attributes = None
|
self._attributes = None
|
||||||
self._ironic = ironic
|
self._ironic = ironic
|
||||||
|
# This is a lock on a node UUID, not on a NodeInfo object
|
||||||
|
self._lock = lock if lock is not None else _get_lock(uuid)
|
||||||
|
# Whether lock was acquired using this NodeInfo object
|
||||||
|
self._locked = lock is not None
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
if self._locked:
|
||||||
|
LOG.warning(_LW('BUG: node lock was not released by the moment '
|
||||||
|
'node info object is deleted'))
|
||||||
|
self._lock.release()
|
||||||
|
|
||||||
|
def acquire_lock(self, blocking=True):
|
||||||
|
"""Acquire a lock on the associated node.
|
||||||
|
|
||||||
|
Exits with success if a lock is already acquired using this NodeInfo
|
||||||
|
object.
|
||||||
|
|
||||||
|
:param blocking: if True, wait for lock to be acquired, otherwise
|
||||||
|
return immediately.
|
||||||
|
:returns: boolean value, whether lock was acquired successfully
|
||||||
|
"""
|
||||||
|
if self._locked:
|
||||||
|
return True
|
||||||
|
|
||||||
|
LOG.debug('Attempting to acquire lock on node %s', self.uuid)
|
||||||
|
if self._lock.acquire(blocking):
|
||||||
|
self._locked = True
|
||||||
|
LOG.debug('Successfully acquired lock on node %s', self.uuid)
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
LOG.debug('Unable to acquire lock on node %s', self.uuid)
|
||||||
|
return False
|
||||||
|
|
||||||
|
def release_lock(self):
|
||||||
|
"""Release a lock on a node.
|
||||||
|
|
||||||
|
Does nothing if lock was not acquired using this NodeInfo object.
|
||||||
|
"""
|
||||||
|
if self._locked:
|
||||||
|
LOG.debug('Successfully released lock on node %s', self.uuid)
|
||||||
|
self._lock.release()
|
||||||
|
self._locked = False
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def options(self):
|
def options(self):
|
||||||
@ -98,6 +160,8 @@ class NodeInfo(object):
|
|||||||
|
|
||||||
:param error: error message
|
:param error: error message
|
||||||
"""
|
"""
|
||||||
|
self.release_lock()
|
||||||
|
|
||||||
self.finished_at = time.time()
|
self.finished_at = time.time()
|
||||||
self.error = error
|
self.error = error
|
||||||
|
|
||||||
@ -136,11 +200,11 @@ class NodeInfo(object):
|
|||||||
self._attributes = None
|
self._attributes = None
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_row(cls, row, ironic=None):
|
def from_row(cls, row, ironic=None, lock=None):
|
||||||
"""Construct NodeInfo from a database row."""
|
"""Construct NodeInfo from a database row."""
|
||||||
fields = {key: row[key]
|
fields = {key: row[key]
|
||||||
for key in ('uuid', 'started_at', 'finished_at', 'error')}
|
for key in ('uuid', 'started_at', 'finished_at', 'error')}
|
||||||
return cls(ironic=ironic, **fields)
|
return cls(ironic=ironic, lock=lock, **fields)
|
||||||
|
|
||||||
def invalidate_cache(self):
|
def invalidate_cache(self):
|
||||||
"""Clear all cached info, so that it's reloaded next time."""
|
"""Clear all cached info, so that it's reloaded next time."""
|
||||||
@ -333,7 +397,8 @@ def delete_nodes_not_in_list(uuids):
|
|||||||
LOG.warning(
|
LOG.warning(
|
||||||
_LW('Node %s was deleted from Ironic, dropping from Ironic '
|
_LW('Node %s was deleted from Ironic, dropping from Ironic '
|
||||||
'Inspector database'), uuid)
|
'Inspector database'), uuid)
|
||||||
_delete_node(uuid)
|
with _get_lock_ctx(uuid):
|
||||||
|
_delete_node(uuid)
|
||||||
|
|
||||||
|
|
||||||
def _delete_node(uuid, session=None):
|
def _delete_node(uuid, session=None):
|
||||||
@ -362,23 +427,37 @@ def _list_node_uuids():
|
|||||||
return {x.uuid for x in db.model_query(db.Node.uuid)}
|
return {x.uuid for x in db.model_query(db.Node.uuid)}
|
||||||
|
|
||||||
|
|
||||||
def get_node(uuid, ironic=None):
|
def get_node(uuid, ironic=None, locked=False):
|
||||||
"""Get node from cache by it's UUID.
|
"""Get node from cache by it's UUID.
|
||||||
|
|
||||||
:param uuid: node UUID.
|
:param uuid: node UUID.
|
||||||
:param ironic: optional ironic client instance
|
:param ironic: optional ironic client instance
|
||||||
|
:param locked: if True, get a lock on node before fetching its data
|
||||||
:returns: structure NodeInfo.
|
:returns: structure NodeInfo.
|
||||||
"""
|
"""
|
||||||
row = db.model_query(db.Node).filter_by(uuid=uuid).first()
|
if locked:
|
||||||
if row is None:
|
lock = _get_lock(uuid)
|
||||||
raise utils.Error(_('Could not find node %s in cache') % uuid,
|
lock.acquire()
|
||||||
code=404)
|
else:
|
||||||
return NodeInfo.from_row(row, ironic=ironic)
|
lock = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
row = db.model_query(db.Node).filter_by(uuid=uuid).first()
|
||||||
|
if row is None:
|
||||||
|
raise utils.Error(_('Could not find node %s in cache') % uuid,
|
||||||
|
code=404)
|
||||||
|
return NodeInfo.from_row(row, ironic=ironic, lock=lock)
|
||||||
|
except Exception:
|
||||||
|
with excutils.save_and_reraise_exception():
|
||||||
|
if lock is not None:
|
||||||
|
lock.release()
|
||||||
|
|
||||||
|
|
||||||
def find_node(**attributes):
|
def find_node(**attributes):
|
||||||
"""Find node in cache.
|
"""Find node in cache.
|
||||||
|
|
||||||
|
This function acquires a lock on a node.
|
||||||
|
|
||||||
:param attributes: attributes known about this node (like macs, BMC etc)
|
:param attributes: attributes known about this node (like macs, BMC etc)
|
||||||
also ironic client instance may be passed under 'ironic'
|
also ironic client instance may be passed under 'ironic'
|
||||||
:returns: structure NodeInfo with attributes ``uuid`` and ``created_at``
|
:returns: structure NodeInfo with attributes ``uuid`` and ``created_at``
|
||||||
@ -417,20 +496,28 @@ def find_node(**attributes):
|
|||||||
% {'attr': attributes, 'found': list(found)}, code=404)
|
% {'attr': attributes, 'found': list(found)}, code=404)
|
||||||
|
|
||||||
uuid = found.pop()
|
uuid = found.pop()
|
||||||
row = (db.model_query(db.Node.started_at, db.Node.finished_at).
|
node_info = NodeInfo(uuid=uuid, ironic=ironic)
|
||||||
filter_by(uuid=uuid).first())
|
node_info.acquire_lock()
|
||||||
|
|
||||||
if not row:
|
try:
|
||||||
raise utils.Error(_(
|
row = (db.model_query(db.Node.started_at, db.Node.finished_at).
|
||||||
'Could not find node %s in introspection cache, '
|
filter_by(uuid=uuid).first())
|
||||||
'probably it\'s not on introspection now') % uuid, code=404)
|
|
||||||
|
|
||||||
if row.finished_at:
|
if not row:
|
||||||
raise utils.Error(_(
|
raise utils.Error(_(
|
||||||
'Introspection for node %(node)s already finished on '
|
'Could not find node %s in introspection cache, '
|
||||||
'%(finish)s') % {'node': uuid, 'finish': row.finished_at})
|
'probably it\'s not on introspection now') % uuid, code=404)
|
||||||
|
|
||||||
return NodeInfo(uuid=uuid, started_at=row.started_at, ironic=ironic)
|
if row.finished_at:
|
||||||
|
raise utils.Error(_(
|
||||||
|
'Introspection for node %(node)s already finished on '
|
||||||
|
'%(finish)s') % {'node': uuid, 'finish': row.finished_at})
|
||||||
|
|
||||||
|
node_info.started_at = row.started_at
|
||||||
|
return node_info
|
||||||
|
except Exception:
|
||||||
|
with excutils.save_and_reraise_exception():
|
||||||
|
node_info.release_lock()
|
||||||
|
|
||||||
|
|
||||||
def clean_up():
|
def clean_up():
|
||||||
@ -461,15 +548,20 @@ def clean_up():
|
|||||||
return []
|
return []
|
||||||
|
|
||||||
LOG.error(_LE('Introspection for nodes %s has timed out'), uuids)
|
LOG.error(_LE('Introspection for nodes %s has timed out'), uuids)
|
||||||
query = db.model_query(db.Node, session=session).filter(
|
|
||||||
db.Node.started_at < threshold,
|
|
||||||
db.Node.finished_at.is_(None))
|
|
||||||
query.update({'finished_at': time.time(),
|
|
||||||
'error': 'Introspection timeout'})
|
|
||||||
for u in uuids:
|
for u in uuids:
|
||||||
db.model_query(db.Attribute, session=session).filter_by(
|
node_info = get_node(u, locked=True)
|
||||||
uuid=u).delete()
|
try:
|
||||||
db.model_query(db.Option, session=session).filter_by(
|
if node_info.finished_at or node_info.started_at > threshold:
|
||||||
uuid=u).delete()
|
continue
|
||||||
|
|
||||||
|
db.model_query(db.Node, session=session).filter_by(
|
||||||
|
uuid=u).update({'finished_at': time.time(),
|
||||||
|
'error': 'Introspection timeout'})
|
||||||
|
db.model_query(db.Attribute, session=session).filter_by(
|
||||||
|
uuid=u).delete()
|
||||||
|
db.model_query(db.Option, session=session).filter_by(
|
||||||
|
uuid=u).delete()
|
||||||
|
finally:
|
||||||
|
node_info.release_lock()
|
||||||
|
|
||||||
return uuids
|
return uuids
|
||||||
|
@ -85,6 +85,10 @@ def process(introspection_data):
|
|||||||
'in hook %s') % hook_ext.name)
|
'in hook %s') % hook_ext.name)
|
||||||
|
|
||||||
node_info = _find_node_info(introspection_data, failures)
|
node_info = _find_node_info(introspection_data, failures)
|
||||||
|
if node_info:
|
||||||
|
# Locking is already done in find_node() but may be not done in a
|
||||||
|
# node_not_found hook
|
||||||
|
node_info.acquire_lock()
|
||||||
|
|
||||||
if failures and node_info:
|
if failures and node_info:
|
||||||
msg = _('The following failures happened during running '
|
msg = _('The following failures happened during running '
|
||||||
|
@ -14,6 +14,7 @@
|
|||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
import mock
|
import mock
|
||||||
|
from oslo_concurrency import lockutils
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
|
|
||||||
@ -57,6 +58,7 @@ class BaseTest(unittest.TestCase):
|
|||||||
engine.connect()
|
engine.connect()
|
||||||
self.addCleanup(db.get_engine().dispose)
|
self.addCleanup(db.get_engine().dispose)
|
||||||
plugins_base._HOOKS_MGR = None
|
plugins_base._HOOKS_MGR = None
|
||||||
|
node_cache._SEMAPHORES = lockutils.Semaphores()
|
||||||
for name in ('_', '_LI', '_LW', '_LE', '_LC'):
|
for name in ('_', '_LI', '_LW', '_LE', '_LC'):
|
||||||
patch = mock.patch.object(i18n, name, lambda s: s)
|
patch = mock.patch.object(i18n, name, lambda s: s)
|
||||||
patch.start()
|
patch.start()
|
||||||
|
@ -82,8 +82,10 @@ class TestIntrospect(BaseTest):
|
|||||||
persistent=False)
|
persistent=False)
|
||||||
cli.node.set_power_state.assert_called_once_with(self.uuid,
|
cli.node.set_power_state.assert_called_once_with(self.uuid,
|
||||||
'reboot')
|
'reboot')
|
||||||
add_mock.return_value.set_option.assert_called_once_with(
|
self.node_info.set_option.assert_called_once_with(
|
||||||
'new_ipmi_credentials', None)
|
'new_ipmi_credentials', None)
|
||||||
|
self.node_info.acquire_lock.assert_called_once_with()
|
||||||
|
self.node_info.release_lock.assert_called_once_with()
|
||||||
|
|
||||||
def test_ok_ilo_and_drac(self, client_mock, add_mock, filters_mock):
|
def test_ok_ilo_and_drac(self, client_mock, add_mock, filters_mock):
|
||||||
cli = self._prepare(client_mock)
|
cli = self._prepare(client_mock)
|
||||||
@ -117,6 +119,8 @@ class TestIntrospect(BaseTest):
|
|||||||
'reboot')
|
'reboot')
|
||||||
add_mock.return_value.finished.assert_called_once_with(
|
add_mock.return_value.finished.assert_called_once_with(
|
||||||
error=mock.ANY)
|
error=mock.ANY)
|
||||||
|
self.node_info.acquire_lock.assert_called_once_with()
|
||||||
|
self.node_info.release_lock.assert_called_once_with()
|
||||||
|
|
||||||
def test_unexpected_error(self, client_mock, add_mock, filters_mock):
|
def test_unexpected_error(self, client_mock, add_mock, filters_mock):
|
||||||
cli = self._prepare(client_mock)
|
cli = self._prepare(client_mock)
|
||||||
@ -133,6 +137,8 @@ class TestIntrospect(BaseTest):
|
|||||||
self.assertFalse(cli.node.set_boot_device.called)
|
self.assertFalse(cli.node.set_boot_device.called)
|
||||||
add_mock.return_value.finished.assert_called_once_with(
|
add_mock.return_value.finished.assert_called_once_with(
|
||||||
error=mock.ANY)
|
error=mock.ANY)
|
||||||
|
self.node_info.acquire_lock.assert_called_once_with()
|
||||||
|
self.node_info.release_lock.assert_called_once_with()
|
||||||
|
|
||||||
def test_with_maintenance(self, client_mock, add_mock, filters_mock):
|
def test_with_maintenance(self, client_mock, add_mock, filters_mock):
|
||||||
cli = client_mock.return_value
|
cli = client_mock.return_value
|
||||||
@ -194,6 +200,8 @@ class TestIntrospect(BaseTest):
|
|||||||
self.node_info.finished.assert_called_once_with(error=mock.ANY)
|
self.node_info.finished.assert_called_once_with(error=mock.ANY)
|
||||||
self.assertEqual(0, filters_mock.call_count)
|
self.assertEqual(0, filters_mock.call_count)
|
||||||
self.assertEqual(0, cli.node.set_power_state.call_count)
|
self.assertEqual(0, cli.node.set_power_state.call_count)
|
||||||
|
self.node_info.acquire_lock.assert_called_once_with()
|
||||||
|
self.node_info.release_lock.assert_called_once_with()
|
||||||
|
|
||||||
def test_no_lookup_attrs_with_node_not_found_hook(self, client_mock,
|
def test_no_lookup_attrs_with_node_not_found_hook(self, client_mock,
|
||||||
add_mock, filters_mock):
|
add_mock, filters_mock):
|
||||||
@ -229,6 +237,7 @@ class TestIntrospect(BaseTest):
|
|||||||
self.assertEqual(0, filters_mock.call_count)
|
self.assertEqual(0, filters_mock.call_count)
|
||||||
self.assertEqual(0, cli.node.set_power_state.call_count)
|
self.assertEqual(0, cli.node.set_power_state.call_count)
|
||||||
self.assertFalse(add_mock.called)
|
self.assertFalse(add_mock.called)
|
||||||
|
self.assertFalse(self.node_info.acquire_lock.called)
|
||||||
|
|
||||||
def test_failed_to_validate_node(self, client_mock, add_mock,
|
def test_failed_to_validate_node(self, client_mock, add_mock,
|
||||||
filters_mock):
|
filters_mock):
|
||||||
@ -247,6 +256,7 @@ class TestIntrospect(BaseTest):
|
|||||||
self.assertEqual(0, filters_mock.call_count)
|
self.assertEqual(0, filters_mock.call_count)
|
||||||
self.assertEqual(0, cli.node.set_power_state.call_count)
|
self.assertEqual(0, cli.node.set_power_state.call_count)
|
||||||
self.assertFalse(add_mock.called)
|
self.assertFalse(add_mock.called)
|
||||||
|
self.assertFalse(self.node_info.acquire_lock.called)
|
||||||
|
|
||||||
def test_wrong_provision_state(self, client_mock, add_mock, filters_mock):
|
def test_wrong_provision_state(self, client_mock, add_mock, filters_mock):
|
||||||
self.node.provision_state = 'active'
|
self.node.provision_state = 'active'
|
||||||
@ -261,6 +271,7 @@ class TestIntrospect(BaseTest):
|
|||||||
self.assertEqual(0, filters_mock.call_count)
|
self.assertEqual(0, filters_mock.call_count)
|
||||||
self.assertEqual(0, cli.node.set_power_state.call_count)
|
self.assertEqual(0, cli.node.set_power_state.call_count)
|
||||||
self.assertFalse(add_mock.called)
|
self.assertFalse(add_mock.called)
|
||||||
|
self.assertFalse(self.node_info.acquire_lock.called)
|
||||||
|
|
||||||
@mock.patch.object(time, 'sleep')
|
@mock.patch.object(time, 'sleep')
|
||||||
@mock.patch.object(time, 'time')
|
@mock.patch.object(time, 'time')
|
||||||
|
@ -41,6 +41,7 @@ class TestNodeCache(test_base.NodeTest):
|
|||||||
bmc_address='1.2.3.4', foo=None)
|
bmc_address='1.2.3.4', foo=None)
|
||||||
self.assertEqual(self.uuid, res.uuid)
|
self.assertEqual(self.uuid, res.uuid)
|
||||||
self.assertTrue(time.time() - 60 < res.started_at < time.time() + 60)
|
self.assertTrue(time.time() - 60 < res.started_at < time.time() + 60)
|
||||||
|
self.assertFalse(res._locked)
|
||||||
|
|
||||||
res = (db.model_query(db.Node.uuid,
|
res = (db.model_query(db.Node.uuid,
|
||||||
db.Node.started_at).order_by(db.Node.uuid).all())
|
db.Node.started_at).order_by(db.Node.uuid).all())
|
||||||
@ -80,10 +81,12 @@ class TestNodeCache(test_base.NodeTest):
|
|||||||
uuid=self.uuid).first()
|
uuid=self.uuid).first()
|
||||||
self.assertIsNone(row_option)
|
self.assertIsNone(row_option)
|
||||||
|
|
||||||
|
@mock.patch.object(node_cache, '_get_lock_ctx', autospec=True)
|
||||||
@mock.patch.object(node_cache, '_list_node_uuids')
|
@mock.patch.object(node_cache, '_list_node_uuids')
|
||||||
@mock.patch.object(node_cache, '_delete_node')
|
@mock.patch.object(node_cache, '_delete_node')
|
||||||
def test_delete_nodes_not_in_list(self, mock__delete_node,
|
def test_delete_nodes_not_in_list(self, mock__delete_node,
|
||||||
mock__list_node_uuids):
|
mock__list_node_uuids,
|
||||||
|
mock__get_lock_ctx):
|
||||||
uuid2 = 'uuid2'
|
uuid2 = 'uuid2'
|
||||||
uuids = {self.uuid}
|
uuids = {self.uuid}
|
||||||
mock__list_node_uuids.return_value = {self.uuid, uuid2}
|
mock__list_node_uuids.return_value = {self.uuid, uuid2}
|
||||||
@ -91,6 +94,8 @@ class TestNodeCache(test_base.NodeTest):
|
|||||||
with session.begin():
|
with session.begin():
|
||||||
node_cache.delete_nodes_not_in_list(uuids)
|
node_cache.delete_nodes_not_in_list(uuids)
|
||||||
mock__delete_node.assert_called_once_with(uuid2)
|
mock__delete_node.assert_called_once_with(uuid2)
|
||||||
|
mock__get_lock_ctx.assert_called_once_with(uuid2)
|
||||||
|
mock__get_lock_ctx.return_value.__enter__.assert_called_once_with()
|
||||||
|
|
||||||
def test_add_node_duplicate_mac(self):
|
def test_add_node_duplicate_mac(self):
|
||||||
session = db.get_session()
|
session = db.get_session()
|
||||||
@ -178,11 +183,13 @@ class TestNodeCacheFind(test_base.NodeTest):
|
|||||||
res = node_cache.find_node(bmc_address='1.2.3.4')
|
res = node_cache.find_node(bmc_address='1.2.3.4')
|
||||||
self.assertEqual(self.uuid, res.uuid)
|
self.assertEqual(self.uuid, res.uuid)
|
||||||
self.assertTrue(time.time() - 60 < res.started_at < time.time() + 1)
|
self.assertTrue(time.time() - 60 < res.started_at < time.time() + 1)
|
||||||
|
self.assertTrue(res._locked)
|
||||||
|
|
||||||
def test_macs(self):
|
def test_macs(self):
|
||||||
res = node_cache.find_node(mac=['11:22:33:33:33:33', self.macs[1]])
|
res = node_cache.find_node(mac=['11:22:33:33:33:33', self.macs[1]])
|
||||||
self.assertEqual(self.uuid, res.uuid)
|
self.assertEqual(self.uuid, res.uuid)
|
||||||
self.assertTrue(time.time() - 60 < res.started_at < time.time() + 1)
|
self.assertTrue(time.time() - 60 < res.started_at < time.time() + 1)
|
||||||
|
self.assertTrue(res._locked)
|
||||||
|
|
||||||
def test_macs_not_found(self):
|
def test_macs_not_found(self):
|
||||||
self.assertRaises(utils.Error, node_cache.find_node,
|
self.assertRaises(utils.Error, node_cache.find_node,
|
||||||
@ -199,6 +206,7 @@ class TestNodeCacheFind(test_base.NodeTest):
|
|||||||
mac=self.macs)
|
mac=self.macs)
|
||||||
self.assertEqual(self.uuid, res.uuid)
|
self.assertEqual(self.uuid, res.uuid)
|
||||||
self.assertTrue(time.time() - 60 < res.started_at < time.time() + 1)
|
self.assertTrue(time.time() - 60 < res.started_at < time.time() + 1)
|
||||||
|
self.assertTrue(res._locked)
|
||||||
|
|
||||||
def test_inconsistency(self):
|
def test_inconsistency(self):
|
||||||
session = db.get_session()
|
session = db.get_session()
|
||||||
@ -244,8 +252,9 @@ class TestNodeCacheCleanUp(test_base.NodeTest):
|
|||||||
db.model_query(db.Attribute).count())
|
db.model_query(db.Attribute).count())
|
||||||
self.assertEqual(1, db.model_query(db.Option).count())
|
self.assertEqual(1, db.model_query(db.Option).count())
|
||||||
|
|
||||||
|
@mock.patch.object(node_cache, '_get_lock', autospec=True)
|
||||||
@mock.patch.object(time, 'time')
|
@mock.patch.object(time, 'time')
|
||||||
def test_ok(self, time_mock):
|
def test_ok(self, time_mock, get_lock_mock):
|
||||||
time_mock.return_value = 1000
|
time_mock.return_value = 1000
|
||||||
|
|
||||||
self.assertFalse(node_cache.clean_up())
|
self.assertFalse(node_cache.clean_up())
|
||||||
@ -256,9 +265,11 @@ class TestNodeCacheCleanUp(test_base.NodeTest):
|
|||||||
self.assertEqual(len(self.macs),
|
self.assertEqual(len(self.macs),
|
||||||
db.model_query(db.Attribute).count())
|
db.model_query(db.Attribute).count())
|
||||||
self.assertEqual(1, db.model_query(db.Option).count())
|
self.assertEqual(1, db.model_query(db.Option).count())
|
||||||
|
self.assertFalse(get_lock_mock.called)
|
||||||
|
|
||||||
|
@mock.patch.object(node_cache, '_get_lock', autospec=True)
|
||||||
@mock.patch.object(time, 'time')
|
@mock.patch.object(time, 'time')
|
||||||
def test_timeout(self, time_mock):
|
def test_timeout(self, time_mock, get_lock_mock):
|
||||||
# Add a finished node to confirm we don't try to timeout it
|
# Add a finished node to confirm we don't try to timeout it
|
||||||
time_mock.return_value = self.started_at
|
time_mock.return_value = self.started_at
|
||||||
session = db.get_session()
|
session = db.get_session()
|
||||||
@ -277,6 +288,8 @@ class TestNodeCacheCleanUp(test_base.NodeTest):
|
|||||||
res)
|
res)
|
||||||
self.assertEqual([], db.model_query(db.Attribute).all())
|
self.assertEqual([], db.model_query(db.Attribute).all())
|
||||||
self.assertEqual([], db.model_query(db.Option).all())
|
self.assertEqual([], db.model_query(db.Option).all())
|
||||||
|
get_lock_mock.assert_called_once_with(self.uuid)
|
||||||
|
get_lock_mock.return_value.acquire.assert_called_once_with()
|
||||||
|
|
||||||
def test_old_status(self):
|
def test_old_status(self):
|
||||||
CONF.set_override('node_status_keep_time', 42)
|
CONF.set_override('node_status_keep_time', 42)
|
||||||
@ -302,6 +315,20 @@ class TestNodeCacheGetNode(test_base.NodeTest):
|
|||||||
self.assertEqual(started_at, info.started_at)
|
self.assertEqual(started_at, info.started_at)
|
||||||
self.assertIsNone(info.finished_at)
|
self.assertIsNone(info.finished_at)
|
||||||
self.assertIsNone(info.error)
|
self.assertIsNone(info.error)
|
||||||
|
self.assertFalse(info._locked)
|
||||||
|
|
||||||
|
def test_locked(self):
|
||||||
|
started_at = time.time() - 42
|
||||||
|
session = db.get_session()
|
||||||
|
with session.begin():
|
||||||
|
db.Node(uuid=self.uuid, started_at=started_at).save(session)
|
||||||
|
info = node_cache.get_node(self.uuid, locked=True)
|
||||||
|
|
||||||
|
self.assertEqual(self.uuid, info.uuid)
|
||||||
|
self.assertEqual(started_at, info.started_at)
|
||||||
|
self.assertIsNone(info.finished_at)
|
||||||
|
self.assertIsNone(info.error)
|
||||||
|
self.assertTrue(info._locked)
|
||||||
|
|
||||||
def test_not_found(self):
|
def test_not_found(self):
|
||||||
self.assertRaises(utils.Error, node_cache.get_node, 'foo')
|
self.assertRaises(utils.Error, node_cache.get_node, 'foo')
|
||||||
@ -343,6 +370,11 @@ class TestNodeInfoFinished(test_base.NodeTest):
|
|||||||
self.assertEqual([], db.model_query(db.Attribute).all())
|
self.assertEqual([], db.model_query(db.Attribute).all())
|
||||||
self.assertEqual([], db.model_query(db.Option).all())
|
self.assertEqual([], db.model_query(db.Option).all())
|
||||||
|
|
||||||
|
def test_release_lock(self):
|
||||||
|
self.node_info.acquire_lock()
|
||||||
|
self.node_info.finished()
|
||||||
|
self.assertFalse(self.node_info._locked)
|
||||||
|
|
||||||
|
|
||||||
class TestInit(unittest.TestCase):
|
class TestInit(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
@ -574,3 +606,43 @@ class TestNodeCacheGetByPath(test_base.NodeTest):
|
|||||||
self.assertEqual(42, self.node_info.get_by_path('/properties/answer'))
|
self.assertEqual(42, self.node_info.get_by_path('/properties/answer'))
|
||||||
self.assertRaises(KeyError, self.node_info.get_by_path, '/foo')
|
self.assertRaises(KeyError, self.node_info.get_by_path, '/foo')
|
||||||
self.assertRaises(KeyError, self.node_info.get_by_path, '/extra/foo')
|
self.assertRaises(KeyError, self.node_info.get_by_path, '/extra/foo')
|
||||||
|
|
||||||
|
|
||||||
|
@mock.patch.object(node_cache, '_get_lock', autospec=True)
|
||||||
|
class TestLock(test_base.NodeTest):
|
||||||
|
def test_acquire(self, get_lock_mock):
|
||||||
|
node_info = node_cache.NodeInfo(self.uuid)
|
||||||
|
self.assertFalse(node_info._locked)
|
||||||
|
get_lock_mock.assert_called_once_with(self.uuid)
|
||||||
|
self.assertFalse(get_lock_mock.return_value.acquire.called)
|
||||||
|
|
||||||
|
self.assertTrue(node_info.acquire_lock())
|
||||||
|
self.assertTrue(node_info._locked)
|
||||||
|
self.assertTrue(node_info.acquire_lock())
|
||||||
|
self.assertTrue(node_info._locked)
|
||||||
|
get_lock_mock.return_value.acquire.assert_called_once_with(True)
|
||||||
|
|
||||||
|
def test_release(self, get_lock_mock):
|
||||||
|
node_info = node_cache.NodeInfo(self.uuid)
|
||||||
|
node_info.acquire_lock()
|
||||||
|
self.assertTrue(node_info._locked)
|
||||||
|
node_info.release_lock()
|
||||||
|
self.assertFalse(node_info._locked)
|
||||||
|
node_info.release_lock()
|
||||||
|
self.assertFalse(node_info._locked)
|
||||||
|
get_lock_mock.return_value.acquire.assert_called_once_with(True)
|
||||||
|
get_lock_mock.return_value.release.assert_called_once_with()
|
||||||
|
|
||||||
|
def test_acquire_non_blocking(self, get_lock_mock):
|
||||||
|
node_info = node_cache.NodeInfo(self.uuid)
|
||||||
|
self.assertFalse(node_info._locked)
|
||||||
|
get_lock_mock.return_value.acquire.side_effect = iter([False, True])
|
||||||
|
|
||||||
|
self.assertFalse(node_info.acquire_lock(blocking=False))
|
||||||
|
self.assertFalse(node_info._locked)
|
||||||
|
self.assertTrue(node_info.acquire_lock(blocking=False))
|
||||||
|
self.assertTrue(node_info._locked)
|
||||||
|
self.assertTrue(node_info.acquire_lock(blocking=False))
|
||||||
|
self.assertTrue(node_info._locked)
|
||||||
|
get_lock_mock.return_value.acquire.assert_called_with(False)
|
||||||
|
self.assertEqual(2, get_lock_mock.return_value.acquire.call_count)
|
||||||
|
3
releasenotes/notes/node-locking-4d135ca5b93524b1.yaml
Normal file
3
releasenotes/notes/node-locking-4d135ca5b93524b1.yaml
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
---
|
||||||
|
fixes:
|
||||||
|
- Acquire a lock on a node UUID when handling it.
|
@ -13,6 +13,7 @@ pbr>=1.6
|
|||||||
python-ironicclient>=0.8.0
|
python-ironicclient>=0.8.0
|
||||||
python-keystoneclient!=1.8.0,>=1.6.0
|
python-keystoneclient!=1.8.0,>=1.6.0
|
||||||
python-swiftclient>=2.2.0
|
python-swiftclient>=2.2.0
|
||||||
|
oslo.concurrency>=2.3.0 # Apache-2.0
|
||||||
oslo.config>=2.7.0 # Apache-2.0
|
oslo.config>=2.7.0 # Apache-2.0
|
||||||
oslo.db>=3.2.0 # Apache-2.0
|
oslo.db>=3.2.0 # Apache-2.0
|
||||||
oslo.i18n>=1.5.0 # Apache-2.0
|
oslo.i18n>=1.5.0 # Apache-2.0
|
||||||
|
Loading…
Reference in New Issue
Block a user