diff --git a/etc/manila/rootwrap.d/share.filters b/etc/manila/rootwrap.d/share.filters
index 87e213b953..cf26c0b1ac 100644
--- a/etc/manila/rootwrap.d/share.filters
+++ b/etc/manila/rootwrap.d/share.filters
@@ -38,6 +38,12 @@ smbcontrol: CommandFilter, smbcontrol, root
 # manila/share/drivers/helpers.py: 'net', 'conf', 'getparm', '%s', 'hosts allow'
 net: CommandFilter, net, root
 
+# manila/share/drivers/helpers.py: 'cp', '%s', '%s'
+cp: CommandFilter, cp, root
+
+# manila/share/drivers/helpers.py: 'service', '%s', '%s'
+service: CommandFilter, service, root
+
 # manila/share/drivers/lvm.py: 'lvremove', '-f', "%s/%s
 lvremove: CommandFilter, lvremove, root
 
@@ -106,21 +112,13 @@ exportfs: CommandFilter, exportfs, root
 stat: CommandFilter, stat, root
 # manila/share/drivers/ibm/gpfs.py: 'df', '-P', '-B', '1', '%s'
 df: CommandFilter, df, root
+# manila/share/drivers/ibm/gpfs.py: 'chmod', '777', '%s'
+chmod: CommandFilter, chmod, root
+# manila/share/drivers/ibm/gpfs.py: 'mmnfs', 'export', '%s', '%s'
+mmnfs: CommandFilter, mmnfs, root
 
-# Ganesha commands
-# manila/share/drivers/ibm/ganesha_utils.py: 'mv', '%s', '%s'
 # manila/share/drivers/ganesha/manager.py: 'mv', '%s', '%s'
 mv: CommandFilter, mv, root
-# manila/share/drivers/ibm/ganesha_utils.py: 'cp', '%s', '%s'
-cp: CommandFilter, cp, root
-# manila/share/drivers/ibm/ganesha_utils.py: 'scp', '-i', '%s', '%s', '%s'
-scp: CommandFilter, scp, root
-# manila/share/drivers/ibm/ganesha_utils.py: 'ssh', '%s', '%s'
-ssh: CommandFilter, ssh, root
-# manila/share/drivers/ibm/ganesha_utils.py: 'chmod', '%s', '%s'
-chmod: CommandFilter, chmod, root
-# manila/share/drivers/ibm/ganesha_utils.py: 'service', '%s', 'restart'
-service: CommandFilter, service, root
 
 # manila/share/drivers/ganesha/manager.py: 'mktemp', '-p', '%s', '-t', '%s'
 mktemp: CommandFilter, mktemp, root
diff --git a/manila/share/drivers/ibm/ganesha_utils.py b/manila/share/drivers/ibm/ganesha_utils.py
deleted file mode 100644
index af482ebab1..0000000000
--- a/manila/share/drivers/ibm/ganesha_utils.py
+++ /dev/null
@@ -1,332 +0,0 @@
-# Copyright 2014 IBM Corp.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-"""
-Ganesha Admin Utilities
-
-Ganesha NFS does not provide many tools for automating the process of creating
-and managing export definitions.  This module provides utilities to help parse
-a specified ganesha config file and return a map containing the export
-definitions and attributes.  A method republishing updated export definitions
-is also provided.  And there are methods for requesting the ganesha server
-to reload the export definitions.
-
-Consider moving this to common location for use by other manila drivers.
-"""
-
-import copy
-import re
-import socket
-import time
-
-import netaddr
-from oslo_log import log
-import six
-
-from manila import exception
-from manila.i18n import _, _LI
-from manila import utils
-
-LOG = log.getLogger(__name__)
-# more simple pattern for matching a single avpair per line,
-# skips lines starting with # comment char
-AVPATTERN = re.compile('^\s*(?!#)\s*(?P<attr>\S+)\s*=\s*(?P<val>\S+)\s*;')
-
-# NFS Ganesha v1.5, v2.0 format used here.
-# TODO(nileshb): Upgrade it to NFS Ganesha 2.1 format.
-DEFAULT_EXPORT_ATTRS = {
-    'export_id': 'undefined',
-    'path': 'undefined',
-    'fsal': 'undefined',
-    'root_access': '"*"',
-    'rw_access': '"*"',
-    'pseudo': 'undefined',
-    'anonymous_root_uid': '-2',
-    'nfs_protocols': '"3,4"',
-    'transport_protocols': '"UDP,TCP"',
-    'sectype': '"sys"',
-    'maxread': '65536',
-    'maxwrite': '65536',
-    'prefread': '65536',
-    'prefwrite': '65536',
-    'filesystem_id': '192.168',
-    'tag': 'undefined',
-}
-
-STARTING_EXPORT_ID = 100
-
-
-def valid_flags():
-    return DEFAULT_EXPORT_ATTRS.keys()
-
-
-def parse_ganesha_config(configpath):
-    """Parse the specified ganesha configuration.
-
-    Parse a configuration file and return a list of lines that were found
-    before the first EXPORT block, and a dictionary of exports and their
-    attributes.
-
-    The input configuration file should be a valid ganesha config file and the
-    export blocks should be the last items in the file.
-    :returns: pre_lines -- List of lines, before the exports clause begins
-              exports -- Dict of exports, indexed with the 'export_id'
-
-    Hers is a sample output:
-
-    pre_lines =
-    [   '###################################################',
-        '#     Export entries',
-        '###################################################',
-        '',
-        '',
-        '# First export entry']
-
-    exports =
-    {   '100': {   'anonymous_root_uid': '-2',
-                   'export_id': '100',
-                   'filesystem_id': '192.168',
-                   'fsal': '"GPFS"',
-                   'maxread': '65536',
-                   'maxwrite': '65536',
-                   'nfs_protocols': '"3,4"',
-                   'path': '"/gpfs0/share-0d7df0c0-4792-4e2a-68dc7206a164"',
-                   'prefread': '65536',
-                   'prefwrite': '65536',
-                   'pseudo': '"/gpfs0/share-0d7df0c0-4792-4e2a-68dc7206a164"',
-                   'root_access': '"*"',
-                   'rw_access': '""',
-                   'sectype': '"sys"',
-                   'tag': '"fs100"',
-                   'transport_protocols': '"UDP,TCP"'},
-        '101': {   'anonymous_root_uid': '-2',
-                   'export_id': '101',
-                   'filesystem_id': '192.168',
-                   'fsal': '"GPFS"',
-                   'maxread': '65536',
-                   'maxwrite': '65536',
-                   'nfs_protocols': '"3,4"',
-                   'path': '"/gpfs0/share-74bee4dc-e07a-44a9-4be619a13fb1"',
-                   'prefread': '65536',
-                   'prefwrite': '65536',
-                   'pseudo': '"/gpfs0/share-74bee4dc-e07a-44a9-4be619a13fb1"',
-                   'root_access': '"*"',
-                   'rw_access': '"172.24.4.4"',
-                   'sectype': '"sys"',
-                   'tag': '"fs101"',
-                   'transport_protocols': '"UDP,TCP"'}}
-    """
-    export_count = 0
-    exports = dict()
-    pre_lines = []
-    with open(configpath) as f:
-        for l in f.readlines():
-            line = l.strip()
-            if export_count == 0 and line != 'EXPORT':
-                pre_lines.append(line)
-            else:
-                if line == 'EXPORT':
-                    export_count += 1
-                    expattrs = dict()
-                try:
-                    match_obj = AVPATTERN.match(line)
-                    attr = match_obj.group('attr').lower()
-                    val = match_obj.group('val')
-                    expattrs[attr] = val
-                    if attr == 'export_id':
-                        exports[val] = expattrs
-                except AttributeError:
-                    pass
-
-    if export_count != len(exports):
-        msg = (_('Invalid export config file %(configpath)s: '
-                 '%(exports)s export clauses found, but '
-                 '%(export_ids)s export_ids.')
-               % {"configpath": configpath,
-                  "exports": str(export_count),
-                  "export_ids": str(len(exports))})
-
-        LOG.error(msg)
-        raise exception.GPFSGaneshaException(msg)
-    return pre_lines, exports
-
-
-def _get_export_by_path(exports, path):
-    for index, export in exports.items():
-        if export and 'path' in export and export['path'].strip('"\'') == path:
-            return export
-    return None
-
-
-def get_export_by_path(exports, path):
-    """Return the export that matches the specified path."""
-    return _get_export_by_path(exports, path)
-
-
-def export_exists(exports, path):
-    """Return true if an export exists with the specified path."""
-    return _get_export_by_path(exports, path) is not None
-
-
-def get_next_id(exports):
-    """Return an export id that is one larger than largest existing id."""
-    try:
-        next_id = max(map(int, exports.keys())) + 1
-    except ValueError:
-        next_id = STARTING_EXPORT_ID
-
-    LOG.debug("Export id = %d", next_id)
-    return next_id
-
-
-def get_export_template():
-    return copy.copy(DEFAULT_EXPORT_ATTRS)
-
-
-def _convert_ipstring_to_ipn(ipstring):
-    """Transform a single ip string into a list of IPNetwork objects."""
-    if netaddr.valid_glob(ipstring):
-        ipns = netaddr.glob_to_cidrs(ipstring)
-    else:
-        try:
-            ipns = [netaddr.IPNetwork(ipstring)]
-        except netaddr.AddrFormatError:
-            msg = (_('Invalid IP access string %s.') % ipstring)
-            LOG.error(msg)
-            raise exception.GPFSGaneshaException(msg)
-    return ipns
-
-
-def _format_ips(iptokens):
-    ipaddrs = set()
-    for iptoken in iptokens:
-        ipn_list = _convert_ipstring_to_ipn(iptoken)
-        for ipn in ipn_list:
-            ips = [ip for ip in netaddr.iter_unique_ips(ipn)]
-            ipaddrs = ipaddrs.union(ips)
-    return ipaddrs
-
-
-def format_access_list(access_string, deny_access=None):
-    """Transform access string into a format ganesha understands."""
-    # handle the case where there is an access string with a trailing comma
-    access_string = access_string.strip(',')
-    iptokens = access_string.split(',')
-
-    ipaddrs = _format_ips(iptokens)
-
-    if deny_access:
-        deny_tokens = deny_access.split(',')
-        deny_ipaddrs = _format_ips(deny_tokens)
-        ipaddrs = ipaddrs - deny_ipaddrs
-
-    ipaddrlist = sorted(list(ipaddrs))
-
-    return ','.join([six.text_type(ip) for ip in ipaddrlist])
-
-
-def _publish_local_config(configpath, pre_lines, exports):
-    tmp_path = '%s.tmp.%s' % (configpath, time.time())
-    LOG.debug("tmp_path = %s", tmp_path)
-    cpcmd = ['install', '-m', '666', configpath, tmp_path]
-    try:
-        utils.execute(*cpcmd, run_as_root=True)
-    except exception.ProcessExecutionError as e:
-        msg = (_('Failed while publishing ganesha config locally. '
-                 'Error: %s.') % six.text_type(e))
-        LOG.error(msg)
-        raise exception.GPFSGaneshaException(msg)
-
-    with open(tmp_path, 'w+') as f:
-        for l in pre_lines:
-            f.write('%s\n' % l)
-        for e in exports:
-            f.write('EXPORT\n{\n')
-            for attr in exports[e]:
-                f.write('%s = %s ;\n' % (attr, exports[e][attr]))
-
-            f.write('}\n')
-    mvcmd = ['mv', tmp_path, configpath]
-    try:
-        utils.execute(*mvcmd, run_as_root=True)
-    except exception.ProcessExecutionError as e:
-        msg = (_('Failed while publishing ganesha config locally. '
-                 'Error: %s.') % six.text_type(e))
-        LOG.error(msg)
-        raise exception.GPFSGaneshaException(msg)
-    LOG.info(_LI('Ganesha config %s published locally.'), configpath)
-
-
-def _publish_remote_config(server, sshlogin, sshkey, configpath):
-    dest = '%s@%s:%s' % (sshlogin, server, configpath)
-    scpcmd = ['scp', '-i', sshkey, configpath, dest]
-    try:
-        utils.execute(*scpcmd, run_as_root=False)
-    except exception.ProcessExecutionError as e:
-        msg = (_('Failed while publishing ganesha config on remote server. '
-                 'Error: %s.') % six.text_type(e))
-        LOG.error(msg)
-        raise exception.GPFSGaneshaException(msg)
-    LOG.info(_LI('Ganesha config %(path)s published to %(server)s.'),
-             {'path': configpath,
-              'server': server})
-
-
-def publish_ganesha_config(servers, sshlogin, sshkey, configpath,
-                           pre_lines, exports):
-    """Publish the specified configuration information.
-
-    Save the existing configuration file and then publish a new
-    ganesha configuration to the specified path.  The pre-export
-    lines are written first, followed by the collection of export
-    definitions.
-    """
-    _publish_local_config(configpath, pre_lines, exports)
-
-    localserver_iplist = socket.gethostbyname_ex(socket.gethostname())[2]
-    for gsvr in servers:
-        if gsvr not in localserver_iplist:
-            _publish_remote_config(gsvr, sshlogin, sshkey, configpath)
-
-
-def reload_ganesha_config(servers, sshlogin, service='ganesha.nfsd'):
-    """Request ganesha server reload updated config."""
-
-    # Note:  dynamic reload of ganesha config is not enabled
-    # in ganesha v2.0. Therefore, the code uses the ganesha service restart
-    # option to make sure the config changes are reloaded
-    for server in servers:
-        # Until reload is fully implemented and if the reload returns a bad
-        # status revert to service restart instead
-        LOG.info(_LI('Restart service %(service)s on %(server)s to force a '
-                     'config file reload'),
-                 {'service': service, 'server': server})
-        run_local = True
-
-        reload_cmd = ['service', service, 'restart']
-        localserver_iplist = socket.gethostbyname_ex(
-            socket.gethostname())[2]
-        if server not in localserver_iplist:
-            remote_login = sshlogin + '@' + server
-            reload_cmd = ['ssh', remote_login] + reload_cmd
-            run_local = False
-        try:
-            utils.execute(*reload_cmd, run_as_root=run_local)
-        except exception.ProcessExecutionError as e:
-            msg = (_('Could not restart service %(service)s on '
-                     '%(server)s: %(excmsg)s')
-                   % {'service': service,
-                      'server': server,
-                      'excmsg': six.text_type(e)})
-            LOG.error(msg)
-            raise exception.GPFSGaneshaException(msg)
diff --git a/manila/share/drivers/ibm/gpfs.py b/manila/share/drivers/ibm/gpfs.py
index 77fa045f2e..90c9108cf7 100644
--- a/manila/share/drivers/ibm/gpfs.py
+++ b/manila/share/drivers/ibm/gpfs.py
@@ -29,7 +29,6 @@ Limitation:
 
 """
 import abc
