diff --git a/doc/source/devref/share_back_ends_feature_support_mapping.rst b/doc/source/devref/share_back_ends_feature_support_mapping.rst
index 2963d49d39..f8209bb9aa 100644
--- a/doc/source/devref/share_back_ends_feature_support_mapping.rst
+++ b/doc/source/devref/share_back_ends_feature_support_mapping.rst
@@ -81,7 +81,8 @@ Mapping of share drivers and share features support
+----------------------------------------+-----------------------+-----------------------+--------------+--------------+------------------------+----------------------------+--------------------------+--------------------+
| MapRFS | O | O | O | O | O | O | O | \- |
+----------------------------------------+-----------------------+-----------------------+--------------+--------------+------------------------+----------------------------+--------------------------+--------------------+
-
+| QNAP | O | O | O | \- | O | O | O | \- |
++----------------------------------------+-----------------------+-----------------------+--------------+--------------+------------------------+----------------------------+--------------------------+--------------------+
Mapping of share drivers and share access rules support
-------------------------------------------------------
@@ -139,6 +140,8 @@ Mapping of share drivers and share access rules support
+----------------------------------------+--------------+----------------+------------+--------------+--------------+----------------+------------+------------+
| MapRFS | \- | MapRFS(O) | \- | \- | \- | MapRFS(O) | \- | \- |
+----------------------------------------+--------------+----------------+------------+--------------+--------------+----------------+------------+------------+
+| QNAP | NFS (O) | \- | \- | \- | NFS (O) | \- | \- | \- |
++----------------------------------------+--------------+----------------+------------+--------------+--------------+----------------+------------+------------+
Mapping of share drivers and security services support
------------------------------------------------------
@@ -194,7 +197,8 @@ Mapping of share drivers and security services support
+----------------------------------------+------------------+-----------------+------------------+
| MapRFS | \- | \- | \- |
+----------------------------------------+------------------+-----------------+------------------+
-
+| QNAP | \- | \- | \- |
++----------------------------------------+------------------+-----------------+------------------+
Mapping of share drivers and common capabilities
------------------------------------------------
@@ -252,6 +256,8 @@ More information: :ref:`capabilities_and_extra_specs`
+----------------------------------------+-----------+------------+--------+-------------+-------------------+--------------------+-----+----------------------------+--------------------+
| MapRFS | \- | N | \- | \- | \- | N | \- | O | \- |
+----------------------------------------+-----------+------------+--------+-------------+-------------------+--------------------+-----+----------------------------+--------------------+
+| QNAP | \- | O | \- | \- | O | \- | \- | O | \- |
++----------------------------------------+-----------+------------+--------+-------------+-------------------+--------------------+-----+----------------------------+--------------------+
.. note::
diff --git a/manila/opts.py b/manila/opts.py
index 4366cf3549..f90a405357 100644
--- a/manila/opts.py
+++ b/manila/opts.py
@@ -70,6 +70,7 @@ import manila.share.drivers.lvm
import manila.share.drivers.maprfs.maprfs_native
import manila.share.drivers.netapp.options
import manila.share.drivers.nexenta.options
+import manila.share.drivers.qnap.qnap
import manila.share.drivers.quobyte.quobyte
import manila.share.drivers.service_instance
import manila.share.drivers.tegile.tegile
@@ -147,6 +148,7 @@ _global_opt_lists = [
manila.share.drivers.nexenta.options.nexenta_connection_opts,
manila.share.drivers.nexenta.options.nexenta_dataset_opts,
manila.share.drivers.nexenta.options.nexenta_nfs_opts,
+ manila.share.drivers.qnap.qnap.qnap_manila_opts,
manila.share.drivers.quobyte.quobyte.quobyte_manila_share_opts,
manila.share.drivers.service_instance.common_opts,
manila.share.drivers.service_instance.no_share_servers_handling_mode_opts,
diff --git a/manila/share/drivers/qnap/__init__.py b/manila/share/drivers/qnap/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/manila/share/drivers/qnap/api.py b/manila/share/drivers/qnap/api.py
new file mode 100644
index 0000000000..40cad06e0a
--- /dev/null
+++ b/manila/share/drivers/qnap/api.py
@@ -0,0 +1,646 @@
+# Copyright (c) 2016 QNAP Systems, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+API for QNAP Storage.
+"""
+import base64
+import functools
+import re
+import ssl
+
+try:
+ import xml.etree.cElementTree as ET
+except ImportError:
+ import xml.etree.ElementTree as ET
+
+from oslo_log import log as logging
+import six
+from six.moves import http_client
+from six.moves import urllib
+
+from manila import exception
+from manila.i18n import _
+from manila import utils
+
+LOG = logging.getLogger(__name__)
+MSG_SESSION_EXPIRED = _("Session ID expired")
+MSG_UNEXPECT_RESP = _("Unexpected response from QNAP API")
+
+
+@utils.retry(exception=exception.ShareBackendException,
+ retries=5)
+def _connection_checker(func):
+ """Decorator to check session has expired or not."""
+ @functools.wraps(func)
+ def inner_connection_checker(self, *args, **kwargs):
+ LOG.debug('in _connection_checker')
+ pattern = re.compile(r".*Session ID expired.$")
+ try:
+ return func(self, *args, **kwargs)
+ except exception.ShareBackendException as e:
+ matches = pattern.match(six.text_type(e))
+ if matches:
+ LOG.debug('Session might have expired.'
+ ' Trying to relogin')
+ self._login()
+ raise
+ return inner_connection_checker
+
+
+class QnapAPIExecutor(object):
+ """Makes QNAP API calls for ES NAS."""
+
+ def __init__(self, *args, **kwargs):
+ self.sid = None
+ self.username = kwargs['username']
+ self.password = kwargs['password']
+ self.ip, self.port, self.ssl = (
+ self._parse_management_url(kwargs['management_url']))
+ self._login()
+
+ def _parse_management_url(self, management_url):
+ pattern = re.compile(r"(http|https)\:\/\/(\S+)\:(\d+)")
+ matches = pattern.match(management_url)
+ if matches.group(1) == 'http':
+ management_ssl = False
+ else:
+ management_ssl = True
+ management_ip = matches.group(2)
+ management_port = matches.group(3)
+ return management_ip, management_port, management_ssl
+
+ def _prepare_connection(self, isSSL, ip, port):
+ if isSSL:
+ if hasattr(ssl, '_create_unverified_context'):
+ context = ssl._create_unverified_context()
+ connection = http_client.HTTPSConnection(ip,
+ port=port,
+ context=context)
+ else:
+ connection = http_client.HTTPSConnection(ip,
+ port=port)
+ else:
+ connection = http_client.HTTPConnection(ip, port)
+ return connection
+
+ def get_basic_info(self, management_url):
+ """Get the basic information of NAS."""
+ LOG.debug('in get_basic_info')
+ management_ip, management_port, management_ssl = (
+ self._parse_management_url(management_url))
+ connection = self._prepare_connection(management_ssl,
+ management_ip,
+ management_port)
+
+ connection.request('GET', '/cgi-bin/authLogin.cgi')
+ response = connection.getresponse()
+ data = response.read()
+ LOG.debug('response data: %s', data)
+
+ root = ET.fromstring(data)
+
+ display_model_name = root.find('model/displayModelName').text
+ internal_model_name = root.find('model/internalModelName').text
+ fw_version = root.find('firmware/version').text
+
+ connection.close()
+ return display_model_name, internal_model_name, fw_version
+
+ def _execute_and_get_response_details(self, nas_ip, url):
+ """Will prepare response after executing a http request."""
+ LOG.debug('port: %(port)s, ssl: %(ssl)s',
+ {'port': self.port, 'ssl': self.ssl})
+
+ res_details = {}
+
+ # Prepare the connection
+ connection = self._prepare_connection(self.ssl,
+ nas_ip,
+ self.port)
+
+ # Make the connection
+ LOG.debug('url : %s', url)
+ connection.request('GET', url)
+ # Extract the response as the connection was successful
+ response = connection.getresponse()
+ # Read the response
+ data = response.read()
+ LOG.debug('response data: %s', data)
+
+ res_details['data'] = data
+ res_details['error'] = None
+ res_details['http_status'] = response.status
+
+ connection.close()
+ return res_details
+
+ def execute_login(self):
+ """Login and return sid."""
+ params = {
+ 'user': self.username,
+ 'pwd': base64.b64encode(self.password.encode("utf-8")),
+ 'serviceKey': '1',
+ }
+ sanitized_params = self._sanitize_params(params)
+
+ sanitized_params = urllib.parse.urlencode(sanitized_params)
+ url = ('/cgi-bin/authLogin.cgi?%s' % sanitized_params)
+
+ res_details = self._execute_and_get_response_details(self.ip, url)
+ root = ET.fromstring(res_details['data'])
+
+ if root.find('authPassed').text == '0':
+ raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
+
+ session_id = root.find('authSid').text
+ return session_id
+
+ def _login(self):
+ """Execute Https Login API."""
+ self.sid = self.execute_login()
+ LOG.debug('sid: %s', self.sid)
+
+ def _sanitize_params(self, params):
+ sanitized_params = {}
+ for key in params:
+ value = params[key]
+ if value is not None:
+ sanitized_params[key] = six.text_type(value)
+ return sanitized_params
+
+ @_connection_checker
+ def create_share(self, share, pool_name, create_share_name, share_proto):
+ """Create share."""
+ LOG.debug('create_share_name: %s', create_share_name)
+
+ params = {
+ 'wiz_func': 'share_create',
+ 'action': 'add_share',
+ 'vol_name': create_share_name,
+ 'vol_size': six.text_type(share['size']) + 'GB',
+ 'threshold': '80',
+ 'dedup': 'off',
+ 'compression': '1',
+ 'thin_pro': '0',
+ 'cache': '0',
+ 'cifs_enable': '0' if share_proto == 'NFS' else '1',
+ 'nfs_enable': '0' if share_proto == 'CIFS' else '1',
+ 'afp_enable': '0',
+ 'ftp_enable': '0',
+ 'encryption': '0',
+ 'hidden': '0',
+ 'oplocks': '1',
+ 'sync': 'always',
+ 'userrw0': 'admin',
+ 'userrd_len': '0',
+ 'userrw_len': '1',
+ 'userno_len': '0',
+ 'access_r': 'setup_users',
+ 'path_type': 'auto',
+ 'recycle_bin': '1',
+ 'recycle_bin_administrators_only': '0',
+ 'pool_name': pool_name,
+ 'sid': self.sid,
+ }
+ sanitized_params = self._sanitize_params(params)
+
+ sanitized_params = urllib.parse.urlencode(sanitized_params)
+ url = ('/cgi-bin/wizReq.cgi?%s' % sanitized_params)
+
+ res_details = self._execute_and_get_response_details(self.ip, url)
+ root = ET.fromstring(res_details['data'])
+
+ if root.find('authPassed').text == '0':
+ raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
+ if root.find('ES_RET_CODE').text < '0':
+ msg = _('Create share %s failed') % share['display_name']
+ raise exception.ShareBackendException(msg=msg)
+
+ vol_list = root.find('func').find('ownContent').find('volumeList')
+ vol_info_tree = vol_list.findall('volume')
+ for vol in vol_info_tree:
+ LOG.debug('Iterating vol name: %(name)s, index: %(id)s',
+ {'name': vol.find('volumeLabel').text,
+ 'id': vol.find('volumeValue').text})
+ if (create_share_name == vol.find('volumeLabel').text):
+ LOG.debug('volumeLabel:%s', vol.find('volumeLabel').text)
+ return vol.find('volumeValue').text
+
+ return res_details['data']
+
+ @_connection_checker
+ def delete_share(self, vol_id, *args, **kwargs):
+ """Execute delete share API."""
+ params = {
+ 'func': 'volume_mgmt',
+ 'vol_remove': '1',
+ 'volumeID': vol_id,
+ 'stop_service': 'no',
+ 'sid': self.sid,
+ }
+ sanitized_params = self._sanitize_params(params)
+
+ sanitized_params = urllib.parse.urlencode(sanitized_params)
+ url = ('/cgi-bin/disk/disk_manage.cgi?%s' % sanitized_params)
+
+ res_details = self._execute_and_get_response_details(self.ip, url)
+ root = ET.fromstring(res_details['data'])
+
+ if root.find('authPassed').text == '0':
+ raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
+ if root.find('result').text < '0':
+ msg = _('Delete share id: %s failed') % vol_id
+ raise exception.ShareBackendException(msg=msg)
+
+ @_connection_checker
+ def get_specific_poolinfo(self, pool_id):
+ """Execute get_specific_poolinfo API."""
+ params = {
+ 'store': 'poolInfo',
+ 'func': 'extra_get',
+ 'poolID': pool_id,
+ 'Pool_Info': '1',
+ 'sid': self.sid,
+ }
+ sanitized_params = self._sanitize_params(params)
+
+ sanitized_params = urllib.parse.urlencode(sanitized_params)
+ url = ('/cgi-bin/disk/disk_manage.cgi?%s' % sanitized_params)
+
+ res_details = self._execute_and_get_response_details(self.ip, url)
+
+ root = ET.fromstring(res_details['data'])
+ if root.find('authPassed').text == '0':
+ raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
+ if root.find('result').text < '0':
+ msg = _('get_specific_poolinfo failed')
+ raise exception.ShareBackendException(msg=msg)
+
+ pool_list = root.find('Pool_Index')
+ pool_info_tree = pool_list.findall('row')
+ for pool in pool_info_tree:
+ if pool_id == pool.find('poolID').text:
+ LOG.debug('poolID: %s', pool.find('poolID').text)
+ return pool
+
+ @_connection_checker
+ def get_share_info(self, pool_id, **kwargs):
+ """Execute get_share_info API."""
+ for key, value in six.iteritems(kwargs):
+ LOG.debug('%(key)s = %(val)s',
+ {'key': key, 'val': value})
+
+ params = {
+ 'store': 'poolVolumeList',
+ 'poolID': pool_id,
+ 'func': 'extra_get',
+ 'Pool_Vol_Info': '1',
+ 'sid': self.sid,
+ }
+ sanitized_params = self._sanitize_params(params)
+
+ sanitized_params = urllib.parse.urlencode(sanitized_params)
+ url = ('/cgi-bin/disk/disk_manage.cgi?%s' % sanitized_params)
+
+ res_details = self._execute_and_get_response_details(self.ip, url)
+ root = ET.fromstring(res_details['data'])
+ if root.find('authPassed').text == '0':
+ raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
+
+ if ('vol_no' in kwargs) or ('vol_label' in kwargs):
+ vol_list = root.find('Volume_Info')
+ vol_info_tree = vol_list.findall('row')
+ for vol in vol_info_tree:
+ LOG.debug('Iterating vol name: %(name)s, index: %(id)s',
+ {'name': vol.find('vol_label').text,
+ 'id': vol.find('vol_no').text})
+ if 'vol_no' in kwargs:
+ if kwargs['vol_no'] == vol.find('vol_no').text:
+ LOG.debug('vol_no:%s',
+ vol.find('vol_no').text)
+ return vol
+ elif 'vol_label' in kwargs:
+ if kwargs['vol_label'] == vol.find('vol_label').text:
+ LOG.debug('vol_label:%s', vol.find('vol_label').text)
+ return vol
+ if vol is vol_info_tree[-1]:
+ return None
+ else:
+ return res_details['data']
+
+ @_connection_checker
+ def get_specific_volinfo(self, vol_id, **kwargs):
+ """Execute get_specific_volinfo API."""
+ params = {
+ 'store': 'volumeInfo',
+ 'volumeID': vol_id,
+ 'func': 'extra_get',
+ 'Volume_Info': '1',
+ 'sid': self.sid,
+ }
+ sanitized_params = self._sanitize_params(params)
+
+ sanitized_params = urllib.parse.urlencode(sanitized_params)
+ url = ('/cgi-bin/disk/disk_manage.cgi?%s' % sanitized_params)
+
+ res_details = self._execute_and_get_response_details(self.ip, url)
+ root = ET.fromstring(res_details['data'])
+ if root.find('authPassed').text == '0':
+ raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
+
+ vol_list = root.find('Volume_Info')
+ vol_info_tree = vol_list.findall('row')
+ for vol in vol_info_tree:
+ if vol_id == vol.find('vol_no').text:
+ LOG.debug('vol_no: %s', vol.find('vol_no').text)
+ return vol
+
+ @_connection_checker
+ def get_snapshot_info(self, **kwargs):
+ """Execute get_snapshot_info API."""
+ params = {
+ 'func': 'extra_get',
+ 'volumeID': kwargs['volID'],
+ 'snapshot_list': '1',
+ 'snap_start': '0',
+ 'snap_count': '100',
+ 'sid': self.sid,
+ }
+ sanitized_params = self._sanitize_params(params)
+
+ sanitized_params = urllib.parse.urlencode(sanitized_params)
+ url = ('/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params)
+
+ res_details = self._execute_and_get_response_details(self.ip, url)
+ root = ET.fromstring(res_details['data'])
+ if root.find('authPassed').text == '0':
+ raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
+ if root.find('result').text < '0':
+ raise exception.ShareBackendException(msg=MSG_UNEXPECT_RESP)
+
+ snapshot_list = root.find('SnapshotList')
+ # if snapshot_list is None:
+ if not snapshot_list:
+ return None
+ if ('snapshot_name' in kwargs):
+ snapshot_tree = snapshot_list.findall('row')
+ for snapshot in snapshot_tree:
+ if (kwargs['snapshot_name'] ==
+ snapshot.find('snapshot_name').text):
+ LOG.debug('snapshot_name:%s', kwargs['snapshot_name'])
+ return snapshot
+ if (snapshot is snapshot_tree[-1]):
+ return None
+
+ return res_details['data']
+
+ @_connection_checker
+ def create_snapshot_api(self, volumeID, snapshot_name):
+ """Execute CGI to create snapshot from source share."""
+ LOG.debug('volumeID: %s', volumeID)
+ LOG.debug('snapshot_name: %s', snapshot_name)
+
+ params = {
+ 'func': 'create_snapshot',
+ 'volumeID': volumeID,
+ 'snapshot_name': snapshot_name,
+ 'expire_min': '0',
+ 'vital': '1',
+ 'sid': self.sid,
+ }
+ sanitized_params = self._sanitize_params(params)
+
+ sanitized_params = urllib.parse.urlencode(sanitized_params)
+ url = ('/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params)
+
+ res_details = self._execute_and_get_response_details(self.ip, url)
+ root = ET.fromstring(res_details['data'])
+
+ if root.find('authPassed').text == '0':
+ raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
+ if root.find('ES_RET_CODE').text < '0':
+ msg = _('Create snapshot failed')
+ raise exception.ShareBackendException(msg=msg)
+
+ @_connection_checker
+ def delete_snapshot_api(self, snapshot_id):
+ """Execute CGI to delete snapshot from snapshot_id."""
+ params = {
+ 'func': 'del_snapshots',
+ 'snapshotID': snapshot_id,
+ 'sid': self.sid,
+ }
+ sanitized_params = self._sanitize_params(params)
+
+ sanitized_params = urllib.parse.urlencode(sanitized_params)
+ url = ('/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params)
+
+ res_details = self._execute_and_get_response_details(self.ip, url)
+ root = ET.fromstring(res_details['data'])
+ if root.find('authPassed').text == '0':
+ raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
+ # snapshot not exist
+ if root.find('result').text == '-206021':
+ return
+ # lun not exist
+ if root.find('result').text == '-200005':
+ return
+ if root.find('result').text < '0':
+ msg = _('Failed to delete snapshot.')
+ raise exception.ShareBackendException(msg=msg)
+
+ @_connection_checker
+ def clone_snapshot(self, snapshot_id, new_sharename):
+ """Execute CGI to clone snapshot as share."""
+ params = {
+ 'func': 'clone_qsnapshot',
+ 'by_vol': '1',
+ 'snapshotID': snapshot_id,
+ 'new_name': new_sharename,
+ 'sid': self.sid,
+ }
+ sanitized_params = self._sanitize_params(params)
+
+ sanitized_params = urllib.parse.urlencode(sanitized_params)
+ url = ('/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params)
+
+ res_details = self._execute_and_get_response_details(self.ip, url)
+ root = ET.fromstring(res_details['data'])
+ if root.find('authPassed').text == '0':
+ raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
+ if root.find('result').text < '0':
+ msg = _('Failed to clone snapshot.')
+ raise exception.ShareBackendException(msg=msg)
+
+ @_connection_checker
+ def edit_share(self, share_dict):
+ """Edit share properties."""
+ LOG.debug('share_dict[sharename]: %s', share_dict['sharename'])
+
+ params = {
+ 'wiz_func': 'share_property',
+ 'action': 'share_property',
+ 'sharename': share_dict['sharename'],
+ 'old_sharename': share_dict['old_sharename'],
+ 'vol_size': six.text_type(share_dict['new_size']) + 'GB',
+ 'dedup': '0',
+ 'compression': '1',
+ 'thin_pro': '0',
+ 'cache': '0',
+ 'afp_enable': '0',
+ 'ftp_enable': '1',
+ 'hidden': '0',
+ 'oplocks': '1',
+ 'sync': 'always',
+ 'recycle_bin': '1',
+ 'recycle_bin_administrators_only': '0',
+ 'sid': self.sid,
+ }
+ sanitized_params = self._sanitize_params(params)
+
+ sanitized_params = urllib.parse.urlencode(sanitized_params)
+ url = ('/cgi-bin/priv/privWizard.cgi?%s' % sanitized_params)
+
+ res_details = self._execute_and_get_response_details(self.ip, url)
+ root = ET.fromstring(res_details['data'])
+
+ if root.find('authPassed').text == '0':
+ raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
+ if root.find('ES_RET_CODE').text < '0':
+ msg = _('Edit sharename %s failed') % share_dict['sharename']
+ raise exception.ShareBackendException(msg=msg)
+
+ @_connection_checker
+ def get_host_list(self, **kwargs):
+ """Execute get_host_list API."""
+ params = {
+ 'module': 'hosts',
+ 'func': 'get_hostlist',
+ 'sid': self.sid,
+ }
+ sanitized_params = self._sanitize_params(params)
+
+ sanitized_params = urllib.parse.urlencode(sanitized_params)
+ url = ('/cgi-bin/accessrights/accessrightsRequest.cgi?%s' %
+ sanitized_params)
+
+ res_details = self._execute_and_get_response_details(self.ip, url)
+ root = ET.fromstring(res_details['data'])
+ if root.find('authPassed').text == '0':
+ raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
+ if root.find('result').text < '0':
+ raise exception.ShareBackendException(msg=MSG_UNEXPECT_RESP)
+
+ host_list = root.find('content').find('host_list')
+ # if host_list is None:
+ if not host_list:
+ return None
+
+ return_hosts = []
+ host_tree = host_list.findall('host')
+ for host in host_tree:
+ LOG.debug('host:%s', host)
+ return_hosts.append(host)
+
+ return return_hosts
+
+ @_connection_checker
+ def add_host(self, hostname, ipv4):
+ """Execute add_host API."""
+ params = {
+ 'module': 'hosts',
+ 'func': 'apply_addhost',
+ 'name': hostname,
+ 'ipaddr_v4': ipv4,
+ 'sid': self.sid,
+ }
+ sanitized_params = self._sanitize_params(params)
+
+ sanitized_params = urllib.parse.urlencode(sanitized_params)
+ url = ('/cgi-bin/accessrights/accessrightsRequest.cgi?%s' %
+ sanitized_params)
+
+ res_details = self._execute_and_get_response_details(self.ip, url)
+ root = ET.fromstring(res_details['data'])
+ if root.find('authPassed').text == '0':
+ raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
+ if root.find('result').text < '0':
+ raise exception.ShareBackendException(msg=MSG_UNEXPECT_RESP)
+
+ @_connection_checker
+ def set_nfs_access(self, sharename, access, host_name):
+ """Execute set_nfs_access API."""
+ params = {
+ 'wiz_func': 'share_nfs_control',
+ 'action': 'share_nfs_control',
+ 'sharename': sharename,
+ 'access': access,
+ 'host_name': host_name,
+ 'sid': self.sid,
+ }
+ sanitized_params = self._sanitize_params(params)
+
+ sanitized_params = urllib.parse.urlencode(sanitized_params)
+ url = ('/cgi-bin/priv/privWizard.cgi?%s' % sanitized_params)
+
+ res_details = self._execute_and_get_response_details(self.ip, url)
+ root = ET.fromstring(res_details['data'])
+ if root.find('authPassed').text == '0':
+ raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
+ if root.find('result').text < '0':
+ raise exception.ShareBackendException(msg=MSG_UNEXPECT_RESP)
+
+
+class QnapAPIExecutorTS(QnapAPIExecutor):
+ """Makes QNAP API calls for TS NAS."""
+
+ @_connection_checker
+ def get_snapshot_info(self, **kwargs):
+ """Execute get_snapshot_info API."""
+ for key, value in six.iteritems(kwargs):
+ LOG.debug('%(key)s = %(val)s',
+ {'key': key, 'val': value})
+
+ params = {
+ 'func': 'extra_get',
+ 'LUNIndex': kwargs['lun_index'],
+ 'smb_snapshot_list': '1',
+ 'smb_snapshot': '1',
+ 'snapshot_list': '1',
+ 'sid': self.sid,
+ }
+ sanitized_params = self._sanitize_params(params)
+
+ sanitized_params = urllib.parse.urlencode(sanitized_params)
+ url = ('/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params)
+
+ res_details = self._execute_and_get_response_details(self.ip, url)
+ root = ET.fromstring(res_details['data'])
+ if root.find('authPassed').text == '0':
+ raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
+ if root.find('result').text < '0':
+ raise exception.ShareBackendException(msg=MSG_UNEXPECT_RESP)
+
+ snapshot_list = root.find('SnapshotList')
+ if snapshot_list is None:
+ return None
+ snapshot_tree = snapshot_list.findall('row')
+ for snapshot in snapshot_tree:
+ if (kwargs['snapshot_name'] ==
+ snapshot.find('snapshot_name').text):
+ LOG.debug('snapshot_name:%s', kwargs['snapshot_name'])
+ return snapshot
+
+ return None
diff --git a/manila/share/drivers/qnap/qnap.py b/manila/share/drivers/qnap/qnap.py
new file mode 100644
index 0000000000..d85bf9c8c0
--- /dev/null
+++ b/manila/share/drivers/qnap/qnap.py
@@ -0,0 +1,707 @@
+# Copyright (c) 2016 QNAP Systems, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Share driver for QNAP Storage.
+This driver supports QNAP Storage for NFS.
+"""
+import re
+
+from oslo_config import cfg
+from oslo_log import log as logging
+from oslo_utils import timeutils
+from oslo_utils import units
+
+from manila.common import constants
+from manila import exception
+from manila import share
+from manila.i18n import _, _LE, _LI, _LW
+from manila.share import driver
+from manila.share.drivers.qnap import api
+from manila import utils
+
+LOG = logging.getLogger(__name__)
+
+qnap_manila_opts = [
+ cfg.StrOpt('qnap_management_url',
+ required=True,
+ help='The URL to manage QNAP Storage.'),
+ cfg.StrOpt('qnap_share_ip',
+ required=True,
+ help='NAS share IP for mounting shares.'),
+ cfg.StrOpt('qnap_nas_login',
+ required=True,
+ help='Username for QNAP storage.'),
+ cfg.StrOpt('qnap_nas_password',
+ required=True,
+ secret=True,
+ help='Password for QNAP storage.'),
+ cfg.StrOpt('qnap_poolname',
+ required=True,
+ help='Pool within which QNAP shares must be created.'),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(qnap_manila_opts)
+
+
+class QnapShareDriver(driver.ShareDriver):
+ """OpenStack driver to enable QNAP Storage.
+
+ Version history:
+ 1.0.0 - Initial driver (Only NFS)
+ """
+
+ DRIVER_VERSION = '1.0.0'
+
+ def __init__(self, *args, **kwargs):
+ """Initialize QnapShareDriver."""
+ super(QnapShareDriver, self).__init__(False, *args, **kwargs)
+ self.private_storage = kwargs.get('private_storage')
+ self.api_executor = None
+ self.group_stats = {}
+ self.configuration.append_config_values(qnap_manila_opts)
+ self.share_api = share.API()
+
+ def do_setup(self, context):
+ """Setup the QNAP Manila share driver."""
+ self.ctxt = context
+ LOG.debug('context: %s', context)
+
+ # Setup API Executor
+ try:
+ self.api_executor = self._create_api_executor()
+ except Exception:
+ LOG.exception(_LE('Failed to create HTTP client. Check IP '
+ 'address, port, username, password and make '
+ 'sure the array version is compatible.'))
+ raise
+
+ def check_for_setup_error(self):
+ """Check the status of setup."""
+ if self.api_executor is None:
+ msg = _("Failed to instantiate API client to communicate with "
+ "QNAP storage systems.")
+ raise exception.ShareBackendException(msg=msg)
+
+ def _create_api_executor(self):
+ """Create API executor by NAS model."""
+ """LOG.debug('CONF.qnap_nas_login=%(conf)s',
+ {'conf': CONF.qnap_nas_login})
+ LOG.debug('self.configuration.qnap_nas_login=%(conf)s',
+ {'conf': self.configuration.qnap_nas_login})"""
+ self.api_executor = api.QnapAPIExecutor(
+ username=self.configuration.qnap_nas_login,
+ password=self.configuration.qnap_nas_password,
+ management_url=self.configuration.qnap_management_url)
+
+ display_model_name, internal_model_name, fw_version = (
+ self.api_executor.get_basic_info(
+ self.configuration.qnap_management_url))
+
+ pattern = re.compile(r"^([A-Z]+)-?[A-Z]{0,2}(\d+)\d{2}(U|[a-z]*)")
+ matches = pattern.match(display_model_name)
+
+ if not matches:
+ return None
+ model_type = matches.group(1)
+
+ ts_model_types = (
+ "TS", "SS", "IS", "TVS", "TDS", "TBS"
+ )
+ tes_model_types = (
+ "TES",
+ )
+ es_model_types = (
+ "ES",
+ )
+
+ if model_type in ts_model_types:
+ if (fw_version.startswith("4.2") or fw_version.startswith("4.3")):
+ LOG.debug('Create TS API Executor')
+ # modify the pool name to pool index
+ self.configuration.qnap_poolname = (
+ self._get_ts_model_pool_id(
+ self.configuration.qnap_poolname))
+
+ return api.QnapAPIExecutorTS(
+ username=self.configuration.qnap_nas_login,
+ password=self.configuration.qnap_nas_password,
+ management_url=self.configuration.qnap_management_url)
+ elif model_type in tes_model_types:
+ if 'TS' in internal_model_name:
+ if (fw_version.startswith("4.2") or
+ fw_version.startswith("4.3")):
+ LOG.debug('Create TS API Executor')
+ # modify the pool name to pool index
+ self.configuration.qnap_poolname = (
+ self._get_ts_model_pool_id(
+ self.configuration.qnap_poolname))
+ return api.QnapAPIExecutorTS(
+ username=self.configuration.qnap_nas_login,
+ password=self.configuration.qnap_nas_password,
+ management_url=self.configuration.qnap_management_url)
+
+ if (fw_version.startswith("1.1.2") or
+ fw_version.startswith("1.1.3")):
+ LOG.debug('Create ES API Executor')
+ return api.QnapAPIExecutor(
+ username=self.configuration.qnap_nas_login,
+ password=self.configuration.qnap_nas_password,
+ management_url=self.configuration.qnap_management_url)
+ elif model_type in es_model_types:
+ if (fw_version.startswith("1.1.2") or
+ fw_version.startswith("1.1.3")):
+ LOG.debug('Create ES API Executor')
+ return api.QnapAPIExecutor(
+ username=self.configuration.qnap_nas_login,
+ password=self.configuration.qnap_nas_password,
+ management_url=self.configuration.qnap_management_url)
+
+ msg = _('QNAP Storage model is not supported by this driver.')
+ raise exception.ShareBackendException(msg=msg)
+
+ def _get_ts_model_pool_id(self, pool_name):
+ """Modify the pool name to pool index."""
+ pattern = re.compile(r"^(\d+)+|^Storage Pool (\d+)+")
+ matches = pattern.match(pool_name)
+ if matches.group(1):
+ return matches.group(1)
+ else:
+ return matches.group(2)
+
+ @utils.synchronized('qnap-gen_name')
+ def _gen_random_name(self, type):
+ if type == 'share':
+ infix = "shr-"
+ elif type == 'snapshot':
+ infix = "snp-"
+ elif type == 'host':
+ infix = "hst-"
+ else:
+ infix = ""
+ return ("manila-%(ifx)s%(time)s" %
+ {'ifx': infix,
+ 'time': timeutils.utcnow().strftime('%Y%m%d%H%M%S%f')})
+
+ def _get_location_path(self, share_name, share_proto, ip):
+ if share_proto == 'NFS':
+ created_share = self.api_executor.get_share_info(
+ self.configuration.qnap_poolname,
+ vol_label=share_name)
+ vol_no = created_share.find('vol_no').text
+ vol = self.api_executor.get_specific_volinfo(vol_no)
+ vol_mount_path = vol.find('vol_mount_path').text
+
+ location = '%s:%s' % (ip, vol_mount_path)
+ else:
+ msg = _('Invalid NAS protocol: %s') % share_proto
+ raise exception.InvalidInput(reason=msg)
+
+ export_location = {
+ 'path': location,
+ 'is_admin_only': False,
+ }
+ return export_location
+
+ def _update_share_stats(self):
+ """Get latest share stats."""
+ backend_name = (self.configuration.safe_get(
+ 'share_backend_name') or
+ self.__class__.__name__)
+ LOG.debug('backend_name=%(backend_name)s',
+ {'backend_name': backend_name})
+
+ selected_pool = self.api_executor.get_specific_poolinfo(
+ self.configuration.qnap_poolname)
+ total_capacity_gb = (int(selected_pool.find('capacity_bytes').text) /
+ units.Gi)
+ LOG.debug('total_capacity_gb: %s GB', total_capacity_gb)
+ free_capacity_gb = (int(selected_pool.find('freesize_bytes').text) /
+ units.Gi)
+ LOG.debug('free_capacity_gb: %s GB', free_capacity_gb)
+ alloc_capacity_gb = (int(selected_pool.find('allocated_bytes').text) /
+ units.Gi)
+ LOG.debug('allocated_capacity_gb: %s GB', alloc_capacity_gb)
+
+ reserved_percentage = self.configuration.safe_get(
+ 'reserved_share_percentage')
+
+ # single pool now, need support multiple pools in the future
+ single_pool = {
+ "pool_name": self.configuration.qnap_poolname,
+ "total_capacity_gb": total_capacity_gb,
+ "free_capacity_gb": free_capacity_gb,
+ "allocated_capacity_gb": alloc_capacity_gb,
+ "reserved_percentage": reserved_percentage,
+ "qos": False,
+ }
+
+ data = {
+ "share_backend_name": backend_name,
+ "vendor_name": "QNAP",
+ "driver_version": self.DRIVER_VERSION,
+ "storage_protocol": "NFS",
+ "snapshot_support": True,
+ "create_share_from_snapshot_support": True,
+ "driver_handles_share_servers": self.configuration.safe_get(
+ 'driver_handles_share_servers'),
+ 'pools': [single_pool],
+ }
+ super(self.__class__, self)._update_share_stats(data)
+
+ @utils.retry(exception=exception.ShareBackendException,
+ interval=3,
+ retries=200)
+ def create_share(self, context, share, share_server=None):
+ """Create a new share."""
+ LOG.debug('share: %s', share.__dict__)
+
+ share_proto = share['share_proto']
+
+ # User could create two shares with the same name on horizon.
+ # Therefore, we should not use displayname to create shares on NAS.
+ create_share_name = self._gen_random_name("share")
+ # If share name exists, need to change to another name.
+ created_share = self.api_executor.get_share_info(
+ self.configuration.qnap_poolname,
+ vol_label=create_share_name)
+
+ if created_share is not None:
+ msg = _("Failed to create an unused share name.")
+ raise exception.ShareBackendException(msg=msg)
+
+ create_volID = self.api_executor.create_share(
+ share,
+ self.configuration.qnap_poolname,
+ create_share_name,
+ share_proto)
+
+ # Use private_storage to record volume ID and Name created in the NAS.
+ _metadata = {'volID': create_volID, 'volName': create_share_name}
+ self.private_storage.update(share['id'], _metadata)
+
+ return self._get_location_path(create_share_name,
+ share['share_proto'],
+ self.configuration.qnap_share_ip)
+
+ def delete_share(self, context, share, share_server=None):
+ """Delete the specified share."""
+ # Use private_storage to retreive volume ID created in the NAS.
+ volID = self.private_storage.get(share['id'], 'volID')
+ if not volID:
+ LOG.warning(_LW('volID for Share %s does not exist'), share['id'])
+ return
+ LOG.debug('volID: %s', volID)
+
+ del_share = self.api_executor.get_share_info(
+ self.configuration.qnap_poolname,
+ vol_no=volID)
+ if del_share is None:
+ LOG.warning(_LW('Share %s does not exist'), share['id'])
+ return
+
+ vol_no = del_share.find('vol_no').text
+
+ self.api_executor.delete_share(vol_no)
+ self.private_storage.delete(share['id'])
+
+ def extend_share(self, share, new_size, share_server=None):
+ """Extend an existing share."""
+ LOG.debug('Entering extend_share share=%(share)s '
+ 'new_size=%(size)s',
+ {'share': share['display_name'], 'size': new_size})
+
+ # Use private_storage to retrieve volume Name created in the NAS.
+ volName = self.private_storage.get(share['id'], 'volName')
+ if not volName:
+ LOG.debug('Share %s does not exist', share['id'])
+ raise exception.ShareResourceNotFound(share_id=share['id'])
+ LOG.debug('volName: %s', volName)
+
+ share_dict = {
+ "sharename": volName,
+ "old_sharename": volName,
+ "new_size": new_size,
+ }
+ self.api_executor.edit_share(share_dict)
+
+ @utils.retry(exception=exception.ShareBackendException,
+ interval=3,
+ retries=200)
+ def create_snapshot(self, context, snapshot, share_server=None):
+ """Create a snapshot."""
+ LOG.debug('snapshot[share][share_id]: %s',
+ snapshot['share']['share_id'])
+ LOG.debug('snapshot id: %s', snapshot['id'])
+
+ # Use private_storage to retrieve volume ID created in the NAS.
+ volID = self.private_storage.get(snapshot['share']['id'], 'volID')
+ if not volID:
+ LOG.warning(
+ _LW('volID for Share %s does not exist'),
+ snapshot['share']['id'])
+ raise exception.ShareResourceNotFound(
+ share_id=snapshot['share']['id'])
+ LOG.debug('volID: %s', volID)
+
+ # User could create two snapshot with the same name on horizon.
+ # Therefore, we should not use displayname to create snapshot on NAS.
+
+ # if snapshot exist, need to change another
+ create_snapshot_name = self._gen_random_name("snapshot")
+ LOG.debug('create_snapshot_name: %s', create_snapshot_name)
+ check_snapshot = self.api_executor.get_snapshot_info(
+ volID=volID, snapshot_name=create_snapshot_name)
+ if check_snapshot is not None:
+ msg = _("Failed to create an unused snapshot name.")
+ raise exception.ShareBackendException(msg=msg)
+
+ LOG.debug('create_snapshot_name: %s', create_snapshot_name)
+ self.api_executor.create_snapshot_api(volID, create_snapshot_name)
+
+ snapshot_id = ""
+ created_snapshot = self.api_executor.get_snapshot_info(
+ volID=volID, snapshot_name=create_snapshot_name)
+ if created_snapshot is not None:
+ snapshot_id = created_snapshot.find('snapshot_id').text
+ else:
+ msg = _("Failed to get snapshot information.")
+ raise exception.ShareBackendException(msg=msg)
+
+ LOG.debug('created_snapshot: %s', created_snapshot)
+ LOG.debug('snapshot_id: %s', snapshot_id)
+
+ # Use private_storage to record data instead of metadata.
+ _metadata = {'snapshot_id': snapshot_id}
+ self.private_storage.update(snapshot['id'], _metadata)
+
+ # Test to get value from private_storage.
+ snapshot_id = self.private_storage.get(snapshot['id'], 'snapshot_id')
+ LOG.debug('snapshot_id: %s', snapshot_id)
+
+ return {'provider_location': snapshot_id}
+
+ def delete_snapshot(self, context, snapshot, share_server=None):
+ """Delete a snapshot."""
+ LOG.debug('Entering delete_snapshot. The deleted snapshot=%(snap)s',
+ {'snap': snapshot['id']})
+
+ snapshot_id = (snapshot.get('provider_location') or
+ self.private_storage.get(snapshot['id'], 'snapshot_id'))
+ if not snapshot_id:
+ LOG.warning(_LW('Snapshot %s does not exist'), snapshot['id'])
+ return
+ LOG.debug('snapshot_id: %s', snapshot_id)
+
+ self.api_executor.delete_snapshot_api(snapshot_id)
+ self.private_storage.delete(snapshot['id'])
+
+ @utils.retry(exception=exception.ShareBackendException,
+ interval=3,
+ retries=200)
+ def create_share_from_snapshot(self, context, share, snapshot,
+ share_server=None):
+ """Create a share from a snapshot."""
+ LOG.debug('Entering create_share_from_snapshot. The source '
+ 'snapshot=%(snap)s. The created share=%(share)s',
+ {'snap': snapshot['id'], 'share': share['id']})
+
+ snapshot_id = (snapshot.get('provider_location') or
+ self.private_storage.get(snapshot['id'], 'snapshot_id'))
+ if not snapshot_id:
+ LOG.warning(_LW('Snapshot %s does not exist'), snapshot['id'])
+ raise exception.SnapshotResourceNotFound(name=snapshot['id'])
+ LOG.debug('snapshot_id: %s', snapshot_id)
+
+ create_share_name = self._gen_random_name("share")
+ # if sharename exist, need to change another
+ created_share = self.api_executor.get_share_info(
+ self.configuration.qnap_poolname,
+ vol_label=create_share_name)
+
+ if created_share is not None:
+ msg = _("Failed to create an unused share name.")
+ raise exception.ShareBackendException(msg=msg)
+
+ self.api_executor.clone_snapshot(snapshot_id, create_share_name)
+
+ create_volID = ""
+ created_share = self.api_executor.get_share_info(
+ self.configuration.qnap_poolname,
+ vol_label=create_share_name)
+ if created_share.find('vol_no') is not None:
+ create_volID = created_share.find('vol_no').text
+ else:
+ msg = _("Failed to clone a snapshot in time.")
+ raise exception.ShareBackendException(msg=msg)
+
+ snap_share = self.share_api.get(context,
+ snapshot['share_instance']['share_id'])
+ LOG.debug('snap_share[size]: %s', snap_share['size'])
+
+ if (share['size'] > snap_share['size']):
+ share_dict = {'sharename': create_share_name,
+ 'old_sharename': create_share_name,
+ 'new_size': share['size']}
+ self.api_executor.edit_share(share_dict)
+
+ # Use private_storage to record volume ID and Name created in the NAS.
+ _metadata = {
+ 'volID': create_volID,
+ 'volName': create_share_name,
+ }
+ self.private_storage.update(share['id'], _metadata)
+
+ # Test to get value from private_storage.
+ volName = self.private_storage.get(share['id'], 'volName')
+ LOG.debug('volName: %s', volName)
+
+ return self._get_location_path(create_share_name,
+ share['share_proto'],
+ self.configuration.qnap_share_ip)
+
+ def _get_manila_hostIPv4s(self, hostlist):
+ host_dict_IPs = []
+ if hostlist is None:
+ return host_dict_IPs
+ for host in hostlist:
+ # Check host alias name with prefix "manila-hst-" to verify this
+ # host is created/managed by Manila or not.
+ if (re.match("^manila-hst-[0-9]+$", host.find('name').text)
+ is not None):
+ LOG.debug('host netaddrs text: %s', host.find('netaddrs').text)
+ if host.find('netaddrs').text is not None:
+ # Because Manila supports only IPv4 now, check "netaddrs"
+ # have "ipv4" tag to verify this host is created/managed
+ # by Manila or not.
+ if host.find('netaddrs/ipv4').text is not None:
+ host_dict = {
+ 'index': host.find('index').text,
+ 'hostid': host.find('hostid').text,
+ 'name': host.find('name').text,
+ 'netaddrs': host.find('netaddrs').find('ipv4').text
+ }
+ host_dict_IPs.append(host_dict)
+ return host_dict_IPs
+
+ def update_access(self, context, share, access_rules, add_rules,
+ delete_rules, share_server=None):
+ if not (add_rules or delete_rules):
+ volName = self.private_storage.get(share['id'], 'volName')
+ LOG.debug('volName: %s', volName)
+
+ if volName is None:
+ LOG.debug('Share %s does not exist', share['id'])
+ raise exception.ShareResourceNotFound(share_id=share['id'])
+
+ # Clear all current ACLs
+ self.api_executor.set_nfs_access(volName, 2, "all")
+
+ # Add each one through all rules.
+ for access in access_rules:
+ self._allow_access(context, share, access, share_server)
+ else:
+ # Adding/Deleting specific rules
+ for access in delete_rules:
+ self._deny_access(context, share, access, share_server)
+ for access in add_rules:
+ self._allow_access(context, share, access, share_server)
+
+ def _allow_access(self, context, share, access, share_server=None):
+ """Allow access to the share."""
+ share_proto = share['share_proto']
+ access_type = access['access_type']
+ access_level = access['access_level']
+ access_to = access['access_to']
+
+ self._check_share_access(share_proto, access_type)
+
+ hostlist = self.api_executor.get_host_list()
+ host_dict_IPs = self._get_manila_hostIPv4s(hostlist)
+ LOG.debug('host_dict_IPs: %s', host_dict_IPs)
+ if len(host_dict_IPs) == 0:
+ host_name = self._gen_random_name("host")
+ self.api_executor.add_host(host_name, access_to)
+ else:
+ for host in host_dict_IPs:
+ LOG.debug('host[netaddrs]: %s', host['netaddrs'])
+ LOG.debug('access_to: %s', access_to)
+ if host['netaddrs'] == access_to:
+ LOG.debug('in match ip')
+ host_name = host['name']
+ break
+ if host is host_dict_IPs[-1]:
+ host_name = self._gen_random_name("host")
+ self.api_executor.add_host(host_name, access_to)
+
+ volName = self.private_storage.get(share['id'], 'volName')
+ LOG.debug('volName: %(volName)s for share: %(share)s',
+ {'volName': volName, 'share': share['id']})
+
+ LOG.debug('access_level: %(access)s for share: %(share)s',
+ {'access': access_level, 'share': share['id']})
+ LOG.debug('host_name: %(host)s for share: %(share)s',
+ {'host': host_name, 'share': share['id']})
+ if access_level == constants.ACCESS_LEVEL_RO:
+ self.api_executor.set_nfs_access(volName, 1, host_name)
+ elif access_level == constants.ACCESS_LEVEL_RW:
+ self.api_executor.set_nfs_access(volName, 0, host_name)
+
+ def _deny_access(self, context, share, access, share_server=None):
+ """Deny access to the share."""
+ share_proto = share['share_proto']
+ access_type = access['access_type']
+ access_to = access['access_to']
+
+ try:
+ self._check_share_access(share_proto, access_type)
+ except exception.InvalidShareAccess:
+ LOG.warning(_LW('The denied rule is invalid and does not exist.'))
+ return
+
+ hostlist = self.api_executor.get_host_list()
+ host_dict_IPs = self._get_manila_hostIPv4s(hostlist)
+ LOG.debug('host_dict_IPs: %s', host_dict_IPs)
+ if len(host_dict_IPs) == 0:
+ return
+ else:
+ for host in host_dict_IPs:
+ if (host['netaddrs'] == access_to):
+ host_name = host['name']
+ break
+ if (host is host_dict_IPs[-1]):
+ return
+
+ volName = self.private_storage.get(share['id'], 'volName')
+ LOG.debug('volName: %s', volName)
+
+ self.api_executor.set_nfs_access(volName, 2, host_name)
+
+ def _check_share_access(self, share_proto, access_type):
+ if share_proto == 'NFS' and access_type != 'ip':
+ reason = _('Only "ip" access type is allowed for '
+ 'NFS shares.')
+ LOG.warning(reason)
+ raise exception.InvalidShareAccess(reason=reason)
+ elif share_proto != 'NFS':
+ reason = _('Invalid NAS protocol: %s') % share_proto
+ raise exception.InvalidShareAccess(reason=reason)
+
+ def manage_existing(self, share, driver_options):
+ """Manages a share that exists on backend."""
+ if share['share_proto'].lower() == 'nfs':
+ # 10.0.0.1:/share/example
+ LOG.info(_LI("Share %(shr_path)s will be managed with ID "
+ "%(shr_id)s."),
+ {'shr_path': share['export_locations'][0]['path'],
+ 'shr_id': share['id']})
+
+ old_path_info = share['export_locations'][0]['path'].split(
+ ':/share/')
+
+ if len(old_path_info) == 2:
+ ip = old_path_info[0]
+ share_name = old_path_info[1]
+ else:
+ msg = _("Incorrect path. It should have the following format: "
+ "IP:/share/share_name.")
+ raise exception.ShareBackendException(msg=msg)
+ else:
+ msg = _('Invalid NAS protocol: %s') % share['share_proto']
+ raise exception.InvalidInput(reason=msg)
+
+ if ip != self.configuration.qnap_share_ip:
+ msg = _("The NAS IP %(ip)s is not configured.") % {'ip': ip}
+ raise exception.ShareBackendException(msg=msg)
+
+ existing_share = self.api_executor.get_share_info(
+ self.configuration.qnap_poolname,
+ vol_label=share_name)
+ if existing_share is None:
+ msg = _("The share %s trying to be managed was not found on "
+ "backend.") % share['id']
+ raise exception.ManageInvalidShare(reason=msg)
+
+ _metadata = {}
+ vol_no = existing_share.find('vol_no').text
+ _metadata['volID'] = vol_no
+ _metadata['volName'] = share_name
+ self.private_storage.update(share['id'], _metadata)
+
+ # Test to get value from private_storage.
+ volID = self.private_storage.get(share['id'], 'volID')
+ LOG.debug('volID: %s', volID)
+ volName = self.private_storage.get(share['id'], 'volName')
+ LOG.debug('volName: %s', volName)
+
+ LOG.info(_LI("Share %(shr_path)s was successfully managed with ID "
+ "%(shr_id)s."),
+ {'shr_path': share['export_locations'][0]['path'],
+ 'shr_id': share['id']})
+
+ vol = self.api_executor.get_specific_volinfo(vol_no)
+ vol_size_gb = int(vol.find('size').text) / units.Gi
+ export_locations = self._get_location_path(
+ share_name,
+ share['share_proto'],
+ self.configuration.qnap_share_ip)
+
+ return {'size': vol_size_gb, 'export_locations': export_locations}
+
+ def unmanage(self, share):
+ """Remove the specified share from Manila management."""
+ self.private_storage.delete(share['id'])
+
+ def manage_existing_snapshot(self, snapshot, driver_options):
+ """Manage existing share snapshot with manila."""
+ volID = self.private_storage.get(snapshot['share']['id'], 'volID')
+ LOG.debug('volID: %s', volID)
+
+ existing_share = self.api_executor.get_share_info(
+ self.configuration.qnap_poolname,
+ vol_no=volID)
+
+ if existing_share is None:
+ msg = _("The share id %s was not found on backend.") % volID
+ LOG.error(msg)
+ raise exception.ShareNotFound(reason=msg)
+
+ snapshot_id = snapshot.get('provider_location')
+ snapshot_id_info = snapshot_id.split('@')
+
+ if len(snapshot_id_info) == 2:
+ share_name = snapshot_id_info[0]
+ else:
+ msg = _("Incorrect provider_location format. It should have the "
+ "following format: share_name@snapshot_name.")
+ LOG.error(msg)
+ raise exception.InvalidParameterValue(reason=msg)
+
+ if share_name != existing_share.find('vol_label').text:
+ msg = (_("The assigned share %(share_name)s was not matched "
+ "%(vol_label)s on backend.") %
+ {'share_name': share_name,
+ 'vol_label': existing_share.find('vol_label').text})
+ LOG.error(msg)
+ raise exception.ShareNotFound(reason=msg)
+
+ _metadata = {
+ 'snapshot_id': snapshot_id,
+ }
+ self.private_storage.update(snapshot['id'], _metadata)
+
+ def unmanage_snapshot(self, snapshot):
+ """Remove the specified snapshot from Manila management."""
+ self.private_storage.delete(snapshot['id'])
diff --git a/manila/tests/conf_fixture.py b/manila/tests/conf_fixture.py
index 6124307ce2..3bea20bda9 100644
--- a/manila/tests/conf_fixture.py
+++ b/manila/tests/conf_fixture.py
@@ -53,6 +53,12 @@ def set_defaults(conf):
_safe_set_of_opts(conf, 'hitachi_hsp_username', 'hsp_user')
_safe_set_of_opts(conf, 'hitachi_hsp_password', 'hsp_password')
+ _safe_set_of_opts(conf, 'qnap_management_url', 'http://1.2.3.4:8080')
+ _safe_set_of_opts(conf, 'qnap_share_ip', '1.2.3.4')
+ _safe_set_of_opts(conf, 'qnap_nas_login', 'admin')
+ _safe_set_of_opts(conf, 'qnap_nas_password', 'qnapadmin')
+ _safe_set_of_opts(conf, 'qnap_poolname', 'Storage Pool 1')
+
def _safe_set_of_opts(conf, *args, **kwargs):
try:
diff --git a/manila/tests/share/drivers/qnap/__init__.py b/manila/tests/share/drivers/qnap/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/manila/tests/share/drivers/qnap/fakes.py b/manila/tests/share/drivers/qnap/fakes.py
new file mode 100644
index 0000000000..bd735e21bf
--- /dev/null
+++ b/manila/tests/share/drivers/qnap/fakes.py
@@ -0,0 +1,527 @@
+# Copyright (c) 2016 QNAP Systems, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+FAKE_RES_DETAIL_DATA_LOGIN = """
+
+
+
+ """
+
+FAKE_RES_DETAIL_DATA_GETBASIC_INFO_ES = """
+
+
+
+
+
+
+
+
+ """
+
+FAKE_RES_DETAIL_DATA_GETBASIC_INFO_TS = """
+
+
+
+
+
+
+
+
+ """
+
+FAKE_RES_DETAIL_DATA_GETBASIC_INFO_TES_TS = """
+
+
+
+
+
+
+
+
+ """
+
+FAKE_RES_DETAIL_DATA_GETBASIC_INFO_TES_ES = """
+
+
+
+
+
+
+
+
+ """
+
+FAKE_RES_DETAIL_DATA_GETBASIC_INFO_ERROR = """
+
+
+
+
+
+
+
+
+ """
+
+FAKE_RES_DETAIL_DATA_SHARE_INFO = """
+
+
+
+
+
+
+
+
+
+ """
+
+FAKE_RES_DETAIL_DATA_VOLUME_INFO = """
+
+
+
+
+
+
+ fakeMountPath
+
+
+
+ """
+
+FAKE_RES_DETAIL_DATA_SNAPSHOT = """
+
+
+
+
+
+
+
+
+
+ """
+
+FAKE_RES_DETAIL_DATA_SPECIFIC_POOL_INFO = """
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ """
+
+FAKE_RES_DETAIL_DATA_GET_HOST_LIST = """
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ """
+
+FAKE_RES_DETAIL_DATA_CREATE_SHARE = """
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ """
+
+FAKE_RES_DETAIL_DATA_ES_RET_CODE_NEGATIVE = """
+
+
+
+ """
+
+
+FAKE_RES_DETAIL_DATA_RESULT_NEGATIVE = """
+
+
+
+ """
+
+FAKE_RES_DETAIL_DATA_AUTHPASS_FAIL = """
+
+
+ """
+
+FAKE_RES_DETAIL_DATA_DELETE_SHARE = """
+
+
+
+ 0
+ """
+
+FAKE_RES_DETAIL_DATA_DELETE_SNAPSHOT = """
+
+
+
+ 0
+ """
+
+FAKE_RES_DETAIL_DATA_GET_HOST_LIST_API = """
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ """
+
+FAKE_RES_DETAIL_DATA_CREATE_SNAPSHOT = """
+
+
+
+ """
+
+
+class SnapshotClass(object):
+ """Snapshot Class."""
+
+ size = 0
+ provider_location = 'fakeShareName@fakeSnapshotName'
+
+ def __init__(self, size, provider_location=None):
+ """Init."""
+ self.size = size
+ self.provider_location = provider_location
+
+ def get(self, provider_location):
+ """Get function."""
+ return self.provider_location
+
+ def __getitem__(self, arg):
+ """Getitem."""
+ return {
+ 'display_name': 'fakeSnapshotDisplayName',
+ 'id': 'fakeSnapshotId',
+ 'share': {'share_id': 'fakeShareId', 'id': 'fakeId'},
+ 'share_instance': {'share_id': 'fakeShareId', 'id': 'fakeId'},
+ 'size': self.size
+ }[arg]
+
+ def __setitem__(self, key, value):
+ """Setitem."""
+ if key == 'provider_location':
+ self.provider_location = value
+
+
+class ShareNfsClass(object):
+ """Share Class."""
+
+ share_proto = 'NFS'
+ id = ''
+ size = 0
+
+ def __init__(self, share_id, size):
+ """Init."""
+ self.id = share_id
+ self.size = size
+
+ def __getitem__(self, arg):
+ """Getitem."""
+ return {
+ 'share_proto': self.share_proto,
+ 'id': self.id,
+ 'display_name': 'fakeDisplayName',
+ 'export_locations': [{'path': '1.2.3.4:/share/fakeShareName'}],
+ 'host': 'QnapShareDriver',
+ 'size': self.size
+ }[arg]
+
+ def __setitem__(self, key, value):
+ """Setitem."""
+ if key == 'share_proto':
+ self.share_proto = value
+
+
+class ShareCifsClass(object):
+ """Share Class."""
+
+ share_proto = 'CIFS'
+ id = ''
+ size = 0
+
+ def __init__(self, share_id, size):
+ """Init."""
+ self.id = share_id
+ self.size = size
+
+ def __getitem__(self, arg):
+ """Getitem."""
+ return {
+ 'share_proto': self.share_proto,
+ 'id': self.id,
+ 'display_name': 'fakeDisplayName',
+ 'export_locations': [{'path': '\\\\1.2.3.4\\fakeShareName'}],
+ 'host': 'QnapShareDriver',
+ 'size': self.size
+ }[arg]
+
+ def __setitem__(self, key, value):
+ """Setitem."""
+ if key == 'share_proto':
+ self.share_proto = value
+
+
+class AccessClass(object):
+ """Access Class."""
+
+ access_type = 'fakeAccessType'
+ access_level = 'ro'
+ access_to = 'fakeIp'
+
+ def __init__(self, access_type, access_level, access_to):
+ """Init."""
+ self.access_type = access_type
+ self.access_level = access_level
+ self.access_to = access_to
+
+ def __getitem__(self, arg):
+ """Getitem."""
+ return {
+ 'access_type': self.access_type,
+ 'access_level': self.access_level,
+ 'access_to': self.access_to,
+ }[arg]
+
+
+class FakeGetBasicInfoResponseEs(object):
+ """Fake GetBasicInfo response from ES nas."""
+
+ status = 'fackStatus'
+
+ def read(self):
+ """Mock response.read."""
+ return FAKE_RES_DETAIL_DATA_GETBASIC_INFO_ES
+
+
+class FakeGetBasicInfoResponseTs(object):
+ """Fake GetBasicInfoTS response from TS nas."""
+
+ status = 'fackStatus'
+
+ def read(self):
+ """Mock response.read."""
+ return FAKE_RES_DETAIL_DATA_GETBASIC_INFO_TS
+
+
+class FakeGetBasicInfoResponseTesTs(object):
+ """Fake GetBasicInfoTS response from TS nas."""
+
+ status = 'fackStatus'
+
+ def read(self):
+ """Mock response.read."""
+ return FAKE_RES_DETAIL_DATA_GETBASIC_INFO_TES_TS
+
+
+class FakeGetBasicInfoResponseTesEs(object):
+ """Fake GetBasicInfoTS response from TS nas."""
+
+ status = 'fackStatus'
+
+ def read(self):
+ """Mock response.read."""
+ return FAKE_RES_DETAIL_DATA_GETBASIC_INFO_TES_ES
+
+
+class FakeGetBasicInfoResponseError(object):
+ """Fake GetBasicInfoTS response from TS nas."""
+
+ status = 'fackStatus'
+
+ def read(self):
+ """Mock response.read."""
+ return FAKE_RES_DETAIL_DATA_GETBASIC_INFO_ERROR
+
+
+class FakeCreateShareResponse(object):
+ """Fake login response."""
+
+ status = 'fackStatus'
+
+ def read(self):
+ """Mock response.read."""
+ return FAKE_RES_DETAIL_DATA_CREATE_SHARE
+
+
+class FakeDeleteShareResponse(object):
+ """Fake login response."""
+
+ status = 'fackStatus'
+
+ def read(self):
+ """Mock response.read."""
+ return FAKE_RES_DETAIL_DATA_DELETE_SHARE
+
+
+class FakeDeleteSnapshotResponse(object):
+ """Fake pool info response."""
+
+ status = 'fackStatus'
+
+ def read(self):
+ """Mock response.read."""
+ return FAKE_RES_DETAIL_DATA_DELETE_SNAPSHOT
+
+
+class FakeGetHostListResponse(object):
+ """Fake pool info response."""
+
+ status = 'fackStatus'
+
+ def read(self):
+ """Mock response.read."""
+ return FAKE_RES_DETAIL_DATA_GET_HOST_LIST_API
+
+
+class FakeAuthPassFailResponse(object):
+ """Fake pool info response."""
+
+ status = 'fackStatus'
+
+ def read(self):
+ """Mock response.read."""
+ return FAKE_RES_DETAIL_DATA_AUTHPASS_FAIL
+
+
+class FakeEsResCodeNegativeResponse(object):
+ """Fake pool info response."""
+
+ status = 'fackStatus'
+
+ def read(self):
+ """Mock response.read."""
+ return FAKE_RES_DETAIL_DATA_ES_RET_CODE_NEGATIVE
+
+
+class FakeResultNegativeResponse(object):
+ """Fake pool info response."""
+
+ status = 'fackStatus'
+
+ def read(self):
+ """Mock response.read."""
+ return FAKE_RES_DETAIL_DATA_RESULT_NEGATIVE
+
+
+class FakeLoginResponse(object):
+ """Fake login response."""
+
+ status = 'fackStatus'
+
+ def read(self):
+ """Mock response.read."""
+ return FAKE_RES_DETAIL_DATA_LOGIN
+
+
+class FakeSpecificPoolInfoResponse(object):
+ """Fake pool info response."""
+
+ status = 'fackStatus'
+
+ def read(self):
+ """Mock response.read."""
+ return FAKE_RES_DETAIL_DATA_SPECIFIC_POOL_INFO
+
+
+class FakeShareInfoResponse(object):
+ """Fake pool info response."""
+
+ status = 'fackStatus'
+
+ def read(self):
+ """Mock response.read."""
+ return FAKE_RES_DETAIL_DATA_SHARE_INFO
+
+
+class FakeSnapshotInfoResponse(object):
+ """Fake pool info response."""
+
+ status = 'fackStatus'
+
+ def read(self):
+ """Mock response.read."""
+ return FAKE_RES_DETAIL_DATA_SNAPSHOT
+
+
+class FakeSpecificVolInfoResponse(object):
+ """Fake pool info response."""
+
+ status = 'fackStatus'
+
+ def read(self):
+ """Mock response.read."""
+ return FAKE_RES_DETAIL_DATA_VOLUME_INFO
+
+
+class FakeCreateSnapshotResponse(object):
+ """Fake pool info response."""
+
+ status = 'fackStatus'
+
+ def read(self):
+ """Mock response.read."""
+ return FAKE_RES_DETAIL_DATA_CREATE_SNAPSHOT
diff --git a/manila/tests/share/drivers/qnap/test_api.py b/manila/tests/share/drivers/qnap/test_api.py
new file mode 100644
index 0000000000..da46734748
--- /dev/null
+++ b/manila/tests/share/drivers/qnap/test_api.py
@@ -0,0 +1,788 @@
+# Copyright (c) 2016 QNAP Systems, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import base64
+
+import ddt
+import mock
+import six
+from six.moves import urllib
+
+from manila import exception
+from manila.share.drivers.qnap import qnap
+from manila import test
+from manila.tests import fake_share
+from manila.tests.share.drivers.qnap import fakes
+
+
+def create_configuration(management_url, qnap_share_ip, qnap_nas_login,
+ qnap_nas_password, qnap_poolname):
+ """Create configuration."""
+ configuration = mock.Mock()
+ configuration.qnap_management_url = management_url
+ configuration.qnap_share_ip = qnap_share_ip
+ configuration.qnap_nas_login = qnap_nas_login
+ configuration.qnap_nas_password = qnap_nas_password
+ configuration.qnap_poolname = qnap_poolname
+ configuration.safe_get.return_value = False
+ return configuration
+
+
+class QnapShareDriverBaseTestCase(test.TestCase):
+ """Base Class for the QnapShareDriver Tests."""
+
+ def setUp(self):
+ """Setup the Qnap Driver Base TestCase."""
+ super(QnapShareDriverBaseTestCase, self).setUp()
+ self.driver = None
+ self.share_api = None
+
+ def _do_setup(self, management_url, share_ip, nas_login,
+ nas_password, poolname, **kwargs):
+ """Config do setup configurations."""
+ self.driver = qnap.QnapShareDriver(
+ configuration=create_configuration(
+ management_url,
+ share_ip,
+ nas_login,
+ nas_password,
+ poolname),
+ private_storage=kwargs.get('private_storage'))
+ self.driver.do_setup('context')
+
+
+@ddt.ddt
+class QnapAPITestCase(QnapShareDriverBaseTestCase):
+ """Tests QNAP api functions."""
+
+ login_url = ('/cgi-bin/authLogin.cgi?')
+ get_basic_info_url = ('/cgi-bin/authLogin.cgi')
+ fake_password = 'qnapadmin'
+
+ def setUp(self):
+ """Setup the Qnap API TestCase."""
+ super(QnapAPITestCase, self).setUp()
+ fake_parms = {}
+ fake_parms['user'] = 'admin'
+ fake_parms['pwd'] = base64.b64encode(
+ self.fake_password.encode("utf-8"))
+ fake_parms['serviceKey'] = 1
+ sanitized_params = self._sanitize_params(fake_parms)
+ self.login_url = ('/cgi-bin/authLogin.cgi?%s' % sanitized_params)
+ self.mock_object(six.moves.http_client, 'HTTPConnection')
+ self.share = fake_share.fake_share(
+ share_proto='NFS',
+ id='shareId',
+ display_name='fakeDisplayName',
+ export_locations=[{'path': '1.2.3.4:/share/fakeShareName'}],
+ host='QnapShareDriver',
+ size=10)
+
+ def _sanitize_params(self, params):
+ sanitized_params = {}
+ for key in params:
+ value = params[key]
+ if value is not None:
+ sanitized_params[key] = six.text_type(value)
+
+ sanitized_params = urllib.parse.urlencode(sanitized_params)
+ return sanitized_params
+
+ @ddt.data('fake_share_name', 'fakeLabel')
+ def test_create_share_api(self, fake_name):
+ """Test create share api."""
+ mock_http_connection = six.moves.http_client.HTTPConnection
+ mock_http_connection.return_value.getresponse.side_effect = [
+ fakes.FakeLoginResponse(),
+ fakes.FakeGetBasicInfoResponseEs(),
+ fakes.FakeLoginResponse(),
+ fakes.FakeCreateShareResponse()]
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1')
+ self.driver.api_executor.create_share(
+ self.share,
+ 'Storage Pool 1',
+ fake_name,
+ 'NFS')
+
+ fake_params = {
+ 'wiz_func': 'share_create',
+ 'action': 'add_share',
+ 'vol_name': fake_name,
+ 'vol_size': '10' + 'GB',
+ 'threshold': '80',
+ 'dedup': 'off',
+ 'compression': '1',
+ 'thin_pro': '0',
+ 'cache': '0',
+ 'cifs_enable': '0',
+ 'nfs_enable': '1',
+ 'afp_enable': '0',
+ 'ftp_enable': '0',
+ 'encryption': '0',
+ 'hidden': '0',
+ 'oplocks': '1',
+ 'sync': 'always',
+ 'userrw0': 'admin',
+ 'userrd_len': '0',
+ 'userrw_len': '1',
+ 'userno_len': '0',
+ 'access_r': 'setup_users',
+ 'path_type': 'auto',
+ 'recycle_bin': '1',
+ 'recycle_bin_administrators_only': '0',
+ 'pool_name': 'Storage Pool 1',
+ 'sid': 'fakeSid',
+ }
+ sanitized_params = self._sanitize_params(fake_params)
+ fake_url = ('/cgi-bin/wizReq.cgi?%s' % sanitized_params)
+
+ expected_call_list = [
+ mock.call('GET', self.login_url),
+ mock.call('GET', self.get_basic_info_url),
+ mock.call('GET', self.login_url),
+ mock.call('GET', fake_url)]
+ self.assertEqual(
+ expected_call_list,
+ mock_http_connection.return_value.request.call_args_list)
+
+ def test_api_delete_share(self):
+ """Test delete share api."""
+ mock_http_connection = six.moves.http_client.HTTPConnection
+ mock_http_connection.return_value.getresponse.side_effect = [
+ fakes.FakeLoginResponse(),
+ fakes.FakeGetBasicInfoResponseEs(),
+ fakes.FakeLoginResponse(),
+ fakes.FakeDeleteShareResponse()]
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1')
+ self.driver.api_executor.delete_share(
+ 'fakeId')
+
+ fake_params = {
+ 'func': 'volume_mgmt',
+ 'vol_remove': '1',
+ 'volumeID': 'fakeId',
+ 'stop_service': 'no',
+ 'sid': 'fakeSid',
+ }
+ sanitized_params = self._sanitize_params(fake_params)
+ fake_url = (
+ '/cgi-bin/disk/disk_manage.cgi?%s' % sanitized_params)
+
+ expected_call_list = [
+ mock.call('GET', self.login_url),
+ mock.call('GET', self.get_basic_info_url),
+ mock.call('GET', self.login_url),
+ mock.call('GET', fake_url)]
+ self.assertEqual(
+ expected_call_list,
+ mock_http_connection.return_value.request.call_args_list)
+
+ def test_get_specific_poolinfo(self):
+ """Test get specific poolinfo api."""
+ mock_http_connection = six.moves.http_client.HTTPConnection
+ mock_http_connection.return_value.getresponse.side_effect = [
+ fakes.FakeLoginResponse(),
+ fakes.FakeGetBasicInfoResponseEs(),
+ fakes.FakeLoginResponse(),
+ fakes.FakeSpecificPoolInfoResponse()]
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1')
+ self.driver.api_executor.get_specific_poolinfo(
+ 'fakePoolId')
+
+ fake_params = {
+ 'store': 'poolInfo',
+ 'func': 'extra_get',
+ 'poolID': 'fakePoolId',
+ 'Pool_Info': '1',
+ 'sid': 'fakeSid',
+ }
+ sanitized_params = self._sanitize_params(fake_params)
+ fake_url = (
+ '/cgi-bin/disk/disk_manage.cgi?%s' % sanitized_params)
+
+ expected_call_list = [
+ mock.call('GET', self.login_url),
+ mock.call('GET', self.get_basic_info_url),
+ mock.call('GET', self.login_url),
+ mock.call('GET', fake_url)]
+ self.assertEqual(
+ expected_call_list,
+ mock_http_connection.return_value.request.call_args_list)
+
+ @ddt.data({'pool_id': "Storage Pool 1"},
+ {'pool_id': "Storage Pool 1", 'vol_no': 'fakeNo'},
+ {'pool_id': "Storage Pool 1", 'vol_label': 'fakeShareName'})
+ def test_get_share_info(self, dict_parm):
+ """Test get share info api."""
+ mock_http_connection = six.moves.http_client.HTTPConnection
+ mock_http_connection.return_value.getresponse.side_effect = [
+ fakes.FakeLoginResponse(),
+ fakes.FakeGetBasicInfoResponseEs(),
+ fakes.FakeLoginResponse(),
+ fakes.FakeShareInfoResponse()]
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1')
+ self.driver.api_executor.get_share_info(**dict_parm)
+
+ fake_params = {
+ 'store': 'poolVolumeList',
+ 'poolID': 'Storage Pool 1',
+ 'func': 'extra_get',
+ 'Pool_Vol_Info': '1',
+ 'sid': 'fakeSid',
+ }
+ sanitized_params = self._sanitize_params(fake_params)
+ fake_url = (
+ '/cgi-bin/disk/disk_manage.cgi?%s' % sanitized_params)
+
+ expected_call_list = [
+ mock.call('GET', self.login_url),
+ mock.call('GET', self.get_basic_info_url),
+ mock.call('GET', self.login_url),
+ mock.call('GET', fake_url)]
+ self.assertEqual(
+ expected_call_list,
+ mock_http_connection.return_value.request.call_args_list)
+
+ def test_get_specific_volinfo(self):
+ """Test get specific volume info api."""
+ mock_http_connection = six.moves.http_client.HTTPConnection
+ mock_http_connection.return_value.getresponse.side_effect = [
+ fakes.FakeLoginResponse(),
+ fakes.FakeGetBasicInfoResponseEs(),
+ fakes.FakeLoginResponse(),
+ fakes.FakeSpecificVolInfoResponse()]
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1')
+ self.driver.api_executor.get_specific_volinfo(
+ 'fakeNo')
+
+ fake_params = {
+ 'store': 'volumeInfo',
+ 'volumeID': 'fakeNo',
+ 'func': 'extra_get',
+ 'Volume_Info': '1',
+ 'sid': 'fakeSid',
+ }
+ sanitized_params = self._sanitize_params(fake_params)
+ fake_url = (
+ '/cgi-bin/disk/disk_manage.cgi?%s' % sanitized_params)
+
+ expected_call_list = [
+ mock.call('GET', self.login_url),
+ mock.call('GET', self.get_basic_info_url),
+ mock.call('GET', self.login_url),
+ mock.call('GET', fake_url)]
+ self.assertEqual(
+ expected_call_list,
+ mock_http_connection.return_value.request.call_args_list)
+
+ def test_get_snapshot_info_es(self):
+ """Test get snapsho info api."""
+ mock_http_connection = six.moves.http_client.HTTPConnection
+ mock_http_connection.return_value.getresponse.side_effect = [
+ fakes.FakeLoginResponse(),
+ fakes.FakeGetBasicInfoResponseEs(),
+ fakes.FakeLoginResponse(),
+ fakes.FakeSnapshotInfoResponse()]
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1')
+ self.driver.api_executor.get_snapshot_info(
+ volID='volId', snapshot_name='fakeSnapshotName')
+
+ fake_params = {
+ 'func': 'extra_get',
+ 'volumeID': 'volId',
+ 'snapshot_list': '1',
+ 'snap_start': '0',
+ 'snap_count': '100',
+ 'sid': 'fakeSid',
+ }
+ sanitized_params = self._sanitize_params(fake_params)
+ fake_url = (
+ '/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params)
+
+ expected_call_list = [
+ mock.call('GET', self.login_url),
+ mock.call('GET', self.get_basic_info_url),
+ mock.call('GET', self.login_url),
+ mock.call('GET', fake_url)]
+ self.assertEqual(
+ expected_call_list,
+ mock_http_connection.return_value.request.call_args_list)
+
+ def test_create_snapshot_api(self):
+ """Test create snapshot api."""
+ mock_http_connection = six.moves.http_client.HTTPConnection
+ mock_http_connection.return_value.getresponse.side_effect = [
+ fakes.FakeLoginResponse(),
+ fakes.FakeGetBasicInfoResponseEs(),
+ fakes.FakeLoginResponse(),
+ fakes.FakeCreateSnapshotResponse()]
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1')
+ self.driver.api_executor.create_snapshot_api(
+ 'fakeVolumeId',
+ 'fakeSnapshotName')
+
+ fake_params = {
+ 'func': 'create_snapshot',
+ 'volumeID': 'fakeVolumeId',
+ 'snapshot_name': 'fakeSnapshotName',
+ 'expire_min': '0',
+ 'vital': '1',
+ 'sid': 'fakeSid',
+ }
+ sanitized_params = self._sanitize_params(fake_params)
+ fake_url = (
+ '/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params)
+
+ expected_call_list = [
+ mock.call('GET', self.login_url),
+ mock.call('GET', self.get_basic_info_url),
+ mock.call('GET', self.login_url),
+ mock.call('GET', fake_url)]
+ self.assertEqual(
+ expected_call_list,
+ mock_http_connection.return_value.request.call_args_list)
+
+ def test_delete_snapshot_api(self):
+ """Test delete snapshot api."""
+ mock_http_connection = six.moves.http_client.HTTPConnection
+ mock_http_connection.return_value.getresponse.side_effect = [
+ fakes.FakeLoginResponse(),
+ fakes.FakeGetBasicInfoResponseEs(),
+ fakes.FakeLoginResponse(),
+ fakes.FakeDeleteSnapshotResponse()]
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1')
+ self.driver.api_executor.delete_snapshot_api(
+ 'fakeSnapshotId')
+
+ fake_params = {
+ 'func': 'del_snapshots',
+ 'snapshotID': 'fakeSnapshotId',
+ 'sid': 'fakeSid',
+ }
+ sanitized_params = self._sanitize_params(fake_params)
+ fake_url = (
+ '/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params)
+
+ expected_call_list = [
+ mock.call('GET', self.login_url),
+ mock.call('GET', self.get_basic_info_url),
+ mock.call('GET', self.login_url),
+ mock.call('GET', fake_url)]
+ self.assertEqual(
+ expected_call_list,
+ mock_http_connection.return_value.request.call_args_list)
+
+ def test_clone_snapshot_api(self):
+ """Test clone snapshot api."""
+ mock_http_connection = six.moves.http_client.HTTPConnection
+ mock_http_connection.return_value.getresponse.side_effect = [
+ fakes.FakeLoginResponse(),
+ fakes.FakeGetBasicInfoResponseEs(),
+ fakes.FakeLoginResponse(),
+ fakes.FakeDeleteSnapshotResponse()]
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1')
+ self.driver.api_executor.clone_snapshot(
+ 'fakeSnapshotId',
+ 'fakeNewShareName')
+
+ fake_params = {
+ 'func': 'clone_qsnapshot',
+ 'by_vol': '1',
+ 'snapshotID': 'fakeSnapshotId',
+ 'new_name': 'fakeNewShareName',
+ 'sid': 'fakeSid',
+ }
+ sanitized_params = self._sanitize_params(fake_params)
+ fake_url = (
+ '/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params)
+
+ expected_call_list = [
+ mock.call('GET', self.login_url),
+ mock.call('GET', self.get_basic_info_url),
+ mock.call('GET', self.login_url),
+ mock.call('GET', fake_url)]
+ self.assertEqual(
+ expected_call_list,
+ mock_http_connection.return_value.request.call_args_list)
+
+ def test_edit_share_api(self):
+ """Test edit share api."""
+ mock_http_connection = six.moves.http_client.HTTPConnection
+ mock_http_connection.return_value.getresponse.side_effect = [
+ fakes.FakeLoginResponse(),
+ fakes.FakeGetBasicInfoResponseTs(),
+ fakes.FakeLoginResponse(),
+ fakes.FakeCreateSnapshotResponse()]
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1')
+ expect_share_dict = {
+ "sharename": 'fakeVolId',
+ "old_sharename": 'fakeVolId',
+ "new_size": 100,
+ }
+ self.driver.api_executor.edit_share(
+ expect_share_dict)
+
+ fake_params = {
+ 'wiz_func': 'share_property',
+ 'action': 'share_property',
+ 'sharename': 'fakeVolId',
+ 'old_sharename': 'fakeVolId',
+ 'vol_size': '100GB',
+ 'dedup': '0',
+ 'compression': '1',
+ 'thin_pro': '0',
+ 'cache': '0',
+ 'afp_enable': '0',
+ 'ftp_enable': '1',
+ 'hidden': '0',
+ 'oplocks': '1',
+ 'sync': 'always',
+ 'recycle_bin': '1',
+ 'recycle_bin_administrators_only': '0',
+ 'sid': 'fakeSid',
+ }
+ sanitized_params = self._sanitize_params(fake_params)
+ fake_url = (
+ '/cgi-bin/priv/privWizard.cgi?%s' % sanitized_params)
+
+ expected_call_list = [
+ mock.call('GET', self.login_url),
+ mock.call('GET', self.get_basic_info_url),
+ mock.call('GET', self.login_url),
+ mock.call('GET', fake_url)]
+ self.assertEqual(
+ expected_call_list,
+ mock_http_connection.return_value.request.call_args_list)
+
+ def test_get_host_list(self):
+ """Test get host list api."""
+ mock_http_connection = six.moves.http_client.HTTPConnection
+ mock_http_connection.return_value.getresponse.side_effect = [
+ fakes.FakeLoginResponse(),
+ fakes.FakeGetBasicInfoResponseEs(),
+ fakes.FakeLoginResponse(),
+ fakes.FakeGetHostListResponse()]
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1')
+ self.driver.api_executor.get_host_list()
+
+ fake_params = {
+ 'module': 'hosts',
+ 'func': 'get_hostlist',
+ 'sid': 'fakeSid',
+ }
+ sanitized_params = self._sanitize_params(fake_params)
+ fake_url = (
+ ('/cgi-bin/accessrights/accessrightsRequest.cgi?%s') %
+ sanitized_params)
+
+ expected_call_list = [
+ mock.call('GET', self.login_url),
+ mock.call('GET', self.get_basic_info_url),
+ mock.call('GET', self.login_url),
+ mock.call('GET', fake_url)]
+ self.assertEqual(
+ expected_call_list,
+ mock_http_connection.return_value.request.call_args_list)
+
+ def test_add_host(self):
+ """Test add host api."""
+ mock_http_connection = six.moves.http_client.HTTPConnection
+ mock_http_connection.return_value.getresponse.side_effect = [
+ fakes.FakeLoginResponse(),
+ fakes.FakeGetBasicInfoResponseEs(),
+ fakes.FakeLoginResponse(),
+ fakes.FakeGetHostListResponse()]
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1')
+ self.driver.api_executor.add_host(
+ 'fakeHostName', 'fakeIpV4')
+
+ fake_params = {
+ 'module': 'hosts',
+ 'func': 'apply_addhost',
+ 'name': 'fakeHostName',
+ 'ipaddr_v4': 'fakeIpV4',
+ 'sid': 'fakeSid',
+ }
+ sanitized_params = self._sanitize_params(fake_params)
+ fake_url = (
+ ('/cgi-bin/accessrights/accessrightsRequest.cgi?%s') %
+ sanitized_params)
+
+ expected_call_list = [
+ mock.call('GET', self.login_url),
+ mock.call('GET', self.get_basic_info_url),
+ mock.call('GET', self.login_url),
+ mock.call('GET', fake_url)]
+ self.assertEqual(
+ expected_call_list,
+ mock_http_connection.return_value.request.call_args_list)
+
+ def test_set_nfs_access(self):
+ """Test get host list api."""
+ mock_http_connection = six.moves.http_client.HTTPConnection
+ mock_http_connection.return_value.getresponse.side_effect = [
+ fakes.FakeLoginResponse(),
+ fakes.FakeGetBasicInfoResponseEs(),
+ fakes.FakeLoginResponse(),
+ fakes.FakeGetHostListResponse()]
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1')
+ self.driver.api_executor.set_nfs_access(
+ 'fakeShareName', 'fakeAccess', 'fakeHostName')
+
+ fake_params = {
+ 'wiz_func': 'share_nfs_control',
+ 'action': 'share_nfs_control',
+ 'sharename': 'fakeShareName',
+ 'access': 'fakeAccess',
+ 'host_name': 'fakeHostName',
+ 'sid': 'fakeSid',
+ }
+ sanitized_params = self._sanitize_params(fake_params)
+ fake_url = (
+ ('/cgi-bin/priv/privWizard.cgi?%s') %
+ sanitized_params)
+
+ expected_call_list = [
+ mock.call('GET', self.login_url),
+ mock.call('GET', self.get_basic_info_url),
+ mock.call('GET', self.login_url),
+ mock.call('GET', fake_url)]
+ self.assertEqual(
+ expected_call_list,
+ mock_http_connection.return_value.request.call_args_list)
+
+ def test_get_snapshot_info_ts_api(self):
+ """Test get snapshot info api."""
+ mock_http_connection = six.moves.http_client.HTTPConnection
+ mock_http_connection.return_value.getresponse.side_effect = [
+ fakes.FakeLoginResponse(),
+ fakes.FakeGetBasicInfoResponseTs(),
+ fakes.FakeLoginResponse(),
+ fakes.FakeSnapshotInfoResponse()]
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1')
+ self.driver.api_executor.get_snapshot_info(
+ snapshot_name='fakeSnapshotName',
+ lun_index='fakeLunIndex')
+
+ fake_params = {
+ 'func': 'extra_get',
+ 'LUNIndex': 'fakeLunIndex',
+ 'smb_snapshot_list': '1',
+ 'smb_snapshot': '1',
+ 'snapshot_list': '1',
+ 'sid': 'fakeSid'}
+
+ sanitized_params = self._sanitize_params(fake_params)
+ fake_url = (
+ ('/cgi-bin/disk/snapshot.cgi?%s') %
+ sanitized_params)
+
+ expected_call_list = [
+ mock.call('GET', self.login_url),
+ mock.call('GET', self.get_basic_info_url),
+ mock.call('GET', self.login_url),
+ mock.call('GET', fake_url)]
+ self.assertEqual(
+ expected_call_list,
+ mock_http_connection.return_value.request.call_args_list)
+
+ @ddt.data(fakes.FakeAuthPassFailResponse(),
+ fakes.FakeEsResCodeNegativeResponse())
+ def test_api_create_share_with_fail_response(self, fake_fail_response):
+ """Test create share api with fail response."""
+ mock_http_connection = six.moves.http_client.HTTPConnection
+ mock_http_connection.return_value.getresponse.side_effect = [
+ fakes.FakeLoginResponse(),
+ fakes.FakeGetBasicInfoResponseEs(),
+ fakes.FakeLoginResponse(),
+ fake_fail_response,
+ fake_fail_response,
+ fake_fail_response,
+ fake_fail_response,
+ fake_fail_response]
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1')
+ self.assertRaises(
+ exception.ShareBackendException,
+ self.driver.api_executor.create_share,
+ share=self.share,
+ pool_name='Storage Pool 1',
+ create_share_name='fake_share_name',
+ share_proto='NFS')
+
+ @ddt.unpack
+ @ddt.data(['self.driver.api_executor.get_share_info',
+ {'pool_id': 'fakeId'},
+ fakes.FakeAuthPassFailResponse(),
+ fakes.FakeGetBasicInfoResponseEs()],
+ ['self.driver.api_executor.get_specific_volinfo',
+ {'vol_id': 'fakeId'},
+ fakes.FakeAuthPassFailResponse(),
+ fakes.FakeGetBasicInfoResponseEs()],
+ ['self.driver.api_executor.create_snapshot_api',
+ {'volumeID': 'fakeVolumeId',
+ 'snapshot_name': 'fakeSnapshotName'},
+ fakes.FakeAuthPassFailResponse(),
+ fakes.FakeGetBasicInfoResponseEs()],
+ ['self.driver.api_executor.create_snapshot_api',
+ {'volumeID': 'fakeVolumeId',
+ 'snapshot_name': 'fakeSnapshotName'},
+ fakes.FakeEsResCodeNegativeResponse(),
+ fakes.FakeGetBasicInfoResponseEs()],
+ ['self.driver.api_executor.get_snapshot_info',
+ {'volID': 'volId'},
+ fakes.FakeAuthPassFailResponse(),
+ fakes.FakeGetBasicInfoResponseEs()],
+ ['self.driver.api_executor.get_snapshot_info',
+ {'volID': 'volId'},
+ fakes.FakeResultNegativeResponse(),
+ fakes.FakeGetBasicInfoResponseEs()],
+ ['self.driver.api_executor.get_specific_poolinfo',
+ {'pool_id': 'Storage Pool 1'},
+ fakes.FakeAuthPassFailResponse(),
+ fakes.FakeGetBasicInfoResponseEs()],
+ ['self.driver.api_executor.get_specific_poolinfo',
+ {'pool_id': 'Storage Pool 1'},
+ fakes.FakeResultNegativeResponse(),
+ fakes.FakeGetBasicInfoResponseEs()],
+ ['self.driver.api_executor.delete_share',
+ {'vol_id': 'fakeId'},
+ fakes.FakeAuthPassFailResponse(),
+ fakes.FakeGetBasicInfoResponseEs()],
+ ['self.driver.api_executor.delete_share',
+ {'vol_id': 'fakeId'},
+ fakes.FakeResultNegativeResponse(),
+ fakes.FakeGetBasicInfoResponseEs()],
+ ['self.driver.api_executor.delete_snapshot_api',
+ {'snapshot_id': 'fakeSnapshotId'},
+ fakes.FakeAuthPassFailResponse(),
+ fakes.FakeGetBasicInfoResponseEs()],
+ ['self.driver.api_executor.delete_snapshot_api',
+ {'snapshot_id': 'fakeSnapshotId'},
+ fakes.FakeResultNegativeResponse(),
+ fakes.FakeGetBasicInfoResponseEs()],
+ ['self.driver.api_executor.clone_snapshot',
+ {'snapshot_id': 'fakeSnapshotId',
+ 'new_sharename': 'fakeNewShareName'},
+ fakes.FakeResultNegativeResponse(),
+ fakes.FakeGetBasicInfoResponseEs()],
+ ['self.driver.api_executor.clone_snapshot',
+ {'snapshot_id': 'fakeSnapshotId',
+ 'new_sharename': 'fakeNewShareName'},
+ fakes.FakeAuthPassFailResponse(),
+ fakes.FakeGetBasicInfoResponseEs()],
+ ['self.driver.api_executor.edit_share',
+ {'share_dict': {"sharename": 'fakeVolId',
+ "old_sharename": 'fakeVolId',
+ "new_size": 100}},
+ fakes.FakeEsResCodeNegativeResponse(),
+ fakes.FakeGetBasicInfoResponseEs()],
+ ['self.driver.api_executor.edit_share',
+ {'share_dict': {"sharename": 'fakeVolId',
+ "old_sharename": 'fakeVolId',
+ "new_size": 100}},
+ fakes.FakeAuthPassFailResponse(),
+ fakes.FakeGetBasicInfoResponseEs()],
+ ['self.driver.api_executor.add_host',
+ {'hostname': 'fakeHostName',
+ 'ipv4': 'fakeIpV4'},
+ fakes.FakeResultNegativeResponse(),
+ fakes.FakeGetBasicInfoResponseEs()],
+ ['self.driver.api_executor.add_host',
+ {'hostname': 'fakeHostName',
+ 'ipv4': 'fakeIpV4'},
+ fakes.FakeAuthPassFailResponse(),
+ fakes.FakeGetBasicInfoResponseEs()],
+ ['self.driver.api_executor.get_host_list',
+ {},
+ fakes.FakeResultNegativeResponse(),
+ fakes.FakeGetBasicInfoResponseEs()],
+ ['self.driver.api_executor.get_host_list',
+ {},
+ fakes.FakeAuthPassFailResponse(),
+ fakes.FakeGetBasicInfoResponseEs()],
+ ['self.driver.api_executor.set_nfs_access',
+ {'sharename': 'fakeShareName',
+ 'access': 'fakeAccess',
+ 'host_name': 'fakeHostName'},
+ fakes.FakeAuthPassFailResponse(),
+ fakes.FakeGetBasicInfoResponseEs()],
+ ['self.driver.api_executor.set_nfs_access',
+ {'sharename': 'fakeShareName',
+ 'access': 'fakeAccess',
+ 'host_name': 'fakeHostName'},
+ fakes.FakeResultNegativeResponse(),
+ fakes.FakeGetBasicInfoResponseEs()],
+ ['self.driver.api_executor.get_snapshot_info',
+ {'snapshot_name': 'fakeSnapshoName',
+ 'lun_index': 'fakeLunIndex'},
+ fakes.FakeAuthPassFailResponse(),
+ fakes.FakeGetBasicInfoResponseTs()],
+ ['self.driver.api_executor.get_snapshot_info',
+ {'snapshot_name': 'fakeSnapshoName',
+ 'lun_index': 'fakeLunIndex'},
+ fakes.FakeResultNegativeResponse(),
+ fakes.FakeGetBasicInfoResponseTs()])
+ def test_get_snapshot_info_ts_with_fail_response(
+ self, api, dict_parm,
+ fake_fail_response, fake_basic_info):
+ """Test get snapshot info api with fail response."""
+ mock_http_connection = six.moves.http_client.HTTPConnection
+ mock_http_connection.return_value.getresponse.side_effect = [
+ fakes.FakeLoginResponse(),
+ fake_basic_info,
+ fakes.FakeLoginResponse(),
+ fake_fail_response,
+ fake_fail_response,
+ fake_fail_response,
+ fake_fail_response,
+ fake_fail_response]
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1')
+
+ self.assertRaises(
+ exception.ShareBackendException,
+ eval(api),
+ **dict_parm)
diff --git a/manila/tests/share/drivers/qnap/test_qnap.py b/manila/tests/share/drivers/qnap/test_qnap.py
new file mode 100644
index 0000000000..4fa06dcd2d
--- /dev/null
+++ b/manila/tests/share/drivers/qnap/test_qnap.py
@@ -0,0 +1,1111 @@
+# Copyright (c) 2016 QNAP Systems, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+try:
+ import xml.etree.cElementTree as ET
+except ImportError:
+ import xml.etree.ElementTree as ET
+
+import ddt
+import mock
+from oslo_config import cfg
+import six
+
+from manila import exception
+from manila.share.drivers.qnap import qnap
+from manila import test
+from manila.tests import fake_share
+from manila.tests.share.drivers.qnap import fakes
+
+
+CONF = cfg.CONF
+
+
+def create_configuration(management_url, qnap_share_ip, qnap_nas_login,
+ qnap_nas_password, qnap_poolname):
+ """Create configuration."""
+ configuration = mock.Mock()
+ configuration.qnap_management_url = management_url
+ configuration.qnap_share_ip = qnap_share_ip
+ configuration.qnap_nas_login = qnap_nas_login
+ configuration.qnap_nas_password = qnap_nas_password
+ configuration.qnap_poolname = qnap_poolname
+ configuration.safe_get.return_value = False
+ return configuration
+
+
+class QnapShareDriverBaseTestCase(test.TestCase):
+ """Base Class for the QnapShareDriver Tests."""
+
+ def setUp(self):
+ """Setup the Qnap Driver Base TestCase."""
+ super(QnapShareDriverBaseTestCase, self).setUp()
+ self.driver = None
+ self.share_api = None
+
+ def _do_setup(self, management_url, share_ip, nas_login,
+ nas_password, poolname, **kwargs):
+ """Config do setup configurations."""
+ self.driver = qnap.QnapShareDriver(
+ configuration=create_configuration(
+ management_url,
+ share_ip,
+ nas_login,
+ nas_password,
+ poolname),
+ private_storage=kwargs.get('private_storage'))
+ self.driver.do_setup('context')
+
+
+@ddt.ddt
+class QnapShareDriverLoginTestCase(QnapShareDriverBaseTestCase):
+ """Tests do_setup api."""
+
+ def setUp(self):
+ """Setup the Qnap Share Driver login TestCase."""
+ super(QnapShareDriverLoginTestCase, self).setUp()
+ self.mock_object(six.moves.http_client, 'HTTPConnection')
+ self.mock_object(six.moves.http_client, 'HTTPSConnection')
+
+ @ddt.unpack
+ @ddt.data({'mng_url': 'http://1.2.3.4:8080', 'port': '8080', 'ssl': False},
+ {'mng_url': 'https://1.2.3.4:443', 'port': '443', 'ssl': True})
+ def test_do_setup_positive(self, mng_url, port, ssl):
+ """Test do_setup with http://1.2.3.4:8080."""
+ fake_login_response = fakes.FakeLoginResponse()
+ fake_get_basic_info_response_es = fakes.FakeGetBasicInfoResponseEs()
+ if ssl:
+ mock_connection = six.moves.http_client.HTTPSConnection
+ else:
+ mock_connection = six.moves.http_client.HTTPConnection
+ mock_connection.return_value.getresponse.side_effect = [
+ fake_login_response,
+ fake_get_basic_info_response_es,
+ fake_login_response]
+
+ self._do_setup(mng_url, '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1')
+
+ self.assertEqual(
+ mng_url,
+ self.driver.configuration.qnap_management_url)
+ self.assertEqual(
+ '1.2.3.4', self.driver.configuration.qnap_share_ip)
+ self.assertEqual(
+ 'admin', self.driver.configuration.qnap_nas_login)
+ self.assertEqual(
+ 'qnapadmin', self.driver.configuration.qnap_nas_password)
+ self.assertEqual(
+ 'Storage Pool 1', self.driver.configuration.qnap_poolname)
+ self.assertEqual('fakeSid', self.driver.api_executor.sid)
+ self.assertEqual('admin', self.driver.api_executor.username)
+ self.assertEqual('qnapadmin', self.driver.api_executor.password)
+ self.assertEqual('1.2.3.4', self.driver.api_executor.ip)
+ self.assertEqual(port, self.driver.api_executor.port)
+ self.assertEqual(ssl, self.driver.api_executor.ssl)
+
+ @ddt.data(fakes.FakeGetBasicInfoResponseTs(),
+ fakes.FakeGetBasicInfoResponseTesTs(),
+ fakes.FakeGetBasicInfoResponseTesEs())
+ def test_do_setup_positive_with_diff_nas(self, fake_basic_info):
+ """Test do_setup with different NAS model."""
+ fake_login_response = fakes.FakeLoginResponse()
+ # fake_get_basic_info_response_ts = FakeGetBasicInfoResponseTs()
+ mock_connection = six.moves.http_client.HTTPSConnection
+ mock_connection.return_value.getresponse.side_effect = [
+ fake_login_response,
+ fake_basic_info,
+ fake_login_response]
+
+ self._do_setup('https://1.2.3.4:443', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1')
+
+ self.assertEqual('fakeSid', self.driver.api_executor.sid)
+ self.assertEqual('admin', self.driver.api_executor.username)
+ self.assertEqual('qnapadmin', self.driver.api_executor.password)
+ self.assertEqual('1.2.3.4', self.driver.api_executor.ip)
+ self.assertEqual('443', self.driver.api_executor.port)
+ self.assertTrue(self.driver.api_executor.ssl)
+
+ def test_do_setup_with_exception(self):
+ """Test do_setup with exception."""
+ fake_login_response = fakes.FakeLoginResponse()
+ fake_get_basic_info_response_error = (
+ fakes.FakeGetBasicInfoResponseError())
+ mock_connection = six.moves.http_client.HTTPSConnection
+ mock_connection.return_value.getresponse.side_effect = [
+ fake_login_response,
+ fake_get_basic_info_response_error,
+ fake_login_response]
+
+ self.driver = qnap.QnapShareDriver(
+ configuration=create_configuration(
+ 'https://1.2.3.4:443', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Pool1'))
+ self.assertRaises(
+ exception.ShareBackendException,
+ self.driver.do_setup,
+ context='context')
+
+ def test_check_for_setup_error(self):
+ """Test do_setup with exception."""
+ self.driver = qnap.QnapShareDriver(
+ configuration=create_configuration(
+ 'https://1.2.3.4:443', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Pool1'))
+ self.assertRaises(
+ exception.ShareBackendException,
+ self.driver.check_for_setup_error)
+
+
+@ddt.ddt
+class QnapShareDriverTestCase(QnapShareDriverBaseTestCase):
+ """Tests share driver functions."""
+
+ def setUp(self):
+ """Setup the Qnap Driver Base TestCase."""
+ super(QnapShareDriverTestCase, self).setUp()
+ self.mock_object(qnap.QnapShareDriver, '_create_api_executor')
+ self.share = fake_share.fake_share(
+ share_proto='NFS',
+ id='shareId',
+ display_name='fakeDisplayName',
+ export_locations=[{'path': '1.2.3.4:/share/fakeShareName'}],
+ host='QnapShareDriver',
+ size=10)
+
+ def get_share_info_return_value(self):
+ """Return the share info form get_share_info method."""
+ root = ET.fromstring(fakes.FAKE_RES_DETAIL_DATA_SHARE_INFO)
+
+ share_list = root.find('Volume_Info')
+ share_info_tree = share_list.findall('row')
+ for share in share_info_tree:
+ return share
+
+ def get_snapshot_info_return_value(self):
+ """Return the snapshot info form get_snapshot_info method."""
+ root = ET.fromstring(fakes.FAKE_RES_DETAIL_DATA_SNAPSHOT)
+
+ snapshot_list = root.find('SnapshotList')
+ snapshot_info_tree = snapshot_list.findall('row')
+ for snapshot in snapshot_info_tree:
+ return snapshot
+
+ def get_specific_volinfo_return_value(self):
+ """Return the volume info form get_specific_volinfo method."""
+ root = ET.fromstring(fakes.FAKE_RES_DETAIL_DATA_VOLUME_INFO)
+
+ volume_list = root.find('Volume_Info')
+ volume_info_tree = volume_list.findall('row')
+ for volume in volume_info_tree:
+ return volume
+
+ def get_specific_poolinfo_return_value(self):
+ """Get specific pool info."""
+ root = ET.fromstring(fakes.FAKE_RES_DETAIL_DATA_SPECIFIC_POOL_INFO)
+
+ pool_list = root.find('Pool_Index')
+ pool_info_tree = pool_list.findall('row')
+ for pool in pool_info_tree:
+ return pool
+
+ def get_host_list_return_value(self):
+ """Get host list."""
+ root = ET.fromstring(fakes.FAKE_RES_DETAIL_DATA_GET_HOST_LIST)
+
+ hosts = []
+ host_list = root.find('host_list')
+ host_tree = host_list.findall('host')
+ for host in host_tree:
+ hosts.append(host)
+
+ return hosts
+
+ @mock.patch.object(qnap.QnapShareDriver, '_get_location_path')
+ @mock.patch.object(qnap.QnapShareDriver, '_gen_random_name')
+ def test_create_share_positive(
+ self,
+ mock_gen_random_name,
+ mock_get_location_path):
+ """Test create share."""
+ mock_api_executor = qnap.QnapShareDriver._create_api_executor
+ mock_api_executor.return_value.get_share_info.return_value = None
+ mock_gen_random_name.return_value = 'fakeShareName'
+ mock_api_executor.return_value.create_share.return_value = (
+ 'fakeCreateShareId')
+ mock_get_location_path.return_value = None
+ mock_private_storage = mock.Mock()
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.driver.create_share('context', self.share)
+
+ mock_api_executor.return_value.get_share_info.assert_called_once_with(
+ 'Storage Pool 1',
+ vol_label='fakeShareName')
+ mock_api_executor.return_value.create_share.assert_called_once_with(
+ self.share,
+ self.driver.configuration.qnap_poolname,
+ 'fakeShareName',
+ 'NFS')
+ mock_get_location_path.assert_called_once_with(
+ 'fakeShareName', 'NFS', '1.2.3.4')
+
+ def test_delete_share_positive(self):
+ """Test delete share with fake_share."""
+ mock_api_executor = qnap.QnapShareDriver._create_api_executor
+ mock_api_executor.return_value.get_share_info.return_value = (
+ self.get_share_info_return_value())
+ mock_api_executor.return_value.delete_share.return_value = (
+ 'fakeCreateShareId')
+ mock_private_storage = mock.Mock()
+ mock_private_storage.get.return_value = 'fakeVolNo'
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.driver.delete_share('context', self.share, share_server=None)
+
+ mock_api_executor.return_value.get_share_info.assert_called_once_with(
+ 'Storage Pool 1', vol_no='fakeVolNo')
+ mock_api_executor.return_value.delete_share.assert_called_once_with(
+ 'fakeNo')
+
+ def test_delete_share_no_volid(self):
+ """Test delete share with fake_share and no volID."""
+ mock_private_storage = mock.Mock()
+ mock_private_storage.get.return_value = None
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.driver.delete_share('context', self.share, share_server=None)
+
+ mock_private_storage.get.assert_called_once_with(
+ 'shareId', 'volID')
+
+ def test_delete_share_no_delete_share(self):
+ """Test delete share with fake_share."""
+ mock_api_executor = qnap.QnapShareDriver._create_api_executor
+ mock_api_executor.return_value.get_share_info.return_value = None
+ mock_api_executor.return_value.delete_share.return_value = (
+ 'fakeCreateShareId')
+ mock_private_storage = mock.Mock()
+ mock_private_storage.get.return_value = 'fakeVolNo'
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.driver.delete_share('context', self.share, share_server=None)
+
+ mock_api_executor.return_value.get_share_info.assert_called_once_with(
+ 'Storage Pool 1', vol_no='fakeVolNo')
+
+ def test_extend_share(self):
+ """Test extend share with fake_share."""
+ mock_api_executor = qnap.QnapShareDriver._create_api_executor
+ mock_api_executor.return_value.get_share_info.return_value = (
+ self.get_share_info_return_value())
+ mock_api_executor.return_value.edit_share.return_value = None
+ mock_private_storage = mock.Mock()
+ mock_private_storage.get.return_value = 'fakeVolId'
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.driver.extend_share(self.share, 100, share_server=None)
+
+ expect_share_dict = {
+ "sharename": 'fakeVolId',
+ "old_sharename": 'fakeVolId',
+ "new_size": 100,
+ }
+ mock_api_executor.return_value.edit_share.assert_called_once_with(
+ expect_share_dict)
+
+ def test_extend_share_without_share_name(self):
+ """Test extend share without share name."""
+ mock_private_storage = mock.Mock()
+ mock_private_storage.get.return_value = None
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.assertRaises(
+ exception.ShareResourceNotFound,
+ self.driver.extend_share,
+ share=self.share,
+ new_size=100,
+ share_server=None)
+
+ @mock.patch.object(qnap.QnapShareDriver, '_gen_random_name')
+ def test_create_snapshot(
+ self,
+ mock_gen_random_name):
+ """Test create snapshot with fake_snapshot."""
+ fake_snapshot = fakes.SnapshotClass(
+ 10, 'fakeShareName@fakeSnapshotName')
+
+ mock_gen_random_name.return_value = 'fakeSnapshotName'
+ mock_api_executor = qnap.QnapShareDriver._create_api_executor
+ mock_api_executor.return_value.get_snapshot_info.side_effect = [
+ None, self.get_snapshot_info_return_value()]
+ mock_api_executor.return_value.create_snapshot_api.return_value = (
+ 'fakeCreateShareId')
+ mock_private_storage = mock.Mock()
+ mock_private_storage.get.return_value = 'fakeVolId'
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.driver.create_snapshot(
+ 'context', fake_snapshot, share_server=None)
+
+ mock_api_return = mock_api_executor.return_value
+ expected_call_list = [
+ mock.call(volID='fakeVolId', snapshot_name='fakeSnapshotName'),
+ mock.call(volID='fakeVolId', snapshot_name='fakeSnapshotName')]
+ self.assertEqual(
+ expected_call_list,
+ mock_api_return.get_snapshot_info.call_args_list)
+
+ mock_api_return.create_snapshot_api.assert_called_once_with(
+ 'fakeVolId', 'fakeSnapshotName')
+
+ def test_create_snapshot_without_volid(self):
+ """Test create snapshot with fake_snapshot."""
+ fake_snapshot = fakes.SnapshotClass(10, None)
+
+ mock_private_storage = mock.Mock()
+ mock_private_storage.get.return_value = None
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.assertRaises(
+ exception.ShareResourceNotFound,
+ self.driver.create_snapshot,
+ context='context',
+ snapshot=fake_snapshot,
+ share_server=None)
+
+ def test_delete_snapshot(self):
+ """Test delete snapshot with fakeSnapshot."""
+ fake_snapshot = fakes.SnapshotClass(
+ 10, 'fakeShareName@fakeSnapshotName')
+
+ mock_api_executor = qnap.QnapShareDriver._create_api_executor
+ mock_api_executor.return_value.delete_snapshot_api.return_value = (
+ 'fakeCreateShareId')
+ mock_private_storage = mock.Mock()
+ mock_private_storage.get.return_value = 'fakeSnapshotId'
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.driver.delete_snapshot(
+ 'context', fake_snapshot, share_server=None)
+
+ mock_api_return = mock_api_executor.return_value
+ mock_api_return.delete_snapshot_api.assert_called_once_with(
+ 'fakeShareName@fakeSnapshotName')
+
+ def test_delete_snapshot_without_snapshot_id(self):
+ """Test delete snapshot with fakeSnapshot and no snapshot id."""
+ fake_snapshot = fakes.SnapshotClass(10, None)
+
+ mock_private_storage = mock.Mock()
+ mock_private_storage.get.return_value = None
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.driver.delete_snapshot(
+ 'context', fake_snapshot, share_server=None)
+
+ mock_private_storage.get.assert_called_once_with(
+ 'fakeSnapshotId', 'snapshot_id')
+
+ @mock.patch.object(qnap.QnapShareDriver, '_get_location_path')
+ @mock.patch('manila.share.API')
+ @mock.patch.object(qnap.QnapShareDriver, '_gen_random_name')
+ def test_create_share_from_snapshot(
+ self,
+ mock_gen_random_name,
+ mock_share_api,
+ mock_get_location_path):
+ """Test create share from snapshot."""
+ fake_snapshot = fakes.SnapshotClass(
+ 10, 'fakeShareName@fakeSnapshotName')
+
+ mock_api_executor = qnap.QnapShareDriver._create_api_executor
+ mock_gen_random_name.return_value = 'fakeShareName'
+ mock_api_executor.return_value.get_share_info.side_effect = (
+ None, self.get_share_info_return_value())
+ mock_api_executor.return_value.clone_snapshot.return_value = (
+ None)
+ mock_private_storage = mock.Mock()
+ mock_private_storage.get.return_value = 'fakeSnapshotId'
+ mock_share_api.return_value.get.return_value = {'size': 10}
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.driver.create_share_from_snapshot(
+ 'context', self.share, fake_snapshot, share_server=None)
+
+ mock_gen_random_name.assert_called_once_with(
+ 'share')
+ mock_api_return = mock_api_executor.return_value
+ expected_call_list = [
+ mock.call('Storage Pool 1', vol_label='fakeShareName'),
+ mock.call('Storage Pool 1', vol_label='fakeShareName')]
+ self.assertEqual(
+ expected_call_list,
+ mock_api_return.get_share_info.call_args_list)
+ mock_api_return.clone_snapshot.assert_called_once_with(
+ 'fakeShareName@fakeSnapshotName', 'fakeShareName')
+
+ @mock.patch.object(qnap.QnapShareDriver, '_get_location_path')
+ @mock.patch('manila.share.API')
+ @mock.patch.object(qnap.QnapShareDriver, '_gen_random_name')
+ def test_create_share_from_snapshot_diff_size(
+ self,
+ mock_gen_random_name,
+ mock_share_api,
+ mock_get_location_path):
+ """Test create share from snapshot."""
+ fake_snapshot = fakes.SnapshotClass(
+ 10, 'fakeShareName@fakeSnapshotName')
+
+ mock_gen_random_name.return_value = 'fakeShareName'
+ mock_api_executor = qnap.QnapShareDriver._create_api_executor
+ mock_api_executor.return_value.get_share_info.side_effect = (
+ None, self.get_share_info_return_value())
+ mock_api_executor.return_value.clone_snapshot.return_value = (
+ None)
+ mock_private_storage = mock.Mock()
+ mock_private_storage.get.return_value = 'fakeSnapshotId'
+ mock_share_api.return_value.get.return_value = {'size': 5}
+ mock_api_executor.return_value.edit_share.return_value = (
+ None)
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.driver.create_share_from_snapshot(
+ 'context', self.share, fake_snapshot, share_server=None)
+
+ mock_gen_random_name.assert_called_once_with(
+ 'share')
+ mock_api_return = mock_api_executor.return_value
+ expected_call_list = [
+ mock.call('Storage Pool 1', vol_label='fakeShareName'),
+ mock.call('Storage Pool 1', vol_label='fakeShareName')]
+ self.assertEqual(
+ expected_call_list,
+ mock_api_return.get_share_info.call_args_list)
+ mock_api_return.clone_snapshot.assert_called_once_with(
+ 'fakeShareName@fakeSnapshotName', 'fakeShareName')
+ expect_share_dict = {
+ 'sharename': 'fakeShareName',
+ 'old_sharename': 'fakeShareName',
+ 'new_size': 10
+ }
+ mock_api_return.edit_share.assert_called_once_with(
+ expect_share_dict)
+
+ def test_create_share_from_snapshot_without_snapshot_id(self):
+ """Test create share from snapshot."""
+ fake_snapshot = fakes.SnapshotClass(10, None)
+
+ mock_private_storage = mock.Mock()
+ mock_private_storage.get.return_value = None
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.assertRaises(
+ exception.SnapshotResourceNotFound,
+ self.driver.create_share_from_snapshot,
+ context='context',
+ share=self.share,
+ snapshot=fake_snapshot,
+ share_server=None)
+
+ @mock.patch.object(qnap.QnapShareDriver, '_allow_access')
+ def test_update_access_allow_access(
+ self, mock_allow_access):
+ """Test update access with allow access rules."""
+ mock_private_storage = mock.Mock()
+ mock_private_storage.get.return_value = 'fakeVolName'
+ mock_api_executor = qnap.QnapShareDriver._create_api_executor
+ mock_api_executor.return_value.set_nfs_access.return_value = None
+ mock_allow_access.return_value = None
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.driver.update_access(
+ 'context', self.share, 'access_rules',
+ None, None, share_server=None)
+
+ mock_api_executor.return_value.set_nfs_access.assert_called_once_with(
+ 'fakeVolName', 2, 'all')
+
+ @mock.patch.object(qnap.QnapShareDriver, '_allow_access')
+ @mock.patch.object(qnap.QnapShareDriver, '_deny_access')
+ def test_update_access_deny_and_allow_access(
+ self,
+ mock_deny_access,
+ mock_allow_access):
+ """Test update access with deny and allow access rules."""
+ mock_private_storage = mock.Mock()
+ mock_private_storage.get.return_value = 'fakeVolName'
+ mock_deny_access.return_value = None
+ mock_allow_access.return_value = None
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ delete_rules = []
+ delete_rules.append('access1')
+ add_rules = []
+ add_rules.append('access1')
+ self.driver.update_access(
+ 'context', self.share, None,
+ add_rules, delete_rules, share_server=None)
+
+ mock_deny_access.assert_called_once_with(
+ 'context', self.share, 'access1', None)
+ mock_allow_access.assert_called_once_with(
+ 'context', self.share, 'access1', None)
+
+ def test_update_access_without_volname(self):
+ """Test update access without volName."""
+ mock_private_storage = mock.Mock()
+ mock_private_storage.get.return_value = None
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.assertRaises(
+ exception.ShareResourceNotFound,
+ self.driver.update_access,
+ context='context',
+ share=self.share,
+ access_rules='access_rules',
+ add_rules=None,
+ delete_rules=None,
+ share_server=None)
+
+ @mock.patch.object(qnap.QnapShareDriver, '_get_location_path')
+ def test_manage_existing_nfs(
+ self,
+ mock_get_location_path):
+ """Test manage existing."""
+ mock_api_executor = qnap.QnapShareDriver._create_api_executor
+ mock_api_executor.return_value.get_share_info.return_value = (
+ self.get_share_info_return_value())
+ mock_private_storage = mock.Mock()
+ mock_private_storage.update.return_value = None
+ mock_private_storage.get.side_effect = [
+ 'fakeVolId',
+ 'fakeVolName']
+ mock_api_executor.return_value.get_specific_volinfo.return_value = (
+ self.get_specific_volinfo_return_value())
+ mock_api_executor.return_value.get_share_info.return_value = (
+ self.get_share_info_return_value())
+ mock_get_location_path.return_value = None
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.driver.manage_existing(self.share, 'driver_options')
+
+ mock_api_return = mock_api_executor.return_value
+ mock_api_return.get_share_info.assert_called_once_with(
+ 'Storage Pool 1', vol_label='fakeShareName')
+ mock_api_return.get_specific_volinfo.assert_called_once_with(
+ 'fakeNo')
+ mock_get_location_path.assert_called_once_with(
+ 'fakeShareName', 'NFS', '1.2.3.4')
+
+ def test_manage_existing_nfs_without_export_locations(self):
+ share = fake_share.fake_share(
+ share_proto='NFS',
+ id='fakeId',
+ display_name='fakeDisplayName',
+ export_locations=[{'path': ''}],
+ host='QnapShareDriver',
+ size=10)
+
+ mock_private_storage = mock.Mock()
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.assertRaises(
+ exception.ShareBackendException,
+ self.driver.manage_existing,
+ share=share,
+ driver_options='driver_options')
+
+ @mock.patch.object(qnap.QnapShareDriver, '_get_location_path')
+ def test_manage_existing_nfs_ip_not_equel_share_ip(
+ self,
+ mock_get_location_path):
+ """Test manage existing with nfs ip not equel to share ip."""
+ mock_api_executor = qnap.QnapShareDriver._create_api_executor
+ mock_api_executor.return_value.get_share_info.return_value = (
+ self.get_share_info_return_value())
+ mock_private_storage = mock.Mock()
+ mock_private_storage.update.return_value = None
+ mock_private_storage.get.side_effect = [
+ 'fakeVolId',
+ 'fakeVolName']
+ mock_api_executor.return_value.get_specific_volinfo.return_value = (
+ self.get_specific_volinfo_return_value())
+ mock_api_executor.return_value.get_share_info.return_value = (
+ self.get_share_info_return_value())
+ mock_get_location_path.return_value = None
+
+ self._do_setup('http://1.2.3.4:8080', '1.1.1.1', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.assertRaises(
+ exception.ShareBackendException,
+ self.driver.manage_existing,
+ share=self.share,
+ driver_options='driver_options')
+
+ @mock.patch.object(qnap.QnapShareDriver, '_get_location_path')
+ def test_manage_existing_nfs_without_existing_share(
+ self,
+ mock_get_location_path):
+ """Test manage existing nfs without existing share."""
+ mock_api_executor = qnap.QnapShareDriver._create_api_executor
+ mock_api_executor.return_value.get_share_info.return_value = (
+ self.get_share_info_return_value())
+ mock_private_storage = mock.Mock()
+ mock_private_storage.update.return_value = None
+ mock_private_storage.get.side_effect = [
+ 'fakeVolId',
+ 'fakeVolName']
+ mock_api_executor.return_value.get_specific_volinfo.return_value = (
+ self.get_specific_volinfo_return_value())
+ mock_api_executor.return_value.get_share_info.return_value = (
+ None)
+ mock_get_location_path.return_value = None
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.assertRaises(
+ exception.ManageInvalidShare,
+ self.driver.manage_existing,
+ share=self.share,
+ driver_options='driver_options')
+
+ def test_unmanage(self):
+ """Test unmanage."""
+ mock_private_storage = mock.Mock()
+ mock_private_storage.delete.return_value = None
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.driver.unmanage(self.share)
+
+ mock_private_storage.delete.assert_called_once_with(
+ 'shareId')
+
+ @mock.patch.object(qnap.QnapShareDriver, '_get_location_path')
+ def test_manage_existing_snapshot(
+ self,
+ mock_get_location_path):
+ """Test manage existing snapshot snapshot."""
+ fake_snapshot = fakes.SnapshotClass(
+ 10, 'fakeShareName@fakeSnapshotName')
+
+ mock_api_executor = qnap.QnapShareDriver._create_api_executor
+ mock_api_executor.return_value.get_share_info.return_value = (
+ self.get_share_info_return_value())
+ mock_private_storage = mock.Mock()
+ mock_private_storage.update.return_value = None
+ mock_private_storage.get.side_effect = [
+ 'fakeVolId', 'fakeVolName']
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.driver.manage_existing_snapshot(fake_snapshot, 'driver_options')
+
+ mock_api_return = mock_api_executor.return_value
+ mock_api_return.get_share_info.assert_called_once_with(
+ 'Storage Pool 1', vol_no='fakeVolId')
+ fake_metadata = {
+ 'snapshot_id': 'fakeShareName@fakeSnapshotName'}
+ mock_private_storage.update.assert_called_once_with(
+ 'fakeSnapshotId', fake_metadata)
+
+ def test_unmanage_snapshot(self):
+ """Test unmanage snapshot."""
+ fake_snapshot = fakes.SnapshotClass(
+ 10, 'fakeShareName@fakeSnapshotName')
+
+ mock_private_storage = mock.Mock()
+ mock_private_storage.delete.return_value = None
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.driver.unmanage_snapshot(fake_snapshot)
+
+ mock_private_storage.delete.assert_called_once_with(
+ 'fakeSnapshotId')
+
+ @ddt.data(
+ {'expect_result': 'manila-shr-fake_time', 'test_string': 'share'},
+ {'expect_result': 'manila-snp-fake_time', 'test_string': 'snapshot'},
+ {'expect_result': 'manila-hst-fake_time', 'test_string': 'host'},
+ {'expect_result': 'manila-fake_time', 'test_string': ''})
+ @ddt.unpack
+ @mock.patch('oslo_utils.timeutils.utcnow')
+ def test_gen_random_name(
+ self, mock_utcnow, expect_result, test_string):
+ """Test gen random name."""
+ mock_private_storage = mock.Mock()
+ mock_utcnow.return_value.strftime.return_value = 'fake_time'
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+
+ self.assertEqual(
+ expect_result, self.driver._gen_random_name(test_string))
+
+ def test_get_location_path(self):
+ """Test get location path name."""
+ mock_private_storage = mock.Mock()
+ mock_api_executor = qnap.QnapShareDriver._create_api_executor
+ mock_api_executor.return_value.get_share_info.return_value = (
+ self.get_share_info_return_value())
+ mock_api_executor.return_value.get_specific_volinfo.return_value = (
+ self.get_specific_volinfo_return_value())
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+
+ location = 'fakeIp:fakeMountPath'
+ expect_result = {
+ 'path': location,
+ 'is_admin_only': False,
+ }
+ self.assertEqual(
+ expect_result, self.driver._get_location_path(
+ 'fakeShareName', 'NFS', 'fakeIp'))
+
+ self.assertRaises(
+ exception.InvalidInput,
+ self.driver._get_location_path,
+ share_name='fakeShareName',
+ share_proto='fakeProto',
+ ip='fakeIp')
+
+ def test_update_share_stats(self):
+ """Test update share stats."""
+ mock_private_storage = mock.Mock()
+ mock_api_return = (
+ qnap.QnapShareDriver._create_api_executor.return_value)
+ mock_api_return.get_specific_poolinfo.return_value = (
+ self.get_specific_poolinfo_return_value())
+ mock_api_return.get_share_info.return_value = (
+ self.get_share_info_return_value())
+ mock_api_return.get_specific_volinfo.return_value = (
+ self.get_specific_volinfo_return_value())
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.driver._update_share_stats()
+
+ mock_api_return.get_specific_poolinfo.assert_called_once_with(
+ self.driver.configuration.qnap_poolname)
+
+ def test_get_manila_host_ipv4s(self):
+ """Test get manila host IPV4s."""
+ mock_private_storage = mock.Mock()
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+
+ expect_host_dict_ips = []
+ host_list = self.get_host_list_return_value()
+ for host in host_list:
+ host_dict = {
+ 'index': host.find('index').text,
+ 'hostid': host.find('hostid').text,
+ 'name': host.find('name').text,
+ 'netaddrs': host.find('netaddrs').find('ipv4').text
+ }
+ expect_host_dict_ips.append(host_dict)
+
+ self.assertEqual(
+ expect_host_dict_ips, self.driver._get_manila_hostIPv4s(
+ host_list))
+
+ @mock.patch.object(qnap.QnapShareDriver, '_gen_random_name')
+ @mock.patch.object(qnap.QnapShareDriver, '_check_share_access')
+ def test_allow_access_ro(
+ self,
+ mock_check_share_access,
+ mock_gen_random_name):
+ """Test allow_access with access type ro."""
+ fake_access = fakes.AccessClass('fakeAccessType', 'ro', 'fakeIp')
+
+ mock_private_storage = mock.Mock()
+ mock_api_executor = qnap.QnapShareDriver._create_api_executor
+ mock_api_executor.return_value.get_host_list.return_value = (
+ self.get_host_list_return_value())
+ mock_gen_random_name.return_value = 'fakeHostName'
+ mock_api_executor.return_value.add_host.return_value = None
+ mock_api_executor.return_value.set_nfs_access.return_value = None
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.driver._allow_access(
+ 'context', self.share, fake_access, share_server=None)
+
+ mock_check_share_access.assert_called_once_with(
+ 'NFS', 'fakeAccessType')
+ mock_gen_random_name.assert_called_once_with('host')
+ mock_api_executor.return_value.add_host.assert_called_once_with(
+ 'fakeHostName', 'fakeIp')
+
+ @mock.patch.object(qnap.QnapShareDriver, '_check_share_access')
+ def test_allow_access_ro_with_hostlist(
+ self,
+ mock_check_share_access):
+ """Test allow_access_ro_with_hostlist."""
+ host_dict_ips = []
+ for host in self.get_host_list_return_value():
+ if host.find('netaddrs/ipv4').text is not None:
+ host_dict = {
+ 'index': host.find('index').text,
+ 'hostid': host.find('hostid').text,
+ 'name': host.find('name').text,
+ 'netaddrs': host.find('netaddrs').find('ipv4').text}
+ host_dict_ips.append(host_dict)
+
+ for host in host_dict_ips:
+ fake_access_to = host['netaddrs']
+ fake_access = fakes.AccessClass(
+ 'fakeAccessType', 'ro', fake_access_to)
+
+ mock_private_storage = mock.Mock()
+ mock_api_executor = qnap.QnapShareDriver._create_api_executor
+ mock_api_executor.return_value.get_host_list.return_value = (
+ self.get_host_list_return_value())
+ mock_api_executor.return_value.set_nfs_access.return_value = None
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.driver._allow_access(
+ 'context', self.share, fake_access, share_server=None)
+
+ mock_check_share_access.assert_called_once_with(
+ 'NFS', 'fakeAccessType')
+
+ @mock.patch.object(qnap.QnapShareDriver, '_gen_random_name')
+ @mock.patch.object(qnap.QnapShareDriver, '_check_share_access')
+ def test_allow_access_rw(
+ self,
+ mock_check_share_access,
+ mock_gen_random_name):
+ """Test allow_access with access type rw."""
+ fake_access = fakes.AccessClass('fakeAccessType', 'rw', 'fakeIp')
+
+ mock_private_storage = mock.Mock()
+ mock_api_executor = qnap.QnapShareDriver._create_api_executor
+ mock_api_executor.return_value.get_host_list.return_value = (
+ self.get_host_list_return_value())
+ mock_gen_random_name.return_value = 'fakeHostName'
+ mock_api_executor.return_value.add_host.return_value = None
+ mock_api_executor.return_value.set_nfs_access.return_value = None
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.driver._allow_access(
+ 'context', self.share, fake_access, share_server=None)
+
+ mock_check_share_access.assert_called_once_with(
+ 'NFS', 'fakeAccessType')
+ mock_gen_random_name.assert_called_once_with('host')
+ mock_api_executor.return_value.add_host.assert_called_once_with(
+ 'fakeHostName', 'fakeIp')
+
+ @mock.patch.object(qnap.QnapShareDriver, '_get_manila_hostIPv4s')
+ @mock.patch.object(qnap.QnapShareDriver, '_gen_random_name')
+ @mock.patch.object(qnap.QnapShareDriver, '_check_share_access')
+ def test_allow_access_without_hostlist(
+ self,
+ mock_check_share_access,
+ mock_gen_random_name,
+ mock_get_manila_hostipv4s):
+ """Test allow access without host list."""
+ fake_access = fakes.AccessClass('fakeAccessType', 'ro', 'fakeIp')
+
+ mock_private_storage = mock.Mock()
+ mock_api_executor = qnap.QnapShareDriver._create_api_executor
+ mock_api_executor.return_value.get_host_list.return_value = None
+ mock_gen_random_name.return_value = 'fakeHostName'
+ mock_api_executor.return_value.add_host.return_value = None
+ mock_api_executor.return_value.set_nfs_access.return_value = None
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.driver._allow_access(
+ 'context', self.share, fake_access, share_server=None)
+
+ mock_check_share_access.assert_called_once_with(
+ 'NFS', 'fakeAccessType')
+ mock_gen_random_name.assert_called_once_with('host')
+ mock_api_executor.return_value.add_host.assert_called_once_with(
+ 'fakeHostName', 'fakeIp')
+
+ @mock.patch.object(qnap.QnapShareDriver, '_check_share_access')
+ def test_deny_access_with_hostlist(
+ self,
+ mock_check_share_access):
+ """Test deny access."""
+ host_dict_ips = []
+ for host in self.get_host_list_return_value():
+ if host.find('netaddrs/ipv4').text is not None:
+ host_dict = {
+ 'index': host.find('index').text,
+ 'hostid': host.find('hostid').text,
+ 'name': host.find('name').text,
+ 'netaddrs': host.find('netaddrs').find('ipv4').text}
+ host_dict_ips.append(host_dict)
+
+ for host in host_dict_ips:
+ fake_access_to = host['netaddrs']
+ fake_access = fakes.AccessClass('fakeAccessType', 'ro', fake_access_to)
+
+ mock_private_storage = mock.Mock()
+ mock_private_storage.get.return_value = 'vol_name'
+
+ mock_api_return = (
+ qnap.QnapShareDriver._create_api_executor.return_value)
+ mock_api_return.get_host_list.return_value = (
+ self.get_host_list_return_value())
+ mock_api_return.add_host.return_value = None
+ mock_api_return.set_nfs_access.return_value = None
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.driver._deny_access(
+ 'context', self.share, fake_access, share_server=None)
+
+ mock_check_share_access.assert_called_once_with(
+ 'NFS', 'fakeAccessType')
+ mock_api_return.set_nfs_access.assert_called_once_with(
+ 'vol_name', 2, 'manila-hst-123')
+
+ @mock.patch.object(qnap.QnapShareDriver, '_check_share_access')
+ def test_deny_access_with_hostlist_not_equel_access_to(
+ self,
+ mock_check_share_access):
+ """Test deny access."""
+ fake_access = fakes.AccessClass('fakeAccessType', 'ro', 'fakeIp')
+
+ mock_private_storage = mock.Mock()
+ mock_private_storage.get.return_value = 'vol_name'
+ mock_api_return = (
+ qnap.QnapShareDriver._create_api_executor.return_value)
+ mock_api_return.get_host_list.return_value = (
+ self.get_host_list_return_value())
+ mock_api_return.add_host.return_value = None
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.driver._deny_access(
+ 'context', self.share, fake_access, share_server=None)
+
+ mock_check_share_access.assert_called_once_with(
+ 'NFS', 'fakeAccessType')
+
+ @mock.patch.object(qnap.QnapShareDriver, '_get_manila_hostIPv4s')
+ @mock.patch.object(qnap.QnapShareDriver, '_check_share_access')
+ def test_deny_access_without_hostlist(
+ self,
+ mock_check_share_access,
+ mock_get_manila_hostipv4s):
+ """Test deny access without hostlist."""
+ fake_access = fakes.AccessClass('fakeAccessType', 'ro', 'fakeIp')
+
+ mock_private_storage = mock.Mock()
+ mock_api_executor = qnap.QnapShareDriver._create_api_executor
+ mock_api_executor.return_value.get_host_list.return_value = None
+ mock_api_executor.return_value.add_host.return_value = None
+ mock_api_executor.return_value.set_nfs_access.return_value = None
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.driver._deny_access(
+ 'context', self.share, fake_access, share_server=None)
+
+ mock_check_share_access.assert_called_once_with(
+ 'NFS', 'fakeAccessType')
+
+ @ddt.data('NFS', 'CIFS', 'proto')
+ def test_check_share_access(self, test_proto):
+ """Test check_share_access."""
+ mock_private_storage = mock.Mock()
+ mock_api_executor = qnap.QnapShareDriver._create_api_executor
+ mock_api_executor.return_value.get_host_list.return_value = None
+ mock_api_executor.return_value.add_host.return_value = None
+ mock_api_executor.return_value.set_nfs_access.return_value = None
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', 'Storage Pool 1',
+ private_storage=mock_private_storage)
+ self.assertRaises(
+ exception.InvalidShareAccess,
+ self.driver._check_share_access,
+ share_proto=test_proto,
+ access_type='notser')
+
+ def test_get_ts_model_pool_id(self):
+ """Test get ts model pool id."""
+ mock_private_storage = mock.Mock()
+
+ self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
+ 'qnapadmin', '1',
+ private_storage=mock_private_storage)
+ self.assertEqual('1', self.driver._get_ts_model_pool_id('1'))
diff --git a/releasenotes/notes/qnap-manila-driver-a30fe4011cb90801.yaml b/releasenotes/notes/qnap-manila-driver-a30fe4011cb90801.yaml
new file mode 100644
index 0000000000..887a88b71c
--- /dev/null
+++ b/releasenotes/notes/qnap-manila-driver-a30fe4011cb90801.yaml
@@ -0,0 +1,4 @@
+---
+features:
+ - Added Manila share driver for QNAP ES series storage systems.
+