diff --git a/cinder/tests/unit/volume/drivers/ibm/test_ds8k_proxy.py b/cinder/tests/unit/volume/drivers/ibm/test_ds8k_proxy.py index f4fcabd4ff4..1b8aae874d6 100644 --- a/cinder/tests/unit/volume/drivers/ibm/test_ds8k_proxy.py +++ b/cinder/tests/unit/volume/drivers/ibm/test_ds8k_proxy.py @@ -29,6 +29,7 @@ from cinder.tests.unit import utils as testutils from cinder.volume import configuration as conf import cinder.volume.drivers.ibm.ibm_storage as storage from cinder.volume.drivers.ibm.ibm_storage import proxy +from cinder.volume import group_types from cinder.volume import volume_types # mock decorator logger for all unit test cases. @@ -44,6 +45,7 @@ mock_logger.stop() TEST_VOLUME_ID = '0001' TEST_HOST_ID = 'H1' TEST_VOLUME_BACKEND_NAME = 'ds8k_backend' +TEST_GROUP_HOST = 'test_host@' + TEST_VOLUME_BACKEND_NAME + '#fakepool' TEST_LUN_ID = '00' TEST_POOLS_STR = 'P0,P1' TEST_POOL_ID_1 = 'P0' @@ -858,6 +860,10 @@ FAKE_REST_API_RESPONSES = { FAKE_GET_FB_LSS_RESPONSE_3, TEST_TARGET_DS8K_IP + '/lss/' + TEST_LSS_ID_3 + '/get': FAKE_GET_FB_LSS_RESPONSE_3, + TEST_SOURCE_DS8K_IP + '/lss/' + TEST_LCU_ID + '/get': + FAKE_GET_CKD_LSS_RESPONSE, + TEST_TARGET_DS8K_IP + '/lss/' + TEST_LCU_ID + '/get': + FAKE_GET_CKD_LSS_RESPONSE, TEST_SOURCE_DS8K_IP + '/lss/fb/get': FAKE_GET_FB_LSS_RESPONSE_1, TEST_SOURCE_DS8K_IP + '/lss/ckd/get': @@ -978,7 +984,9 @@ class FakeDS8KCommonHelper(helper.DS8KCommonHelper): def _get_value(self, key): value = getattr(self.conf, key, None) - return value if value else self.conf.get(key) + if not value and key not in self.OPTIONAL_PARAMS: + value = self.conf.get(key) + return value def _create_client(self): self._client = FakeRESTScheduler(self._get_value('san_ip'), @@ -1040,13 +1048,13 @@ class FakeDS8KProxy(ds8kproxy.DS8KProxy): proxy.IBMStorageProxy.__init__(self, storage_info, logger, exception, driver, active_backend_id) - self._helper = None self._replication = None self._connector_obj = HTTPConnectorObject self._replication_enabled = False self._active_backend_id = active_backend_id self.configuration = driver.configuration + self.consisgroup_cache = {} self.setup(None) def setup(self, context): @@ -1113,13 +1121,13 @@ class DS8KProxyTest(test.TestCase): def _create_snapshot(self, **kwargs): return testutils.create_snapshot(self.ctxt, **kwargs) - def _create_consistencygroup(self, **kwargs): - return testutils.create_consistencygroup(self.ctxt, **kwargs) + def _create_group(self, **kwargs): + return testutils.create_group(self.ctxt, **kwargs) - def _create_cgsnapshot(self, cg_id, **kwargs): - return testutils.create_cgsnapshot(self.ctxt, - consistencygroup_id= cg_id, - **kwargs) + def _create_group_snapshot(self, group_id, **kwargs): + return testutils.create_group_snapshot(self.ctxt, + group_id=group_id, + **kwargs) def test_check_host_type(self): """host type should be a valid one.""" @@ -1192,7 +1200,6 @@ class DS8KProxyTest(test.TestCase): "total_capacity_gb": 10, "free_capacity_gb": 10, "reserved_percentage": 0, - "consistencygroup_support": True, "consistent_group_snapshot_enabled": True, "multiattach": False } @@ -1225,7 +1232,7 @@ class DS8KProxyTest(test.TestCase): """create volume should choose biggest pool.""" self.configuration.san_clustername = TEST_POOLS_STR cmn_helper = FakeDS8KCommonHelper(self.configuration, None) - pool_id, lss_id = cmn_helper.find_available_lss(None, False, None) + pool_id, lss_id = cmn_helper.find_pool_lss_pair(None, False, None) self.assertEqual(TEST_POOL_ID_1, pool_id) @mock.patch.object(helper.DS8KCommonHelper, 'get_all_lss') @@ -1239,7 +1246,7 @@ class DS8KProxyTest(test.TestCase): "configvols": "0" }] cmn_helper = FakeDS8KCommonHelper(self.configuration, None) - pool_id, lss_id = cmn_helper.find_available_lss(None, False, None) + pool_id, lss_id = cmn_helper.find_pool_lss_pair(None, False, None) self.assertNotEqual(TEST_LSS_ID_1, lss_id) @mock.patch.object(helper.DS8KCommonHelper, 'get_all_lss') @@ -1254,7 +1261,7 @@ class DS8KProxyTest(test.TestCase): "configvols": "0" }] cmn_helper = FakeDS8KCommonHelper(self.configuration, None) - pool_id, lss_id = cmn_helper.find_available_lss(None, False, None) + pool_id, lss_id = cmn_helper.find_pool_lss_pair(None, False, None) self.assertEqual(TEST_LSS_ID_2, lss_id) @mock.patch.object(helper.DS8KCommonHelper, 'get_all_lss') @@ -1284,11 +1291,11 @@ class DS8KProxyTest(test.TestCase): } ] cmn_helper = FakeDS8KCommonHelper(self.configuration, None) - pool_id, lss_id = cmn_helper.find_available_lss(None, False, None) + pool_id, lss_id = cmn_helper.find_pool_lss_pair(None, False, None) self.assertEqual(TEST_LSS_ID_2, lss_id) @mock.patch.object(helper.DS8KCommonHelper, 'get_all_lss') - @mock.patch.object(helper.DS8KCommonHelper, '_find_from_unexisting_lss') + @mock.patch.object(helper.DS8KCommonHelper, '_find_from_nonexistent_lss') def test_find_lss_when_no_existing_lss_available(self, mock_find_lss, mock_get_all_lss): """find LSS when no existing LSSs are available.""" @@ -1300,7 +1307,7 @@ class DS8KProxyTest(test.TestCase): "configvols": "256" }] cmn_helper = FakeDS8KCommonHelper(self.configuration, None) - pool_id, lss_id = cmn_helper.find_available_lss(None, False, None) + pool_id, lss_id = cmn_helper.find_pool_lss_pair(None, False, None) self.assertTrue(mock_find_lss.called) @mock.patch.object(helper.DS8KCommonHelper, '_find_lss') @@ -1309,7 +1316,127 @@ class DS8KProxyTest(test.TestCase): cmn_helper = FakeDS8KCommonHelper(self.configuration, None) mock_find_lss.return_value = None self.assertRaises(restclient.LssIDExhaustError, - cmn_helper.find_available_lss, None, False, None) + cmn_helper.find_pool_lss_pair, None, False, None) + + def test_find_lss_for_volume_which_belongs_to_cg(self): + """find lss for volume, which is in empty CG.""" + self.configuration.lss_range_for_cg = '20-23' + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + volume = self._create_volume(group_id=group.id) + lun = ds8kproxy.Lun(volume) + self.driver._create_lun_helper(lun) + pid, lss = lun.pool_lss_pair['source'] + self.assertTrue(lss in ['20', '21', '22', '23']) + + def test_find_lss_for_volume_which_belongs_to_cg2(self): + """find lss for volume, which is in CG having volumes.""" + self.configuration.lss_range_for_cg = '20-23' + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + location = six.text_type({'vol_hex_id': '2000'}) + self._create_volume(group_id=group.id, + provider_location=location) + volume = self._create_volume(group_id=group.id) + lun = ds8kproxy.Lun(volume) + self.driver._create_lun_helper(lun) + pid, lss = lun.pool_lss_pair['source'] + self.assertEqual(lss, '20') + + def test_find_lss_for_volume_which_belongs_to_cg3(self): + """find lss for volume, and other CGs have volumes.""" + self.configuration.lss_range_for_cg = '20-23' + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + volume = self._create_volume(group_id=group.id) + + group_type2 = group_types.create( + self.ctxt, + 'group2', + {'consistent_group_snapshot_enabled': ' True'} + ) + group2 = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type2.id) + location = six.text_type({'vol_hex_id': '2000'}) + self._create_volume(group_id=group2.id, + provider_location=location) + lun = ds8kproxy.Lun(volume) + self.driver._create_lun_helper(lun) + pid, lss = lun.pool_lss_pair['source'] + self.assertNotEqual(lss, '20') + + def test_find_lss_for_volume_which_belongs_to_cg4(self): + """find lss for volume, and other CGs are in error state.""" + self.configuration.lss_range_for_cg = '20' + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + volume = self._create_volume(group_id=group.id) + + group_type2 = group_types.create( + self.ctxt, + 'group2', + {'consistent_group_snapshot_enabled': ' True'} + ) + group2 = self._create_group(status='error', + host=TEST_GROUP_HOST, + group_type_id=group_type2.id) + location = six.text_type({'vol_hex_id': '2000'}) + self._create_volume(group_id=group2.id, + provider_location=location) + lun = ds8kproxy.Lun(volume) + self.driver._create_lun_helper(lun) + pid, lss = lun.pool_lss_pair['source'] + # error group will be ignored, so LSS 20 can be used. + self.assertEqual(lss, '20') + + def test_create_volume_and_assign_to_group_with_wrong_host(self): + # create volume for group which has wrong format of host. + self.configuration.lss_range_for_cg = '20-23' + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host="fake_invalid_host", + group_type_id=group_type.id) + volume = self._create_volume(group_id=group.id) + self.assertRaises(exception.VolumeDriverException, + self.driver.create_volume, volume) @mock.patch.object(helper.DS8KCommonHelper, '_create_lun') def test_create_volume_but_lss_full_afterwards(self, mock_create_lun): @@ -1342,11 +1469,7 @@ class DS8KProxyTest(test.TestCase): TEST_VOLUME_ID, ast.literal_eval(vol['provider_location'])['vol_hex_id']) - @mock.patch.object( - volume_types, - 'get_volume_type_extra_specs', - return_value={'drivers:os400': '050'}) - def test_create_volume_of_OS400_050(self, mock_volume_types): + def test_create_volume_of_OS400_050(self): """create volume which type is OS400 050.""" self.driver = FakeDS8KProxy(self.storage_info, self.logger, self.exception, self) @@ -1360,12 +1483,8 @@ class DS8KProxyTest(test.TestCase): ast.literal_eval(vol['provider_location'])['vol_hex_id']) self.assertEqual('050 FB 520UV', vol['metadata']['data_type']) - @mock.patch.object( - volume_types, - 'get_volume_type_extra_specs', - return_value={'drivers:thin_provision': 'False'}) @mock.patch.object(helper.DS8KCommonHelper, '_create_lun') - def test_create_eckd_volume(self, mock_create_lun, mock_volume_types): + def test_create_eckd_volume(self, mock_create_lun): """create volume which type is ECKD.""" self.configuration.connection_type = ( storage.XIV_CONNECTION_TYPE_FC_ECKD) @@ -1470,18 +1589,14 @@ class DS8KProxyTest(test.TestCase): device['san_clustername'] = TEST_ECKD_POOL_ID repl = FakeReplication(src_helper, device) repl.check_physical_links() - lss_pair = repl.find_available_lss_pair(None) + pool_lss_pair = repl.find_pool_lss_pair(None) - expected_lss_pair = {'source': (TEST_ECKD_POOL_ID, TEST_LCU_ID), - 'target': (TEST_ECKD_POOL_ID, TEST_LCU_ID)} - self.assertDictEqual(expected_lss_pair, lss_pair) + expected_pair = {'source': (TEST_ECKD_POOL_ID, TEST_LCU_ID), + 'target': (TEST_ECKD_POOL_ID, TEST_LCU_ID)} + self.assertDictEqual(expected_pair, pool_lss_pair) - @mock.patch.object( - volume_types, - 'get_volume_type_extra_specs', - return_value={'replication_enabled': ' True'}) @mock.patch.object(eventlet, 'sleep') - def test_create_fb_replicated_volume(self, mock_sleep, mock_volume_types): + def test_create_fb_replicated_volume(self, mock_sleep): """create FB volume when enable replication.""" self.configuration.replication_device = [TEST_REPLICATION_DEVICE] self.driver = FakeDS8KProxy(self.storage_info, self.logger, @@ -1499,17 +1614,12 @@ class DS8KProxyTest(test.TestCase): self.assertEqual(TEST_VOLUME_ID, repl[TEST_TARGET_DS8K_IP]['vol_hex_id']) - @mock.patch.object( - volume_types, - 'get_volume_type_extra_specs', - return_value={'replication_enabled': ' True'}) @mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_paths') @mock.patch.object(replication.MetroMirrorManager, 'create_pprc_path') @mock.patch.object(eventlet, 'sleep') def test_create_fb_replicated_vol_but_no_path_available(self, mock_sleep, create_pprc_path, - get_pprc_paths, - mock_volume_types): + get_pprc_paths): """create replicated volume but no pprc paths are available.""" self.configuration.replication_device = [TEST_REPLICATION_DEVICE] self.driver = FakeDS8KProxy(self.storage_info, self.logger, @@ -1536,14 +1646,10 @@ class DS8KProxyTest(test.TestCase): self.driver.create_volume(volume) self.assertTrue(create_pprc_path.called) - @mock.patch.object( - volume_types, - 'get_volume_type_extra_specs', - return_value={'replication_enabled': ' True'}) @mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_paths') @mock.patch.object(eventlet, 'sleep') def test_create_fb_replicated_vol_and_verify_lss_in_path( - self, mock_sleep, get_pprc_paths, mock_volume_types): + self, mock_sleep, get_pprc_paths): """create replicated volume should verify the LSS in pprc paths.""" self.configuration.replication_device = [TEST_REPLICATION_DEVICE] self.driver = FakeDS8KProxy(self.storage_info, self.logger, @@ -1588,14 +1694,10 @@ class DS8KProxyTest(test.TestCase): self.assertEqual(TEST_LSS_ID_1, repl[TEST_TARGET_DS8K_IP]['vol_hex_id'][:2]) - @mock.patch.object( - volume_types, - 'get_volume_type_extra_specs', - return_value={'replication_enabled': ' True'}) @mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_paths') @mock.patch.object(eventlet, 'sleep') def test_create_fb_replicated_vol_when_paths_available( - self, mock_sleep, get_pprc_paths, mock_volume_types): + self, mock_sleep, get_pprc_paths): """create replicated volume when multiple pprc paths are available.""" self.configuration.replication_device = [TEST_REPLICATION_DEVICE] self.driver = FakeDS8KProxy(self.storage_info, self.logger, @@ -1640,14 +1742,10 @@ class DS8KProxyTest(test.TestCase): self.assertEqual(TEST_LSS_ID_1, repl[TEST_TARGET_DS8K_IP]['vol_hex_id'][:2]) - @mock.patch.object( - volume_types, - 'get_volume_type_extra_specs', - return_value={'replication_enabled': ' True'}) @mock.patch.object(helper.DS8KCommonHelper, '_create_lun') @mock.patch.object(eventlet, 'sleep') def test_create_replicated_vol_but_lss_full_afterwards( - self, mock_sleep, create_lun, mock_volume_types): + self, mock_sleep, create_lun): """create replicated volume but lss is full afterwards.""" self.configuration.replication_device = [TEST_REPLICATION_DEVICE] self.driver = FakeDS8KProxy(self.storage_info, self.logger, @@ -1715,15 +1813,10 @@ class DS8KProxyTest(test.TestCase): self.driver.delete_volume(volume) self.assertFalse(mock_delete_lun.called) - @mock.patch.object( - volume_types, - 'get_volume_type_extra_specs', - return_value={'replication_enabled': ' True'}) @mock.patch.object(helper.DS8KCommonHelper, 'delete_lun_by_id') @mock.patch.object(helper.DS8KCommonHelper, 'delete_lun') def test_delete_fb_replicated_volume(self, mock_delete_lun, - mock_delete_lun_by_id, - mock_volume_types): + mock_delete_lun_by_id): """Delete volume when enable replication.""" self.configuration.replication_device = [TEST_REPLICATION_DEVICE] self.driver = FakeDS8KProxy(self.storage_info, self.logger, @@ -1801,7 +1894,7 @@ class DS8KProxyTest(test.TestCase): location = six.text_type({'vol_hex_id': None}) tgt_vol = self._create_volume(volume_type_id=vol_type.id, provider_location=location) - self.assertRaises(restclient.APIException, + self.assertRaises(exception.VolumeDriverException, self.driver.create_cloned_volume, tgt_vol, src_vol) @mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy') @@ -1877,12 +1970,7 @@ class DS8KProxyTest(test.TestCase): replication_driver_data=data) self.driver.extend_volume(volume, 2) - @mock.patch.object( - volume_types, - 'get_volume_type_extra_specs', - return_value={'replication_enabled': ' True'}) - def test_extend_replicated_volume_that_has_been_failed_over( - self, mock_volume_types): + def test_extend_replicated_volume_that_has_been_failed_over(self): """extend replicated volume which has been failed over should fail.""" self.configuration.replication_device = [TEST_REPLICATION_DEVICE] self.driver = FakeDS8KProxy(self.storage_info, self.logger, @@ -2415,210 +2503,375 @@ class DS8KProxyTest(test.TestCase): fake_connector = {'ip': '127.0.0.1', 'initiator': 'iqn.fake'} self.driver.terminate_connection(volume, fake_connector) - def test_delete_consistencygroup_sucessfully(self): - """test a successful cg deletion.""" + def test_create_consistency_group(self): + """user should reserve LSS for consistency group.""" self.driver = FakeDS8KProxy(self.storage_info, self.logger, self.exception, self) self.driver.setup(self.ctxt) - cg_type = volume_types.create(self.ctxt, 'CG', {}) - cg = self._create_consistencygroup(volume_type_id=cg_type.id) + + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + self.assertRaises(exception.VolumeDriverException, + self.driver.create_group, + self.ctxt, group) + + def test_delete_consistency_group_sucessfully(self): + """test a successful consistency group deletion.""" + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) - volume = self._create_volume(volume_type_id=cg_type.id, - provider_location=location, - consistencygroup_id=cg.id) + volume = self._create_volume(provider_location=location, + group_id=group.id) model_update, volumes_model_update = ( - self.driver.delete_consistencygroup(self.ctxt, cg, [volume])) + self.driver.delete_group(self.ctxt, group, [volume])) self.assertEqual('deleted', volumes_model_update[0]['status']) - self.assertEqual('deleted', model_update['status']) + self.assertEqual(fields.GroupStatus.DELETED, + model_update['status']) @mock.patch.object(helper.DS8KCommonHelper, 'delete_lun') - def test_delete_consistencygroup_failed(self, mock_delete_lun): - """test a failed cg deletion.""" + def test_delete_consistency_group_failed(self, mock_delete_lun): + """test a failed consistency group deletion.""" self.driver = FakeDS8KProxy(self.storage_info, self.logger, self.exception, self) self.driver.setup(self.ctxt) - cg_type = volume_types.create(self.ctxt, 'CG', {}) - cg = self._create_consistencygroup(volume_type_id=cg_type.id) + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) - volume = self._create_volume(volume_type_id=cg_type.id, - provider_location=location, - consistencygroup_id=cg.id) + volume = self._create_volume(provider_location=location, + group_id=group.id) mock_delete_lun.side_effect = ( restclient.APIException('delete volume failed.')) model_update, volumes_model_update = ( - self.driver.delete_consistencygroup(self.ctxt, cg, [volume])) + self.driver.delete_group(self.ctxt, group, [volume])) self.assertEqual('error_deleting', volumes_model_update[0]['status']) - self.assertEqual('error_deleting', model_update['status']) + self.assertEqual(fields.GroupStatus.ERROR_DELETING, + model_update['status']) + + def test_create_consistency_group_without_reserve_lss(self): + """user should reserve LSS for group if it enables cg.""" + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + self.assertRaises(exception.VolumeDriverException, + self.driver.create_group, self.ctxt, group) + + def test_update_generic_group_without_enable_cg(self): + """update group which not enable cg should return None.""" + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + + group_type = group_types.create(self.ctxt, 'group', {}) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) + volume = self._create_volume(provider_location=location) + model_update, add_volumes_update, remove_volumes_update = ( + self.driver.update_group(self.ctxt, group, [volume], [])) + self.assertIsNone(model_update) + self.assertIsNone(add_volumes_update) + self.assertIsNone(remove_volumes_update) + + @mock.patch.object(eventlet, 'sleep') + @mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy') + @mock.patch.object(helper.DS8KCommonHelper, '_create_lun') + def test_update_generic_group_when_enable_cg(self, mock_create_lun, + mock_get_flashcopy, + mock_sleep): + """update group, but volume is not in LSS which belongs to group.""" + self.configuration.lss_range_for_cg = '20-23' + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) + metadata = [{'key': 'data_type', 'value': 'FB 512'}] + volume = self._create_volume(provider_location=location, + volume_metadata=metadata) + + mock_get_flashcopy.side_effect = [[TEST_FLASHCOPY], {}] + mock_create_lun.return_value = '2200' + model_update, add_volumes_update, remove_volumes_update = ( + self.driver.update_group(self.ctxt, group, [volume], [])) + location = ast.literal_eval(add_volumes_update[0]['provider_location']) + self.assertEqual('2200', location['vol_hex_id']) + + @mock.patch.object(eventlet, 'sleep') + @mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy') + @mock.patch.object(helper.DS8KCommonHelper, '_create_lun') + def test_update_generic_group_when_enable_cg2(self, mock_create_lun, + mock_get_flashcopy, + mock_sleep): + """add replicated volume into group.""" + self.configuration.replication_device = [TEST_REPLICATION_DEVICE] + self.configuration.lss_range_for_cg = '20-23' + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + + vol_type = volume_types.create( + self.ctxt, 'VOL_TYPE', {'replication_enabled': ' True'}) + location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) + data = json.dumps( + {TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}}) + metadata = [{'key': 'data_type', 'value': 'FB 512'}] + volume = self._create_volume(volume_type_id=vol_type.id, + provider_location=location, + replication_driver_data=data, + volume_metadata=metadata) + + mock_get_flashcopy.side_effect = [[TEST_FLASHCOPY], {}] + mock_create_lun.return_value = '2200' + model_update, add_volumes_update, remove_volumes_update = ( + self.driver.update_group(self.ctxt, group, [volume], [])) + location = ast.literal_eval(add_volumes_update[0]['provider_location']) + self.assertEqual('2200', location['vol_hex_id']) + + @mock.patch.object(helper.DS8KCommonHelper, 'delete_lun') + def test_delete_generic_group_failed(self, mock_delete_lun): + """test a failed group deletion.""" + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + group_type = group_types.create(self.ctxt, 'group', {}) + group = self._create_group(group_type_id=group_type.id) + location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) + volume = self._create_volume(group_type_id=group_type.id, + provider_location=location, + group_id=group.id) + mock_delete_lun.side_effect = ( + restclient.APIException('delete volume failed.')) + model_update, volumes_model_update = ( + self.driver.delete_group(self.ctxt, group, [volume])) + self.assertEqual('error_deleting', volumes_model_update[0]['status']) + self.assertEqual(fields.GroupStatus.ERROR_DELETING, + model_update['status']) def test_delete_generic_group_sucessfully(self): """test a successful generic group deletion.""" self.driver = FakeDS8KProxy(self.storage_info, self.logger, self.exception, self) self.driver.setup(self.ctxt) - cg_type = volume_types.create(self.ctxt, 'CG', {}) - cg = self._create_consistencygroup(volume_type_id=cg_type.id) + group_type = group_types.create(self.ctxt, 'CG', {}) + group = self._create_group(group_type_id=group_type.id) location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) - volume = self._create_volume(volume_type_id=cg_type.id, + volume = self._create_volume(group_type_id=group_type.id, provider_location=location, - consistencygroup_id=cg.id) + group_id=group.id) model_update, volumes_model_update = ( - self.driver.delete_group(self.ctxt, cg, [volume])) + self.driver.delete_group(self.ctxt, group, [volume])) self.assertEqual('deleted', volumes_model_update[0]['status']) - self.assertEqual('deleted', model_update['status']) + self.assertEqual(fields.GroupStatus.DELETED, model_update['status']) - @mock.patch.object(helper.DS8KCommonHelper, 'delete_lun') - def test_delete_generic_group_failed(self, mock_delete_lun): - """test a failed cg deletion.""" - self.driver = FakeDS8KProxy(self.storage_info, self.logger, - self.exception, self) - self.driver.setup(self.ctxt) - cg_type = volume_types.create(self.ctxt, 'CG', {}) - cg = self._create_consistencygroup(volume_type_id=cg_type.id) - location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) - volume = self._create_volume(volume_type_id=cg_type.id, - provider_location=location, - consistencygroup_id=cg.id) - mock_delete_lun.side_effect = ( - restclient.APIException('delete volume failed.')) - model_update, volumes_model_update = ( - self.driver.delete_group(self.ctxt, cg, [volume])) - self.assertEqual('error_deleting', volumes_model_update[0]['status']) - self.assertEqual('error_deleting', model_update['status']) - - @mock.patch('oslo_concurrency.lockutils.external_lock', - new=mock.MagicMock()) @mock.patch.object(eventlet, 'sleep') @mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy') - def test_create_cgsnapshot_sucessfully(self, mock_get_flashcopy, - mock_sleep): - """test a successful cgsnapshot creation.""" + @mock.patch.object(helper.DS8KCommonHelper, '_create_lun') + def test_create_consistency_group_snapshot_sucessfully( + self, mock_create_lun, mock_get_flashcopy, mock_sleep): + """test a successful consistency group snapshot creation.""" + self.configuration.lss_range_for_cg = '20-23' self.driver = FakeDS8KProxy(self.storage_info, self.logger, self.exception, self) self.driver.setup(self.ctxt) - cg_type = volume_types.create(self.ctxt, 'CG', {}) - cg = self._create_consistencygroup(volume_type_id=cg_type.id) - location = six.text_type({'vol_hex_id': '0002'}) - volume = self._create_volume(volume_type_id=cg_type.id, - provider_location=location, - consistencygroup_id=cg.id) - cg_snapshot = self._create_cgsnapshot(cg_id=cg.id) + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(group_type_id=group_type.id) + location = six.text_type({'vol_hex_id': '2000'}) + volume = self._create_volume(provider_location=location, + group_id=group.id) + group_snapshot = ( + self._create_group_snapshot(group_id=group.id, + group_type_id=group_type.id)) snapshot = self._create_snapshot(volume_id=volume.id, - volume_type_id=cg_type.id, - cgsnapshot_id=cg_snapshot.id) + group_snapshot_id=group_snapshot.id) mock_get_flashcopy.side_effect = [[TEST_FLASHCOPY], {}] + mock_create_lun.return_value = '2200' model_update, snapshots_model_update = ( - self.driver.create_cgsnapshot(self.ctxt, cg_snapshot, [snapshot])) + self.driver.create_group_snapshot( + self.ctxt, group_snapshot, [snapshot])) + location = ast.literal_eval( + snapshots_model_update[0]['provider_location']) + self.assertEqual('2200', location['vol_hex_id']) self.assertEqual('available', snapshots_model_update[0]['status']) - self.assertEqual('available', model_update['status']) + self.assertEqual(fields.GroupStatus.AVAILABLE, model_update['status']) - def test_delete_cgsnapshot_sucessfully(self): - """test a successful cgsnapshot deletion.""" + def test_delete_consistency_group_snapshot_sucessfully(self): + """test a successful consistency group snapshot deletion.""" self.driver = FakeDS8KProxy(self.storage_info, self.logger, self.exception, self) self.driver.setup(self.ctxt) - cg_type = volume_types.create(self.ctxt, 'CG', {}) - cg = self._create_consistencygroup(volume_type_id=cg_type.id) - location = six.text_type({'vol_hex_id': '0002'}) - volume = self._create_volume(volume_type_id=cg_type.id, - provider_location=location, - consistencygroup_id=cg.id) - cg_snapshot = self._create_cgsnapshot(cg_id=cg.id) + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(group_type_id=group_type.id) + location = six.text_type({'vol_hex_id': '2000'}) + volume = self._create_volume(provider_location=location, + group_id=group.id) + group_snapshot = ( + self._create_group_snapshot(group_id=group.id, + group_type_id=group_type.id)) snapshot = self._create_snapshot(volume_id=volume.id, - volume_type_id=cg_type.id, - cgsnapshot_id=cg_snapshot.id) + group_snapshot_id=group_snapshot.id) model_update, snapshots_model_update = ( - self.driver.delete_cgsnapshot(self.ctxt, cg_snapshot, [snapshot])) + self.driver.delete_group_snapshot( + self.ctxt, group_snapshot, [snapshot])) self.assertEqual('deleted', snapshots_model_update[0]['status']) - self.assertEqual('deleted', model_update['status']) + self.assertEqual(fields.GroupSnapshotStatus.DELETED, + model_update['status']) @mock.patch.object(helper.DS8KCommonHelper, 'delete_lun') - def test_delete_cgsnapshot_failed(self, mock_delete_lun): - """test a failed cgsnapshot deletion.""" + def test_delete_consistency_group_snapshot_failed(self, + mock_delete_lun): + """test a failed consistency group snapshot deletion.""" self.driver = FakeDS8KProxy(self.storage_info, self.logger, self.exception, self) self.driver.setup(self.ctxt) - cg_type = volume_types.create(self.ctxt, 'CG', {}) - cg = self._create_consistencygroup(volume_type_id=cg_type.id) - location = six.text_type({'vol_hex_id': '0002'}) - volume = self._create_volume(volume_type_id=cg_type.id, - provider_location=location, - consistencygroup_id=cg.id) - cg_snapshot = self._create_cgsnapshot(cg_id=cg.id) + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(group_type_id=group_type.id) + location = six.text_type({'vol_hex_id': '2000'}) + volume = self._create_volume(provider_location=location, + group_id=group.id) + group_snapshot = ( + self._create_group_snapshot(group_id=group.id, + group_type_id=group_type.id)) snapshot = self._create_snapshot(volume_id=volume.id, - volume_type_id=cg_type.id, - cgsnapshot_id=cg_snapshot.id) + group_snapshot_id=group_snapshot.id) mock_delete_lun.side_effect = ( restclient.APIException('delete snapshot failed.')) model_update, snapshots_model_update = ( - self.driver.delete_cgsnapshot(self.ctxt, cg_snapshot, [snapshot])) + self.driver.delete_group_snapshot( + self.ctxt, group_snapshot, [snapshot])) self.assertEqual('error_deleting', snapshots_model_update[0]['status']) - self.assertEqual('error_deleting', model_update['status']) - - @mock.patch('oslo_concurrency.lockutils.external_lock', - new=mock.MagicMock()) - @mock.patch.object(eventlet, 'sleep') - @mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy') - def test_create_consisgroup_from_consisgroup(self, mock_get_flashcopy, - mock_sleep): - """test creation of consistency group from consistency group.""" - self.driver = FakeDS8KProxy(self.storage_info, self.logger, - self.exception, self) - self.driver.setup(self.ctxt) - - cg_type = volume_types.create(self.ctxt, 'CG', {}) - src_cg = self._create_consistencygroup(volume_type_id=cg_type.id) - location = six.text_type({'vol_hex_id': '0002'}) - src_vol = self._create_volume(volume_type_id=cg_type.id, - provider_location=location, - consistencygroup_id=src_cg.id) - cg = self._create_consistencygroup(volume_type_id=cg_type.id) - volume = self._create_volume(volume_type_id=cg_type.id, - consistencygroup_id=cg.id) - mock_get_flashcopy.side_effect = [[TEST_FLASHCOPY], {}] - model_update, volumes_model_update = ( - self.driver.create_consistencygroup_from_src( - self.ctxt, cg, [volume], None, None, src_cg, [src_vol])) - self.assertEqual(TEST_VOLUME_ID, - volumes_model_update[0]['metadata']['vol_hex_id']) - self.assertEqual(fields.ConsistencyGroupStatus.AVAILABLE, + self.assertEqual(fields.GroupSnapshotStatus.ERROR_DELETING, model_update['status']) - @mock.patch('oslo_concurrency.lockutils.external_lock', - new=mock.MagicMock()) @mock.patch.object(eventlet, 'sleep') + @mock.patch.object(helper.DS8KCommonHelper, '_create_lun') @mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy') - def test_create_consisgroup_from_cgsnapshot(self, mock_get_flashcopy, - mock_sleep): - """test creation of consistency group from cgsnapshot.""" + def test_create_consisgroup_from_consisgroup(self, mock_get_flashcopy, + mock_create_lun, mock_sleep): + """test creation of consistency group from consistency group.""" + self.configuration.lss_range_for_cg = '20-23' self.driver = FakeDS8KProxy(self.storage_info, self.logger, self.exception, self) self.driver.setup(self.ctxt) - cg_type = volume_types.create(self.ctxt, 'CG', {}) - src_cg = self._create_consistencygroup(volume_type_id=cg_type.id) - src_vol = self._create_volume(volume_type_id=cg_type.id, - consistencygroup_id=src_cg.id) - cg_snapshot = self._create_cgsnapshot(cg_id=src_cg.id) - location = six.text_type({'vol_hex_id': '0002'}) + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + src_group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + location = six.text_type({'vol_hex_id': '2000'}) + src_vol = self._create_volume(provider_location=location, + group_id=src_group.id) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + volume = self._create_volume(group_id=group.id) + mock_get_flashcopy.side_effect = [[TEST_FLASHCOPY], {}] + mock_create_lun.return_value = '2200' + model_update, volumes_model_update = ( + self.driver.create_group_from_src( + self.ctxt, group, [volume], None, None, src_group, [src_vol])) + self.assertEqual('2200', + volumes_model_update[0]['metadata']['vol_hex_id']) + self.assertEqual(fields.GroupStatus.AVAILABLE, + model_update['status']) + + @mock.patch.object(eventlet, 'sleep') + @mock.patch.object(helper.DS8KCommonHelper, '_create_lun') + @mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy') + def test_create_consisgroup_from_cgsnapshot(self, mock_get_flashcopy, + mock_create_lun, mock_sleep): + """test creation of consistency group from cgsnapshot.""" + self.configuration.lss_range_for_cg = '20-23' + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + src_group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + src_vol = self._create_volume(group_id=src_group.id) + group_snapshot = ( + self._create_group_snapshot(group_id=src_group.id, + group_type_id=group_type.id)) + location = six.text_type({'vol_hex_id': '2000'}) snapshot = self._create_snapshot(volume_id=src_vol.id, - volume_type_id=cg_type.id, provider_location=location, - cgsnapshot_id=cg_snapshot.id) - cg = self._create_consistencygroup(volume_type_id=cg_type.id) - volume = self._create_volume(volume_type_id=cg_type.id, - consistencygroup_id=cg.id) + group_snapshot_id=group_snapshot.id) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + volume = self._create_volume(group_id=group.id) mock_get_flashcopy.side_effect = [[TEST_FLASHCOPY], {}] + mock_create_lun.return_value = '2200' model_update, volumes_model_update = ( - self.driver.create_consistencygroup_from_src( - self.ctxt, cg, [volume], cg_snapshot, [snapshot], None, None)) - self.assertEqual(TEST_VOLUME_ID, - volumes_model_update[0]['metadata']['vol_hex_id']) - self.assertEqual(fields.ConsistencyGroupStatus.AVAILABLE, + self.driver.create_group_from_src( + self.ctxt, group, [volume], group_snapshot, + [snapshot], None, None)) + self.assertEqual( + '2200', volumes_model_update[0]['metadata']['vol_hex_id']) + self.assertEqual(fields.GroupStatus.AVAILABLE, model_update['status']) @mock.patch.object(eventlet, 'sleep') @@ -2647,13 +2900,8 @@ class DS8KProxyTest(test.TestCase): self.ctxt, [volume], TEST_TARGET_DS8K_IP) self.assertEqual(TEST_TARGET_DS8K_IP, secondary_id) - @mock.patch.object( - volume_types, - 'get_volume_type_extra_specs', - return_value={'replication_enabled': ' True'}) @mock.patch.object(replication.Replication, 'do_pprc_failover') - def test_failover_host_failed(self, mock_do_pprc_failover, - mock_volume_types): + def test_failover_host_failed(self, mock_do_pprc_failover): """Failover host should raise exception when failed.""" self.configuration.replication_device = [TEST_REPLICATION_DEVICE] self.driver = FakeDS8KProxy(self.storage_info, self.logger, @@ -2801,13 +3049,8 @@ class DS8KProxyTest(test.TestCase): self.ctxt, [volume], 'default') self.assertEqual('default', secondary_id) - @mock.patch.object( - volume_types, - 'get_volume_type_extra_specs', - return_value={'replication_enabled': ' True'}) @mock.patch.object(replication.Replication, 'start_pprc_failback') - def test_failback_host_failed(self, mock_start_pprc_failback, - mock_volume_types): + def test_failback_host_failed(self, mock_start_pprc_failback): """Failback host should raise exception when failed.""" self.configuration.replication_device = [TEST_REPLICATION_DEVICE] self.driver = FakeDS8KProxy(self.storage_info, self.logger, diff --git a/cinder/volume/drivers/ibm/ibm_storage/ds8k_helper.py b/cinder/volume/drivers/ibm/ibm_storage/ds8k_helper.py index 4a2a865fd35..30b07cbfb9c 100644 --- a/cinder/volume/drivers/ibm/ibm_storage/ds8k_helper.py +++ b/cinder/volume/drivers/ibm/ibm_storage/ds8k_helper.py @@ -61,6 +61,8 @@ def filter_alnum(s): class DS8KCommonHelper(object): """Manage the primary backend, it is common class too.""" + OPTIONAL_PARAMS = ['ds8k_host_type', 'lss_range_for_cg'] + def __init__(self, conf, HTTPConnectorObject=None): self.conf = conf self._connector_obj = HTTPConnectorObject @@ -76,9 +78,13 @@ class DS8KCommonHelper(object): def _get_value(self, key): if getattr(self.conf, 'safe_get', 'get') == 'get': - return self.conf.get(key) + value = self.conf.get(key) else: - return self.conf.safe_get(key) + value = self.conf.safe_get(key) + if not value and key not in self.OPTIONAL_PARAMS: + raise exception.InvalidParameterValue( + err=(_('Param [%s] should be provided.') % key)) + return value def get_thin_provision(self): return self._disable_thin_provision @@ -100,6 +106,7 @@ class DS8KCommonHelper(object): self._get_storage_information() self._check_host_type() self.backend['pools_str'] = self._get_value('san_clustername') + self._get_lss_ids_for_cg() self._verify_version() self._verify_pools() @@ -109,8 +116,8 @@ class DS8KCommonHelper(object): def _get_certificate(self, host): cert_file = strings.CERTIFICATES_PATH + host + '.pem' - msg = "certificate file for DS8K %(host)s: %(cert)s" - LOG.debug(msg, {'host': host, 'cert': cert_file}) + LOG.debug("certificate file for DS8K %(host)s: %(cert)s", + {'host': host, 'cert': cert_file}) # Use the certificate if it exists, otherwise use the System CA Bundle if os.path.exists(cert_file): return cert_file @@ -119,23 +126,23 @@ class DS8KCommonHelper(object): return True def _create_client(self): + san_ip = self._get_value('san_ip') try: clear_pass = cryptish.decrypt(self._get_value('san_password')) except TypeError: - err = _('Param [san_password] is invalid.') - raise exception.InvalidParameterValue(err=err) - verify = self._get_certificate(self._get_value('san_ip')) + raise exception.InvalidParameterValue( + err=_('Param [san_password] is invalid.')) + verify = self._get_certificate(san_ip) try: self._client = restclient.RESTScheduler( - self._get_value('san_ip'), + san_ip, self._get_value('san_login'), clear_pass, self._connector_obj, verify) except restclient.TimeoutException: - msg = (_("Can't connect to %(host)s") % - {'host': self._get_value('san_ip')}) - raise restclient.APIException(data=msg) + raise restclient.APIException( + data=(_("Can't connect to %(host)s") % {'host': san_ip})) self.backend['rest_version'] = self._get_version()['bundle_version'] LOG.info("Connection to DS8K storage system %(host)s has been " "established successfully, the version of REST is %(rest)s.", @@ -148,12 +155,31 @@ class DS8KCommonHelper(object): self.backend['storage_wwnn'] = storage_info['wwnn'] self.backend['storage_version'] = storage_info['release'] + def _get_lss_ids_for_cg(self): + lss_range = self._get_value('lss_range_for_cg') + if lss_range: + lss_range = lss_range.replace(' ', '').split('-') + if len(lss_range) == 1: + begin = int(lss_range[0], 16) + end = begin + else: + begin = int(lss_range[0], 16) + end = int(lss_range[1], 16) + if begin > 0xFF or end > 0xFF or begin > end: + raise exception.InvalidParameterValue( + err=_('Param [lss_range_for_cg] is invalid, it ' + 'should be within 00-FF.')) + self.backend['lss_ids_for_cg'] = set( + ('%02x' % i).upper() for i in range(begin, end + 1)) + else: + self.backend['lss_ids_for_cg'] = set() + def _check_host_type(self): ds8k_host_type = self._get_value('ds8k_host_type') - if ((ds8k_host_type is not None) and + if (ds8k_host_type and (ds8k_host_type not in VALID_HOST_TYPES)): - msg = (_("Param [ds8k_host_type] must be one of: %(values)s.") % - {'values': VALID_HOST_TYPES[1:-1]}) + msg = (_("Param [ds8k_host_type] must be one of: %(values)s.") + % {'values': VALID_HOST_TYPES[1:-1]}) LOG.error(msg) raise exception.InvalidParameterValue(err=msg) self.backend['host_type_override'] = ( @@ -161,12 +187,12 @@ class DS8KCommonHelper(object): def _verify_version(self): if self.backend['storage_version'] == '8.0.1': - msg = (_("8.0.1 does not support bulk deletion of volumes, " - "if you want to use this version of driver, " - "please upgrade the CCL, and make sure the REST " - "version is not lower than %s.") - % VALID_REST_VERSION_5_8_MIN) - raise exception.VolumeDriverException(data=msg) + raise exception.VolumeDriverException( + data=(_("8.0.1 does not support bulk deletion of volumes, " + "if you want to use this version of driver, " + "please upgrade the CCL, and make sure the REST " + "version is not lower than %s.") + % VALID_REST_VERSION_5_8_MIN)) else: if (('5.7' in self.backend['rest_version'] and dist_version.LooseVersion(self.backend['rest_version']) < @@ -174,13 +200,13 @@ class DS8KCommonHelper(object): ('5.8' in self.backend['rest_version'] and dist_version.LooseVersion(self.backend['rest_version']) < dist_version.LooseVersion(VALID_REST_VERSION_5_8_MIN))): - msg = (_("REST version %(invalid)s is lower than " - "%(valid)s, please upgrade it in DS8K.") - % {'invalid': self.backend['rest_version'], - 'valid': (VALID_REST_VERSION_5_7_MIN if '5.7' in - self.backend['rest_version'] else - VALID_REST_VERSION_5_8_MIN)}) - raise exception.VolumeDriverException(data=msg) + raise exception.VolumeDriverException( + data=(_("REST version %(invalid)s is lower than " + "%(valid)s, please upgrade it in DS8K.") + % {'invalid': self.backend['rest_version'], + 'valid': (VALID_REST_VERSION_5_7_MIN if '5.7' in + self.backend['rest_version'] else + VALID_REST_VERSION_5_8_MIN)})) if self._connection_type == storage.XIV_CONNECTION_TYPE_FC_ECKD: if (dist_version.LooseVersion(self.backend['storage_version']) < @@ -193,15 +219,15 @@ class DS8KCommonHelper(object): elif self._connection_type == storage.XIV_CONNECTION_TYPE_FC_ECKD: ptype = 'ckd' else: - err = _('Param [connection_type] is invalid.') - raise exception.InvalidParameterValue(err=err) + raise exception.InvalidParameterValue( + err=_('Param [connection_type] is invalid.')) self._storage_pools = self.get_pools() for pid, p in self._storage_pools.items(): if p['stgtype'] != ptype: LOG.error('The stgtype of pool %(pool)s is %(ptype)s.', {'pool': pid, 'ptype': p['stgtype']}) - err = _('Param [san_clustername] is invalid.') - raise exception.InvalidParameterValue(err=err) + raise exception.InvalidParameterValue( + err='Param [san_clustername] is invalid.') @proxy.logger def get_pools(self, new_pools=None): @@ -229,7 +255,7 @@ class DS8KCommonHelper(object): }) for p in pools) @proxy.logger - def find_available_lss(self, pool, find_new_pid, excluded_lss): + def find_pool_lss_pair(self, pool, find_new_pid, excluded_lss): if pool: node = int(pool[1:], 16) % 2 lss = self._find_lss(node, excluded_lss) @@ -237,9 +263,9 @@ class DS8KCommonHelper(object): return (pool, lss) else: if not find_new_pid: - msg = _('All LSS/LCU IDs for configured pools on ' - 'storage are exhausted.') - raise restclient.LssIDExhaustError(message=msg) + raise restclient.LssIDExhaustError( + message=_('All LSS/LCU IDs for configured pools ' + 'on storage are exhausted.')) # find new pool id and lss for lun return self.find_biggest_pool_and_lss(excluded_lss) @@ -250,8 +276,8 @@ class DS8KCommonHelper(object): lss = self._find_lss(pool['node'], excluded_lss) if lss: return pool_id, lss - msg = _("All LSS/LCU IDs for configured pools are exhausted.") - raise restclient.LssIDExhaustError(message=msg) + raise restclient.LssIDExhaustError( + message=_("All LSS/LCU IDs for configured pools are exhausted.")) @proxy.logger def _find_lss(self, node, excluded_lss): @@ -259,20 +285,35 @@ class DS8KCommonHelper(object): existing_lss = self.get_all_lss(fileds) LOG.info("existing LSS IDs are: %s.", ','.join([lss['id'] for lss in existing_lss])) + existing_lss_cg, nonexistent_lss_cg = ( + self._classify_lss_for_cg(existing_lss)) + # exclude LSSs that are full. if excluded_lss: existing_lss = [lss for lss in existing_lss if lss['id'] not in excluded_lss] - lss = self._find_from_existing_lss(node, existing_lss) - lss = lss if lss else self._find_from_unexisting_lss(node, - existing_lss) + # exclude LSSs that reserved for CG. + candidates = [lss for lss in existing_lss + if lss['id'] not in existing_lss_cg] + lss = self._find_from_existing_lss(node, candidates) + if not lss: + lss = self._find_from_nonexistent_lss(node, existing_lss, + nonexistent_lss_cg) return lss + def _classify_lss_for_cg(self, existing_lss): + existing_lss_ids = set(lss['id'] for lss in existing_lss) + existing_lss_cg = existing_lss_ids & self.backend['lss_ids_for_cg'] + nonexistent_lss_cg = self.backend['lss_ids_for_cg'] - existing_lss_cg + return existing_lss_cg, nonexistent_lss_cg + def _find_from_existing_lss(self, node, existing_lss): + # exclude LSSs that are used by PPRC paths. lss_in_pprc = self.get_lss_in_pprc_paths() if lss_in_pprc: existing_lss = [lss for lss in existing_lss if lss['id'] not in lss_in_pprc] + # exclude wrong type of LSSs and those that are not in expected node. existing_lss = [lss for lss in existing_lss if lss['type'] == 'fb' and int(lss['group']) == node] lss_id = None @@ -286,18 +327,19 @@ class DS8KCommonHelper(object): {'lss': lss_id, 'num': lss['configvols']}) return lss_id - def _find_from_unexisting_lss(self, node, existing_lss): + def _find_from_nonexistent_lss(self, node, existing_lss, lss_cg=None): addrgrps = set(int(lss['addrgrp'], 16) for lss in existing_lss if lss['type'] == 'ckd' and int(lss['group']) == node) - fulllss = set(int(lss['id'], 16) for lss in existing_lss if lss['type'] == 'fb' and int(lss['group']) == node) - - # look for an available lss from unexisting lss + cglss = set(int(lss, 16) for lss in lss_cg) if lss_cg else set() + # look for an available lss from nonexistent lss lss_id = None for lss in range(node, LSS_SLOTS, 2): addrgrp = lss // 16 - if addrgrp not in addrgrps and lss not in fulllss: + if (addrgrp not in addrgrps and + lss not in fulllss and + lss not in cglss): lss_id = ("%02x" % lss).upper() break LOG.info('_find_from_unexisting_lss: choose %s.', lss_id) @@ -314,7 +356,7 @@ class DS8KCommonHelper(object): if lun.type_os400: volData['os400'] = lun.type_os400 volData['name'] = lun.ds_name - volData['pool'], volData['lss'] = lun.lss_pair['source'] + volData['pool'], volData['lss'] = lun.pool_lss_pair['source'] lun.ds_id = self._create_lun(volData) return lun @@ -339,7 +381,7 @@ class DS8KCommonHelper(object): else: lun_ids_str = ','.join(lun_ids) lun_ids = [] - LOG.error("Deleting volumes: %s.", lun_ids_str) + LOG.info("Deleting volumes: %s.", lun_ids_str) self._delete_lun(lun_ids_str) def get_lss_in_pprc_paths(self): @@ -361,7 +403,7 @@ class DS8KCommonHelper(object): vol_ids = [vol['volume_id'] for vol in host['mappings_briefs']] if vol_id in vol_ids: host_ids.append(host['id']) - LOG.info('_find_host: host IDs are: %s.', host_ids) + LOG.info('_find_host: host IDs are %s.', ','.join(host_ids)) return host_ids def wait_flashcopy_finished(self, src_luns, tgt_luns): @@ -378,9 +420,9 @@ class DS8KCommonHelper(object): continue if fcs[0]['state'] not in ('valid', 'validation_required'): - msg = (_('Flashcopy ended up in bad state %s. ' - 'Rolling back.') % fcs[0]['state']) - raise restclient.APIException(data=msg) + raise restclient.APIException( + data=(_('Flashcopy ended up in bad state %s. ' + 'Rolling back.') % fcs[0]['state'])) if fc_state.count(False) == 0: break finished = True @@ -420,10 +462,10 @@ class DS8KCommonHelper(object): unfinished_pairs = [p for p in pairs if p['state'] != state] for p in unfinished_pairs: if p['state'] in invalid_states: - msg = (_('Metro Mirror pair %(id)s enters into ' - 'state %(state)s. ') % - {'id': p['id'], 'state': p['state']}) - raise restclient.APIException(data=msg) + raise restclient.APIException( + data=(_('Metro Mirror pair %(id)s enters into ' + 'state %(state)s. ') + % {'id': p['id'], 'state': p['state']})) finally: if not finished and delete: pair_ids = {'ids': ','.join([p['id'] for p in pairs])} @@ -459,22 +501,19 @@ class DS8KCommonHelper(object): hp['wwpn'] for hp in host_ports) unconfigured_ports = set( hp['wwpn'] for hp in host_ports if not hp['host_id']) - msg = ("initialize_connection: defined_hosts: %(defined)s, " - "unknown_ports: %(unknown)s, unconfigured_ports: " - "%(unconfigured)s.") - LOG.debug(msg, { - "defined": defined_hosts, - "unknown": unknown_ports, - "unconfigured": unconfigured_ports - }) + LOG.debug("initialize_connection: defined_hosts: %(defined)s, " + "unknown_ports: %(unknown)s, unconfigured_ports: " + "%(unconfigured)s.", {"defined": defined_hosts, + "unknown": unknown_ports, + "unconfigured": unconfigured_ports}) # Create host if it is not defined if not defined_hosts: host_id = self._create_host(host)['id'] elif len(defined_hosts) == 1: host_id = defined_hosts.pop() else: - msg = _('More than one host defined for requested ports.') - raise restclient.APIException(message=msg) + raise restclient.APIException( + message='More than one host defined for requested ports.') LOG.info('Volume will be attached to host %s.', host_id) # Create missing host ports @@ -511,13 +550,11 @@ class DS8KCommonHelper(object): host_ports = None delete_ports = None defined_hosts = self._find_host(vol_id) - msg = ("terminate_connection: host_ports: %(host)s, defined_hosts: " - "%(defined)s, delete_ports: %(delete)s.") - LOG.debug(msg, { - "host": host_ports, - "defined": defined_hosts, - "delete": delete_ports - }) + LOG.debug("terminate_connection: host_ports: %(host)s, " + "defined_hosts: %(defined)s, delete_ports: %(delete)s.", + {"host": host_ports, + "defined": defined_hosts, + "delete": delete_ports}) if not defined_hosts: LOG.info('Could not find host.') @@ -552,33 +589,49 @@ class DS8KCommonHelper(object): target_map = {initiator.upper(): target_ports for initiator in connector['wwpns']} ret_info['data']['initiator_target_map'] = target_map - return ret_info return ret_info - def create_group(self, ctxt, group): + def create_group(self, group): return {'status': fields.GroupStatus.AVAILABLE} - def delete_group(self, ctxt, group, luns): + def delete_group(self, group, src_luns): volumes_model_update = [] model_update = {'status': fields.GroupStatus.DELETED} - if luns: + if src_luns: try: - self.delete_lun(luns) - except restclient.APIException: + self.delete_lun(src_luns) + except restclient.APIException as e: model_update['status'] = fields.GroupStatus.ERROR_DELETING LOG.exception( - "Failed to delete the volumes in group %(group)s", - {'group': group.id}) + "Failed to delete the volumes in group %(group)s, " + "Exception = %(ex)s", + {'group': group.id, 'ex': e}) - for lun in luns: + for src_lun in src_luns: volumes_model_update.append({ - 'id': lun.os_id, + 'id': src_lun.os_id, 'status': model_update['status'] }) return model_update, volumes_model_update - def update_group(self, ctxt, group, add_volumes, remove_volumes): - return None, None, None + def delete_group_snapshot(self, group_snapshot, tgt_luns): + snapshots_model_update = [] + model_update = {'status': fields.GroupSnapshotStatus.DELETED} + if tgt_luns: + try: + self.delete_lun(tgt_luns) + except restclient.APIException as e: + model_update['status'] = ( + fields.GroupSnapshotStatus.ERROR_DELETING) + LOG.error("Failed to delete snapshots in group snapshot " + "%(gsnapshot)s, Exception = %(ex)s", + {'gsnapshot': group_snapshot.id, 'ex': e}) + for tgt_lun in tgt_luns: + snapshots_model_update.append({ + 'id': tgt_lun.os_id, + 'status': model_update['status'] + }) + return model_update, snapshots_model_update def _delete_lun(self, lun_ids_str): self._client.send('DELETE', '/volumes', @@ -768,28 +821,35 @@ class DS8KReplicationSourceHelper(DS8KCommonHelper): excluded_lss) if lss: return pool_id, lss - msg = _("All LSS/LCU IDs for configured pools are exhausted.") - raise restclient.LssIDExhaustError(message=msg) + raise restclient.LssIDExhaustError( + message=_("All LSS/LCU IDs for configured pools are exhausted.")) @proxy.logger def _find_lss_for_type_replication(self, node, excluded_lss): - # prefer to choose the non-existing one firstly + # prefer to choose non-existing one first. fileds = ['id', 'type', 'addrgrp', 'group', 'configvols'] existing_lss = self.get_all_lss(fileds) LOG.info("existing LSS IDs are %s", ','.join([lss['id'] for lss in existing_lss])) - lss_id = self._find_from_unexisting_lss(node, existing_lss) + existing_lss_cg, nonexistent_lss_cg = ( + self._classify_lss_for_cg(existing_lss)) + lss_id = self._find_from_nonexistent_lss(node, existing_lss, + nonexistent_lss_cg) if not lss_id: if excluded_lss: existing_lss = [lss for lss in existing_lss if lss['id'] not in excluded_lss] - lss_id = self._find_from_existing_lss(node, existing_lss) + candidates = [lss for lss in existing_lss + if lss['id'] not in existing_lss_cg] + lss_id = self._find_from_existing_lss(node, candidates) return lss_id class DS8KReplicationTargetHelper(DS8KReplicationSourceHelper): """Manage target storage for replication.""" + OPTIONAL_PARAMS = ['ds8k_host_type', 'port_pairs'] + def setup(self): self._create_client() self._get_storage_information() @@ -814,6 +874,21 @@ class DS8KReplicationTargetHelper(DS8KReplicationSourceHelper): self.backend['port_pairs'] = port_pairs self.backend['id'] = self._get_value('backend_id') + @proxy.logger + def _find_lss_for_type_replication(self, node, excluded_lss): + # prefer to choose non-existing one first. + fileds = ['id', 'type', 'addrgrp', 'group', 'configvols'] + existing_lss = self.get_all_lss(fileds) + LOG.info("existing LSS IDs are %s", + ','.join([lss['id'] for lss in existing_lss])) + lss_id = self._find_from_nonexistent_lss(node, existing_lss) + if not lss_id: + if excluded_lss: + existing_lss = [lss for lss in existing_lss + if lss['id'] not in excluded_lss] + lss_id = self._find_from_existing_lss(node, existing_lss) + return lss_id + def create_lun(self, lun): volData = { 'cap': self._gb2b(lun.size), @@ -826,7 +901,7 @@ class DS8KReplicationTargetHelper(DS8KReplicationSourceHelper): volData['os400'] = lun.type_os400 volData['name'] = lun.replica_ds_name - volData['pool'], volData['lss'] = lun.lss_pair['target'] + volData['pool'], volData['lss'] = lun.pool_lss_pair['target'] volID = self._create_lun(volData) lun.replication_driver_data.update( {self.backend['id']: {'vol_hex_id': volID}}) @@ -848,16 +923,19 @@ class DS8KReplicationTargetHelper(DS8KReplicationSourceHelper): class DS8KECKDHelper(DS8KCommonHelper): """Manage ECKD volume.""" + OPTIONAL_PARAMS = ['ds8k_host_type', 'port_pairs', 'ds8k_ssid_prefix', + 'lss_range_for_cg'] + @staticmethod def _gb2cyl(gb): # now only support 3390, no 3380 or 3390-A cyl = int(math.ceil(gb * 1263.28)) if cyl > 65520: - msg = (_("For 3390 volume, capacity can be in the range " - "1-65520(849KiB to 55.68GiB) cylinders, now it " - "is %(gb)d GiB, equals to %(cyl)d cylinders.") % - {'gb': gb, 'cyl': cyl}) - raise exception.VolumeDriverException(data=msg) + raise exception.VolumeDriverException( + message=(_("For 3390 volume, capacity can be in the range " + "1-65520(849KiB to 55.68GiB) cylinders, now it " + "is %(gb)d GiB, equals to %(cyl)d cylinders.") + % {'gb': gb, 'cyl': cyl})) return cyl @staticmethod @@ -874,6 +952,7 @@ class DS8KECKDHelper(DS8KCommonHelper): self._create_client() self._get_storage_information() self._check_host_type() + self._get_lss_ids_for_cg() self.backend['pools_str'] = self._get_value('san_clustername') ssid_prefix = self._get_value('ds8k_ssid_prefix') self.backend['ssid_prefix'] = ssid_prefix if ssid_prefix else 'FF' @@ -885,10 +964,10 @@ class DS8KECKDHelper(DS8KCommonHelper): def _check_and_verify_lcus(self): map_str = self._get_value('ds8k_devadd_unitadd_mapping') if not map_str: - err = _('Param [ds8k_devadd_unitadd_mapping] is not ' - 'provided, please provide the mapping between ' - 'IODevice address and unit address.') - raise exception.InvalidParameterValue(err=err) + raise exception.InvalidParameterValue( + err=_('Param [ds8k_devadd_unitadd_mapping] is not ' + 'provided, please provide the mapping between ' + 'IODevice address and unit address.')) # verify the LCU mappings = map_str.replace(' ', '').upper().split(';') @@ -896,9 +975,9 @@ class DS8KECKDHelper(DS8KCommonHelper): dev_mapping = {p[1]: int(p[0], 16) for p in pairs} for lcu in dev_mapping.keys(): if int(lcu, 16) > 255: - err = (_('LCU %s in param [ds8k_devadd_unitadd_mapping]' - 'is invalid, it should be within 00-FF.') % lcu) - raise exception.InvalidParameterValue(err=err) + raise exception.InvalidParameterValue( + err=(_('LCU %s in param [ds8k_devadd_unitadd_mapping]' + 'is invalid, it should be within 00-FF.') % lcu)) # verify address group all_lss = self.get_all_lss(['id', 'type']) @@ -907,23 +986,24 @@ class DS8KECKDHelper(DS8KCommonHelper): ckd_addrgrp = set((int(lcu, 16) // 16) for lcu in dev_mapping.keys()) intersection = ckd_addrgrp & fb_addrgrp if intersection: - msg = (_('Invaild LCUs which first digit is %s, they are' - 'for fb volume.') % ', '.join(intersection)) - raise exception.VolumeDriverException(data=msg) + raise exception.VolumeDriverException( + message=(_('LCUs which first digit is %s are invalid, they ' + 'are for FB volume.') % ', '.join(intersection))) # create LCU that doesn't exist ckd_lss = set(lss['id'] for lss in all_lss if lss['type'] == 'ckd') - unexisting_lcu = set(dev_mapping.keys()) - ckd_lss - if unexisting_lcu: - LOG.info('LCUs %s do not exist in DS8K, they will be created.', - ','.join(unexisting_lcu)) - for lcu in unexisting_lcu: + nonexistent_lcu = set(dev_mapping.keys()) - ckd_lss + if nonexistent_lcu: + LOG.info('LCUs %s do not exist in DS8K, they will be ' + 'created.', ','.join(nonexistent_lcu)) + for lcu in nonexistent_lcu: try: self._create_lcu(self.backend['ssid_prefix'], lcu) except restclient.APIException as e: - msg = (_('can not create lcu %(lcu)s, Exception= ' - '%(e)s') % {'lcu': lcu, 'e': six.text_type(e)}) - raise exception.VolumeDriverException(data=msg) + raise exception.VolumeDriverException( + message=(_('Can not create lcu %(lcu)s, ' + 'Exception= %(e)s.') + % {'lcu': lcu, 'e': six.text_type(e)})) return dev_mapping def _format_pools(self, pools): @@ -944,14 +1024,19 @@ class DS8KECKDHelper(DS8KCommonHelper): # all LCUs have existed, not like LSS all_lss = self.get_all_lss(['id', 'type', 'group', 'configvols']) existing_lcu = [lss for lss in all_lss if lss['type'] == 'ckd'] + excluded_lcu = excluded_lcu or [] candidate_lcu = [lcu for lcu in existing_lcu if ( lcu['id'] in self.backend['device_mapping'].keys() and lcu['id'] not in excluded_lcu and lcu['group'] == str(node))] + + # exclude LCUs reserved for CG. + candidate_lcu = [lss for lss in candidate_lcu if lss['id'] + not in self.backend['lss_ids_for_cg']] if not candidate_lcu: return None - # perfer to use LCU that is not in PPRC path first. + # prefer to use LCU that is not in PPRC path first. lcu_pprc = self.get_lss_in_pprc_paths() & set( self.backend['device_mapping'].keys()) if lcu_pprc: @@ -984,7 +1069,7 @@ class DS8KECKDHelper(DS8KCommonHelper): } lun.data_type = '3390' volData['name'] = lun.ds_name - volData['pool'], volData['lss'] = lun.lss_pair['source'] + volData['pool'], volData['lss'] = lun.pool_lss_pair['source'] lun.ds_id = self._create_lun(volData) return lun @@ -1030,7 +1115,7 @@ class DS8KReplicationTargetECKDHelper(DS8KECKDHelper, lun.data_type = '3390' volData['name'] = lun.replica_ds_name - volData['pool'], volData['lss'] = lun.lss_pair['target'] + volData['pool'], volData['lss'] = lun.pool_lss_pair['target'] volID = self._create_lun(volData) lun.replication_driver_data.update( {self.backend['id']: {'vol_hex_id': volID}}) diff --git a/cinder/volume/drivers/ibm/ibm_storage/ds8k_proxy.py b/cinder/volume/drivers/ibm/ibm_storage/ds8k_proxy.py index f9e11c30e7f..9a579882a92 100644 --- a/cinder/volume/drivers/ibm/ibm_storage/ds8k_proxy.py +++ b/cinder/volume/drivers/ibm/ibm_storage/ds8k_proxy.py @@ -37,7 +37,7 @@ volume_driver = cinder.volume.drivers.ibm.ibm_storage.ibm_storage.IBMStorageDriver chap = disabled connection_type = fibre_channel -replication_device = backend_id: bar, +replication_device = connection_type: fibre_channel, backend_id: bar, san_ip: bar.com, san_login: actual_username, san_password: actual_password, san_clustername: P4, port_pairs: I0236-I0306; I0237-I0307 @@ -57,24 +57,28 @@ connection_type = fibre_channel """ import ast +import collections import json import six from oslo_config import cfg from oslo_log import log as logging +from cinder import context +from cinder import coordination from cinder import exception from cinder.i18n import _ +from cinder import objects from cinder.objects import fields from cinder.utils import synchronized import cinder.volume.drivers.ibm.ibm_storage as storage +from cinder.volume.drivers.ibm.ibm_storage import ( + ds8k_replication as replication) from cinder.volume.drivers.ibm.ibm_storage import ds8k_helper as helper -from cinder.volume.drivers.ibm.ibm_storage \ - import ds8k_replication as replication from cinder.volume.drivers.ibm.ibm_storage import ds8k_restclient as restclient from cinder.volume.drivers.ibm.ibm_storage import proxy from cinder.volume.drivers.ibm.ibm_storage import strings -from cinder.volume import group_types +from cinder.volume import utils from cinder.volume import volume_types LOG = logging.getLogger(__name__) @@ -91,10 +95,7 @@ EXTRA_SPECS_DEFAULTS = { 'thin': True, 'replication_enabled': False, 'consistency': False, - 'os400': '', - 'consistent_group_replication_enabled': False, - 'group_replication_enabled': False, - 'consistent_group_snapshot_enabled': False, + 'os400': '' } ds8k_opts = [ @@ -105,7 +106,11 @@ ds8k_opts = [ cfg.StrOpt( 'ds8k_ssid_prefix', default='FF', - help='Set the first two digits of SSID'), + help='Set the first two digits of SSID.'), + cfg.StrOpt( + 'lss_range_for_cg', + default='', + help='Reserve LSSs for consistency group.'), cfg.StrOpt( 'ds8k_host_type', default='auto', @@ -135,16 +140,23 @@ class Lun(object): self.type_os400 = lun.type_os400 self.data_type = lun.data_type self.type_replication = lun.type_replication + self.group = lun.group if not self.is_snapshot and self.type_replication: self.replica_ds_name = lun.replica_ds_name - self.replication_driver_data = lun.replication_driver_data + self.replication_driver_data = ( + lun.replication_driver_data.copy()) self.replication_status = lun.replication_status - self.lss_pair = lun.lss_pair + self.pool_lss_pair = lun.pool_lss_pair def update_volume(self, lun): volume_update = lun.get_volume_update() volume_update['provider_location'] = six.text_type({ 'vol_hex_id': self.ds_id}) + if self.type_replication: + volume_update['replication_driver_data'] = json.dumps( + self.replication_driver_data) + volume_update['metadata']['replication'] = six.text_type( + self.replication_driver_data) volume_update['metadata']['vol_hex_id'] = self.ds_id return volume_update @@ -165,19 +177,22 @@ class Lun(object): if volume.provider_location: provider_location = ast.literal_eval(volume.provider_location) - self.ds_id = provider_location[six.text_type('vol_hex_id')] + self.ds_id = provider_location['vol_hex_id'] else: self.ds_id = None self.cinder_name = volume.display_name - self.lss_pair = {} + self.pool_lss_pair = {} self.is_snapshot = is_snapshot if self.is_snapshot: + self.group = (Group(volume.group_snapshot, True) + if volume.group_snapshot else None) self.size = volume.volume_size # ds8k supports at most 16 chars self.ds_name = ( "OS%s:%s" % ('snap', helper.filter_alnum(self.cinder_name)) )[:16] else: + self.group = Group(volume.group) if volume.group else None self.size = volume.size self.ds_name = ( "OS%s:%s" % ('vol', helper.filter_alnum(self.cinder_name)) @@ -193,17 +208,17 @@ class Lun(object): # now only support one replication target. replication_target = sorted( self.replication_driver_data.values())[0] - replica_id = replication_target[six.text_type('vol_hex_id')] - self.lss_pair = { + replica_id = replication_target['vol_hex_id'] + self.pool_lss_pair = { 'source': (None, self.ds_id[0:2]), 'target': (None, replica_id[0:2]) } if os400: if os400 not in VALID_OS400_VOLUME_TYPES.keys(): - msg = (_("The OS400 volume type provided, %s, is not " - "a valid volume type.") % os400) - raise restclient.APIException(data=msg) + raise restclient.APIException( + data=(_("The OS400 volume type provided, %s, is not " + "a valid volume type.") % os400)) self.type_os400 = os400 if os400 not in ['050', '099']: self.size = VALID_OS400_VOLUME_TYPES[os400] @@ -284,13 +299,14 @@ class Lun(object): class Group(object): """provide group information for driver from group db object.""" - def __init__(self, group): - gid = group.get('group_type_id') - specs = group_types.get_group_type_specs(gid) if gid else {} - self.type_cg_snapshot = specs.get( - 'consistent_group_snapshot_enabled', ' %s' % - EXTRA_SPECS_DEFAULTS['consistent_group_snapshot_enabled'] - ).upper() == strings.METADATA_IS_TRUE + def __init__(self, group, is_snapshot=False): + self.id = group.id + self.host = group.host + if is_snapshot: + self.snapshots = group.snapshots + else: + self.volumes = group.volumes + self.consisgroup_enabled = utils.is_group_a_cg_snapshot_type(group) class DS8KProxy(proxy.IBMStorageProxy): @@ -307,6 +323,9 @@ class DS8KProxy(proxy.IBMStorageProxy): self._active_backend_id = active_backend_id self.configuration = driver.configuration self.configuration.append_config_values(ds8k_opts) + # TODO(jiamin): this cache is used to handle concurrency issue, but it + # hurts HA, we will find whether is it possible to store it in storage. + self.consisgroup_cache = {} @proxy._trace_time def setup(self, ctxt): @@ -325,9 +344,9 @@ class DS8KProxy(proxy.IBMStorageProxy): self._helper = helper.DS8KECKDHelper(self.configuration, self._connector_obj) else: - err = (_("Param [connection_type] %s is invalid.") - % connection_type) - raise exception.InvalidParameterValue(err=err) + raise exception.InvalidParameterValue( + err=(_("Param [connection_type] %s is invalid.") + % connection_type)) if replication_devices: self._do_replication_setup(replication_devices, self._helper) @@ -335,9 +354,9 @@ class DS8KProxy(proxy.IBMStorageProxy): @proxy.logger def _do_replication_setup(self, devices, src_helper): if len(devices) >= 2: - err = _("Param [replication_device] is invalid, Driver " - "support only one replication target.") - raise exception.InvalidParameterValue(err=err) + raise exception.InvalidParameterValue( + err=_("Param [replication_device] is invalid, Driver " + "support only one replication target.")) self._replication = replication.Replication(src_helper, devices[0]) self._replication.check_physical_links() @@ -368,9 +387,9 @@ class DS8KProxy(proxy.IBMStorageProxy): LOG.error(msg) raise exception.CinderException(message=msg) else: - msg = (_('Backend %s is not initialized.') - % self.configuration.volume_backend_name) - raise exception.CinderException(data=msg) + raise exception.VolumeDriverException( + message=(_('Backend %s is not initialized.') + % self.configuration.volume_backend_name)) stats = { "volume_backend_name": self.configuration.volume_backend_name, @@ -384,7 +403,6 @@ class DS8KProxy(proxy.IBMStorageProxy): "free_capacity_gb": self._b2gb( sum(p['capavail'] for p in storage_pools.values())), "reserved_percentage": self.configuration.reserved_percentage, - "consistencygroup_support": True, "consistent_group_snapshot_enabled": True, "multiattach": False } @@ -397,7 +415,7 @@ class DS8KProxy(proxy.IBMStorageProxy): def _assert(self, assert_condition, exception_message=''): if not assert_condition: LOG.error(exception_message) - raise restclient.APIException(data=exception_message) + raise exception.VolumeDriverException(message=exception_message) @proxy.logger def _create_lun_helper(self, lun, pool=None, find_new_pid=True): @@ -415,20 +433,107 @@ class DS8KProxy(proxy.IBMStorageProxy): raise restclient.APIException(message=msg) # There is a time gap between find available LSS slot and # lun actually occupies it. - excluded_lss = [] + excluded_lss = set() while True: try: - if lun.type_replication and not lun.is_snapshot: - lun.lss_pair = self._replication.find_available_lss_pair( - excluded_lss) + if lun.group and lun.group.consisgroup_enabled: + lun.pool_lss_pair = { + 'source': self._find_pool_lss_pair_for_cg( + lun, excluded_lss)} else: - lun.lss_pair['source'] = self._helper.find_available_lss( - pool, find_new_pid, excluded_lss) + if lun.type_replication and not lun.is_snapshot: + lun.pool_lss_pair = ( + self._replication.find_pool_lss_pair( + excluded_lss)) + else: + lun.pool_lss_pair = { + 'source': self._helper.find_pool_lss_pair( + pool, find_new_pid, excluded_lss)} return self._helper.create_lun(lun) except restclient.LssFullException: LOG.warning("LSS %s is full, find another one.", - lun.lss_pair['source'][1]) - excluded_lss.append(lun.lss_pair['source'][1]) + lun.pool_lss_pair['source'][1]) + excluded_lss.add(lun.pool_lss_pair['source'][1]) + + @coordination.synchronized('{self.prefix}-consistency-group') + def _find_pool_lss_pair_for_cg(self, lun, excluded_lss): + lss_in_cache = self.consisgroup_cache.get(lun.group.id, set()) + if not lss_in_cache: + lss_in_cg = self._get_lss_in_cg(lun.group, lun.is_snapshot) + LOG.debug("LSSs used by CG %(cg)s are %(lss)s.", + {'cg': lun.group.id, 'lss': ','.join(lss_in_cg)}) + available_lss = lss_in_cg - excluded_lss + else: + available_lss = lss_in_cache - excluded_lss + if not available_lss: + available_lss = self._find_lss_for_cg() + + pid, lss = self._find_pool_for_lss(available_lss) + if pid: + lss_in_cache.add(lss) + self.consisgroup_cache[lun.group.id] = lss_in_cache + else: + raise exception.VolumeDriverException( + message=_('There are still some available LSSs for CG, ' + 'but they are not in the same node as pool.')) + return (pid, lss) + + def _get_lss_in_cg(self, group, is_snapshot=False): + # Driver can not support the case that dedicating LSS for CG while + # user enable multiple backends which use the same DS8K. + try: + volume_backend_name = ( + group.host[group.host.index('@') + 1:group.host.index('#')]) + except ValueError: + raise exception.VolumeDriverException( + message=(_('Invalid host %(host)s in group %(group)s') + % {'host': group.host, 'group': group.id})) + lss_in_cg = set() + if volume_backend_name == self.configuration.volume_backend_name: + if is_snapshot: + luns = [Lun(snapshot, is_snapshot=True) + for snapshot in group.snapshots] + else: + luns = [Lun(volume) for volume in group.volumes] + lss_in_cg = set(lun.ds_id[:2] for lun in luns if lun.ds_id) + return lss_in_cg + + def _find_lss_for_cg(self): + # Unable to get CGs/groups belonging to the current tenant, so + # get all of them, this function will consume some time if there + # are so many CGs/groups. + lss_used = set() + ctxt = context.get_admin_context() + existing_groups = objects.GroupList.get_all( + ctxt, filters={'status': 'available'}) + for group in existing_groups: + if Group(group).consisgroup_enabled: + lss_used = lss_used | self._get_lss_in_cg(group) + existing_groupsnapshots = objects.GroupSnapshotList.get_all( + ctxt, filters={'status': 'available'}) + for group in existing_groupsnapshots: + if Group(group, True).consisgroup_enabled: + lss_used = lss_used | self._get_lss_in_cg(group, True) + available_lss = set(self._helper.backend['lss_ids_for_cg']) - lss_used + for lss_set in self.consisgroup_cache.values(): + available_lss -= lss_set + self._assert(available_lss, + "All LSSs reserved for CG have been used out, " + "please reserve more LSS for CG if there are still" + "some empty LSSs left.") + LOG.debug('_find_lss_for_cg: available LSSs for consistency ' + 'group are %s', ','.join(available_lss)) + return available_lss + + @proxy.logger + def _find_pool_for_lss(self, available_lss): + for lss in available_lss: + pid = self._helper.get_pool(lss) + if pid: + return (pid, lss) + raise exception.VolumeDriverException( + message=(_("Can not find pool for LSSs %s.") + % ','.join(available_lss))) @proxy.logger def _clone_lun(self, src_lun, tgt_lun): @@ -485,29 +590,36 @@ class DS8KProxy(proxy.IBMStorageProxy): def _ensure_vol_not_fc_target(self, vol_hex_id): for cp in self._helper.get_flashcopy(vol_hex_id): if cp['targetvolume']['id'] == vol_hex_id: - msg = (_('Volume %s is currently a target of another ' - 'FlashCopy operation') % vol_hex_id) - raise restclient.APIException(data=msg) + raise restclient.APIException( + data=(_('Volume %s is currently a target of another ' + 'FlashCopy operation') % vol_hex_id)) + + def _create_replica_helper(self, lun): + if not lun.pool_lss_pair.get('target'): + lun = self._replication.enable_replication(lun, True) + else: + lun = self._replication.create_replica(lun) + return lun @proxy._trace_time def create_volume(self, volume): lun = self._create_lun_helper(Lun(volume)) if lun.type_replication: - lun = self._replication.create_replica(lun) + lun = self._create_replica_helper(lun) return lun.get_volume_update() @proxy._trace_time def create_cloned_volume(self, target_vol, source_vol): lun = self._clone_lun(Lun(source_vol), Lun(target_vol)) if lun.type_replication: - lun = self._replication.create_replica(lun) + lun = self._create_replica_helper(lun) return lun.get_volume_update() @proxy._trace_time def create_volume_from_snapshot(self, volume, snapshot): lun = self._clone_lun(Lun(snapshot, is_snapshot=True), Lun(volume)) if lun.type_replication: - lun = self._replication.create_replica(lun) + lun = self._create_replica_helper(lun) return lun.get_volume_update() @proxy._trace_time @@ -524,9 +636,9 @@ class DS8KProxy(proxy.IBMStorageProxy): self._replication.extend_replica(lun, param) self._replication.create_pprc_pairs(lun) else: - msg = (_("The volume %s has been failed over, it is " - "not suggested to extend it.") % lun.ds_id) - raise exception.CinderException(data=msg) + raise exception.CinderException( + message=(_("The volume %s has been failed over, it is " + "not suggested to extend it.") % lun.ds_id)) else: self._helper.change_lun(lun.ds_id, param) @@ -658,10 +770,11 @@ class DS8KProxy(proxy.IBMStorageProxy): lun = self._replication.delete_replica(lun) lun = _convert_thin_and_thick(lun, new_type_thin) else: - msg = (_("The volume %s is in replication relationship, " - "it is not supported to retype from thin to " - "thick or vice versus.") % lun.ds_id) - raise exception.CinderException(msg) + raise exception.CinderException( + message=(_("The volume %s is in replication " + "relationship, it is not supported to " + "retype from thin to thick or vice " + "versa.") % lun.ds_id)) else: lun = _convert_thin_and_thick(lun, new_type_thin) if new_type_replication: @@ -697,23 +810,53 @@ class DS8KProxy(proxy.IBMStorageProxy): force, **kwargs) @proxy.logger - def create_consistencygroup(self, ctxt, group): - """Create a consistency group.""" - return self._helper.create_group(ctxt, group) + def create_group(self, ctxt, group): + """Create generic volume group.""" + if Group(group).consisgroup_enabled: + self._assert(self._helper.backend['lss_ids_for_cg'], + 'No LSS(s) for CG, please make sure you have ' + 'reserved LSS for CG via param lss_range_for_cg.') + return self._helper.create_group(group) @proxy.logger - def delete_consistencygroup(self, ctxt, group, volumes): - """Delete a consistency group.""" + def delete_group(self, ctxt, group, volumes): + """Delete group and the volumes in the group.""" luns = [Lun(volume) for volume in volumes] - return self._helper.delete_group(ctxt, group, luns) + if Group(group).consisgroup_enabled: + return self._delete_group_with_lock(group, luns) + else: + return self._helper.delete_group(group, luns) - @proxy._trace_time - def create_cgsnapshot(self, ctxt, cgsnapshot, snapshots): - """Create a consistency group snapshot.""" - return self._create_group_snapshot(ctxt, cgsnapshot, snapshots, True) + @coordination.synchronized('{self.prefix}-consistency-group') + def _delete_group_with_lock(self, group, luns): + model_update, volumes_model_update = ( + self._helper.delete_group(group, luns)) + if model_update['status'] == fields.GroupStatus.DELETED: + self._update_consisgroup_cache(group.id) + return model_update, volumes_model_update - def _create_group_snapshot(self, ctxt, cgsnapshot, snapshots, - cg_enabled=False): + @proxy.logger + def delete_group_snapshot(self, ctxt, group_snapshot, snapshots): + """Delete volume group snapshot.""" + tgt_luns = [Lun(s, is_snapshot=True) for s in snapshots] + if Group(group_snapshot, True).consisgroup_enabled: + return self._delete_group_snapshot_with_lock( + group_snapshot, tgt_luns) + else: + return self._helper.delete_group_snapshot( + group_snapshot, tgt_luns) + + @coordination.synchronized('{self.prefix}-consistency-group') + def _delete_group_snapshot_with_lock(self, group_snapshot, tgt_luns): + model_update, snapshots_model_update = ( + self._helper.delete_group_snapshot(group_snapshot, tgt_luns)) + if model_update['status'] == fields.GroupStatus.DELETED: + self._update_consisgroup_cache(group_snapshot.id) + return model_update, snapshots_model_update + + @proxy.logger + def create_group_snapshot(self, ctxt, group_snapshot, snapshots): + """Create volume group snapshot.""" snapshots_model_update = [] model_update = {'status': fields.GroupStatus.AVAILABLE} @@ -722,7 +865,7 @@ class DS8KProxy(proxy.IBMStorageProxy): try: if src_luns and tgt_luns: - self._clone_group(src_luns, tgt_luns, cg_enabled) + self._clone_group(src_luns, tgt_luns) except restclient.APIException: model_update['status'] = fields.GroupStatus.ERROR LOG.exception('Failed to create group snapshot.') @@ -737,70 +880,100 @@ class DS8KProxy(proxy.IBMStorageProxy): return model_update, snapshots_model_update - @proxy._trace_time @proxy.logger - def delete_cgsnapshot(self, ctxt, cgsnapshot, snapshots): - """Delete a consistency group snapshot.""" - return self._delete_group_snapshot(ctxt, cgsnapshot, snapshots) + def update_group(self, ctxt, group, add_volumes, remove_volumes): + """Update generic volume group.""" + if Group(group).consisgroup_enabled: + return self._update_group(group, add_volumes, remove_volumes) + else: + return None, None, None - def _delete_group_snapshot(self, ctxt, group_snapshot, snapshots): - snapshots_model_update = [] - model_update = {'status': fields.GroupStatus.DELETED} + def _update_group(self, group, add_volumes, remove_volumes): + add_volumes_update = [] + group_volume_ids = [vol.id for vol in group.volumes] + add_volumes = [vol for vol in add_volumes + if vol.id not in group_volume_ids] + remove_volumes = [vol for vol in remove_volumes + if vol.id in group_volume_ids] + if add_volumes: + add_luns = [Lun(vol) for vol in add_volumes] + lss_in_cg = [Lun(vol).ds_id[:2] for vol in group.volumes] + if not lss_in_cg: + lss_in_cg = self._find_lss_for_empty_group(group, add_luns) + add_volumes_update = self._add_volumes_into_group( + group, add_luns, lss_in_cg) + if remove_volumes: + self._remove_volumes_in_group(group, add_volumes, remove_volumes) + return None, add_volumes_update, None - snapshots = [Lun(s, is_snapshot=True) for s in snapshots] - if snapshots: - try: - self._helper.delete_lun(snapshots) - except restclient.APIException as e: - model_update['status'] = fields.GroupStatus.ERROR_DELETING - LOG.error("Failed to delete group snapshot. " - "Error: %(err)s", - {'err': e}) + @coordination.synchronized('{self.prefix}-consistency-group') + def _find_lss_for_empty_group(self, group, luns): + sorted_lss_ids = collections.Counter([lun.ds_id[:2] for lun in luns]) + available_lss = self._find_lss_for_cg() + lss_for_cg = None + for lss_id in sorted_lss_ids: + if lss_id in available_lss: + lss_for_cg = lss_id + break + if not lss_for_cg: + lss_for_cg = available_lss.pop() + self._update_consisgroup_cache(group.id, lss_for_cg) + return lss_for_cg - for snapshot in snapshots: - snapshots_model_update.append({ - 'id': snapshot.os_id, - 'status': model_update['status'] - }) - return model_update, snapshots_model_update + def _add_volumes_into_group(self, group, add_luns, lss_in_cg): + add_volumes_update = [] + luns = [lun for lun in add_luns if lun.ds_id[:2] not in lss_in_cg] + for lun in luns: + if lun.type_replication: + new_lun = self._clone_lun_for_group(group, lun) + new_lun.type_replication = True + new_lun = self._replication.enable_replication(new_lun, True) + lun = self._replication.delete_replica(lun) + else: + new_lun = self._clone_lun_for_group(group, lun) + self._helper.delete_lun(lun) + volume_update = new_lun.update_volume(lun) + volume_update['id'] = lun.os_id + add_volumes_update.append(volume_update) + return add_volumes_update + + def _clone_lun_for_group(self, group, lun): + lun.group = Group(group) + new_lun = lun.shallow_copy() + new_lun.type_replication = False + self._create_lun_helper(new_lun) + self._clone_lun(lun, new_lun) + return new_lun + + @coordination.synchronized('{self.prefix}-consistency-group') + def _remove_volumes_in_group(self, group, add_volumes, remove_volumes): + if len(remove_volumes) == len(group.volumes) + len(add_volumes): + self._update_consisgroup_cache(group.id) @proxy.logger - def update_consistencygroup(self, ctxt, group, - add_volumes, remove_volumes): - """Add or remove volume(s) to/from an existing consistency group.""" - return self._helper.update_group(ctxt, group, - add_volumes, remove_volumes) + def _update_consisgroup_cache(self, group_id, lss_id=None): + if lss_id: + self.consisgroup_cache[group_id] = set([lss_id]) + else: + if self.consisgroup_cache.get(group_id): + LOG.debug('Group %(id)s owns LSS %(lss)s in the cache.', { + 'id': group_id, + 'lss': ','.join(self.consisgroup_cache[group_id]) + }) + self.consisgroup_cache.pop(group_id) @proxy._trace_time - def create_consistencygroup_from_src(self, ctxt, group, volumes, - cgsnapshot, snapshots, - source_cg, sorted_source_vols): - """Create a consistencygroup from source. - - :param ctxt: the context of the caller. - :param group: the dictionary of the consistency group to be created. - :param volumes: a list of volume dictionaries in the group. - :param cgsnapshot: the dictionary of the cgsnapshot as source. - :param snapshots: a list of snapshot dictionaries in the cgsnapshot. - :param source_cg: the dictionary of the consisgroup as source. - :param sorted_source_vols: a list of volume dictionaries - in the consisgroup. - :return model_update, volumes_model_update - """ - return self._create_group_from_src(ctxt, group, volumes, cgsnapshot, - snapshots, source_cg, - sorted_source_vols, True) - - def _create_group_from_src(self, ctxt, group, volumes, cgsnapshot, - snapshots, source_cg, sorted_source_vols, - cg_enabled=False): + def create_group_from_src(self, ctxt, group, volumes, group_snapshot, + sorted_snapshots, source_group, + sorted_source_vols): + """Create volume group from volume group or volume group snapshot.""" model_update = {'status': fields.GroupStatus.AVAILABLE} volumes_model_update = [] - if cgsnapshot and snapshots: + if group_snapshot and sorted_snapshots: src_luns = [Lun(snapshot, is_snapshot=True) - for snapshot in snapshots] - elif source_cg and sorted_source_vols: + for snapshot in sorted_snapshots] + elif source_group and sorted_source_vols: src_luns = [Lun(source_vol) for source_vol in sorted_source_vols] else: @@ -811,9 +984,22 @@ class DS8KProxy(proxy.IBMStorageProxy): raise exception.InvalidInput(message=msg) try: + # Don't use paramter volumes because it has DetachedInstanceError + # issue frequently. here tries to get and sort new volumes, a lot + # of cases have been guaranteed by the _sort_source_vols in + # manange.py, so not verify again. + sorted_volumes = [] + for vol in volumes: + found_vols = [v for v in group.volumes if v['id'] == vol['id']] + sorted_volumes.extend(found_vols) + volumes = sorted_volumes + tgt_luns = [Lun(volume) for volume in volumes] if src_luns and tgt_luns: - self._clone_group(src_luns, tgt_luns, cg_enabled) + self._clone_group(src_luns, tgt_luns) + for tgt_lun in tgt_luns: + if tgt_lun.type_replication: + self._create_replica_helper(tgt_lun) except restclient.APIException: model_update['status'] = fields.GroupStatus.ERROR LOG.exception("Failed to create group from group snapshot.") @@ -828,7 +1014,7 @@ class DS8KProxy(proxy.IBMStorageProxy): return model_update, volumes_model_update - def _clone_group(self, src_luns, tgt_luns, cg_enabled): + def _clone_group(self, src_luns, tgt_luns): for src_lun in src_luns: self._ensure_vol_not_fc_target(src_lun.ds_id) finished = False @@ -842,7 +1028,7 @@ class DS8KProxy(proxy.IBMStorageProxy): "source_volume": src_lun.ds_id, "target_volume": tgt_lun.ds_id }) - if cg_enabled: + if tgt_lun.group.consisgroup_enabled: self._do_flashcopy_with_freeze(vol_pairs) else: self._helper.start_flashcopy(vol_pairs) @@ -851,7 +1037,6 @@ class DS8KProxy(proxy.IBMStorageProxy): if not finished: self._helper.delete_lun(tgt_luns) - @synchronized('OpenStackCinderIBMDS8KMutex-CG-', external=True) @proxy._trace_time def _do_flashcopy_with_freeze(self, vol_pairs): # issue flashcopy with freeze @@ -861,48 +1046,6 @@ class DS8KProxy(proxy.IBMStorageProxy): LOG.debug('Unfreezing the LSS: %s', ','.join(lss_ids)) self._helper.unfreeze_lss(lss_ids) - @proxy.logger - def create_group(self, ctxt, group): - """Create generic volume group.""" - return self._helper.create_group(ctxt, group) - - @proxy.logger - def delete_group(self, ctxt, group, volumes): - """Delete group and the volumes in the group.""" - luns = [Lun(volume) for volume in volumes] - return self._helper.delete_group(ctxt, group, luns) - - @proxy.logger - def update_group(self, ctxt, group, add_volumes, remove_volumes): - """Update generic volume group.""" - return self._helper.update_group(ctxt, group, - add_volumes, remove_volumes) - - @proxy.logger - def create_group_snapshot(self, ctxt, group_snapshot, snapshots): - """Create volume group snapshot.""" - snapshot_group = Group(group_snapshot) - cg_enabled = True if snapshot_group.type_cg_snapshot else False - return self._create_group_snapshot(ctxt, group_snapshot, - snapshots, cg_enabled) - - @proxy.logger - def delete_group_snapshot(self, ctxt, group_snapshot, snapshots): - """Delete volume group snapshot.""" - return self._delete_group_snapshot(ctxt, group_snapshot, snapshots) - - @proxy._trace_time - def create_group_from_src(self, ctxt, group, volumes, group_snapshot, - sorted_snapshots, source_group, - sorted_source_vols): - """Create volume group from volume group or volume group snapshot.""" - volume_group = Group(group) - cg_enabled = True if volume_group.type_cg_snapshot else False - return self._create_group_from_src(ctxt, group, volumes, - group_snapshot, sorted_snapshots, - source_group, sorted_source_vols, - cg_enabled) - def freeze_backend(self, ctxt): """Notify the backend that it's frozen.""" pass @@ -935,9 +1078,9 @@ class DS8KProxy(proxy.IBMStorageProxy): if secondary_id is None: secondary_id = backend_id elif secondary_id != backend_id: - msg = (_('Invalid secondary_backend_id specified. ' - 'Valid backend id is %s.') % backend_id) - raise exception.InvalidReplicationTarget(message=msg) + raise exception.InvalidReplicationTarget( + message=(_('Invalid secondary_backend_id specified. ' + 'Valid backend id is %s.') % backend_id)) LOG.debug("Starting failover to %s.", secondary_id) @@ -965,10 +1108,10 @@ class DS8KProxy(proxy.IBMStorageProxy): self._active_backend_id = "" self._helper = self._replication._source_helper except restclient.APIException as e: - msg = (_("Unable to failover host to %(id)s. " - "Exception= %(ex)s") - % {'id': secondary_id, 'ex': six.text_type(e)}) - raise exception.UnableToFailOver(reason=msg) + raise exception.UnableToFailOver( + reason=(_("Unable to failover host to %(id)s. " + "Exception= %(ex)s") + % {'id': secondary_id, 'ex': six.text_type(e)})) for lun in replicated_luns: volume_update = lun.get_volume_update() diff --git a/cinder/volume/drivers/ibm/ibm_storage/ds8k_replication.py b/cinder/volume/drivers/ibm/ibm_storage/ds8k_replication.py index f29f436d8a2..72cc6c390e9 100644 --- a/cinder/volume/drivers/ibm/ibm_storage/ds8k_replication.py +++ b/cinder/volume/drivers/ibm/ibm_storage/ds8k_replication.py @@ -50,10 +50,11 @@ class MetroMirrorManager(object): ports = self._source.get_physical_links( self._target.backend['storage_wwnn']) if not ports: - msg = (_("DS8K %(tgt)s is not connected to the DS8K %(src)s!") % - {'tgt': self._target.backend['storage_wwnn'], - 'src': self._source.backend['storage_wwnn']}) - raise exception.CinderException(msg) + raise exception.VolumeDriverException( + message=((_("%(tgt)s is not connected to %(src)s!") % { + 'tgt': self._target.backend['storage_wwnn'], + 'src': self._source.backend['storage_wwnn'] + }))) pairs = [{ 'source_port_id': p['source_port_id'], @@ -72,15 +73,13 @@ class MetroMirrorManager(object): ["%s-%s" % (p['source_port_id'], p['target_port_id']) for p in pairs]) - invalid_pair = "%s-%s" % (pair['source_port_id'], pair['target_port_id']) - - msg = (_("Invalid port pair: %(invalid)s, valid port " - "pair(s) are: %(valid)s") % - {'invalid': invalid_pair, - 'valid': valid_pairs}) - raise exception.CinderException(msg) + raise exception.VolumeDriverException( + message=((_("Invalid port pair: %(invalid)s, valid " + "port pair(s) are: %(valid)s") + % {'invalid': invalid_pair, + 'valid': valid_pairs}))) self._source.backend['port_pairs'] = [{ 'source_port_id': p['target_port_id'], 'target_port_id': p['source_port_id'] @@ -96,13 +95,13 @@ class MetroMirrorManager(object): return True - def find_available_pprc_path(self, lss=None, excluded_lss=None): - """find lss from existed pprc path. + def find_from_pprc_paths(self, specified_lss=None, excluded_lss=None): + """find lss from existing pprc paths and pool id for it. - the format of lss_pair returned is as below: + the format of pool_lss_pair returned is as below: {'source': (pid, lss), 'target': (pid, lss)} """ - state, paths = self._filter_pprc_paths(lss) + state, paths = self._filter_pprc_paths(specified_lss) if state != PPRC_PATH_HEALTHY: # check whether the physical links are available or not, # or have been changed. @@ -111,43 +110,47 @@ class MetroMirrorManager(object): if excluded_lss: paths = [p for p in paths if p['source_lss_id'] not in excluded_lss] + # only enable_replication will specify the source LSS + # and it need to reuse LSS reserved for CG if this LSS + # is in PPRC path. + if not specified_lss: + paths = [p for p in paths if p['source_lss_id'] not in + self._source.backend['lss_ids_for_cg']] - lss_pair = {} - if len(paths) == 1: - path = paths[0] - pid = self._source.get_pool(path['source_lss_id']) - lss_pair['source'] = (pid, path['source_lss_id']) - else: - # sort the lss pairs according to the number of luns, - # get the lss pair which has least luns. - candidates = [] - source_lss_set = set(p['source_lss_id'] for p in paths) - for lss in source_lss_set: - # get the number of lun in source. - src_luns = self._source.get_lun_number_in_lss(lss) - if src_luns == helper.LSS_VOL_SLOTS: - continue + # sort pairs according to the number of luns in their LSSes, + # and get the pair which LSS has least luns. + candidates = [] + source_lss_set = set(p['source_lss_id'] for p in paths) + for lss in source_lss_set: + # get the number of luns in source. + src_luns = self._source.get_lun_number_in_lss(lss) + if src_luns == helper.LSS_VOL_SLOTS and not specified_lss: + continue - spec_paths = [p for p in paths if p['source_lss_id'] == lss] - for path in spec_paths: - # get the number of lun in target. + spec_paths = [p for p in paths if p['source_lss_id'] == lss] + for path in spec_paths: + # get the number of luns in target. + try: tgt_luns = self._target.get_lun_number_in_lss( path['target_lss_id']) - candidates.append((lss, path, src_luns + tgt_luns)) - - if candidates: - candidate = sorted(candidates, key=lambda c: c[2])[0] - pid = self._source.get_pool(candidate[0]) - lss_pair['source'] = (pid, candidate[0]) - path = candidate[1] - else: - return PPRC_PATH_FULL, None - - # format the target in lss_pair. - pid = self._target.get_pool(path['target_lss_id']) - lss_pair['target'] = (pid, path['target_lss_id']) - - return PPRC_PATH_HEALTHY, lss_pair + except restclient.APIException: + # if DS8K can fix this problem, then remove the + # exception here. + LOG.error("Target LSS %s in PPRC path may doesn't " + "exist although PPRC path is available.", + path['target_lss_id']) + tgt_luns = 0 + candidates.append((path['source_lss_id'], + path['target_lss_id'], + src_luns + tgt_luns)) + if not candidates: + return PPRC_PATH_FULL, None + else: + src_lss, tgt_lss, num = sorted(candidates, key=lambda c: c[2])[0] + return PPRC_PATH_HEALTHY, { + 'source': (self._source.get_pool(src_lss), src_lss), + 'target': (self._target.get_pool(tgt_lss), tgt_lss) + } def _filter_pprc_paths(self, lss): paths = self._source.get_pprc_paths(lss) @@ -225,9 +228,9 @@ class MetroMirrorManager(object): return PPRC_PATH_HEALTHY, paths - def create_pprc_path(self, lss_pair): - src_lss = lss_pair['source'][1] - tgt_lss = lss_pair['target'][1] + def create_pprc_path(self, pool_lss_pair): + src_lss = pool_lss_pair['source'][1] + tgt_lss = pool_lss_pair['target'][1] # check whether the pprc path exists and is healthy or not firstly. pid = (self._source.backend['storage_wwnn'] + '_' + src_lss + ':' + self._target.backend['storage_wwnn'] + '_' + tgt_lss) @@ -256,9 +259,9 @@ class MetroMirrorManager(object): break if retry == 3: self._source.delete_pprc_path(pid) - msg = (_("Fail to create PPRC path %(src)s:%(tgt)s.") % - {'src': src_lss, 'tgt': tgt_lss}) - raise restclient.APIException(data=msg) + raise restclient.APIException( + data=(_("Failed to create PPRC path %(src)s:%(tgt)s.") + % {'src': src_lss, 'tgt': tgt_lss})) LOG.debug("Create the new PPRC path successfully.") def _is_pprc_paths_healthy(self, path_id): @@ -280,8 +283,7 @@ class MetroMirrorManager(object): vol_pairs = [{ 'source_volume': lun.ds_id, - 'source_system_id': - self._source.backend['storage_unit'], + 'source_system_id': self._source.backend['storage_unit'], 'target_volume': tgt_vol_id, 'target_system_id': tgt_stg_id }] @@ -298,10 +300,9 @@ class MetroMirrorManager(object): def delete_pprc_pairs(self, lun): self._source.delete_pprc_pair(lun.ds_id) - if self.is_target_alive(): + if self.is_target_alive() and lun.replication_driver_data: replica = sorted(lun.replication_driver_data.values())[0] - self._target.delete_pprc_pair( - six.text_type(replica['vol_hex_id'])) + self._target.delete_pprc_pair(replica['vol_hex_id']) def do_pprc_failover(self, luns, backend_id): vol_pairs = [] @@ -317,12 +318,10 @@ class MetroMirrorManager(object): continue vol_pairs.append({ - 'source_volume': six.text_type(target_vol_id), - 'source_system_id': six.text_type( - self._target.backend['storage_unit']), - 'target_volume': six.text_type(lun.ds_id), - 'target_system_id': six.text_type( - self._source.backend['storage_unit']) + 'source_volume': target_vol_id, + 'source_system_id': self._target.backend['storage_unit'], + 'target_volume': lun.ds_id, + 'target_system_id': self._source.backend['storage_unit'] }) target_vol_ids.append(target_vol_id) @@ -383,9 +382,16 @@ class Replication(object): if connection_type == storage.XIV_CONNECTION_TYPE_FC: self._target_helper = ( helper.DS8KReplicationTargetHelper(target_device)) - else: + elif connection_type == storage.XIV_CONNECTION_TYPE_FC_ECKD: self._target_helper = ( helper.DS8KReplicationTargetECKDHelper(target_device)) + else: + raise exception.InvalidParameterValue( + err=(_("Param [connection_type] %s in replication_device " + "is invalid.") % connection_type)) + + self._target_helper.backend['lss_ids_for_cg'] = ( + self._source_helper.backend['lss_ids_for_cg']) self._mm_manager = MetroMirrorManager(self._source_helper, self._target_helper) @@ -393,11 +399,12 @@ class Replication(object): src_conn_type = self._source_helper.get_connection_type() tgt_conn_type = self._target_helper.get_connection_type() if src_conn_type != tgt_conn_type: - msg = (_("The connection type in primary backend is " - "%(primary)s, but in secondary backend it is " - "%(secondary)s") % - {'primary': src_conn_type, 'secondary': tgt_conn_type}) - raise exception.CinderException(msg) + raise exception.VolumeDriverException( + message=(_("The connection type in primary backend is " + "%(primary)s, but in secondary backend it is " + "%(secondary)s") + % {'primary': src_conn_type, + 'secondary': tgt_conn_type})) # PPRC can not copy from ESE volume to standard volume or vice versus. if src_conn_type == storage.XIV_CONNECTION_TYPE_FC_ECKD: src_thin = self._source_helper.get_thin_provision() @@ -425,13 +432,13 @@ class Replication(object): return luns @proxy.logger - def find_available_lss_pair(self, excluded_lss): - state, lss_pair = ( - self._mm_manager.find_available_pprc_path(None, excluded_lss)) - if lss_pair is None: - lss_pair = self.find_new_lss_for_source(excluded_lss) - lss_pair.update(self.find_new_lss_for_target()) - return lss_pair + def find_pool_lss_pair(self, excluded_lss): + state, pool_lss_pair = ( + self._mm_manager.find_from_pprc_paths(None, excluded_lss)) + if pool_lss_pair is None: + pool_lss_pair = self.find_new_lss_for_source(excluded_lss) + pool_lss_pair.update(self.find_new_lss_for_target()) + return pool_lss_pair @proxy.logger def find_new_lss_for_source(self, excluded_lss): @@ -444,23 +451,22 @@ class Replication(object): return {'target': (tgt_pid, tgt_lss)} @proxy.logger - def enable_replication(self, lun): - state, lun.lss_pair = ( - self._mm_manager.find_available_pprc_path(lun.ds_id[0:2])) + def enable_replication(self, lun, delete_source=False): + state, lun.pool_lss_pair = ( + self._mm_manager.find_from_pprc_paths(lun.ds_id[0:2])) + LOG.debug("enable_replication: pool_lss_pair is %s.", + lun.pool_lss_pair) if state == PPRC_PATH_UNHEALTHY: - msg = (_("The path(s) for volume %(name)s isn't available " - "any more, please make sure the state of the path(s) " - "which source LSS is %(lss)s is success.") % - {'name': lun.cinder_name, 'lss': lun.ds_id[0:2]}) - raise restclient.APIException(data=msg) + raise restclient.APIException( + data=(_("The path(s) for volume %(name)s isn't available " + "any more, please make sure the state of the path(s) " + "which source LSS is %(lss)s is success.") + % {'name': lun.cinder_name, 'lss': lun.ds_id[0:2]})) elif state == PPRC_PATH_NOT_EXIST: pid = self._source_helper.get_pool(lun.ds_id[0:2]) - lss_pair = {'source': (pid, lun.ds_id[0:2])} - lss_pair.update(self.find_new_lss_for_target()) - lun.lss_pair = lss_pair - LOG.debug("Begin to create replication volume, lss_pair is %s." % - lun.lss_pair) - lun = self.create_replica(lun, False) + lun.pool_lss_pair = {'source': (pid, lun.ds_id[0:2])} + lun.pool_lss_pair.update(self.find_new_lss_for_target()) + lun = self.create_replica(lun, delete_source) return lun @proxy.logger @@ -469,7 +475,7 @@ class Replication(object): try: self._target_helper.create_lun(lun) # create PPRC paths if need. - self._mm_manager.create_pprc_path(lun.lss_pair) + self._mm_manager.create_pprc_path(lun.pool_lss_pair) # create pprc pair self._mm_manager.create_pprc_pairs(lun) except restclient.APIException: @@ -477,7 +483,6 @@ class Replication(object): self.delete_replica(lun) if delete_source: self._source_helper.delete_lun(lun) - lun.replication_status = 'enabled' return lun @@ -488,11 +493,10 @@ class Replication(object): self._mm_manager.delete_pprc_pairs(lun) self._delete_replica(lun) except restclient.APIException as e: - msg = (_('Failed to delete the target volume for volume ' - '%(volume)s, Exception: %(ex)s.') % - {'volume': lun.ds_id, 'ex': six.text_type(e)}) - raise exception.CinderException(msg) - + raise exception.VolumeDriverException( + message=(_('Failed to delete the target volume for ' + 'volume %(volume)s, Exception: %(ex)s.') + % {'volume': lun.ds_id, 'ex': six.text_type(e)})) lun.replication_status = 'disabled' lun.replication_driver_data = {} return lun @@ -542,7 +546,7 @@ class Replication(object): LOG.debug("Failback starts, backend id is %s.", backend_id) for lun in luns: - self._mm_manager.create_pprc_path(lun.lss_pair) + self._mm_manager.create_pprc_path(lun.pool_lss_pair) self._mm_manager.do_pprc_failback(luns, backend_id) # revert the relationship of source volume and target volume self.do_pprc_failover(luns, backend_id) diff --git a/cinder/volume/drivers/ibm/ibm_storage/ds8k_restclient.py b/cinder/volume/drivers/ibm/ibm_storage/ds8k_restclient.py index 45123e09dd4..d11e6f40626 100644 --- a/cinder/volume/drivers/ibm/ibm_storage/ds8k_restclient.py +++ b/cinder/volume/drivers/ibm/ibm_storage/ds8k_restclient.py @@ -271,18 +271,19 @@ class RESTScheduler(object): attempts == 0): self.connect() elif response['server'].get('code') in AUTHENTICATION_ERROR_CODES: - msg = (_('Authentication failed for host %(host)s. ' - 'Exception= %(e)s') % - {'host': self.host, 'e': response['server']['message']}) - raise APIAuthenticationException(data=msg) + raise APIAuthenticationException( + data=(_('Authentication failed for host %(host)s. ' + 'Exception= %(e)s') % + {'host': self.host, + 'e': response['server']['message']})) elif response['server'].get('code') in LSS_ERROR_CODES: - msg = (_('Can not put the volume in LSS: %s') % - response['server']['message']) - raise LssFullException(data=msg) + raise LssFullException( + data=(_('Can not put the volume in LSS: %s') + % response['server']['message'])) elif response['server']['status'] == 'timeout': - msg = (_('Request to storage API time out: %s') % - response['server']['message']) - raise TimeoutException(data=msg) + raise TimeoutException( + data=(_('Request to storage API time out: %s') + % response['server']['message'])) elif (response['server']['status'] != 'ok' and (badStatusException or 'code' not in response['server'])): # if code is not in response means that error was in @@ -290,10 +291,11 @@ class RESTScheduler(object): # via badStatusException=False, but will retry it to # confirm the problem. if attempts == 1: - msg = (_("Request to storage API failed: %(err)s, " - "(%(url)s).") % - {'err': response['server']['message'], 'url': url}) - raise APIException(data=msg) + raise APIException( + data=(_("Request to storage API failed: %(err)s, " + "(%(url)s).") + % {'err': response['server']['message'], + 'url': url})) eventlet.sleep(2) else: return response @@ -303,8 +305,8 @@ class RESTScheduler(object): def fetchall(self, *args, **kwargs): r = self.send(*args, **kwargs)['data'] if len(r) != 1: - msg = _('Expected one result but got %d.') % len(r) - raise APIException(data=msg) + raise APIException( + data=(_('Expected one result but got %d.') % len(r))) else: return r.popitem()[1] @@ -313,8 +315,8 @@ class RESTScheduler(object): def fetchone(self, *args, **kwargs): r = self.fetchall(*args, **kwargs) if len(r) != 1: - msg = _('Expected one item in result but got %d.') % len(r) - raise APIException(data=msg) + raise APIException( + data=(_('Expected one item in result but got %d.') % len(r))) return r[0] # same as the send method above but returns the last element of the @@ -323,9 +325,9 @@ class RESTScheduler(object): r = self.send(*args, **kwargs) if 'responses' in r: if len(r['responses']) != 1: - msg = (_('Expected one item in result responses but ' - 'got %d.') % len(r['responses'])) - raise APIException(data=msg) + raise APIException( + data=(_('Expected one item in result responses but ' + 'got %d.') % len(r['responses']))) r = r['responses'][0] return r['link']['href'].split('/')[-1]