-import copy
 import math
 import os
 import re
@@ -43,10 +42,11 @@ from oslo_utils import strutils
 from oslo_utils import units
 import six
 
+from manila.common import constants
 from manila import exception
-from manila.i18n import _, _LE, _LI
+from manila.i18n import _
 from manila.share import driver
-from manila.share.drivers.ibm import ganesha_utils
+from manila.share import share_types
 from manila import utils
 
 LOG = log.getLogger(__name__)
@@ -67,10 +67,16 @@ gpfs_share_opts = [
     cfg.StrOpt('gpfs_nfs_server_type',
                default='KNFS',
                help=('NFS Server type. Valid choices are "KNFS" (kernel NFS) '
-                     'or "GNFS" (Ganesha NFS).')),
+                     'or "CES" (Ganesha NFS).')),
     cfg.ListOpt('gpfs_nfs_server_list',
                 help=('A list of the fully qualified NFS server names that '
                       'make up the OpenStack Manila configuration.')),
+    cfg.BoolOpt('is_gpfs_node',
+                default=False,
+                help=('True:when Manila services are running on one of the '
+                      'Spectrum Scale node. '
+                      'False:when Manila services are not running on any of '
+                      'the Spectrum Scale node.')),
     cfg.PortOpt('gpfs_ssh_port',
                 default=22,
                 help='GPFS server SSH port.'),
@@ -86,7 +92,7 @@ gpfs_share_opts = [
     cfg.ListOpt('gpfs_share_helpers',
                 default=[
                     'KNFS=manila.share.drivers.ibm.gpfs.KNFSHelper',
-                    'GNFS=manila.share.drivers.ibm.gpfs.GNFSHelper',
+                    'CES=manila.share.drivers.ibm.gpfs.CESHelper',
                 ],
                 help='Specify list of share export helpers.'),
     cfg.StrOpt('knfs_export_options',
@@ -95,7 +101,11 @@ gpfs_share_opts = [
                help=('Options to use when exporting a share using kernel '
                      'NFS server. Note that these defaults can be overridden '
                      'when a share is created by passing metadata with key '
-                     'name export_options.')),
+                     'name export_options.'),
+               deprecated_for_removal=True,
+               deprecated_reason="This option isn't used any longer. Please "
+                                 "use share-type extra specs for export "
+                                 "options."),
 ]
 
 
