Merge "Fix dos-style endlines"

This commit is contained in:
Jenkins 2016-12-08 19:40:21 +00:00 committed by Gerrit Code Review
commit 2b1a27da11
9 changed files with 2982 additions and 2982 deletions

View File

@ -1,26 +1,26 @@
# Copyright (c) 2015 Intel Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import Column, MetaData, String, Table
def upgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
backups = Table('backups', meta, autoload=True)
restore_volume_id = Column('restore_volume_id', String(length=36))
backups.create_column(restore_volume_id)
# Copyright (c) 2015 Intel Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import Column, MetaData, String, Table
def upgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
backups = Table('backups', meta, autoload=True)
restore_volume_id = Column('restore_volume_id', String(length=36))
backups.create_column(restore_volume_id)

View File

@ -1,211 +1,211 @@
# Copyright (c) 2013 - 2016 EMC Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import mock
from cinder import context
from cinder.tests.unit.consistencygroup import fake_consistencygroup
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
from cinder.tests.unit.volume.drivers.emc import scaleio
from cinder.tests.unit.volume.drivers.emc.scaleio import mocks
class TestConsistencyGroups(scaleio.TestScaleIODriver):
"""Test cases for ``ScaleIODriver consistency groups support``"""
def setUp(self):
"""Setup a test case environment.
Creates a fake volume object and sets up the required API responses.
"""
super(TestConsistencyGroups, self).setUp()
self.ctx = context.RequestContext('fake', 'fake', auth_token=True)
self.consistency_group = (
fake_consistencygroup.fake_consistencyobject_obj(
self.ctx, **{'id': fake.CONSISTENCY_GROUP_ID}))
fake_volume1 = fake_volume.fake_volume_obj(
self.ctx,
**{'id': fake.VOLUME_ID, 'provider_id': fake.PROVIDER_ID})
fake_volume2 = fake_volume.fake_volume_obj(
self.ctx,
**{'id': fake.VOLUME2_ID, 'provider_id': fake.PROVIDER2_ID})
fake_volume3 = fake_volume.fake_volume_obj(
self.ctx,
**{'id': fake.VOLUME3_ID, 'provider_id': fake.PROVIDER3_ID})
fake_volume4 = fake_volume.fake_volume_obj(
self.ctx,
**{'id': fake.VOLUME4_ID, 'provider_id': fake.PROVIDER4_ID})
self.volumes = [fake_volume1, fake_volume2]
self.volumes2 = [fake_volume3, fake_volume4]
fake_snapshot1 = fake_snapshot.fake_snapshot_obj(
self.ctx,
**{'id': fake.SNAPSHOT_ID, 'volume_id': fake.VOLUME_ID,
'volume': fake_volume1})
fake_snapshot2 = fake_snapshot.fake_snapshot_obj(
self.ctx,
**{'id': fake.SNAPSHOT2_ID, 'volume_id': fake.VOLUME2_ID, 'volume':
fake_volume2})
self.snapshots = [fake_snapshot1, fake_snapshot2]
self.snapshot_reply = json.dumps({
'volumeIdList': ['sid1', 'sid2'],
'snapshotGroupId': 'sgid1'})
self.HTTPS_MOCK_RESPONSES = {
self.RESPONSE_MODE.Valid: {
'instances/Volume::{}/action/removeVolume'.format(
fake_volume1['provider_id']
): fake_volume1['provider_id'],
'instances/Volume::{}/action/removeVolume'.format(
fake_volume2['provider_id']
): fake_volume2['provider_id'],
'instances/Volume::{}/action/removeMappedSdc'.format(
fake_volume1['provider_id']
): fake_volume1['provider_id'],
'instances/Volume::{}/action/removeMappedSdc'.format(
fake_volume2['provider_id']
): fake_volume2['provider_id'],
'instances/System/action/snapshotVolumes':
self.snapshot_reply,
},
self.RESPONSE_MODE.BadStatus: {
'instances/Volume::{}/action/removeVolume'.format(
fake_volume1['provider_id']
): mocks.MockHTTPSResponse(
{
'errorCode': 401,
'message': 'BadStatus Volume Test',
}, 401
),
'instances/Volume::{}/action/removeVolume'.format(
fake_volume2['provider_id']
): mocks.MockHTTPSResponse(
{
'errorCode': 401,
'message': 'BadStatus Volume Test',
}, 401
),
'instances/System/action/snapshotVolumes':
self.BAD_STATUS_RESPONSE
},
}
def _fake_cgsnapshot(self):
cgsnap = {'id': 'cgsid', 'name': 'testsnap',
'consistencygroup_id': fake.CONSISTENCY_GROUP_ID,
'status': 'available'}
return cgsnap
def test_create_consistencygroup(self):
result = self.driver.create_consistencygroup(self.ctx,
self.consistency_group)
self.assertEqual('available', result['status'])
def test_delete_consistencygroup_valid(self):
self.set_https_response_mode(self.RESPONSE_MODE.Valid)
self.driver.configuration.set_override(
'sio_unmap_volume_before_deletion',
override=True)
result_model_update, result_volumes_update = (
self.driver.delete_consistencygroup(self.ctx,
self.consistency_group,
self.volumes))
self.assertTrue(all(volume['status'] == 'deleted' for volume in
result_volumes_update))
self.assertEqual('deleted', result_model_update['status'])
def test_delete_consistency_group_fail(self):
self.set_https_response_mode(self.RESPONSE_MODE.BadStatus)
result_model_update, result_volumes_update = (
self.driver.delete_consistencygroup(self.ctx,
self.consistency_group,
self.volumes))
self.assertTrue(any(volume['status'] == 'error_deleting' for volume in
result_volumes_update))
self.assertIn(result_model_update['status'],
['error_deleting', 'error'])
def test_create_consistencygroup_from_cg(self):
self.set_https_response_mode(self.RESPONSE_MODE.Valid)
result_model_update, result_volumes_model_update = (
self.driver.create_consistencygroup_from_src(
self.ctx, self.consistency_group, self.volumes2,
source_cg=self.consistency_group, source_vols=self.volumes))
self.assertEqual('available', result_model_update['status'])
get_pid = lambda snapshot: snapshot['provider_id']
volume_provider_list = list(map(get_pid, result_volumes_model_update))
self.assertListEqual(volume_provider_list, ['sid1', 'sid2'])
def test_create_consistencygroup_from_cgs(self):
self.snapshots[0]['provider_id'] = fake.PROVIDER_ID
self.snapshots[1]['provider_id'] = fake.PROVIDER2_ID
self.set_https_response_mode(self.RESPONSE_MODE.Valid)
result_model_update, result_volumes_model_update = (
self.driver.create_consistencygroup_from_src(
self.ctx, self.consistency_group, self.volumes2,
cgsnapshot=self._fake_cgsnapshot(),
snapshots=self.snapshots))
self.assertEqual('available', result_model_update['status'])
get_pid = lambda snapshot: snapshot['provider_id']
volume_provider_list = list(map(get_pid, result_volumes_model_update))
self.assertListEqual(['sid1', 'sid2'], volume_provider_list)
@mock.patch('cinder.objects.snapshot')
@mock.patch('cinder.objects.snapshot')
def test_create_cgsnapshots(self, snapshot1, snapshot2):
type(snapshot1).volume = mock.PropertyMock(
return_value=self.volumes[0])
type(snapshot2).volume = mock.PropertyMock(
return_value=self.volumes[1])
snapshots = [snapshot1, snapshot2]
self.set_https_response_mode(self.RESPONSE_MODE.Valid)
result_model_update, result_snapshot_model_update = (
self.driver.create_cgsnapshot(
self.ctx,
self._fake_cgsnapshot(),
snapshots
))
self.assertEqual('available', result_model_update['status'])
self.assertTrue(all(snapshot['status'] == 'available' for snapshot in
result_snapshot_model_update))
get_pid = lambda snapshot: snapshot['provider_id']
snapshot_provider_list = list(map(get_pid,
result_snapshot_model_update))
self.assertListEqual(['sid1', 'sid2'], snapshot_provider_list)
@mock.patch('cinder.objects.snapshot')
@mock.patch('cinder.objects.snapshot')
def test_delete_cgsnapshots(self, snapshot1, snapshot2):
type(snapshot1).volume = mock.PropertyMock(
return_value=self.volumes[0])
type(snapshot2).volume = mock.PropertyMock(
return_value=self.volumes[1])
type(snapshot1).provider_id = mock.PropertyMock(
return_value=fake.PROVIDER_ID)
type(snapshot2).provider_id = mock.PropertyMock(
return_value=fake.PROVIDER2_ID)
snapshots = [snapshot1, snapshot2]
self.set_https_response_mode(self.RESPONSE_MODE.Valid)
result_model_update, result_snapshot_model_update = (
self.driver.delete_cgsnapshot(
self.ctx,
self._fake_cgsnapshot(),
snapshots
))
self.assertEqual('deleted', result_model_update['status'])
self.assertTrue(all(snapshot['status'] == 'deleted' for snapshot in
result_snapshot_model_update))
# Copyright (c) 2013 - 2016 EMC Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import mock
from cinder import context
from cinder.tests.unit.consistencygroup import fake_consistencygroup
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
from cinder.tests.unit.volume.drivers.emc import scaleio
from cinder.tests.unit.volume.drivers.emc.scaleio import mocks
class TestConsistencyGroups(scaleio.TestScaleIODriver):
"""Test cases for ``ScaleIODriver consistency groups support``"""
def setUp(self):
"""Setup a test case environment.
Creates a fake volume object and sets up the required API responses.
"""
super(TestConsistencyGroups, self).setUp()
self.ctx = context.RequestContext('fake', 'fake', auth_token=True)
self.consistency_group = (
fake_consistencygroup.fake_consistencyobject_obj(
self.ctx, **{'id': fake.CONSISTENCY_GROUP_ID}))
fake_volume1 = fake_volume.fake_volume_obj(
self.ctx,
**{'id': fake.VOLUME_ID, 'provider_id': fake.PROVIDER_ID})
fake_volume2 = fake_volume.fake_volume_obj(
self.ctx,
**{'id': fake.VOLUME2_ID, 'provider_id': fake.PROVIDER2_ID})
fake_volume3 = fake_volume.fake_volume_obj(
self.ctx,
**{'id': fake.VOLUME3_ID, 'provider_id': fake.PROVIDER3_ID})
fake_volume4 = fake_volume.fake_volume_obj(
self.ctx,
**{'id': fake.VOLUME4_ID, 'provider_id': fake.PROVIDER4_ID})
self.volumes = [fake_volume1, fake_volume2]
self.volumes2 = [fake_volume3, fake_volume4]
fake_snapshot1 = fake_snapshot.fake_snapshot_obj(
self.ctx,
**{'id': fake.SNAPSHOT_ID, 'volume_id': fake.VOLUME_ID,
'volume': fake_volume1})
fake_snapshot2 = fake_snapshot.fake_snapshot_obj(
self.ctx,
**{'id': fake.SNAPSHOT2_ID, 'volume_id': fake.VOLUME2_ID, 'volume':
fake_volume2})
self.snapshots = [fake_snapshot1, fake_snapshot2]
self.snapshot_reply = json.dumps({
'volumeIdList': ['sid1', 'sid2'],
'snapshotGroupId': 'sgid1'})
self.HTTPS_MOCK_RESPONSES = {
self.RESPONSE_MODE.Valid: {
'instances/Volume::{}/action/removeVolume'.format(
fake_volume1['provider_id']
): fake_volume1['provider_id'],
'instances/Volume::{}/action/removeVolume'.format(
fake_volume2['provider_id']
): fake_volume2['provider_id'],
'instances/Volume::{}/action/removeMappedSdc'.format(
fake_volume1['provider_id']
): fake_volume1['provider_id'],
'instances/Volume::{}/action/removeMappedSdc'.format(
fake_volume2['provider_id']
): fake_volume2['provider_id'],
'instances/System/action/snapshotVolumes':
self.snapshot_reply,
},
self.RESPONSE_MODE.BadStatus: {
'instances/Volume::{}/action/removeVolume'.format(
fake_volume1['provider_id']
): mocks.MockHTTPSResponse(
{
'errorCode': 401,
'message': 'BadStatus Volume Test',
}, 401
),
'instances/Volume::{}/action/removeVolume'.format(
fake_volume2['provider_id']
): mocks.MockHTTPSResponse(
{
'errorCode': 401,
'message': 'BadStatus Volume Test',
}, 401
),
'instances/System/action/snapshotVolumes':
self.BAD_STATUS_RESPONSE
},
}
def _fake_cgsnapshot(self):
cgsnap = {'id': 'cgsid', 'name': 'testsnap',
'consistencygroup_id': fake.CONSISTENCY_GROUP_ID,
'status': 'available'}
return cgsnap
def test_create_consistencygroup(self):
result = self.driver.create_consistencygroup(self.ctx,
self.consistency_group)
self.assertEqual('available', result['status'])
def test_delete_consistencygroup_valid(self):
self.set_https_response_mode(self.RESPONSE_MODE.Valid)
self.driver.configuration.set_override(
'sio_unmap_volume_before_deletion',
override=True)
result_model_update, result_volumes_update = (
self.driver.delete_consistencygroup(self.ctx,
self.consistency_group,
self.volumes))
self.assertTrue(all(volume['status'] == 'deleted' for volume in
result_volumes_update))
self.assertEqual('deleted', result_model_update['status'])
def test_delete_consistency_group_fail(self):
self.set_https_response_mode(self.RESPONSE_MODE.BadStatus)
result_model_update, result_volumes_update = (
self.driver.delete_consistencygroup(self.ctx,
self.consistency_group,
self.volumes))
self.assertTrue(any(volume['status'] == 'error_deleting' for volume in
result_volumes_update))
self.assertIn(result_model_update['status'],
['error_deleting', 'error'])
def test_create_consistencygroup_from_cg(self):
self.set_https_response_mode(self.RESPONSE_MODE.Valid)
result_model_update, result_volumes_model_update = (
self.driver.create_consistencygroup_from_src(
self.ctx, self.consistency_group, self.volumes2,
source_cg=self.consistency_group, source_vols=self.volumes))
self.assertEqual('available', result_model_update['status'])
get_pid = lambda snapshot: snapshot['provider_id']
volume_provider_list = list(map(get_pid, result_volumes_model_update))
self.assertListEqual(volume_provider_list, ['sid1', 'sid2'])
def test_create_consistencygroup_from_cgs(self):
self.snapshots[0]['provider_id'] = fake.PROVIDER_ID
self.snapshots[1]['provider_id'] = fake.PROVIDER2_ID
self.set_https_response_mode(self.RESPONSE_MODE.Valid)
result_model_update, result_volumes_model_update = (
self.driver.create_consistencygroup_from_src(
self.ctx, self.consistency_group, self.volumes2,
cgsnapshot=self._fake_cgsnapshot(),
snapshots=self.snapshots))
self.assertEqual('available', result_model_update['status'])
get_pid = lambda snapshot: snapshot['provider_id']
volume_provider_list = list(map(get_pid, result_volumes_model_update))
self.assertListEqual(['sid1', 'sid2'], volume_provider_list)
@mock.patch('cinder.objects.snapshot')
@mock.patch('cinder.objects.snapshot')
def test_create_cgsnapshots(self, snapshot1, snapshot2):
type(snapshot1).volume = mock.PropertyMock(
return_value=self.volumes[0])
type(snapshot2).volume = mock.PropertyMock(
return_value=self.volumes[1])
snapshots = [snapshot1, snapshot2]
self.set_https_response_mode(self.RESPONSE_MODE.Valid)
result_model_update, result_snapshot_model_update = (
self.driver.create_cgsnapshot(
self.ctx,
self._fake_cgsnapshot(),
snapshots
))
self.assertEqual('available', result_model_update['status'])
self.assertTrue(all(snapshot['status'] == 'available' for snapshot in
result_snapshot_model_update))
get_pid = lambda snapshot: snapshot['provider_id']
snapshot_provider_list = list(map(get_pid,
result_snapshot_model_update))
self.assertListEqual(['sid1', 'sid2'], snapshot_provider_list)
@mock.patch('cinder.objects.snapshot')
@mock.patch('cinder.objects.snapshot')
def test_delete_cgsnapshots(self, snapshot1, snapshot2):
type(snapshot1).volume = mock.PropertyMock(
return_value=self.volumes[0])
type(snapshot2).volume = mock.PropertyMock(
return_value=self.volumes[1])
type(snapshot1).provider_id = mock.PropertyMock(
return_value=fake.PROVIDER_ID)
type(snapshot2).provider_id = mock.PropertyMock(
return_value=fake.PROVIDER2_ID)
snapshots = [snapshot1, snapshot2]
self.set_https_response_mode(self.RESPONSE_MODE.Valid)
result_model_update, result_snapshot_model_update = (
self.driver.delete_cgsnapshot(
self.ctx,
self._fake_cgsnapshot(),
snapshots
))
self.assertEqual('deleted', result_model_update['status'])
self.assertTrue(all(snapshot['status'] == 'deleted' for snapshot in
result_snapshot_model_update))

