Merge "[Nexenta] Refactored NexentaStor5 NFS driver"

This commit is contained in:
Zuul 2019-09-04 13:24:37 +00:00 committed by Gerrit Code Review
commit f1e10ec2ff
13 changed files with 2779 additions and 718 deletions

View File

@ -88,4 +88,5 @@ each back end.
hitachi_hnas_driver
hpe_3par_driver
tegile_driver
nexentastor5_driver
../configuration/shared-file-systems/drivers/windows-smb-driver

View File

@ -0,0 +1,104 @@
..
Copyright 2019 Nexenta by DDN, Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you may
not use this file except in compliance with the License. You may obtain
a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
NexentaStor5 Driver for OpenStack Manila
========================================
The `NexentaStor5 <http://www.nexenta.com>`__ Manila driver
provides NFS shared file systems to OpenStack.
Requirements
------------
- The NexentaStor 5.1 or newer
Supported shared filesystems and operations
-------------------------------------------
This driver supports NFS shares.
The following operations are supported:
- Create NFS Share
- Delete NFS Share
- Allow NFS Share access
* Only IP access type is supported for NFS (ro/rw).
- Deny NFS Share access
- Manage a share.
- Unmanage a share.
- Extend a share.
- Shrink a share.
- Create snapshot
- Revert to snapshot
- Delete snapshot
- Create share from snapshot
Backend Configuration
---------------------
The following parameters need to be configured in the manila configuration
file for the NexentaStor5 driver:
- `share_backend_name` = <backend name to enable>
- `share_driver` = manila.share.drivers.nexenta.ns5.nexenta_nas.NexentaNasDriver
- `driver_handles_share_servers` = False
- `nexenta_nas_host` = <Data address to NAS shares>
- `nexenta_user` = <username for management operations>
- `nexenta_password` = <password for management operations>
- `nexenta_pool` = <Pool name where NAS shares are created>
- `nexenta_rest_addresses` = <Management address for Rest API access>
- `nexenta_folder` = <Parent filesystem where all Manila shares are kept>
- `nexenta_nfs` = True
Share Types
-----------
When creating a share, a share type can be specified to determine where and
how the share will be created. If a share type is not specified, the
`default_share_type` set in the manila configuration file is used.
Manila requires that the share type includes the
`driver_handles_share_servers` extra-spec. This ensures that the share
will be created on a backend that supports the requested
driver_handles_share_servers (share networks) capability.
For the NexentaStor driver, this extra-spec's value must be set to False.
Restrictions
------------
- Only IP share access control is allowed for NFS shares.
Back-end configuration example
------------------------------
.. code-block:: ini
[DEFAULT]
enabled_share_backends = NexentaStor5
[NexentaStor5]
share_backend_name = NexentaStor5
driver_handles_share_servers = False
nexenta_folder = manila
share_driver = manila.share.drivers.nexenta.ns5.nexenta_nas.NexentaNasDriver
nexenta_rest_addresses = 10.3.1.1,10.3.1.2
nexenta_nas_host = 10.3.1.10
nexenta_rest_port = 8443
nexenta_pool = pool1
nexenta_nfs = True
nexenta_user = admin
nexenta_password = secret_password
nexenta_thin_provisioning = True

View File

@ -87,7 +87,7 @@ Mapping of share drivers and share features support
+----------------------------------------+-----------------------+-----------------------+--------------------------+--------------------------+------------------------+----------------------------+--------------------------+--------------------+--------------------+
| NexentaStor4 | N | \- | N | \- | N | N | \- | \- | \- |
+----------------------------------------+-----------------------+-----------------------+--------------------------+--------------------------+------------------------+----------------------------+--------------------------+--------------------+--------------------+
| NexentaStor5 | N | \- | N | N | N | N | \- | \- | \- |
| NexentaStor5 | N | T | N | N | N | N | \- | T | \- |
+----------------------------------------+-----------------------+-----------------------+--------------------------+--------------------------+------------------------+----------------------------+--------------------------+--------------------+--------------------+
| MapRFS | O | O | O | O | O | O | O | \- | \- |
+----------------------------------------+-----------------------+-----------------------+--------------------------+--------------------------+------------------------+----------------------------+--------------------------+--------------------+--------------------+
@ -156,7 +156,7 @@ Mapping of share drivers and share access rules support
+----------------------------------------+--------------+--------------+----------------+------------+--------------+--------------+--------------+----------------+------------+------------+
| NexentaStor4 | NFS (N) | \- | \- | \- | \- | NFS (N) | \- | \- | \- | \- |
+----------------------------------------+--------------+--------------+----------------+------------+--------------+--------------+--------------+----------------+------------+------------+
| NexentaStor5 | NFS (N) | \- | \- | \- | \- | NFS (N) | \- | \- | \- | \- |
| NexentaStor5 | NFS (N) | T | \- | \- | \- | NFS (N) | T | \- | \- | \- |
+----------------------------------------+--------------+--------------+----------------+------------+--------------+--------------+--------------+----------------+------------+------------+
| MapRFS | \- | \- | MapRFS(O) | \- | \- | \- | \- | MapRFS(O) | \- | \- |
+----------------------------------------+--------------+--------------+----------------+------------+--------------+--------------+--------------+----------------+------------+------------+
@ -288,7 +288,7 @@ More information: :ref:`capabilities_and_extra_specs`
+----------------------------------------+-----------+------------+--------+-------------+-------------------+--------------------+-----+----------------------------+--------------------+--------------------+--------------+--------------+
| NexentaStor4 | \- | N | N | N | N | N | \- | N | \- | \- | P | \- |
+----------------------------------------+-----------+------------+--------+-------------+-------------------+--------------------+-----+----------------------------+--------------------+--------------------+--------------+--------------+
| NexentaStor5 | \- | N | N | N | N | N | \- | N | \- | \- | P | \- |
| NexentaStor5 | \- | N | \- | N | N | N | \- | N | T | \- | P | \- |
+----------------------------------------+-----------+------------+--------+-------------+-------------------+--------------------+-----+----------------------------+--------------------+--------------------+--------------+--------------+
| MapRFS | \- | N | \- | \- | \- | N | \- | O | \- | \- | P | \- |
+----------------------------------------+-----------+------------+--------+-------------+-------------------+--------------------+-----+----------------------------+--------------------+--------------------+--------------+--------------+

View File

@ -30,6 +30,7 @@ Share drivers
drivers/netapp-cluster-mode-driver.rst
drivers/quobyte-driver.rst
drivers/windows-smb-driver.rst
drivers/nexentastor5-driver.rst
To use different share drivers for the Shared File Systems service, use the

View File

@ -0,0 +1,100 @@
===================
NexentaStor5 Driver
===================
Nexentastor5 can be used as a storage back end for the OpenStack Shared File
System service. Shares in the Shared File System service are mapped 1:1
to Nexentastor5 filesystems. Access is provided via NFS protocol and IP-based
authentication.
Network approach
~~~~~~~~~~~~~~~~
L3 connectivity between the storage back end and the host running the
Shared File Systems share service should exist.
Supported shared filesystems and operations
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The drivers supports NFS shares.
The following operations are supported:
- Create NFS share
- Delete share
- Extend share
- Shrink share
- Allow share access
Note the following limitation:
* Only IP based access is supported (ro/rw).
- Deny share access
- Create snapshot
- Revert to snapshot
- Delete snapshot
- Create share from snapshot
- Manage share
- Unmanage share
Requirements
~~~~~~~~~~~~
- NexentaStor 5.x Appliance pre-provisioned and licensed
- Pool and parent filesystem configured (this filesystem will contain
all manila shares)
Restrictions
~~~~~~~~~~~~
- Only IP share access control is allowed for NFS shares.
Configuration
~~~~~~~~~~~~~~
.. code-block:: ini
enabled_share_backends = NexentaStor5
Create the new back end configuration section, in this case named
``NexentaStor5``:
.. code-block:: ini
[NexentaStor5]
share_backend_name = NexentaStor5
driver_handles_share_servers = False
nexenta_folder = manila
share_driver = manila.share.drivers.nexenta.ns5.nexenta_nas.NexentaNasDriver
nexenta_rest_addresses = 10.3.1.1,10.3.1.2
nexenta_nas_host = 10.3.1.10
nexenta_rest_port = 8443
nexenta_pool = pool1
nexenta_nfs = True
nexenta_user = admin
nexenta_password = secret_password
nexenta_thin_provisioning = True
More information can be found at the `Nexenta documentation webpage
<https://nexenta.github.io>`.
Driver options
~~~~~~~~~~~~~~
The following table contains the configuration options specific to the
share driver.
.. include:: ../../tables/manila-nexentastor5.inc

View File

@ -0,0 +1,48 @@
.. _manila-nexentastor5:
.. list-table:: Description of NexentaStor5 configuration options
:header-rows: 1
:class: config-ref-table
* - Configuration option = Default value
- Description
* - **[DEFAULT]**
-
* - ``nexenta_rest_addresses`` = ``None``
- (List) One or more comma delimited IP addresses for management communication with NexentaStor appliance.
* - ``nexenta_rest_port`` = ``8443``
- (Integer) Port to connect to Nexenta REST API server.
* - ``nexenta_use_https`` = ``True``
- (Boolean) Use HTTP secure protocol for NexentaStor management REST API connections.
* - ``nexenta_user`` = ``admin``
- (String) User name to connect to Nexenta SA.
* - ``nexenta_password`` = ``None``
- (String) Password to connect to Nexenta SA.
* - ``nexenta_pool`` = ``pool1``
- (String) Pool name on NexentaStor.
* - ``nexenta_nfs`` = ``True``
- (Boolean) Defines whether share over NFS is enabled.
* - ``nexenta_ssl_cert_verify`` = ``False``
- (Boolean) Defines whether the driver should check ssl cert.
* - ``nexenta_rest_connect_timeout`` = ``30``
- (Float) Specifies the time limit (in seconds), within which the connection to NexentaStor management REST API server must be established.
* - ``nexenta_rest_read_timeout`` = ``300``
- (Float) Specifies the time limit (in seconds), within which NexentaStor management REST API server must send a response.
* - ``nexenta_rest_backoff_factor`` = ``1``
- (Float) Specifies the backoff factor to apply between connection attempts to NexentaStor management REST API server.
* - ``nexenta_rest_retry_count`` = ``5``
- (Integer) Specifies the number of times to repeat NexentaStor management REST API call in case of connection errors and NexentaStor appliance EBUSY or ENOENT errors.
* - ``nexenta_nas_host`` = ``None``
- (Hostname) Data IP address of Nexenta storage appliance.
* - ``nexenta_mount_point_base`` = ``$state_path/mnt``
- (String) Base directory that contains NFS share mount points.
* - ``nexenta_share_name_prefix`` = ``share-``
- (String) Nexenta share name prefix.
* - ``nexenta_folder`` = ``folder``
- (String) Parent folder on NexentaStor.
* - ``nexenta_dataset_compression`` = ``on``
- (String) Compression value for new ZFS folders.
* - ``nexenta_thin_provisioning`` = ``True``
- (Boolean) If True shares will not be space guaranteed and overprovisioning will be enabled.
* - ``nexenta_dataset_record_size`` = ``131072``
- (Integer) Specifies a suggested block size in for files in a file system. (bytes)

View File

