Implement unit test for HPE Lefthand driver

1. Added 4 unit tests for migrate_volume() to cover following cases:
   * when volume is on different backend
   * when volume is on different management group
   * volume is exported
   * input volume does not exists in management group
2. Added a unit test for create_volume_from_snapshot() to cover:
   * when replication enabled and snapshot is scheduled
   * when replication enable and getSnapshotByName fails with HTTPNotFound
3. Added unit test for create_cloned_volume() to cover:
   * when replication enabled and volume is created on replica node
   * when replication enabled and volume creation fails with HTTPServerError

Change-Id: Iffa4188513082d298d36cd8d298ed9047ca6646a
This commit is contained in:
Jay Mehta 2016-05-20 19:19:52 -07:00
parent 9f9b3c9be7
commit 45caa0f44a

View File

@ -776,6 +776,75 @@ class TestHPELeftHandISCSIDriver(HPELeftHandBaseDriver, test.TestCase):
# validate call chain
mock_client.assert_has_calls(expected)
@mock.patch.object(volume_types, 'get_volume_type')
def test_create_volume_from_snapshot_with_replication(
self,
mock_get_volume_type):
conf = self.default_mock_conf()
conf.replication_device = self.repl_targets_unmgd
mock_client = self.setup_driver(config=conf)
mock_client.getSnapshotByName.return_value = {'id': self.snapshot_id}
mock_client.cloneSnapshot.return_value = {
'iscsiIqn': self.connector['initiator']}
mock_client.doesRemoteSnapshotScheduleExist.return_value = True
mock_get_volume_type.return_value = {
'name': 'replicated',
'extra_specs': {
'replication_enabled': '<is> True'}}
volume = self.volume.copy()
volume['volume_type_id'] = self.volume_type_id
with mock.patch.object(hpe_lefthand_iscsi.HPELeftHandISCSIDriver,
'_create_client') as mock_do_setup:
mock_do_setup.return_value = mock_client
# execute create_volume_from_snapshot for volume replication
# enabled, with snapshot scheduled
model_update = self.driver.create_volume_from_snapshot(
volume, self.snapshot)
expected_iqn = 'iqn.1993-08.org.debian:01:222 0'
expected_location = "10.0.1.6:3260,1 %s" % expected_iqn
self.assertEqual(expected_location,
model_update['provider_location'])
self.assertEqual('enabled', model_update['replication_status'])
# expected calls
expected = self.driver_startup_call_stack + [
mock.call.getSnapshotByName('fakeshapshot'),
mock.call.cloneSnapshot(self.volume['name'], self.snapshot_id),
mock.call.doesRemoteSnapshotScheduleExist(
'fakevolume_SCHED_Pri'),
mock.call.startRemoteSnapshotSchedule('fakevolume_SCHED_Pri'),
mock.call.logout()]
mock_client.assert_has_calls(expected)
def test_create_volume_from_snapshot_exception(self):
# setup driver with default configuration
# and return the mock HTTP LeftHand client
mock_client = self.setup_driver()
mock_client.getSnapshotByName.side_effect = (
hpeexceptions.HTTPNotFound())
with mock.patch.object(hpe_lefthand_iscsi.HPELeftHandISCSIDriver,
'_create_client') as mock_do_setup:
mock_do_setup.return_value = mock_client
# testing when getSnapshotByName returns an exception
self.assertRaises(
exception.VolumeBackendAPIException,
self.driver.create_volume_from_snapshot,
self.volume, self.snapshot)
# expected calls
expected = self.driver_startup_call_stack + [
mock.call.getSnapshotByName(self.snapshot_name),
mock.call.logout()]
mock_client.assert_has_calls(expected)
def test_create_cloned_volume(self):
# setup drive with default configuration
@ -808,6 +877,89 @@ class TestHPELeftHandISCSIDriver(HPELeftHandBaseDriver, test.TestCase):
# validate call chain
mock_client.assert_has_calls(expected)
@mock.patch.object(volume_types, 'get_volume_type')
def test_create_cloned_volume_with_replication(self, mock_get_volume_type):
conf = self.default_mock_conf()
conf.replication_device = self.repl_targets_unmgd
mock_client = self.setup_driver(config=conf)
mock_replicated_client = self.setup_driver(config=conf)
mock_client.getVolumeByName.return_value = {'id': self.volume_id}
mock_client.cloneVolume.return_value = {
'iscsiIqn': self.connector['initiator']}
mock_client.doesRemoteSnapshotScheduleExist.return_value = False
mock_get_volume_type.return_value = {
'name': 'replicated',
'extra_specs': {
'replication_enabled': '<is> True'}}
cloned_volume = self.cloned_volume.copy()
cloned_volume['volume_type_id'] = self.volume_type_id
with mock.patch.object(
hpe_lefthand_iscsi.HPELeftHandISCSIDriver,
'_create_client') as mock_do_setup:
with mock.patch.object(
hpe_lefthand_iscsi.HPELeftHandISCSIDriver,
'_create_replication_client') as mock_replication_client:
mock_do_setup.return_value = mock_client
mock_replication_client.return_value = mock_replicated_client
# execute create_cloned_volume
model_update = self.driver.create_cloned_volume(
cloned_volume, self.volume)
self.assertEqual('enabled',
model_update['replication_status'])
expected = self.driver_startup_call_stack + [
mock.call.getVolumeByName('fakevolume'),
mock.call.cloneVolume('clone_volume', 1),
mock.call.doesRemoteSnapshotScheduleExist(
'clone_volume_SCHED_Pri'),
mock.call.createRemoteSnapshotSchedule(
'clone_volume', 'clone_volume_SCHED', 1800,
'1970-01-01T00:00:00Z', 5, 'CloudCluster1', 5,
'clone_volume', '1.1.1.1', 'foo1', 'bar2'),
mock.call.logout()]
mock_client.assert_has_calls(expected)
expected_calls_replica_client = [
mock.call.createVolume('clone_volume', 1,
cloned_volume['size'] * units.Gi,
None),
mock.call.makeVolumeRemote('clone_volume',
'clone_volume_SS'),
mock.call.getIPFromCluster('CloudCluster1')]
mock_replicated_client.assert_has_calls(
expected_calls_replica_client)
def test_create_cloned_volume_exception(self):
# setup driver with default configuration
# and return the mock HTTP LeftHand client
mock_client = self.setup_driver()
mock_client.cloneVolume.side_effect = (
hpeexceptions.HTTPServerError())
mock_client.getVolumeByName.return_value = {'id': self.volume_id}
with mock.patch.object(hpe_lefthand_iscsi.HPELeftHandISCSIDriver,
'_create_client') as mock_do_setup:
mock_do_setup.return_value = mock_client
self.assertRaises(
exception.VolumeBackendAPIException,
self.driver.create_cloned_volume,
self.cloned_volume, self.volume)
expected = self.driver_startup_call_stack + [
mock.call.getVolumeByName('fakevolume'),
mock.call.cloneVolume('clone_volume', 1),
mock.call.logout()]
# validate call chain
mock_client.assert_has_calls(expected)
def test_create_cloned_volume_extend(self):
# setup drive with default configuration
@ -1231,6 +1383,209 @@ class TestHPELeftHandISCSIDriver(HPELeftHandBaseDriver, test.TestCase):
len(expected),
len(mock_client.method_calls))
def test_migrate_volume_error_diff_backends(self):
# setup driver with default configuration
# and return the mock HTTP LeftHand client
mock_client = self.setup_driver()
fake_driver_loc = 'FakeDriver %(cluster)s %(vip)s'
location = (fake_driver_loc % {'cluster': 'New_CloudCluster',
'vip': '10.10.10.111'})
host = {'host': self.serverName,
'capabilities': {'location_info': location}}
with mock.patch.object(hpe_lefthand_iscsi.HPELeftHandISCSIDriver,
'_create_client') as mock_do_setup:
mock_do_setup.return_value = mock_client
(migrated, update) = self.driver.migrate_volume(None,
self.volume,
host)
self.assertFalse(migrated)
expected = self.driver_startup_call_stack + [
mock.call.getClusterByName('New_CloudCluster'),
mock.call.logout()]
mock_client.assert_has_calls(expected)
def test_migrate_volume_error_diff_management_group(self):
# setup driver with default configuration
# and return the mock HTTP LeftHand client
mock_client = self.setup_driver()
location = (self.driver.DRIVER_LOCATION % {'cluster': 'FakeCluster',
'vip': '10.10.10.112'})
host = {'host': self.serverName,
'capabilities': {'location_info': location}}
mock_client.getClusterByName.return_value = {
'id': self.cluster_id,
'virtualIPAddresses': [{'ipV4Address': '10.0.1.6',
'ipV4NetMask': '255.255.240.0'}]
}
with mock.patch.object(hpe_lefthand_iscsi.HPELeftHandISCSIDriver,
'_create_client') as mock_do_setup:
mock_do_setup.return_value = mock_client
(migrated, update) = self.driver.migrate_volume(None,
self.volume,
host)
self.assertFalse(migrated)
expected = self.driver_startup_call_stack + [
mock.call.getClusterByName('FakeCluster'),
mock.call.logout()]
mock_client.assert_has_calls(expected)
def test_migrate_volume_exception_diff_management_group(self):
# setup driver with default configuration
# and return the mock HTTP LeftHand client
mock_client = self.setup_driver()
location = (self.driver.DRIVER_LOCATION % {
'cluster': 'New_CloudCluster',
'vip': '10.10.10.111'})
host = {
'host': self.serverName,
'capabilities': {'location_info': location}}
mock_client.getClusterByName.side_effect = [{
'id': self.cluster_id,
'virtualIPAddresses': [{
'ipV4Address': '10.0.1.6',
'ipV4NetMask': '255.255.240.0'}]},
hpeexceptions.HTTPNotFound]
with mock.patch.object(hpe_lefthand_iscsi.HPELeftHandISCSIDriver,
'_create_client') as mock_do_setup:
mock_do_setup.return_value = mock_client
(migrated, update) = self.driver.migrate_volume(None,
self.volume,
host)
self.assertFalse(migrated)
expected = self.driver_startup_call_stack + [
mock.call.getClusterByName('New_CloudCluster'),
mock.call.logout()]
mock_client.assert_has_calls(expected)
def test_migrate_volume_error_exported_volume(self):
# setup driver with default configuration
# and return the mock HTTP LeftHand client
mock_client = self.setup_driver()
mock_client.getClusterByName.return_value = {
"virtualIPAddresses": [{
"ipV4Address": "10.10.10.111",
"ipV4NetMask": "255.255.240.0"}],
"id": self.cluster_id
}
mock_client.getVolumeByName.return_value = {
'id': self.volume_id,
'iscsiSessions': [{'server': {'uri': self.server_uri}}]
}
location = (self.driver.DRIVER_LOCATION % {
'cluster': 'New_CloudCluster',
'vip': '10.10.10.111'})
host = {
'host': self.serverName,
'capabilities': {'location_info': location}}
with mock.patch.object(hpe_lefthand_iscsi.HPELeftHandISCSIDriver,
'_create_client') as mock_do_setup:
mock_do_setup.return_value = mock_client
(migrated, update) = self.driver.migrate_volume(None,
self.volume,
host)
self.assertFalse(migrated)
expected = self.driver_startup_call_stack + [
mock.call.getClusterByName('New_CloudCluster'),
mock.call.logout()] + self.driver_startup_call_stack + [
mock.call.getVolumeByName(self.volume['name']),
mock.call.logout()]
mock_client.assert_has_calls(expected)
def test_migrate_volume_error_volume_not_exist(self):
# setup driver with default configuration
# and return the mock HTTP LeftHand client
mock_client = self.setup_driver()
mock_client.getClusterByName.return_value = {
"virtualIPAddresses": [{
"ipV4Address": "10.10.10.111",
"ipV4NetMask": "255.255.240.0"}],
"id": self.cluster_id}
mock_client.getVolumeByName.return_value = {
'id': self.volume_id,
'iscsiSessions': None}
mock_client.getVolume.return_value = {'snapshots': {
'resource': 'snapfoo'}}
location = (self.driver.DRIVER_LOCATION % {
'cluster': 'New_CloudCluster',
'vip': '10.10.10.111'})
host = {
'host': self.serverName,
'capabilities': {'location_info': location}}
with mock.patch.object(hpe_lefthand_iscsi.HPELeftHandISCSIDriver,
'_create_client') as mock_do_setup:
mock_do_setup.return_value = mock_client
(migrated, update) = self.driver.migrate_volume(
None,
self.volume,
host)
self.assertFalse(migrated)
expected = self.driver_startup_call_stack + [
mock.call.getClusterByName('New_CloudCluster'),
mock.call.logout()] + self.driver_startup_call_stack + [
mock.call.getVolumeByName(self.volume['name']),
mock.call.getVolume(
self.volume['id'],
'fields=snapshots,snapshots[resource[members[name]]]'),
mock.call.logout()]
mock_client.assert_has_calls(expected)
def test_migrate_volume_exception(self):
# setup driver with default configuration
# and return the mock HTTP LeftHand client
mock_client = self.setup_driver()
mock_client.getClusterByName.return_value = {
"virtualIPAddresses": [{
"ipV4Address": "10.10.10.111",
"ipV4NetMask": "255.255.240.0"}],
"id": self.cluster_id}
mock_client.getVolumeByName.return_value = {
'id': self.volume_id,
'iscsiSessions': None}
mock_client.getVolume.side_effect = hpeexceptions.HTTPServerError()
location = (self.driver.DRIVER_LOCATION % {
'cluster': 'New_CloudCluster',
'vip': '10.10.10.111'})
host = {
'host': self.serverName,
'capabilities': {'location_info': location}}
with mock.patch.object(hpe_lefthand_iscsi.HPELeftHandISCSIDriver,
'_create_client') as mock_do_setup:
mock_do_setup.return_value = mock_client
# Testing for any other HTTPServerError
(migrated, update) = self.driver.migrate_volume(
None,
self.volume,
host)
self.assertFalse(migrated)
expected = self.driver_startup_call_stack + [
mock.call.getClusterByName('New_CloudCluster'),
mock.call.logout()] + self.driver_startup_call_stack + [
mock.call.getVolumeByName(self.volume['name']),
mock.call.getVolume(
self.volume['id'],
'fields=snapshots,snapshots[resource[members[name]]]'),
mock.call.logout()]
mock_client.assert_has_calls(expected)
def test_update_migrated_volume(self):
mock_client = self.setup_driver()
volume_id = 'fake_vol_id'