VMAX driver - Add async replication support

Currently, only Synchronous mode is supported for volume replication
on VMAX. This patch adds Asynchronous remote replication support.

Change-Id: If5df30b6ac8544f4c98b4dec89ad3b032b80d379
Partially-Implements: blueprint vmax-replication-enhancements
Author: Ciara Stacke
Date: 2017-10-03 17:24:53 +01:00
Committer: Helen Walsh
Parent: ec7f04ee97
Commit: 84e39916c7
9 changed files with 475 additions and 81 deletions
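With this patch, asynchronous replication is selected through the 'mode' key of the replication_device entry in cinder.conf. A minimal sketch of such an entry is shown below; the keys mirror the async test data in this change, while the array, pool, port group and RDF group label values are purely illustrative:

replication_device = target_device_id:000197800123, remote_port_group:OS-PG-1, remote_pool:SRP_2, rdf_group_label:os-rdf-label, allow_extend:True, mode:async

If 'mode' is omitted, or set to anything other than 'async'/'asynchronous', get_replication_config falls back to Synchronous replication.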


@ -88,6 +88,7 @@ class VMAXCommonData(object):
group_snapshot_name = 'Grp_snapshot'
target_group_name = 'Grp_target'
storagegroup_name_with_id = 'GrpId_group_name'
rdf_managed_async_grp = "OS-%s-async-rdf-sg" % rdf_group_name
# connector info
wwpn1 = "123456789012345"
@ -244,6 +245,7 @@ class VMAXCommonData(object):
rep_extra_specs['interval'] = 0
rep_extra_specs['retries'] = 0
rep_extra_specs['srp'] = srp2
rep_extra_specs['rep_mode'] = 'Synchronous'
test_volume_type_1 = volume_type.VolumeType(
id='2b06255d-f5f0-4520-a953-b029196add6a', name='abc',
@ -1259,11 +1261,8 @@ class VMAXUtilsTest(test.TestCase):
rep_config1 = self.utils.get_replication_config(rep_device_list1)
self.assertEqual(self.data.remote_array, rep_config1['array'])
# Success, allow_extend true
rep_device_list2 = [{'target_device_id': self.data.remote_array,
'remote_pool': self.data.srp,
'rdf_group_label': self.data.rdf_group_name,
'remote_port_group': self.data.port_group_name_f,
'allow_extend': 'true'}]
rep_device_list2 = rep_device_list1
rep_device_list2[0]['allow_extend'] = 'true'
rep_config2 = self.utils.get_replication_config(rep_device_list2)
self.assertTrue(rep_config2['allow_extend'])
# No rep_device_list
@ -1275,6 +1274,11 @@ class VMAXUtilsTest(test.TestCase):
'remote_pool': self.data.srp}]
self.assertRaises(exception.VolumeBackendAPIException,
self.utils.get_replication_config, rep_device_list4)
# Success, mode is async
rep_device_list5 = rep_device_list2
rep_device_list5[0]['mode'] = 'async'
rep_config5 = self.utils.get_replication_config(rep_device_list5)
self.assertEqual(utils.REP_ASYNC, rep_config5['mode'])
def test_is_volume_failed_over(self):
vol = deepcopy(self.data.test_volume)
@ -1392,6 +1396,17 @@ class VMAXUtilsTest(test.TestCase):
self.utils.check_rep_status_enabled,
self.data.test_group)
def test_get_replication_prefix(self):
async_prefix = self.utils.get_replication_prefix(utils.REP_ASYNC)
self.assertEqual('-RA', async_prefix)
sync_prefix = self.utils.get_replication_prefix(utils.REP_SYNC)
self.assertEqual('-RE', sync_prefix)
def test_get_async_rdf_managed_grp_name(self):
rep_config = {'rdf_group_label': self.data.rdf_group_name}
grp_name = self.utils.get_async_rdf_managed_grp_name(rep_config)
self.assertEqual(self.data.rdf_managed_async_grp, grp_name)
class VMAXRestTest(test.TestCase):
def setUp(self):
@ -2556,10 +2571,20 @@ class VMAXRestTest(test.TestCase):
'device_id': self.data.device_id2}
rdf_dict = self.rest.create_rdf_device_pair(
self.data.array, self.data.device_id, self.data.rdf_group_no,
self.data.device_id2, self.data.remote_array, "OS-2",
self.data.device_id2, self.data.remote_array,
self.data.extra_specs)
self.assertEqual(ref_dict, rdf_dict)
def test_create_rdf_device_pair_async(self):
ref_dict = {'array': self.data.remote_array,
'device_id': self.data.device_id2}
extra_specs = deepcopy(self.data.extra_specs)
extra_specs[utils.REP_MODE] = utils.REP_ASYNC
rdf_dict = self.rest.create_rdf_device_pair(
self.data.array, self.data.device_id, self.data.rdf_group_no,
self.data.device_id2, self.data.remote_array, extra_specs)
self.assertEqual(ref_dict, rdf_dict)
def test_modify_rdf_device_pair(self):
resource_name = "70/volume/00001"
common_opts = {"force": 'false',
@ -2670,6 +2695,22 @@ class VMAXRestTest(test.TestCase):
is_next_gen2 = self.rest.is_next_gen_array(self.data.array_herc)
self.assertTrue(is_next_gen2)
@mock.patch.object(rest.VMAXRest, 'are_vols_rdf_paired',
side_effect=[('', '', 'syncinprog'),
('', '', 'consistent'),
exception.CinderException])
def test_wait_for_rdf_consistent_state(self, mock_paired):
self.rest.wait_for_rdf_consistent_state(
self.data.array, self.data.remote_array,
self.data.device_id, self.data.device_id2,
self.data.extra_specs)
self.assertEqual(2, mock_paired.call_count)
self.assertRaises(exception.VolumeBackendAPIException,
self.rest.wait_for_rdf_consistent_state,
self.data.array, self.data.remote_array,
self.data.device_id, self.data.device_id2,
self.data.extra_specs)
class VMAXProvisionTest(test.TestCase):
def setUp(self):
@ -2922,6 +2963,16 @@ class VMAXProvisionTest(test.TestCase):
del_rdf.assert_called_once_with(
array, device_id, rdf_group_name)
def test_delete_rdf_pair_async(self):
with mock.patch.object(
self.provision.rest, 'delete_rdf_pair') as mock_del_rdf:
extra_specs = deepcopy(self.data.extra_specs)
extra_specs[utils.REP_MODE] = utils.REP_ASYNC
self.provision.delete_rdf_pair(
self.data.array, self.data.device_id,
self.data.rdf_group_no, extra_specs)
mock_del_rdf.assert_called_once()
def test_failover_volume(self):
array = self.data.array
device_id = self.data.device_id
@ -3197,7 +3248,7 @@ class VMAXCommonTest(test.TestCase):
extra_specs, self.data.connector)
mock_rm.assert_called_once_with(
array, volume, device_id, volume_name,
extra_specs, True, self.data.connector)
extra_specs, True, self.data.connector, async_grp=None)
def test_unmap_lun(self):
array = self.data.array
@ -3209,7 +3260,8 @@ class VMAXCommonTest(test.TestCase):
with mock.patch.object(self.common, '_remove_members'):
self.common._unmap_lun(volume, connector)
self.common._remove_members.assert_called_once_with(
array, volume, device_id, extra_specs, connector)
array, volume, device_id, extra_specs,
connector, async_grp=None)
def test_unmap_lun_not_mapped(self):
volume = self.data.test_volume
@ -3230,7 +3282,7 @@ class VMAXCommonTest(test.TestCase):
with mock.patch.object(self.common, '_remove_members'):
self.common._unmap_lun(volume, None)
self.common._remove_members.assert_called_once_with(
array, volume, device_id, extra_specs, None)
array, volume, device_id, extra_specs, None, async_grp=None)
def test_initialize_connection_already_mapped(self):
volume = self.data.test_volume
@ -4367,16 +4419,18 @@ class VMAXCommonTest(test.TestCase):
@mock.patch.object(volume_utils, 'is_group_a_cg_snapshot_type',
return_value=True)
@mock.patch.object(volume_utils, 'is_group_a_type',
side_effect=[False, False])
@mock.patch.object(volume_utils, 'is_group_a_type', return_value=False)
def test_create_group(self, mock_type, mock_cg_type):
ref_model_update = {'status': fields.GroupStatus.AVAILABLE}
model_update = self.common.create_group(None, self.data.test_group_1)
self.assertEqual(ref_model_update, model_update)
def test_create_group_exception(self):
@mock.patch.object(provision.VMAXProvision, 'create_volume_group',
side_effect=exception.CinderException)
@mock.patch.object(volume_utils, 'is_group_a_type', return_value=False)
def test_create_group_exception(self, mock_type, mock_create):
context = None
group = self.data.test_group_snapshot_failed
group = self.data.test_group_failed
with mock.patch.object(
volume_utils, 'is_group_a_cg_snapshot_type',
return_value=True):
@ -5591,15 +5645,16 @@ class VMAXMaskingTest(test.TestCase):
def test_cleanup_deletion(self, mock_add, mock_remove_vol, mock_get_sg):
self.mask._cleanup_deletion(
self.data.array, self.data.test_volume, self.device_id,
self.volume_name, self.extra_specs, None, True)
self.volume_name, self.extra_specs, None, True, None)
mock_add.assert_not_called()
self.mask._cleanup_deletion(
self.data.array, self.data.test_volume, self.device_id,
self.volume_name, self.extra_specs, self.data.connector, True)
self.volume_name, self.extra_specs,
self.data.connector, True, None)
mock_add.assert_not_called()
self.mask._cleanup_deletion(
self.data.array, self.data.test_volume, self.device_id,
self.volume_name, self.extra_specs, None, True)
self.volume_name, self.extra_specs, None, True, None)
mock_add.assert_called_once_with(
self.data.array, self.device_id,
self.volume_name, self.extra_specs, volume=self.data.test_volume)
@ -6000,6 +6055,17 @@ class VMAXCommonReplicationTest(test.TestCase):
self.extra_specs = deepcopy(self.data.extra_specs_rep_enabled)
self.extra_specs['retries'] = 0
self.extra_specs['interval'] = 0
self.extra_specs['rep_mode'] = 'Synchronous'
self.async_rep_device = {
'target_device_id': self.data.remote_array,
'remote_port_group': self.data.port_group_name_f,
'remote_pool': self.data.srp2,
'rdf_group_label': self.data.rdf_group_name,
'allow_extend': 'True', 'mode': 'async'}
async_configuration = FakeConfiguration(
self.fake_xml, config_group,
replication_device=self.async_rep_device)
self.async_driver = fc.VMAXFCDriver(configuration=async_configuration)
def test_get_replication_info(self):
self.common._get_replication_info()
@ -6506,14 +6572,17 @@ class VMAXCommonReplicationTest(test.TestCase):
@mock.patch.object(volume_utils, 'is_group_a_cg_snapshot_type',
return_value=False)
@mock.patch.object(volume_utils, 'is_group_a_type',
side_effect=[True, True])
@mock.patch.object(volume_utils, 'is_group_a_type', return_value=True)
def test_create_replicaton_group(self, mock_type, mock_cg_type):
ref_model_update = {
'status': fields.GroupStatus.AVAILABLE,
'replication_status': fields.ReplicationStatus.ENABLED}
model_update = self.common.create_group(None, self.data.test_group_1)
self.assertEqual(ref_model_update, model_update)
# Replication mode is async
self.assertRaises(exception.InvalidInput,
self.async_driver.common.create_group,
None, self.data.test_group_1)
def test_enable_replication(self):
# Case 1: Group not replicated
@ -6566,31 +6635,24 @@ class VMAXCommonReplicationTest(test.TestCase):
model_update['replication_status'])
def test_failover_replication(self):
# Case 1: Group not replicated
with mock.patch.object(volume_utils, 'is_group_a_type',
return_value=False):
self.assertRaises(NotImplementedError,
self.common.failover_replication,
None, self.data.test_group,
[self.data.test_volume])
with mock.patch.object(volume_utils, 'is_group_a_type',
return_value=True):
# Case 2: Empty group
# Case 1: Empty group
model_update, __ = self.common.failover_replication(
None, self.data.test_group, [])
self.assertEqual({}, model_update)
# Case 3: Successfully failed over
# Case 2: Successfully failed over
model_update, __ = self.common.failover_replication(
None, self.data.test_group, [self.data.test_volume])
self.assertEqual(fields.ReplicationStatus.FAILED_OVER,
model_update['replication_status'])
# Case 4: Successfully failed back
# Case 3: Successfully failed back
model_update, __ = self.common.failover_replication(
None, self.data.test_group, [self.data.test_volume],
secondary_backend_id='default')
self.assertEqual(fields.ReplicationStatus.ENABLED,
model_update['replication_status'])
# Case 5: Exception
# Case 4: Exception
model_update, __ = self.common.failover_replication(
None, self.data.test_group_failed, [self.data.test_volume])
self.assertEqual(fields.ReplicationStatus.ERROR,
@ -6648,3 +6710,50 @@ class VMAXCommonReplicationTest(test.TestCase):
self.data.array, self.data.test_vol_grp_name,
[self.data.device_id], self.extra_specs)
mock_rm.assert_called_once()
@mock.patch.object(masking.VMAXMasking, 'add_volume_to_storage_group')
def test_add_volume_to_async_group(self, mock_add):
extra_specs = deepcopy(self.extra_specs)
extra_specs['rep_mode'] = utils.REP_ASYNC
self.async_driver.common._add_volume_to_async_rdf_managed_grp(
self.data.array, self.data.device_id, 'name',
self.data.remote_array, self.data.device_id2, extra_specs)
self.assertEqual(2, mock_add.call_count)
def test_add_volume_to_async_group_exception(self):
extra_specs = deepcopy(self.extra_specs)
extra_specs['rep_mode'] = utils.REP_ASYNC
self.assertRaises(
exception.VolumeBackendAPIException,
self.async_driver.common._add_volume_to_async_rdf_managed_grp,
self.data.failed_resource, self.data.device_id, 'name',
self.data.remote_array, self.data.device_id2, extra_specs)
@mock.patch.object(common.VMAXCommon,
'_add_volume_to_async_rdf_managed_grp')
@mock.patch.object(masking.VMAXMasking, 'remove_and_reset_members')
def test_setup_volume_replication_async(self, mock_rm, mock_add):
extra_specs = deepcopy(self.extra_specs)
extra_specs['rep_mode'] = utils.REP_ASYNC
rep_status, rep_data = (
self.async_driver.common.setup_volume_replication(
self.data.array, self.data.test_volume,
self.data.device_id, extra_specs))
self.assertEqual(fields.ReplicationStatus.ENABLED, rep_status)
self.assertEqual({'array': self.data.remote_array,
'device_id': self.data.device_id}, rep_data)
mock_add.assert_called_once()
@mock.patch.object(common.VMAXCommon, '_failover_replication',
return_value=({}, {}))
@mock.patch.object(common.VMAXCommon, '_failover_volume',
return_value={})
def test_failover_host_async(self, mock_fv, mock_fg):
volumes = [self.data.test_volume]
extra_specs = deepcopy(self.extra_specs)
extra_specs['rep_mode'] = utils.REP_ASYNC
with mock.patch.object(common.VMAXCommon, '_initial_setup',
return_value=extra_specs):
self.async_driver.common.failover_host(volumes, None, [])
mock_fv.assert_not_called()
mock_fg.assert_called_once()


@ -442,7 +442,7 @@ class VMAXCommon(object):
{'ssname': snap_name})
def _remove_members(self, array, volume, device_id,
extra_specs, connector):
extra_specs, connector, async_grp=None):
"""This method unmaps a volume from a host.
Removes volume from the storage group that belongs to a masking view.
@ -451,12 +451,13 @@ class VMAXCommon(object):
:param device_id: the VMAX volume device id
:param extra_specs: extra specifications
:param connector: the connector object
:param async_grp: the name of the async group, if applicable
"""
volume_name = volume.name
LOG.debug("Detaching volume %s.", volume_name)
return self.masking.remove_and_reset_members(
array, volume, device_id, volume_name,
extra_specs, True, connector)
extra_specs, True, connector, async_grp=async_grp)
def _unmap_lun(self, volume, connector):
"""Unmaps a volume from the host.
@ -469,6 +470,7 @@ class VMAXCommon(object):
extra_specs = self._get_replication_extra_specs(
extra_specs, self.rep_config)
volume_name = volume.name
async_grp = None
LOG.info("Unmap volume: %(volume)s.",
{'volume': volume_name})
if connector is not None:
@ -490,6 +492,10 @@ class VMAXCommon(object):
return
source_nf_sg = None
array = extra_specs[utils.ARRAY]
if (self.utils.is_replication_enabled(extra_specs) and
extra_specs.get(utils.REP_MODE, None) == utils.REP_ASYNC):
async_grp = self.utils.get_async_rdf_managed_grp_name(
self.rep_config)
if len(source_storage_group_list) > 1:
for storage_group in source_storage_group_list:
if 'NONFAST' in storage_group:
@ -502,7 +508,7 @@ class VMAXCommon(object):
extra_specs)
else:
self._remove_members(array, volume, device_info['device_id'],
extra_specs, connector)
extra_specs, connector, async_grp=async_grp)
def initialize_connection(self, volume, connector):
"""Initializes the connection and returns device and connection info.
@ -890,6 +896,8 @@ class VMAXCommon(object):
config_group = self.configuration.config_group
if extra_specs.get('replication_enabled') == '<is> True':
extra_specs[utils.IS_RE] = True
if self.rep_config and self.rep_config.get('mode'):
extra_specs[utils.REP_MODE] = self.rep_config['mode']
if register_config_file:
config_file = self._register_config_file_from_config_group(
config_group)
@ -1152,7 +1160,8 @@ class VMAXCommon(object):
% {'shortHostName': short_host_name,
'pg': short_pg_name})
if rep_enabled:
child_sg_name += "-RE"
rep_mode = extra_specs.get(utils.REP_MODE, None)
child_sg_name += self.utils.get_replication_prefix(rep_mode)
masking_view_dict['replication_enabled'] = True
mv_prefix = (
"OS-%(shortHostName)s-%(protocol)s-%(pg)s"
@ -2183,6 +2192,12 @@ class VMAXCommon(object):
array, volume, device_id, rdf_group_no, self.rep_config,
target_name, remote_array, target_device_id, extra_specs)
rep_mode = extra_specs.get(utils.REP_MODE, None)
if rep_mode == utils.REP_ASYNC:
self._add_volume_to_async_rdf_managed_grp(
array, device_id, source_name, remote_array,
target_device_id, extra_specs)
LOG.info('Successfully setup replication for %s.',
target_name)
replication_status = REPLICATION_ENABLED
@ -2190,6 +2205,39 @@ class VMAXCommon(object):
return replication_status, replication_driver_data
def _add_volume_to_async_rdf_managed_grp(
self, array, device_id, volume_name, remote_array,
target_device_id, extra_specs):
"""Add an async volume to its rdf management group.
:param array: the array serial number
:param device_id: the device id
:param volume_name: the volume name
:param remote_array: the remote array
:param target_device_id: the target device id
:param extra_specs: the extra specifications
:raises: VolumeBackendAPIException
"""
group_name = self.utils.get_async_rdf_managed_grp_name(
self.rep_config)
try:
self.provision.get_or_create_group(array, group_name, extra_specs)
self.masking.add_volume_to_storage_group(
array, device_id, group_name, volume_name, extra_specs)
# Add remote volume
self.provision.get_or_create_group(
remote_array, group_name, extra_specs)
self.masking.add_volume_to_storage_group(
remote_array, target_device_id,
group_name, volume_name, extra_specs)
except Exception as e:
exception_message = (
_('Exception occurred adding volume %(vol)s to its async '
'rdf management group - the exception received was: %(e)s')
% {'vol': volume_name, 'e': six.text_type(e)})
LOG.error(exception_message)
raise exception.VolumeBackendAPIException(data=exception_message)
def cleanup_lun_replication(self, volume, volume_name,
device_id, extra_specs):
"""Cleanup target volume on delete.
@ -2237,6 +2285,16 @@ class VMAXCommon(object):
'replication-enabled volume: %(volume)s',
{'volume': volume_name})
except Exception as e:
if extra_specs.get(utils.REP_MODE, None) == utils.REP_ASYNC:
(target_device, remote_array, rdf_group_no,
local_vol_state, pair_state) = (
self.get_remote_target_device(
extra_specs[utils.ARRAY], volume, device_id))
if target_device is not None:
# Return devices to their async rdf management groups
self._add_volume_to_async_rdf_managed_grp(
extra_specs[utils.ARRAY], device_id, volume_name,
remote_array, target_device, extra_specs)
exception_message = (
_('Cannot get necessary information to cleanup '
'replication target for volume: %(volume)s. '
@ -2290,6 +2348,8 @@ class VMAXCommon(object):
"Volume name: %(sourceName)s ",
{'sourceName': volume_name})
device_id = volume_dict['device_id']
# Check if volume is snap target (e.g. if clone volume)
self._sync_check(array, device_id, volume_name, extra_specs)
# Remove from any storage groups and cleanup replication
self._remove_vol_and_cleanup_replication(
array, device_id, volume_name, extra_specs, volume)
@ -2389,12 +2449,22 @@ class VMAXCommon(object):
'updates': grp_update})
volume_update_list += vol_updates
rep_mode = self.rep_config['mode']
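# Async volumes are all kept in a single RDF management group, so they
# are failed over together here rather than one by one below.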
if rep_mode == utils.REP_ASYNC:
vol_grp_name = self.utils.get_async_rdf_managed_grp_name(
self.rep_config)
__, volume_update_list = (
self._failover_replication(
volumes, None, vol_grp_name,
secondary_backend_id=group_fo, host=True))
for volume in volumes:
extra_specs = self._initial_setup(volume)
if self.utils.is_replication_enabled(extra_specs):
model_update = self._failover_volume(
volume, self.failover, extra_specs)
volume_update_list.append(model_update)
if rep_mode == utils.REP_SYNC:
model_update = self._failover_volume(
volume, self.failover, extra_specs)
volume_update_list.append(model_update)
else:
if self.failover:
# Since the array has been failed-over,
@ -2626,7 +2696,7 @@ class VMAXCommon(object):
# Establish replication relationship
rdf_dict = self.rest.create_rdf_device_pair(
array, device_id, rdf_group_no, target_device, remote_array,
target_name, extra_specs)
extra_specs)
# Add source and target instances to their replication groups
LOG.debug("Adding source device to default replication group.")
@ -2671,12 +2741,13 @@ class VMAXCommon(object):
"""
do_disable_compression = self.utils.is_compression_disabled(
extra_specs)
rep_mode = extra_specs.get(utils.REP_MODE, None)
try:
storagegroup_name = (
self.masking.get_or_create_default_storage_group(
array, extra_specs[utils.SRP], extra_specs[utils.SLO],
extra_specs[utils.WORKLOAD], extra_specs,
do_disable_compression, is_re=True))
do_disable_compression, is_re=True, rep_mode=rep_mode))
except Exception as e:
exception_message = (_("Failed to get or create replication"
"group. Exception received: %(e)s")
@ -2781,11 +2852,17 @@ class VMAXCommon(object):
:param context: the context
:param group: the group object to be created
:returns: dict -- modelUpdate
:raises: VolumeBackendAPIException, NotImplementedError
:raises: VolumeBackendAPIException, NotImplementedError, InvalidInput
"""
if (not volume_utils.is_group_a_cg_snapshot_type(group)
and not group.is_replicated):
raise NotImplementedError()
if group.is_replicated:
if (self.rep_config and self.rep_config.get('mode')
and self.rep_config['mode'] == utils.REP_ASYNC):
msg = _('Replication groups are not supported '
'for use with Asynchronous replication.')
raise exception.InvalidInput(reason=msg)
model_update = {'status': fields.GroupStatus.AVAILABLE}
@ -3523,11 +3600,24 @@ class VMAXCommon(object):
:param volumes: the list of volumes
:param secondary_backend_id: the secondary backend id - default None
:param host: flag to indicate if whole host is being failed over
:returns: model_update, None
:returns: model_update, vol_model_updates
"""
if not group.is_replicated:
raise NotImplementedError()
return self._failover_replication(
volumes, group, None,
secondary_backend_id=secondary_backend_id, host=host)
def _failover_replication(
self, volumes, group, vol_grp_name,
secondary_backend_id=None, host=False):
"""Failover replication for a group.
:param volumes: the list of volumes
:param group: the group object
:param vol_grp_name: the group name
:param secondary_backend_id: the secondary backend id - default None
:param host: flag to indicate if whole host is being failed over
:returns: model_update, vol_model_updates
"""
model_update = {}
vol_model_updates = []
if not volumes:
@ -3535,15 +3625,15 @@ class VMAXCommon(object):
return model_update, vol_model_updates
try:
vol_grp_name = None
extra_specs = self._initial_setup(volumes[0])
array = extra_specs[utils.ARRAY]
volume_group = self._find_volume_group(array, group)
if volume_group:
if 'name' in volume_group:
vol_grp_name = volume_group['name']
if vol_grp_name is None:
raise exception.GroupNotFound(group_id=group.id)
if group:
volume_group = self._find_volume_group(array, group)
if volume_group:
if 'name' in volume_group:
vol_grp_name = volume_group['name']
if vol_grp_name is None:
raise exception.GroupNotFound(group_id=group.id)
rdf_group_no, _ = self.get_rdf_details(array)
# As we only support a single replication target, ignore
@ -3567,7 +3657,7 @@ class VMAXCommon(object):
vol_rep_status = fields.ReplicationStatus.ERROR
LOG.error("Error failover replication on group %(group)s. "
"Exception received: %(e)s.",
{'group': group.id, 'e': e})
{'group': vol_grp_name, 'e': e})
for vol in volumes:
loc = vol.provider_location
@ -3583,6 +3673,7 @@ class VMAXCommon(object):
update = {'volume_id': vol.id, 'updates': update}
vol_model_updates.append(update)
LOG.debug("Volume model updates: %s", vol_model_updates)
return model_update, vol_model_updates
def get_attributes_from_cinder_config(self):


@ -85,6 +85,7 @@ class VMAXFCDriver(san.SanDriver, driver.FibreChannelDriver):
- Support for Generic Volume Group
3.1.0 - Support for replication groups (Tiramisu)
- Deprecate backend xml configuration
- Support for async replication (vmax-replication-enhancements)
"""
VERSION = "3.1.0"


@ -89,6 +89,7 @@ class VMAXISCSIDriver(san.SanISCSIDriver):
- Support for Generic Volume Group
3.1.0 - Support for replication groups (Tiramisu)
- Deprecate backend xml configuration
- Support for async replication (vmax-replication-enhancements)
"""
VERSION = "3.1.0"


@ -69,12 +69,13 @@ class VMAXMasking(object):
volume_name = masking_view_dict[utils.VOL_NAME]
masking_view_dict[utils.EXTRA_SPECS] = extra_specs
device_id = masking_view_dict[utils.DEVICE_ID]
rep_mode = extra_specs.get(utils.REP_MODE, None)
default_sg_name = self.utils.get_default_storage_group_name(
masking_view_dict[utils.SRP],
masking_view_dict[utils.SLO],
masking_view_dict[utils.WORKLOAD],
masking_view_dict[utils.DISABLECOMPRESSION],
masking_view_dict[utils.IS_RE])
masking_view_dict[utils.IS_RE], rep_mode)
try:
error_message = self._get_or_create_masking_view(
@ -1023,7 +1024,7 @@ class VMAXMasking(object):
@coordination.synchronized("emc-vol-{device_id}")
def remove_and_reset_members(
self, serial_number, volume, device_id, volume_name,
extra_specs, reset=True, connector=None):
extra_specs, reset=True, connector=None, async_grp=None):
"""This is called on a delete, unmap device or rollback.
:param serial_number: the array serial number
@ -1033,14 +1034,15 @@ class VMAXMasking(object):
:param extra_specs: additional info
:param reset: reset, return to original SG (optional)
:param connector: the connector object (optional)
:param async_grp: the async rep group (optional)
"""
self._cleanup_deletion(
serial_number, volume, device_id, volume_name,
extra_specs, connector, reset)
extra_specs, connector, reset, async_grp)
def _cleanup_deletion(
self, serial_number, volume, device_id, volume_name,
extra_specs, connector, reset):
extra_specs, connector, reset, async_grp):
"""Prepare a volume for a delete operation.
:param serial_number: the array serial number
@ -1049,6 +1051,7 @@ class VMAXMasking(object):
:param volume_name: the volume name
:param extra_specs: the extra specifications
:param connector: the connector object
:param async_grp: the async rep group
"""
move = False
short_host_name = None
@ -1069,6 +1072,10 @@ class VMAXMasking(object):
extra_specs, connector, move)
break
else:
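# On a reset, leave the device in its async RDF management group;
# it is only removed from the remaining storage groups below.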
if reset is True and async_grp is not None:
for index, sg in enumerate(storagegroup_names):
if sg == async_grp:
storagegroup_names.pop(index)
for sg_name in storagegroup_names:
self.remove_volume_from_sg(
serial_number, device_id, volume_name, sg_name,
@ -1418,10 +1425,11 @@ class VMAXMasking(object):
do_disable_compression = self.utils.is_compression_disabled(
extra_specs)
rep_enabled = self.utils.is_replication_enabled(extra_specs)
rep_mode = extra_specs.get(utils.REP_MODE, None)
storagegroup_name = self.get_or_create_default_storage_group(
serial_number, extra_specs[utils.SRP], extra_specs[utils.SLO],
extra_specs[utils.WORKLOAD], extra_specs, do_disable_compression,
rep_enabled)
rep_enabled, rep_mode)
if src_sg is not None:
# Need to lock the default storage group
@coordination.synchronized("emc-sg-{default_sg_name}")
@ -1447,7 +1455,7 @@ class VMAXMasking(object):
def get_or_create_default_storage_group(
self, serial_number, srp, slo, workload, extra_specs,
do_disable_compression=False, is_re=False):
do_disable_compression=False, is_re=False, rep_mode=None):
"""Get or create a default storage group.
:param serial_number: the array serial number
@ -1457,13 +1465,14 @@ class VMAXMasking(object):
:param extra_specs: extra specifications
:param do_disable_compression: flag for compression
:param is_re: is replication enabled
:param rep_mode: flag to indicate replication mode
:returns: storagegroup_name
:raises: VolumeBackendAPIException
"""
storagegroup, storagegroup_name = (
self.rest.get_vmax_default_storage_group(
serial_number, srp, slo, workload, do_disable_compression,
is_re))
is_re, rep_mode))
if storagegroup is None:
self.provision.create_storage_group(
serial_number, storagegroup_name, srp, slo, workload,


@ -416,13 +416,72 @@ class VMAXProvision(object):
LOG.info("Splitting rdf pair: source device: %(src)s "
"target device: %(tgt)s.",
{'src': device_id, 'tgt': target_device})
if state == 'Synchronized':
state_check = state.lower()
if state_check == utils.RDF_SYNC_STATE:
self.rest.modify_rdf_device_pair(
array, device_id, rdf_group, rep_extra_specs, split=True)
elif state_check in [utils.RDF_CONSISTENT_STATE,
utils.RDF_SYNCINPROG_STATE]:
if state_check == utils.RDF_SYNCINPROG_STATE:
self.rest.wait_for_rdf_consistent_state(
array, device_id, target_device,
rep_extra_specs, state)
self.rest.modify_rdf_device_pair(
array, device_id, rdf_group, rep_extra_specs, suspend=True)
LOG.info("Deleting rdf pair: source device: %(src)s "
"target device: %(tgt)s.",
{'src': device_id, 'tgt': target_device})
self.rest.delete_rdf_pair(array, device_id, rdf_group)
self.delete_rdf_pair(array, device_id, rdf_group, rep_extra_specs)
def delete_rdf_pair(
self, array, device_id, rdf_group, extra_specs):
"""Delete an rdf pairing.
If the replication mode is synchronous, only one attempt is required
to delete the pair. Otherwise, we need to wait until all the tracks
are cleared before the delete will be successful. As there is
currently no way to track this information, we keep attempting the
operation until it is successful.
:param array: the array serial number
:param device_id: source volume device id
:param rdf_group: the rdf group number
:param extra_specs: extra specifications
"""
if (extra_specs.get(utils.REP_MODE) and
extra_specs.get(utils.REP_MODE) == utils.REP_SYNC):
return self.rest.delete_rdf_pair(array, device_id, rdf_group)
def _delete_pair():
"""Delete a rdf volume pair.
Called at an interval until all the tracks are cleared
and the operation is successful.
:raises: loopingcall.LoopingCallDone
"""
retries = kwargs['retries']
try:
kwargs['retries'] = retries + 1
if not kwargs['delete_pair_success']:
self.rest.delete_rdf_pair(
array, device_id, rdf_group)
kwargs['delete_pair_success'] = True
except exception.VolumeBackendAPIException:
pass
if kwargs['retries'] > UNLINK_RETRIES:
LOG.error("Delete volume pair failed after %(retries)d "
"tries.", {'retries': retries})
raise loopingcall.LoopingCallDone(retvalue=30)
if kwargs['delete_pair_success']:
raise loopingcall.LoopingCallDone()
kwargs = {'retries': 0,
'delete_pair_success': False}
timer = loopingcall.FixedIntervalLoopingCall(_delete_pair)
rc = timer.start(interval=UNLINK_INTERVAL).wait()
return rc
def failover_volume(self, array, device_id, rdf_group,
extra_specs, local_vol_state, failover):


@ -897,7 +897,7 @@ class VMAXRest(object):
def get_vmax_default_storage_group(
self, array, srp, slo, workload,
do_disable_compression=False, is_re=False):
do_disable_compression=False, is_re=False, rep_mode=None):
"""Get the default storage group.
:param array: the array serial number
@ -906,10 +906,11 @@ class VMAXRest(object):
:param workload: the workload
:param do_disable_compression: flag for disabling compression
:param is_re: flag for replication
:param rep_mode: flag to indicate replication mode
:returns: the storage group dict (or None), the storage group name
"""
storagegroup_name = self.utils.get_default_storage_group_name(
srp, slo, workload, do_disable_compression, is_re)
srp, slo, workload, do_disable_compression, is_re, rep_mode)
storagegroup = self.get_storage_group(array, storagegroup_name)
return storagegroup, storagegroup_name
@ -1870,6 +1871,54 @@ class VMAXRest(object):
LOG.warning("Cannot locate RDF session for volume %s", device_id)
return paired, local_vol_state, rdf_pair_state
def wait_for_rdf_consistent_state(
self, array, remote_array, device_id, target_device, extra_specs):
"""Wait for async pair to be in a consistent state before suspending.
:param array: the array serial number
:param remote_array: the remote array serial number
:param device_id: the device id
:param target_device: the target device id
:param extra_specs: the extra specifications
"""
def _wait_for_consistent_state():
# Called at an interval until the state of the
# rdf pair is 'consistent'.
retries = kwargs['retries']
try:
kwargs['retries'] = retries + 1
if not kwargs['consistent_state']:
__, __, state = (
self.are_vols_rdf_paired(
array, remote_array, device_id, target_device))
kwargs['state'] = state
if state.lower() == utils.RDF_CONSISTENT_STATE:
kwargs['consistent_state'] = True
kwargs['rc'] = 0
except Exception:
exception_message = _("Issue encountered waiting for job.")
LOG.exception(exception_message)
raise exception.VolumeBackendAPIException(
data=exception_message)
if retries > int(extra_specs[utils.RETRIES]):
LOG.error("_wait_for_consistent_state failed after "
"%(retries)d tries.", {'retries': retries})
kwargs['rc'] = -1
raise loopingcall.LoopingCallDone()
if kwargs['consistent_state']:
raise loopingcall.LoopingCallDone()
kwargs = {'retries': 0, 'consistent_state': False,
'rc': 0, 'state': 'syncinprog'}
timer = loopingcall.FixedIntervalLoopingCall(
_wait_for_consistent_state)
timer.start(interval=int(extra_specs[utils.INTERVAL])).wait()
LOG.debug("Return code is: %(rc)lu. State is %(state)s",
{'rc': kwargs['rc'], 'state': kwargs['state']})
def get_rdf_group_number(self, array, rdf_group_label):
"""Given an rdf_group_label, return the associated group number.
@ -1891,8 +1940,7 @@ class VMAXRest(object):
@coordination.synchronized('emc-rg-{rdf_group_no}')
def create_rdf_device_pair(self, array, device_id, rdf_group_no,
target_device, remote_array,
target_vol_name, extra_specs):
target_device, remote_array, extra_specs):
"""Create an RDF pairing.
Create a remote replication relationship between source and target
@ -1902,15 +1950,19 @@ class VMAXRest(object):
:param rdf_group_no: the rdf group number
:param target_device: the target device id
:param remote_array: the remote array serial
:param target_vol_name: the name of the target volume
:param extra_specs: the extra specs
:returns: rdf_dict
"""
rep_mode = (extra_specs[utils.REP_MODE]
if extra_specs.get(utils.REP_MODE) else utils.REP_SYNC)
payload = ({"deviceNameListSource": [{"name": device_id}],
"deviceNameListTarget": [{"name": target_device}],
"replicationMode": "Synchronous",
"replicationMode": rep_mode,
"establish": 'true',
"rdfType": 'RDF1'})
if rep_mode == utils.REP_ASYNC:
payload_update = self._get_async_payload_info(array, rdf_group_no)
payload.update(payload_update)
resource_type = ("rdf_group/%(rdf_num)s/volume"
% {'rdf_num': rdf_group_no})
status_code, job = self.create_resource(array, REPLICATION,
@ -1921,9 +1973,25 @@ class VMAXRest(object):
rdf_dict = {'array': remote_array, 'device_id': target_device}
return rdf_dict
def _get_async_payload_info(self, array, rdf_group_no):
"""Get the payload details for an async create pair.
:param array: the array serial number
:param rdf_group_no: the rdf group number
:return: payload_update
"""
num_vols, payload_update = 0, {}
rdfg_details = self.get_rdf_group(array, rdf_group_no)
if rdfg_details is not None and rdfg_details.get('numDevices'):
num_vols = int(rdfg_details['numDevices'])
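# If the RDF group already holds device pairs (an active SRDF/A
# session), request a consistency-exempt pairing so the new pair can
# be added without disturbing the devices already in the session.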
if num_vols > 0:
payload_update = {'consExempt': 'true'}
return payload_update
@coordination.synchronized('emc-rg-{rdf_group}')
def modify_rdf_device_pair(
self, array, device_id, rdf_group, extra_specs, split=False):
self, array, device_id, rdf_group, extra_specs,
split=False, suspend=False):
"""Modify an rdf device pair.
:param array: the array serial number
@ -1931,6 +1999,7 @@ class VMAXRest(object):
:param rdf_group: the rdf group
:param extra_specs: the extra specs
:param split: flag to indicate "split" action
:param suspend: flag to indicate "suspend" action
"""
common_opts = {"force": 'false',
"symForce": 'false',
@ -1943,6 +2012,12 @@ class VMAXRest(object):
"executionOption": "ASYNCHRONOUS",
"split": common_opts}
elif suspend:
common_opts.update({"immediate": 'false', "consExempt": 'true'})
payload = {"action": "Suspend",
"executionOption": "ASYNCHRONOUS",
"suspend": common_opts}
else:
common_opts.update({"establish": 'true',
"restore": 'false',
@ -2089,7 +2164,7 @@ class VMAXRest(object):
resource_name = ("storagegroup/%(sg_name)s/rdf_group"
% {'sg_name': storagegroup_name})
payload = {"executionOption": "ASYNCHRONOUS",
"replicationMode": "Synchronous",
"replicationMode": utils.REP_SYNC,
"remoteSymmId": remote_array,
"remoteStorageGroupName": storagegroup_name,
"rdfgNumber": rdf_group_num, "establish": 'true'}
@ -2113,13 +2188,18 @@ class VMAXRest(object):
array, storagegroup_name, rdf_group_num)
if sg_rdf_details:
state_list = sg_rdf_details['states']
LOG.debug("RDF state: %(sl)s; Action required: %(action)s",
{'sl': state_list, 'action': action})
for state in state_list:
if (action.lower() in ["establish", "failback", "resume"] and
state.lower() in ["suspended", "failed over"]):
state.lower() in [utils.RDF_SUSPENDED_STATE,
utils.RDF_FAILEDOVER_STATE]):
mod_rqd = True
break
elif (action.lower() in ["split", "failover", "suspend"] and
state.lower() in ["synchronized", "syncinprog"]):
state.lower() in [utils.RDF_SYNC_STATE,
utils.RDF_SYNCINPROG_STATE,
utils.RDF_CONSISTENT_STATE]):
mod_rqd = True
break
return mod_rqd


@ -58,6 +58,14 @@ VOL_NAME = 'volume_name'
EXTRA_SPECS = 'extra_specs'
IS_RE = 'replication_enabled'
DISABLECOMPRESSION = 'storagetype:disablecompression'
REP_SYNC = 'Synchronous'
REP_ASYNC = 'Asynchronous'
REP_MODE = 'rep_mode'
RDF_SYNC_STATE = 'synchronized'
RDF_SYNCINPROG_STATE = 'syncinprog'
RDF_CONSISTENT_STATE = 'consistent'
RDF_SUSPENDED_STATE = 'suspended'
RDF_FAILEDOVER_STATE = 'failed over'
# Cinder.conf vmax configuration
VMAX_SERVER_IP = 'san_ip'
@ -160,10 +168,9 @@ class VMAXUtils(object):
delta = end_time - start_time
return six.text_type(datetime.timedelta(seconds=int(delta)))
@staticmethod
def get_default_storage_group_name(
srp_name, slo, workload, is_compression_disabled=False,
is_re=False):
self, srp_name, slo, workload, is_compression_disabled=False,
is_re=False, rep_mode=None):
"""Determine default storage group from extra_specs.
:param srp_name: the name of the srp on the array
@ -171,6 +178,7 @@ class VMAXUtils(object):
:param workload: the workload string e.g DSS
:param is_compression_disabled: flag for disabling compression
:param is_re: flag for replication
:param rep_mode: flag to indicate replication mode
:returns: storage_group_name
"""
if slo and workload:
@ -184,7 +192,7 @@ class VMAXUtils(object):
else:
prefix = "OS-no_SLO"
if is_re:
prefix += "-RE"
prefix += self.get_replication_prefix(rep_mode)
storage_group_name = ("%(prefix)s-SG" % {'prefix': prefix})
return storage_group_name
@ -469,7 +477,8 @@ class VMAXUtils(object):
replication_enabled = True
return replication_enabled
def get_replication_config(self, rep_device_list):
@staticmethod
def get_replication_config(rep_device_list):
"""Gather necessary replication configuration info.
:param rep_device_list: the replication device list from cinder.conf
@ -493,15 +502,18 @@ class VMAXUtils(object):
LOG.exception(error_message)
raise exception.VolumeBackendAPIException(data=error_message)
try:
allow_extend = target['allow_extend']
if strutils.bool_from_string(allow_extend):
rep_config['allow_extend'] = True
else:
rep_config['allow_extend'] = False
except KeyError:
allow_extend = target.get('allow_extend', 'false')
if strutils.bool_from_string(allow_extend):
rep_config['allow_extend'] = True
else:
rep_config['allow_extend'] = False
rep_mode = target.get('mode', '')
if rep_mode.lower() in ['async', 'asynchronous']:
rep_config['mode'] = REP_ASYNC
else:
rep_config['mode'] = REP_SYNC
return rep_config
@staticmethod
@ -693,6 +705,7 @@ class VMAXUtils(object):
"""Check volume type and group type.
This will make sure they do not conflict with each other.
:param volume: volume to be checked
:param extra_specs: the extra specifications
:raises: InvalidInput
@ -717,6 +730,7 @@ class VMAXUtils(object):
Group status must be enabled before proceeding with certain
operations.
:param group: the group object
:raises: InvalidInput
"""
@ -729,3 +743,28 @@ class VMAXUtils(object):
else:
LOG.debug('Replication is not enabled on group %s, '
'skip status check.', group.id)
@staticmethod
def get_replication_prefix(rep_mode):
"""Get the replication prefix.
The replication prefix for storage group naming depends on whether the
replication mode is synchronous or asynchronous.
:param rep_mode: flag to indicate if replication is async
:return: prefix
"""
prefix = "-RE" if rep_mode == REP_SYNC else "-RA"
return prefix
@staticmethod
def get_async_rdf_managed_grp_name(rep_config):
"""Get the name of the group used for async replication management.
:param rep_config: the replication configuration
:return: group name
"""
rdf_group_name = rep_config['rdf_group_label']
async_grp_name = "OS-%(rdf)s-async-rdf-sg" % {'rdf': rdf_group_name}
LOG.debug("The async rdf managed group name is %s", async_grp_name)
return async_grp_name


@ -0,0 +1,5 @@
---
features:
- |
Added asynchronous remote replication support to the Dell EMC VMAX cinder driver.