@@ -115,6 +125,7 @@ class GPFSShareDriver(driver.ExecuteMixin, driver.GaneshaMixin,
 
         1.0 - Initial version.
         1.1 - Added extend_share functionality
+        2.0 - Added CES support for NFS Ganesha
     """
 
     def __init__(self, *args, **kwargs):
@@ -131,9 +142,7 @@ class GPFSShareDriver(driver.ExecuteMixin, driver.GaneshaMixin,
     def do_setup(self, context):
         """Any initialization the share driver does while starting."""
         super(GPFSShareDriver, self).do_setup(context)
-        host = self.configuration.gpfs_share_export_ip
-        localserver_iplist = socket.gethostbyname_ex(socket.gethostname())[2]
-        if host in localserver_iplist:  # run locally
+        if self.configuration.is_gpfs_node:
             self._gpfs_execute = self._gpfs_local_execute
         else:
             self._gpfs_execute = self._gpfs_remote_execute
@@ -142,20 +151,24 @@ class GPFSShareDriver(driver.ExecuteMixin, driver.GaneshaMixin,
     def _gpfs_local_execute(self, *cmd, **kwargs):
         if 'run_as_root' not in kwargs:
             kwargs.update({'run_as_root': True})
+        if 'ignore_exit_code' in kwargs:
+            check_exit_code = kwargs.pop('ignore_exit_code')
+            check_exit_code.append(0)
+            kwargs.update({'check_exit_code': check_exit_code})
 
         return utils.execute(*cmd, **kwargs)
 
     def _gpfs_remote_execute(self, *cmd, **kwargs):
         host = self.configuration.gpfs_share_export_ip
         check_exit_code = kwargs.pop('check_exit_code', True)
+        ignore_exit_code = kwargs.pop('ignore_exit_code', None)
 
-        return self._run_ssh(host, cmd, check_exit_code)
+        return self._run_ssh(host, cmd, ignore_exit_code, check_exit_code)
 
     def _run_ssh(self, host, cmd_list, ignore_exit_code=None,
                  check_exit_code=True):
         command = ' '.join(six.moves.shlex_quote(cmd_arg)
                            for cmd_arg in cmd_list)
-
         if not self.sshpool:
             gpfs_ssh_login = self.configuration.gpfs_ssh_login
             password = self.configuration.gpfs_ssh_password
@@ -178,6 +191,7 @@ class GPFSShareDriver(driver.ExecuteMixin, driver.GaneshaMixin,
                 return self._gpfs_ssh_execute(
                     ssh,
                     command,
+                    ignore_exit_code=ignore_exit_code,
                     check_exit_code=check_exit_code)
 
         except Exception as e:
@@ -326,8 +340,8 @@ class GPFSShareDriver(driver.ExecuteMixin, driver.GaneshaMixin,
             raise exception.GPFSException(msg)
 
         try:
-            self._gpfs_execute('mmsetquota', '-j', sharename, '-h',
-                               sizestr, fsdev)
+            self._gpfs_execute('mmsetquota', fsdev + ':' + sharename,
+                               '--block', '0:' + sizestr)
         except exception.ProcessExecutionError as e:
             msg = (_('Failed to set quota for the share %(sharename)s. '
                      'Error: %(excmsg)s.') %
@@ -354,11 +368,12 @@ class GPFSShareDriver(driver.ExecuteMixin, driver.GaneshaMixin,
         # we want to ignore that error condition while deleting the fileset,
         # i.e. 'Fileset name share-xyz not found', with error code '2'
         # and mark the deletion successful
-        # ignore_exit_code = [ERR_FILE_NOT_FOUND]
+        ignore_exit_code = [ERR_FILE_NOT_FOUND]
 
         # unlink and delete the share's fileset
         try:
-            self._gpfs_execute('mmunlinkfileset', fsdev, sharename, '-f')
+            self._gpfs_execute('mmunlinkfileset', fsdev, sharename, '-f',
+                               ignore_exit_code=ignore_exit_code)
         except exception.ProcessExecutionError as e:
             msg = (_('Failed unlink fileset for share %(sharename)s. '
                      'Error: %(excmsg)s.') %
@@ -367,7 +382,8 @@ class GPFSShareDriver(driver.ExecuteMixin, driver.GaneshaMixin,
             raise exception.GPFSException(msg)
 
         try:
-            self._gpfs_execute('mmdelfileset', fsdev, sharename, '-f')
+            self._gpfs_execute('mmdelfileset', fsdev, sharename, '-f',
+                               ignore_exit_code=ignore_exit_code)
         except exception.ProcessExecutionError as e:
             msg = (_('Failed delete fileset for share %(sharename)s. '
                      'Error: %(excmsg)s.') %
@@ -396,9 +412,11 @@ class GPFSShareDriver(driver.ExecuteMixin, driver.GaneshaMixin,
         sharename = snapshot['share_name']
         snapshotname = snapshot['name']
         fsdev = self._get_gpfs_device()
-        LOG.debug("sharename = %{share}s, snapshotname = %{snap}s, "
-                  "fsdev = %{dev}s",
-                  {'share': sharename, 'snap': snapshotname, 'dev': fsdev})
+        LOG.debug(
+            'Attempting to create a snapshot %(snap)s from share %(share)s '
+            'on device %(dev)s.',
+            {'share': sharename, 'snap': snapshotname, 'dev': fsdev}
+        )
 
         try:
             self._gpfs_execute('mmcrsnapshot', fsdev, snapshot['name'],
@@ -445,8 +463,8 @@ class GPFSShareDriver(driver.ExecuteMixin, driver.GaneshaMixin,
         sizestr = '%sG' % new_size
         fsdev = self._get_gpfs_device()
         try:
-            self._gpfs_execute('mmsetquota', '-j', sharename, '-h',
-                               sizestr, fsdev)
+            self._gpfs_execute('mmsetquota', fsdev + ':' + sharename,
+                               '--block', '0:' + sizestr)
         except exception.ProcessExecutionError as e:
             msg = (_('Failed to set quota for the share %(sharename)s. '
                      'Error: %(excmsg)s.') %
@@ -496,16 +514,12 @@ class GPFSShareDriver(driver.ExecuteMixin, driver.GaneshaMixin,
     def allow_access(self, ctx, share, access, share_server=None):
         """Allow access to the share."""
         location = self._get_share_path(share)
-        self._get_helper(share).allow_access(location, share,
-                                             access['access_type'],
-                                             access['access_to'])
+        self._get_helper(share).allow_access(location, share, access)
 
     def deny_access(self, ctx, share, access, share_server=None):
         """Deny access to the share."""
         location = self._get_share_path(share)
-        self._get_helper(share).deny_access(location, share,
-                                            access['access_type'],
-                                            access['access_to'])
+        self._get_helper(share).deny_access(location, share, access)
 
     def check_for_setup_error(self):
         """Returns an error if prerequisites aren't met."""
@@ -536,14 +550,15 @@ class GPFSShareDriver(driver.ExecuteMixin, driver.GaneshaMixin,
             LOG.error(msg)
             raise exception.GPFSException(msg)
 
-        if self.configuration.gpfs_nfs_server_type not in ['KNFS', 'GNFS']:
+        if self.configuration.gpfs_nfs_server_type not in ("KNFS", "CES"):
             msg = (_('Invalid gpfs_nfs_server_type value: %s. '
-                     'Valid values are: "KNFS", "GNFS".')
+                     'Valid values are: "KNFS", "CES".')
                    % self.configuration.gpfs_nfs_server_type)
             LOG.error(msg)
             raise exception.InvalidParameterValue(err=msg)
 
-        if self.configuration.gpfs_nfs_server_list is None:
+        if ((not self.configuration.gpfs_nfs_server_list) and
+                (self.configuration.gpfs_nfs_server_type != 'CES')):
             msg = (_('Missing value for gpfs_nfs_server_list.'))
             LOG.error(msg)
             raise exception.InvalidParameterValue(err=msg)
@@ -599,6 +614,43 @@ class NASHelperBase(object):
         """Construct location of new export."""
         return ':'.join([self.configuration.gpfs_share_export_ip, local_path])
 
+    def get_export_options(self, share, access, helper, options_not_allowed):
+        """Get the export options."""
+        extra_specs = share_types.get_extra_specs_from_share(share)
+        if helper == 'KNFS':
+            export_options = extra_specs.get('knfs:export_options')
+        elif helper == 'CES':
+            export_options = extra_specs.get('ces:export_options')
+        else:
+            export_options = None
+
+        if export_options:
+            options = export_options.lower().split(',')
+        else:
+            options = []
+
+        invalid_options = [
+            option for option in options if option in options_not_allowed
+        ]
+
+        if invalid_options:
+            raise exception.InvalidInput(reason='Invalid export_option %s as '
+                                                'it is set by access_type.'
+                                                % invalid_options)
+
+        if access['access_level'] == constants.ACCESS_LEVEL_RO:
+            if helper == 'KNFS':
+                options.append(constants.ACCESS_LEVEL_RO)
+            elif helper == 'CES':
+                options.append('access_type=ro')
+        else:
+            if helper == 'KNFS':
+                options.append(constants.ACCESS_LEVEL_RW)
+            elif helper == 'CES':
+                options.append('access_type=rw')
+
+        return ','.join(options)
+
     @abc.abstractmethod
     def remove_export(self, local_path, share):
         """Remove export."""
@@ -643,33 +695,15 @@ class KNFSHelper(NASHelperBase):
             except exception.ProcessExecutionError:
                 raise
 
-    def _get_export_options(self, share):
-        """Set various export attributes for share."""
-
-        metadata = share.get('share_metadata')
-        options = None
-        if metadata:
-            for item in metadata:
-                if item['key'] == 'export_options':
-                    options = item['value']
-                else:
-                    msg = (_('Unknown metadata key %s.') % item['key'])
-                    LOG.error(msg)
-                    raise exception.InvalidInput(reason=msg)
-        if not options:
-            options = self.configuration.knfs_export_options
-
-        return options
-
     def remove_export(self, local_path, share):
         """Remove export."""
 
-    def allow_access(self, local_path, share, access_type, access):
+    def allow_access(self, local_path, share, access):
         """Allow access to one or more vm instances."""
 
-        if access_type != 'ip':
-            raise exception.InvalidShareAccess('Only ip access type '
-                                               'supported.')
+        if access['access_type'] != 'ip':
+            raise exception.InvalidShareAccess(reason='Only ip access type '
+                                                      'supported.')
 
         # check if present in export
         try:
@@ -680,16 +714,20 @@ class KNFSHelper(NASHelperBase):
             LOG.error(msg)
             raise exception.GPFSException(msg)
 
-        out = re.search(re.escape(local_path) + '[\s\n]*' + re.escape(access),
-                        out)
+        out = re.search(re.escape(local_path) + '[\s\n]*'
+                        + re.escape(access['access_to']), out)
+
         if out is not None:
+            access_type = access['access_type']
+            access_to = access['access_to']
             raise exception.ShareAccessExists(access_type=access_type,
-                                              access=access)
-
-        export_opts = self._get_export_options(share)
+                                              access=access_to)
 
+        options_not_allowed = list(constants.ACCESS_LEVELS)
+        export_opts = self.get_export_options(share, access, 'KNFS',
+                                              options_not_allowed)
         cmd = ['exportfs', '-o', export_opts,
-               ':'.join([access, local_path])]
+               ':'.join([access['access_to'], local_path])]
         try:
             self._publish_access(*cmd)
         except exception.ProcessExecutionError as e:
@@ -700,10 +738,9 @@ class KNFSHelper(NASHelperBase):
             LOG.error(msg)
             raise exception.GPFSException(msg)
 
-    def deny_access(self, local_path, share, access_type, access,
-                    force=False):
+    def deny_access(self, local_path, share, access, force=False):
         """Remove access for one or more vm instances."""
-        cmd = ['exportfs', '-u', ':'.join([access, local_path])]
+        cmd = ['exportfs', '-u', ':'.join([access['access_to'], local_path])]
         try:
             self._publish_access(*cmd)
         except exception.ProcessExecutionError as e:
@@ -715,141 +752,73 @@ class KNFSHelper(NASHelperBase):
             raise exception.GPFSException(msg)
 
 
