Dell PowerScale: Http auth issue, SSL verification error and etc.

Included below changes:
1. Implemented session based PowerScale API auth and enabled CSRF token.
2. Enabled configurable SSL cert verification.
3. Fixed SSL verification error for `delete_nfs_share` and `lookup_smb_share`.
4. Fixed mounted NFS share is inaccessible.
6. Fixed return type error for `create_share_from_snapshot`.
7. Fixed `Delete a share` did not remove the quota and the directory.
8. Improved logging and error handling.
9. Moved RESTAPI calls to modify NFS/SMB share access into class IsilonApi.

Closes-Bug: #2100829
Change-Id: I797df695943fbd2c3971cb8147a5992a2af67519
Signed-off-by: Yian Zong <yian.zong@dell.com>
This commit is contained in:
Yian Zong
2025-03-03 13:22:53 +00:00
parent 1d159679f2
commit f3581daa31
7 changed files with 1046 additions and 1120 deletions

View File

@@ -128,7 +128,7 @@ Mapping of share drivers and share access rules support
+----------------------------------------+--------------+--------------+----------------+------------+--------------+--------------+--------------+----------------+------------+------------+
| EMC Unity | NFS (N) | NFS (Q) | CIFS (N) | \- | \- | NFS (N) | NFS (Q) | CIFS (N) | \- | \- |
+----------------------------------------+--------------+--------------+----------------+------------+--------------+--------------+--------------+----------------+------------+------------+
| EMC Isilon | NFS,CIFS (K) | \- | CIFS (M) | \- | \- | NFS (M) | \- | CIFS (M) | \- | \- |
| EMC Isilon | NFS,CIFS (K) | NFS (F) | CIFS (M) | \- | \- | NFS (M) | \- | CIFS (M) | \- | \- |
+----------------------------------------+--------------+--------------+----------------+------------+--------------+--------------+--------------+----------------+------------+------------+
| Dell EMC PowerStore | NFS (B) | \- | CIFS (B) | \- | \- | NFS (B) | \- | CIFS (B) | \- | \- |
+----------------------------------------+--------------+--------------+----------------+------------+--------------+--------------+--------------+----------------+------------+------------+

View File

@@ -18,7 +18,7 @@ Platform API (PAPI) and the RESTful Access to Namespace API (RAN).
Requirements
~~~~~~~~~~~~
- Isilon cluster running OneFS 7.2 or higher
- PowerScale cluster running OneFS 9.10 or higher
Supported shared filesystems and operations
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

View File

