Make c-vol use workers table for cleanup

To be able to support multiple hosts working with the same resources we
have added the workers table to keep track of which host is working with
each specific resource.

This patch makes c-vol service work with this new table by adding
entries on cleanable operations and removing them once these operations
have completed.

Service cleanup on initialization has also been changed to use this new
table so hosts will cleanup only resources from operations they left on
the air and leave any operations that are being processed by other
hosts.

Specs: https://review.openstack.org/236977

Implements: blueprint cinder-volume-active-active-support
Change-Id: I4e5440b8450558add372214fd1a0373ab4ad2434
This commit is contained in:
Gorka Eguileor 2016-03-22 16:43:37 +01:00
parent 5a0bbf23d3
commit d2ec578725
27 changed files with 1087 additions and 323 deletions

View File

@ -80,6 +80,15 @@ class AdminController(wsgi.Controller):
action = '%s_admin_actions:%s' % (self.resource_name, action_name) action = '%s_admin_actions:%s' % (self.resource_name, action_name)
extensions.extension_authorizer('volume', action)(context) extensions.extension_authorizer('volume', action)(context)
def _remove_worker(self, context, id):
# Remove the cleanup worker from the DB when we change a resource
# status since it renders useless the entry.
res = db.worker_destroy(context, resource_type=self.collection.title(),
resource_id=id)
if res:
LOG.debug('Worker entry for %s with id %s has been deleted.',
self.collection, id)
@wsgi.action('os-reset_status') @wsgi.action('os-reset_status')
def _reset_status(self, req, id, body): def _reset_status(self, req, id, body):
"""Reset status on the resource.""" """Reset status on the resource."""
@ -106,6 +115,7 @@ class AdminController(wsgi.Controller):
# Not found exception will be handled at the wsgi level # Not found exception will be handled at the wsgi level
self._update(context, id, update) self._update(context, id, update)
self._remove_worker(context, id)
if update.get('attach_status') == 'detached': if update.get('attach_status') == 'detached':
_clean_volume_attachment(context, id) _clean_volume_attachment(context, id)

View File

