XIV\A9000: Added replication group support

This patch adds group replication capabilities for
the IBM driver for the A-line system.

implements:
* Enable replication on group
* Disable replication on group
* Fail over replication
* Get replication error status

Implements: blueprint replication-cg-ibm-storage
Change-Id: Ic59c568502258096e24ca5ab81dc5b8cd1779995
Depends-On: Ia4af4dd011d569a3ac84387b37dcf2606da48fba
This commit is contained in:
Tzur Eliyahu 2017-05-28 18:54:37 +03:00 committed by Isaac Beckman
parent 695c1f9272
commit bb9a4e1a90
6 changed files with 869 additions and 314 deletions

View File

@ -25,6 +25,7 @@ pyxcli_client.errors = fake_pyxcli_exceptions
pyxcli_client.events = mock.Mock() pyxcli_client.events = mock.Mock()
pyxcli_client.mirroring = mock.Mock() pyxcli_client.mirroring = mock.Mock()
pyxcli_client.transports = fake_pyxcli_exceptions pyxcli_client.transports = fake_pyxcli_exceptions
pyxcli_client.mirroring.cg_recovery_manager = mock.Mock()
sys.modules['pyxcli'] = pyxcli_client sys.modules['pyxcli'] = pyxcli_client
sys.modules['pyxcli.events'] = pyxcli_client.events sys.modules['pyxcli.events'] = pyxcli_client.events

View File

