Merge "DS8K driver: change the design of CG"

This commit is contained in:
Jenkins 2017-04-06 01:39:30 +00:00 committed by Gerrit Code Review
commit 022fde1fc3
5 changed files with 1104 additions and 627 deletions

View File

@ -29,6 +29,7 @@ from cinder.tests.unit import utils as testutils
from cinder.volume import configuration as conf
import cinder.volume.drivers.ibm.ibm_storage as storage
from cinder.volume.drivers.ibm.ibm_storage import proxy
from cinder.volume import group_types
from cinder.volume import volume_types
# mock decorator logger for all unit test cases.
@ -44,6 +45,7 @@ mock_logger.stop()
TEST_VOLUME_ID = '0001'
TEST_HOST_ID = 'H1'
TEST_VOLUME_BACKEND_NAME = 'ds8k_backend'
TEST_GROUP_HOST = 'test_host@' + TEST_VOLUME_BACKEND_NAME + '#fakepool'
TEST_LUN_ID = '00'
TEST_POOLS_STR = 'P0,P1'
TEST_POOL_ID_1 = 'P0'
@ -858,6 +860,10 @@ FAKE_REST_API_RESPONSES = {
FAKE_GET_FB_LSS_RESPONSE_3,
TEST_TARGET_DS8K_IP + '/lss/' + TEST_LSS_ID_3 + '/get':
FAKE_GET_FB_LSS_RESPONSE_3,
TEST_SOURCE_DS8K_IP + '/lss/' + TEST_LCU_ID + '/get':
FAKE_GET_CKD_LSS_RESPONSE,
TEST_TARGET_DS8K_IP + '/lss/' + TEST_LCU_ID + '/get':
FAKE_GET_CKD_LSS_RESPONSE,
TEST_SOURCE_DS8K_IP + '/lss/fb/get':
FAKE_GET_FB_LSS_RESPONSE_1,
TEST_SOURCE_DS8K_IP + '/lss/ckd/get':
@ -978,7 +984,9 @@ class FakeDS8KCommonHelper(helper.DS8KCommonHelper):
def _get_value(self, key):
value = getattr(self.conf, key, None)
return value if value else self.conf.get(key)
if not value and key not in self.OPTIONAL_PARAMS:
value = self.conf.get(key)
return value
def _create_client(self):
self._client = FakeRESTScheduler(self._get_value('san_ip'),
@ -1040,13 +1048,13 @@ class FakeDS8KProxy(ds8kproxy.DS8KProxy):
proxy.IBMStorageProxy.__init__(self, storage_info, logger,
exception, driver,
active_backend_id)
self._helper = None
self._replication = None
self._connector_obj = HTTPConnectorObject
self._replication_enabled = False
self._active_backend_id = active_backend_id
self.configuration = driver.configuration
self.consisgroup_cache = {}
self.setup(None)
def setup(self, context):
@ -1113,13 +1121,13 @@ class DS8KProxyTest(test.TestCase):
def _create_snapshot(self, **kwargs):
return testutils.create_snapshot(self.ctxt, **kwargs)
def _create_consistencygroup(self, **kwargs):
return testutils.create_consistencygroup(self.ctxt, **kwargs)
def _create_group(self, **kwargs):
return testutils.create_group(self.ctxt, **kwargs)
def _create_cgsnapshot(self, cg_id, **kwargs):
return testutils.create_cgsnapshot(self.ctxt,
consistencygroup_id= cg_id,
**kwargs)
def _create_group_snapshot(self, group_id, **kwargs):
return testutils.create_group_snapshot(self.ctxt,
group_id=group_id,
**kwargs)
def test_check_host_type(self):
"""host type should be a valid one."""
@ -1192,7 +1200,6 @@ class DS8KProxyTest(test.TestCase):
"total_capacity_gb": 10,
"free_capacity_gb": 10,
"reserved_percentage": 0,
"consistencygroup_support": True,
"consistent_group_snapshot_enabled": True,
"multiattach": False
}
@ -1225,7 +1232,7 @@ class DS8KProxyTest(test.TestCase):
"""create volume should choose biggest pool."""
self.configuration.san_clustername = TEST_POOLS_STR
cmn_helper = FakeDS8KCommonHelper(self.configuration, None)
pool_id, lss_id = cmn_helper.find_available_lss(None, False, None)
pool_id, lss_id = cmn_helper.find_pool_lss_pair(None, False, None)
self.assertEqual(TEST_POOL_ID_1, pool_id)
@mock.patch.object(helper.DS8KCommonHelper, 'get_all_lss')
@ -1239,7 +1246,7 @@ class DS8KProxyTest(test.TestCase):
"configvols": "0"
}]
cmn_helper = FakeDS8KCommonHelper(self.configuration, None)
pool_id, lss_id = cmn_helper.find_available_lss(None, False, None)
pool_id, lss_id = cmn_helper.find_pool_lss_pair(None, False, None)
self.assertNotEqual(TEST_LSS_ID_1, lss_id)
@mock.patch.object(helper.DS8KCommonHelper, 'get_all_lss')
@ -1254,7 +1261,7 @@ class DS8KProxyTest(test.TestCase):
"configvols": "0"
}]
cmn_helper = FakeDS8KCommonHelper(self.configuration, None)
pool_id, lss_id = cmn_helper.find_available_lss(None, False, None)
pool_id, lss_id = cmn_helper.find_pool_lss_pair(None, False, None)
self.assertEqual(TEST_LSS_ID_2, lss_id)
@mock.patch.object(helper.DS8KCommonHelper, 'get_all_lss')
@ -1284,11 +1291,11 @@ class DS8KProxyTest(test.TestCase):
}
]
cmn_helper = FakeDS8KCommonHelper(self.configuration, None)
pool_id, lss_id = cmn_helper.find_available_lss(None, False, None)
pool_id, lss_id = cmn_helper.find_pool_lss_pair(None, False, None)
self.assertEqual(TEST_LSS_ID_2, lss_id)
@mock.patch.object(helper.DS8KCommonHelper, 'get_all_lss')
@mock.patch.object(helper.DS8KCommonHelper, '_find_from_unexisting_lss')
@mock.patch.object(helper.DS8KCommonHelper, '_find_from_nonexistent_lss')
def test_find_lss_when_no_existing_lss_available(self, mock_find_lss,
mock_get_all_lss):
"""find LSS when no existing LSSs are available."""
@ -1300,7 +1307,7 @@ class DS8KProxyTest(test.TestCase):
"configvols": "256"
}]
cmn_helper = FakeDS8KCommonHelper(self.configuration, None)
pool_id, lss_id = cmn_helper.find_available_lss(None, False, None)
pool_id, lss_id = cmn_helper.find_pool_lss_pair(None, False, None)
self.assertTrue(mock_find_lss.called)
@mock.patch.object(helper.DS8KCommonHelper, '_find_lss')
@ -1309,7 +1316,127 @@ class DS8KProxyTest(test.TestCase):
cmn_helper = FakeDS8KCommonHelper(self.configuration, None)
mock_find_lss.return_value = None
self.assertRaises(restclient.LssIDExhaustError,
cmn_helper.find_available_lss, None, False, None)
cmn_helper.find_pool_lss_pair, None, False, None)
def test_find_lss_for_volume_which_belongs_to_cg(self):
"""find lss for volume, which is in empty CG."""
self.configuration.lss_range_for_cg = '20-23'
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id)
volume = self._create_volume(group_id=group.id)
lun = ds8kproxy.Lun(volume)
self.driver._create_lun_helper(lun)
pid, lss = lun.pool_lss_pair['source']
self.assertTrue(lss in ['20', '21', '22', '23'])
def test_find_lss_for_volume_which_belongs_to_cg2(self):
"""find lss for volume, which is in CG having volumes."""
self.configuration.lss_range_for_cg = '20-23'
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id)
location = six.text_type({'vol_hex_id': '2000'})
self._create_volume(group_id=group.id,
provider_location=location)
volume = self._create_volume(group_id=group.id)
lun = ds8kproxy.Lun(volume)
self.driver._create_lun_helper(lun)
pid, lss = lun.pool_lss_pair['source']
self.assertEqual(lss, '20')
def test_find_lss_for_volume_which_belongs_to_cg3(self):
"""find lss for volume, and other CGs have volumes."""
self.configuration.lss_range_for_cg = '20-23'
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id)
volume = self._create_volume(group_id=group.id)
group_type2 = group_types.create(
self.ctxt,
'group2',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group2 = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type2.id)
location = six.text_type({'vol_hex_id': '2000'})
self._create_volume(group_id=group2.id,
provider_location=location)
lun = ds8kproxy.Lun(volume)
self.driver._create_lun_helper(lun)
pid, lss = lun.pool_lss_pair['source']
self.assertNotEqual(lss, '20')
def test_find_lss_for_volume_which_belongs_to_cg4(self):
"""find lss for volume, and other CGs are in error state."""
self.configuration.lss_range_for_cg = '20'
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id)
volume = self._create_volume(group_id=group.id)
group_type2 = group_types.create(
self.ctxt,
'group2',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group2 = self._create_group(status='error',
host=TEST_GROUP_HOST,
group_type_id=group_type2.id)
location = six.text_type({'vol_hex_id': '2000'})
self._create_volume(group_id=group2.id,
provider_location=location)
lun = ds8kproxy.Lun(volume)
self.driver._create_lun_helper(lun)
pid, lss = lun.pool_lss_pair['source']
# error group will be ignored, so LSS 20 can be used.
self.assertEqual(lss, '20')
def test_create_volume_and_assign_to_group_with_wrong_host(self):
# create volume for group which has wrong format of host.
self.configuration.lss_range_for_cg = '20-23'
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group = self._create_group(host="fake_invalid_host",
group_type_id=group_type.id)
volume = self._create_volume(group_id=group.id)
self.assertRaises(exception.VolumeDriverException,
self.driver.create_volume, volume)
@mock.patch.object(helper.DS8KCommonHelper, '_create_lun')
def test_create_volume_but_lss_full_afterwards(self, mock_create_lun):
@ -1342,11 +1469,7 @@ class DS8KProxyTest(test.TestCase):
TEST_VOLUME_ID,
ast.literal_eval(vol['provider_location'])['vol_hex_id'])
@mock.patch.object(
volume_types,
'get_volume_type_extra_specs',
return_value={'drivers:os400': '050'})
def test_create_volume_of_OS400_050(self, mock_volume_types):
def test_create_volume_of_OS400_050(self):
"""create volume which type is OS400 050."""
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
@ -1360,12 +1483,8 @@ class DS8KProxyTest(test.TestCase):
ast.literal_eval(vol['provider_location'])['vol_hex_id'])
self.assertEqual('050 FB 520UV', vol['metadata']['data_type'])
@mock.patch.object(
volume_types,
'get_volume_type_extra_specs',
return_value={'drivers:thin_provision': 'False'})
@mock.patch.object(helper.DS8KCommonHelper, '_create_lun')
def test_create_eckd_volume(self, mock_create_lun, mock_volume_types):
def test_create_eckd_volume(self, mock_create_lun):
"""create volume which type is ECKD."""
self.configuration.connection_type = (
storage.XIV_CONNECTION_TYPE_FC_ECKD)
@ -1470,18 +1589,14 @@ class DS8KProxyTest(test.TestCase):
device['san_clustername'] = TEST_ECKD_POOL_ID
repl = FakeReplication(src_helper, device)
repl.check_physical_links()
lss_pair = repl.find_available_lss_pair(None)
pool_lss_pair = repl.find_pool_lss_pair(None)
expected_lss_pair = {'source': (TEST_ECKD_POOL_ID, TEST_LCU_ID),
'target': (TEST_ECKD_POOL_ID, TEST_LCU_ID)}
self.assertDictEqual(expected_lss_pair, lss_pair)
expected_pair = {'source': (TEST_ECKD_POOL_ID, TEST_LCU_ID),
'target': (TEST_ECKD_POOL_ID, TEST_LCU_ID)}
self.assertDictEqual(expected_pair, pool_lss_pair)
@mock.patch.object(
volume_types,
'get_volume_type_extra_specs',
return_value={'replication_enabled': '<is> True'})
@mock.patch.object(eventlet, 'sleep')
def test_create_fb_replicated_volume(self, mock_sleep, mock_volume_types):
def test_create_fb_replicated_volume(self, mock_sleep):
"""create FB volume when enable replication."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
@ -1499,17 +1614,12 @@ class DS8KProxyTest(test.TestCase):
self.assertEqual(TEST_VOLUME_ID,
repl[TEST_TARGET_DS8K_IP]['vol_hex_id'])
@mock.patch.object(
volume_types,
'get_volume_type_extra_specs',
return_value={'replication_enabled': '<is> True'})
@mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_paths')
@mock.patch.object(replication.MetroMirrorManager, 'create_pprc_path')
@mock.patch.object(eventlet, 'sleep')
def test_create_fb_replicated_vol_but_no_path_available(self, mock_sleep,
create_pprc_path,
get_pprc_paths,
mock_volume_types):
get_pprc_paths):
"""create replicated volume but no pprc paths are available."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
@ -1536,14 +1646,10 @@ class DS8KProxyTest(test.TestCase):
self.driver.create_volume(volume)
self.assertTrue(create_pprc_path.called)
@mock.patch.object(
volume_types,
'get_volume_type_extra_specs',
return_value={'replication_enabled': '<is> True'})
@mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_paths')
@mock.patch.object(eventlet, 'sleep')
def test_create_fb_replicated_vol_and_verify_lss_in_path(
self, mock_sleep, get_pprc_paths, mock_volume_types):
self, mock_sleep, get_pprc_paths):
"""create replicated volume should verify the LSS in pprc paths."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
@ -1588,14 +1694,10 @@ class DS8KProxyTest(test.TestCase):
self.assertEqual(TEST_LSS_ID_1,
repl[TEST_TARGET_DS8K_IP]['vol_hex_id'][:2])
@mock.patch.object(
volume_types,
'get_volume_type_extra_specs',
return_value={'replication_enabled': '<is> True'})
@mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_paths')
@mock.patch.object(eventlet, 'sleep')
def test_create_fb_replicated_vol_when_paths_available(
self, mock_sleep, get_pprc_paths, mock_volume_types):
self, mock_sleep, get_pprc_paths):
"""create replicated volume when multiple pprc paths are available."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
@ -1640,14 +1742,10 @@ class DS8KProxyTest(test.TestCase):
self.assertEqual(TEST_LSS_ID_1,
repl[TEST_TARGET_DS8K_IP]['vol_hex_id'][:2])
@mock.patch.object(
volume_types,
'get_volume_type_extra_specs',
return_value={'replication_enabled': '<is> True'})
@mock.patch.object(helper.DS8KCommonHelper, '_create_lun')
@mock.patch.object(eventlet, 'sleep')
def test_create_replicated_vol_but_lss_full_afterwards(
self, mock_sleep, create_lun, mock_volume_types):
self, mock_sleep, create_lun):
"""create replicated volume but lss is full afterwards."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
@ -1715,15 +1813,10 @@ class DS8KProxyTest(test.TestCase):
self.driver.delete_volume(volume)
self.assertFalse(mock_delete_lun.called)
@mock.patch.object(
volume_types,
'get_volume_type_extra_specs',
return_value={'replication_enabled': '<is> True'})
@mock.patch.object(helper.DS8KCommonHelper, 'delete_lun_by_id')
@mock.patch.object(helper.DS8KCommonHelper, 'delete_lun')
def test_delete_fb_replicated_volume(self, mock_delete_lun,
mock_delete_lun_by_id,
mock_volume_types):
mock_delete_lun_by_id):
"""Delete volume when enable replication."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
@ -1801,7 +1894,7 @@ class DS8KProxyTest(test.TestCase):
location = six.text_type({'vol_hex_id': None})
tgt_vol = self._create_volume(volume_type_id=vol_type.id,
provider_location=location)
self.assertRaises(restclient.APIException,
self.assertRaises(exception.VolumeDriverException,
self.driver.create_cloned_volume, tgt_vol, src_vol)
@mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy')
@ -1877,12 +1970,7 @@ class DS8KProxyTest(test.TestCase):
replication_driver_data=data)
self.driver.extend_volume(volume, 2)
@mock.patch.object(
volume_types,
'get_volume_type_extra_specs',
return_value={'replication_enabled': '<is> True'})
def test_extend_replicated_volume_that_has_been_failed_over(
self, mock_volume_types):
def test_extend_replicated_volume_that_has_been_failed_over(self):
"""extend replicated volume which has been failed over should fail."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
@ -2415,210 +2503,375 @@ class DS8KProxyTest(test.TestCase):
fake_connector = {'ip': '127.0.0.1', 'initiator': 'iqn.fake'}
self.driver.terminate_connection(volume, fake_connector)
def test_delete_consistencygroup_sucessfully(self):
"""test a successful cg deletion."""
def test_create_consistency_group(self):
"""user should reserve LSS for consistency group."""
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
cg_type = volume_types.create(self.ctxt, 'CG', {})
cg = self._create_consistencygroup(volume_type_id=cg_type.id)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id)
self.assertRaises(exception.VolumeDriverException,
self.driver.create_group,
self.ctxt, group)
def test_delete_consistency_group_sucessfully(self):
"""test a successful consistency group deletion."""
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id)
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
volume = self._create_volume(volume_type_id=cg_type.id,
provider_location=location,
consistencygroup_id=cg.id)
volume = self._create_volume(provider_location=location,
group_id=group.id)
model_update, volumes_model_update = (
self.driver.delete_consistencygroup(self.ctxt, cg, [volume]))
self.driver.delete_group(self.ctxt, group, [volume]))
self.assertEqual('deleted', volumes_model_update[0]['status'])
self.assertEqual('deleted', model_update['status'])
self.assertEqual(fields.GroupStatus.DELETED,
model_update['status'])
@mock.patch.object(helper.DS8KCommonHelper, 'delete_lun')
def test_delete_consistencygroup_failed(self, mock_delete_lun):
"""test a failed cg deletion."""
def test_delete_consistency_group_failed(self, mock_delete_lun):
"""test a failed consistency group deletion."""
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
cg_type = volume_types.create(self.ctxt, 'CG', {})
cg = self._create_consistencygroup(volume_type_id=cg_type.id)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id)
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
volume = self._create_volume(volume_type_id=cg_type.id,
provider_location=location,
consistencygroup_id=cg.id)
volume = self._create_volume(provider_location=location,
group_id=group.id)
mock_delete_lun.side_effect = (
restclient.APIException('delete volume failed.'))
model_update, volumes_model_update = (
self.driver.delete_consistencygroup(self.ctxt, cg, [volume]))
self.driver.delete_group(self.ctxt, group, [volume]))
self.assertEqual('error_deleting', volumes_model_update[0]['status'])
self.assertEqual('error_deleting', model_update['status'])
self.assertEqual(fields.GroupStatus.ERROR_DELETING,
model_update['status'])
def test_create_consistency_group_without_reserve_lss(self):
"""user should reserve LSS for group if it enables cg."""
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id)
self.assertRaises(exception.VolumeDriverException,
self.driver.create_group, self.ctxt, group)
def test_update_generic_group_without_enable_cg(self):
"""update group which not enable cg should return None."""
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(self.ctxt, 'group', {})
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id)
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
volume = self._create_volume(provider_location=location)
model_update, add_volumes_update, remove_volumes_update = (
self.driver.update_group(self.ctxt, group, [volume], []))
self.assertIsNone(model_update)
self.assertIsNone(add_volumes_update)
self.assertIsNone(remove_volumes_update)
@mock.patch.object(eventlet, 'sleep')
@mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy')
@mock.patch.object(helper.DS8KCommonHelper, '_create_lun')
def test_update_generic_group_when_enable_cg(self, mock_create_lun,
mock_get_flashcopy,
mock_sleep):
"""update group, but volume is not in LSS which belongs to group."""
self.configuration.lss_range_for_cg = '20-23'
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id)
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
metadata = [{'key': 'data_type', 'value': 'FB 512'}]
volume = self._create_volume(provider_location=location,
volume_metadata=metadata)
mock_get_flashcopy.side_effect = [[TEST_FLASHCOPY], {}]
mock_create_lun.return_value = '2200'
model_update, add_volumes_update, remove_volumes_update = (
self.driver.update_group(self.ctxt, group, [volume], []))
location = ast.literal_eval(add_volumes_update[0]['provider_location'])
self.assertEqual('2200', location['vol_hex_id'])
@mock.patch.object(eventlet, 'sleep')
@mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy')
@mock.patch.object(helper.DS8KCommonHelper, '_create_lun')
def test_update_generic_group_when_enable_cg2(self, mock_create_lun,
mock_get_flashcopy,
mock_sleep):
"""add replicated volume into group."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.configuration.lss_range_for_cg = '20-23'
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id)
vol_type = volume_types.create(
self.ctxt, 'VOL_TYPE', {'replication_enabled': '<is> True'})
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
data = json.dumps(
{TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}})
metadata = [{'key': 'data_type', 'value': 'FB 512'}]
volume = self._create_volume(volume_type_id=vol_type.id,
provider_location=location,
replication_driver_data=data,
volume_metadata=metadata)
mock_get_flashcopy.side_effect = [[TEST_FLASHCOPY], {}]
mock_create_lun.return_value = '2200'
model_update, add_volumes_update, remove_volumes_update = (
self.driver.update_group(self.ctxt, group, [volume], []))
location = ast.literal_eval(add_volumes_update[0]['provider_location'])
self.assertEqual('2200', location['vol_hex_id'])
@mock.patch.object(helper.DS8KCommonHelper, 'delete_lun')
def test_delete_generic_group_failed(self, mock_delete_lun):
"""test a failed group deletion."""
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(self.ctxt, 'group', {})
group = self._create_group(group_type_id=group_type.id)
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
volume = self._create_volume(group_type_id=group_type.id,
provider_location=location,
group_id=group.id)
mock_delete_lun.side_effect = (
restclient.APIException('delete volume failed.'))
model_update, volumes_model_update = (
self.driver.delete_group(self.ctxt, group, [volume]))
self.assertEqual('error_deleting', volumes_model_update[0]['status'])
self.assertEqual(fields.GroupStatus.ERROR_DELETING,
model_update['status'])
def test_delete_generic_group_sucessfully(self):
"""test a successful generic group deletion."""
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
cg_type = volume_types.create(self.ctxt, 'CG', {})
cg = self._create_consistencygroup(volume_type_id=cg_type.id)
group_type = group_types.create(self.ctxt, 'CG', {})
group = self._create_group(group_type_id=group_type.id)
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
volume = self._create_volume(volume_type_id=cg_type.id,
volume = self._create_volume(group_type_id=group_type.id,
provider_location=location,
consistencygroup_id=cg.id)
group_id=group.id)
model_update, volumes_model_update = (
self.driver.delete_group(self.ctxt, cg, [volume]))
self.driver.delete_group(self.ctxt, group, [volume]))
self.assertEqual('deleted', volumes_model_update[0]['status'])
self.assertEqual('deleted', model_update['status'])
self.assertEqual(fields.GroupStatus.DELETED, model_update['status'])
@mock.patch.object(helper.DS8KCommonHelper, 'delete_lun')
def test_delete_generic_group_failed(self, mock_delete_lun):
"""test a failed cg deletion."""
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
cg_type = volume_types.create(self.ctxt, 'CG', {})
cg = self._create_consistencygroup(volume_type_id=cg_type.id)
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
volume = self._create_volume(volume_type_id=cg_type.id,
provider_location=location,
consistencygroup_id=cg.id)
mock_delete_lun.side_effect = (
restclient.APIException('delete volume failed.'))
model_update, volumes_model_update = (
self.driver.delete_group(self.ctxt, cg, [volume]))
self.assertEqual('error_deleting', volumes_model_update[0]['status'])
self.assertEqual('error_deleting', model_update['status'])
@mock.patch('oslo_concurrency.lockutils.external_lock',
new=mock.MagicMock())
@mock.patch.object(eventlet, 'sleep')
@mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy')
def test_create_cgsnapshot_sucessfully(self, mock_get_flashcopy,
mock_sleep):
"""test a successful cgsnapshot creation."""
@mock.patch.object(helper.DS8KCommonHelper, '_create_lun')
def test_create_consistency_group_snapshot_sucessfully(
self, mock_create_lun, mock_get_flashcopy, mock_sleep):
"""test a successful consistency group snapshot creation."""
self.configuration.lss_range_for_cg = '20-23'
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
cg_type = volume_types.create(self.ctxt, 'CG', {})
cg = self._create_consistencygroup(volume_type_id=cg_type.id)
location = six.text_type({'vol_hex_id': '0002'})
volume = self._create_volume(volume_type_id=cg_type.id,
provider_location=location,
consistencygroup_id=cg.id)
cg_snapshot = self._create_cgsnapshot(cg_id=cg.id)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group = self._create_group(group_type_id=group_type.id)
location = six.text_type({'vol_hex_id': '2000'})
volume = self._create_volume(provider_location=location,
group_id=group.id)
group_snapshot = (
self._create_group_snapshot(group_id=group.id,
group_type_id=group_type.id))
snapshot = self._create_snapshot(volume_id=volume.id,
volume_type_id=cg_type.id,
cgsnapshot_id=cg_snapshot.id)
group_snapshot_id=group_snapshot.id)
mock_get_flashcopy.side_effect = [[TEST_FLASHCOPY], {}]
mock_create_lun.return_value = '2200'
model_update, snapshots_model_update = (
self.driver.create_cgsnapshot(self.ctxt, cg_snapshot, [snapshot]))
self.driver.create_group_snapshot(
self.ctxt, group_snapshot, [snapshot]))
location = ast.literal_eval(
snapshots_model_update[0]['provider_location'])
self.assertEqual('2200', location['vol_hex_id'])
self.assertEqual('available', snapshots_model_update[0]['status'])
self.assertEqual('available', model_update['status'])
self.assertEqual(fields.GroupStatus.AVAILABLE, model_update['status'])
def test_delete_cgsnapshot_sucessfully(self):
"""test a successful cgsnapshot deletion."""
def test_delete_consistency_group_snapshot_sucessfully(self):
"""test a successful consistency group snapshot deletion."""
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
cg_type = volume_types.create(self.ctxt, 'CG', {})
cg = self._create_consistencygroup(volume_type_id=cg_type.id)
location = six.text_type({'vol_hex_id': '0002'})
volume = self._create_volume(volume_type_id=cg_type.id,
provider_location=location,
consistencygroup_id=cg.id)
cg_snapshot = self._create_cgsnapshot(cg_id=cg.id)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group = self._create_group(group_type_id=group_type.id)
location = six.text_type({'vol_hex_id': '2000'})
volume = self._create_volume(provider_location=location,
group_id=group.id)
group_snapshot = (
self._create_group_snapshot(group_id=group.id,
group_type_id=group_type.id))
snapshot = self._create_snapshot(volume_id=volume.id,
volume_type_id=cg_type.id,
cgsnapshot_id=cg_snapshot.id)
group_snapshot_id=group_snapshot.id)
model_update, snapshots_model_update = (
self.driver.delete_cgsnapshot(self.ctxt, cg_snapshot, [snapshot]))
self.driver.delete_group_snapshot(
self.ctxt, group_snapshot, [snapshot]))
self.assertEqual('deleted', snapshots_model_update[0]['status'])
self.assertEqual('deleted', model_update['status'])
self.assertEqual(fields.GroupSnapshotStatus.DELETED,
model_update['status'])
@mock.patch.object(helper.DS8KCommonHelper, 'delete_lun')
def test_delete_cgsnapshot_failed(self, mock_delete_lun):
"""test a failed cgsnapshot deletion."""
def test_delete_consistency_group_snapshot_failed(self,
mock_delete_lun):
"""test a failed consistency group snapshot deletion."""
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
cg_type = volume_types.create(self.ctxt, 'CG', {})
cg = self._create_consistencygroup(volume_type_id=cg_type.id)
location = six.text_type({'vol_hex_id': '0002'})
volume = self._create_volume(volume_type_id=cg_type.id,
provider_location=location,
consistencygroup_id=cg.id)
cg_snapshot = self._create_cgsnapshot(cg_id=cg.id)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group = self._create_group(group_type_id=group_type.id)
location = six.text_type({'vol_hex_id': '2000'})
volume = self._create_volume(provider_location=location,
group_id=group.id)
group_snapshot = (
self._create_group_snapshot(group_id=group.id,
group_type_id=group_type.id))
snapshot = self._create_snapshot(volume_id=volume.id,
volume_type_id=cg_type.id,
cgsnapshot_id=cg_snapshot.id)
group_snapshot_id=group_snapshot.id)
mock_delete_lun.side_effect = (
restclient.APIException('delete snapshot failed.'))
model_update, snapshots_model_update = (
self.driver.delete_cgsnapshot(self.ctxt, cg_snapshot, [snapshot]))
self.driver.delete_group_snapshot(
self.ctxt, group_snapshot, [snapshot]))
self.assertEqual('error_deleting', snapshots_model_update[0]['status'])
self.assertEqual('error_deleting', model_update['status'])
@mock.patch('oslo_concurrency.lockutils.external_lock',
new=mock.MagicMock())
@mock.patch.object(eventlet, 'sleep')
@mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy')
def test_create_consisgroup_from_consisgroup(self, mock_get_flashcopy,
mock_sleep):
"""test creation of consistency group from consistency group."""
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
cg_type = volume_types.create(self.ctxt, 'CG', {})
src_cg = self._create_consistencygroup(volume_type_id=cg_type.id)
location = six.text_type({'vol_hex_id': '0002'})
src_vol = self._create_volume(volume_type_id=cg_type.id,
provider_location=location,
consistencygroup_id=src_cg.id)
cg = self._create_consistencygroup(volume_type_id=cg_type.id)
volume = self._create_volume(volume_type_id=cg_type.id,
consistencygroup_id=cg.id)
mock_get_flashcopy.side_effect = [[TEST_FLASHCOPY], {}]
model_update, volumes_model_update = (
self.driver.create_consistencygroup_from_src(
self.ctxt, cg, [volume], None, None, src_cg, [src_vol]))
self.assertEqual(TEST_VOLUME_ID,
volumes_model_update[0]['metadata']['vol_hex_id'])
self.assertEqual(fields.ConsistencyGroupStatus.AVAILABLE,
self.assertEqual(fields.GroupSnapshotStatus.ERROR_DELETING,
model_update['status'])
@mock.patch('oslo_concurrency.lockutils.external_lock',
new=mock.MagicMock())
@mock.patch.object(eventlet, 'sleep')
@mock.patch.object(helper.DS8KCommonHelper, '_create_lun')
@mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy')
def test_create_consisgroup_from_cgsnapshot(self, mock_get_flashcopy,
mock_sleep):
"""test creation of consistency group from cgsnapshot."""
def test_create_consisgroup_from_consisgroup(self, mock_get_flashcopy,
mock_create_lun, mock_sleep):
"""test creation of consistency group from consistency group."""
self.configuration.lss_range_for_cg = '20-23'
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
cg_type = volume_types.create(self.ctxt, 'CG', {})
src_cg = self._create_consistencygroup(volume_type_id=cg_type.id)
src_vol = self._create_volume(volume_type_id=cg_type.id,
consistencygroup_id=src_cg.id)
cg_snapshot = self._create_cgsnapshot(cg_id=src_cg.id)
location = six.text_type({'vol_hex_id': '0002'})
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
src_group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id)
location = six.text_type({'vol_hex_id': '2000'})
src_vol = self._create_volume(provider_location=location,
group_id=src_group.id)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id)
volume = self._create_volume(group_id=group.id)
mock_get_flashcopy.side_effect = [[TEST_FLASHCOPY], {}]
mock_create_lun.return_value = '2200'
model_update, volumes_model_update = (
self.driver.create_group_from_src(
self.ctxt, group, [volume], None, None, src_group, [src_vol]))
self.assertEqual('2200',
volumes_model_update[0]['metadata']['vol_hex_id'])
self.assertEqual(fields.GroupStatus.AVAILABLE,
model_update['status'])
@mock.patch.object(eventlet, 'sleep')
@mock.patch.object(helper.DS8KCommonHelper, '_create_lun')
@mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy')
def test_create_consisgroup_from_cgsnapshot(self, mock_get_flashcopy,
mock_create_lun, mock_sleep):
"""test creation of consistency group from cgsnapshot."""
self.configuration.lss_range_for_cg = '20-23'
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
src_group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id)
src_vol = self._create_volume(group_id=src_group.id)
group_snapshot = (
self._create_group_snapshot(group_id=src_group.id,
group_type_id=group_type.id))
location = six.text_type({'vol_hex_id': '2000'})
snapshot = self._create_snapshot(volume_id=src_vol.id,
volume_type_id=cg_type.id,
provider_location=location,
cgsnapshot_id=cg_snapshot.id)
cg = self._create_consistencygroup(volume_type_id=cg_type.id)
volume = self._create_volume(volume_type_id=cg_type.id,
consistencygroup_id=cg.id)
group_snapshot_id=group_snapshot.id)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id)
volume = self._create_volume(group_id=group.id)
mock_get_flashcopy.side_effect = [[TEST_FLASHCOPY], {}]
mock_create_lun.return_value = '2200'
model_update, volumes_model_update = (
self.driver.create_consistencygroup_from_src(
self.ctxt, cg, [volume], cg_snapshot, [snapshot], None, None))
self.assertEqual(TEST_VOLUME_ID,
volumes_model_update[0]['metadata']['vol_hex_id'])
self.assertEqual(fields.ConsistencyGroupStatus.AVAILABLE,
self.driver.create_group_from_src(
self.ctxt, group, [volume], group_snapshot,
[snapshot], None, None))
self.assertEqual(
'2200', volumes_model_update[0]['metadata']['vol_hex_id'])
self.assertEqual(fields.GroupStatus.AVAILABLE,
model_update['status'])
@mock.patch.object(eventlet, 'sleep')
@ -2647,13 +2900,8 @@ class DS8KProxyTest(test.TestCase):
self.ctxt, [volume], TEST_TARGET_DS8K_IP)
self.assertEqual(TEST_TARGET_DS8K_IP, secondary_id)
@mock.patch.object(
volume_types,
'get_volume_type_extra_specs',
return_value={'replication_enabled': '<is> True'})
@mock.patch.object(replication.Replication, 'do_pprc_failover')
def test_failover_host_failed(self, mock_do_pprc_failover,
mock_volume_types):
def test_failover_host_failed(self, mock_do_pprc_failover):
"""Failover host should raise exception when failed."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
@ -2801,13 +3049,8 @@ class DS8KProxyTest(test.TestCase):
self.ctxt, [volume], 'default')
self.assertEqual('default', secondary_id)
@mock.patch.object(
volume_types,
'get_volume_type_extra_specs',
return_value={'replication_enabled': '<is> True'})
@mock.patch.object(replication.Replication, 'start_pprc_failback')
def test_failback_host_failed(self, mock_start_pprc_failback,
mock_volume_types):
def test_failback_host_failed(self, mock_start_pprc_failback):
"""Failback host should raise exception when failed."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,