View File

@ -1,116 +1,116 @@
# Copyright (c) 2015 EMC Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from cinder import context
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_volume
from cinder.tests.unit.volume.drivers.emc import scaleio
class TestInitializeConnection(scaleio.TestScaleIODriver):
def setUp(self):
"""Setup a test case environment."""
super(TestInitializeConnection, self).setUp()
self.connector = {}
self.ctx = (
context.RequestContext('fake', 'fake', True, auth_token=True))
self.volume = fake_volume.fake_volume_obj(
self.ctx, **{'provider_id': fake.PROVIDER_ID})
def test_only_qos(self):
qos = {'maxIOPS': 1000, 'maxBWS': 2048}
extraspecs = {}
connection_properties = (
self._initialize_connection(qos, extraspecs)['data'])
self.assertEqual(1000, int(connection_properties['iopsLimit']))
self.assertEqual(2048, int(connection_properties['bandwidthLimit']))
def test_no_qos(self):
qos = {}
extraspecs = {}
connection_properties = (
self._initialize_connection(qos, extraspecs)['data'])
self.assertIsNone(connection_properties['iopsLimit'])
self.assertIsNone(connection_properties['bandwidthLimit'])
def test_only_extraspecs(self):
qos = {}
extraspecs = {'sio:iops_limit': 2000, 'sio:bandwidth_limit': 4096}
connection_properties = (
self._initialize_connection(qos, extraspecs)['data'])
self.assertEqual(2000, int(connection_properties['iopsLimit']))
self.assertEqual(4096, int(connection_properties['bandwidthLimit']))
def test_qos_and_extraspecs(self):
qos = {'maxIOPS': 1000, 'maxBWS': 3072}
extraspecs = {'sio:iops_limit': 2000, 'sio:bandwidth_limit': 4000}
connection_properties = (
self._initialize_connection(qos, extraspecs)['data'])
self.assertEqual(1000, int(connection_properties['iopsLimit']))
self.assertEqual(3072, int(connection_properties['bandwidthLimit']))
def test_qos_scaling_and_max(self):
qos = {'maxIOPS': 100, 'maxBWS': 2048, 'maxIOPSperGB': 10,
'maxBWSperGB': 128}
extraspecs = {}
self.volume.size = 8
connection_properties = (
self._initialize_connection(qos, extraspecs)['data'])
self.assertEqual(80, int(connection_properties['iopsLimit']))
self.assertEqual(1024, int(connection_properties['bandwidthLimit']))
self.volume.size = 24
connection_properties = (
self._initialize_connection(qos, extraspecs)['data'])
self.assertEqual(100, int(connection_properties['iopsLimit']))
self.assertEqual(2048, int(connection_properties['bandwidthLimit']))
def test_qos_scaling_no_max(self):
qos = {'maxIOPSperGB': 10, 'maxBWSperGB': 128}
extraspecs = {}
self.volume.size = 8
connection_properties = (
self._initialize_connection(qos, extraspecs)['data'])
self.assertEqual(80, int(connection_properties['iopsLimit']))
self.assertEqual(1024, int(connection_properties['bandwidthLimit']))
def test_qos_round_up(self):
qos = {'maxBWS': 2000, 'maxBWSperGB': 100}
extraspecs = {}
self.volume.size = 8
connection_properties = (
self._initialize_connection(qos, extraspecs)['data'])
self.assertEqual(1024, int(connection_properties['bandwidthLimit']))
self.volume.size = 24
connection_properties = (
self._initialize_connection(qos, extraspecs)['data'])
self.assertEqual(2048, int(connection_properties['bandwidthLimit']))
def test_vol_id(self):
extraspecs = qos = {}
connection_properties = (
self._initialize_connection(extraspecs, qos)['data'])
self.assertEqual(fake.PROVIDER_ID,
connection_properties['scaleIO_volume_id'])
def _initialize_connection(self, qos, extraspecs):
self.driver._get_volumetype_qos = mock.MagicMock()
self.driver._get_volumetype_qos.return_value = qos
self.driver._get_volumetype_extraspecs = mock.MagicMock()
self.driver._get_volumetype_extraspecs.return_value = extraspecs
return self.driver.initialize_connection(self.volume, self.connector)
# Copyright (c) 2015 EMC Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from cinder import context
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_volume
from cinder.tests.unit.volume.drivers.emc import scaleio
class TestInitializeConnection(scaleio.TestScaleIODriver):
def setUp(self):
"""Setup a test case environment."""
super(TestInitializeConnection, self).setUp()
self.connector = {}
self.ctx = (
context.RequestContext('fake', 'fake', True, auth_token=True))
self.volume = fake_volume.fake_volume_obj(
self.ctx, **{'provider_id': fake.PROVIDER_ID})
def test_only_qos(self):
qos = {'maxIOPS': 1000, 'maxBWS': 2048}
extraspecs = {}
connection_properties = (
self._initialize_connection(qos, extraspecs)['data'])
self.assertEqual(1000, int(connection_properties['iopsLimit']))
self.assertEqual(2048, int(connection_properties['bandwidthLimit']))
def test_no_qos(self):
qos = {}
extraspecs = {}
connection_properties = (
self._initialize_connection(qos, extraspecs)['data'])
self.assertIsNone(connection_properties['iopsLimit'])
self.assertIsNone(connection_properties['bandwidthLimit'])
def test_only_extraspecs(self):
qos = {}
extraspecs = {'sio:iops_limit': 2000, 'sio:bandwidth_limit': 4096}
connection_properties = (
self._initialize_connection(qos, extraspecs)['data'])
self.assertEqual(2000, int(connection_properties['iopsLimit']))
self.assertEqual(4096, int(connection_properties['bandwidthLimit']))
def test_qos_and_extraspecs(self):
qos = {'maxIOPS': 1000, 'maxBWS': 3072}
extraspecs = {'sio:iops_limit': 2000, 'sio:bandwidth_limit': 4000}
connection_properties = (
self._initialize_connection(qos, extraspecs)['data'])
self.assertEqual(1000, int(connection_properties['iopsLimit']))
self.assertEqual(3072, int(connection_properties['bandwidthLimit']))
def test_qos_scaling_and_max(self):
qos = {'maxIOPS': 100, 'maxBWS': 2048, 'maxIOPSperGB': 10,
'maxBWSperGB': 128}
extraspecs = {}
self.volume.size = 8
connection_properties = (
self._initialize_connection(qos, extraspecs)['data'])
self.assertEqual(80, int(connection_properties['iopsLimit']))
self.assertEqual(1024, int(connection_properties['bandwidthLimit']))
self.volume.size = 24
connection_properties = (
self._initialize_connection(qos, extraspecs)['data'])
self.assertEqual(100, int(connection_properties['iopsLimit']))
self.assertEqual(2048, int(connection_properties['bandwidthLimit']))
def test_qos_scaling_no_max(self):
qos = {'maxIOPSperGB': 10, 'maxBWSperGB': 128}
extraspecs = {}
self.volume.size = 8
connection_properties = (
self._initialize_connection(qos, extraspecs)['data'])
self.assertEqual(80, int(connection_properties['iopsLimit']))
self.assertEqual(1024, int(connection_properties['bandwidthLimit']))
def test_qos_round_up(self):
qos = {'maxBWS': 2000, 'maxBWSperGB': 100}
extraspecs = {}
self.volume.size = 8
connection_properties = (
self._initialize_connection(qos, extraspecs)['data'])
self.assertEqual(1024, int(connection_properties['bandwidthLimit']))
self.volume.size = 24
connection_properties = (
self._initialize_connection(qos, extraspecs)['data'])
self.assertEqual(2048, int(connection_properties['bandwidthLimit']))
def test_vol_id(self):
extraspecs = qos = {}
connection_properties = (
self._initialize_connection(extraspecs, qos)['data'])
self.assertEqual(fake.PROVIDER_ID,
connection_properties['scaleIO_volume_id'])
def _initialize_connection(self, qos, extraspecs):
self.driver._get_volumetype_qos = mock.MagicMock()
self.driver._get_volumetype_qos.return_value = qos
self.driver._get_volumetype_extraspecs = mock.MagicMock()
self.driver._get_volumetype_extraspecs.return_value = extraspecs
return self.driver.initialize_connection(self.volume, self.connector)

