From 5c81264346141759d9c0d0ad529b4301dd130d3d Mon Sep 17 00:00:00 2001
From: Victoria Martinez de la Cruz <victoria@redhat.com>
Date: Thu, 11 Mar 2021 20:50:40 +0000
Subject: [PATCH] Add create share from snapshot in CephFS

Adds create share from snapshot functionality to the CephFS drivers
(both the CephFS native and CephFS NFS variants).
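
The clone operation is asynchronous on the Ceph side: the driver issues
"fs subvolume snapshot clone", reports the new share as
'creating_from_snapshot', and get_share_status() is then polled
periodically until the clone reaches 'complete'. A rough sketch of that
flow (illustrative only, not part of this change; the helper and the
standalone loop merely stand in for what the share manager does):

    import time

    from manila.common import constants

    def clone_and_wait(driver, ctxt, share, snapshot, parent_share,
                       poll_interval=5):
        # kick off the clone; the driver returns the initial status
        updates = driver.create_share_from_snapshot(
            ctxt, share, snapshot, parent_share=parent_share)
        while updates['status'] == constants.STATUS_CREATING_FROM_SNAPSHOT:
            share['status'] = updates['status']
            time.sleep(poll_interval)
            # re-reads "fs clone status" on the backend
            updates = driver.get_share_status(share)
        # on success the status is 'available' and export_locations is set
        return updates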

Depends-On: https://review.opendev.org/c/openstack/manila-tempest-plugin/+/778188

Co-Authored-By: Victoria Martinez de la Cruz <victoria@redhat.com>
Co-Authored-By: Ramana Raja <rraja@redhat.com>
Co-Authored-By: Tom Barron <tpb@dyncloud.net>

DocImpact
Partially-Implements: blueprint create-share-from-snapshot-cephfs

Change-Id: I825ab15af934cb37dfda48ea26ec1af9de8dd293
---
 ...hare_back_ends_feature_support_mapping.rst |   2 +-
 manila/share/drivers/cephfs/driver.py         | 165 +++++++++++++++---
 .../tests/share/drivers/cephfs/test_driver.py | 105 ++++++++++-
 ...from-snapshot-cephfs-080bd6c2ece74c5b.yaml |  12 ++
 4 files changed, 256 insertions(+), 28 deletions(-)
 create mode 100644 releasenotes/notes/bp-create-share-from-snapshot-cephfs-080bd6c2ece74c5b.yaml

diff --git a/doc/source/admin/share_back_ends_feature_support_mapping.rst b/doc/source/admin/share_back_ends_feature_support_mapping.rst
index 579ea0b43c..b266cc7a42 100644
--- a/doc/source/admin/share_back_ends_feature_support_mapping.rst
+++ b/doc/source/admin/share_back_ends_feature_support_mapping.rst
@@ -83,7 +83,7 @@ Mapping of share drivers and share features support
 +----------------------------------------+-----------------------+-----------------------+--------------------------+--------------------------+------------------------+-----------------------------------+--------------------------+--------------------+--------------------+
 |             Oracle ZFSSA               |           K           |           N           |             M            |             M            |            K           |                 K                 |            \-            |          \-        |          \-        |
 +----------------------------------------+-----------------------+-----------------------+--------------------------+--------------------------+------------------------+-----------------------------------+--------------------------+--------------------+--------------------+
-|                 CephFS                 |           M           |          \-           |             M            |             M            |            M           |                \-                 |            \-            |          \-        |          \-        |
+|                 CephFS                 |           M           |          \-           |             M            |             M            |            M           |                 W                 |            \-            |          \-        |          \-        |
 +----------------------------------------+-----------------------+-----------------------+--------------------------+--------------------------+------------------------+-----------------------------------+--------------------------+--------------------+--------------------+
 |                 Tegile                 |           M           |          \-           |             M            |             M            |            M           |                 M                 |            \-            |          \-        |          \-        |
 +----------------------------------------+-----------------------+-----------------------+--------------------------+--------------------------+------------------------+-----------------------------------+--------------------------+--------------------+--------------------+
diff --git a/manila/share/drivers/cephfs/driver.py b/manila/share/drivers/cephfs/driver.py
index 38e7c9f938..dd430553ac 100644
--- a/manila/share/drivers/cephfs/driver.py
+++ b/manila/share/drivers/cephfs/driver.py
@@ -71,6 +71,14 @@ RADOS_TIMEOUT = 10
 
 LOG = log.getLogger(__name__)
 
+# Clone statuses
+CLONE_CREATING = 'creating'
+CLONE_FAILED = 'failed'
+CLONE_CANCELED = 'canceled'
+CLONE_PENDING = 'pending'
+CLONE_INPROGRESS = 'in-progress'
+CLONE_COMPLETE = 'complete'
+
 cephfs_opts = [
     cfg.StrOpt('cephfs_conf_path',
                default="",
@@ -91,14 +99,6 @@ cephfs_opts = [
                default="/volumes",
                help="The prefix of the cephfs volume path."
                ),
-    cfg.BoolOpt('cephfs_enable_snapshots',
-                deprecated_for_removal=True,
-                deprecated_since='Victoria',
-                deprecated_reason='CephFS snapshots are fully supported '
-                                  'since the Nautilus release of Ceph.',
-                default=True,
-                help="Whether to enable snapshots in this driver."
-                ),
     cfg.StrOpt('cephfs_protocol_helper_type',
                default="CEPHFS",
                choices=['CEPHFS', 'NFS'],
@@ -273,8 +273,8 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
             ],
             'total_capacity_gb': total_capacity_gb,
             'free_capacity_gb': free_capacity_gb,
-            'snapshot_support': self.configuration.safe_get(
-                'cephfs_enable_snapshots'),
+            'snapshot_support': True,
+            'create_share_from_snapshot_support': True,
         }
         super(    # pylint: disable=no-member
             CephFSDriver, self)._update_share_stats(data)
@@ -289,6 +289,26 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
         """
         return gigs * units.Gi
 
+    def _get_export_locations(self, share):
+        """Get the export location for a share.
+
+        :param share: a manila share.
+        :return: the export location for a share.
+        """
+
+        # get path of FS subvolume/share
+        argdict = {
+            "vol_name": self.volname,
+            "sub_name": share["id"]
+        }
+        if share['share_group_id'] is not None:
+            argdict.update({"group_name": share["share_group_id"]})
+
+        subvolume_path = rados_command(
+            self.rados_client, "fs subvolume getpath", argdict)
+
+        return self.protocol_helper.get_export_locations(share, subvolume_path)
+
     @property
     def rados_client(self):
         if self._rados_client:
@@ -372,34 +392,65 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
             "namespace_isolated": True,
             "mode": self._cephfs_volume_mode,
         }
-
         if share['share_group_id'] is not None:
             argdict.update({"group_name": share["share_group_id"]})
 
         rados_command(self.rados_client, "fs subvolume create", argdict)
 
-        # get path of FS subvolume/share
+        return self._get_export_locations(share)
+
+    def _need_to_cancel_clone(self, share):
+        # Is there an ongoing clone operation that needs to be canceled
+        # so we can delete the share?
+        need_to_cancel_clone = False
+
         argdict = {
             "vol_name": self.volname,
-            "sub_name": share["id"],
+            "clone_name": share["id"],
         }
         if share['share_group_id'] is not None:
             argdict.update({"group_name": share["share_group_id"]})
-        subvolume_path = rados_command(
-            self.rados_client, "fs subvolume getpath", argdict)
 
-        return self.protocol_helper.get_export_locations(share, subvolume_path)
+        try:
+            status = rados_command(
+                self.rados_client, "fs clone status", argdict)
+            if status in (CLONE_PENDING, CLONE_INPROGRESS):
+                need_to_cancel_clone = True
+        except exception.ShareBackendException as e:
+            # Trying to get clone status on a regular subvolume is expected
+            # to fail.
+            if 'not allowed on subvolume' not in str(e).lower():
+                raise exception.ShareBackendException(
+                    "Failed to remove share.")
+
+        return need_to_cancel_clone
 
     def delete_share(self, context, share, share_server=None):
         # remove FS subvolume/share
-
         LOG.debug("[%(be)s]: delete_share: id=%(id)s, group=%(gr)s.",
                   {"be": self.backend_name, "id": share['id'],
                    "gr": share['share_group_id']})
 
+        if self._need_to_cancel_clone(share):
+            try:
+                argdict = {
+                    "vol_name": self.volname,
+                    "clone_name": share["id"],
+                    "force": True,
+                }
+                if share['share_group_id'] is not None:
+                    argdict.update({"group_name": share["share_group_id"]})
+
+                rados_command(self.rados_client, "fs clone cancel", argdict)
+            except rados.Error:
+                raise exception.ShareBackendException(
+                    "Failed to cancel clone operation.")
+
         argdict = {
             "vol_name": self.volname,
             "sub_name": share["id"],
+            # We want to clean up the share even if the subvolume is
+            # not in a good state.
             "force": True,
         }
         if share['share_group_id'] is not None:
@@ -428,7 +479,6 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
             "sub_name": share["id"],
             "new_size": self._to_bytes(new_size),
         }
-
         if share['share_group_id'] is not None:
             argdict.update({"group_name": share["share_group_id"]})
 
@@ -449,7 +499,6 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
             "new_size": self._to_bytes(new_size),
             "no_shrink": True,
         }
-
         if share["share_group_id"] is not None:
             argdict.update({"group_name": share["share_group_id"]})
 
@@ -555,6 +604,78 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
 
         return None, []
 
+    def _get_clone_status(self, share):
+        """Check the status of a newly cloned share."""
+        argdict = {
+            "vol_name": self.volname,
+            "clone_name": share["id"]
+        }
+        if share['share_group_id'] is not None:
+            argdict.update({"group_name": share["share_group_id"]})
+
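+        # the trailing True asks rados_command for the decoded (JSON)
+        # response, so the clone state can be read from the returned dict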
+        out = rados_command(self.rados_client,
+                            "fs clone status", argdict, True)
+        return out['status']['state']
+
+    def _update_create_from_snapshot_status(self, share):
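+        """Map the backend clone status to manila share status updates."""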
+        updates = {
+            'status': constants.STATUS_ERROR,
+            'progress': None,
+            'export_locations': []
+        }
+        status = self._get_clone_status(share)
+        if status == CLONE_COMPLETE:
+            updates['status'] = constants.STATUS_AVAILABLE
+            updates['progress'] = '100%'
+            updates['export_locations'] = self._get_export_locations(share)
+        elif status in (CLONE_PENDING, CLONE_INPROGRESS):
+            updates['status'] = constants.STATUS_CREATING_FROM_SNAPSHOT
+        else:
+            # error out if the clone failed or was canceled
+            raise exception.ShareBackendException(
+                "rados client clone of snapshot [%(sn)s] to new "
+                "share [%(shr)s] did not complete successfully." %
+                {"sn": share["snapshot_id"], "shr": share["id"]})
+        return updates
+
+    def get_share_status(self, share, share_server=None):
+        """Returns the current status for a share.
+
+        :param share: a manila share.
+        :param share_server: a manila share server (not currently supported).
+        :returns: manila share status.
+        """
+
+        if share['status'] != constants.STATUS_CREATING_FROM_SNAPSHOT:
+            LOG.warning("Caught an unexpected share status '%s' during share "
+                        "status update routine. Skipping.", share['status'])
+            return
+        return self._update_create_from_snapshot_status(share)
+
+    def create_share_from_snapshot(self, context, share, snapshot,
+                                   share_server=None, parent_share=None):
+        """Create a CephFS subvolume from a snapshot"""
+
+        LOG.debug("[%(be)s]: create_share_from_snapshot: id=%(id)s, "
+                  "snapshot=%(sn)s, size=%(sz)s, group=%(gr)s.",
+                  {"be": self.backend_name, "id": share['id'],
+                   "sn": snapshot['id'], "sz": share['size'],
+                   "gr": share['share_group_id']})
+
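+        # clone the backend snapshot (named "<snapshot_id>_<id>", the same
+        # name used when the snapshot was taken) into a subvolume named
+        # after the new share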
+        argdict = {
+            "vol_name": self.volname,
+            "sub_name": parent_share["id"],
+            "snap_name": '_'.join([snapshot["snapshot_id"], snapshot["id"]]),
+            "target_sub_name": share["id"]
+        }
+        if share['share_group_id'] is not None:
+            argdict.update({"group_name": share["share_group_id"]})
+
+        rados_command(
+            self.rados_client, "fs subvolume snapshot clone", argdict)
+
+        return self._update_create_from_snapshot_status(share)
+
     def __del__(self):
         if self._rados_client:
             LOG.info("[%(be)s] Ceph client disconnecting...",
@@ -638,7 +759,6 @@ class NativeProtocolHelper(ganesha.NASHelperBase):
             "auth_id": ceph_auth_id,
             "tenant_id": share["project_id"],
         }
-
         if share["share_group_id"] is not None:
             argdict.update({"group_name": share["share_group_id"]})
 
@@ -675,7 +795,6 @@ class NativeProtocolHelper(ganesha.NASHelperBase):
             "sub_name": share["id"],
             "auth_id": access['access_to']
         }
-
         if share["share_group_id"] is not None:
             argdict.update({"group_name": share["share_group_id"]})
 
@@ -690,7 +809,6 @@ class NativeProtocolHelper(ganesha.NASHelperBase):
             "vol_name": self.volname,
             "sub_name": share["id"],
         }
-
         if share["share_group_id"] is not None:
             argdict.update({"group_name": share["share_group_id"]})
 
@@ -853,7 +971,6 @@ class NFSProtocolHelper(ganesha.GaneshaNASHelper2):
             "access_level": "rw",
             "tenant_id": share["project_id"],
         }
-
         if share["share_group_id"] is not None:
             argdict.update({"group_name": share["share_group_id"]})
 
@@ -879,7 +996,6 @@ class NFSProtocolHelper(ganesha.GaneshaNASHelper2):
             "sub_name": share["id"],
             "auth_id": ceph_auth_id,
         }
-
         if share["share_group_id"] is not None:
             argdict.update({"group_name": share["share_group_id"]})
 
@@ -891,7 +1007,6 @@ class NFSProtocolHelper(ganesha.GaneshaNASHelper2):
             "vol_name": self.volname,
             "sub_name": share["id"]
         }
-
         if share["share_group_id"] is not None:
             argdict.update({"group_name": share["share_group_id"]})
 
diff --git a/manila/tests/share/drivers/cephfs/test_driver.py b/manila/tests/share/drivers/cephfs/test_driver.py
index fe220b7afe..2a08547229 100644
--- a/manila/tests/share/drivers/cephfs/test_driver.py
+++ b/manila/tests/share/drivers/cephfs/test_driver.py
@@ -218,6 +218,13 @@ class CephFSDriverTestCase(test.TestCase):
         self.assertEqual(2, driver.rados_command.call_count)
 
     def test_delete_share(self):
+        clone_status_prefix = "fs clone status"
+
+        clone_status_dict = {
+            "vol_name": self._driver.volname,
+            "clone_name": self._share["id"],
+        }
+
         delete_share_prefix = "fs subvolume rm"
 
         delete_share_dict = {
@@ -226,10 +233,19 @@ class CephFSDriverTestCase(test.TestCase):
             "force": True,
         }
 
+        driver.rados_command.side_effect = [
+            exception.ShareBackendException(
+                "clone status: not allowed on subvolume"),
+            mock.Mock()]
+
         self._driver.delete_share(self._context, self._share)
 
-        driver.rados_command.assert_called_once_with(
-            self._driver.rados_client, delete_share_prefix, delete_share_dict)
+        driver.rados_command.assert_has_calls([
+            mock.call(self._driver.rados_client,
+                      clone_status_prefix,
+                      clone_status_dict),
+            mock.call(self._driver.rados_client,
+                      delete_share_prefix,
+                      delete_share_dict)])
+
+        self.assertEqual(2, driver.rados_command.call_count)
 
     def test_extend_share(self):
         extend_share_prefix = "fs subvolume resize"
@@ -397,6 +413,91 @@ class CephFSDriverTestCase(test.TestCase):
             self._driver.rados_client,
             group_snapshot_delete_prefix, group_snapshot_delete_dict)
 
+    def test_create_share_from_snapshot(self):
+        parent_share = {
+            'id': 'fakeparentshareid',
+            'name': 'fakeparentshare',
+        }
+
+        create_share_from_snapshot_prefix = "fs subvolume snapshot clone"
+
+        create_share_from_snapshot_dict = {
+            "vol_name": self._driver.volname,
+            "sub_name": parent_share["id"],
+            "snap_name": "_".join([
+                self._snapshot["snapshot_id"], self._snapshot["id"]]),
+            "target_sub_name": self._share["id"]
+        }
+
+        get_clone_status_prefix = "fs clone status"
+        get_clone_status_dict = {
+            "vol_name": self._driver.volname,
+            "clone_name": self._share["id"],
+        }
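+        # the mocked backend reports the clone as still in progress, so the
+        # driver leaves the new share in 'creating_from_snapshot'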
+        driver.rados_command.return_value = {
+            'status': {
+                'state': 'in-progress',
+            },
+        }
+
+        self._driver.create_share_from_snapshot(
+            self._context, self._share, self._snapshot, None,
+            parent_share=parent_share
+        )
+
+        driver.rados_command.assert_has_calls([
+            mock.call(self._driver.rados_client,
+                      create_share_from_snapshot_prefix,
+                      create_share_from_snapshot_dict),
+            mock.call(self._driver.rados_client,
+                      get_clone_status_prefix,
+                      get_clone_status_dict,
+                      True)])
+
+        self.assertEqual(2, driver.rados_command.call_count)
+
+    def test_delete_share_from_snapshot(self):
+        clone_status_prefix = "fs clone status"
+
+        clone_status_dict = {
+            "vol_name": self._driver.volname,
+            "clone_name": self._share["id"],
+        }
+
+        clone_cancel_prefix = "fs clone cancel"
+
+        clone_cancel_dict = {
+            "vol_name": self._driver.volname,
+            "clone_name": self._share["id"],
+            "force": True,
+        }
+
+        delete_share_prefix = "fs subvolume rm"
+
+        delete_share_dict = {
+            "vol_name": self._driver.volname,
+            "sub_name": self._share["id"],
+            "force": True,
+        }
+
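+        # clone status reports 'in-progress', so the driver must cancel the
+        # clone before removing the subvolume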
+        driver.rados_command.side_effect = [
+            'in-progress', mock.Mock(), mock.Mock()]
+
+        self._driver.delete_share(self._context, self._share)
+
+        driver.rados_command.assert_has_calls([
+            mock.call(self._driver.rados_client,
+                      clone_status_prefix,
+                      clone_status_dict),
+            mock.call(self._driver.rados_client,
+                      clone_cancel_prefix,
+                      clone_cancel_dict),
+            mock.call(self._driver.rados_client,
+                      delete_share_prefix,
+                      delete_share_dict)])
+
+        self.assertEqual(3, driver.rados_command.call_count)
+
     def test_delete_driver(self):
         # Create share to prompt volume_client construction
         self._driver.create_share(self._context,
diff --git a/releasenotes/notes/bp-create-share-from-snapshot-cephfs-080bd6c2ece74c5b.yaml b/releasenotes/notes/bp-create-share-from-snapshot-cephfs-080bd6c2ece74c5b.yaml
new file mode 100644
index 0000000000..c18ddee611
--- /dev/null
+++ b/releasenotes/notes/bp-create-share-from-snapshot-cephfs-080bd6c2ece74c5b.yaml
@@ -0,0 +1,12 @@
+---
+features:
+  - |
+    Create share from snapshot is now available in the CephFS Native and
+    CephFS NFS drivers. It relies on subvolume clone support added in the
+    Ceph Nautilus release, so a deployment with Ceph Nautilus (v14.2.18 or
+    higher) or Ceph Octopus (v15.2.10 or higher) is required.
+deprecations:
+  - |
+    The CephFS driver ``cephfs_enable_snapshots`` configuration option has
+    been removed. It was deprecated for removal in the Victoria release;
+    snapshot support is now always enabled.