diff --git a/cinder/rpc.py b/cinder/rpc.py index 4b1950e43ce..4308a8819e9 100644 --- a/cinder/rpc.py +++ b/cinder/rpc.py @@ -198,6 +198,13 @@ class RPCAPI(object): self.client = get_client(target, version_cap=rpc_version_cap, serializer=serializer) + def _compat_ver(self, current, *legacy): + versions = (current,) + legacy + for version in versions[:-1]: + if self.client.can_send_version(version): + return version + return versions[-1] + @classmethod def determine_rpc_version_cap(cls): global LAST_RPC_VERSIONS diff --git a/cinder/tests/unit/api/contrib/test_admin_actions.py b/cinder/tests/unit/api/contrib/test_admin_actions.py index 6b05baeb117..cc1afed823d 100644 --- a/cinder/tests/unit/api/contrib/test_admin_actions.py +++ b/cinder/tests/unit/api/contrib/test_admin_actions.py @@ -21,6 +21,7 @@ import webob from webob import exc from cinder.api.contrib import admin_actions +from cinder.backup import rpcapi as backup_rpcapi from cinder.common import constants from cinder import context from cinder import db @@ -28,6 +29,7 @@ from cinder import exception from cinder import objects from cinder.objects import base as obj_base from cinder.objects import fields +from cinder.scheduler import rpcapi as scheduler_rpcapi from cinder import test from cinder.tests.unit.api.contrib import test_backups from cinder.tests.unit.api import fakes @@ -87,8 +89,17 @@ class AdminActionsTest(BaseAdminTest): self.patch( 'cinder.objects.Service.get_minimum_obj_version', return_value=obj_base.OBJ_VERSIONS.get_current()) + + def _get_minimum_rpc_version_mock(ctxt, binary): + binary_map = { + 'cinder-volume': rpcapi.VolumeAPI, + 'cinder-backup': backup_rpcapi.BackupAPI, + 'cinder-scheduler': scheduler_rpcapi.SchedulerAPI, + } + return binary_map[binary].RPC_API_VERSION + self.patch('cinder.objects.Service.get_minimum_rpc_version', - return_value=rpcapi.VolumeAPI.RPC_API_VERSION) + side_effect=_get_minimum_rpc_version_mock) def tearDown(self): self.svc.stop() diff --git a/cinder/tests/unit/test_volume_rpcapi.py b/cinder/tests/unit/test_volume_rpcapi.py index f0018eb6cac..d92dc10f08b 100644 --- a/cinder/tests/unit/test_volume_rpcapi.py +++ b/cinder/tests/unit/test_volume_rpcapi.py @@ -55,6 +55,9 @@ class VolumeRpcAPITestCase(test.TestCase): vol['size'] = 1 volume = db.volume_create(self.context, vol) + self.patch('oslo_messaging.RPCClient.can_send_version', + return_value=True) + kwargs = { 'status': fields.SnapshotStatus.CREATING, 'progress': '0%', @@ -325,24 +328,24 @@ class VolumeRpcAPITestCase(test.TestCase): def test_create_consistencygroup(self): self._test_volume_api('create_consistencygroup', rpc_method='cast', group=self.fake_cg, host='fake_host1', - version='2.0') + version='3.0') def test_delete_consistencygroup(self): self._test_volume_api('delete_consistencygroup', rpc_method='cast', - group=self.fake_cg, version='2.0') + group=self.fake_cg, version='3.0') def test_update_consistencygroup(self): self._test_volume_api('update_consistencygroup', rpc_method='cast', group=self.fake_cg, add_volumes=['vol1'], - remove_volumes=['vol2'], version='2.0') + remove_volumes=['vol2'], version='3.0') def test_create_cgsnapshot(self): self._test_volume_api('create_cgsnapshot', rpc_method='cast', - cgsnapshot=self.fake_cgsnap, version='2.0') + cgsnapshot=self.fake_cgsnap, version='3.0') def test_delete_cgsnapshot(self): self._test_volume_api('delete_cgsnapshot', rpc_method='cast', - cgsnapshot=self.fake_cgsnap, version='2.0') + cgsnapshot=self.fake_cgsnap, version='3.0') @mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True) def test_create_volume(self, can_send_version): @@ -353,8 +356,8 @@ class VolumeRpcAPITestCase(test.TestCase): request_spec='fake_request_spec', filter_properties='fake_properties', allow_reschedule=True, - version='2.4') - can_send_version.assert_has_calls([mock.call('2.4')]) + version='3.0') + can_send_version.assert_has_calls([mock.call('3.0')]) @mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=False) @@ -368,7 +371,7 @@ class VolumeRpcAPITestCase(test.TestCase): filter_properties='fake_properties', allow_reschedule=True, version='2.0') - can_send_version.assert_has_calls([mock.call('2.4')]) + can_send_version.assert_has_calls([mock.call('3.0'), mock.call('2.4')]) def test_delete_volume(self): self._test_volume_api('delete_volume', @@ -376,7 +379,7 @@ class VolumeRpcAPITestCase(test.TestCase): volume=self.fake_volume_obj, unmanage_only=False, cascade=False, - version='2.0') + version='3.0') def test_delete_volume_cascade(self): self._test_volume_api('delete_volume', @@ -384,14 +387,14 @@ class VolumeRpcAPITestCase(test.TestCase): volume=self.fake_volume_obj, unmanage_only=False, cascade=True, - version='2.0') + version='3.0') def test_create_snapshot(self): self._test_volume_api('create_snapshot', rpc_method='cast', volume=self.fake_volume, snapshot=self.fake_snapshot, - version='2.0') + version='3.0') def test_delete_snapshot(self): self._test_volume_api('delete_snapshot', @@ -399,7 +402,7 @@ class VolumeRpcAPITestCase(test.TestCase): snapshot=self.fake_snapshot, host='fake_host', unmanage_only=False, - version='2.0') + version='3.0') def test_delete_snapshot_with_unmanage_only(self): self._test_volume_api('delete_snapshot', @@ -407,7 +410,7 @@ class VolumeRpcAPITestCase(test.TestCase): snapshot=self.fake_snapshot, host='fake_host', unmanage_only=True, - version='2.0') + version='3.0') def test_attach_volume_to_instance(self): self._test_volume_api('attach_volume', @@ -417,7 +420,7 @@ class VolumeRpcAPITestCase(test.TestCase): host_name=None, mountpoint='fake_mountpoint', mode='ro', - version='2.0') + version='3.0') def test_attach_volume_to_host(self): self._test_volume_api('attach_volume', @@ -427,14 +430,14 @@ class VolumeRpcAPITestCase(test.TestCase): host_name='fake_host', mountpoint='fake_mountpoint', mode='rw', - version='2.0') + version='3.0') def test_detach_volume(self): self._test_volume_api('detach_volume', rpc_method='call', volume=self.fake_volume, attachment_id='fake_uuid', - version="2.0") + version="3.0") def test_copy_volume_to_image(self): self._test_volume_api('copy_volume_to_image', @@ -443,7 +446,7 @@ class VolumeRpcAPITestCase(test.TestCase): image_meta={'id': 'fake_image_id', 'container_format': 'fake_type', 'disk_format': 'fake_type'}, - version='2.0') + version='3.0') @mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True) def test_initialize_connection(self, mock_can_send_version): @@ -451,7 +454,7 @@ class VolumeRpcAPITestCase(test.TestCase): rpc_method='call', volume=self.fake_volume_obj, connector='fake_connector', - version='2.3') + version='3.0') mock_can_send_version.return_value = False self._test_volume_api('initialize_connection', @@ -466,7 +469,7 @@ class VolumeRpcAPITestCase(test.TestCase): volume=self.fake_volume, connector='fake_connector', force=False, - version='2.0') + version='3.0') def test_accept_transfer(self): self._test_volume_api('accept_transfer', @@ -476,7 +479,7 @@ class VolumeRpcAPITestCase(test.TestCase): '8ffd-0800200c9b77', new_project='e4465fd0-06c8-11e3' '-8ffd-0800200c9a66', - version='2.0') + version='3.0') def test_extend_volume(self): self._test_volume_api('extend_volume', @@ -484,7 +487,7 @@ class VolumeRpcAPITestCase(test.TestCase): volume=self.fake_volume_obj, new_size=1, reservations=self.fake_reservations, - version='2.0') + version='3.0') def test_migrate_volume(self): class FakeHost(object): @@ -497,7 +500,7 @@ class VolumeRpcAPITestCase(test.TestCase): volume=self.fake_volume_obj, dest_host=dest_host, force_host_copy=True, - version='2.0') + version='3.0') def test_migrate_volume_completion(self): self._test_volume_api('migrate_volume_completion', @@ -505,7 +508,7 @@ class VolumeRpcAPITestCase(test.TestCase): volume=self.fake_volume_obj, new_volume=self.fake_volume_obj, error=False, - version='2.0') + version='3.0') def test_retype(self): class FakeHost(object): @@ -521,9 +524,9 @@ class VolumeRpcAPITestCase(test.TestCase): migration_policy='never', reservations=self.fake_reservations, old_reservations=self.fake_reservations, - version='2.0') + version='3.0') - @ddt.data('2.0', '2.2') + @ddt.data('2.0', '2.2', '3.0') @mock.patch('oslo_messaging.RPCClient.can_send_version') def test_manage_existing(self, version, can_send_version): can_send_version.side_effect = lambda x: x == version @@ -532,7 +535,7 @@ class VolumeRpcAPITestCase(test.TestCase): volume=self.fake_volume_obj, ref={'lv_name': 'foo'}, version=version) - can_send_version.assert_called_once_with('2.2') + can_send_version.assert_has_calls([mock.call('3.0')]) @mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True) def test_manage_existing_snapshot(self, mock_can_send_version): @@ -554,33 +557,33 @@ class VolumeRpcAPITestCase(test.TestCase): snapshot=my_fake_snapshot_obj, ref='foo', host='fake_host', - version='2.0') + version='3.0') def test_promote_replica(self): self._test_volume_api('promote_replica', rpc_method='cast', volume=self.fake_volume, - version='2.0') + version='3.0') def test_reenable_replica(self): self._test_volume_api('reenable_replication', rpc_method='cast', volume=self.fake_volume, - version='2.0') + version='3.0') def test_freeze_host(self): self._test_volume_api('freeze_host', rpc_method='call', - host='fake_host', version='2.0') + host='fake_host', version='3.0') def test_thaw_host(self): self._test_volume_api('thaw_host', rpc_method='call', host='fake_host', - version='2.0') + version='3.0') def test_failover_host(self): self._test_volume_api('failover_host', rpc_method='cast', host='fake_host', secondary_backend_id='fake_backend', - version='2.0') + version='3.0') def test_create_consistencygroup_from_src_cgsnapshot(self): self._test_volume_api('create_consistencygroup_from_src', @@ -588,7 +591,7 @@ class VolumeRpcAPITestCase(test.TestCase): group=self.fake_cg, cgsnapshot=self.fake_cgsnap, source_cg=None, - version='2.0') + version='3.0') def test_create_consistencygroup_from_src_cg(self): self._test_volume_api('create_consistencygroup_from_src', @@ -596,61 +599,61 @@ class VolumeRpcAPITestCase(test.TestCase): group=self.fake_cg2, cgsnapshot=None, source_cg=self.fake_src_cg, - version='2.0') + version='3.0') def test_get_capabilities(self): self._test_volume_api('get_capabilities', rpc_method='call', host='fake_host', discover=True, - version='2.0') + version='3.0') def test_remove_export(self): self._test_volume_api('remove_export', rpc_method='cast', volume=self.fake_volume, - version='2.0') + version='3.0') def test_get_backup_device(self): self._test_volume_api('get_backup_device', rpc_method='call', backup=self.fake_backup_obj, volume=self.fake_volume_obj, - version='2.0') + version='3.0') def test_secure_file_operations_enabled(self): self._test_volume_api('secure_file_operations_enabled', rpc_method='call', volume=self.fake_volume_obj, - version='2.0') + version='3.0') def test_create_group(self): self._test_group_api('create_group', rpc_method='cast', group=self.fake_group, host='fake_host1', - version='2.5') + version='3.0') def test_delete_group(self): self._test_group_api('delete_group', rpc_method='cast', - group=self.fake_group, version='2.5') + group=self.fake_group, version='3.0') def test_update_group(self): self._test_group_api('update_group', rpc_method='cast', group=self.fake_group, add_volumes=['vol1'], - remove_volumes=['vol2'], version='2.5') + remove_volumes=['vol2'], version='3.0') def test_create_group_from_src(self): self._test_group_api('create_group_from_src', rpc_method='cast', group=self.fake_group, group_snapshot=self.fake_group_snapshot, source_group=None, - version='2.6') + version='3.0') def test_create_group_snapshot(self): self._test_group_api('create_group_snapshot', rpc_method='cast', group_snapshot=self.fake_group_snapshot, - version='2.6') + version='3.0') def test_delete_group_snapshot(self): self._test_group_api('delete_group_snapshot', rpc_method='cast', group_snapshot=self.fake_group_snapshot, - version='2.6') + version='3.0') diff --git a/cinder/volume/manager.py b/cinder/volume/manager.py index c45632a7f6c..4a263d9b62e 100644 --- a/cinder/volume/manager.py +++ b/cinder/volume/manager.py @@ -173,7 +173,7 @@ class VolumeManager(manager.SchedulerDependentManager): RPC_API_VERSION = volume_rpcapi.VolumeAPI.RPC_API_VERSION - target = messaging.Target(version=RPC_API_VERSION) + target = messaging.Target(version='2.6') # On cloning a volume, we shouldn't copy volume_type, consistencygroup # and volume_attachment, because the db sets that according to [field]_id, @@ -190,6 +190,7 @@ class VolumeManager(manager.SchedulerDependentManager): # update_service_capabilities needs service_name to be volume super(VolumeManager, self).__init__(service_name='volume', *args, **kwargs) + self.additional_endpoints.append(_VolumeV3Proxy(self)) self.configuration = config.Configuration(volume_manager_opts, config_group=service_name) self.stats = {} @@ -848,11 +849,11 @@ class VolumeManager(manager.SchedulerDependentManager): snapshot.status = fields.SnapshotStatus.ERROR snapshot.save() - vol_ref = self.db.volume_get(context, volume_id) + vol_ref = self.db.volume_get(context, snapshot.volume_id) if vol_ref.bootable: try: self.db.volume_glance_metadata_copy_to_snapshot( - context, snapshot.id, volume_id) + context, snapshot.id, snapshot.volume_id) except exception.GlanceMetadataNotFound: # If volume is not created from image, No glance metadata # would be available for that volume in @@ -862,7 +863,8 @@ class VolumeManager(manager.SchedulerDependentManager): LOG.exception(_LE("Failed updating snapshot" " metadata using the provided volumes" " %(volume_id)s metadata"), - {'volume_id': volume_id}, resource=snapshot) + {'volume_id': snapshot.volume_id}, + resource=snapshot) snapshot.status = fields.SnapshotStatus.ERROR snapshot.save() raise exception.MetadataCopyFailure(reason=six.text_type(ex)) @@ -4344,3 +4346,191 @@ class VolumeManager(manager.SchedulerDependentManager): def secure_file_operations_enabled(self, ctxt, volume): secure_enabled = self.driver.secure_file_operations_enabled() return secure_enabled + + +# TODO(dulek): This goes away immediately in Ocata and is just present in +# Newton so that we can receive v2.x and v3.0 messages. +class _VolumeV3Proxy(object): + target = messaging.Target(version='3.0') + + def __init__(self, manager): + self.manager = manager + + def create_volume(self, context, volume, request_spec=None, + filter_properties=None, allow_reschedule=True): + # NOTE(dulek): We're replacing volume_id with volume object (by + # switching it from optional keyword argument to positional argument). + return self.manager.create_volume( + context, volume.id, request_spec=request_spec, + filter_properties=filter_properties, + allow_reschedule=allow_reschedule, volume=volume) + + def delete_volume(self, context, volume, unmanage_only=False, + cascade=False): + return self.manager.delete_volume( + context, volume.id, unmanage_only=unmanage_only, volume=volume, + cascade=cascade) + + def create_snapshot(self, context, snapshot): + return self.manager.create_snapshot(context, snapshot.volume_id, + snapshot) + + def delete_snapshot(self, context, snapshot, unmanage_only=False): + return self.manager.delete_snapshot( + context, snapshot, unmanage_only=unmanage_only) + + def attach_volume(self, context, volume_id, instance_uuid, host_name, + mountpoint, mode): + return self.manager.attach_volume( + context, volume_id, instance_uuid, host_name, mountpoint, mode) + + def detach_volume(self, context, volume_id, attachment_id=None): + return self.manager.detach_volume(context, volume_id, + attachment_id=attachment_id) + + def copy_volume_to_image(self, context, volume_id, image_meta): + return self.manager.copy_volume_to_image(context, volume_id, + image_meta) + + def initialize_connection(self, context, volume, connector): + # NOTE(dulek): We're replacing volume_id with volume object (by + # switching it from optional keyword argument to positional argument). + return self.manager.initialize_connection(context, volume.id, + connector, volume=volume) + + def terminate_connection(self, context, volume_id, connector, force=False): + return self.manager.terminate_connection(context, volume_id, connector, + force=force) + + def remove_export(self, context, volume_id): + return self.manager.remove_export(context, volume_id) + + def accept_transfer(self, context, volume_id, new_user, new_project): + return self.manager.accept_transfer(context, volume_id, new_user, + new_project) + + def migrate_volume_completion(self, ctxt, volume, new_volume, error=False): + # NOTE(dulek): We're replacing volume_id with volume object, same with + # new_volume_id (by switching them from optional keyword arguments to + # positional arguments). + return self.manager.migrate_volume_completion( + ctxt, volume.id, new_volume.id, error=error, volume=volume, + new_volume=new_volume) + + def migrate_volume(self, ctxt, volume, host, force_host_copy=False, + new_type_id=None): + # NOTE(dulek): We're replacing volume_id with volume object (by + # switching it from optional keyword argument to positional argument). + return self.manager.migrate_volume( + ctxt, volume.id, host, force_host_copy=force_host_copy, + new_type_id=new_type_id, volume=volume) + + def publish_service_capabilities(self, context): + return self.manager.publish_service_capabilities(context) + + def extend_volume(self, context, volume, new_size, reservations): + # NOTE(dulek): We're replacing volume_id with volume object (by + # switching it from optional keyword argument to positional argument). + return self.manager.extend_volume( + context, volume.id, new_size, reservations, volume=volume) + + def retype(self, context, volume, new_type_id, host, + migration_policy='never', reservations=None, + old_reservations=None): + return self.manager.retype( + context, volume.id, new_type_id, host, + migration_policy=migration_policy, reservations=reservations, + volume=volume, old_reservations=old_reservations) + + def manage_existing(self, ctxt, volume, ref=None): + return self.manager.manage_existing(ctxt, volume.id, ref=ref, + volume=volume) + + def get_manageable_volumes(self, ctxt, marker, limit, offset, sort_keys, + sort_dirs): + return self.manager.get_manageable_volumes(ctxt, marker, limit, offset, + sort_keys, sort_dirs) + + def promote_replica(self, ctxt, volume_id): + return self.manager.promote_replica(ctxt, volume_id) + + def reenable_replication(self, ctxt, volume_id): + return self.manager.reenable_replication(ctxt, volume_id) + + def create_consistencygroup(self, context, group): + return self.manager.create_consistencygroup(context, group) + + def create_group(self, context, group): + return self.manager.create_group(context, group) + + def create_consistencygroup_from_src(self, context, group, cgsnapshot=None, + source_cg=None): + return self.manager.create_consistencygroup_from_src( + context, group, cgsnapshot=cgsnapshot, source_cg=source_cg) + + def create_group_from_src(self, context, group, group_snapshot=None, + source_group=None): + return self.manager.create_group_from_src( + context, group, group_snapshot=group_snapshot, + source_group=source_group) + + def delete_consistencygroup(self, context, group): + return self.manager.delete_consistencygroup(context, group) + + def delete_group(self, context, group): + return self.manager.delete_group(context, group) + + def update_consistencygroup(self, context, group, add_volumes=None, + remove_volumes=None): + return self.manager.update_consistencygroup( + context, group, add_volumes=add_volumes, + remove_volumes=remove_volumes) + + def update_group(self, context, group, add_volumes=None, + remove_volumes=None): + return self.manager.update_group( + context, group, add_volumes=add_volumes, + remove_volumes=remove_volumes) + + def create_cgsnapshot(self, context, cgsnapshot): + return self.manager.create_cgsnapshot(context, cgsnapshot) + + def create_group_snapshot(self, context, group_snapshot): + return self.manager.create_group_snapshot(context, group_snapshot) + + def delete_cgsnapshot(self, context, cgsnapshot): + return self.manager.delete_cgsnapshot(context, cgsnapshot) + + def delete_group_snapshot(self, context, group_snapshot): + return self.manager.delete_group_snapshot(context, group_snapshot) + + def update_migrated_volume(self, ctxt, volume, new_volume, volume_status): + return self.manager.update_migrated_volume(ctxt, volume, new_volume, + volume_status) + + def failover_host(self, context, secondary_backend_id=None): + return self.manager.failover_host( + context, secondary_backend_id=secondary_backend_id) + + def freeze_host(self, context): + return self.manager.freeze_host(context) + + def thaw_host(self, context): + return self.manager.thaw_host(context) + + def manage_existing_snapshot(self, ctxt, snapshot, ref=None): + return self.manager.manage_existing_snapshot(ctxt, snapshot, ref=ref) + + def get_manageable_snapshots(self, ctxt, marker, limit, offset, sort_keys, + sort_dirs): + return self.manager.get_manageable_snapshots( + self, ctxt, marker, limit, offset, sort_keys, sort_dirs) + + def get_capabilities(self, context, discover): + return self.manager.get_capabilities(context, discover) + + def get_backup_device(self, ctxt, backup): + return self.manager.get_backup_device(ctxt, backup) + + def secure_file_operations_enabled(self, ctxt, volume): + return self.manager.secure_file_operations_enabled(ctxt, volume) diff --git a/cinder/volume/rpcapi.py b/cinder/volume/rpcapi.py index 02766ad13e7..d408c5eb850 100644 --- a/cinder/volume/rpcapi.py +++ b/cinder/volume/rpcapi.py @@ -104,36 +104,38 @@ class VolumeAPI(rpc.RPCAPI): 2.5 - Adds create_group, delete_group, and update_group 2.6 - Adds create_group_snapshot, delete_group_snapshot, and create_group_from_src(). + + ... Newton supports messaging version 2.6. Any changes to existing + methods in 2.x after that point should be done so that they can handle + the version_cap being set to 2.6. + + 3.0 - Drop 2.x compatibility """ - RPC_API_VERSION = '2.6' + RPC_API_VERSION = '3.0' TOPIC = constants.VOLUME_TOPIC BINARY = 'cinder-volume' - def _compat_ver(self, current, *legacy): - versions = (current,) + legacy - for version in versions[:-1]: - if self.client.can_send_version(version): - return version - return versions[-1] - def _get_cctxt(self, host, version): new_host = utils.get_volume_rpc_host(host) return self.client.prepare(server=new_host, version=version) def create_consistencygroup(self, ctxt, group, host): - cctxt = self._get_cctxt(host, '2.0') + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(host, version) cctxt.cast(ctxt, 'create_consistencygroup', group=group) def delete_consistencygroup(self, ctxt, group): - cctxt = self._get_cctxt(group.host, '2.0') + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(group.host, version) cctxt.cast(ctxt, 'delete_consistencygroup', group=group) def update_consistencygroup(self, ctxt, group, add_volumes=None, remove_volumes=None): - cctxt = self._get_cctxt(group.host, '2.0') + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(group.host, version) cctxt.cast(ctxt, 'update_consistencygroup', group=group, add_volumes=add_volumes, @@ -141,54 +143,75 @@ class VolumeAPI(rpc.RPCAPI): def create_consistencygroup_from_src(self, ctxt, group, cgsnapshot=None, source_cg=None): - cctxt = self._get_cctxt(group.host, '2.0') + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(group.host, version) cctxt.cast(ctxt, 'create_consistencygroup_from_src', group=group, cgsnapshot=cgsnapshot, source_cg=source_cg) def create_cgsnapshot(self, ctxt, cgsnapshot): - cctxt = self._get_cctxt(cgsnapshot.consistencygroup.host, '2.0') + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(cgsnapshot.consistencygroup.host, version) cctxt.cast(ctxt, 'create_cgsnapshot', cgsnapshot=cgsnapshot) def delete_cgsnapshot(self, ctxt, cgsnapshot): - cctxt = self._get_cctxt(cgsnapshot.consistencygroup.host, '2.0') + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(cgsnapshot.consistencygroup.host, version) cctxt.cast(ctxt, 'delete_cgsnapshot', cgsnapshot=cgsnapshot) def create_volume(self, ctxt, volume, host, request_spec, filter_properties, allow_reschedule=True): - msg_args = {'volume_id': volume.id, 'request_spec': request_spec, + msg_args = {'request_spec': request_spec, 'filter_properties': filter_properties, 'allow_reschedule': allow_reschedule, 'volume': volume, } - version = '2.4' - if not self.client.can_send_version('2.4'): + version = self._compat_ver('3.0', '2.4', '2.0') + if version in ('2.4', '2.0'): + msg_args['volume_id'] = volume.id + if version == '2.0': # Send request_spec as dict - version = '2.0' msg_args['request_spec'] = jsonutils.to_primitive(request_spec) cctxt = self._get_cctxt(host, version) cctxt.cast(ctxt, 'create_volume', **msg_args) def delete_volume(self, ctxt, volume, unmanage_only=False, cascade=False): - cctxt = self._get_cctxt(volume.host, '2.0') - cctxt.cast(ctxt, 'delete_volume', volume_id=volume.id, - unmanage_only=unmanage_only, volume=volume, cascade=cascade) + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(volume.host, version) + msg_args = { + 'volume': volume, 'unmanage_only': unmanage_only, + 'cascade': cascade, + } + + if version == '2.0': + msg_args['volume_id'] = volume.id + + cctxt.cast(ctxt, 'delete_volume', **msg_args) def create_snapshot(self, ctxt, volume, snapshot): - cctxt = self._get_cctxt(volume['host'], '2.0') - cctxt.cast(ctxt, 'create_snapshot', volume_id=volume['id'], - snapshot=snapshot) + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(volume['host'], version) + msg_args = { + 'snapshot': snapshot, + } + + if version == '2.0': + msg_args['volume_id'] = volume['id'] + + cctxt.cast(ctxt, 'create_snapshot', **msg_args) def delete_snapshot(self, ctxt, snapshot, host, unmanage_only=False): - cctxt = self._get_cctxt(host, '2.0') + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(host, version) cctxt.cast(ctxt, 'delete_snapshot', snapshot=snapshot, unmanage_only=unmanage_only) def attach_volume(self, ctxt, volume, instance_uuid, host_name, mountpoint, mode): - cctxt = self._get_cctxt(volume['host'], '2.0') + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(volume['host'], version) return cctxt.call(ctxt, 'attach_volume', volume_id=volume['id'], instance_uuid=instance_uuid, @@ -197,20 +220,23 @@ class VolumeAPI(rpc.RPCAPI): mode=mode) def detach_volume(self, ctxt, volume, attachment_id): - cctxt = self._get_cctxt(volume['host'], '2.0') + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(volume['host'], version) return cctxt.call(ctxt, 'detach_volume', volume_id=volume['id'], attachment_id=attachment_id) def copy_volume_to_image(self, ctxt, volume, image_meta): - cctxt = self._get_cctxt(volume['host'], '2.0') + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(volume['host'], version) cctxt.cast(ctxt, 'copy_volume_to_image', volume_id=volume['id'], image_meta=image_meta) def initialize_connection(self, ctxt, volume, connector): - version = self._compat_ver('2.3', '2.0') - msg_args = {'volume_id': volume.id, 'connector': connector, - 'volume': volume} + version = self._compat_ver('3.0', '2.3', '2.0') + msg_args = {'connector': connector, 'volume': volume} + if version in ('2.0', '2.3'): + msg_args['volume_id'] = volume.id if version == '2.0': del msg_args['volume'] @@ -218,75 +244,115 @@ class VolumeAPI(rpc.RPCAPI): return cctxt.call(ctxt, 'initialize_connection', **msg_args) def terminate_connection(self, ctxt, volume, connector, force=False): - cctxt = self._get_cctxt(volume['host'], '2.0') + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(volume['host'], version) return cctxt.call(ctxt, 'terminate_connection', volume_id=volume['id'], connector=connector, force=force) def remove_export(self, ctxt, volume): - cctxt = self._get_cctxt(volume['host'], '2.0') + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(volume['host'], version) cctxt.cast(ctxt, 'remove_export', volume_id=volume['id']) def publish_service_capabilities(self, ctxt): - cctxt = self.client.prepare(fanout=True, version='2.0') + version = self._compat_ver('3.0', '2.0') + cctxt = self.client.prepare(fanout=True, version=version) cctxt.cast(ctxt, 'publish_service_capabilities') def accept_transfer(self, ctxt, volume, new_user, new_project): - cctxt = self._get_cctxt(volume['host'], '2.0') + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(volume['host'], version) return cctxt.call(ctxt, 'accept_transfer', volume_id=volume['id'], new_user=new_user, new_project=new_project) def extend_volume(self, ctxt, volume, new_size, reservations): - cctxt = self._get_cctxt(volume.host, '2.0') - cctxt.cast(ctxt, 'extend_volume', volume_id=volume.id, - new_size=new_size, reservations=reservations, volume=volume) + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(volume.host, version) + msg_args = { + 'volume': volume, 'new_size': new_size, + 'reservations': reservations, + } + + if version == '2.0': + msg_args['volume_id'] = volume.id + + cctxt.cast(ctxt, 'extend_volume', **msg_args) def migrate_volume(self, ctxt, volume, dest_host, force_host_copy): host_p = {'host': dest_host.host, 'capabilities': dest_host.capabilities} - cctxt = self._get_cctxt(volume.host, '2.0') - cctxt.cast(ctxt, 'migrate_volume', volume_id=volume.id, host=host_p, - force_host_copy=force_host_copy, volume=volume) + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(volume.host, version) + + msg_args = { + 'volume': volume, 'host': host_p, + 'force_host_copy': force_host_copy, + } + + if version == '2.0': + msg_args['volume_id'] = volume.id + + cctxt.cast(ctxt, 'migrate_volume', **msg_args) def migrate_volume_completion(self, ctxt, volume, new_volume, error): - cctxt = self._get_cctxt(volume.host, '2.0') - return cctxt.call(ctxt, 'migrate_volume_completion', - volume_id=volume.id, new_volume_id=new_volume.id, - error=error, volume=volume, new_volume=new_volume) + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(volume.host, version) + + msg_args = { + 'volume': volume, 'new_volume': new_volume, 'error': error, + } + + if version == '2.0': + msg_args['volume_id'] = volume.id + msg_args['new_volume_id'] = new_volume.id + + return cctxt.call(ctxt, 'migrate_volume_completion', **msg_args) def retype(self, ctxt, volume, new_type_id, dest_host, migration_policy='never', reservations=None, old_reservations=None): host_p = {'host': dest_host.host, 'capabilities': dest_host.capabilities} - cctxt = self._get_cctxt(volume.host, '2.0') - cctxt.cast(ctxt, 'retype', volume_id=volume.id, - new_type_id=new_type_id, host=host_p, - migration_policy=migration_policy, - reservations=reservations, volume=volume, - old_reservations=old_reservations) + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(volume.host, version) + + msg_args = { + 'volume': volume, 'new_type_id': new_type_id, 'host': host_p, + 'migration_policy': migration_policy, 'reservations': reservations, + 'old_reservations': old_reservations, + } + + if version == '2.0': + msg_args['volume_id'] = volume.id + + cctxt.cast(ctxt, 'retype', **msg_args) def manage_existing(self, ctxt, volume, ref): msg_args = { - 'volume_id': volume.id, 'ref': ref, 'volume': volume, + 'ref': ref, 'volume': volume, } - version = '2.2' - if not self.client.can_send_version('2.2'): - version = '2.0' + version = self._compat_ver('3.0', '2.2', '2.0') + if version in ('2.2', '2.0'): + msg_args['volume_id'] = volume.id + if version == '2.0': msg_args.pop('volume') cctxt = self._get_cctxt(volume.host, version) cctxt.cast(ctxt, 'manage_existing', **msg_args) def promote_replica(self, ctxt, volume): - cctxt = self._get_cctxt(volume['host'], '2.0') + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(volume['host'], version) cctxt.cast(ctxt, 'promote_replica', volume_id=volume['id']) def reenable_replication(self, ctxt, volume): - cctxt = self._get_cctxt(volume['host'], '2.0') + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(volume['host'], version) cctxt.cast(ctxt, 'reenable_replication', volume_id=volume['id']) def update_migrated_volume(self, ctxt, volume, new_volume, original_volume_status): - cctxt = self._get_cctxt(new_volume['host'], '2.0') + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(new_volume['host'], version) cctxt.call(ctxt, 'update_migrated_volume', volume=volume, @@ -295,32 +361,38 @@ class VolumeAPI(rpc.RPCAPI): def freeze_host(self, ctxt, host): """Set backend host to frozen.""" - cctxt = self._get_cctxt(host, '2.0') + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(host, version) return cctxt.call(ctxt, 'freeze_host') def thaw_host(self, ctxt, host): """Clear the frozen setting on a backend host.""" - cctxt = self._get_cctxt(host, '2.0') + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(host, version) return cctxt.call(ctxt, 'thaw_host') def failover_host(self, ctxt, host, secondary_backend_id=None): """Failover host to the specified backend_id (secondary). """ - cctxt = self._get_cctxt(host, '2.0') + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(host, version) cctxt.cast(ctxt, 'failover_host', secondary_backend_id=secondary_backend_id) def manage_existing_snapshot(self, ctxt, snapshot, ref, host): - cctxt = self._get_cctxt(host, '2.0') + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(host, version) cctxt.cast(ctxt, 'manage_existing_snapshot', snapshot=snapshot, ref=ref) def get_capabilities(self, ctxt, host, discover): - cctxt = self._get_cctxt(host, '2.0') + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(host, version) return cctxt.call(ctxt, 'get_capabilities', discover=discover) def get_backup_device(self, ctxt, backup, volume): - cctxt = self._get_cctxt(volume.host, '2.0') + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(volume.host, version) backup_dict = cctxt.call(ctxt, 'get_backup_device', backup=backup) # FIXME(dulek): Snippet below converts received raw dicts to o.vo. This @@ -336,37 +408,43 @@ class VolumeAPI(rpc.RPCAPI): return backup_dict def secure_file_operations_enabled(self, ctxt, volume): - cctxt = self._get_cctxt(volume.host, '2.0') + version = self._compat_ver('3.0', '2.0') + cctxt = self._get_cctxt(volume.host, version) return cctxt.call(ctxt, 'secure_file_operations_enabled', volume=volume) def get_manageable_volumes(self, ctxt, host, marker, limit, offset, sort_keys, sort_dirs): - cctxt = self._get_cctxt(host, '2.1') + version = self._compat_ver('3.0', '2.1') + cctxt = self._get_cctxt(host, version) return cctxt.call(ctxt, 'get_manageable_volumes', marker=marker, limit=limit, offset=offset, sort_keys=sort_keys, sort_dirs=sort_dirs) def get_manageable_snapshots(self, ctxt, host, marker, limit, offset, sort_keys, sort_dirs): - cctxt = self._get_cctxt(host, '2.1') + version = self._compat_ver('3.0', '2.1') + cctxt = self._get_cctxt(host, version) return cctxt.call(ctxt, 'get_manageable_snapshots', marker=marker, limit=limit, offset=offset, sort_keys=sort_keys, sort_dirs=sort_dirs) def create_group(self, ctxt, group, host): - cctxt = self._get_cctxt(host, '2.5') + version = self._compat_ver('3.0', '2.5') + cctxt = self._get_cctxt(host, version) cctxt.cast(ctxt, 'create_group', group=group) def delete_group(self, ctxt, group): - cctxt = self._get_cctxt(group.host, '2.5') + version = self._compat_ver('3.0', '2.5') + cctxt = self._get_cctxt(group.host, version) cctxt.cast(ctxt, 'delete_group', group=group) def update_group(self, ctxt, group, add_volumes=None, remove_volumes=None): - cctxt = self._get_cctxt(group.host, '2.5') + version = self._compat_ver('3.0', '2.5') + cctxt = self._get_cctxt(group.host, version) cctxt.cast(ctxt, 'update_group', group=group, add_volumes=add_volumes, @@ -374,18 +452,21 @@ class VolumeAPI(rpc.RPCAPI): def create_group_from_src(self, ctxt, group, group_snapshot=None, source_group=None): - cctxt = self._get_cctxt(group.host, '2.6') + version = self._compat_ver('3.0', '2.6') + cctxt = self._get_cctxt(group.host, version) cctxt.cast(ctxt, 'create_group_from_src', group=group, group_snapshot=group_snapshot, source_group=source_group) def create_group_snapshot(self, ctxt, group_snapshot): - cctxt = self._get_cctxt(group_snapshot.group.host, '2.6') + version = self._compat_ver('3.0', '2.6') + cctxt = self._get_cctxt(group_snapshot.group.host, version) cctxt.cast(ctxt, 'create_group_snapshot', group_snapshot=group_snapshot) def delete_group_snapshot(self, ctxt, group_snapshot): - cctxt = self._get_cctxt(group_snapshot.group.host, '2.6') + version = self._compat_ver('3.0', '2.6') + cctxt = self._get_cctxt(group_snapshot.group.host, version) cctxt.cast(ctxt, 'delete_group_snapshot', group_snapshot=group_snapshot)