View File

@ -61,6 +61,8 @@ def filter_alnum(s):
class DS8KCommonHelper(object):
"""Manage the primary backend, it is common class too."""
OPTIONAL_PARAMS = ['ds8k_host_type', 'lss_range_for_cg']
def __init__(self, conf, HTTPConnectorObject=None):
self.conf = conf
self._connector_obj = HTTPConnectorObject
@ -76,9 +78,13 @@ class DS8KCommonHelper(object):
def _get_value(self, key):
if getattr(self.conf, 'safe_get', 'get') == 'get':
return self.conf.get(key)
value = self.conf.get(key)
else:
return self.conf.safe_get(key)
value = self.conf.safe_get(key)
if not value and key not in self.OPTIONAL_PARAMS:
raise exception.InvalidParameterValue(
err=(_('Param [%s] should be provided.') % key))
return value
def get_thin_provision(self):
return self._disable_thin_provision
@ -100,6 +106,7 @@ class DS8KCommonHelper(object):
self._get_storage_information()
self._check_host_type()
self.backend['pools_str'] = self._get_value('san_clustername')
self._get_lss_ids_for_cg()
self._verify_version()
self._verify_pools()
@ -109,8 +116,8 @@ class DS8KCommonHelper(object):
def _get_certificate(self, host):
cert_file = strings.CERTIFICATES_PATH + host + '.pem'
msg = "certificate file for DS8K %(host)s: %(cert)s"
LOG.debug(msg, {'host': host, 'cert': cert_file})
LOG.debug("certificate file for DS8K %(host)s: %(cert)s",
{'host': host, 'cert': cert_file})
# Use the certificate if it exists, otherwise use the System CA Bundle
if os.path.exists(cert_file):
return cert_file
@ -119,23 +126,23 @@ class DS8KCommonHelper(object):
return True
def _create_client(self):
san_ip = self._get_value('san_ip')
try:
clear_pass = cryptish.decrypt(self._get_value('san_password'))
except TypeError:
err = _('Param [san_password] is invalid.')
raise exception.InvalidParameterValue(err=err)
verify = self._get_certificate(self._get_value('san_ip'))
raise exception.InvalidParameterValue(
err=_('Param [san_password] is invalid.'))
verify = self._get_certificate(san_ip)
try:
self._client = restclient.RESTScheduler(
self._get_value('san_ip'),
san_ip,
self._get_value('san_login'),
clear_pass,
self._connector_obj,
verify)
except restclient.TimeoutException:
msg = (_("Can't connect to %(host)s") %
{'host': self._get_value('san_ip')})
raise restclient.APIException(data=msg)
raise restclient.APIException(
data=(_("Can't connect to %(host)s") % {'host': san_ip}))
self.backend['rest_version'] = self._get_version()['bundle_version']
LOG.info("Connection to DS8K storage system %(host)s has been "
"established successfully, the version of REST is %(rest)s.",
@ -148,12 +155,31 @@ class DS8KCommonHelper(object):
self.backend['storage_wwnn'] = storage_info['wwnn']
self.backend['storage_version'] = storage_info['release']
def _get_lss_ids_for_cg(self):
lss_range = self._get_value('lss_range_for_cg')
if lss_range:
lss_range = lss_range.replace(' ', '').split('-')
if len(lss_range) == 1:
begin = int(lss_range[0], 16)
end = begin
else:
begin = int(lss_range[0], 16)
end = int(lss_range[1], 16)
if begin > 0xFF or end > 0xFF or begin > end:
raise exception.InvalidParameterValue(
err=_('Param [lss_range_for_cg] is invalid, it '
'should be within 00-FF.'))
self.backend['lss_ids_for_cg'] = set(
('%02x' % i).upper() for i in range(begin, end + 1))
else:
self.backend['lss_ids_for_cg'] = set()
def _check_host_type(self):
ds8k_host_type = self._get_value('ds8k_host_type')
if ((ds8k_host_type is not None) and
if (ds8k_host_type and
(ds8k_host_type not in VALID_HOST_TYPES)):
msg = (_("Param [ds8k_host_type] must be one of: %(values)s.") %
{'values': VALID_HOST_TYPES[1:-1]})
msg = (_("Param [ds8k_host_type] must be one of: %(values)s.")
% {'values': VALID_HOST_TYPES[1:-1]})
LOG.error(msg)
raise exception.InvalidParameterValue(err=msg)
self.backend['host_type_override'] = (
@ -161,12 +187,12 @@ class DS8KCommonHelper(object):
def _verify_version(self):
if self.backend['storage_version'] == '8.0.1':
msg = (_("8.0.1 does not support bulk deletion of volumes, "
"if you want to use this version of driver, "
"please upgrade the CCL, and make sure the REST "
"version is not lower than %s.")
% VALID_REST_VERSION_5_8_MIN)
raise exception.VolumeDriverException(data=msg)
raise exception.VolumeDriverException(
data=(_("8.0.1 does not support bulk deletion of volumes, "
"if you want to use this version of driver, "
"please upgrade the CCL, and make sure the REST "
"version is not lower than %s.")
% VALID_REST_VERSION_5_8_MIN))
else:
if (('5.7' in self.backend['rest_version'] and
dist_version.LooseVersion(self.backend['rest_version']) <
@ -174,13 +200,13 @@ class DS8KCommonHelper(object):
('5.8' in self.backend['rest_version'] and
dist_version.LooseVersion(self.backend['rest_version']) <
dist_version.LooseVersion(VALID_REST_VERSION_5_8_MIN))):
msg = (_("REST version %(invalid)s is lower than "
"%(valid)s, please upgrade it in DS8K.")
% {'invalid': self.backend['rest_version'],
'valid': (VALID_REST_VERSION_5_7_MIN if '5.7' in
self.backend['rest_version'] else
VALID_REST_VERSION_5_8_MIN)})
raise exception.VolumeDriverException(data=msg)
raise exception.VolumeDriverException(
data=(_("REST version %(invalid)s is lower than "
"%(valid)s, please upgrade it in DS8K.")
% {'invalid': self.backend['rest_version'],
'valid': (VALID_REST_VERSION_5_7_MIN if '5.7' in
self.backend['rest_version'] else
VALID_REST_VERSION_5_8_MIN)}))
if self._connection_type == storage.XIV_CONNECTION_TYPE_FC_ECKD:
if (dist_version.LooseVersion(self.backend['storage_version']) <
@ -193,15 +219,15 @@ class DS8KCommonHelper(object):
elif self._connection_type == storage.XIV_CONNECTION_TYPE_FC_ECKD:
ptype = 'ckd'
else:
err = _('Param [connection_type] is invalid.')
raise exception.InvalidParameterValue(err=err)
raise exception.InvalidParameterValue(
err=_('Param [connection_type] is invalid.'))
self._storage_pools = self.get_pools()
for pid, p in self._storage_pools.items():
if p['stgtype'] != ptype:
LOG.error('The stgtype of pool %(pool)s is %(ptype)s.',
{'pool': pid, 'ptype': p['stgtype']})
err = _('Param [san_clustername] is invalid.')
raise exception.InvalidParameterValue(err=err)
raise exception.InvalidParameterValue(
err='Param [san_clustername] is invalid.')
@proxy.logger
def get_pools(self, new_pools=None):
@ -229,7 +255,7 @@ class DS8KCommonHelper(object):
}) for p in pools)
@proxy.logger
def find_available_lss(self, pool, find_new_pid, excluded_lss):
def find_pool_lss_pair(self, pool, find_new_pid, excluded_lss):
if pool:
node = int(pool[1:], 16) % 2
lss = self._find_lss(node, excluded_lss)
@ -237,9 +263,9 @@ class DS8KCommonHelper(object):
return (pool, lss)
else:
if not find_new_pid:
msg = _('All LSS/LCU IDs for configured pools on '
'storage are exhausted.')
raise restclient.LssIDExhaustError(message=msg)
raise restclient.LssIDExhaustError(
message=_('All LSS/LCU IDs for configured pools '
'on storage are exhausted.'))
# find new pool id and lss for lun
return self.find_biggest_pool_and_lss(excluded_lss)
@ -250,8 +276,8 @@ class DS8KCommonHelper(object):
lss = self._find_lss(pool['node'], excluded_lss)
if lss:
return pool_id, lss
msg = _("All LSS/LCU IDs for configured pools are exhausted.")
raise restclient.LssIDExhaustError(message=msg)
raise restclient.LssIDExhaustError(
message=_("All LSS/LCU IDs for configured pools are exhausted."))
@proxy.logger
def _find_lss(self, node, excluded_lss):
@ -259,20 +285,35 @@ class DS8KCommonHelper(object):
existing_lss = self.get_all_lss(fileds)
LOG.info("existing LSS IDs are: %s.",
','.join([lss['id'] for lss in existing_lss]))
existing_lss_cg, nonexistent_lss_cg = (
self._classify_lss_for_cg(existing_lss))
# exclude LSSs that are full.
if excluded_lss:
existing_lss = [lss for lss in existing_lss
if lss['id'] not in excluded_lss]
lss = self._find_from_existing_lss(node, existing_lss)
lss = lss if lss else self._find_from_unexisting_lss(node,
existing_lss)
# exclude LSSs that reserved for CG.
candidates = [lss for lss in existing_lss
if lss['id'] not in existing_lss_cg]
lss = self._find_from_existing_lss(node, candidates)
if not lss:
lss = self._find_from_nonexistent_lss(node, existing_lss,
nonexistent_lss_cg)
return lss
def _classify_lss_for_cg(self, existing_lss):
existing_lss_ids = set(lss['id'] for lss in existing_lss)
existing_lss_cg = existing_lss_ids & self.backend['lss_ids_for_cg']
nonexistent_lss_cg = self.backend['lss_ids_for_cg'] - existing_lss_cg
return existing_lss_cg, nonexistent_lss_cg
def _find_from_existing_lss(self, node, existing_lss):
# exclude LSSs that are used by PPRC paths.
lss_in_pprc = self.get_lss_in_pprc_paths()
if lss_in_pprc:
existing_lss = [lss for lss in existing_lss
if lss['id'] not in lss_in_pprc]
# exclude wrong type of LSSs and those that are not in expected node.
existing_lss = [lss for lss in existing_lss if lss['type'] == 'fb'
and int(lss['group']) == node]
lss_id = None
@ -286,18 +327,19 @@ class DS8KCommonHelper(object):
{'lss': lss_id, 'num': lss['configvols']})
return lss_id
def _find_from_unexisting_lss(self, node, existing_lss):
def _find_from_nonexistent_lss(self, node, existing_lss, lss_cg=None):
addrgrps = set(int(lss['addrgrp'], 16) for lss in existing_lss if
lss['type'] == 'ckd' and int(lss['group']) == node)
fulllss = set(int(lss['id'], 16) for lss in existing_lss if
lss['type'] == 'fb' and int(lss['group']) == node)
# look for an available lss from unexisting lss
cglss = set(int(lss, 16) for lss in lss_cg) if lss_cg else set()
# look for an available lss from nonexistent lss
lss_id = None
for lss in range(node, LSS_SLOTS, 2):
addrgrp = lss // 16
if addrgrp not in addrgrps and lss not in fulllss:
if (addrgrp not in addrgrps and
lss not in fulllss and
lss not in cglss):
lss_id = ("%02x" % lss).upper()
break
LOG.info('_find_from_unexisting_lss: choose %s.', lss_id)
@ -314,7 +356,7 @@ class DS8KCommonHelper(object):
if lun.type_os400:
volData['os400'] = lun.type_os400
volData['name'] = lun.ds_name
volData['pool'], volData['lss'] = lun.lss_pair['source']
volData['pool'], volData['lss'] = lun.pool_lss_pair['source']
lun.ds_id = self._create_lun(volData)
return lun
@ -339,7 +381,7 @@ class DS8KCommonHelper(object):
else:
lun_ids_str = ','.join(lun_ids)
lun_ids = []
LOG.error("Deleting volumes: %s.", lun_ids_str)
LOG.info("Deleting volumes: %s.", lun_ids_str)
self._delete_lun(lun_ids_str)
def get_lss_in_pprc_paths(self):
@ -361,7 +403,7 @@ class DS8KCommonHelper(object):
vol_ids = [vol['volume_id'] for vol in host['mappings_briefs']]
if vol_id in vol_ids:
host_ids.append(host['id'])
LOG.info('_find_host: host IDs are: %s.', host_ids)
LOG.info('_find_host: host IDs are %s.', ','.join(host_ids))
return host_ids
def wait_flashcopy_finished(self, src_luns, tgt_luns):
@ -378,9 +420,9 @@ class DS8KCommonHelper(object):
continue
if fcs[0]['state'] not in ('valid',
'validation_required'):
msg = (_('Flashcopy ended up in bad state %s. '
'Rolling back.') % fcs[0]['state'])
raise restclient.APIException(data=msg)
raise restclient.APIException(
data=(_('Flashcopy ended up in bad state %s. '
'Rolling back.') % fcs[0]['state']))
if fc_state.count(False) == 0:
break
finished = True
@ -420,10 +462,10 @@ class DS8KCommonHelper(object):
unfinished_pairs = [p for p in pairs if p['state'] != state]
for p in unfinished_pairs:
if p['state'] in invalid_states:
msg = (_('Metro Mirror pair %(id)s enters into '
'state %(state)s. ') %
{'id': p['id'], 'state': p['state']})
raise restclient.APIException(data=msg)
raise restclient.APIException(
data=(_('Metro Mirror pair %(id)s enters into '
'state %(state)s. ')
% {'id': p['id'], 'state': p['state']}))
finally:
if not finished and delete:
pair_ids = {'ids': ','.join([p['id'] for p in pairs])}
@ -459,22 +501,19 @@ class DS8KCommonHelper(object):
hp['wwpn'] for hp in host_ports)
unconfigured_ports = set(
hp['wwpn'] for hp in host_ports if not hp['host_id'])
msg = ("initialize_connection: defined_hosts: %(defined)s, "
"unknown_ports: %(unknown)s, unconfigured_ports: "
"%(unconfigured)s.")
LOG.debug(msg, {
"defined": defined_hosts,
"unknown": unknown_ports,
"unconfigured": unconfigured_ports
})
LOG.debug("initialize_connection: defined_hosts: %(defined)s, "
"unknown_ports: %(unknown)s, unconfigured_ports: "
"%(unconfigured)s.", {"defined": defined_hosts,
"unknown": unknown_ports,
"unconfigured": unconfigured_ports})
# Create host if it is not defined
if not defined_hosts:
host_id = self._create_host(host)['id']
elif len(defined_hosts) == 1:
host_id = defined_hosts.pop()
else:
msg = _('More than one host defined for requested ports.')
raise restclient.APIException(message=msg)
raise restclient.APIException(
message='More than one host defined for requested ports.')
LOG.info('Volume will be attached to host %s.', host_id)
# Create missing host ports
@ -511,13 +550,11 @@ class DS8KCommonHelper(object):
host_ports = None
delete_ports = None
defined_hosts = self._find_host(vol_id)
msg = ("terminate_connection: host_ports: %(host)s, defined_hosts: "
"%(defined)s, delete_ports: %(delete)s.")
LOG.debug(msg, {
"host": host_ports,
"defined": defined_hosts,
"delete": delete_ports
})
LOG.debug("terminate_connection: host_ports: %(host)s, "
"defined_hosts: %(defined)s, delete_ports: %(delete)s.",
{"host": host_ports,
"defined": defined_hosts,
"delete": delete_ports})
if not defined_hosts:
LOG.info('Could not find host.')
@ -552,33 +589,49 @@ class DS8KCommonHelper(object):
target_map = {initiator.upper(): target_ports
for initiator in connector['wwpns']}
ret_info['data']['initiator_target_map'] = target_map
return ret_info
return ret_info
def create_group(self, ctxt, group):
def create_group(self, group):
return {'status': fields.GroupStatus.AVAILABLE}
def delete_group(self, ctxt, group, luns):
def delete_group(self, group, src_luns):
volumes_model_update = []
model_update = {'status': fields.GroupStatus.DELETED}
if luns:
if src_luns:
try:
self.delete_lun(luns)
except restclient.APIException:
self.delete_lun(src_luns)
except restclient.APIException as e:
model_update['status'] = fields.GroupStatus.ERROR_DELETING
LOG.exception(
"Failed to delete the volumes in group %(group)s",
{'group': group.id})
"Failed to delete the volumes in group %(group)s, "
"Exception = %(ex)s",
{'group': group.id, 'ex': e})
for lun in luns:
for src_lun in src_luns:
volumes_model_update.append({
'id': lun.os_id,
'id': src_lun.os_id,
'status': model_update['status']
})
return model_update, volumes_model_update
def update_group(self, ctxt, group, add_volumes, remove_volumes):
return None, None, None
def delete_group_snapshot(self, group_snapshot, tgt_luns):
snapshots_model_update = []
model_update = {'status': fields.GroupSnapshotStatus.DELETED}
if tgt_luns:
try:
self.delete_lun(tgt_luns)
except restclient.APIException as e:
model_update['status'] = (
fields.GroupSnapshotStatus.ERROR_DELETING)
LOG.error("Failed to delete snapshots in group snapshot "
"%(gsnapshot)s, Exception = %(ex)s",
{'gsnapshot': group_snapshot.id, 'ex': e})
for tgt_lun in tgt_luns:
snapshots_model_update.append({
'id': tgt_lun.os_id,
'status': model_update['status']
})
return model_update, snapshots_model_update
def _delete_lun(self, lun_ids_str):
self._client.send('DELETE', '/volumes',
@ -768,28 +821,35 @@ class DS8KReplicationSourceHelper(DS8KCommonHelper):
excluded_lss)
if lss:
return pool_id, lss
msg = _("All LSS/LCU IDs for configured pools are exhausted.")
raise restclient.LssIDExhaustError(message=msg)
raise restclient.LssIDExhaustError(
message=_("All LSS/LCU IDs for configured pools are exhausted."))
@proxy.logger
def _find_lss_for_type_replication(self, node, excluded_lss):
# prefer to choose the non-existing one firstly
# prefer to choose non-existing one first.
fileds = ['id', 'type', 'addrgrp', 'group', 'configvols']
existing_lss = self.get_all_lss(fileds)
LOG.info("existing LSS IDs are %s",
','.join([lss['id'] for lss in existing_lss]))
lss_id = self._find_from_unexisting_lss(node, existing_lss)
existing_lss_cg, nonexistent_lss_cg = (
self._classify_lss_for_cg(existing_lss))
lss_id = self._find_from_nonexistent_lss(node, existing_lss,
nonexistent_lss_cg)
if not lss_id:
if excluded_lss:
existing_lss = [lss for lss in existing_lss
if lss['id'] not in excluded_lss]
lss_id = self._find_from_existing_lss(node, existing_lss)
candidates = [lss for lss in existing_lss
if lss['id'] not in existing_lss_cg]
lss_id = self._find_from_existing_lss(node, candidates)
return lss_id
class DS8KReplicationTargetHelper(DS8KReplicationSourceHelper):
"""Manage target storage for replication."""
OPTIONAL_PARAMS = ['ds8k_host_type', 'port_pairs']
def setup(self):
self._create_client()
self._get_storage_information()
@ -814,6 +874,21 @@ class DS8KReplicationTargetHelper(DS8KReplicationSourceHelper):
self.backend['port_pairs'] = port_pairs
self.backend['id'] = self._get_value('backend_id')
@proxy.logger
def _find_lss_for_type_replication(self, node, excluded_lss):
# prefer to choose non-existing one first.
fileds = ['id', 'type', 'addrgrp', 'group', 'configvols']
existing_lss = self.get_all_lss(fileds)
LOG.info("existing LSS IDs are %s",
','.join([lss['id'] for lss in existing_lss]))
lss_id = self._find_from_nonexistent_lss(node, existing_lss)
if not lss_id:
if excluded_lss:
existing_lss = [lss for lss in existing_lss
if lss['id'] not in excluded_lss]
lss_id = self._find_from_existing_lss(node, existing_lss)
return lss_id
def create_lun(self, lun):
volData = {
'cap': self._gb2b(lun.size),
@ -826,7 +901,7 @@ class DS8KReplicationTargetHelper(DS8KReplicationSourceHelper):
volData['os400'] = lun.type_os400
volData['name'] = lun.replica_ds_name
volData['pool'], volData['lss'] = lun.lss_pair['target']
volData['pool'], volData['lss'] = lun.pool_lss_pair['target']
volID = self._create_lun(volData)
lun.replication_driver_data.update(
{self.backend['id']: {'vol_hex_id': volID}})
@ -848,16 +923,19 @@ class DS8KReplicationTargetHelper(DS8KReplicationSourceHelper):
class DS8KECKDHelper(DS8KCommonHelper):
"""Manage ECKD volume."""
OPTIONAL_PARAMS = ['ds8k_host_type', 'port_pairs', 'ds8k_ssid_prefix',
'lss_range_for_cg']
@staticmethod
def _gb2cyl(gb):
# now only support 3390, no 3380 or 3390-A
cyl = int(math.ceil(gb * 1263.28))
if cyl > 65520:
msg = (_("For 3390 volume, capacity can be in the range "
"1-65520(849KiB to 55.68GiB) cylinders, now it "
"is %(gb)d GiB, equals to %(cyl)d cylinders.") %
{'gb': gb, 'cyl': cyl})
raise exception.VolumeDriverException(data=msg)
raise exception.VolumeDriverException(
message=(_("For 3390 volume, capacity can be in the range "
"1-65520(849KiB to 55.68GiB) cylinders, now it "
"is %(gb)d GiB, equals to %(cyl)d cylinders.")
% {'gb': gb, 'cyl': cyl}))
return cyl
@staticmethod
@ -874,6 +952,7 @@ class DS8KECKDHelper(DS8KCommonHelper):
self._create_client()
self._get_storage_information()
self._check_host_type()
self._get_lss_ids_for_cg()
self.backend['pools_str'] = self._get_value('san_clustername')
ssid_prefix = self._get_value('ds8k_ssid_prefix')
self.backend['ssid_prefix'] = ssid_prefix if ssid_prefix else 'FF'
@ -885,10 +964,10 @@ class DS8KECKDHelper(DS8KCommonHelper):
def _check_and_verify_lcus(self):
map_str = self._get_value('ds8k_devadd_unitadd_mapping')
if not map_str:
err = _('Param [ds8k_devadd_unitadd_mapping] is not '
'provided, please provide the mapping between '
'IODevice address and unit address.')
raise exception.InvalidParameterValue(err=err)
raise exception.InvalidParameterValue(
err=_('Param [ds8k_devadd_unitadd_mapping] is not '
'provided, please provide the mapping between '
'IODevice address and unit address.'))
# verify the LCU
mappings = map_str.replace(' ', '').upper().split(';')
@ -896,9 +975,9 @@ class DS8KECKDHelper(DS8KCommonHelper):
dev_mapping = {p[1]: int(p[0], 16) for p in pairs}
for lcu in dev_mapping.keys():
if int(lcu, 16) > 255:
err = (_('LCU %s in param [ds8k_devadd_unitadd_mapping]'
'is invalid, it should be within 00-FF.') % lcu)
raise exception.InvalidParameterValue(err=err)
raise exception.InvalidParameterValue(
err=(_('LCU %s in param [ds8k_devadd_unitadd_mapping]'
'is invalid, it should be within 00-FF.') % lcu))
# verify address group
all_lss = self.get_all_lss(['id', 'type'])
@ -907,23 +986,24 @@ class DS8KECKDHelper(DS8KCommonHelper):
ckd_addrgrp = set((int(lcu, 16) // 16) for lcu in dev_mapping.keys())
intersection = ckd_addrgrp & fb_addrgrp
if intersection:
msg = (_('Invaild LCUs which first digit is %s, they are'
'for fb volume.') % ', '.join(intersection))
raise exception.VolumeDriverException(data=msg)
raise exception.VolumeDriverException(
message=(_('LCUs which first digit is %s are invalid, they '
'are for FB volume.') % ', '.join(intersection)))
# create LCU that doesn't exist
ckd_lss = set(lss['id'] for lss in all_lss if lss['type'] == 'ckd')
unexisting_lcu = set(dev_mapping.keys()) - ckd_lss
if unexisting_lcu:
LOG.info('LCUs %s do not exist in DS8K, they will be created.',
','.join(unexisting_lcu))
for lcu in unexisting_lcu:
nonexistent_lcu = set(dev_mapping.keys()) - ckd_lss
if nonexistent_lcu:
LOG.info('LCUs %s do not exist in DS8K, they will be '
'created.', ','.join(nonexistent_lcu))
for lcu in nonexistent_lcu:
try:
self._create_lcu(self.backend['ssid_prefix'], lcu)
except restclient.APIException as e:
msg = (_('can not create lcu %(lcu)s, Exception= '
'%(e)s') % {'lcu': lcu, 'e': six.text_type(e)})
raise exception.VolumeDriverException(data=msg)
raise exception.VolumeDriverException(
message=(_('Can not create lcu %(lcu)s, '
'Exception= %(e)s.')
% {'lcu': lcu, 'e': six.text_type(e)}))
return dev_mapping
def _format_pools(self, pools):
@ -944,14 +1024,19 @@ class DS8KECKDHelper(DS8KCommonHelper):
# all LCUs have existed, not like LSS
all_lss = self.get_all_lss(['id', 'type', 'group', 'configvols'])
existing_lcu = [lss for lss in all_lss if lss['type'] == 'ckd']
excluded_lcu = excluded_lcu or []
candidate_lcu = [lcu for lcu in existing_lcu if (
lcu['id'] in self.backend['device_mapping'].keys() and
lcu['id'] not in excluded_lcu and
lcu['group'] == str(node))]
# exclude LCUs reserved for CG.
candidate_lcu = [lss for lss in candidate_lcu if lss['id']
not in self.backend['lss_ids_for_cg']]
if not candidate_lcu:
return None
# perfer to use LCU that is not in PPRC path first.
# prefer to use LCU that is not in PPRC path first.
lcu_pprc = self.get_lss_in_pprc_paths() & set(
self.backend['device_mapping'].keys())
if lcu_pprc:
@ -984,7 +1069,7 @@ class DS8KECKDHelper(DS8KCommonHelper):
}
lun.data_type = '3390'
volData['name'] = lun.ds_name
volData['pool'], volData['lss'] = lun.lss_pair['source']
volData['pool'], volData['lss'] = lun.pool_lss_pair['source']
lun.ds_id = self._create_lun(volData)
return lun
@ -1030,7 +1115,7 @@ class DS8KReplicationTargetECKDHelper(DS8KECKDHelper,
lun.data_type = '3390'
volData['name'] = lun.replica_ds_name
volData['pool'], volData['lss'] = lun.lss_pair['target']
volData['pool'], volData['lss'] = lun.pool_lss_pair['target']
volID = self._create_lun(volData)
lun.replication_driver_data.update(
{self.backend['id']: {'vol_hex_id': volID}})

View File

@ -37,7 +37,7 @@ volume_driver =
cinder.volume.drivers.ibm.ibm_storage.ibm_storage.IBMStorageDriver
chap = disabled
connection_type = fibre_channel
replication_device = backend_id: bar,
replication_device = connection_type: fibre_channel, backend_id: bar,
san_ip: bar.com, san_login: actual_username,
san_password: actual_password, san_clustername: P4,
port_pairs: I0236-I0306; I0237-I0307
@ -57,24 +57,28 @@ connection_type = fibre_channel
"""
import ast
import collections
import json
import six
from oslo_config import cfg
from oslo_log import log as logging
from cinder import context
from cinder import coordination
from cinder import exception
from cinder.i18n import _
from cinder import objects
from cinder.objects import fields
from cinder.utils import synchronized
import cinder.volume.drivers.ibm.ibm_storage as storage
from cinder.volume.drivers.ibm.ibm_storage import (
ds8k_replication as replication)
from cinder.volume.drivers.ibm.ibm_storage import ds8k_helper as helper
from cinder.volume.drivers.ibm.ibm_storage \
import ds8k_replication as replication
from cinder.volume.drivers.ibm.ibm_storage import ds8k_restclient as restclient
from cinder.volume.drivers.ibm.ibm_storage import proxy
from cinder.volume.drivers.ibm.ibm_storage import strings
from cinder.volume import group_types
from cinder.volume import utils
from cinder.volume import volume_types
LOG = logging.getLogger(__name__)
@ -91,10 +95,7 @@ EXTRA_SPECS_DEFAULTS = {
'thin': True,
'replication_enabled': False,
'consistency': False,
'os400': '',
'consistent_group_replication_enabled': False,
'group_replication_enabled': False,
'consistent_group_snapshot_enabled': False,
'os400': ''
}
ds8k_opts = [
@ -105,7 +106,11 @@ ds8k_opts = [
cfg.StrOpt(
'ds8k_ssid_prefix',
default='FF',
help='Set the first two digits of SSID'),
help='Set the first two digits of SSID.'),
cfg.StrOpt(
'lss_range_for_cg',
default='',
help='Reserve LSSs for consistency group.'),
cfg.StrOpt(
'ds8k_host_type',
default='auto',
@ -135,16 +140,23 @@ class Lun(object):
self.type_os400 = lun.type_os400
self.data_type = lun.data_type
self.type_replication = lun.type_replication
self.group = lun.group
if not self.is_snapshot and self.type_replication:
self.replica_ds_name = lun.replica_ds_name
self.replication_driver_data = lun.replication_driver_data
self.replication_driver_data = (
lun.replication_driver_data.copy())
self.replication_status = lun.replication_status
self.lss_pair = lun.lss_pair
self.pool_lss_pair = lun.pool_lss_pair
def update_volume(self, lun):
volume_update = lun.get_volume_update()
volume_update['provider_location'] = six.text_type({
'vol_hex_id': self.ds_id})
if self.type_replication:
volume_update['replication_driver_data'] = json.dumps(
self.replication_driver_data)
volume_update['metadata']['replication'] = six.text_type(
self.replication_driver_data)
volume_update['metadata']['vol_hex_id'] = self.ds_id
return volume_update
@ -165,19 +177,22 @@ class Lun(object):
if volume.provider_location:
provider_location = ast.literal_eval(volume.provider_location)
self.ds_id = provider_location[six.text_type('vol_hex_id')]
self.ds_id = provider_location['vol_hex_id']
else:
self.ds_id = None
self.cinder_name = volume.display_name
self.lss_pair = {}
self.pool_lss_pair = {}
self.is_snapshot = is_snapshot
if self.is_snapshot:
self.group = (Group(volume.group_snapshot, True)
if volume.group_snapshot else None)
self.size = volume.volume_size
# ds8k supports at most 16 chars
self.ds_name = (
"OS%s:%s" % ('snap', helper.filter_alnum(self.cinder_name))
)[:16]
else:
self.group = Group(volume.group) if volume.group else None
self.size = volume.size
self.ds_name = (
"OS%s:%s" % ('vol', helper.filter_alnum(self.cinder_name))
@ -193,17 +208,17 @@ class Lun(object):
# now only support one replication target.
replication_target = sorted(
self.replication_driver_data.values())[0]
replica_id = replication_target[six.text_type('vol_hex_id')]
self.lss_pair = {
replica_id = replication_target['vol_hex_id']
self.pool_lss_pair = {
'source': (None, self.ds_id[0:2]),
'target': (None, replica_id[0:2])
}
if os400:
if os400 not in VALID_OS400_VOLUME_TYPES.keys():
msg = (_("The OS400 volume type provided, %s, is not "
"a valid volume type.") % os400)
raise restclient.APIException(data=msg)
raise restclient.APIException(
data=(_("The OS400 volume type provided, %s, is not "
"a valid volume type.") % os400))
self.type_os400 = os400
if os400 not in ['050', '099']:
self.size = VALID_OS400_VOLUME_TYPES[os400]
@ -284,13 +299,14 @@ class Lun(object):
class Group(object):
"""provide group information for driver from group db object."""
def __init__(self, group):
gid = group.get('group_type_id')
specs = group_types.get_group_type_specs(gid) if gid else {}
self.type_cg_snapshot = specs.get(
'consistent_group_snapshot_enabled', '<is> %s' %
EXTRA_SPECS_DEFAULTS['consistent_group_snapshot_enabled']
).upper() == strings.METADATA_IS_TRUE
def __init__(self, group, is_snapshot=False):
self.id = group.id
self.host = group.host
if is_snapshot:
self.snapshots = group.snapshots
else:
self.volumes = group.volumes
self.consisgroup_enabled = utils.is_group_a_cg_snapshot_type(group)
class DS8KProxy(proxy.IBMStorageProxy):
@ -307,6 +323,9 @@ class DS8KProxy(proxy.IBMStorageProxy):
self._active_backend_id = active_backend_id
self.configuration = driver.configuration
self.configuration.append_config_values(ds8k_opts)
# TODO(jiamin): this cache is used to handle concurrency issue, but it
# hurts HA, we will find whether is it possible to store it in storage.
self.consisgroup_cache = {}
@proxy._trace_time
def setup(self, ctxt):
@ -325,9 +344,9 @@ class DS8KProxy(proxy.IBMStorageProxy):
self._helper = helper.DS8KECKDHelper(self.configuration,
self._connector_obj)
else:
err = (_("Param [connection_type] %s is invalid.")
% connection_type)
raise exception.InvalidParameterValue(err=err)
raise exception.InvalidParameterValue(
err=(_("Param [connection_type] %s is invalid.")
% connection_type))
if replication_devices:
self._do_replication_setup(replication_devices, self._helper)
@ -335,9 +354,9 @@ class DS8KProxy(proxy.IBMStorageProxy):
@proxy.logger
def _do_replication_setup(self, devices, src_helper):
if len(devices) >= 2:
err = _("Param [replication_device] is invalid, Driver "
"support only one replication target.")
raise exception.InvalidParameterValue(err=err)
raise exception.InvalidParameterValue(
err=_("Param [replication_device] is invalid, Driver "
"support only one replication target."))
self._replication = replication.Replication(src_helper, devices[0])
self._replication.check_physical_links()
@ -368,9 +387,9 @@ class DS8KProxy(proxy.IBMStorageProxy):
LOG.error(msg)
raise exception.CinderException(message=msg)
else:
msg = (_('Backend %s is not initialized.')
% self.configuration.volume_backend_name)
raise exception.CinderException(data=msg)
raise exception.VolumeDriverException(
message=(_('Backend %s is not initialized.')
% self.configuration.volume_backend_name))
stats = {
"volume_backend_name": self.configuration.volume_backend_name,
@ -384,7 +403,6 @@ class DS8KProxy(proxy.IBMStorageProxy):
"free_capacity_gb": self._b2gb(
sum(p['capavail'] for p in storage_pools.values())),
"reserved_percentage": self.configuration.reserved_percentage,
"consistencygroup_support": True,
"consistent_group_snapshot_enabled": True,
"multiattach": False
}
@ -397,7 +415,7 @@ class DS8KProxy(proxy.IBMStorageProxy):
def _assert(self, assert_condition, exception_message=''):
if not assert_condition:
LOG.error(exception_message)
raise restclient.APIException(data=exception_message)
raise exception.VolumeDriverException(message=exception_message)
@proxy.logger
def _create_lun_helper(self, lun, pool=None, find_new_pid=True):
@ -415,20 +433,107 @@ class DS8KProxy(proxy.IBMStorageProxy):
raise restclient.APIException(message=msg)
# There is a time gap between find available LSS slot and
# lun actually occupies it.
excluded_lss = []
excluded_lss = set()
while True:
try:
if lun.type_replication and not lun.is_snapshot:
lun.lss_pair = self._replication.find_available_lss_pair(
excluded_lss)
if lun.group and lun.group.consisgroup_enabled:
lun.pool_lss_pair = {
'source': self._find_pool_lss_pair_for_cg(
lun, excluded_lss)}
else:
lun.lss_pair['source'] = self._helper.find_available_lss(
pool, find_new_pid, excluded_lss)
if lun.type_replication and not lun.is_snapshot:
lun.pool_lss_pair = (
self._replication.find_pool_lss_pair(
excluded_lss))
else:
lun.pool_lss_pair = {
'source': self._helper.find_pool_lss_pair(
pool, find_new_pid, excluded_lss)}
return self._helper.create_lun(lun)
except restclient.LssFullException:
LOG.warning("LSS %s is full, find another one.",
lun.lss_pair['source'][1])
excluded_lss.append(lun.lss_pair['source'][1])
lun.pool_lss_pair['source'][1])
excluded_lss.add(lun.pool_lss_pair['source'][1])
@coordination.synchronized('{self.prefix}-consistency-group')
def _find_pool_lss_pair_for_cg(self, lun, excluded_lss):
lss_in_cache = self.consisgroup_cache.get(lun.group.id, set())
if not lss_in_cache:
lss_in_cg = self._get_lss_in_cg(lun.group, lun.is_snapshot)
LOG.debug("LSSs used by CG %(cg)s are %(lss)s.",
{'cg': lun.group.id, 'lss': ','.join(lss_in_cg)})
available_lss = lss_in_cg - excluded_lss
else:
available_lss = lss_in_cache - excluded_lss
if not available_lss:
available_lss = self._find_lss_for_cg()
pid, lss = self._find_pool_for_lss(available_lss)
if pid:
lss_in_cache.add(lss)
self.consisgroup_cache[lun.group.id] = lss_in_cache
else:
raise exception.VolumeDriverException(
message=_('There are still some available LSSs for CG, '
'but they are not in the same node as pool.'))
return (pid, lss)
def _get_lss_in_cg(self, group, is_snapshot=False):
# Driver can not support the case that dedicating LSS for CG while
# user enable multiple backends which use the same DS8K.
try:
volume_backend_name = (
group.host[group.host.index('@') + 1:group.host.index('#')])
except ValueError:
raise exception.VolumeDriverException(
message=(_('Invalid host %(host)s in group %(group)s')
% {'host': group.host, 'group': group.id}))
lss_in_cg = set()
if volume_backend_name == self.configuration.volume_backend_name:
if is_snapshot:
luns = [Lun(snapshot, is_snapshot=True)
for snapshot in group.snapshots]
else:
luns = [Lun(volume) for volume in group.volumes]
lss_in_cg = set(lun.ds_id[:2] for lun in luns if lun.ds_id)
return lss_in_cg
def _find_lss_for_cg(self):
# Unable to get CGs/groups belonging to the current tenant, so
# get all of them, this function will consume some time if there
# are so many CGs/groups.
lss_used = set()
ctxt = context.get_admin_context()
existing_groups = objects.GroupList.get_all(
ctxt, filters={'status': 'available'})
for group in existing_groups:
if Group(group).consisgroup_enabled:
lss_used = lss_used | self._get_lss_in_cg(group)
existing_groupsnapshots = objects.GroupSnapshotList.get_all(
ctxt, filters={'status': 'available'})
for group in existing_groupsnapshots:
if Group(group, True).consisgroup_enabled:
lss_used = lss_used | self._get_lss_in_cg(group, True)
available_lss = set(self._helper.backend['lss_ids_for_cg']) - lss_used
for lss_set in self.consisgroup_cache.values():
available_lss -= lss_set
self._assert(available_lss,
"All LSSs reserved for CG have been used out, "
"please reserve more LSS for CG if there are still"
"some empty LSSs left.")
LOG.debug('_find_lss_for_cg: available LSSs for consistency '
'group are %s', ','.join(available_lss))
return available_lss
@proxy.logger
def _find_pool_for_lss(self, available_lss):
for lss in available_lss:
pid = self._helper.get_pool(lss)
if pid:
return (pid, lss)
raise exception.VolumeDriverException(
message=(_("Can not find pool for LSSs %s.")
% ','.join(available_lss)))
@proxy.logger
def _clone_lun(self, src_lun, tgt_lun):
@ -485,29 +590,36 @@ class DS8KProxy(proxy.IBMStorageProxy):
def _ensure_vol_not_fc_target(self, vol_hex_id):
for cp in self._helper.get_flashcopy(vol_hex_id):
if cp['targetvolume']['id'] == vol_hex_id:
msg = (_('Volume %s is currently a target of another '
'FlashCopy operation') % vol_hex_id)
raise restclient.APIException(data=msg)
raise restclient.APIException(
data=(_('Volume %s is currently a target of another '
'FlashCopy operation') % vol_hex_id))
def _create_replica_helper(self, lun):
if not lun.pool_lss_pair.get('target'):
lun = self._replication.enable_replication(lun, True)
else:
lun = self._replication.create_replica(lun)
return lun
@proxy._trace_time
def create_volume(self, volume):
lun = self._create_lun_helper(Lun(volume))
if lun.type_replication:
lun = self._replication.create_replica(lun)
lun = self._create_replica_helper(lun)
return lun.get_volume_update()
@proxy._trace_time
def create_cloned_volume(self, target_vol, source_vol):
lun = self._clone_lun(Lun(source_vol), Lun(target_vol))
if lun.type_replication:
lun = self._replication.create_replica(lun)
lun = self._create_replica_helper(lun)
return lun.get_volume_update()
@proxy._trace_time
def create_volume_from_snapshot(self, volume, snapshot):
lun = self._clone_lun(Lun(snapshot, is_snapshot=True), Lun(volume))
if lun.type_replication:
lun = self._replication.create_replica(lun)
lun = self._create_replica_helper(lun)
return lun.get_volume_update()
@proxy._trace_time
@ -524,9 +636,9 @@ class DS8KProxy(proxy.IBMStorageProxy):
self._replication.extend_replica(lun, param)
self._replication.create_pprc_pairs(lun)
else:
msg = (_("The volume %s has been failed over, it is "
"not suggested to extend it.") % lun.ds_id)
raise exception.CinderException(data=msg)
raise exception.CinderException(
message=(_("The volume %s has been failed over, it is "
"not suggested to extend it.") % lun.ds_id))
else:
self._helper.change_lun(lun.ds_id, param)
@ -658,10 +770,11 @@ class DS8KProxy(proxy.IBMStorageProxy):
lun = self._replication.delete_replica(lun)
lun = _convert_thin_and_thick(lun, new_type_thin)
else:
msg = (_("The volume %s is in replication relationship, "
"it is not supported to retype from thin to "
"thick or vice versus.") % lun.ds_id)
raise exception.CinderException(msg)
raise exception.CinderException(
message=(_("The volume %s is in replication "
"relationship, it is not supported to "
"retype from thin to thick or vice "
"versa.") % lun.ds_id))
else:
lun = _convert_thin_and_thick(lun, new_type_thin)
if new_type_replication:
@ -697,23 +810,53 @@ class DS8KProxy(proxy.IBMStorageProxy):
force, **kwargs)
@proxy.logger
def create_consistencygroup(self, ctxt, group):
"""Create a consistency group."""
return self._helper.create_group(ctxt, group)
def create_group(self, ctxt, group):
"""Create generic volume group."""
if Group(group).consisgroup_enabled:
self._assert(self._helper.backend['lss_ids_for_cg'],
'No LSS(s) for CG, please make sure you have '
'reserved LSS for CG via param lss_range_for_cg.')
return self._helper.create_group(group)
@proxy.logger
def delete_consistencygroup(self, ctxt, group, volumes):
"""Delete a consistency group."""
def delete_group(self, ctxt, group, volumes):
"""Delete group and the volumes in the group."""
luns = [Lun(volume) for volume in volumes]
return self._helper.delete_group(ctxt, group, luns)
if Group(group).consisgroup_enabled:
return self._delete_group_with_lock(group, luns)
else:
return self._helper.delete_group(group, luns)
@proxy._trace_time
def create_cgsnapshot(self, ctxt, cgsnapshot, snapshots):
"""Create a consistency group snapshot."""
return self._create_group_snapshot(ctxt, cgsnapshot, snapshots, True)
@coordination.synchronized('{self.prefix}-consistency-group')
def _delete_group_with_lock(self, group, luns):
model_update, volumes_model_update = (
self._helper.delete_group(group, luns))
if model_update['status'] == fields.GroupStatus.DELETED:
self._update_consisgroup_cache(group.id)
return model_update, volumes_model_update
def _create_group_snapshot(self, ctxt, cgsnapshot, snapshots,
cg_enabled=False):
@proxy.logger
def delete_group_snapshot(self, ctxt, group_snapshot, snapshots):
"""Delete volume group snapshot."""
tgt_luns = [Lun(s, is_snapshot=True) for s in snapshots]
if Group(group_snapshot, True).consisgroup_enabled:
return self._delete_group_snapshot_with_lock(
group_snapshot, tgt_luns)
else:
return self._helper.delete_group_snapshot(
group_snapshot, tgt_luns)
@coordination.synchronized('{self.prefix}-consistency-group')
def _delete_group_snapshot_with_lock(self, group_snapshot, tgt_luns):
model_update, snapshots_model_update = (
self._helper.delete_group_snapshot(group_snapshot, tgt_luns))
if model_update['status'] == fields.GroupStatus.DELETED:
self._update_consisgroup_cache(group_snapshot.id)
return model_update, snapshots_model_update
@proxy.logger
def create_group_snapshot(self, ctxt, group_snapshot, snapshots):
"""Create volume group snapshot."""
snapshots_model_update = []
model_update = {'status': fields.GroupStatus.AVAILABLE}
@ -722,7 +865,7 @@ class DS8KProxy(proxy.IBMStorageProxy):
try:
if src_luns and tgt_luns:
self._clone_group(src_luns, tgt_luns, cg_enabled)
self._clone_group(src_luns, tgt_luns)
except restclient.APIException:
model_update['status'] = fields.GroupStatus.ERROR
LOG.exception('Failed to create group snapshot.')
@ -737,70 +880,100 @@ class DS8KProxy(proxy.IBMStorageProxy):
return model_update, snapshots_model_update
@proxy._trace_time
@proxy.logger
def delete_cgsnapshot(self, ctxt, cgsnapshot, snapshots):
"""Delete a consistency group snapshot."""
return self._delete_group_snapshot(ctxt, cgsnapshot, snapshots)
def update_group(self, ctxt, group, add_volumes, remove_volumes):
"""Update generic volume group."""
if Group(group).consisgroup_enabled:
return self._update_group(group, add_volumes, remove_volumes)
else:
return None, None, None
def _delete_group_snapshot(self, ctxt, group_snapshot, snapshots):
snapshots_model_update = []
model_update = {'status': fields.GroupStatus.DELETED}
def _update_group(self, group, add_volumes, remove_volumes):
add_volumes_update = []
group_volume_ids = [vol.id for vol in group.volumes]
add_volumes = [vol for vol in add_volumes
if vol.id not in group_volume_ids]
remove_volumes = [vol for vol in remove_volumes
if vol.id in group_volume_ids]
if add_volumes:
add_luns = [Lun(vol) for vol in add_volumes]
lss_in_cg = [Lun(vol).ds_id[:2] for vol in group.volumes]
if not lss_in_cg:
lss_in_cg = self._find_lss_for_empty_group(group, add_luns)
add_volumes_update = self._add_volumes_into_group(
group, add_luns, lss_in_cg)
if remove_volumes:
self._remove_volumes_in_group(group, add_volumes, remove_volumes)
return None, add_volumes_update, None
snapshots = [Lun(s, is_snapshot=True) for s in snapshots]
if snapshots:
try:
self._helper.delete_lun(snapshots)
except restclient.APIException as e:
model_update['status'] = fields.GroupStatus.ERROR_DELETING
LOG.error("Failed to delete group snapshot. "
"Error: %(err)s",
{'err': e})
@coordination.synchronized('{self.prefix}-consistency-group')
def _find_lss_for_empty_group(self, group, luns):
sorted_lss_ids = collections.Counter([lun.ds_id[:2] for lun in luns])
available_lss = self._find_lss_for_cg()
lss_for_cg = None
for lss_id in sorted_lss_ids:
if lss_id in available_lss:
lss_for_cg = lss_id
break
if not lss_for_cg:
lss_for_cg = available_lss.pop()
self._update_consisgroup_cache(group.id, lss_for_cg)
return lss_for_cg
for snapshot in snapshots:
snapshots_model_update.append({
'id': snapshot.os_id,
'status': model_update['status']
})
return model_update, snapshots_model_update
def _add_volumes_into_group(self, group, add_luns, lss_in_cg):
add_volumes_update = []
luns = [lun for lun in add_luns if lun.ds_id[:2] not in lss_in_cg]
for lun in luns:
if lun.type_replication:
new_lun = self._clone_lun_for_group(group, lun)
new_lun.type_replication = True
new_lun = self._replication.enable_replication(new_lun, True)
lun = self._replication.delete_replica(lun)
else:
new_lun = self._clone_lun_for_group(group, lun)
self._helper.delete_lun(lun)
volume_update = new_lun.update_volume(lun)
volume_update['id'] = lun.os_id
add_volumes_update.append(volume_update)
return add_volumes_update
def _clone_lun_for_group(self, group, lun):
lun.group = Group(group)
new_lun = lun.shallow_copy()
new_lun.type_replication = False
self._create_lun_helper(new_lun)
self._clone_lun(lun, new_lun)
return new_lun
@coordination.synchronized('{self.prefix}-consistency-group')
def _remove_volumes_in_group(self, group, add_volumes, remove_volumes):
if len(remove_volumes) == len(group.volumes) + len(add_volumes):
self._update_consisgroup_cache(group.id)
@proxy.logger
def update_consistencygroup(self, ctxt, group,
add_volumes, remove_volumes):
"""Add or remove volume(s) to/from an existing consistency group."""
return self._helper.update_group(ctxt, group,
add_volumes, remove_volumes)
def _update_consisgroup_cache(self, group_id, lss_id=None):
if lss_id:
self.consisgroup_cache[group_id] = set([lss_id])
else:
if self.consisgroup_cache.get(group_id):
LOG.debug('Group %(id)s owns LSS %(lss)s in the cache.', {
'id': group_id,
'lss': ','.join(self.consisgroup_cache[group_id])
})
self.consisgroup_cache.pop(group_id)
@proxy._trace_time
def create_consistencygroup_from_src(self, ctxt, group, volumes,
cgsnapshot, snapshots,
source_cg, sorted_source_vols):
"""Create a consistencygroup from source.
:param ctxt: the context of the caller.
:param group: the dictionary of the consistency group to be created.
:param volumes: a list of volume dictionaries in the group.
:param cgsnapshot: the dictionary of the cgsnapshot as source.
:param snapshots: a list of snapshot dictionaries in the cgsnapshot.
:param source_cg: the dictionary of the consisgroup as source.
:param sorted_source_vols: a list of volume dictionaries
in the consisgroup.
:return model_update, volumes_model_update
"""
return self._create_group_from_src(ctxt, group, volumes, cgsnapshot,
snapshots, source_cg,
sorted_source_vols, True)
def _create_group_from_src(self, ctxt, group, volumes, cgsnapshot,
snapshots, source_cg, sorted_source_vols,
cg_enabled=False):
def create_group_from_src(self, ctxt, group, volumes, group_snapshot,
sorted_snapshots, source_group,
sorted_source_vols):
"""Create volume group from volume group or volume group snapshot."""
model_update = {'status': fields.GroupStatus.AVAILABLE}
volumes_model_update = []
if cgsnapshot and snapshots:
if group_snapshot and sorted_snapshots:
src_luns = [Lun(snapshot, is_snapshot=True)
for snapshot in snapshots]
elif source_cg and sorted_source_vols:
for snapshot in sorted_snapshots]
elif source_group and sorted_source_vols:
src_luns = [Lun(source_vol)
for source_vol in sorted_source_vols]
else:
@ -811,9 +984,22 @@ class DS8KProxy(proxy.IBMStorageProxy):
raise exception.InvalidInput(message=msg)
try:
# Don't use paramter volumes because it has DetachedInstanceError
# issue frequently. here tries to get and sort new volumes, a lot
# of cases have been guaranteed by the _sort_source_vols in
# manange.py, so not verify again.
sorted_volumes = []
for vol in volumes:
found_vols = [v for v in group.volumes if v['id'] == vol['id']]
sorted_volumes.extend(found_vols)
volumes = sorted_volumes
tgt_luns = [Lun(volume) for volume in volumes]
if src_luns and tgt_luns:
self._clone_group(src_luns, tgt_luns, cg_enabled)
self._clone_group(src_luns, tgt_luns)
for tgt_lun in tgt_luns:
if tgt_lun.type_replication:
self._create_replica_helper(tgt_lun)
except restclient.APIException:
model_update['status'] = fields.GroupStatus.ERROR
LOG.exception("Failed to create group from group snapshot.")
@ -828,7 +1014,7 @@ class DS8KProxy(proxy.IBMStorageProxy):
return model_update, volumes_model_update
def _clone_group(self, src_luns, tgt_luns, cg_enabled):
def _clone_group(self, src_luns, tgt_luns):
for src_lun in src_luns:
self._ensure_vol_not_fc_target(src_lun.ds_id)
finished = False
@ -842,7 +1028,7 @@ class DS8KProxy(proxy.IBMStorageProxy):
"source_volume": src_lun.ds_id,
"target_volume": tgt_lun.ds_id
})
if cg_enabled:
if tgt_lun.group.consisgroup_enabled:
self._do_flashcopy_with_freeze(vol_pairs)
else:
self._helper.start_flashcopy(vol_pairs)
@ -851,7 +1037,6 @@ class DS8KProxy(proxy.IBMStorageProxy):
if not finished:
self._helper.delete_lun(tgt_luns)
@synchronized('OpenStackCinderIBMDS8KMutex-CG-', external=True)
@proxy._trace_time
def _do_flashcopy_with_freeze(self, vol_pairs):
# issue flashcopy with freeze
@ -861,48 +1046,6 @@ class DS8KProxy(proxy.IBMStorageProxy):
LOG.debug('Unfreezing the LSS: %s', ','.join(lss_ids))
self._helper.unfreeze_lss(lss_ids)
@proxy.logger
def create_group(self, ctxt, group):
"""Create generic volume group."""
return self._helper.create_group(ctxt, group)
@proxy.logger
def delete_group(self, ctxt, group, volumes):
"""Delete group and the volumes in the group."""
luns = [Lun(volume) for volume in volumes]
return self._helper.delete_group(ctxt, group, luns)
@proxy.logger
def update_group(self, ctxt, group, add_volumes, remove_volumes):
"""Update generic volume group."""
return self._helper.update_group(ctxt, group,
add_volumes, remove_volumes)
@proxy.logger
def create_group_snapshot(self, ctxt, group_snapshot, snapshots):
"""Create volume group snapshot."""
snapshot_group = Group(group_snapshot)
cg_enabled = True if snapshot_group.type_cg_snapshot else False
return self._create_group_snapshot(ctxt, group_snapshot,
snapshots, cg_enabled)
@proxy.logger
def delete_group_snapshot(self, ctxt, group_snapshot, snapshots):
"""Delete volume group snapshot."""
return self._delete_group_snapshot(ctxt, group_snapshot, snapshots)
@proxy._trace_time
def create_group_from_src(self, ctxt, group, volumes, group_snapshot,
sorted_snapshots, source_group,
sorted_source_vols):
"""Create volume group from volume group or volume group snapshot."""
volume_group = Group(group)
cg_enabled = True if volume_group.type_cg_snapshot else False
return self._create_group_from_src(ctxt, group, volumes,
group_snapshot, sorted_snapshots,
source_group, sorted_source_vols,
cg_enabled)
def freeze_backend(self, ctxt):
"""Notify the backend that it's frozen."""
pass
@ -935,9 +1078,9 @@ class DS8KProxy(proxy.IBMStorageProxy):
if secondary_id is None:
secondary_id = backend_id
elif secondary_id != backend_id:
msg = (_('Invalid secondary_backend_id specified. '
'Valid backend id is %s.') % backend_id)
raise exception.InvalidReplicationTarget(message=msg)
raise exception.InvalidReplicationTarget(
message=(_('Invalid secondary_backend_id specified. '
'Valid backend id is %s.') % backend_id))
LOG.debug("Starting failover to %s.", secondary_id)
@ -965,10 +1108,10 @@ class DS8KProxy(proxy.IBMStorageProxy):
self._active_backend_id = ""
self._helper = self._replication._source_helper
except restclient.APIException as e:
msg = (_("Unable to failover host to %(id)s. "
"Exception= %(ex)s")
% {'id': secondary_id, 'ex': six.text_type(e)})
raise exception.UnableToFailOver(reason=msg)
raise exception.UnableToFailOver(
reason=(_("Unable to failover host to %(id)s. "
"Exception= %(ex)s")
% {'id': secondary_id, 'ex': six.text_type(e)}))
for lun in replicated_luns:
volume_update = lun.get_volume_update()

View File

@ -50,10 +50,11 @@ class MetroMirrorManager(object):
ports = self._source.get_physical_links(
self._target.backend['storage_wwnn'])
if not ports:
msg = (_("DS8K %(tgt)s is not connected to the DS8K %(src)s!") %
{'tgt': self._target.backend['storage_wwnn'],
'src': self._source.backend['storage_wwnn']})
raise exception.CinderException(msg)
raise exception.VolumeDriverException(
message=((_("%(tgt)s is not connected to %(src)s!") % {
'tgt': self._target.backend['storage_wwnn'],
'src': self._source.backend['storage_wwnn']
})))
pairs = [{
'source_port_id': p['source_port_id'],
@ -72,15 +73,13 @@ class MetroMirrorManager(object):
["%s-%s" % (p['source_port_id'],
p['target_port_id'])
for p in pairs])
invalid_pair = "%s-%s" % (pair['source_port_id'],
pair['target_port_id'])
msg = (_("Invalid port pair: %(invalid)s, valid port "
"pair(s) are: %(valid)s") %
{'invalid': invalid_pair,
'valid': valid_pairs})
raise exception.CinderException(msg)
raise exception.VolumeDriverException(
message=((_("Invalid port pair: %(invalid)s, valid "
"port pair(s) are: %(valid)s")
% {'invalid': invalid_pair,
'valid': valid_pairs})))
self._source.backend['port_pairs'] = [{
'source_port_id': p['target_port_id'],
'target_port_id': p['source_port_id']
@ -96,13 +95,13 @@ class MetroMirrorManager(object):
return True
def find_available_pprc_path(self, lss=None, excluded_lss=None):
"""find lss from existed pprc path.
def find_from_pprc_paths(self, specified_lss=None, excluded_lss=None):
"""find lss from existing pprc paths and pool id for it.
the format of lss_pair returned is as below:
the format of pool_lss_pair returned is as below:
{'source': (pid, lss), 'target': (pid, lss)}
"""
state, paths = self._filter_pprc_paths(lss)
state, paths = self._filter_pprc_paths(specified_lss)
if state != PPRC_PATH_HEALTHY:
# check whether the physical links are available or not,
# or have been changed.
@ -111,43 +110,47 @@ class MetroMirrorManager(object):
if excluded_lss:
paths = [p for p in paths
if p['source_lss_id'] not in excluded_lss]
# only enable_replication will specify the source LSS
# and it need to reuse LSS reserved for CG if this LSS
# is in PPRC path.
if not specified_lss:
paths = [p for p in paths if p['source_lss_id'] not in
self._source.backend['lss_ids_for_cg']]
lss_pair = {}
if len(paths) == 1:
path = paths[0]
pid = self._source.get_pool(path['source_lss_id'])
lss_pair['source'] = (pid, path['source_lss_id'])
else:
# sort the lss pairs according to the number of luns,
# get the lss pair which has least luns.
candidates = []
source_lss_set = set(p['source_lss_id'] for p in paths)
for lss in source_lss_set:
# get the number of lun in source.
src_luns = self._source.get_lun_number_in_lss(lss)
if src_luns == helper.LSS_VOL_SLOTS:
continue
# sort pairs according to the number of luns in their LSSes,
# and get the pair which LSS has least luns.
candidates = []
source_lss_set = set(p['source_lss_id'] for p in paths)
for lss in source_lss_set:
# get the number of luns in source.
src_luns = self._source.get_lun_number_in_lss(lss)
if src_luns == helper.LSS_VOL_SLOTS and not specified_lss:
continue
spec_paths = [p for p in paths if p['source_lss_id'] == lss]
for path in spec_paths:
# get the number of lun in target.
spec_paths = [p for p in paths if p['source_lss_id'] == lss]
for path in spec_paths:
# get the number of luns in target.
try:
tgt_luns = self._target.get_lun_number_in_lss(
path['target_lss_id'])
candidates.append((lss, path, src_luns + tgt_luns))
if candidates:
candidate = sorted(candidates, key=lambda c: c[2])[0]
pid = self._source.get_pool(candidate[0])
lss_pair['source'] = (pid, candidate[0])
path = candidate[1]
else:
return PPRC_PATH_FULL, None
# format the target in lss_pair.
pid = self._target.get_pool(path['target_lss_id'])
lss_pair['target'] = (pid, path['target_lss_id'])
return PPRC_PATH_HEALTHY, lss_pair
except restclient.APIException:
# if DS8K can fix this problem, then remove the
# exception here.
LOG.error("Target LSS %s in PPRC path may doesn't "
"exist although PPRC path is available.",
path['target_lss_id'])
tgt_luns = 0
candidates.append((path['source_lss_id'],
path['target_lss_id'],
src_luns + tgt_luns))
if not candidates:
return PPRC_PATH_FULL, None
else:
src_lss, tgt_lss, num = sorted(candidates, key=lambda c: c[2])[0]
return PPRC_PATH_HEALTHY, {
'source': (self._source.get_pool(src_lss), src_lss),
'target': (self._target.get_pool(tgt_lss), tgt_lss)
}
def _filter_pprc_paths(self, lss):
paths = self._source.get_pprc_paths(lss)
@ -225,9 +228,9 @@ class MetroMirrorManager(object):
return PPRC_PATH_HEALTHY, paths
def create_pprc_path(self, lss_pair):
src_lss = lss_pair['source'][1]
tgt_lss = lss_pair['target'][1]
def create_pprc_path(self, pool_lss_pair):
src_lss = pool_lss_pair['source'][1]
tgt_lss = pool_lss_pair['target'][1]
# check whether the pprc path exists and is healthy or not firstly.
pid = (self._source.backend['storage_wwnn'] + '_' + src_lss + ':' +
self._target.backend['storage_wwnn'] + '_' + tgt_lss)
@ -256,9 +259,9 @@ class MetroMirrorManager(object):
break
if retry == 3:
self._source.delete_pprc_path(pid)
msg = (_("Fail to create PPRC path %(src)s:%(tgt)s.") %
{'src': src_lss, 'tgt': tgt_lss})
raise restclient.APIException(data=msg)
raise restclient.APIException(
data=(_("Failed to create PPRC path %(src)s:%(tgt)s.")
% {'src': src_lss, 'tgt': tgt_lss}))
LOG.debug("Create the new PPRC path successfully.")
def _is_pprc_paths_healthy(self, path_id):
@ -280,8 +283,7 @@ class MetroMirrorManager(object):
vol_pairs = [{
'source_volume': lun.ds_id,
'source_system_id':
self._source.backend['storage_unit'],
'source_system_id': self._source.backend['storage_unit'],
'target_volume': tgt_vol_id,
'target_system_id': tgt_stg_id
}]
@ -298,10 +300,9 @@ class MetroMirrorManager(object):
def delete_pprc_pairs(self, lun):
self._source.delete_pprc_pair(lun.ds_id)
if self.is_target_alive():
if self.is_target_alive() and lun.replication_driver_data:
replica = sorted(lun.replication_driver_data.values())[0]
self._target.delete_pprc_pair(
six.text_type(replica['vol_hex_id']))
self._target.delete_pprc_pair(replica['vol_hex_id'])
def do_pprc_failover(self, luns, backend_id):
vol_pairs = []
@ -317,12 +318,10 @@ class MetroMirrorManager(object):
continue
vol_pairs.append({
'source_volume': six.text_type(target_vol_id),
'source_system_id': six.text_type(
self._target.backend['storage_unit']),
'target_volume': six.text_type(lun.ds_id),
'target_system_id': six.text_type(
self._source.backend['storage_unit'])
'source_volume': target_vol_id,
'source_system_id': self._target.backend['storage_unit'],
'target_volume': lun.ds_id,
'target_system_id': self._source.backend['storage_unit']
})
target_vol_ids.append(target_vol_id)
@ -383,9 +382,16 @@ class Replication(object):
if connection_type == storage.XIV_CONNECTION_TYPE_FC:
self._target_helper = (
helper.DS8KReplicationTargetHelper(target_device))
else:
elif connection_type == storage.XIV_CONNECTION_TYPE_FC_ECKD:
self._target_helper = (
helper.DS8KReplicationTargetECKDHelper(target_device))
else:
raise exception.InvalidParameterValue(
err=(_("Param [connection_type] %s in replication_device "
"is invalid.") % connection_type))
self._target_helper.backend['lss_ids_for_cg'] = (
self._source_helper.backend['lss_ids_for_cg'])
self._mm_manager = MetroMirrorManager(self._source_helper,
self._target_helper)
@ -393,11 +399,12 @@ class Replication(object):
src_conn_type = self._source_helper.get_connection_type()
tgt_conn_type = self._target_helper.get_connection_type()
if src_conn_type != tgt_conn_type:
msg = (_("The connection type in primary backend is "
"%(primary)s, but in secondary backend it is "
"%(secondary)s") %
{'primary': src_conn_type, 'secondary': tgt_conn_type})
raise exception.CinderException(msg)
raise exception.VolumeDriverException(
message=(_("The connection type in primary backend is "
"%(primary)s, but in secondary backend it is "
"%(secondary)s")
% {'primary': src_conn_type,
'secondary': tgt_conn_type}))
# PPRC can not copy from ESE volume to standard volume or vice versus.
if src_conn_type == storage.XIV_CONNECTION_TYPE_FC_ECKD:
src_thin = self._source_helper.get_thin_provision()
@ -425,13 +432,13 @@ class Replication(object):
return luns
@proxy.logger
def find_available_lss_pair(self, excluded_lss):
state, lss_pair = (
self._mm_manager.find_available_pprc_path(None, excluded_lss))
if lss_pair is None:
lss_pair = self.find_new_lss_for_source(excluded_lss)
lss_pair.update(self.find_new_lss_for_target())
return lss_pair
def find_pool_lss_pair(self, excluded_lss):
state, pool_lss_pair = (
self._mm_manager.find_from_pprc_paths(None, excluded_lss))
if pool_lss_pair is None:
pool_lss_pair = self.find_new_lss_for_source(excluded_lss)
pool_lss_pair.update(self.find_new_lss_for_target())
return pool_lss_pair
@proxy.logger
def find_new_lss_for_source(self, excluded_lss):
@ -444,23 +451,22 @@ class Replication(object):
return {'target': (tgt_pid, tgt_lss)}
@proxy.logger
def enable_replication(self, lun):
state, lun.lss_pair = (
self._mm_manager.find_available_pprc_path(lun.ds_id[0:2]))
def enable_replication(self, lun, delete_source=False):
state, lun.pool_lss_pair = (
self._mm_manager.find_from_pprc_paths(lun.ds_id[0:2]))
LOG.debug("enable_replication: pool_lss_pair is %s.",
lun.pool_lss_pair)
if state == PPRC_PATH_UNHEALTHY:
msg = (_("The path(s) for volume %(name)s isn't available "
"any more, please make sure the state of the path(s) "
"which source LSS is %(lss)s is success.") %
{'name': lun.cinder_name, 'lss': lun.ds_id[0:2]})
raise restclient.APIException(data=msg)
raise restclient.APIException(
data=(_("The path(s) for volume %(name)s isn't available "
"any more, please make sure the state of the path(s) "
"which source LSS is %(lss)s is success.")
% {'name': lun.cinder_name, 'lss': lun.ds_id[0:2]}))
elif state == PPRC_PATH_NOT_EXIST:
pid = self._source_helper.get_pool(lun.ds_id[0:2])
lss_pair = {'source': (pid, lun.ds_id[0:2])}
lss_pair.update(self.find_new_lss_for_target())
lun.lss_pair = lss_pair
LOG.debug("Begin to create replication volume, lss_pair is %s." %
lun.lss_pair)
lun = self.create_replica(lun, False)
lun.pool_lss_pair = {'source': (pid, lun.ds_id[0:2])}
lun.pool_lss_pair.update(self.find_new_lss_for_target())
lun = self.create_replica(lun, delete_source)
return lun
@proxy.logger
@ -469,7 +475,7 @@ class Replication(object):
try:
self._target_helper.create_lun(lun)
# create PPRC paths if need.
self._mm_manager.create_pprc_path(lun.lss_pair)
self._mm_manager.create_pprc_path(lun.pool_lss_pair)
# create pprc pair
self._mm_manager.create_pprc_pairs(lun)
except restclient.APIException:
@ -477,7 +483,6 @@ class Replication(object):
self.delete_replica(lun)
if delete_source:
self._source_helper.delete_lun(lun)
lun.replication_status = 'enabled'
return lun
@ -488,11 +493,10 @@ class Replication(object):
self._mm_manager.delete_pprc_pairs(lun)
self._delete_replica(lun)
except restclient.APIException as e:
msg = (_('Failed to delete the target volume for volume '
'%(volume)s, Exception: %(ex)s.') %
{'volume': lun.ds_id, 'ex': six.text_type(e)})
raise exception.CinderException(msg)
raise exception.VolumeDriverException(
message=(_('Failed to delete the target volume for '
'volume %(volume)s, Exception: %(ex)s.')
% {'volume': lun.ds_id, 'ex': six.text_type(e)}))
lun.replication_status = 'disabled'
lun.replication_driver_data = {}
return lun
@ -542,7 +546,7 @@ class Replication(object):
LOG.debug("Failback starts, backend id is %s.", backend_id)
for lun in luns:
self._mm_manager.create_pprc_path(lun.lss_pair)
self._mm_manager.create_pprc_path(lun.pool_lss_pair)
self._mm_manager.do_pprc_failback(luns, backend_id)
# revert the relationship of source volume and target volume
self.do_pprc_failover(luns, backend_id)

View File

@ -271,18 +271,19 @@ class RESTScheduler(object):
attempts == 0):
self.connect()
elif response['server'].get('code') in AUTHENTICATION_ERROR_CODES:
msg = (_('Authentication failed for host %(host)s. '
'Exception= %(e)s') %
{'host': self.host, 'e': response['server']['message']})
raise APIAuthenticationException(data=msg)
raise APIAuthenticationException(
data=(_('Authentication failed for host %(host)s. '
'Exception= %(e)s') %
{'host': self.host,
'e': response['server']['message']}))
elif response['server'].get('code') in LSS_ERROR_CODES:
msg = (_('Can not put the volume in LSS: %s') %
response['server']['message'])
raise LssFullException(data=msg)
raise LssFullException(
data=(_('Can not put the volume in LSS: %s')
% response['server']['message']))
elif response['server']['status'] == 'timeout':
msg = (_('Request to storage API time out: %s') %
response['server']['message'])
raise TimeoutException(data=msg)
raise TimeoutException(
data=(_('Request to storage API time out: %s')
% response['server']['message']))
elif (response['server']['status'] != 'ok' and
(badStatusException or 'code' not in response['server'])):
# if code is not in response means that error was in
@ -290,10 +291,11 @@ class RESTScheduler(object):
# via badStatusException=False, but will retry it to
# confirm the problem.
if attempts == 1:
msg = (_("Request to storage API failed: %(err)s, "
"(%(url)s).") %
{'err': response['server']['message'], 'url': url})
raise APIException(data=msg)
raise APIException(
data=(_("Request to storage API failed: %(err)s, "
"(%(url)s).")
% {'err': response['server']['message'],
'url': url}))
eventlet.sleep(2)
else:
return response
@ -303,8 +305,8 @@ class RESTScheduler(object):
def fetchall(self, *args, **kwargs):
r = self.send(*args, **kwargs)['data']
if len(r) != 1:
msg = _('Expected one result but got %d.') % len(r)
raise APIException(data=msg)
raise APIException(
data=(_('Expected one result but got %d.') % len(r)))
else:
return r.popitem()[1]
@ -313,8 +315,8 @@ class RESTScheduler(object):
def fetchone(self, *args, **kwargs):
r = self.fetchall(*args, **kwargs)
if len(r) != 1:
msg = _('Expected one item in result but got %d.') % len(r)
raise APIException(data=msg)
raise APIException(
data=(_('Expected one item in result but got %d.') % len(r)))
return r[0]
# same as the send method above but returns the last element of the
@ -323,9 +325,9 @@ class RESTScheduler(object):
r = self.send(*args, **kwargs)
if 'responses' in r:
if len(r['responses']) != 1:
msg = (_('Expected one item in result responses but '
'got %d.') % len(r['responses']))
raise APIException(data=msg)
raise APIException(
data=(_('Expected one item in result responses but '
'got %d.') % len(r['responses'])))
r = r['responses'][0]
return r['link']['href'].split('/')[-1]