diff --git a/manila/share/drivers/cephfs/driver.py b/manila/share/drivers/cephfs/driver.py index 83356d6763..38e7c9f938 100644 --- a/manila/share/drivers/cephfs/driver.py +++ b/manila/share/drivers/cephfs/driver.py @@ -15,14 +15,15 @@ import ipaddress +import json import socket import sys from oslo_config import cfg from oslo_config import types from oslo_log import log +from oslo_utils import importutils from oslo_utils import units -import six from manila.common import constants from manila import exception @@ -33,14 +34,30 @@ from manila.share import driver from manila.share.drivers import ganesha from manila.share.drivers.ganesha import utils as ganesha_utils from manila.share.drivers import helpers as driver_helpers -from manila.share import share_types -try: - import ceph_volume_client - ceph_module_found = True -except ImportError: - ceph_volume_client = None - ceph_module_found = False +rados = None +json_command = None + + +def setup_rados(): + global rados + if not rados: + try: + rados = importutils.import_module('rados') + except ImportError: + raise exception.ShareBackendException( + _("rados python module is not installed")) + + +def setup_json_command(): + global json_command + if not json_command: + try: + json_command = importutils.import_class( + 'ceph_argparse.json_command') + except ImportError: + raise exception.ShareBackendException( + _("ceph_argparse python module is not installed")) CEPHX_ACCESS_TYPE = "cephx" @@ -50,6 +67,7 @@ CEPH_DEFAULT_AUTH_ID = "admin" DEFAULT_VOLUME_MODE = '755' +RADOS_TIMEOUT = 10 LOG = log.getLogger(__name__) @@ -66,6 +84,10 @@ cephfs_opts = [ help="The name of the ceph auth identity to use." ), cfg.StrOpt('cephfs_volume_path_prefix', + deprecated_for_removal=True, + deprecated_since='Wallaby', + deprecated_reason='This option is not used starting with ' + 'the Nautilus release of Ceph.', default="/volumes", help="The prefix of the cephfs volume path." ), @@ -111,6 +133,9 @@ cephfs_opts = [ help="The read/write/execute permissions mode for CephFS " "volumes, snapshots, and snapshot groups expressed in " "Octal as with linux 'chmod' or 'umask' commands."), + cfg.StrOpt('cephfs_filesystem_name', + help="The name of the filesystem to use, if there are " + "multiple filesystems in the cluster."), ] @@ -118,10 +143,60 @@ CONF = cfg.CONF CONF.register_opts(cephfs_opts) -def cephfs_share_path(share): - """Get VolumePath from Share.""" - return ceph_volume_client.VolumePath( - share['share_group_id'], share['id']) +class RadosError(Exception): + """Something went wrong talking to Ceph with librados""" + + pass + + +def rados_command(rados_client, prefix=None, args=None, json_obj=False): + """Safer wrapper for ceph_argparse.json_command + + Raises error exception instead of relying on caller to check return + codes. + + Error exception can result from: + * Timeout + * Actual legitimate errors + * Malformed JSON output + + return: If json_obj is True, return the decoded JSON object from ceph, + or None if empty string returned. 
+ If json is False, return a decoded string (the data returned by + ceph command) + """ + if args is None: + args = {} + + argdict = args.copy() + argdict['format'] = 'json' + + LOG.debug("Invoking ceph_argparse.json_command - rados_client=%(cl)s, " + "prefix='%(pf)s', argdict=%(ad)s, timeout=%(to)s.", + {"cl": rados_client, "pf": prefix, "ad": argdict, + "to": RADOS_TIMEOUT}) + + try: + ret, outbuf, outs = json_command(rados_client, + prefix=prefix, + argdict=argdict, + timeout=RADOS_TIMEOUT) + if ret != 0: + raise rados.Error(outs, ret) + if not json_obj: + result = outbuf.decode().strip() + else: + if outbuf: + result = json.loads(outbuf.decode().strip()) + else: + result = None + except Exception as e: + msg = _("json_command failed - prefix=%(pfx)s, argdict=%(ad)s - " + "exception message: %(ex)s." % + {"pfx": prefix, "ad": argdict, "ex": e}) + raise exception.ShareBackendException(msg) + + return result class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin, @@ -133,18 +208,22 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin, self.backend_name = self.configuration.safe_get( 'share_backend_name') or 'CephFS' - self._volume_client = None + setup_rados() + setup_json_command() + self._rados_client = None + # name of the filesystem/volume used by the driver + self._volname = None self.configuration.append_config_values(cephfs_opts) try: - self._cephfs_volume_mode = int( - self.configuration.cephfs_volume_mode, 8) + int(self.configuration.cephfs_volume_mode, 8) except ValueError: msg = _("Invalid CephFS volume mode %s") raise exception.BadConfigurationException( msg % self.configuration.cephfs_volume_mode) + self._cephfs_volume_mode = self.configuration.cephfs_volume_mode self.ipv6_implemented = True def do_setup(self, context): @@ -158,7 +237,8 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin, self.protocol_helper = protocol_helper_class( self._execute, self.configuration, - ceph_vol_client=self.volume_client) + rados_client=self.rados_client, + volname=self.volname) self.protocol_helper.init_helper() @@ -167,7 +247,7 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin, self.protocol_helper.check_for_setup_error() def _update_share_stats(self): - stats = self.volume_client.rados.get_cluster_stats() + stats = self.rados_client.get_cluster_stats() total_capacity_gb = round(stats['kb'] / units.Mi, 2) free_capacity_gb = round(stats['kb_avail'] / units.Mi, 2) @@ -210,41 +290,58 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin, return gigs * units.Gi @property - def volume_client(self): - if self._volume_client: - return self._volume_client - - if not ceph_module_found: - raise exception.ManilaException( - _("Ceph client libraries not found.") - ) + def rados_client(self): + if self._rados_client: + return self._rados_client conf_path = self.configuration.safe_get('cephfs_conf_path') cluster_name = self.configuration.safe_get('cephfs_cluster_name') auth_id = self.configuration.safe_get('cephfs_auth_id') - volume_prefix = self.configuration.safe_get( - 'cephfs_volume_path_prefix') - self._volume_client = ceph_volume_client.CephFSVolumeClient( - auth_id, conf_path, cluster_name, volume_prefix=volume_prefix) - LOG.info("[%(be)s}] Ceph client found, connecting...", + self._rados_client = rados.Rados( + name="client.{0}".format(auth_id), + clustername=cluster_name, + conffile=conf_path, + conf={} + ) + + LOG.info("[%(be)s] Ceph client found, connecting...", {"be": self.backend_name}) - if auth_id != CEPH_DEFAULT_AUTH_ID: - # Evict any other 
manila sessions. Only do this if we're - # using a client ID that isn't the default admin ID, to avoid - # rudely disrupting anyone else. - premount_evict = auth_id - else: - premount_evict = None try: - self._volume_client.connect(premount_evict=premount_evict) + if self._rados_client.state != "connected": + self._rados_client.connect() except Exception: - self._volume_client = None - raise + self._rados_client = None + raise exception.ShareBackendException( + "[%(be)s] Ceph client failed to connect.", + {"be": self.backend_name}) else: LOG.info("[%(be)s] Ceph client connection complete.", {"be": self.backend_name}) - return self._volume_client + return self._rados_client + + @property + def volname(self): + # Name of the CephFS volume/filesystem where the driver creates + # manila entities such as shares, sharegroups, snapshots, etc. + if self._volname: + return self._volname + + self._volname = self.configuration.safe_get('cephfs_filesystem_name') + if not self._volname: + out = rados_command( + self.rados_client, "fs volume ls", json_obj=True) + if len(out) == 1: + self._volname = out[0]['name'] + else: + if len(out) > 1: + msg = _("Specify Ceph filesystem name using " + "'cephfs_filesystem_name' driver option.") + else: + msg = _("No Ceph filesystem found.") + raise exception.ShareBackendException(msg=msg) + + return self._volname def create_share(self, context, share, share_server=None): """Create a CephFS volume. @@ -260,34 +357,55 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin, if (requested_proto != supported_proto): msg = _("Share protocol %s is not supported.") % requested_proto raise exception.ShareBackendException(msg=msg) - - # `share` is a Share - msg = _("create_share {be} name={id} size={size}" - " share_group_id={group}") - LOG.debug(msg.format( - be=self.backend_name, id=share['id'], size=share['size'], - group=share['share_group_id'])) - - extra_specs = share_types.get_extra_specs_from_share(share) - data_isolated = extra_specs.get("cephfs:data_isolated", False) - size = self._to_bytes(share['size']) - # Create the CephFS volume - cephfs_volume = self.volume_client.create_volume( - cephfs_share_path(share), size=size, data_isolated=data_isolated, - mode=self._cephfs_volume_mode) + LOG.debug("[%(be)s]: create_share: id=%(id)s, size=%(sz)s, " + "group=%(gr)s.", + {"be": self.backend_name, "id": share['id'], + "sz": share['size'], "gr": share['share_group_id']}) - return self.protocol_helper.get_export_locations(share, cephfs_volume) + # create FS subvolume/share + argdict = { + "vol_name": self.volname, + "sub_name": share["id"], + "size": size, + "namespace_isolated": True, + "mode": self._cephfs_volume_mode, + } + + if share['share_group_id'] is not None: + argdict.update({"group_name": share["share_group_id"]}) + + rados_command(self.rados_client, "fs subvolume create", argdict) + + # get path of FS subvolume/share + argdict = { + "vol_name": self.volname, + "sub_name": share["id"], + } + if share['share_group_id'] is not None: + argdict.update({"group_name": share["share_group_id"]}) + subvolume_path = rados_command( + self.rados_client, "fs subvolume getpath", argdict) + + return self.protocol_helper.get_export_locations(share, subvolume_path) def delete_share(self, context, share, share_server=None): - extra_specs = share_types.get_extra_specs_from_share(share) - data_isolated = extra_specs.get("cephfs:data_isolated", False) + # remove FS subvolume/share - self.volume_client.delete_volume(cephfs_share_path(share), - data_isolated=data_isolated) - 
self.volume_client.purge_volume(cephfs_share_path(share), - data_isolated=data_isolated) + LOG.debug("[%(be)s]: delete_share: id=%(id)s, group=%(gr)s.", + {"be": self.backend_name, "id": share['id'], + "gr": share['share_group_id']}) + + argdict = { + "vol_name": self.volname, + "sub_name": share["id"], + "force": True, + } + if share['share_group_id'] is not None: + argdict.update({"group_name": share["share_group_id"]}) + + rados_command(self.rados_client, "fs subvolume rm", argdict) def update_access(self, context, share, access_rules, add_rules, delete_rules, share_server=None): @@ -300,65 +418,151 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin, return self.create_share(context, share, share_server) def extend_share(self, share, new_size, share_server=None): + # resize FS subvolume/share + LOG.debug("[%(be)s]: extend_share: share=%(id)s, size=%(sz)s.", + {"be": self.backend_name, "id": share['id'], + "sz": new_size}) + + argdict = { + "vol_name": self.volname, + "sub_name": share["id"], + "new_size": self._to_bytes(new_size), + } + + if share['share_group_id'] is not None: + argdict.update({"group_name": share["share_group_id"]}) + LOG.debug("extend_share {id} {size}".format( id=share['id'], size=new_size)) - self.volume_client.set_max_bytes(cephfs_share_path(share), - self._to_bytes(new_size)) + + rados_command(self.rados_client, "fs subvolume resize", argdict) def shrink_share(self, share, new_size, share_server=None): - LOG.debug("shrink_share {id} {size}".format( - id=share['id'], size=new_size)) - new_bytes = self._to_bytes(new_size) - used = self.volume_client.get_used_bytes(cephfs_share_path(share)) - if used > new_bytes: - # While in fact we can "shrink" our volumes to less than their - # used bytes (it's just a quota), raise error anyway to avoid - # confusing API consumers that might depend on typical shrink - # behaviour. 
- raise exception.ShareShrinkingPossibleDataLoss( - share_id=share['id']) + # resize FS subvolume/share + LOG.debug("[%(be)s]: shrink_share: share=%(id)s, size=%(sz)s.", + {"be": self.backend_name, "id": share['id'], + "sz": new_size}) - self.volume_client.set_max_bytes(cephfs_share_path(share), new_bytes) + argdict = { + "vol_name": self.volname, + "sub_name": share["id"], + "new_size": self._to_bytes(new_size), + "no_shrink": True, + } + + if share["share_group_id"] is not None: + argdict.update({"group_name": share["share_group_id"]}) + + try: + rados_command(self.rados_client, "fs subvolume resize", argdict) + except exception.ShareBackendException as e: + if 'would be lesser than' in str(e).lower(): + raise exception.ShareShrinkingPossibleDataLoss( + share_id=share['id']) + raise def create_snapshot(self, context, snapshot, share_server=None): - self.volume_client.create_snapshot_volume( - cephfs_share_path(snapshot['share']), - '_'.join([snapshot['snapshot_id'], snapshot['id']]), - mode=self._cephfs_volume_mode) + # create a FS snapshot + LOG.debug("[%(be)s]: create_snapshot: original share=%(id)s, " + "snapshot=%(sn)s.", + {"be": self.backend_name, "id": snapshot['share_id'], + "sn": snapshot['id']}) + + argdict = { + "vol_name": self.volname, + "sub_name": snapshot["share_id"], + "snap_name": "_".join([snapshot["snapshot_id"], snapshot["id"]]), + } + + rados_command( + self.rados_client, "fs subvolume snapshot create", argdict) def delete_snapshot(self, context, snapshot, share_server=None): - self.volume_client.destroy_snapshot_volume( - cephfs_share_path(snapshot['share']), - '_'.join([snapshot['snapshot_id'], snapshot['id']])) + # delete a FS snapshot + LOG.debug("[%(be)s]: delete_snapshot: snapshot=%(id)s.", + {"be": self.backend_name, "id": snapshot['id']}) + argdict = { + "vol_name": self.volname, + "sub_name": snapshot["share_id"], + "snap_name": '_'.join([snapshot['snapshot_id'], snapshot['id']]), + "force": True, + } + + rados_command(self.rados_client, "fs subvolume snapshot rm", argdict) def create_share_group(self, context, sg_dict, share_server=None): - self.volume_client.create_group(sg_dict['id'], - mode=self._cephfs_volume_mode) + # delete a FS group + LOG.debug("[%(be)s]: create_share_group: share_group=%(id)s.", + {"be": self.backend_name, "id": sg_dict['id']}) + + argdict = { + "vol_name": self.volname, + "group_name": sg_dict['id'], + "mode": self._cephfs_volume_mode, + } + + rados_command(self.rados_client, "fs subvolumegroup create", argdict) def delete_share_group(self, context, sg_dict, share_server=None): - self.volume_client.destroy_group(sg_dict['id']) + # create a FS group + LOG.debug("[%(be)s]: delete_share_group: share_group=%(id)s.", + {"be": self.backend_name, "id": sg_dict['id']}) + + argdict = { + "vol_name": self.volname, + "group_name": sg_dict['id'], + "force": True, + } + + rados_command(self.rados_client, "fs subvolumegroup rm", argdict) def delete_share_group_snapshot(self, context, snap_dict, share_server=None): - self.volume_client.destroy_snapshot_group( - snap_dict['share_group_id'], - snap_dict['id']) + # delete a FS group snapshot + LOG.debug("[%(be)s]: delete_share_group_snapshot: " + "share_group=%(sg_id)s, snapshot=%(sn)s.", + {"be": self.backend_name, "sg_id": snap_dict['id'], + "sn": snap_dict["share_group_id"]}) + + argdict = { + "vol_name": self.volname, + "group_name": snap_dict["share_group_id"], + "snap_name": snap_dict["id"], + "force": True, + } + + rados_command( + self.rados_client, "fs subvolumegroup snapshot rm", 
argdict) return None, [] def create_share_group_snapshot(self, context, snap_dict, share_server=None): - self.volume_client.create_snapshot_group( - snap_dict['share_group_id'], - snap_dict['id'], - mode=self._cephfs_volume_mode) + # create a FS group snapshot + LOG.debug("[%(be)s]: create_share_group_snapshot: share_group=%(id)s, " + "snapshot=%(sn)s.", + {"be": self.backend_name, "id": snap_dict['share_group_id'], + "sn": snap_dict["id"]}) + + argdict = { + "vol_name": self.volname, + "group_name": snap_dict["share_group_id"], + "snap_name": snap_dict["id"] + } + + rados_command( + self.rados_client, "fs subvolumegroup snapshot create", argdict) return None, [] def __del__(self): - if self._volume_client: - self._volume_client.disconnect() - self._volume_client = None + if self._rados_client: + LOG.info("[%(be)s] Ceph client disconnecting...", + {"be": self.backend_name}) + self._rados_client.shutdown() + self._rados_client = None + LOG.info("[%(be)s] Ceph client disconnected", + {"be": self.backend_name}) def get_configured_ip_versions(self): return self.protocol_helper.get_configured_ip_versions() @@ -372,7 +576,8 @@ class NativeProtocolHelper(ganesha.NASHelperBase): constants.ACCESS_LEVEL_RO) def __init__(self, execute, config, **kwargs): - self.volume_client = kwargs.pop('ceph_vol_client') + self.rados_client = kwargs.pop('rados_client') + self.volname = kwargs.pop('volname') self.message_api = message_api.API() super(NativeProtocolHelper, self).__init__(execute, config, **kwargs) @@ -384,13 +589,22 @@ class NativeProtocolHelper(ganesha.NASHelperBase): """Returns an error if prerequisites aren't met.""" return - def get_export_locations(self, share, cephfs_volume): + def get_mon_addrs(self): + result = [] + mon_map = rados_command(self.rados_client, "mon dump", json_obj=True) + for mon in mon_map['mons']: + ip_port = mon['addr'].split("/")[0] + result.append(ip_port) + + return result + + def get_export_locations(self, share, subvolume_path): # To mount this you need to know the mon IPs and the path to the volume - mon_addrs = self.volume_client.get_mon_addrs() + mon_addrs = self.get_mon_addrs() export_location = "{addrs}:{path}".format( addrs=",".join(mon_addrs), - path=cephfs_volume['mount_path']) + path=subvolume_path) LOG.info("Calculated export location for share %(id)s: %(loc)s", {"id": share['id'], "loc": export_location}) @@ -418,31 +632,36 @@ class NativeProtocolHelper(ganesha.NASHelperBase): ceph_auth_id) raise exception.InvalidShareAccess(reason=error_message) - if not getattr(self.volume_client, 'version', None): - if access['access_level'] == constants.ACCESS_LEVEL_RO: - LOG.error("Need python-cephfs package version 10.2.3 or " - "greater to enable read-only access.") - raise exception.InvalidShareAccessLevel( - level=constants.ACCESS_LEVEL_RO) + argdict = { + "vol_name": self.volname, + "sub_name": share["id"], + "auth_id": ceph_auth_id, + "tenant_id": share["project_id"], + } - auth_result = self.volume_client.authorize( - cephfs_share_path(share), ceph_auth_id) + if share["share_group_id"] is not None: + argdict.update({"group_name": share["share_group_id"]}) + + readonly = access['access_level'] == constants.ACCESS_LEVEL_RO + + if readonly: + argdict.update({"access_level": "r"}) else: - readonly = access['access_level'] == constants.ACCESS_LEVEL_RO - try: - auth_result = self.volume_client.authorize( - cephfs_share_path(share), ceph_auth_id, readonly=readonly, - tenant_id=share['project_id']) - except Exception as e: - if 'not allowed' in str(e).lower(): - msg = 
("Access to client %(client)s is not allowed. " - "Reason: %(reason)s") - msg_payload = {'client': ceph_auth_id, 'reason': e} - raise exception.InvalidShareAccess( - reason=msg % msg_payload) - raise + argdict.update({"access_level": "rw"}) - return auth_result['auth_key'] + try: + auth_result = rados_command( + self.rados_client, "fs subvolume authorize", argdict) + except exception.ShareBackendException as e: + if 'not allowed' in str(e).lower(): + msg = ("Access to client %(client)s is not allowed. " + "Reason: %(reason)s") + msg_payload = {'client': ceph_auth_id, 'reason': e} + raise exception.InvalidShareAccess( + reason=msg % msg_payload) + raise + + return auth_result def _deny_access(self, context, share, access, share_server=None): if access['access_type'] != CEPHX_ACCESS_TYPE: @@ -451,35 +670,50 @@ class NativeProtocolHelper(ganesha.NASHelperBase): {"type": access['access_type']}) return - self.volume_client.deauthorize(cephfs_share_path(share), - access['access_to']) - self.volume_client.evict( - access['access_to'], - volume_path=cephfs_share_path(share)) + argdict = { + "vol_name": self.volname, + "sub_name": share["id"], + "auth_id": access['access_to'] + } + + if share["share_group_id"] is not None: + argdict.update({"group_name": share["share_group_id"]}) + + rados_command(self.rados_client, "fs subvolume deauthorize", argdict) + rados_command(self.rados_client, "fs subvolume evict", argdict) def update_access(self, context, share, access_rules, add_rules, delete_rules, share_server=None): access_updates = {} + argdict = { + "vol_name": self.volname, + "sub_name": share["id"], + } + + if share["share_group_id"] is not None: + argdict.update({"group_name": share["share_group_id"]}) + if not (add_rules or delete_rules): # recovery/maintenance mode add_rules = access_rules existing_auths = None - # The unversioned volume client cannot fetch from the Ceph backend, - # the list of auth IDs that have share access. 
- if getattr(self.volume_client, 'version', None): - existing_auths = self.volume_client.get_authorized_ids( - cephfs_share_path(share)) + existing_auths = rados_command( + self.rados_client, "fs subvolume authorized_list", + argdict, json_obj=True) if existing_auths: - existing_auth_ids = set( - [auth[0] for auth in existing_auths]) + existing_auth_ids = set() + for rule in range(len(existing_auths)): + for cephx_id in existing_auths[rule]: + existing_auth_ids.add(cephx_id) want_auth_ids = set( [rule['access_to'] for rule in add_rules]) delete_auth_ids = existing_auth_ids.difference( want_auth_ids) - for delete_auth_id in delete_auth_ids: + delete_auth_ids_list = delete_auth_ids + for delete_auth_id in delete_auth_ids_list: delete_rules.append( { 'access_to': delete_auth_id, @@ -562,8 +796,10 @@ class NFSProtocolHelper(ganesha.GaneshaNASHelper2): super(NFSProtocolHelper, self).__init__(execute, config_object, **kwargs) - if not hasattr(self, 'ceph_vol_client'): - self.ceph_vol_client = kwargs.pop('ceph_vol_client') + if not hasattr(self, 'rados_client'): + self.rados_client = kwargs.pop('rados_client') + if not hasattr(self, 'volname'): + self.volname = kwargs.pop('volname') self.export_ips = config_object.cephfs_ganesha_export_ips if not self.export_ips: self.export_ips = [self.ganesha_host] @@ -582,12 +818,12 @@ class NFSProtocolHelper(ganesha.GaneshaNASHelper2): "hostname.") % export_ip) raise exception.InvalidParameterValue(err=msg) - def get_export_locations(self, share, cephfs_volume): + def get_export_locations(self, share, subvolume_path): export_locations = [] for export_ip in self.export_ips: export_path = "{server_address}:{mount_path}".format( server_address=driver_helpers.escaped_address(export_ip), - mount_path=cephfs_volume['mount_path']) + mount_path=subvolume_path) LOG.info("Calculated export path for share %(id)s: %(epath)s", {"id": share['id'], "epath": export_path}) @@ -609,9 +845,21 @@ class NFSProtocolHelper(ganesha.GaneshaNASHelper2): def _fsal_hook(self, base, share, access): """Callback to create FSAL subblock.""" ceph_auth_id = ''.join(['ganesha-', share['id']]) - auth_result = self.ceph_vol_client.authorize( - cephfs_share_path(share), ceph_auth_id, readonly=False, - tenant_id=share['project_id']) + + argdict = { + "vol_name": self.volname, + "sub_name": share["id"], + "auth_id": ceph_auth_id, + "access_level": "rw", + "tenant_id": share["project_id"], + } + + if share["share_group_id"] is not None: + argdict.update({"group_name": share["share_group_id"]}) + + auth_result = rados_command( + self.rados_client, "fs subvolume authorize", argdict) + # Restrict Ganesha server's access to only the CephFS subtree or path, # corresponding to the manila share, that is to be exported by making # Ganesha use Ceph auth IDs with path restricted capabilities to @@ -619,31 +867,49 @@ class NFSProtocolHelper(ganesha.GaneshaNASHelper2): return { 'Name': 'Ceph', 'User_Id': ceph_auth_id, - 'Secret_Access_Key': auth_result['auth_key'] + 'Secret_Access_Key': auth_result } def _cleanup_fsal_hook(self, base, share, access): """Callback for FSAL specific cleanup after removing an export.""" ceph_auth_id = ''.join(['ganesha-', share['id']]) - self.ceph_vol_client.deauthorize(cephfs_share_path(share), - ceph_auth_id) + + argdict = { + "vol_name": self.volname, + "sub_name": share["id"], + "auth_id": ceph_auth_id, + } + + if share["share_group_id"] is not None: + argdict.update({"group_name": share["share_group_id"]}) + + rados_command(self.rados_client, "fs subvolume deauthorize", 
argdict) def _get_export_path(self, share): """Callback to provide export path.""" - volume_path = cephfs_share_path(share) - return self.ceph_vol_client._get_path(volume_path) + argdict = { + "vol_name": self.volname, + "sub_name": share["id"] + } + + if share["share_group_id"] is not None: + argdict.update({"group_name": share["share_group_id"]}) + + path = rados_command( + self.rados_client, "fs subvolume getpath", argdict) + + return path def _get_export_pseudo_path(self, share): """Callback to provide pseudo path.""" - volume_path = cephfs_share_path(share) - return self.ceph_vol_client._get_path(volume_path) + return self._get_export_path(share) def get_configured_ip_versions(self): if not self.configured_ip_versions: try: for export_ip in self.export_ips: self.configured_ip_versions.add( - ipaddress.ip_address(six.text_type(export_ip)).version) + ipaddress.ip_address(str(export_ip)).version) except Exception: # export_ips contained a hostname, safest thing is to # claim support for IPv4 and IPv6 address families diff --git a/manila/share/drivers/ganesha/__init__.py b/manila/share/drivers/ganesha/__init__.py index 03b87ce438..98e19ecf17 100644 --- a/manila/share/drivers/ganesha/__init__.py +++ b/manila/share/drivers/ganesha/__init__.py @@ -20,7 +20,6 @@ import re from oslo_config import cfg from oslo_log import log -import six from manila.common import constants from manila import exception @@ -32,8 +31,7 @@ CONF = cfg.CONF LOG = log.getLogger(__name__) -@six.add_metaclass(abc.ABCMeta) -class NASHelperBase(object): +class NASHelperBase(object, metaclass=abc.ABCMeta): """Interface to work with share.""" # drivers that use a helper derived from this class @@ -184,7 +182,7 @@ class GaneshaNASHelper2(GaneshaNASHelper): def __init__(self, execute, config, tag='', **kwargs): super(GaneshaNASHelper2, self).__init__(execute, config, **kwargs) if self.configuration.ganesha_rados_store_enable: - self.ceph_vol_client = kwargs.pop('ceph_vol_client') + self.rados_client = kwargs.pop('rados_client') def init_helper(self): """Initializes protocol-specific NAS drivers.""" @@ -206,8 +204,7 @@ class GaneshaNASHelper2(GaneshaNASHelper): self.configuration.ganesha_rados_export_index) kwargs['ganesha_rados_export_counter'] = ( self.configuration.ganesha_rados_export_counter) - kwargs['ceph_vol_client'] = ( - self.ceph_vol_client) + kwargs['rados_client'] = self.rados_client else: kwargs['ganesha_db_path'] = self.configuration.ganesha_db_path self.ganesha = ganesha_manager.GaneshaManager( diff --git a/manila/share/drivers/ganesha/manager.py b/manila/share/drivers/ganesha/manager.py index bfaf055613..c42ecfadd0 100644 --- a/manila/share/drivers/ganesha/manager.py +++ b/manila/share/drivers/ganesha/manager.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. 
+import io import os import pipes import re @@ -21,7 +22,6 @@ import sys from oslo_log import log from oslo_serialization import jsonutils from oslo_utils import importutils -import six from manila import exception from manila.i18n import _ @@ -36,7 +36,7 @@ def _conf2json(conf): """Convert Ganesha config to JSON.""" # tokenize config string - token_list = [six.StringIO()] + token_list = [io.StringIO()] state = { 'in_quote': False, 'in_comment': False, @@ -49,7 +49,7 @@ def _conf2json(conf): if not state['escape']: if char == '"': state['in_quote'] = False - cbk.append(lambda: token_list.append(six.StringIO())) + cbk.append(lambda: token_list.append(io.StringIO())) elif char == '\\': cbk.append(lambda: state.update({'escape': True})) else: @@ -60,7 +60,7 @@ def _conf2json(conf): state['in_comment'] = False else: if char == '"': - token_list.append(six.StringIO()) + token_list.append(io.StringIO()) state['in_quote'] = True state['escape'] = False if not state['in_comment']: @@ -200,7 +200,7 @@ def parseconf(conf): def mkconf(confdict): """Create Ganesha config string from confdict.""" - s = six.StringIO() + s = io.StringIO() _dump_to_conf(confdict, s) return s.getvalue() @@ -255,13 +255,12 @@ class GaneshaManager(object): kwargs['ganesha_rados_export_counter']) self.ganesha_rados_export_index = ( kwargs['ganesha_rados_export_index']) - self.ceph_vol_client = ( - kwargs['ceph_vol_client']) + self.rados_client = kwargs['rados_client'] try: self._get_rados_object(self.ganesha_rados_export_counter) except rados.ObjectNotFound: self._put_rados_object(self.ganesha_rados_export_counter, - six.text_type(1000)) + str(1000)) else: self.ganesha_db_path = kwargs['ganesha_db_path'] self.execute('mkdir', '-p', os.path.dirname(self.ganesha_db_path)) @@ -385,7 +384,7 @@ class GaneshaManager(object): for k, v in ganesha_utils.walk(confdict): # values in the export block template that need to be # filled in by Manila are pre-fixed by '@' - if isinstance(v, six.string_types) and v[0] == '@': + if isinstance(v, str) and v[0] == '@': msg = _("Incomplete export block: value %(val)s of attribute " "%(key)s is a stub.") % {'key': k, 'val': v} raise exception.InvalidParameterValue(err=msg) @@ -524,22 +523,78 @@ class GaneshaManager(object): self._rm_export_file(name) self._mkindex() - def _get_rados_object(self, obj_name): - """Get data stored in Ceph RADOS object as a text string.""" - return self.ceph_vol_client.get_object( - self.ganesha_rados_store_pool_name, obj_name).decode('utf-8') + def _get_rados_object(self, object_name): + """Synchronously read data from Ceph RADOS object as a text string. 
- def _put_rados_object(self, obj_name, data): - """Put data as a byte string in a Ceph RADOS object.""" - return self.ceph_vol_client.put_object( - self.ganesha_rados_store_pool_name, - obj_name, - data.encode('utf-8')) + :param pool_name: name of the pool + :type pool_name: str + :param object_name: name of the object + :type object_name: str + :returns: tuple of object data and version + """ - def _delete_rados_object(self, obj_name): - return self.ceph_vol_client.delete_object( - self.ganesha_rados_store_pool_name, - obj_name) + pool_name = self.ganesha_rados_store_pool_name + + ioctx = self.rados_client.open_ioctx(pool_name) + + osd_max_write_size = self.rados_client.conf_get('osd_max_write_size') + max_size = int(osd_max_write_size) * 1024 * 1024 + try: + bytes_read = ioctx.read(object_name, max_size) + if ((len(bytes_read) == max_size) and + (ioctx.read(object_name, 1, offset=max_size))): + LOG.warning("Size of object {0} exceeds '{1}' bytes " + "read".format(object_name, max_size)) + finally: + ioctx.close() + + bytes_read_decoded = bytes_read.decode('utf-8') + + return bytes_read_decoded + + def _put_rados_object(self, object_name, data): + """Synchronously write data as a byte string in a Ceph RADOS object. + + :param pool_name: name of the pool + :type pool_name: str + :param object_name: name of the object + :type object_name: str + :param data: data to write + :type data: bytes + """ + + pool_name = self.ganesha_rados_store_pool_name + encoded_data = data.encode('utf-8') + + ioctx = self.rados_client.open_ioctx(pool_name) + + max_size = int( + self.rados_client.conf_get('osd_max_write_size')) * 1024 * 1024 + if len(encoded_data) > max_size: + msg = ("Data to be written to object '{0}' exceeds " + "{1} bytes".format(object_name, max_size)) + LOG.error(msg) + raise exception.ShareBackendException(msg) + + try: + with rados.WriteOpCtx() as wop: + wop.write_full(encoded_data) + ioctx.operate_write_op(wop, object_name) + except rados.OSError as e: + LOG.error(e) + raise e + finally: + ioctx.close() + + def _delete_rados_object(self, object_name): + pool_name = self.ganesha_rados_store_pool_name + ioctx = self.rados_client.open_ioctx(pool_name) + try: + ioctx.remove_object(object_name) + except rados.ObjectNotFound: + LOG.warning("Object '{0}' was already removed".format(object_name)) + finally: + ioctx.close() def get_export_id(self, bump=True): """Get a new export id.""" diff --git a/manila/tests/share/drivers/cephfs/test_driver.py b/manila/tests/share/drivers/cephfs/test_driver.py index 52f15f939e..fe220b7afe 100644 --- a/manila/tests/share/drivers/cephfs/test_driver.py +++ b/manila/tests/share/drivers/cephfs/test_driver.py @@ -28,59 +28,39 @@ from manila import test from manila.tests import fake_share -DEFAULT_VOLUME_MODE = 0o755 -ALT_VOLUME_MODE_CFG = '775' -ALT_VOLUME_MODE = 0o775 +DEFAULT_VOLUME_MODE = '755' +ALT_VOLUME_MODE = '644' -class MockVolumeClientModule(object): - """Mocked up version of ceph's VolumeClient interface.""" - - class VolumePath(object): - """Copy of VolumePath from CephFSVolumeClient.""" - - def __init__(self, group_id, volume_id): - self.group_id = group_id - self.volume_id = volume_id - - def __eq__(self, other): - return (self.group_id == other.group_id - and self.volume_id == other.volume_id) - - def __str__(self): - return "{0}/{1}".format(self.group_id, self.volume_id) - - class CephFSVolumeClient(mock.Mock): - mock_used_bytes = 0 - version = 1 +class MockRadosModule(object): + """Mocked up version of the rados module.""" + class Rados(mock.Mock): 
def __init__(self, *args, **kwargs): mock.Mock.__init__(self, spec=[ - "connect", "disconnect", - "create_snapshot_volume", "destroy_snapshot_volume", - "create_group", "destroy_group", - "delete_volume", "purge_volume", - "deauthorize", "evict", "set_max_bytes", - "destroy_snapshot_group", "create_snapshot_group", - "get_authorized_ids" + "connect", "shutdown", "state" ]) - self.create_volume = mock.Mock(return_value={ - "mount_path": "/foo/bar" - }) - self._get_path = mock.Mock(return_value='/foo/bar') self.get_mon_addrs = mock.Mock(return_value=["1.2.3.4", "5.6.7.8"]) - self.get_authorized_ids = mock.Mock( - return_value=[('eve', 'rw')]) - self.authorize = mock.Mock(return_value={"auth_key": "abc123"}) - self.get_used_bytes = mock.Mock(return_value=self.mock_used_bytes) - self.rados = mock.Mock() - self.rados.get_cluster_stats = mock.Mock(return_value={ + self.get_cluster_stats = mock.Mock(return_value={ "kb": 172953600, "kb_avail": 157123584, "kb_used": 15830016, "num_objects": 26, }) + class Error(mock.Mock): + pass + + +class MockCephArgparseModule(object): + """Mocked up version of the ceph_argparse module.""" + + class json_command(mock.Mock): + def __init__(self, *args, **kwargs): + mock.Mock.__init__(self, spec=[ + "connect", "shutdown", "state" + ]) + @ddt.ddt class CephFSDriverTestCase(test.TestCase): @@ -98,14 +78,14 @@ class CephFSDriverTestCase(test.TestCase): self.fake_conf = configuration.Configuration(None) self._context = context.get_admin_context() self._share = fake_share.fake_share(share_proto='CEPHFS') + self._snapshot = fake_share.fake_snapshot_instance() self.fake_conf.set_default('driver_handles_share_servers', False) self.fake_conf.set_default('cephfs_auth_id', 'manila') - self.mock_object(driver, "ceph_volume_client", - MockVolumeClientModule) - self.mock_object(driver, "ceph_module_found", True) - self.mock_object(driver, "cephfs_share_path") + self.mock_object(driver, "rados_command") + self.mock_object(driver, "rados", MockRadosModule) + self.mock_object(driver, "json_command", MockCephArgparseModule) self.mock_object(driver, 'NativeProtocolHelper') self.mock_object(driver, 'NFSProtocolHelper') @@ -114,6 +94,8 @@ class CephFSDriverTestCase(test.TestCase): configuration=self.fake_conf)) self._driver.protocol_helper = mock.Mock() + type(self._driver).volname = mock.PropertyMock(return_value='cephfs') + self.mock_object(share_types, 'get_share_type_extra_specs', mock.Mock(return_value={})) @@ -127,11 +109,13 @@ class CephFSDriverTestCase(test.TestCase): if protocol_helper == 'cephfs': driver.NativeProtocolHelper.assert_called_once_with( self._execute, self._driver.configuration, - ceph_vol_client=self._driver._volume_client) + rados_client=self._driver._rados_client, + volname=self._driver.volname) else: driver.NFSProtocolHelper.assert_called_once_with( self._execute, self._driver.configuration, - ceph_vol_client=self._driver._volume_client) + rados_client=self._driver._rados_client, + volname=self._driver.volname) self._driver.protocol_helper.init_helper.assert_called_once_with() @@ -148,16 +132,33 @@ class CephFSDriverTestCase(test.TestCase): assert_called_once_with()) def test_create_share(self): - cephfs_volume = {"mount_path": "/foo/bar"} + create_share_prefix = "fs subvolume create" + get_path_prefix = "fs subvolume getpath" + + create_share_dict = { + "vol_name": self._driver.volname, + "sub_name": self._share["id"], + "size": self._share["size"] * units.Gi, + "namespace_isolated": True, + "mode": DEFAULT_VOLUME_MODE, + } + + get_path_dict = { + 
"vol_name": self._driver.volname, + "sub_name": self._share["id"], + } self._driver.create_share(self._context, self._share) - self._driver._volume_client.create_volume.assert_called_once_with( - driver.cephfs_share_path(self._share), - size=self._share['size'] * units.Gi, - data_isolated=False, mode=DEFAULT_VOLUME_MODE) - (self._driver.protocol_helper.get_export_locations. - assert_called_once_with(self._share, cephfs_volume)) + driver.rados_command.assert_has_calls([ + mock.call(self._driver.rados_client, + create_share_prefix, + create_share_dict), + mock.call(self._driver.rados_client, + get_path_prefix, + get_path_dict)]) + + self.assertEqual(2, driver.rados_command.call_count) def test_create_share_error(self): share = fake_share.fake_share(share_proto='NFS') @@ -187,171 +188,229 @@ class CephFSDriverTestCase(test.TestCase): share_server=None) def test_ensure_share(self): + create_share_prefix = "fs subvolume create" + get_path_prefix = "fs subvolume getpath" + + create_share_dict = { + "vol_name": self._driver.volname, + "sub_name": self._share["id"], + "size": self._share["size"] * units.Gi, + "namespace_isolated": True, + "mode": DEFAULT_VOLUME_MODE, + } + + get_path_dict = { + "vol_name": self._driver.volname, + "sub_name": self._share["id"], + } + self._driver.ensure_share(self._context, self._share) - self._driver._volume_client.create_volume.assert_called_once_with( - driver.cephfs_share_path(self._share), - size=self._share['size'] * units.Gi, - data_isolated=False, - mode=DEFAULT_VOLUME_MODE) + driver.rados_command.assert_has_calls([ + mock.call(self._driver.rados_client, + create_share_prefix, + create_share_dict), + mock.call(self._driver.rados_client, + get_path_prefix, + get_path_dict)]) - def test_create_data_isolated(self): - self.mock_object(share_types, 'get_share_type_extra_specs', - mock.Mock(return_value={"cephfs:data_isolated": True}) - ) - - self._driver.create_share(self._context, self._share) - - self._driver._volume_client.create_volume.assert_called_once_with( - driver.cephfs_share_path(self._share), - size=self._share['size'] * units.Gi, - data_isolated=True, - mode=DEFAULT_VOLUME_MODE) + self.assertEqual(2, driver.rados_command.call_count) def test_delete_share(self): - self._driver.delete_share(self._context, self._share) + delete_share_prefix = "fs subvolume rm" - self._driver._volume_client.delete_volume.assert_called_once_with( - driver.cephfs_share_path(self._share), - data_isolated=False) - self._driver._volume_client.purge_volume.assert_called_once_with( - driver.cephfs_share_path(self._share), - data_isolated=False) - - def test_delete_data_isolated(self): - self.mock_object(share_types, 'get_share_type_extra_specs', - mock.Mock(return_value={"cephfs:data_isolated": True}) - ) + delete_share_dict = { + "vol_name": self._driver.volname, + "sub_name": self._share["id"], + "force": True, + } self._driver.delete_share(self._context, self._share) - self._driver._volume_client.delete_volume.assert_called_once_with( - driver.cephfs_share_path(self._share), - data_isolated=True) - self._driver._volume_client.purge_volume.assert_called_once_with( - driver.cephfs_share_path(self._share), - data_isolated=True) + driver.rados_command.assert_called_once_with( + self._driver.rados_client, delete_share_prefix, delete_share_dict) def test_extend_share(self): + extend_share_prefix = "fs subvolume resize" + new_size_gb = self._share['size'] * 2 new_size = new_size_gb * units.Gi + extend_share_dict = { + "vol_name": self._driver.volname, + "sub_name": 
self._share["id"], + "new_size": new_size, + } + self._driver.extend_share(self._share, new_size_gb, None) - self._driver._volume_client.set_max_bytes.assert_called_once_with( - driver.cephfs_share_path(self._share), - new_size) + driver.rados_command.assert_called_once_with( + self._driver.rados_client, extend_share_prefix, extend_share_dict) def test_shrink_share(self): + shrink_share_prefix = "fs subvolume resize" + new_size_gb = self._share['size'] * 0.5 new_size = new_size_gb * units.Gi + shrink_share_dict = { + "vol_name": self._driver.volname, + "sub_name": self._share["id"], + "new_size": new_size, + "no_shrink": True, + } + self._driver.shrink_share(self._share, new_size_gb, None) - self._driver._volume_client.get_used_bytes.assert_called_once_with( - driver.cephfs_share_path(self._share)) - self._driver._volume_client.set_max_bytes.assert_called_once_with( - driver.cephfs_share_path(self._share), - new_size) + driver.rados_command.assert_called_once_with( + self._driver.rados_client, shrink_share_prefix, shrink_share_dict) def test_shrink_share_full(self): """That shrink fails when share is too full.""" + shrink_share_prefix = "fs subvolume resize" + new_size_gb = self._share['size'] * 0.5 + new_size = new_size_gb * units.Gi + + msg = ("Can't resize the subvolume. " + "The new size '{0}' would be lesser " + "than the current used size '{1}'".format( + new_size, self._share['size'])) + driver.rados_command.side_effect = exception.ShareBackendException(msg) + + shrink_share_dict = { + "vol_name": self._driver.volname, + "sub_name": self._share["id"], + "new_size": new_size, + "no_shrink": True, + } # Pretend to be full up - vc = MockVolumeClientModule.CephFSVolumeClient - vc.mock_used_bytes = (units.Gi * self._share['size']) - self.assertRaises(exception.ShareShrinkingPossibleDataLoss, self._driver.shrink_share, self._share, new_size_gb, None) - self._driver._volume_client.set_max_bytes.assert_not_called() + + driver.rados_command.assert_called_once_with( + self._driver.rados_client, shrink_share_prefix, shrink_share_dict) def test_create_snapshot(self): - self._driver.create_snapshot(self._context, - { - "id": "instance1", - "share": self._share, - "snapshot_id": "snappy1" - }, - None) + snapshot_create_prefix = "fs subvolume snapshot create" - (self._driver._volume_client.create_snapshot_volume - .assert_called_once_with( - driver.cephfs_share_path(self._share), - "snappy1_instance1", - mode=DEFAULT_VOLUME_MODE)) + snapshot_create_dict = { + "vol_name": self._driver.volname, + "sub_name": self._snapshot["share_id"], + "snap_name": "_".join([ + self._snapshot["snapshot_id"], self._snapshot["id"]]), + } + + self._driver.create_snapshot(self._context, self._snapshot, None) + + driver.rados_command.assert_called_once_with( + self._driver.rados_client, + snapshot_create_prefix, snapshot_create_dict) def test_delete_snapshot(self): + snapshot_remove_prefix = "fs subvolume snapshot rm" + + snapshot_remove_dict = { + "vol_name": self._driver.volname, + "sub_name": self._snapshot["share_id"], + "snap_name": "_".join([ + self._snapshot["snapshot_id"], self._snapshot["id"]]), + "force": True, + } + self._driver.delete_snapshot(self._context, - { - "id": "instance1", - "share": self._share, - "snapshot_id": "snappy1" - }, + self._snapshot, None) - (self._driver._volume_client.destroy_snapshot_volume - .assert_called_once_with( - driver.cephfs_share_path(self._share), - "snappy1_instance1")) + driver.rados_command.assert_called_once_with( + self._driver.rados_client, + snapshot_remove_prefix, 
snapshot_remove_dict) def test_create_share_group(self): + group_create_prefix = "fs subvolumegroup create" + + group_create_dict = { + "vol_name": self._driver.volname, + "group_name": "grp1", + "mode": DEFAULT_VOLUME_MODE, + } + self._driver.create_share_group(self._context, {"id": "grp1"}, None) - self._driver._volume_client.create_group.assert_called_once_with( - "grp1", mode=DEFAULT_VOLUME_MODE) + driver.rados_command.assert_called_once_with( + self._driver.rados_client, + group_create_prefix, group_create_dict) def test_delete_share_group(self): + group_delete_prefix = "fs subvolumegroup rm" + + group_delete_dict = { + "vol_name": self._driver.volname, + "group_name": "grp1", + "force": True, + } + self._driver.delete_share_group(self._context, {"id": "grp1"}, None) - self._driver._volume_client.destroy_group.assert_called_once_with( - "grp1") + driver.rados_command.assert_called_once_with( + self._driver.rados_client, + group_delete_prefix, group_delete_dict) + + def test_create_share_group_snapshot(self): + group_snapshot_create_prefix = "fs subvolumegroup snapshot create" + + group_snapshot_create_dict = { + "vol_name": self._driver.volname, + "group_name": "sgid", + "snap_name": "snapid", + } - def test_create_share_snapshot(self): self._driver.create_share_group_snapshot(self._context, { 'share_group_id': 'sgid', 'id': 'snapid', }) - (self._driver._volume_client.create_snapshot_group. - assert_called_once_with("sgid", "snapid", mode=DEFAULT_VOLUME_MODE)) + driver.rados_command.assert_called_once_with( + self._driver.rados_client, + group_snapshot_create_prefix, group_snapshot_create_dict) def test_delete_share_group_snapshot(self): + group_snapshot_delete_prefix = "fs subvolumegroup snapshot rm" + + group_snapshot_delete_dict = { + "vol_name": self._driver.volname, + "group_name": "sgid", + "snap_name": "snapid", + "force": True, + } + self._driver.delete_share_group_snapshot(self._context, { 'share_group_id': 'sgid', 'id': 'snapid', + "force": True, }) - (self._driver._volume_client.destroy_snapshot_group. 
- assert_called_once_with("sgid", "snapid")) + driver.rados_command.assert_called_once_with( + self._driver.rados_client, + group_snapshot_delete_prefix, group_snapshot_delete_dict) def test_delete_driver(self): # Create share to prompt volume_client construction self._driver.create_share(self._context, self._share) - vc = self._driver._volume_client + rc = self._driver._rados_client del self._driver - vc.disconnect.assert_called_once_with() + rc.shutdown.assert_called_once_with() def test_delete_driver_no_client(self): - self.assertIsNone(self._driver._volume_client) + self.assertIsNone(self._driver._rados_client) del self._driver - def test_connect_noevict(self): - # When acting as "admin", driver should skip evicting - self._driver.configuration.local_conf.set_override('cephfs_auth_id', - "admin") - - self._driver.create_share(self._context, - self._share) - - vc = self._driver._volume_client - vc.connect.assert_called_once_with(premount_evict=None) - def test_update_share_stats(self): self._driver.get_configured_ip_versions = mock.Mock(return_value=[4]) self._driver.configuration.local_conf.set_override( @@ -367,15 +426,6 @@ class CephFSDriverTestCase(test.TestCase): self.assertFalse(result['ipv6_support']) self.assertEqual("CEPHFS", result['storage_protocol']) - def test_module_missing(self): - driver.ceph_module_found = False - driver.ceph_volume_client = None - - self.assertRaises(exception.ManilaException, - self._driver.create_share, - self._context, - self._share) - @ddt.data('cephfs', 'nfs') def test_get_configured_ip_versions(self, protocol_helper): self._driver.configuration.cephfs_protocol_helper_type = ( @@ -398,14 +448,20 @@ class NativeProtocolHelperTestCase(test.TestCase): self.fake_conf.set_default('driver_handles_share_servers', False) - self.mock_object(driver, "cephfs_share_path") + self.mock_object(driver, "rados_command") self._native_protocol_helper = driver.NativeProtocolHelper( None, self.fake_conf, - ceph_vol_client=MockVolumeClientModule.CephFSVolumeClient() + rados_client=MockRadosModule.Rados(), + volname="cephfs" ) + self._rados_client = self._native_protocol_helper.rados_client + + self._native_protocol_helper.get_mon_addrs = mock.Mock( + return_value=['1.2.3.4', '5.6.7.8']) + def test_check_for_setup_error(self): expected = None @@ -414,8 +470,7 @@ class NativeProtocolHelperTestCase(test.TestCase): self.assertEqual(expected, result) def test_get_export_locations(self): - vc = self._native_protocol_helper.volume_client - fake_cephfs_volume = {'mount_path': '/foo/bar'} + fake_cephfs_subvolume_path = '/foo/bar' expected_export_locations = { 'path': '1.2.3.4,5.6.7.8:/foo/bar', 'is_admin_only': False, @@ -423,58 +478,40 @@ class NativeProtocolHelperTestCase(test.TestCase): } export_locations = self._native_protocol_helper.get_export_locations( - self._share, fake_cephfs_volume) + self._share, fake_cephfs_subvolume_path) self.assertEqual(expected_export_locations, export_locations) - vc.get_mon_addrs.assert_called_once_with() + self._native_protocol_helper.get_mon_addrs.assert_called_once_with() + + @ddt.data(constants.ACCESS_LEVEL_RW, constants.ACCESS_LEVEL_RO) + def test_allow_access_rw_ro(self, mode): + access_allow_prefix = "fs subvolume authorize" + access_allow_mode = "r" if mode == "ro" else "rw" + + access_allow_dict = { + "vol_name": self._native_protocol_helper.volname, + "sub_name": self._share["id"], + "auth_id": "alice", + "tenant_id": self._share["project_id"], + "access_level": access_allow_mode, + } - @ddt.data(None, 1) - def 
test_allow_access_rw(self, volume_client_version): - vc = self._native_protocol_helper.volume_client rule = { - 'access_level': constants.ACCESS_LEVEL_RW, + 'access_level': mode, 'access_to': 'alice', 'access_type': 'cephx', } - vc.version = volume_client_version + + driver.rados_command.return_value = 'native-zorilla' auth_key = self._native_protocol_helper._allow_access( self._context, self._share, rule) - self.assertEqual("abc123", auth_key) + self.assertEqual("native-zorilla", auth_key) - if not volume_client_version: - vc.authorize.assert_called_once_with( - driver.cephfs_share_path(self._share), "alice") - else: - vc.authorize.assert_called_once_with( - driver.cephfs_share_path(self._share), "alice", - readonly=False, tenant_id=self._share['project_id']) - - @ddt.data(None, 1) - def test_allow_access_ro(self, volume_client_version): - vc = self._native_protocol_helper.volume_client - rule = { - 'access_level': constants.ACCESS_LEVEL_RO, - 'access_to': 'alice', - 'access_type': 'cephx', - } - vc.version = volume_client_version - - if not volume_client_version: - self.assertRaises(exception.InvalidShareAccessLevel, - self._native_protocol_helper._allow_access, - self._context, self._share, rule) - else: - auth_key = ( - self._native_protocol_helper._allow_access( - self._context, self._share, rule) - ) - - self.assertEqual("abc123", auth_key) - vc.authorize.assert_called_once_with( - driver.cephfs_share_path(self._share), "alice", readonly=True, - tenant_id=self._share['project_id']) + driver.rados_command.assert_called_once_with( + self._rados_client, + access_allow_prefix, access_allow_dict) def test_allow_access_wrong_type(self): self.assertRaises(exception.InvalidShareAccessType, @@ -495,12 +532,9 @@ class NativeProtocolHelperTestCase(test.TestCase): }) def test_allow_access_to_preexisting_ceph_user(self): - - vc = self._native_protocol_helper.volume_client msg = ("auth ID: admin exists and not created by " - "ceph_volume_client. Not allowed to modify") - self.mock_object(vc, 'authorize', - mock.Mock(side_effect=Exception(msg))) + "ceph manager plugin. 
Not allowed to modify") + driver.rados_command.side_effect = exception.ShareBackendException(msg) self.assertRaises(exception.InvalidShareAccess, self._native_protocol_helper._allow_access, @@ -512,17 +546,33 @@ class NativeProtocolHelperTestCase(test.TestCase): }) def test_deny_access(self): - vc = self._native_protocol_helper.volume_client + access_deny_prefix = "fs subvolume deauthorize" + + access_deny_dict = { + "vol_name": self._native_protocol_helper.volname, + "sub_name": self._share["id"], + "auth_id": "alice", + } + + evict_prefix = "fs subvolume evict" + + evict_dict = access_deny_dict + self._native_protocol_helper._deny_access(self._context, self._share, { 'access_level': 'rw', 'access_type': 'cephx', 'access_to': 'alice' }) - vc.deauthorize.assert_called_once_with( - driver.cephfs_share_path(self._share), "alice") - vc.evict.assert_called_once_with( - "alice", volume_path=driver.cephfs_share_path(self._share)) + driver.rados_command.assert_has_calls([ + mock.call(self._native_protocol_helper.rados_client, + access_deny_prefix, + access_deny_dict), + mock.call(self._native_protocol_helper.rados_client, + evict_prefix, + evict_dict)]) + + self.assertEqual(2, driver.rados_command.call_count) def test_update_access_add_rm(self): alice = { @@ -596,17 +646,53 @@ class NativeProtocolHelperTestCase(test.TestCase): self.assertEqual( 3, self._native_protocol_helper.message_api.create.call_count) - @ddt.data(None, 1) - def test_update_access_all(self, volume_client_version): - vc = self._native_protocol_helper.volume_client + def test_update_access_all(self): + get_authorized_ids_prefix = "fs subvolume authorized_list" + + get_authorized_ids_dict = { + "vol_name": self._native_protocol_helper.volname, + "sub_name": self._share["id"] + } + + access_allow_prefix = "fs subvolume authorize" + + access_allow_dict = { + "vol_name": self._native_protocol_helper.volname, + "sub_name": self._share["id"], + "auth_id": "alice", + "tenant_id": self._share["project_id"], + "access_level": "rw", + } + + access_deny_prefix = "fs subvolume deauthorize" + + access_deny_john_dict = { + "vol_name": self._native_protocol_helper.volname, + "sub_name": self._share["id"], + "auth_id": "john", + } + + access_deny_paul_dict = { + "vol_name": self._native_protocol_helper.volname, + "sub_name": self._share["id"], + "auth_id": "paul", + } + + evict_prefix = "fs subvolume evict" + alice = { 'id': 'instance_mapping_id1', 'access_id': 'accessid1', 'access_level': 'rw', 'access_type': 'cephx', - 'access_to': 'alice' + 'access_to': 'alice', } - vc.version = volume_client_version + + driver.rados_command.side_effect = [ + [{"john": "rw"}, {"paul": "r"}], + 'abc123', + mock.Mock(), mock.Mock(), + mock.Mock(), mock.Mock()] access_updates = self._native_protocol_helper.update_access( self._context, self._share, access_rules=[alice], add_rules=[], @@ -615,18 +701,28 @@ class NativeProtocolHelperTestCase(test.TestCase): self.assertEqual( {'accessid1': {'access_key': 'abc123'}}, access_updates) - if volume_client_version: - vc.get_authorized_ids.assert_called_once_with( - driver.cephfs_share_path(self._share)) - vc.authorize.assert_called_once_with( - driver.cephfs_share_path(self._share), "alice", readonly=False, - tenant_id=self._share['project_id']) - vc.deauthorize.assert_called_once_with( - driver.cephfs_share_path(self._share), "eve") - else: - self.assertFalse(vc.get_authorized_ids.called) - vc.authorize.assert_called_once_with( - driver.cephfs_share_path(self._share), "alice") + driver.rados_command.assert_has_calls([ 
+ mock.call(self._native_protocol_helper.rados_client, + get_authorized_ids_prefix, + get_authorized_ids_dict, + json_obj=True), + mock.call(self._native_protocol_helper.rados_client, + access_allow_prefix, + access_allow_dict), + mock.call(self._native_protocol_helper.rados_client, + access_deny_prefix, + access_deny_john_dict), + mock.call(self._native_protocol_helper.rados_client, + evict_prefix, + access_deny_john_dict), + mock.call(self._native_protocol_helper.rados_client, + access_deny_prefix, + access_deny_paul_dict), + mock.call(self._native_protocol_helper.rados_client, + evict_prefix, + access_deny_paul_dict)], any_order=True) + + self.assertEqual(6, driver.rados_command.call_count) def test_get_configured_ip_versions(self): expected = [4] @@ -643,21 +739,22 @@ class NFSProtocolHelperTestCase(test.TestCase): super(NFSProtocolHelperTestCase, self).setUp() self._execute = mock.Mock() self._share = fake_share.fake_share(share_proto='NFS') - self._volume_client = MockVolumeClientModule.CephFSVolumeClient() + self._rados_client = MockRadosModule.Rados() + self._volname = "cephfs" self.fake_conf = configuration.Configuration(None) self.fake_conf.set_default('cephfs_ganesha_server_ip', 'fakeip') - self.mock_object(driver, "cephfs_share_path", - mock.Mock(return_value='fakevolumepath')) self.mock_object(driver.ganesha_utils, 'SSHExecutor') self.mock_object(driver.ganesha_utils, 'RootExecutor') self.mock_object(driver.socket, 'gethostname') + self.mock_object(driver, "rados_command") self._nfs_helper = driver.NFSProtocolHelper( self._execute, self.fake_conf, - ceph_vol_client=self._volume_client) + rados_client=self._rados_client, + volname=self._volname) @ddt.data( (['fakehost', 'some.host.name', 'some.host.name.', '1.1.1.0'], False), @@ -682,7 +779,8 @@ class NFSProtocolHelperTestCase(test.TestCase): helper = driver.NFSProtocolHelper( self._execute, fake_conf, - ceph_vol_client=MockVolumeClientModule.CephFSVolumeClient() + rados_client=MockRadosModule.Rados(), + volname="cephfs" ) if raises: @@ -706,7 +804,8 @@ class NFSProtocolHelperTestCase(test.TestCase): driver.NFSProtocolHelper( self._execute, fake_conf, - ceph_vol_client=MockVolumeClientModule.CephFSVolumeClient() + rados_client=MockRadosModule.Rados(), + volname="cephfs" ) if ganesha_server_is_remote: @@ -733,7 +832,8 @@ class NFSProtocolHelperTestCase(test.TestCase): driver.NFSProtocolHelper( self._execute, fake_conf, - ceph_vol_client=MockVolumeClientModule.CephFSVolumeClient() + rados_client=MockRadosModule.Rados(), + volname="cephfs" ) driver.ganesha_utils.RootExecutor.assert_has_calls( @@ -746,18 +846,19 @@ class NFSProtocolHelperTestCase(test.TestCase): driver.LOG.info.assert_called_once() def test_get_export_locations_no_export_ips_configured(self): - cephfs_volume = {"mount_path": "/foo/bar"} + cephfs_subvolume_path = "/foo/bar" fake_conf = configuration.Configuration(None) fake_conf.set_default('cephfs_ganesha_server_ip', '1.2.3.4') helper = driver.NFSProtocolHelper( self._execute, fake_conf, - ceph_vol_client=MockVolumeClientModule.CephFSVolumeClient() + rados_client=MockRadosModule.Rados(), + volname="cephfs" ) ret = helper.get_export_locations(self._share, - cephfs_volume) + cephfs_subvolume_path) self.assertEqual( [{ 'path': '1.2.3.4:/foo/bar', @@ -776,12 +877,13 @@ class NFSProtocolHelperTestCase(test.TestCase): helper = driver.NFSProtocolHelper( self._execute, fake_conf, - ceph_vol_client=MockVolumeClientModule.CephFSVolumeClient() + rados_client=MockRadosModule.Rados(), + volname="cephfs" ) - cephfs_volume = 
{"mount_path": "/foo/bar"} + cephfs_subvolume_path = "/foo/bar" - ret = helper.get_export_locations(self._share, cephfs_volume) + ret = helper.get_export_locations(self._share, cephfs_subvolume_path) self.assertEqual( [ @@ -822,7 +924,8 @@ class NFSProtocolHelperTestCase(test.TestCase): helper = driver.NFSProtocolHelper( self._execute, fake_conf, - ceph_vol_client=MockVolumeClientModule.CephFSVolumeClient() + rados_client=MockRadosModule.Rados(), + volname="cephfs" ) self.assertEqual(set(configured_ip_version), @@ -833,7 +936,8 @@ class NFSProtocolHelperTestCase(test.TestCase): helper = driver.NFSProtocolHelper( self._execute, fake_conf, - ceph_vol_client=MockVolumeClientModule.CephFSVolumeClient() + rados_client=MockRadosModule.Rados(), + volname="cephfs" ) ip_versions = ['foo', 'bar'] @@ -865,46 +969,83 @@ class NFSProtocolHelperTestCase(test.TestCase): self.assertEqual(fake_conf_dict, ret) def test_fsal_hook(self): - expected_ret = { - 'Name': 'Ceph', - 'User_Id': 'ganesha-fakeid', - 'Secret_Access_Key': 'fakekey' + access_allow_prefix = "fs subvolume authorize" + + access_allow_dict = { + "vol_name": self._nfs_helper.volname, + "sub_name": self._share["id"], + "auth_id": "ganesha-fakeid", + "tenant_id": self._share["project_id"], + "access_level": "rw", } - self.mock_object(self._volume_client, 'authorize', - mock.Mock(return_value={'auth_key': 'fakekey'})) + + expected_ret = { + "Name": "Ceph", + "User_Id": "ganesha-fakeid", + "Secret_Access_Key": "ganesha-zorilla" + } + + driver.rados_command.return_value = 'ganesha-zorilla' ret = self._nfs_helper._fsal_hook(None, self._share, None) - driver.cephfs_share_path.assert_called_once_with(self._share) - self._volume_client.authorize.assert_called_once_with( - 'fakevolumepath', 'ganesha-fakeid', readonly=False, - tenant_id='fake_project_uuid') + driver.rados_command.assert_called_once_with( + self._nfs_helper.rados_client, + access_allow_prefix, access_allow_dict) + self.assertEqual(expected_ret, ret) def test_cleanup_fsal_hook(self): - self.mock_object(self._volume_client, 'deauthorize') + access_deny_prefix = "fs subvolume deauthorize" + + access_deny_dict = { + "vol_name": self._nfs_helper.volname, + "sub_name": self._share["id"], + "auth_id": "ganesha-fakeid", + } ret = self._nfs_helper._cleanup_fsal_hook(None, self._share, None) - driver.cephfs_share_path.assert_called_once_with(self._share) - self._volume_client.deauthorize.assert_called_once_with( - 'fakevolumepath', 'ganesha-fakeid') + driver.rados_command.assert_called_once_with( + self._nfs_helper.rados_client, + access_deny_prefix, access_deny_dict) + self.assertIsNone(ret) def test_get_export_path(self): + get_path_prefix = "fs subvolume getpath" + + get_path_dict = { + "vol_name": self._nfs_helper.volname, + "sub_name": self._share["id"], + } + + driver.rados_command.return_value = '/foo/bar' + ret = self._nfs_helper._get_export_path(self._share) - driver.cephfs_share_path.assert_called_once_with(self._share) - self._volume_client._get_path.assert_called_once_with( - 'fakevolumepath') + driver.rados_command.assert_called_once_with( + self._nfs_helper.rados_client, + get_path_prefix, get_path_dict) + self.assertEqual('/foo/bar', ret) def test_get_export_pseudo_path(self): + get_path_prefix = "fs subvolume getpath" + + get_path_dict = { + "vol_name": self._nfs_helper.volname, + "sub_name": self._share["id"], + } + + driver.rados_command.return_value = '/foo/bar' + ret = self._nfs_helper._get_export_pseudo_path(self._share) - 
driver.cephfs_share_path.assert_called_once_with(self._share) - self._volume_client._get_path.assert_called_once_with( - 'fakevolumepath') + driver.rados_command.assert_called_once_with( + self._nfs_helper.rados_client, + get_path_prefix, get_path_dict) + self.assertEqual('/foo/bar', ret) @@ -916,25 +1057,28 @@ class CephFSDriverAltConfigTestCase(test.TestCase): super(CephFSDriverAltConfigTestCase, self).setUp() self._execute = mock.Mock() self.fake_conf = configuration.Configuration(None) + self._rados_client = MockRadosModule.Rados() self._context = context.get_admin_context() self._share = fake_share.fake_share(share_proto='CEPHFS') self.fake_conf.set_default('driver_handles_share_servers', False) self.fake_conf.set_default('cephfs_auth_id', 'manila') - self.mock_object(driver, "ceph_volume_client", - MockVolumeClientModule) - self.mock_object(driver, "ceph_module_found", True) - self.mock_object(driver, "cephfs_share_path") + self.mock_object(driver, "rados", MockRadosModule) + self.mock_object(driver, "json_command", + MockCephArgparseModule.json_command) + self.mock_object(driver, "rados_command") self.mock_object(driver, 'NativeProtocolHelper') self.mock_object(driver, 'NFSProtocolHelper') @ddt.data('cephfs', 'nfs') def test_do_setup_alt_volume_mode(self, protocol_helper): - - self.fake_conf.set_default('cephfs_volume_mode', ALT_VOLUME_MODE_CFG) + self.fake_conf.set_default('cephfs_volume_mode', ALT_VOLUME_MODE) self._driver = driver.CephFSDriver(execute=self._execute, - configuration=self.fake_conf) + configuration=self.fake_conf, + rados_client=self._rados_client) + + type(self._driver).volname = mock.PropertyMock(return_value='cephfs') self._driver.configuration.cephfs_protocol_helper_type = ( protocol_helper) @@ -944,11 +1088,13 @@ class CephFSDriverAltConfigTestCase(test.TestCase): if protocol_helper == 'cephfs': driver.NativeProtocolHelper.assert_called_once_with( self._execute, self._driver.configuration, - ceph_vol_client=self._driver._volume_client) + rados_client=self._driver.rados_client, + volname=self._driver.volname) else: driver.NFSProtocolHelper.assert_called_once_with( self._execute, self._driver.configuration, - ceph_vol_client=self._driver._volume_client) + rados_client=self._driver._rados_client, + volname=self._driver.volname) self._driver.protocol_helper.init_helper.assert_called_once_with() @@ -962,3 +1108,45 @@ class CephFSDriverAltConfigTestCase(test.TestCase): self.assertRaises(exception.BadConfigurationException, driver.CephFSDriver, execute=self._execute, configuration=self.fake_conf) + + +@ddt.ddt +class MiscTests(test.TestCase): + + @ddt.data({'import_exc': None}, + {'import_exc': ImportError}) + @ddt.unpack + def test_rados_module_missing(self, import_exc): + driver.rados = None + with mock.patch.object( + driver.importutils, + 'import_module', + side_effect=import_exc) as mock_import_module: + if import_exc: + self.assertRaises( + exception.ShareBackendException, driver.setup_rados) + else: + driver.setup_rados() + self.assertEqual(mock_import_module.return_value, + driver.rados) + + mock_import_module.assert_called_once_with('rados') + + @ddt.data({'import_exc': None}, + {'import_exc': ImportError}) + @ddt.unpack + def test_setup_json_class_missing(self, import_exc): + driver.json_command = None + with mock.patch.object( + driver.importutils, + 'import_class', + side_effect=import_exc) as mock_import_class: + if import_exc: + self.assertRaises( + exception.ShareBackendException, driver.setup_json_command) + else: + driver.setup_json_command() + 
self.assertEqual(mock_import_class.return_value, + driver.json_command) + mock_import_class.assert_called_once_with( + 'ceph_argparse.json_command') diff --git a/manila/tests/share/drivers/ganesha/test_manager.py b/manila/tests/share/drivers/ganesha/test_manager.py index 5e8c565e01..57753a0309 100644 --- a/manila/tests/share/drivers/ganesha/test_manager.py +++ b/manila/tests/share/drivers/ganesha/test_manager.py @@ -14,12 +14,12 @@ # under the License. import copy +import io import re from unittest import mock import ddt from oslo_serialization import jsonutils -import six from manila import exception from manila.share.drivers.ganesha import manager @@ -67,12 +67,26 @@ manager_fake_kwargs = { } -class MockRadosClientModule(object): - """Mocked up version of Ceph's RADOS client interface.""" +class MockRadosModule(object): + """Mocked up version of Ceph's RADOS module.""" class ObjectNotFound(Exception): pass + class OSError(Exception): + pass + + class WriteOpCtx(): + + def __enter__(self): + return self + + def __exit__(self, type, msg, traceback): + pass + + def write_full(self, bytes_to_write): + pass + @ddt.ddt class MiscTests(test.TestCase): @@ -167,7 +181,7 @@ class GaneshaConfigTests(test.TestCase): self.assertEqual(test_dict_unicode, ret) def test_dump_to_conf(self): - ganesha_cnf = six.StringIO() + ganesha_cnf = io.StringIO() manager._dump_to_conf(test_dict_str, ganesha_cnf) self.assertEqual(*self.conf_mangle(self.ref_ganesha_cnf, ganesha_cnf.getvalue())) @@ -200,12 +214,14 @@ class GaneshaManagerTestCase(test.TestCase): def setUp(self): super(GaneshaManagerTestCase, self).setUp() self._execute = mock.Mock(return_value=('', '')) + self._rados_client = mock.Mock() self._manager = self.instantiate_ganesha_manager( - self._execute, 'faketag', **manager_fake_kwargs) - self._ceph_vol_client = mock.Mock() + self._execute, 'faketag', + rados_client=self._rados_client, + **manager_fake_kwargs) self._setup_rados = mock.Mock() self._execute2 = mock.Mock(return_value=('', '')) - self.mock_object(manager, 'rados', MockRadosClientModule) + self.mock_object(manager, 'rados', MockRadosModule) self.mock_object(manager, 'setup_rados', self._setup_rados) fake_kwargs = copy.copy(manager_fake_kwargs) fake_kwargs.update( @@ -213,7 +229,7 @@ class GaneshaManagerTestCase(test.TestCase): ganesha_rados_store_pool_name='fakepool', ganesha_rados_export_counter='fakecounter', ganesha_rados_export_index='fakeindex', - ceph_vol_client=self._ceph_vol_client + rados_client=self._rados_client ) self._manager_with_rados_store = self.instantiate_ganesha_manager( self._execute2, 'faketag', **fake_kwargs) @@ -285,7 +301,7 @@ class GaneshaManagerTestCase(test.TestCase): ganesha_rados_store_pool_name='fakepool', ganesha_rados_export_counter='fakecounter', ganesha_rados_export_index='fakeindex', - ceph_vol_client=self._ceph_vol_client + rados_client=self._rados_client ) if counter_exists: self.mock_object( @@ -293,7 +309,7 @@ class GaneshaManagerTestCase(test.TestCase): else: self.mock_object( manager.GaneshaManager, '_get_rados_object', - mock.Mock(side_effect=MockRadosClientModule.ObjectNotFound)) + mock.Mock(side_effect=MockRadosModule.ObjectNotFound)) self.mock_object(manager.GaneshaManager, '_put_rados_object') test_mgr = manager.GaneshaManager( @@ -309,14 +325,14 @@ class GaneshaManagerTestCase(test.TestCase): self.assertEqual('fakepool', test_mgr.ganesha_rados_store_pool_name) self.assertEqual('fakecounter', test_mgr.ganesha_rados_export_counter) self.assertEqual('fakeindex', test_mgr.ganesha_rados_export_index) 
- self.assertEqual(self._ceph_vol_client, test_mgr.ceph_vol_client) + self.assertEqual(self._rados_client, test_mgr.rados_client) self._setup_rados.assert_called_with() test_mgr._get_rados_object.assert_called_once_with('fakecounter') if counter_exists: self.assertFalse(test_mgr._put_rados_object.called) else: test_mgr._put_rados_object.assert_called_once_with( - 'fakecounter', six.text_type(1000)) + 'fakecounter', str(1000)) def test_ganesha_export_dir(self): self.assertEqual( @@ -478,7 +494,7 @@ class GaneshaManagerTestCase(test.TestCase): else: self.mock_object( self._manager_with_rados_store, '_get_rados_object', - mock.Mock(side_effect=MockRadosClientModule.ObjectNotFound)) + mock.Mock(side_effect=MockRadosModule.ObjectNotFound)) ret = self._manager_with_rados_store._check_export_rados_object_exists( test_name) @@ -1021,36 +1037,58 @@ class GaneshaManagerTestCase(test.TestCase): self._manager._remove_rados_object_url_from_index.called) def test_get_rados_object(self): - fakebin = six.unichr(246).encode('utf-8') - self.mock_object(self._ceph_vol_client, 'get_object', - mock.Mock(return_value=fakebin)) + fakebin = chr(246).encode('utf-8') + + ioctx = mock.Mock() + ioctx.read.side_effect = [fakebin, fakebin] + + self._rados_client.open_ioctx = mock.Mock(return_value=ioctx) + self._rados_client.conf_get = mock.Mock(return_value=256) + + max_size = 256 * 1024 * 1024 ret = self._manager_with_rados_store._get_rados_object('fakeobj') - self._ceph_vol_client.get_object.assert_called_once_with( - 'fakepool', 'fakeobj') + self._rados_client.open_ioctx.assert_called_once_with('fakepool') + self._rados_client.conf_get.assert_called_once_with( + 'osd_max_write_size') + ioctx.read.assert_called_once_with('fakeobj', max_size) + ioctx.close.assert_called_once() + self.assertEqual(fakebin.decode('utf-8'), ret) def test_put_rados_object(self): - faketext = six.unichr(246) - self.mock_object(self._ceph_vol_client, 'put_object', - mock.Mock(return_value=None)) + faketext = chr(246) + + ioctx = mock.Mock() + manager.rados.WriteOpCtx.write_full = mock.Mock() + + self._rados_client.open_ioctx = mock.Mock(return_value=ioctx) + self._rados_client.conf_get = mock.Mock(return_value=256) ret = self._manager_with_rados_store._put_rados_object( 'fakeobj', faketext) - self._ceph_vol_client.put_object.assert_called_once_with( - 'fakepool', 'fakeobj', faketext.encode('utf-8')) + self._rados_client.open_ioctx.assert_called_once_with('fakepool') + self._rados_client.conf_get.assert_called_once_with( + 'osd_max_write_size') + manager.rados.WriteOpCtx.write_full.assert_called_once_with( + faketext.encode('utf-8')) + ioctx.operate_write_op.assert_called_once_with(mock.ANY, 'fakeobj') + self.assertIsNone(ret) def test_delete_rados_object(self): - self.mock_object(self._ceph_vol_client, 'delete_object', - mock.Mock(return_value=None)) + ioctx = mock.Mock() + + self._rados_client.open_ioctx = mock.Mock(return_value=ioctx) ret = self._manager_with_rados_store._delete_rados_object('fakeobj') - self._ceph_vol_client.delete_object.assert_called_once_with( - 'fakepool', 'fakeobj') + self._rados_client.open_ioctx.assert_called_once_with('fakepool') + ioctx.remove_object.assert_called_once_with('fakeobj') + ioctx.close.assert_called_once() + self.assertIsNone(ret) def test_get_export_id(self): diff --git a/manila/tests/share/drivers/test_ganesha.py b/manila/tests/share/drivers/test_ganesha.py index 39b49807c6..f3428a0337 100644 --- a/manila/tests/share/drivers/test_ganesha.py +++ b/manila/tests/share/drivers/test_ganesha.py @@ 
-351,12 +351,12 @@ class GaneshaNASHelper2TestCase(test.TestCase): self._context = context.get_admin_context() self._execute = mock.Mock(return_value=('', '')) - self.ceph_vol_client = mock.Mock() + self.rados_client = mock.Mock() self.fake_conf = config.Configuration(None) self.fake_conf_dir_path = '/fakedir0/exports.d' self._helper = ganesha.GaneshaNASHelper2( self._execute, self.fake_conf, tag='faketag', - ceph_vol_client=self.ceph_vol_client) + rados_client=self.rados_client) self._helper.ganesha = mock.Mock() self._helper.export_template = {} self.share = fake_share.fake_share() @@ -387,7 +387,7 @@ class GaneshaNASHelper2TestCase(test.TestCase): 'ganesha_rados_store_pool_name': 'ceph_pool', 'ganesha_rados_export_index': 'fake_index', 'ganesha_rados_export_counter': 'fake_counter', - 'ceph_vol_client': self.ceph_vol_client + 'rados_client': self.rados_client } else: kwargs = { @@ -431,7 +431,7 @@ class GaneshaNASHelper2TestCase(test.TestCase): ganesha_rados_store_pool_name='ceph_pool', ganesha_rados_export_index='fake_index', ganesha_rados_export_counter='fake_counter', - ceph_vol_client=self.ceph_vol_client) + rados_client=self.rados_client) self._helper._load_conf_dir.assert_called_once_with( '/fakedir2/faketempl.d', must_exist=False) self.assertEqual(mock_ganesha_manager, self._helper.ganesha)
diff --git a/releasenotes/notes/bp-update-cephfs-drivers-9ac5165f31669030.yaml b/releasenotes/notes/bp-update-cephfs-drivers-9ac5165f31669030.yaml new file mode 100644 index 0000000000..177cb4c86c --- /dev/null +++ b/releasenotes/notes/bp-update-cephfs-drivers-9ac5165f31669030.yaml @@ -0,0 +1,36 @@
+---
+deprecations:
+  - |
+    As of the Wallaby release, the CephFS driver no longer recognizes
+    the scoped extra-spec ``cephfs:data_isolated`` because it is no
+    longer supported by the Ceph community. This style of data isolation
+    required dedicating a Ceph pool for each share, and it scaled and performed
+    poorly.
+  - |
+    The ``ceph_volume_client`` is deprecated by the CephFS driver in favor of
+    a Python rados client that connects to the Ceph manager daemon to interact
+    with the Ceph cluster. This new connection method will enable functionality
+    not available with the older client, which has been deprecated by the Ceph
+    community and will be removed in the Quincy release.
+upgrade:
+  - |
+    Manila's CephFS drivers now **require** the "python3-ceph-argparse" and
+    "python3-rados" packages. Do not upgrade without adding these packages
+    to the environment where the ``manila-share`` service runs, since
+    without them the driver will refuse to start up. This breaking change
+    is necessary because the old ``ceph_volume_client`` has been deprecated
+    by the Ceph community.
+features:
+  - |
+    The Ceph backend can now work with multiple filesystem clusters.
+    The filesystem to be used by manila can be specified by the
+    driver option ``cephfs_filesystem_name``. If this option is not specified,
+    the driver will assume that a single filesystem is present in the Ceph
+    cluster and will attempt to use it.
+  - |
+    Deletion of shares offered by the CephFS driver (CephFS and NFS) is
+    now faster. The Ceph manager now moves a deleted share's contents to a
+    trash folder and purges the contents asynchronously rather than
+    handling this as part of the synchronous delete operation. The purge
+    can take considerable time if a share contains a significant amount of data.
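
For the multiple-filesystem feature described in the release note, the sketch below is purely illustrative and is not this patch's actual code. It shows one way a backend could resolve the filesystem ("volume") name: prefer the configured ``cephfs_filesystem_name`` and otherwise expect the cluster to report exactly one filesystem via the Ceph manager's "fs ls" command. The ``rados_command`` helper and ``exception.ShareBackendException`` are names used elsewhere in this patch; ``resolve_volname`` is a hypothetical function introduced only for this example.

    # Hypothetical sketch -- not the driver's literal implementation.
    # Assumes it lives in the driver module, where the rados_command()
    # helper and "from manila import exception" are already available.
    def resolve_volname(configuration, rados_client):
        # Prefer an explicitly configured filesystem name, if any.
        volname = configuration.safe_get('cephfs_filesystem_name')
        if volname:
            return volname

        # "fs ls" returns a JSON list such as [{"name": "cephfs", ...}],
        # so with the option unset there must be exactly one filesystem.
        filesystems = rados_command(rados_client, "fs ls", json_obj=True)
        if len(filesystems) != 1:
            raise exception.ShareBackendException(
                "Multiple CephFS filesystems found; set the "
                "cephfs_filesystem_name option to choose one.")
        return filesystems[0]["name"]

Under these assumptions, a deployment with a single filesystem and the option unset keeps working unchanged, while a cluster with several filesystems fails fast instead of the driver silently picking one.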