@ -251,8 +251,8 @@ def volume_get(context, volume_id):
return IMPL.volume_get(context, volume_id) return IMPL.volume_get(context, volume_id)
def volume_get_all(context, marker, limit, sort_keys=None, sort_dirs=None, def volume_get_all(context, marker=None, limit=None, sort_keys=None,
filters=None, offset=None): sort_dirs=None, filters=None, offset=None):
"""Get all volumes.""" """Get all volumes."""
return IMPL.volume_get_all(context, marker, limit, sort_keys=sort_keys, return IMPL.volume_get_all(context, marker, limit, sort_keys=sort_keys,
sort_dirs=sort_dirs, filters=filters, sort_dirs=sort_dirs, filters=filters,
@ -1584,6 +1584,17 @@ def message_destroy(context, message_id):
################### ###################
def workers_init():
"""Check if DB supports subsecond resolution and set global flag.
MySQL 5.5 doesn't support subsecond resolution in datetime fields, so we
have to take it into account when working with the worker's table.
Once we drop support for MySQL 5.5 we can remove this method.
"""
return IMPL.workers_init()
def worker_create(context, **values): def worker_create(context, **values):
"""Create a worker entry from optional arguments.""" """Create a worker entry from optional arguments."""
return IMPL.worker_create(context, **values) return IMPL.worker_create(context, **values)

View File

@ -1751,8 +1751,8 @@ def volume_get(context, volume_id):
@require_admin_context @require_admin_context
def volume_get_all(context, marker, limit, sort_keys=None, sort_dirs=None, def volume_get_all(context, marker=None, limit=None, sort_keys=None,
filters=None, offset=None): sort_dirs=None, filters=None, offset=None):
"""Retrieves all volumes. """Retrieves all volumes.
If no sort parameters are specified then the returned volumes are sorted If no sort parameters are specified then the returned volumes are sorted
@ -1998,6 +1998,15 @@ def _process_volume_filters(query, filters):
LOG.debug("'migration_status' column could not be found.") LOG.debug("'migration_status' column could not be found.")
return None return None
host = filters.pop('host', None)
if host:
query = query.filter(_filter_host(models.Volume.host, host))
cluster_name = filters.pop('cluster_name', None)
if cluster_name:
query = query.filter(_filter_host(models.Volume.cluster_name,
cluster_name))
# Apply exact match filters for everything else, ensure that the # Apply exact match filters for everything else, ensure that the
# filter value exists on the model # filter value exists on the model
for key in filters.keys(): for key in filters.keys():
@ -2608,7 +2617,8 @@ def snapshot_get_all(context, filters=None, marker=None, limit=None,
order. order.
:param context: context to query under :param context: context to query under
:param filters: dictionary of filters; will do exact matching on values :param filters: dictionary of filters; will do exact matching on values.
Special keys host and cluster_name refer to the volume.
:param marker: the last item of the previous page, used to determine the :param marker: the last item of the previous page, used to determine the
next page of results to return next page of results to return
:param limit: maximum number of items to return :param limit: maximum number of items to return
@ -2618,7 +2628,9 @@ def snapshot_get_all(context, filters=None, marker=None, limit=None,
paired with corresponding item in sort_keys paired with corresponding item in sort_keys
:returns: list of matching snapshots :returns: list of matching snapshots
""" """
if filters and not is_valid_model_filters(models.Snapshot, filters): if filters and not is_valid_model_filters(models.Snapshot, filters,
exclude_list=('host',
'cluster_name')):
return [] return []
session = get_session() session = get_session()
@ -2642,8 +2654,19 @@ def _snaps_get_query(context, session=None, project_only=False):
def _process_snaps_filters(query, filters): def _process_snaps_filters(query, filters):
if filters: if filters:
# Ensure that filters' keys exist on the model # Ensure that filters' keys exist on the model
if not is_valid_model_filters(models.Snapshot, filters): if not is_valid_model_filters(models.Snapshot, filters,
exclude_list=('host', 'cluster_name')):
return None return None
filters = filters.copy()
host = filters.pop('host', None)
cluster = filters.pop('cluster_name', None)
if host or cluster:
query = query.join(models.Snapshot.volume)
vol_field = models.Volume
if host:
query = query.filter(_filter_host(vol_field.host, host))
if cluster:
query = query.filter(_filter_host(vol_field.cluster_name, cluster))
query = query.filter_by(**filters) query = query.filter_by(**filters)
return query return query
@ -5481,13 +5504,15 @@ def cgsnapshot_get(context, cgsnapshot_id):
return _cgsnapshot_get(context, cgsnapshot_id) return _cgsnapshot_get(context, cgsnapshot_id)
def is_valid_model_filters(model, filters): def is_valid_model_filters(model, filters, exclude_list=None):
"""Return True if filter values exist on the model """Return True if filter values exist on the model
:param model: a Cinder model :param model: a Cinder model
:param filters: dictionary of filters :param filters: dictionary of filters
""" """
for key in filters.keys(): for key in filters.keys():
if exclude_list and key in exclude_list:
continue
try: try:
getattr(model, key) getattr(model, key)
except AttributeError: except AttributeError:
@ -6055,7 +6080,7 @@ def image_volume_cache_get_all_for_host(context, host):
def _worker_query(context, session=None, until=None, db_filters=None, def _worker_query(context, session=None, until=None, db_filters=None,
**filters): ignore_sentinel=True, **filters):
# Remove all filters based on the workers table that are set to None # Remove all filters based on the workers table that are set to None
filters = _clean_filters(filters) filters = _clean_filters(filters)
@ -6064,6 +6089,11 @@ def _worker_query(context, session=None, until=None, db_filters=None,
query = model_query(context, models.Worker, session=session) query = model_query(context, models.Worker, session=session)
# TODO(geguileo): Once we remove support for MySQL 5.5 we can remove this
if ignore_sentinel:
# We don't want to retrieve the workers sentinel
query = query.filter(models.Worker.resource_type != 'SENTINEL')
if until: if until:
db_filters = list(db_filters) if db_filters else [] db_filters = list(db_filters) if db_filters else []
# Since we set updated_at at creation time we don't need to check # Since we set updated_at at creation time we don't need to check
@ -6079,8 +6109,41 @@ def _worker_query(context, session=None, until=None, db_filters=None,
return query return query
DB_SUPPORTS_SUBSECOND_RESOLUTION = True
def workers_init():
"""Check if DB supports subsecond resolution and set global flag.
MySQL 5.5 doesn't support subsecond resolution in datetime fields, so we
have to take it into account when working with the worker's table.
To do this we'll have 1 row in the DB, created by the migration script,
where we have tried to set the microseconds and we'll check it.
Once we drop support for MySQL 5.5 we can remove this method.
"""
global DB_SUPPORTS_SUBSECOND_RESOLUTION
session = get_session()
query = session.query(models.Worker).filter_by(resource_type='SENTINEL')
worker = query.first()
DB_SUPPORTS_SUBSECOND_RESOLUTION = bool(worker.updated_at.microsecond)
def _worker_set_updated_at_field(values):
# TODO(geguileo): Once we drop support for MySQL 5.5 we can simplify this
# method.
updated_at = values.get('updated_at', timeutils.utcnow())
if isinstance(updated_at, six.string_types):
return
if not DB_SUPPORTS_SUBSECOND_RESOLUTION:
updated_at = updated_at.replace(microsecond=0)
values['updated_at'] = updated_at
def worker_create(context, **values): def worker_create(context, **values):
"""Create a worker entry from optional arguments.""" """Create a worker entry from optional arguments."""
_worker_set_updated_at_field(values)
worker = models.Worker(**values) worker = models.Worker(**values)
session = get_session() session = get_session()
try: try:
@ -6118,6 +6181,11 @@ def worker_update(context, id, filters=None, orm_worker=None, **values):
"""Update a worker with given values.""" """Update a worker with given values."""
filters = filters or {} filters = filters or {}
query = _worker_query(context, id=id, **filters) query = _worker_query(context, id=id, **filters)
# If we want to update the orm_worker and we don't set the update_at field
# we set it here instead of letting SQLAlchemy do it to be able to update
# the orm_worker.
_worker_set_updated_at_field(values)
result = query.update(values) result = query.update(values)
if not result: if not result:
raise exception.WorkerNotFound(id=id, **filters) raise exception.WorkerNotFound(id=id, **filters)
@ -6131,6 +6199,7 @@ def worker_claim_for_cleanup(context, claimer_id, orm_worker):
# service_id is the same in the DB, thus flagging the claim. # service_id is the same in the DB, thus flagging the claim.
values = {'service_id': claimer_id, values = {'service_id': claimer_id,
'updated_at': timeutils.utcnow()} 'updated_at': timeutils.utcnow()}
_worker_set_updated_at_field(values)
# We only update the worker entry if it hasn't been claimed by other host # We only update the worker entry if it hasn't been claimed by other host
# or thread # or thread

View File

@ -0,0 +1,55 @@
# Copyright (c) 2016 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import timeutils
from sqlalchemy.dialects import mysql
from sqlalchemy import MetaData, Table
def upgrade(migrate_engine):
"""Add microseconds precission on updated_at field in MySQL databases.
PostgreSQL, SQLite, and MSSQL have sub-second precission by default, but
MySQL defaults to second precision in DateTime fields, which creates
problems for the resource cleanup mechanism.
"""
meta = MetaData()
meta.bind = migrate_engine
workers = Table('workers', meta, autoload=True)
# This is only necessary for mysql, and since the table is not in use this
# will only be an schema update.
if migrate_engine.name.startswith('mysql'):
try:
workers.c.updated_at.alter(mysql.DATETIME(fsp=6))
except Exception:
# MySQL v5.5 or earlier don't support sub-second resolution so we
# may have cleanup races in Active-Active configurations, that's
# why upgrading is recommended in that case.
# Code in Cinder is capable of working with 5.5, so for 5.5 there's
# no problem
pass
# TODO(geguileo): Once we remove support for MySQL 5.5 we have to create
# an upgrade migration to remove this row.
# Set workers table sub-second support sentinel
wi = workers.insert()
now = timeutils.utcnow().replace(microsecond=123)
wi.execute({'created_at': now,
'updated_at': now,
'deleted': False,
'resource_type': 'SENTINEL',
'resource_id': 'SUB-SECOND',
'status': 'OK'})

View File

@ -56,9 +56,14 @@ from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
import oslo_messaging as messaging import oslo_messaging as messaging
from oslo_service import periodic_task from oslo_service import periodic_task
from oslo_utils import timeutils
from cinder import context
from cinder import db
from cinder.db import base from cinder.db import base
from cinder.i18n import _LI from cinder import exception
from cinder.i18n import _LE, _LI, _LW
from cinder import objects
from cinder import rpc from cinder import rpc
from cinder.scheduler import rpcapi as scheduler_rpcapi from cinder.scheduler import rpcapi as scheduler_rpcapi
@ -92,13 +97,14 @@ class Manager(base.Base, PeriodicTasks):
"""Tasks to be run at a periodic interval.""" """Tasks to be run at a periodic interval."""
return self.run_periodic_tasks(context, raise_on_error=raise_on_error) return self.run_periodic_tasks(context, raise_on_error=raise_on_error)
def init_host(self, added_to_cluster=None): def init_host(self, service_id=None, added_to_cluster=None):
"""Handle initialization if this is a standalone service. """Handle initialization if this is a standalone service.
A hook point for services to execute tasks before the services are made A hook point for services to execute tasks before the services are made
available (i.e. showing up on RPC and starting to accept RPC calls) to available (i.e. showing up on RPC and starting to accept RPC calls) to
other components. Child classes should override this method. other components. Child classes should override this method.
:param service_id: ID of the service where the manager is running.
:param added_to_cluster: True when a host's cluster configuration has :param added_to_cluster: True when a host's cluster configuration has
changed from not being defined or being '' to changed from not being defined or being '' to
any other value and the DB service record any other value and the DB service record
@ -175,3 +181,101 @@ class SchedulerDependentManager(Manager):
def reset(self): def reset(self):
super(SchedulerDependentManager, self).reset() super(SchedulerDependentManager, self).reset()
self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI() self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
class CleanableManager(object):
def do_cleanup(self, context, cleanup_request):
LOG.info(_LI('Initiating service %s cleanup'),
cleanup_request.service_id)
# If the 'until' field in the cleanup request is not set, we default to
# this very moment.
until = cleanup_request.until or timeutils.utcnow()
keep_entry = False
to_clean = db.worker_get_all(
context,
resource_type=cleanup_request.resource_type,
resource_id=cleanup_request.resource_id,
service_id=cleanup_request.service_id,
until=until)
for clean in to_clean:
original_service_id = clean.service_id
original_time = clean.updated_at
# Try to do a soft delete to mark the entry as being cleaned up
# by us (setting service id to our service id).
res = db.worker_claim_for_cleanup(context,
claimer_id=self.service_id,
orm_worker=clean)
# Claim may fail if entry is being cleaned by another service, has
# been removed (finished cleaning) by another service or the user
# started a new cleanable operation.
# In any of these cases we don't have to do cleanup or remove the
# worker entry.
if not res:
continue
# Try to get versioned object for resource we have to cleanup
try:
vo_cls = getattr(objects, clean.resource_type)
vo = vo_cls.get_by_id(context, clean.resource_id)
# Set the worker DB entry in the VO and mark it as being a
# clean operation
clean.cleaning = True
vo.worker = clean
except exception.NotFound:
LOG.debug('Skipping cleanup for non existent %(type)s %(id)s.',
{'type': clean.resource_type,
'id': clean.resource_id})
else:
# Resource status should match
if vo.status != clean.status:
LOG.debug('Skipping cleanup for mismatching work on '
'%(type)s %(id)s: %(exp_sts)s <> %(found_sts)s.',
{'type': clean.resource_type,
'id': clean.resource_id,
'exp_sts': clean.status,
'found_sts': vo.status})
else:
LOG.info(_LI('Cleaning %(type)s with id %(id)s and status '
'%(status)s'),
{'type': clean.resource_type,
'id': clean.resource_id,
'status': clean.status},
resource=vo)
try:
# Some cleanup jobs are performed asynchronously, so
# we don't delete the worker entry, they'll take care
# of it
keep_entry = self._do_cleanup(context, vo)
except Exception:
LOG.exception(_LE('Could not perform cleanup.'))
# Return the worker DB entry to the original service
db.worker_update(context, clean.id,
service_id=original_service_id,
updated_at=original_time)
continue
# The resource either didn't exist or was properly cleaned, either
# way we can remove the entry from the worker table if the cleanup
# method doesn't want to keep the entry (for example for delayed
# deletion).
if not keep_entry and not db.worker_destroy(context, id=clean.id):
LOG.warning(_LW('Could not remove worker entry %s.'), clean.id)
LOG.info(_LI('Service %s cleanup completed.'),
cleanup_request.service_id)
def _do_cleanup(self, ctxt, vo_resource):
return False
def init_host(self, service_id, **kwargs):
ctxt = context.get_admin_context()
self.service_id = service_id
# TODO(geguileo): Once we don't support MySQL 5.5 anymore we can remove
# call to workers_init.
db.workers_init()
cleanup_request = objects.CleanupRequest(service_id=service_id)
self.do_cleanup(ctxt, cleanup_request)

View File

@ -119,6 +119,7 @@ OBJ_VERSIONS.add('1.11', {'GroupSnapshot': '1.0', 'GroupSnapshotList': '1.0',
OBJ_VERSIONS.add('1.12', {'VolumeType': '1.3'}) OBJ_VERSIONS.add('1.12', {'VolumeType': '1.3'})
OBJ_VERSIONS.add('1.13', {'CleanupRequest': '1.0'}) OBJ_VERSIONS.add('1.13', {'CleanupRequest': '1.0'})
OBJ_VERSIONS.add('1.14', {'VolumeAttachmentList': '1.1'}) OBJ_VERSIONS.add('1.14', {'VolumeAttachmentList': '1.1'})
OBJ_VERSIONS.add('1.15', {'Volume': '1.6', 'Snapshot': '1.2'})
class CinderObjectRegistry(base.VersionedObjectRegistry): class CinderObjectRegistry(base.VersionedObjectRegistry):

View File

@ -13,9 +13,9 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from functools import wraps
import inspect import inspect
import decorator
from oslo_utils import versionutils from oslo_utils import versionutils
from cinder import db from cinder import db
@ -168,8 +168,7 @@ class CinderCleanableObject(base.CinderPersistentObject):
to be added. to be added.
""" """
def _decorator(f): def _decorator(f):
@wraps(f) def wrapper(f, *args, **kwargs):
def wrapper(*args, **kwargs):
if decorator_args: if decorator_args:
call_args = inspect.getcallargs(f, *args, **kwargs) call_args = inspect.getcallargs(f, *args, **kwargs)
candidates = [call_args[obj] for obj in decorator_args] candidates = [call_args[obj] for obj in decorator_args]
@ -201,7 +200,7 @@ class CinderCleanableObject(base.CinderPersistentObject):
except Exception: except Exception:
pass pass
return result return result
return wrapper return decorator.decorate(f, wrapper)
# If we don't have optional decorator arguments the argument in # If we don't have optional decorator arguments the argument in
# decorator_args is the function we have to decorate # decorator_args is the function we have to decorate

View File

@ -21,6 +21,7 @@ from cinder import exception
from cinder.i18n import _ from cinder.i18n import _
from cinder import objects from cinder import objects
from cinder.objects import base from cinder.objects import base
from cinder.objects import cleanable
from cinder.objects import fields as c_fields from cinder.objects import fields as c_fields
@ -28,11 +29,12 @@ CONF = cfg.CONF
@base.CinderObjectRegistry.register @base.CinderObjectRegistry.register
class Snapshot(base.CinderPersistentObject, base.CinderObject, class Snapshot(cleanable.CinderCleanableObject, base.CinderObject,
base.CinderObjectDictCompat): base.CinderObjectDictCompat):
# Version 1.0: Initial version # Version 1.0: Initial version
# Version 1.1: Changed 'status' field to use SnapshotStatusField # Version 1.1: Changed 'status' field to use SnapshotStatusField
VERSION = '1.1' # Version 1.2: This object is now cleanable (adds rows to workers table)
VERSION = '1.2'
# NOTE(thangp): OPTIONAL_FIELDS are fields that would be lazy-loaded. They # NOTE(thangp): OPTIONAL_FIELDS are fields that would be lazy-loaded. They
# are typically the relationship in the sqlalchemy object. # are typically the relationship in the sqlalchemy object.
@ -248,6 +250,13 @@ class Snapshot(base.CinderPersistentObject, base.CinderObject,
return db.snapshot_data_get_for_project(context, project_id, return db.snapshot_data_get_for_project(context, project_id,
volume_type_id) volume_type_id)
@staticmethod
def _is_cleanable(status, obj_version):
# Before 1.2 we didn't have workers table, so cleanup wasn't supported.
if obj_version and obj_version < 1.2:
return False
return status == 'creating'
@base.CinderObjectRegistry.register @base.CinderObjectRegistry.register
class SnapshotList(base.ObjectListBase, base.CinderObject): class SnapshotList(base.ObjectListBase, base.CinderObject):
@ -260,6 +269,11 @@ class SnapshotList(base.ObjectListBase, base.CinderObject):
@classmethod @classmethod
def get_all(cls, context, search_opts, marker=None, limit=None, def get_all(cls, context, search_opts, marker=None, limit=None,
sort_keys=None, sort_dirs=None, offset=None): sort_keys=None, sort_dirs=None, offset=None):
"""Get all snapshot given some search_opts (filters).
Special search options accepted are host and cluster_name, that refer
to the volume's fields.
"""
snapshots = db.snapshot_get_all(context, search_opts, marker, limit, snapshots = db.snapshot_get_all(context, search_opts, marker, limit,
sort_keys, sort_dirs, offset) sort_keys, sort_dirs, offset)
expected_attrs = Snapshot._get_expected_attrs(context) expected_attrs = Snapshot._get_expected_attrs(context)

View File

@ -21,6 +21,7 @@ from cinder import exception
from cinder.i18n import _ from cinder.i18n import _
from cinder import objects from cinder import objects
from cinder.objects import base from cinder.objects import base
from cinder.objects import cleanable
CONF = cfg.CONF CONF = cfg.CONF
@ -48,7 +49,7 @@ class MetadataObject(dict):
@base.CinderObjectRegistry.register @base.CinderObjectRegistry.register
class Volume(base.CinderPersistentObject, base.CinderObject, class Volume(cleanable.CinderCleanableObject, base.CinderObject,
base.CinderObjectDictCompat, base.CinderComparableObject, base.CinderObjectDictCompat, base.CinderComparableObject,
base.ClusteredObject): base.ClusteredObject):
# Version 1.0: Initial version # Version 1.0: Initial version
@ -58,7 +59,8 @@ class Volume(base.CinderPersistentObject, base.CinderObject,
# Version 1.3: Added finish_volume_migration() # Version 1.3: Added finish_volume_migration()
# Version 1.4: Added cluster fields # Version 1.4: Added cluster fields
# Version 1.5: Added group # Version 1.5: Added group
VERSION = '1.5' # Version 1.6: This object is now cleanable (adds rows to workers table)
VERSION = '1.6'
OPTIONAL_FIELDS = ('metadata', 'admin_metadata', 'glance_metadata', OPTIONAL_FIELDS = ('metadata', 'admin_metadata', 'glance_metadata',
'volume_type', 'volume_attachment', 'consistencygroup', 'volume_type', 'volume_attachment', 'consistencygroup',
@ -364,6 +366,14 @@ class Volume(base.CinderPersistentObject, base.CinderObject,
self.admin_metadata = db.volume_admin_metadata_update( self.admin_metadata = db.volume_admin_metadata_update(
self._context, self.id, metadata, True) self._context, self.id, metadata, True)
# When we are creating a volume and we change from 'creating'
# status to 'downloading' status we have to change the worker entry
# in the DB to reflect this change, otherwise the cleanup will
# not be performed as it will be mistaken for a volume that has
# been somehow changed (reset status, forced operation...)
if updates.get('status') == 'downloading':
self.set_worker()
db.volume_update(self._context, self.id, updates) db.volume_update(self._context, self.id, updates)
self.obj_reset_changes() self.obj_reset_changes()
@ -486,6 +496,14 @@ class Volume(base.CinderPersistentObject, base.CinderObject,
dest_volume.save() dest_volume.save()
return dest_volume return dest_volume
@staticmethod
def _is_cleanable(status, obj_version):
# Before 1.6 we didn't have workers table, so cleanup wasn't supported.
# cleaning.
if obj_version and obj_version < 1.6:
return False
return status in ('creating', 'deleting', 'uploading', 'downloading')
@base.CinderObjectRegistry.register @base.CinderObjectRegistry.register
class VolumeList(base.ObjectListBase, base.CinderObject): class VolumeList(base.ObjectListBase, base.CinderObject):
@ -523,8 +541,8 @@ class VolumeList(base.ObjectListBase, base.CinderObject):
return expected_attrs return expected_attrs
@classmethod @classmethod
def get_all(cls, context, marker, limit, sort_keys=None, sort_dirs=None, def get_all(cls, context, marker=None, limit=None, sort_keys=None,
filters=None, offset=None): sort_dirs=None, filters=None, offset=None):
volumes = db.volume_get_all(context, marker, limit, volumes = db.volume_get_all(context, marker, limit,
sort_keys=sort_keys, sort_dirs=sort_dirs, sort_keys=sort_keys, sort_dirs=sort_dirs,
filters=filters, offset=offset) filters=filters, offset=offset)

View File

@ -95,6 +95,7 @@ class SchedulerAPI(rpc.RPCAPI):
def create_volume(self, ctxt, volume, snapshot_id=None, image_id=None, def create_volume(self, ctxt, volume, snapshot_id=None, image_id=None,
request_spec=None, filter_properties=None): request_spec=None, filter_properties=None):
volume.create_worker()
cctxt = self._get_cctxt() cctxt = self._get_cctxt()
msg_args = {'snapshot_id': snapshot_id, 'image_id': image_id, msg_args = {'snapshot_id': snapshot_id, 'image_id': image_id,
'request_spec': request_spec, 'request_spec': request_spec,

View File

@ -121,7 +121,6 @@ class Service(service.Service):
on topic. It also periodically runs tasks on the manager and reports on topic. It also periodically runs tasks on the manager and reports
it state to the database services table. it state to the database services table.
""" """
# Make service_id a class attribute so it can be used for clean up # Make service_id a class attribute so it can be used for clean up
service_id = None service_id = None
@ -144,6 +143,8 @@ class Service(service.Service):
if CONF.profiler.enabled: if CONF.profiler.enabled:
manager_class = profiler.trace_cls("rpc")(manager_class) manager_class = profiler.trace_cls("rpc")(manager_class)
self.service = None
# NOTE(geguileo): We need to create the Service DB entry before we # NOTE(geguileo): We need to create the Service DB entry before we
# create the manager, otherwise capped versions for serializer and rpc # create the manager, otherwise capped versions for serializer and rpc
# client would use existing DB entries not including us, which could # client would use existing DB entries not including us, which could
@ -234,7 +235,8 @@ class Service(service.Service):
if self.coordination: if self.coordination:
coordination.COORDINATOR.start() coordination.COORDINATOR.start()
self.manager.init_host(added_to_cluster=self.added_to_cluster) self.manager.init_host(added_to_cluster=self.added_to_cluster,
service_id=Service.service_id)
LOG.debug("Creating RPC server for service %s", self.topic) LOG.debug("Creating RPC server for service %s", self.topic)

View File

@ -89,6 +89,7 @@ class TestCase(testtools.TestCase):
"""Test case base class for all unit tests.""" """Test case base class for all unit tests."""
POLICY_PATH = 'cinder/tests/unit/policy.json' POLICY_PATH = 'cinder/tests/unit/policy.json'
MOCK_WORKER = True
def _get_joined_notifier(self, *args, **kwargs): def _get_joined_notifier(self, *args, **kwargs):
# We create a new fake notifier but we join the notifications with # We create a new fake notifier but we join the notifications with
@ -110,6 +111,12 @@ class TestCase(testtools.TestCase):
side_effect=self._get_joined_notifier) side_effect=self._get_joined_notifier)
p.start() p.start()
if self.MOCK_WORKER:
# Mock worker creation for all tests that don't care about it
clean_path = 'cinder.objects.cleanable.CinderCleanableObject.%s'
for method in ('create_worker', 'set_worker', 'unset_worker'):
self.patch(clean_path % method, return_value=None)
# Unit tests do not need to use lazy gettext # Unit tests do not need to use lazy gettext
i18n.enable_lazy(False) i18n.enable_lazy(False)

View File

@ -23,8 +23,8 @@ from cinder import quota
from cinder.tests.unit import conf_fixture from cinder.tests.unit import conf_fixture
from cinder.tests.unit import fake_constants as fake from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_snapshot from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import test_volume
from cinder.tests.unit import utils as tests_utils from cinder.tests.unit import utils as tests_utils
from cinder.tests.unit import volume as base
import cinder.volume import cinder.volume
from cinder.volume import driver from cinder.volume import driver
from cinder.volume import utils as volutils from cinder.volume import utils as volutils
@ -33,7 +33,7 @@ CGQUOTAS = quota.CGQUOTAS
CONF = cfg.CONF CONF = cfg.CONF
class ConsistencyGroupTestCase(test_volume.BaseVolumeTestCase): class ConsistencyGroupTestCase(base.BaseVolumeTestCase):
def test_delete_volume_in_consistency_group(self): def test_delete_volume_in_consistency_group(self):
"""Test deleting a volume that's tied to a consistency group fails.""" """Test deleting a volume that's tied to a consistency group fails."""
consistencygroup_id = fake.CONSISTENCY_GROUP_ID consistencygroup_id = fake.CONSISTENCY_GROUP_ID

View File

@ -21,8 +21,8 @@ from cinder import test
class BaseObjectsTestCase(test.TestCase): class BaseObjectsTestCase(test.TestCase):
def setUp(self): def setUp(self, *args, **kwargs):
super(BaseObjectsTestCase, self).setUp() super(BaseObjectsTestCase, self).setUp(*args, **kwargs)
self.user_id = 'fake-user' self.user_id = 'fake-user'
self.project_id = 'fake-project' self.project_id = 'fake-project'
self.context = context.RequestContext(self.user_id, self.project_id, self.context = context.RequestContext(self.user_id, self.project_id,

View File

@ -40,6 +40,7 @@ class Backup(cleanable.CinderCleanableObject):
class TestCleanable(test_objects.BaseObjectsTestCase): class TestCleanable(test_objects.BaseObjectsTestCase):
MOCK_WORKER = False
def setUp(self): def setUp(self):
super(TestCleanable, self).setUp() super(TestCleanable, self).setUp()

View File

@ -38,9 +38,9 @@ object_data = {
'RequestSpec': '1.1-b0bd1a28d191d75648901fa853e8a733', 'RequestSpec': '1.1-b0bd1a28d191d75648901fa853e8a733',
'Service': '1.4-c7d011989d1718ca0496ccf640b42712', 'Service': '1.4-c7d011989d1718ca0496ccf640b42712',
'ServiceList': '1.1-15ecf022a68ddbb8c2a6739cfc9f8f5e', 'ServiceList': '1.1-15ecf022a68ddbb8c2a6739cfc9f8f5e',
'Snapshot': '1.1-d6a9d58f627bb2a5cf804b0dd7a12bc7', 'Snapshot': '1.2-d6a9d58f627bb2a5cf804b0dd7a12bc7',
'SnapshotList': '1.0-15ecf022a68ddbb8c2a6739cfc9f8f5e', 'SnapshotList': '1.0-15ecf022a68ddbb8c2a6739cfc9f8f5e',
'Volume': '1.5-19919d8086d6a38ab9d3ab88139e70e0', 'Volume': '1.6-19919d8086d6a38ab9d3ab88139e70e0',
'VolumeList': '1.1-15ecf022a68ddbb8c2a6739cfc9f8f5e', 'VolumeList': '1.1-15ecf022a68ddbb8c2a6739cfc9f8f5e',
'VolumeAttachment': '1.0-b30dacf62b2030dd83d8a1603f1064ff', 'VolumeAttachment': '1.0-b30dacf62b2030dd83d8a1603f1064ff',
'VolumeAttachmentList': '1.1-15ecf022a68ddbb8c2a6739cfc9f8f5e', 'VolumeAttachmentList': '1.1-15ecf022a68ddbb8c2a6739cfc9f8f5e',

View File

@ -17,8 +17,6 @@
Unit Tests for cinder.scheduler.rpcapi Unit Tests for cinder.scheduler.rpcapi
""" """
import copy
import mock import mock
from cinder import context from cinder import context
@ -46,7 +44,7 @@ class SchedulerRpcAPITestCase(test.TestCase):
"version": kwargs.pop('version', rpcapi.RPC_API_VERSION) "version": kwargs.pop('version', rpcapi.RPC_API_VERSION)
} }
expected_msg = copy.deepcopy(kwargs) expected_msg = kwargs.copy()
self.fake_args = None self.fake_args = None
self.fake_kwargs = None self.fake_kwargs = None
@ -86,44 +84,65 @@ class SchedulerRpcAPITestCase(test.TestCase):
version='3.0') version='3.0')
def test_create_volume(self): def test_create_volume(self):
volume = fake_volume.fake_volume_obj(self.context)
create_worker_mock = self.mock_object(volume, 'create_worker')
self._test_scheduler_api('create_volume', self._test_scheduler_api('create_volume',
rpc_method='cast', rpc_method='cast',
snapshot_id='snapshot_id', snapshot_id='snapshot_id',
image_id='image_id', image_id='image_id',
request_spec='fake_request_spec', request_spec='fake_request_spec',
filter_properties='filter_properties', filter_properties='filter_properties',
volume=fake_volume.fake_volume_obj( volume=volume,
self.context),
version='3.0') version='3.0')
create_worker_mock.assert_called_once()
def test_create_volume_serialization(self):
volume = fake_volume.fake_volume_obj(self.context)
create_worker_mock = self.mock_object(volume, 'create_worker')
self._test_scheduler_api('create_volume',
rpc_method='cast',
snapshot_id='snapshot_id',
image_id='image_id',
request_spec={'volume_type': {}},
filter_properties='filter_properties',
volume=volume,
version='3.0')
create_worker_mock.assert_called_once()
def test_migrate_volume_to_host(self): def test_migrate_volume_to_host(self):
volume = fake_volume.fake_volume_obj(self.context)
create_worker_mock = self.mock_object(volume, 'create_worker')
self._test_scheduler_api('migrate_volume_to_host', self._test_scheduler_api('migrate_volume_to_host',
rpc_method='cast', rpc_method='cast',
host='host', host='host',
force_host_copy=True, force_host_copy=True,
request_spec='fake_request_spec', request_spec='fake_request_spec',
filter_properties='filter_properties', filter_properties='filter_properties',
volume=fake_volume.fake_volume_obj( volume=volume,
self.context),
version='3.0') version='3.0')
create_worker_mock.assert_not_called()
def test_retype(self): def test_retype(self):
volume = fake_volume.fake_volume_obj(self.context)
create_worker_mock = self.mock_object(volume, 'create_worker')
self._test_scheduler_api('retype', self._test_scheduler_api('retype',
rpc_method='cast', rpc_method='cast',
request_spec='fake_request_spec', request_spec='fake_request_spec',
filter_properties='filter_properties', filter_properties='filter_properties',
volume=fake_volume.fake_volume_obj( volume=volume,
self.context),
version='3.0') version='3.0')
create_worker_mock.assert_not_called()
def test_manage_existing(self): def test_manage_existing(self):
volume = fake_volume.fake_volume_obj(self.context)
create_worker_mock = self.mock_object(volume, 'create_worker')
self._test_scheduler_api('manage_existing', self._test_scheduler_api('manage_existing',
rpc_method='cast', rpc_method='cast',
request_spec='fake_request_spec', request_spec='fake_request_spec',
filter_properties='filter_properties', filter_properties='filter_properties',
volume=fake_volume.fake_volume_obj( volume=volume,
self.context),
version='3.0') version='3.0')
create_worker_mock.assert_not_called()
def test_get_pools(self): def test_get_pools(self):
self._test_scheduler_api('get_pools', self._test_scheduler_api('get_pools',

View File

@ -0,0 +1,227 @@
# Copyright (c) 2016 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_utils import timeutils
from cinder import context
from cinder import db
from cinder import manager
from cinder import objects
from cinder import test
from cinder.tests.unit import fake_constants
from cinder.tests.unit import utils
class FakeManager(manager.CleanableManager):
def __init__(self, service_id=None, keep_after_clean=False):
if service_id:
self.service_id = service_id
self.keep_after_clean = keep_after_clean
def _do_cleanup(self, ctxt, vo_resource):
vo_resource.status += '_cleaned'
vo_resource.save()
return self.keep_after_clean
class TestCleanableManager(test.TestCase):
def setUp(self):
super(TestCleanableManager, self).setUp()
self.user_id = fake_constants.USER_ID
self.project_id = fake_constants.PROJECT_ID
self.context = context.RequestContext(self.user_id, self.project_id,
is_admin=True)
self.service = db.service_create(self.context, {})
@mock.patch('cinder.db.workers_init', autospec=True)
@mock.patch('cinder.manager.CleanableManager.do_cleanup', autospec=True)
def test_init_host_with_service(self, mock_cleanup, mock_workers_init):
mngr = FakeManager()
self.assertFalse(hasattr(mngr, 'service_id'))
mngr.init_host(service_id=self.service.id)
self.assertEqual(self.service.id, mngr.service_id)
mock_cleanup.assert_called_once_with(mngr, mock.ANY, mock.ANY)
clean_req = mock_cleanup.call_args[0][2]
self.assertIsInstance(clean_req, objects.CleanupRequest)
self.assertEqual(self.service.id, clean_req.service_id)
mock_workers_init.assert_called_once_with()
def test_do_cleanup(self):
"""Basic successful cleanup."""
vol = utils.create_volume(self.context, status='creating')
db.worker_create(self.context, status='creating',
resource_type='Volume', resource_id=vol.id,
service_id=self.service.id)
clean_req = objects.CleanupRequest(service_id=self.service.id)
mngr = FakeManager(self.service.id)
mngr.do_cleanup(self.context, clean_req)
self.assertListEqual([], db.worker_get_all(self.context))
vol.refresh()
self.assertEqual('creating_cleaned', vol.status)
def test_do_cleanup_not_cleaning_already_claimed(self):
"""Basic cleanup that doesn't touch already cleaning works."""
vol = utils.create_volume(self.context, status='creating')
worker1 = db.worker_create(self.context, status='creating',
resource_type='Volume', resource_id=vol.id,
service_id=self.service.id)
worker1 = db.worker_get(self.context, id=worker1.id)
vol2 = utils.create_volume(self.context, status='deleting')
worker2 = db.worker_create(self.context, status='deleting',
resource_type='Volume', resource_id=vol2.id,
service_id=self.service.id + 1)
worker2 = db.worker_get(self.context, id=worker2.id)
# Simulate that the change to vol2 worker happened between
# worker_get_all and trying to claim a work for cleanup
worker2.service_id = self.service.id
clean_req = objects.CleanupRequest(service_id=self.service.id)
mngr = FakeManager(self.service.id)
with mock.patch('cinder.db.worker_get_all') as get_all_mock:
get_all_mock.return_value = [worker1, worker2]
mngr.do_cleanup(self.context, clean_req)
workers = db.worker_get_all(self.context)
self.assertEqual(1, len(workers))
self.assertEqual(worker2.id, workers[0].id)
vol.refresh()
self.assertEqual('creating_cleaned', vol.status)
vol2.refresh()
self.assertEqual('deleting', vol2.status)
def test_do_cleanup_not_cleaning_already_claimed_by_us(self):
"""Basic cleanup that doesn't touch other thread's claimed works."""
original_time = timeutils.utcnow()
other_thread_claimed_time = timeutils.utcnow()
vol = utils.create_volume(self.context, status='creating')
worker1 = db.worker_create(self.context, status='creating',
resource_type='Volume', resource_id=vol.id,
service_id=self.service.id,
updated_at=original_time)
worker1 = db.worker_get(self.context, id=worker1.id)
vol2 = utils.create_volume(self.context, status='deleting')
worker2 = db.worker_create(self.context, status='deleting',
resource_type='Volume', resource_id=vol2.id,
service_id=self.service.id,
updated_at=other_thread_claimed_time)
worker2 = db.worker_get(self.context, id=worker2.id)
# Simulate that the change to vol2 worker happened between
# worker_get_all and trying to claim a work for cleanup
worker2.updated_at = original_time
clean_req = objects.CleanupRequest(service_id=self.service.id)
mngr = FakeManager(self.service.id)
with mock.patch('cinder.db.worker_get_all') as get_all_mock:
get_all_mock.return_value = [worker1, worker2]
mngr.do_cleanup(self.context, clean_req)
workers = db.worker_get_all(self.context)
self.assertEqual(1, len(workers))
self.assertEqual(worker2.id, workers[0].id)
vol.refresh()
self.assertEqual('creating_cleaned', vol.status)
vol2.refresh()
self.assertEqual('deleting', vol2.status)
def test_do_cleanup_resource_deleted(self):
"""Cleanup on a resource that's been already deleted."""
vol = utils.create_volume(self.context, status='creating')
db.worker_create(self.context, status='creating',
resource_type='Volume', resource_id=vol.id,
service_id=self.service.id)
vol.destroy()
clean_req = objects.CleanupRequest(service_id=self.service.id)
mngr = FakeManager(self.service.id)
mngr.do_cleanup(self.context, clean_req)
workers = db.worker_get_all(self.context)
self.assertListEqual([], workers)
def test_do_cleanup_resource_on_another_service(self):
"""Cleanup on a resource that's been claimed by other service."""
vol = utils.create_volume(self.context, status='deleting')
db.worker_create(self.context, status='deleting',
resource_type='Volume', resource_id=vol.id,
service_id=self.service.id + 1)
clean_req = objects.CleanupRequest(service_id=self.service.id)
mngr = FakeManager(self.service.id)
mngr.do_cleanup(self.context, clean_req)
workers = db.worker_get_all(self.context)
self.assertEqual(1, len(workers))
vol.refresh()
self.assertEqual('deleting', vol.status)
def test_do_cleanup_resource_changed_status(self):
"""Cleanup on a resource that's changed status."""
vol = utils.create_volume(self.context, status='available')
db.worker_create(self.context, status='creating',
resource_type='Volume', resource_id=vol.id,
service_id=self.service.id)
clean_req = objects.CleanupRequest(service_id=self.service.id)
mngr = FakeManager(self.service.id)
mngr.do_cleanup(self.context, clean_req)
workers = db.worker_get_all(self.context)
self.assertListEqual([], workers)
vol.refresh()
self.assertEqual('available', vol.status)
def test_do_cleanup_keep_worker(self):
"""Cleanup on a resource that will remove worker when cleaning up."""
vol = utils.create_volume(self.context, status='deleting')
db.worker_create(self.context, status='deleting',
resource_type='Volume', resource_id=vol.id,
service_id=self.service.id)
clean_req = objects.CleanupRequest(service_id=self.service.id)
mngr = FakeManager(self.service.id, keep_after_clean=True)
mngr.do_cleanup(self.context, clean_req)
workers = db.worker_get_all(self.context)
self.assertEqual(1, len(workers))
vol.refresh()
self.assertEqual('deleting_cleaned', vol.status)
@mock.patch.object(FakeManager, '_do_cleanup', side_effect=Exception)
def test_do_cleanup_revive_on_cleanup_fail(self, mock_clean):
"""Cleanup will revive a worker if cleanup fails."""
vol = utils.create_volume(self.context, status='creating')
db.worker_create(self.context, status='creating',
resource_type='Volume', resource_id=vol.id,
service_id=self.service.id)
clean_req = objects.CleanupRequest(service_id=self.service.id)
mngr = FakeManager(self.service.id)
mngr.do_cleanup(self.context, clean_req)
workers = db.worker_get_all(self.context)
self.assertEqual(1, len(workers))
vol.refresh()
self.assertEqual('creating', vol.status)

View File

@ -407,6 +407,21 @@ class DBAPIVolumeTestCase(BaseTest):
self._assertEqualListsOfObjects(volumes, db.volume_get_all( self._assertEqualListsOfObjects(volumes, db.volume_get_all(
self.ctxt, None, None, ['host'], None)) self.ctxt, None, None, ['host'], None))
@ddt.data('cluster_name', 'host')
def test_volume_get_all_filter_host_and_cluster(self, field):
volumes = []
for i in range(2):
for value in ('host%d@backend#pool', 'host%d@backend', 'host%d'):
kwargs = {field: value % i}
volumes.append(utils.create_volume(self.ctxt, **kwargs))
for i in range(3):
filters = {field: getattr(volumes[i], field)}
result = db.volume_get_all(self.ctxt, filters=filters)
self.assertEqual(i + 1, len(result))
self.assertSetEqual({v.id for v in volumes[:i + 1]},
{v.id for v in result})
def test_volume_get_all_marker_passed(self): def test_volume_get_all_marker_passed(self):
volumes = [ volumes = [
db.volume_create(self.ctxt, {'id': 1}), db.volume_create(self.ctxt, {'id': 1}),
@ -1277,6 +1292,7 @@ class DBAPIVolumeTestCase(BaseTest):
db_vols[i].cluster_name) db_vols[i].cluster_name)
@ddt.ddt
class DBAPISnapshotTestCase(BaseTest): class DBAPISnapshotTestCase(BaseTest):
"""Tests for cinder.db.api.snapshot_*.""" """Tests for cinder.db.api.snapshot_*."""
@ -1367,6 +1383,24 @@ class DBAPISnapshotTestCase(BaseTest):
filters), filters),
ignored_keys=['metadata', 'volume']) ignored_keys=['metadata', 'volume'])
@ddt.data('cluster_name', 'host')
def test_snapshot_get_all_filter_host_and_cluster(self, field):
volumes = []
snapshots = []
for i in range(2):
for value in ('host%d@backend#pool', 'host%d@backend', 'host%d'):
kwargs = {field: value % i}
vol = utils.create_volume(self.ctxt, **kwargs)
volumes.append(vol)
snapshots.append(utils.create_snapshot(self.ctxt, vol.id))
for i in range(3):
filters = {field: getattr(volumes[i], field)}
result = db.snapshot_get_all(self.ctxt, filters=filters)
self.assertEqual(i + 1, len(result))
self.assertSetEqual({s.id for s in snapshots[:i + 1]},
{s.id for s in result})
def test_snapshot_get_by_host(self): def test_snapshot_get_by_host(self):
db.volume_create(self.ctxt, {'id': 1, 'host': 'host1'}) db.volume_create(self.ctxt, {'id': 1, 'host': 'host1'})
db.volume_create(self.ctxt, {'id': 2, 'host': 'host2'}) db.volume_create(self.ctxt, {'id': 2, 'host': 'host2'})

View File

@ -15,9 +15,11 @@
"""Unit tests for cinder.db.api.Worker""" """Unit tests for cinder.db.api.Worker"""
from datetime import datetime
import time import time
import uuid import uuid
import mock
from oslo_db import exception as db_exception from oslo_db import exception as db_exception
import six import six
@ -40,12 +42,41 @@ class DBAPIWorkerTestCase(test.TestCase, test.ModelsObjectComparatorMixin):
super(DBAPIWorkerTestCase, self).setUp() super(DBAPIWorkerTestCase, self).setUp()
self.ctxt = context.get_admin_context() self.ctxt = context.get_admin_context()
def tearDown(self):
db.sqlalchemy.api.DB_SUPPORTS_SUBSECOND_RESOLUTION = True
super(DBAPIWorkerTestCase, self).tearDown()
def test_workers_init(self):
# SQLite supports subsecond resolution so result is True
db.sqlalchemy.api.DB_SUPPORTS_SUBSECOND_RESOLUTION = None
db.workers_init()
self.assertTrue(db.sqlalchemy.api.DB_SUPPORTS_SUBSECOND_RESOLUTION)
def test_workers_init_not_supported(self):
# Fake a Db that doesn't support sub-second resolution in datetimes
db.worker_update(
self.ctxt, None,
{'resource_type': 'SENTINEL', 'ignore_sentinel': False},
updated_at=datetime.utcnow().replace(microsecond=0))
db.workers_init()
self.assertFalse(db.sqlalchemy.api.DB_SUPPORTS_SUBSECOND_RESOLUTION)
def test_worker_create_and_get(self): def test_worker_create_and_get(self):
"""Test basic creation of a worker record.""" """Test basic creation of a worker record."""
worker = db.worker_create(self.ctxt, **self.worker_fields) worker = db.worker_create(self.ctxt, **self.worker_fields)
db_worker = db.worker_get(self.ctxt, id=worker.id) db_worker = db.worker_get(self.ctxt, id=worker.id)
self._assertEqualObjects(worker, db_worker) self._assertEqualObjects(worker, db_worker)
@mock.patch('oslo_utils.timeutils.utcnow',
return_value=datetime.utcnow().replace(microsecond=123))
def test_worker_create_no_subsecond(self, mock_utcnow):
"""Test basic creation of a worker record."""
db.sqlalchemy.api.DB_SUPPORTS_SUBSECOND_RESOLUTION = False
worker = db.worker_create(self.ctxt, **self.worker_fields)
db_worker = db.worker_get(self.ctxt, id=worker.id)
self._assertEqualObjects(worker, db_worker)
self.assertEqual(0, db_worker.updated_at.microsecond)
def test_worker_create_unique_constrains(self): def test_worker_create_unique_constrains(self):
"""Test when we use an already existing resource type and id.""" """Test when we use an already existing resource type and id."""
db.worker_create(self.ctxt, **self.worker_fields) db.worker_create(self.ctxt, **self.worker_fields)
@ -131,6 +162,21 @@ class DBAPIWorkerTestCase(test.TestCase, test.ModelsObjectComparatorMixin):
db_worker = db.worker_get(self.ctxt, id=worker.id) db_worker = db.worker_get(self.ctxt, id=worker.id)
self._assertEqualObjects(worker, db_worker, ['updated_at']) self._assertEqualObjects(worker, db_worker, ['updated_at'])
def test_worker_update_no_subsecond(self):
"""Test basic worker update."""
db.sqlalchemy.api.DB_SUPPORTS_SUBSECOND_RESOLUTION = False
worker = self._create_workers(1)[0]
worker = db.worker_get(self.ctxt, id=worker.id)
now = datetime.utcnow().replace(microsecond=123)
with mock.patch('oslo_utils.timeutils.utcnow', return_value=now):
res = db.worker_update(self.ctxt, worker.id, service_id=1)
self.assertEqual(1, res)
worker.service_id = 1
db_worker = db.worker_get(self.ctxt, id=worker.id)
self._assertEqualObjects(worker, db_worker, ['updated_at'])
self.assertEqual(0, db_worker.updated_at.microsecond)
def test_worker_update_update_orm(self): def test_worker_update_update_orm(self):
"""Test worker update updating the worker orm object.""" """Test worker update updating the worker orm object."""
worker = self._create_workers(1)[0] worker = self._create_workers(1)[0]
@ -139,7 +185,9 @@ class DBAPIWorkerTestCase(test.TestCase, test.ModelsObjectComparatorMixin):
self.assertEqual(1, res) self.assertEqual(1, res)
db_worker = db.worker_get(self.ctxt, id=worker.id) db_worker = db.worker_get(self.ctxt, id=worker.id)
self._assertEqualObjects(worker, db_worker, ['updated_at']) # If we are updating the ORM object we don't ignore the update_at field
# because it will get updated in the ORM instance.
self._assertEqualObjects(worker, db_worker)
def test_worker_destroy(self): def test_worker_destroy(self):
"""Test that worker destroy really deletes the DB entry.""" """Test that worker destroy really deletes the DB entry."""
@ -152,7 +200,7 @@ class DBAPIWorkerTestCase(test.TestCase, test.ModelsObjectComparatorMixin):
def test_worker_destroy_non_existent(self): def test_worker_destroy_non_existent(self):
"""Test that worker destroy returns 0 when entry doesn't exist.""" """Test that worker destroy returns 0 when entry doesn't exist."""
res = db.worker_destroy(self.ctxt, id=1) res = db.worker_destroy(self.ctxt, id=100)
self.assertEqual(0, res) self.assertEqual(0, res)
def test_worker_claim(self): def test_worker_claim(self):

View File

@ -427,7 +427,8 @@ class ServiceTestCase(test.TestCase):
# Since we have created the service entry we call init_host with # Since we have created the service entry we call init_host with
# added_to_cluster=True # added_to_cluster=True
init_host_mock.assert_called_once_with( init_host_mock.assert_called_once_with(
added_to_cluster=added_to_cluster) added_to_cluster=added_to_cluster,
service_id=self.service_ref['id'])
expected_target_calls = [mock.call(topic=self.topic, server=self.host)] expected_target_calls = [mock.call(topic=self.topic, server=self.host)]
expected_rpc_calls = [mock.call(target_mock.return_value, mock.ANY, expected_rpc_calls = [mock.call(target_mock.return_value, mock.ANY,

View File

@ -36,7 +36,6 @@ from oslo_utils import importutils
from oslo_utils import timeutils from oslo_utils import timeutils
from oslo_utils import units from oslo_utils import units
import six import six
from stevedore import extension
from taskflow.engines.action_engine import engine from taskflow.engines.action_engine import engine
from cinder.api import common from cinder.api import common
@ -65,6 +64,7 @@ from cinder.tests.unit import fake_volume
from cinder.tests.unit.image import fake as fake_image from cinder.tests.unit.image import fake as fake_image
from cinder.tests.unit.keymgr import fake as fake_keymgr from cinder.tests.unit.keymgr import fake as fake_keymgr
from cinder.tests.unit import utils as tests_utils from cinder.tests.unit import utils as tests_utils
from cinder.tests.unit import volume as base
from cinder import utils from cinder import utils
import cinder.volume import cinder.volume
from cinder.volume import api as volume_api from cinder.volume import api as volume_api
@ -124,120 +124,7 @@ class FakeImageService(object):
'status': 'active'} 'status': 'active'}
class BaseVolumeTestCase(test.TestCase): class AvailabilityZoneTestCase(base.BaseVolumeTestCase):
"""Test Case for volumes."""
FAKE_UUID = fake.IMAGE_ID
def setUp(self):
super(BaseVolumeTestCase, self).setUp()
self.extension_manager = extension.ExtensionManager(
"BaseVolumeTestCase")
vol_tmpdir = tempfile.mkdtemp()
self.flags(volumes_dir=vol_tmpdir)
self.addCleanup(self._cleanup)
self.volume = importutils.import_object(CONF.volume_manager)
self.volume.message_api = mock.Mock()
self.configuration = mock.Mock(conf.Configuration)
self.context = context.get_admin_context()
self.context.user_id = fake.USER_ID
# NOTE(mriedem): The id is hard-coded here for tracking race fail
# assertions with the notification code, it's part of an
# elastic-recheck query so don't remove it or change it.
self.project_id = '7f265bd4-3a85-465e-a899-5dc4854a86d3'
self.context.project_id = self.project_id
self.volume_params = {
'status': 'creating',
'host': CONF.host,
'size': 1}
self.mock_object(brick_lvm.LVM,
'get_all_volume_groups',
self.fake_get_all_volume_groups)
fake_image.mock_image_service(self)
self.mock_object(brick_lvm.LVM, '_vg_exists', lambda x: True)
self.mock_object(os.path, 'exists', lambda x: True)
self.volume.driver.set_initialized()
self.volume.stats = {'allocated_capacity_gb': 0,
'pools': {}}
# keep ordered record of what we execute
self.called = []
self.volume_api = cinder.volume.api.API()
def _cleanup(self):
try:
shutil.rmtree(CONF.volumes_dir)
except OSError:
pass
def fake_get_all_volume_groups(obj, vg_name=None, no_suffix=True):
return [{'name': 'cinder-volumes',
'size': '5.00',
'available': '2.50',
'lv_count': '2',
'uuid': 'vR1JU3-FAKE-C4A9-PQFh-Mctm-9FwA-Xwzc1m'}]
@mock.patch('cinder.image.image_utils.TemporaryImages.fetch')
@mock.patch('cinder.volume.flows.manager.create_volume.'
'CreateVolumeFromSpecTask._clone_image_volume')
def _create_volume_from_image(self, mock_clone_image_volume,
mock_fetch_img,
fakeout_copy_image_to_volume=False,
fakeout_clone_image=False,
clone_image_volume=False):
"""Test function of create_volume_from_image.
Test cases call this function to create a volume from image, caller
can choose whether to fake out copy_image_to_volume and clone_image,
after calling this, test cases should check status of the volume.
"""
def fake_local_path(volume):
return dst_path
def fake_copy_image_to_volume(context, volume,
image_service, image_id):
pass
def fake_fetch_to_raw(ctx, image_service, image_id, path, blocksize,
size=None, throttle=None):
pass
def fake_clone_image(ctx, volume_ref,
image_location, image_meta,
image_service):
return {'provider_location': None}, True
dst_fd, dst_path = tempfile.mkstemp()
os.close(dst_fd)
self.mock_object(self.volume.driver, 'local_path', fake_local_path)
if fakeout_clone_image:
self.mock_object(self.volume.driver, 'clone_image',
fake_clone_image)
self.mock_object(image_utils, 'fetch_to_raw', fake_fetch_to_raw)
if fakeout_copy_image_to_volume:
self.mock_object(self.volume.driver, 'copy_image_to_volume',
fake_copy_image_to_volume)
mock_clone_image_volume.return_value = ({}, clone_image_volume)
mock_fetch_img.return_value = mock.MagicMock(
spec=tests_utils.get_file_spec())
image_id = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'
volume = tests_utils.create_volume(self.context, **self.volume_params)
# creating volume testdata
try:
request_spec = {
'volume_properties': self.volume_params,
'image_id': image_id,
}
self.volume.create_volume(self.context, volume, request_spec)
finally:
# cleanup
os.unlink(dst_path)
volume = objects.Volume.get_by_id(self.context, volume.id)
return volume
class AvailabilityZoneTestCase(BaseVolumeTestCase):
def setUp(self): def setUp(self):
super(AvailabilityZoneTestCase, self).setUp() super(AvailabilityZoneTestCase, self).setUp()
self.get_all = self.patch( self.get_all = self.patch(
@ -317,59 +204,16 @@ class AvailabilityZoneTestCase(BaseVolumeTestCase):
@ddt.ddt @ddt.ddt
class VolumeTestCase(BaseVolumeTestCase): class VolumeTestCase(base.BaseVolumeTestCase):
def setUp(self): def setUp(self):
super(VolumeTestCase, self).setUp() super(VolumeTestCase, self).setUp()
self._clear_patch = mock.patch('cinder.volume.utils.clear_volume', self.patch('cinder.volume.utils.clear_volume', autospec=True)
autospec=True)
self._clear_patch.start()
self.expected_status = 'available' self.expected_status = 'available'
self.service_id = 1
def tearDown(self): @mock.patch('cinder.manager.CleanableManager.init_host')
super(VolumeTestCase, self).tearDown() def test_init_host_count_allocated_capacity(self, init_host_mock):
self._clear_patch.stop()
def test_init_host_clears_downloads(self):
"""Test that init_host will unwedge a volume stuck in downloading."""
volume = tests_utils.create_volume(self.context, status='downloading',
size=0, host=CONF.host)
self.volume.init_host()
volume.refresh()
self.assertEqual("error", volume.status)
self.volume.delete_volume(self.context, volume)
def test_init_host_clears_uploads_available_volume(self):
"""init_host will clean an available volume stuck in uploading."""
volume = tests_utils.create_volume(self.context, status='uploading',
size=0, host=CONF.host)
self.volume.init_host()
volume = objects.Volume.get_by_id(context.get_admin_context(),
volume.id)
self.assertEqual("available", volume.status)
def test_init_host_clears_uploads_in_use_volume(self):
"""init_host will clean an in-use volume stuck in uploading."""
volume = tests_utils.create_volume(self.context, status='uploading',
size=0, host=CONF.host)
fake_uuid = fakes.get_fake_uuid()
tests_utils.attach_volume(self.context, volume.id, fake_uuid,
'fake_host', '/dev/vda')
self.volume.init_host()
volume = objects.Volume.get_by_id(context.get_admin_context(),
volume.id)
self.assertEqual("in-use", volume.status)
def test_init_host_resumes_deletes(self):
"""init_host will resume deleting volume in deleting status."""
volume = tests_utils.create_volume(self.context, status='deleting',
size=0, host=CONF.host)
volume_id = volume['id']
self.volume.init_host()
self.assertRaises(exception.VolumeNotFound, db.volume_get,
context.get_admin_context(), volume_id)
def test_init_host_count_allocated_capacity(self):
vol0 = tests_utils.create_volume( vol0 = tests_utils.create_volume(
self.context, size=100, host=CONF.host) self.context, size=100, host=CONF.host)
vol1 = tests_utils.create_volume( vol1 = tests_utils.create_volume(
@ -384,7 +228,9 @@ class VolumeTestCase(BaseVolumeTestCase):
vol4 = tests_utils.create_volume( vol4 = tests_utils.create_volume(
self.context, size=1024, self.context, size=1024,
host=volutils.append_host(CONF.host, 'pool2')) host=volutils.append_host(CONF.host, 'pool2'))
self.volume.init_host() self.volume.init_host(service_id=self.service_id)
init_host_mock.assert_called_once_with(
service_id=self.service_id, added_to_cluster=None)
stats = self.volume.stats stats = self.volume.stats
self.assertEqual(2020, stats['allocated_capacity_gb']) self.assertEqual(2020, stats['allocated_capacity_gb'])
self.assertEqual( self.assertEqual(
@ -427,7 +273,7 @@ class VolumeTestCase(BaseVolumeTestCase):
{'id': snap1.id, 'provider_id': '7 8 yyyy'}] {'id': snap1.id, 'provider_id': '7 8 yyyy'}]
mock_update.return_value = (volumes, snapshots) mock_update.return_value = (volumes, snapshots)
# initialize # initialize
self.volume.init_host() self.volume.init_host(service_id=self.service_id)
# Grab volume and snapshot objects # Grab volume and snapshot objects
vol0_obj = objects.Volume.get_by_id(context.get_admin_context(), vol0_obj = objects.Volume.get_by_id(context.get_admin_context(),
vol0.id) vol0.id)
@ -459,7 +305,7 @@ class VolumeTestCase(BaseVolumeTestCase):
snap1 = tests_utils.create_snapshot(self.context, vol1.id) snap1 = tests_utils.create_snapshot(self.context, vol1.id)
mock_update.return_value = ([], []) mock_update.return_value = ([], [])
# initialize # initialize
self.volume.init_host() self.volume.init_host(service_id=self.service_id)
# Grab volume and snapshot objects # Grab volume and snapshot objects
vol0_obj = objects.Volume.get_by_id(context.get_admin_context(), vol0_obj = objects.Volume.get_by_id(context.get_admin_context(),
vol0.id) vol0.id)
@ -481,16 +327,22 @@ class VolumeTestCase(BaseVolumeTestCase):
@mock.patch('cinder.volume.manager.VolumeManager.' @mock.patch('cinder.volume.manager.VolumeManager.'
'_include_resources_in_cluster') '_include_resources_in_cluster')
def test_init_host_cluster_not_changed(self, include_in_cluster_mock): def test_init_host_cluster_not_changed(self, include_in_cluster_mock):
self.volume.init_host(False) self.volume.init_host(added_to_cluster=False,
service_id=self.service_id)
include_in_cluster_mock.assert_not_called() include_in_cluster_mock.assert_not_called()
@mock.patch('cinder.objects.snapshot.SnapshotList.get_all',
return_value=[])
@mock.patch('cinder.objects.volume.VolumeList.get_all', return_value=[])
@mock.patch('cinder.objects.volume.VolumeList.include_in_cluster') @mock.patch('cinder.objects.volume.VolumeList.include_in_cluster')
@mock.patch('cinder.objects.consistencygroup.ConsistencyGroupList.' @mock.patch('cinder.objects.consistencygroup.ConsistencyGroupList.'
'include_in_cluster') 'include_in_cluster')
def test_init_host_added_to_cluster(self, vol_include_mock, def test_init_host_added_to_cluster(self, cg_include_mock,
cg_include_mock): vol_include_mock, vol_get_all_mock,
snap_get_all_mock):
self.mock_object(self.volume, 'cluster', mock.sentinel.cluster) self.mock_object(self.volume, 'cluster', mock.sentinel.cluster)
self.volume.init_host(True) self.volume.init_host(added_to_cluster=True,
service_id=self.service_id)
vol_include_mock.assert_called_once_with(mock.ANY, vol_include_mock.assert_called_once_with(mock.ANY,
mock.sentinel.cluster, mock.sentinel.cluster,
@ -498,6 +350,10 @@ class VolumeTestCase(BaseVolumeTestCase):
cg_include_mock.assert_called_once_with(mock.ANY, cg_include_mock.assert_called_once_with(mock.ANY,
mock.sentinel.cluster, mock.sentinel.cluster,
host=self.volume.host) host=self.volume.host)
vol_get_all_mock.assert_called_once_with(
mock.ANY, filters={'cluster_name': mock.sentinel.cluster})
snap_get_all_mock.assert_called_once_with(
mock.ANY, search_opts={'cluster_name': mock.sentinel.cluster})
@mock.patch('cinder.objects.service.Service.get_minimum_rpc_version') @mock.patch('cinder.objects.service.Service.get_minimum_rpc_version')
@mock.patch('cinder.objects.service.Service.get_minimum_obj_version') @mock.patch('cinder.objects.service.Service.get_minimum_obj_version')
@ -552,45 +408,6 @@ class VolumeTestCase(BaseVolumeTestCase):
self.volume.driver._initialized = False self.volume.driver._initialized = False
self.assertFalse(self.volume.is_working()) self.assertFalse(self.volume.is_working())
def test_create_volume_fails_with_creating_and_downloading_status(self):
"""Test init_host in case of volume.
While the status of volume is 'creating' or 'downloading',
volume process down.
After process restarting this 'creating' status is changed to 'error'.
"""
for status in ['creating', 'downloading']:
volume = tests_utils.create_volume(self.context, status=status,
size=0, host=CONF.host)
self.volume.init_host()
volume.refresh()
self.assertEqual('error', volume.status)
self.volume.delete_volume(self.context, volume)
def test_create_snapshot_fails_with_creating_status(self):
"""Test init_host in case of snapshot.
While the status of snapshot is 'creating', volume process
down. After process restarting this 'creating' status is
changed to 'error'.
"""
volume = tests_utils.create_volume(self.context,
**self.volume_params)
snapshot = tests_utils.create_snapshot(
self.context,
volume['id'],
status=fields.SnapshotStatus.CREATING)
snap_id = snapshot['id']
self.volume.init_host()
snapshot_obj = objects.Snapshot.get_by_id(self.context, snap_id)
self.assertEqual(fields.SnapshotStatus.ERROR, snapshot_obj.status)
self.volume.delete_snapshot(self.context, snapshot_obj)
self.volume.delete_volume(self.context, volume)
@mock.patch('cinder.tests.unit.fake_notifier.FakeNotifier._notify') @mock.patch('cinder.tests.unit.fake_notifier.FakeNotifier._notify')
@mock.patch.object(QUOTAS, 'reserve') @mock.patch.object(QUOTAS, 'reserve')
@mock.patch.object(QUOTAS, 'commit') @mock.patch.object(QUOTAS, 'commit')
@ -1297,7 +1114,7 @@ class VolumeTestCase(BaseVolumeTestCase):
def test_delete_volume_not_found(self, mock_get_volume): def test_delete_volume_not_found(self, mock_get_volume):
"""Test delete volume moves on if the volume does not exist.""" """Test delete volume moves on if the volume does not exist."""
volume_id = '12345678-1234-5678-1234-567812345678' volume_id = '12345678-1234-5678-1234-567812345678'
volume = objects.Volume(self.context, id=volume_id) volume = objects.Volume(self.context, status='available', id=volume_id)
self.volume.delete_volume(self.context, volume) self.volume.delete_volume(self.context, volume)
self.assertTrue(mock_get_volume.called) self.assertTrue(mock_get_volume.called)
@ -4685,16 +4502,6 @@ class VolumeTestCase(BaseVolumeTestCase):
self.context, self.context,
snap) snap)
def test_init_host_clears_deleting_snapshots(self):
"""Test that init_host will delete a snapshot stuck in deleting."""
volume = tests_utils.create_volume(self.context, status='deleting',
size=1, host=CONF.host)
snapshot = tests_utils.create_snapshot(self.context,
volume.id, status='deleting')
self.volume.init_host()
self.assertRaises(exception.VolumeNotFound, volume.refresh)
self.assertRaises(exception.SnapshotNotFound, snapshot.refresh)
@mock.patch('cinder.volume.drivers.lvm.LVMVolumeDriver.' @mock.patch('cinder.volume.drivers.lvm.LVMVolumeDriver.'
'manage_existing') 'manage_existing')
@mock.patch('cinder.volume.drivers.lvm.LVMVolumeDriver.' @mock.patch('cinder.volume.drivers.lvm.LVMVolumeDriver.'
@ -4815,7 +4622,7 @@ class VolumeTestCase(BaseVolumeTestCase):
@ddt.ddt @ddt.ddt
class VolumeMigrationTestCase(BaseVolumeTestCase): class VolumeMigrationTestCase(base.BaseVolumeTestCase):
def setUp(self): def setUp(self):
super(VolumeMigrationTestCase, self).setUp() super(VolumeMigrationTestCase, self).setUp()
@ -5631,7 +5438,7 @@ class VolumeMigrationTestCase(BaseVolumeTestCase):
self.context, volume.id) self.context, volume.id)
class ReplicationTestCase(BaseVolumeTestCase): class ReplicationTestCase(base.BaseVolumeTestCase):
@mock.patch.object(volume_rpcapi.VolumeAPI, 'failover_host') @mock.patch.object(volume_rpcapi.VolumeAPI, 'failover_host')
@mock.patch.object(cinder.db, 'conditional_update') @mock.patch.object(cinder.db, 'conditional_update')
@ -5734,7 +5541,7 @@ class ReplicationTestCase(BaseVolumeTestCase):
host=CONF.host) host=CONF.host)
class CopyVolumeToImageTestCase(BaseVolumeTestCase): class CopyVolumeToImageTestCase(base.BaseVolumeTestCase):
def fake_local_path(self, volume): def fake_local_path(self, volume):
return self.dst_path return self.dst_path
@ -6008,7 +5815,7 @@ class CopyVolumeToImageTestCase(BaseVolumeTestCase):
self.assertTrue(mock_delete.called) self.assertTrue(mock_delete.called)
class GetActiveByWindowTestCase(BaseVolumeTestCase): class GetActiveByWindowTestCase(base.BaseVolumeTestCase):
def setUp(self): def setUp(self):
super(GetActiveByWindowTestCase, self).setUp() super(GetActiveByWindowTestCase, self).setUp()
self.ctx = context.get_admin_context(read_deleted="yes") self.ctx = context.get_admin_context(read_deleted="yes")
@ -6761,7 +6568,7 @@ class VolumePolicyTestCase(test.TestCase):
target) target)
class ImageVolumeCacheTestCase(BaseVolumeTestCase): class ImageVolumeCacheTestCase(base.BaseVolumeTestCase):
def setUp(self): def setUp(self):
super(ImageVolumeCacheTestCase, self).setUp() super(ImageVolumeCacheTestCase, self).setUp()
@ -6833,7 +6640,7 @@ class ImageVolumeCacheTestCase(BaseVolumeTestCase):
@ddt.ddt @ddt.ddt
class DiscardFlagTestCase(BaseVolumeTestCase): class DiscardFlagTestCase(base.BaseVolumeTestCase):
def setUp(self): def setUp(self):
super(DiscardFlagTestCase, self).setUp() super(DiscardFlagTestCase, self).setUp()

View File

@ -0,0 +1,177 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import cfg
from cinder import context
from cinder import db
from cinder import exception
from cinder import objects
from cinder.objects import fields
from cinder import service
from cinder.tests.unit.api import fakes
from cinder.tests.unit import utils as tests_utils
from cinder.tests.unit import volume as base
CONF = cfg.CONF
class VolumeCleanupTestCase(base.BaseVolumeTestCase):
MOCK_WORKER = False
def setUp(self):
super(VolumeCleanupTestCase, self).setUp()
self.service_id = 1
self.mock_object(service.Service, 'service_id', self.service_id)
self.patch('cinder.volume.utils.clear_volume', autospec=True)
def _assert_workers_are_removed(self):
workers = db.worker_get_all(self.context, read_deleted='yes')
self.assertListEqual([], workers)
def test_init_host_clears_uploads_available_volume(self):
"""init_host will clean an available volume stuck in uploading."""
volume = tests_utils.create_volume(self.context, status='uploading',
size=0, host=CONF.host)
db.worker_create(self.context, resource_type='Volume',
resource_id=volume.id, status=volume.status,
service_id=self.service_id)
self.volume.init_host(service_id=service.Service.service_id)
volume.refresh()
self.assertEqual("available", volume.status)
self._assert_workers_are_removed()
@mock.patch('cinder.manager.CleanableManager.init_host')
def test_init_host_clears_uploads_in_use_volume(self, init_host_mock):
"""init_host will clean an in-use volume stuck in uploading."""
volume = tests_utils.create_volume(self.context, status='uploading',
size=0, host=CONF.host)
db.worker_create(self.context, resource_type='Volume',
resource_id=volume.id, status=volume.status,
service_id=self.service_id)
fake_uuid = fakes.get_fake_uuid()
tests_utils.attach_volume(self.context, volume.id, fake_uuid,
'fake_host', '/dev/vda')
self.volume.init_host(service_id=mock.sentinel.service_id)
init_host_mock.assert_called_once_with(
service_id=mock.sentinel.service_id, added_to_cluster=None)
volume.refresh()
self.assertEqual("in-use", volume.status)
self._assert_workers_are_removed()
def test_init_host_clears_downloads(self):
"""Test that init_host will unwedge a volume stuck in downloading."""
volume = tests_utils.create_volume(self.context, status='downloading',
size=0, host=CONF.host)
db.worker_create(self.context, resource_type='Volume',
resource_id=volume.id, status=volume.status,
service_id=self.service_id)
mock_clear = self.mock_object(self.volume.driver, 'clear_download')
self.volume.init_host(service_id=service.Service.service_id)
self.assertEqual(1, mock_clear.call_count)
self.assertEqual(volume.id, mock_clear.call_args[0][1].id)
volume.refresh()
self.assertEqual("error", volume['status'])
self.volume.delete_volume(self.context, volume=volume)
self._assert_workers_are_removed()
def test_init_host_resumes_deletes(self):
"""init_host will resume deleting volume in deleting status."""
volume = tests_utils.create_volume(self.context, status='deleting',
size=0, host=CONF.host)
db.worker_create(self.context, resource_type='Volume',
resource_id=volume.id, status=volume.status,
service_id=self.service_id)
self.volume.init_host(service_id=service.Service.service_id)
self.assertRaises(exception.VolumeNotFound, db.volume_get,
context.get_admin_context(), volume.id)
self._assert_workers_are_removed()
def test_create_volume_fails_with_creating_and_downloading_status(self):
"""Test init_host_with_service in case of volume.
While the status of volume is 'creating' or 'downloading',
volume process down.
After process restarting this 'creating' status is changed to 'error'.
"""
for status in ('creating', 'downloading'):
volume = tests_utils.create_volume(self.context, status=status,
size=0, host=CONF.host)
db.worker_create(self.context, resource_type='Volume',
resource_id=volume.id, status=volume.status,
service_id=self.service_id)
self.volume.init_host(service_id=service.Service.service_id)
volume.refresh()
self.assertEqual('error', volume['status'])
self.volume.delete_volume(self.context, volume)
self._assert_workers_are_removed()
def test_create_snapshot_fails_with_creating_status(self):
"""Test init_host_with_service in case of snapshot.
While the status of snapshot is 'creating', volume process
down. After process restarting this 'creating' status is
changed to 'error'.
"""
volume = tests_utils.create_volume(self.context,
**self.volume_params)
snapshot = tests_utils.create_snapshot(
self.context,
volume.id,
status=fields.SnapshotStatus.CREATING)
db.worker_create(self.context, resource_type='Snapshot',
resource_id=snapshot.id, status=snapshot.status,
service_id=self.service_id)
self.volume.init_host(service_id=service.Service.service_id)
snapshot_obj = objects.Snapshot.get_by_id(self.context, snapshot.id)
self.assertEqual(fields.SnapshotStatus.ERROR, snapshot_obj.status)
self.assertEqual(service.Service.service_id,
self.volume.service_id)
self._assert_workers_are_removed()
self.volume.delete_snapshot(self.context, snapshot_obj)
self.volume.delete_volume(self.context, volume)
def test_init_host_clears_deleting_snapshots(self):
"""Test that init_host will delete a snapshot stuck in deleting."""
volume = tests_utils.create_volume(self.context, status='deleting',
size=1, host=CONF.host)
snapshot = tests_utils.create_snapshot(self.context,
volume.id, status='deleting')
db.worker_create(self.context, resource_type='Volume',
resource_id=volume.id, status=volume.status,
service_id=self.service_id)
self.volume.init_host(service_id=self.service_id)
self.assertRaises(exception.VolumeNotFound, volume.refresh)
self.assertRaises(exception.SnapshotNotFound, snapshot.refresh)

View File

@ -0,0 +1,150 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import shutil
import tempfile
import mock
from oslo_config import cfg
from oslo_utils import importutils
from stevedore import extension
from cinder.brick.local_dev import lvm as brick_lvm
from cinder import context
from cinder.image import image_utils
from cinder import objects
from cinder import test
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit.image import fake as fake_image
from cinder.tests.unit import utils as tests_utils
from cinder.volume import api as volume_api
from cinder.volume import configuration as conf
CONF = cfg.CONF
class BaseVolumeTestCase(test.TestCase):
"""Test Case for volumes."""
FAKE_UUID = fake.IMAGE_ID
def setUp(self, *args, **kwargs):
super(BaseVolumeTestCase, self).setUp(*args, **kwargs)
self.extension_manager = extension.ExtensionManager(
"BaseVolumeTestCase")
vol_tmpdir = tempfile.mkdtemp()
self.flags(volumes_dir=vol_tmpdir)
self.addCleanup(self._cleanup)
self.volume = importutils.import_object(CONF.volume_manager)
self.volume.message_api = mock.Mock()
self.configuration = mock.Mock(conf.Configuration)
self.context = context.get_admin_context()
self.context.user_id = fake.USER_ID
# NOTE(mriedem): The id is hard-coded here for tracking race fail
# assertions with the notification code, it's part of an
# elastic-recheck query so don't remove it or change it.
self.project_id = '7f265bd4-3a85-465e-a899-5dc4854a86d3'
self.context.project_id = self.project_id
self.volume_params = {
'status': 'creating',
'host': CONF.host,
'size': 1}
self.mock_object(brick_lvm.LVM,
'get_all_volume_groups',
self.fake_get_all_volume_groups)
fake_image.mock_image_service(self)
self.mock_object(brick_lvm.LVM, '_vg_exists', lambda x: True)
self.mock_object(os.path, 'exists', lambda x: True)
self.volume.driver.set_initialized()
self.volume.stats = {'allocated_capacity_gb': 0,
'pools': {}}
# keep ordered record of what we execute
self.called = []
self.volume_api = volume_api.API()
def _cleanup(self):
try:
shutil.rmtree(CONF.volumes_dir)
except OSError:
pass
def fake_get_all_volume_groups(obj, vg_name=None, no_suffix=True):
return [{'name': 'cinder-volumes',
'size': '5.00',
'available': '2.50',
'lv_count': '2',
'uuid': 'vR1JU3-FAKE-C4A9-PQFh-Mctm-9FwA-Xwzc1m'}]
@mock.patch('cinder.image.image_utils.TemporaryImages.fetch')
@mock.patch('cinder.volume.flows.manager.create_volume.'
'CreateVolumeFromSpecTask._clone_image_volume')
def _create_volume_from_image(self, mock_clone_image_volume,
mock_fetch_img,
fakeout_copy_image_to_volume=False,
fakeout_clone_image=False,
clone_image_volume=False):
"""Test function of create_volume_from_image.
Test cases call this function to create a volume from image, caller
can choose whether to fake out copy_image_to_volume and clone_image,
after calling this, test cases should check status of the volume.
"""
def fake_local_path(volume):
return dst_path
def fake_copy_image_to_volume(context, volume,
image_service, image_id):
pass
def fake_fetch_to_raw(ctx, image_service, image_id, path, blocksize,
size=None, throttle=None):
pass
def fake_clone_image(ctx, volume_ref,
image_location, image_meta,
image_service):
return {'provider_location': None}, True
dst_fd, dst_path = tempfile.mkstemp()
os.close(dst_fd)
self.mock_object(self.volume.driver, 'local_path', fake_local_path)
if fakeout_clone_image:
self.mock_object(self.volume.driver, 'clone_image',
fake_clone_image)
self.mock_object(image_utils, 'fetch_to_raw', fake_fetch_to_raw)
if fakeout_copy_image_to_volume:
self.mock_object(self.volume.driver, 'copy_image_to_volume',
fake_copy_image_to_volume)
mock_clone_image_volume.return_value = ({}, clone_image_volume)
mock_fetch_img.return_value = mock.MagicMock(
spec=tests_utils.get_file_spec())
image_id = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'
volume = tests_utils.create_volume(self.context, **self.volume_params)
# creating volume testdata
try:
request_spec = {
'volume_properties': self.volume_params,
'image_id': image_id,
}
self.volume.create_volume(self.context, volume, request_spec)
finally:
# cleanup
os.unlink(dst_path)
volume = objects.Volume.get_by_id(self.context, volume.id)
return volume

View File

@ -14,13 +14,12 @@
import mock import mock
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_volume
from cinder.tests.unit import test_volume
from cinder import context from cinder import context
from cinder import exception from cinder import exception
from cinder import objects from cinder import objects
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_volume
from cinder.tests.unit import volume as base
from cinder.volume.flows.manager import manage_existing from cinder.volume.flows.manager import manage_existing
from cinder.volume import manager from cinder.volume import manager
from cinder.volume import utils from cinder.volume import utils
@ -29,7 +28,7 @@ FAKE_HOST_POOL = 'volPool'
FAKE_HOST = 'hostname@backend' FAKE_HOST = 'hostname@backend'
class ManageVolumeTestCase(test_volume.BaseVolumeTestCase): class ManageVolumeTestCase(base.BaseVolumeTestCase):
def setUp(self): def setUp(self):
super(ManageVolumeTestCase, self).setUp() super(ManageVolumeTestCase, self).setUp()

View File

@ -152,7 +152,8 @@ MAPPING = {
} }
class VolumeManager(manager.SchedulerDependentManager): class VolumeManager(manager.CleanableManager,
manager.SchedulerDependentManager):
"""Manages attachable block storage devices.""" """Manages attachable block storage devices."""
RPC_API_VERSION = volume_rpcapi.VolumeAPI.RPC_API_VERSION RPC_API_VERSION = volume_rpcapi.VolumeAPI.RPC_API_VERSION
@ -192,7 +193,7 @@ class VolumeManager(manager.SchedulerDependentManager):
LOG.debug("Cinder Volume DB check: vol_db_empty=%s", vol_db_empty) LOG.debug("Cinder Volume DB check: vol_db_empty=%s", vol_db_empty)
# We pass the current setting for service.active_backend_id to # We pass the current setting for service.active_backend_id to
# the driver on init, incase there was a restart or something # the driver on init, in case there was a restart or something
curr_active_backend_id = None curr_active_backend_id = None
svc_host = vol_utils.extract_host(self.host, 'backend') svc_host = vol_utils.extract_host(self.host, 'backend')
try: try:
@ -317,9 +318,9 @@ class VolumeManager(manager.SchedulerDependentManager):
def _sync_provider_info(self, ctxt, volumes, snapshots): def _sync_provider_info(self, ctxt, volumes, snapshots):
# NOTE(jdg): For now this just updates provider_id, we can add more # NOTE(jdg): For now this just updates provider_id, we can add more
# add more items to the update if they're relevant but we need # items to the update if they're relevant but we need to be safe in
# to be safe in what we allow and add a list of allowed keys # what we allow and add a list of allowed keys. Things that make sense
# things that make sense are provider_*, replication_status etc # are provider_*, replication_status etc
updates, snapshot_updates = self.driver.update_provider_info( updates, snapshot_updates = self.driver.update_provider_info(
volumes, snapshots) volumes, snapshots)
@ -370,7 +371,7 @@ class VolumeManager(manager.SchedulerDependentManager):
{'num_vols': num_vols, 'num_cgs': num_cgs, {'num_vols': num_vols, 'num_cgs': num_cgs,
'host': self.host, 'cluster': self.cluster}) 'host': self.host, 'cluster': self.cluster})
def init_host(self, added_to_cluster=None): def init_host(self, added_to_cluster=None, **kwargs):
"""Perform any required initialization.""" """Perform any required initialization."""
ctxt = context.get_admin_context() ctxt = context.get_admin_context()
if not self.driver.supported: if not self.driver.supported:
@ -407,14 +408,19 @@ class VolumeManager(manager.SchedulerDependentManager):
# Initialize backend capabilities list # Initialize backend capabilities list
self.driver.init_capabilities() self.driver.init_capabilities()
volumes = objects.VolumeList.get_all_by_host(ctxt, self.host) if self.cluster:
snapshots = objects.SnapshotList.get_by_host(ctxt, self.host) filters = {'cluster_name': self.cluster}
else:
filters = {'host': self.host}
volumes = objects.VolumeList.get_all(ctxt, filters=filters)
snapshots = objects.SnapshotList.get_all(ctxt, search_opts=filters)
self._sync_provider_info(ctxt, volumes, snapshots) self._sync_provider_info(ctxt, volumes, snapshots)
# FIXME volume count for exporting is wrong # FIXME volume count for exporting is wrong
try:
self.stats['pools'] = {} self.stats['pools'] = {}
self.stats.update({'allocated_capacity_gb': 0}) self.stats.update({'allocated_capacity_gb': 0})
try:
for volume in volumes: for volume in volumes:
# available volume should also be counted into allocated # available volume should also be counted into allocated
if volume['status'] in ['in-use', 'available']: if volume['status'] in ['in-use', 'available']:
@ -428,32 +434,10 @@ class VolumeManager(manager.SchedulerDependentManager):
LOG.exception(_LE("Failed to re-export volume, " LOG.exception(_LE("Failed to re-export volume, "
"setting to ERROR."), "setting to ERROR."),
resource=volume) resource=volume)
volume.status = 'error' volume.conditional_update({'status': 'error'},
volume.save() {'status': 'in-use'})
elif volume['status'] in ('downloading', 'creating'): # All other cleanups are processed by parent class CleanableManager
LOG.warning(_LW("Detected volume stuck "
"in %(curr_status)s "
"status, setting to ERROR."),
{'curr_status': volume['status']},
resource=volume)
if volume['status'] == 'downloading':
self.driver.clear_download(ctxt, volume)
volume.status = 'error'
volume.save()
elif volume.status == 'uploading':
# Set volume status to available or in-use.
self.db.volume_update_status_based_on_attachment(
ctxt, volume.id)
else:
pass
snapshots = objects.SnapshotList.get_by_host(
ctxt, self.host, {'status': fields.SnapshotStatus.CREATING})
for snapshot in snapshots:
LOG.warning(_LW("Detected snapshot stuck in creating "
"status, setting to ERROR."), resource=snapshot)
snapshot.status = fields.SnapshotStatus.ERROR
snapshot.save()
except Exception: except Exception:
LOG.exception(_LE("Error during re-export on driver init."), LOG.exception(_LE("Error during re-export on driver init."),
resource=volume) resource=volume)
@ -466,26 +450,16 @@ class VolumeManager(manager.SchedulerDependentManager):
# that an entry exists in the service table # that an entry exists in the service table
self.driver.set_initialized() self.driver.set_initialized()
for volume in volumes:
if volume['status'] == 'deleting':
if CONF.volume_service_inithost_offload:
# Offload all the pending volume delete operations to the
# threadpool to prevent the main volume service thread
# from being blocked.
self._add_to_threadpool(self.delete_volume, ctxt, volume,
cascade=True)
else:
# By default, delete volumes sequentially
self.delete_volume(ctxt, volume, cascade=True)
LOG.info(_LI("Resume volume delete completed successfully."),
resource=volume)
# collect and publish service capabilities # collect and publish service capabilities
self.publish_service_capabilities(ctxt) self.publish_service_capabilities(ctxt)
LOG.info(_LI("Driver initialization completed successfully."), LOG.info(_LI("Driver initialization completed successfully."),
resource={'type': 'driver', resource={'type': 'driver',
'id': self.driver.__class__.__name__}) 'id': self.driver.__class__.__name__})
# Make sure to call CleanableManager to do the cleanup
super(VolumeManager, self).init_host(added_to_cluster=added_to_cluster,
**kwargs)
def init_host_with_rpc(self): def init_host_with_rpc(self):
LOG.info(_LI("Initializing RPC dependent components of volume " LOG.info(_LI("Initializing RPC dependent components of volume "
"driver %(driver_name)s (%(version)s)"), "driver %(driver_name)s (%(version)s)"),
@ -527,6 +501,37 @@ class VolumeManager(manager.SchedulerDependentManager):
resource={'type': 'driver', resource={'type': 'driver',
'id': self.driver.__class__.__name__}) 'id': self.driver.__class__.__name__})
def _do_cleanup(self, ctxt, vo_resource):
if isinstance(vo_resource, objects.Volume):
if vo_resource.status == 'downloading':
self.driver.clear_download(ctxt, vo_resource)
elif vo_resource.status == 'uploading':
# Set volume status to available or in-use.
self.db.volume_update_status_based_on_attachment(
ctxt, vo_resource.id)
elif vo_resource.status == 'deleting':
if CONF.volume_service_inithost_offload:
# Offload all the pending volume delete operations to the
# threadpool to prevent the main volume service thread
# from being blocked.
self._add_to_threadpool(self.delete_volume, ctxt,
vo_resource, cascade=True)
else:
# By default, delete volumes sequentially
self.delete_volume(ctxt, vo_resource, cascade=True)
# We signal that we take care of cleaning the worker ourselves
# (with set_workers decorator in delete_volume method) so
# do_cleanup method doesn't need to remove it.
return True
# For Volume creating and downloading and for Snapshot downloading
# statuses we have to set status to error
if vo_resource.status in ('creating', 'downloading'):
vo_resource.status = 'error'
vo_resource.save()
def is_working(self): def is_working(self):
"""Return if Manager is ready to accept requests. """Return if Manager is ready to accept requests.
@ -536,6 +541,7 @@ class VolumeManager(manager.SchedulerDependentManager):
""" """
return self.driver.initialized return self.driver.initialized
@objects.Volume.set_workers
def create_volume(self, context, volume, request_spec=None, def create_volume(self, context, volume, request_spec=None,
filter_properties=None, allow_reschedule=True): filter_properties=None, allow_reschedule=True):
"""Creates the volume.""" """Creates the volume."""
@ -627,6 +633,7 @@ class VolumeManager(manager.SchedulerDependentManager):
return volume.id return volume.id
@coordination.synchronized('{volume.id}-{f_name}') @coordination.synchronized('{volume.id}-{f_name}')
@objects.Volume.set_workers
def delete_volume(self, context, volume, unmanage_only=False, def delete_volume(self, context, volume, unmanage_only=False,
cascade=False): cascade=False):
"""Deletes and unexports volume. """Deletes and unexports volume.
@ -786,6 +793,7 @@ class VolumeManager(manager.SchedulerDependentManager):
volume_ref.status = status volume_ref.status = status
volume_ref.save() volume_ref.save()
@objects.Snapshot.set_workers
def create_snapshot(self, context, snapshot): def create_snapshot(self, context, snapshot):
"""Creates and exports the snapshot.""" """Creates and exports the snapshot."""
context = context.elevated() context = context.elevated()

View File

@ -164,6 +164,7 @@ class VolumeAPI(rpc.RPCAPI):
volume=volume) volume=volume)
def delete_volume(self, ctxt, volume, unmanage_only=False, cascade=False): def delete_volume(self, ctxt, volume, unmanage_only=False, cascade=False):
volume.create_worker()
cctxt = self._get_cctxt(volume.host) cctxt = self._get_cctxt(volume.host)
msg_args = { msg_args = {
'volume': volume, 'unmanage_only': unmanage_only, 'volume': volume, 'unmanage_only': unmanage_only,
@ -173,6 +174,7 @@ class VolumeAPI(rpc.RPCAPI):
cctxt.cast(ctxt, 'delete_volume', **msg_args) cctxt.cast(ctxt, 'delete_volume', **msg_args)
def create_snapshot(self, ctxt, volume, snapshot): def create_snapshot(self, ctxt, volume, snapshot):
snapshot.create_worker()
cctxt = self._get_cctxt(volume['host']) cctxt = self._get_cctxt(volume['host'])
cctxt.cast(ctxt, 'create_snapshot', snapshot=snapshot) cctxt.cast(ctxt, 'create_snapshot', snapshot=snapshot)