Merge "Convert all eqlx tests from mox to mock"

This commit is contained in:
Jenkins 2015-04-01 06:28:48 +00:00 committed by Gerrit Code Review
commit e8652b1e92

View File

@ -17,7 +17,6 @@ import time
from eventlet import greenthread
import mock
import mox
from oslo_concurrency import processutils
from oslo_log import log as logging
import paramiko
@ -37,8 +36,7 @@ class DellEQLSanISCSIDriverTestCase(test.TestCase):
def setUp(self):
super(DellEQLSanISCSIDriverTestCase, self).setUp()
self.configuration = mox.MockObject(conf.Configuration)
self.configuration.append_config_values(mox.IgnoreArg())
self.configuration = mock.Mock(conf.Configuration)
self.configuration.san_is_local = False
self.configuration.san_ip = "10.0.0.1"
self.configuration.san_login = "foo"
@ -59,6 +57,7 @@ class DellEQLSanISCSIDriverTestCase(test.TestCase):
self.configuration.chap_username = 'admin'
self.configuration.chap_password = 'password'
self.cmd = 'this is dummy command'
self._context = context.get_admin_context()
self.driver = eqlx.DellEQLSanISCSIDriver(
configuration=self.configuration)
@ -75,6 +74,7 @@ class DellEQLSanISCSIDriverTestCase(test.TestCase):
" 7dab76162"]
self.fake_iqn = 'iqn.2003-10.com.equallogic:group01:25366:fakev'
self.fake_iqn_return = ['iSCSI target name is %s.' % self.fake_iqn]
self.driver._group_ip = '10.0.1.6'
self.properties = {
'target_discoverd': True,
@ -93,245 +93,236 @@ class DellEQLSanISCSIDriverTestCase(test.TestCase):
return self.properties
def test_create_volume(self):
self.driver._eql_execute = self.mox.\
CreateMock(self.driver._eql_execute)
volume = {'name': self.volume_name, 'size': 1}
self.driver._eql_execute('volume', 'create', volume['name'],
"%sG" % (volume['size']), 'pool',
self.configuration.eqlx_pool,
'thin-provision').\
AndReturn(['iSCSI target name is %s.' % self.fake_iqn])
self.driver._eql_execute('volume', 'select', volume['name'],
'multihost-access', 'enable')
self.mox.ReplayAll()
model_update = self.driver.create_volume(volume)
self.assertEqual(model_update, self._model_update)
mock_attrs = {'args': ['volume', 'create', volume['name'],
"%sG" % (volume['size']), 'pool',
self.configuration.eqlx_pool,
'thin-provision']}
with mock.patch.object(self.driver,
'_eql_execute') as mock_eql_execute:
mock_eql_execute.configure_mock(**mock_attrs)
mock_eql_execute.return_value = self.fake_iqn_return
model_update = self.driver.create_volume(volume)
self.assertEqual(self._model_update, model_update)
def test_delete_volume(self):
self.driver._eql_execute = self.mox.\
CreateMock(self.driver._eql_execute)
volume = {'name': self.volume_name, 'size': 1}
self.driver._eql_execute('volume', 'select', volume['name'], 'show')
self.driver._eql_execute('volume', 'select', volume['name'], 'offline')
self.driver._eql_execute('volume', 'delete', volume['name'])
self.mox.ReplayAll()
self.driver.delete_volume(volume)
show_attrs = {'args': ['volume', 'select', volume['name'], 'show']}
off_attrs = {'args': ['volume', 'select', volume['name'], 'offline']}
delete_attrs = {'args': ['volume', 'delete', volume['name']]}
with mock.patch.object(self.driver,
'_eql_execute') as mock_eql_execute:
mock_eql_execute.configure_mock(**show_attrs)
mock_eql_execute.configure_mock(**off_attrs)
mock_eql_execute.configure_mock(**delete_attrs)
self.driver.delete_volume(volume)
def test_delete_absent_volume(self):
self.driver._eql_execute = self.mox.\
CreateMock(self.driver._eql_execute)
volume = {'name': self.volume_name, 'size': 1, 'id': self.volid}
self.driver._eql_execute('volume', 'select', volume['name'], 'show').\
AndRaise(processutils.ProcessExecutionError(
stdout='% Error ..... does not exist.\n'))
self.mox.ReplayAll()
self.driver.delete_volume(volume)
mock_attrs = {'args': ['volume', 'select', volume['name'], 'show']}
with mock.patch.object(self.driver,
'_eql_execute') as mock_eql_execute:
mock_eql_execute.configure_mock(**mock_attrs)
mock_eql_execute.side_effect = processutils.ProcessExecutionError(
stdout='% Error ..... does not exist.\n')
self.driver.delete_volume(volume)
def test_ensure_export(self):
self.driver._eql_execute = self.mox.\
CreateMock(self.driver._eql_execute)
volume = {'name': self.volume_name, 'size': 1}
self.driver._eql_execute('volume', 'select', volume['name'], 'show')
self.mox.ReplayAll()
self.driver.ensure_export({}, volume)
mock_attrs = {'args': ['volume', 'select', volume['name'], 'show']}
with mock.patch.object(self.driver,
'_eql_execute') as mock_eql_execute:
mock_eql_execute.configure_mock(**mock_attrs)
self.driver.ensure_export({}, volume)
def test_create_snapshot(self):
self.driver._eql_execute = self.mox.\
CreateMock(self.driver._eql_execute)
snapshot = {'name': 'fakesnap', 'volume_name': 'fakevolume_name'}
snap_name = 'fake_snap_name'
self.driver._eql_execute('volume', 'select', snapshot['volume_name'],
'snapshot', 'create-now').\
AndReturn(['Snapshot name is %s' % snap_name])
self.driver._eql_execute('volume', 'select', snapshot['volume_name'],
'snapshot', 'rename', snap_name,
snapshot['name'])
self.mox.ReplayAll()
self.driver.create_snapshot(snapshot)
with mock.patch.object(self.driver,
'_eql_execute') as mock_eql_execute:
mock_eql_execute.return_value = ['Snapshot name is %s' % snap_name]
self.driver.create_snapshot(snapshot)
def test_create_volume_from_snapshot(self):
self.driver._eql_execute = self.mox.\
CreateMock(self.driver._eql_execute)
snapshot = {'name': 'fakesnap', 'volume_name': 'fakevolume_name'}
volume = {'name': self.volume_name}
self.driver._eql_execute('volume', 'select', snapshot['volume_name'],
'snapshot', 'select', snapshot['name'],
'clone', volume['name']).\
AndReturn(['iSCSI target name is %s.' % self.fake_iqn])
self.driver._eql_execute('volume', 'select', volume['name'],
'multihost-access', 'enable')
self.mox.ReplayAll()
model_update = self.driver.create_volume_from_snapshot(volume,
snapshot)
self.assertEqual(model_update, self._model_update)
mock_attrs = {'args': ['volume', 'select', snapshot['volume_name'],
'snapshot', 'select', snapshot['name'],
'clone', volume['name']]}
with mock.patch.object(self.driver,
'_eql_execute') as mock_eql_execute:
mock_eql_execute.configure_mock(**mock_attrs)
mock_eql_execute.return_value = self.fake_iqn_return
model_update = self.driver.create_volume_from_snapshot(volume,
snapshot)
self.assertEqual(self._model_update, model_update)
def test_create_cloned_volume(self):
self.driver._eql_execute = self.mox.\
CreateMock(self.driver._eql_execute)
src_vref = {'name': 'fake_uuid'}
volume = {'name': self.volume_name}
self.driver._eql_execute('volume', 'select', src_vref['name'], 'clone',
volume['name']).\
AndReturn(['iSCSI target name is %s.' % self.fake_iqn])
self.driver._eql_execute('volume', 'select', volume['name'],
'multihost-access', 'enable')
self.mox.ReplayAll()
model_update = self.driver.create_cloned_volume(volume, src_vref)
self.assertEqual(model_update, self._model_update)
mock_attrs = {'args': ['volume', 'select', volume['name'],
'multihost-access', 'enable']}
with mock.patch.object(self.driver,
'_eql_execute') as mock_eql_execute:
mock_eql_execute.configure_mock(**mock_attrs)
mock_eql_execute.return_value = self.fake_iqn_return
model_update = self.driver.create_cloned_volume(volume, src_vref)
self.assertEqual(self._model_update, model_update)
def test_delete_snapshot(self):
self.driver._eql_execute = self.mox.\
CreateMock(self.driver._eql_execute)
snapshot = {'name': 'fakesnap', 'volume_name': 'fakevolume_name'}
self.driver._eql_execute('volume', 'select', snapshot['volume_name'],
'snapshot', 'delete', snapshot['name'])
self.mox.ReplayAll()
self.driver.delete_snapshot(snapshot)
mock_attrs = {'args': ['volume', 'select', snapshot['volume_name'],
'snapshot', 'delete', snapshot['name']]}
with mock.patch.object(self.driver,
'_eql_execute') as mock_eql_execute:
mock_eql_execute.configure_mock(**mock_attrs)
self.driver.delete_snapshot(snapshot)
def test_extend_volume(self):
new_size = '200'
self.driver._eql_execute = self.mox.\
CreateMock(self.driver._eql_execute)
volume = {'name': self.volume_name, 'size': 100}
self.driver._eql_execute('volume', 'select', volume['name'],
'size', "%sG" % new_size)
self.mox.ReplayAll()
self.driver.extend_volume(volume, new_size)
mock_attrs = {'args': ['volume', 'select', volume['name'],
'size', "%sG" % new_size]}
with mock.patch.object(self.driver,
'_eql_execute') as mock_eql_execute:
mock_eql_execute.configure_mock(**mock_attrs)
self.driver.extend_volume(volume, new_size)
def test_initialize_connection(self):
self.driver._eql_execute = self.mox.\
CreateMock(self.driver._eql_execute)
volume = {'name': self.volume_name}
self.stubs.Set(self.driver, "_get_iscsi_properties",
self._fake_get_iscsi_properties)
self.driver._eql_execute('volume', 'select', volume['name'], 'access',
'create', 'initiator',
self.connector['initiator'],
'authmethod', 'chap',
'username',
self.configuration.chap_username)
self.mox.ReplayAll()
iscsi_properties = self.driver.initialize_connection(volume,
self.connector)
self.assertEqual(iscsi_properties['data'],
self._fake_get_iscsi_properties(volume))
mock_attrs = {'args': ['volume', 'select', volume['name'], 'access',
'create', 'initiator',
self.connector['initiator'],
'authmethod', 'chap',
'username',
self.configuration.chap_username]}
with mock.patch.object(self.driver,
'_eql_execute') as mock_eql_execute:
with mock.patch.object(self.driver,
'_get_iscsi_properties') as mock_iscsi:
mock_eql_execute.configure_mock(**mock_attrs)
mock_iscsi.return_value = self.properties
iscsi_properties = self.driver.initialize_connection(
volume, self.connector)
self.assertEqual(self._fake_get_iscsi_properties(volume),
iscsi_properties['data'])
def test_terminate_connection(self):
self.driver._eql_execute = self.mox.\
CreateMock(self.driver._eql_execute)
def my_side_effect(*args, **kwargs):
if args[4] == 'show':
return self.access_record_output
else:
return ''
volume = {'name': self.volume_name}
self.driver._eql_execute('volume', 'select', volume['name'], 'access',
'show').AndReturn(self.access_record_output)
self.driver._eql_execute('volume', 'select', volume['name'], 'access',
'delete', '1')
self.mox.ReplayAll()
self.driver.terminate_connection(volume, self.connector)
with mock.patch.object(self.driver,
'_eql_execute') as mock_eql_execute:
mock_eql_execute.side_effect = my_side_effect
self.driver.terminate_connection(volume, self.connector)
def test_do_setup(self):
self.driver._eql_execute = self.mox.\
CreateMock(self.driver._eql_execute)
fake_group_ip = '10.1.2.3'
for feature in ('confirmation', 'paging', 'events', 'formatoutput'):
self.driver._eql_execute('cli-settings', feature, 'off')
self.driver._eql_execute('grpparams', 'show').\
AndReturn(['Group-Ipaddress: %s' % fake_group_ip])
self.mox.ReplayAll()
self.driver.do_setup(self._context)
self.assertEqual(fake_group_ip, self.driver._group_ip)
def my_side_effect(*args, **kwargs):
if args[0] == 'grpparams':
return ['Group-Ipaddress: %s' % fake_group_ip]
else:
return ''
with mock.patch.object(self.driver,
'_eql_execute') as mock_eql_execute:
mock_eql_execute.side_effect = my_side_effect
self.driver.do_setup(self._context)
self.assertEqual(self.driver._group_ip, fake_group_ip)
def test_update_volume_stats(self):
self.driver._eql_execute = self.mox.\
CreateMock(self.driver._eql_execute)
self.driver._eql_execute('pool', 'select',
self.configuration.eqlx_pool, 'show').\
AndReturn(['TotalCapacity: 111GB', 'FreeSpace: 11GB'])
self.mox.ReplayAll()
self.driver._update_volume_stats()
self.assertEqual(self.driver._stats['total_capacity_gb'], 111.0)
self.assertEqual(self.driver._stats['free_capacity_gb'], 11.0)
mock_attrs = {'args': ['pool', 'select',
self.configuration.eqlx_pool, 'show']}
with mock.patch.object(self.driver,
'_eql_execute') as mock_eql_execute:
mock_eql_execute.configure_mock(**mock_attrs)
mock_eql_execute.return_value = ['TotalCapacity: 111GB',
'FreeSpace: 11GB']
self.driver._update_volume_stats()
self.assertEqual(111.0, self.driver._stats['total_capacity_gb'])
self.assertEqual(11.0, self.driver._stats['free_capacity_gb'])
def test_get_volume_stats(self):
self.driver._eql_execute = self.mox.\
CreateMock(self.driver._eql_execute)
self.driver._eql_execute('pool', 'select',
self.configuration.eqlx_pool, 'show').\
AndReturn(['TotalCapacity: 111GB', 'FreeSpace: 11GB'])
self.mox.ReplayAll()
stats = self.driver.get_volume_stats(refresh=True)
self.assertEqual(stats['total_capacity_gb'], float('111.0'))
self.assertEqual(stats['free_capacity_gb'], float('11.0'))
self.assertEqual(stats['vendor_name'], 'Dell')
mock_attrs = {'args': ['pool', 'select',
self.configuration.eqlx_pool, 'show']}
with mock.patch.object(self.driver,
'_eql_execute') as mock_eql_execute:
mock_eql_execute.configure_mock(**mock_attrs)
mock_eql_execute.return_value = ['TotalCapacity: 111GB',
'FreeSpace: 11GB']
stats = self.driver.get_volume_stats(refresh=True)
self.assertEqual(float('111.0'), stats['total_capacity_gb'])
self.assertEqual(float('11.0'), stats['free_capacity_gb'])
self.assertEqual('Dell', stats['vendor_name'])
def test_get_space_in_gb(self):
self.assertEqual(self.driver._get_space_in_gb('123.0GB'), 123.0)
self.assertEqual(self.driver._get_space_in_gb('123.0TB'), 123.0 * 1024)
self.assertEqual(self.driver._get_space_in_gb('1024.0MB'), 1.0)
self.assertEqual(123.0, self.driver._get_space_in_gb('123.0GB'))
self.assertEqual(123.0 * 1024, self.driver._get_space_in_gb('123.0TB'))
self.assertEqual(1.0, self.driver._get_space_in_gb('1024.0MB'))
def test_get_output(self):
def _fake_recv(ignore_arg):
return '%s> ' % self.configuration.eqlx_group_name
chan = self.mox.CreateMock(paramiko.Channel)
self.stubs.Set(chan, "recv", _fake_recv)
self.assertEqual(self.driver._get_output(chan), [_fake_recv(None)])
chan = mock.Mock(paramiko.Channel)
mock_recv = self.mock_object(chan, 'recv')
mock_recv.return_value = '%s> ' % self.configuration.eqlx_group_name
self.assertEqual([_fake_recv(None)], self.driver._get_output(chan))
def test_get_prefixed_value(self):
lines = ['Line1 passed', 'Line1 failed']
prefix = ['Line1', 'Line2']
expected_output = [' passed', None]
self.assertEqual(self.driver._get_prefixed_value(lines, prefix[0]),
expected_output[0])
self.assertEqual(self.driver._get_prefixed_value(lines, prefix[1]),
expected_output[1])
self.assertEqual(expected_output[0],
self.driver._get_prefixed_value(lines, prefix[0]))
self.assertEqual(expected_output[1],
self.driver._get_prefixed_value(lines, prefix[1]))
def test_ssh_execute(self):
ssh = self.mox.CreateMock(paramiko.SSHClient)
chan = self.mox.CreateMock(paramiko.Channel)
transport = self.mox.CreateMock(paramiko.Transport)
self.mox.StubOutWithMock(self.driver, '_get_output')
self.mox.StubOutWithMock(chan, 'invoke_shell')
ssh = mock.Mock(paramiko.SSHClient)
chan = mock.Mock(paramiko.Channel)
transport = mock.Mock(paramiko.Transport)
mock_get_output = self.mock_object(self.driver, '_get_output')
self.mock_object(chan, 'invoke_shell')
expected_output = ['NoError: test run']
ssh.get_transport().AndReturn(transport)
transport.open_session().AndReturn(chan)
mock_get_output.return_value = expected_output
ssh.get_transport.return_value = transport
transport.open_session.return_value = chan
chan.invoke_shell()
self.driver._get_output(chan).AndReturn(expected_output)
cmd = 'this is dummy command'
chan.send('stty columns 255' + '\r')
self.driver._get_output(chan).AndReturn(expected_output)
chan.send(cmd + '\r')
self.driver._get_output(chan).AndReturn(expected_output)
chan.send(self.cmd + '\r')
chan.close()
self.mox.ReplayAll()
self.assertEqual(self.driver._ssh_execute(ssh, cmd), expected_output)
self.assertEqual(expected_output,
self.driver._ssh_execute(ssh, self.cmd))
def test_ssh_execute_error(self):
ssh = self.mox.CreateMock(paramiko.SSHClient)
chan = self.mox.CreateMock(paramiko.Channel)
transport = self.mox.CreateMock(paramiko.Transport)
self.mox.StubOutWithMock(self.driver, '_get_output')
self.mox.StubOutWithMock(ssh, 'get_transport')
self.mox.StubOutWithMock(chan, 'invoke_shell')
ssh = mock.Mock(paramiko.SSHClient)
chan = mock.Mock(paramiko.Channel)
transport = mock.Mock(paramiko.Transport)
mock_get_output = self.mock_object(self.driver, '_get_output')
self.mock_object(ssh, 'get_transport')
self.mock_object(chan, 'invoke_shell')
expected_output = ['Error: test run', '% Error']
ssh.get_transport().AndReturn(transport)
transport.open_session().AndReturn(chan)
mock_get_output.return_value = expected_output
ssh.get_transport().return_value = transport
transport.open_session.return_value = chan
chan.invoke_shell()
self.driver._get_output(chan).AndReturn(expected_output)
cmd = 'this is dummy command'
chan.send('stty columns 255' + '\r')
self.driver._get_output(chan).AndReturn(expected_output)
chan.send(cmd + '\r')
self.driver._get_output(chan).AndReturn(expected_output)
chan.send(self.cmd + '\r')
chan.close()
self.mox.ReplayAll()
self.assertRaises(processutils.ProcessExecutionError,
self.driver._ssh_execute, ssh, cmd)
self.driver._ssh_execute, ssh, self.cmd)
@mock.patch.object(greenthread, 'sleep')
def test_ensure_retries(self, _gt_sleep):
num_attempts = 3
self.driver.configuration.eqlx_cli_max_retries = num_attempts
self.mock_object(self.driver, '_ssh_execute',
mock.Mock(side_effect=exception.
VolumeBackendAPIException("some error")))