Add MapR-FS native driver

Implement new manila driver for MapR-FS distributed file system

DocImpact

Co-Authored-By: Grigoriy Rozhkov <groghkov@maprtech.com>

Implements: blueprint maprfs-driver

Change-Id: I6073edf76fbf11bf9d3c521129c377c96e57a21
This commit is contained in:
Vitaliy Levitksi 2016-12-15 16:16:35 +02:00 committed by Vitaliy Levitski
parent 75e685d40d
commit 306c79fd02
18 changed files with 1735 additions and 3 deletions

View File

@ -310,7 +310,7 @@ outlining some core sections.
# Enable protocols NFS and CIFS as those are the only supported
# by Generic driver that we are configuring in this set up.
# All available values are (NFS, CIFS, GlusterFS, HDFS, 'CEPHFS')
# All available values are (NFS, CIFS, GlusterFS, HDFS, 'CEPHFS', MapRFS)
enabled_share_protocols = NFS,CIFS
# Manila requires share-type for share creation.

View File

@ -44,6 +44,7 @@ protocols is as follows:
- CIFS
- GlusterFS
- HDFS
- MapRFS
- CephFS
Access rules

View File

@ -121,6 +121,7 @@ Share backends
hitachi_hnas_driver
hpe_3par_driver
tegile_driver
maprfs_native_driver
Indices and tables
------------------

View File

@ -79,6 +79,8 @@ Mapping of share drivers and share features support
+----------------------------------------+-----------------------+-----------------------+--------------+--------------+------------------------+----------------------------+--------------------------+
| NexentaStor5 | N | \- | N | N | N | N | \- |
+----------------------------------------+-----------------------+-----------------------+--------------+--------------+------------------------+----------------------------+--------------------------+
| MapRFS | O | O | O | O | O | O | O |
+----------------------------------------+-----------------------+-----------------------+--------------+--------------+------------------------+----------------------------+--------------------------+
Mapping of share drivers and share access rules support
-------------------------------------------------------
@ -134,6 +136,8 @@ Mapping of share drivers and share access rules support
+----------------------------------------+--------------+----------------+------------+--------------+--------------+----------------+------------+------------+
| NexentaStor5 | NFS (N) | \- | \- | \- | NFS (N) | \- | \- | \- |
+----------------------------------------+--------------+----------------+------------+--------------+--------------+----------------+------------+------------+
| MapRFS | \- | MapRFS(O) | \- | \- | \- | MapRFS(O) | \- | \- |
+----------------------------------------+--------------+----------------+------------+--------------+--------------+----------------+------------+------------+
Mapping of share drivers and security services support
------------------------------------------------------
@ -187,6 +191,9 @@ Mapping of share drivers and security services support
+----------------------------------------+------------------+-----------------+------------------+
| NexentaStor5 | \- | \- | \- |
+----------------------------------------+------------------+-----------------+------------------+
| MapRFS | \- | \- | \- |
+----------------------------------------+------------------+-----------------+------------------+
Mapping of share drivers and common capabilities
------------------------------------------------
@ -242,6 +249,8 @@ More information: :ref:`capabilities_and_extra_specs`
+----------------------------------------+-----------+------------+--------+-------------+-------------------+--------------------+-----+----------------------------+
| NexentaStor5 | \- | N | N | N | N | N | \- | N |
+----------------------------------------+-----------+------------+--------+-------------+-------------------+--------------------+-----+----------------------------+
| MapRFS | \- | N | \- | \- | \- | N | \- | O |
+----------------------------------------+-----------+------------+--------+-------------+-------------------+--------------------+-----+----------------------------+
.. note::

View File

@ -89,7 +89,7 @@ UPDATING_RULES_STATUSES = (
)
SUPPORTED_SHARE_PROTOCOLS = (
'NFS', 'CIFS', 'GLUSTERFS', 'HDFS', 'CEPHFS')
'NFS', 'CIFS', 'GLUSTERFS', 'HDFS', 'CEPHFS', 'MAPRFS')
SECURITY_SERVICES_ALLOWED_TYPES = ['active_directory', 'ldap', 'kerberos']

View File

@ -707,6 +707,10 @@ class HDFSException(ManilaException):
message = _("HDFS exception occurred!")
class MapRFSException(ManilaException):
message = _("MapRFS exception occurred: %(msg)s")
class ZFSonLinuxException(ManilaException):
message = _("ZFSonLinux exception occurred: %(msg)s")

View File