View File

@ -1,127 +1,127 @@
# Copyright (c) 2016 EMC Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinder import context
from cinder import exception
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_volume
from cinder.tests.unit.volume.drivers.emc import scaleio
from cinder.tests.unit.volume.drivers.emc.scaleio import mocks
from cinder.volume import volume_types
from mock import patch
from six.moves import urllib
class TestManageExisting(scaleio.TestScaleIODriver):
"""Test cases for ``ScaleIODriver.manage_existing()``"""
def setUp(self):
"""Setup a test case environment.
Creates a fake volume object and sets up the required API responses.
"""
super(TestManageExisting, self).setUp()
ctx = context.RequestContext('fake', 'fake', auth_token=True)
self.volume = fake_volume.fake_volume_obj(
ctx, **{'provider_id': fake.PROVIDER_ID})
self.volume_attached = fake_volume.fake_volume_obj(
ctx, **{'provider_id': fake.PROVIDER2_ID})
self.volume_no_provider_id = fake_volume.fake_volume_obj(ctx)
self.volume_name_2x_enc = urllib.parse.quote(
urllib.parse.quote(self.driver._id_to_base64(self.volume.id))
)
self.HTTPS_MOCK_RESPONSES = {
self.RESPONSE_MODE.Valid: {
'instances/Volume::' + self.volume['provider_id']:
mocks.MockHTTPSResponse({
'id': fake.PROVIDER_ID,
'sizeInKb': 8000000,
'mappedSdcInfo': None
}, 200)
},
self.RESPONSE_MODE.BadStatus: {
'instances/Volume::' + self.volume['provider_id']:
mocks.MockHTTPSResponse({
'errorCode': 401,
'message': 'BadStatus Volume Test',
}, 401),
'instances/Volume::' + self.volume_attached['provider_id']:
mocks.MockHTTPSResponse({
'id': fake.PROVIDER2_ID,
'sizeInKb': 8388608,
'mappedSdcInfo': 'Mapped'
}, 200)
}
}
def test_no_source_id(self):
existing_ref = {'source-name': 'scaleioVolName'}
self.assertRaises(exception.ManageExistingInvalidReference,
self.driver.manage_existing, self.volume,
existing_ref)
def test_no_type_id(self):
self.volume['volume_type_id'] = None
existing_ref = {'source-id': fake.PROVIDER_ID}
self.assertRaises(exception.ManageExistingVolumeTypeMismatch,
self.driver.manage_existing, self.volume,
existing_ref)
@patch.object(
volume_types,
'get_volume_type',
return_value={'extra_specs': {'volume_backend_name': 'ScaleIO'}})
def test_volume_not_found(self, _mock_volume_type):
self.volume['volume_type_id'] = fake.VOLUME_TYPE_ID
existing_ref = {'source-id': fake.PROVIDER_ID}
self.set_https_response_mode(self.RESPONSE_MODE.BadStatus)
self.assertRaises(exception.ManageExistingInvalidReference,
self.driver.manage_existing, self.volume,
existing_ref)
@patch.object(
volume_types,
'get_volume_type',
return_value={'extra_specs': {'volume_backend_name': 'ScaleIO'}})
def test_volume_attached(self, _mock_volume_type):
self.volume_attached['volume_type_id'] = fake.VOLUME_TYPE_ID
existing_ref = {'source-id': fake.PROVIDER2_ID}
self.set_https_response_mode(self.RESPONSE_MODE.BadStatus)
self.assertRaises(exception.ManageExistingInvalidReference,
self.driver.manage_existing, self.volume_attached,
existing_ref)
@patch.object(
volume_types,
'get_volume_type',
return_value={'extra_specs': {'volume_backend_name': 'ScaleIO'}})
def test_manage_get_size_calc(self, _mock_volume_type):
self.volume['volume_type_id'] = fake.VOLUME_TYPE_ID
existing_ref = {'source-id': fake.PROVIDER_ID}
self.set_https_response_mode(self.RESPONSE_MODE.Valid)
result = self.driver.manage_existing_get_size(self.volume,
existing_ref)
self.assertEqual(8, result)
@patch.object(
volume_types,
'get_volume_type',
return_value={'extra_specs': {'volume_backend_name': 'ScaleIO'}})
def test_manage_existing_valid(self, _mock_volume_type):
self.volume['volume_type_id'] = fake.VOLUME_TYPE_ID
existing_ref = {'source-id': fake.PROVIDER_ID}
result = self.driver.manage_existing(self.volume, existing_ref)
self.assertEqual(fake.PROVIDER_ID, result['provider_id'])
# Copyright (c) 2016 EMC Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinder import context
from cinder import exception
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_volume
from cinder.tests.unit.volume.drivers.emc import scaleio
from cinder.tests.unit.volume.drivers.emc.scaleio import mocks
from cinder.volume import volume_types
from mock import patch
from six.moves import urllib
class TestManageExisting(scaleio.TestScaleIODriver):
"""Test cases for ``ScaleIODriver.manage_existing()``"""
def setUp(self):
"""Setup a test case environment.
Creates a fake volume object and sets up the required API responses.
"""
super(TestManageExisting, self).setUp()
ctx = context.RequestContext('fake', 'fake', auth_token=True)
self.volume = fake_volume.fake_volume_obj(
ctx, **{'provider_id': fake.PROVIDER_ID})
self.volume_attached = fake_volume.fake_volume_obj(
ctx, **{'provider_id': fake.PROVIDER2_ID})
self.volume_no_provider_id = fake_volume.fake_volume_obj(ctx)
self.volume_name_2x_enc = urllib.parse.quote(
urllib.parse.quote(self.driver._id_to_base64(self.volume.id))
)
self.HTTPS_MOCK_RESPONSES = {
self.RESPONSE_MODE.Valid: {
'instances/Volume::' + self.volume['provider_id']:
mocks.MockHTTPSResponse({
'id': fake.PROVIDER_ID,
'sizeInKb': 8000000,
'mappedSdcInfo': None
}, 200)
},
self.RESPONSE_MODE.BadStatus: {
'instances/Volume::' + self.volume['provider_id']:
mocks.MockHTTPSResponse({
'errorCode': 401,
'message': 'BadStatus Volume Test',
}, 401),
'instances/Volume::' + self.volume_attached['provider_id']:
mocks.MockHTTPSResponse({
'id': fake.PROVIDER2_ID,
'sizeInKb': 8388608,
'mappedSdcInfo': 'Mapped'
}, 200)
}
}
def test_no_source_id(self):
existing_ref = {'source-name': 'scaleioVolName'}
self.assertRaises(exception.ManageExistingInvalidReference,
self.driver.manage_existing, self.volume,
existing_ref)
def test_no_type_id(self):
self.volume['volume_type_id'] = None
existing_ref = {'source-id': fake.PROVIDER_ID}
self.assertRaises(exception.ManageExistingVolumeTypeMismatch,
self.driver.manage_existing, self.volume,
existing_ref)
@patch.object(
volume_types,
'get_volume_type',
return_value={'extra_specs': {'volume_backend_name': 'ScaleIO'}})
def test_volume_not_found(self, _mock_volume_type):
self.volume['volume_type_id'] = fake.VOLUME_TYPE_ID
existing_ref = {'source-id': fake.PROVIDER_ID}
self.set_https_response_mode(self.RESPONSE_MODE.BadStatus)
self.assertRaises(exception.ManageExistingInvalidReference,
self.driver.manage_existing, self.volume,
existing_ref)
@patch.object(
volume_types,
'get_volume_type',
return_value={'extra_specs': {'volume_backend_name': 'ScaleIO'}})
def test_volume_attached(self, _mock_volume_type):
self.volume_attached['volume_type_id'] = fake.VOLUME_TYPE_ID
existing_ref = {'source-id': fake.PROVIDER2_ID}
self.set_https_response_mode(self.RESPONSE_MODE.BadStatus)
self.assertRaises(exception.ManageExistingInvalidReference,
self.driver.manage_existing, self.volume_attached,
existing_ref)
@patch.object(
volume_types,
'get_volume_type',
return_value={'extra_specs': {'volume_backend_name': 'ScaleIO'}})
def test_manage_get_size_calc(self, _mock_volume_type):
self.volume['volume_type_id'] = fake.VOLUME_TYPE_ID
existing_ref = {'source-id': fake.PROVIDER_ID}
self.set_https_response_mode(self.RESPONSE_MODE.Valid)
result = self.driver.manage_existing_get_size(self.volume,
existing_ref)
self.assertEqual(8, result)
@patch.object(
volume_types,
'get_volume_type',
return_value={'extra_specs': {'volume_backend_name': 'ScaleIO'}})
def test_manage_existing_valid(self, _mock_volume_type):
self.volume['volume_type_id'] = fake.VOLUME_TYPE_ID
existing_ref = {'source-id': fake.PROVIDER_ID}
result = self.driver.manage_existing(self.volume, existing_ref)
self.assertEqual(fake.PROVIDER_ID, result['provider_id'])