@ -1,4 +1,4 @@
# Copyright 2016 Nexenta Systems, Inc.
# Copyright 2019 Nexenta by DDN, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -12,137 +12,555 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
:mod:`nexenta.jsonrpc` -- Nexenta-specific JSON RPC client
=====================================================================
.. automodule:: nexenta.jsonrpc
"""
import base64
import hashlib
import json
import time
import posixpath
from oslo_log import log
from oslo_serialization import jsonutils
from eventlet import greenthread
from oslo_log import log as logging
import requests
# pylint: disable=no-member,import-error
from requests.packages.urllib3 import exceptions
requests.packages.urllib3.disable_warnings(exceptions.InsecureRequestWarning)
requests.packages.urllib3.disable_warnings(
exceptions.InsecurePlatformWarning)
# pylint: enable=no-member,import-error
import six
from manila import exception
from manila.i18n import _
LOG = log.getLogger(__name__)
session = requests.Session()
LOG = logging.getLogger(__name__)
class NexentaJSONProxy(object):
def __init__(self, scheme, host, port, user,
password, method='get'):
self.scheme = scheme
self.host = host
self.port = port
self.user = user
self.password = password
self.method = method
@property
def url(self):
return '%s://%s:%s/' % (self.scheme, self.host, self.port)
def __getattr__(self, method='get'):
if method:
return NexentaJSONProxy(
self.scheme, self.host, self.port,
self.user, self.password, method)
def __hash__(self):
return self.url.__hash__()
def __repr__(self):
return 'NEF proxy: %s' % self.url
def __call__(self, path, data=None):
auth = base64.b64encode(
('%s:%s' % (self.user, self.password)).encode('utf-8'))
url = self.url + path
if data:
data = jsonutils.dumps(data)
LOG.debug('Sending JSON to url: %s, data: %s, method: %s',
path, data, self.method)
session.headers.update({'Content-Type': 'application/json'})
response = getattr(session, self.method)(
url, data=data, verify=False)
if response.status_code in (401, 403):
LOG.debug('Login requested by NexentaStor')
if self.scheme == 'http':
session.headers.update({'Authorization': 'Basic %s' % auth})
class NefException(exception.ManilaException):
def __init__(self, data=None, **kwargs):
defaults = {
'name': 'NexentaError',
'code': 'EBADMSG',
'source': 'ManilaDriver',
'message': 'Unknown error'
}
if isinstance(data, dict):
for key in defaults:
if key in kwargs:
continue
if key in data:
kwargs[key] = data[key]
else:
session.headers.update(
{'Authorization': 'Bearer %s' % self.https_auth()})
LOG.debug('Re-sending JSON to url: %s, data: %s, method: %s',
path, data, self.method)
response = getattr(session, self.method)(
url, data=data, verify=False)
self.check_error(response)
content = json.loads(response.content) if response.content else None
LOG.debug("Got response: %(code)s %(reason)s %(content)s", {
'code': response.status_code,
'reason': response.reason,
'content': content})
response.close()
kwargs[key] = defaults[key]
elif isinstance(data, six.string_types):
if 'message' not in kwargs:
kwargs['message'] = data
for key in defaults:
if key not in kwargs:
kwargs[key] = defaults[key]
message = (_('%(message)s (source: %(source)s, '
'name: %(name)s, code: %(code)s)')
% kwargs)
self.code = kwargs['code']
del kwargs['message']
super(NefException, self).__init__(message, **kwargs)
if response.status_code == 202 and content:
url = self.url + content['links'][0]['href']
keep_going = True
while keep_going:
time.sleep(1)
response = session.get(url, verify=False)
self.check_error(response)
LOG.debug("Got response: %(code)s %(reason)s", {
'code': response.status_code,
'reason': response.reason})
content = json.loads(
response.content) if response.content else None
keep_going = response.status_code == 202
response.close()
class NefRequest(object):
def __init__(self, proxy, method):
self.proxy = proxy
self.method = method
self.path = None
self.lock = False
self.time = 0
self.data = []
self.payload = {}
self.stat = {}
self.hooks = {
'response': self.hook
}
self.kwargs = {
'hooks': self.hooks,
'timeout': self.proxy.timeout
}
def __call__(self, path, payload=None):
LOG.debug('NEF request start: %(method)s %(path)s %(payload)s',
{'method': self.method, 'path': path, 'payload': payload})
if self.method not in ['get', 'delete', 'put', 'post']:
message = (_('NEF API does not support %(method)s method'),
{'method': self.method})
raise NefException(code='EINVAL', message=message)
if not path:
message = (_('NEF API call requires collection path'))
raise NefException(code='EINVAL', message=message)
self.path = path
if payload:
if not isinstance(payload, dict):
message = (_('NEF API call payload must be a dictionary'))
raise NefException(code='EINVAL', message=message)
if self.method in ['get', 'delete']:
self.payload = {'params': payload}
elif self.method in ['put', 'post']:
self.payload = {'data': json.dumps(payload)}
try:
response = self.request(self.method, self.path, **self.payload)
except (requests.exceptions.ConnectionError,
requests.exceptions.Timeout) as error:
LOG.debug('Failed to %(method)s %(path)s %(payload)s: %(error)s',
{'method': self.method, 'path': self.path,
'payload': self.payload, 'error': error})
if not self.failover():
raise error
LOG.debug('Retry initial request after failover: '
'%(method)s %(path)s %(payload)s',
{'method': self.method,
'path': self.path,
'payload': self.payload})
response = self.request(self.method, self.path, **self.payload)
LOG.debug('NEF request done: %(method)s %(path)s %(payload)s, '
'total response time: %(time)s seconds, '
'total requests count: %(count)s, '
'requests statistics: %(stat)s',
{'method': self.method,
'path': self.path,
'payload': self.payload,
'time': self.time,
'count': sum(self.stat.values()),
'stat': self.stat})
if response.ok and not response.content:
return None
content = json.loads(response.content)
if not response.ok:
raise NefException(content)
if isinstance(content, dict) and 'data' in content:
return self.data
return content
def https_auth(self):
url = self.url + 'auth/login'
data = jsonutils.dumps(
{'username': self.user, 'password': self.password})
response = session.post(
url, data=data, verify=False)
content = json.loads(response.content) if response.content else None
LOG.debug("Got response: %(code)s %(reason)s %(content)s", {
'code': response.status_code,
'reason': response.reason,
'content': content})
response.close()
return content['token']
def request(self, method, path, **kwargs):
url = self.proxy.url(path)
LOG.debug('Perform session request: %(method)s %(url)s %(body)s',
{'method': method, 'url': url, 'body': kwargs})
kwargs.update(self.kwargs)
return self.proxy.session.request(method, url, **kwargs)
def check_error(self, response):
code = response.status_code
if code not in (200, 201, 202):
reason = response.reason
content = json.loads(
response.content) if response.content else None
response.close()
if content and 'code' in content:
message = content.get(
'message', 'Message is not specified by Nexenta REST')
raise exception.NexentaException(
reason=message, code=content['code'])
raise exception.NexentaException(
reason=_(
'Got bad response: %(code)s %(reason)s %(content)s') % {
'code': code, 'reason': reason, 'content': content})
def hook(self, response, **kwargs):
initial_text = (_('initial request %(method)s %(path)s %(body)s')
% {'method': self.method,
'path': self.path,
'body': self.payload})
request_text = (_('session request %(method)s %(url)s %(body)s')
% {'method': response.request.method,
'url': response.request.url,
'body': response.request.body})
response_text = (_('session response %(code)s %(content)s')
% {'code': response.status_code,
'content': response.content})
text = (_('%(request_text)s and %(response_text)s')
% {'request_text': request_text,
'response_text': response_text})
LOG.debug('Hook start on %(text)s', {'text': text})
if response.status_code not in self.stat:
self.stat[response.status_code] = 0
self.stat[response.status_code] += 1
self.time += response.elapsed.total_seconds()
if response.ok and not response.content:
LOG.debug('Hook done on %(text)s: '
'empty response content',
{'text': text})
return response
if not response.content:
message = (_('There is no response content '
'is available for %(text)s')
% {'text': text})
raise NefException(code='ENODATA', message=message)
try:
content = json.loads(response.content)
except (TypeError, ValueError) as error:
message = (_('Failed to decode JSON for %(text)s: %(error)s')
% {'text': text, 'error': error})
raise NefException(code='ENOMSG', message=message)
method = 'get'
# pylint: disable=no-member
if response.status_code == requests.codes.unauthorized:
if self.stat[response.status_code] > self.proxy.retries:
raise NefException(content)
self.auth()
request = response.request.copy()
request.headers.update(self.proxy.session.headers)
LOG.debug('Retry last %(text)s after authentication',
{'text': request_text})
return self.proxy.session.send(request, **kwargs)
elif response.status_code == requests.codes.not_found:
if self.lock:
LOG.debug('Hook done on %(text)s: '
'nested failover is detected',
{'text': text})
return response
if self.stat[response.status_code] > self.proxy.retries:
raise NefException(content)
if not self.failover():
LOG.debug('Hook done on %(text)s: '
'no valid hosts found',
{'text': text})
return response
LOG.debug('Retry %(text)s after failover',
{'text': initial_text})
return self.request(self.method, self.path, **self.payload)
elif response.status_code == requests.codes.server_error:
if not (isinstance(content, dict) and
'code' in content and
content['code'] == 'EBUSY'):
raise NefException(content)
if self.stat[response.status_code] > self.proxy.retries:
raise NefException(content)
self.proxy.delay(self.stat[response.status_code])
LOG.debug('Retry %(text)s after delay',
{'text': initial_text})
return self.request(self.method, self.path, **self.payload)
elif response.status_code == requests.codes.accepted:
path = self.getpath(content, 'monitor')
if not path:
message = (_('There is no monitor path '
'available for %(text)s')
% {'text': text})
raise NefException(code='ENOMSG', message=message)
self.proxy.delay(self.stat[response.status_code])
return self.request(method, path)
elif response.status_code == requests.codes.ok:
if not (isinstance(content, dict) and 'data' in content):
LOG.debug('Hook done on %(text)s: there '
'is no JSON data available',
{'text': text})
return response
LOG.debug('Append %(count)s data items to response',
{'count': len(content['data'])})
self.data += content['data']
path = self.getpath(content, 'next')
if not path:
LOG.debug('Hook done on %(text)s: there '
'is no next path available',
{'text': text})
return response
LOG.debug('Perform next session request %(method)s %(path)s',
{'method': method, 'path': path})
return self.request(method, path)
LOG.debug('Hook done on %(text)s and '
'returned original response',
{'text': text})
return response
def auth(self):
method = 'post'
path = 'auth/login'
payload = {'username': self.proxy.username,
'password': self.proxy.password}
data = json.dumps(payload)
kwargs = {'data': data}
self.proxy.delete_bearer()
response = self.request(method, path, **kwargs)
content = json.loads(response.content)
if not (isinstance(content, dict) and 'token' in content):
message = (_('There is no authentication token available '
'for authentication request %(method)s %(url)s '
'%(body)s and response %(code)s %(content)s')
% {'method': response.request.method,
'url': response.request.url,
'body': response.request.body,
'code': response.status_code,
'content': response.content})
raise NefException(code='ENODATA', message=message)
token = content['token']
self.proxy.update_token(token)
def failover(self):
result = False
self.lock = True
method = 'get'
host = self.proxy.host
root = self.proxy.root
for item in self.proxy.hosts:
if item == host:
continue
self.proxy.update_host(item)
LOG.debug('Try to failover path '
'%(root)s to host %(host)s',
{'root': root, 'host': item})
try:
response = self.request(method, root)
except (requests.exceptions.ConnectionError,
requests.exceptions.Timeout) as error:
LOG.debug('Skip unavailable host %(host)s '
'due to error: %(error)s',
{'host': item, 'error': error})
continue
LOG.debug('Failover result: %(code)s %(content)s',
{'code': response.status_code,
'content': response.content})
# pylint: disable=no-member
if response.status_code == requests.codes.ok:
LOG.debug('Successful failover path '
'%(root)s to host %(host)s',
{'root': root, 'host': item})
self.proxy.update_lock()
result = True
break
else:
LOG.debug('Skip unsuitable host %(host)s: '
'there is no %(root)s path found',
{'host': item, 'root': root})
self.lock = False
return result
@staticmethod
def getpath(content, name):
if isinstance(content, dict) and 'links' in content:
for link in content['links']:
if not isinstance(link, dict):
continue
if 'rel' in link and 'href' in link:
if link['rel'] == name:
return link['href']
return None
class NefCollections(object):
subj = 'collection'
root = '/collections'
def __init__(self, proxy):
self.proxy = proxy
def path(self, name):
quoted_name = six.moves.urllib.parse.quote_plus(name)
return posixpath.join(self.root, quoted_name)
def get(self, name, payload=None):
LOG.debug('Get properties of %(subj)s %(name)s: %(payload)s',
{'subj': self.subj, 'name': name, 'payload': payload})
path = self.path(name)
return self.proxy.get(path, payload)
def set(self, name, payload=None):
LOG.debug('Modify properties of %(subj)s %(name)s: %(payload)s',
{'subj': self.subj, 'name': name, 'payload': payload})
path = self.path(name)
return self.proxy.put(path, payload)
def list(self, payload=None):
LOG.debug('List of %(subj)ss: %(payload)s',
{'subj': self.subj, 'payload': payload})
return self.proxy.get(self.root, payload)
def create(self, payload=None):
LOG.debug('Create %(subj)s: %(payload)s',
{'subj': self.subj, 'payload': payload})
try:
return self.proxy.post(self.root, payload)
except NefException as error:
if error.code != 'EEXIST':
raise error
def delete(self, name, payload=None):
LOG.debug('Delete %(subj)s %(name)s: %(payload)s',
{'subj': self.subj, 'name': name, 'payload': payload})
path = self.path(name)
try:
return self.proxy.delete(path, payload)
except NefException as error:
if error.code != 'ENOENT':
raise error
class NefSettings(NefCollections):
subj = 'setting'
root = '/settings/properties'
def create(self, payload=None):
return NotImplemented
def delete(self, name, payload=None):
return NotImplemented
class NefDatasets(NefCollections):
subj = 'dataset'
root = '/storage/datasets'
def rename(self, name, payload=None):
LOG.debug('Rename %(subj)s %(name)s: %(payload)s',
{'subj': self.subj, 'name': name, 'payload': payload})
path = posixpath.join(self.path(name), 'rename')
return self.proxy.post(path, payload)
class NefSnapshots(NefDatasets, NefCollections):
subj = 'snapshot'
root = '/storage/snapshots'
def clone(self, name, payload=None):
LOG.debug('Clone %(subj)s %(name)s: %(payload)s',
{'subj': self.subj, 'name': name, 'payload': payload})
path = posixpath.join(self.path(name), 'clone')
return self.proxy.post(path, payload)
class NefFilesystems(NefDatasets, NefCollections):
subj = 'filesystem'
root = '/storage/filesystems'
def rollback(self, name, payload=None):
LOG.debug('Rollback %(subj)s %(name)s: %(payload)s',
{'subj': self.subj, 'name': name, 'payload': payload})
path = posixpath.join(self.path(name), 'rollback')
return self.proxy.post(path, payload)
def mount(self, name, payload=None):
LOG.debug('Mount %(subj)s %(name)s: %(payload)s',
{'subj': self.subj, 'name': name, 'payload': payload})
path = posixpath.join(self.path(name), 'mount')
return self.proxy.post(path, payload)
def unmount(self, name, payload=None):
LOG.debug('Unmount %(subj)s %(name)s: %(payload)s',
{'subj': self.subj, 'name': name, 'payload': payload})
path = posixpath.join(self.path(name), 'unmount')
return self.proxy.post(path, payload)
def acl(self, name, payload=None):
LOG.debug('Set %(subj)s %(name)s ACL: %(payload)s',
{'subj': self.subj, 'name': name, 'payload': payload})
path = posixpath.join(self.path(name), 'acl')
return self.proxy.post(path, payload)
def promote(self, name, payload=None):
LOG.debug('Promote %(subj)s %(name)s: %(payload)s',
{'subj': self.subj, 'name': name, 'payload': payload})
path = posixpath.join(self.path(name), 'promote')
return self.proxy.post(path, payload)
class NefHpr(NefCollections):
subj = 'HPR service'
root = '/hpr'
def activate(self, payload=None):
LOG.debug('Activate %(payload)s',
{'payload': payload})
path = posixpath.join(self.root, 'activate')
return self.proxy.post(path, payload)
def start(self, name, payload=None):
LOG.debug('Start %(subj)s %(name)s: %(payload)s',
{'subj': self.subj, 'name': name, 'payload': payload})
path = posixpath.join(self.path(name), 'start')
return self.proxy.post(path, payload)
class NefServices(NefCollections):
subj = 'service'
root = '/services'
class NefNfs(NefCollections):
subj = 'NFS'
root = '/nas/nfs'
class NefNetAddresses(NefCollections):
subj = 'network address'
root = '/network/addresses'
class NefProxy(object):
def __init__(self, proto, path, conf):
self.session = requests.Session()
self.settings = NefSettings(self)
self.filesystems = NefFilesystems(self)
self.snapshots = NefSnapshots(self)
self.services = NefServices(self)
self.hpr = NefHpr(self)
self.nfs = NefNfs(self)
self.netaddrs = NefNetAddresses(self)
self.proto = proto
self.path = path
self.lock = None
self.tokens = {}
self.headers = {
'Content-Type': 'application/json',
'X-XSS-Protection': '1'
}
if conf.nexenta_use_https:
self.scheme = 'https'
else:
self.scheme = 'http'
self.username = conf.nexenta_user
self.password = conf.nexenta_password
self.hosts = []
if conf.nexenta_rest_addresses:
for host in conf.nexenta_rest_addresses:
self.hosts.append(host.strip())
self.root = self.filesystems.path(path)
if not self.hosts:
self.hosts.append(conf.nexenta_nas_host)
self.host = self.hosts[0]
if conf.nexenta_rest_port:
self.port = conf.nexenta_rest_port
else:
if conf.nexenta_use_https:
self.port = 8443
else:
self.port = 8080
self.backoff_factor = conf.nexenta_rest_backoff_factor
self.retries = len(self.hosts) * conf.nexenta_rest_retry_count
self.timeout = (
conf.nexenta_rest_connect_timeout, conf.nexenta_rest_read_timeout)
# pylint: disable=no-member
max_retries = requests.packages.urllib3.util.retry.Retry(
total=conf.nexenta_rest_retry_count,
backoff_factor=conf.nexenta_rest_backoff_factor)
adapter = requests.adapters.HTTPAdapter(max_retries=max_retries)
self.session.verify = conf.nexenta_ssl_cert_verify
self.session.headers.update(self.headers)
self.session.mount('%s://' % self.scheme, adapter)
if not conf.nexenta_ssl_cert_verify:
requests.packages.urllib3.disable_warnings()
self.update_lock()
def __getattr__(self, name):
return NefRequest(self, name)
def delete_bearer(self):
if 'Authorization' in self.session.headers:
del self.session.headers['Authorization']
def update_bearer(self, token):
bearer = 'Bearer %s' % token
self.session.headers['Authorization'] = bearer
def update_token(self, token):
self.tokens[self.host] = token
self.update_bearer(token)
def update_host(self, host):
self.host = host
if host in self.tokens:
token = self.tokens[host]
self.update_bearer(token)
def update_lock(self):
prop = self.settings.get('system.guid')
guid = prop.get('value')
path = '%s:%s' % (guid, self.path)
if isinstance(path, six.text_type):
path = path.encode('utf-8')
self.lock = hashlib.md5(path).hexdigest()
def url(self, path):
netloc = '%s:%d' % (self.host, int(self.port))
components = (self.scheme, netloc, str(path), None, None)
url = six.moves.urllib.parse.urlunsplit(components)
return url
def delay(self, attempt):
interval = int(self.backoff_factor * (2 ** (attempt - 1)))
LOG.debug('Waiting for %(interval)s seconds',
{'interval': interval})
greenthread.sleep(interval)

View File

@ -1,4 +1,4 @@
# Copyright 2016 Nexenta Systems, Inc.
# Copyright 2019 Nexenta by DDN, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import posixpath
from oslo_log import log
from oslo_utils import units
@ -24,9 +26,9 @@ from manila.share.drivers.nexenta.ns5 import jsonrpc
from manila.share.drivers.nexenta import options
from manila.share.drivers.nexenta import utils
PATH_DELIMITER = '%2F'
VERSION = '1.0'
VERSION = '1.1'
LOG = log.getLogger(__name__)
ZFS_MULTIPLIER = 1.1 # ZFS quotas do not take metadata into account.
class NexentaNasDriver(driver.ShareDriver):
@ -35,6 +37,12 @@ class NexentaNasDriver(driver.ShareDriver):
Executes commands relating to Shares.
API version history:
1.0 - Initial version.
1.1 - Failover support.
- Unshare filesystem completely after last securityContext
is removed.
- Moved all http/url code to jsonrpc.
- Manage existing support.
- Revert to snapshot support.
"""
driver_prefix = 'nexenta'
@ -56,21 +64,34 @@ class NexentaNasDriver(driver.ShareDriver):
reason=_('Nexenta configuration missing.'))
self.nef = None
self.nef_protocol = self.configuration.nexenta_rest_protocol
self.nef_host = self.configuration.nexenta_host
self.verify_ssl = self.configuration.nexenta_ssl_cert_verify
self.nas_host = self.configuration.nexenta_nas_host
self.nef_port = self.configuration.nexenta_rest_port
self.nef_user = self.configuration.nexenta_user
self.nef_password = self.configuration.nexenta_password
self.pool_name = self.configuration.nexenta_pool
self.fs_prefix = self.configuration.nexenta_nfs_share
self.parent_fs = self.configuration.nexenta_folder
self.storage_protocol = 'NFS'
self.nfs_mount_point_base = self.configuration.nexenta_mount_point_base
self.dataset_compression = (
self.configuration.nexenta_dataset_compression)
self.provisioned_capacity = 0
@property
def storage_protocol(self):
protocol = ''
if self.configuration.nexenta_nfs:
protocol = 'NFS'
else:
msg = _('At least 1 storage protocol must be enabled.')
raise exception.NexentaException(msg)
return protocol
@property
def root_path(self):
return posixpath.join(self.pool_name, self.parent_fs)
@property
def share_backend_name(self):
if not hasattr(self, '_share_backend_name'):
@ -83,196 +104,292 @@ class NexentaNasDriver(driver.ShareDriver):
return self._share_backend_name
def do_setup(self, context):
"""Any initialization the nexenta nas driver does while starting."""
if self.nef_protocol == 'auto':
protocol = 'https'
else:
protocol = self.nef_protocol
self.nef = jsonrpc.NexentaJSONProxy(
protocol, self.nef_host, self.nef_port, self.nef_user,
self.nef_password)
self.nef = jsonrpc.NefProxy(self.storage_protocol,
self.root_path,
self.configuration)
def check_for_setup_error(self):
"""Verify that the volume for our folder exists.
:raise: :py:exc:`LookupError`
"""
url = 'storage/pools/{}'.format(self.pool_name)
if not self.nef.get(url):
raise LookupError(
_("Pool {} does not exist in Nexenta Store appliance").format(
self.pool_name))
url = 'storage/pools/{}/filesystems/{}'.format(self.pool_name,
self.fs_prefix)
if not self.nef.get(url):
raise LookupError(
_("filesystem {} does not exist in Nexenta Store "
"appliance").format(self.fs_prefix))
path = '/'.join((self.pool_name, self.fs_prefix))
shared = False
response = self.nef.get('nas/nfs')
for share in response['data']:
if share.get('filesystem') == path:
shared = True
break
if not shared:
raise LookupError(_(
"Dataset {} is not shared in Nexenta Store appliance").format(
path))
"""Check root filesystem, NFS service and NFS share."""
filesystem = self.nef.filesystems.get(self.root_path)
if filesystem['mountPoint'] == 'none':
message = (_('NFS root filesystem %(path)s is not writable')
% {'path': filesystem['mountPoint']})
raise jsonrpc.NefException(code='ENOENT', message=message)
if not filesystem['isMounted']:
message = (_('NFS root filesystem %(path)s is not mounted')
% {'path': filesystem['mountPoint']})
raise jsonrpc.NefException(code='ENOTDIR', message=message)
payload = {}
if filesystem['nonBlockingMandatoryMode']:
payload['nonBlockingMandatoryMode'] = False
if filesystem['smartCompression']:
payload['smartCompression'] = False
if payload:
self.nef.filesystems.set(self.root_path, payload)
service = self.nef.services.get('nfs')
if service['state'] != 'online':
message = (_('NFS server service is not online: %(state)s')
% {'state': service['state']})
raise jsonrpc.NefException(code='ESRCH', message=message)
self._get_provisioned_capacity()
def _get_provisioned_capacity(self):
path = '%(pool)s/%(fs)s' % {
'pool': self.pool_name, 'fs': self.fs_prefix}
url = 'storage/filesystems?parent=%s' % path
fs_list = self.nef.get(url)['data']
for fs in fs_list:
if fs['path'] != path:
self.provisioned_capacity += fs['quotaSize'] / units.Gi
payload = {'fields': 'referencedQuotaSize'}
self.provisioned_capacity += self.nef.filesystems.get(
self.root_path, payload)['referencedQuotaSize']
def ensure_share(self, context, share, share_server=None):
pass
def create_share(self, context, share, share_server=None):
"""Create a share."""
LOG.debug('Creating share: %s.', share['name'])
data = {
'recordSize': 4 * units.Ki,
LOG.debug('Creating share: %s.', self._get_share_name(share))
dataset_path = self._get_dataset_path(share)
size = int(share['size'] * units.Gi * ZFS_MULTIPLIER)
payload = {
'recordSize': self.configuration.nexenta_dataset_record_size,
'compressionMode': self.dataset_compression,
'name': '/'.join((self.fs_prefix, share['name'])),
'quotaSize': share['size'] * units.Gi,
'path': dataset_path,
'referencedQuotaSize': size,
'nonBlockingMandatoryMode': False
}
if not self.configuration.nexenta_thin_provisioning:
data['reservationSize'] = share['size'] * units.Gi
url = 'storage/pools/{}/filesystems'.format(self.pool_name)
self.nef.post(url, data)
location = {
'path': '{}:/{}/{}/{}'.format(self.nef_host, self.pool_name,
self.fs_prefix, share['name'])
}
payload['referencedReservationSize'] = size
self.nef.filesystems.create(payload)
try:
self._add_permission(share['name'])
except exception.NexentaException:
mount_path = self._mount_filesystem(share)
except jsonrpc.NefException as create_error:
try:
self.delete_share(None, share)
except exception.NexentaException as exc:
LOG.warning(
"Cannot destroy created filesystem: %(vol)s/%(folder)s, "
"exception: %(exc)s",
{'vol': self.pool_name, 'folder': '/'.join(
(self.fs_prefix, share['name'])), 'exc': exc})
raise
payload = {'force': True}
self.nef.filesystems.delete(dataset_path, payload)
except jsonrpc.NefException as delete_error:
LOG.debug('Failed to delete share %(path)s: %(error)s',
{'path': dataset_path, 'error': delete_error})
raise create_error
self.provisioned_capacity += share['size']
location = {
'path': mount_path,
'id': self._get_share_name(share)
}
return [location]
def _mount_filesystem(self, share):
"""Ensure that filesystem is activated and mounted on the host."""
dataset_path = self._get_dataset_path(share)
payload = {'fields': 'mountPoint,isMounted'}
filesystem = self.nef.filesystems.get(dataset_path, payload)
if filesystem['mountPoint'] == 'none':
payload = {'datasetName': dataset_path}
self.nef.hpr.activate(payload)
filesystem = self.nef.filesystems.get(dataset_path, payload)
elif not filesystem['isMounted']:
self.nef.filesystems.mount(dataset_path)
return '%s:%s' % (self.nas_host, filesystem['mountPoint'])
def create_share_from_snapshot(self, context, share, snapshot,
share_server=None):
"""Is called to create share from snapshot."""
LOG.debug('Creating share from snapshot %s.', snapshot['name'])
url = ('storage/pools/%(pool)s/'
'filesystems/%(fs)s/snapshots/%(snap)s/clone') % {
'pool': self.pool_name,
'fs': PATH_DELIMITER.join(
(self.fs_prefix, snapshot['share_name'])),
'snap': snapshot['name']}
location = {
'path': '{}:/{}/{}/{}'.format(self.nef_host, self.pool_name,
self.fs_prefix, share['name'])
}
path = '/'.join((self.pool_name, self.fs_prefix, share['name']))
data = {
'targetPath': path,
'quotaSize': share['size'] * units.Gi,
'recordSize': 4 * units.Ki,
snapshot_path = self._get_snapshot_path(snapshot)
LOG.debug('Creating share from snapshot %s.', snapshot_path)
clone_path = self._get_dataset_path(share)
size = int(share['size'] * units.Gi * ZFS_MULTIPLIER)
payload = {
'targetPath': clone_path,
'referencedQuotaSize': size,
'recordSize': self.configuration.nexenta_dataset_record_size,
'compressionMode': self.dataset_compression,
'nonBlockingMandatoryMode': False
}
if not self.configuration.nexenta_thin_provisioning:
data['reservationSize'] = share['size'] * units.Gi
self.nef.post(url, data)
try:
self._add_permission(share['name'])
except exception.NexentaException:
LOG.exception(
('Failed to add permissions for %s'), share['name'])
try:
self.delete_share(None, share)
except exception.NexentaException:
LOG.warning("Cannot destroy cloned filesystem: "
"%(vol)s/%(filesystem)s",
{'vol': self.pool_name,
'filesystem': '/'.join(
(self.fs_prefix, share['name']))})
raise
payload['referencedReservationSize'] = size
self.nef.snapshots.clone(snapshot_path, payload)
self._remount_filesystem(clone_path)
self.provisioned_capacity += share['size']
try:
mount_path = self._mount_filesystem(share)
except jsonrpc.NefException as create_error:
try:
payload = {'force': True}
self.nef.filesystems.delete(clone_path, payload)
except jsonrpc.NefException as delete_error:
LOG.debug('Failed to delete share %(path)s: %(error)s',
{'path': clone_path, 'error': delete_error})
raise create_error
location = {
'path': mount_path,
'id': self._get_share_name(share)
}
return [location]
def _remount_filesystem(self, clone_path):
"""Workaround for NEF bug: cloned share has offline NFS status"""
self.nef.filesystems.unmount(clone_path)
self.nef.filesystems.mount(clone_path)
def _get_dataset_path(self, share):
share_name = self._get_share_name(share)
return posixpath.join(self.root_path, share_name)
def _get_share_name(self, share):
"""Get share name with share name prefix."""
return ('%(prefix)s%(share_id)s' % {
'prefix': self.configuration.nexenta_share_name_prefix,
'share_id': share['share_id']})
def _get_snapshot_path(self, snapshot):
"""Return ZFS snapshot path for the snapshot."""
snapshot_id = (
snapshot['snapshot_id'] or snapshot['share_group_snapshot_id'])
share = snapshot.get('share') or snapshot.get('share_instance')
fs_path = self._get_dataset_path(share)
return '%s@snapshot-%s' % (fs_path, snapshot_id)
def delete_share(self, context, share, share_server=None):
"""Delete a share."""
LOG.debug('Deleting share: %s.', share['name'])
url = 'storage/pools/%(pool)s/filesystems/%(fs)s' % {
'pool': self.pool_name,
'fs': PATH_DELIMITER.join([self.fs_prefix, share['name']]),
}
self.nef.delete(url)
LOG.debug('Deleting share: %s.', self._get_share_name(share))
share_path = self._get_dataset_path(share)
delete_payload = {'force': True, 'snapshots': True}
try:
self.nef.filesystems.delete(share_path, delete_payload)
except jsonrpc.NefException as error:
if error.code != 'EEXIST':
raise error
snapshots_tree = {}
snapshots_payload = {'parent': share_path, 'fields': 'path'}
snapshots = self.nef.snapshots.list(snapshots_payload)
for snapshot in snapshots:
clones_payload = {'fields': 'clones,creationTxg'}
data = self.nef.snapshots.get(snapshot['path'], clones_payload)
if data['clones']:
snapshots_tree[data['creationTxg']] = data['clones'][0]
if snapshots_tree:
clone_path = snapshots_tree[max(snapshots_tree)]
self.nef.filesystems.promote(clone_path)
self.nef.filesystems.delete(share_path, delete_payload)
self.provisioned_capacity -= share['size']
def extend_share(self, share, new_size, share_server=None):
"""Extends a share."""
LOG.debug(
'Extending share: %(name)s to %(size)sG.', (
{'name': share['name'], 'size': new_size}))
self._set_quota(share['name'], new_size)
{'name': self._get_share_name(share), 'size': new_size}))
self._set_quota(share, new_size)
if not self.configuration.nexenta_thin_provisioning:
self._set_reservation(share, new_size)
self.provisioned_capacity += (new_size - share['size'])
def shrink_share(self, share, new_size, share_server=None):
"""Shrinks size of existing share."""
LOG.debug(
'Shrinking share: %(name)s to %(size)sG.', {
'name': share['name'], 'size': new_size})
url = 'storage/pools/{}/filesystems/{}%2F{}'.format(self.pool_name,
self.fs_prefix,
share['name'])
used = self.nef.get(url)['bytesUsed'] / units.Gi
'name': self._get_share_name(share), 'size': new_size})
share_path = self._get_dataset_path(share)
share_data = self.nef.filesystems.get(share_path)
used = share_data['bytesUsedBySelf'] / units.Gi
if used > new_size:
raise exception.ShareShrinkingPossibleDataLoss(
share_id=share['id'])
self._set_quota(share['name'], new_size)
share_id=self._get_share_name(share))
if not self.configuration.nexenta_thin_provisioning:
self._set_reservation(share, new_size)
self._set_quota(share, new_size)
self.provisioned_capacity += (share['size'] - new_size)
def create_snapshot(self, context, snapshot, share_server=None):
"""Create a snapshot."""
LOG.debug('Creating a snapshot of share: %s.', snapshot['share_name'])
url = 'storage/pools/%(pool)s/filesystems/%(fs)s/snapshots' % {
'pool': self.pool_name,
'fs': PATH_DELIMITER.join(
(self.fs_prefix, snapshot['share_name'])),
}
data = {'name': snapshot['name']}
self.nef.post(url, data)
snapshot_path = self._get_snapshot_path(snapshot)
LOG.debug('Creating snapshot: %s.', snapshot_path)
payload = {'path': snapshot_path}
self.nef.snapshots.create(payload)
def delete_snapshot(self, context, snapshot, share_server=None):
"""Delete a snapshot."""
LOG.debug('Deleting a snapshot: %(shr_name)s@%(snap_name)s.', {
'shr_name': snapshot['share_name'],
'snap_name': snapshot['name']})
"""Deletes a snapshot.
url = ('storage/pools/%(pool)s/filesystems/%(fs)s/snapshots/'
'%(snap)s') % {'pool': self.pool_name,
'fs': PATH_DELIMITER.join(
(self.fs_prefix, snapshot['share_name'])),
'snap': snapshot['name']}
try:
self.nef.delete(url)
except exception.NexentaException as e:
if e.kwargs['code'] == 'ENOENT':
LOG.warning(
'snapshot %(name)s not found, response: %(msg)s', {
'name': snapshot['name'], 'msg': e.msg})
:param snapshot: snapshot reference
"""
snapshot_path = self._get_snapshot_path(snapshot)
LOG.debug('Deleting snapshot: %s.', snapshot_path)
payload = {'defer': True}
self.nef.snapshots.delete(snapshot_path, payload)
def revert_to_snapshot(self, context, snapshot, share_access_rules,
snapshot_access_rules, share_server=None):
"""Reverts a share (in place) to the specified snapshot.
Does not delete the share snapshot. The share and snapshot must both
be 'available' for the restore to be attempted. The snapshot must be
the most recent one taken by Manila; the API layer performs this check
so the driver doesn't have to.
The share must be reverted in place to the contents of the snapshot.
Application admins should quiesce or otherwise prepare the application
for the shared file system contents to change suddenly.
:param context: Current context
:param snapshot: The snapshot to be restored
:param share_access_rules: List of all access rules for the affected
share
:param snapshot_access_rules: List of all access rules for the affected
snapshot
:param share_server: Optional -- Share server model or None
"""
snapshot_path = self._get_snapshot_path(snapshot).split('@')[1]
LOG.debug('Reverting to snapshot: %s.', snapshot_path)
share_path = self._get_dataset_path(snapshot['share'])
payload = {'snapshot': snapshot_path}
self.nef.filesystems.rollback(share_path, payload)
def manage_existing(self, share, driver_options):
"""Brings an existing share under Manila management.
If the provided share is not valid, then raise a
ManageInvalidShare exception, specifying a reason for the failure.
If the provided share is not in a state that can be managed, such as
being replicated on the backend, the driver *MUST* raise
ManageInvalidShare exception with an appropriate message.
The share has a share_type, and the driver can inspect that and
compare against the properties of the referenced backend share.
If they are incompatible, raise a
ManageExistingShareTypeMismatch, specifying a reason for the failure.
:param share: Share model
:param driver_options: Driver-specific options provided by admin.
:return: share_update dictionary with required key 'size',
which should contain size of the share.
"""
LOG.debug('Manage share %s.', self._get_share_name(share))
export_path = share['export_locations'][0]['path']
# check that filesystem with provided export exists.
fs_path = export_path.split(':/')[1]
fs_data = self.nef.filesystems.get(fs_path)
if not fs_data:
# wrong export path, raise exception.
msg = _('Share %s does not exist on Nexenta Store appliance, '
'cannot manage.') % export_path
raise exception.NexentaException(msg)
# get dataset properties.
if fs_data['referencedQuotaSize']:
size = (fs_data['referencedQuotaSize'] / units.Gi) + 1
else:
raise
size = fs_data['bytesReferenced'] / units.Gi + 1
# rename filesystem on appliance to correlate with manila ID.
new_path = '%s/%s' % (self.root_path, self._get_share_name(share))
self.nef.filesystems.rename(fs_path, {'newPath': new_path})
# make sure quotas and reservations are correct.
if not self.configuration.nexenta_thin_provisioning:
self._set_reservation(share, size)
self._set_quota(share, size)
return {'size': size, 'export_locations': [{
'path': '%s:/%s' % (self.nas_host, new_path)
}]}
def update_access(self, context, share, access_rules, add_rules,
delete_rules, share_server=None):
@ -292,68 +409,112 @@ class NexentaNasDriver(driver.ShareDriver):
:param share_server: Data structure with share server information.
Not used by this driver.
"""
LOG.debug('Updating access to share %s.', share)
LOG.debug('Updating access to share %(id)s with following access '
'rules: %(rules)s', {
'id': self._get_share_name(share),
'rules': [(
rule.get('access_type'), rule.get('access_level'),
rule.get('access_to')) for rule in access_rules]})
rw_list = []
ro_list = []
security_contexts = []
update_dict = {}
if share['share_proto'] == 'NFS':
for rule in access_rules:
if rule['access_type'].lower() != 'ip':
msg = _('Only IP access type is supported.')
raise exception.InvalidShareAccess(reason=msg)
msg = _(
'Only IP access control type is supported for NFS.')
LOG.warning(msg)
update_dict[rule['access_id']] = {
'state': 'error',
}
else:
update_dict[rule['access_id']] = {
'state': 'active',
}
if rule['access_level'] == common.ACCESS_LEVEL_RW:
rw_list.append(rule['access_to'])
else:
ro_list.append(rule['access_to'])
self._update_nfs_access(share, rw_list, ro_list)
return update_dict
def _update_nfs_access(self, share, rw_list, ro_list):
# Define allowed security context types to be able to tell whether
# the 'security_contexts' dict contains any rules at all
context_types = {'none', 'root', 'readOnlyList', 'readWriteList'}
security_contexts = {'securityModes': ['sys']}
def add_sc(addr_list, sc_type):
if sc_type not in context_types:
return
rule_list = []
def append_sc(addr_list, sc_type):
for addr in addr_list:
address_mask = addr.strip().split('/', 1)
address = address_mask[0]
ls = [{"allow": True, "etype": "network", "entity": address}]
ls = {"allow": True, "etype": "fqdn", "entity": address}
if len(address_mask) == 2:
try:
mask = int(address_mask[1])
if mask != 32:
ls[0]['mask'] = mask
except Exception:
raise exception.InvalidInput(
reason=_(
'<{}> is not a valid access parameter').format(
addr))
new_sc = {"securityModes": ["sys"]}
new_sc[sc_type] = ls
security_contexts.append(new_sc)
if 0 <= mask < 31:
ls['mask'] = mask
ls['etype'] = 'network'
rule_list.append(ls)
append_sc(rw_list, 'readWriteList')
append_sc(ro_list, 'readOnlyList')
data = {"securityContexts": security_contexts}
url = 'nas/nfs/' + PATH_DELIMITER.join(
(self.pool_name, self.fs_prefix, share['name']))
self.nef.put(url, data)
# Context type with no addresses will result in an API error
if rule_list:
security_contexts[sc_type] = rule_list
def _set_quota(self, share_name, new_size):
quota = new_size * units.Gi
data = {'quotaSize': quota}
if not self.configuration.nexenta_thin_provisioning:
data['reservationSize'] = quota
url = 'storage/pools/{}/filesystems/{}%2F{}'.format(self.pool_name,
self.fs_prefix,
share_name)
self.nef.put(url, data)
add_sc(rw_list, 'readWriteList')
add_sc(ro_list, 'readOnlyList')
payload = {'securityContexts': [security_contexts]}
share_path = self._get_dataset_path(share)
if self.nef.nfs.list({'filesystem': share_path}):
if not set(security_contexts.keys()) & context_types:
self.nef.nfs.delete(share_path)
else:
self.nef.nfs.set(share_path, payload)
else:
payload['filesystem'] = share_path
self.nef.nfs.create(payload)
payload = {
'flags': ['file_inherit', 'dir_inherit'],
'permissions': ['full_set'],
'principal': 'everyone@',
'type': 'allow'
}
self.nef.filesystems.acl(share_path, payload)
def _set_quota(self, share, new_size):
quota = int(new_size * units.Gi * ZFS_MULTIPLIER)
share_path = self._get_dataset_path(share)
payload = {'referencedQuotaSize': quota}
LOG.debug('Setting quota for dataset %s.', share_path)
self.nef.filesystems.set(share_path, payload)
def _set_reservation(self, share, new_size):
res_size = int(new_size * units.Gi * ZFS_MULTIPLIER)
share_path = self._get_dataset_path(share)
payload = {'referencedReservationSize': res_size}
self.nef.filesystems.set(share_path, payload)
def _update_share_stats(self, data=None):
super(NexentaNasDriver, self)._update_share_stats()
total, free, allocated = self._get_capacity_info()
compression = not self.dataset_compression == 'off'
data = {
'vendor_name': 'Nexenta',
'storage_protocol': self.storage_protocol,
'share_backend_name': self.share_backend_name,
'nfs_mount_point_base': self.nfs_mount_point_base,
'driver_version': VERSION,
'snapshot_support': True,
'create_share_from_snapshot_support': True,
'revert_to_snapshot_support': True,
'pools': [{
'pool_name': self.pool_name,
'compression': compression,
'total_capacity_gb': total,
'free_capacity_gb': free,
'reserved_percentage': (
@ -370,53 +531,8 @@ class NexentaNasDriver(driver.ShareDriver):
def _get_capacity_info(self):
"""Calculate available space on the NFS share."""
url = 'storage/pools/{}/filesystems/{}'.format(self.pool_name,
self.fs_prefix)
data = self.nef.get(url)
total = utils.bytes_to_gb(data['bytesAvailable'])
allocated = utils.bytes_to_gb(data['bytesUsed'])
free = total - allocated
data = self.nef.filesystems.get(self.root_path)
free = int(utils.bytes_to_gb(data['bytesAvailable']))
allocated = int(utils.bytes_to_gb(data['bytesUsed']))
total = free + allocated
return total, free, allocated
def _add_permission(self, share_name):
"""Share NFS filesystem on NexentaStor Appliance.
:param share_name: relative filesystem name to be shared
"""
LOG.debug(
'Creating RW ACE for filesystem everyone on Nexenta Store '
'for <%s> filesystem.', share_name)
url = 'storage/pools/{}/filesystems/{}/acl'.format(
self.pool_name, PATH_DELIMITER.join((self.fs_prefix, share_name)))
data = {
"type": "allow",
"principal": "everyone@",
"permissions": [
"list_directory",
"read_data",
"add_file",
"write_data",
"add_subdirectory",
"append_data",
"read_xattr",
"write_xattr",
"execute",
"delete_child",
"read_attributes",
"write_attributes",
"delete",
"read_acl",
"write_acl",
"write_owner",
"synchronize",
],
"flags": [
"file_inherit",
"dir_inherit",
],
}
self.nef.post(url, data)
LOG.debug(
'RW ACE for filesystem <%s> on Nexenta Store has been '
'successfully created.', share_name)

View File

@ -1,4 +1,4 @@
# Copyright 2016 Nexenta Systems, Inc.
# Copyright 2019 Nexenta by DDN, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -23,36 +23,68 @@
from oslo_config import cfg
nexenta_connection_opts = [
cfg.HostAddressOpt('nexenta_host',
help='IP address of Nexenta storage appliance.'),
cfg.ListOpt('nexenta_rest_addresses',
help='One or more comma delimited IP addresses for management '
'communication with NexentaStor appliance.'),
cfg.IntOpt('nexenta_rest_port',
default=8457,
default=8443,
help='Port to connect to Nexenta REST API server.'),
cfg.IntOpt('nexenta_retry_count',
default=6,
help='Number of retries for unsuccessful API calls.'),
cfg.StrOpt('nexenta_rest_protocol',
default='auto',
choices=['http', 'https', 'auto'],
help='Use http or https for REST connection (default auto).'),
cfg.BoolOpt('nexenta_use_https',
default=True,
help='Use HTTP secure protocol for NexentaStor '
'management REST API connections'),
cfg.StrOpt('nexenta_user',
default='admin',
help='User name to connect to Nexenta SA.'),
help='User name to connect to Nexenta SA.',
required=True),
cfg.StrOpt('nexenta_password',
help='Password to connect to Nexenta SA.',
required=True,
secret=True),
cfg.StrOpt('nexenta_volume',
default='volume1',
help='Volume name on NexentaStor.'),
cfg.StrOpt('nexenta_pool',
default='pool1',
required=True,
help='Pool name on NexentaStor.'),
cfg.BoolOpt('nexenta_nfs',
default=True,
help='On if share over NFS is enabled.'),
help='Defines whether share over NFS is enabled.'),
cfg.BoolOpt('nexenta_ssl_cert_verify',
default=False,
help='Defines whether the driver should check ssl cert.'),
cfg.FloatOpt('nexenta_rest_connect_timeout',
default=30,
help='Specifies the time limit (in seconds), within '
'which the connection to NexentaStor management '
'REST API server must be established'),
cfg.FloatOpt('nexenta_rest_read_timeout',
default=300,
help='Specifies the time limit (in seconds), '
'within which NexentaStor management '
'REST API server must send a response'),
cfg.FloatOpt('nexenta_rest_backoff_factor',
default=1,
help='Specifies the backoff factor to apply '
'between connection attempts to NexentaStor '
'management REST API server'),
cfg.IntOpt('nexenta_rest_retry_count',
default=5,
help='Specifies the number of times to repeat NexentaStor '
'management REST API call in case of connection errors '
'and NexentaStor appliance EBUSY or ENOENT errors'),
]
nexenta_nfs_opts = [
cfg.HostAddressOpt('nexenta_nas_host',
deprecated_name='nexenta_host',
help='Data IP address of Nexenta storage appliance.',
required=True),
cfg.StrOpt('nexenta_mount_point_base',
default='$state_path/mnt',
help='Base directory that contains NFS share mount points.'),
@ -61,6 +93,14 @@ nexenta_nfs_opts = [
nexenta_dataset_opts = [
cfg.StrOpt('nexenta_nfs_share',
default='nfs_share',
help='Parent filesystem where all the shares will be created. '
'This parameter is only used by NexentaStor4 driver.'),
cfg.StrOpt('nexenta_share_name_prefix',
help='Nexenta share name prefix.',
default='share-'),
cfg.StrOpt('nexenta_folder',
default='folder',
required=True,
help='Parent folder on NexentaStor.'),
cfg.StrOpt('nexenta_dataset_compression',
default='on',
@ -71,9 +111,14 @@ nexenta_dataset_opts = [
cfg.StrOpt('nexenta_dataset_dedupe',
default='off',
choices=['on', 'off', 'sha256', 'verify', 'sha256, verify'],
help='Deduplication value for new ZFS folders.'),
help='Deduplication value for new ZFS folders. '
'Only used by NexentaStor4 driver.'),
cfg.BoolOpt('nexenta_thin_provisioning',
default=True,
help=('If True shares will not be space guaranteed and '
'overprovisioning will be enabled.')),
cfg.IntOpt('nexenta_dataset_record_size',
default=131072,
help='Specifies a suggested block size in for files in a file '
'system. (bytes)'),
]

View File

@ -1,4 +1,4 @@
# Copyright 2016 Nexenta Systems, Inc.
# Copyright 2019 Nexenta by DDN, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,4 @@
# Copyright 2016 Nexenta Systems, Inc.
# Copyright 2019 Nexenta by DDN, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -19,13 +19,21 @@ from mock import patch
from oslo_utils import units
from manila import context
from manila import exception
from manila.share import configuration as conf
from manila.share.drivers.nexenta.ns5 import jsonrpc
from manila.share.drivers.nexenta.ns5 import nexenta_nas
from manila import test
PATH_TO_RPC = 'manila.share.drivers.nexenta.ns5.jsonrpc.NexentaJSONProxy'
RPC_PATH = 'manila.share.drivers.nexenta.ns5.jsonrpc'
DRV_PATH = 'manila.share.drivers.nexenta.ns5.nexenta_nas.NexentaNasDriver'
DRIVER_VERSION = '1.1'
SHARE = {'share_id': 'uuid', 'size': 1, 'share_proto': 'NFS'}
SHARE_PATH = 'pool1/nfs_share/share-uuid'
SHARE2 = {'share_id': 'uuid2', 'size': 2, 'share_proto': 'NFS'}
SHARE2_PATH = 'pool1/nfs_share/share-uuid2'
SNAPSHOT = {
'snapshot_id': 'snap_id',
'share': SHARE,
'snapshot_path': '%s@%s' % (SHARE_PATH, 'snapshot-snap_id')}
@ddt.ddt
@ -34,22 +42,27 @@ class TestNexentaNasDriver(test.TestCase):
def setUp(self):
def _safe_get(opt):
return getattr(self.cfg, opt)
self.cfg = conf.Configuration(None)
self.cfg.nexenta_host = '1.1.1.1'
super(TestNexentaNasDriver, self).setUp()
self.ctx = context.get_admin_context()
self.cfg = mock.Mock()
self.mock_object(
self.cfg, 'safe_get', mock.Mock(side_effect=_safe_get))
super(TestNexentaNasDriver, self).setUp()
self.cfg.nexenta_nas_host = '1.1.1.1'
self.cfg.nexenta_rest_addresses = ['2.2.2.2']
self.ctx = context.get_admin_context()
self.cfg.nexenta_rest_port = 8080
self.cfg.nexenta_rest_protocol = 'auto'
self.cfg.nexenta_pool = 'pool1'
self.cfg.nexenta_dataset_record_size = 131072
self.cfg.reserved_share_percentage = 0
self.cfg.nexenta_nfs_share = 'nfs_share'
self.cfg.nexenta_folder = 'nfs_share'
self.cfg.nexenta_user = 'user'
self.cfg.share_backend_name = 'NexentaStor5'
self.cfg.nexenta_password = 'password'
self.cfg.nexenta_thin_provisioning = False
self.cfg.nexenta_mount_point_base = 'mnt'
self.cfg.nexenta_rest_retry_count = 3
self.cfg.nexenta_share_name_prefix = 'share-'
self.cfg.max_over_subscription_ratio = 20.0
self.cfg.enabled_share_protocols = 'NFS'
self.cfg.nexenta_mount_point_base = '$state_path/mnt'
self.cfg.nexenta_dataset_compression = 'on'
@ -57,293 +70,304 @@ class TestNexentaNasDriver(test.TestCase):
self.cfg.admin_network_config_group = (
'fake_admin_network_config_group')
self.cfg.driver_handles_share_servers = False
self.cfg.safe_get = self.fake_safe_get
self.nef_mock = mock.Mock()
self.mock_object(jsonrpc, 'NefRequest')
self.drv = nexenta_nas.NexentaNasDriver(configuration=self.cfg)
self.drv.do_setup(self.ctx)
self.mock_rpc = self.mock_class(PATH_TO_RPC)
self.pool_name = self.cfg.nexenta_pool
self.fs_prefix = self.cfg.nexenta_nfs_share
def fake_safe_get(self, key):
try:
value = getattr(self.cfg, key)
except AttributeError:
value = None
return value
def test_backend_name(self):
self.assertEqual('NexentaStor5', self.drv.share_backend_name)
@patch('%s._get_provisioned_capacity' % DRV_PATH)
def test_check_for_setup_error(self, mock_provisioned):
self.drv.nef.get.return_value = None
self.assertRaises(LookupError, self.drv.check_for_setup_error)
@patch('%s._get_provisioned_capacity' % DRV_PATH)
def test_check_for_setup_error__none(self, mock_provisioned):
self.drv.nef.get.return_value = {
'data': [{'filesystem': 'pool1/nfs_share', 'quotaSize': 1}]
@mock.patch('%s._get_provisioned_capacity' % DRV_PATH)
@mock.patch('manila.share.drivers.nexenta.ns5.'
'jsonrpc.NefServices.get')
@mock.patch('manila.share.drivers.nexenta.ns5.'
'jsonrpc.NefFilesystems.set')
@mock.patch('manila.share.drivers.nexenta.ns5.'
'jsonrpc.NefFilesystems.get')
def test_check_for_setup_error(self, get_filesystem, set_filesystem,
get_service, prov_capacity):
prov_capacity.return_value = 1
get_filesystem.return_value = {
'mountPoint': '/path/to/volume',
'nonBlockingMandatoryMode': False,
'smartCompression': False,
'isMounted': True
}
get_service.return_value = {
'state': 'online'
}
self.assertIsNone(self.drv.check_for_setup_error())
get_filesystem.assert_called_with(self.drv.root_path)
set_filesystem.assert_not_called()
get_service.assert_called_with('nfs')
get_filesystem.return_value = {
'mountPoint': '/path/to/volume',
'nonBlockingMandatoryMode': True,
'smartCompression': True,
'isMounted': True
}
set_filesystem.return_value = {}
payload = {
'nonBlockingMandatoryMode': False,
'smartCompression': False
}
self.assertIsNone(self.drv.check_for_setup_error())
get_filesystem.assert_called_with(self.drv.root_path)
set_filesystem.assert_called_with(self.drv.root_path, payload)
get_service.assert_called_with('nfs')
get_filesystem.return_value = {
'mountPoint': '/path/to/volume',
'nonBlockingMandatoryMode': False,
'smartCompression': True,
'isMounted': True
}
payload = {
'smartCompression': False
}
set_filesystem.return_value = {}
self.assertIsNone(self.drv.check_for_setup_error())
get_filesystem.assert_called_with(self.drv.root_path)
set_filesystem.assert_called_with(self.drv.root_path, payload)
get_service.assert_called_with('nfs')
get_filesystem.return_value = {
'mountPoint': '/path/to/volume',
'nonBlockingMandatoryMode': True,
'smartCompression': False,
'isMounted': True
}
payload = {
'nonBlockingMandatoryMode': False
}
set_filesystem.return_value = {}
self.assertIsNone(self.drv.check_for_setup_error())
get_filesystem.assert_called_with(self.drv.root_path)
set_filesystem.assert_called_with(self.drv.root_path, payload)
get_service.assert_called_with('nfs')
get_filesystem.return_value = {
'mountPoint': 'none',
'nonBlockingMandatoryMode': False,
'smartCompression': False,
'isMounted': False
}
self.assertRaises(jsonrpc.NefException,
self.drv.check_for_setup_error)
get_filesystem.return_value = {
'mountPoint': '/path/to/volume',
'nonBlockingMandatoryMode': False,
'smartCompression': False,
'isMounted': False
}
self.assertRaises(jsonrpc.NefException,
self.drv.check_for_setup_error)
get_service.return_value = {
'state': 'online'
}
self.assertRaises(jsonrpc.NefException,
self.drv.check_for_setup_error)
@patch('%s._get_provisioned_capacity' % DRV_PATH)
def test_check_for_setup_error__with_data(self, mock_provisioned):
self.drv.nef.get.return_value = {
'data': [{'filesystem': 'asd', 'quotaSize': 1}]}
self.assertRaises(LookupError, self.drv.check_for_setup_error)
def test__get_provisioned_capacity(self):
self.drv.nef.get.return_value = {
'data': [
{'path': 'pool1/nfs_share/123', 'quotaSize': 1 * units.Gi}]
@patch('%s.NefFilesystems.get' % RPC_PATH)
def test__get_provisioned_capacity(self, fs_get):
fs_get.return_value = {
'path': 'pool1/nfs_share/123',
'referencedQuotaSize': 1 * units.Gi
}
self.drv._get_provisioned_capacity()
self.assertEqual(1, self.drv.provisioned_capacity)
def test_create_share(self):
share = {'name': 'share', 'size': 1}
self.assertEqual(1 * units.Gi, self.drv.provisioned_capacity)
@patch('%s._mount_filesystem' % DRV_PATH)
@patch('%s.NefFilesystems.create' % RPC_PATH)
@patch('%s.NefFilesystems.delete' % RPC_PATH)
def test_create_share(self, delete_fs, create_fs, mount_fs):
mount_path = '%s:/%s' % (self.cfg.nexenta_nas_host, SHARE_PATH)
mount_fs.return_value = mount_path
size = int(1 * units.Gi * 1.1)
self.assertEqual(
[{
'path': '{}:/{}/{}/{}'.format(
self.cfg.nexenta_host, self.pool_name,
self.fs_prefix, share['name'])
'path': mount_path,
'id': 'share-uuid'
}],
self.drv.create_share(self.ctx, share))
self.drv.create_share(self.ctx, SHARE))
@patch('%s.delete_share' % DRV_PATH)
@patch('%s._add_permission' % DRV_PATH)
def test_create_share__error_on_add_permission(
self, add_permission_mock, delete_share):
share = {'name': 'share', 'size': 1}
add_permission_mock.side_effect = exception.NexentaException(
'An error occurred while adding permission')
delete_share.side_effect = exception.NexentaException(
'An error occurred while deleting')
self.assertRaises(
exception.NexentaException, self.drv.create_share, self.ctx, share)
def test_create_share_from_snapshot(self):
share = {'name': 'share', 'size': 1}
snapshot = {'name': 'share@first', 'share_name': 'share'}
self.assertEqual(
[{
'path': '{}:/{}/{}/{}'.format(
self.cfg.nexenta_host, self.pool_name,
self.fs_prefix, share['name'])
}],
self.drv.create_share_from_snapshot(self.ctx, share, snapshot)
)
@patch('%s.delete_share' % DRV_PATH)
@patch('%s._add_permission' % DRV_PATH)
def test_create_share_from_snapshot__add_permission_error(
self, add_permission_mock, delete_share):
share = {'name': 'share', 'size': 1}
snapshot = {'share_name': 'share', 'name': 'share@first'}
delete_share.side_effect = exception.NexentaException(
'An error occurred while deleting')
add_permission_mock.side_effect = exception.NexentaException(
'Some exception')
self.assertRaises(
exception.NexentaException, self.drv.create_share_from_snapshot,
self.ctx, share, snapshot)
@patch('%s._add_permission' % DRV_PATH)
def test_create_share_from_snapshot__add_permission_error_error(
self, add_permission_mock):
share = {'name': 'share', 'size': 1}
snapshot = {'share_name': 'share', 'name': 'share@first'}
add_permission_mock.side_effect = exception.NexentaException(
'Some exception')
self.drv.nef.delete.side_effect = exception.NexentaException(
'Some exception 2')
self.assertRaises(
exception.NexentaException, self.drv.create_share_from_snapshot,
self.ctx, share, snapshot)
def test_delete_share(self):
share = {'name': 'share', 'size': 1}
self.assertIsNone(self.drv.delete_share(self.ctx, share))
def test_extend_share(self):
share = {'name': 'share', 'size': 1}
new_size = 2
quota = new_size * units.Gi
data = {
'reservationSize': quota,
'quotaSize': quota,
payload = {
'recordSize': 131072,
'compressionMode': self.cfg.nexenta_dataset_compression,
'path': SHARE_PATH,
'referencedQuotaSize': size,
'nonBlockingMandatoryMode': False,
'referencedReservationSize': size
}
url = 'storage/pools/{}/filesystems/{}%2F{}'.format(
self.pool_name, self.fs_prefix, share['name'])
self.drv.nef.filesystems.create.assert_called_with(payload)
self.drv.extend_share(share, new_size)
mount_fs.side_effect = jsonrpc.NefException('some error')
self.assertRaises(jsonrpc.NefException,
self.drv.create_share, self.ctx, SHARE)
delete_payload = {'force': True}
self.drv.nef.filesystems.delete.assert_called_with(
SHARE_PATH, delete_payload)
self.drv.nef.post.assert_called_with(url, data)
@patch('%s.NefFilesystems.promote' % RPC_PATH)
@patch('%s.NefSnapshots.get' % RPC_PATH)
@patch('%s.NefSnapshots.list' % RPC_PATH)
@patch('%s.NefFilesystems.delete' % RPC_PATH)
def test_delete_share(self, fs_delete, snap_list, snap_get, fs_promote):
delete_payload = {'force': True, 'snapshots': True}
snapshots_payload = {'parent': SHARE_PATH, 'fields': 'path'}
clones_payload = {'fields': 'clones,creationTxg'}
clone_path = '%s:/%s' % (self.cfg.nexenta_nas_host, 'path_to_fs')
fs_delete.side_effect = [
jsonrpc.NefException({
'message': 'some_error',
'code': 'EEXIST'}),
None]
snap_list.return_value = [{'path': '%s@snap1' % SHARE_PATH}]
snap_get.return_value = {'clones': [clone_path], 'creationTxg': 1}
self.assertIsNone(self.drv.delete_share(self.ctx, SHARE))
fs_delete.assert_called_with(SHARE_PATH, delete_payload)
fs_promote.assert_called_with(clone_path)
snap_get.assert_called_with('%s@snap1' % SHARE_PATH, clones_payload)
snap_list.assert_called_with(snapshots_payload)
def test_shrink_share(self):
share = {'name': 'share', 'size': 2}
new_size = 1
quota = new_size * units.Gi
data = {
'reservationSize': quota,
'quotaSize': quota
@patch('%s.NefFilesystems.mount' % RPC_PATH)
@patch('%s.NefFilesystems.get' % RPC_PATH)
def test_mount_filesystem(self, fs_get, fs_mount):
mount_path = '%s:/%s' % (self.cfg.nexenta_nas_host, SHARE_PATH)
fs_get.return_value = {
'mountPoint': '/%s' % SHARE_PATH, 'isMounted': False}
self.assertEqual(mount_path, self.drv._mount_filesystem(SHARE))
self.drv.nef.filesystems.mount.assert_called_with(SHARE_PATH)
@patch('%s.NefHpr.activate' % RPC_PATH)
@patch('%s.NefFilesystems.mount' % RPC_PATH)
@patch('%s.NefFilesystems.get' % RPC_PATH)
def test_mount_filesystem_with_activate(
self, fs_get, fs_mount, hpr_activate):
mount_path = '%s:/%s' % (self.cfg.nexenta_nas_host, SHARE_PATH)
fs_get.side_effect = [
{'mountPoint': 'none', 'isMounted': False},
{'mountPoint': '/%s' % SHARE_PATH, 'isMounted': False}]
self.assertEqual(mount_path, self.drv._mount_filesystem(SHARE))
payload = {'datasetName': SHARE_PATH}
self.drv.nef.hpr.activate.assert_called_once_with(payload)
@patch('%s.NefFilesystems.mount' % RPC_PATH)
@patch('%s.NefFilesystems.unmount' % RPC_PATH)
def test_remount_filesystem(self, fs_unmount, fs_mount):
self.drv._remount_filesystem(SHARE_PATH)
fs_unmount.assert_called_once_with(SHARE_PATH)
fs_mount.assert_called_once_with(SHARE_PATH)
def parse_fqdn(self, fqdn):
address_mask = fqdn.strip().split('/', 1)
address = address_mask[0]
ls = {"allow": True, "etype": "fqdn", "entity": address}
if len(address_mask) == 2:
ls['mask'] = address_mask[1]
ls['etype'] = 'network'
return ls
@ddt.data({'key': 'value'}, {})
@patch('%s.NefNfs.list' % RPC_PATH)
@patch('%s.NefNfs.set' % RPC_PATH)
@patch('%s.NefFilesystems.acl' % RPC_PATH)
def test_update_nfs_access(self, acl, nfs_set, nfs_list, list_data):
security_contexts = {'securityModes': ['sys']}
nfs_list.return_value = list_data
rw_list = ['1.1.1.1/24', '2.2.2.2']
ro_list = ['3.3.3.3', '4.4.4.4/30']
security_contexts['readWriteList'] = []
security_contexts['readOnlyList'] = []
for fqdn in rw_list:
ls = self.parse_fqdn(fqdn)
if ls.get('mask'):
ls['mask'] = int(ls['mask'])
security_contexts['readWriteList'].append(ls)
for fqdn in ro_list:
ls = self.parse_fqdn(fqdn)
if ls.get('mask'):
ls['mask'] = int(ls['mask'])
security_contexts['readOnlyList'].append(ls)
self.assertIsNone(self.drv._update_nfs_access(SHARE, rw_list, ro_list))
payload = {
'flags': ['file_inherit', 'dir_inherit'],
'permissions': ['full_set'],
'principal': 'everyone@',
'type': 'allow'
}
url = 'storage/pools/{}/filesystems/{}%2F{}'.format(
self.pool_name, self.fs_prefix, share['name'])
self.drv.nef.get.return_value = {'bytesUsed': 512}
self.drv.shrink_share(share, new_size)
self.drv.nef.post.assert_called_with(url, data)
def test_create_snapshot(self):
snapshot = {'share_name': 'share', 'name': 'share@first'}
url = 'storage/pools/%(pool)s/filesystems/%(fs)s/snapshots' % {
'pool': self.pool_name,
'fs': nexenta_nas.PATH_DELIMITER.join(
[self.fs_prefix, snapshot['share_name']])
}
data = {'name': snapshot['name']}
self.drv.create_snapshot(self.ctx, snapshot)
self.drv.nef.post.assert_called_with(url, data)
def test_delete_snapshot(self):
self.mock_rpc.side_effect = exception.NexentaException(
'err', code='ENOENT')
snapshot = {'share_name': 'share', 'name': 'share@first'}
self.assertIsNone(self.drv.delete_snapshot(self.ctx, snapshot))
self.mock_rpc.side_effect = exception.NexentaException(
'err', code='somecode')
self.assertRaises(
exception.NexentaException, self.drv.delete_snapshot,
self.ctx, snapshot)
def build_access_security_context(self, level, ip, mask=None):
ls = [{"allow": True, "etype": "network", "entity": ip}]
if mask is not None:
ls[0]['mask'] = mask
new_sc = {
"securityModes": ["sys"],
}
if level == 'rw':
new_sc['readWriteList'] = ls
elif level == 'ro':
new_sc['readOnlyList'] = ls
self.drv.nef.filesystems.acl.assert_called_with(SHARE_PATH, payload)
payload = {'securityContexts': [security_contexts]}
if list_data:
self.drv.nef.nfs.set.assert_called_with(SHARE_PATH, payload)
else:
raise exception.ManilaException('Wrong access level')
return new_sc
payload['filesystem'] = SHARE_PATH
self.drv.nef.nfs.create.assert_called_with(payload)
def test_update_access__unsupported_access_type(self):
share = {'name': 'share', 'size': 1}
access = {
'access_type': 'group',
'access_to': 'ordinary_users',
'access_level': 'rw'
}
def test_update_nfs_access_bad_mask(self):
security_contexts = {'securityModes': ['sys']}
rw_list = ['1.1.1.1/24', '2.2.2.2/1a']
ro_list = ['3.3.3.3', '4.4.4.4/30']
security_contexts['readWriteList'] = []
security_contexts['readOnlyList'] = []
for fqdn in rw_list:
security_contexts['readWriteList'].append(self.parse_fqdn(fqdn))
for fqdn in ro_list:
security_contexts['readOnlyList'].append(self.parse_fqdn(fqdn))
self.assertRaises(exception.InvalidShareAccess, self.drv.update_access,
self.ctx, share, [access], None, None)
self.assertRaises(ValueError, self.drv._update_nfs_access,
SHARE, rw_list, ro_list)
def test_update_access__cidr(self):
share = {'name': 'share', 'size': 1}
access = {
'access_type': 'ip',
'access_to': '1.1.1.1/24',
'access_level': 'rw'
}
url = 'nas/nfs/' + nexenta_nas.PATH_DELIMITER.join(
(self.pool_name, self.fs_prefix, share['name']))
self.drv.nef.get.return_value = {}
self.drv.update_access(self.ctx, share, [access], None, None)
self.drv.nef.put.assert_called_with(
url, {'securityContexts': [
self.build_access_security_context('rw', '1.1.1.1', 24)]})
def test_update_access__ip(self):
share = {'name': 'share', 'size': 1}
@patch('%s._update_nfs_access' % DRV_PATH)
def test_update_access__ip_rw(self, update_nfs_access):
access = {
'access_type': 'ip',
'access_to': '1.1.1.1',
'access_level': 'rw'
'access_level': 'rw',
'access_id': 'fake_id'
}
url = 'nas/nfs/' + nexenta_nas.PATH_DELIMITER.join(
(self.pool_name, self.fs_prefix, share['name']))
self.drv.nef.get.return_value = {}
self.drv.update_access(self.ctx, share, [access], None, None)
self.assertEqual(
{'fake_id': {'state': 'active'}},
self.drv.update_access(
self.ctx, SHARE, [access], None, None))
self.drv._update_nfs_access.assert_called_with(SHARE, ['1.1.1.1'], [])
self.drv.nef.put.assert_called_with(
url, {'securityContexts': [
self.build_access_security_context('rw', '1.1.1.1')]})
@ddt.data('rw', 'ro')
def test_update_access__cidr_wrong_mask(self, access_level):
share = {'name': 'share', 'size': 1}
@patch('%s._update_nfs_access' % DRV_PATH)
def test_update_access__ip_ro(self, update_nfs_access):
access = {
'access_type': 'ip',
'access_to': '1.1.1.1/aa',
'access_to': '1.1.1.1',
'access_level': 'ro',
'access_id': 'fake_id'
}
expected = {'fake_id': {'state': 'active'}}
self.assertEqual(
expected, self.drv.update_access(
self.ctx, SHARE, [access], None, None))
self.drv._update_nfs_access.assert_called_with(SHARE, [], ['1.1.1.1'])
@ddt.data('rw', 'ro')
def test_update_access__not_ip(self, access_level):
access = {
'access_type': 'username',
'access_to': 'some_user',
'access_level': access_level,
'access_id': 'fake_id'
}
self.assertRaises(exception.InvalidInput, self.drv.update_access,
self.ctx, share, [access], None, None)
def test_update_access__one_ip_ro_add_rule_to_existing(self):
share = {'name': 'share', 'size': 1}
access = [
{
'access_type': 'ip',
'access_to': '5.5.5.5',
'access_level': 'ro'
},
{
'access_type': 'ip',
'access_to': '1.1.1.1/24',
'access_level': 'rw'
}
]
url = 'nas/nfs/' + nexenta_nas.PATH_DELIMITER.join(
(self.pool_name, self.fs_prefix, share['name']))
sc = self.build_access_security_context('rw', '1.1.1.1', 24)
self.drv.nef.get.return_value = {'securityContexts': [sc]}
self.drv.update_access(self.ctx, share, access, None, None)
self.drv.nef.put.assert_called_with(
url, {'securityContexts': [
sc, self.build_access_security_context('ro', '5.5.5.5')]})
def test_update_access__one_ip_ro_add_rule_to_existing_wrong_mask(
self):
share = {'name': 'share', 'size': 1}
access = [
{
'access_type': 'ip',
'access_to': '5.5.5.5/aa',
'access_level': 'ro'
},
{
'access_type': 'ip',
'access_to': '1.1.1.1/24',
'access_level': 'rw'
}
]
sc = self.build_access_security_context('rw', '1.1.1.1', 24)
self.drv.nef.get.return_value = {'securityContexts': [sc]}
self.assertRaises(exception.InvalidInput, self.drv.update_access,
self.ctx, share, access, None, None)
expected = {'fake_id': {'state': 'error'}}
self.assertEqual(expected, self.drv.update_access(
self.ctx, SHARE, [access], None, None))
@patch('%s._get_capacity_info' % DRV_PATH)
@patch('manila.share.driver.ShareDriver._update_share_stats')
@ -353,9 +377,13 @@ class TestNexentaNasDriver(test.TestCase):
'vendor_name': 'Nexenta',
'storage_protocol': 'NFS',
'nfs_mount_point_base': self.cfg.nexenta_mount_point_base,
'driver_version': '1.0',
'create_share_from_snapshot_support': True,
'revert_to_snapshot_support': True,
'snapshot_support': True,
'driver_version': DRIVER_VERSION,
'share_backend_name': self.cfg.share_backend_name,
'pools': [{
'compression': True,
'pool_name': 'pool1',
'total_capacity_gb': 100,
'free_capacity_gb': 90,
@ -373,6 +401,134 @@ class TestNexentaNasDriver(test.TestCase):
def test_get_capacity_info(self):
self.drv.nef.get.return_value = {
'bytesAvailable': 10 * units.Gi, 'bytesUsed': 1 * units.Gi}
'bytesAvailable': 9 * units.Gi, 'bytesUsed': 1 * units.Gi}
self.assertEqual((10, 9, 1), self.drv._get_capacity_info())
@patch('%s._set_reservation' % DRV_PATH)
@patch('%s._set_quota' % DRV_PATH)
@patch('%s.NefFilesystems.rename' % RPC_PATH)
@patch('%s.NefFilesystems.get' % RPC_PATH)
def test_manage_existing(self, fs_get, fs_rename, set_res, set_quota):
fs_get.return_value = {'referencedQuotaSize': 1073741824}
old_path = '%s:/%s' % (self.cfg.nexenta_nas_host, 'path_to_fs')
new_path = '%s:/%s' % (self.cfg.nexenta_nas_host, SHARE_PATH)
SHARE['export_locations'] = [{'path': old_path}]
expected = {'size': 2, 'export_locations': [{
'path': new_path
}]}
self.assertEqual(expected, self.drv.manage_existing(SHARE, None))
fs_rename.assert_called_with('path_to_fs', {'newPath': SHARE_PATH})
set_res.assert_called_with(SHARE, 2)
set_quota.assert_called_with(SHARE, 2)
@patch('%s.NefSnapshots.create' % RPC_PATH)
def test_create_snapshot(self, snap_create):
self.assertIsNone(self.drv.create_snapshot(self.ctx, SNAPSHOT))
snap_create.assert_called_once_with({
'path': SNAPSHOT['snapshot_path']})
@patch('%s.NefSnapshots.delete' % RPC_PATH)
def test_delete_snapshot(self, snap_delete):
self.assertIsNone(self.drv.delete_snapshot(self.ctx, SNAPSHOT))
payload = {'defer': True}
snap_delete.assert_called_once_with(
SNAPSHOT['snapshot_path'], payload)
@patch('%s._mount_filesystem' % DRV_PATH)
@patch('%s._remount_filesystem' % DRV_PATH)
@patch('%s.NefFilesystems.delete' % RPC_PATH)
@patch('%s.NefSnapshots.clone' % RPC_PATH)
def test_create_share_from_snapshot(
self, snap_clone, fs_delete, remount_fs, mount_fs):
mount_fs.return_value = 'mount_path'
location = {
'path': 'mount_path',
'id': 'share-uuid2'
}
self.assertEqual([location], self.drv.create_share_from_snapshot(
self.ctx, SHARE2, SNAPSHOT))
size = int(SHARE2['size'] * units.Gi * 1.1)
payload = {
'targetPath': SHARE2_PATH,
'referencedQuotaSize': size,
'recordSize': self.cfg.nexenta_dataset_record_size,
'compressionMode': self.cfg.nexenta_dataset_compression,
'nonBlockingMandatoryMode': False,
'referencedReservationSize': size
}
snap_clone.assert_called_once_with(SNAPSHOT['snapshot_path'], payload)
@patch('%s._mount_filesystem' % DRV_PATH)
@patch('%s._remount_filesystem' % DRV_PATH)
@patch('%s.NefFilesystems.delete' % RPC_PATH)
@patch('%s.NefSnapshots.clone' % RPC_PATH)
def test_create_share_from_snapshot_error(
self, snap_clone, fs_delete, remount_fs, mount_fs):
fs_delete.side_effect = jsonrpc.NefException('delete error')
mount_fs.side_effect = jsonrpc.NefException('create error')
self.assertRaises(
jsonrpc.NefException,
self.drv.create_share_from_snapshot, self.ctx, SHARE2, SNAPSHOT)
size = int(SHARE2['size'] * units.Gi * 1.1)
payload = {
'targetPath': SHARE2_PATH,
'referencedQuotaSize': size,
'recordSize': self.cfg.nexenta_dataset_record_size,
'compressionMode': self.cfg.nexenta_dataset_compression,
'nonBlockingMandatoryMode': False,
'referencedReservationSize': size
}
snap_clone.assert_called_once_with(SNAPSHOT['snapshot_path'], payload)
payload = {'force': True}
fs_delete.assert_called_once_with(SHARE2_PATH, payload)
@patch('%s.NefFilesystems.rollback' % RPC_PATH)
def test_revert_to_snapshot(self, fs_rollback):
self.assertIsNone(self.drv.revert_to_snapshot(
self.ctx, SNAPSHOT, [], []))
payload = {'snapshot': 'snapshot-snap_id'}
fs_rollback.assert_called_once_with(
SHARE_PATH, payload)
@patch('%s._set_reservation' % DRV_PATH)
@patch('%s._set_quota' % DRV_PATH)
def test_extend_share(self, set_quota, set_reservation):
self.assertIsNone(self.drv.extend_share(
SHARE, 2))
set_quota.assert_called_once_with(
SHARE, 2)
set_reservation.assert_called_once_with(
SHARE, 2)
@patch('%s.NefFilesystems.get' % RPC_PATH)
@patch('%s._set_reservation' % DRV_PATH)
@patch('%s._set_quota' % DRV_PATH)
def test_shrink_share(self, set_quota, set_reservation, fs_get):
fs_get.return_value = {
'bytesUsedBySelf': 0.5 * units.Gi
}
self.assertIsNone(self.drv.shrink_share(
SHARE2, 1))
set_quota.assert_called_once_with(
SHARE2, 1)
set_reservation.assert_called_once_with(
SHARE2, 1)
@patch('%s.NefFilesystems.set' % RPC_PATH)
def test_set_quota(self, fs_set):
quota = int(2 * units.Gi * 1.1)
payload = {'referencedQuotaSize': quota}
self.assertIsNone(self.drv._set_quota(
SHARE, 2))
fs_set.assert_called_once_with(SHARE_PATH, payload)
@patch('%s.NefFilesystems.set' % RPC_PATH)
def test_set_reservation(self, fs_set):
reservation = int(2 * units.Gi * 1.1)
payload = {'referencedReservationSize': reservation}
self.assertIsNone(self.drv._set_reservation(
SHARE, 2))
fs_set.assert_called_once_with(SHARE_PATH, payload)

View File

@ -0,0 +1,23 @@
---
features:
- Added revert to snapshot support for NexentaStor5 driver.
- Added manage existing support for NexentaStor5 driver.
upgrade:
- Added a new config option ``nexenta_ssl_cert_verify``.
This option defines whether the NexentaStor5 driver should check
ssl certificate.
- Added a new config option ``nexenta_rest_connect_timeout``. This option
specifies the time limit (in seconds), within which the connection to
NexentaStor management REST API server must be established.
- Added a new config option ``nexenta_rest_read_timeout``. This option
specifies the time limit (in seconds), within which NexentaStor
management REST API server must send a response.
- Added a new config option ``nexenta_rest_backoff_factor``. This option
specifies the backoff factor to apply between connection attempts to
NexentaStor management REST API server.
- Added a new config option ``nexenta_rest_retry_count``. This option
specifies the number of times to repeat NexentaStor management REST
API call in case of connection errors and NexentaStor appliance EBUSY
or ENOENT errors.
- Added a new config option ``nexenta_dataset_record_size``. This option
specifies a suggested block size in for files in a filesystem'