Merge "Fix several issues in the lock/release database code"
This commit is contained in:
commit
8e2aab8291
@ -685,53 +685,37 @@ class Connection(api.Connection):
|
||||
@synchronized(RESERVATION_SEMAPHORE, fair=True)
|
||||
@wrap_sqlite_retry
|
||||
def _reserve_node_place_lock(self, tag, node_id, node):
|
||||
try:
|
||||
# NOTE(TheJulia): We explicitly do *not* synch the session
|
||||
# so the other actions in the conductor do not become aware
|
||||
# that the lock is in place and believe they hold the lock.
|
||||
# This necessitates an overall lock in the code side, so
|
||||
# we avoid conditions where two separate threads can believe
|
||||
# they hold locks at the same time.
|
||||
with _session_for_write() as session:
|
||||
res = session.execute(
|
||||
sa.update(models.Node).
|
||||
where(models.Node.id == node.id).
|
||||
where(models.Node.reservation == None). # noqa
|
||||
values(reservation=tag).
|
||||
execution_options(synchronize_session=False))
|
||||
session.flush()
|
||||
node = self._get_node_by_id_no_joins(node.id)
|
||||
# NOTE(TheJulia): In SQLAlchemy 2.0 style, we don't
|
||||
# magically get a changed node as they moved from the
|
||||
# many ways to do things to singular ways to do things.
|
||||
if res.rowcount != 1:
|
||||
# Nothing updated and node exists. Must already be
|
||||
# locked.
|
||||
raise exception.NodeLocked(node=node.uuid,
|
||||
host=node.reservation)
|
||||
except NoResultFound:
|
||||
# In the event that someone has deleted the node on
|
||||
# another thread.
|
||||
raise exception.NodeNotFound(node=node_id)
|
||||
# NOTE(TheJulia): We explicitly do *not* synch the session
|
||||
# so the other actions in the conductor do not become aware
|
||||
# that the lock is in place and believe they hold the lock.
|
||||
# This necessitates an overall lock in the code side, so
|
||||
# we avoid conditions where two separate threads can believe
|
||||
# they hold locks at the same time.
|
||||
with _session_for_write() as session:
|
||||
res = session.execute(
|
||||
sa.update(models.Node).
|
||||
where(models.Node.id == node.id).
|
||||
where(models.Node.reservation == None). # noqa
|
||||
values(reservation=tag).
|
||||
execution_options(synchronize_session=False))
|
||||
session.flush()
|
||||
node = self._get_node_reservation(node.id)
|
||||
# NOTE(TheJulia): In SQLAlchemy 2.0 style, we don't
|
||||
# magically get a changed node as they moved from the
|
||||
# many ways to do things to singular ways to do things.
|
||||
if res.rowcount != 1:
|
||||
# Nothing updated and node exists. Must already be
|
||||
# locked.
|
||||
raise exception.NodeLocked(node=node.uuid, host=node.reservation)
|
||||
|
||||
@oslo_db_api.retry_on_deadlock
|
||||
def reserve_node(self, tag, node_id):
|
||||
with _session_for_read() as session:
|
||||
try:
|
||||
# TODO(TheJulia): Figure out a good way to query
|
||||
# this so that we do it as light as possible without
|
||||
# the full object invocation, which will speed lock
|
||||
# activities. Granted, this is all at the DB level
|
||||
# so maybe that is okay in the grand scheme of things.
|
||||
query = session.query(models.Node)
|
||||
query = add_identity_filter(query, node_id)
|
||||
node = query.one()
|
||||
except NoResultFound:
|
||||
raise exception.NodeNotFound(node=node_id)
|
||||
if node.reservation:
|
||||
# Fail fast, instead of attempt the update.
|
||||
raise exception.NodeLocked(node=node.uuid,
|
||||
host=node.reservation)
|
||||
# Check existence and convert UUID to ID
|
||||
node = self._get_node_reservation(node_id)
|
||||
if node.reservation:
|
||||
# Fail fast, instead of attempt the update.
|
||||
raise exception.NodeLocked(node=node.uuid, host=node.reservation)
|
||||
|
||||
self._reserve_node_place_lock(tag, node_id, node)
|
||||
# Return a node object as that is the contract for this method.
|
||||
return self.get_node_by_id(node.id)
|
||||
@ -739,32 +723,25 @@ class Connection(api.Connection):
|
||||
@wrap_sqlite_retry
|
||||
@oslo_db_api.retry_on_deadlock
|
||||
def release_node(self, tag, node_id):
|
||||
with _session_for_read() as session:
|
||||
try:
|
||||
query = session.query(models.Node)
|
||||
query = add_identity_filter(query, node_id)
|
||||
node = query.one()
|
||||
except NoResultFound:
|
||||
raise exception.NodeNotFound(node=node_id)
|
||||
# Check existence and convert UUID to ID
|
||||
node = self._get_node_reservation(node_id)
|
||||
|
||||
with _session_for_write() as session:
|
||||
try:
|
||||
res = session.execute(
|
||||
sa.update(models.Node).
|
||||
where(models.Node.id == node.id).
|
||||
where(models.Node.reservation == tag).
|
||||
values(reservation=None).
|
||||
execution_options(synchronize_session=False)
|
||||
)
|
||||
node = self.get_node_by_id(node.id)
|
||||
if res.rowcount != 1:
|
||||
if node.reservation is None:
|
||||
raise exception.NodeNotLocked(node=node.uuid)
|
||||
else:
|
||||
raise exception.NodeLocked(node=node.uuid,
|
||||
host=node['reservation'])
|
||||
session.flush()
|
||||
except NoResultFound:
|
||||
raise exception.NodeNotFound(node=node_id)
|
||||
res = session.execute(
|
||||
sa.update(models.Node).
|
||||
where(models.Node.id == node.id).
|
||||
where(models.Node.reservation == tag).
|
||||
values(reservation=None).
|
||||
execution_options(synchronize_session=False)
|
||||
)
|
||||
session.flush()
|
||||
|
||||
if res.rowcount != 1:
|
||||
if node.reservation is None:
|
||||
raise exception.NodeNotLocked(node=node.uuid)
|
||||
else:
|
||||
raise exception.NodeLocked(node=node.uuid,
|
||||
host=node.reservation)
|
||||
|
||||
@wrap_sqlite_retry
|
||||
@oslo_db_api.retry_on_deadlock
|
||||
@ -810,20 +787,24 @@ class Connection(api.Connection):
|
||||
raise exception.NodeAlreadyExists(uuid=values['uuid'])
|
||||
return node
|
||||
|
||||
def _get_node_by_id_no_joins(self, node_id):
|
||||
# TODO(TheJulia): Maybe replace with this with a minimal
|
||||
# "get these three fields" thing.
|
||||
try:
|
||||
with _session_for_read() as session:
|
||||
# Explicitly load NodeBase as the invocation of the
|
||||
# priamary model object reesults in the join query
|
||||
# triggering.
|
||||
res = session.execute(
|
||||
sa.select(models.NodeBase).filter_by(id=node_id).limit(1)
|
||||
).scalars().first()
|
||||
except NoResultFound:
|
||||
def _get_node_reservation(self, node_id):
|
||||
with _session_for_read() as session:
|
||||
# Explicitly load NodeBase as the invocation of the
|
||||
# primary model object results in the join query
|
||||
# triggering.
|
||||
res = session.execute(
|
||||
add_identity_filter(
|
||||
sa.select(models.NodeBase.id,
|
||||
models.NodeBase.uuid,
|
||||
models.NodeBase.reservation),
|
||||
node_id
|
||||
).limit(1)
|
||||
).first()
|
||||
|
||||
if res is None:
|
||||
raise exception.NodeNotFound(node=node_id)
|
||||
return res
|
||||
else:
|
||||
return res
|
||||
|
||||
def get_node_by_id(self, node_id):
|
||||
try:
|
||||
|
Loading…
Reference in New Issue
Block a user