-class GNFSHelper(NASHelperBase):
-    """Wrapper for Ganesha NFS Commands."""
+class CESHelper(NASHelperBase):
+    """Wrapper for NFS by Spectrum Scale CES"""
 
     def __init__(self, execute, config_object):
-        super(GNFSHelper, self).__init__(execute, config_object)
-        self.default_export_options = dict()
-        for m in AVPATTERN.finditer(
-            self.configuration.ganesha_nfs_export_options
-        ):
-            self.default_export_options[m.group('attr')] = m.group('val')
+        super(CESHelper, self).__init__(execute, config_object)
+        self._execute = execute
 
-    def _get_export_options(self, share):
-        """Set various export attributes for share."""
-
-        # load default options first - any options passed as share metadata
-        # will take precedence
-        options = copy.copy(self.default_export_options)
-
-        metadata = share.get('share_metadata')
-        for item in metadata:
-            attr = item['key']
-            if attr in ganesha_utils.valid_flags():
-                options[attr] = item['value']
-            else:
-                LOG.error(_LE('Invalid metadata %(attr)s for share '
-                              '%(share)s.'),
-                          {'attr': attr, 'share': share['name']})
-
-        return options
-
-    @utils.synchronized("ganesha-process-req", external=True)
-    def _ganesha_process_request(self, req_type, local_path,
-                                 share, access_type=None,
-                                 access=None, force=False):
-        cfgpath = self.configuration.ganesha_config_path
-        gservice = self.configuration.ganesha_service_name
-        gservers = self.configuration.gpfs_nfs_server_list
-        sshlogin = self.configuration.gpfs_ssh_login
-        sshkey = self.configuration.gpfs_ssh_private_key
-        pre_lines, exports = ganesha_utils.parse_ganesha_config(cfgpath)
-        reload_needed = True
-
-        if (req_type == "allow_access"):
-            export_opts = self._get_export_options(share)
-            # add the new share if it's not already defined
-            if not ganesha_utils.export_exists(exports, local_path):
-                # Add a brand new export definition
-                new_id = ganesha_utils.get_next_id(exports)
-                export = ganesha_utils.get_export_template()
-                export['fsal'] = '"GPFS"'
-                export['export_id'] = new_id
-                export['tag'] = '"fs%s"' % new_id
-                export['path'] = '"%s"' % local_path
-                export['pseudo'] = '"%s"' % local_path
-                export['rw_access'] = (
-                    '"%s"' % ganesha_utils.format_access_list(access)
-                )
-                for key in export_opts:
-                    export[key] = export_opts[key]
-
-                exports[new_id] = export
-                LOG.info(_LI('Add %(share)s with access from %(access)s'),
-                         {'share': share['name'], 'access': access})
-            else:
-                # Update existing access with new/extended access information
-                export = ganesha_utils.get_export_by_path(exports, local_path)
-                initial_access = export['rw_access'].strip('"')
-                merged_access = ','.join([access, initial_access])
-                updated_access = ganesha_utils.format_access_list(
-                    merged_access
-                )
-                if initial_access != updated_access:
-                    LOG.info(_LI('Update %(share)s with access from '
-                                 '%(access)s'),
-                             {'share': share['name'], 'access': access})
-                    export['rw_access'] = '"%s"' % updated_access
-                else:
-                    LOG.info(_LI('Do not update %(share)s, access from '
-                                 '%(access)s already defined'),
-                             {'share': share['name'], 'access': access})
-                    reload_needed = False
-
-        elif (req_type == "deny_access"):
-            export = ganesha_utils.get_export_by_path(exports, local_path)
-            initial_access = export['rw_access'].strip('"')
-            updated_access = ganesha_utils.format_access_list(
-                initial_access,
-                deny_access=access
-            )
-
-            if initial_access != updated_access:
-                LOG.info(_LI('Update %(share)s removing access from '
-                             '%(access)s'),
-                         {'share': share['name'], 'access': access})
-                export['rw_access'] = '"%s"' % updated_access
-            else:
-                LOG.info(_LI('Do not update %(share)s, access from %(access)s '
-                             'already removed'), {'share': share['name'],
-                                                  'access': access})
-                reload_needed = False
-
-        elif (req_type == "remove_export"):
-            export = ganesha_utils.get_export_by_path(exports, local_path)
-            if export:
-                exports.pop(export['export_id'])
-                LOG.info(_LI('Remove export for %s'), share['name'])
-            else:
-                LOG.info(_LI('Export for %s is not defined in Ganesha '
-                             'config.'),
-                         share['name'])
-                reload_needed = False
-
-        if reload_needed:
-            # publish config to all servers and reload or restart
-            ganesha_utils.publish_ganesha_config(gservers, sshlogin, sshkey,
-                                                 cfgpath, pre_lines, exports)
-            ganesha_utils.reload_ganesha_config(gservers, sshlogin, gservice)
+    def _execute_mmnfs_command(self, cmd, err_msg):
+        try:
+            out, __ = self._execute('mmnfs', 'export', *cmd)
+        except exception.ProcessExecutionError as e:
+            msg = (_('%(err_msg)s Error: %(e)s.')
+                   % {'err_msg': err_msg, 'e': e})
+            LOG.error(msg)
+            raise exception.GPFSException(msg)
+        return out
 
     def remove_export(self, local_path, share):
         """Remove export."""
-        self._ganesha_process_request("remove_export", local_path, share)
+        err_msg = 'Failed to check exports on the system.'
+        out = self._execute_mmnfs_command(('list', '-n', local_path), err_msg)
 
-    def allow_access(self, local_path, share, access_type, access):
+        out = re.search(re.escape(local_path), out)
+
+        if out is not None:
+            err_msg = ('Failed to remove export for share %s.'
+                       % share['name'])
+            self._execute_mmnfs_command(('remove', local_path), err_msg)
+
+    def allow_access(self, local_path, share, access):
         """Allow access to the host."""
-        # TODO(nileshb):  add support for read only, metadata, and other
-        # access types
-        if access_type != 'ip':
-            raise exception.InvalidShareAccess('Only ip access type '
-                                               'supported.')
 
-        self._ganesha_process_request("allow_access", local_path,
-                                      share, access_type, access)
+        if access['access_type'] != 'ip':
+            raise exception.InvalidShareAccess(reason='Only ip access type '
+                                                      'supported.')
+        err_msg = 'Failed to check exports on the system.'
+        out = self._execute_mmnfs_command(('list', '-n', local_path), err_msg)
 
-    def deny_access(self, local_path, share, access_type, access,
-                    force=False):
+        options_not_allowed = ['access_type=ro', 'access_type=rw']
+        export_opts = self.get_export_options(share, access, 'CES',
+                                              options_not_allowed)
+
+        out = re.search(re.escape(local_path), out)
+
+        if out is None:
+            cmd = ['add', local_path, '-c',
+                   access['access_to'] +
+                   '(' + export_opts + ')']
+        else:
+            cmd = ['change', local_path, '--nfsadd',
+                   access['access_to'] +
+                   '(' + export_opts + ')']
+
+        err_msg = ('Failed to allow access for share %s.'
+                   % share['name'])
+        self._execute_mmnfs_command(cmd, err_msg)
+
+    def deny_access(self, local_path, share, access, force=False):
         """Deny access to the host."""
