Merge "Manage/unmanage for CephFS drivers"

This commit is contained in:
Zuul 2024-08-30 06:46:37 +00:00 committed by Gerrit Code Review
commit a2a6f278f3
4 changed files with 691 additions and 125 deletions

View File

@ -16,6 +16,7 @@
import ipaddress
import json
import math
import re
import socket
import sys
@ -144,7 +145,7 @@ cephfs_opts = [
"ensure all of the shares it has created during "
"startup. Ensuring would re-export shares and this "
"action isn't always required, unless something has "
"been administratively modified on CephFS.")
"been administratively modified on CephFS."),
]
cephfsnfs_opts = [
@ -241,6 +242,7 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
self._ceph_mon_version = None
self.configuration.append_config_values(cephfs_opts)
self.configuration.append_config_values(cephfsnfs_opts)
self.private_storage = kwargs.get('private_storage')
try:
int(self.configuration.cephfs_volume_mode, 8)
@ -333,17 +335,36 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
"""
return gigs * units.Gi
def _get_export_locations(self, share):
def _get_subvolume_name(self, share_id):
try:
subvolume_name = self.private_storage.get(
share_id, "subvolume_name")
except Exception:
return share_id
# Subvolume name could be None, so in case it is, return share_id
return subvolume_name or share_id
def _get_subvolume_snapshot_name(self, snapshot_id):
try:
subvolume_snapshot_name = self.private_storage.get(
snapshot_id, "subvolume_snapshot_name"
)
except Exception:
return snapshot_id
return subvolume_snapshot_name or snapshot_id
def _get_export_locations(self, share, subvolume_name=None):
"""Get the export location for a share.
:param share: a manila share.
:return: the export location for a share.
"""
subvolume_name = subvolume_name or share["id"]
# get path of FS subvolume/share
argdict = {
"vol_name": self.volname,
"sub_name": share["id"]
"sub_name": subvolume_name
}
if share['share_group_id'] is not None:
argdict.update({"group_name": share["share_group_id"]})
@ -490,14 +511,174 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
return self._get_export_locations(share)
def _need_to_cancel_clone(self, share):
def _get_subvolume_size_in_gb(self, subvolume_size):
"""Returns the size of the subvolume in GB."""
# There is a chance that we would end up with 2.5gb for example, so
# we round it up
return int(math.ceil(int(subvolume_size) / units.Gi))
def manage_existing(self, share, driver_options):
# bring FS subvolume/share under manila management
LOG.debug("[%(be)s]: manage_existing: id=%(id)s.",
{"be": self.backend_name, "id": share['id']})
# Subvolume name must be provided.
subvolume_name = share['export_locations'][0]['path']
if not subvolume_name:
raise exception.ShareBackendException(
"The subvolume name must be provided as a 'export_path' while "
"managing shares.")
argdict = {
"vol_name": self.volname,
"sub_name": subvolume_name,
}
subvolume_info = {}
# Try to get the subvolume info in the ceph backend
try:
subvolume_info = rados_command(
self.rados_client, "fs subvolume info", argdict, json_obj=True)
except exception.ShareBackendException as e:
# Couldn't find a subvolume with the name provided.
if 'does not exist' in str(e).lower():
msg = ("Subvolume %(subvol)s cannot be found on the "
"backend." % {'subvol': subvolume_name})
raise exception.ShareBackendException(msg=msg)
# Check if share mode matches
if subvolume_info.get('mode') != self._cephfs_volume_mode:
LOG.info("Subvolume %(subvol)s mode is different from what is "
"configured in Manila.")
subvolume_size = subvolume_info.get('bytes_quota')
# We need to resize infinite subvolumes, as Manila doesn't support it
if isinstance(subvolume_size, str) and subvolume_size == "infinite":
try:
# Default resize gb must be configured
new_size = driver_options.get('size')
if not new_size or new_size <= 0:
msg = ("subvolume %s has infinite size and a valid "
"integer value was not added to the driver_options "
"arg. Please provide a 'size' in the driver "
"options and try again." % subvolume_name)
raise exception.ShareBackendException(msg=msg)
# Attempt resizing the subvolume
self._resize_share(share, new_size, no_shrink=True)
subvolume_size = new_size
except exception.ShareShrinkingPossibleDataLoss:
msg = ("Could not resize the subvolume using the provided "
"size, as data could be lost. Please update it and "
"try again.")
LOG.exception(msg)
raise
except exception.ShareBackendException:
raise
else:
if int(subvolume_size) % units.Gi == 0:
# subvolume_size is an integer GB, no need to resize subvolume
subvolume_size = self._get_subvolume_size_in_gb(subvolume_size)
else:
# subvolume size is not an integer GB. need to resize subvolume
new_size_gb = self._get_subvolume_size_in_gb(subvolume_size)
LOG.info(
"Subvolume %(subvol)s is being resized to %(new_size)s "
"GB.", {
'subvol': subvolume_name,
'new_size': new_size_gb
}
)
self._resize_share(share, new_size_gb, no_shrink=True)
subvolume_size = new_size_gb
share_metadata = {"subvolume_name": subvolume_name}
self.private_storage.update(share['id'], share_metadata)
export_locations = self._get_export_locations(
share, subvolume_name=subvolume_name
)
managed_share = {
"size": subvolume_size,
"export_locations": export_locations
}
return managed_share
def manage_existing_snapshot(self, snapshot, driver_options):
# bring FS subvolume/share under manila management
LOG.debug("[%(be)s]: manage_existing_snapshot: id=%(id)s.",
{"be": self.backend_name, "id": snapshot['id']})
# Subvolume name must be provided.
sub_snapshot_name = snapshot.get('provider_location', None)
if not sub_snapshot_name:
raise exception.ShareBackendException(
"The subvolume snapshot name must be provided as the "
"'provider_location' while managing snapshots.")
sub_name = self._get_subvolume_name(snapshot['share_instance_id'])
argdict = {
"vol_name": self.volname,
"sub_name": sub_name,
}
# Try to get the subvolume info in the ceph backend, this is useful for
# us to get the size for the snapshot.
try:
rados_command(
self.rados_client, "fs subvolume info", argdict, json_obj=True)
except exception.ShareBackendException as e:
# Couldn't find a subvolume with the name provided.
if 'does not exist' in str(e).lower():
msg = ("Subvolume %(subvol)s cannot be found on the "
"backend." % {'subvol': sub_name})
raise exception.ShareBackendException(msg=msg)
sub_snap_info_argdict = {
"vol_name": self.volname,
"sub_name": sub_name,
"snap_name": sub_snapshot_name
}
# Shares/subvolumes already managed by manila will never have
# infinite as their bytes_quota, so no need for extra precaution.
try:
managed_subvolume_snapshot = rados_command(
self.rados_client, "fs subvolume snapshot info",
sub_snap_info_argdict, json_obj=True
)
except exception.ShareBackendException as e:
# Couldn't find a subvolume snapshot with the name provided.
if 'does not exist' in str(e).lower():
msg = ("Subvolume snapshot %(snap)s cannot be found on the "
"backend." % {'snap': sub_snapshot_name})
raise exception.ShareBackendException(msg=msg)
snapshot_metadata = {"subvolume_snapshot_name": sub_snapshot_name}
self.private_storage.update(
snapshot['snapshot_id'], snapshot_metadata
)
# NOTE(carloss): fs subvolume snapshot info command does not return
# the snapshot size, so we reuse the share size until this is not
# available for us.
managed_snapshot = {'provider_location': sub_snapshot_name}
if managed_subvolume_snapshot.get('bytes_quota') is not None:
managed_snapshot['size'] = self._get_subvolume_size_in_gb(
managed_subvolume_snapshot['bytes_quota'])
return managed_snapshot
def _need_to_cancel_clone(self, share, clone_name):
# Is there an ongoing clone operation that needs to be canceled
# so we can delete the share?
need_to_cancel_clone = False
argdict = {
"vol_name": self.volname,
"clone_name": share["id"],
"clone_name": clone_name,
}
if share['share_group_id'] is not None:
argdict.update({"group_name": share["share_group_id"]})
@ -522,11 +703,12 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
{"be": self.backend_name, "id": share['id'],
"gr": share['share_group_id']})
if self._need_to_cancel_clone(share):
clone_name = self._get_subvolume_name(share['id'])
if self._need_to_cancel_clone(share, clone_name):
try:
argdict = {
"vol_name": self.volname,
"clone_name": share["id"],
"clone_name": clone_name,
"force": True,
}
if share['share_group_id'] is not None:
@ -539,7 +721,7 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
argdict = {
"vol_name": self.volname,
"sub_name": share["id"],
"sub_name": self._get_subvolume_name(share["id"]),
# We want to clean up the share even if the subvolume is
# not in a good state.
"force": True,
@ -551,9 +733,10 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
def update_access(self, context, share, access_rules, add_rules,
delete_rules, share_server=None):
sub_name = self._get_subvolume_name(share['id'])
return self.protocol_helper.update_access(
context, share, access_rules, add_rules, delete_rules,
share_server=share_server)
share_server=share_server, sub_name=sub_name)
def get_backend_info(self, context):
return self.protocol_helper.get_backend_info(context)
@ -586,40 +769,18 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
}
return share_updates
def extend_share(self, share, new_size, share_server=None):
# resize FS subvolume/share
LOG.debug("[%(be)s]: extend_share: share=%(id)s, size=%(sz)s.",
{"be": self.backend_name, "id": share['id'],
"sz": new_size})
def _resize_share(self, share, new_size, no_shrink=False):
argdict = {
"vol_name": self.volname,
"sub_name": share["id"],
"sub_name": self._get_subvolume_name(share["id"]),
"new_size": self._to_bytes(new_size),
}
if share['share_group_id'] is not None:
argdict.update({"group_name": share["share_group_id"]})
LOG.debug("extend_share {id} {size}",
{"id": share['id'], "size": new_size})
rados_command(self.rados_client, "fs subvolume resize", argdict)
def shrink_share(self, share, new_size, share_server=None):
# resize FS subvolume/share
LOG.debug("[%(be)s]: shrink_share: share=%(id)s, size=%(sz)s.",
{"be": self.backend_name, "id": share['id'],
"sz": new_size})
argdict = {
"vol_name": self.volname,
"sub_name": share["id"],
"new_size": self._to_bytes(new_size),
"no_shrink": True,
}
if share["share_group_id"] is not None:
argdict.update({"group_name": share["share_group_id"]})
if no_shrink:
argdict.update({"no_shrink": True})
try:
rados_command(self.rados_client, "fs subvolume resize", argdict)
except exception.ShareBackendException as e:
@ -628,6 +789,22 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
share_id=share['id'])
raise
def extend_share(self, share, new_size, share_server=None):
# resize FS subvolume/share
LOG.debug("[%(be)s]: extend_share: share=%(id)s, size=%(sz)s.",
{"be": self.backend_name, "id": share['id'],
"sz": new_size})
self._resize_share(share, new_size)
def shrink_share(self, share, new_size, share_server=None):
# resize FS subvolume/share
LOG.debug("[%(be)s]: shrink_share: share=%(id)s, size=%(sz)s.",
{"be": self.backend_name, "id": share['id'],
"sz": new_size})
self._resize_share(share, new_size, no_shrink=True)
def create_snapshot(self, context, snapshot, share_server=None):
# create a FS snapshot
LOG.debug("[%(be)s]: create_snapshot: original share=%(id)s, "
@ -637,24 +814,29 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
argdict = {
"vol_name": self.volname,
"sub_name": snapshot["share_id"],
"sub_name": self._get_subvolume_name(snapshot["share_id"]),
"snap_name": snapshot["snapshot_id"],
}
rados_command(
self.rados_client, "fs subvolume snapshot create", argdict)
return {"provider_location": snapshot["snapshot_id"]}
def delete_snapshot(self, context, snapshot, share_server=None):
# delete a FS snapshot
LOG.debug("[%(be)s]: delete_snapshot: snapshot=%(id)s.",
{"be": self.backend_name, "id": snapshot['id']})
snapshot_name = self._get_subvolume_snapshot_name(
snapshot['snapshot_id']
)
# FIXME(vkmc) remove this in CC (next tick) release.
legacy_snap_name = "_".join([snapshot["snapshot_id"], snapshot["id"]])
argdict_legacy = {
"vol_name": self.volname,
"sub_name": snapshot["share_id"],
"sub_name": self._get_subvolume_name(snapshot["share_id"]),
"snap_name": legacy_snap_name,
"force": True,
}
@ -665,7 +847,7 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
# in case it's a snapshot with new naming, retry remove with new name
argdict = argdict_legacy.copy()
argdict.update({"snap_name": snapshot["snapshot_id"]})
argdict.update({"snap_name": snapshot_name})
rados_command(self.rados_client, "fs subvolume snapshot rm", argdict)
def create_share_group(self, context, sg_dict, share_server=None):
@ -729,9 +911,10 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
def _get_clone_status(self, share):
"""Check the status of a newly cloned share."""
clone_name = self._get_subvolume_name(share["id"])
argdict = {
"vol_name": self.volname,
"clone_name": share["id"]
"clone_name": clone_name
}
if share['share_group_id'] is not None:
argdict.update({"group_name": share["share_group_id"]})
@ -787,9 +970,10 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
argdict = {
"vol_name": self.volname,
"sub_name": parent_share["id"],
"snap_name": snapshot["snapshot_id"],
"target_sub_name": share["id"]
"sub_name": self._get_subvolume_name(parent_share["id"]),
"snap_name": self._get_subvolume_snapshot_name(
snapshot["snapshot_id"]),
"target_sub_name": self._get_subvolume_name(share["id"])
}
if share['share_group_id'] is not None:
argdict.update({"group_name": share["share_group_id"]})
@ -878,7 +1062,8 @@ class NativeProtocolHelper(ganesha.NASHelperBase):
def get_optional_share_creation_data(self, share, share_server=None):
return {"metadata": {"__mount_options": f"fs={self.volname}"}}
def _allow_access(self, context, share, access, share_server=None):
def _allow_access(self, context, share, access, share_server=None,
sub_name=None):
if access['access_type'] != CEPHX_ACCESS_TYPE:
raise exception.InvalidShareAccessType(type=access['access_type'])
@ -897,7 +1082,7 @@ class NativeProtocolHelper(ganesha.NASHelperBase):
argdict = {
"vol_name": self.volname,
"sub_name": share["id"],
"sub_name": sub_name,
"auth_id": ceph_auth_id,
"tenant_id": share["project_id"],
}
@ -925,7 +1110,8 @@ class NativeProtocolHelper(ganesha.NASHelperBase):
return auth_result
def _deny_access(self, context, share, access, share_server=None):
def _deny_access(self, context, share, access, share_server=None,
sub_name=None):
if access['access_type'] != CEPHX_ACCESS_TYPE:
LOG.warning("Invalid access type '%(type)s', "
"ignoring in deny.",
@ -934,7 +1120,7 @@ class NativeProtocolHelper(ganesha.NASHelperBase):
argdict = {
"vol_name": self.volname,
"sub_name": share["id"],
"sub_name": sub_name,
"auth_id": access['access_to']
}
if share["share_group_id"] is not None:
@ -953,12 +1139,12 @@ class NativeProtocolHelper(ganesha.NASHelperBase):
rados_command(self.rados_client, "fs subvolume evict", argdict)
def update_access(self, context, share, access_rules, add_rules,
delete_rules, share_server=None):
delete_rules, share_server=None, sub_name=None):
access_updates = {}
argdict = {
"vol_name": self.volname,
"sub_name": share["id"],
"sub_name": sub_name,
}
if share["share_group_id"] is not None:
argdict.update({"group_name": share["share_group_id"]})
@ -995,7 +1181,9 @@ class NativeProtocolHelper(ganesha.NASHelperBase):
# backend are in sync.
for rule in add_rules:
try:
access_key = self._allow_access(context, share, rule)
access_key = self._allow_access(
context, share, rule, sub_name=sub_name
)
except (exception.InvalidShareAccessLevel,
exception.InvalidShareAccessType):
self.message_api.create(
@ -1033,7 +1221,7 @@ class NativeProtocolHelper(ganesha.NASHelperBase):
})
for rule in delete_rules:
self._deny_access(context, share, rule)
self._deny_access(context, share, rule, sub_name=sub_name)
return access_updates
@ -1077,11 +1265,11 @@ class NFSProtocolHelperMixin():
def get_optional_share_creation_data(self, share, share_server=None):
return {}
def _get_export_path(self, share):
def _get_export_path(self, share, sub_name=None):
"""Callback to provide export path."""
argdict = {
"vol_name": self.volname,
"sub_name": share["id"]
"sub_name": sub_name or share["id"]
}
if share["share_group_id"] is not None:
argdict.update({"group_name": share["share_group_id"]})
@ -1091,9 +1279,9 @@ class NFSProtocolHelperMixin():
return path
def _get_export_pseudo_path(self, share):
def _get_export_pseudo_path(self, share, sub_name=None):
"""Callback to provide pseudo path."""
return self._get_export_path(share)
return self._get_export_path(share, sub_name=sub_name)
def get_configured_ip_versions(self):
if not self.configured_ip_versions:
@ -1171,13 +1359,13 @@ class NFSProtocolHelper(NFSProtocolHelperMixin, ganesha.GaneshaNASHelper2):
ganesha_utils.patch(dconf, self._load_conf_dir(conf_dir))
return dconf
def _fsal_hook(self, base, share, access):
def _fsal_hook(self, base, share, access, sub_name=None):
"""Callback to create FSAL subblock."""
ceph_auth_id = ''.join(['ganesha-', share['id']])
argdict = {
"vol_name": self.volname,
"sub_name": share["id"],
"sub_name": sub_name,
"auth_id": ceph_auth_id,
"access_level": "rw",
"tenant_id": share["project_id"],
@ -1199,13 +1387,13 @@ class NFSProtocolHelper(NFSProtocolHelperMixin, ganesha.GaneshaNASHelper2):
'Filesystem': self.volname
}
def _cleanup_fsal_hook(self, base, share, access):
def _cleanup_fsal_hook(self, base, share, access, sub_name=None):
"""Callback for FSAL specific cleanup after removing an export."""
ceph_auth_id = ''.join(['ganesha-', share['id']])
argdict = {
"vol_name": self.volname,
"sub_name": share["id"],
"sub_name": sub_name,
"auth_id": ceph_auth_id,
}
if share["share_group_id"] is not None:
@ -1329,12 +1517,12 @@ class NFSClusterProtocolHelper(NFSProtocolHelperMixin, ganesha.NASHelperBase):
"""Returns an error if prerequisites aren't met."""
return
def _allow_access(self, share, access):
def _allow_access(self, share, access, sub_name=None):
"""Allow access to the share."""
export = {
"path": self._get_export_path(share),
"path": self._get_export_path(share, sub_name=sub_name),
"cluster_id": self.nfs_clusterid,
"pseudo": self._get_export_pseudo_path(share),
"pseudo": self._get_export_pseudo_path(share, sub_name=sub_name),
"squash": "none",
"security_label": True,
"protocols": [4],
@ -1354,18 +1542,19 @@ class NFSClusterProtocolHelper(NFSProtocolHelperMixin, ganesha.NASHelperBase):
rados_command(self.rados_client,
"nfs export apply", argdict, inbuf=inbuf)
def _deny_access(self, share):
def _deny_access(self, share, sub_name=None):
"""Deny access to the share."""
argdict = {
"cluster_id": self.nfs_clusterid,
"pseudo_path": self._get_export_pseudo_path(share)
"pseudo_path": self._get_export_pseudo_path(
share, sub_name=sub_name)
}
rados_command(self.rados_client, "nfs export rm", argdict)
def update_access(self, context, share, access_rules, add_rules,
delete_rules, share_server=None):
delete_rules, share_server=None, sub_name=None):
"""Update access rules of share.
Creates an export per share. Modifies access rules of shares by
@ -1407,10 +1596,10 @@ class NFSClusterProtocolHelper(NFSProtocolHelperMixin, ganesha.NASHelperBase):
})
if clients: # empty list if no rules passed validation
self._allow_access(share, clients)
self._allow_access(share, clients, sub_name=sub_name)
else:
# no clients have access to the share. remove export
self._deny_access(share)
self._deny_access(share, sub_name=sub_name)
return rule_state_map

View File

@ -50,7 +50,7 @@ class NASHelperBase(metaclass=abc.ABCMeta):
@abc.abstractmethod
def update_access(self, context, share, access_rules, add_rules,
delete_rules, share_server=None):
delete_rules, share_server=None, sub_name=None):
"""Update access rules of share."""
def get_backend_info(self, context):
@ -122,15 +122,15 @@ class GaneshaNASHelper(NASHelperBase):
return self._load_conf_dir(ganesha_utils.path_from(__file__, "conf"))
def _fsal_hook(self, base_path, share, access):
def _fsal_hook(self, base_path, share, access, sub_name=None):
"""Subclass this to create FSAL block."""
return {}
def _cleanup_fsal_hook(self, base_path, share, access):
def _cleanup_fsal_hook(self, base_path, share, access, sub_name=None):
"""Callback for FSAL specific cleanup after removing an export."""
pass
def _allow_access(self, base_path, share, access):
def _allow_access(self, base_path, share, access, sub_name=None):
"""Allow access to the share."""
ganesha_utils.validate_access_rule(
self.supported_access_types, self.supported_access_levels,
@ -151,7 +151,8 @@ class GaneshaNASHelper(NASHelperBase):
'CLIENT': {
'Clients': access['access_to']
},
'FSAL': self._fsal_hook(base_path, share, access)
'FSAL': self._fsal_hook(
base_path, share, access, sub_name=sub_name)
}
})
self.ganesha.add_export(export_name, cf)
@ -161,7 +162,7 @@ class GaneshaNASHelper(NASHelperBase):
self.ganesha.remove_export("%s--%s" % (share['name'], access['id']))
def update_access(self, context, share, access_rules, add_rules,
delete_rules, share_server=None):
delete_rules, share_server=None, sub_name=None):
"""Update access rules of share."""
rule_state_map = {}
if not (add_rules or delete_rules):
@ -223,16 +224,16 @@ class GaneshaNASHelper2(GaneshaNASHelper):
else:
self.export_template = self._default_config_hook()
def _get_export_path(self, share):
def _get_export_path(self, share, sub_name=None):
"""Subclass this to return export path."""
raise NotImplementedError()
def _get_export_pseudo_path(self, share):
def _get_export_pseudo_path(self, share, sub_name=None):
"""Subclass this to return export pseudo path."""
raise NotImplementedError()
def update_access(self, context, share, access_rules, add_rules,
delete_rules, share_server=None):
delete_rules, share_server=None, sub_name=None):
"""Update access rules of share.
Creates an export per share. Modifies access rules of shares by
@ -243,6 +244,7 @@ class GaneshaNASHelper2(GaneshaNASHelper):
existing_access_rules = []
rule_state_map = {}
# TODO(carloss): check if share['name'] can cause us troubles
if self.ganesha.check_export_exists(share['name']):
confdict = self.ganesha._read_export(share['name'])
existing_access_rules = confdict["EXPORT"]["CLIENT"]
@ -301,16 +303,20 @@ class GaneshaNASHelper2(GaneshaNASHelper):
ganesha_utils.patch(confdict, self.export_template, {
'EXPORT': {
'Export_Id': self.ganesha.get_export_id(),
'Path': self._get_export_path(share),
'Pseudo': self._get_export_pseudo_path(share),
'Path': self._get_export_path(
share, sub_name=sub_name),
'Pseudo': self._get_export_pseudo_path(
share, sub_name=sub_name),
'Tag': share['name'],
'CLIENT': clients,
'FSAL': self._fsal_hook(None, share, None)
'FSAL': self._fsal_hook(
None, share, None, sub_name=sub_name
)
}
})
self.ganesha.add_export(share['name'], confdict)
else:
# No clients have access to the share. Remove export.
self.ganesha.remove_export(share['name'])
self._cleanup_fsal_hook(None, share, None)
self._cleanup_fsal_hook(None, share, None, sub_name=sub_name)
return rule_state_map

View File

@ -12,8 +12,8 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import math
from unittest import mock
import ddt
@ -92,10 +92,14 @@ class CephFSDriverTestCase(test.TestCase):
self.mock_object(driver, 'NFSClusterProtocolHelper')
driver.ceph_default_target = ('mon-mgr', )
self.fake_private_storage = mock.Mock()
self.mock_object(self.fake_private_storage, 'get',
mock.Mock(return_value=None))
self._driver = (
driver.CephFSDriver(execute=self._execute,
configuration=self.fake_conf))
configuration=self.fake_conf,
private_storage=self.fake_private_storage))
self._driver.protocol_helper = mock.Mock()
type(self._driver).volname = mock.PropertyMock(return_value='cephfs')
@ -138,6 +142,36 @@ class CephFSDriverTestCase(test.TestCase):
self.assertEqual(DEFAULT_VOLUME_MODE, self._driver._cephfs_volume_mode)
def test__get_sub_name(self):
sub_name = self._driver._get_subvolume_name(self._share["id"])
self.assertEqual(sub_name, self._share["id"])
def test__get_sub_name_has_other_name(self):
expected_sub_name = 'user_specified_subvolume_name'
self.mock_object(
self._driver.private_storage, 'get',
mock.Mock(return_value=expected_sub_name)
)
sub_name = self._driver._get_subvolume_name(self._share["id"])
self.assertEqual(expected_sub_name, sub_name)
def test__get_sub_snapshot_name(self):
sub_name = self._driver._get_subvolume_snapshot_name(
self._snapshot["id"]
)
self.assertEqual(sub_name, self._snapshot["id"])
def test__get_sub_snapshot_name_has_other_name(self):
expected_sub_snap_name = 'user_specified_subvolume_snapshot_name'
self.mock_object(
self._driver.private_storage, 'get',
mock.Mock(return_value=expected_sub_snap_name)
)
sub_name = self._driver._get_subvolume_snapshot_name(
self._snapshot["id"]
)
self.assertEqual(expected_sub_snap_name, sub_name)
@ddt.data(
('{"version": "ceph version 16.2.4"}', 'pacific'),
('{"version": "ceph version 15.1.2"}', 'octopus'),
@ -216,6 +250,302 @@ class CephFSDriverTestCase(test.TestCase):
self._context,
share)
def _setup_manage_subvolume_test(self):
fake_els = [
{'path': 'fake/path'}
]
share_with_el = fake_share.fake_share(export_locations=fake_els)
expected_subvolume_info_argdict = {
"vol_name": self._driver.volname,
"sub_name": fake_els[0]["path"],
}
subvolume_info_mock_result = {
'atime': '2024-07-23 16:50:03',
'bytes_pcent': '0.00',
'bytes_quota': 2147483648,
'bytes_used': 0,
'created_at': '2024-07-23 16:50:03',
'ctime': '2024-07-23 17:24:49',
'data_pool': 'cephfs.cephfs.data',
'features': ['snapshot-clone', 'snapshot-autoprotect'],
'gid': 0,
'mode': 755,
'mon_addrs': ['10.0.0.1:6342'],
'mtime': '2024-07-23 16:50:03',
'path': '/volumes/_nogroup/subbvol/475a-4972-9f6b-fe025a8d383f',
'pool_namespace': 'fsvolumes_cephfs',
'state': 'complete',
'type': 'subvolume',
'uid': 0
}
return (
share_with_el, expected_subvolume_info_argdict,
subvolume_info_mock_result
)
def test_manage_existing_no_subvolume_name(self):
self.assertRaises(
exception.ShareBackendException,
self._driver.manage_existing,
{
'id': 'fake_project_uuid_1',
'export_locations': [{'path': None}]
},
{}
)
def test_manage_existing_subvolume_not_found(self):
driver.rados_command.side_effect = exception.ShareBackendException(
msg="does not exist"
)
fake_els = [
{'path': 'fake/path'}
]
share_with_el = fake_share.fake_share(export_locations=fake_els)
expected_info_argdict = {
"vol_name": self._driver.volname,
"sub_name": fake_els[0]["path"],
}
self.assertRaises(
exception.ShareBackendException,
self._driver.manage_existing,
share_with_el,
{}
)
driver.rados_command.assert_called_once_with(
self._driver.rados_client, "fs subvolume info",
expected_info_argdict,
json_obj=True
)
def test_manage_existing_subvolume_infinite_no_provided_size(self):
share_with_el, expected_info_argdict, subvolume_info = (
self._setup_manage_subvolume_test()
)
subvolume_info['bytes_quota'] = "infinite"
driver.rados_command.return_value = subvolume_info
self.assertRaises(
exception.ShareBackendException,
self._driver.manage_existing,
share_with_el,
{}
)
driver.rados_command.assert_called_once_with(
self._driver.rados_client, "fs subvolume info",
expected_info_argdict,
json_obj=True
)
@ddt.data(
exception.ShareShrinkingPossibleDataLoss,
exception.ShareBackendException
)
def test_manage_existing_subvolume_infinite_size(self, expected_exception):
share_with_el, expected_info_argdict, subvolume_info = (
self._setup_manage_subvolume_test()
)
subvolume_info['bytes_quota'] = "infinite"
driver.rados_command.return_value = subvolume_info
new_size = 1
mock_resize = self.mock_object(
self._driver, '_resize_share',
mock.Mock(side_effect=expected_exception('fake'))
)
self.assertRaises(
expected_exception,
self._driver.manage_existing,
share_with_el,
{'size': new_size}
)
driver.rados_command.assert_called_once_with(
self._driver.rados_client, "fs subvolume info",
expected_info_argdict,
json_obj=True
)
mock_resize.assert_called_once_with(
share_with_el, new_size, no_shrink=True
)
@ddt.data(True, False)
def test_manage_existing(self, current_size_is_smaller):
share_with_el, expected_info_argdict, subvolume_info = (
self._setup_manage_subvolume_test()
)
if current_size_is_smaller:
# set this to half gb, to ensure it will turn into 1gb
subvolume_info['bytes_quota'] = 536870912
subvolume_name = share_with_el["export_locations"][0]["path"]
expected_share_metadata = {"subvolume_name": subvolume_name}
expected_share_updates = {
"size": int(
math.ceil(int(subvolume_info['bytes_quota']) / units.Gi)),
"export_locations": subvolume_name
}
driver.rados_command.return_value = subvolume_info
self.mock_object(
self._driver, '_get_export_locations',
mock.Mock(return_value=subvolume_name))
mock_resize_share = self.mock_object(self._driver, '_resize_share')
share_updates = self._driver.manage_existing(share_with_el, {})
self.assertEqual(expected_share_updates, share_updates)
driver.rados_command.assert_called_once_with(
self._driver.rados_client, "fs subvolume info",
expected_info_argdict,
json_obj=True
)
self._driver.private_storage.update.assert_called_once_with(
share_with_el['id'], expected_share_metadata
)
self._driver._get_export_locations.assert_called_once_with(
share_with_el, subvolume_name=subvolume_name
)
if current_size_is_smaller:
mock_resize_share.assert_called_once_with(
share_with_el, 1, no_shrink=True
)
else:
mock_resize_share.assert_not_called()
def test_manage_existing_snapshot_no_snapshot_name(self):
self.assertRaises(
exception.ShareBackendException,
self._driver.manage_existing_snapshot,
{
'id': 'fake_project_uuid_1',
'provider_location': None,
},
{}
)
def test_manage_existing_snapshot_subvolume_not_found(self):
driver.rados_command.side_effect = exception.ShareBackendException(
msg="does not exist"
)
snapshot_instance = {
'id': 'fake_project_uuid_1',
'provider_location': 'fake/provider/location',
'share_instance_id': 'fake_share_instance_id'
}
expected_info_argdict = {
"vol_name": self._driver.volname,
"sub_name": snapshot_instance["share_instance_id"]
}
self.assertRaises(
exception.ShareBackendException,
self._driver.manage_existing_snapshot,
snapshot_instance,
{}
)
driver.rados_command.assert_called_once_with(
self._driver.rados_client, "fs subvolume info",
expected_info_argdict,
json_obj=True
)
def test_manage_existing_snapshot_snapshot_not_found(self):
_, expected_info_argdict, subvolume_info = (
self._setup_manage_subvolume_test()
)
expected_snapshot_name = 'fake/provider/location'
snapshot_instance = {
'id': 'fake_project_uuid_1',
'provider_location': expected_snapshot_name,
'share_instance_id': 'fake_share_instance_id'
}
expected_info_argdict = {
"vol_name": self._driver.volname,
"sub_name": snapshot_instance["share_instance_id"]
}
expected_snap_info_argdict = {
"vol_name": self._driver.volname,
"sub_name": snapshot_instance["share_instance_id"],
"snap_name": expected_snapshot_name
}
driver.rados_command.side_effect = [
subvolume_info,
exception.ShareBackendException(msg="does not exist")
]
self.assertRaises(
exception.ShareBackendException,
self._driver.manage_existing_snapshot,
snapshot_instance,
{}
)
driver.rados_command.assert_has_calls([
mock.call(
self._driver.rados_client, "fs subvolume info",
expected_info_argdict, json_obj=True
),
mock.call(
self._driver.rados_client, "fs subvolume snapshot info",
expected_snap_info_argdict,
json_obj=True
)
])
def test_manage_existing_snapshot(self):
_, expected_info_argdict, subvolume_info = (
self._setup_manage_subvolume_test()
)
expected_snapshot_name = 'fake_snapshot_name'
snapshot_instance = {
'id': 'fake_project_uuid_1',
'provider_location': expected_snapshot_name,
'share_instance_id': 'fake_share_instance_id',
'snapshot_id': 'fake_snapshot_id'
}
expected_info_argdict = {
"vol_name": self._driver.volname,
"sub_name": snapshot_instance["share_instance_id"]
}
expected_snap_info_argdict = {
"vol_name": self._driver.volname,
"sub_name": snapshot_instance["share_instance_id"],
"snap_name": expected_snapshot_name
}
driver.rados_command.side_effect = [
subvolume_info,
{'name': expected_snapshot_name}
]
expected_result = {
'provider_location': expected_snapshot_name
}
result = self._driver.manage_existing_snapshot(
snapshot_instance,
{}
)
self.assertEqual(expected_result, result)
driver.rados_command.assert_has_calls([
mock.call(
self._driver.rados_client, "fs subvolume info",
expected_info_argdict, json_obj=True
),
mock.call(
self._driver.rados_client, "fs subvolume snapshot info",
expected_snap_info_argdict, json_obj=True
)
])
self.fake_private_storage.update.assert_called_once_with(
snapshot_instance['snapshot_id'],
{"subvolume_snapshot_name": expected_snapshot_name}
)
def test_update_access(self):
alice = {
'id': 'instance_mapping_id1',
@ -233,7 +563,7 @@ class CephFSDriverTestCase(test.TestCase):
self._driver.protocol_helper.update_access.assert_called_once_with(
self._context, self._share, access_rules, add_rules, delete_rules,
share_server=None)
share_server=None, sub_name=self._share['id'])
def test_ensure_shares(self):
self._driver.protocol_helper.reapply_rules_while_ensuring_shares = True
@ -418,6 +748,11 @@ class CephFSDriverTestCase(test.TestCase):
snapshot_remove_dict_2.update(
{"snap_name": self._snapshot["snapshot_id"]})
self.mock_object(
self._driver,
'_get_subvolume_snapshot_name',
mock.Mock(return_value=self._snapshot["snapshot_id"]))
self._driver.delete_snapshot(self._context,
self._snapshot,
None)
@ -714,7 +1049,7 @@ class NativeProtocolHelperTestCase(test.TestCase):
driver.rados_command.return_value = 'native-zorilla'
auth_key = self._native_protocol_helper._allow_access(
self._context, self._share, rule)
self._context, self._share, rule, sub_name=self._share['id'])
self.assertEqual("native-zorilla", auth_key)
@ -723,22 +1058,32 @@ class NativeProtocolHelperTestCase(test.TestCase):
access_allow_prefix, access_allow_dict)
def test_allow_access_wrong_type(self):
self.assertRaises(exception.InvalidShareAccessType,
self._native_protocol_helper._allow_access,
self._context, self._share, {
'access_level': constants.ACCESS_LEVEL_RW,
'access_type': 'RHUBARB',
'access_to': 'alice'
})
self.assertRaises(
exception.InvalidShareAccessType,
self._native_protocol_helper._allow_access,
self._context,
self._share,
{
'access_level': constants.ACCESS_LEVEL_RW,
'access_type': 'RHUBARB',
'access_to': 'alice'
},
self._share['id']
)
def test_allow_access_same_cephx_id_as_manila_service(self):
self.assertRaises(exception.InvalidShareAccess,
self._native_protocol_helper._allow_access,
self._context, self._share, {
'access_level': constants.ACCESS_LEVEL_RW,
'access_type': 'cephx',
'access_to': 'manila',
})
self.assertRaises(
exception.InvalidShareAccess,
self._native_protocol_helper._allow_access,
self._context,
self._share,
{
'access_level': constants.ACCESS_LEVEL_RW,
'access_type': 'cephx',
'access_to': 'manila',
},
self._share['id']
)
def test_allow_access_to_preexisting_ceph_user(self):
msg = ("auth ID: admin exists and not created by "
@ -752,7 +1097,9 @@ class NativeProtocolHelperTestCase(test.TestCase):
'access_level': constants.ACCESS_LEVEL_RW,
'access_type': 'cephx',
'access_to': 'admin'
})
},
self._share['id']
)
def test_deny_access(self):
access_deny_prefix = "fs subvolume deauthorize"
@ -767,11 +1114,16 @@ class NativeProtocolHelperTestCase(test.TestCase):
evict_dict = access_deny_dict
self._native_protocol_helper._deny_access(self._context, self._share, {
'access_level': 'rw',
'access_type': 'cephx',
'access_to': 'alice'
})
self._native_protocol_helper._deny_access(
self._context,
self._share,
{
'access_level': 'rw',
'access_type': 'cephx',
'access_to': 'alice'
},
sub_name=self._share['id']
)
driver.rados_command.assert_has_calls([
mock.call(self._native_protocol_helper.rados_client,
@ -802,11 +1154,16 @@ class NativeProtocolHelperTestCase(test.TestCase):
"auth_id": "alice",
}
self._native_protocol_helper._deny_access(self._context, self._share, {
'access_level': 'rw',
'access_type': 'cephx',
'access_to': 'alice'
})
self._native_protocol_helper._deny_access(
self._context,
self._share,
{
'access_level': 'rw',
'access_type': 'cephx',
'access_to': 'alice'
},
sub_name=self._share['id']
)
driver.rados_command.assert_called_once_with(
self._native_protocol_helper.rados_client,
@ -868,7 +1225,9 @@ class NativeProtocolHelperTestCase(test.TestCase):
self._share,
access_rules=[alice, manila, admin, dabo],
add_rules=[alice, manila, admin, dabo],
delete_rules=[bob])
delete_rules=[bob],
sub_name=self._share['id']
)
expected_access_updates = {
'accessid1': {'access_key': 'abc123'},
@ -878,11 +1237,14 @@ class NativeProtocolHelperTestCase(test.TestCase):
}
self.assertEqual(expected_access_updates, access_updates)
self._native_protocol_helper._allow_access.assert_has_calls(
[mock.call(self._context, self._share, alice),
mock.call(self._context, self._share, manila),
mock.call(self._context, self._share, admin)])
[mock.call(self._context, self._share, alice,
sub_name=self._share['id']),
mock.call(self._context, self._share, manila,
sub_name=self._share['id']),
mock.call(self._context, self._share, admin,
sub_name=self._share['id'])])
self._native_protocol_helper._deny_access.assert_called_once_with(
self._context, self._share, bob)
self._context, self._share, bob, sub_name=self._share['id'])
self.assertEqual(
3, self._native_protocol_helper.message_api.create.call_count)
@ -936,7 +1298,7 @@ class NativeProtocolHelperTestCase(test.TestCase):
access_updates = self._native_protocol_helper.update_access(
self._context, self._share, access_rules=[alice], add_rules=[],
delete_rules=[])
delete_rules=[], sub_name=self._share['id'])
self.assertEqual(
{'accessid1': {'access_key': 'abc123'}}, access_updates)
@ -1237,7 +1599,9 @@ class NFSProtocolHelperTestCase(test.TestCase):
driver.rados_command.return_value = 'ganesha-zorilla'
ret = self._nfs_helper._fsal_hook(None, self._share, None)
ret = self._nfs_helper._fsal_hook(
None, self._share, None, self._share['id']
)
driver.rados_command.assert_called_once_with(
self._nfs_helper.rados_client,
@ -1254,7 +1618,9 @@ class NFSProtocolHelperTestCase(test.TestCase):
"auth_id": "ganesha-fakeid",
}
ret = self._nfs_helper._cleanup_fsal_hook(None, self._share, None)
ret = self._nfs_helper._cleanup_fsal_hook(
None, self._share, None, self._share['id']
)
driver.rados_command.assert_called_once_with(
self._nfs_helper.rados_client,
@ -1384,7 +1750,9 @@ class NFSClusterProtocolHelperTestCase(test.TestCase):
inbuf = json.dumps(export).encode('utf-8')
self._nfscluster_protocol_helper._allow_access(self._share, clients)
self._nfscluster_protocol_helper._allow_access(
self._share, clients, sub_name=self._share['id']
)
driver.rados_command.assert_called_once_with(
self._rados_client,
@ -1400,7 +1768,9 @@ class NFSClusterProtocolHelperTestCase(test.TestCase):
"pseudo_path": "ganesha:/foo/bar"
}
self._nfscluster_protocol_helper._deny_access(self._share)
self._nfscluster_protocol_helper._deny_access(
self._share, self._share['id']
)
driver.rados_command.assert_called_once_with(
self._rados_client,

View File

@ -244,11 +244,11 @@ class GaneshaNASHelperTestCase(test.TestCase):
self.access)
self._helper.ganesha.get_export_id.assert_called_once_with()
self._helper._fsal_hook.assert_called_once_with(
fake_basepath, self.share, self.access)
fake_basepath, self.share, self.access, sub_name=None)
mock_ganesha_utils_patch.assert_called_once_with(
{}, self._helper.export_template, fake_output_template)
self._helper._fsal_hook.assert_called_once_with(
fake_basepath, self.share, self.access)
fake_basepath, self.share, self.access, sub_name=None)
self._helper.ganesha.add_export.assert_called_once_with(
fake_export_name, fake_output_template)
self.assertIsNone(ret)
@ -488,11 +488,12 @@ class GaneshaNASHelper2TestCase(test.TestCase):
mock_gh.check_export_exists.assert_called_once_with('fakename')
mock_gh.get_export_id.assert_called_once_with()
self._helper._get_export_path.assert_called_once_with(self.share)
self._helper._get_export_path.assert_called_once_with(
self.share, sub_name=None)
(self._helper._get_export_pseudo_path.assert_called_once_with(
self.share))
self.share, sub_name=None))
self._helper._fsal_hook.assert_called_once_with(
None, self.share, None)
None, self.share, None, sub_name=None)
mock_gh.add_export.assert_called_once_with(
'fakename', result_confdict)
self.assertFalse(mock_gh.update_export.called)
@ -546,7 +547,7 @@ class GaneshaNASHelper2TestCase(test.TestCase):
mock_gh.check_export_exists.assert_called_once_with('fakename')
mock_gh.remove_export.assert_called_once_with('fakename')
self._helper._cleanup_fsal_hook.assert_called_once_with(
None, self.share, None)
None, self.share, None, sub_name=None)
self.assertFalse(mock_gh.add_export.called)
self.assertFalse(mock_gh.update_export.called)