Remove support for 2.x volume RPC API

This commit gets rid of most of our Mitaka compatibility code in the
volume RPC API.

Change-Id: I5606528c8db9a725c6450d084fdcb369db60b49b
Michał Dulko 2016-09-19 12:12:49 +02:00
parent 2c0eea799c
commit 9a73f5999c
9 changed files with 352 additions and 793 deletions
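
For context, here is a condensed, self-contained sketch of the compatibility pattern being deleted. It is not code from the tree: the stub client and class names are hypothetical, and only _compat_ver(), can_send_version(), and the volume_id fallback mirror what the hunks below actually touch. Before this change, the RPC client negotiated a version per call and, when the receiving volume service still spoke the Mitaka-era 2.x interface, downgraded the message to carry a bare volume_id, which the manager then re-fetched with objects.Volume.get_by_id(). With 2.x support gone, every call pins version '3.0' and passes the versioned object directly.

class _StubClient(object):
    """Hypothetical stand-in for the oslo.messaging RPC client."""

    def __init__(self, peer_supports_3_0=True):
        self.peer_supports_3_0 = peer_supports_3_0
        self.version = None

    def can_send_version(self, version):
        # The old code probed whether the receiving c-vol service already
        # understood the 3.0 interface.
        return version != '3.0' or self.peer_supports_3_0

    def prepare(self, server=None, version=None):
        self.version = version
        return self

    def cast(self, ctxt, method, **kwargs):
        print('cast %s at v%s with %s' % (method, self.version, sorted(kwargs)))


class OldVolumeAPI(object):
    """Newton-era client that still speaks to Mitaka (2.x) services."""

    def __init__(self, client):
        self.client = client

    def _compat_ver(self, *versions):
        # Return the newest listed version the server side can receive.
        for version in versions:
            if self.client.can_send_version(version):
                return version

    def delete_volume(self, ctxt, volume, unmanage_only=False, cascade=False):
        version = self._compat_ver('3.0', '2.0')
        msg_args = {'volume': volume, 'unmanage_only': unmanage_only,
                    'cascade': cascade}
        if version == '2.0':
            # 2.x services expected a bare volume_id; the manager then
            # re-fetched the object with objects.Volume.get_by_id().
            msg_args['volume_id'] = volume.id
        self.client.prepare(server=volume.host, version=version).cast(
            ctxt, 'delete_volume', **msg_args)


class NewVolumeAPI(object):
    """After this commit: the version is pinned and the object always sent."""

    def __init__(self, client):
        self.client = client

    def delete_volume(self, ctxt, volume, unmanage_only=False, cascade=False):
        self.client.prepare(server=volume.host, version='3.0').cast(
            ctxt, 'delete_volume', volume=volume,
            unmanage_only=unmanage_only, cascade=cascade)

The rest of the diff is the mechanical fallout of dropping the 2.x branch: every _compat_ver(...) call in the RPC client collapses to a pinned '3.0', the volume_id and request_spec-as-dict fallbacks disappear on both the client and manager sides, the manager methods take versioned objects as positional arguments, and the _VolumeV3Proxy shim that translated 3.x calls onto the old signatures is removed outright.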


@@ -140,24 +140,22 @@ class ConsistencyGroupTestCase(test_volume.BaseVolumeTestCase):
self.context,
consistencygroup_id=group.id,
**self.volume_params)
volume_id = volume['id']
self.volume.create_volume(self.context, volume_id)
self.volume.create_volume(self.context, volume)
volume2 = tests_utils.create_volume(
self.context,
consistencygroup_id=None,
**self.volume_params)
volume_id2 = volume2['id']
self.volume.create_volume(self.context, volume_id2)
self.volume.create_volume(self.context, volume2)
fake_update_cg.return_value = (
{'status': fields.ConsistencyGroupStatus.AVAILABLE},
[{'id': volume_id2, 'status': 'available'}],
[{'id': volume_id, 'status': 'available'}])
[{'id': volume2.id, 'status': 'available'}],
[{'id': volume.id, 'status': 'available'}])
self.volume.update_consistencygroup(self.context, group,
add_volumes=volume_id2,
remove_volumes=volume_id)
add_volumes=volume2.id,
remove_volumes=volume.id)
cg = objects.ConsistencyGroup.get_by_id(self.context, group.id)
expected = {
'status': fields.ConsistencyGroupStatus.AVAILABLE,
@@ -180,9 +178,9 @@ class ConsistencyGroupTestCase(test_volume.BaseVolumeTestCase):
cgvolumes = db.volume_get_all_by_group(self.context, group.id)
cgvol_ids = [cgvol['id'] for cgvol in cgvolumes]
# Verify volume is removed.
self.assertNotIn(volume_id, cgvol_ids)
self.assertNotIn(volume.id, cgvol_ids)
# Verify volume is added.
self.assertIn(volume_id2, cgvol_ids)
self.assertIn(volume2.id, cgvol_ids)
self.volume_params['status'] = 'wrong-status'
volume3 = tests_utils.create_volume(
@@ -261,7 +259,7 @@ class ConsistencyGroupTestCase(test_volume.BaseVolumeTestCase):
consistencygroup_id=group2.id,
snapshot_id=snapshot_id,
**self.volume_params)
self.volume.create_volume(self.context, volume2.id, volume=volume2)
self.volume.create_volume(self.context, volume2)
self.volume.create_consistencygroup_from_src(
self.context, group2, cgsnapshot=cgsnapshot)
cg2 = objects.ConsistencyGroup.get_by_id(self.context, group2.id)
@@ -328,7 +326,7 @@ class ConsistencyGroupTestCase(test_volume.BaseVolumeTestCase):
consistencygroup_id=group3.id,
source_volid=volume_id,
**self.volume_params)
self.volume.create_volume(self.context, volume3.id, volume=volume3)
self.volume.create_volume(self.context, volume3)
self.volume.create_consistencygroup_from_src(
self.context, group3, source_cg=group)
@@ -487,14 +485,13 @@ class ConsistencyGroupTestCase(test_volume.BaseVolumeTestCase):
self.context,
consistencygroup_id=group.id,
**self.volume_params)
volume_id = volume['id']
self.volume.create_volume(self.context, volume_id)
self.volume.create_volume(self.context, volume)
self.assert_notify_called(mock_notify,
(['INFO', 'volume.create.start'],
['INFO', 'volume.create.end']))
cgsnapshot_returns = self._create_cgsnapshot(group.id, [volume_id])
cgsnapshot_returns = self._create_cgsnapshot(group.id, [volume.id])
cgsnapshot = cgsnapshot_returns[0]
self.volume.create_cgsnapshot(self.context, cgsnapshot)
self.assertEqual(cgsnapshot.id,
@@ -564,7 +561,7 @@ class ConsistencyGroupTestCase(test_volume.BaseVolumeTestCase):
status='creating',
size=1)
self.volume.host = 'host1@backend1'
self.volume.create_volume(self.context, volume.id, volume=volume)
self.volume.create_volume(self.context, volume)
self.volume.delete_consistencygroup(self.context, group)
cg = objects.ConsistencyGroup.get_by_id(
@@ -599,7 +596,7 @@ class ConsistencyGroupTestCase(test_volume.BaseVolumeTestCase):
status='creating',
size=1)
self.volume.host = 'host1@backend2'
self.volume.create_volume(self.context, volume.id, volume=volume)
self.volume.create_volume(self.context, volume)
self.assertRaises(exception.InvalidVolume,
self.volume.delete_consistencygroup,
@@ -656,8 +653,7 @@ class ConsistencyGroupTestCase(test_volume.BaseVolumeTestCase):
self.context,
consistencygroup_id=group.id,
**self.volume_params)
volume_id = volume['id']
self.volume.create_volume(self.context, volume_id)
self.volume.create_volume(self.context, volume)
# Create a bootable volume
bootable_vol_params = {'status': 'creating', 'host': CONF.host,
'size': 1, 'bootable': True}
@@ -665,10 +661,9 @@ class ConsistencyGroupTestCase(test_volume.BaseVolumeTestCase):
consistencygroup_id=group.id,
**bootable_vol_params)
# Create a common volume
bootable_vol_id = bootable_vol['id']
self.volume.create_volume(self.context, bootable_vol_id)
self.volume.create_volume(self.context, bootable_vol)
volume_ids = [volume_id, bootable_vol_id]
volume_ids = [volume.id, bootable_vol.id]
cgsnapshot_returns = self._create_cgsnapshot(group.id, volume_ids)
cgsnapshot = cgsnapshot_returns[0]
self.volume.create_cgsnapshot(self.context, cgsnapshot)


@@ -163,8 +163,7 @@ class GroupManagerTestCase(test.TestCase):
volume_type_id=fake.VOLUME_TYPE_ID,
status='available',
host=group.host)
volume_id = volume['id']
self.volume.create_volume(self.context, volume_id)
self.volume.create_volume(self.context, volume)
volume2 = tests_utils.create_volume(
self.context,
@@ -172,17 +171,16 @@ class GroupManagerTestCase(test.TestCase):
volume_type_id=fake.VOLUME_TYPE_ID,
status='available',
host=group.host)
volume_id2 = volume2['id']
self.volume.create_volume(self.context, volume_id2)
self.volume.create_volume(self.context, volume2)
fake_update_grp.return_value = (
{'status': fields.GroupStatus.AVAILABLE},
[{'id': volume_id2, 'status': 'available'}],
[{'id': volume_id, 'status': 'available'}])
[{'id': volume2.id, 'status': 'available'}],
[{'id': volume.id, 'status': 'available'}])
self.volume.update_group(self.context, group,
add_volumes=volume_id2,
remove_volumes=volume_id)
add_volumes=volume2.id,
remove_volumes=volume.id)
grp = objects.Group.get_by_id(self.context, group.id)
expected = {
'status': fields.GroupStatus.AVAILABLE,
@@ -206,9 +204,9 @@ class GroupManagerTestCase(test.TestCase):
grpvolumes = db.volume_get_all_by_generic_group(self.context, group.id)
grpvol_ids = [grpvol['id'] for grpvol in grpvolumes]
# Verify volume is removed.
self.assertNotIn(volume_id, grpvol_ids)
self.assertNotIn(volume.id, grpvol_ids)
# Verify volume is added.
self.assertIn(volume_id2, grpvol_ids)
self.assertIn(volume2.id, grpvol_ids)
volume3 = tests_utils.create_volume(
self.context,
@@ -296,7 +294,7 @@ class GroupManagerTestCase(test.TestCase):
status='available',
host=group2.host,
volume_type_id=fake.VOLUME_TYPE_ID)
self.volume.create_volume(self.context, volume2.id, volume=volume2)
self.volume.create_volume(self.context, volume2)
self.volume.create_group_from_src(
self.context, group2, group_snapshot=group_snapshot)
grp2 = objects.Group.get_by_id(self.context, group2.id)
@@ -368,7 +366,7 @@ class GroupManagerTestCase(test.TestCase):
status='available',
host=group3.host,
volume_type_id=fake.VOLUME_TYPE_ID)
self.volume.create_volume(self.context, volume3.id, volume=volume3)
self.volume.create_volume(self.context, volume3)
self.volume.create_group_from_src(
self.context, group3, source_group=group)
@@ -530,15 +528,14 @@ class GroupManagerTestCase(test.TestCase):
group_id=group.id,
host=group.host,
volume_type_id=fake.VOLUME_TYPE_ID)
volume_id = volume['id']
self.volume.create_volume(self.context, volume_id)
self.volume.create_volume(self.context, volume)
self.assert_notify_called(mock_notify,
(['INFO', 'volume.create.start'],
['INFO', 'volume.create.end']))
group_snapshot_returns = self._create_group_snapshot(group.id,
[volume_id])
[volume.id])
group_snapshot = group_snapshot_returns[0]
self.volume.create_group_snapshot(self.context, group_snapshot)
self.assertEqual(group_snapshot.id,
@@ -608,7 +605,7 @@ class GroupManagerTestCase(test.TestCase):
volume_type_id=fake.VOLUME_TYPE_ID,
size=1)
self.volume.host = 'host1@backend1'
self.volume.create_volume(self.context, volume.id, volume=volume)
self.volume.create_volume(self.context, volume)
self.volume.delete_group(self.context, group)
grp = objects.Group.get_by_id(
@@ -643,7 +640,7 @@ class GroupManagerTestCase(test.TestCase):
volume_type_id=fake.VOLUME_TYPE_ID,
size=1)
self.volume.host = 'host1@backend2'
self.volume.create_volume(self.context, volume.id, volume=volume)
self.volume.create_volume(self.context, volume)
self.assertRaises(exception.InvalidVolume,
self.volume.delete_group,
@@ -706,8 +703,7 @@ class GroupManagerTestCase(test.TestCase):
group_id=group.id,
host=group.host,
volume_type_id=fake.VOLUME_TYPE_ID)
volume_id = volume['id']
self.volume.create_volume(self.context, volume_id)
self.volume.create_volume(self.context, volume)
# Create a bootable volume
bootable_vol_params = {'status': 'creating', 'host': CONF.host,
'size': 1, 'bootable': True}
@@ -715,10 +711,9 @@ class GroupManagerTestCase(test.TestCase):
group_id=group.id,
**bootable_vol_params)
# Create a common volume
bootable_vol_id = bootable_vol['id']
self.volume.create_volume(self.context, bootable_vol_id)
self.volume.create_volume(self.context, bootable_vol)
volume_ids = [volume_id, bootable_vol_id]
volume_ids = [volume.id, bootable_vol.id]
group_snapshot_returns = self._create_group_snapshot(group.id,
volume_ids)
group_snapshot = group_snapshot_returns[0]

File diff suppressed because it is too large.


@@ -16,9 +16,7 @@
Unit Tests for cinder.volume.rpcapi
"""
import copy
import mock
import ddt
from oslo_config import cfg
from oslo_serialization import jsonutils
@@ -40,7 +38,6 @@ from cinder.volume import utils
CONF = cfg.CONF
@ddt.ddt
class VolumeRpcAPITestCase(test.TestCase):
def setUp(self):
@@ -55,9 +52,6 @@ class VolumeRpcAPITestCase(test.TestCase):
vol['size'] = 1
volume = db.volume_create(self.context, vol)
self.patch('oslo_messaging.RPCClient.can_send_version',
return_value=True)
kwargs = {
'status': fields.SnapshotStatus.CREATING,
'progress': '0%',
@@ -347,8 +341,7 @@ class VolumeRpcAPITestCase(test.TestCase):
self._test_volume_api('delete_cgsnapshot', rpc_method='cast',
cgsnapshot=self.fake_cgsnap, version='3.0')
@mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True)
def test_create_volume(self, can_send_version):
def test_create_volume(self):
self._test_volume_api('create_volume',
rpc_method='cast',
volume=self.fake_volume_obj,
@@ -357,21 +350,6 @@ class VolumeRpcAPITestCase(test.TestCase):
filter_properties='fake_properties',
allow_reschedule=True,
version='3.0')
can_send_version.assert_has_calls([mock.call('3.0')])
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=False)
def test_create_volume_serialization(self, can_send_version):
request_spec = {"metadata": self.fake_volume_metadata}
self._test_volume_api('create_volume',
rpc_method='cast',
volume=self.fake_volume_obj,
host='fake_host1',
request_spec=request_spec,
filter_properties='fake_properties',
allow_reschedule=True,
version='2.0')
can_send_version.assert_has_calls([mock.call('3.0'), mock.call('2.4')])
def test_delete_volume(self):
self._test_volume_api('delete_volume',
@@ -448,21 +426,13 @@ class VolumeRpcAPITestCase(test.TestCase):
'disk_format': 'fake_type'},
version='3.0')
@mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True)
def test_initialize_connection(self, mock_can_send_version):
def test_initialize_connection(self):
self._test_volume_api('initialize_connection',
rpc_method='call',
volume=self.fake_volume_obj,
connector='fake_connector',
version='3.0')
mock_can_send_version.return_value = False
self._test_volume_api('initialize_connection',
rpc_method='call',
volume=self.fake_volume_obj,
connector='fake_connector',
version='2.0')
def test_terminate_connection(self):
self._test_volume_api('terminate_connection',
rpc_method='call',
@@ -526,19 +496,14 @@ class VolumeRpcAPITestCase(test.TestCase):
old_reservations=self.fake_reservations,
version='3.0')
@ddt.data('2.0', '2.2', '3.0')
@mock.patch('oslo_messaging.RPCClient.can_send_version')
def test_manage_existing(self, version, can_send_version):
can_send_version.side_effect = lambda x: x == version
def test_manage_existing(self):
self._test_volume_api('manage_existing',
rpc_method='cast',
volume=self.fake_volume_obj,
ref={'lv_name': 'foo'},
version=version)
can_send_version.assert_has_calls([mock.call('3.0')])
version='3.0')
@mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True)
def test_manage_existing_snapshot(self, mock_can_send_version):
def test_manage_existing_snapshot(self):
volume_update = {'host': 'fake_host'}
snpshot = {
'id': fake.SNAPSHOT_ID,


@@ -906,7 +906,7 @@ class LVMISCSITestCase(test_volume.DriverTestCase):
vol = {}
vol['size'] = 0
vol_ref = db.volume_create(self.context, vol)
self.volume.create_volume(self.context, vol_ref['id'])
self.volume.create_volume(self.context, vol_ref)
vol_ref = db.volume_get(self.context, vol_ref['id'])
# each volume has a different mountpoint


@@ -1204,17 +1204,14 @@ class ManagedRBDTestCase(test_volume.DriverTestCase):
try:
if not clone_error:
self.volume.create_volume(self.context,
volume.id,
request_spec={'image_id': image_id},
volume=volume)
self.volume.create_volume(self.context, volume,
request_spec={'image_id': image_id})
else:
self.assertRaises(exception.CinderException,
self.volume.create_volume,
self.context,
volume.id,
request_spec={'image_id': image_id},
volume=volume)
volume,
request_spec={'image_id': image_id})
volume = objects.Volume.get_by_id(self.context, volume.id)
self.assertEqual(expected_status, volume.status)


@@ -50,19 +50,15 @@ class ManageVolumeTestCase(test_volume.BaseVolumeTestCase):
def test_manage_existing(self):
volume_object = self._stub_volume_object_get(self)
mock_object_volume = self.mock_object(
objects.Volume, 'get_by_id', mock.Mock(return_value=volume_object))
mock_run_flow_engine = self.mock_object(
self.manager, '_run_manage_existing_flow_engine',
mock.Mock(return_value=volume_object))
mock_update_volume_stats = self.mock_object(
self.manager, '_update_stats_for_managed')
result = self.manager.manage_existing(self.context, volume_object.id)
result = self.manager.manage_existing(self.context, volume_object)
self.assertEqual(fake.VOLUME_ID, result)
mock_object_volume.assert_called_once_with(self.context,
volume_object.id)
mock_run_flow_engine.assert_called_once_with(self.context,
volume_object,
None)
@@ -78,7 +74,7 @@ class ManageVolumeTestCase(test_volume.BaseVolumeTestCase):
self.manager, '_update_stats_for_managed')
result = self.manager.manage_existing(
self.context, volume_object.id, volume=volume_object)
self.context, volume_object)
self.assertEqual(fake.VOLUME_ID, result)
mock_object_volume.assert_not_called()


@@ -173,7 +173,7 @@ class VolumeManager(manager.SchedulerDependentManager):
RPC_API_VERSION = volume_rpcapi.VolumeAPI.RPC_API_VERSION
target = messaging.Target(version='2.6')
target = messaging.Target(version=RPC_API_VERSION)
# On cloning a volume, we shouldn't copy volume_type, consistencygroup
# and volume_attachment, because the db sets that according to [field]_id,
@@ -190,7 +190,6 @@ class VolumeManager(manager.SchedulerDependentManager):
# update_service_capabilities needs service_name to be volume
super(VolumeManager, self).__init__(service_name='volume',
*args, **kwargs)
self.additional_endpoints.append(_VolumeV3Proxy(self))
self.configuration = config.Configuration(volume_manager_opts,
config_group=service_name)
self.stats = {}
@@ -488,13 +487,11 @@ class VolumeManager(manager.SchedulerDependentManager):
# Offload all the pending volume delete operations to the
# threadpool to prevent the main volume service thread
# from being blocked.
self._add_to_threadpool(self.delete_volume, ctxt,
volume['id'], volume=volume,
self._add_to_threadpool(self.delete_volume, ctxt, volume,
cascade=True)
else:
# By default, delete volumes sequentially
self.delete_volume(ctxt, volume['id'], volume=volume,
cascade=True)
self.delete_volume(ctxt, volume, cascade=True)
LOG.info(_LI("Resume volume delete completed successfully."),
resource=volume)
@@ -554,24 +551,12 @@ class VolumeManager(manager.SchedulerDependentManager):
"""
return self.driver.initialized
def create_volume(self, context, volume_id, request_spec=None,
filter_properties=None, allow_reschedule=True,
volume=None):
def create_volume(self, context, volume, request_spec=None,
filter_properties=None, allow_reschedule=True):
"""Creates the volume."""
# Log about unsupported drivers
utils.log_unsupported_driver_warning(self.driver)
# FIXME(dulek): Remove this in v3.0 of RPC API.
if volume is None:
# For older clients, mimic the old behavior and look up the volume
# by its volume_id.
volume = objects.Volume.get_by_id(context, volume_id)
# FIXME(dulek): Remove this in v3.0 of RPC API.
if isinstance(request_spec, dict):
# We may receive request_spec as dict from older clients.
request_spec = objects.RequestSpec.from_primitives(request_spec)
context_elevated = context.elevated()
if filter_properties is None:
filter_properties = {}
@@ -656,11 +641,8 @@ class VolumeManager(manager.SchedulerDependentManager):
LOG.info(_LI("Created volume successfully."), resource=volume)
return volume.id
# FIXME(bluex): replace volume_id with volume.id when volume_id is removed
@coordination.synchronized('{volume_id}-{f_name}')
def delete_volume(self, context, volume_id,
unmanage_only=False,
volume=None,
@coordination.synchronized('{volume.id}-{f_name}')
def delete_volume(self, context, volume, unmanage_only=False,
cascade=False):
"""Deletes and unexports volume.
@@ -675,15 +657,11 @@ class VolumeManager(manager.SchedulerDependentManager):
context = context.elevated()
try:
# FIXME(dulek): Remove this in v3.0 of RPC API.
if volume is None:
volume = objects.Volume.get_by_id(context, volume_id)
else:
volume.refresh()
except exception.VolumeNotFound:
# NOTE(thingee): It could be possible for a volume to
# be deleted when resuming deletes from init_host().
LOG.debug("Attempted delete of non-existent volume: %s", volume_id)
LOG.debug("Attempted delete of non-existent volume: %s", volume.id)
return
if context.project_id != volume.project_id:
@@ -693,7 +671,7 @@ class VolumeManager(manager.SchedulerDependentManager):
if volume['attach_status'] == "attached":
# Volume is still attached, need to detach first
raise exception.VolumeAttached(volume_id=volume_id)
raise exception.VolumeAttached(volume_id=volume.id)
if vol_utils.extract_host(volume.host) != self.host:
raise exception.InvalidVolume(
reason=_("volume is not local to this node"))
@@ -779,7 +757,7 @@ class VolumeManager(manager.SchedulerDependentManager):
resource=volume)
# Delete glance metadata if it exists
self.db.volume_glance_metadata_delete_by_volume(context, volume_id)
self.db.volume_glance_metadata_delete_by_volume(context, volume.id)
volume.destroy()
@@ -823,7 +801,7 @@ class VolumeManager(manager.SchedulerDependentManager):
volume_ref.status = status
volume_ref.save()
def create_snapshot(self, context, volume_id, snapshot):
def create_snapshot(self, context, snapshot):
"""Creates and exports the snapshot."""
context = context.elevated()
@@ -1175,7 +1153,7 @@ class VolumeManager(manager.SchedulerDependentManager):
LOG.warning(_LW('Failed to create new image-volume cache entry.'
' Error: %(exception)s'), {'exception': e})
if image_volume:
self.delete_volume(ctx, image_volume.id)
self.delete_volume(ctx, image_volume)
def _clone_image_volume(self, ctx, volume, image_meta):
volume_type_id = volume.get('volume_type_id')
@@ -1209,8 +1187,7 @@ class VolumeManager(manager.SchedulerDependentManager):
project_id=new_vol_values['project_id'])
try:
self.create_volume(ctx, image_volume.id,
allow_reschedule=False, volume=image_volume)
self.create_volume(ctx, image_volume, allow_reschedule=False)
image_volume = self.db.volume_get(ctx, image_volume.id)
if image_volume.status != 'available':
raise exception.InvalidVolume(_('Volume is not available.'))
@@ -1226,7 +1203,7 @@ class VolumeManager(manager.SchedulerDependentManager):
{'volume_id': volume.id,
'image_id': image_meta['id']})
try:
self.delete_volume(ctx, image_volume.id)
self.delete_volume(ctx, image_volume)
except exception.CinderException:
LOG.exception(_LE('Could not delete the image volume %(id)s.'),
{'id': volume.id})
@@ -1351,8 +1328,7 @@ class VolumeManager(manager.SchedulerDependentManager):
exc_info=True, resource={'type': 'image',
'id': image_id})
def initialize_connection(self, context, volume_id, connector,
volume=None):
def initialize_connection(self, context, volume, connector):
"""Prepare volume for connection from host represented by connector.
This method calls the driver initialize_connection and returns
@@ -1389,11 +1365,6 @@ class VolumeManager(manager.SchedulerDependentManager):
json in various places, so it should not contain any non-json
data types.
"""
# FIXME(bluex): Remove this in v3.0 of RPC API.
if volume is None:
# For older clients, mimic the old behavior and look up the volume
# by its volume_id.
volume = objects.Volume.get_by_id(context, volume_id)
# NOTE(flaper87): Verify the driver is enabled
# before going forward. The exception will be caught
@@ -1592,7 +1563,7 @@ class VolumeManager(manager.SchedulerDependentManager):
self.db.volume_update(ctxt, volume['id'],
{'status': status})
else:
conn = self.initialize_connection(ctxt, volume['id'], properties)
conn = self.initialize_connection(ctxt, volume, properties)
attach_info = self._connect_device(conn)
try:
@@ -1772,11 +1743,8 @@ class VolumeManager(manager.SchedulerDependentManager):
remote='dest')
# The above call is synchronous so we complete the migration
self.migrate_volume_completion(ctxt, volume.id,
new_volume.id,
error=False,
volume=volume,
new_volume=new_volume)
self.migrate_volume_completion(ctxt, volume, new_volume,
error=False)
else:
nova_api = compute.API()
# This is an async call to Nova, which will call the completion
@@ -1833,15 +1801,7 @@ class VolumeManager(manager.SchedulerDependentManager):
"source volume may have been deleted."),
{'vol': new_volume.id})
def migrate_volume_completion(self, ctxt, volume_id, new_volume_id,
error=False, volume=None, new_volume=None):
# FIXME(dulek): Remove this in v3.0 of RPC API.
if volume is None or new_volume is None:
# For older clients, mimic the old behavior and look up the volume
# by its volume_id.
volume = objects.Volume.get_by_id(ctxt, volume_id)
new_volume = objects.Volume.get_by_id(ctxt, new_volume_id)
def migrate_volume_completion(self, ctxt, volume, new_volume, error=False):
try:
# NOTE(flaper87): Verify the driver is enabled
# before going forward. The exception will be caught
@@ -1926,15 +1886,9 @@ class VolumeManager(manager.SchedulerDependentManager):
resource=volume)
return volume.id
def migrate_volume(self, ctxt, volume_id, host, force_host_copy=False,
new_type_id=None, volume=None):
def migrate_volume(self, ctxt, volume, host, force_host_copy=False,
new_type_id=None):
"""Migrate the volume to the specified host (called on source host)."""
# FIXME(dulek): Remove this in v3.0 of RPC API.
if volume is None:
# For older clients, mimic the old behavior and look up the volume
# by its volume_id.
volume = objects.Volume.get_by_id(ctxt, volume_id)
try:
# NOTE(flaper87): Verify the driver is enabled
# before going forward. The exception will be caught
@@ -2146,14 +2100,7 @@ class VolumeManager(manager.SchedulerDependentManager):
context, snapshot, event_suffix,
extra_usage_info=extra_usage_info, host=self.host)
def extend_volume(self, context, volume_id, new_size, reservations,
volume=None):
# FIXME(dulek): Remove this in v3.0 of RPC API.
if volume is None:
# For older clients, mimic the old behavior and look up the volume
# by its volume_id.
volume = objects.Volume.get_by_id(context, volume_id)
def extend_volume(self, context, volume, new_size, reservations):
try:
# NOTE(flaper87): Verify the driver is enabled
# before going forward. The exception will be caught
@@ -2204,9 +2151,9 @@ class VolumeManager(manager.SchedulerDependentManager):
LOG.info(_LI("Extend volume completed successfully."),
resource=volume)
def retype(self, context, volume_id, new_type_id, host,
def retype(self, context, volume, new_type_id, host,
migration_policy='never', reservations=None,
volume=None, old_reservations=None):
old_reservations=None):
def _retype_error(context, volume, old_reservations,
new_reservations, status_update):
@@ -2217,12 +2164,6 @@ class VolumeManager(manager.SchedulerDependentManager):
QUOTAS.rollback(context, old_reservations)
QUOTAS.rollback(context, new_reservations)
# FIXME(dulek): Remove this in v3.0 of RPC API.
if volume is None:
# For older clients, mimic the old behavior and look up the volume
# by its volume_id.
volume = objects.Volume.get_by_id(context, volume_id)
status_update = {'status': volume.previous_status}
if context.project_id != volume.project_id:
project_id = volume.project_id
@@ -2346,7 +2287,7 @@ class VolumeManager(manager.SchedulerDependentManager):
volume.save()
try:
self.migrate_volume(context, volume.id, host,
self.migrate_volume(context, volume, host,
new_type_id=new_type_id)
except Exception:
with excutils.save_and_reraise_exception():
@@ -2372,13 +2313,7 @@ class VolumeManager(manager.SchedulerDependentManager):
LOG.info(_LI("Retype volume completed successfully."),
resource=volume)
def manage_existing(self, ctxt, volume_id, ref=None, volume=None):
# FIXME(dulek): Remove this in v3.0 of RPC API.
if volume is None:
# For older clients, mimic the old behavior and look up the volume
# by its volume_id.
volume = objects.Volume.get_by_id(ctxt, volume_id)
def manage_existing(self, ctxt, volume, ref=None):
vol_ref = self._run_manage_existing_flow_engine(
ctxt, volume, ref)
@@ -4266,191 +4201,3 @@ class VolumeManager(manager.SchedulerDependentManager):
def secure_file_operations_enabled(self, ctxt, volume):
secure_enabled = self.driver.secure_file_operations_enabled()
return secure_enabled
# TODO(dulek): This goes away immediately in Ocata and is just present in
# Newton so that we can receive v2.x and v3.0 messages.
class _VolumeV3Proxy(object):
target = messaging.Target(version='3.1')
def __init__(self, manager):
self.manager = manager
def create_volume(self, context, volume, request_spec=None,
filter_properties=None, allow_reschedule=True):
# NOTE(dulek): We're replacing volume_id with volume object (by
# switching it from optional keyword argument to positional argument).
return self.manager.create_volume(
context, volume.id, request_spec=request_spec,
filter_properties=filter_properties,
allow_reschedule=allow_reschedule, volume=volume)
def delete_volume(self, context, volume, unmanage_only=False,
cascade=False):
return self.manager.delete_volume(
context, volume.id, unmanage_only=unmanage_only, volume=volume,
cascade=cascade)
def create_snapshot(self, context, snapshot):
return self.manager.create_snapshot(context, snapshot.volume_id,
snapshot)
def delete_snapshot(self, context, snapshot, unmanage_only=False):
return self.manager.delete_snapshot(
context, snapshot, unmanage_only=unmanage_only)
def attach_volume(self, context, volume_id, instance_uuid, host_name,
mountpoint, mode):
return self.manager.attach_volume(
context, volume_id, instance_uuid, host_name, mountpoint, mode)
def detach_volume(self, context, volume_id, attachment_id=None):
return self.manager.detach_volume(context, volume_id,
attachment_id=attachment_id)
def copy_volume_to_image(self, context, volume_id, image_meta):
return self.manager.copy_volume_to_image(context, volume_id,
image_meta)
def initialize_connection(self, context, volume, connector):
# NOTE(dulek): We're replacing volume_id with volume object (by
# switching it from optional keyword argument to positional argument).
return self.manager.initialize_connection(context, volume.id,
connector, volume=volume)
def terminate_connection(self, context, volume_id, connector, force=False):
return self.manager.terminate_connection(context, volume_id, connector,
force=force)
def remove_export(self, context, volume_id):
return self.manager.remove_export(context, volume_id)
def accept_transfer(self, context, volume_id, new_user, new_project):
return self.manager.accept_transfer(context, volume_id, new_user,
new_project)
def migrate_volume_completion(self, ctxt, volume, new_volume, error=False):
# NOTE(dulek): We're replacing volume_id with volume object, same with
# new_volume_id (by switching them from optional keyword arguments to
# positional arguments).
return self.manager.migrate_volume_completion(
ctxt, volume.id, new_volume.id, error=error, volume=volume,
new_volume=new_volume)
def migrate_volume(self, ctxt, volume, host, force_host_copy=False,
new_type_id=None):
# NOTE(dulek): We're replacing volume_id with volume object (by
# switching it from optional keyword argument to positional argument).
return self.manager.migrate_volume(
ctxt, volume.id, host, force_host_copy=force_host_copy,
new_type_id=new_type_id, volume=volume)
def publish_service_capabilities(self, context):
return self.manager.publish_service_capabilities(context)
def extend_volume(self, context, volume, new_size, reservations):
# NOTE(dulek): We're replacing volume_id with volume object (by
# switching it from optional keyword argument to positional argument).
return self.manager.extend_volume(
context, volume.id, new_size, reservations, volume=volume)
def retype(self, context, volume, new_type_id, host,
migration_policy='never', reservations=None,
old_reservations=None):
return self.manager.retype(
context, volume.id, new_type_id, host,
migration_policy=migration_policy, reservations=reservations,
volume=volume, old_reservations=old_reservations)
def manage_existing(self, ctxt, volume, ref=None):
return self.manager.manage_existing(ctxt, volume.id, ref=ref,
volume=volume)
def get_manageable_volumes(self, ctxt, marker, limit, offset, sort_keys,
sort_dirs):
return self.manager.get_manageable_volumes(ctxt, marker, limit, offset,
sort_keys, sort_dirs)
def promote_replica(self, ctxt, volume_id):
return self.manager.promote_replica(ctxt, volume_id)
def reenable_replication(self, ctxt, volume_id):
return self.manager.reenable_replication(ctxt, volume_id)
def create_consistencygroup(self, context, group):
return self.manager.create_consistencygroup(context, group)
def create_group(self, context, group):
return self.manager.create_group(context, group)
def create_consistencygroup_from_src(self, context, group, cgsnapshot=None,
source_cg=None):
return self.manager.create_consistencygroup_from_src(
context, group, cgsnapshot=cgsnapshot, source_cg=source_cg)
def create_group_from_src(self, context, group, group_snapshot=None,
source_group=None):
return self.manager.create_group_from_src(
context, group, group_snapshot=group_snapshot,
source_group=source_group)
def delete_consistencygroup(self, context, group):
return self.manager.delete_consistencygroup(context, group)
def delete_group(self, context, group):
return self.manager.delete_group(context, group)
def update_consistencygroup(self, context, group, add_volumes=None,
remove_volumes=None):
return self.manager.update_consistencygroup(
context, group, add_volumes=add_volumes,
remove_volumes=remove_volumes)
def update_group(self, context, group, add_volumes=None,
remove_volumes=None):
return self.manager.update_group(
context, group, add_volumes=add_volumes,
remove_volumes=remove_volumes)
def create_cgsnapshot(self, context, cgsnapshot):
return self.manager.create_cgsnapshot(context, cgsnapshot)
def create_group_snapshot(self, context, group_snapshot):
return self.manager.create_group_snapshot(context, group_snapshot)
def delete_cgsnapshot(self, context, cgsnapshot):
return self.manager.delete_cgsnapshot(context, cgsnapshot)
def delete_group_snapshot(self, context, group_snapshot):
return self.manager.delete_group_snapshot(context, group_snapshot)
def update_migrated_volume(self, ctxt, volume, new_volume, volume_status):
return self.manager.update_migrated_volume(ctxt, volume, new_volume,
volume_status)
def failover_host(self, context, secondary_backend_id=None):
return self.manager.failover_host(
context, secondary_backend_id=secondary_backend_id)
def freeze_host(self, context):
return self.manager.freeze_host(context)
def thaw_host(self, context):
return self.manager.thaw_host(context)
def manage_existing_snapshot(self, ctxt, snapshot, ref=None):
return self.manager.manage_existing_snapshot(ctxt, snapshot, ref=ref)
def get_manageable_snapshots(self, ctxt, marker, limit, offset, sort_keys,
sort_dirs):
return self.manager.get_manageable_snapshots(
ctxt, marker, limit, offset, sort_keys, sort_dirs)
def get_capabilities(self, context, discover):
return self.manager.get_capabilities(context, discover)
def get_backup_device(self, ctxt, backup):
return self.manager.get_backup_device(ctxt, backup)
def secure_file_operations_enabled(self, ctxt, volume):
return self.manager.secure_file_operations_enabled(ctxt, volume)


@@ -13,10 +13,7 @@
# under the License.
from oslo_serialization import jsonutils
from cinder.common import constants
from cinder import objects
from cinder import quota
from cinder import rpc
from cinder.volume import utils
@@ -124,20 +121,20 @@ class VolumeAPI(rpc.RPCAPI):
return self.client.prepare(server=new_host, version=version)
def create_consistencygroup(self, ctxt, group, host):
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self._get_cctxt(host, version)
cctxt.cast(ctxt, 'create_consistencygroup',
group=group)
def delete_consistencygroup(self, ctxt, group):
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self._get_cctxt(group.host, version)
cctxt.cast(ctxt, 'delete_consistencygroup',
group=group)
def update_consistencygroup(self, ctxt, group, add_volumes=None,
remove_volumes=None):
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self._get_cctxt(group.host, version)
cctxt.cast(ctxt, 'update_consistencygroup',
group=group,
@@ -146,7 +143,7 @@ class VolumeAPI(rpc.RPCAPI):
def create_consistencygroup_from_src(self, ctxt, group, cgsnapshot=None,
source_cg=None):
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self._get_cctxt(group.host, version)
cctxt.cast(ctxt, 'create_consistencygroup_from_src',
group=group,
@@ -154,12 +151,12 @@ class VolumeAPI(rpc.RPCAPI):
source_cg=source_cg)
def create_cgsnapshot(self, ctxt, cgsnapshot):
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self._get_cctxt(cgsnapshot.consistencygroup.host, version)
cctxt.cast(ctxt, 'create_cgsnapshot', cgsnapshot=cgsnapshot)
def delete_cgsnapshot(self, ctxt, cgsnapshot):
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self._get_cctxt(cgsnapshot.consistencygroup.host, version)
cctxt.cast(ctxt, 'delete_cgsnapshot', cgsnapshot=cgsnapshot)
@@ -170,50 +167,35 @@ class VolumeAPI(rpc.RPCAPI):
'allow_reschedule': allow_reschedule,
'volume': volume,
}
version = self._compat_ver('3.0', '2.4', '2.0')
if version in ('2.4', '2.0'):
msg_args['volume_id'] = volume.id
if version == '2.0':
# Send request_spec as dict
msg_args['request_spec'] = jsonutils.to_primitive(request_spec)
version = '3.0'
cctxt = self._get_cctxt(host, version)
cctxt.cast(ctxt, 'create_volume', **msg_args)
def delete_volume(self, ctxt, volume, unmanage_only=False, cascade=False):
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self._get_cctxt(volume.host, version)
msg_args = {
'volume': volume, 'unmanage_only': unmanage_only,
'cascade': cascade,
}
if version == '2.0':
msg_args['volume_id'] = volume.id
cctxt.cast(ctxt, 'delete_volume', **msg_args)
def create_snapshot(self, ctxt, volume, snapshot):
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self._get_cctxt(volume['host'], version)
msg_args = {
'snapshot': snapshot,
}
if version == '2.0':
msg_args['volume_id'] = volume['id']
cctxt.cast(ctxt, 'create_snapshot', **msg_args)
cctxt.cast(ctxt, 'create_snapshot', snapshot=snapshot)
def delete_snapshot(self, ctxt, snapshot, host, unmanage_only=False):
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self._get_cctxt(host, version)
cctxt.cast(ctxt, 'delete_snapshot', snapshot=snapshot,
unmanage_only=unmanage_only)
def attach_volume(self, ctxt, volume, instance_uuid, host_name,
mountpoint, mode):
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self._get_cctxt(volume['host'], version)
return cctxt.call(ctxt, 'attach_volume',
volume_id=volume['id'],
@@ -223,68 +205,59 @@ class VolumeAPI(rpc.RPCAPI):
mode=mode)
def detach_volume(self, ctxt, volume, attachment_id):
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self._get_cctxt(volume['host'], version)
return cctxt.call(ctxt, 'detach_volume', volume_id=volume['id'],
attachment_id=attachment_id)
def copy_volume_to_image(self, ctxt, volume, image_meta):
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self._get_cctxt(volume['host'], version)
cctxt.cast(ctxt, 'copy_volume_to_image', volume_id=volume['id'],
image_meta=image_meta)
def initialize_connection(self, ctxt, volume, connector):
version = self._compat_ver('3.0', '2.3', '2.0')
version = '3.0'
msg_args = {'connector': connector, 'volume': volume}
if version in ('2.0', '2.3'):
msg_args['volume_id'] = volume.id
if version == '2.0':
del msg_args['volume']
cctxt = self._get_cctxt(volume['host'], version=version)
return cctxt.call(ctxt, 'initialize_connection', **msg_args)
def terminate_connection(self, ctxt, volume, connector, force=False):
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self._get_cctxt(volume['host'], version)
return cctxt.call(ctxt, 'terminate_connection', volume_id=volume['id'],
connector=connector, force=force)
def remove_export(self, ctxt, volume):
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self._get_cctxt(volume['host'], version)
cctxt.cast(ctxt, 'remove_export', volume_id=volume['id'])
def publish_service_capabilities(self, ctxt):
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self.client.prepare(fanout=True, version=version)
cctxt.cast(ctxt, 'publish_service_capabilities')
def accept_transfer(self, ctxt, volume, new_user, new_project):
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self._get_cctxt(volume['host'], version)
return cctxt.call(ctxt, 'accept_transfer', volume_id=volume['id'],
new_user=new_user, new_project=new_project)
def extend_volume(self, ctxt, volume, new_size, reservations):
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self._get_cctxt(volume.host, version)
msg_args = {
'volume': volume, 'new_size': new_size,
'reservations': reservations,
}
if version == '2.0':
msg_args['volume_id'] = volume.id
cctxt.cast(ctxt, 'extend_volume', **msg_args)
def migrate_volume(self, ctxt, volume, dest_host, force_host_copy):
host_p = {'host': dest_host.host,
'capabilities': dest_host.capabilities}
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self._get_cctxt(volume.host, version)
msg_args = {
@@ -292,23 +265,16 @@ class VolumeAPI(rpc.RPCAPI):
'force_host_copy': force_host_copy,
}
if version == '2.0':
msg_args['volume_id'] = volume.id
cctxt.cast(ctxt, 'migrate_volume', **msg_args)
def migrate_volume_completion(self, ctxt, volume, new_volume, error):
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self._get_cctxt(volume.host, version)
msg_args = {
'volume': volume, 'new_volume': new_volume, 'error': error,
}
if version == '2.0':
msg_args['volume_id'] = volume.id
msg_args['new_volume_id'] = new_volume.id
return cctxt.call(ctxt, 'migrate_volume_completion', **msg_args)
def retype(self, ctxt, volume, new_type_id, dest_host,
@@ -316,7 +282,7 @@ class VolumeAPI(rpc.RPCAPI):
old_reservations=None):
host_p = {'host': dest_host.host,
'capabilities': dest_host.capabilities}
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self._get_cctxt(volume.host, version)
msg_args = {
@@ -325,26 +291,19 @@ class VolumeAPI(rpc.RPCAPI):
'old_reservations': old_reservations,
}
if version == '2.0':
msg_args['volume_id'] = volume.id
cctxt.cast(ctxt, 'retype', **msg_args)
def manage_existing(self, ctxt, volume, ref):
msg_args = {
'ref': ref, 'volume': volume,
}
version = self._compat_ver('3.0', '2.2', '2.0')
if version in ('2.2', '2.0'):
msg_args['volume_id'] = volume.id
if version == '2.0':
msg_args.pop('volume')
version = '3.0'
cctxt = self._get_cctxt(volume.host, version)
cctxt.cast(ctxt, 'manage_existing', **msg_args)
def update_migrated_volume(self, ctxt, volume, new_volume,
original_volume_status):
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self._get_cctxt(new_volume['host'], version)
cctxt.call(ctxt,
'update_migrated_volume',
@@ -354,61 +313,50 @@ class VolumeAPI(rpc.RPCAPI):
def freeze_host(self, ctxt, host):
"""Set backend host to frozen."""
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self._get_cctxt(host, version)
return cctxt.call(ctxt, 'freeze_host')
def thaw_host(self, ctxt, host):
"""Clear the frozen setting on a backend host."""
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self._get_cctxt(host, version)
return cctxt.call(ctxt, 'thaw_host')
def failover_host(self, ctxt, host, secondary_backend_id=None):
"""Failover host to the specified backend_id (secondary). """
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self._get_cctxt(host, version)
cctxt.cast(ctxt, 'failover_host',
secondary_backend_id=secondary_backend_id)
def manage_existing_snapshot(self, ctxt, snapshot, ref, host):
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self._get_cctxt(host, version)
cctxt.cast(ctxt, 'manage_existing_snapshot',
snapshot=snapshot,
ref=ref)
def get_capabilities(self, ctxt, host, discover):
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self._get_cctxt(host, version)
return cctxt.call(ctxt, 'get_capabilities', discover=discover)
def get_backup_device(self, ctxt, backup, volume):
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self._get_cctxt(volume.host, version)
backup_dict = cctxt.call(ctxt, 'get_backup_device', backup=backup)
# FIXME(dulek): Snippet below converts received raw dicts to o.vo. This
# is only for a case when Mitaka's c-vol will answer us with volume
# dict instead of an o.vo and should go away in early Ocata.
if isinstance(backup_dict.get('backup_device'), dict):
is_snapshot = backup_dict.get('is_snapshot')
obj_class = objects.Snapshot if is_snapshot else objects.Volume
obj = obj_class()
obj_class._from_db_object(ctxt, obj, backup_dict['backup_device'])
backup_dict['backup_device'] = obj
return backup_dict
def secure_file_operations_enabled(self, ctxt, volume):
version = self._compat_ver('3.0', '2.0')
version = '3.0'
cctxt = self._get_cctxt(volume.host, version)
return cctxt.call(ctxt, 'secure_file_operations_enabled',
volume=volume)
def get_manageable_volumes(self, ctxt, host, marker, limit, offset,
sort_keys, sort_dirs):
version = self._compat_ver('3.0', '2.1')
version = '3.0'
cctxt = self._get_cctxt(host, version)
return cctxt.call(ctxt, 'get_manageable_volumes', marker=marker,
limit=limit, offset=offset, sort_keys=sort_keys,
@@ -416,27 +364,27 @@ class VolumeAPI(rpc.RPCAPI):
def get_manageable_snapshots(self, ctxt, host, marker, limit, offset,
sort_keys, sort_dirs):
version = self._compat_ver('3.0', '2.1')
version = '3.0'
cctxt = self._get_cctxt(host, version)
return cctxt.call(ctxt, 'get_manageable_snapshots', marker=marker,
limit=limit, offset=offset, sort_keys=sort_keys,
sort_dirs=sort_dirs)
def create_group(self, ctxt, group, host):
version = self._compat_ver('3.0', '2.5')
version = '3.0'
cctxt = self._get_cctxt(host, version)
cctxt.cast(ctxt, 'create_group',
group=group)
def delete_group(self, ctxt, group):
version = self._compat_ver('3.0', '2.5')
version = '3.0'
cctxt = self._get_cctxt(group.host, version)
cctxt.cast(ctxt, 'delete_group',
group=group)
def update_group(self, ctxt, group, add_volumes=None,
remove_volumes=None):
version = self._compat_ver('3.0', '2.5')
version = '3.0'
cctxt = self._get_cctxt(group.host, version)
cctxt.cast(ctxt, 'update_group',
group=group,
@@ -445,7 +393,7 @@ class VolumeAPI(rpc.RPCAPI):
def create_group_from_src(self, ctxt, group, group_snapshot=None,
source_group=None):
version = self._compat_ver('3.0', '2.6')
version = '3.0'
cctxt = self._get_cctxt(group.host, version)
cctxt.cast(ctxt, 'create_group_from_src',
group=group,
@@ -453,13 +401,13 @@ class VolumeAPI(rpc.RPCAPI):
source_group=source_group)
def create_group_snapshot(self, ctxt, group_snapshot):
version = self._compat_ver('3.0', '2.6')
version = '3.0'
cctxt = self._get_cctxt(group_snapshot.group.host, version)
cctxt.cast(ctxt, 'create_group_snapshot',
group_snapshot=group_snapshot)
def delete_group_snapshot(self, ctxt, group_snapshot):
version = self._compat_ver('3.0', '2.6')
version = '3.0'
cctxt = self._get_cctxt(group_snapshot.group.host, version)
cctxt.cast(ctxt, 'delete_group_snapshot',
group_snapshot=group_snapshot)