@@ -17,10 +17,10 @@
Isilon specific NAS backend plugin.
"""
import os
from oslo_config import cfg
from oslo_log import log
from oslo_utils import units
from requests.exceptions import HTTPError
from manila.common import constants as const
from manila import exception
@@ -28,33 +28,54 @@ from manila.i18n import _
from manila.share.drivers.dell_emc.plugins import base
from manila.share.drivers.dell_emc.plugins.isilon import isilon_api
"""Version history:
0.1.0 - Initial version
1.0.0 - Fix Http auth issue, SSL verification error and etc
"""
VERSION = "1.0.0"
CONF = cfg.CONF
VERSION = "0.1.0"
LOG = log.getLogger(__name__)
POWERSCALE_OPTS = [
cfg.StrOpt('powerscale_dir_permission',
default='0777',
help='Predefined ACL value or POSIX mode '
'for PowerScale directories.')
]
class IsilonStorageConnection(base.StorageConnection):
"""Implements Isilon specific functionality for EMC Manila driver."""
def __init__(self, *args, **kwargs):
super(IsilonStorageConnection, self).__init__(*args, **kwargs)
LOG.debug('Setting up attributes for Manila '
'Dell PowerScale Driver.')
if 'configuration' in kwargs:
kwargs['configuration'].append_config_values(POWERSCALE_OPTS)
self._server = None
self._port = None
self._username = None
self._password = None
self._server_url = None
self._connect_resp = None
self._root_dir = None
self._verify_ssl_cert = None
self._ssl_cert_path = None
self._containers = {}
self._shares = {}
self._snapshots = {}
self._isilon_api = None
self._isilon_api_class = isilon_api.IsilonApi
self.driver_handles_share_servers = False
self.dhss_mandatory_security_service_association = {}
self.ipv6_implemented = True
# props for share status update
self.reserved_percentage = None
self.reserved_snapshot_percentage = None
self.reserved_share_extend_percentage = None
self.max_over_subscription_ratio = None
def _get_container_path(self, share):
"""Return path to a container."""
@@ -62,6 +83,7 @@ class IsilonStorageConnection(base.StorageConnection):
def create_share(self, context, share, share_server):
"""Is called to create share."""
LOG.debug(f'Creating {share["share_proto"]} share.')
if share['share_proto'] == 'NFS':
location = self._create_nfs_share(share)
elif share['share_proto'] == 'CIFS':
@@ -82,7 +104,7 @@ class IsilonStorageConnection(base.StorageConnection):
def create_share_from_snapshot(self, context, share, snapshot,
share_server):
"""Creates a share from the snapshot."""
LOG.debug(f'Creating {share["share_proto"]} share from snapshot.')
# Create share at new location
location = self.create_share(context, share, share_server)
@@ -94,9 +116,11 @@ class IsilonStorageConnection(base.StorageConnection):
def _create_nfs_share(self, share):
"""Is called to create nfs share."""
LOG.debug(f'Creating NFS share {share["name"]}.')
# Create directory
container_path = self._get_container_path(share)
self._isilon_api.create_directory(container_path)
self._create_directory(container_path)
# Create nfs share
share_created = self._isilon_api.create_nfs_export(container_path)
if not share_created:
message = (
@@ -104,26 +128,53 @@ class IsilonStorageConnection(base.StorageConnection):
{'share': share['name']})
LOG.error(message)
raise exception.ShareBackendException(msg=message)
location = '{0}:{1}'.format(self._server, container_path)
location = self._get_location(self._format_nfs_path(container_path))
return location
def _create_cifs_share(self, share):
"""Is called to create cifs share."""
# Create the directory
LOG.debug(f'Creating CIFS share {share["name"]}.')
# Create directory
container_path = self._get_container_path(share)
self._isilon_api.create_directory(container_path)
self._isilon_api.create_smb_share(share['name'], container_path)
share_path = '\\\\{0}\\{1}'.format(self._server, share['name'])
return share_path
self._create_directory(container_path)
# Create smb share
share_created = self._isilon_api.create_smb_share(
share['name'], container_path)
if not share_created:
message = (
_('The requested CIFS share "%(share)s" was not created.') %
{'share': share['name']})
LOG.error(message)
raise exception.ShareBackendException(msg=message)
location = self._get_location(self._format_smb_path(share['name']))
return location
def _create_directory(self, path, recursive=False):
"""Is called to create a directory."""
dir_created = self._isilon_api.create_directory(path, recursive)
if not dir_created:
message = (
_('Failed to create directory "%(dir)s".') %
{'dir': path})
LOG.error(message)
raise exception.ShareBackendException(msg=message)
def create_snapshot(self, context, snapshot, share_server):
"""Is called to create snapshot."""
LOG.debug(f'Creating snapshot {snapshot["name"]}.')
snapshot_path = os.path.join(self._root_dir, snapshot['share_name'])
self._isilon_api.create_snapshot(snapshot['name'], snapshot_path)
snap_created = self._isilon_api.create_snapshot(
snapshot['name'], snapshot_path)
if not snap_created:
message = (
_('Failed to create snapshot "%(snap)s".') %
{'snap': snapshot['name']})
LOG.error(message)
raise exception.ShareBackendException(msg=message)
def delete_share(self, context, share, share_server):
"""Is called to remove share."""
LOG.debug(f'Deleting {share["share_proto"]} share.')
if share['share_proto'] == 'NFS':
self._delete_nfs_share(share)
elif share['share_proto'] == 'CIFS':
@@ -131,13 +182,48 @@ class IsilonStorageConnection(base.StorageConnection):
else:
message = (_('Unsupported share type: %(type)s.') %
{'type': share['share_proto']})
LOG.error(message)
raise exception.InvalidShare(reason=message)
LOG.warning(message)
return
dir_path = self._get_container_path(share)
# remove quota
self._delete_quota(dir_path)
# remove directory
self._delete_directory(dir_path)
def _delete_quota(self, path):
"""Is called to remove quota."""
quota = self._isilon_api.quota_get(path, 'directory')
if quota:
LOG.debug(f'Removing quota {quota["id"]}')
deleted = self._isilon_api.delete_quota(quota['id'])
if not deleted:
message = (
_('Failed to delete quota "%(quota_id)s" for '
'directory "%(dir)s".') %
{'quota_id': quota['id'], 'dir': path})
LOG.error(message)
else:
LOG.warning(f'Quota not found for {path}')
def _delete_directory(self, path):
"""Is called to remove directory."""
path_exist = self._isilon_api.is_path_existent(path)
if path_exist:
LOG.debug(f'Removing directory {path}')
deleted = self._isilon_api.delete_path(path, recursive=True)
if not deleted:
message = (
_('Failed to delete directory "%(dir)s".') %
{'dir': path})
LOG.error(message)
else:
LOG.warning(f'Directory not found for {path}')
def _delete_nfs_share(self, share):
"""Is called to remove nfs share."""
share_id = self._isilon_api.lookup_nfs_export(
self._root_dir + '/' + share['name'])
self._get_container_path(share))
if share_id is None:
lw = ('Attempted to delete NFS Share "%s", but the share does '
@@ -167,223 +253,89 @@ class IsilonStorageConnection(base.StorageConnection):
def delete_snapshot(self, context, snapshot, share_server):
"""Is called to remove snapshot."""
self._isilon_api.delete_snapshot(snapshot['name'])
LOG.debug(f'Deleting snapshot {snapshot["name"]}')
deleted = self._isilon_api.delete_snapshot(snapshot['name'])
if not deleted:
message = (
_('Failed to delete snapshot "%(snap)s".') %
{'snap': snapshot['name']})
LOG.error(message)
raise exception.ShareBackendException(msg=message)
def ensure_share(self, context, share, share_server):
"""Invoked to ensure that share is exported."""
def extend_share(self, share, new_size, share_server=None):
"""Extends a share."""
LOG.debug('Extending share %(name)s to %(size)sG.', {
'name': share['name'], 'size': new_size
})
new_quota_size = new_size * units.Gi
self._isilon_api.quota_set(
self._get_container_path(share), 'directory', new_quota_size)
def allow_access(self, context, share, access, share_server):
"""Allow access to the share."""
if share['share_proto'] == 'NFS':
self._nfs_allow_access(share, access)
elif share['share_proto'] == 'CIFS':
self._cifs_allow_access(share, access)
else:
message = _(
'Unsupported share protocol: %s. Only "NFS" and '
'"CIFS" are currently supported share protocols.') % share[
'share_proto']
LOG.error(message)
raise exception.InvalidShare(reason=message)
def _nfs_allow_access(self, share, access):
"""Allow access to nfs share."""
access_type = access['access_type']
if access_type != 'ip':
message = _('Only "ip" access type allowed for the NFS '
'protocol.')
LOG.error(message)
raise exception.InvalidShareAccess(reason=message)
export_path = self._get_container_path(share)
access_ip = access['access_to']
access_level = access['access_level']
share_id = self._isilon_api.lookup_nfs_export(export_path)
share_access_group = 'clients'
if access_level == const.ACCESS_LEVEL_RO:
share_access_group = 'read_only_clients'
# Get current allowed clients
export = self._get_existing_nfs_export(share_id)
current_clients = export[share_access_group]
# Format of ips could be '10.0.0.2', or '10.0.0.2, 10.0.0.0/24'
ips = list()
ips.append(access_ip)
ips.extend(current_clients)
export_params = {share_access_group: ips}
url = '{0}/platform/1/protocols/nfs/exports/{1}'.format(
self._server_url, share_id)
resp = self._isilon_api.request('PUT', url, data=export_params)
resp.raise_for_status()
def _cifs_allow_access(self, share, access):
access_type = access['access_type']
access_to = access['access_to']
access_level = access['access_level']
if access_type == 'ip':
access_ip = access['access_to']
self._cifs_allow_access_ip(access_ip, share, access_level)
elif access_type == 'user':
self._cifs_allow_access_user(access_to, share, access_level)
else:
message = _('Only "ip" and "user" access types allowed for '
'CIFS protocol.')
LOG.error(message)
raise exception.InvalidShareAccess(reason=message)
def _cifs_allow_access_ip(self, ip, share, access_level):
if access_level == const.ACCESS_LEVEL_RO:
message = _('Only RW Access allowed for CIFS Protocol when using '
'the "ip" access type.')
LOG.error(message)
raise exception.InvalidShareAccess(reason=message)
allowed_ip = 'allow:' + ip
smb_share = self._isilon_api.lookup_smb_share(share['name'])
host_acl = smb_share['host_acl']
if allowed_ip not in host_acl:
host_acl.append(allowed_ip)
data = {'host_acl': host_acl}
url = ('{0}/platform/1/protocols/smb/shares/{1}'
.format(self._server_url, share['name']))
r = self._isilon_api.request('PUT', url, data=data)
r.raise_for_status()
def _cifs_allow_access_user(self, user, share, access_level):
if access_level == const.ACCESS_LEVEL_RW:
smb_permission = isilon_api.SmbPermission.rw
elif access_level == const.ACCESS_LEVEL_RO:
smb_permission = isilon_api.SmbPermission.ro
else:
message = _('Only "RW" and "RO" access levels are supported.')
LOG.error(message)
raise exception.InvalidShareAccess(reason=message)
self._isilon_api.smb_permissions_add(share['name'], user,
smb_permission)
raise NotImplementedError()
def deny_access(self, context, share, access, share_server):
"""Deny access to the share."""
if share['share_proto'] == 'NFS':
self._nfs_deny_access(share, access)
elif share['share_proto'] == 'CIFS':
self._cifs_deny_access(share, access)
def _nfs_deny_access(self, share, access):
"""Deny access to nfs share."""
if access['access_type'] != 'ip':
return
denied_ip = access['access_to']
access_level = access['access_level']
share_access_group = 'clients'
if access_level == const.ACCESS_LEVEL_RO:
share_access_group = 'read_only_clients'
# Get list of currently allowed client ips
export_id = self._isilon_api.lookup_nfs_export(
self._get_container_path(share))
if export_id is None:
message = _('Share %s should have been created, but was not '
'found.') % share['name']
LOG.error(message)
raise exception.ShareBackendException(msg=message)
export = self._get_existing_nfs_export(export_id)
try:
clients = export[share_access_group]
except KeyError:
message = (_('Export %(export_name)s should have contained the '
'JSON key %(json_key)s, but this key was not found.')
% {'export_name': share['name'],
'json_key': share_access_group})
LOG.error(message)
raise exception.ShareBackendException(msg=message)
allowed_ips = set(clients)
if allowed_ips.__contains__(denied_ip):
allowed_ips.remove(denied_ip)
data = {share_access_group: list(allowed_ips)}
url = ('{0}/platform/1/protocols/nfs/exports/{1}'
.format(self._server_url, str(export_id)))
r = self._isilon_api.request('PUT', url, data=data)
r.raise_for_status()
def _get_existing_nfs_export(self, export_id):
export = self._isilon_api.get_nfs_export(export_id)
if export is None:
message = _('NFS share with export id %d should have been '
'created, but was not found.') % export_id
LOG.error(message)
raise exception.ShareBackendException(msg=message)
return export
def _cifs_deny_access(self, share, access):
access_type = access['access_type']
if access_type == 'ip':
self._cifs_deny_access_ip(access['access_to'], share)
elif access_type == 'user':
self._cifs_deny_access_user(share, access)
else:
message = _('Access type for CIFS deny access request was '
'"%(access_type)s". Only "user" and "ip" access types '
'are supported for CIFS protocol access.') % {
'access_type': access_type}
LOG.warning(message)
def _cifs_deny_access_ip(self, denied_ip, share):
"""Deny access to cifs share."""
share_json = self._isilon_api.lookup_smb_share(share['name'])
host_acl_list = share_json['host_acl']
allow_ip = 'allow:' + denied_ip
if allow_ip in host_acl_list:
host_acl_list.remove(allow_ip)
share_params = {"host_acl": host_acl_list}
url = ('{0}/platform/1/protocols/smb/shares/{1}'
.format(self._server_url, share['name']))
resp = self._isilon_api.request('PUT', url, data=share_params)
resp.raise_for_status()
def _cifs_deny_access_user(self, share, access):
self._isilon_api.smb_permissions_remove(share['name'], access[
'access_to'])
raise NotImplementedError()
def check_for_setup_error(self):
"""Check for setup error."""
def connect(self, emc_share_driver, context):
"""Connect to an Isilon cluster."""
self._server = emc_share_driver.configuration.safe_get(
"emc_nas_server")
self._port = (
int(emc_share_driver.configuration.safe_get("emc_nas_server_port"))
)
self._server_url = ('https://' + self._server + ':' +
str(self._port))
self._username = emc_share_driver.configuration.safe_get(
"emc_nas_login")
self._password = emc_share_driver.configuration.safe_get(
"emc_nas_password")
self._root_dir = emc_share_driver.configuration.safe_get(
"emc_nas_root_dir")
# TODO(Shaun Edwards): make verify ssl a config variable?
self._verify_ssl_cert = False
self._isilon_api = self._isilon_api_class(self._server_url, auth=(
self._username, self._password),
verify_ssl_cert=self._verify_ssl_cert)
LOG.debug('Reading configuration parameters for Manila'
' Dell PowerScale Driver.')
config = emc_share_driver.configuration
self._server = config.safe_get("emc_nas_server")
self._port = config.safe_get("emc_nas_server_port")
self._username = config.safe_get("emc_nas_login")
self._password = config.safe_get("emc_nas_password")
self._root_dir = config.safe_get("emc_nas_root_dir")
# validate IP, username and password
if not all([self._server,
self._username,
self._password]):
message = _("REST server IP, username and password"
" must be specified.")
raise exception.BadConfigurationException(reason=message)
self._server_url = f'https://{self._server}:{self._port}'
self._verify_ssl_cert = config.safe_get("emc_ssl_cert_verify")
if self._verify_ssl_cert:
self._ssl_cert_path = config.safe_get("emc_ssl_cert_path")
self._dir_permission = config.safe_get("powerscale_dir_permission")
self._isilon_api = isilon_api.IsilonApi(
self._server_url, self._username, self._password,
self._verify_ssl_cert, self._ssl_cert_path,
self._dir_permission)
if not self._isilon_api.is_path_existent(self._root_dir):
self._isilon_api.create_directory(self._root_dir, recursive=True)
self._create_directory(self._root_dir, recursive=True)
# configuration for share status update
self.reserved_percentage = config.safe_get(
'reserved_share_percentage')
if self.reserved_percentage is None:
self.reserved_percentage = 0
self.reserved_snapshot_percentage = config.safe_get(
'reserved_share_from_snapshot_percentage')
if self.reserved_snapshot_percentage is None:
self.reserved_snapshot_percentage = self.reserved_percentage
self.reserved_share_extend_percentage = config.safe_get(
'reserved_share_extend_percentage')
if self.reserved_share_extend_percentage is None:
self.reserved_share_extend_percentage = self.reserved_percentage
self.max_over_subscription_ratio = config.safe_get(
'max_over_subscription_ratio')
def update_share_stats(self, stats_dict):
"""TODO."""
@@ -407,6 +359,7 @@ class IsilonStorageConnection(base.StorageConnection):
def update_access(self, context, share, access_rules, add_rules,
delete_rules, share_server=None):
"""Update share access."""
LOG.debug(f'Updaing access for share {share["name"]}.')
if share['share_proto'] == 'NFS':
state_map = self._update_access_nfs(share, access_rules)
if share['share_proto'] == 'CIFS':
@@ -433,17 +386,14 @@ class IsilonStorageConnection(base.StorageConnection):
self._get_container_path(share))
if export_id is None:
# share does not exist on backend (set all rules to error state)
message = _('Failed to update access for NFS share %s: '
'share not found.') % share['name']
LOG.error(message)
return rule_state_map
data = {
'clients': list(nfs_rw_ips),
'read_only_clients': list(nfs_ro_ips)
}
url = ('{0}/platform/1/protocols/nfs/exports/{1}'
.format(self._server_url, str(export_id)))
r = self._isilon_api.request('PUT', url, data=data)
try:
r.raise_for_status()
except HTTPError:
r = self._isilon_api.modify_nfs_export_access(
export_id, ro_ips=list(nfs_ro_ips), rw_ips=list(nfs_rw_ips))
if not r:
return rule_state_map
# if we finish the bulk rule update with no error set rules to active
@@ -452,65 +402,94 @@ class IsilonStorageConnection(base.StorageConnection):
return rule_state_map
def _update_access_cifs(self, share, access_rules):
"""Clear access on a CIFS share."""
cifs_ip_set = set()
users = set()
"""Update access on a CIFS share."""
rule_state_map = {}
ip_access_rules = []
user_access_rules = []
for rule in access_rules:
if rule['access_type'] == 'ip':
cifs_ip_set.add('allow:' + rule['access_to'])
ip_access_rules.append(rule)
elif rule['access_type'] == 'user':
users.add(rule['access_to'])
user_access_rules.append(rule)
else:
message = (_("Access type %(type)s is not supported for CIFS."
) % {'type': rule['access_type']})
LOG.error(message)
rule_state_map.update({rule['access_id']: {'state': 'error'}})
smb_share = self._isilon_api.lookup_smb_share(share['name'])
ips = self._get_cifs_ip_list(ip_access_rules, rule_state_map)
user_permissions = self._get_cifs_user_permissions(
user_access_rules, rule_state_map)
backend_smb_user_permissions = smb_share['permissions']
perms_to_remove = []
for perm in backend_smb_user_permissions:
if perm['trustee']['name'] not in users:
perms_to_remove.append(perm)
for perm in perms_to_remove:
backend_smb_user_permissions.remove(perm)
share_updated = self._isilon_api.modify_smb_share_access(
share['name'],
host_acl=ips,
permissions=user_permissions)
data = {
'host_acl': list(cifs_ip_set),
'permissions': backend_smb_user_permissions,
}
url = ('{0}/platform/1/protocols/smb/shares/{1}'
.format(self._server_url, share['name']))
r = self._isilon_api.request('PUT', url, data=data)
try:
r.raise_for_status()
except HTTPError:
# clear access rules failed so set all access rules to error state
rule_state_map = {}
if not share_updated:
message = (
_('Failed to update access rules for CIFS share "%(share)s".'
) % {'share': share['name']})
LOG.error(message)
for rule in access_rules:
rule_state_map[rule['access_id']] = {
'state': 'error'
}
return rule_state_map
# add access rules that don't exist on backend
rule_state_map = {}
for rule in access_rules:
rule_state_map[rule['access_id']] = {
'state': 'error'
}
try:
if rule['access_type'] == 'ip':
self._cifs_allow_access_ip(rule['access_to'], share,
rule['access_level'])
rule_state_map[rule['access_id']]['state'] = 'active'
elif rule['access_type'] == 'user':
backend_users = set()
for perm in backend_smb_user_permissions:
backend_users.add(perm['trustee']['name'])
if rule['access_to'] not in backend_users:
self._cifs_allow_access_user(
rule['access_to'], share, rule['access_level'])
rule_state_map[rule['access_id']]['state'] = 'active'
else:
continue
except exception.ManilaException:
pass
return rule_state_map
def _get_cifs_ip_list(self, access_rules, rule_state_map):
"""Get CIFS ip list."""
cifs_ips = []
for rule in access_rules:
if rule['access_level'] != const.ACCESS_LEVEL_RW:
message = ('Only RW access level is supported '
'for CIFS IP access.')
LOG.error(message)
rule_state_map.update({rule['access_id']: {'state': 'error'}})
continue
cifs_ips.append('allow:' + rule['access_to'])
rule_state_map.update({rule['access_id']: {'state': 'active'}})
return cifs_ips
def _get_cifs_user_permissions(self, access_rules, rule_state_map):
"""Get CIFS user permissions."""
cifs_user_permissions = []
for rule in access_rules:
if rule['access_level'] == const.ACCESS_LEVEL_RW:
smb_permission = isilon_api.SmbPermission.rw
elif rule['access_level'] == const.ACCESS_LEVEL_RO:
smb_permission = isilon_api.SmbPermission.ro
else:
message = ('Only RW and RO access levels are supported '
'for CIFS user access.')
LOG.error(message)
rule_state_map.update({rule['access_id']: {'state': 'error'}})
continue
user_sid = self._isilon_api.get_user_sid(rule['access_to'])
if user_sid:
cifs_user_permissions.append({
'permission': smb_permission.value,
'permission_type': 'allow',
'trustee': user_sid
})
rule_state_map.update({rule['access_id']: {'state': 'active'}})
else:
message = _('Failed to get user sid by %(user)s.' %
{'user': rule['access_to']})
LOG.error(message)
rule_state_map.update({rule['access_id']: {'state': 'error'}})
return cifs_user_permissions
def _format_smb_path(self, share_name):
return '\\\\{0}\\{1}'.format(self._server, share_name)
def _format_nfs_path(self, container_path):
return '{0}:{1}'.format(self._server, container_path)
def _get_location(self, path):
export_locations = [{'path': path,
'is_admin_only': False,
'metadata': {"preferred": True}}]
return export_locations

View File

@@ -13,7 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from enum import Enum
import enum
import functools
from oslo_log import log
from oslo_serialization import jsonutils
import requests
@@ -21,22 +24,72 @@ from manila import exception
from manila.i18n import _
LOG = log.getLogger(__name__)
class IsilonApi(object):
def __init__(self, api_url, auth, verify_ssl_cert=True):
def __init__(self, api_url, username, password,
verify_ssl_cert=False,
ssl_cert_path=None,
dir_permission=None):
self.host_url = api_url
self.session = requests.session()
self.session.auth = auth
self.username = username
self.password = password
self.verify_ssl_cert = verify_ssl_cert
self.certificate_path = ssl_cert_path
self.dir_permission = dir_permission
# Create session
self.session_token = None
self.csrf_token = None
LOG.debug("Login to PowerScale OneFS during initialization.")
login = self.create_session(username, password)
if not login:
message = _("Failed to login to PowerScale OneFS.")
raise exception.BadConfigurationException(reason=message)
@property
def _verify_cert(self):
verify_cert = self.verify_ssl_cert
if self.verify_ssl_cert and self.certificate_path:
verify_cert = self.certificate_path
return verify_cert
def create_session(self, username, password):
"""Create a session. Update session token and csrf token."""
headers = {"Content-type": "application/json"}
url = self.host_url + '/session/1/session'
data = {
"username": username,
"password": password,
"services": ["platform", "namespace"]
}
r = self.session.request(
'POST', url, headers=headers, data=jsonutils.dumps(data),
verify=self._verify_cert)
if r.status_code == requests.codes.created:
self.session_token = r.cookies['isisessid']
self.csrf_token = r.cookies['isicsrf']
return True
message = (_('Failed to create session. '
'Status_code="%(code)s", body="%(body)s".') %
{'code': r.status_code, 'body': r.text})
LOG.error(message)
return False
def create_directory(self, container_path, recursive=False):
"""Create a directory."""
headers = {"x-isi-ifs-target-type": "container"}
if self.dir_permission:
headers.update({"x-isi-ifs-access-control": self.dir_permission})
url = (self.host_url + "/namespace" + container_path + '?recursive='
+ str(recursive))
r = self.request('PUT', url,
headers=headers)
r = self.send_put_request(url, headers=headers)
return r.status_code == 200
def clone_snapshot(self, snapshot_name, fq_target_dir):
@@ -75,18 +128,18 @@ class IsilonApi(object):
snapshot_suffix = '&snapshot=' + snapshot_name
url = (self.host_url + '/namespace' + fq_dest_path + '?clone=true' +
snapshot_suffix)
self.request('PUT', url, headers=headers)
self.send_put_request(url, headers=headers)
def get_directory_listing(self, fq_dir_path):
url = self.host_url + '/namespace' + fq_dir_path + '?detail=default'
r = self.request('GET', url)
r = self.send_get_request(url)
r.raise_for_status()
return r.json()
def is_path_existent(self, resource_path):
url = self.host_url + '/namespace' + resource_path
r = self.request('HEAD', url)
r = self.send_head_request(url)
if r.status_code == 200:
return True
elif r.status_code == 404:
@@ -95,9 +148,9 @@ class IsilonApi(object):
r.raise_for_status()
def get_snapshot(self, snapshot_name):
r = self.request('GET',
self.host_url + '/platform/1/snapshot/snapshots/' +
snapshot_name)
r = self.send_get_request(
self.host_url + '/platform/1/snapshot/snapshots/' +
snapshot_name)
snapshot_json = r.json()
if r.status_code == 200:
return snapshot_json['snapshots'][0]
@@ -107,36 +160,33 @@ class IsilonApi(object):
r.raise_for_status()
def get_snapshots(self):
r = self.request('GET',
self.host_url + '/platform/1/snapshot/snapshots')
r = self.send_get_request(
self.host_url + '/platform/1/snapshot/snapshots')
if r.status_code == 200:
return r.json()
else:
r.raise_for_status()
def lookup_nfs_export(self, share_path):
response = self.session.get(
self.host_url + '/platform/1/protocols/nfs/exports',
verify=self.verify_ssl_cert)
nfs_exports_json = response.json()
for export in nfs_exports_json['exports']:
for path in export['paths']:
if path == share_path:
return export['id']
'''Retrieve NFS export by directory path.'''
r = self.send_get_request(
self.host_url + '/platform/12/protocols/nfs/exports',
params={'path': share_path})
if r.status_code == 200 and r.json()['total'] > 0:
return r.json()['exports'][0]['id']
return None
def get_nfs_export(self, export_id):
response = self.request('GET',
self.host_url +
'/platform/1/protocols/nfs/exports/' +
str(export_id))
response = self.send_get_request(
self.host_url + '/platform/1/protocols/nfs/exports/' +
str(export_id))
if response.status_code == 200:
return response.json()['exports'][0]
else:
return None
def lookup_smb_share(self, share_name):
response = self.session.get(
response = self.send_get_request(
self.host_url + '/platform/1/protocols/smb/shares/' + share_name)
if response.status_code == 200:
return response.json()['shares'][0]
@@ -152,9 +202,31 @@ class IsilonApi(object):
data = {'paths': [export_path]}
url = self.host_url + '/platform/1/protocols/nfs/exports'
response = self.request('POST', url, data=data)
response = self.send_post_request(url, data=data)
return response.status_code == 201
def modify_nfs_export_access(self, share_id, ro_ips=None, rw_ips=None):
"""Modify access on an existing NFS export.
:param share_id: the ID of the NFS export
:param ro_ips: a list of IP addresses that should have read-only
access
:param rw_ips: a list of IP addresses that should have read-write
access
:return: a boolean indicating whether the modification was successful
"""
export_params = {}
if ro_ips is not None:
export_params['read_only_clients'] = ro_ips
if rw_ips is not None:
export_params['clients'] = rw_ips
url = '{0}/platform/1/protocols/nfs/exports/{1}'.format(
self.host_url, share_id)
resp = self.send_put_request(url, data=export_params)
return resp.status_code == 204
def create_smb_share(self, share_name, share_path):
"""Creates an SMB/CIFS share.
@@ -168,45 +240,42 @@ class IsilonApi(object):
data['name'] = share_name
data['path'] = share_path
url = self.host_url + '/platform/1/protocols/smb/shares'
response = self.request('POST', url, data=data)
response = self.send_post_request(url, data=data)
return response.status_code == 201
def create_snapshot(self, snapshot_name, snapshot_path):
"""Creates a snapshot."""
data = {'name': snapshot_name, 'path': snapshot_path}
r = self.request('POST',
self.host_url + '/platform/1/snapshot/snapshots',
data=data)
if r.status_code == 201:
return True
else:
r.raise_for_status()
r = self.send_post_request(
self.host_url + '/platform/1/snapshot/snapshots',
data=data)
return r.status_code == 201
def delete(self, fq_resource_path, recursive=False):
def delete_path(self, fq_resource_path, recursive=False):
"""Deletes a file or folder."""
r = self.request('DELETE',
self.host_url + '/namespace' + fq_resource_path +
'?recursive=' + str(recursive))
r.raise_for_status()
r = self.send_delete_request(
self.host_url + '/namespace' + fq_resource_path +
'?recursive=' + str(recursive))
return r.status_code == 204
def delete_nfs_share(self, share_number):
response = self.session.delete(
response = self.send_delete_request(
self.host_url + '/platform/1/protocols/nfs/exports' + '/' +
str(share_number))
return response.status_code == 204
def delete_smb_share(self, share_name):
url = self.host_url + '/platform/1/protocols/smb/shares/' + share_name
response = self.request('DELETE', url)
response = self.send_delete_request(url)
return response.status_code == 204
def delete_snapshot(self, snapshot_name):
response = self.request(
'DELETE', '{0}/platform/1/snapshot/snapshots/{1}'
response = self.send_delete_request(
'{0}/platform/1/snapshot/snapshots/{1}'
.format(self.host_url, snapshot_name))
response.raise_for_status()
return response.status_code == 204
def quota_create(self, path, quota_type, size):
thresholds = {'hard': size}
@@ -218,14 +287,13 @@ class IsilonApi(object):
'enforced': True,
'thresholds': thresholds,
}
response = self.request(
'POST', '{0}/platform/1/quota/quotas'.format(self.host_url),
response = self.send_post_request(
'{0}/platform/1/quota/quotas'.format(self.host_url),
data=data)
response.raise_for_status()
def quota_get(self, path, quota_type):
response = self.request(
'GET',
response = self.send_get_request(
'{0}/platform/1/quota/quotas?path={1}'.format(self.host_url, path),
)
if response.status_code == 404:
@@ -247,8 +315,7 @@ class IsilonApi(object):
def quota_modify_size(self, quota_id, new_size):
data = {'thresholds': {'hard': new_size}}
response = self.request(
'PUT',
response = self.send_put_request(
'{0}/platform/1/quota/quotas/{1}'.format(self.host_url, quota_id),
data=data
)
@@ -264,73 +331,105 @@ class IsilonApi(object):
quota_id = quota_json['id']
self.quota_modify_size(quota_id, size)
def smb_permissions_add(self, share_name, user, smb_permission):
smb_share = self.lookup_smb_share(share_name)
permissions = smb_share['permissions']
def delete_quota(self, quota_id):
response = self.send_delete_request(
'{0}/platform/1/quota/quotas/{1}'.format(self.host_url, quota_id))
return response.status_code == 204
# lookup given user string
def modify_smb_share_access(self, share_name,
host_acl=None, permissions=None):
"""Modifies SMB share access
:param share_name: the name of the SMB share
:param host_acl: host access control list
:param permissions: SMB permissions
:return: "True" if access updated successfully; otherwise "False"
"""
data = {}
if host_acl is not None:
data['host_acl'] = host_acl
if permissions is not None:
data['permissions'] = permissions
url = ('{0}/platform/1/protocols/smb/shares/{1}'
.format(self.host_url, share_name))
r = self.send_put_request(url, data=data)
return r.status_code == 204
def get_user_sid(self, user):
user_json = self.auth_lookup_user(user)
auth_mappings = user_json['mapping']
if len(auth_mappings) > 1:
message = (_('More than one mapping found for user "%(user)s".')
% {'user': user})
raise exception.ShareBackendException(msg=message)
user_sid = auth_mappings[0]['user']['sid']
new_permission = {
'permission': smb_permission.value,
'permission_type': 'allow',
'trustee': user_sid
}
url = '{0}/platform/1/protocols/smb/shares/{1}'.format(
self.host_url, share_name)
new_permissions = list(permissions)
new_permissions.append(new_permission)
data = {'permissions': new_permissions}
r = self.request('PUT', url, data=data)
r.raise_for_status()
def smb_permissions_remove(self, share_name, user):
smb_share = self.lookup_smb_share(share_name)
permissions = smb_share['permissions']
# find the perm to remove
perm_to_remove = None
for perm in list(permissions):
if perm['trustee']['name'] == user:
perm_to_remove = perm
if perm_to_remove is not None:
permissions.remove(perm)
else:
message = _('Attempting to remove permission for user "%(user)s", '
'but this user was not found in the share\'s '
'(%(share)s) permissions list.') % {'user': user,
'share': smb_share}
raise exception.ShareBackendException(msg=message)
self.request('PUT', '{0}/platform/1/protocols/smb/shares/{1}'.format(
self.host_url, share_name), data={'permissions': permissions})
if user_json:
auth_mappings = user_json['mapping']
if len(auth_mappings) > 1:
message = (_('More than one mapping found for user "%(user)s".'
) % {'user': user})
LOG.error(message)
return None
user_sid = auth_mappings[0]['user']['sid']
return user_sid
def auth_lookup_user(self, user_string):
url = '{0}/platform/1/auth/mapping/users/lookup'.format(self.host_url)
r = self.request('GET', url, params={"user": user_string})
if r.status_code == 404:
raise exception.ShareBackendException(msg='user not found')
elif r.status_code != 200:
r.raise_for_status()
return r.json()
r = self.send_get_request(url, params={"user": user_string})
if r.status_code == 200:
return r.json()
LOG.error(f'Failed to lookup user {user_string}.')
def request(self, method, url, headers=None, data=None, params=None):
if data is not None:
data = jsonutils.dumps(data)
r = self.session.request(method, url, headers=headers, data=data,
verify=self.verify_ssl_cert, params=params)
cookies = {'isisessid': self.session_token}
csrf_headers = {'X-CSRF-Token': self.csrf_token,
'referer': self.host_url}
if headers:
headers.update(csrf_headers)
else:
headers = csrf_headers
self._log_request(method, url, data, params)
r = self.session.request(
method, url, cookies=cookies, headers=headers, data=data,
verify=self._verify_cert, params=params)
self._log_response(r)
# Unauthorized, login again
if r.status_code == 401:
login = self.create_session(self.username, self.password)
# Resend the request once login is successful
if login:
self._log_request(method, url, data, params)
r = self.session.request(
method, url, cookies=cookies, headers=headers, data=data,
verify=self._verify_cert, params=params)
self._log_response(r)
return r
def _log_request(self, method, url, data=None, params=None):
req_dict = {}
if data:
req_dict['data'] = data
if params:
req_dict['params'] = params
if req_dict:
LOG.debug(f'Request: {method} {url} {req_dict}')
else:
LOG.debug(f'Request: {method} {url}')
class SmbPermission(Enum):
def _log_response(self, r):
try:
body = r.json()
except requests.exceptions.JSONDecodeError:
body = r.text
LOG.debug(f'Response: status_code={r.status_code} body={body}')
send_get_request = functools.partialmethod(request, "GET")
send_post_request = functools.partialmethod(request, "POST")
send_put_request = functools.partialmethod(request, "PUT")
send_delete_request = functools.partialmethod(request, "DELETE")
send_head_request = functools.partialmethod(request, "HEAD")
class SmbPermission(enum.Enum):
full = 'full'
rw = 'change'
ro = 'read'

View File

@@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import ddt
from oslo_serialization import jsonutils as json
import requests
@@ -26,15 +28,78 @@ from manila import test
@ddt.ddt
class IsilonApiTest(test.TestCase):
def setUp(self):
@mock.patch('manila.share.drivers.dell_emc.plugins.isilon.'
'isilon_api.IsilonApi.create_session')
def setUp(self, mockup_create_session):
super(IsilonApiTest, self).setUp()
mockup_create_session.return_value = True
self._mock_url = 'https://localhost:8080'
_mock_auth = ('admin', 'admin')
self.username = 'admin'
self.password = 'pwd'
self.dir_permission = '0777'
self.isilon_api = isilon_api.IsilonApi(
self._mock_url, _mock_auth
self._mock_url, self.username, self.password,
dir_permission=self.dir_permission
)
@mock.patch('manila.share.drivers.dell_emc.plugins.isilon.'
'isilon_api.IsilonApi.create_session')
def test__init__login_failure(self, mockup_create_session):
mockup_create_session.return_value = False
self.assertRaises(
exception.BadConfigurationException,
self.isilon_api.__init__,
self._mock_url,
self.username,
self.password,
False,
None,
self.dir_permission
)
def test__verify_cert(self):
verify_cert = self.isilon_api.verify_ssl_cert
certificate_path = self.isilon_api.certificate_path
self.isilon_api.verify_ssl_cert = True
self.isilon_api.certificate_path = "fake_certificate_path"
self.assertEqual(self.isilon_api._verify_cert,
self.isilon_api.certificate_path)
self.isilon_api.verify_ssl_cert = verify_cert
self.isilon_api.certificate_path = certificate_path
@mock.patch('requests.Session.request')
def test_create_session_success(self, mock_request):
mock_response = mock.Mock()
mock_response.status_code = 201
mock_response.cookies = {'isisessid': 'test_session_token',
'isicsrf': 'test_csrf_token'}
mock_request.return_value = mock_response
result = self.isilon_api.create_session(self.username, self.password)
mock_request.assert_called_once_with(
'POST', self._mock_url + '/session/1/session',
headers={"Content-type": "application/json"},
data=json.dumps({"username": self.username,
"password": self.password,
"services": ["platform", "namespace"]}),
verify=False
)
self.assertTrue(result)
self.assertEqual(self.isilon_api.session_token, 'test_session_token')
self.assertEqual(self.isilon_api.csrf_token, 'test_csrf_token')
@mock.patch('requests.Session.request')
def test_create_session_failure(self, mock_request):
mock_response = mock.Mock()
mock_response.status_code = 401
mock_response.json.return_value = {
'message': 'Username or password is incorrect.'}
mock_request.return_value = mock_response
result = self.isilon_api.create_session(self.username, self.password)
self.assertFalse(result)
self.assertIsNone(self.isilon_api.session_token)
self.assertIsNone(self.isilon_api.csrf_token)
@ddt.data(False, True)
def test_create_directory(self, is_recursive):
with requests_mock.Mocker() as m:
@@ -50,6 +115,22 @@ class IsilonApiTest(test.TestCase):
request = m.request_history[0]
self._verify_dir_creation_request(request, path, is_recursive)
def test_create_directory_no_permission(self):
with requests_mock.Mocker() as m:
path = '/ifs/test'
self.isilon_api.dir_permission = None
self.assertEqual(0, len(m.request_history))
self._add_create_directory_response(m, path, True)
r = self.isilon_api.create_directory(path,
recursive=True)
self.isilon_api.dir_permission = '0777'
self.assertTrue(r)
self.assertEqual(1, len(m.request_history))
request = m.request_history[0]
self.assertNotIn("x-isi-ifs-access-control", request.headers)
@requests_mock.mock()
def test_clone_snapshot(self, m):
snapshot_name = 'snapshot01'
@@ -255,16 +336,19 @@ class IsilonApiTest(test.TestCase):
@ddt.data(
('/ifs/home/admin',
'{"exports": [{"id": 42, "paths": ["/ifs/home/admin"]}]}', 42),
'{"exports": [{"id": 42, "paths": ["/ifs/home/admin"]}], "total": 1}',
42),
('/ifs/home/test',
'{"exports": [{"id": 42, "paths": ["/ifs/home/admin"]}]}', None)
'{"exports": [], "total": 0}', None)
)
def test_lookup_nfs_export(self, data):
share_path, response_json, expected_return = data
with requests_mock.mock() as m:
self.assertEqual(0, len(m.request_history))
m.get('{0}/platform/1/protocols/nfs/exports'
.format(self._mock_url), json=json.loads(response_json))
m.get('{0}/platform/12/protocols/nfs/exports?path={1}'
.format(self._mock_url,
share_path.replace('/', '%2F')),
json=json.loads(response_json))
r = self.isilon_api.lookup_nfs_export(share_path)
@@ -395,31 +479,31 @@ class IsilonApiTest(test.TestCase):
m.post(self._mock_url + '/platform/1/snapshot/snapshots',
status_code=404)
self.assertRaises(requests.exceptions.HTTPError,
self.isilon_api.create_snapshot,
snapshot_name, snapshot_path)
self.assertEqual(1, len(m.request_history))
self.assertEqual(
self.isilon_api.create_snapshot(snapshot_name, snapshot_path),
False
)
@ddt.data(True, False)
def test_delete(self, is_recursive_delete):
def test_delete_path(self, is_recursive_delete):
with requests_mock.mock() as m:
self.assertEqual(0, len(m.request_history))
fq_path = '/ifs/home/admin/test'
m.delete(self._mock_url + '/namespace' + fq_path + '?recursive='
+ str(is_recursive_delete), status_code=204)
self.isilon_api.delete(fq_path, recursive=is_recursive_delete)
self.isilon_api.delete_path(fq_path, recursive=is_recursive_delete)
self.assertEqual(1, len(m.request_history))
@requests_mock.mock()
def test_delete_error_case(self, m):
def test_delete_path_error_case(self, m):
fq_path = '/ifs/home/admin/test'
m.delete(self._mock_url + '/namespace' + fq_path + '?recursive=False',
status_code=403)
self.assertRaises(requests.exceptions.HTTPError,
self.isilon_api.delete, fq_path, recursive=False)
self.assertEqual(self.isilon_api.delete_path(fq_path, recursive=False),
False)
@ddt.data((204, True), (404, False))
def test_delete_nfs_share(self, data):
@@ -467,8 +551,8 @@ class IsilonApiTest(test.TestCase):
m.delete(self._mock_url + '/platform/1/snapshot/snapshots/my_snapshot',
status_code=403)
self.assertRaises(requests.exceptions.HTTPError,
self.isilon_api.delete_snapshot, "my_snapshot")
self.assertEqual(
self.isilon_api.delete_snapshot("my_snapshot"), False)
@requests_mock.mock()
def test_quota_create(self, m):
@@ -613,177 +697,34 @@ class IsilonApiTest(test.TestCase):
)
self.assertEqual(400, e.response.status_code)
@ddt.data(
('foouser', isilon_api.SmbPermission.rw),
('testuser', isilon_api.SmbPermission.ro),
)
def test_smb_permission_add(self, data):
user, smb_permission = data
share_name = 'testshare'
with requests_mock.mock() as m:
papi_share_url = '{0}/platform/1/protocols/smb/shares/{1}'.format(
self._mock_url, share_name)
share_data = {
'shares': [
{'permissions': []}
]
def test_get_user_sid_success(self):
sid = {"id": "SID:S-1-22-1-0",
"name": "foo",
"type": "user"}
self.isilon_api.auth_lookup_user = mock.MagicMock(
return_value={
"mapping": [{"user": {"sid": sid}}]
}
m.get(papi_share_url, status_code=200, json=share_data)
)
expected_sid = self.isilon_api.get_user_sid('foo')
self.assertEqual(expected_sid, sid)
auth_url = ('{0}/platform/1/auth/mapping/users/lookup?user={1}'
''.format(self._mock_url, user))
example_sid = 'SID:S-1-5-21'
sid_json = {
'id': example_sid,
'name': user,
'type': 'user'
def test_get_user_sid_wrong_mappings(self):
self.isilon_api.auth_lookup_user = mock.MagicMock(
return_value={
"mapping": [{"user": {"sid": 'fake_sid1'}},
{"user": {"sid": 'fake_sid2'}}]
}
auth_json = {'mapping': [
{'user': {'sid': sid_json}}
]}
m.get(auth_url, status_code=200, json=auth_json)
m.put(papi_share_url)
)
expected_sid = self.isilon_api.get_user_sid('foo')
self.assertIsNone(expected_sid)
self.isilon_api.smb_permissions_add(share_name, user,
smb_permission)
perms_put_request = m.request_history[2]
expected_perm_request_json = {
'permissions': [
{'permission': smb_permission.value,
'permission_type': 'allow',
'trustee': sid_json
}
]
}
self.assertEqual(expected_perm_request_json,
json.loads(perms_put_request.body))
@requests_mock.mock()
def test_smb_permission_add_with_multiple_users_found(self, m):
user = 'foouser'
smb_permission = isilon_api.SmbPermission.rw
share_name = 'testshare'
papi_share_url = '{0}/platform/1/protocols/smb/shares/{1}'.format(
self._mock_url, share_name)
share_data = {
'shares': [
{'permissions': []}
]
}
m.get(papi_share_url, status_code=200, json=share_data)
auth_url = ('{0}/platform/1/auth/mapping/users/lookup?user={1}'
''.format(self._mock_url, user))
example_sid = 'SID:S-1-5-21'
sid_json = {
'id': example_sid,
'name': user,
'type': 'user'
}
auth_json = {'mapping': [
{'user': {'sid': sid_json}},
{'user': {'sid': sid_json}},
]}
m.get(auth_url, status_code=200, json=auth_json)
m.put(papi_share_url)
self.assertRaises(exception.ShareBackendException,
self.isilon_api.smb_permissions_add,
share_name, user, smb_permission)
@requests_mock.mock()
def test_smb_permission_remove(self, m):
share_name = 'testshare'
user = 'testuser'
share_data = {
'permissions': [{
'permission': 'change',
'permission_type': 'allow',
'trustee': {
'id': 'SID:S-1-5-21',
'name': user,
'type': 'user',
}
}]
}
papi_share_url = '{0}/platform/1/protocols/smb/shares/{1}'.format(
self._mock_url, share_name)
m.get(papi_share_url, status_code=200, json={'shares': [share_data]})
num_existing_perms = len(self.isilon_api.lookup_smb_share(share_name))
self.assertEqual(1, num_existing_perms)
m.put(papi_share_url)
self.isilon_api.smb_permissions_remove(share_name, user)
smb_put_request = m.request_history[2]
expected_body = {'permissions': []}
expected_body = json.dumps(expected_body)
self.assertEqual(expected_body, smb_put_request.body)
@requests_mock.mock()
def test_smb_permission_remove_with_multiple_existing_perms(self, m):
share_name = 'testshare'
user = 'testuser'
foouser_perms = {
'permission': 'change',
'permission_type': 'allow',
'trustee': {
'id': 'SID:S-1-5-21',
'name': 'foouser',
'type': 'user',
}
}
user_perms = {
'permission': 'change',
'permission_type': 'allow',
'trustee': {
'id': 'SID:S-1-5-22',
'name': user,
'type': 'user',
}
}
share_data = {
'permissions': [
foouser_perms,
user_perms,
]
}
papi_share_url = '{0}/platform/1/protocols/smb/shares/{1}'.format(
self._mock_url, share_name)
m.get(papi_share_url, status_code=200, json={'shares': [share_data]})
num_existing_perms = len(self.isilon_api.lookup_smb_share(
share_name)['permissions'])
self.assertEqual(2, num_existing_perms)
m.put(papi_share_url)
self.isilon_api.smb_permissions_remove(share_name, user)
smb_put_request = m.request_history[2]
expected_body = {'permissions': [foouser_perms]}
expected_body = json.dumps(expected_body)
self.assertEqual(json.loads(expected_body),
json.loads(smb_put_request.body))
@requests_mock.mock()
def test_smb_permission_remove_with_empty_perms_list(self, m):
share_name = 'testshare'
user = 'testuser'
share_data = {'permissions': []}
papi_share_url = '{0}/platform/1/protocols/smb/shares/{1}'.format(
self._mock_url, share_name)
m.get(papi_share_url, status_code=200, json={'shares': [share_data]})
m.put(papi_share_url)
self.assertRaises(exception.ShareBackendException,
self.isilon_api.smb_permissions_remove,
share_name, user)
def test_get_user_sid_user_not_found(self):
self.isilon_api.auth_lookup_user = mock.MagicMock(
return_value=None
)
expected_sid = self.isilon_api.get_user_sid('foo')
self.assertIsNone(expected_sid)
@requests_mock.mock()
def test_auth_lookup_user(self, m):
@@ -812,8 +753,7 @@ class IsilonApiTest(test.TestCase):
auth_url = '{0}/platform/1/auth/mapping/users/lookup?user={1}'.format(
self._mock_url, user)
m.get(auth_url, status_code=404)
self.assertRaises(exception.ShareBackendException,
self.isilon_api.auth_lookup_user, user)
self.assertIsNone(self.isilon_api.auth_lookup_user(user))
@requests_mock.mock()
def test_auth_lookup_user_with_backend_error(self, m):
@@ -821,8 +761,7 @@ class IsilonApiTest(test.TestCase):
auth_url = '{0}/platform/1/auth/mapping/users/lookup?user={1}'.format(
self._mock_url, user)
m.get(auth_url, status_code=400)
self.assertRaises(requests.exceptions.HTTPError,
self.isilon_api.auth_lookup_user, user)
self.assertIsNone(self.isilon_api.auth_lookup_user(user))
def _add_create_directory_response(self, m, path, is_recursive):
url = '{0}/namespace{1}?recursive={2}'.format(
@@ -854,6 +793,9 @@ class IsilonApiTest(test.TestCase):
self.assertIn("x-isi-ifs-target-type", request.headers)
self.assertEqual("container",
request.headers['x-isi-ifs-target-type'])
self.assertIn("x-isi-ifs-access-control", request.headers)
self.assertEqual(self.dir_permission,
request.headers['x-isi-ifs-access-control'])
def _verify_clone_file_from_snapshot(
self, request, fq_file_path, fq_dest_path, snapshot_name):
@@ -865,3 +807,120 @@ class IsilonApiTest(test.TestCase):
self.assertIn("x-isi-ifs-copy-source", request.headers)
self.assertEqual('/namespace' + fq_file_path,
request.headers['x-isi-ifs-copy-source'])
def test_modify_nfs_export_access_success(self):
self.isilon_api.send_put_request = mock.MagicMock()
share_id = '123'
ro_ips = ['10.0.0.1', '10.0.0.2']
rw_ips = ['10.0.0.3', '10.0.0.4']
self.isilon_api.modify_nfs_export_access(share_id, ro_ips, rw_ips)
expected_url = '{0}/platform/1/protocols/nfs/exports/{1}'.format(
self.isilon_api.host_url, share_id)
expected_data = {'read_only_clients': ro_ips, 'clients': rw_ips}
self.isilon_api.send_put_request.assert_called_once_with(
expected_url, data=expected_data)
def test_modify_nfs_export_access_no_ro_ips(self):
self.isilon_api.send_put_request = mock.MagicMock()
share_id = '123'
rw_ips = ['10.0.0.3', '10.0.0.4']
self.isilon_api.modify_nfs_export_access(share_id, None, rw_ips)
expected_url = '{0}/platform/1/protocols/nfs/exports/{1}'.format(
self.isilon_api.host_url, share_id)
expected_data = {'clients': rw_ips}
self.isilon_api.send_put_request.assert_called_once_with(
expected_url, data=expected_data)
def test_modify_nfs_export_access_no_rw_ips(self):
self.isilon_api.send_put_request = mock.MagicMock()
share_id = '123'
ro_ips = ['10.0.0.1', '10.0.0.2']
self.isilon_api.modify_nfs_export_access(share_id, ro_ips, None)
expected_url = '{0}/platform/1/protocols/nfs/exports/{1}'.format(
self.isilon_api.host_url, share_id)
expected_data = {'read_only_clients': ro_ips}
self.isilon_api.send_put_request.assert_called_once_with(
expected_url, data=expected_data)
@mock.patch('requests.Session.request')
def test_request_with_401_response(self, mock_request):
"""Test sending a request with a 401 Unauthorized response."""
mock_request.return_value.status_code = 401
self.isilon_api.create_session = mock.MagicMock(return_value=True)
self.isilon_api.request('GET', 'http://example.com/api/data')
self.assertEqual(mock_request.call_count, 2)
def test_delete_quota_sends_delete_request(self):
self.isilon_api.send_delete_request = mock.MagicMock()
quota_id = '123'
self.isilon_api.delete_quota(quota_id)
self.isilon_api.send_delete_request.assert_called_once_with(
'{0}/platform/1/quota/quotas/{1}'.format(
self.isilon_api.host_url, quota_id)
)
def test_delete_quota_raises_exception_on_error(self):
quota_id = '123'
self.isilon_api.send_delete_request = mock.MagicMock(
side_effect=requests.exceptions.HTTPError)
self.assertRaises(requests.exceptions.HTTPError,
self.isilon_api.delete_quota,
quota_id)
def test_modify_smb_share_access_with_host_acl_and_smb_permission(self):
self.isilon_api.send_put_request = mock.MagicMock()
share_name = 'my_share'
host_acl = 'host1,host2'
smb_permission = 'read'
self.isilon_api.modify_smb_share_access(
share_name, host_acl, smb_permission)
expected_url = '{0}/platform/1/protocols/smb/shares/{1}'.format(
self.isilon_api.host_url, share_name)
expected_data = {'host_acl': host_acl, 'permissions': smb_permission}
self.isilon_api.send_put_request.assert_called_with(
expected_url, data=expected_data)
def test_modify_smb_share_access_with_host_acl_only(self):
self.isilon_api.send_put_request = mock.MagicMock()
share_name = 'my_share'
host_acl = 'host1,host2'
self.isilon_api.modify_smb_share_access(share_name, host_acl)
expected_url = '{0}/platform/1/protocols/smb/shares/{1}'.format(
self.isilon_api.host_url, share_name)
expected_data = {'host_acl': host_acl}
self.isilon_api.send_put_request.assert_called_with(
expected_url, data=expected_data)
def test_modify_smb_share_access_with_smb_permission_only(self):
self.isilon_api.send_put_request = mock.MagicMock()
share_name = 'my_share'
smb_permission = 'read'
self.isilon_api.modify_smb_share_access(
share_name, permissions=smb_permission)
expected_url = '{0}/platform/1/protocols/smb/shares/{1}'.format(
self.isilon_api.host_url, share_name)
expected_data = {'permissions': smb_permission}
self.isilon_api.send_put_request.assert_called_with(
expected_url, data=expected_data)
def test_modify_smb_share_access_with_no_arguments(self):
self.isilon_api.send_put_request = mock.MagicMock()
share_name = 'my_share'
self.isilon_api.modify_smb_share_access(share_name)
expected_url = '{0}/platform/1/protocols/smb/shares/{1}'.format(
self.isilon_api.host_url, share_name)
expected_data = {}
self.isilon_api.send_put_request.assert_called_with(
expected_url, data=expected_data)
def test_modify_smb_share_access_with_http_error(self):
self.isilon_api.send_put_request = mock.MagicMock(
side_effect=requests.exceptions.HTTPError
)
share_name = 'my_share'
host_acl = 'host1,host2'
smb_permission = 'read'
self.assertRaises(requests.exceptions.HTTPError,
self.isilon_api.modify_smb_share_access,
share_name, host_acl, smb_permission)

View File

@@ -0,0 +1,13 @@
---
fixes:
- |
Dell PowerScale Driver `Bug #2100829
<https://bugs.launchpad.net/manila/+bug/2100829>`_:
* Fixed SSL verification error for `Delete NFS share`, `Delete CIFS Share`
and `Allow CIFS share access`.
* Fixed mounted NFS share is inaccessible.
* Fixed return type error for `Create Share from snapshot`.
* Fixed `Delete a share` did not remove the quota and the directory.
* Implemented session based PowerScale API auth and enabled CSRF token.
* Enabled configurable SSL cert verification.