View File

@ -1,154 +1,154 @@
# Copyright (c) 2016 EMC Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mock import patch
from cinder import context
from cinder import exception
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
from cinder.tests.unit.volume.drivers.emc import scaleio
from cinder.tests.unit.volume.drivers.emc.scaleio import mocks
from cinder.volume import volume_types
class TestManageExistingSnapshot(scaleio.TestScaleIODriver):
"""Test cases for ``ScaleIODriver.manage_existing_snapshot()``"""
def setUp(self):
"""Setup a test case environment.
Creates a fake volume object and sets up the required API responses.
"""
super(TestManageExistingSnapshot, self).setUp()
ctx = context.RequestContext('fake', 'fake', auth_token=True)
self.volume = fake_volume.fake_volume_obj(
ctx, **{'provider_id': fake.PROVIDER_ID})
self.snapshot = fake_snapshot.fake_snapshot_obj(
ctx, **{'provider_id': fake.PROVIDER2_ID})
self.snapshot2 = fake_snapshot.fake_snapshot_obj(
ctx, **{'provider_id': fake.PROVIDER3_ID})
self.snapshot.volume = self.snapshot2.volume = self.volume
self.snapshot['volume_type_id'] = fake.VOLUME_TYPE_ID
self.snapshot2['volume_type_id'] = fake.VOLUME_TYPE_ID
self.snapshot_attached = fake_snapshot.fake_snapshot_obj(
ctx, **{'provider_id': fake.PROVIDER3_ID})
self.HTTPS_MOCK_RESPONSES = {
self.RESPONSE_MODE.Valid: {
'instances/Volume::' + self.volume['provider_id']:
mocks.MockHTTPSResponse({
'id': fake.PROVIDER_ID,
'sizeInKb': 8388608,
'mappedSdcInfo': None,
'ancestorVolumeId': None
}, 200),
'instances/Volume::' + self.snapshot['provider_id']:
mocks.MockHTTPSResponse({
'id': fake.PROVIDER2_ID,
'sizeInKb': 8000000,
'mappedSdcInfo': None,
'ancestorVolumeId': fake.PROVIDER_ID
}, 200),
'instances/Volume::' + self.snapshot2['provider_id']:
mocks.MockHTTPSResponse({
'id': fake.PROVIDER3_ID,
'sizeInKb': 8388608,
'mappedSdcInfo': None,
'ancestorVolumeId': fake.PROVIDER2_ID
}, 200)
},
self.RESPONSE_MODE.BadStatus: {
'instances/Volume::' + self.snapshot['provider_id']:
mocks.MockHTTPSResponse({
'errorCode': 401,
'message': 'BadStatus Volume Test',
}, 401),
'instances/Volume::' + self.snapshot2['provider_id']:
mocks.MockHTTPSResponse({
'id': fake.PROVIDER3_ID,
'sizeInKb': 8388608,
'ancestorVolumeId': fake.PROVIDER2_ID
}, 200),
'instances/Volume::' + self.snapshot_attached['provider_id']:
mocks.MockHTTPSResponse({
'id': fake.PROVIDER3_ID,
'sizeInKb': 8388608,
'mappedSdcInfo': 'Mapped',
'ancestorVolumeId': fake.PROVIDER_ID
}, 200)
}
}
def test_no_source_id(self):
existing_ref = {'source-name': 'scaleioSnapName'}
self.assertRaises(exception.ManageExistingInvalidReference,
self.driver.manage_existing_snapshot, self.snapshot,
existing_ref)
@patch.object(
volume_types,
'get_volume_type',
return_value={'extra_specs': {'volume_backend_name': 'ScaleIO'}})
def test_snapshot_not_found(self, _mock_volume_type):
existing_ref = {'source-id': fake.PROVIDER2_ID}
self.set_https_response_mode(self.RESPONSE_MODE.BadStatus)
self.assertRaises(exception.ManageExistingInvalidReference,
self.driver.manage_existing_snapshot, self.snapshot,
existing_ref)
@patch.object(
volume_types,
'get_volume_type',
return_value={'extra_specs': {'volume_backend_name': 'ScaleIO'}})
def test_snapshot_attached(self, _mock_volume_type):
self.snapshot_attached['volume_type_id'] = fake.VOLUME_TYPE_ID
existing_ref = {'source-id': fake.PROVIDER2_ID}
self.set_https_response_mode(self.RESPONSE_MODE.BadStatus)
self.assertRaises(exception.ManageExistingInvalidReference,
self.driver.manage_existing_snapshot,
self.snapshot_attached, existing_ref)
@patch.object(
volume_types,
'get_volume_type',
return_value={'extra_specs': {'volume_backend_name': 'ScaleIO'}})
def test_different_ancestor(self, _mock_volume_type):
existing_ref = {'source-id': fake.PROVIDER3_ID}
self.set_https_response_mode(self.RESPONSE_MODE.Valid)
self.assertRaises(exception.ManageExistingInvalidReference,
self.driver.manage_existing_snapshot,
self.snapshot2, existing_ref)
@patch.object(
volume_types,
'get_volume_type',
return_value={'extra_specs': {'volume_backend_name': 'ScaleIO'}})
def test_manage_snapshot_get_size_calc(self, _mock_volume_type):
existing_ref = {'source-id': fake.PROVIDER2_ID}
self.set_https_response_mode(self.RESPONSE_MODE.Valid)
result = self.driver.manage_existing_snapshot_get_size(
self.snapshot, existing_ref)
self.assertEqual(8, result)
@patch.object(
volume_types,
'get_volume_type',
return_value={'extra_specs': {'volume_backend_name': 'ScaleIO'}})
def test_manage_existing_snapshot_valid(self, _mock_volume_type):
existing_ref = {'source-id': fake.PROVIDER2_ID}
result = self.driver.manage_existing_snapshot(
self.snapshot, existing_ref)
self.assertEqual(fake.PROVIDER2_ID, result['provider_id'])
# Copyright (c) 2016 EMC Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mock import patch
from cinder import context
from cinder import exception
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
from cinder.tests.unit.volume.drivers.emc import scaleio
from cinder.tests.unit.volume.drivers.emc.scaleio import mocks
from cinder.volume import volume_types
class TestManageExistingSnapshot(scaleio.TestScaleIODriver):
"""Test cases for ``ScaleIODriver.manage_existing_snapshot()``"""
def setUp(self):
"""Setup a test case environment.
Creates a fake volume object and sets up the required API responses.
"""
super(TestManageExistingSnapshot, self).setUp()
ctx = context.RequestContext('fake', 'fake', auth_token=True)
self.volume = fake_volume.fake_volume_obj(
ctx, **{'provider_id': fake.PROVIDER_ID})
self.snapshot = fake_snapshot.fake_snapshot_obj(
ctx, **{'provider_id': fake.PROVIDER2_ID})
self.snapshot2 = fake_snapshot.fake_snapshot_obj(
ctx, **{'provider_id': fake.PROVIDER3_ID})
self.snapshot.volume = self.snapshot2.volume = self.volume
self.snapshot['volume_type_id'] = fake.VOLUME_TYPE_ID
self.snapshot2['volume_type_id'] = fake.VOLUME_TYPE_ID
self.snapshot_attached = fake_snapshot.fake_snapshot_obj(
ctx, **{'provider_id': fake.PROVIDER3_ID})
self.HTTPS_MOCK_RESPONSES = {
self.RESPONSE_MODE.Valid: {
'instances/Volume::' + self.volume['provider_id']:
mocks.MockHTTPSResponse({
'id': fake.PROVIDER_ID,
'sizeInKb': 8388608,
'mappedSdcInfo': None,
'ancestorVolumeId': None
}, 200),
'instances/Volume::' + self.snapshot['provider_id']:
mocks.MockHTTPSResponse({
'id': fake.PROVIDER2_ID,
'sizeInKb': 8000000,
'mappedSdcInfo': None,
'ancestorVolumeId': fake.PROVIDER_ID
}, 200),
'instances/Volume::' + self.snapshot2['provider_id']:
mocks.MockHTTPSResponse({
'id': fake.PROVIDER3_ID,
'sizeInKb': 8388608,
'mappedSdcInfo': None,
'ancestorVolumeId': fake.PROVIDER2_ID
}, 200)
},
self.RESPONSE_MODE.BadStatus: {
'instances/Volume::' + self.snapshot['provider_id']:
mocks.MockHTTPSResponse({
'errorCode': 401,
'message': 'BadStatus Volume Test',
}, 401),
'instances/Volume::' + self.snapshot2['provider_id']:
mocks.MockHTTPSResponse({
'id': fake.PROVIDER3_ID,
'sizeInKb': 8388608,
'ancestorVolumeId': fake.PROVIDER2_ID
}, 200),
'instances/Volume::' + self.snapshot_attached['provider_id']:
mocks.MockHTTPSResponse({
'id': fake.PROVIDER3_ID,
'sizeInKb': 8388608,
'mappedSdcInfo': 'Mapped',
'ancestorVolumeId': fake.PROVIDER_ID
}, 200)
}
}
def test_no_source_id(self):
existing_ref = {'source-name': 'scaleioSnapName'}
self.assertRaises(exception.ManageExistingInvalidReference,
self.driver.manage_existing_snapshot, self.snapshot,
existing_ref)
@patch.object(
volume_types,
'get_volume_type',
return_value={'extra_specs': {'volume_backend_name': 'ScaleIO'}})
def test_snapshot_not_found(self, _mock_volume_type):
existing_ref = {'source-id': fake.PROVIDER2_ID}
self.set_https_response_mode(self.RESPONSE_MODE.BadStatus)
self.assertRaises(exception.ManageExistingInvalidReference,
self.driver.manage_existing_snapshot, self.snapshot,
existing_ref)
@patch.object(
volume_types,
'get_volume_type',
return_value={'extra_specs': {'volume_backend_name': 'ScaleIO'}})
def test_snapshot_attached(self, _mock_volume_type):
self.snapshot_attached['volume_type_id'] = fake.VOLUME_TYPE_ID
existing_ref = {'source-id': fake.PROVIDER2_ID}
self.set_https_response_mode(self.RESPONSE_MODE.BadStatus)
self.assertRaises(exception.ManageExistingInvalidReference,
self.driver.manage_existing_snapshot,
self.snapshot_attached, existing_ref)
@patch.object(
volume_types,
'get_volume_type',
return_value={'extra_specs': {'volume_backend_name': 'ScaleIO'}})
def test_different_ancestor(self, _mock_volume_type):
existing_ref = {'source-id': fake.PROVIDER3_ID}
self.set_https_response_mode(self.RESPONSE_MODE.Valid)
self.assertRaises(exception.ManageExistingInvalidReference,
self.driver.manage_existing_snapshot,
self.snapshot2, existing_ref)
@patch.object(
volume_types,
'get_volume_type',
return_value={'extra_specs': {'volume_backend_name': 'ScaleIO'}})
def test_manage_snapshot_get_size_calc(self, _mock_volume_type):
existing_ref = {'source-id': fake.PROVIDER2_ID}
self.set_https_response_mode(self.RESPONSE_MODE.Valid)
result = self.driver.manage_existing_snapshot_get_size(
self.snapshot, existing_ref)
self.assertEqual(8, result)
@patch.object(
volume_types,
'get_volume_type',
return_value={'extra_specs': {'volume_backend_name': 'ScaleIO'}})
def test_manage_existing_snapshot_valid(self, _mock_volume_type):
existing_ref = {'source-id': fake.PROVIDER2_ID}
result = self.driver.manage_existing_snapshot(
self.snapshot, existing_ref)
self.assertEqual(fake.PROVIDER2_ID, result['provider_id'])

