diff --git a/cinder/db/sqlalchemy/api.py b/cinder/db/sqlalchemy/api.py index 0a263252e1c..cac309dd4e1 100644 --- a/cinder/db/sqlalchemy/api.py +++ b/cinder/db/sqlalchemy/api.py @@ -8536,15 +8536,20 @@ def image_volume_cache_include_in_cluster( ################### -def _worker_query(context, session=None, until=None, db_filters=None, - ignore_sentinel=True, **filters): +def _worker_query( + context, + until=None, + db_filters=None, + ignore_sentinel=True, + **filters, +): # Remove all filters based on the workers table that are set to None filters = _clean_filters(filters) if filters and not is_valid_model_filters(models.Worker, filters): return None - query = model_query(context, models.Worker, session=session) + query = model_query(context, models.Worker) # TODO: Once we stop creating the SENTINEL entry in the database (which # was only needed to support MySQL 5.5), we can drop this. Probably in the @@ -8575,21 +8580,25 @@ def _worker_set_updated_at_field(values): values['updated_at'] = updated_at +@require_context @oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True) +@main_context_manager.writer def worker_create(context, **values): """Create a worker entry from optional arguments.""" _worker_set_updated_at_field(values) worker = models.Worker(**values) - session = get_session() try: - with session.begin(): - worker.save(session) + worker.save(context.session) except db_exc.DBDuplicateEntry: - raise exception.WorkerExists(type=values.get('resource_type'), - id=values.get('resource_id')) + raise exception.WorkerExists( + type=values.get('resource_type'), + id=values.get('resource_id'), + ) return worker +@require_context +@main_context_manager.reader def worker_get(context, **filters): """Get a worker or raise exception if it does not exist.""" query = _worker_query(context, **filters) @@ -8599,6 +8608,8 @@ def worker_get(context, **filters): return worker +@require_context +@main_context_manager.reader def worker_get_all(context, **filters): """Get all workers that match given criteria.""" query = _worker_query(context, **filters) @@ -8612,7 +8623,9 @@ def _orm_worker_update(worker, values): setattr(worker, key, value) +@require_context @oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True) +@main_context_manager.writer def worker_update(context, id, filters=None, orm_worker=None, **values): """Update a worker with given values.""" filters = filters or {} @@ -8631,24 +8644,30 @@ def worker_update(context, id, filters=None, orm_worker=None, **values): return result +@require_context @oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True) +@main_context_manager.writer def worker_claim_for_cleanup(context, claimer_id, orm_worker): """Claim a worker entry for cleanup.""" # We set updated_at value so we are sure we update the DB entry even if the # service_id is the same in the DB, thus flagging the claim. - values = {'service_id': claimer_id, - 'race_preventer': orm_worker.race_preventer + 1, - 'updated_at': timeutils.utcnow()} + values = { + 'service_id': claimer_id, + 'race_preventer': orm_worker.race_preventer + 1, + 'updated_at': timeutils.utcnow(), + } _worker_set_updated_at_field(values) # We only update the worker entry if it hasn't been claimed by other host # or thread - query = _worker_query(context, - status=orm_worker.status, - service_id=orm_worker.service_id, - race_preventer=orm_worker.race_preventer, - until=orm_worker.updated_at, - id=orm_worker.id) + query = _worker_query( + context, + status=orm_worker.status, + service_id=orm_worker.service_id, + race_preventer=orm_worker.race_preventer, + until=orm_worker.updated_at, + id=orm_worker.id, + ) result = query.update(values, synchronize_session=False) if result: @@ -8656,6 +8675,8 @@ def worker_claim_for_cleanup(context, claimer_id, orm_worker): return result +@require_context +@main_context_manager.writer def worker_destroy(context, **filters): """Delete a worker (no soft delete).""" query = _worker_query(context, **filters)