-        self._ganesha_process_request("deny_access", local_path,
-                                      share, access_type, access, force)
+        err_msg = 'Failed to check exports on the system.'
+        out = self._execute_mmnfs_command(('list', '-n', local_path), err_msg)
+
+        out = re.search(re.escape(access['access_to']), out)
+
+        if out is not None:
+            err_msg = ('Failed to remove access for share %s.'
+                       % share['name'])
+            self._execute_mmnfs_command(('change', local_path,
+                                         '--nfsremove', access['access_to']),
+                                        err_msg)
diff --git a/manila/tests/share/drivers/ibm/test_ganesha_utils.py b/manila/tests/share/drivers/ibm/test_ganesha_utils.py
deleted file mode 100644
index 260c38af44..0000000000
--- a/manila/tests/share/drivers/ibm/test_ganesha_utils.py
+++ /dev/null
@@ -1,281 +0,0 @@
-# Copyright (c) 2014 IBM Corp.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-"""Unit tests for the Ganesha Utils module."""
-
-import socket
-import time
-
-import mock
-from oslo_config import cfg
-
-from manila import exception
-import manila.share.drivers.ibm.ganesha_utils as ganesha_utils
-from manila import test
-from manila import utils
-
-
-CONF = cfg.CONF
-
-
-def fake_pre_lines(**kwargs):
-    pre_lines = [
-        '###################################################',
-        '#     Export entries',
-        '###################################################',
-        '',
-        '',
-        '# First export entry',
-    ]
-    return pre_lines
-
-
-def fake_exports(**kwargs):
-    exports = {
-        '100': {
-            'anonymous_root_uid': '-2',
-            'export_id': '100',
-            'filesystem_id': '192.168',
-            'fsal': '"GPFS"',
-            'maxread': '65536',
-            'maxwrite': '65536',
-            'nfs_protocols': '"3,4"',
-            'path': '"/fs0/share-1234"',
-            'prefread': '65536',
-            'prefwrite': '65536',
-            'pseudo': '"/fs0/share-1234"',
-            'root_access': '"*"',
-            'rw_access': '""',
-            'sectype': '"sys"',
-            'tag': '"fs100"',
-            'transport_protocols': '"UDP,TCP"',
-        },
-        '101': {
-            'anonymous_root_uid': '-2',
-            'export_id': '101',
-            'filesystem_id': '192.168',
-            'fsal': '"GPFS"',
-            'maxread': '65536',
-            'maxwrite': '65536',
-            'nfs_protocols': '"3,4"',
-            'path': '"/fs0/share-5678"',
-            'prefread': '65536',
-            'prefwrite': '65536',
-            'pseudo': '"/fs0/share-5678"',
-            'root_access': '"*"',
-            'rw_access': '"172.24.4.4"',
-            'sectype': '"sys"',
-            'tag': '"fs101"',
-            'transport_protocols': '"UDP,TCP"',
-        },
-    }
-    return exports
-
-
-class GaneshaUtilsTestCase(test.TestCase):
-    """Tests Ganesha Utils."""
-
-    def setUp(self):
-        super(GaneshaUtilsTestCase, self).setUp()
-        self.fake_path = "/fs0/share-1234"
-        self.fake_pre_lines = fake_pre_lines()
-        self.fake_exports = fake_exports()
-        self.fake_configpath = "/etc/ganesha/ganesha.exports.conf"
-        self.local_ip = ["192.11.22.1"]
-        self.remote_ips = ["192.11.22.2", "192.11.22.3"]
-        self.servers = self.local_ip + self.remote_ips
-        self.sshlogin = "fake_login"
-        self.sshkey = "fake_sshkey"
-        self.STARTING_EXPORT_ID = 100
-        self.mock_object(socket, 'gethostname',
-                         mock.Mock(return_value="testserver"))
-        self.mock_object(socket, 'gethostbyname_ex', mock.Mock(
-            return_value=('localhost',
-                          ['localhost.localdomain', 'testserver'],
-                          ['127.0.0.1'] + self.local_ip)
-        ))
-
-    def test_get_export_by_path(self):
-        fake_export = {'export_id': '100'}
-        self.mock_object(ganesha_utils, '_get_export_by_path',
-                         mock.Mock(return_value=fake_export))
-        export = ganesha_utils.get_export_by_path(self.fake_exports,
-                                                  self.fake_path)
-        self.assertEqual(export, fake_export)
-        ganesha_utils._get_export_by_path.assert_called_once_with(
-            self.fake_exports, self.fake_path
-        )
-
-    def test_export_exists(self):
-        fake_export = {'export_id': '100'}
-        self.mock_object(ganesha_utils, '_get_export_by_path',
-                         mock.Mock(return_value=fake_export))
-        result = ganesha_utils.export_exists(self.fake_exports, self.fake_path)
-        self.assertTrue(result)
-        ganesha_utils._get_export_by_path.assert_called_once_with(
-            self.fake_exports, self.fake_path
-        )
-
-    def test__get_export_by_path_export_exists(self):
-        expected_export = {
-            'anonymous_root_uid': '-2',
-            'export_id': '100',
-            'filesystem_id': '192.168',
-            'fsal': '"GPFS"',
-            'maxread': '65536',
-            'maxwrite': '65536',
-            'nfs_protocols': '"3,4"',
-            'path': '"/fs0/share-1234"',
-            'prefread': '65536',
-            'prefwrite': '65536',
-            'pseudo': '"/fs0/share-1234"',
-            'root_access': '"*"',
-            'rw_access': '""',
-            'sectype': '"sys"',
-            'tag': '"fs100"',
-            'transport_protocols': '"UDP,TCP"',
-        }
-        export = ganesha_utils._get_export_by_path(self.fake_exports,
-                                                   self.fake_path)
-        self.assertEqual(export, expected_export)
-
-    def test__get_export_by_path_export_does_not_exists(self):
-        share_path = '/fs0/share-1111'
-        export = ganesha_utils._get_export_by_path(self.fake_exports,
-                                                   share_path)
-        self.assertIsNone(export)
-
-    def test_get_next_id(self):
-        expected_id = 102
-        result = ganesha_utils.get_next_id(self.fake_exports)
-        self.assertEqual(result, expected_id)
-
-    def test_convert_ipstring_to_ipn_exception(self):
-        ipstring = 'fake ip string'
-        self.assertRaises(exception.GPFSGaneshaException,
-                          ganesha_utils._convert_ipstring_to_ipn,
-                          ipstring)
-
-    @mock.patch('six.moves.builtins.map')
-    def test_get_next_id_first_export(self, mock_map):
-        expected_id = self.STARTING_EXPORT_ID
-        mock_map.side_effect = ValueError
-        result = ganesha_utils.get_next_id(self.fake_exports)
-        self.assertEqual(result, expected_id)
-
-    def test_format_access_list(self):
-        access_string = "9.123.12.1,9.123.12.2,9.122"
-        result = ganesha_utils.format_access_list(access_string, None)
-        self.assertEqual(result, "9.122.0.0,9.123.12.1,9.123.12.2")
-
-    def test_format_access_list_deny_access(self):
-        access_string = "9.123.12.1,9.123,12.2"
-        deny_access = "9.123,12.2"
-        result = ganesha_utils.format_access_list(access_string,
-                                                  deny_access=deny_access)
-        self.assertEqual(result, "9.123.12.1")
-
-    def test_publish_ganesha_config(self):
-        configpath = self.fake_configpath
-        methods = ('_publish_local_config', '_publish_remote_config')
-        for method in methods:
-            self.mock_object(ganesha_utils, method)
-        ganesha_utils.publish_ganesha_config(self.servers, self.sshlogin,
-                                             self.sshkey, configpath,
-                                             self.fake_pre_lines,
-                                             self.fake_exports)
-        ganesha_utils._publish_local_config.assert_called_once_with(
-            configpath, self.fake_pre_lines, self.fake_exports
-        )
-        for remote_ip in self.remote_ips:
-            ganesha_utils._publish_remote_config.assert_any_call(
-                remote_ip, self.sshlogin, self.sshkey, configpath
-            )
-
-    def test_reload_ganesha_config(self):
-        self.mock_object(utils, 'execute', mock.Mock(return_value=True))
-        service = 'ganesha.nfsd'
-        ganesha_utils.reload_ganesha_config(self.servers, self.sshlogin)
-        reload_cmd = ['service', service, 'restart']
-        utils.execute.assert_any_call(*reload_cmd, run_as_root=True)
-        for remote_ip in self.remote_ips:
-            reload_cmd = ['service', service, 'restart']
-            remote_login = self.sshlogin + '@' + remote_ip
-            reload_cmd = ['ssh', remote_login] + reload_cmd
-            utils.execute.assert_any_call(
-                *reload_cmd, run_as_root=False
-            )
-
-    def test_reload_ganesha_config_exception(self):
-        self.mock_object(
-            utils, 'execute',
-            mock.Mock(side_effect=exception.ProcessExecutionError))
-        self.assertRaises(exception.GPFSGaneshaException,
-                          ganesha_utils.reload_ganesha_config,
-                          self.servers, self.sshlogin)
-
-    @mock.patch('six.moves.builtins.open')
-    def test__publish_local_config(self, mock_open):
-        self.mock_object(utils, 'execute', mock.Mock(return_value=True))
-        fake_timestamp = 1415506949.75
-        self.mock_object(time, 'time', mock.Mock(return_value=fake_timestamp))
-        configpath = self.fake_configpath
-        tmp_path = '%s.tmp.%s' % (configpath, fake_timestamp)
-        ganesha_utils._publish_local_config(configpath,
-                                            self.fake_pre_lines,
-                                            self.fake_exports)
-        cpcmd = ['install', '-m', '666', configpath, tmp_path]
-        utils.execute.assert_any_call(*cpcmd, run_as_root=True)
-        mvcmd = ['mv', tmp_path, configpath]
-        utils.execute.assert_any_call(*mvcmd, run_as_root=True)
-        self.assertTrue(time.time.called)
-
-    @mock.patch('six.moves.builtins.open')
-    def test__publish_local_config_exception(self, mock_open):
-        self.mock_object(
-            utils, 'execute',
-            mock.Mock(side_effect=exception.ProcessExecutionError))
-        fake_timestamp = 1415506949.75
-        self.mock_object(time, 'time', mock.Mock(return_value=fake_timestamp))
-        configpath = self.fake_configpath
-        tmp_path = '%s.tmp.%s' % (configpath, fake_timestamp)
-        self.assertRaises(exception.GPFSGaneshaException,
-                          ganesha_utils._publish_local_config, configpath,
-                          self.fake_pre_lines, self.fake_exports)
-        cpcmd = ['install', '-m', '666', configpath, tmp_path]
-        utils.execute.assert_called_once_with(*cpcmd, run_as_root=True)
-        self.assertTrue(time.time.called)
-
-    def test__publish_remote_config(self):
-        utils.execute = mock.Mock(return_value=True)
-        server = self.remote_ips[1]
-        dest = '%s@%s:%s' % (self.sshlogin, server, self.fake_configpath)
-        scpcmd = ['scp', '-i', self.sshkey, self.fake_configpath, dest]
-
-        ganesha_utils._publish_remote_config(server, self.sshlogin,
-                                             self.sshkey, self.fake_configpath)
-        utils.execute.assert_called_once_with(*scpcmd, run_as_root=False)
-
-    def test__publish_remote_config_exception(self):
-        self.mock_object(
-            utils, 'execute',
-            mock.Mock(side_effect=exception.ProcessExecutionError))
-        server = self.remote_ips[1]
-        dest = '%s@%s:%s' % (self.sshlogin, server, self.fake_configpath)
-        scpcmd = ['scp', '-i', self.sshkey, self.fake_configpath, dest]
-
-        self.assertRaises(exception.GPFSGaneshaException,
-                          ganesha_utils._publish_remote_config, server,
-                          self.sshlogin, self.sshkey, self.fake_configpath)
-        utils.execute.assert_called_once_with(*scpcmd, run_as_root=False)
diff --git a/manila/tests/share/drivers/ibm/test_gpfs.py b/manila/tests/share/drivers/ibm/test_gpfs.py
index e83e083ec7..373c33c0c9 100644
--- a/manila/tests/share/drivers/ibm/test_gpfs.py
+++ b/manila/tests/share/drivers/ibm/test_gpfs.py
@@ -24,8 +24,8 @@ from oslo_config import cfg
 from manila import context
 from manila import exception
 import manila.share.configuration as config
