Merge "db: Migrate "worker" APIs to enginefacade"

This commit is contained in:
Zuul 2022-06-07 12:07:58 +00:00 committed by Gerrit Code Review
commit 895f3ce283

View File

@ -8536,15 +8536,20 @@ def image_volume_cache_include_in_cluster(
################### ###################
def _worker_query(context, session=None, until=None, db_filters=None, def _worker_query(
ignore_sentinel=True, **filters): context,
until=None,
db_filters=None,
ignore_sentinel=True,
**filters,
):
# Remove all filters based on the workers table that are set to None # Remove all filters based on the workers table that are set to None
filters = _clean_filters(filters) filters = _clean_filters(filters)
if filters and not is_valid_model_filters(models.Worker, filters): if filters and not is_valid_model_filters(models.Worker, filters):
return None return None
query = model_query(context, models.Worker, session=session) query = model_query(context, models.Worker)
# TODO: Once we stop creating the SENTINEL entry in the database (which # TODO: Once we stop creating the SENTINEL entry in the database (which
# was only needed to support MySQL 5.5), we can drop this. Probably in the # was only needed to support MySQL 5.5), we can drop this. Probably in the
@ -8575,21 +8580,25 @@ def _worker_set_updated_at_field(values):
values['updated_at'] = updated_at values['updated_at'] = updated_at
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True) @oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@main_context_manager.writer
def worker_create(context, **values): def worker_create(context, **values):
"""Create a worker entry from optional arguments.""" """Create a worker entry from optional arguments."""
_worker_set_updated_at_field(values) _worker_set_updated_at_field(values)
worker = models.Worker(**values) worker = models.Worker(**values)
session = get_session()
try: try:
with session.begin(): worker.save(context.session)
worker.save(session)
except db_exc.DBDuplicateEntry: except db_exc.DBDuplicateEntry:
raise exception.WorkerExists(type=values.get('resource_type'), raise exception.WorkerExists(
id=values.get('resource_id')) type=values.get('resource_type'),
id=values.get('resource_id'),
)
return worker return worker
@require_context
@main_context_manager.reader
def worker_get(context, **filters): def worker_get(context, **filters):
"""Get a worker or raise exception if it does not exist.""" """Get a worker or raise exception if it does not exist."""
query = _worker_query(context, **filters) query = _worker_query(context, **filters)
@ -8599,6 +8608,8 @@ def worker_get(context, **filters):
return worker return worker
@require_context
@main_context_manager.reader
def worker_get_all(context, **filters): def worker_get_all(context, **filters):
"""Get all workers that match given criteria.""" """Get all workers that match given criteria."""
query = _worker_query(context, **filters) query = _worker_query(context, **filters)
@ -8612,7 +8623,9 @@ def _orm_worker_update(worker, values):
setattr(worker, key, value) setattr(worker, key, value)
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True) @oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@main_context_manager.writer
def worker_update(context, id, filters=None, orm_worker=None, **values): def worker_update(context, id, filters=None, orm_worker=None, **values):
"""Update a worker with given values.""" """Update a worker with given values."""
filters = filters or {} filters = filters or {}
@ -8631,24 +8644,30 @@ def worker_update(context, id, filters=None, orm_worker=None, **values):
return result return result
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True) @oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@main_context_manager.writer
def worker_claim_for_cleanup(context, claimer_id, orm_worker): def worker_claim_for_cleanup(context, claimer_id, orm_worker):
"""Claim a worker entry for cleanup.""" """Claim a worker entry for cleanup."""
# We set updated_at value so we are sure we update the DB entry even if the # We set updated_at value so we are sure we update the DB entry even if the
# service_id is the same in the DB, thus flagging the claim. # service_id is the same in the DB, thus flagging the claim.
values = {'service_id': claimer_id, values = {
'service_id': claimer_id,
'race_preventer': orm_worker.race_preventer + 1, 'race_preventer': orm_worker.race_preventer + 1,
'updated_at': timeutils.utcnow()} 'updated_at': timeutils.utcnow(),
}
_worker_set_updated_at_field(values) _worker_set_updated_at_field(values)
# We only update the worker entry if it hasn't been claimed by other host # We only update the worker entry if it hasn't been claimed by other host
# or thread # or thread
query = _worker_query(context, query = _worker_query(
context,
status=orm_worker.status, status=orm_worker.status,
service_id=orm_worker.service_id, service_id=orm_worker.service_id,
race_preventer=orm_worker.race_preventer, race_preventer=orm_worker.race_preventer,
until=orm_worker.updated_at, until=orm_worker.updated_at,
id=orm_worker.id) id=orm_worker.id,
)
result = query.update(values, synchronize_session=False) result = query.update(values, synchronize_session=False)
if result: if result:
@ -8656,6 +8675,8 @@ def worker_claim_for_cleanup(context, claimer_id, orm_worker):
return result return result
@require_context
@main_context_manager.writer
def worker_destroy(context, **filters): def worker_destroy(context, **filters):
"""Delete a worker (no soft delete).""" """Delete a worker (no soft delete)."""
query = _worker_query(context, **filters) query = _worker_query(context, **filters)