@ -68,6 +68,7 @@ import manila.share.drivers.hpe.hpe_3par_driver
import manila.share.drivers.huawei.huawei_nas
import manila.share.drivers.ibm.gpfs
import manila.share.drivers.lvm
import manila.share.drivers.maprfs.maprfs_native
import manila.share.drivers.netapp.options
import manila.share.drivers.nexenta.options
import manila.share.drivers.quobyte.quobyte
@ -137,6 +138,7 @@ _global_opt_lists = [
manila.share.drivers.hpe.hpe_3par_driver.HPE3PAR_OPTS,
manila.share.drivers.huawei.huawei_nas.huawei_opts,
manila.share.drivers.ibm.gpfs.gpfs_share_opts,
manila.share.drivers.maprfs.maprfs_native.maprfs_native_share_opts,
manila.share.drivers.lvm.share_opts,
manila.share.drivers.netapp.options.netapp_proxy_opts,
manila.share.drivers.netapp.options.netapp_connection_opts,

View File

View File

@ -0,0 +1,348 @@
# Copyright (c) 2016, MapR Technologies
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Utility for processing MapR cluster operations
"""
import json
import pipes
import socket
from oslo_concurrency import processutils
from oslo_log import log
import six
from manila.common import constants
from manila import exception
from manila.i18n import _
from manila.i18n import _LE
from manila import utils
LOG = log.getLogger(__name__)
def get_version_handler(configuration):
# here can be choosing DriverUtils depend on cluster version
return BaseDriverUtil(configuration)
class BaseDriverUtil(object):
"""Utility class for MapR-FS specific operations."""
NOT_FOUND_MSG = 'No such'
ERROR_MSG = 'ERROR'
def __init__(self, configuration):
self.configuration = configuration
self.ssh_connections = {}
self.hosts = self.configuration.maprfs_clinode_ip
self.local_hosts = socket.gethostbyname_ex(socket.gethostname())[2]
self.maprcli_bin = '/usr/bin/maprcli'
self.hadoop_bin = '/usr/bin/hadoop'
def _execute(self, *cmd, **kwargs):
for x in range(0, len(self.hosts)):
try:
check_exit_code = kwargs.pop('check_exit_code', True)
host = self.hosts[x]
if host in self.local_hosts:
cmd = self._as_user(cmd,
self.configuration.maprfs_ssh_name)
out, err = utils.execute(*cmd,
check_exit_code=check_exit_code)
else:
out, err = self._run_ssh(host, cmd, check_exit_code)
# move available cldb host to the beginning
if x > 0:
self.hosts[0], self.hosts[x] = self.hosts[x], self.hosts[0]
return out, err
except exception.ProcessExecutionError as e:
if self._check_error(e):
raise
elif x < len(self.hosts) - 1:
msg = _LE('Error running SSH command. Trying another host')
LOG.error(msg)
else:
raise
except Exception as e:
if x < len(self.hosts) - 1:
msg = _LE('Error running SSH command. Trying another host')
LOG.error(msg)
else:
raise exception.ProcessExecutionError(six.text_type(e))
def _run_ssh(self, host, cmd_list, check_exit_code=False):
command = ' '.join(pipes.quote(cmd_arg) for cmd_arg in cmd_list)
connection = self.ssh_connections.get(host)
if connection is None:
ssh_name = self.configuration.maprfs_ssh_name
password = self.configuration.maprfs_ssh_pw
private_key = self.configuration.maprfs_ssh_private_key
remote_ssh_port = self.configuration.maprfs_ssh_port
ssh_conn_timeout = self.configuration.ssh_conn_timeout
min_size = self.configuration.ssh_min_pool_conn
max_size = self.configuration.ssh_max_pool_conn
ssh_pool = utils.SSHPool(host,
remote_ssh_port,
ssh_conn_timeout,
ssh_name,
password=password,
privatekey=private_key,
min_size=min_size,
max_size=max_size)
ssh = ssh_pool.create()
self.ssh_connections[host] = (ssh_pool, ssh)
else:
ssh_pool, ssh = connection
if not ssh.get_transport().is_active():
ssh_pool.remove(ssh)
ssh = ssh_pool.create()
self.ssh_connections[host] = (ssh_pool, ssh)
return processutils.ssh_execute(
ssh,
command,
check_exit_code=check_exit_code)
@staticmethod
def _check_error(error):
# check if error was native
return BaseDriverUtil.ERROR_MSG in error.stdout
@staticmethod
def _as_user(cmd, user):
return ['sudo', 'su', '-', user, '-c',
' '.join(pipes.quote(cmd_arg) for cmd_arg in cmd)]
@staticmethod
def _add_params(cmd, **kwargs):
params = []
for x in kwargs.keys():
params.append('-' + x)
params.append(kwargs[x])
return cmd + params
def create_volume(self, name, path, size, **kwargs):
# delete size param as it is set separately
if kwargs.get('quota'):
del kwargs['quota']
sizestr = six.text_type(size) + 'G'
cmd = [self.maprcli_bin, 'volume', 'create', '-name',
name, '-path', path, '-quota',
sizestr, '-readAce', '', '-writeAce', '']
cmd = self._add_params(cmd, **kwargs)
self._execute(*cmd)
def volume_exists(self, volume_name):
cmd = [self.maprcli_bin, 'volume', 'info', '-name', volume_name]
out, __ = self._execute(*cmd, check_exit_code=False)
return self.NOT_FOUND_MSG not in out
def delete_volume(self, name):
cmd = [self.maprcli_bin, 'volume', 'remove', '-name', name, '-force',
'true']
out, __ = self._execute(*cmd, check_exit_code=False)
# if volume does not exist do not raise exception.ProcessExecutionError
if self.ERROR_MSG in out and self.NOT_FOUND_MSG not in out:
raise exception.ProcessExecutionError(out)
def set_volume_size(self, name, size):
sizestr = six.text_type(size) + 'G'
cmd = [self.maprcli_bin, 'volume', 'modify', '-name', name, '-quota',
sizestr]
self._execute(*cmd)
def create_snapshot(self, name, volume_name):
cmd = [self.maprcli_bin, 'volume', 'snapshot', 'create',
'-snapshotname',
name, '-volume', volume_name]
self._execute(*cmd)
def delete_snapshot(self, name, volume_name):
cmd = [self.maprcli_bin, 'volume', 'snapshot', 'remove',
'-snapshotname',
name, '-volume', volume_name]
out, __ = self._execute(*cmd, check_exit_code=False)
# if snapshot does not exist do not raise ProcessExecutionError
if self.ERROR_MSG in out and self.NOT_FOUND_MSG not in out:
raise exception.ProcessExecutionError(out)
def get_volume_info(self, volume_name, columns=None):
cmd = [self.maprcli_bin, 'volume', 'info', '-name', volume_name,
'-json']
if columns:
cmd += ['-columns', ','.join(columns)]
out, __ = self._execute(*cmd)
return json.loads(out)['data'][0]
def get_volume_info_by_path(self, volume_path, columns=None,
check_if_exists=False):
cmd = [self.maprcli_bin, 'volume', 'info', '-path', volume_path,
'-json']
if columns:
cmd += ['-columns', ','.join(columns)]
out, __ = self._execute(*cmd, check_exit_code=not check_if_exists)
if check_if_exists and self.NOT_FOUND_MSG in out:
return None
return json.loads(out)['data'][0]
def get_snapshot_list(self, volume_name=None, volume_path=None):
params = {}
if volume_name:
params['volume'] = volume_name
if volume_path:
params['path'] = volume_name
cmd = [self.maprcli_bin, 'volume', 'snapshot', 'list', '-volume',
'-columns',
'snapshotname', '-json']
cmd = self._add_params(cmd, **params)
out, __ = self._execute(*cmd)
return [x['snapshotname'] for x in json.loads(out)['data']]
def rename_volume(self, name, new_name):
cmd = [self.maprcli_bin, 'volume', 'rename', '-name', name, '-newname',
new_name]
self._execute(*cmd)
def fs_capacity(self):
cmd = [self.hadoop_bin, 'fs', '-df']
out, err = self._execute(*cmd)
lines = out.splitlines()
try:
fields = lines[1].split()
total = int(fields[1])
free = int(fields[3])
except (IndexError, ValueError):
msg = _('Failed to get MapR-FS capacity info.')
LOG.exception(msg)
raise exception.ProcessExecutionError(msg)
return total, free
def maprfs_ls(self, path):
cmd = [self.hadoop_bin, 'fs', '-ls', path]
out, __ = self._execute(*cmd)
return out
def maprfs_cp(self, source, dest):
cmd = [self.hadoop_bin, 'fs', '-cp', '-p', source, dest]
self._execute(*cmd)
def maprfs_chmod(self, dest, mod):
cmd = [self.hadoop_bin, 'fs', '-chmod', mod, dest]
self._execute(*cmd)
def maprfs_du(self, path):
cmd = [self.hadoop_bin, 'fs', '-du', '-s', path]
out, __ = self._execute(*cmd)
return int(out.split(' ')[0])
def check_state(self):
cmd = [self.hadoop_bin, 'fs', '-ls', '/']
out, __ = self._execute(*cmd, check_exit_code=False)
return 'Found' in out
def dir_not_empty(self, path):
cmd = [self.hadoop_bin, 'fs', '-ls', path]
out, __ = self._execute(*cmd, check_exit_code=False)
return 'Found' in out
def set_volume_ace(self, volume_name, access_rules):
read_accesses = []
write_accesses = []
for access_rule in access_rules:
if access_rule['access_level'] == constants.ACCESS_LEVEL_RO:
read_accesses.append(access_rule['access_to'])
elif access_rule['access_level'] == constants.ACCESS_LEVEL_RW:
read_accesses.append(access_rule['access_to'])
write_accesses.append(access_rule['access_to'])
def rule_type(access_to):
if self.group_exists(access_to):
return 'g'
elif self.user_exists(access_to):
return 'u'
else:
# if nor user nor group exits, it should try add group rule
return 'g'
read_accesses_string = '|'.join(
map(lambda x: rule_type(x) + ':' + x, read_accesses))
write_accesses_string = '|'.join(
map(lambda x: rule_type(x) + ':' + x, write_accesses))
cmd = [self.maprcli_bin, 'volume', 'modify', '-name', volume_name,
'-readAce', read_accesses_string, '-writeAce',
write_accesses_string]
self._execute(*cmd)
def add_volume_ace_rules(self, volume_name, access_rules):
if not access_rules:
return
access_rules_map = self.get_access_rules(volume_name)
for access_rule in access_rules:
access_rules_map[access_rule['access_to']] = access_rule
self.set_volume_ace(volume_name, access_rules_map.values())
def remove_volume_ace_rules(self, volume_name, access_rules):
if not access_rules:
return
access_rules_map = self.get_access_rules(volume_name)
for access_rule in access_rules:
if access_rules_map.get(access_rule['access_to']):
del access_rules_map[access_rule['access_to']]
self.set_volume_ace(volume_name, access_rules_map.values())
def get_access_rules(self, volume_name):
info = self.get_volume_info(volume_name)
aces = info['volumeAces']
read_ace = aces['readAce']
write_ace = aces['writeAce']
access_rules_map = {}
self._retrieve_access_rules_from_ace(read_ace, 'r', access_rules_map)
self._retrieve_access_rules_from_ace(write_ace, 'w', access_rules_map)
return access_rules_map
def _retrieve_access_rules_from_ace(self, ace, ace_type, access_rules_map):
access = constants.ACCESS_LEVEL_RW if ace_type == 'w' else (
constants.ACCESS_LEVEL_RO)
if ace not in ['p', '']:
write_rules = [x.strip() for x in ace.split('|')]
for user in write_rules:
rule_type, username = user.split(':')
if rule_type not in ['u', 'g']:
continue
access_rules_map[username] = {
'access_level': access,
'access_to': username,
'access_type': 'user',
}
def user_exists(self, user):
cmd = ['getent', 'passwd', user]
out, __ = self._execute(*cmd, check_exit_code=False)
return out != ''
def group_exists(self, group):
cmd = ['getent', 'group', group]
out, __ = self._execute(*cmd, check_exit_code=False)
return out != ''
def get_cluster_name(self):
cmd = [self.maprcli_bin, 'dashboard', 'info', '-json']
out, __ = self._execute(*cmd)
try:
return json.loads(out)['data'][0]['cluster']['name']
except (IndexError, ValueError) as e:
msg = (_("Failed to parse cluster name. Error: %s") % e)
raise exception.ProcessExecutionError(msg)

View File

@ -0,0 +1,438 @@
# Copyright (c) 2016, MapR Technologies
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Share driver for MapR-FS distributed file system.
"""
import math
import os
from oslo_config import cfg
from oslo_log import log
from oslo_utils import strutils
from oslo_utils import units
from manila import context
from manila import exception
from manila.i18n import _, _LW, _LI
from manila.share import api
from manila.share import driver
from manila.share.drivers.maprfs import driver_util as mapru
LOG = log.getLogger(__name__)
maprfs_native_share_opts = [
cfg.ListOpt('maprfs_clinode_ip',
help='The list of IPs or hostnames of nodes where mapr-core '
'is installed.'),
cfg.PortOpt('maprfs_ssh_port',
default=22,
help='CLDB node SSH port.'),
cfg.StrOpt('maprfs_ssh_name',
default="mapr",
help='Cluster admin user ssh login name.'),
cfg.StrOpt('maprfs_ssh_pw',
help='Cluster node SSH login password, '
'This parameter is not necessary, if '
'\'maprfs_ssh_private_key\' is configured.'),
cfg.StrOpt('maprfs_ssh_private_key',
help='Path to SSH private '
'key for login.'),
cfg.StrOpt('maprfs_base_volume_dir',
default='/',
help='Path in MapRFS where share volumes must be created.'),
cfg.ListOpt('maprfs_zookeeper_ip',
help='The list of IPs or hostnames of ZooKeeper nodes.'),
cfg.ListOpt('maprfs_cldb_ip',
help='The list of IPs or hostnames of CLDB nodes.'),
cfg.BoolOpt('maprfs_rename_managed_volume',
default=True,
help='Specify whether existing volume should be renamed when'
' start managing.'),
]
CONF = cfg.CONF
CONF.register_opts(maprfs_native_share_opts)
class MapRFSNativeShareDriver(driver.ExecuteMixin, driver.ShareDriver):
"""MapR-FS Share Driver.
Executes commands relating to shares.
driver_handles_share_servers must be False because this driver does not
support creating or managing virtual storage servers (share servers)
API version history:
1.0 - Initial Version
"""
def __init__(self, *args, **kwargs):
super(MapRFSNativeShareDriver, self).__init__(False, *args, **kwargs)
self.configuration.append_config_values(maprfs_native_share_opts)
self.backend_name = self.configuration.safe_get(
'share_backend_name') or 'MapR-FS-Native'
self._base_volume_dir = self.configuration.safe_get(
'maprfs_base_volume_dir') or '/'
self._maprfs_util = None
self._maprfs_base_path = "maprfs://"
self.cldb_ip = self.configuration.maprfs_cldb_ip or []
self.zookeeper_ip = self.configuration.maprfs_zookeeper_ip or []
self.rename_volume = self.configuration.maprfs_rename_managed_volume
self.api = api.API()
def do_setup(self, context):
"""Do initialization while the share driver starts."""
super(MapRFSNativeShareDriver, self).do_setup(context)
self._maprfs_util = mapru.get_version_handler(self.configuration)
def _share_dir(self, share_name):
return os.path.join(self._base_volume_dir, share_name)
def _volume_name(self, share_name):
return share_name
def _get_share_path(self, share):
return share['export_location']
def _get_snapshot_path(self, snapshot):
share_dir = snapshot['share_instance']['export_location'].split(
' ')[0][len(self._maprfs_base_path):]
return os.path.join(share_dir, '.snapshot',
snapshot['provider_location'] or snapshot['name'])
def _get_volume_name(self, context, share):
metadata = self.api.get_share_metadata(context,
{'id': share['share_id']})
return metadata.get('_name', self._volume_name(share['name']))
def _get_share_export_locations(self, share, path=None):
"""Return share path on storage provider."""
cluster_name = self._maprfs_util.get_cluster_name()
path = '%(path)s -C %(cldb)s -Z %(zookeeper)s -N %(name)s' % {
'path': self._maprfs_base_path + (
path or self._share_dir(share['name'])),
'cldb': ' '.join(self.cldb_ip),
'zookeeper': ' '.join(self.zookeeper_ip),
'name': cluster_name
}
export_list = [{
"path": path,
"is_admin_only": False,
"metadata": {
"cldb": ','.join(self.cldb_ip),
"zookeeper": ','.join(self.zookeeper_ip),
"cluster-name": cluster_name,
},
}]
return export_list
def _create_share(self, share, metadata, context):
"""Creates a share."""
if share['share_proto'].lower() != 'maprfs':
msg = _('Only MapRFS protocol supported!')
LOG.error(msg)
raise exception.MapRFSException(msg=msg)
options = {k[1:]: v for k, v in metadata.items() if k[0] == '_'}
share_dir = options.pop('path', self._share_dir(share['name']))
volume_name = options.pop('name', self._volume_name(share['name']))
try:
self._maprfs_util.create_volume(volume_name, share_dir,
share['size'],
**options)
# posix permissions should be 777, ACEs are used as a restriction
self._maprfs_util.maprfs_chmod(share_dir, '777')
except exception.ProcessExecutionError:
self.api.update_share_metadata(context,
{'id': share['share_id']},
{'_name': 'error'})
msg = (_('Failed to create volume in MapR-FS for the '
'share %(share_name)s.') % {'share_name': share['name']})
LOG.exception(msg)
raise exception.MapRFSException(msg=msg)
def _set_share_size(self, share, size):
volume_name = self._get_volume_name(context.get_admin_context(), share)
try:
if share['size'] > size:
info = self._maprfs_util.get_volume_info(volume_name)
used = info['totalused']
if int(used) >= int(size) * units.Ki:
raise exception.ShareShrinkingPossibleDataLoss(
share_id=share['id'])
self._maprfs_util.set_volume_size(volume_name, size)
except exception.ProcessExecutionError:
msg = (_('Failed to set space quota for the share %(share_name)s.')
% {'share_name': share['name']})
LOG.exception(msg)
raise exception.MapRFSException(msg=msg)
def get_network_allocations_number(self):
return 0
def create_share(self, context, share, share_server=None):
"""Create a MapRFS volume which acts as a share."""
metadata = self.api.get_share_metadata(context,
{'id': share['share_id']})
self._create_share(share, metadata, context)
return self._get_share_export_locations(share,
path=metadata.get('_path'))
def ensure_share(self, context, share, share_server=None):
"""Updates export location if it is changes."""
volume_name = self._get_volume_name(context, share)
if self._maprfs_util.volume_exists(volume_name):
info = self._maprfs_util.get_volume_info(volume_name)
path = info['mountdir']
old_location = share['export_locations'][0]
new_location = self._get_share_export_locations(
share, path=path)
if new_location[0]['path'] != old_location['path']:
return new_location
else:
raise exception.ShareResourceNotFound(share_id=share['share_id'])
def create_share_from_snapshot(self, context, share, snapshot,
share_server=None):
"""Creates a share from snapshot."""
metadata = self.api.get_share_metadata(context,
{'id': share['share_id']})
sn_share_tenant = self.api.get_share_metadata(context, {
'id': snapshot['share_instance']['share_id']}).get('_tenantuser')
if sn_share_tenant and sn_share_tenant != metadata.get('_tenantuser'):
msg = (
_('Cannot create share from snapshot %(snapshot_name)s '
'with name %(share_name)s. Error: Tenant user should not '
'differ from tenant of the source snapshot.') %
{'snapshot_name': snapshot['name'],
'share_name': share['name']})
LOG.error(msg)
raise exception.MapRFSException(msg=msg)
share_dir = metadata.get('_path', self._share_dir(share['name']))
snapshot_path = self._get_snapshot_path(snapshot)
self._create_share(share, metadata, context)
try:
if self._maprfs_util.dir_not_empty(snapshot_path):
self._maprfs_util.maprfs_cp(snapshot_path + '/*', share_dir)
except exception.ProcessExecutionError:
msg = (
_('Failed to create share from snapshot %(snapshot_name)s '
'with name %(share_name)s.') % {
'snapshot_name': snapshot['name'],
'share_name': share['name']})
LOG.exception(msg)
raise exception.MapRFSException(msg=msg)
return self._get_share_export_locations(share,
path=metadata.get('_path'))
def create_snapshot(self, context, snapshot, share_server=None):
"""Creates a snapshot."""
volume_name = self._get_volume_name(context, snapshot['share'])
snapshot_name = snapshot['name']
try:
self._maprfs_util.create_snapshot(snapshot_name, volume_name)
return {'provider_location': snapshot_name}
except exception.ProcessExecutionError:
msg = (
_('Failed to create snapshot %(snapshot_name)s for the share '
'%(share_name)s.') % {'snapshot_name': snapshot_name,
'share_name': snapshot['share_name']})
LOG.exception(msg)
raise exception.MapRFSException(msg=msg)
def delete_share(self, context, share, share_server=None):
"""Deletes share storage."""
volume_name = self._get_volume_name(context, share)
if volume_name == "error":
LOG.info(_LI("Skipping deleting share with name %s, as it does not"
" exist on the backend"), share['name'])
return
try:
self._maprfs_util.delete_volume(volume_name)
except exception.ProcessExecutionError:
msg = (_('Failed to delete share %(share_name)s.') %
{'share_name': share['name']})
LOG.exception(msg)
raise exception.MapRFSException(msg=msg)
def delete_snapshot(self, context, snapshot, share_server=None):
"""Deletes a snapshot."""
snapshot_name = snapshot['provider_location'] or snapshot['name']
volume_name = self._get_volume_name(context, snapshot['share'])
try:
self._maprfs_util.delete_snapshot(snapshot_name, volume_name)
except exception.ProcessExecutionError:
msg = (_('Failed to delete snapshot %(snapshot_name)s.') %
{'snapshot_name': snapshot['name']})
LOG.exception(msg)
raise exception.MapRFSException(msg=msg)
def update_access(self, context, share, access_rules, add_rules,
delete_rules, share_server=None):
"""Update access rules for given share."""
for access in access_rules:
if access['access_type'].lower() != 'user':
msg = _("Only 'user' access type allowed!")
LOG.error(msg)
raise exception.InvalidShareAccess(reason=msg)
volume_name = self._get_volume_name(context, share)
try:
# 'update_access' is called before share is removed, so this
# method shouldn`t raise exception if share does
# not exist actually
if not self._maprfs_util.volume_exists(volume_name):
LOG.warning(_LW('Can not get share %s.'), share['name'])
return
# check update
if add_rules or delete_rules:
self._maprfs_util.remove_volume_ace_rules(volume_name,
delete_rules)
self._maprfs_util.add_volume_ace_rules(volume_name, add_rules)
else:
self._maprfs_util.set_volume_ace(volume_name, access_rules)
except exception.ProcessExecutionError:
msg = (_('Failed to update access for share %(name)s.') %
{'name': share['name']})
LOG.exception(msg)
raise exception.MapRFSException(msg=msg)
def extend_share(self, share, new_size, share_server=None):
"""Extend share storage."""
self._set_share_size(share, new_size)
def shrink_share(self, share, new_size, share_server=None):
"""Shrink share storage."""
self._set_share_size(share, new_size)
def _check_maprfs_state(self):
try:
return self._maprfs_util.check_state()
except exception.ProcessExecutionError:
msg = _('Failed to check MapRFS state.')
LOG.exception(msg)
raise exception.MapRFSException(msg=msg)
def check_for_setup_error(self):
"""Return an error if the prerequisites are not met."""
if not self.configuration.maprfs_clinode_ip:
msg = _(
'MapR cluster has not been specified in the configuration. '
'Add the ip or list of ip of nodes with mapr-core installed '
'in the "maprfs_clinode_ip" configuration parameter.')
LOG.error(msg)
raise exception.MapRFSException(msg=msg)
if not self.configuration.maprfs_cldb_ip:
LOG.warning(_LW('CLDB nodes are not specified!'))
if not self.configuration.maprfs_zookeeper_ip:
LOG.warning(_LW('Zookeeper nodes are not specified!'))
if not self._check_maprfs_state():
msg = _('MapR-FS is not in healthy state.')
LOG.error(msg)
raise exception.MapRFSException(msg=msg)
try:
self._maprfs_util.maprfs_ls(
os.path.join(self._base_volume_dir, ''))
except exception.ProcessExecutionError:
msg = _('Invalid "maprfs_base_volume_name". No such directory.')
LOG.exception(msg)
raise exception.MapRFSException(msg=msg)
def manage_existing(self, share, driver_options):
try:
# retrieve share path from export location, maprfs:// prefix and
# metadata (-C -Z -N) should be casted away
share_path = share['export_location'].split(
)[0][len(self._maprfs_base_path):]
info = self._maprfs_util.get_volume_info_by_path(
share_path, check_if_exists=True)
if not info:
msg = _("Share %s not found") % share[
'export_location']
LOG.error(msg)
raise exception.ManageInvalidShare(reason=msg)
size = math.ceil(float(info['quota']) / units.Ki)
used = math.ceil(float(info['totalused']) / units.Ki)
volume_name = info['volumename']
should_rename = self.rename_volume
rename_option = driver_options.get('rename')
if rename_option:
should_rename = strutils.bool_from_string(rename_option)
if should_rename:
self._maprfs_util.rename_volume(volume_name, share['name'])
else:
self.api.update_share_metadata(context.get_admin_context(),
{'id': share['share_id']},
{'_name': volume_name})
location = self._get_share_export_locations(share, path=share_path)
if size == 0:
size = used
msg = _LW(
'Share %s has no size quota. Total used value will be'
' used as share size')
LOG.warning(msg, share['name'])
return {'size': size, 'export_locations': location}
except (ValueError, KeyError, exception.ProcessExecutionError):
msg = _('Failed to manage share.')
LOG.exception(msg)
raise exception.MapRFSException(msg=msg)
def manage_existing_snapshot(self, snapshot, driver_options):
volume_name = self._get_volume_name(context.get_admin_context(),
snapshot['share'])
snapshot_path = self._get_snapshot_path(snapshot)
try:
snapshot_list = self._maprfs_util.get_snapshot_list(
volume_name=volume_name)
snapshot_name = snapshot['provider_location']
if snapshot_name not in snapshot_list:
msg = _("Snapshot %s not found") % snapshot_name
LOG.error(msg)
raise exception.ManageInvalidShareSnapshot(reason=msg)
size = math.ceil(float(self._maprfs_util.maprfs_du(
snapshot_path)) / units.Gi)
return {'size': size}
except exception.ProcessExecutionError:
msg = _("Manage existing share snapshot failed.")
LOG.exception(msg)
raise exception.MapRFSException(msg=msg)
def _update_share_stats(self):
"""Retrieves stats info of share directories group."""
try:
total, free = self._maprfs_util.fs_capacity()
except exception.ProcessExecutionError:
msg = _('Failed to check MapRFS capacity info.')
LOG.exception(msg)
raise exception.MapRFSException(msg=msg)
total_capacity_gb = int(math.ceil(float(total) / units.Gi))
free_capacity_gb = int(math.floor(float(free) / units.Gi))
data = {
'share_backend_name': self.backend_name,
'storage_protocol': 'MAPRFS',
'driver_handles_share_servers': self.driver_handles_share_servers,
'vendor_name': 'MapR Technologies',
'driver_version': '1.0',
'total_capacity_gb': total_capacity_gb,
'free_capacity_gb': free_capacity_gb,
'snapshot_support': True,
'create_share_from_snapshot_support': True,
}
super(MapRFSNativeShareDriver, self)._update_share_stats(data)

View File

@ -0,0 +1,909 @@
# Copyright (c) 2016, MapR Technologies
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Unit tests for MapRFS native protocol driver module."""
import socket
import mock
from oslo_concurrency import processutils
from oslo_config import cfg
import six
from manila import context
from manila import exception
import manila.share.configuration as config
import manila.share.drivers.maprfs.maprfs_native as maprfs
from manila import test
from manila.tests import fake_share
from manila import utils
CONF = cfg.CONF
class MapRFSNativeShareDriverTestCase(test.TestCase):
"""Tests MapRFSNativeShareDriver."""
def setUp(self):
super(MapRFSNativeShareDriverTestCase, self).setUp()
self._context = context.get_admin_context()
self._hdfs_execute = mock.Mock(return_value=('', ''))
self.local_ip = '192.168.1.1'
CONF.set_default('driver_handles_share_servers', False)
CONF.set_default('maprfs_clinode_ip', [self.local_ip])
CONF.set_default('maprfs_ssh_name', 'fake_sshname')
CONF.set_default('maprfs_ssh_pw', 'fake_sshpw')
CONF.set_default('maprfs_ssh_private_key', 'fake_sshkey')
CONF.set_default('maprfs_rename_managed_volume', True)
self.fake_conf = config.Configuration(None)
self.cluster_name = 'fake'
self._driver = maprfs.MapRFSNativeShareDriver(
configuration=self.fake_conf)
self._driver.do_setup(self._context)
export_locations = {0: {'path': '/share-0'}}
export_locations[0]['el_metadata'] = {
'volume-name': 'share-0'}
self.share = fake_share.fake_share(share_proto='MAPRFS',
name='share-0', size=2, share_id=1,
export_locations=export_locations,
export_location='/share-0')
self.snapshot = fake_share.fake_snapshot(share_proto='MAPRFS',
name='fake',
share_name=self.share['name'],
share_id=self.share['id'],
share=self.share,
share_instance=self.share,
provider_location='fake')
self.access = fake_share.fake_access(access_type='user',
access_to='fake',
access_level='rw')
self.snapshot = self.snapshot.values
self.snapshot.update(share_instance=self.share)
self.export_path = 'maprfs:///share-0 -C -Z -N fake'
self.fakesnapshot_path = '/share-0/.snapshot/snapshot-0'
self.hadoop_bin = '/usr/bin/hadoop'
self.maprcli_bin = '/usr/bin/maprcli'
self._driver.api.get_share_metadata = mock.Mock(return_value={})
self._driver.api.update_share_metadata = mock.Mock()
utils.execute = mock.Mock()
socket.gethostname = mock.Mock(return_value='testserver')
socket.gethostbyname_ex = mock.Mock(return_value=(
'localhost',
['localhost.localdomain', 'testserver'],
['127.0.0.1', self.local_ip]))
def test_do_setup(self):
self._driver.do_setup(self._context)
self.assertIsNotNone(self._driver._maprfs_util)
self.assertEqual([self.local_ip], self._driver._maprfs_util.hosts)
def test_check_for_setup_error(self):
self._driver._maprfs_util._execute = mock.Mock(return_value=('', 0))
self._driver._maprfs_util.check_state = mock.Mock(return_value=True)
self._driver._maprfs_util.maprfs_ls = mock.Mock()
self._driver.check_for_setup_error()
def test_check_for_setup_error_exception_config(self):
self._driver.configuration.maprfs_clinode_ip = None
self.assertRaises(exception.MapRFSException,
self._driver.check_for_setup_error)
def test_check_for_setup_error_exception_no_dir(self):
self._driver._maprfs_util.check_state = mock.Mock(return_value=True)
self._driver._maprfs_util.maprfs_ls = mock.Mock(
side_effect=exception.ProcessExecutionError)
self.assertRaises(exception.MapRFSException,
self._driver.check_for_setup_error)
def test_check_for_setup_error_exception_cldb_state(self):
self._driver._check_maprfs_state = mock.Mock(return_value=False)
self.assertRaises(exception.MapRFSException,
self._driver.check_for_setup_error)
def test__check_maprfs_state_healthy(self):
fake_out = """Found 8 items
drwxr-xr-x - mapr mapr 0 2016-07-29 05:38 /apps"""
self._driver._maprfs_util._execute = mock.Mock(
return_value=(fake_out, ''))
result = self._driver._check_maprfs_state()
self._driver._maprfs_util._execute.assert_called_once_with(
self.hadoop_bin, 'fs', '-ls', '/', check_exit_code=False)
self.assertTrue(result)
def test__check_maprfs_state_down(self):
fake_out = "No CLDB"
self._driver._maprfs_util._execute = mock.Mock(
return_value=(fake_out, ''))
result = self._driver._check_maprfs_state()
self._driver._maprfs_util._execute.assert_called_once_with(
self.hadoop_bin, 'fs', '-ls', '/', check_exit_code=False)
self.assertFalse(result)
def test__check_maprfs_state_exception(self):
self._driver._maprfs_util._execute = mock.Mock(
side_effect=exception.ProcessExecutionError)
self.assertRaises(exception.MapRFSException,
self._driver._check_maprfs_state)
self._driver._maprfs_util._execute.assert_called_once_with(
self.hadoop_bin, 'fs', '-ls', '/', check_exit_code=False)
def test_create_share_unsupported_proto(self):
self._driver.api.get_share_metadata = mock.Mock(return_value={})
self._driver._get_share_path = mock.Mock()
self.assertRaises(exception.MapRFSException,
self._driver.create_share,
self._context,
fake_share.fake_share(share_id=1),
share_server=None)
self.assertFalse(self._driver._get_share_path.called)
def test_manage_existing(self):
self._driver._maprfs_util.get_volume_info_by_path = mock.Mock(
return_value={'quota': 1024, 'totalused': 966,
'volumename': 'fake'})
self._driver._maprfs_util._execute = mock.Mock()
self._driver._maprfs_util.get_cluster_name = mock.Mock(
return_value="fake")
def test_manage_existing_no_rename(self):
self._driver._maprfs_util.get_volume_info_by_path = mock.Mock(
return_value={'quota': 1024, 'totalused': 966,
'volumename': 'fake'})
self._driver._maprfs_util._execute = mock.Mock()
self._driver._maprfs_util.get_cluster_name = mock.Mock(
return_value="fake")
result = self._driver.manage_existing(self.share, {'rename': 'no'})
self.assertEqual(1, result['size'])
def test_manage_existing_exception(self):
self._driver._maprfs_util.get_volume_info_by_path = mock.Mock(
side_effect=exception.ProcessExecutionError)
self.assertRaises(exception.MapRFSException,
self._driver.manage_existing, self.share, {})
def test_manage_existing_invalid_share(self):
def fake_execute(self, *cmd, **kwargs):
check_exit_code = kwargs.get('check_exit_code', True)
if check_exit_code:
raise exception.ProcessExecutionError
else:
return 'No such volume', 0
self._driver._maprfs_util._execute = fake_execute
mock_execute = self._driver.manage_existing
self.assertRaises(exception.ManageInvalidShare, mock_execute,
self.share, {})
def test_manage_existing_snapshot(self):
self._driver._maprfs_util.get_snapshot_list = mock.Mock(
return_value=[self.snapshot['provider_location']])
self._driver._maprfs_util.maprfs_du = mock.Mock(return_value=11)
update = self._driver.manage_existing_snapshot(self.snapshot, {})
self.assertEqual(1, update['size'])
def test_manage_existing_snapshot_invalid(self):
self._driver._maprfs_util.get_snapshot_list = mock.Mock(
return_value=[])
mock_execute = self._driver.manage_existing_snapshot
self.assertRaises(exception.ManageInvalidShareSnapshot, mock_execute,
self.snapshot, {})
def test_manage_existing_snapshot_exception(self):
self._driver._maprfs_util.get_snapshot_list = mock.Mock(
side_effect=exception.ProcessExecutionError)
mock_execute = self._driver.manage_existing_snapshot
self.assertRaises(exception.MapRFSException, mock_execute,
self.snapshot, {})
def test_manage_existing_with_no_quota(self):
self._driver._maprfs_util.get_volume_info_by_path = mock.Mock(
return_value={'quota': 0, 'totalused': 1999,
'volumename': 'fake'})
self._driver._maprfs_util.rename_volume = mock.Mock()
self._driver._maprfs_util.get_cluster_name = mock.Mock(
return_value="fake")
result = self._driver.manage_existing(self.share, {})
self.assertEqual(2, result['size'])
def test__set_volume_size(self):
volume = self._driver._volume_name(self.share['name'])
sizestr = six.text_type(self.share['size']) + 'G'
self._driver._maprfs_util._execute = mock.Mock(return_value=('', 0))
self._driver._maprfs_util.set_volume_size(volume,
self.share['size'])
self._driver._maprfs_util._execute.assert_called_once_with(
self.maprcli_bin, 'volume', 'modify', '-name', volume, '-quota',
sizestr)
def test_extend_share(self):
volume = self._driver._volume_name(self.share['name'])
self._driver._maprfs_util.set_volume_size = mock.Mock()
self._driver.extend_share(self.share, self.share['size'])
self._driver._maprfs_util.set_volume_size.assert_called_once_with(
volume, self.share['size'])
def test_extend_exception(self):
self._driver._maprfs_util.set_volume_size = mock.Mock(
side_effect=exception.ProcessExecutionError)
self.assertRaises(exception.MapRFSException, self._driver.extend_share,
self.share, self.share['size'])
def test_shrink_share(self):
volume = self._driver._volume_name(self.share['name'])
self._driver._maprfs_util.set_volume_size = mock.Mock()
self._driver._maprfs_util.get_volume_info = mock.Mock(
return_value={'total_user': 0})
self._driver.shrink_share(self.share, self.share['size'])
self._driver._maprfs_util.set_volume_size.assert_called_once_with(
volume, self.share['size'])
def test_update_access_add(self):
aces = {
'volumeAces': {
'readAce': 'u:fake|fake:fake',
'writeAce': 'u:fake',
}
}
volume = self._driver._volume_name(self.share['name'])
self._driver._maprfs_util.get_volume_info = mock.Mock(
return_value=aces)
self._driver._maprfs_util.group_exists = mock.Mock(return_value=True)
self._driver._maprfs_util._execute = mock.Mock(return_value=('', 0))
self._driver.update_access(self._context, self.share, [self.access],
[self.access], [])
self._driver._maprfs_util._execute.assert_any_call(
self.maprcli_bin, 'volume', 'modify', '-name', volume, '-readAce',
'g:' + self.access['access_to'], '-writeAce',
'g:' + self.access['access_to'])
def test_update_access_add_no_user_no_group_exists(self):
aces = {
'volumeAces': {
'readAce': 'u:fake|fake:fake',
'writeAce': 'u:fake',
}
}
volume = self._driver._volume_name(self.share['name'])
self._driver._maprfs_util.get_volume_info = mock.Mock(
return_value=aces)
self._driver._maprfs_util.group_exists = mock.Mock(return_value=False)
self._driver._maprfs_util.user_exists = mock.Mock(return_value=False)
self._driver._maprfs_util._execute = mock.Mock(return_value=('', 0))
self._driver.update_access(self._context, self.share, [self.access],
[self.access], [])
self._driver._maprfs_util._execute.assert_any_call(
self.maprcli_bin, 'volume', 'modify', '-name', volume, '-readAce',
'g:' + self.access['access_to'], '-writeAce',
'g:' + self.access['access_to'])
def test_update_access_delete(self):
aces = {
'volumeAces': {
'readAce': 'p',
'writeAce': 'p',
}
}
volume = self._driver._volume_name(self.share['name'])
self._driver._maprfs_util.get_volume_info = mock.Mock(
return_value=aces)
self._driver._maprfs_util.group_exists = mock.Mock(return_value=True)
self._driver._maprfs_util._execute = mock.Mock(return_value=('', 0))
self._driver.update_access(self._context, self.share, [], [],
[self.access])
self._driver._maprfs_util._execute.assert_any_call(
self.maprcli_bin, 'volume', 'modify', '-name', volume, '-readAce',
'',
'-writeAce', '')
def test_update_access_recover(self):
aces = {
'volumeAces': {
'readAce': 'u:fake',
'writeAce': 'u:fake',
}
}
volume = self._driver._volume_name(self.share['name'])
self._driver._maprfs_util.get_volume_info = mock.Mock(
return_value=aces)
self._driver._maprfs_util.group_exists = mock.Mock(return_value=False)
self._driver._maprfs_util.user_exists = mock.Mock(return_value=True)
self._driver._maprfs_util._execute = mock.Mock(return_value=('', 0))
self._driver.update_access(self._context, self.share, [self.access],
[], [])
self._driver._maprfs_util._execute.assert_any_call(
self.maprcli_bin, 'volume', 'modify', '-name', volume, '-readAce',
'u:' + self.access['access_to'], '-writeAce',
'u:' + self.access['access_to'])
def test_update_access_share_not_exists(self):
self._driver._maprfs_util.volume_exists = mock.Mock(
return_value=False)
self._driver._maprfs_util.group_exists = mock.Mock(return_value=True)
self._driver._maprfs_util._execute = mock.Mock(return_value=('', 0))
self._driver.update_access(self._context, self.share, [self.access],
[], [])
self._driver._maprfs_util._execute.assert_not_called()
def test_update_access_exception(self):
aces = {
'volumeAces': {
'readAce': 'p',
'writeAce': 'p',
}
}
self._driver._maprfs_util.get_volume_info = mock.Mock(
return_value=aces)
self._driver._maprfs_util.group_exists = mock.Mock(return_value=True)
utils.execute = mock.Mock(
side_effect=exception.ProcessExecutionError(stdout='ERROR'))
self.assertRaises(exception.MapRFSException,
self._driver.update_access, self._context,
self.share, [self.access], [], [])
def test_update_access_invalid_access(self):
access = fake_share.fake_access(access_type='ip', access_to='fake',
access_level='rw')
self.assertRaises(exception.InvalidShareAccess,
self._driver.update_access, self._context,
self.share, [access], [], [])
def test_ensure_share(self):
self._driver._maprfs_util.volume_exists = mock.Mock(
return_value=True)
self._driver._maprfs_util.get_volume_info = mock.Mock(
return_value={'mountdir': self.share['export_location']})
self._driver._maprfs_util.get_cluster_name = mock.Mock(
return_value=self.cluster_name)
result = self._driver.ensure_share(self._context, self.share)
self.assertEqual(self.export_path, result[0]['path'])
def test_create_share(self):
size_str = six.text_type(self.share['size']) + 'G'
path = self._driver._share_dir(self.share['name'])
self._driver.api.get_share_metadata = mock.Mock(
return_value={'_fake': 'fake'})
self._driver._maprfs_util._execute = mock.Mock(return_value=('', 0))
self._driver._maprfs_util.set_volume_size = mock.Mock()
self._driver._maprfs_util.maprfs_chmod = mock.Mock()
self._driver._maprfs_util.get_cluster_name = mock.Mock(
return_value=self.cluster_name)
self._driver.create_share(self._context, self.share)
self._driver._maprfs_util._execute.assert_called_once_with(
self.maprcli_bin, 'volume', 'create', '-name', self.share['name'],
'-path', path, '-quota', size_str, '-readAce', '', '-writeAce', '',
'-fake', 'fake')
self._driver._maprfs_util.maprfs_chmod.assert_called_once_with(path,
'777')
def test_create_share_with_custom_name(self):
size_str = six.text_type(self.share['size']) + 'G'
self._driver.api.get_share_metadata = mock.Mock(
return_value={'_name': 'fake', '_path': 'fake'})
self._driver._maprfs_util._execute = mock.Mock(return_value=('', 0))
self._driver._maprfs_util.set_volume_size = mock.Mock()
self._driver._maprfs_util.maprfs_chmod = mock.Mock()
self._driver._maprfs_util.get_cluster_name = mock.Mock(
return_value=self.cluster_name)
self._driver.create_share(self._context, self.share)
self._driver._maprfs_util._execute.assert_called_once_with(
self.maprcli_bin, 'volume', 'create', '-name', 'fake',
'-path', 'fake', '-quota', size_str, '-readAce', '', '-writeAce',
'')
self._driver._maprfs_util.maprfs_chmod.assert_called_once_with('fake',
'777')
def test_create_share_exception(self):
self._driver.api.get_share_metadata = mock.Mock(return_value={})
self._driver._maprfs_util._execute = mock.Mock(
side_effect=exception.ProcessExecutionError)
self._driver._maprfs_util.set_volume_size = mock.Mock()
self._driver._maprfs_util.maprfs_chmod = mock.Mock()
self._driver._maprfs_util.get_cluster_name = mock.Mock(
return_value=self.cluster_name)
self.assertRaises(exception.MapRFSException, self._driver.create_share,
self._context, self.share)
def test_create_share_from_snapshot(self):
fake_snapshot = dict(self.snapshot)
fake_snapshot.update(share_instance={'share_id': 1})
size_str = six.text_type(self.share['size']) + 'G'
path = self._driver._share_dir(self.share['name'])
snapthot_path = self._driver._get_snapshot_path(self.snapshot) + '/*'
self._driver._maprfs_util._execute = mock.Mock(
return_value=('Found', 0))
self._driver._maprfs_util.set_volume_size = mock.Mock()
self._driver._maprfs_util.get_cluster_name = mock.Mock(
return_value=self.cluster_name)
self._driver.api.get_share_metadata = mock.Mock(
return_value={'_fake': 'fake', 'fake2': 'fake2'})
mock_execute = self._driver._maprfs_util._execute
self._driver.create_share_from_snapshot(self._context, self.share,
self.snapshot)
mock_execute.assert_any_call(self.hadoop_bin, 'fs', '-cp', '-p',
snapthot_path, path)
mock_execute.assert_any_call(self.maprcli_bin, 'volume', 'create',
'-name',
self.share['name'], '-path', path,
'-quota', size_str, '-readAce', '',
'-writeAce', '', '-fake', 'fake')
def test_create_share_from_snapshot_wrong_tenant(self):
fake_snapshot = dict(self.snapshot)
fake_snapshot.update(share_instance={'share_id': 10})
self._driver._maprfs_util._execute = mock.Mock(return_value=('', 0))
self._driver._maprfs_util.set_volume_size = mock.Mock()
self._driver._maprfs_util.get_cluster_name = mock.Mock(
return_value=self.cluster_name)
def fake_meta(context, share):
return {'_tenantuser': 'fake'} if share['id'] == 10 else {}
self._driver.api.get_share_metadata = fake_meta
self.assertRaises(exception.MapRFSException,
self._driver.create_share_from_snapshot,
self._context, self.share, fake_snapshot)
def test_create_share_from_snapshot_exception(self):
fake_snapshot = dict(self.snapshot)
fake_snapshot.update(share_instance={'share_id': 10})
self._driver._maprfs_util._execute = mock.Mock(
return_value=('Found 0', 0))
self._driver._maprfs_util.maprfs_cp = mock.Mock(
side_effect=exception.ProcessExecutionError)
self._driver.api.get_share_metadata = mock.Mock(
return_value={'_tenantuser': 'fake'})
self.assertRaises(exception.MapRFSException,
self._driver.create_share_from_snapshot,
self._context, self.share, self.snapshot)
def test_delete_share(self):
self._driver._maprfs_util._execute = mock.Mock(return_value=('', 0))
self._driver.delete_share(self._context, self.share)
self._driver._maprfs_util._execute.assert_called_once_with(
self.maprcli_bin, 'volume', 'remove', '-name', self.share['name'],
'-force', 'true', check_exit_code=False)
def test_delete_share_skip(self):
self._driver._maprfs_util._execute = mock.Mock(return_value=('', 0))
self._driver.api.get_share_metadata = mock.Mock(
return_value={'_name': 'error'})
self._driver.delete_share(self._context, self.share)
self._driver._maprfs_util._execute.assert_not_called()
def test_delete_share_exception(self):
self._driver._maprfs_util._execute = mock.Mock(
side_effect=exception.ProcessExecutionError)
self.assertRaises(exception.MapRFSException, self._driver.delete_share,
self._context, self.share)
def test_delete_share_not_exist(self):
self._driver._maprfs_util._execute = mock.Mock(
return_value=('No such volume', 0))
self._driver.delete_share(self._context, self.share)
def test_create_snapshot(self):
volume = self._driver._volume_name(self.share['name'])
self._driver._maprfs_util._execute = mock.Mock(return_value=('', 0))
self._driver.create_snapshot(self._context, self.snapshot)
self._driver._maprfs_util._execute.assert_called_once_with(
self.maprcli_bin, 'volume', 'snapshot', 'create', '-snapshotname',
self.snapshot['name'], '-volume', volume)
def test_create_snapshot_exception(self):
self._driver._maprfs_util._execute = mock.Mock(
side_effect=exception.ProcessExecutionError)
self.assertRaises(exception.MapRFSException,
self._driver.create_snapshot, self._context,
self.snapshot)
def test_delete_snapshot(self):
volume = self._driver._volume_name(self.share['name'])
self._driver._maprfs_util._execute = mock.Mock(return_value=('', 0))
self._driver.delete_snapshot(self._context, self.snapshot)
self._driver._maprfs_util._execute.assert_called_once_with(
self.maprcli_bin, 'volume', 'snapshot', 'remove', '-snapshotname',
self.snapshot['name'], '-volume', volume, check_exit_code=False)
def test_delete_snapshot_exception(self):
self._driver._maprfs_util._execute = mock.Mock(
return_value=('ERROR (fake)', None))
self.assertRaises(exception.MapRFSException,
self._driver.delete_snapshot,
self._context, self.snapshot)
def test__execute(self):
hosts = ['192.168.1.0', '10.10.10.10', '11.11.11.11']
self._driver._maprfs_util.hosts += hosts
available_host = hosts[2]
# mutable container
done = [False]
def fake_ssh_run(host, cmd, check_exit_code):
if host == available_host:
done[0] = True
return '', 0
else:
raise Exception()
self._driver._maprfs_util._run_ssh = fake_ssh_run
self._driver._maprfs_util._execute('fake', 'cmd')
self.assertTrue(done[0])
self.assertEqual(available_host, self._driver._maprfs_util.hosts[0])
def test__execute_exeption(self):
utils.execute = mock.Mock(side_effect=Exception)
self.assertRaises(exception.ProcessExecutionError,
self._driver._maprfs_util._execute, "fake", "cmd")
def test__execute_native_exeption(self):
utils.execute = mock.Mock(
side_effect=exception.ProcessExecutionError(stdout='fake'))
self.assertRaises(exception.ProcessExecutionError,
self._driver._maprfs_util._execute, "fake", "cmd")
def test__execute_local(self):
utils.execute = mock.Mock(return_value=("fake", 0))
self._driver._maprfs_util._execute("fake", "cmd")
utils.execute.assert_called_once_with('sudo', 'su', '-',
'fake_sshname', '-c', 'fake cmd',
check_exit_code=True)
def test_share_shrink_error(self):
fake_info = {
'totalused': 1024,
'quota': 2024
}
self._driver._maprfs_util._execute = mock.Mock()
self._driver._maprfs_util.get_volume_info = mock.Mock(
return_value=fake_info)
self.assertRaises(exception.ShareShrinkingPossibleDataLoss,
self._driver.shrink_share, self.share, 1)
def test__get_volume_info(self):
fake_out = """
{"data": [{"mounted":1,"quota":"1024","used":"0","totalused":"0"}]}
"""
self._driver._maprfs_util._execute = mock.Mock(
return_value=(fake_out, 0))
result = self._driver._maprfs_util.get_volume_info('fake_name')
self.assertEqual('0', result['used'])
def test__get_volume_info_by_path(self):
fake_out = """
{"data": [{"mounted":1,"quota":"1024","used":"0","totalused":"0"}]}
"""
self._driver._maprfs_util._execute = mock.Mock(
return_value=(fake_out, 0))
result = self._driver._maprfs_util.get_volume_info_by_path('fake_path')
self.assertEqual('0', result['used'])
def test__get_volume_info_by_path_not_exist(self):
fake_out = "No such volume"
self._driver._maprfs_util._execute = mock.Mock(
return_value=(fake_out, 0))
result = self._driver._maprfs_util.get_volume_info_by_path(
'fake_path', check_if_exists=True)
self.assertIsNone(result)
def test_get_share_stats_refresh_false(self):
self._driver._stats = {'fake_key': 'fake_value'}
result = self._driver.get_share_stats(False)
self.assertEqual(self._driver._stats, result)
def test_get_share_stats_refresh_true(self):
self._driver._maprfs_util.fs_capacity = mock.Mock(
return_value=(1143554.0, 124111.0))
result = self._driver.get_share_stats(True)
expected_keys = [
'qos', 'driver_version', 'share_backend_name',
'free_capacity_gb', 'total_capacity_gb',
'driver_handles_share_servers',
'reserved_percentage', 'vendor_name', 'storage_protocol',
]
for key in expected_keys:
self.assertIn(key, result)
self.assertEqual('MAPRFS', result['storage_protocol'])
self._driver._maprfs_util.fs_capacity.assert_called_once_with()
def test_get_share_stats_refresh_exception(self):
self._driver._maprfs_util.fs_capacity = mock.Mock(
side_effect=exception.ProcessExecutionError)
self.assertRaises(exception.MapRFSException,
self._driver.get_share_stats, True)
def test__get_available_capacity(self):
fake_out = """Filesystem Size Used Available Use%
maprfs:/// 26367492096 1231028224 25136463872 5%
"""
self._driver._maprfs_util._execute = mock.Mock(
return_value=(fake_out, ''))
total, free = self._driver._maprfs_util.fs_capacity()
self._driver._maprfs_util._execute.assert_called_once_with(
self.hadoop_bin, 'fs', '-df')
self.assertEqual(26367492096, total)
self.assertEqual(25136463872, free)
def test__get_available_capacity_exception(self):
fake_out = 'fake'
self._driver._maprfs_util._execute = mock.Mock(
return_value=(fake_out, ''))
self.assertRaises(exception.ProcessExecutionError,
self._driver._maprfs_util.fs_capacity)
def test__get_snapshot_list(self):
fake_out = """{"data":[{"snapshotname":"fake-snapshot"}]}"""
self._driver._maprfs_util._execute = mock.Mock(
return_value=(fake_out, None))
snapshot_list = self._driver._maprfs_util.get_snapshot_list(
volume_name='fake', volume_path='fake')
self.assertEqual(['fake-snapshot'], snapshot_list)
def test__cluster_name(self):
fake_info = """{
"data":[
{
"version":"fake",
"cluster":{
"name":"fake",
"secure":false,
"ip":"10.10.10.10",
"id":"7133813101868836065",
"nodesUsed":1,
"totalNodesAllowed":-1
}
}
]
}
"""
self._driver._maprfs_util._execute = mock.Mock(
return_value=(fake_info, 0))
name = self._driver._maprfs_util.get_cluster_name()
self.assertEqual('fake', name)
def test__cluster_name_exception(self):
fake_info = 'fake'
self._driver._maprfs_util._execute = mock.Mock(
return_value=(fake_info, 0))
self.assertRaises(exception.ProcessExecutionError,
self._driver._maprfs_util.get_cluster_name)
def test__run_ssh(self):
ssh_output = 'fake_ssh_output'
cmd_list = ['fake', 'cmd']
ssh = mock.Mock()
ssh.get_transport = mock.Mock()
ssh.get_transport().is_active = mock.Mock(return_value=False)
ssh_pool = mock.Mock()
ssh_pool.create = mock.Mock(return_value=ssh)
self.mock_object(utils, 'SSHPool', mock.Mock(return_value=ssh_pool))
self.mock_object(processutils, 'ssh_execute',
mock.Mock(return_value=ssh_output))
result = self._driver._maprfs_util._run_ssh(self.local_ip, cmd_list)
utils.SSHPool.assert_called_once_with(
self._driver.configuration.maprfs_clinode_ip[0],
self._driver.configuration.maprfs_ssh_port,
self._driver.configuration.ssh_conn_timeout,
self._driver.configuration.maprfs_ssh_name,
password=self._driver.configuration.maprfs_ssh_pw,
privatekey=self._driver.configuration.maprfs_ssh_private_key,
min_size=self._driver.configuration.ssh_min_pool_conn,
max_size=self._driver.configuration.ssh_max_pool_conn)
ssh_pool.create.assert_called()
ssh.get_transport().is_active.assert_called_once_with()
processutils.ssh_execute.assert_called_once_with(
ssh, 'fake cmd', check_exit_code=False)
self.assertEqual(ssh_output, result)
def test__run_ssh_exception(self):
cmd_list = ['fake', 'cmd']
ssh = mock.Mock()
ssh.get_transport = mock.Mock()
ssh.get_transport().is_active = mock.Mock(return_value=True)
ssh_pool = mock.Mock()
ssh_pool.create = mock.Mock(return_value=ssh)
self.mock_object(utils, 'SSHPool', mock.Mock(return_value=ssh_pool))
self.mock_object(processutils, 'ssh_execute', mock.Mock(
side_effect=exception.ProcessExecutionError))
self.assertRaises(exception.ProcessExecutionError,
self._driver._maprfs_util._run_ssh,
self.local_ip,
cmd_list)
utils.SSHPool.assert_called_once_with(
self._driver.configuration.maprfs_clinode_ip[0],
self._driver.configuration.maprfs_ssh_port,
self._driver.configuration.ssh_conn_timeout,
self._driver.configuration.maprfs_ssh_name,
password=self._driver.configuration.maprfs_ssh_pw,
privatekey=self._driver.configuration.maprfs_ssh_private_key,
min_size=self._driver.configuration.ssh_min_pool_conn,
max_size=self._driver.configuration.ssh_max_pool_conn)
ssh_pool.create.assert_called_once_with()
ssh.get_transport().is_active.assert_called_once_with()
processutils.ssh_execute.assert_called_once_with(
ssh, 'fake cmd', check_exit_code=False)
def test__share_dir(self):
self._driver._base_volume_dir = '/volumes'
share_dir = '/volumes/' + self.share['name']
actual_dir = self._driver._share_dir(self.share['name'])
self.assertEqual(share_dir, actual_dir)
def test__get_volume_name(self):
volume_name = self._driver._get_volume_name("fake", self.share)
self.assertEqual('share-0', volume_name)
def test__maprfs_du(self):
self._driver._maprfs_util._execute = mock.Mock(
return_value=('1024 /', 0))
size = self._driver._maprfs_util.maprfs_du('/')
self._driver._maprfs_util._execute.assert_called()
self.assertEqual(1024, size)
def test__maprfs_ls(self):
self._driver._maprfs_util._execute = mock.Mock(
return_value=('fake', 0))
self._driver._maprfs_util.maprfs_ls('/')
self._driver._maprfs_util._execute.assert_called_with(self.hadoop_bin,
'fs', '-ls', '/')
def test_rename_volume(self):
self._driver._maprfs_util._execute = mock.Mock(
return_value=('fake', 0))
self._driver._maprfs_util.rename_volume('fake', 'newfake')
self._driver._maprfs_util._execute.assert_called_with(self.maprcli_bin,
'volume',
'rename',
'-name', 'fake',
'-newname',
'newfake')
def test__run_as_user(self):
cmd = ['fake', 'cmd']
u_cmd = self._driver._maprfs_util._as_user(cmd, 'user')
self.assertEqual(['sudo', 'su', '-', 'user', '-c', 'fake cmd'], u_cmd)
def test__add_params(self):
params = {'p1': 1, 'p2': 2, 'p3': '3'}
cmd = ['fake', 'cmd']
cmd_with_params = self._driver._maprfs_util._add_params(cmd, **params)
self.assertEqual(cmd[:2], cmd_with_params[:2])
def test_get_network_allocations_number(self):
number = self._driver.get_admin_network_allocations_number()
self.assertEqual(0, number)
def test__user_exists(self):
fake_out = 'user:x:1000:1000::/opt/user:/bin/bash'
self._driver._maprfs_util._execute = mock.Mock(
return_value=(fake_out, 0))
result = self._driver._maprfs_util.user_exists('user')
self.assertTrue(result)
def test__group_exists(self):
fake_out = 'user:x:1000:'
self._driver._maprfs_util._execute = mock.Mock(
return_value=(fake_out, 0))
result = self._driver._maprfs_util.group_exists('user')
self.assertTrue(result)

View File

@ -246,3 +246,7 @@ class ManageHDFSShareTest(ManageNFSShareTest):
class ManageCephFSShareTest(ManageNFSShareTest):
protocol = 'cephfs'
class ManageMapRFSShareTest(ManageNFSShareTest):
protocol = 'maprfs'

View File

@ -164,3 +164,7 @@ class ManageGLUSTERFSSnapshotTest(ManageNFSSnapshotTest):
class ManageHDFSSnapshotTest(ManageNFSSnapshotTest):
protocol = 'hdfs'
class ManageMapRFSSnapshotTest(ManageNFSSnapshotTest):
protocol = 'maprfs'

View File

@ -109,3 +109,7 @@ class ManageGLUSTERFSSnapshotNegativeTest(ManageNFSSnapshotNegativeTest):
class ManageHDFSSnapshotNegativeTest(ManageNFSSnapshotNegativeTest):
protocol = 'hdfs'
class ManageMapRFSSnapshotNegativeTest(ManageNFSSnapshotNegativeTest):
protocol = 'maprfs'

View File

@ -123,7 +123,7 @@ class BaseSharesTest(test.BaseTestCase):
credentials = ('primary', )
force_tenant_isolation = False
protocols = ["nfs", "cifs", "glusterfs", "hdfs", "cephfs"]
protocols = ["nfs", "cifs", "glusterfs", "hdfs", "cephfs", "maprfs"]
# Will be cleaned up in resource_cleanup
class_resources = []

View File

@ -229,3 +229,8 @@ class SharesHDFSTest(SharesNFSTest):
class SharesCephFSTest(SharesNFSTest):
"""Covers share functionality that is related to CEPHFS share type."""
protocol = "cephfs"
class SharesMapRFSTest(SharesNFSTest):
"""Covers share functionality that is related to MapRFS share type."""
protocol = "maprfs"

View File

@ -0,0 +1,3 @@
---
features:
- Added share backend drivers for MapR-FS.