-import manila.share.drivers.ibm.ganesha_utils as ganesha_utils
 import manila.share.drivers.ibm.gpfs as gpfs
+from manila.share import share_types
 from manila import test
 from manila.tests import fake_share
 from manila import utils
@@ -50,8 +50,8 @@ class GPFSShareDriverTestCase(test.TestCase):
                                             configuration=self.fake_conf)
         self._knfs_helper = gpfs.KNFSHelper(self._gpfs_execute,
                                             self.fake_conf)
-        self._gnfs_helper = gpfs.GNFSHelper(self._gpfs_execute,
-                                            self.fake_conf)
+        self._ces_helper = gpfs.CESHelper(self._gpfs_execute,
+                                          self.fake_conf)
         self.fakedev = "/dev/gpfs0"
         self.fakefspath = "/gpfs0"
         self.fakesharepath = "/gpfs0/share-fakeid"
@@ -74,16 +74,16 @@ class GPFSShareDriverTestCase(test.TestCase):
         gpfs_nfs_server_list = [self.local_ip, self.remote_ip]
         self._knfs_helper.configuration.gpfs_nfs_server_list = \
             gpfs_nfs_server_list
-        self._gnfs_helper.configuration.gpfs_nfs_server_list = \
+        self._ces_helper.configuration.gpfs_nfs_server_list = \
             gpfs_nfs_server_list
-        self._gnfs_helper.configuration.ganesha_config_path = \
+        self._ces_helper.configuration.ganesha_config_path = \
             "fake_ganesha_config_path"
         self.sshlogin = "fake_login"
         self.sshkey = "fake_sshkey"
         self.gservice = "fake_ganesha_service"
-        self._gnfs_helper.configuration.gpfs_ssh_login = self.sshlogin
-        self._gnfs_helper.configuration.gpfs_ssh_private_key = self.sshkey
-        self._gnfs_helper.configuration.ganesha_service_name = self.gservice
+        self._ces_helper.configuration.gpfs_ssh_login = self.sshlogin
+        self._ces_helper.configuration.gpfs_ssh_private_key = self.sshkey
+        self._ces_helper.configuration.ganesha_service_name = self.gservice
         self.mock_object(socket, 'gethostname',
                          mock.Mock(return_value="testserver"))
         self.mock_object(socket, 'gethostbyname_ex', mock.Mock(
@@ -105,7 +105,8 @@ class GPFSShareDriverTestCase(test.TestCase):
         self._driver._run_ssh(self.local_ip, cmd_list)
 
         self._driver._gpfs_ssh_execute.assert_called_once_with(
-            mock.ANY, expected_cmd, check_exit_code=True)
+            mock.ANY, expected_cmd, check_exit_code=True,
+            ignore_exit_code=None)
 
     def test__run_ssh_exception(self):
         cmd_list = ['fake', 'cmd']
@@ -181,6 +182,16 @@ class GPFSShareDriverTestCase(test.TestCase):
     def test_do_setup(self):
         self.mock_object(self._driver, '_setup_helpers')
         self._driver.do_setup(self._context)
+        self.assertEqual(self._driver._gpfs_execute,
+                         self._driver._gpfs_remote_execute)
+        self._driver._setup_helpers.assert_called_once_with()
+
+    def test_do_setup_gpfs_local_execute(self):
+        self.mock_object(self._driver, '_setup_helpers')
+        self._driver.configuration.is_gpfs_node = True
+        self._driver.do_setup(self._context)
+        self.assertEqual(self._driver._gpfs_execute,
+                         self._driver._gpfs_local_execute)
         self._driver._setup_helpers.assert_called_once_with()
 
     def test_setup_helpers(self):
@@ -395,10 +406,11 @@ class GPFSShareDriverTestCase(test.TestCase):
         self._driver._get_gpfs_device = mock.Mock(return_value=self.fakedev)
         self._driver._gpfs_execute = mock.Mock(return_value=True)
         self._driver._extend_share(self.share, 10)
-        self._driver._gpfs_execute.assert_called_once_with('mmsetquota', '-j',
-                                                           self.share['name'],
-                                                           '-h', '10G',
-                                                           self.fakedev)
+        self._driver._gpfs_execute.assert_called_once_with(
+            'mmsetquota',
+            self.fakedev + ':' + self.share['name'],
+            '--block',
+            '0:10G')
         self._driver._get_gpfs_device.assert_called_once_with()
 
     def test__extend_share_exception(self):
@@ -408,10 +420,12 @@ class GPFSShareDriverTestCase(test.TestCase):
         )
         self.assertRaises(exception.GPFSException,
                           self._driver._extend_share, self.share, 10)
-        self._driver._gpfs_execute.assert_called_once_with('mmsetquota', '-j',
+        self._driver._gpfs_execute.assert_called_once_with('mmsetquota',
+                                                           self.fakedev +
+                                                           ':' +
                                                            self.share['name'],
-                                                           '-h', '10G',
-                                                           self.fakedev)
+                                                           '--block',
+                                                           '0:10G')
         self._driver._get_gpfs_device.assert_called_once_with()
 
     def test_allow_access(self):
@@ -422,10 +436,7 @@ class GPFSShareDriverTestCase(test.TestCase):
         self._driver.allow_access(self._context, self.share,
                                   self.access, share_server=None)
         self._helper_fake.allow_access.assert_called_once_with(
-            self.fakesharepath, self.share,
-            self.access['access_type'],
-            self.access['access_to']
-        )
+            self.fakesharepath, self.share, self.access)
         self._driver._get_share_path.assert_called_once_with(self.share)
 
     def test_deny_access(self):
@@ -435,10 +446,7 @@ class GPFSShareDriverTestCase(test.TestCase):
         self._driver.deny_access(self._context, self.share,
                                  self.access, share_server=None)
         self._helper_fake.deny_access.assert_called_once_with(
-            self.fakesharepath, self.share,
-            self.access['access_type'],
-            self.access['access_to']
-        )
+            self.fakesharepath, self.share, self.access)
         self._driver._get_share_path.assert_called_once_with(self.share)
 
     def test__check_gpfs_state_active(self):
@@ -546,10 +554,9 @@ class GPFSShareDriverTestCase(test.TestCase):
                                                    self.fakedev,
                                                    self.share['name'],
                                                    '-J', self.fakesharepath)
-        self._driver._gpfs_execute.assert_any_call('mmsetquota', '-j',
-                                                   self.share['name'], '-h',
-                                                   sizestr,
-                                                   self.fakedev)
+        self._driver._gpfs_execute.assert_any_call('mmsetquota', self.fakedev +
+                                                   ':' + self.share['name'],
+                                                   '--block', '0:' + sizestr)
         self._driver._gpfs_execute.assert_any_call('chmod',
                                                    '777',
                                                    self.fakesharepath)
@@ -579,10 +586,10 @@ class GPFSShareDriverTestCase(test.TestCase):
         self._driver._delete_share(self.share)
         self._driver._gpfs_execute.assert_any_call(
             'mmunlinkfileset', self.fakedev, self.share['name'],
-            '-f')
+            '-f', ignore_exit_code=[2])
         self._driver._gpfs_execute.assert_any_call(
             'mmdelfileset', self.fakedev, self.share['name'],
-            '-f')
+            '-f', ignore_exit_code=[2])
         self._driver._get_gpfs_device.assert_called_once_with()
 
     def test__delete_share_exception(self):
@@ -595,7 +602,7 @@ class GPFSShareDriverTestCase(test.TestCase):
         self._driver._get_gpfs_device.assert_called_once_with()
         self._driver._gpfs_execute.assert_called_once_with(
             'mmunlinkfileset', self.fakedev, self.share['name'],
-            '-f')
+            '-f', ignore_exit_code=[2])
 
     def test__create_share_snapshot(self):
         self._driver._gpfs_execute = mock.Mock(return_value=True)
@@ -652,8 +659,9 @@ class GPFSShareDriverTestCase(test.TestCase):
     def test__gpfs_local_execute(self):
         self.mock_object(utils, 'execute', mock.Mock(return_value=True))
         cmd = "testcmd"
-        self._driver._gpfs_local_execute(cmd)
-        utils.execute.assert_called_once_with(cmd, run_as_root=True)
+        self._driver._gpfs_local_execute(cmd, ignore_exit_code=[2])
+        utils.execute.assert_called_once_with(cmd, run_as_root=True,
+                                              check_exit_code=[2, 0])
 
     def test__gpfs_remote_execute(self):
         self._driver._run_ssh = mock.Mock(return_value=True)
@@ -662,92 +670,128 @@ class GPFSShareDriverTestCase(test.TestCase):
         self._driver.configuration.gpfs_share_export_ip = self.local_ip
         self._driver._gpfs_remote_execute(cmd, check_exit_code=True)
         self._driver._run_ssh.assert_called_once_with(
-            self.local_ip, tuple([cmd]), True
+            self.local_ip, tuple([cmd]), None, True
         )
         self._driver.configuration.gpfs_share_export_ip = orig_value
 