@ -20,6 +20,7 @@ from xml.etree import ElementTree
from cinder import context from cinder import context
from cinder import exception from cinder import exception
from cinder import objects from cinder import objects
from cinder.objects import fields
from cinder import test from cinder import test
from cinder.tests.unit import fake_constants as fake from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import utils as testutils from cinder.tests.unit import utils as testutils
@ -27,9 +28,11 @@ from cinder.tests.unit.volume.drivers.ibm import fake_pyxcli
import cinder.volume.drivers.ibm.ibm_storage as storage import cinder.volume.drivers.ibm.ibm_storage as storage
from cinder.volume.drivers.ibm.ibm_storage import cryptish from cinder.volume.drivers.ibm.ibm_storage import cryptish
from cinder.volume.drivers.ibm.ibm_storage.xiv_proxy import XIVProxy from cinder.volume.drivers.ibm.ibm_storage.xiv_proxy import XIVProxy
from cinder.volume.drivers.ibm.ibm_storage import xiv_replication
from cinder.volume import group_types from cinder.volume import group_types
errors = fake_pyxcli.pyxcli_client.errors errors = fake_pyxcli.pyxcli_client.errors
mirroring = fake_pyxcli.pyxcli_client.mirroring
test_mock = mock.MagicMock() test_mock = mock.MagicMock()
module_patcher = mock.MagicMock() module_patcher = mock.MagicMock()
@ -45,6 +48,11 @@ TEST_VOLUME = {
'group_id': fake.CONSISTENCY_GROUP_ID, 'group_id': fake.CONSISTENCY_GROUP_ID,
} }
TEST_GROUP_SPECS = {
'group_replication_enabled': '<is> True',
'replication_type': 'sync',
}
TEST_EXTRA_SPECS = { TEST_EXTRA_SPECS = {
'replication_enabled': '<is> False', 'replication_enabled': '<is> False',
} }
@ -356,7 +364,174 @@ class XIVProxyTest(test.TestCase):
ex = getattr(p, "_get_exception")() ex = getattr(p, "_get_exception")()
self.assertRaises(ex, p.create_volume, volume) self.assertRaises(ex, p.create_volume, volume)
@mock.patch("cinder.volume.utils.group_get_by_id", mock.MagicMock()) @mock.patch("cinder.volume.drivers.ibm.ibm_storage."
"xiv_replication.VolumeReplication.create_replication",
mock.MagicMock())
@mock.patch("cinder.volume.drivers.ibm.ibm_storage."
"xiv_replication.GroupReplication.create_replication",
mock.MagicMock())
@mock.patch("cinder.volume.drivers.ibm.ibm_storage."
"xiv_proxy.XIVProxy.get_group_specs_by_group_resource",
mock.MagicMock(return_value=(TEST_GROUP_SPECS, '')))
@mock.patch("cinder.volume.drivers.ibm.ibm_storage."
"xiv_proxy.XIVProxy._get_target_params",
mock.MagicMock(return_value=REPLICA_PARAMS))
@mock.patch("cinder.volume.drivers.ibm.ibm_storage."
"xiv_proxy.XIVProxy._get_target",
mock.MagicMock(return_value="BLABLA"))
def test_enable_replication(self):
"""Test enable_replication"""
driver = mock.MagicMock()
driver.VERSION = "VERSION"
p = self.proxy(
self.default_storage_info,
mock.MagicMock(),
test_mock.cinder.exception,
driver)
p.ibm_storage_cli = mock.MagicMock()
p._call_remote_xiv_xcli = mock.MagicMock()
p._update_consistencygroup = mock.MagicMock()
p.targets = {'tgt1': 'info1'}
group = self._create_test_group('WTF')
vol = testutils.create_volume(self.ctxt)
ret = p.enable_replication(self.ctxt, group, [vol])
self.assertEqual((
{'replication_status': fields.ReplicationStatus.ENABLED},
[{'id': vol['id'],
'replication_status': fields.ReplicationStatus.ENABLED}]), ret)
@mock.patch("cinder.volume.drivers.ibm.ibm_storage."
"xiv_replication.VolumeReplication.delete_replication",
mock.MagicMock())
@mock.patch("cinder.volume.group_types.get_group_type_specs",
mock.MagicMock(return_value=TEST_GROUP_SPECS))
def test_disable_replication(self):
"""Test disable_replication"""
driver = mock.MagicMock()
driver.VERSION = "VERSION"
p = self.proxy(
self.default_storage_info,
mock.MagicMock(),
test_mock.cinder.exception,
driver)
p.ibm_storage_cli = mock.MagicMock()
p._call_remote_xiv_xcli = mock.MagicMock()
p._update_consistencygroup = mock.MagicMock()
group = self._create_test_group('WTF')
ret = p.disable_replication(self.ctxt, group, [])
self.assertEqual((
{'replication_status': fields.ReplicationStatus.DISABLED}, []),
ret)
@mock.patch("cinder.volume.drivers.ibm.ibm_storage."
"xiv_proxy.XIVProxy._using_default_backend",
mock.MagicMock(return_value=False))
@mock.patch("cinder.volume.drivers.ibm.ibm_storage."
"xiv_proxy.XIVProxy._get_target_params",
mock.MagicMock(return_value={'san_clustername': "master"}))
@mock.patch("cinder.volume.drivers.ibm.ibm_storage."
"xiv_proxy.XIVProxy._init_xcli",
mock.MagicMock())
@mock.patch("cinder.volume.drivers.ibm.ibm_storage."
"xiv_proxy.XIVProxy._init_xcli",
mock.MagicMock())
@mock.patch("cinder.volume.drivers.ibm.ibm_storage."
"xiv_proxy.XIVProxy.get_group_specs_by_group_resource",
mock.MagicMock(return_value=(TEST_GROUP_SPECS, '')))
@mock.patch("cinder.volume.drivers.ibm.ibm_storage."
"xiv_replication.GroupReplication.failover",
mock.MagicMock(return_value=(True, 'good')))
def test_failover_replication_with_default(self):
driver = mock.MagicMock()
driver.VERSION = "VERSION"
p = self.proxy(
self.default_storage_info,
mock.MagicMock(),
test_mock.cinder.exception,
driver)
group = self._create_test_group('WTF')
group.replication_status = fields.ReplicationStatus.FAILED_OVER
vol = testutils.create_volume(self.ctxt)
group_update, vol_update = p.failover_replication(self.ctxt, group,
[vol], 'default')
updates = {'status': 'available'}
self.assertEqual(({'replication_status': 'available'},
[{'volume_id': vol['id'],
'updates': updates}]), (group_update, vol_update))
def test_failover_resource_no_mirror(self):
driver = mock.MagicMock()
driver.VERSION = "VERSION"
p = self.proxy(
self.default_storage_info,
mock.MagicMock(),
test_mock.cinder.exception,
driver)
recovery_mgr = mock.MagicMock()
recovery_mgr.is_mirror_active = mock.MagicMock()
recovery_mgr.is_mirror_active.return_value = False
group = self._create_test_group('WTF')
ret = xiv_replication.Replication(p)._failover_resource(
group, recovery_mgr, mock.MagicMock, 'cg', True)
msg = ("%(rep_type)s %(res)s: no active mirroring and can not "
"failback" % {'rep_type': 'cg',
'res': group['name']})
self.assertEqual((False, msg), ret)
def test_failover_resource_mirror(self):
driver = mock.MagicMock()
driver.VERSION = "VERSION"
p = self.proxy(
self.default_storage_info,
mock.MagicMock(),
test_mock.cinder.exception,
driver)
recovery_mgr = mock.MagicMock()
recovery_mgr.is_mirror_active = mock.MagicMock()
recovery_mgr.is_mirror_active.return_value = True
group = self._create_test_group('WTF')
ret = xiv_replication.Replication(p)._failover_resource(
group, recovery_mgr, mock.MagicMock, 'cg', True)
self.assertEqual((True, None), ret)
def test_failover_resource_change_role(self):
driver = mock.MagicMock()
driver.VERSION = "VERSION"
p = self.proxy(
self.default_storage_info,
mock.MagicMock(),
test_mock.cinder.exception,
driver)
recovery_mgr = mock.MagicMock()
recovery_mgr.is_mirror_active = mock.MagicMock()
recovery_mgr.is_mirror_active.return_value = True
recovery_mgr.switch_roles.side_effect = (
errors.XCLIError(''))
failover_rep_mgr = mock.MagicMock()
failover_rep_mgr.change_role = mock.MagicMock()
group = self._create_test_group('WTF')
xiv_replication.Replication(p)._failover_resource(
group, recovery_mgr, failover_rep_mgr, 'cg', True)
failover_rep_mgr.change_role.assert_called_once_with(
resource_id=group['name'],
new_role='Slave')
@mock.patch("cinder.volume.utils.is_group_a_cg_snapshot_type", @mock.patch("cinder.volume.utils.is_group_a_cg_snapshot_type",
mock.MagicMock(return_value=True)) mock.MagicMock(return_value=True))
def test_create_volume_with_consistency_group(self): def test_create_volume_with_consistency_group(self):
@ -376,8 +551,8 @@ class XIVProxyTest(test.TestCase):
vol_type = testutils.create_volume_type(self.ctxt, name='WTF') vol_type = testutils.create_volume_type(self.ctxt, name='WTF')
volume = testutils.create_volume( volume = testutils.create_volume(
self.ctxt, size=16, volume_type_id=vol_type.id) self.ctxt, size=16, volume_type_id=vol_type.id)
grp = testutils.create_group(self.ctxt, name='bla', group_type_id='1',
volume_type_ids=[vol_type.id]) grp = self._create_test_group('WTF')
volume.group = grp volume.group = grp
p.create_volume(volume) p.create_volume(volume)
@ -390,7 +565,7 @@ class XIVProxyTest(test.TestCase):
cg='cg') cg='cg')
@mock.patch("cinder.volume.drivers.ibm.ibm_storage." @mock.patch("cinder.volume.drivers.ibm.ibm_storage."
"xiv_proxy.XIVProxy._replication_create", "xiv_replication.VolumeReplication.create_replication",
mock.MagicMock()) mock.MagicMock())
@mock.patch("cinder.volume.drivers.ibm.ibm_storage." @mock.patch("cinder.volume.drivers.ibm.ibm_storage."
"xiv_proxy.XIVProxy._get_qos_specs", "xiv_proxy.XIVProxy._get_qos_specs",
@ -417,7 +592,7 @@ class XIVProxyTest(test.TestCase):
p.create_volume(volume) p.create_volume(volume)
@mock.patch("cinder.volume.drivers.ibm.ibm_storage." @mock.patch("cinder.volume.drivers.ibm.ibm_storage."
"xiv_proxy.XIVProxy._replication_create", "xiv_replication.VolumeReplication.create_replication",
mock.MagicMock()) mock.MagicMock())
@mock.patch("cinder.volume.drivers.ibm.ibm_storage." @mock.patch("cinder.volume.drivers.ibm.ibm_storage."
"xiv_proxy.XIVProxy._get_qos_specs", "xiv_proxy.XIVProxy._get_qos_specs",
@ -446,10 +621,6 @@ class XIVProxyTest(test.TestCase):
ex = getattr(p, "_get_exception")() ex = getattr(p, "_get_exception")()
self.assertRaises(ex, p.create_volume, volume) self.assertRaises(ex, p.create_volume, volume)
@mock.patch("cinder.volume.drivers.ibm.ibm_storage."
"xiv_proxy.XIVProxy._get_targets",
mock.MagicMock(
return_value={'tgt1': 'info1', 'tgt2': 'info2'}))
@mock.patch("cinder.volume.drivers.ibm.ibm_storage." @mock.patch("cinder.volume.drivers.ibm.ibm_storage."
"xiv_proxy.XIVProxy._get_qos_specs", "xiv_proxy.XIVProxy._get_qos_specs",
mock.MagicMock(return_value=None)) mock.MagicMock(return_value=None))
@ -495,7 +666,7 @@ class XIVProxyTest(test.TestCase):
p.ibm_storage_cli.cmd.vol_delete.assert_called_once_with(vol='WTF32') p.ibm_storage_cli.cmd.vol_delete.assert_called_once_with(vol='WTF32')
@mock.patch("cinder.volume.drivers.ibm.ibm_storage." @mock.patch("cinder.volume.drivers.ibm.ibm_storage."
"xiv_proxy.XIVProxy._replication_delete", "xiv_replication.VolumeReplication.delete_replication",
mock.MagicMock()) mock.MagicMock())
@mock.patch("cinder.volume.drivers.ibm.ibm_storage." @mock.patch("cinder.volume.drivers.ibm.ibm_storage."
"xiv_proxy.XIVProxy._get_extra_specs", "xiv_proxy.XIVProxy._get_extra_specs",
@ -1406,7 +1577,9 @@ class XIVProxyTest(test.TestCase):
ex = getattr(p, "_get_exception")() ex = getattr(p, "_get_exception")()
self.assertRaises(ex, p.create_group, {}, self._create_test_group()) self.assertRaises(ex, p.create_group, {}, self._create_test_group())
def test_create_consistencygroup_with_replication(self): @mock.patch("cinder.volume.drivers.ibm.ibm_storage.xiv_proxy."
"client.XCLIClient")
def test_create_consistencygroup_with_replication(self, mock_xcli):
"""test create_consistenygroup when replication is set""" """test create_consistenygroup when replication is set"""
p = self.proxy( p = self.proxy(
@ -1426,8 +1599,8 @@ class XIVProxyTest(test.TestCase):
group_obj.volume_types = objects.VolumeTypeList(context=self.ctxt, group_obj.volume_types = objects.VolumeTypeList(context=self.ctxt,
objects=[vol_type]) objects=[vol_type])
ex = getattr(p, "_get_exception")() model_update = p.create_group({}, group_obj)
self.assertRaises(ex, p.create_group, {}, group_obj) self.assertEqual('available', model_update['status'])
def test_create_consistencygroup_from_src_cgsnapshot(self): def test_create_consistencygroup_from_src_cgsnapshot(self):
"""test a successful cg create from cgsnapshot""" """test a successful cg create from cgsnapshot"""
@ -2113,7 +2286,7 @@ class XIVProxyTest(test.TestCase):
test_mock.cinder.exception, test_mock.cinder.exception,
driver) driver)
p._replication_create = test_mock.MagicMock(return_value=None) xiv_replication.VolumeReplication = mock.MagicMock()
grp = testutils.create_group(self.ctxt, name='bla', group_type_id='1') grp = testutils.create_group(self.ctxt, name='bla', group_type_id='1')
volume = testutils.create_volume(self.ctxt, display_name='bla') volume = testutils.create_volume(self.ctxt, display_name='bla')
volume.group = grp volume.group = grp

View File

@ -365,14 +365,6 @@ class IBMStorageProxy(object):
except Exception: except Exception:
return None return None
def _get_targets(self):
return self.targets
def _is_replication_supported(self):
if self.targets:
return True
return False
@_trace_time @_trace_time
def _read_replication_devices(self): def _read_replication_devices(self):
"""Read replication devices from configuration """Read replication devices from configuration

View File

@ -27,8 +27,7 @@ if pyxcli:
from pyxcli import client from pyxcli import client
from pyxcli import errors from pyxcli import errors
from pyxcli.events import events from pyxcli.events import events
from pyxcli.mirroring import errors as m_errors from pyxcli.mirroring import mirrored_entities
from pyxcli.mirroring import volume_recovery_manager
from pyxcli import transports from pyxcli import transports
from cinder import context from cinder import context
@ -40,6 +39,8 @@ from cinder.volume.drivers.ibm.ibm_storage import certificate
from cinder.volume.drivers.ibm.ibm_storage import cryptish from cinder.volume.drivers.ibm.ibm_storage import cryptish
from cinder.volume.drivers.ibm.ibm_storage import proxy from cinder.volume.drivers.ibm.ibm_storage import proxy
from cinder.volume.drivers.ibm.ibm_storage import strings from cinder.volume.drivers.ibm.ibm_storage import strings
from cinder.volume.drivers.ibm.ibm_storage import xiv_replication as repl
from cinder.volume import group_types
from cinder.volume import qos_specs from cinder.volume import qos_specs
from cinder.volume import utils from cinder.volume import utils
from cinder.volume import volume_types from cinder.volume import volume_types
@ -95,38 +96,19 @@ MANAGE_VOLUME_BASE_ERROR = _("Unable to manage the volume '%(volume)s': "
"%(error)s.") "%(error)s.")
class Rate(object):
def __init__(self, rpo, schedule):
self.rpo = rpo
self.schedule = schedule
self.schedule_name = self._schedule_name_from_schedule(self.schedule)
def _schedule_name_from_schedule(self, schedule):
if schedule == '00:00:20':
return 'min_interval'
return ("cinder_%(sched)s" %
{'sched': schedule.replace(':', '_')})
class XIVProxy(proxy.IBMStorageProxy): class XIVProxy(proxy.IBMStorageProxy):
"""Proxy between the Cinder Volume and Spectrum Accelerate Storage. """Proxy between the Cinder Volume and Spectrum Accelerate Storage.
Supports IBM XIV, Spectrum Accelerate, A9000, A9000R Supports IBM XIV, Spectrum Accelerate, A9000, A9000R
Version: 2.1.0 Version: 2.1.0
Required pyxcli version: 1.1.2 Required pyxcli version: 1.1.4
2.0 - First open source driver version 2.0 - First open source driver version
2.1.0 - Support Consistency groups through Generic volume groups 2.1.0 - Support Consistency groups through Generic volume groups
- Support XIV/A9000 Volume independent QoS - Support XIV/A9000 Volume independent QoS
- Support groups replication
""" """
async_rates = (
Rate(rpo=120, schedule='00:01:00'),
Rate(rpo=300, schedule='00:02:00'),
Rate(rpo=600, schedule='00:05:00'),
Rate(rpo=1200, schedule='00:10:00'),
)
def __init__(self, storage_info, logger, exception, def __init__(self, storage_info, logger, exception,
driver=None, active_backend_id=None): driver=None, active_backend_id=None):
@ -192,13 +174,6 @@ class XIVProxy(proxy.IBMStorageProxy):
LOG.info("Connection to the IBM storage " LOG.info("Connection to the IBM storage "
"system established successfully.") "system established successfully.")
def _get_schedule_from_rpo(self, rpo):
return [rate for rate in self.async_rates
if rate.rpo == rpo][0].schedule_name
def _get_supported_rpo(self):
return [rate.rpo for rate in self.async_rates]
@proxy._trace_time @proxy._trace_time
def _update_active_schedule_objects(self): def _update_active_schedule_objects(self):
"""Set schedule objects on active backend. """Set schedule objects on active backend.
@ -207,7 +182,7 @@ class XIVProxy(proxy.IBMStorageProxy):
min_interval. min_interval.
""" """
schedules = self._call_xiv_xcli("schedule_list").as_dict('name') schedules = self._call_xiv_xcli("schedule_list").as_dict('name')
for rate in self.async_rates: for rate in repl.Replication.async_rates:
if rate.schedule == '00:00:20': if rate.schedule == '00:00:20':
continue continue
name = rate.schedule_name name = rate.schedule_name
@ -245,7 +220,7 @@ class XIVProxy(proxy.IBMStorageProxy):
min_interval. min_interval.
""" """
schedules = self._call_remote_xiv_xcli("schedule_list").as_dict('name') schedules = self._call_remote_xiv_xcli("schedule_list").as_dict('name')
for rate in self.async_rates: for rate in repl.Replication.async_rates:
if rate.schedule == '00:00:20': if rate.schedule == '00:00:20':
continue continue
name = rate.schedule_name name = rate.schedule_name
@ -438,30 +413,12 @@ class XIVProxy(proxy.IBMStorageProxy):
return self._get_qos_specs(type_id) return self._get_qos_specs(type_id)
def _get_replication_info(self, specs): def _get_replication_info(self, specs):
info = {'enabled': False, 'mode': None, 'rpo': 0}
if specs: info, msg = repl.Replication.extract_replication_info_from_specs(specs)
LOG.debug('_get_replication_info: specs %(specs)s', if not info:
{'specs': specs}) LOG.error(msg)
info['enabled'] = ( raise self._get_exception()(message=msg)
specs.get('replication_enabled', '').upper() in
(u'TRUE', strings.METADATA_IS_TRUE))
replication_type = specs.get('replication_type', SYNC).lower()
if replication_type in (u'sync', u'<is> sync'):
info['mode'] = SYNC
elif replication_type in (u'async', u'<is> async'):
info['mode'] = ASYNC
else:
msg = (_("Unsupported replication mode %(mode)s")
% {'mode': replication_type})
LOG.error(msg)
raise self._get_exception()(message=msg)
info['rpo'] = int(specs.get('rpo', u'<is> 0')[5:])
if info['rpo'] and info['rpo'] not in self._get_supported_rpo():
msg = (_("Unsupported replication RPO %(rpo)s")
% {'rpo': info['rpo']})
LOG.error(msg)
raise self._get_exception()(message=msg)
LOG.debug('_get_replication_info: info %(info)s', {'info': info})
return info return info
@proxy._trace_time @proxy._trace_time
@ -491,26 +448,74 @@ class XIVProxy(proxy.IBMStorageProxy):
@proxy._trace_time @proxy._trace_time
def create_volume(self, volume): def create_volume(self, volume):
"""Creates a volume.""" """Creates a volume."""
# read replication information # read replication information
specs = self._get_extra_specs(volume.get('volume_type_id', None)) specs = self._get_extra_specs(volume.get('volume_type_id', None))
replication_info = self._get_replication_info(specs) replication_info = self._get_replication_info(specs)
if volume.group and replication_info['enabled']:
# An unsupported illegal configuration
msg = _("Unable to create volume: "
"Replication of consistency group is not supported")
LOG.error(msg)
raise self.meta['exception'].VolumeBackendAPIException(data=msg)
self._create_volume(volume) self._create_volume(volume)
return self.handle_created_vol_properties(replication_info, return self.handle_created_vol_properties(replication_info,
volume) volume)
def handle_created_vol_properties(self, replication_info, volume): def handle_created_vol_properties(self, replication_info, volume):
volume_update = {} volume_update = {}
LOG.debug('checking replication_info %(rep)s',
{'rep': replication_info})
volume_update['replication_status'] = 'disabled'
cg = volume.group and utils.is_group_a_cg_snapshot_type(volume.group) cg = volume.group and utils.is_group_a_cg_snapshot_type(volume.group)
if replication_info['enabled']:
try:
repl.VolumeReplication(self).create_replication(
volume.name, replication_info)
except Exception as e:
details = self._get_code_and_status_or_message(e)
msg = ('Failed create_replication for '
'volume %(vol)s: %(err)s',
{'vol': volume['name'], 'err': details})
LOG.error(msg)
if cg:
cg_name = self._cg_name_from_volume(volume)
self._silent_delete_volume_from_cg(volume, cg_name)
self._silent_delete_volume(volume=volume)
raise
volume_update['replication_status'] = 'enabled'
if cg: if cg:
if volume.group.is_replicated:
# for replicated Consistency Group:
# The Volume must be mirrored, and its mirroring settings must
# be identical to those of the Consistency Group:
# mirroring type (e.g., synchronous),
# mirroring status, mirroring target(backend)
group_specs = group_types.get_group_type_specs(
volume.group.group_type_id)
group_rep_info = self._get_replication_info(group_specs)
msg = None
if volume_update['replication_status'] != 'enabled':
msg = ('Cannot add non-replicated volume into'
' replicated group')
elif replication_info['mode'] != group_rep_info['mode']:
msg = ('Volume replication type and Group replication type'
' should be the same')
elif volume.host != volume.group.host:
msg = 'Cannot add volume to Group on different host'
else:
group_name = self._cg_name_from_group(volume.group)
me = mirrored_entities.MirroredEntities(
self.ibm_storage_cli)
me_objs = me.get_mirror_resources_by_name_map()
vol_sync_state = me_objs['volumes'][volume.name].sync_state
cg_sync_state = me_objs['cgs'][group_name].sync_state
if (vol_sync_state != 'Synchronized' or
cg_sync_state != 'Synchronized'):
msg = ('Cannot add volume to Group. Both volume and '
'group should have sync_state = Synchronized')
if msg:
LOG.error(msg)
raise self.meta['exception'].VolumeBackendAPIException(
data=msg)
try: try:
cg_name = self._cg_name_from_volume(volume) cg_name = self._cg_name_from_volume(volume)
self._call_xiv_xcli( self._call_xiv_xcli(
@ -543,35 +548,234 @@ class XIVProxy(proxy.IBMStorageProxy):
raise self.meta['exception'].VolumeBackendAPIException( raise self.meta['exception'].VolumeBackendAPIException(
data=msg) data=msg)
LOG.debug('checking replication_info %(rep)s',
{'rep': replication_info})
volume_update['replication_status'] = 'disabled'
if replication_info['enabled']:
try:
self._replication_create(volume, replication_info)
except Exception as e:
details = self._get_code_and_status_or_message(e)
msg = ('Failed _replication_create for '
'volume %(vol)s: %(err)s',
{'vol': volume['name'], 'err': details})
LOG.error(msg)
if cg:
cg_name = self._cg_name_from_volume(volume)
self._silent_delete_volume_from_cg(volume, cg_name)
self._silent_delete_volume(volume=volume)
raise
volume_update['replication_status'] = 'enabled'
return volume_update return volume_update
def get_group_specs_by_group_resource(self, context, group):
group_type = group.get('group_type_id', None)
if group_type is None:
msg = ('No group specs inside group type.')
return None, msg
group_specs = group_types.get_group_type_specs(group_type)
keyword = 'consistent_group_replication_enabled'
if not group_specs.get(keyword) == '<is> True':
msg = ('No cg replication field in group specs.')
return None, msg
return group_specs, ''
@proxy._trace_time
def enable_replication(self, context, group, volumes):
"""Enable cg replication"""
# fetch replication info
group_specs = group_types.get_group_type_specs(group.group_type_id)
if not group_specs:
msg = 'No group specs inside group type'
LOG.error(msg)
raise self.meta['exception'].VolumeBackendAPIException(data=msg)
# Add this field to adjust it to generic replication (for volumes)
replication_info = self._get_replication_info(group_specs)
if utils.is_group_a_cg_snapshot_type(group):
# take every vol out of cg - we can't mirror the cg otherwise.
if volumes:
self._update_consistencygroup(context, group,
remove_volumes=volumes)
for volume in volumes:
repl.VolumeReplication(self).create_replication(
volume.name, replication_info)
# mirror entire group
group_name = self._cg_name_from_group(group)
self._create_consistencygroup_on_remote(context, group_name)
repl.GroupReplication(self).create_replication(group_name,
replication_info)
updated_volumes = []
if volumes:
# add volumes back to cg
self._update_consistencygroup(context, group,
add_volumes=volumes)
for volume in volumes:
updated_volumes.append(
{'id': volume['id'],
'replication_status':
fields.ReplicationStatus.ENABLED})
return ({'replication_status': fields.ReplicationStatus.ENABLED},
updated_volumes)
else:
# For generic groups we replicate all the volumes
updated_volumes = []
for volume in volumes:
repl.VolumeReplication(self).create_replication(
volume.name, replication_info)
# update status
for volume in volumes:
updated_volumes.append(
{'id': volume['id'],
'replication_status': fields.ReplicationStatus.ENABLED})
return ({'replication_status': fields.ReplicationStatus.ENABLED},
updated_volumes)
@proxy._trace_time
def disable_replication(self, context, group, volumes):
"""disables CG replication"""
group_specs = group_types.get_group_type_specs(group.group_type_id)
if not group_specs:
msg = 'No group specs inside group type'
LOG.error(msg)
raise self.meta['exception'].VolumeBackendAPIException(data=msg)
replication_info = self._get_replication_info(group_specs)
updated_volumes = []
if utils.is_group_a_cg_snapshot_type(group):
# one call deletes replication for cgs and volumes together.
repl.GroupReplication(self).delete_replication(group,
replication_info)
for volume in volumes:
# xiv locks volumes after deletion of replication.
# we need to unlock it for further use.
try:
self.ibm_storage_cli.cmd.vol_unlock(vol=volume.name)
except errors.XCLIError as e:
details = self._get_code_and_status_or_message(e)
msg = ('Failed to unlock volumes %(details)s' %
{'details': details})
LOG.error(msg)
raise self.meta['exception'].VolumeBackendAPIException(
data=msg)
updated_volumes.append(
{'id': volume.id,
'replication_status': fields.ReplicationStatus.DISABLED})
else:
# For generic groups we replicate all the volumes
updated_volumes = []
for volume in volumes:
repl.VolumeReplication(self).delete_replication(
volume.name, replication_info)
# update status
for volume in volumes:
updated_volumes.append(
{'id': volume['id'],
'replication_status': fields.ReplicationStatus.DISABLED})
return ({'replication_status': fields.ReplicationStatus.DISABLED},
updated_volumes)
def get_secondary_backend_id(self, secondary_backend_id):
if secondary_backend_id is None:
secondary_backend_id = self._get_target()
if secondary_backend_id is None:
msg = _("No targets defined. Can't perform failover.")
LOG.error(msg)
raise self.meta['exception'].VolumeBackendAPIException(
data=msg)
return secondary_backend_id
def check_for_splitbrain(self, volumes, pool_master, pool_slave):
if volumes:
# check for split brain situations
# check for files that are available on both volumes
# and are not in an active mirroring relation
split_brain = self._potential_split_brain(
self.ibm_storage_cli,
self.ibm_storage_remote_cli,
volumes, pool_master,
pool_slave)
if split_brain:
# if such a situation exists stop and raise an exception!
msg = (_("A potential split brain condition has been found "
"with the following volumes: \n'%(volumes)s.'") %
{'volumes': split_brain})
LOG.error(msg)
raise self.meta['exception'].VolumeBackendAPIException(
data=msg)
def failover_replication(self, context, group, volumes,
secondary_backend_id):
"""Failover a cg with all it's volumes.
if secondery_id is default, cg needs to be failed back.
"""
volumes_updated = []
goal_status = ''
pool_master = None
group_updated = {'replication_status': group.replication_status}
LOG.info("failover_replication: of cg %(cg)s "
"from %(active)s to %(id)s",
{'cg': group.get('name'),
'active': self.active_backend_id,
'id': secondary_backend_id})
if secondary_backend_id == strings.PRIMARY_BACKEND_ID:
# default as active backend id
if self._using_default_backend():
LOG.info("CG has been failed back. "
"No need to fail back again.")
return group_updated, volumes_updated
# get the master pool, not using default id.
pool_master = self._get_target_params(
self.active_backend_id)['san_clustername']
pool_slave = self.storage_info[storage.FLAG_KEYS['storage_pool']]
goal_status = 'available'
else:
if self._using_default_backend():
LOG.info("cg already failed over.")
return group_updated, volumes_updated
# using same api as Cheesecake, we need
# replciation_device entry. so we use get_targets.
secondary_backend_id = self.get_secondary_backend_id(
secondary_backend_id)
pool_master = self.storage_info[storage.FLAG_KEYS['storage_pool']]
pool_slave = self._get_target_params(
secondary_backend_id)['san_clustername']
goal_status = fields.ReplicationStatus.FAILED_OVER
# we should have secondary_backend_id by here.
self.ibm_storage_remote_cli = self._init_xcli(secondary_backend_id)
# check for split brain in mirrored volumes
self.check_for_splitbrain(volumes, pool_master, pool_slave)
group_specs, msg = self.get_group_specs_by_group_resource(context,
group)
if group_specs is None:
LOG.error(msg)
raise self.meta['exception'].VolumeBackendAPIException(data=msg)
failback = (secondary_backend_id == strings.PRIMARY_BACKEND_ID)
result, details = repl.GroupReplication.failover(group, failback)
if result:
status = goal_status
group_updated['replication_status'] = status
else:
status = 'error'
updates = {'status': status}
if status == 'error':
group_updated['replication_extended_status'] = details
# if replication on cg was successful, then all of the volumes
# have been successfully replicated as well.
for volume in volumes:
volumes_updated.append({
'volume_id': volume.id,
'updates': updates
})
# replace between active and secondary xcli
self._replace_xcli_to_remote_xcli()
return group_updated, volumes_updated
def _replace_xcli_to_remote_xcli(self):
temp_ibm_storage_cli = self.ibm_storage_cli
self.ibm_storage_cli = self.ibm_storage_remote_cli
self.ibm_storage_remote_cli = temp_ibm_storage_cli
def _get_replication_target_params(self): def _get_replication_target_params(self):
LOG.debug('_get_replication_target_params.') LOG.debug('_get_replication_target_params.')
targets = self._get_targets() if not self.targets:
if not targets:
msg = _("No targets available for replication") msg = _("No targets available for replication")
LOG.error(msg) LOG.error(msg)
raise self.meta['exception'].VolumeBackendAPIException(data=msg) raise self.meta['exception'].VolumeBackendAPIException(data=msg)
no_of_targets = len(targets) no_of_targets = len(self.targets)
if no_of_targets > 1: if no_of_targets > 1:
msg = _("Too many targets configured. Only one is supported") msg = _("Too many targets configured. Only one is supported")
LOG.error(msg) LOG.error(msg)
@ -591,89 +795,6 @@ class XIVProxy(proxy.IBMStorageProxy):
raise self.meta['exception'].VolumeBackendAPIException(data=msg) raise self.meta['exception'].VolumeBackendAPIException(data=msg)
return target, params return target, params
def _replication_create(self, volume, replication_info):
LOG.debug('_replication_create replication_info %(rep)s',
{'rep': replication_info})
target, params = self._get_replication_target_params()
LOG.info('Target %(target)s: %(params)s',
{'target': target, 'params': six.text_type(params)})
try:
pool = params['san_clustername']
except Exception:
msg = (_("Missing pool information for target '%(target)s'"),
{'target': target})
LOG.error(msg)
raise self.meta['exception'].VolumeBackendAPIException(data=msg)
volume_replication_mgr = volume_recovery_manager.VolumeRecoveryManager(
False, self.ibm_storage_cli)
self._replication_create_mirror(volume, replication_info,
target, pool, volume_replication_mgr)
def _replication_create_mirror(self, volume, replication_info,
target, pool, volume_replication_mgr):
LOG.debug('_replication_create_mirror')
schedule = None
if replication_info['rpo']:
schedule = self._get_schedule_from_rpo(replication_info['rpo'])
if schedule:
LOG.debug('schedule %(sched)s: for rpo %(rpo)s',
{'sched': schedule, 'rpo': replication_info['rpo']})
else:
LOG.error('Failed to find schedule for rpo %(rpo)s',
{'rpo': replication_info['rpo']})
# will fail in the next step
try:
volume_replication_mgr.create_mirror(
resource_name=volume['name'],
target_name=target,
mirror_type=replication_info['mode'],
slave_resource_name=volume['name'],
create_slave='yes',
remote_pool=pool,
rpo=replication_info['rpo'],
schedule=schedule,
activate_mirror='yes')
# TBD - what exceptions will we get here?
except Exception as e:
details = self._get_code_and_status_or_message(e)
msg = (_("Failed replication for %(vol)s: '%(details)s'"),
{'vol': volume['name'], 'details': details})
LOG.error(msg)
raise self.meta['exception'].VolumeBackendAPIException(data=msg)
def _replication_delete(self, volume, replication_info):
LOG.debug('_replication_delete replication_info %(rep)s',
{'rep': replication_info})
targets = self._get_targets()
if not targets:
LOG.debug('No targets defined for replication')
volume_replication_mgr = volume_recovery_manager.VolumeRecoveryManager(
False, self.ibm_storage_cli)
try:
volume_replication_mgr.deactivate_mirror(
resource_id=volume['name'])
except Exception as e:
details = self._get_code_and_status_or_message(e)
msg = (_("Failed ending replication for %(vol)s: '%(details)s'"),
{'vol': volume['name'], 'details': details})
LOG.error(msg)
raise self.meta['exception'].VolumeBackendAPIException(data=msg)
try:
volume_replication_mgr.delete_mirror(
resource_id=volume['name'])
except Exception as e:
details = self._get_code_and_status_or_message(e)
msg = (_("Failed deleting replica for %(vol)s: '%(details)s'"),
{'vol': volume['name'], 'details': details})
LOG.error(msg)
raise self.meta['exception'].VolumeBackendAPIException(data=msg)
def _delete_volume(self, vol_name): def _delete_volume(self, vol_name):
"""Deletes a volume on the Storage.""" """Deletes a volume on the Storage."""
LOG.debug("_delete_volume: %(volume)s", LOG.debug("_delete_volume: %(volume)s",
@ -727,7 +848,8 @@ class XIVProxy(proxy.IBMStorageProxy):
replication_info = self._get_replication_info(specs) replication_info = self._get_replication_info(specs)
if replication_info['enabled']: if replication_info['enabled']:
try: try:
self._replication_delete(volume, replication_info) repl.VolumeReplication(self).delete_replication(
volume.name, replication_info)
except Exception as e: except Exception as e:
error = self._get_code_and_status_or_message(e) error = self._get_code_and_status_or_message(e)
LOG.error(DELETE_VOLUME_BASE_ERROR, LOG.error(DELETE_VOLUME_BASE_ERROR,
@ -1221,70 +1343,6 @@ class XIVProxy(proxy.IBMStorageProxy):
potential_split_brain.append(name) potential_split_brain.append(name)
return potential_split_brain return potential_split_brain
def _failover_vol(self, volume, volume_replication_mgr,
failover_volume_replication_mgr,
replication_info, failback):
"""Failover a single volume.
Attempts to failover a single volume
Sequence:
1. attempt to switch roles from master
2. attempt to change role to master on slave
returns (success, failure_reason)
"""
LOG.debug("_failover_vol %(vol)s", {'vol': volume['name']})
# check if mirror is defined and active
try:
LOG.debug('Check if mirroring is active on %(vol)s.',
{'vol': volume['name']})
active = volume_replication_mgr.is_mirror_active(
resource_id=volume['name'])
except Exception:
active = False
state = 'active' if active else 'inactive'
LOG.debug('Mirroring is %(state)s', {'state': state})
# In case of failback, mirroring must be active
# In case of failover we attempt to move in any condition
if failback and not active:
msg = ("Volume %(vol)s: no active mirroring and can not "
"failback",
{'vol': volume['name']})
LOG.error(msg)
return False, msg
try:
volume_replication_mgr.switch_roles(resource_id=volume['name'])
return True, None
except Exception as e:
# failed attempt to switch_roles from the master
details = self._get_code_and_status_or_message(e)
LOG.warning('Failed to perform switch_roles on'
' %(vol)s: %(err)s. '
'Continue to change_role',
{'vol': volume['name'], 'err': details})
try:
# this is the ugly stage we come to brute force
LOG.warning('Attempt to change_role to master')
failover_volume_replication_mgr.failover_by_id(
resource_id=volume['name'])
return True, None
except m_errors.NoMirrorDefinedError as e:
details = self._get_code_and_status_or_message(e)
msg = ("Volume %(vol)s no replication defined: %(err)s" %
{'vol': volume['name'], 'err': details})
LOG.error(msg)
return False, msg
except Exception as e:
details = self._get_code_and_status_or_message(e)
msg = ('Volume %(vol)s change_role failed: %(err)s' %
{'vol': volume['name'], 'err': details})
LOG.error(msg)
return False, msg
@proxy._trace_time @proxy._trace_time
def failover_host(self, context, volumes, secondary_id, groups=None): def failover_host(self, context, volumes, secondary_id, groups=None):
"""Failover a full backend. """Failover a full backend.
@ -1316,14 +1374,7 @@ class XIVProxy(proxy.IBMStorageProxy):
LOG.info("Already failed over. No need to failover again.") LOG.info("Already failed over. No need to failover again.")
return self.active_backend_id, volume_update_list, [] return self.active_backend_id, volume_update_list, []
# case: need to select a target # case: need to select a target
if secondary_id is None: secondary_id = self.get_secondary_backend_id(secondary_id)
secondary_id = self._get_target()
# still no targets..
if secondary_id is None:
msg = _("No targets defined. Can't perform failover")
LOG.error(msg)
raise self.meta['exception'].VolumeBackendAPIException(
data=msg)
pool_master = self.storage_info[storage.FLAG_KEYS['storage_pool']] pool_master = self.storage_info[storage.FLAG_KEYS['storage_pool']]
try: try:
pool_slave = self._get_target_params( pool_slave = self._get_target_params(
@ -1340,45 +1391,21 @@ class XIVProxy(proxy.IBMStorageProxy):
# calling _init_xcli with secondary_id # calling _init_xcli with secondary_id
self.ibm_storage_remote_cli = self._init_xcli(secondary_id) self.ibm_storage_remote_cli = self._init_xcli(secondary_id)
# Create volume manager for both master and remote
volume_replication_mgr = volume_recovery_manager.VolumeRecoveryManager(
False, self.ibm_storage_cli)
failover_volume_replication_mgr = (
volume_recovery_manager.VolumeRecoveryManager(
True, self.ibm_storage_remote_cli))
# get replication_info for all volumes at once # get replication_info for all volumes at once
if len(volumes): if len(volumes):
# check for split brain situations # check for split brain situations
# check for files that are available on both volumes # check for files that are available on both volumes
# and are not in an active mirroring relation # and are not in an active mirroring relation
split_brain = self._potential_split_brain( self.check_for_splitbrain(volumes, pool_master, pool_slave)
self.ibm_storage_cli,
self.ibm_storage_remote_cli,
volumes, pool_master,
pool_slave)
if len(split_brain):
# if such a situation exists stop and raise an exception!
msg = (_("A potential split brain condition has been found "
"with the following volumes: \n'%(volumes)s.'"),
{'volumes': split_brain})
LOG.error(msg)
raise self.meta['exception'].VolumeBackendAPIException(
data=msg)
specs = self._get_extra_specs(
volumes[0].get('volume_type_id', None))
replication_info = self._get_replication_info(specs)
# loop over volumes and attempt failover # loop over volumes and attempt failover
for volume in volumes: for volume in volumes:
LOG.debug("Attempting to failover '%(vol)s'", LOG.debug("Attempting to failover '%(vol)s'",
{'vol': volume['name']}) {'vol': volume['name']})
result, details = self._failover_vol(
volume, result, details = repl.VolumeReplication(self).failover(
volume_replication_mgr, volume, failback=(secondary_id == strings.PRIMARY_BACKEND_ID))
failover_volume_replication_mgr,
replication_info,
failback=(secondary_id == strings.PRIMARY_BACKEND_ID))
if result: if result:
status = goal_status status = goal_status
else: else:
@ -1393,9 +1420,7 @@ class XIVProxy(proxy.IBMStorageProxy):
}) })
# set active xcli to secondary xcli # set active xcli to secondary xcli
temp_ibm_storage_cli = self.ibm_storage_cli self._replace_xcli_to_remote_xcli()
self.ibm_storage_cli = self.ibm_storage_remote_cli
self.ibm_storage_remote_cli = temp_ibm_storage_cli
# set active backend id to secondary id # set active backend id to secondary id
self.active_backend_id = secondary_id self.active_backend_id = secondary_id
@ -1500,6 +1525,8 @@ class XIVProxy(proxy.IBMStorageProxy):
self.meta['stat']["driver_version"] = self.full_version self.meta['stat']["driver_version"] = self.full_version
self.meta['stat']["storage_protocol"] = connection_type self.meta['stat']["storage_protocol"] = connection_type
self.meta['stat']['multiattach'] = False self.meta['stat']['multiattach'] = False
self.meta['stat']['group_replication_enabled'] = True
self.meta['stat']['consistent_group_replication_enabled'] = True
self.meta['stat']['QoS_support'] = ( self.meta['stat']['QoS_support'] = (
self._check_storage_version_for_qos_support()) self._check_storage_version_for_qos_support())
@ -1538,15 +1565,14 @@ class XIVProxy(proxy.IBMStorageProxy):
self.meta['stat']['thin_provision'] = ('True' if soft_size > hard_size self.meta['stat']['thin_provision'] = ('True' if soft_size > hard_size
else 'False') else 'False')
targets = self._get_targets() if self.targets:
if targets:
self.meta['stat']['replication_enabled'] = True self.meta['stat']['replication_enabled'] = True
# TBD - replication_type should be according to type
self.meta['stat']['replication_type'] = [SYNC, ASYNC] self.meta['stat']['replication_type'] = [SYNC, ASYNC]
self.meta['stat']['rpo'] = self._get_supported_rpo() self.meta['stat']['rpo'] = repl.Replication.get_supported_rpo()
self.meta['stat']['replication_count'] = len(targets) self.meta['stat']['replication_count'] = len(self.targets)
self.meta['stat']['replication_targets'] = [target for target in self.meta['stat']['replication_targets'] = [target for target in
six.iterkeys(targets)] six.iterkeys(
self.targets)]
self.meta['stat']['timestamp'] = datetime.datetime.utcnow() self.meta['stat']['timestamp'] = datetime.datetime.utcnow()
@ -1679,18 +1705,6 @@ class XIVProxy(proxy.IBMStorageProxy):
def create_group(self, context, group): def create_group(self, context, group):
"""Creates a group.""" """Creates a group."""
for volume_type in group.volume_types:
replication_info = self._get_replication_info(
volume_type.extra_specs)
if replication_info.get('enabled'):
# An unsupported illegal configuration
msg = _("Unable to create group: create group with "
"replication volume type is not supported")
LOG.error(msg)
raise self.meta['exception'].VolumeBackendAPIException(
data=msg)
if utils.is_group_a_cg_snapshot_type(group): if utils.is_group_a_cg_snapshot_type(group):
cgname = self._cg_name_from_group(group) cgname = self._cg_name_from_group(group)
return self._create_consistencygroup(context, cgname) return self._create_consistencygroup(context, cgname)
@ -1726,6 +1740,35 @@ class XIVProxy(proxy.IBMStorageProxy):
model_update = {'status': fields.GroupStatus.AVAILABLE} model_update = {'status': fields.GroupStatus.AVAILABLE}
return model_update return model_update
def _create_consistencygroup_on_remote(self, context, cgname):
"""Creates a consistency group on secondary machine.
Return group available even if it already exists (for replication)
"""
LOG.info("Creating consistency group %(name)s on secondary.",
{'name': cgname})
# call remote XCLI
try:
self._call_remote_xiv_xcli(
"cg_create", cg=cgname,
pool=self.storage_info[
storage.FLAG_KEYS['storage_pool']]).as_list
except errors.CgNameExistsError:
model_update = {'status': fields.GroupStatus.AVAILABLE}
except errors.CgLimitReachedError:
error = _("Maximum number of consistency groups reached")
LOG.error(error)
raise self._get_exception()(error)
except errors.XCLIError as e:
error = (_("Fatal error in cg_create on remote: %(details)s") %
{'details': self._get_code_and_status_or_message(e)})
LOG.error(error)
raise self._get_exception()(error)
model_update = {'status': fields.GroupStatus.AVAILABLE}
return model_update
def _silent_cleanup_consistencygroup_from_src(self, context, group, def _silent_cleanup_consistencygroup_from_src(self, context, group,
volumes, cgname): volumes, cgname):
"""Silent cleanup of volumes from CG. """Silent cleanup of volumes from CG.

View File

@ -0,0 +1,342 @@
# Copyright (c) 2017 IBM Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import six
from oslo_log import log as logging
from oslo_utils import importutils
pyxcli = importutils.try_import("pyxcli")
if pyxcli:
from pyxcli import errors
from pyxcli.mirroring import cg_recovery_manager
from pyxcli.mirroring import errors as m_errors
from pyxcli.mirroring import volume_recovery_manager
from cinder.i18n import _
from cinder.volume.drivers.ibm.ibm_storage import strings
SYNC = 'sync'
ASYNC = 'async'
LOG = logging.getLogger(__name__)
class Rate(object):
def __init__(self, rpo, schedule):
self.rpo = rpo
self.schedule = schedule
self.schedule_name = self._schedule_name_from_schedule(self.schedule)
def _schedule_name_from_schedule(self, schedule):
if schedule == '00:00:20':
return 'min_interval'
return ("cinder_%(sched)s" %
{'sched': schedule.replace(':', '_')})
class Replication(object):
async_rates = (
Rate(rpo=120, schedule='00:01:00'),
Rate(rpo=300, schedule='00:02:00'),
Rate(rpo=600, schedule='00:05:00'),
Rate(rpo=1200, schedule='00:10:00'),
)
def __init__(self, proxy):
self.proxy = proxy
@staticmethod
def get_schedule_from_rpo(rpo):
schedule = [rate for rate in Replication.async_rates
if rate.rpo == rpo][0].schedule_name
if schedule:
LOG.debug('schedule %(sched)s: for rpo %(rpo)s',
{'sched': schedule, 'rpo': rpo})
else:
LOG.error('Failed to find schedule for rpo %(rpo)s',
{'rpo': rpo})
return schedule
@staticmethod
def get_supported_rpo():
return [rate.rpo for rate in Replication.async_rates]
def get_recovery_mgr(self):
# Recovery manager is set in derived classes
raise NotImplementedError
def get_remote_recovery_mgr(self):
# Recovery manager is set in derived classes
raise NotImplementedError
def replication_create_mirror(self, resource, replication_info,
target, pool):
raise NotImplementedError
@staticmethod
def extract_replication_info_from_specs(specs):
info = {'enabled': False, 'mode': None, 'rpo': 0}
msg = ""
if specs:
LOG.debug('extract_replication_info_from_specs: specs %(specs)s',
{'specs': specs})
info['enabled'] = (
specs.get('replication_enabled', '').upper() in
(u'TRUE', strings.METADATA_IS_TRUE) or
specs.get('group_replication_enabled', '').upper() in
(u'TRUE', strings.METADATA_IS_TRUE))
replication_type = specs.get('replication_type', SYNC).lower()
if replication_type in (u'sync', u'<is> sync'):
info['mode'] = SYNC
elif replication_type in (u'async', u'<is> async'):
info['mode'] = ASYNC
else:
msg = (_("Unsupported replication mode %(mode)s")
% {'mode': replication_type})
return None, msg
info['rpo'] = int(specs.get('rpo', u'<is> 0')[5:])
supported_rpos = Replication.get_supported_rpo()
if info['rpo'] and info['rpo'] not in supported_rpos:
msg = (_("Unsupported replication RPO %(rpo)s"),
{'rpo': info['rpo']})
return None, msg
LOG.debug('extract_replication_info_from_specs: info %(info)s',
{'info': info})
return info, msg
def failover(self, resource, failback):
raise NotImplementedError
def create_replication(self, resource_name, replication_info):
LOG.debug('Replication::create_replication replication_info %(rep)s',
{'rep': replication_info})
target, params = self.proxy._get_replication_target_params()
LOG.info('Target %(target)s: %(params)s',
{'target': target, 'params': six.text_type(params)})
try:
pool = params['san_clustername']
except Exception:
msg = (_("Missing pool information for target '%(target)s'") %
{'target': target})
LOG.error(msg)
raise self.proxy.meta['exception'].VolumeBackendAPIException(
data=msg)
self.replication_create_mirror(resource_name, replication_info,
target, pool)
def delete_replication(self, resource_name, replication_info):
LOG.debug('Replication::delete_replication replication_info %(rep)s',
{'rep': replication_info})
recovery_mgr = self.get_recovery_mgr()
try:
recovery_mgr.deactivate_mirror(resource_id=resource_name)
except Exception as e:
details = self.proxy._get_code_and_status_or_message(e)
msg = (_("Failed ending replication for %(resource)s: "
"'%(details)s'") % {'resource': resource_name,
'details': details})
LOG.error(msg)
raise self.proxy.meta['exception'].VolumeBackendAPIException(
data=msg)
try:
recovery_mgr.delete_mirror(resource_id=resource_name)
except Exception as e:
details = self.proxy._get_code_and_status_or_message(e)
msg = (_("Failed deleting replica for %(resource)s: "
"'%(details)s'") % {'resource': resource_name,
'details': details})
LOG.error(msg)
raise self.proxy.meta['exception'].VolumeBackendAPIException(
data=msg)
def _failover_resource(self, resource, recovery_mgr, failover_rep_mgr,
rep_type, failback):
# check if mirror is defined and active
LOG.debug('Check if mirroring is active on %(res)s',
{'res': resource['name']})
try:
active = recovery_mgr.is_mirror_active(
resource_id=resource['name'])
except Exception:
active = False
state = 'active' if active else 'inactive'
LOG.debug('Mirroring is %(state)s', {'state': state})
# In case of failback, mirroring must be active
# In case of failover we attempt to move in any condition
if failback and not active:
msg = ("%(rep_type)s %(res)s: no active mirroring and can not "
"failback" % {'rep_type': rep_type,
'res': resource['name']})
LOG.error(msg)
return False, msg
try:
recovery_mgr.switch_roles(resource_id=resource['name'])
return True, None
except Exception as e:
# failed attempt to switch_roles from the master
details = self.proxy._get_code_and_status_or_message(e)
LOG.warning('Failed to perform switch_roles on'
' %(res)s: %(err)s. '
'Continue to change_role',
{'res': resource['name'], 'err': details})
try:
# this is the ugly stage we come to brute force
if failback:
role = 'Slave'
else:
role = 'Master'
LOG.warning('Attempt to change_role to %(role)s', {'role': role})
failover_rep_mgr.change_role(resource_id=resource['name'],
new_role=role)
return True, None
except m_errors.NoMirrorDefinedError as e:
details = self.proxy._get_code_and_status_or_message(e)
msg = ("%(rep_type)s %(res)s no replication defined: %(err)s" %
{'rep_type': rep_type, 'res': resource['name'],
'err': details})
LOG.error(msg)
return False, msg
except Exception as e:
details = self.proxy._get_code_and_status_or_message(e)
msg = ('%(rep_type)s %(res)s change_role failed: %(err)s' %
{'rep_type': rep_type, 'res': resource['name'],
'err': details})
LOG.error(msg)
return False, msg
class VolumeReplication(Replication):
def __init__(self, proxy):
super(VolumeReplication, self).__init__(proxy)
def get_recovery_mgr(self):
return volume_recovery_manager.VolumeRecoveryManager(
False, self.proxy.ibm_storage_cli)
def get_remote_recovery_mgr(self):
return volume_recovery_manager.VolumeRecoveryManager(
True, self.proxy.ibm_storage_remote_cli)
def replication_create_mirror(self, resource_name, replication_info,
target, pool):
LOG.debug('VolumeReplication::replication_create_mirror')
schedule = None
if replication_info['rpo']:
schedule = Replication.get_schedule_from_rpo(
replication_info['rpo'])
try:
recovery_mgr = self.get_recovery_mgr()
recovery_mgr.create_mirror(
resource_name=resource_name,
target_name=target,
mirror_type=replication_info['mode'],
slave_resource_name=resource_name,
create_slave='yes',
remote_pool=pool,
rpo=replication_info['rpo'],
schedule=schedule,
activate_mirror='yes')
except errors.VolumeMasterError:
LOG.debug('Volume %(vol)s has been already mirrored',
{'vol': resource_name})
except Exception as e:
details = self.proxy._get_code_and_status_or_message(e)
msg = (_("Failed replication for %(resource)s: '%(details)s'") %
{'resource': resource_name, 'details': details})
LOG.error(msg)
raise self.proxy.meta['exception'].VolumeBackendAPIException(
data=msg)
def failover(self, resource, failback):
"""Failover a single volume.
Attempts to failover a single volume
Sequence:
1. attempt to switch roles from master
2. attempt to change role to master on secondary
returns (success, failure_reason)
"""
LOG.debug("VolumeReplication::failover %(vol)s",
{'vol': resource['name']})
recovery_mgr = self.get_recovery_mgr()
remote_recovery_mgr = self.get_remote_recovery_mgr()
return self._failover_resource(resource, recovery_mgr,
remote_recovery_mgr, 'vol', failback)
class GroupReplication(Replication):
def __init__(self, proxy):
super(GroupReplication, self).__init__(proxy)
def get_recovery_mgr(self):
return cg_recovery_manager.CGRecoveryManager(
False, self.proxy.ibm_storage_cli)
def get_remote_recovery_mgr(self):
return volume_recovery_manager.CGRecoveryManager(
True, self.proxy.ibm_storage_remote_cli)
def replication_create_mirror(self, resource_name, replication_info,
target, pool):
LOG.debug('GroupReplication::replication_create_mirror')
schedule = None
if replication_info['rpo']:
schedule = Replication.get_schedule_from_rpo(
replication_info['rpo'])
try:
recovery_mgr = self.get_recovery_mgr()
recovery_mgr.create_mirror(
resource_name=resource_name,
target_name=target,
mirror_type=replication_info['mode'],
slave_resource_name=resource_name,
rpo=replication_info['rpo'],
schedule=schedule,
activate_mirror='yes')
except Exception as e:
details = self.proxy._get_code_and_status_or_message(e)
msg = (_("Failed replication for %(resource)s: '%(details)s'"),
{'resource': resource_name, 'details': details})
LOG.error(msg)
raise self.proxy.meta['exception'].VolumeBackendAPIException(
data=msg)
def failover(self, resource, failback):
LOG.debug("GroupReplication::failover %(cg)s",
{'cg': resource['name']})
recovery_mgr = self.get_recovery_mgr()
remote_recovery_mgr = self.get_remote_recovery_mgr()
return self._failover_resource(resource, recovery_mgr,
remote_recovery_mgr, 'cg', failback)

View File

@ -0,0 +1,4 @@
---
features:
- |
Add consistency group replication support in XIV\A9000 Cinder driver.