File diff suppressed because it is too large Load Diff

View File

@ -1,447 +1,447 @@
# Copyright (c) 2013 - 2016 Huawei Technologies Co., Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for Huawei FusionStorage drivers.
"""
import mock
from cinder import test
from cinder import utils
from cinder.volume.drivers.fusionstorage import fspythonapi
class FSPythonApiTestCase(test.TestCase):
def setUp(self):
super(FSPythonApiTestCase, self).setUp()
self.api = fspythonapi.FSPythonApi()
@mock.patch.object(fspythonapi.FSPythonApi, 'get_ip_port')
@mock.patch.object(fspythonapi.FSPythonApi, 'get_manage_ip')
@mock.patch.object(utils, 'execute')
def test_start_execute_cmd(self, mock_execute,
mock_get_manage_ip, mock_get_ip_port):
result1 = ['result=0\ndesc=success\n', '']
result2 = ['result=50150007\ndesc=volume does not exist\n', '']
result3 = ['result=50150008\ndesc=volume is being deleted\n', '']
result4 = ['result=50\ndesc=exception\n', '']
cmd = 'abcdef'
mock_get_ip_port.return_value = ['127.0.0.1', '128.0.0.1']
mock_get_manage_ip.return_value = '127.0.0.1'
mock_execute.return_value = result1
retval = self.api.start_execute_cmd(cmd, 0)
self.assertEqual('result=0', retval)
mock_execute.return_value = result2
retval = self.api.start_execute_cmd(cmd, 0)
self.assertEqual('result=0', retval)
mock_execute.return_value = result3
retval = self.api.start_execute_cmd(cmd, 0)
self.assertEqual('result=0', retval)
mock_execute.return_value = result4
retval = self.api.start_execute_cmd(cmd, 0)
self.assertEqual('result=50', retval)
mock_execute.return_value = result1
retval = self.api.start_execute_cmd(cmd, 1)
self.assertEqual(['result=0', 'desc=success', ''], retval)
mock_execute.return_value = result2
retval = self.api.start_execute_cmd(cmd, 1)
self.assertEqual('result=0', retval)
mock_execute.return_value = result3
retval = self.api.start_execute_cmd(cmd, 1)
self.assertEqual('result=0', retval)
mock_execute.return_value = result4
retval = self.api.start_execute_cmd(cmd, 1)
self.assertEqual(['result=50', 'desc=exception', ''], retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'start_execute_cmd')
def test_create_volume(self, mock_start_execute):
mock_start_execute.side_effect = ['result=0\n',
'result=50150007\n', None]
retval = self.api.create_volume('volume_name', 'pool_id-123', 1024, 0)
self.assertEqual(0, retval)
retval = self.api.create_volume('volume_name', 'pool_id-123', 1024, 0)
self.assertEqual('50150007\n', retval)
retval = self.api.create_volume('volume_name', 'pool_id-123', 1024, 0)
self.assertEqual(1, retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'start_execute_cmd')
def test_extend_volume(self, mock_start_execute):
mock_start_execute.side_effect = ['result=0\n',
'result=50150007\n', None]
retval = self.api.extend_volume('volume_name', 1024)
self.assertEqual(0, retval)
retval = self.api.extend_volume('volume_name', 1024)
self.assertEqual('50150007\n', retval)
retval = self.api.extend_volume('volume_name', 1024)
self.assertEqual(1, retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'start_execute_cmd')
def test_create_volume_from_snap(self, mock_start_execute):
mock_start_execute.side_effect = ['result=0\n',
'result=50150007\n', None]
retval = self.api.create_volume_from_snap('volume_name', 1024,
'snap_name')
self.assertEqual(0, retval)
retval = self.api.create_volume_from_snap('volume_name', 1024,
'snap_name')
self.assertEqual('50150007\n', retval)
retval = self.api.create_volume_from_snap('volume_name', 1024,
'snap_name')
self.assertEqual(1, retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'start_execute_cmd')
def test_create_fullvol_from_snap(self, mock_start_execute):
mock_start_execute.side_effect = ['result=0\n',
'result=50150007\n', None]
retval = self.api.create_fullvol_from_snap('volume_name', 'snap_name')
self.assertEqual(0, retval)
retval = self.api.create_fullvol_from_snap('volume_name', 'snap_name')
self.assertEqual('50150007\n', retval)
retval = self.api.create_fullvol_from_snap('volume_name', 'snap_name')
self.assertEqual(1, retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'create_snapshot')
@mock.patch.object(fspythonapi.FSPythonApi, 'create_volume')
@mock.patch.object(fspythonapi.FSPythonApi, 'delete_snapshot')
@mock.patch.object(fspythonapi.FSPythonApi, 'delete_volume')
@mock.patch.object(fspythonapi.FSPythonApi, 'create_fullvol_from_snap')
def test_create_volume_from_volume(self, mock_create_fullvol,
mock_delete_volume, mock_delete_snap,
mock_create_volume, mock_create_snap):
mock_create_snap.return_value = 0
mock_create_volume.return_value = 0
mock_create_fullvol.return_value = 0
retval = self.api.create_volume_from_volume('vol_name', 1024,
'src_vol_name')
self.assertEqual(0, retval)
mock_create_snap.return_value = 1
retval = self.api.create_volume_from_volume('vol_name', 1024,
'src_vol_name')
self.assertEqual(1, retval)
mock_create_snap.return_value = 0
mock_create_volume.return_value = 1
retval = self.api.create_volume_from_volume('vol_name', 1024,
'src_vol_name')
self.assertEqual(1, retval)
mock_create_volume.return_value = 0
self.api.create_fullvol_from_snap.return_value = 1
retval = self.api.create_volume_from_volume('vol_name', 1024,
'src_vol_name')
self.assertEqual(1, retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'create_snapshot')
@mock.patch.object(fspythonapi.FSPythonApi, 'create_volume_from_snap')
def test_create_clone_volume_from_volume(self, mock_volume, mock_snap):
mock_snap.side_effect = [0, 1]
mock_volume.side_effect = [0, 1]
retval = self.api.create_clone_volume_from_volume('vol_name', 1024,
'src_vol_name')
self.assertEqual(0, retval)
retval = self.api.create_clone_volume_from_volume('vol_name', 1024,
'src_vol_name')
self.assertEqual(1, retval)
def test_volume_info_analyze_success(self):
vol_info = ('vol_name=vol1,father_name=vol1_father,'
'status=available,vol_size=1024,real_size=1024,'
'pool_id=pool1,create_time=01/01/2015')
vol_info_res = {'result': 0, 'vol_name': 'vol1',
'father_name': 'vol1_father',
'status': 'available', 'vol_size': '1024',
'real_size': '1024', 'pool_id': 'pool1',
'create_time': '01/01/2015'}
retval = self.api.volume_info_analyze(vol_info)
self.assertEqual(vol_info_res, retval)
def test_volume_info_analyze_fail(self):
vol_info = ''
vol_info_res = {'result': 1, 'vol_name': '', 'father_name': '',
'status': '', 'vol_size': '', 'real_size': '',
'pool_id': '', 'create_time': ''}
retval = self.api.volume_info_analyze(vol_info)
self.assertEqual(vol_info_res, retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'start_execute_cmd')
@mock.patch.object(fspythonapi.FSPythonApi, 'volume_info_analyze')
@mock.patch.object(fspythonapi.FSPythonApi, 'delete_snapshot')
def test_query_volume(self, mock_delete, mock_analyze, mock_execute):
exec_result = ['result=0\n',
'vol_name=vol1,father_name=vol1_father,status=0,' +
'vol_size=1024,real_size=1024,pool_id=pool1,' +
'create_time=01/01/2015']
query_result = {'result': 0, 'vol_name': 'vol1',
'father_name': 'vol1_father', 'status': '0',
'vol_size': '1024', 'real_size': '1024',
'pool_id': 'pool1', 'create_time': '01/01/2015'}
mock_delete.return_value = 0
mock_execute.return_value = exec_result
mock_analyze.return_value = query_result
retval = self.api.query_volume('vol1')
self.assertEqual(query_result, retval)
exec_result = ['result=0\n',
'vol_name=vol1,father_name=vol1_father,status=1,' +
'vol_size=1024,real_size=1024,pool_id=pool1,' +
'create_time=01/01/2015']
query_result = {'result': 0, 'vol_name': 'vol1',
'father_name': 'vol1_father', 'status': '1',
'vol_size': '1024', 'real_size': '1024',
'pool_id': 'pool1', 'create_time': '01/01/2015'}
mock_delete.return_value = 0
mock_execute.return_value = exec_result
mock_analyze.return_value = query_result
retval = self.api.query_volume('vol1')
self.assertEqual(query_result, retval)
vol_info_failure = 'result=32500000\n'
failure_res = {'result': 1, 'vol_name': '', 'father_name': '',
'status': '', 'vol_size': '', 'real_size': '',
'pool_id': '', 'create_time': ''}
mock_execute.return_value = vol_info_failure
retval = self.api.query_volume('vol1')
self.assertEqual(failure_res, retval)
vol_info_failure = None
failure_res = {'result': 1, 'vol_name': '', 'father_name': '',
'status': '', 'vol_size': '', 'real_size': '',
'pool_id': '', 'create_time': ''}
mock_execute.return_value = vol_info_failure
retval = self.api.query_volume('vol1')
self.assertEqual(failure_res, retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'start_execute_cmd')
def test_delete_volume(self, mock_execute):
mock_execute.side_effect = ['result=0\n',
'result=50150007\n', None]
retval = self.api.delete_volume('volume_name')
self.assertEqual(0, retval)
retval = self.api.delete_volume('volume_name')
self.assertEqual('50150007\n', retval)
retval = self.api.delete_volume('volume_name')
self.assertEqual(1, retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'start_execute_cmd')
def test_create_snapshot(self, mock_execute):
mock_execute.side_effect = ['result=0\n',
'result=50150007\n', None]
retval = self.api.create_snapshot('snap_name', 'vol_name', 0)
self.assertEqual(0, retval)
retval = self.api.create_snapshot('snap_name', 'vol_name', 0)
self.assertEqual('50150007\n', retval)
retval = self.api.create_snapshot('snap_name', 'vol_name', 0)
self.assertEqual(1, retval)
def test_snap_info_analyze_success(self):
snap_info = ('snap_name=snap1,father_name=snap1_father,status=0,'
'snap_size=1024,real_size=1024,pool_id=pool1,'
'delete_priority=1,create_time=01/01/2015')
snap_info_res = {'result': 0, 'snap_name': 'snap1',
'father_name': 'snap1_father', 'status': '0',
'snap_size': '1024', 'real_size': '1024',
'pool_id': 'pool1', 'delete_priority': '1',
'create_time': '01/01/2015'}
retval = self.api.snap_info_analyze(snap_info)
self.assertEqual(snap_info_res, retval)
def test_snap_info_analyze_fail(self):
snap_info = ''
snap_info_res = {'result': 1, 'snap_name': '', 'father_name': '',
'status': '', 'snap_size': '', 'real_size': '',
'pool_id': '', 'delete_priority': '',
'create_time': ''}
retval = self.api.snap_info_analyze(snap_info)
self.assertEqual(snap_info_res, retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'start_execute_cmd')
def test_query_snap(self, mock_execute):
exec_result = ['result=0\n',
'snap_name=snap1,father_name=snap1_father,status=0,' +
'snap_size=1024,real_size=1024,pool_id=pool1,' +
'delete_priority=1,create_time=01/01/2015']
query_result = {'result': 0, 'snap_name': 'snap1',
'father_name': 'snap1_father', 'status': '0',
'snap_size': '1024', 'real_size': '1024',
'pool_id': 'pool1', 'delete_priority': '1',
'create_time': '01/01/2015'}
mock_execute.return_value = exec_result
retval = self.api.query_snap('snap1')
self.assertEqual(query_result, retval)
exec_result = ['result=50150007\n']
qurey_result = {'result': '50150007\n', 'snap_name': '',
'father_name': '', 'status': '', 'snap_size': '',
'real_size': '', 'pool_id': '',
'delete_priority': '', 'create_time': ''}
mock_execute.return_value = exec_result
retval = self.api.query_snap('snap1')
self.assertEqual(qurey_result, retval)
exec_result = ''
query_result = {'result': 1, 'snap_name': '', 'father_name': '',
'status': '', 'snap_size': '', 'real_size': '',
'pool_id': '', 'delete_priority': '',
'create_time': ''}
mock_execute.return_value = exec_result
retval = self.api.query_snap('snap1')
self.assertEqual(query_result, retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'start_execute_cmd')
def test_delete_snapshot(self, mock_execute):
mock_execute.side_effect = ['result=0\n',
'result=50150007\n', None]
retval = self.api.delete_snapshot('snap_name')
self.assertEqual(0, retval)
retval = self.api.delete_snapshot('snap_name')
self.assertEqual('50150007\n', retval)
retval = self.api.delete_snapshot('snap_name')
self.assertEqual(1, retval)
def test_pool_info_analyze(self):
pool_info = 'pool_id=pool100,total_capacity=1024,' + \
'used_capacity=500,alloc_capacity=500'
analyze_res = {'result': 0, 'pool_id': 'pool100',
'total_capacity': '1024', 'used_capacity': '500',
'alloc_capacity': '500'}
retval = self.api.pool_info_analyze(pool_info)
self.assertEqual(analyze_res, retval)
pool_info = ''
analyze_res = {'result': 1, 'pool_id': '', 'total_capacity': '',
'used_capacity': '', 'alloc_capacity': ''}
retval = self.api.pool_info_analyze(pool_info)
self.assertEqual(analyze_res, retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'start_execute_cmd')
def test_query_pool_info(self, mock_execute):
exec_result = ['result=0\n',
'pool_id=0,total_capacity=1024,' +
'used_capacity=500,alloc_capacity=500\n']
query_result = {'result': 0, 'pool_id': '0',
'total_capacity': '1024', 'used_capacity': '500',
'alloc_capacity': '500'}
mock_execute.return_value = exec_result
retval = self.api.query_pool_info('0')
self.assertEqual(query_result, retval)
exec_result = ['result=51050008\n']
query_result = {'result': '51050008\n', 'pool_id': '',
'total_capacity': '', 'used_capacity': '',
'alloc_capacity': ''}
mock_execute.return_value = exec_result
retval = self.api.query_pool_info('0')
self.assertEqual(query_result, retval)
exec_result = ''
query_result = {'result': 1, 'pool_id': '', 'total_capacity': '',
'used_capacity': '', 'alloc_capacity': ''}
mock_execute.return_value = exec_result
retval = self.api.query_pool_info('0')
self.assertEqual(query_result, retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'start_execute_cmd')
def test_query_pool_type(self, mock_execute):
exec_result = ['result=0\n',
'pool_id=0,total_capacity=1024,' +
'used_capacity=500,alloc_capacity=500\n']
query_result = (0, [{'result': 0,
'pool_id': '0', 'total_capacity': '1024',
'used_capacity': '500', 'alloc_capacity': '500'}])
mock_execute.return_value = exec_result
retval = self.api.query_pool_type('sata2copy')
self.assertEqual(query_result, retval)
exec_result = ['result=0\n',
'pool_id=0,total_capacity=1024,' +
'used_capacity=500,alloc_capacity=500\n',
'pool_id=1,total_capacity=2048,' +
'used_capacity=500,alloc_capacity=500\n']
query_result = (0, [{'result': 0, 'pool_id': '0',
'total_capacity': '1024', 'used_capacity': '500',
'alloc_capacity': '500'},
{'result': 0, 'pool_id': '1',
'total_capacity': '2048', 'used_capacity': '500',
'alloc_capacity': '500'}])
mock_execute.return_value = exec_result
retval = self.api.query_pool_type('sata2copy')
self.assertEqual(query_result, retval)
exec_result = ['result=51010015\n']
query_result = (51010015, [])
mock_execute.return_value = exec_result
retval = self.api.query_pool_type('sata2copy')
self.assertEqual(query_result, retval)
exec_result = ''
query_result = (0, [])
mock_execute.return_value = exec_result
retval = self.api.query_pool_type('sata2copy')
self.assertEqual(query_result, retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'start_execute_cmd')
def test_query_dsware_version(self, mock_execute):
mock_execute.side_effect = ['result=0\n', 'result=50500001\n',
'result=50150007\n', None]
retval = self.api.query_dsware_version()
self.assertEqual(0, retval)
retval = self.api.query_dsware_version()
self.assertEqual(1, retval)
retval = self.api.query_dsware_version()
self.assertEqual('50150007\n', retval)
retval = self.api.query_dsware_version()
self.assertEqual(2, retval)
# Copyright (c) 2013 - 2016 Huawei Technologies Co., Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for Huawei FusionStorage drivers.
"""
import mock
from cinder import test
from cinder import utils
from cinder.volume.drivers.fusionstorage import fspythonapi
class FSPythonApiTestCase(test.TestCase):
def setUp(self):
super(FSPythonApiTestCase, self).setUp()
self.api = fspythonapi.FSPythonApi()
@mock.patch.object(fspythonapi.FSPythonApi, 'get_ip_port')
@mock.patch.object(fspythonapi.FSPythonApi, 'get_manage_ip')
@mock.patch.object(utils, 'execute')
def test_start_execute_cmd(self, mock_execute,
mock_get_manage_ip, mock_get_ip_port):
result1 = ['result=0\ndesc=success\n', '']
result2 = ['result=50150007\ndesc=volume does not exist\n', '']
result3 = ['result=50150008\ndesc=volume is being deleted\n', '']
result4 = ['result=50\ndesc=exception\n', '']
cmd = 'abcdef'
mock_get_ip_port.return_value = ['127.0.0.1', '128.0.0.1']
mock_get_manage_ip.return_value = '127.0.0.1'
mock_execute.return_value = result1
retval = self.api.start_execute_cmd(cmd, 0)
self.assertEqual('result=0', retval)
mock_execute.return_value = result2
retval = self.api.start_execute_cmd(cmd, 0)
self.assertEqual('result=0', retval)
mock_execute.return_value = result3
retval = self.api.start_execute_cmd(cmd, 0)
self.assertEqual('result=0', retval)
mock_execute.return_value = result4
retval = self.api.start_execute_cmd(cmd, 0)
self.assertEqual('result=50', retval)
mock_execute.return_value = result1
retval = self.api.start_execute_cmd(cmd, 1)
self.assertEqual(['result=0', 'desc=success', ''], retval)
mock_execute.return_value = result2
retval = self.api.start_execute_cmd(cmd, 1)
self.assertEqual('result=0', retval)
mock_execute.return_value = result3
retval = self.api.start_execute_cmd(cmd, 1)
self.assertEqual('result=0', retval)
mock_execute.return_value = result4
retval = self.api.start_execute_cmd(cmd, 1)
self.assertEqual(['result=50', 'desc=exception', ''], retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'start_execute_cmd')
def test_create_volume(self, mock_start_execute):
mock_start_execute.side_effect = ['result=0\n',
'result=50150007\n', None]
retval = self.api.create_volume('volume_name', 'pool_id-123', 1024, 0)
self.assertEqual(0, retval)
retval = self.api.create_volume('volume_name', 'pool_id-123', 1024, 0)
self.assertEqual('50150007\n', retval)
retval = self.api.create_volume('volume_name', 'pool_id-123', 1024, 0)
self.assertEqual(1, retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'start_execute_cmd')
def test_extend_volume(self, mock_start_execute):
mock_start_execute.side_effect = ['result=0\n',
'result=50150007\n', None]
retval = self.api.extend_volume('volume_name', 1024)
self.assertEqual(0, retval)
retval = self.api.extend_volume('volume_name', 1024)
self.assertEqual('50150007\n', retval)
retval = self.api.extend_volume('volume_name', 1024)
self.assertEqual(1, retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'start_execute_cmd')
def test_create_volume_from_snap(self, mock_start_execute):
mock_start_execute.side_effect = ['result=0\n',
'result=50150007\n', None]
retval = self.api.create_volume_from_snap('volume_name', 1024,
'snap_name')
self.assertEqual(0, retval)
retval = self.api.create_volume_from_snap('volume_name', 1024,
'snap_name')
self.assertEqual('50150007\n', retval)
retval = self.api.create_volume_from_snap('volume_name', 1024,
'snap_name')
self.assertEqual(1, retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'start_execute_cmd')
def test_create_fullvol_from_snap(self, mock_start_execute):
mock_start_execute.side_effect = ['result=0\n',
'result=50150007\n', None]
retval = self.api.create_fullvol_from_snap('volume_name', 'snap_name')
self.assertEqual(0, retval)
retval = self.api.create_fullvol_from_snap('volume_name', 'snap_name')
self.assertEqual('50150007\n', retval)
retval = self.api.create_fullvol_from_snap('volume_name', 'snap_name')
self.assertEqual(1, retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'create_snapshot')
@mock.patch.object(fspythonapi.FSPythonApi, 'create_volume')
@mock.patch.object(fspythonapi.FSPythonApi, 'delete_snapshot')
@mock.patch.object(fspythonapi.FSPythonApi, 'delete_volume')
@mock.patch.object(fspythonapi.FSPythonApi, 'create_fullvol_from_snap')
def test_create_volume_from_volume(self, mock_create_fullvol,
mock_delete_volume, mock_delete_snap,
mock_create_volume, mock_create_snap):
mock_create_snap.return_value = 0
mock_create_volume.return_value = 0
mock_create_fullvol.return_value = 0
retval = self.api.create_volume_from_volume('vol_name', 1024,
'src_vol_name')
self.assertEqual(0, retval)
mock_create_snap.return_value = 1
retval = self.api.create_volume_from_volume('vol_name', 1024,
'src_vol_name')
self.assertEqual(1, retval)
mock_create_snap.return_value = 0
mock_create_volume.return_value = 1
retval = self.api.create_volume_from_volume('vol_name', 1024,
'src_vol_name')
self.assertEqual(1, retval)
mock_create_volume.return_value = 0
self.api.create_fullvol_from_snap.return_value = 1
retval = self.api.create_volume_from_volume('vol_name', 1024,
'src_vol_name')
self.assertEqual(1, retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'create_snapshot')
@mock.patch.object(fspythonapi.FSPythonApi, 'create_volume_from_snap')
def test_create_clone_volume_from_volume(self, mock_volume, mock_snap):
mock_snap.side_effect = [0, 1]
mock_volume.side_effect = [0, 1]
retval = self.api.create_clone_volume_from_volume('vol_name', 1024,
'src_vol_name')
self.assertEqual(0, retval)
retval = self.api.create_clone_volume_from_volume('vol_name', 1024,
'src_vol_name')
self.assertEqual(1, retval)
def test_volume_info_analyze_success(self):
vol_info = ('vol_name=vol1,father_name=vol1_father,'
'status=available,vol_size=1024,real_size=1024,'
'pool_id=pool1,create_time=01/01/2015')
vol_info_res = {'result': 0, 'vol_name': 'vol1',
'father_name': 'vol1_father',
'status': 'available', 'vol_size': '1024',
'real_size': '1024', 'pool_id': 'pool1',
'create_time': '01/01/2015'}
retval = self.api.volume_info_analyze(vol_info)
self.assertEqual(vol_info_res, retval)
def test_volume_info_analyze_fail(self):
vol_info = ''
vol_info_res = {'result': 1, 'vol_name': '', 'father_name': '',
'status': '', 'vol_size': '', 'real_size': '',
'pool_id': '', 'create_time': ''}
retval = self.api.volume_info_analyze(vol_info)
self.assertEqual(vol_info_res, retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'start_execute_cmd')
@mock.patch.object(fspythonapi.FSPythonApi, 'volume_info_analyze')
@mock.patch.object(fspythonapi.FSPythonApi, 'delete_snapshot')
def test_query_volume(self, mock_delete, mock_analyze, mock_execute):
exec_result = ['result=0\n',
'vol_name=vol1,father_name=vol1_father,status=0,' +
'vol_size=1024,real_size=1024,pool_id=pool1,' +
'create_time=01/01/2015']
query_result = {'result': 0, 'vol_name': 'vol1',
'father_name': 'vol1_father', 'status': '0',
'vol_size': '1024', 'real_size': '1024',
'pool_id': 'pool1', 'create_time': '01/01/2015'}
mock_delete.return_value = 0
mock_execute.return_value = exec_result
mock_analyze.return_value = query_result
retval = self.api.query_volume('vol1')
self.assertEqual(query_result, retval)
exec_result = ['result=0\n',
'vol_name=vol1,father_name=vol1_father,status=1,' +
'vol_size=1024,real_size=1024,pool_id=pool1,' +
'create_time=01/01/2015']
query_result = {'result': 0, 'vol_name': 'vol1',
'father_name': 'vol1_father', 'status': '1',
'vol_size': '1024', 'real_size': '1024',
'pool_id': 'pool1', 'create_time': '01/01/2015'}
mock_delete.return_value = 0
mock_execute.return_value = exec_result
mock_analyze.return_value = query_result
retval = self.api.query_volume('vol1')
self.assertEqual(query_result, retval)
vol_info_failure = 'result=32500000\n'
failure_res = {'result': 1, 'vol_name': '', 'father_name': '',
'status': '', 'vol_size': '', 'real_size': '',
'pool_id': '', 'create_time': ''}
mock_execute.return_value = vol_info_failure
retval = self.api.query_volume('vol1')
self.assertEqual(failure_res, retval)
vol_info_failure = None
failure_res = {'result': 1, 'vol_name': '', 'father_name': '',
'status': '', 'vol_size': '', 'real_size': '',
'pool_id': '', 'create_time': ''}
mock_execute.return_value = vol_info_failure
retval = self.api.query_volume('vol1')
self.assertEqual(failure_res, retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'start_execute_cmd')
def test_delete_volume(self, mock_execute):
mock_execute.side_effect = ['result=0\n',
'result=50150007\n', None]
retval = self.api.delete_volume('volume_name')
self.assertEqual(0, retval)
retval = self.api.delete_volume('volume_name')
self.assertEqual('50150007\n', retval)
retval = self.api.delete_volume('volume_name')
self.assertEqual(1, retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'start_execute_cmd')
def test_create_snapshot(self, mock_execute):
mock_execute.side_effect = ['result=0\n',
'result=50150007\n', None]
retval = self.api.create_snapshot('snap_name', 'vol_name', 0)
self.assertEqual(0, retval)
retval = self.api.create_snapshot('snap_name', 'vol_name', 0)
self.assertEqual('50150007\n', retval)
retval = self.api.create_snapshot('snap_name', 'vol_name', 0)
self.assertEqual(1, retval)
def test_snap_info_analyze_success(self):
snap_info = ('snap_name=snap1,father_name=snap1_father,status=0,'
'snap_size=1024,real_size=1024,pool_id=pool1,'
'delete_priority=1,create_time=01/01/2015')
snap_info_res = {'result': 0, 'snap_name': 'snap1',
'father_name': 'snap1_father', 'status': '0',
'snap_size': '1024', 'real_size': '1024',
'pool_id': 'pool1', 'delete_priority': '1',
'create_time': '01/01/2015'}
retval = self.api.snap_info_analyze(snap_info)
self.assertEqual(snap_info_res, retval)
def test_snap_info_analyze_fail(self):
snap_info = ''
snap_info_res = {'result': 1, 'snap_name': '', 'father_name': '',
'status': '', 'snap_size': '', 'real_size': '',
'pool_id': '', 'delete_priority': '',
'create_time': ''}
retval = self.api.snap_info_analyze(snap_info)
self.assertEqual(snap_info_res, retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'start_execute_cmd')
def test_query_snap(self, mock_execute):
exec_result = ['result=0\n',
'snap_name=snap1,father_name=snap1_father,status=0,' +
'snap_size=1024,real_size=1024,pool_id=pool1,' +
'delete_priority=1,create_time=01/01/2015']
query_result = {'result': 0, 'snap_name': 'snap1',
'father_name': 'snap1_father', 'status': '0',
'snap_size': '1024', 'real_size': '1024',
'pool_id': 'pool1', 'delete_priority': '1',
'create_time': '01/01/2015'}
mock_execute.return_value = exec_result
retval = self.api.query_snap('snap1')
self.assertEqual(query_result, retval)
exec_result = ['result=50150007\n']
qurey_result = {'result': '50150007\n', 'snap_name': '',
'father_name': '', 'status': '', 'snap_size': '',
'real_size': '', 'pool_id': '',
'delete_priority': '', 'create_time': ''}
mock_execute.return_value = exec_result
retval = self.api.query_snap('snap1')
self.assertEqual(qurey_result, retval)
exec_result = ''
query_result = {'result': 1, 'snap_name': '', 'father_name': '',
'status': '', 'snap_size': '', 'real_size': '',
'pool_id': '', 'delete_priority': '',
'create_time': ''}
mock_execute.return_value = exec_result
retval = self.api.query_snap('snap1')
self.assertEqual(query_result, retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'start_execute_cmd')
def test_delete_snapshot(self, mock_execute):
mock_execute.side_effect = ['result=0\n',
'result=50150007\n', None]
retval = self.api.delete_snapshot('snap_name')
self.assertEqual(0, retval)
retval = self.api.delete_snapshot('snap_name')
self.assertEqual('50150007\n', retval)
retval = self.api.delete_snapshot('snap_name')
self.assertEqual(1, retval)
def test_pool_info_analyze(self):
pool_info = 'pool_id=pool100,total_capacity=1024,' + \
'used_capacity=500,alloc_capacity=500'
analyze_res = {'result': 0, 'pool_id': 'pool100',
'total_capacity': '1024', 'used_capacity': '500',
'alloc_capacity': '500'}
retval = self.api.pool_info_analyze(pool_info)
self.assertEqual(analyze_res, retval)
pool_info = ''
analyze_res = {'result': 1, 'pool_id': '', 'total_capacity': '',
'used_capacity': '', 'alloc_capacity': ''}
retval = self.api.pool_info_analyze(pool_info)
self.assertEqual(analyze_res, retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'start_execute_cmd')
def test_query_pool_info(self, mock_execute):
exec_result = ['result=0\n',
'pool_id=0,total_capacity=1024,' +
'used_capacity=500,alloc_capacity=500\n']
query_result = {'result': 0, 'pool_id': '0',
'total_capacity': '1024', 'used_capacity': '500',
'alloc_capacity': '500'}
mock_execute.return_value = exec_result
retval = self.api.query_pool_info('0')
self.assertEqual(query_result, retval)
exec_result = ['result=51050008\n']
query_result = {'result': '51050008\n', 'pool_id': '',
'total_capacity': '', 'used_capacity': '',
'alloc_capacity': ''}
mock_execute.return_value = exec_result
retval = self.api.query_pool_info('0')
self.assertEqual(query_result, retval)
exec_result = ''
query_result = {'result': 1, 'pool_id': '', 'total_capacity': '',
'used_capacity': '', 'alloc_capacity': ''}
mock_execute.return_value = exec_result
retval = self.api.query_pool_info('0')
self.assertEqual(query_result, retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'start_execute_cmd')
def test_query_pool_type(self, mock_execute):
exec_result = ['result=0\n',
'pool_id=0,total_capacity=1024,' +
'used_capacity=500,alloc_capacity=500\n']
query_result = (0, [{'result': 0,
'pool_id': '0', 'total_capacity': '1024',
'used_capacity': '500', 'alloc_capacity': '500'}])
mock_execute.return_value = exec_result
retval = self.api.query_pool_type('sata2copy')
self.assertEqual(query_result, retval)
exec_result = ['result=0\n',
'pool_id=0,total_capacity=1024,' +
'used_capacity=500,alloc_capacity=500\n',
'pool_id=1,total_capacity=2048,' +
'used_capacity=500,alloc_capacity=500\n']
query_result = (0, [{'result': 0, 'pool_id': '0',
'total_capacity': '1024', 'used_capacity': '500',
'alloc_capacity': '500'},
{'result': 0, 'pool_id': '1',
'total_capacity': '2048', 'used_capacity': '500',
'alloc_capacity': '500'}])
mock_execute.return_value = exec_result
retval = self.api.query_pool_type('sata2copy')
self.assertEqual(query_result, retval)
exec_result = ['result=51010015\n']
query_result = (51010015, [])
mock_execute.return_value = exec_result
retval = self.api.query_pool_type('sata2copy')
self.assertEqual(query_result, retval)
exec_result = ''
query_result = (0, [])
mock_execute.return_value = exec_result
retval = self.api.query_pool_type('sata2copy')
self.assertEqual(query_result, retval)
@mock.patch.object(fspythonapi.FSPythonApi, 'start_execute_cmd')
def test_query_dsware_version(self, mock_execute):
mock_execute.side_effect = ['result=0\n', 'result=50500001\n',
'result=50150007\n', None]
retval = self.api.query_dsware_version()
self.assertEqual(0, retval)
retval = self.api.query_dsware_version()
self.assertEqual(1, retval)
retval = self.api.query_dsware_version()
self.assertEqual('50150007\n', retval)
retval = self.api.query_dsware_version()
self.assertEqual(2, retval)

File diff suppressed because it is too large Load Diff