+    def test_knfs_get_export_options(self):
+        mock_out = {"knfs:export_options": "no_root_squash"}
+        self.mock_object(share_types, 'get_extra_specs_from_share',
+                         mock.Mock(return_value=mock_out))
+        access = self.access
+        options_not_allowed = ['rw', 'ro']
+        out = self._knfs_helper.get_export_options(self.share, access,
+                                                   'KNFS', options_not_allowed)
+        self.assertEqual("no_root_squash,rw", out)
+
+    def test_knfs_get_export_options_default(self):
+        self.mock_object(share_types, 'get_extra_specs_from_share',
+                         mock.Mock(return_value={}))
+        access = self.access
+        options_not_allowed = ['rw', 'ro']
+        out = self._knfs_helper.get_export_options(self.share, access,
+                                                   'KNFS', options_not_allowed)
+        self.assertEqual("rw", out)
+
+    def test_knfs_get_export_options_invalid_option_ro(self):
+        mock_out = {"knfs:export_options": "ro"}
+        self.mock_object(share_types, 'get_extra_specs_from_share',
+                         mock.Mock(return_value=mock_out))
+        access = self.access
+        options_not_allowed = ['rw', 'ro']
+        share = fake_share.fake_share(share_type="fake_share_type")
+        self.assertRaises(exception.InvalidInput,
+                          self._knfs_helper.get_export_options,
+                          share, access, 'KNFS', options_not_allowed)
+
+    def test_knfs_get_export_options_invalid_option_rw(self):
+        mock_out = {"knfs:export_options": "rw"}
+        self.mock_object(share_types, 'get_extra_specs_from_share',
+                         mock.Mock(return_value=mock_out))
+        access = self.access
+        options_not_allowed = ['rw', 'ro']
+        share = fake_share.fake_share(share_type="fake_share_type")
+        self.assertRaises(exception.InvalidInput,
+                          self._knfs_helper.get_export_options,
+                          share, access, 'KNFS', options_not_allowed)
+
     def test_knfs_allow_access(self):
         self._knfs_helper._execute = mock.Mock(
             return_value=['/fs0 <world>', 0]
         )
         self.mock_object(re, 'search', mock.Mock(return_value=None))
         export_opts = None
-        self._knfs_helper._get_export_options = mock.Mock(
+        self._knfs_helper.get_export_options = mock.Mock(
             return_value=export_opts
         )
         self._knfs_helper._publish_access = mock.Mock()
-        access_type = self.access['access_type']
-        access = self.access['access_to']
+        access = self.access
+        options_not_allowed = ['rw', 'ro']
         local_path = self.fakesharepath
-        self._knfs_helper.allow_access(local_path, self.share,
-                                       access_type, access)
+        self._knfs_helper.allow_access(local_path, self.share, access)
         self._knfs_helper._execute.assert_called_once_with('exportfs',
                                                            run_as_root=True)
         self.assertTrue(re.search.called)
-        self._knfs_helper._get_export_options.assert_any_call(self.share)
-        cmd = ['exportfs', '-o', export_opts, ':'.join([access, local_path])]
+        self._knfs_helper.get_export_options.assert_any_call(
+            self.share, access, 'KNFS',
+            options_not_allowed)
+        cmd = ['exportfs', '-o', export_opts, ':'.join([access['access_to'],
+                                                       local_path])]
         self._knfs_helper._publish_access.assert_called_once_with(*cmd)
 
     def test_knfs_allow_access_access_exists(self):
         out = ['/fs0 <world>', 0]
         self._knfs_helper._execute = mock.Mock(return_value=out)
         self.mock_object(re, 'search', mock.Mock(return_value="fake"))
-        self._knfs_helper._get_export_options = mock.Mock()
-        access_type = self.access['access_type']
-        access = self.access['access_to']
+        self._knfs_helper.get_export_options = mock.Mock()
+        access = self.access
         local_path = self.fakesharepath
         self.assertRaises(exception.ShareAccessExists,
                           self._knfs_helper.allow_access,
-                          local_path, self.share,
-                          access_type, access)
+                          local_path, self.share, access)
         self._knfs_helper._execute.assert_any_call('exportfs',
                                                    run_as_root=True)
         self.assertTrue(re.search.called)
-        self.assertFalse(self._knfs_helper._get_export_options.called)
+        self.assertFalse(self._knfs_helper.get_export_options.called)
 
     def test_knfs_allow_access_invalid_access(self):
-        access_type = 'invalid_access_type'
+        access = fake_share.fake_access(access_type='test')
         self.assertRaises(exception.InvalidShareAccess,
                           self._knfs_helper.allow_access,
                           self.fakesharepath, self.share,
-                          access_type,
-                          self.access['access_to'])
+                          access)
 
     def test_knfs_allow_access_exception(self):
         self._knfs_helper._execute = mock.Mock(
             side_effect=exception.ProcessExecutionError
         )
-        access_type = self.access['access_type']
-        access = self.access['access_to']
+        access = self.access
         local_path = self.fakesharepath
         self.assertRaises(exception.GPFSException,
                           self._knfs_helper.allow_access,
                           local_path, self.share,
-                          access_type, access)
+                          access)
         self._knfs_helper._execute.assert_called_once_with('exportfs',
                                                            run_as_root=True)
 
     def test_knfs_deny_access(self):
         self._knfs_helper._publish_access = mock.Mock()
-        access = self.access['access_to']
-        access_type = self.access['access_type']
+        access = self.access
         local_path = self.fakesharepath
-        self._knfs_helper.deny_access(local_path, self.share,
-                                      access_type, access)
-        cmd = ['exportfs', '-u', ':'.join([access, local_path])]
+        self._knfs_helper.deny_access(local_path, self.share, access)
+        cmd = ['exportfs', '-u', ':'.join([access['access_to'], local_path])]
         self._knfs_helper._publish_access.assert_called_once_with(*cmd)
 
     def test_knfs_deny_access_exception(self):
         self._knfs_helper._publish_access = mock.Mock(
             side_effect=exception.ProcessExecutionError
         )
-        access = self.access['access_to']
-        access_type = self.access['access_type']
+        access = self.access
         local_path = self.fakesharepath
-        cmd = ['exportfs', '-u', ':'.join([access, local_path])]
+        cmd = ['exportfs', '-u', ':'.join([access['access_to'], local_path])]
         self.assertRaises(exception.GPFSException,
                           self._knfs_helper.deny_access, local_path,
-                          self.share, access_type, access)
+                          self.share, access)
         self._knfs_helper._publish_access.assert_called_once_with(*cmd)
 
     def test_knfs__publish_access(self):
@@ -775,142 +819,164 @@ class GPFSShareDriverTestCase(test.TestCase):
         utils.execute.assert_called_once_with(*cmd, run_as_root=True,
                                               check_exit_code=True)
 
-    def test_gnfs_allow_access(self):
-        self._gnfs_helper._ganesha_process_request = mock.Mock()
-        access = self.access['access_to']
-        access_type = self.access['access_type']
-        local_path = self.fakesharepath
-        self._gnfs_helper.allow_access(local_path, self.share,
-                                       access_type, access)
-        self._gnfs_helper._ganesha_process_request.assert_called_once_with(
-            "allow_access", local_path, self.share, access_type, access
-        )
+    def test_ces_get_export_options(self):
+        mock_out = {"ces:export_options": "squash=no_root_squash"}
+        self.mock_object(share_types, 'get_extra_specs_from_share',
+                         mock.Mock(return_value=mock_out))
+        access = self.access
+        options_not_allowed = ['access_type=ro', 'access_type=rw']
+        out = self._ces_helper.get_export_options(self.share, access,
+                                                  'CES', options_not_allowed)
+        self.assertEqual("squash=no_root_squash,access_type=rw", out)
 
-    def test_gnfs_allow_access_invalid_access(self):
-        access_type = 'invalid_access_type'
+    def test_ces_get_export_options_default(self):
+        self.mock_object(share_types, 'get_extra_specs_from_share',
+                         mock.Mock(return_value={}))
+        access = self.access
+        options_not_allowed = ['access_type=ro', 'access_type=rw']
+        out = self._ces_helper.get_export_options(self.share, access,
+                                                  'CES', options_not_allowed)
+        self.assertEqual("access_type=rw", out)
+
+    def test_ces_get_export_options_invalid_option_ro(self):
+        mock_out = {"ces:export_options": "access_type=ro"}
+        self.mock_object(share_types, 'get_extra_specs_from_share',
+                         mock.Mock(return_value=mock_out))
+        access = self.access
+        options_not_allowed = ['access_type=ro', 'access_type=rw']
+        share = fake_share.fake_share(share_type="fake_share_type")
+        self.assertRaises(exception.InvalidInput,
+                          self._ces_helper.get_export_options,
+                          share, access, 'CES', options_not_allowed)
+
+    def test_ces_get_export_options_invalid_option_rw(self):
+        mock_out = {"ces:export_options": "access_type=rw"}
+        self.mock_object(share_types, 'get_extra_specs_from_share',
+                         mock.Mock(return_value=mock_out))
+        access = self.access
+        options_not_allowed = ['access_type=ro', 'access_type=rw']
+        share = fake_share.fake_share(share_type="fake_share_type")
+        self.assertRaises(exception.InvalidInput,
+                          self._ces_helper.get_export_options,
+                          share, access, 'CES', options_not_allowed)
+
+    def test_ces_remove_export(self):
+        mock_out = "Path Delegations Clients\n\
+                    ------------------------\n\
+                    /gpfs0/share-fakeid none *"
+        self._ces_helper._execute = mock.Mock(
+            return_value=[mock_out, 0])
+
+        mock_search_out = "/gpfs0/share-fakeid"
+        self.mock_object(re, 'search', mock.Mock(return_value=mock_search_out))
+
+        local_path = self.fakesharepath
+
+        self._ces_helper.remove_export(local_path, self.share)
+
+        self._ces_helper._execute.assert_any_call('mmnfs', 'export', 'list',
+                                                  '-n', local_path)
+        self._ces_helper._execute.assert_any_call('mmnfs', 'export', 'remove',
+                                                  local_path)
+
+    def test_ces_remove_export_exception(self):
+        local_path = self.fakesharepath
+        self._ces_helper._execute = mock.Mock(
+            side_effect=exception.ProcessExecutionError)
+        self.assertRaises(exception.GPFSException,
+                          self._ces_helper.remove_export,
+                          local_path, self.share)
+
+    def test_ces_allow_access(self):
+        mock_out = "Path Delegations Clients\n\
+                    ------------------------"
+        self._ces_helper._execute = mock.Mock(
+            return_value=[mock_out, 0])
+
+        export_opts = "access_type=rw"
+        self._ces_helper.get_export_options = mock.Mock(
+            return_value=export_opts)
+        self.mock_object(re, 'search', mock.Mock(return_value=None))
+
+        access = self.access
+        local_path = self.fakesharepath
+
+        self._ces_helper.allow_access(local_path, self.share, access)
+
+        self._ces_helper._execute.assert_any_call('mmnfs', 'export', 'list',
+                                                  '-n', local_path)
+        self._ces_helper._execute.assert_any_call('mmnfs', 'export', 'add',
+                                                  local_path, '-c',
+                                                  access['access_to']
+                                                  + '(' + export_opts + ')')
+
+    def test_ces_allow_access_existing_export(self):
+        mock_out = "Path Delegations Clients\n\
+                    ------------------------\n\
+                    /gpfs0/share-fakeid none *"
+        self._ces_helper._execute = mock.Mock(
+            return_value=[mock_out, 0])
+
+        export_opts = "access_type=rw"
+        self._ces_helper.get_export_options = mock.Mock(
+            return_value=export_opts)
+        mock_search_out = "/gpfs0/share-fakeid"
+        self.mock_object(re, 'search', mock.Mock(return_value=mock_search_out))
+
+        access = self.access
+        local_path = self.fakesharepath
+
+        self._ces_helper.allow_access(local_path, self.share, access)
+
+        self._ces_helper._execute.assert_any_call('mmnfs', 'export', 'list',
+                                                  '-n', local_path)
+        self._ces_helper._execute.assert_any_call('mmnfs', 'export', 'change',
+                                                  local_path, '--nfsadd',
+                                                  access['access_to']
+                                                  + '(' + export_opts + ')')
+
+    def test_ces_allow_access_invalid_access_type(self):
+        access = fake_share.fake_access(access_type='test')
         self.assertRaises(exception.InvalidShareAccess,
-                          self._gnfs_helper.allow_access,
+                          self._ces_helper.allow_access,
                           self.fakesharepath, self.share,
-                          access_type,
-                          self.access['access_to'])
+                          access)
 
