Merge "Make c-vol use workers table for cleanup"
This commit is contained in:
commit
6aceda61b7
@ -80,6 +80,15 @@ class AdminController(wsgi.Controller):
|
||||
action = '%s_admin_actions:%s' % (self.resource_name, action_name)
|
||||
extensions.extension_authorizer('volume', action)(context)
|
||||
|
||||
def _remove_worker(self, context, id):
|
||||
# Remove the cleanup worker from the DB when we change a resource
|
||||
# status since it renders useless the entry.
|
||||
res = db.worker_destroy(context, resource_type=self.collection.title(),
|
||||
resource_id=id)
|
||||
if res:
|
||||
LOG.debug('Worker entry for %s with id %s has been deleted.',
|
||||
self.collection, id)
|
||||
|
||||
@wsgi.action('os-reset_status')
|
||||
def _reset_status(self, req, id, body):
|
||||
"""Reset status on the resource."""
|
||||
@ -106,6 +115,7 @@ class AdminController(wsgi.Controller):
|
||||
|
||||
# Not found exception will be handled at the wsgi level
|
||||
self._update(context, id, update)
|
||||
self._remove_worker(context, id)
|
||||
if update.get('attach_status') == 'detached':
|
||||
_clean_volume_attachment(context, id)
|
||||
|
||||
|
@ -251,8 +251,8 @@ def volume_get(context, volume_id):
|
||||
return IMPL.volume_get(context, volume_id)
|
||||
|
||||
|
||||
def volume_get_all(context, marker, limit, sort_keys=None, sort_dirs=None,
|
||||
filters=None, offset=None):
|
||||
def volume_get_all(context, marker=None, limit=None, sort_keys=None,
|
||||
sort_dirs=None, filters=None, offset=None):
|
||||
"""Get all volumes."""
|
||||
return IMPL.volume_get_all(context, marker, limit, sort_keys=sort_keys,
|
||||
sort_dirs=sort_dirs, filters=filters,
|
||||
@ -1584,6 +1584,17 @@ def message_destroy(context, message_id):
|
||||
###################
|
||||
|
||||
|
||||
def workers_init():
|
||||
"""Check if DB supports subsecond resolution and set global flag.
|
||||
|
||||
MySQL 5.5 doesn't support subsecond resolution in datetime fields, so we
|
||||
have to take it into account when working with the worker's table.
|
||||
|
||||
Once we drop support for MySQL 5.5 we can remove this method.
|
||||
"""
|
||||
return IMPL.workers_init()
|
||||
|
||||
|
||||
def worker_create(context, **values):
|
||||
"""Create a worker entry from optional arguments."""
|
||||
return IMPL.worker_create(context, **values)
|
||||
|
@ -1751,8 +1751,8 @@ def volume_get(context, volume_id):
|
||||
|
||||
|
||||
@require_admin_context
|
||||
def volume_get_all(context, marker, limit, sort_keys=None, sort_dirs=None,
|
||||
filters=None, offset=None):
|
||||
def volume_get_all(context, marker=None, limit=None, sort_keys=None,
|
||||
sort_dirs=None, filters=None, offset=None):
|
||||
"""Retrieves all volumes.
|
||||
|
||||
If no sort parameters are specified then the returned volumes are sorted
|
||||
@ -1998,6 +1998,15 @@ def _process_volume_filters(query, filters):
|
||||
LOG.debug("'migration_status' column could not be found.")
|
||||
return None
|
||||
|
||||
host = filters.pop('host', None)
|
||||
if host:
|
||||
query = query.filter(_filter_host(models.Volume.host, host))
|
||||
|
||||
cluster_name = filters.pop('cluster_name', None)
|
||||
if cluster_name:
|
||||
query = query.filter(_filter_host(models.Volume.cluster_name,
|
||||
cluster_name))
|
||||
|
||||
# Apply exact match filters for everything else, ensure that the
|
||||
# filter value exists on the model
|
||||
for key in filters.keys():
|
||||
@ -2608,7 +2617,8 @@ def snapshot_get_all(context, filters=None, marker=None, limit=None,
|
||||
order.
|
||||
|
||||
:param context: context to query under
|
||||
:param filters: dictionary of filters; will do exact matching on values
|
||||
:param filters: dictionary of filters; will do exact matching on values.
|
||||
Special keys host and cluster_name refer to the volume.
|
||||
:param marker: the last item of the previous page, used to determine the
|
||||
next page of results to return
|
||||
:param limit: maximum number of items to return
|
||||
@ -2618,7 +2628,9 @@ def snapshot_get_all(context, filters=None, marker=None, limit=None,
|
||||
paired with corresponding item in sort_keys
|
||||
:returns: list of matching snapshots
|
||||
"""
|
||||
if filters and not is_valid_model_filters(models.Snapshot, filters):
|
||||
if filters and not is_valid_model_filters(models.Snapshot, filters,
|
||||
exclude_list=('host',
|
||||
'cluster_name')):
|
||||
return []
|
||||
|
||||
session = get_session()
|
||||
@ -2642,8 +2654,19 @@ def _snaps_get_query(context, session=None, project_only=False):
|
||||
def _process_snaps_filters(query, filters):
|
||||
if filters:
|
||||
# Ensure that filters' keys exist on the model
|
||||
if not is_valid_model_filters(models.Snapshot, filters):
|
||||
if not is_valid_model_filters(models.Snapshot, filters,
|
||||
exclude_list=('host', 'cluster_name')):
|
||||
return None
|
||||
filters = filters.copy()
|
||||
host = filters.pop('host', None)
|
||||
cluster = filters.pop('cluster_name', None)
|
||||
if host or cluster:
|
||||
query = query.join(models.Snapshot.volume)
|
||||
vol_field = models.Volume
|
||||
if host:
|
||||
query = query.filter(_filter_host(vol_field.host, host))
|
||||
if cluster:
|
||||
query = query.filter(_filter_host(vol_field.cluster_name, cluster))
|
||||
query = query.filter_by(**filters)
|
||||
return query
|
||||
|
||||
@ -5481,13 +5504,15 @@ def cgsnapshot_get(context, cgsnapshot_id):
|
||||
return _cgsnapshot_get(context, cgsnapshot_id)
|
||||
|
||||
|
||||
def is_valid_model_filters(model, filters):
|
||||
def is_valid_model_filters(model, filters, exclude_list=None):
|
||||
"""Return True if filter values exist on the model
|
||||
|
||||
:param model: a Cinder model
|
||||
:param filters: dictionary of filters
|
||||
"""
|
||||
for key in filters.keys():
|
||||
if exclude_list and key in exclude_list:
|
||||
continue
|
||||
try:
|
||||
getattr(model, key)
|
||||
except AttributeError:
|
||||
@ -6055,7 +6080,7 @@ def image_volume_cache_get_all_for_host(context, host):
|
||||
|
||||
|
||||
def _worker_query(context, session=None, until=None, db_filters=None,
|
||||
**filters):
|
||||
ignore_sentinel=True, **filters):
|
||||
# Remove all filters based on the workers table that are set to None
|
||||
filters = _clean_filters(filters)
|
||||
|
||||
@ -6064,6 +6089,11 @@ def _worker_query(context, session=None, until=None, db_filters=None,
|
||||
|
||||
query = model_query(context, models.Worker, session=session)
|
||||
|
||||
# TODO(geguileo): Once we remove support for MySQL 5.5 we can remove this
|
||||
if ignore_sentinel:
|
||||
# We don't want to retrieve the workers sentinel
|
||||
query = query.filter(models.Worker.resource_type != 'SENTINEL')
|
||||
|
||||
if until:
|
||||
db_filters = list(db_filters) if db_filters else []
|
||||
# Since we set updated_at at creation time we don't need to check
|
||||
@ -6079,8 +6109,41 @@ def _worker_query(context, session=None, until=None, db_filters=None,
|
||||
return query
|
||||
|
||||
|
||||
DB_SUPPORTS_SUBSECOND_RESOLUTION = True
|
||||
|
||||
|
||||
def workers_init():
|
||||
"""Check if DB supports subsecond resolution and set global flag.
|
||||
|
||||
MySQL 5.5 doesn't support subsecond resolution in datetime fields, so we
|
||||
have to take it into account when working with the worker's table.
|
||||
|
||||
To do this we'll have 1 row in the DB, created by the migration script,
|
||||
where we have tried to set the microseconds and we'll check it.
|
||||
|
||||
Once we drop support for MySQL 5.5 we can remove this method.
|
||||
"""
|
||||
global DB_SUPPORTS_SUBSECOND_RESOLUTION
|
||||
session = get_session()
|
||||
query = session.query(models.Worker).filter_by(resource_type='SENTINEL')
|
||||
worker = query.first()
|
||||
DB_SUPPORTS_SUBSECOND_RESOLUTION = bool(worker.updated_at.microsecond)
|
||||
|
||||
|
||||
def _worker_set_updated_at_field(values):
|
||||
# TODO(geguileo): Once we drop support for MySQL 5.5 we can simplify this
|
||||
# method.
|
||||
updated_at = values.get('updated_at', timeutils.utcnow())
|
||||
if isinstance(updated_at, six.string_types):
|
||||
return
|
||||
if not DB_SUPPORTS_SUBSECOND_RESOLUTION:
|
||||
updated_at = updated_at.replace(microsecond=0)
|
||||
values['updated_at'] = updated_at
|
||||
|
||||
|
||||
def worker_create(context, **values):
|
||||
"""Create a worker entry from optional arguments."""
|
||||
_worker_set_updated_at_field(values)
|
||||
worker = models.Worker(**values)
|
||||
session = get_session()
|
||||
try:
|
||||
@ -6118,6 +6181,11 @@ def worker_update(context, id, filters=None, orm_worker=None, **values):
|
||||
"""Update a worker with given values."""
|
||||
filters = filters or {}
|
||||
query = _worker_query(context, id=id, **filters)
|
||||
|
||||
# If we want to update the orm_worker and we don't set the update_at field
|
||||
# we set it here instead of letting SQLAlchemy do it to be able to update
|
||||
# the orm_worker.
|
||||
_worker_set_updated_at_field(values)
|
||||
result = query.update(values)
|
||||
if not result:
|
||||
raise exception.WorkerNotFound(id=id, **filters)
|
||||
@ -6131,6 +6199,7 @@ def worker_claim_for_cleanup(context, claimer_id, orm_worker):
|
||||
# service_id is the same in the DB, thus flagging the claim.
|
||||
values = {'service_id': claimer_id,
|
||||
'updated_at': timeutils.utcnow()}
|
||||
_worker_set_updated_at_field(values)
|
||||
|
||||
# We only update the worker entry if it hasn't been claimed by other host
|
||||
# or thread
|
||||
|
@ -0,0 +1,55 @@
|
||||
# Copyright (c) 2016 Red Hat, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_utils import timeutils
|
||||
from sqlalchemy.dialects import mysql
|
||||
from sqlalchemy import MetaData, Table
|
||||
|
||||
|
||||
def upgrade(migrate_engine):
|
||||
"""Add microseconds precission on updated_at field in MySQL databases.
|
||||
|
||||
PostgreSQL, SQLite, and MSSQL have sub-second precission by default, but
|
||||
MySQL defaults to second precision in DateTime fields, which creates
|
||||
problems for the resource cleanup mechanism.
|
||||
"""
|
||||
meta = MetaData()
|
||||
meta.bind = migrate_engine
|
||||
workers = Table('workers', meta, autoload=True)
|
||||
|
||||
# This is only necessary for mysql, and since the table is not in use this
|
||||
# will only be an schema update.
|
||||
if migrate_engine.name.startswith('mysql'):
|
||||
try:
|
||||
workers.c.updated_at.alter(mysql.DATETIME(fsp=6))
|
||||
except Exception:
|
||||
# MySQL v5.5 or earlier don't support sub-second resolution so we
|
||||
# may have cleanup races in Active-Active configurations, that's
|
||||
# why upgrading is recommended in that case.
|
||||
# Code in Cinder is capable of working with 5.5, so for 5.5 there's
|
||||
# no problem
|
||||
pass
|
||||
|
||||
# TODO(geguileo): Once we remove support for MySQL 5.5 we have to create
|
||||
# an upgrade migration to remove this row.
|
||||
# Set workers table sub-second support sentinel
|
||||
wi = workers.insert()
|
||||
now = timeutils.utcnow().replace(microsecond=123)
|
||||
wi.execute({'created_at': now,
|
||||
'updated_at': now,
|
||||
'deleted': False,
|
||||
'resource_type': 'SENTINEL',
|
||||
'resource_id': 'SUB-SECOND',
|
||||
'status': 'OK'})
|
@ -56,9 +56,14 @@ from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging as messaging
|
||||
from oslo_service import periodic_task
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from cinder import context
|
||||
from cinder import db
|
||||
from cinder.db import base
|
||||
from cinder.i18n import _LI
|
||||
from cinder import exception
|
||||
from cinder.i18n import _LE, _LI, _LW
|
||||
from cinder import objects
|
||||
from cinder import rpc
|
||||
from cinder.scheduler import rpcapi as scheduler_rpcapi
|
||||
|
||||
@ -92,13 +97,14 @@ class Manager(base.Base, PeriodicTasks):
|
||||
"""Tasks to be run at a periodic interval."""
|
||||
return self.run_periodic_tasks(context, raise_on_error=raise_on_error)
|
||||
|
||||
def init_host(self, added_to_cluster=None):
|
||||
def init_host(self, service_id=None, added_to_cluster=None):
|
||||
"""Handle initialization if this is a standalone service.
|
||||
|
||||
A hook point for services to execute tasks before the services are made
|
||||
available (i.e. showing up on RPC and starting to accept RPC calls) to
|
||||
other components. Child classes should override this method.
|
||||
|
||||
:param service_id: ID of the service where the manager is running.
|
||||
:param added_to_cluster: True when a host's cluster configuration has
|
||||
changed from not being defined or being '' to
|
||||
any other value and the DB service record
|
||||
@ -175,3 +181,101 @@ class SchedulerDependentManager(Manager):
|
||||
def reset(self):
|
||||
super(SchedulerDependentManager, self).reset()
|
||||
self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
|
||||
|
||||
|
||||
class CleanableManager(object):
|
||||
def do_cleanup(self, context, cleanup_request):
|
||||
LOG.info(_LI('Initiating service %s cleanup'),
|
||||
cleanup_request.service_id)
|
||||
|
||||
# If the 'until' field in the cleanup request is not set, we default to
|
||||
# this very moment.
|
||||
until = cleanup_request.until or timeutils.utcnow()
|
||||
keep_entry = False
|
||||
|
||||
to_clean = db.worker_get_all(
|
||||
context,
|
||||
resource_type=cleanup_request.resource_type,
|
||||
resource_id=cleanup_request.resource_id,
|
||||
service_id=cleanup_request.service_id,
|
||||
until=until)
|
||||
|
||||
for clean in to_clean:
|
||||
original_service_id = clean.service_id
|
||||
original_time = clean.updated_at
|
||||
# Try to do a soft delete to mark the entry as being cleaned up
|
||||
# by us (setting service id to our service id).
|
||||
res = db.worker_claim_for_cleanup(context,
|
||||
claimer_id=self.service_id,
|
||||
orm_worker=clean)
|
||||
|
||||
# Claim may fail if entry is being cleaned by another service, has
|
||||
# been removed (finished cleaning) by another service or the user
|
||||
# started a new cleanable operation.
|
||||
# In any of these cases we don't have to do cleanup or remove the
|
||||
# worker entry.
|
||||
if not res:
|
||||
continue
|
||||
|
||||
# Try to get versioned object for resource we have to cleanup
|
||||
try:
|
||||
vo_cls = getattr(objects, clean.resource_type)
|
||||
vo = vo_cls.get_by_id(context, clean.resource_id)
|
||||
# Set the worker DB entry in the VO and mark it as being a
|
||||
# clean operation
|
||||
clean.cleaning = True
|
||||
vo.worker = clean
|
||||
except exception.NotFound:
|
||||
LOG.debug('Skipping cleanup for non existent %(type)s %(id)s.',
|
||||
{'type': clean.resource_type,
|
||||
'id': clean.resource_id})
|
||||
else:
|
||||
# Resource status should match
|
||||
if vo.status != clean.status:
|
||||
LOG.debug('Skipping cleanup for mismatching work on '
|
||||
'%(type)s %(id)s: %(exp_sts)s <> %(found_sts)s.',
|
||||
{'type': clean.resource_type,
|
||||
'id': clean.resource_id,
|
||||
'exp_sts': clean.status,
|
||||
'found_sts': vo.status})
|
||||
else:
|
||||
LOG.info(_LI('Cleaning %(type)s with id %(id)s and status '
|
||||
'%(status)s'),
|
||||
{'type': clean.resource_type,
|
||||
'id': clean.resource_id,
|
||||
'status': clean.status},
|
||||
resource=vo)
|
||||
try:
|
||||
# Some cleanup jobs are performed asynchronously, so
|
||||
# we don't delete the worker entry, they'll take care
|
||||
# of it
|
||||
keep_entry = self._do_cleanup(context, vo)
|
||||
except Exception:
|
||||
LOG.exception(_LE('Could not perform cleanup.'))
|
||||
# Return the worker DB entry to the original service
|
||||
db.worker_update(context, clean.id,
|
||||
service_id=original_service_id,
|
||||
updated_at=original_time)
|
||||
continue
|
||||
|
||||
# The resource either didn't exist or was properly cleaned, either
|
||||
# way we can remove the entry from the worker table if the cleanup
|
||||
# method doesn't want to keep the entry (for example for delayed
|
||||
# deletion).
|
||||
if not keep_entry and not db.worker_destroy(context, id=clean.id):
|
||||
LOG.warning(_LW('Could not remove worker entry %s.'), clean.id)
|
||||
|
||||
LOG.info(_LI('Service %s cleanup completed.'),
|
||||
cleanup_request.service_id)
|
||||
|
||||
def _do_cleanup(self, ctxt, vo_resource):
|
||||
return False
|
||||
|
||||
def init_host(self, service_id, **kwargs):
|
||||
ctxt = context.get_admin_context()
|
||||
self.service_id = service_id
|
||||
# TODO(geguileo): Once we don't support MySQL 5.5 anymore we can remove
|
||||
# call to workers_init.
|
||||
db.workers_init()
|
||||
cleanup_request = objects.CleanupRequest(service_id=service_id)
|
||||
self.do_cleanup(ctxt, cleanup_request)
|
||||
|
@ -119,6 +119,7 @@ OBJ_VERSIONS.add('1.11', {'GroupSnapshot': '1.0', 'GroupSnapshotList': '1.0',
|
||||
OBJ_VERSIONS.add('1.12', {'VolumeType': '1.3'})
|
||||
OBJ_VERSIONS.add('1.13', {'CleanupRequest': '1.0'})
|
||||
OBJ_VERSIONS.add('1.14', {'VolumeAttachmentList': '1.1'})
|
||||
OBJ_VERSIONS.add('1.15', {'Volume': '1.6', 'Snapshot': '1.2'})
|
||||
|
||||
|
||||
class CinderObjectRegistry(base.VersionedObjectRegistry):
|
||||
|
@ -13,9 +13,9 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from functools import wraps
|
||||
import inspect
|
||||
|
||||
import decorator
|
||||
from oslo_utils import versionutils
|
||||
|
||||
from cinder import db
|
||||
@ -168,8 +168,7 @@ class CinderCleanableObject(base.CinderPersistentObject):
|
||||
to be added.
|
||||
"""
|
||||
def _decorator(f):
|
||||
@wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
def wrapper(f, *args, **kwargs):
|
||||
if decorator_args:
|
||||
call_args = inspect.getcallargs(f, *args, **kwargs)
|
||||
candidates = [call_args[obj] for obj in decorator_args]
|
||||
@ -201,7 +200,7 @@ class CinderCleanableObject(base.CinderPersistentObject):
|
||||
except Exception:
|
||||
pass
|
||||
return result
|
||||
return wrapper
|
||||
return decorator.decorate(f, wrapper)
|
||||
|
||||
# If we don't have optional decorator arguments the argument in
|
||||
# decorator_args is the function we have to decorate
|
||||
|
@ -21,6 +21,7 @@ from cinder import exception
|
||||
from cinder.i18n import _
|
||||
from cinder import objects
|
||||
from cinder.objects import base
|
||||
from cinder.objects import cleanable
|
||||
from cinder.objects import fields as c_fields
|
||||
|
||||
|
||||
@ -28,11 +29,12 @@ CONF = cfg.CONF
|
||||
|
||||
|
||||
@base.CinderObjectRegistry.register
|
||||
class Snapshot(base.CinderPersistentObject, base.CinderObject,
|
||||
class Snapshot(cleanable.CinderCleanableObject, base.CinderObject,
|
||||
base.CinderObjectDictCompat):
|
||||
# Version 1.0: Initial version
|
||||
# Version 1.1: Changed 'status' field to use SnapshotStatusField
|
||||
VERSION = '1.1'
|
||||
# Version 1.2: This object is now cleanable (adds rows to workers table)
|
||||
VERSION = '1.2'
|
||||
|
||||
# NOTE(thangp): OPTIONAL_FIELDS are fields that would be lazy-loaded. They
|
||||
# are typically the relationship in the sqlalchemy object.
|
||||
@ -249,6 +251,13 @@ class Snapshot(base.CinderPersistentObject, base.CinderObject,
|
||||
return db.snapshot_data_get_for_project(context, project_id,
|
||||
volume_type_id)
|
||||
|
||||
@staticmethod
|
||||
def _is_cleanable(status, obj_version):
|
||||
# Before 1.2 we didn't have workers table, so cleanup wasn't supported.
|
||||
if obj_version and obj_version < 1.2:
|
||||
return False
|
||||
return status == 'creating'
|
||||
|
||||
|
||||
@base.CinderObjectRegistry.register
|
||||
class SnapshotList(base.ObjectListBase, base.CinderObject):
|
||||
@ -261,6 +270,11 @@ class SnapshotList(base.ObjectListBase, base.CinderObject):
|
||||
@classmethod
|
||||
def get_all(cls, context, search_opts, marker=None, limit=None,
|
||||
sort_keys=None, sort_dirs=None, offset=None):
|
||||
"""Get all snapshot given some search_opts (filters).
|
||||
|
||||
Special search options accepted are host and cluster_name, that refer
|
||||
to the volume's fields.
|
||||
"""
|
||||
snapshots = db.snapshot_get_all(context, search_opts, marker, limit,
|
||||
sort_keys, sort_dirs, offset)
|
||||
expected_attrs = Snapshot._get_expected_attrs(context)
|
||||
|
@ -21,6 +21,7 @@ from cinder import exception
|
||||
from cinder.i18n import _
|
||||
from cinder import objects
|
||||
from cinder.objects import base
|
||||
from cinder.objects import cleanable
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
@ -48,7 +49,7 @@ class MetadataObject(dict):
|
||||
|
||||
|
||||
@base.CinderObjectRegistry.register
|
||||
class Volume(base.CinderPersistentObject, base.CinderObject,
|
||||
class Volume(cleanable.CinderCleanableObject, base.CinderObject,
|
||||
base.CinderObjectDictCompat, base.CinderComparableObject,
|
||||
base.ClusteredObject):
|
||||
# Version 1.0: Initial version
|
||||
@ -58,7 +59,8 @@ class Volume(base.CinderPersistentObject, base.CinderObject,
|
||||
# Version 1.3: Added finish_volume_migration()
|
||||
# Version 1.4: Added cluster fields
|
||||
# Version 1.5: Added group
|
||||
VERSION = '1.5'
|
||||
# Version 1.6: This object is now cleanable (adds rows to workers table)
|
||||
VERSION = '1.6'
|
||||
|
||||
OPTIONAL_FIELDS = ('metadata', 'admin_metadata', 'glance_metadata',
|
||||
'volume_type', 'volume_attachment', 'consistencygroup',
|
||||
@ -364,6 +366,14 @@ class Volume(base.CinderPersistentObject, base.CinderObject,
|
||||
self.admin_metadata = db.volume_admin_metadata_update(
|
||||
self._context, self.id, metadata, True)
|
||||
|
||||
# When we are creating a volume and we change from 'creating'
|
||||
# status to 'downloading' status we have to change the worker entry
|
||||
# in the DB to reflect this change, otherwise the cleanup will
|
||||
# not be performed as it will be mistaken for a volume that has
|
||||
# been somehow changed (reset status, forced operation...)
|
||||
if updates.get('status') == 'downloading':
|
||||
self.set_worker()
|
||||
|
||||
db.volume_update(self._context, self.id, updates)
|
||||
self.obj_reset_changes()
|
||||
|
||||
@ -486,6 +496,14 @@ class Volume(base.CinderPersistentObject, base.CinderObject,
|
||||
dest_volume.save()
|
||||
return dest_volume
|
||||
|
||||
@staticmethod
|
||||
def _is_cleanable(status, obj_version):
|
||||
# Before 1.6 we didn't have workers table, so cleanup wasn't supported.
|
||||
# cleaning.
|
||||
if obj_version and obj_version < 1.6:
|
||||
return False
|
||||
return status in ('creating', 'deleting', 'uploading', 'downloading')
|
||||
|
||||
|
||||
@base.CinderObjectRegistry.register
|
||||
class VolumeList(base.ObjectListBase, base.CinderObject):
|
||||
@ -523,8 +541,8 @@ class VolumeList(base.ObjectListBase, base.CinderObject):
|
||||
return expected_attrs
|
||||
|
||||
@classmethod
|
||||
def get_all(cls, context, marker, limit, sort_keys=None, sort_dirs=None,
|
||||
filters=None, offset=None):
|
||||
def get_all(cls, context, marker=None, limit=None, sort_keys=None,
|
||||
sort_dirs=None, filters=None, offset=None):
|
||||
volumes = db.volume_get_all(context, marker, limit,
|
||||
sort_keys=sort_keys, sort_dirs=sort_dirs,
|
||||
filters=filters, offset=offset)
|
||||
|
@ -95,6 +95,7 @@ class SchedulerAPI(rpc.RPCAPI):
|
||||
|
||||
def create_volume(self, ctxt, volume, snapshot_id=None, image_id=None,
|
||||
request_spec=None, filter_properties=None):
|
||||
volume.create_worker()
|
||||
cctxt = self._get_cctxt()
|
||||
msg_args = {'snapshot_id': snapshot_id, 'image_id': image_id,
|
||||
'request_spec': request_spec,
|
||||
|
@ -121,7 +121,6 @@ class Service(service.Service):
|
||||
on topic. It also periodically runs tasks on the manager and reports
|
||||
it state to the database services table.
|
||||
"""
|
||||
|
||||
# Make service_id a class attribute so it can be used for clean up
|
||||
service_id = None
|
||||
|
||||
@ -144,6 +143,8 @@ class Service(service.Service):
|
||||
if CONF.profiler.enabled:
|
||||
manager_class = profiler.trace_cls("rpc")(manager_class)
|
||||
|
||||
self.service = None
|
||||
|
||||
# NOTE(geguileo): We need to create the Service DB entry before we
|
||||
# create the manager, otherwise capped versions for serializer and rpc
|
||||
# client would use existing DB entries not including us, which could
|
||||
@ -234,7 +235,8 @@ class Service(service.Service):
|
||||
if self.coordination:
|
||||
coordination.COORDINATOR.start()
|
||||
|
||||
self.manager.init_host(added_to_cluster=self.added_to_cluster)
|
||||
self.manager.init_host(added_to_cluster=self.added_to_cluster,
|
||||
service_id=Service.service_id)
|
||||
|
||||
LOG.debug("Creating RPC server for service %s", self.topic)
|
||||
|
||||
|
@ -89,6 +89,7 @@ class TestCase(testtools.TestCase):
|
||||
"""Test case base class for all unit tests."""
|
||||
|
||||
POLICY_PATH = 'cinder/tests/unit/policy.json'
|
||||
MOCK_WORKER = True
|
||||
|
||||
def _get_joined_notifier(self, *args, **kwargs):
|
||||
# We create a new fake notifier but we join the notifications with
|
||||
@ -110,6 +111,12 @@ class TestCase(testtools.TestCase):
|
||||
side_effect=self._get_joined_notifier)
|
||||
p.start()
|
||||
|
||||
if self.MOCK_WORKER:
|
||||
# Mock worker creation for all tests that don't care about it
|
||||
clean_path = 'cinder.objects.cleanable.CinderCleanableObject.%s'
|
||||
for method in ('create_worker', 'set_worker', 'unset_worker'):
|
||||
self.patch(clean_path % method, return_value=None)
|
||||
|
||||
# Unit tests do not need to use lazy gettext
|
||||
i18n.enable_lazy(False)
|
||||
|
||||
|
@ -23,8 +23,8 @@ from cinder import quota
|
||||
from cinder.tests.unit import conf_fixture
|
||||
from cinder.tests.unit import fake_constants as fake
|
||||
from cinder.tests.unit import fake_snapshot
|
||||
from cinder.tests.unit import test_volume
|
||||
from cinder.tests.unit import utils as tests_utils
|
||||
from cinder.tests.unit import volume as base
|
||||
import cinder.volume
|
||||
from cinder.volume import driver
|
||||
from cinder.volume import utils as volutils
|
||||
@ -33,7 +33,7 @@ CGQUOTAS = quota.CGQUOTAS
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class ConsistencyGroupTestCase(test_volume.BaseVolumeTestCase):
|
||||
class ConsistencyGroupTestCase(base.BaseVolumeTestCase):
|
||||
def test_delete_volume_in_consistency_group(self):
|
||||
"""Test deleting a volume that's tied to a consistency group fails."""
|
||||
consistencygroup_id = fake.CONSISTENCY_GROUP_ID
|
||||
|
@ -21,8 +21,8 @@ from cinder import test
|
||||
|
||||
|
||||
class BaseObjectsTestCase(test.TestCase):
|
||||
def setUp(self):
|
||||
super(BaseObjectsTestCase, self).setUp()
|
||||
def setUp(self, *args, **kwargs):
|
||||
super(BaseObjectsTestCase, self).setUp(*args, **kwargs)
|
||||
self.user_id = 'fake-user'
|
||||
self.project_id = 'fake-project'
|
||||
self.context = context.RequestContext(self.user_id, self.project_id,
|
||||
|
@ -40,6 +40,7 @@ class Backup(cleanable.CinderCleanableObject):
|
||||
|
||||
|
||||
class TestCleanable(test_objects.BaseObjectsTestCase):
|
||||
MOCK_WORKER = False
|
||||
|
||||
def setUp(self):
|
||||
super(TestCleanable, self).setUp()
|
||||
|
@ -38,9 +38,9 @@ object_data = {
|
||||
'RequestSpec': '1.1-b0bd1a28d191d75648901fa853e8a733',
|
||||
'Service': '1.4-c7d011989d1718ca0496ccf640b42712',
|
||||
'ServiceList': '1.1-15ecf022a68ddbb8c2a6739cfc9f8f5e',
|
||||
'Snapshot': '1.1-d6a9d58f627bb2a5cf804b0dd7a12bc7',
|
||||
'Snapshot': '1.2-d6a9d58f627bb2a5cf804b0dd7a12bc7',
|
||||
'SnapshotList': '1.0-15ecf022a68ddbb8c2a6739cfc9f8f5e',
|
||||
'Volume': '1.5-19919d8086d6a38ab9d3ab88139e70e0',
|
||||
'Volume': '1.6-19919d8086d6a38ab9d3ab88139e70e0',
|
||||
'VolumeList': '1.1-15ecf022a68ddbb8c2a6739cfc9f8f5e',
|
||||
'VolumeAttachment': '1.0-b30dacf62b2030dd83d8a1603f1064ff',
|
||||
'VolumeAttachmentList': '1.1-15ecf022a68ddbb8c2a6739cfc9f8f5e',
|
||||
|
@ -17,8 +17,6 @@
|
||||
Unit Tests for cinder.scheduler.rpcapi
|
||||
"""
|
||||
|
||||
import copy
|
||||
|
||||
import mock
|
||||
|
||||
from cinder import context
|
||||
@ -46,7 +44,7 @@ class SchedulerRpcAPITestCase(test.TestCase):
|
||||
"version": kwargs.pop('version', rpcapi.RPC_API_VERSION)
|
||||
}
|
||||
|
||||
expected_msg = copy.deepcopy(kwargs)
|
||||
expected_msg = kwargs.copy()
|
||||
|
||||
self.fake_args = None
|
||||
self.fake_kwargs = None
|
||||
@ -86,44 +84,65 @@ class SchedulerRpcAPITestCase(test.TestCase):
|
||||
version='3.0')
|
||||
|
||||
def test_create_volume(self):
|
||||
volume = fake_volume.fake_volume_obj(self.context)
|
||||
create_worker_mock = self.mock_object(volume, 'create_worker')
|
||||
self._test_scheduler_api('create_volume',
|
||||
rpc_method='cast',
|
||||
snapshot_id='snapshot_id',
|
||||
image_id='image_id',
|
||||
request_spec='fake_request_spec',
|
||||
filter_properties='filter_properties',
|
||||
volume=fake_volume.fake_volume_obj(
|
||||
self.context),
|
||||
volume=volume,
|
||||
version='3.0')
|
||||
create_worker_mock.assert_called_once()
|
||||
|
||||
def test_create_volume_serialization(self):
|
||||
volume = fake_volume.fake_volume_obj(self.context)
|
||||
create_worker_mock = self.mock_object(volume, 'create_worker')
|
||||
self._test_scheduler_api('create_volume',
|
||||
rpc_method='cast',
|
||||
snapshot_id='snapshot_id',
|
||||
image_id='image_id',
|
||||
request_spec={'volume_type': {}},
|
||||
filter_properties='filter_properties',
|
||||
volume=volume,
|
||||
version='3.0')
|
||||
create_worker_mock.assert_called_once()
|
||||
|
||||
def test_migrate_volume_to_host(self):
|
||||
volume = fake_volume.fake_volume_obj(self.context)
|
||||
create_worker_mock = self.mock_object(volume, 'create_worker')
|
||||
self._test_scheduler_api('migrate_volume_to_host',
|
||||
rpc_method='cast',
|
||||
host='host',
|
||||
force_host_copy=True,
|
||||
request_spec='fake_request_spec',
|
||||
filter_properties='filter_properties',
|
||||
volume=fake_volume.fake_volume_obj(
|
||||
self.context),
|
||||
volume=volume,
|
||||
version='3.0')
|
||||
create_worker_mock.assert_not_called()
|
||||
|
||||
def test_retype(self):
|
||||
volume = fake_volume.fake_volume_obj(self.context)
|
||||
create_worker_mock = self.mock_object(volume, 'create_worker')
|
||||
self._test_scheduler_api('retype',
|
||||
rpc_method='cast',
|
||||
request_spec='fake_request_spec',
|
||||
filter_properties='filter_properties',
|
||||
volume=fake_volume.fake_volume_obj(
|
||||
self.context),
|
||||
volume=volume,
|
||||
version='3.0')
|
||||
create_worker_mock.assert_not_called()
|
||||
|
||||
def test_manage_existing(self):
|
||||
volume = fake_volume.fake_volume_obj(self.context)
|
||||
create_worker_mock = self.mock_object(volume, 'create_worker')
|
||||
self._test_scheduler_api('manage_existing',
|
||||
rpc_method='cast',
|
||||
request_spec='fake_request_spec',
|
||||
filter_properties='filter_properties',
|
||||
volume=fake_volume.fake_volume_obj(
|
||||
self.context),
|
||||
volume=volume,
|
||||
version='3.0')
|
||||
create_worker_mock.assert_not_called()
|
||||
|
||||
def test_get_pools(self):
|
||||
self._test_scheduler_api('get_pools',
|
||||
|
227
cinder/tests/unit/test_cleanable_manager.py
Normal file
227
cinder/tests/unit/test_cleanable_manager.py
Normal file
@ -0,0 +1,227 @@
|
||||
# Copyright (c) 2016 Red Hat, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from cinder import context
|
||||
from cinder import db
|
||||
from cinder import manager
|
||||
from cinder import objects
|
||||
from cinder import test
|
||||
from cinder.tests.unit import fake_constants
|
||||
from cinder.tests.unit import utils
|
||||
|
||||
|
||||
class FakeManager(manager.CleanableManager):
|
||||
def __init__(self, service_id=None, keep_after_clean=False):
|
||||
if service_id:
|
||||
self.service_id = service_id
|
||||
self.keep_after_clean = keep_after_clean
|
||||
|
||||
def _do_cleanup(self, ctxt, vo_resource):
|
||||
vo_resource.status += '_cleaned'
|
||||
vo_resource.save()
|
||||
return self.keep_after_clean
|
||||
|
||||
|
||||
class TestCleanableManager(test.TestCase):
|
||||
def setUp(self):
|
||||
super(TestCleanableManager, self).setUp()
|
||||
self.user_id = fake_constants.USER_ID
|
||||
self.project_id = fake_constants.PROJECT_ID
|
||||
self.context = context.RequestContext(self.user_id, self.project_id,
|
||||
is_admin=True)
|
||||
self.service = db.service_create(self.context, {})
|
||||
|
||||
@mock.patch('cinder.db.workers_init', autospec=True)
|
||||
@mock.patch('cinder.manager.CleanableManager.do_cleanup', autospec=True)
|
||||
def test_init_host_with_service(self, mock_cleanup, mock_workers_init):
|
||||
mngr = FakeManager()
|
||||
self.assertFalse(hasattr(mngr, 'service_id'))
|
||||
mngr.init_host(service_id=self.service.id)
|
||||
|
||||
self.assertEqual(self.service.id, mngr.service_id)
|
||||
mock_cleanup.assert_called_once_with(mngr, mock.ANY, mock.ANY)
|
||||
clean_req = mock_cleanup.call_args[0][2]
|
||||
self.assertIsInstance(clean_req, objects.CleanupRequest)
|
||||
self.assertEqual(self.service.id, clean_req.service_id)
|
||||
mock_workers_init.assert_called_once_with()
|
||||
|
||||
def test_do_cleanup(self):
|
||||
"""Basic successful cleanup."""
|
||||
vol = utils.create_volume(self.context, status='creating')
|
||||
db.worker_create(self.context, status='creating',
|
||||
resource_type='Volume', resource_id=vol.id,
|
||||
service_id=self.service.id)
|
||||
|
||||
clean_req = objects.CleanupRequest(service_id=self.service.id)
|
||||
mngr = FakeManager(self.service.id)
|
||||
mngr.do_cleanup(self.context, clean_req)
|
||||
|
||||
self.assertListEqual([], db.worker_get_all(self.context))
|
||||
vol.refresh()
|
||||
self.assertEqual('creating_cleaned', vol.status)
|
||||
|
||||
def test_do_cleanup_not_cleaning_already_claimed(self):
|
||||
"""Basic cleanup that doesn't touch already cleaning works."""
|
||||
vol = utils.create_volume(self.context, status='creating')
|
||||
worker1 = db.worker_create(self.context, status='creating',
|
||||
resource_type='Volume', resource_id=vol.id,
|
||||
service_id=self.service.id)
|
||||
worker1 = db.worker_get(self.context, id=worker1.id)
|
||||
vol2 = utils.create_volume(self.context, status='deleting')
|
||||
worker2 = db.worker_create(self.context, status='deleting',
|
||||
resource_type='Volume', resource_id=vol2.id,
|
||||
service_id=self.service.id + 1)
|
||||
worker2 = db.worker_get(self.context, id=worker2.id)
|
||||
|
||||
# Simulate that the change to vol2 worker happened between
|
||||
# worker_get_all and trying to claim a work for cleanup
|
||||
worker2.service_id = self.service.id
|
||||
|
||||
clean_req = objects.CleanupRequest(service_id=self.service.id)
|
||||
mngr = FakeManager(self.service.id)
|
||||
with mock.patch('cinder.db.worker_get_all') as get_all_mock:
|
||||
get_all_mock.return_value = [worker1, worker2]
|
||||
mngr.do_cleanup(self.context, clean_req)
|
||||
|
||||
workers = db.worker_get_all(self.context)
|
||||
self.assertEqual(1, len(workers))
|
||||
self.assertEqual(worker2.id, workers[0].id)
|
||||
|
||||
vol.refresh()
|
||||
self.assertEqual('creating_cleaned', vol.status)
|
||||
vol2.refresh()
|
||||
self.assertEqual('deleting', vol2.status)
|
||||
|
||||
def test_do_cleanup_not_cleaning_already_claimed_by_us(self):
|
||||
"""Basic cleanup that doesn't touch other thread's claimed works."""
|
||||
original_time = timeutils.utcnow()
|
||||
other_thread_claimed_time = timeutils.utcnow()
|
||||
vol = utils.create_volume(self.context, status='creating')
|
||||
worker1 = db.worker_create(self.context, status='creating',
|
||||
resource_type='Volume', resource_id=vol.id,
|
||||
service_id=self.service.id,
|
||||
updated_at=original_time)
|
||||
worker1 = db.worker_get(self.context, id=worker1.id)
|
||||
vol2 = utils.create_volume(self.context, status='deleting')
|
||||
worker2 = db.worker_create(self.context, status='deleting',
|
||||
resource_type='Volume', resource_id=vol2.id,
|
||||
service_id=self.service.id,
|
||||
updated_at=other_thread_claimed_time)
|
||||
worker2 = db.worker_get(self.context, id=worker2.id)
|
||||
|
||||
# Simulate that the change to vol2 worker happened between
|
||||
# worker_get_all and trying to claim a work for cleanup
|
||||
worker2.updated_at = original_time
|
||||
|
||||
clean_req = objects.CleanupRequest(service_id=self.service.id)
|
||||
mngr = FakeManager(self.service.id)
|
||||
with mock.patch('cinder.db.worker_get_all') as get_all_mock:
|
||||
get_all_mock.return_value = [worker1, worker2]
|
||||
mngr.do_cleanup(self.context, clean_req)
|
||||
|
||||
workers = db.worker_get_all(self.context)
|
||||
self.assertEqual(1, len(workers))
|
||||
self.assertEqual(worker2.id, workers[0].id)
|
||||
|
||||
vol.refresh()
|
||||
self.assertEqual('creating_cleaned', vol.status)
|
||||
vol2.refresh()
|
||||
self.assertEqual('deleting', vol2.status)
|
||||
|
||||
def test_do_cleanup_resource_deleted(self):
|
||||
"""Cleanup on a resource that's been already deleted."""
|
||||
vol = utils.create_volume(self.context, status='creating')
|
||||
db.worker_create(self.context, status='creating',
|
||||
resource_type='Volume', resource_id=vol.id,
|
||||
service_id=self.service.id)
|
||||
vol.destroy()
|
||||
|
||||
clean_req = objects.CleanupRequest(service_id=self.service.id)
|
||||
mngr = FakeManager(self.service.id)
|
||||
mngr.do_cleanup(self.context, clean_req)
|
||||
|
||||
workers = db.worker_get_all(self.context)
|
||||
self.assertListEqual([], workers)
|
||||
|
||||
def test_do_cleanup_resource_on_another_service(self):
|
||||
"""Cleanup on a resource that's been claimed by other service."""
|
||||
vol = utils.create_volume(self.context, status='deleting')
|
||||
db.worker_create(self.context, status='deleting',
|
||||
resource_type='Volume', resource_id=vol.id,
|
||||
service_id=self.service.id + 1)
|
||||
|
||||
clean_req = objects.CleanupRequest(service_id=self.service.id)
|
||||
mngr = FakeManager(self.service.id)
|
||||
mngr.do_cleanup(self.context, clean_req)
|
||||
|
||||
workers = db.worker_get_all(self.context)
|
||||
self.assertEqual(1, len(workers))
|
||||
|
||||
vol.refresh()
|
||||
self.assertEqual('deleting', vol.status)
|
||||
|
||||
def test_do_cleanup_resource_changed_status(self):
|
||||
"""Cleanup on a resource that's changed status."""
|
||||
vol = utils.create_volume(self.context, status='available')
|
||||
db.worker_create(self.context, status='creating',
|
||||
resource_type='Volume', resource_id=vol.id,
|
||||
service_id=self.service.id)
|
||||
|
||||
clean_req = objects.CleanupRequest(service_id=self.service.id)
|
||||
mngr = FakeManager(self.service.id)
|
||||
mngr.do_cleanup(self.context, clean_req)
|
||||
|
||||
workers = db.worker_get_all(self.context)
|
||||
self.assertListEqual([], workers)
|
||||
|
||||
vol.refresh()
|
||||
self.assertEqual('available', vol.status)
|
||||
|
||||
def test_do_cleanup_keep_worker(self):
|
||||
"""Cleanup on a resource that will remove worker when cleaning up."""
|
||||
vol = utils.create_volume(self.context, status='deleting')
|
||||
db.worker_create(self.context, status='deleting',
|
||||
resource_type='Volume', resource_id=vol.id,
|
||||
service_id=self.service.id)
|
||||
|
||||
clean_req = objects.CleanupRequest(service_id=self.service.id)
|
||||
mngr = FakeManager(self.service.id, keep_after_clean=True)
|
||||
mngr.do_cleanup(self.context, clean_req)
|
||||
|
||||
workers = db.worker_get_all(self.context)
|
||||
self.assertEqual(1, len(workers))
|
||||
vol.refresh()
|
||||
self.assertEqual('deleting_cleaned', vol.status)
|
||||
|
||||
@mock.patch.object(FakeManager, '_do_cleanup', side_effect=Exception)
|
||||
def test_do_cleanup_revive_on_cleanup_fail(self, mock_clean):
|
||||
"""Cleanup will revive a worker if cleanup fails."""
|
||||
vol = utils.create_volume(self.context, status='creating')
|
||||
db.worker_create(self.context, status='creating',
|
||||
resource_type='Volume', resource_id=vol.id,
|
||||
service_id=self.service.id)
|
||||
|
||||
clean_req = objects.CleanupRequest(service_id=self.service.id)
|
||||
mngr = FakeManager(self.service.id)
|
||||
mngr.do_cleanup(self.context, clean_req)
|
||||
|
||||
workers = db.worker_get_all(self.context)
|
||||
self.assertEqual(1, len(workers))
|
||||
vol.refresh()
|
||||
self.assertEqual('creating', vol.status)
|
@ -407,6 +407,21 @@ class DBAPIVolumeTestCase(BaseTest):
|
||||
self._assertEqualListsOfObjects(volumes, db.volume_get_all(
|
||||
self.ctxt, None, None, ['host'], None))
|
||||
|
||||
@ddt.data('cluster_name', 'host')
|
||||
def test_volume_get_all_filter_host_and_cluster(self, field):
|
||||
volumes = []
|
||||
for i in range(2):
|
||||
for value in ('host%d@backend#pool', 'host%d@backend', 'host%d'):
|
||||
kwargs = {field: value % i}
|
||||
volumes.append(utils.create_volume(self.ctxt, **kwargs))
|
||||
|
||||
for i in range(3):
|
||||
filters = {field: getattr(volumes[i], field)}
|
||||
result = db.volume_get_all(self.ctxt, filters=filters)
|
||||
self.assertEqual(i + 1, len(result))
|
||||
self.assertSetEqual({v.id for v in volumes[:i + 1]},
|
||||
{v.id for v in result})
|
||||
|
||||
def test_volume_get_all_marker_passed(self):
|
||||
volumes = [
|
||||
db.volume_create(self.ctxt, {'id': 1}),
|
||||
@ -1277,6 +1292,7 @@ class DBAPIVolumeTestCase(BaseTest):
|
||||
db_vols[i].cluster_name)
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class DBAPISnapshotTestCase(BaseTest):
|
||||
|
||||
"""Tests for cinder.db.api.snapshot_*."""
|
||||
@ -1367,6 +1383,24 @@ class DBAPISnapshotTestCase(BaseTest):
|
||||
filters),
|
||||
ignored_keys=['metadata', 'volume'])
|
||||
|
||||
@ddt.data('cluster_name', 'host')
|
||||
def test_snapshot_get_all_filter_host_and_cluster(self, field):
|
||||
volumes = []
|
||||
snapshots = []
|
||||
for i in range(2):
|
||||
for value in ('host%d@backend#pool', 'host%d@backend', 'host%d'):
|
||||
kwargs = {field: value % i}
|
||||
vol = utils.create_volume(self.ctxt, **kwargs)
|
||||
volumes.append(vol)
|
||||
snapshots.append(utils.create_snapshot(self.ctxt, vol.id))
|
||||
|
||||
for i in range(3):
|
||||
filters = {field: getattr(volumes[i], field)}
|
||||
result = db.snapshot_get_all(self.ctxt, filters=filters)
|
||||
self.assertEqual(i + 1, len(result))
|
||||
self.assertSetEqual({s.id for s in snapshots[:i + 1]},
|
||||
{s.id for s in result})
|
||||
|
||||
def test_snapshot_get_by_host(self):
|
||||
db.volume_create(self.ctxt, {'id': 1, 'host': 'host1'})
|
||||
db.volume_create(self.ctxt, {'id': 2, 'host': 'host2'})
|
||||
|
@ -15,9 +15,11 @@
|
||||
|
||||
"""Unit tests for cinder.db.api.Worker"""
|
||||
|
||||
from datetime import datetime
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import mock
|
||||
from oslo_db import exception as db_exception
|
||||
import six
|
||||
|
||||
@ -40,12 +42,41 @@ class DBAPIWorkerTestCase(test.TestCase, test.ModelsObjectComparatorMixin):
|
||||
super(DBAPIWorkerTestCase, self).setUp()
|
||||
self.ctxt = context.get_admin_context()
|
||||
|
||||
def tearDown(self):
|
||||
db.sqlalchemy.api.DB_SUPPORTS_SUBSECOND_RESOLUTION = True
|
||||
super(DBAPIWorkerTestCase, self).tearDown()
|
||||
|
||||
def test_workers_init(self):
|
||||
# SQLite supports subsecond resolution so result is True
|
||||
db.sqlalchemy.api.DB_SUPPORTS_SUBSECOND_RESOLUTION = None
|
||||
db.workers_init()
|
||||
self.assertTrue(db.sqlalchemy.api.DB_SUPPORTS_SUBSECOND_RESOLUTION)
|
||||
|
||||
def test_workers_init_not_supported(self):
|
||||
# Fake a Db that doesn't support sub-second resolution in datetimes
|
||||
db.worker_update(
|
||||
self.ctxt, None,
|
||||
{'resource_type': 'SENTINEL', 'ignore_sentinel': False},
|
||||
updated_at=datetime.utcnow().replace(microsecond=0))
|
||||
db.workers_init()
|
||||
self.assertFalse(db.sqlalchemy.api.DB_SUPPORTS_SUBSECOND_RESOLUTION)
|
||||
|
||||
def test_worker_create_and_get(self):
|
||||
"""Test basic creation of a worker record."""
|
||||
worker = db.worker_create(self.ctxt, **self.worker_fields)
|
||||
db_worker = db.worker_get(self.ctxt, id=worker.id)
|
||||
self._assertEqualObjects(worker, db_worker)
|
||||
|
||||
@mock.patch('oslo_utils.timeutils.utcnow',
|
||||
return_value=datetime.utcnow().replace(microsecond=123))
|
||||
def test_worker_create_no_subsecond(self, mock_utcnow):
|
||||
"""Test basic creation of a worker record."""
|
||||
db.sqlalchemy.api.DB_SUPPORTS_SUBSECOND_RESOLUTION = False
|
||||
worker = db.worker_create(self.ctxt, **self.worker_fields)
|
||||
db_worker = db.worker_get(self.ctxt, id=worker.id)
|
||||
self._assertEqualObjects(worker, db_worker)
|
||||
self.assertEqual(0, db_worker.updated_at.microsecond)
|
||||
|
||||
def test_worker_create_unique_constrains(self):
|
||||
"""Test when we use an already existing resource type and id."""
|
||||
db.worker_create(self.ctxt, **self.worker_fields)
|
||||
@ -131,6 +162,21 @@ class DBAPIWorkerTestCase(test.TestCase, test.ModelsObjectComparatorMixin):
|
||||
db_worker = db.worker_get(self.ctxt, id=worker.id)
|
||||
self._assertEqualObjects(worker, db_worker, ['updated_at'])
|
||||
|
||||
def test_worker_update_no_subsecond(self):
|
||||
"""Test basic worker update."""
|
||||
db.sqlalchemy.api.DB_SUPPORTS_SUBSECOND_RESOLUTION = False
|
||||
worker = self._create_workers(1)[0]
|
||||
worker = db.worker_get(self.ctxt, id=worker.id)
|
||||
now = datetime.utcnow().replace(microsecond=123)
|
||||
with mock.patch('oslo_utils.timeutils.utcnow', return_value=now):
|
||||
res = db.worker_update(self.ctxt, worker.id, service_id=1)
|
||||
self.assertEqual(1, res)
|
||||
worker.service_id = 1
|
||||
|
||||
db_worker = db.worker_get(self.ctxt, id=worker.id)
|
||||
self._assertEqualObjects(worker, db_worker, ['updated_at'])
|
||||
self.assertEqual(0, db_worker.updated_at.microsecond)
|
||||
|
||||
def test_worker_update_update_orm(self):
|
||||
"""Test worker update updating the worker orm object."""
|
||||
worker = self._create_workers(1)[0]
|
||||
@ -139,7 +185,9 @@ class DBAPIWorkerTestCase(test.TestCase, test.ModelsObjectComparatorMixin):
|
||||
self.assertEqual(1, res)
|
||||
|
||||
db_worker = db.worker_get(self.ctxt, id=worker.id)
|
||||
self._assertEqualObjects(worker, db_worker, ['updated_at'])
|
||||
# If we are updating the ORM object we don't ignore the update_at field
|
||||
# because it will get updated in the ORM instance.
|
||||
self._assertEqualObjects(worker, db_worker)
|
||||
|
||||
def test_worker_destroy(self):
|
||||
"""Test that worker destroy really deletes the DB entry."""
|
||||
@ -152,7 +200,7 @@ class DBAPIWorkerTestCase(test.TestCase, test.ModelsObjectComparatorMixin):
|
||||
|
||||
def test_worker_destroy_non_existent(self):
|
||||
"""Test that worker destroy returns 0 when entry doesn't exist."""
|
||||
res = db.worker_destroy(self.ctxt, id=1)
|
||||
res = db.worker_destroy(self.ctxt, id=100)
|
||||
self.assertEqual(0, res)
|
||||
|
||||
def test_worker_claim(self):
|
||||
|
@ -427,7 +427,8 @@ class ServiceTestCase(test.TestCase):
|
||||
# Since we have created the service entry we call init_host with
|
||||
# added_to_cluster=True
|
||||
init_host_mock.assert_called_once_with(
|
||||
added_to_cluster=added_to_cluster)
|
||||
added_to_cluster=added_to_cluster,
|
||||
service_id=self.service_ref['id'])
|
||||
|
||||
expected_target_calls = [mock.call(topic=self.topic, server=self.host)]
|
||||
expected_rpc_calls = [mock.call(target_mock.return_value, mock.ANY,
|
||||
|
@ -36,7 +36,6 @@ from oslo_utils import importutils
|
||||
from oslo_utils import timeutils
|
||||
from oslo_utils import units
|
||||
import six
|
||||
from stevedore import extension
|
||||
from taskflow.engines.action_engine import engine
|
||||
|
||||
from cinder.api import common
|
||||
@ -65,6 +64,7 @@ from cinder.tests.unit import fake_volume
|
||||
from cinder.tests.unit.image import fake as fake_image
|
||||
from cinder.tests.unit.keymgr import fake as fake_keymgr
|
||||
from cinder.tests.unit import utils as tests_utils
|
||||
from cinder.tests.unit import volume as base
|
||||
from cinder import utils
|
||||
import cinder.volume
|
||||
from cinder.volume import api as volume_api
|
||||
@ -124,120 +124,7 @@ class FakeImageService(object):
|
||||
'status': 'active'}
|
||||
|
||||
|
||||
class BaseVolumeTestCase(test.TestCase):
|
||||
"""Test Case for volumes."""
|
||||
|
||||
FAKE_UUID = fake.IMAGE_ID
|
||||
|
||||
def setUp(self):
|
||||
super(BaseVolumeTestCase, self).setUp()
|
||||
self.extension_manager = extension.ExtensionManager(
|
||||
"BaseVolumeTestCase")
|
||||
vol_tmpdir = tempfile.mkdtemp()
|
||||
self.flags(volumes_dir=vol_tmpdir)
|
||||
self.addCleanup(self._cleanup)
|
||||
self.volume = importutils.import_object(CONF.volume_manager)
|
||||
self.volume.message_api = mock.Mock()
|
||||
self.configuration = mock.Mock(conf.Configuration)
|
||||
self.context = context.get_admin_context()
|
||||
self.context.user_id = fake.USER_ID
|
||||
# NOTE(mriedem): The id is hard-coded here for tracking race fail
|
||||
# assertions with the notification code, it's part of an
|
||||
# elastic-recheck query so don't remove it or change it.
|
||||
self.project_id = '7f265bd4-3a85-465e-a899-5dc4854a86d3'
|
||||
self.context.project_id = self.project_id
|
||||
self.volume_params = {
|
||||
'status': 'creating',
|
||||
'host': CONF.host,
|
||||
'size': 1}
|
||||
self.mock_object(brick_lvm.LVM,
|
||||
'get_all_volume_groups',
|
||||
self.fake_get_all_volume_groups)
|
||||
fake_image.mock_image_service(self)
|
||||
self.mock_object(brick_lvm.LVM, '_vg_exists', lambda x: True)
|
||||
self.mock_object(os.path, 'exists', lambda x: True)
|
||||
self.volume.driver.set_initialized()
|
||||
self.volume.stats = {'allocated_capacity_gb': 0,
|
||||
'pools': {}}
|
||||
# keep ordered record of what we execute
|
||||
self.called = []
|
||||
self.volume_api = cinder.volume.api.API()
|
||||
|
||||
def _cleanup(self):
|
||||
try:
|
||||
shutil.rmtree(CONF.volumes_dir)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def fake_get_all_volume_groups(obj, vg_name=None, no_suffix=True):
|
||||
return [{'name': 'cinder-volumes',
|
||||
'size': '5.00',
|
||||
'available': '2.50',
|
||||
'lv_count': '2',
|
||||
'uuid': 'vR1JU3-FAKE-C4A9-PQFh-Mctm-9FwA-Xwzc1m'}]
|
||||
|
||||
@mock.patch('cinder.image.image_utils.TemporaryImages.fetch')
|
||||
@mock.patch('cinder.volume.flows.manager.create_volume.'
|
||||
'CreateVolumeFromSpecTask._clone_image_volume')
|
||||
def _create_volume_from_image(self, mock_clone_image_volume,
|
||||
mock_fetch_img,
|
||||
fakeout_copy_image_to_volume=False,
|
||||
fakeout_clone_image=False,
|
||||
clone_image_volume=False):
|
||||
"""Test function of create_volume_from_image.
|
||||
|
||||
Test cases call this function to create a volume from image, caller
|
||||
can choose whether to fake out copy_image_to_volume and clone_image,
|
||||
after calling this, test cases should check status of the volume.
|
||||
"""
|
||||
def fake_local_path(volume):
|
||||
return dst_path
|
||||
|
||||
def fake_copy_image_to_volume(context, volume,
|
||||
image_service, image_id):
|
||||
pass
|
||||
|
||||
def fake_fetch_to_raw(ctx, image_service, image_id, path, blocksize,
|
||||
size=None, throttle=None):
|
||||
pass
|
||||
|
||||
def fake_clone_image(ctx, volume_ref,
|
||||
image_location, image_meta,
|
||||
image_service):
|
||||
return {'provider_location': None}, True
|
||||
|
||||
dst_fd, dst_path = tempfile.mkstemp()
|
||||
os.close(dst_fd)
|
||||
self.mock_object(self.volume.driver, 'local_path', fake_local_path)
|
||||
if fakeout_clone_image:
|
||||
self.mock_object(self.volume.driver, 'clone_image',
|
||||
fake_clone_image)
|
||||
self.mock_object(image_utils, 'fetch_to_raw', fake_fetch_to_raw)
|
||||
if fakeout_copy_image_to_volume:
|
||||
self.mock_object(self.volume.driver, 'copy_image_to_volume',
|
||||
fake_copy_image_to_volume)
|
||||
mock_clone_image_volume.return_value = ({}, clone_image_volume)
|
||||
mock_fetch_img.return_value = mock.MagicMock(
|
||||
spec=tests_utils.get_file_spec())
|
||||
|
||||
image_id = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'
|
||||
volume = tests_utils.create_volume(self.context, **self.volume_params)
|
||||
# creating volume testdata
|
||||
try:
|
||||
request_spec = {
|
||||
'volume_properties': self.volume_params,
|
||||
'image_id': image_id,
|
||||
}
|
||||
self.volume.create_volume(self.context, volume, request_spec)
|
||||
finally:
|
||||
# cleanup
|
||||
os.unlink(dst_path)
|
||||
volume = objects.Volume.get_by_id(self.context, volume.id)
|
||||
|
||||
return volume
|
||||
|
||||
|
||||
class AvailabilityZoneTestCase(BaseVolumeTestCase):
|
||||
class AvailabilityZoneTestCase(base.BaseVolumeTestCase):
|
||||
def setUp(self):
|
||||
super(AvailabilityZoneTestCase, self).setUp()
|
||||
self.get_all = self.patch(
|
||||
@ -317,59 +204,16 @@ class AvailabilityZoneTestCase(BaseVolumeTestCase):
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class VolumeTestCase(BaseVolumeTestCase):
|
||||
class VolumeTestCase(base.BaseVolumeTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(VolumeTestCase, self).setUp()
|
||||
self._clear_patch = mock.patch('cinder.volume.utils.clear_volume',
|
||||
autospec=True)
|
||||
self._clear_patch.start()
|
||||
self.patch('cinder.volume.utils.clear_volume', autospec=True)
|
||||
self.expected_status = 'available'
|
||||
self.service_id = 1
|
||||
|
||||
def tearDown(self):
|
||||
super(VolumeTestCase, self).tearDown()
|
||||
self._clear_patch.stop()
|
||||
|
||||
def test_init_host_clears_downloads(self):
|
||||
"""Test that init_host will unwedge a volume stuck in downloading."""
|
||||
volume = tests_utils.create_volume(self.context, status='downloading',
|
||||
size=0, host=CONF.host)
|
||||
self.volume.init_host()
|
||||
volume.refresh()
|
||||
self.assertEqual("error", volume.status)
|
||||
self.volume.delete_volume(self.context, volume)
|
||||
|
||||
def test_init_host_clears_uploads_available_volume(self):
|
||||
"""init_host will clean an available volume stuck in uploading."""
|
||||
volume = tests_utils.create_volume(self.context, status='uploading',
|
||||
size=0, host=CONF.host)
|
||||
self.volume.init_host()
|
||||
volume = objects.Volume.get_by_id(context.get_admin_context(),
|
||||
volume.id)
|
||||
self.assertEqual("available", volume.status)
|
||||
|
||||
def test_init_host_clears_uploads_in_use_volume(self):
|
||||
"""init_host will clean an in-use volume stuck in uploading."""
|
||||
volume = tests_utils.create_volume(self.context, status='uploading',
|
||||
size=0, host=CONF.host)
|
||||
fake_uuid = fakes.get_fake_uuid()
|
||||
tests_utils.attach_volume(self.context, volume.id, fake_uuid,
|
||||
'fake_host', '/dev/vda')
|
||||
self.volume.init_host()
|
||||
volume = objects.Volume.get_by_id(context.get_admin_context(),
|
||||
volume.id)
|
||||
self.assertEqual("in-use", volume.status)
|
||||
|
||||
def test_init_host_resumes_deletes(self):
|
||||
"""init_host will resume deleting volume in deleting status."""
|
||||
volume = tests_utils.create_volume(self.context, status='deleting',
|
||||
size=0, host=CONF.host)
|
||||
volume_id = volume['id']
|
||||
self.volume.init_host()
|
||||
self.assertRaises(exception.VolumeNotFound, db.volume_get,
|
||||
context.get_admin_context(), volume_id)
|
||||
|
||||
def test_init_host_count_allocated_capacity(self):
|
||||
@mock.patch('cinder.manager.CleanableManager.init_host')
|
||||
def test_init_host_count_allocated_capacity(self, init_host_mock):
|
||||
vol0 = tests_utils.create_volume(
|
||||
self.context, size=100, host=CONF.host)
|
||||
vol1 = tests_utils.create_volume(
|
||||
@ -384,7 +228,9 @@ class VolumeTestCase(BaseVolumeTestCase):
|
||||
vol4 = tests_utils.create_volume(
|
||||
self.context, size=1024,
|
||||
host=volutils.append_host(CONF.host, 'pool2'))
|
||||
self.volume.init_host()
|
||||
self.volume.init_host(service_id=self.service_id)
|
||||
init_host_mock.assert_called_once_with(
|
||||
service_id=self.service_id, added_to_cluster=None)
|
||||
stats = self.volume.stats
|
||||
self.assertEqual(2020, stats['allocated_capacity_gb'])
|
||||
self.assertEqual(
|
||||
@ -427,7 +273,7 @@ class VolumeTestCase(BaseVolumeTestCase):
|
||||
{'id': snap1.id, 'provider_id': '7 8 yyyy'}]
|
||||
mock_update.return_value = (volumes, snapshots)
|
||||
# initialize
|
||||
self.volume.init_host()
|
||||
self.volume.init_host(service_id=self.service_id)
|
||||
# Grab volume and snapshot objects
|
||||
vol0_obj = objects.Volume.get_by_id(context.get_admin_context(),
|
||||
vol0.id)
|
||||
@ -459,7 +305,7 @@ class VolumeTestCase(BaseVolumeTestCase):
|
||||
snap1 = tests_utils.create_snapshot(self.context, vol1.id)
|
||||
mock_update.return_value = ([], [])
|
||||
# initialize
|
||||
self.volume.init_host()
|
||||
self.volume.init_host(service_id=self.service_id)
|
||||
# Grab volume and snapshot objects
|
||||
vol0_obj = objects.Volume.get_by_id(context.get_admin_context(),
|
||||
vol0.id)
|
||||
@ -481,16 +327,22 @@ class VolumeTestCase(BaseVolumeTestCase):
|
||||
@mock.patch('cinder.volume.manager.VolumeManager.'
|
||||
'_include_resources_in_cluster')
|
||||
def test_init_host_cluster_not_changed(self, include_in_cluster_mock):
|
||||
self.volume.init_host(False)
|
||||
self.volume.init_host(added_to_cluster=False,
|
||||
service_id=self.service_id)
|
||||
include_in_cluster_mock.assert_not_called()
|
||||
|
||||
@mock.patch('cinder.objects.snapshot.SnapshotList.get_all',
|
||||
return_value=[])
|
||||
@mock.patch('cinder.objects.volume.VolumeList.get_all', return_value=[])
|
||||
@mock.patch('cinder.objects.volume.VolumeList.include_in_cluster')
|
||||
@mock.patch('cinder.objects.consistencygroup.ConsistencyGroupList.'
|
||||
'include_in_cluster')
|
||||
def test_init_host_added_to_cluster(self, vol_include_mock,
|
||||
cg_include_mock):
|
||||
def test_init_host_added_to_cluster(self, cg_include_mock,
|
||||
vol_include_mock, vol_get_all_mock,
|
||||
snap_get_all_mock):
|
||||
self.mock_object(self.volume, 'cluster', mock.sentinel.cluster)
|
||||
self.volume.init_host(True)
|
||||
self.volume.init_host(added_to_cluster=True,
|
||||
service_id=self.service_id)
|
||||
|
||||
vol_include_mock.assert_called_once_with(mock.ANY,
|
||||
mock.sentinel.cluster,
|
||||
@ -498,6 +350,10 @@ class VolumeTestCase(BaseVolumeTestCase):
|
||||
cg_include_mock.assert_called_once_with(mock.ANY,
|
||||
mock.sentinel.cluster,
|
||||
host=self.volume.host)
|
||||
vol_get_all_mock.assert_called_once_with(
|
||||
mock.ANY, filters={'cluster_name': mock.sentinel.cluster})
|
||||
snap_get_all_mock.assert_called_once_with(
|
||||
mock.ANY, search_opts={'cluster_name': mock.sentinel.cluster})
|
||||
|
||||
@mock.patch('cinder.objects.service.Service.get_minimum_rpc_version')
|
||||
@mock.patch('cinder.objects.service.Service.get_minimum_obj_version')
|
||||
@ -552,45 +408,6 @@ class VolumeTestCase(BaseVolumeTestCase):
|
||||
self.volume.driver._initialized = False
|
||||
self.assertFalse(self.volume.is_working())
|
||||
|
||||
def test_create_volume_fails_with_creating_and_downloading_status(self):
|
||||
"""Test init_host in case of volume.
|
||||
|
||||
While the status of volume is 'creating' or 'downloading',
|
||||
volume process down.
|
||||
After process restarting this 'creating' status is changed to 'error'.
|
||||
"""
|
||||
for status in ['creating', 'downloading']:
|
||||
volume = tests_utils.create_volume(self.context, status=status,
|
||||
size=0, host=CONF.host)
|
||||
|
||||
self.volume.init_host()
|
||||
volume.refresh()
|
||||
self.assertEqual('error', volume.status)
|
||||
self.volume.delete_volume(self.context, volume)
|
||||
|
||||
def test_create_snapshot_fails_with_creating_status(self):
|
||||
"""Test init_host in case of snapshot.
|
||||
|
||||
While the status of snapshot is 'creating', volume process
|
||||
down. After process restarting this 'creating' status is
|
||||
changed to 'error'.
|
||||
"""
|
||||
volume = tests_utils.create_volume(self.context,
|
||||
**self.volume_params)
|
||||
snapshot = tests_utils.create_snapshot(
|
||||
self.context,
|
||||
volume['id'],
|
||||
status=fields.SnapshotStatus.CREATING)
|
||||
snap_id = snapshot['id']
|
||||
self.volume.init_host()
|
||||
|
||||
snapshot_obj = objects.Snapshot.get_by_id(self.context, snap_id)
|
||||
|
||||
self.assertEqual(fields.SnapshotStatus.ERROR, snapshot_obj.status)
|
||||
|
||||
self.volume.delete_snapshot(self.context, snapshot_obj)
|
||||
self.volume.delete_volume(self.context, volume)
|
||||
|
||||
@mock.patch('cinder.tests.unit.fake_notifier.FakeNotifier._notify')
|
||||
@mock.patch.object(QUOTAS, 'reserve')
|
||||
@mock.patch.object(QUOTAS, 'commit')
|
||||
@ -1297,7 +1114,7 @@ class VolumeTestCase(BaseVolumeTestCase):
|
||||
def test_delete_volume_not_found(self, mock_get_volume):
|
||||
"""Test delete volume moves on if the volume does not exist."""
|
||||
volume_id = '12345678-1234-5678-1234-567812345678'
|
||||
volume = objects.Volume(self.context, id=volume_id)
|
||||
volume = objects.Volume(self.context, status='available', id=volume_id)
|
||||
self.volume.delete_volume(self.context, volume)
|
||||
self.assertTrue(mock_get_volume.called)
|
||||
|
||||
@ -4685,16 +4502,6 @@ class VolumeTestCase(BaseVolumeTestCase):
|
||||
self.context,
|
||||
snap)
|
||||
|
||||
def test_init_host_clears_deleting_snapshots(self):
|
||||
"""Test that init_host will delete a snapshot stuck in deleting."""
|
||||
volume = tests_utils.create_volume(self.context, status='deleting',
|
||||
size=1, host=CONF.host)
|
||||
snapshot = tests_utils.create_snapshot(self.context,
|
||||
volume.id, status='deleting')
|
||||
self.volume.init_host()
|
||||
self.assertRaises(exception.VolumeNotFound, volume.refresh)
|
||||
self.assertRaises(exception.SnapshotNotFound, snapshot.refresh)
|
||||
|
||||
@mock.patch('cinder.volume.drivers.lvm.LVMVolumeDriver.'
|
||||
'manage_existing')
|
||||
@mock.patch('cinder.volume.drivers.lvm.LVMVolumeDriver.'
|
||||
@ -4815,7 +4622,7 @@ class VolumeTestCase(BaseVolumeTestCase):
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class VolumeMigrationTestCase(BaseVolumeTestCase):
|
||||
class VolumeMigrationTestCase(base.BaseVolumeTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(VolumeMigrationTestCase, self).setUp()
|
||||
@ -5631,7 +5438,7 @@ class VolumeMigrationTestCase(BaseVolumeTestCase):
|
||||
self.context, volume.id)
|
||||
|
||||
|
||||
class ReplicationTestCase(BaseVolumeTestCase):
|
||||
class ReplicationTestCase(base.BaseVolumeTestCase):
|
||||
|
||||
@mock.patch.object(volume_rpcapi.VolumeAPI, 'failover_host')
|
||||
@mock.patch.object(cinder.db, 'conditional_update')
|
||||
@ -5734,7 +5541,7 @@ class ReplicationTestCase(BaseVolumeTestCase):
|
||||
host=CONF.host)
|
||||
|
||||
|
||||
class CopyVolumeToImageTestCase(BaseVolumeTestCase):
|
||||
class CopyVolumeToImageTestCase(base.BaseVolumeTestCase):
|
||||
def fake_local_path(self, volume):
|
||||
return self.dst_path
|
||||
|
||||
@ -6008,7 +5815,7 @@ class CopyVolumeToImageTestCase(BaseVolumeTestCase):
|
||||
self.assertTrue(mock_delete.called)
|
||||
|
||||
|
||||
class GetActiveByWindowTestCase(BaseVolumeTestCase):
|
||||
class GetActiveByWindowTestCase(base.BaseVolumeTestCase):
|
||||
def setUp(self):
|
||||
super(GetActiveByWindowTestCase, self).setUp()
|
||||
self.ctx = context.get_admin_context(read_deleted="yes")
|
||||
@ -6761,7 +6568,7 @@ class VolumePolicyTestCase(test.TestCase):
|
||||
target)
|
||||
|
||||
|
||||
class ImageVolumeCacheTestCase(BaseVolumeTestCase):
|
||||
class ImageVolumeCacheTestCase(base.BaseVolumeTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(ImageVolumeCacheTestCase, self).setUp()
|
||||
@ -6833,7 +6640,7 @@ class ImageVolumeCacheTestCase(BaseVolumeTestCase):
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class DiscardFlagTestCase(BaseVolumeTestCase):
|
||||
class DiscardFlagTestCase(base.BaseVolumeTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(DiscardFlagTestCase, self).setUp()
|
||||
|
177
cinder/tests/unit/test_volume_cleanup.py
Normal file
177
cinder/tests/unit/test_volume_cleanup.py
Normal file
@ -0,0 +1,177 @@
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
|
||||
from cinder import context
|
||||
from cinder import db
|
||||
from cinder import exception
|
||||
from cinder import objects
|
||||
from cinder.objects import fields
|
||||
from cinder import service
|
||||
from cinder.tests.unit.api import fakes
|
||||
from cinder.tests.unit import utils as tests_utils
|
||||
from cinder.tests.unit import volume as base
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class VolumeCleanupTestCase(base.BaseVolumeTestCase):
|
||||
MOCK_WORKER = False
|
||||
|
||||
def setUp(self):
|
||||
super(VolumeCleanupTestCase, self).setUp()
|
||||
self.service_id = 1
|
||||
self.mock_object(service.Service, 'service_id', self.service_id)
|
||||
self.patch('cinder.volume.utils.clear_volume', autospec=True)
|
||||
|
||||
def _assert_workers_are_removed(self):
|
||||
workers = db.worker_get_all(self.context, read_deleted='yes')
|
||||
self.assertListEqual([], workers)
|
||||
|
||||
def test_init_host_clears_uploads_available_volume(self):
|
||||
"""init_host will clean an available volume stuck in uploading."""
|
||||
volume = tests_utils.create_volume(self.context, status='uploading',
|
||||
size=0, host=CONF.host)
|
||||
|
||||
db.worker_create(self.context, resource_type='Volume',
|
||||
resource_id=volume.id, status=volume.status,
|
||||
service_id=self.service_id)
|
||||
|
||||
self.volume.init_host(service_id=service.Service.service_id)
|
||||
volume.refresh()
|
||||
self.assertEqual("available", volume.status)
|
||||
self._assert_workers_are_removed()
|
||||
|
||||
@mock.patch('cinder.manager.CleanableManager.init_host')
|
||||
def test_init_host_clears_uploads_in_use_volume(self, init_host_mock):
|
||||
"""init_host will clean an in-use volume stuck in uploading."""
|
||||
volume = tests_utils.create_volume(self.context, status='uploading',
|
||||
size=0, host=CONF.host)
|
||||
|
||||
db.worker_create(self.context, resource_type='Volume',
|
||||
resource_id=volume.id, status=volume.status,
|
||||
service_id=self.service_id)
|
||||
|
||||
fake_uuid = fakes.get_fake_uuid()
|
||||
tests_utils.attach_volume(self.context, volume.id, fake_uuid,
|
||||
'fake_host', '/dev/vda')
|
||||
self.volume.init_host(service_id=mock.sentinel.service_id)
|
||||
init_host_mock.assert_called_once_with(
|
||||
service_id=mock.sentinel.service_id, added_to_cluster=None)
|
||||
volume.refresh()
|
||||
self.assertEqual("in-use", volume.status)
|
||||
self._assert_workers_are_removed()
|
||||
|
||||
def test_init_host_clears_downloads(self):
|
||||
"""Test that init_host will unwedge a volume stuck in downloading."""
|
||||
volume = tests_utils.create_volume(self.context, status='downloading',
|
||||
size=0, host=CONF.host)
|
||||
db.worker_create(self.context, resource_type='Volume',
|
||||
resource_id=volume.id, status=volume.status,
|
||||
service_id=self.service_id)
|
||||
mock_clear = self.mock_object(self.volume.driver, 'clear_download')
|
||||
|
||||
self.volume.init_host(service_id=service.Service.service_id)
|
||||
self.assertEqual(1, mock_clear.call_count)
|
||||
self.assertEqual(volume.id, mock_clear.call_args[0][1].id)
|
||||
volume.refresh()
|
||||
self.assertEqual("error", volume['status'])
|
||||
|
||||
self.volume.delete_volume(self.context, volume=volume)
|
||||
self._assert_workers_are_removed()
|
||||
|
||||
def test_init_host_resumes_deletes(self):
|
||||
"""init_host will resume deleting volume in deleting status."""
|
||||
volume = tests_utils.create_volume(self.context, status='deleting',
|
||||
size=0, host=CONF.host)
|
||||
|
||||
db.worker_create(self.context, resource_type='Volume',
|
||||
resource_id=volume.id, status=volume.status,
|
||||
service_id=self.service_id)
|
||||
|
||||
self.volume.init_host(service_id=service.Service.service_id)
|
||||
|
||||
self.assertRaises(exception.VolumeNotFound, db.volume_get,
|
||||
context.get_admin_context(), volume.id)
|
||||
self._assert_workers_are_removed()
|
||||
|
||||
def test_create_volume_fails_with_creating_and_downloading_status(self):
|
||||
"""Test init_host_with_service in case of volume.
|
||||
|
||||
While the status of volume is 'creating' or 'downloading',
|
||||
volume process down.
|
||||
After process restarting this 'creating' status is changed to 'error'.
|
||||
"""
|
||||
for status in ('creating', 'downloading'):
|
||||
volume = tests_utils.create_volume(self.context, status=status,
|
||||
size=0, host=CONF.host)
|
||||
|
||||
db.worker_create(self.context, resource_type='Volume',
|
||||
resource_id=volume.id, status=volume.status,
|
||||
service_id=self.service_id)
|
||||
|
||||
self.volume.init_host(service_id=service.Service.service_id)
|
||||
volume.refresh()
|
||||
|
||||
self.assertEqual('error', volume['status'])
|
||||
self.volume.delete_volume(self.context, volume)
|
||||
self._assert_workers_are_removed()
|
||||
|
||||
def test_create_snapshot_fails_with_creating_status(self):
|
||||
"""Test init_host_with_service in case of snapshot.
|
||||
|
||||
While the status of snapshot is 'creating', volume process
|
||||
down. After process restarting this 'creating' status is
|
||||
changed to 'error'.
|
||||
"""
|
||||
volume = tests_utils.create_volume(self.context,
|
||||
**self.volume_params)
|
||||
snapshot = tests_utils.create_snapshot(
|
||||
self.context,
|
||||
volume.id,
|
||||
status=fields.SnapshotStatus.CREATING)
|
||||
db.worker_create(self.context, resource_type='Snapshot',
|
||||
resource_id=snapshot.id, status=snapshot.status,
|
||||
service_id=self.service_id)
|
||||
|
||||
self.volume.init_host(service_id=service.Service.service_id)
|
||||
|
||||
snapshot_obj = objects.Snapshot.get_by_id(self.context, snapshot.id)
|
||||
|
||||
self.assertEqual(fields.SnapshotStatus.ERROR, snapshot_obj.status)
|
||||
self.assertEqual(service.Service.service_id,
|
||||
self.volume.service_id)
|
||||
self._assert_workers_are_removed()
|
||||
|
||||
self.volume.delete_snapshot(self.context, snapshot_obj)
|
||||
self.volume.delete_volume(self.context, volume)
|
||||
|
||||
def test_init_host_clears_deleting_snapshots(self):
|
||||
"""Test that init_host will delete a snapshot stuck in deleting."""
|
||||
volume = tests_utils.create_volume(self.context, status='deleting',
|
||||
size=1, host=CONF.host)
|
||||
snapshot = tests_utils.create_snapshot(self.context,
|
||||
volume.id, status='deleting')
|
||||
|
||||
db.worker_create(self.context, resource_type='Volume',
|
||||
resource_id=volume.id, status=volume.status,
|
||||
service_id=self.service_id)
|
||||
|
||||
self.volume.init_host(service_id=self.service_id)
|
||||
self.assertRaises(exception.VolumeNotFound, volume.refresh)
|
||||
self.assertRaises(exception.SnapshotNotFound, snapshot.refresh)
|
@ -0,0 +1,150 @@
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import importutils
|
||||
from stevedore import extension
|
||||
|
||||
from cinder.brick.local_dev import lvm as brick_lvm
|
||||
from cinder import context
|
||||
from cinder.image import image_utils
|
||||
from cinder import objects
|
||||
from cinder import test
|
||||
from cinder.tests.unit import fake_constants as fake
|
||||
from cinder.tests.unit.image import fake as fake_image
|
||||
from cinder.tests.unit import utils as tests_utils
|
||||
from cinder.volume import api as volume_api
|
||||
from cinder.volume import configuration as conf
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class BaseVolumeTestCase(test.TestCase):
|
||||
"""Test Case for volumes."""
|
||||
|
||||
FAKE_UUID = fake.IMAGE_ID
|
||||
|
||||
def setUp(self, *args, **kwargs):
|
||||
super(BaseVolumeTestCase, self).setUp(*args, **kwargs)
|
||||
self.extension_manager = extension.ExtensionManager(
|
||||
"BaseVolumeTestCase")
|
||||
vol_tmpdir = tempfile.mkdtemp()
|
||||
self.flags(volumes_dir=vol_tmpdir)
|
||||
self.addCleanup(self._cleanup)
|
||||
self.volume = importutils.import_object(CONF.volume_manager)
|
||||
self.volume.message_api = mock.Mock()
|
||||
self.configuration = mock.Mock(conf.Configuration)
|
||||
self.context = context.get_admin_context()
|
||||
self.context.user_id = fake.USER_ID
|
||||
# NOTE(mriedem): The id is hard-coded here for tracking race fail
|
||||
# assertions with the notification code, it's part of an
|
||||
# elastic-recheck query so don't remove it or change it.
|
||||
self.project_id = '7f265bd4-3a85-465e-a899-5dc4854a86d3'
|
||||
self.context.project_id = self.project_id
|
||||
self.volume_params = {
|
||||
'status': 'creating',
|
||||
'host': CONF.host,
|
||||
'size': 1}
|
||||
self.mock_object(brick_lvm.LVM,
|
||||
'get_all_volume_groups',
|
||||
self.fake_get_all_volume_groups)
|
||||
fake_image.mock_image_service(self)
|
||||
self.mock_object(brick_lvm.LVM, '_vg_exists', lambda x: True)
|
||||
self.mock_object(os.path, 'exists', lambda x: True)
|
||||
self.volume.driver.set_initialized()
|
||||
self.volume.stats = {'allocated_capacity_gb': 0,
|
||||
'pools': {}}
|
||||
# keep ordered record of what we execute
|
||||
self.called = []
|
||||
self.volume_api = volume_api.API()
|
||||
|
||||
def _cleanup(self):
|
||||
try:
|
||||
shutil.rmtree(CONF.volumes_dir)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def fake_get_all_volume_groups(obj, vg_name=None, no_suffix=True):
|
||||
return [{'name': 'cinder-volumes',
|
||||
'size': '5.00',
|
||||
'available': '2.50',
|
||||
'lv_count': '2',
|
||||
'uuid': 'vR1JU3-FAKE-C4A9-PQFh-Mctm-9FwA-Xwzc1m'}]
|
||||
|
||||
@mock.patch('cinder.image.image_utils.TemporaryImages.fetch')
|
||||
@mock.patch('cinder.volume.flows.manager.create_volume.'
|
||||
'CreateVolumeFromSpecTask._clone_image_volume')
|
||||
def _create_volume_from_image(self, mock_clone_image_volume,
|
||||
mock_fetch_img,
|
||||
fakeout_copy_image_to_volume=False,
|
||||
fakeout_clone_image=False,
|
||||
clone_image_volume=False):
|
||||
"""Test function of create_volume_from_image.
|
||||
|
||||
Test cases call this function to create a volume from image, caller
|
||||
can choose whether to fake out copy_image_to_volume and clone_image,
|
||||
after calling this, test cases should check status of the volume.
|
||||
"""
|
||||
def fake_local_path(volume):
|
||||
return dst_path
|
||||
|
||||
def fake_copy_image_to_volume(context, volume,
|
||||
image_service, image_id):
|
||||
pass
|
||||
|
||||
def fake_fetch_to_raw(ctx, image_service, image_id, path, blocksize,
|
||||
size=None, throttle=None):
|
||||
pass
|
||||
|
||||
def fake_clone_image(ctx, volume_ref,
|
||||
image_location, image_meta,
|
||||
image_service):
|
||||
return {'provider_location': None}, True
|
||||
|
||||
dst_fd, dst_path = tempfile.mkstemp()
|
||||
os.close(dst_fd)
|
||||
self.mock_object(self.volume.driver, 'local_path', fake_local_path)
|
||||
if fakeout_clone_image:
|
||||
self.mock_object(self.volume.driver, 'clone_image',
|
||||
fake_clone_image)
|
||||
self.mock_object(image_utils, 'fetch_to_raw', fake_fetch_to_raw)
|
||||
if fakeout_copy_image_to_volume:
|
||||
self.mock_object(self.volume.driver, 'copy_image_to_volume',
|
||||
fake_copy_image_to_volume)
|
||||
mock_clone_image_volume.return_value = ({}, clone_image_volume)
|
||||
mock_fetch_img.return_value = mock.MagicMock(
|
||||
spec=tests_utils.get_file_spec())
|
||||
|
||||
image_id = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'
|
||||
volume = tests_utils.create_volume(self.context, **self.volume_params)
|
||||
# creating volume testdata
|
||||
try:
|
||||
request_spec = {
|
||||
'volume_properties': self.volume_params,
|
||||
'image_id': image_id,
|
||||
}
|
||||
self.volume.create_volume(self.context, volume, request_spec)
|
||||
finally:
|
||||
# cleanup
|
||||
os.unlink(dst_path)
|
||||
volume = objects.Volume.get_by_id(self.context, volume.id)
|
||||
|
||||
return volume
|
@ -14,13 +14,12 @@
|
||||
|
||||
import mock
|
||||
|
||||
from cinder.tests.unit import fake_constants as fake
|
||||
from cinder.tests.unit import fake_volume
|
||||
from cinder.tests.unit import test_volume
|
||||
|
||||
from cinder import context
|
||||
from cinder import exception
|
||||
from cinder import objects
|
||||
from cinder.tests.unit import fake_constants as fake
|
||||
from cinder.tests.unit import fake_volume
|
||||
from cinder.tests.unit import volume as base
|
||||
from cinder.volume.flows.manager import manage_existing
|
||||
from cinder.volume import manager
|
||||
from cinder.volume import utils
|
||||
@ -29,7 +28,7 @@ FAKE_HOST_POOL = 'volPool'
|
||||
FAKE_HOST = 'hostname@backend'
|
||||
|
||||
|
||||
class ManageVolumeTestCase(test_volume.BaseVolumeTestCase):
|
||||
class ManageVolumeTestCase(base.BaseVolumeTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(ManageVolumeTestCase, self).setUp()
|
||||
|
@ -152,7 +152,8 @@ MAPPING = {
|
||||
}
|
||||
|
||||
|
||||
class VolumeManager(manager.SchedulerDependentManager):
|
||||
class VolumeManager(manager.CleanableManager,
|
||||
manager.SchedulerDependentManager):
|
||||
"""Manages attachable block storage devices."""
|
||||
|
||||
RPC_API_VERSION = volume_rpcapi.VolumeAPI.RPC_API_VERSION
|
||||
@ -192,7 +193,7 @@ class VolumeManager(manager.SchedulerDependentManager):
|
||||
LOG.debug("Cinder Volume DB check: vol_db_empty=%s", vol_db_empty)
|
||||
|
||||
# We pass the current setting for service.active_backend_id to
|
||||
# the driver on init, incase there was a restart or something
|
||||
# the driver on init, in case there was a restart or something
|
||||
curr_active_backend_id = None
|
||||
svc_host = vol_utils.extract_host(self.host, 'backend')
|
||||
try:
|
||||
@ -317,9 +318,9 @@ class VolumeManager(manager.SchedulerDependentManager):
|
||||
|
||||
def _sync_provider_info(self, ctxt, volumes, snapshots):
|
||||
# NOTE(jdg): For now this just updates provider_id, we can add more
|
||||
# add more items to the update if they're relevant but we need
|
||||
# to be safe in what we allow and add a list of allowed keys
|
||||
# things that make sense are provider_*, replication_status etc
|
||||
# items to the update if they're relevant but we need to be safe in
|
||||
# what we allow and add a list of allowed keys. Things that make sense
|
||||
# are provider_*, replication_status etc
|
||||
|
||||
updates, snapshot_updates = self.driver.update_provider_info(
|
||||
volumes, snapshots)
|
||||
@ -370,7 +371,7 @@ class VolumeManager(manager.SchedulerDependentManager):
|
||||
{'num_vols': num_vols, 'num_cgs': num_cgs,
|
||||
'host': self.host, 'cluster': self.cluster})
|
||||
|
||||
def init_host(self, added_to_cluster=None):
|
||||
def init_host(self, added_to_cluster=None, **kwargs):
|
||||
"""Perform any required initialization."""
|
||||
ctxt = context.get_admin_context()
|
||||
if not self.driver.supported:
|
||||
@ -407,14 +408,19 @@ class VolumeManager(manager.SchedulerDependentManager):
|
||||
# Initialize backend capabilities list
|
||||
self.driver.init_capabilities()
|
||||
|
||||
volumes = objects.VolumeList.get_all_by_host(ctxt, self.host)
|
||||
snapshots = objects.SnapshotList.get_by_host(ctxt, self.host)
|
||||
if self.cluster:
|
||||
filters = {'cluster_name': self.cluster}
|
||||
else:
|
||||
filters = {'host': self.host}
|
||||
volumes = objects.VolumeList.get_all(ctxt, filters=filters)
|
||||
snapshots = objects.SnapshotList.get_all(ctxt, search_opts=filters)
|
||||
self._sync_provider_info(ctxt, volumes, snapshots)
|
||||
# FIXME volume count for exporting is wrong
|
||||
|
||||
try:
|
||||
self.stats['pools'] = {}
|
||||
self.stats.update({'allocated_capacity_gb': 0})
|
||||
|
||||
try:
|
||||
for volume in volumes:
|
||||
# available volume should also be counted into allocated
|
||||
if volume['status'] in ['in-use', 'available']:
|
||||
@ -428,32 +434,10 @@ class VolumeManager(manager.SchedulerDependentManager):
|
||||
LOG.exception(_LE("Failed to re-export volume, "
|
||||
"setting to ERROR."),
|
||||
resource=volume)
|
||||
volume.status = 'error'
|
||||
volume.save()
|
||||
elif volume['status'] in ('downloading', 'creating'):
|
||||
LOG.warning(_LW("Detected volume stuck "
|
||||
"in %(curr_status)s "
|
||||
"status, setting to ERROR."),
|
||||
{'curr_status': volume['status']},
|
||||
resource=volume)
|
||||
volume.conditional_update({'status': 'error'},
|
||||
{'status': 'in-use'})
|
||||
# All other cleanups are processed by parent class CleanableManager
|
||||
|
||||
if volume['status'] == 'downloading':
|
||||
self.driver.clear_download(ctxt, volume)
|
||||
volume.status = 'error'
|
||||
volume.save()
|
||||
elif volume.status == 'uploading':
|
||||
# Set volume status to available or in-use.
|
||||
self.db.volume_update_status_based_on_attachment(
|
||||
ctxt, volume.id)
|
||||
else:
|
||||
pass
|
||||
snapshots = objects.SnapshotList.get_by_host(
|
||||
ctxt, self.host, {'status': fields.SnapshotStatus.CREATING})
|
||||
for snapshot in snapshots:
|
||||
LOG.warning(_LW("Detected snapshot stuck in creating "
|
||||
"status, setting to ERROR."), resource=snapshot)
|
||||
snapshot.status = fields.SnapshotStatus.ERROR
|
||||
snapshot.save()
|
||||
except Exception:
|
||||
LOG.exception(_LE("Error during re-export on driver init."),
|
||||
resource=volume)
|
||||
@ -466,26 +450,16 @@ class VolumeManager(manager.SchedulerDependentManager):
|
||||
# that an entry exists in the service table
|
||||
self.driver.set_initialized()
|
||||
|
||||
for volume in volumes:
|
||||
if volume['status'] == 'deleting':
|
||||
if CONF.volume_service_inithost_offload:
|
||||
# Offload all the pending volume delete operations to the
|
||||
# threadpool to prevent the main volume service thread
|
||||
# from being blocked.
|
||||
self._add_to_threadpool(self.delete_volume, ctxt, volume,
|
||||
cascade=True)
|
||||
else:
|
||||
# By default, delete volumes sequentially
|
||||
self.delete_volume(ctxt, volume, cascade=True)
|
||||
LOG.info(_LI("Resume volume delete completed successfully."),
|
||||
resource=volume)
|
||||
|
||||
# collect and publish service capabilities
|
||||
self.publish_service_capabilities(ctxt)
|
||||
LOG.info(_LI("Driver initialization completed successfully."),
|
||||
resource={'type': 'driver',
|
||||
'id': self.driver.__class__.__name__})
|
||||
|
||||
# Make sure to call CleanableManager to do the cleanup
|
||||
super(VolumeManager, self).init_host(added_to_cluster=added_to_cluster,
|
||||
**kwargs)
|
||||
|
||||
def init_host_with_rpc(self):
|
||||
LOG.info(_LI("Initializing RPC dependent components of volume "
|
||||
"driver %(driver_name)s (%(version)s)"),
|
||||
@ -527,6 +501,37 @@ class VolumeManager(manager.SchedulerDependentManager):
|
||||
resource={'type': 'driver',
|
||||
'id': self.driver.__class__.__name__})
|
||||
|
||||
def _do_cleanup(self, ctxt, vo_resource):
|
||||
if isinstance(vo_resource, objects.Volume):
|
||||
if vo_resource.status == 'downloading':
|
||||
self.driver.clear_download(ctxt, vo_resource)
|
||||
|
||||
elif vo_resource.status == 'uploading':
|
||||
# Set volume status to available or in-use.
|
||||
self.db.volume_update_status_based_on_attachment(
|
||||
ctxt, vo_resource.id)
|
||||
|
||||
elif vo_resource.status == 'deleting':
|
||||
if CONF.volume_service_inithost_offload:
|
||||
# Offload all the pending volume delete operations to the
|
||||
# threadpool to prevent the main volume service thread
|
||||
# from being blocked.
|
||||
self._add_to_threadpool(self.delete_volume, ctxt,
|
||||
vo_resource, cascade=True)
|
||||
else:
|
||||
# By default, delete volumes sequentially
|
||||
self.delete_volume(ctxt, vo_resource, cascade=True)
|
||||
# We signal that we take care of cleaning the worker ourselves
|
||||
# (with set_workers decorator in delete_volume method) so
|
||||
# do_cleanup method doesn't need to remove it.
|
||||
return True
|
||||
|
||||
# For Volume creating and downloading and for Snapshot downloading
|
||||
# statuses we have to set status to error
|
||||
if vo_resource.status in ('creating', 'downloading'):
|
||||
vo_resource.status = 'error'
|
||||
vo_resource.save()
|
||||
|
||||
def is_working(self):
|
||||
"""Return if Manager is ready to accept requests.
|
||||
|
||||
@ -536,6 +541,7 @@ class VolumeManager(manager.SchedulerDependentManager):
|
||||
"""
|
||||
return self.driver.initialized
|
||||
|
||||
@objects.Volume.set_workers
|
||||
def create_volume(self, context, volume, request_spec=None,
|
||||
filter_properties=None, allow_reschedule=True):
|
||||
"""Creates the volume."""
|
||||
@ -627,6 +633,7 @@ class VolumeManager(manager.SchedulerDependentManager):
|
||||
return volume.id
|
||||
|
||||
@coordination.synchronized('{volume.id}-{f_name}')
|
||||
@objects.Volume.set_workers
|
||||
def delete_volume(self, context, volume, unmanage_only=False,
|
||||
cascade=False):
|
||||
"""Deletes and unexports volume.
|
||||
@ -786,6 +793,7 @@ class VolumeManager(manager.SchedulerDependentManager):
|
||||
volume_ref.status = status
|
||||
volume_ref.save()
|
||||
|
||||
@objects.Snapshot.set_workers
|
||||
def create_snapshot(self, context, snapshot):
|
||||
"""Creates and exports the snapshot."""
|
||||
context = context.elevated()
|
||||
|
@ -164,6 +164,7 @@ class VolumeAPI(rpc.RPCAPI):
|
||||
volume=volume)
|
||||
|
||||
def delete_volume(self, ctxt, volume, unmanage_only=False, cascade=False):
|
||||
volume.create_worker()
|
||||
cctxt = self._get_cctxt(volume.host)
|
||||
msg_args = {
|
||||
'volume': volume, 'unmanage_only': unmanage_only,
|
||||
@ -173,6 +174,7 @@ class VolumeAPI(rpc.RPCAPI):
|
||||
cctxt.cast(ctxt, 'delete_volume', **msg_args)
|
||||
|
||||
def create_snapshot(self, ctxt, volume, snapshot):
|
||||
snapshot.create_worker()
|
||||
cctxt = self._get_cctxt(volume['host'])
|
||||
cctxt.cast(ctxt, 'create_snapshot', snapshot=snapshot)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user