Merge "model_update for temp volume or snapshot in backup"

This commit is contained in:
Jenkins 2016-06-09 23:34:35 +00:00 committed by Gerrit Code Review
commit 6db7a8a43d
2 changed files with 64 additions and 5 deletions

View File

@ -35,6 +35,7 @@ from cinder import objects
from cinder.objects import fields
from cinder import test
from cinder.tests.unit.backup import fake_service_with_verify as fake_service
from cinder.tests.unit import fake_driver
from cinder.tests.unit import utils
from cinder.volume import driver
@ -641,6 +642,59 @@ class BackupTestCase(BaseBackupTest):
self.assertEqual(fields.BackupStatus.AVAILABLE, backup['status'])
self.assertEqual(vol_size, backup['size'])
@mock.patch.object(fake_driver.FakeISCSIDriver, 'create_snapshot')
def test_create_temp_snapshot(self, mock_create_snapshot):
volume_manager = importutils.import_object(CONF.volume_manager)
volume_manager.driver.set_initialized()
vol_size = 1
vol_id = self._create_volume_db_entry(size=vol_size,
previous_status='in-use')
vol = objects.Volume.get_by_id(self.ctxt, vol_id)
mock_create_snapshot.return_value = {'provider_id':
'fake_provider_id'}
temp_snap = volume_manager.driver._create_temp_snapshot(
self.ctxt, vol)
self.assertEqual('available', temp_snap['status'])
self.assertEqual('fake_provider_id', temp_snap['provider_id'])
@mock.patch.object(fake_driver.FakeISCSIDriver, 'create_cloned_volume')
def test_create_temp_cloned_volume(self, mock_create_cloned_volume):
volume_manager = importutils.import_object(CONF.volume_manager)
volume_manager.driver.set_initialized()
vol_size = 1
vol_id = self._create_volume_db_entry(size=vol_size,
previous_status='in-use')
vol = objects.Volume.get_by_id(self.ctxt, vol_id)
mock_create_cloned_volume.return_value = {'provider_id':
'fake_provider_id'}
temp_vol = volume_manager.driver._create_temp_cloned_volume(
self.ctxt, vol)
self.assertEqual('available', temp_vol['status'])
self.assertEqual('fake_provider_id', temp_vol['provider_id'])
@mock.patch.object(fake_driver.FakeISCSIDriver,
'create_volume_from_snapshot')
def test_create_temp_volume_from_snapshot(self, mock_create_vol_from_snap):
volume_manager = importutils.import_object(CONF.volume_manager)
volume_manager.driver.set_initialized()
vol_size = 1
vol_id = self._create_volume_db_entry(size=vol_size,
previous_status='in-use')
vol = objects.Volume.get_by_id(self.ctxt, vol_id)
snap = self._create_snapshot_db_entry(volume_id = vol_id)
mock_create_vol_from_snap.return_value = {'provider_id':
'fake_provider_id'}
temp_vol = volume_manager.driver._create_temp_volume_from_snapshot(
self.ctxt, vol, snap)
self.assertEqual('available', temp_vol['status'])
self.assertEqual('fake_provider_id', temp_vol['provider_id'])
@mock.patch('cinder.volume.utils.notify_about_backup_usage')
def test_create_backup_with_notify(self, notify):
"""Test normal backup creation with notifications."""

View File

@ -1339,7 +1339,9 @@ class BaseVD(object):
temp_snap_ref = objects.Snapshot(context=context, **kwargs)
temp_snap_ref.create()
try:
self.create_snapshot(temp_snap_ref)
model_update = self.create_snapshot(temp_snap_ref)
if model_update:
temp_snap_ref.update(model_update)
except Exception:
with excutils.save_and_reraise_exception():
with temp_snap_ref.obj_as_admin():
@ -1361,6 +1363,7 @@ class BaseVD(object):
'status': 'creating',
'attach_status': 'detached',
'availability_zone': volume.availability_zone,
'volume_type_id': volume.volume_type_id,
}
temp_vol_ref = self.db.volume_create(context, temp_volume)
try:
@ -1388,18 +1391,20 @@ class BaseVD(object):
'status': 'creating',
'attach_status': 'detached',
'availability_zone': volume.availability_zone,
'volume_type_id': volume.volume_type_id,
}
temp_vol_ref = self.db.volume_create(context, temp_volume)
try:
self.create_volume_from_snapshot(temp_vol_ref, snapshot)
model_update = self.create_volume_from_snapshot(temp_vol_ref,
snapshot) or {}
except Exception:
with excutils.save_and_reraise_exception():
self.db.volume_destroy(context.elevated(),
temp_vol_ref['id'])
self.db.volume_update(context, temp_vol_ref['id'],
{'status': 'available'})
return temp_vol_ref
model_update['status'] = 'available'
self.db.volume_update(context, temp_vol_ref['id'], model_update)
return self.db.volume_get(context, temp_vol_ref['id'])
def _delete_temp_snapshot(self, context, snapshot):
self.delete_snapshot(snapshot)