-    def test_gnfs_deny_access(self):
-        self._gnfs_helper._ganesha_process_request = mock.Mock()
-        access = self.access['access_to']
-        access_type = self.access['access_type']
+    def test_ces_allow_access_exception(self):
+        access = self.access
         local_path = self.fakesharepath
-        self._gnfs_helper.deny_access(local_path, self.share,
-                                      access_type, access)
-        self._gnfs_helper._ganesha_process_request.assert_called_once_with(
-            "deny_access", local_path, self.share, access_type, access, False
-        )
+        self._ces_helper._execute = mock.Mock(
+            side_effect=exception.ProcessExecutionError)
+        self.assertRaises(exception.GPFSException,
+                          self._ces_helper.allow_access, local_path,
+                          self.share, access)
 
-    def test_gnfs_remove_export(self):
-        self._gnfs_helper._ganesha_process_request = mock.Mock()
-        local_path = self.fakesharepath
-        self._gnfs_helper.remove_export(local_path, self.share)
-        self._gnfs_helper._ganesha_process_request.assert_called_once_with(
-            "remove_export", local_path, self.share
-        )
+    def test_ces_deny_access(self):
+        mock_out = "Path Delegations Clients\n\
+                    ------------------------\n\
+                    /gpfs0/share-fakeid none *"
+        self._ces_helper._execute = mock.Mock(
+            return_value=[mock_out, 0])
 
-    def test_gnfs__ganesha_process_request_allow_access(self):
-        access = self.access['access_to']
-        access_type = self.access['access_type']
-        local_path = self.fakesharepath
-        cfgpath = self._gnfs_helper.configuration.ganesha_config_path
-        gservers = self._gnfs_helper.configuration.gpfs_nfs_server_list
-        export_opts = []
-        pre_lines = []
-        exports = {}
-        self._gnfs_helper._get_export_options = mock.Mock(
-            return_value=export_opts
-        )
-        self.mock_object(ganesha_utils, 'parse_ganesha_config', mock.Mock(
-            return_value=(pre_lines, exports)
-        ))
-        self.mock_object(ganesha_utils, 'export_exists', mock.Mock(
-            return_value=False
-        ))
-        self.mock_object(ganesha_utils, 'get_next_id', mock.Mock(
-            return_value=101
-        ))
-        self.mock_object(ganesha_utils, 'get_export_template', mock.Mock(
-            return_value={}
-        ))
-        self.mock_object(ganesha_utils, 'publish_ganesha_config')
-        self.mock_object(ganesha_utils, 'reload_ganesha_config')
-        self._gnfs_helper._ganesha_process_request(
-            "allow_access", local_path, self.share, access_type, access
-        )
-        self._gnfs_helper._get_export_options.assert_called_once_with(
-            self.share
-        )
-        ganesha_utils.export_exists.assert_called_once_with(exports,
-                                                            local_path)
-        ganesha_utils.parse_ganesha_config.assert_called_once_with(cfgpath)
-        ganesha_utils.publish_ganesha_config.assert_called_once_with(
-            gservers, self.sshlogin, self.sshkey, cfgpath, pre_lines, exports
-        )
-        ganesha_utils.reload_ganesha_config.assert_called_once_with(
-            gservers, self.sshlogin, self.gservice
-        )
+        mock_search_out = "/gpfs0/share-fakeid"
+        self.mock_object(re, 'search', mock.Mock(return_value=mock_search_out))
 
-    def test_gnfs__ganesha_process_request_deny_access(self):
-        access = self.access['access_to']
-        access_type = self.access['access_type']
+        access = self.access
         local_path = self.fakesharepath
-        cfgpath = self._gnfs_helper.configuration.ganesha_config_path
-        gservers = self._gnfs_helper.configuration.gpfs_nfs_server_list
-        pre_lines = []
-        initial_access = "10.0.0.1,10.0.0.2"
-        export = {"rw_access": initial_access}
-        exports = {}
-        self.mock_object(ganesha_utils, 'parse_ganesha_config', mock.Mock(
-            return_value=(pre_lines, exports)
-        ))
-        self.mock_object(ganesha_utils, 'get_export_by_path', mock.Mock(
-            return_value=export
-        ))
-        self.mock_object(ganesha_utils, 'format_access_list', mock.Mock(
-            return_value="10.0.0.1"
-        ))
-        self.mock_object(ganesha_utils, 'publish_ganesha_config')
-        self.mock_object(ganesha_utils, 'reload_ganesha_config')
-        self._gnfs_helper._ganesha_process_request(
-            "deny_access", local_path, self.share, access_type, access
-        )
-        ganesha_utils.parse_ganesha_config.assert_called_once_with(cfgpath)
-        ganesha_utils.get_export_by_path.assert_called_once_with(exports,
-                                                                 local_path)
-        ganesha_utils.format_access_list.assert_called_once_with(
-            initial_access, deny_access=access
-        )
-        ganesha_utils.publish_ganesha_config.assert_called_once_with(
-            gservers, self.sshlogin, self.sshkey, cfgpath, pre_lines, exports
-        )
-        ganesha_utils.reload_ganesha_config.assert_called_once_with(
-            gservers, self.sshlogin, self.gservice
-        )
 
-    def test_gnfs__ganesha_process_request_remove_export(self):
+        self._ces_helper.deny_access(local_path, self.share, access)
+
+        self._ces_helper._execute.assert_any_call('mmnfs', 'export', 'list',
+                                                  '-n', local_path)
+        self._ces_helper._execute.assert_any_call('mmnfs', 'export', 'change',
+                                                  local_path, '--nfsremove',
+                                                  access['access_to'])
+
+    def test_ces_deny_access_exception(self):
+        access = self.access
         local_path = self.fakesharepath
-        cfgpath = self._gnfs_helper.configuration.ganesha_config_path
-        pre_lines = []
-        exports = {}
-        export = {}
-        self.mock_object(ganesha_utils, 'parse_ganesha_config', mock.Mock(
-            return_value=(pre_lines, exports)
-        ))
-        self.mock_object(ganesha_utils, 'get_export_by_path', mock.Mock(
-            return_value=export
-        ))
-        self.mock_object(ganesha_utils, 'publish_ganesha_config')
-        self.mock_object(ganesha_utils, 'reload_ganesha_config')
-        self._gnfs_helper._ganesha_process_request(
-            "remove_export", local_path, self.share
-        )
-        ganesha_utils.parse_ganesha_config.assert_called_once_with(cfgpath)
-        ganesha_utils.get_export_by_path.assert_called_once_with(exports,
-                                                                 local_path)
-        self.assertFalse(ganesha_utils.publish_ganesha_config.called)
-        self.assertFalse(ganesha_utils.reload_ganesha_config.called)
+        self._ces_helper._execute = mock.Mock(
+            side_effect=exception.ProcessExecutionError)
+        self.assertRaises(exception.GPFSException,
+                          self._ces_helper.deny_access, local_path,
+                          self.share, access)
diff --git a/releasenotes/notes/ibm-gpfs-ces-support-3498e35d9fea1b55.yaml b/releasenotes/notes/ibm-gpfs-ces-support-3498e35d9fea1b55.yaml
new file mode 100644
index 0000000000..da99109377
--- /dev/null
+++ b/releasenotes/notes/ibm-gpfs-ces-support-3498e35d9fea1b55.yaml
@@ -0,0 +1,14 @@
+---
+prelude: >
+    Refactored GPFS driver to support NFS Ganesha
+    through Spectrum Scale CES framework.
+upgrade:
+  - Added a new config option is_gpfs_node which will
+    determine if manila share service is running on
+    GPFS node or not.
+    Added mmnfs commands in the root wrap share.filters.
+    Removed scp and ssh commands from root wrap share.filters.
+deprecations:
+  - Deprecated knfs_export_options configuration
+    parameter as export options are now configured
+    in extra specs of share types.