Merge "EMC Isilon Driver Support For CIFS Read-Only Share"
This commit is contained in:
commit
4b8c45fa3a
@ -82,7 +82,7 @@ Mapping of share drivers and share access rules support
|
||||
+----------------------------------------+--------------+----------------+------------+--------------+----------------+------------+
|
||||
| EMC VNX | NFS (J) | CIFS (J) | \- | NFS (L) | CIFS (L) | \- |
|
||||
+----------------------------------------+--------------+----------------+------------+--------------+----------------+------------+
|
||||
| EMC Isilon | NFS,CIFS (K) | \- | \- | NFS (M) | \- | \- |
|
||||
| EMC Isilon | NFS,CIFS (K) | CIFS (M) | \- | NFS (M) | CIFS (M) | \- |
|
||||
+----------------------------------------+--------------+----------------+------------+--------------+----------------+------------+
|
||||
| Red Hat GlusterFS | NFS (J) | \- | \- | \- | \- | \- |
|
||||
+----------------------------------------+--------------+----------------+------------+--------------+----------------+------------+
|
||||
|
@ -17,7 +17,6 @@
|
||||
Isilon specific NAS backend plugin.
|
||||
"""
|
||||
import os
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
from oslo_utils import units
|
||||
@ -181,21 +180,10 @@ class IsilonStorageConnection(base.StorageConnection):
|
||||
def allow_access(self, context, share, access, share_server):
|
||||
"""Allow access to the share."""
|
||||
|
||||
# TODO(sedwards): Look into supporting ro/rw access to shares
|
||||
if access['access_type'] != 'ip':
|
||||
message = _('Only ip access type allowed.')
|
||||
LOG.error(message)
|
||||
raise exception.ShareBackendException(msg=message)
|
||||
|
||||
access_ip = access['access_to']
|
||||
|
||||
if share['share_proto'] == 'NFS':
|
||||
export_path = self._get_container_path(share)
|
||||
self._nfs_allow_access(
|
||||
access_ip, export_path, access['access_level'])
|
||||
self._nfs_allow_access(share, access)
|
||||
elif share['share_proto'] == 'CIFS':
|
||||
self._cifs_allow_access(
|
||||
access_ip, share, access['access_level'])
|
||||
self._cifs_allow_access(share, access)
|
||||
else:
|
||||
message = _(
|
||||
'Unsupported share protocol: %s. Only "NFS" and '
|
||||
@ -204,9 +192,18 @@ class IsilonStorageConnection(base.StorageConnection):
|
||||
LOG.error(message)
|
||||
raise exception.InvalidShare(reason=message)
|
||||
|
||||
def _nfs_allow_access(self, access_ip, export_path, access_level):
|
||||
def _nfs_allow_access(self, share, access):
|
||||
"""Allow access to nfs share."""
|
||||
access_type = access['access_type']
|
||||
if access_type != 'ip':
|
||||
message = _('Only "ip" access type allowed for the NFS'
|
||||
'protocol.')
|
||||
LOG.error(message)
|
||||
raise exception.InvalidShareAccess(reason=message)
|
||||
|
||||
export_path = self._get_container_path(share)
|
||||
access_ip = access['access_to']
|
||||
access_level = access['access_level']
|
||||
share_id = self._isilon_api.lookup_nfs_export(export_path)
|
||||
|
||||
share_access_group = 'clients'
|
||||
@ -227,9 +224,22 @@ class IsilonStorageConnection(base.StorageConnection):
|
||||
resp = self._isilon_api.request('PUT', url, data=export_params)
|
||||
resp.raise_for_status()
|
||||
|
||||
def _cifs_allow_access(self, ip, share, access_level):
|
||||
"""Allow access to cifs share."""
|
||||
def _cifs_allow_access(self, share, access):
|
||||
access_type = access['access_type']
|
||||
access_to = access['access_to']
|
||||
access_level = access['access_level']
|
||||
if access_type == 'ip':
|
||||
access_ip = access['access_to']
|
||||
self._cifs_allow_access_ip(access_ip, share, access_level)
|
||||
elif access_type == 'user':
|
||||
self._cifs_allow_access_user(access_to, share, access_level)
|
||||
else:
|
||||
message = _('Only "ip" and "user" access types allowed for '
|
||||
'CIFS protocol.')
|
||||
LOG.error(message)
|
||||
raise exception.InvalidShareAccess(reason=message)
|
||||
|
||||
def _cifs_allow_access_ip(self, ip, share, access_level):
|
||||
if access_level == const.ACCESS_LEVEL_RO:
|
||||
message = _('Only RW Access allowed for CIFS Protocol when using '
|
||||
'the "ip" access type.')
|
||||
@ -247,22 +257,34 @@ class IsilonStorageConnection(base.StorageConnection):
|
||||
r = self._isilon_api.request('PUT', url, data=data)
|
||||
r.raise_for_status()
|
||||
|
||||
def _cifs_allow_access_user(self, user, share, access_level):
|
||||
if access_level == const.ACCESS_LEVEL_RW:
|
||||
smb_permission = isilon_api.SmbPermission.rw
|
||||
elif access_level == const.ACCESS_LEVEL_RO:
|
||||
smb_permission = isilon_api.SmbPermission.ro
|
||||
else:
|
||||
message = _('Only "RW" and "RO" access levels are supported.')
|
||||
LOG.error(message)
|
||||
raise exception.InvalidShareAccess(reason=message)
|
||||
|
||||
self._isilon_api.smb_permissions_add(share['name'], user,
|
||||
smb_permission)
|
||||
|
||||
def deny_access(self, context, share, access, share_server):
|
||||
"""Deny access to the share."""
|
||||
|
||||
if share['share_proto'] == 'NFS':
|
||||
self._nfs_deny_access(share, access)
|
||||
elif share['share_proto'] == 'CIFS':
|
||||
self._cifs_deny_access(share, access)
|
||||
|
||||
def _nfs_deny_access(self, share, access):
|
||||
"""Deny access to nfs share."""
|
||||
if access['access_type'] != 'ip':
|
||||
return
|
||||
|
||||
denied_ip = access['access_to']
|
||||
access_level = access['access_level']
|
||||
if share['share_proto'] == 'NFS':
|
||||
self._nfs_deny_access(denied_ip, share, access_level)
|
||||
elif share['share_proto'] == 'CIFS':
|
||||
self._cifs_deny_access(denied_ip, share)
|
||||
|
||||
def _nfs_deny_access(self, denied_ip, share, access_level):
|
||||
"""Deny access to nfs share."""
|
||||
|
||||
share_access_group = 'clients'
|
||||
if access_level == const.ACCESS_LEVEL_RO:
|
||||
share_access_group = 'read_only_clients'
|
||||
@ -305,7 +327,20 @@ class IsilonStorageConnection(base.StorageConnection):
|
||||
|
||||
return export
|
||||
|
||||
def _cifs_deny_access(self, denied_ip, share):
|
||||
def _cifs_deny_access(self, share, access):
|
||||
access_type = access['access_type']
|
||||
if access_type == 'ip':
|
||||
self._cifs_deny_access_ip(access['access_to'], share)
|
||||
elif access_type == 'user':
|
||||
self._cifs_deny_access_user(share, access)
|
||||
else:
|
||||
message = _('Access type for CIFS deny access request was '
|
||||
'"%(access_type)s". Only "user" and "ip" access types '
|
||||
'are supported for CIFS protocol access.') % {
|
||||
'access_type': access_type}
|
||||
LOG.warning(message)
|
||||
|
||||
def _cifs_deny_access_ip(self, denied_ip, share):
|
||||
"""Deny access to cifs share."""
|
||||
|
||||
share_json = self._isilon_api.lookup_smb_share(share['name'])
|
||||
@ -319,6 +354,10 @@ class IsilonStorageConnection(base.StorageConnection):
|
||||
resp = self._isilon_api.request('PUT', url, data=share_params)
|
||||
resp.raise_for_status()
|
||||
|
||||
def _cifs_deny_access_user(self, share, access):
|
||||
self._isilon_api.smb_permissions_remove(share['name'], access[
|
||||
'access_to'])
|
||||
|
||||
def check_for_setup_error(self):
|
||||
"""Check for setup error."""
|
||||
|
||||
|
@ -13,6 +13,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from enum import Enum
|
||||
from oslo_serialization import jsonutils
|
||||
import requests
|
||||
import six
|
||||
@ -164,7 +165,7 @@ class IsilonApi(object):
|
||||
otherwise
|
||||
"""
|
||||
|
||||
data = {}
|
||||
data = {'permissions': []}
|
||||
data['name'] = share_name
|
||||
data['path'] = share_path
|
||||
url = self.host_url + '/platform/1/protocols/smb/shares'
|
||||
@ -264,9 +265,73 @@ class IsilonApi(object):
|
||||
quota_id = quota_json['id']
|
||||
self.quota_modify_size(quota_id, size)
|
||||
|
||||
def smb_permissions_add(self, share_name, user, smb_permission):
|
||||
smb_share = self.lookup_smb_share(share_name)
|
||||
permissions = smb_share['permissions']
|
||||
|
||||
# lookup given user string
|
||||
user_json = self.auth_lookup_user(user)
|
||||
auth_mappings = user_json['mapping']
|
||||
if len(auth_mappings) > 1:
|
||||
message = (_('More than one mapping found for user "%(user)s".')
|
||||
% {'user': user})
|
||||
raise exception.ShareBackendException(msg=message)
|
||||
user_sid = auth_mappings[0]['user']['sid']
|
||||
new_permission = {
|
||||
'permission': smb_permission.value,
|
||||
'permission_type': 'allow',
|
||||
'trustee': user_sid
|
||||
}
|
||||
|
||||
url = '{0}/platform/1/protocols/smb/shares/{1}'.format(
|
||||
self.host_url, share_name)
|
||||
new_permissions = list(permissions)
|
||||
new_permissions.append(new_permission)
|
||||
data = {'permissions': new_permissions}
|
||||
|
||||
r = self.request('PUT', url, data=data)
|
||||
r.raise_for_status()
|
||||
|
||||
def smb_permissions_remove(self, share_name, user):
|
||||
smb_share = self.lookup_smb_share(share_name)
|
||||
permissions = smb_share['permissions']
|
||||
|
||||
# find the perm to remove
|
||||
perm_to_remove = None
|
||||
for perm in list(permissions):
|
||||
if perm['trustee']['name'] == user:
|
||||
perm_to_remove = perm
|
||||
|
||||
if perm_to_remove is not None:
|
||||
permissions.remove(perm)
|
||||
else:
|
||||
message = _('Attempting to remove permission for user "%(user)s", '
|
||||
'but this user was not found in the share\'s '
|
||||
'(%(share)s) permissions list.') % {'user': user,
|
||||
'share': smb_share}
|
||||
raise exception.ShareBackendException(msg=message)
|
||||
|
||||
self.request('PUT', '{0}/platform/1/protocols/smb/shares/{1}'.format(
|
||||
self.host_url, share_name), data={'permissions': permissions})
|
||||
|
||||
def auth_lookup_user(self, user_string):
|
||||
url = '{0}/platform/1/auth/mapping/users/lookup'.format(self.host_url)
|
||||
r = self.request('GET', url, params={"user": user_string})
|
||||
if r.status_code == 404:
|
||||
raise exception.ShareBackendException(msg='user not found')
|
||||
elif r.status_code != 200:
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
def request(self, method, url, headers=None, data=None, params=None):
|
||||
if data is not None:
|
||||
data = jsonutils.dumps(data)
|
||||
r = self.session.request(method, url, headers=headers, data=data,
|
||||
verify=self.verify_ssl_cert, params=params)
|
||||
return r
|
||||
|
||||
|
||||
class SmbPermission(Enum):
|
||||
full = 'full'
|
||||
rw = 'change'
|
||||
ro = 'read'
|
||||
|
@ -13,6 +13,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import ddt
|
||||
import mock
|
||||
from oslo_log import log
|
||||
from oslo_utils import units
|
||||
@ -21,11 +22,13 @@ import six
|
||||
from manila.common import constants as const
|
||||
from manila import exception
|
||||
from manila.share.drivers.emc.plugins.isilon import isilon
|
||||
from manila.share.drivers.emc.plugins.isilon.isilon_api import SmbPermission
|
||||
from manila import test
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class IsilonTest(test.TestCase):
|
||||
"""Integration test for the Isilon Manila driver."""
|
||||
|
||||
@ -227,7 +230,7 @@ class IsilonTest(test.TestCase):
|
||||
self._mock_isilon_api.request.assert_called_once_with(
|
||||
'PUT', expected_url, data=expected_data)
|
||||
|
||||
def test_deny_access_invalid_access_type(self):
|
||||
def test_deny_access_nfs_invalid_access_type(self):
|
||||
share = {'name': self.SHARE_NAME, 'share_proto': 'NFS'}
|
||||
access = {'access_type': 'foo_access_type', 'access_to': '10.0.0.1'}
|
||||
|
||||
@ -235,6 +238,14 @@ class IsilonTest(test.TestCase):
|
||||
self.storage_connection.deny_access(
|
||||
self.mock_context, share, access, None)
|
||||
|
||||
def test_deny_access_cifs_invalid_access_type(self):
|
||||
share = {'name': self.SHARE_NAME, 'share_proto': 'CIFS'}
|
||||
access = {'access_type': 'foo_access_type', 'access_to': '10.0.0.1'}
|
||||
|
||||
# This operation should return silently
|
||||
self.storage_connection.deny_access(self.mock_context, share, access,
|
||||
None)
|
||||
|
||||
def test_deny_access_invalid_share_protocol(self):
|
||||
share = {'name': self.SHARE_NAME, 'share_proto': 'FOO'}
|
||||
access = {'access_type': 'ip', 'access_to': '10.0.0.1',
|
||||
@ -268,6 +279,18 @@ class IsilonTest(test.TestCase):
|
||||
self.storage_connection.deny_access,
|
||||
self.mock_context, share, access, None)
|
||||
|
||||
def test_deny_access_nfs_share_does_not_contain_required_key(self):
|
||||
share = {'name': self.SHARE_NAME, 'share_proto': 'NFS'}
|
||||
access = {
|
||||
'access_type': 'ip',
|
||||
'access_to': '10.0.0.1',
|
||||
'access_level': const.ACCESS_LEVEL_RW,
|
||||
}
|
||||
self._mock_isilon_api.get_nfs_export.return_value = {}
|
||||
self.assertRaises(exception.ShareBackendException,
|
||||
self.storage_connection.deny_access,
|
||||
self.mock_context, share, access, None)
|
||||
|
||||
def test_allow_access_multiple_ip_nfs(self):
|
||||
"""Verifies adding an IP to a whitelist with pre-existing ips.
|
||||
|
||||
@ -374,6 +397,63 @@ class IsilonTest(test.TestCase):
|
||||
self._mock_isilon_api.request.assert_called_once_with(
|
||||
'PUT', expected_url, data=expected_data)
|
||||
|
||||
@ddt.data(
|
||||
('foo', const.ACCESS_LEVEL_RW, SmbPermission.rw),
|
||||
('testuser', const.ACCESS_LEVEL_RO, SmbPermission.ro),
|
||||
)
|
||||
def test_allow_access_with_cifs_user(self, data):
|
||||
# setup
|
||||
share_name = self.SHARE_NAME
|
||||
user, access_level, expected_smb_perm = data
|
||||
share = {'name': share_name, 'share_proto': 'CIFS'}
|
||||
access = {'access_type': 'user',
|
||||
'access_to': user,
|
||||
'access_level': access_level}
|
||||
|
||||
self.storage_connection.allow_access(self.mock_context, share,
|
||||
access, None)
|
||||
|
||||
self._mock_isilon_api.smb_permissions_add.assert_called_once_with(
|
||||
share_name, user, expected_smb_perm)
|
||||
|
||||
def test_allow_access_with_cifs_user_invalid_access_level(self):
|
||||
share = {'name': self.SHARE_NAME, 'share_proto': 'CIFS'}
|
||||
access = {
|
||||
'access_type': 'user',
|
||||
'access_to': 'foo',
|
||||
'access_level': 'everything',
|
||||
}
|
||||
|
||||
self.assertRaises(exception.InvalidShareAccess,
|
||||
self.storage_connection.allow_access,
|
||||
self.mock_context, share, access, None)
|
||||
|
||||
def test_allow_access_with_cifs_invalid_access_type(self):
|
||||
share_name = self.SHARE_NAME
|
||||
share = {'name': share_name, 'share_proto': 'CIFS'}
|
||||
access = {'access_type': 'fooaccesstype',
|
||||
'access_to': 'testuser',
|
||||
'access_level': const.ACCESS_LEVEL_RW}
|
||||
|
||||
self.assertRaises(exception.InvalidShareAccess,
|
||||
self.storage_connection.allow_access,
|
||||
self.mock_context, share, access, None)
|
||||
|
||||
def test_deny_access_with_cifs_user(self):
|
||||
share_name = self.SHARE_NAME
|
||||
user_to_remove = 'testuser'
|
||||
share = {'name': share_name, 'share_proto': 'CIFS'}
|
||||
access = {'access_type': 'user',
|
||||
'access_to': user_to_remove,
|
||||
'access_level': const.ACCESS_LEVEL_RW}
|
||||
self.assertFalse(self._mock_isilon_api.smb_permissions_remove.called)
|
||||
|
||||
self.storage_connection.deny_access(self.mock_context, share, access,
|
||||
None)
|
||||
|
||||
self._mock_isilon_api.smb_permissions_remove.assert_called_with(
|
||||
share_name, user_to_remove)
|
||||
|
||||
def test_allow_access_invalid_access_type(self):
|
||||
# setup
|
||||
share_name = self.SHARE_NAME
|
||||
@ -383,7 +463,7 @@ class IsilonTest(test.TestCase):
|
||||
|
||||
# verify method under test throws the expected exception
|
||||
self.assertRaises(
|
||||
exception.ShareBackendException,
|
||||
exception.InvalidShareAccess,
|
||||
self.storage_connection.allow_access,
|
||||
self.mock_context, share, access, None)
|
||||
|
||||
|
@ -19,6 +19,7 @@ import requests
|
||||
import requests_mock
|
||||
import six
|
||||
|
||||
from manila import exception
|
||||
from manila.share.drivers.emc.plugins.isilon import isilon_api
|
||||
from manila import test
|
||||
|
||||
@ -360,10 +361,11 @@ class IsilonApiTest(test.TestCase):
|
||||
|
||||
self.assertEqual(expected_return_value, r)
|
||||
self.assertEqual(1, len(m.request_history))
|
||||
expected_request_data = json.loads(
|
||||
'{{"name": "{0}", "path": "{1}"}}'.format(
|
||||
share_name, share_path)
|
||||
)
|
||||
expected_request_data = {
|
||||
'name': share_name,
|
||||
'path': share_path,
|
||||
'permissions': []
|
||||
}
|
||||
self.assertEqual(expected_request_data,
|
||||
json.loads(m.request_history[0].body))
|
||||
|
||||
@ -612,6 +614,217 @@ class IsilonApiTest(test.TestCase):
|
||||
)
|
||||
self.assertEqual(400, e.response.status_code)
|
||||
|
||||
@ddt.data(
|
||||
('foouser', isilon_api.SmbPermission.rw),
|
||||
('testuser', isilon_api.SmbPermission.ro),
|
||||
)
|
||||
def test_smb_permission_add(self, data):
|
||||
user, smb_permission = data
|
||||
share_name = 'testshare'
|
||||
|
||||
with requests_mock.mock() as m:
|
||||
papi_share_url = '{0}/platform/1/protocols/smb/shares/{1}'.format(
|
||||
self._mock_url, share_name)
|
||||
share_data = {
|
||||
'shares': [
|
||||
{'permissions': []}
|
||||
]
|
||||
}
|
||||
m.get(papi_share_url, status_code=200, json=share_data)
|
||||
|
||||
auth_url = '{0}/platform/1/auth/mapping/users/lookup?user={1}' \
|
||||
''.format(self._mock_url, user)
|
||||
example_sid = 'SID:S-1-5-21'
|
||||
sid_json = {
|
||||
'id': example_sid,
|
||||
'name': user,
|
||||
'type': 'user'
|
||||
}
|
||||
auth_json = {'mapping': [
|
||||
{'user': {'sid': sid_json}}
|
||||
]}
|
||||
m.get(auth_url, status_code=200, json=auth_json)
|
||||
m.put(papi_share_url)
|
||||
|
||||
self.isilon_api.smb_permissions_add(share_name, user,
|
||||
smb_permission)
|
||||
|
||||
perms_put_request = m.request_history[2]
|
||||
expected_perm_request_json = {
|
||||
'permissions': [
|
||||
{'permission': smb_permission.value,
|
||||
'permission_type': 'allow',
|
||||
'trustee': sid_json
|
||||
}
|
||||
]
|
||||
}
|
||||
self.assertEqual(expected_perm_request_json,
|
||||
json.loads(perms_put_request.body))
|
||||
|
||||
@requests_mock.mock()
|
||||
def test_smb_permission_add_with_multiple_users_found(self, m):
|
||||
user = 'foouser'
|
||||
smb_permission = isilon_api.SmbPermission.rw
|
||||
share_name = 'testshare'
|
||||
papi_share_url = '{0}/platform/1/protocols/smb/shares/{1}'.format(
|
||||
self._mock_url, share_name)
|
||||
share_data = {
|
||||
'shares': [
|
||||
{'permissions': []}
|
||||
]
|
||||
}
|
||||
m.get(papi_share_url, status_code=200, json=share_data)
|
||||
|
||||
auth_url = '{0}/platform/1/auth/mapping/users/lookup?user={1}' \
|
||||
''.format(self._mock_url, user)
|
||||
example_sid = 'SID:S-1-5-21'
|
||||
sid_json = {
|
||||
'id': example_sid,
|
||||
'name': user,
|
||||
'type': 'user'
|
||||
}
|
||||
auth_json = {'mapping': [
|
||||
{'user': {'sid': sid_json}},
|
||||
{'user': {'sid': sid_json}},
|
||||
]}
|
||||
m.get(auth_url, status_code=200, json=auth_json)
|
||||
m.put(papi_share_url)
|
||||
|
||||
self.assertRaises(exception.ShareBackendException,
|
||||
self.isilon_api.smb_permissions_add,
|
||||
share_name, user, smb_permission)
|
||||
|
||||
@requests_mock.mock()
|
||||
def test_smb_permission_remove(self, m):
|
||||
|
||||
share_name = 'testshare'
|
||||
user = 'testuser'
|
||||
|
||||
share_data = {
|
||||
'permissions': [{
|
||||
'permission': 'change',
|
||||
'permission_type': 'allow',
|
||||
'trustee': {
|
||||
'id': 'SID:S-1-5-21',
|
||||
'name': user,
|
||||
'type': 'user',
|
||||
}
|
||||
}]
|
||||
}
|
||||
papi_share_url = '{0}/platform/1/protocols/smb/shares/{1}'.format(
|
||||
self._mock_url, share_name)
|
||||
m.get(papi_share_url, status_code=200, json={'shares': [share_data]})
|
||||
num_existing_perms = len(self.isilon_api.lookup_smb_share(share_name))
|
||||
self.assertEqual(1, num_existing_perms)
|
||||
|
||||
m.put(papi_share_url)
|
||||
self.isilon_api.smb_permissions_remove(share_name, user)
|
||||
|
||||
smb_put_request = m.request_history[2]
|
||||
expected_body = {'permissions': []}
|
||||
expected_body = json.dumps(expected_body)
|
||||
self.assertEqual(expected_body, smb_put_request.body)
|
||||
|
||||
@requests_mock.mock()
|
||||
def test_smb_permission_remove_with_multiple_existing_perms(self, m):
|
||||
|
||||
share_name = 'testshare'
|
||||
user = 'testuser'
|
||||
|
||||
foouser_perms = {
|
||||
'permission': 'change',
|
||||
'permission_type': 'allow',
|
||||
'trustee': {
|
||||
'id': 'SID:S-1-5-21',
|
||||
'name': 'foouser',
|
||||
'type': 'user',
|
||||
}
|
||||
}
|
||||
user_perms = {
|
||||
'permission': 'change',
|
||||
'permission_type': 'allow',
|
||||
'trustee': {
|
||||
'id': 'SID:S-1-5-22',
|
||||
'name': user,
|
||||
'type': 'user',
|
||||
}
|
||||
}
|
||||
share_data = {
|
||||
'permissions': [
|
||||
foouser_perms,
|
||||
user_perms,
|
||||
]
|
||||
}
|
||||
papi_share_url = '{0}/platform/1/protocols/smb/shares/{1}'.format(
|
||||
self._mock_url, share_name)
|
||||
m.get(papi_share_url, status_code=200, json={'shares': [share_data]})
|
||||
num_existing_perms = len(self.isilon_api.lookup_smb_share(
|
||||
share_name)['permissions'])
|
||||
self.assertEqual(2, num_existing_perms)
|
||||
m.put(papi_share_url)
|
||||
|
||||
self.isilon_api.smb_permissions_remove(share_name, user)
|
||||
|
||||
smb_put_request = m.request_history[2]
|
||||
expected_body = {'permissions': [foouser_perms]}
|
||||
expected_body = json.dumps(expected_body)
|
||||
self.assertEqual(json.loads(expected_body),
|
||||
json.loads(smb_put_request.body))
|
||||
|
||||
@requests_mock.mock()
|
||||
def test_smb_permission_remove_with_empty_perms_list(self, m):
|
||||
share_name = 'testshare'
|
||||
user = 'testuser'
|
||||
|
||||
share_data = {'permissions': []}
|
||||
papi_share_url = '{0}/platform/1/protocols/smb/shares/{1}'.format(
|
||||
self._mock_url, share_name)
|
||||
m.get(papi_share_url, status_code=200, json={'shares': [share_data]})
|
||||
m.put(papi_share_url)
|
||||
|
||||
self.assertRaises(exception.ShareBackendException,
|
||||
self.isilon_api.smb_permissions_remove,
|
||||
share_name, user)
|
||||
|
||||
@requests_mock.mock()
|
||||
def test_auth_lookup_user(self, m):
|
||||
user = 'foo'
|
||||
auth_url = '{0}/platform/1/auth/mapping/users/lookup?user={1}'.format(
|
||||
self._mock_url, user)
|
||||
example_sid = 'SID:S-1-5-21'
|
||||
sid_json = {
|
||||
'id': example_sid,
|
||||
'name': user,
|
||||
'type': 'user'
|
||||
}
|
||||
auth_json = {
|
||||
'mapping': [
|
||||
{'user': {'sid': sid_json}}
|
||||
]
|
||||
}
|
||||
m.get(auth_url, status_code=200, json=auth_json)
|
||||
|
||||
returned_auth_json = self.isilon_api.auth_lookup_user(user)
|
||||
self.assertEqual(auth_json, returned_auth_json)
|
||||
|
||||
@requests_mock.mock()
|
||||
def test_auth_lookup_user_with_nonexistent_user(self, m):
|
||||
user = 'nonexistent'
|
||||
auth_url = '{0}/platform/1/auth/mapping/users/lookup?user={1}'.format(
|
||||
self._mock_url, user)
|
||||
m.get(auth_url, status_code=404)
|
||||
self.assertRaises(exception.ShareBackendException,
|
||||
self.isilon_api.auth_lookup_user, user)
|
||||
|
||||
@requests_mock.mock()
|
||||
def test_auth_lookup_user_with_backend_error(self, m):
|
||||
user = 'foo'
|
||||
auth_url = '{0}/platform/1/auth/mapping/users/lookup?user={1}'.format(
|
||||
self._mock_url, user)
|
||||
m.get(auth_url, status_code=400)
|
||||
self.assertRaises(requests.exceptions.HTTPError,
|
||||
self.isilon_api.auth_lookup_user, user)
|
||||
|
||||
def _add_create_directory_response(self, m, path, is_recursive):
|
||||
url = '{0}/namespace{1}?recursive={2}'.format(
|
||||
self._mock_url, path, six.text_type(is_recursive))
|
||||
|
Loading…
x
Reference in New Issue
Block a user