Merge "Add access-rules tests to improve the coverage"

This commit is contained in:
Jenkins 2017-01-31 11:17:39 +00:00 committed by Gerrit Code Review
commit 1a2a1c8cac
5 changed files with 196 additions and 67 deletions

View File

@ -495,6 +495,7 @@ class BaseSharesTest(test.BaseTestCase):
data = [] data = []
for d in share_data_list: for d in share_data_list:
client = d["kwargs"].pop("client", cls.shares_v2_client) client = d["kwargs"].pop("client", cls.shares_v2_client)
wait_for_status = d["kwargs"].pop("wait_for_status", True)
local_d = { local_d = {
"args": d["args"], "args": d["args"],
"kwargs": copy.deepcopy(d["kwargs"]), "kwargs": copy.deepcopy(d["kwargs"]),
@ -504,10 +505,13 @@ class BaseSharesTest(test.BaseTestCase):
*local_d["args"], **local_d["kwargs"]) *local_d["args"], **local_d["kwargs"])
local_d["cnt"] = 0 local_d["cnt"] = 0
local_d["available"] = False local_d["available"] = False
local_d["wait_for_status"] = wait_for_status
data.append(local_d) data.append(local_d)
while not all(d["available"] for d in data): while not all(d["available"] for d in data):
for d in data: for d in data:
if not d["wait_for_status"]:
d["available"] = True
if d["available"]: if d["available"]:
continue continue
client = d["kwargs"]["client"] client = d["kwargs"]["client"]
@ -704,6 +708,33 @@ class BaseSharesTest(test.BaseTestCase):
status_attr="replica_state") status_attr="replica_state")
return replica return replica
def _get_access_rule_data_from_config(self):
"""Get the first available access type/to combination from config.
This method opportunistically picks the first configured protocol
to create the share. Do not use this method in tests where you need
to test depth and breadth in the access types and access recipients.
"""
protocol = self.shares_v2_client.share_protocol
if protocol in CONF.share.enable_ip_rules_for_protocols:
access_type = "ip"
access_to = utils.rand_ip()
elif protocol in CONF.share.enable_user_rules_for_protocols:
access_type = "user"
access_to = CONF.share.username_for_user_rules
elif protocol in CONF.share.enable_cert_rules_for_protocols:
access_type = "cert"
access_to = "client3.com"
elif protocol in CONF.share.enable_cephx_rules_for_protocols:
access_type = "cephx"
access_to = "eve"
else:
message = "Unrecognized protocol and access rules configuration."
raise self.skipException(message)
return access_type, access_to
@classmethod @classmethod
def create_share_network(cls, client=None, def create_share_network(cls, client=None,
cleanup_in_class=False, **kwargs): cleanup_in_class=False, **kwargs):

View File

@ -21,7 +21,6 @@ from testtools import testcase as tc
from manila_tempest_tests.common import constants from manila_tempest_tests.common import constants
from manila_tempest_tests import share_exceptions from manila_tempest_tests import share_exceptions
from manila_tempest_tests.tests.api import base from manila_tempest_tests.tests.api import base
from manila_tempest_tests import utils
CONF = config.CONF CONF = config.CONF
_MIN_SUPPORTED_MICROVERSION = '2.11' _MIN_SUPPORTED_MICROVERSION = '2.11'
@ -72,9 +71,6 @@ class ReplicationTest(base.BaseSharesMixedTest):
cls.instance_id1 = cls._get_instance(cls.shares[0]) cls.instance_id1 = cls._get_instance(cls.shares[0])
cls.instance_id2 = cls._get_instance(cls.shares[1]) cls.instance_id2 = cls._get_instance(cls.shares[1])
cls.access_type = "ip"
cls.access_to = utils.rand_ip()
@classmethod @classmethod
def _get_instance(cls, share): def _get_instance(cls, share):
share_instances = cls.admin_client.get_instances_of_share(share["id"]) share_instances = cls.admin_client.get_instances_of_share(share["id"])
@ -107,24 +103,36 @@ class ReplicationTest(base.BaseSharesMixedTest):
return [replica for replica in replica_list return [replica for replica in replica_list
if replica['replica_state'] == r_state] if replica['replica_state'] == r_state]
def _verify_config_and_set_access_rule_data(self): def _verify_in_sync_replica_promotion(self, share, original_replica):
"""Verify the access rule configuration is enabled for NFS. # Verify that 'in-sync' replica has been promoted successfully
Set the data after verification. # NOTE(Yogi1): Cleanup needs to be disabled for replica that is
""" # being promoted since it will become the 'primary'/'active' replica.
protocol = self.shares_v2_client.share_protocol replica = self.create_share_replica(share["id"], self.replica_zone,
cleanup=False)
# Wait for replica state to update after creation
self.shares_v2_client.wait_for_share_replica_status(
replica['id'], constants.REPLICATION_STATE_IN_SYNC,
status_attr='replica_state')
# Promote the first in_sync replica to active state
promoted_replica = self.promote_share_replica(replica['id'])
# Delete the demoted replica so promoted replica can be cleaned
# during the cleanup of the share.
self.addCleanup(self.delete_share_replica, original_replica['id'])
self._verify_active_replica_count(share["id"])
# Verify the replica_state for promoted replica
promoted_replica = self.shares_v2_client.get_share_replica(
promoted_replica["id"])
self.assertEqual(constants.REPLICATION_STATE_ACTIVE,
promoted_replica["replica_state"])
# TODO(Yogi1): Add access rules for other protocols. def _check_skip_promotion_tests(self):
if not ((protocol.lower() == 'nfs') and # Check if the replication type is right for replica promotion tests
(protocol in CONF.share.enable_ip_rules_for_protocols) and if (self.replication_type
CONF.share.enable_ip_rules_for_protocols): not in constants.REPLICATION_PROMOTION_CHOICES):
message = "IP access rules are not supported for this protocol." msg = "Option backend_replication_type should be one of (%s)!"
raise self.skipException(message) raise self.skipException(
msg % ','.join(constants.REPLICATION_PROMOTION_CHOICES))
access_type = "ip"
access_to = utils.rand_ip()
return access_type, access_to
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND) @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
def test_add_delete_share_replica(self): def test_add_delete_share_replica(self):
@ -137,7 +145,7 @@ class ReplicationTest(base.BaseSharesMixedTest):
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND) @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
def test_add_access_rule_create_replica_delete_rule(self): def test_add_access_rule_create_replica_delete_rule(self):
# Add access rule to the share # Add access rule to the share
access_type, access_to = self._verify_config_and_set_access_rule_data() access_type, access_to = self._get_access_rule_data_from_config()
rule = self.shares_v2_client.create_access_rule( rule = self.shares_v2_client.create_access_rule(
self.shares[0]["id"], access_type, access_to, 'ro') self.shares[0]["id"], access_type, access_to, 'ro')
self.shares_v2_client.wait_for_access_rule_status( self.shares_v2_client.wait_for_access_rule_status(
@ -159,7 +167,7 @@ class ReplicationTest(base.BaseSharesMixedTest):
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND) @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
def test_create_replica_add_access_rule_delete_replica(self): def test_create_replica_add_access_rule_delete_replica(self):
access_type, access_to = self._verify_config_and_set_access_rule_data() access_type, access_to = self._get_access_rule_data_from_config()
# Create the replica # Create the replica
share_replica = self._verify_create_replica() share_replica = self._verify_create_replica()
@ -208,42 +216,40 @@ class ReplicationTest(base.BaseSharesMixedTest):
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND) @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
def test_promote_in_sync_share_replica(self): def test_promote_in_sync_share_replica(self):
# Test promote 'in_sync' share_replica to 'active' state # Test promote 'in_sync' share_replica to 'active' state
if (self.replication_type self._check_skip_promotion_tests()
not in constants.REPLICATION_PROMOTION_CHOICES):
msg = "Option backend_replication_type should be one of (%s)!"
raise self.skipException(
msg % ','.join(constants.REPLICATION_PROMOTION_CHOICES))
share = self.create_shares([self.creation_data])[0] share = self.create_shares([self.creation_data])[0]
original_replica = self.shares_v2_client.list_share_replicas( original_replica = self.shares_v2_client.list_share_replicas(
share["id"])[0] share["id"])[0]
# NOTE(Yogi1): Cleanup needs to be disabled for replica that is self._verify_in_sync_replica_promotion(share, original_replica)
# being promoted since it will become the 'primary'/'active' replica.
replica = self.create_share_replica(share["id"], self.replica_zone, @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
cleanup=False) def test_add_rule_promote_share_replica_verify_rule(self):
# Wait for replica state to update after creation # Verify the access rule stays intact after share replica promotion
self.shares_v2_client.wait_for_share_replica_status( self._check_skip_promotion_tests()
replica['id'], constants.REPLICATION_STATE_IN_SYNC,
status_attr='replica_state') share = self.create_shares([self.creation_data])[0]
# Promote the first in_sync replica to active state # Add access rule
promoted_replica = self.promote_share_replica(replica['id']) access_type, access_to = self._get_access_rule_data_from_config()
# Delete the demoted replica so promoted replica can be cleaned rule = self.shares_v2_client.create_access_rule(
# during the cleanup of the share. share["id"], access_type, access_to, 'ro')
self.addCleanup(self.delete_share_replica, original_replica['id']) self.shares_v2_client.wait_for_access_rule_status(
self._verify_active_replica_count(share["id"]) share["id"], rule["id"], constants.RULE_STATE_ACTIVE)
# Verify the replica_state for promoted replica
promoted_replica = self.shares_v2_client.get_share_replica( original_replica = self.shares_v2_client.list_share_replicas(
promoted_replica["id"]) share["id"])[0]
self.assertEqual(constants.REPLICATION_STATE_ACTIVE, self._verify_in_sync_replica_promotion(share, original_replica)
promoted_replica["replica_state"])
# verify rule's values
rules_list = self.shares_v2_client.list_access_rules(share["id"])
self.assertEqual(1, len(rules_list))
self.assertEqual(access_type, rules_list[0]["access_type"])
self.assertEqual(access_to, rules_list[0]["access_to"])
self.assertEqual('ro', rules_list[0]["access_level"])
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND) @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
def test_promote_and_promote_back(self): def test_promote_and_promote_back(self):
# Test promote back and forth between 2 share replicas # Test promote back and forth between 2 share replicas
if (self.replication_type self._check_skip_promotion_tests()
not in constants.REPLICATION_PROMOTION_CHOICES):
msg = "Option backend_replication_type should be one of (%s)!"
raise self.skipException(
msg % ','.join(constants.REPLICATION_PROMOTION_CHOICES))
# Create a new share # Create a new share
share = self.create_shares([self.creation_data])[0] share = self.create_shares([self.creation_data])[0]

View File

@ -159,6 +159,22 @@ class ReplicationNegativeTest(base.BaseSharesMixedTest):
# Try promoting the replica # Try promoting the replica
self.shares_v2_client.promote_share_replica(replica['id']) self.shares_v2_client.promote_share_replica(replica['id'])
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
def test_add_access_rule_share_replica_error_status(self):
access_type, access_to = self._get_access_rule_data_from_config()
# Create the replica
share_replica = self.create_share_replica(self.share1["id"],
self.replica_zone,
cleanup_in_class=False)
# Reset the replica status to error
self.admin_client.reset_share_replica_status(
share_replica['id'], constants.STATUS_ERROR)
# Verify access rule cannot be added
self.assertRaises(lib_exc.BadRequest,
self.admin_client.create_access_rule,
self.share1["id"], access_type, access_to, 'ro')
@testtools.skipUnless(CONF.share.run_replication_tests, @testtools.skipUnless(CONF.share.run_replication_tests,
'Replication tests are disabled.') 'Replication tests are disabled.')

View File

@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import itertools
import ddt import ddt
from tempest import config from tempest import config
from tempest.lib import exceptions as lib_exc from tempest.lib import exceptions as lib_exc
@ -44,6 +46,12 @@ def _create_delete_ro_access_rule(self, version):
for key in ('deleted', 'deleted_at', 'instance_mappings'): for key in ('deleted', 'deleted_at', 'instance_mappings'):
self.assertNotIn(key, rule.keys()) self.assertNotIn(key, rule.keys())
# rules must start out in 'new' until 2.28 & 'queued_to_apply' after 2.28
if utils.is_microversion_le(version, "2.27"):
self.assertEqual("new", rule['state'])
else:
self.assertEqual("queued_to_apply", rule['state'])
if utils.is_microversion_le(version, '2.9'): if utils.is_microversion_le(version, '2.9'):
self.shares_client.wait_for_access_rule_status( self.shares_client.wait_for_access_rule_status(
self.share["id"], rule["id"], "active") self.share["id"], rule["id"], "active")
@ -51,6 +59,11 @@ def _create_delete_ro_access_rule(self, version):
self.shares_v2_client.wait_for_share_status( self.shares_v2_client.wait_for_share_status(
self.share["id"], "active", status_attr='access_rules_status', self.share["id"], "active", status_attr='access_rules_status',
version=version) version=version)
# If the 'access_rules_status' transitions to 'active',
# rule state must too
rules = self.shares_v2_client.list_access_rules(self.share['id'])
rule = [r for r in rules if r['id'] == rule['id']][0]
self.assertEqual("active", rule['state'])
if utils.is_microversion_eq(version, '1.0'): if utils.is_microversion_eq(version, '1.0'):
self.shares_client.delete_access_rule(self.share["id"], rule["id"]) self.shares_client.delete_access_rule(self.share["id"], rule["id"])
@ -79,7 +92,7 @@ class ShareIpRulesForNFSTest(base.BaseSharesTest):
cls.access_to = "2.2.2.2" cls.access_to = "2.2.2.2"
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND) @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
@ddt.data('1.0', '2.9', LATEST_MICROVERSION) @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
def test_create_delete_access_rules_with_one_ip(self, version): def test_create_delete_access_rules_with_one_ip(self, version):
# test data # test data
@ -98,6 +111,12 @@ class ShareIpRulesForNFSTest(base.BaseSharesTest):
for key in ('deleted', 'deleted_at', 'instance_mappings'): for key in ('deleted', 'deleted_at', 'instance_mappings'):
self.assertNotIn(key, rule.keys()) self.assertNotIn(key, rule.keys())
# rules must start out in 'new' until 2.28 & 'queued_to_apply' after
if utils.is_microversion_le(version, "2.27"):
self.assertEqual("new", rule['state'])
else:
self.assertEqual("queued_to_apply", rule['state'])
if utils.is_microversion_eq(version, '1.0'): if utils.is_microversion_eq(version, '1.0'):
self.shares_client.wait_for_access_rule_status( self.shares_client.wait_for_access_rule_status(
self.share["id"], rule["id"], "active") self.share["id"], rule["id"], "active")
@ -121,7 +140,7 @@ class ShareIpRulesForNFSTest(base.BaseSharesTest):
rule_id=rule["id"], share_id=self.share['id'], version=version) rule_id=rule["id"], share_id=self.share['id'], version=version)
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND) @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
@ddt.data('1.0', '2.9', LATEST_MICROVERSION) @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
def test_create_delete_access_rule_with_cidr(self, version): def test_create_delete_access_rule_with_cidr(self, version):
# test data # test data
@ -140,6 +159,12 @@ class ShareIpRulesForNFSTest(base.BaseSharesTest):
self.assertNotIn(key, rule.keys()) self.assertNotIn(key, rule.keys())
self.assertEqual('rw', rule['access_level']) self.assertEqual('rw', rule['access_level'])
# rules must start out in 'new' until 2.28 & 'queued_to_apply' after
if utils.is_microversion_le(version, "2.27"):
self.assertEqual("new", rule['state'])
else:
self.assertEqual("queued_to_apply", rule['state'])
if utils.is_microversion_eq(version, '1.0'): if utils.is_microversion_eq(version, '1.0'):
self.shares_client.wait_for_access_rule_status( self.shares_client.wait_for_access_rule_status(
self.share["id"], rule["id"], "active") self.share["id"], rule["id"], "active")
@ -166,7 +191,7 @@ class ShareIpRulesForNFSTest(base.BaseSharesTest):
@testtools.skipIf( @testtools.skipIf(
"nfs" not in CONF.share.enable_ro_access_level_for_protocols, "nfs" not in CONF.share.enable_ro_access_level_for_protocols,
"RO access rule tests are disabled for NFS protocol.") "RO access rule tests are disabled for NFS protocol.")
@ddt.data('1.0', '2.9', LATEST_MICROVERSION) @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
def test_create_delete_ro_access_rule(self, client_name): def test_create_delete_ro_access_rule(self, client_name):
_create_delete_ro_access_rule(self, client_name) _create_delete_ro_access_rule(self, client_name)
@ -179,7 +204,7 @@ class ShareIpRulesForCIFSTest(ShareIpRulesForNFSTest):
@testtools.skipIf( @testtools.skipIf(
"cifs" not in CONF.share.enable_ro_access_level_for_protocols, "cifs" not in CONF.share.enable_ro_access_level_for_protocols,
"RO access rule tests are disabled for CIFS protocol.") "RO access rule tests are disabled for CIFS protocol.")
@ddt.data('1.0', '2.9', LATEST_MICROVERSION) @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
def test_create_delete_ro_access_rule(self, version): def test_create_delete_ro_access_rule(self, version):
_create_delete_ro_access_rule(self, version) _create_delete_ro_access_rule(self, version)
@ -201,7 +226,7 @@ class ShareUserRulesForNFSTest(base.BaseSharesTest):
cls.access_to = CONF.share.username_for_user_rules cls.access_to = CONF.share.username_for_user_rules
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND) @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
@ddt.data('1.0', '2.9', LATEST_MICROVERSION) @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
def test_create_delete_user_rule(self, version): def test_create_delete_user_rule(self, version):
# create rule # create rule
@ -217,6 +242,12 @@ class ShareUserRulesForNFSTest(base.BaseSharesTest):
for key in ('deleted', 'deleted_at', 'instance_mappings'): for key in ('deleted', 'deleted_at', 'instance_mappings'):
self.assertNotIn(key, rule.keys()) self.assertNotIn(key, rule.keys())
# rules must start out in 'new' until 2.28 & 'queued_to_apply' after
if utils.is_microversion_le(version, "2.27"):
self.assertEqual("new", rule['state'])
else:
self.assertEqual("queued_to_apply", rule['state'])
if utils.is_microversion_eq(version, '1.0'): if utils.is_microversion_eq(version, '1.0'):
self.shares_client.wait_for_access_rule_status( self.shares_client.wait_for_access_rule_status(
self.share["id"], rule["id"], "active") self.share["id"], rule["id"], "active")
@ -243,7 +274,7 @@ class ShareUserRulesForNFSTest(base.BaseSharesTest):
@testtools.skipIf( @testtools.skipIf(
"nfs" not in CONF.share.enable_ro_access_level_for_protocols, "nfs" not in CONF.share.enable_ro_access_level_for_protocols,
"RO access rule tests are disabled for NFS protocol.") "RO access rule tests are disabled for NFS protocol.")
@ddt.data('1.0', '2.9', LATEST_MICROVERSION) @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
def test_create_delete_ro_access_rule(self, version): def test_create_delete_ro_access_rule(self, version):
_create_delete_ro_access_rule(self, version) _create_delete_ro_access_rule(self, version)
@ -256,7 +287,7 @@ class ShareUserRulesForCIFSTest(ShareUserRulesForNFSTest):
@testtools.skipIf( @testtools.skipIf(
"cifs" not in CONF.share.enable_ro_access_level_for_protocols, "cifs" not in CONF.share.enable_ro_access_level_for_protocols,
"RO access rule tests are disabled for CIFS protocol.") "RO access rule tests are disabled for CIFS protocol.")
@ddt.data('1.0', '2.9', LATEST_MICROVERSION) @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
def test_create_delete_ro_access_rule(self, version): def test_create_delete_ro_access_rule(self, version):
_create_delete_ro_access_rule(self, version) _create_delete_ro_access_rule(self, version)
@ -280,7 +311,7 @@ class ShareCertRulesForGLUSTERFSTest(base.BaseSharesTest):
cls.access_to = "client1.com" cls.access_to = "client1.com"
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND) @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
@ddt.data('1.0', '2.9', LATEST_MICROVERSION) @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
def test_create_delete_cert_rule(self, version): def test_create_delete_cert_rule(self, version):
# create rule # create rule
@ -296,6 +327,12 @@ class ShareCertRulesForGLUSTERFSTest(base.BaseSharesTest):
for key in ('deleted', 'deleted_at', 'instance_mappings'): for key in ('deleted', 'deleted_at', 'instance_mappings'):
self.assertNotIn(key, rule.keys()) self.assertNotIn(key, rule.keys())
# rules must start out in 'new' until 2.28 & 'queued_to_apply' after
if utils.is_microversion_le(version, "2.27"):
self.assertEqual("new", rule['state'])
else:
self.assertEqual("queued_to_apply", rule['state'])
if utils.is_microversion_eq(version, '1.0'): if utils.is_microversion_eq(version, '1.0'):
self.shares_client.wait_for_access_rule_status( self.shares_client.wait_for_access_rule_status(
self.share["id"], rule["id"], "active") self.share["id"], rule["id"], "active")
@ -322,7 +359,7 @@ class ShareCertRulesForGLUSTERFSTest(base.BaseSharesTest):
@testtools.skipIf( @testtools.skipIf(
"glusterfs" not in CONF.share.enable_ro_access_level_for_protocols, "glusterfs" not in CONF.share.enable_ro_access_level_for_protocols,
"RO access rule tests are disabled for GLUSTERFS protocol.") "RO access rule tests are disabled for GLUSTERFS protocol.")
@ddt.data('1.0', '2.9', LATEST_MICROVERSION) @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
def test_create_delete_cert_ro_access_rule(self, version): def test_create_delete_cert_ro_access_rule(self, version):
if utils.is_microversion_eq(version, '1.0'): if utils.is_microversion_eq(version, '1.0'):
rule = self.shares_client.create_access_rule( rule = self.shares_client.create_access_rule(
@ -336,6 +373,12 @@ class ShareCertRulesForGLUSTERFSTest(base.BaseSharesTest):
for key in ('deleted', 'deleted_at', 'instance_mappings'): for key in ('deleted', 'deleted_at', 'instance_mappings'):
self.assertNotIn(key, rule.keys()) self.assertNotIn(key, rule.keys())
# rules must start out in 'new' until 2.28 & 'queued_to_apply' after
if utils.is_microversion_le(version, "2.27"):
self.assertEqual("new", rule['state'])
else:
self.assertEqual("queued_to_apply", rule['state'])
if utils.is_microversion_eq(version, '1.0'): if utils.is_microversion_eq(version, '1.0'):
self.shares_client.wait_for_access_rule_status( self.shares_client.wait_for_access_rule_status(
self.share["id"], rule["id"], "active") self.share["id"], rule["id"], "active")
@ -377,10 +420,13 @@ class ShareCephxRulesForCephFSTest(base.BaseSharesTest):
cls.access_to = "bob" cls.access_to = "bob"
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND) @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
@ddt.data("alice", "alice_bob", "alice bob") @ddt.data(*itertools.product(
def test_create_delete_cephx_rule(self, access_to): set(['2.13', '2.27', '2.28', LATEST_MICROVERSION]),
("alice", "alice_bob", "alice bob")))
@ddt.unpack
def test_create_delete_cephx_rule(self, version, access_to):
rule = self.shares_v2_client.create_access_rule( rule = self.shares_v2_client.create_access_rule(
self.share["id"], self.access_type, access_to) self.share["id"], self.access_type, access_to, version=version)
self.assertEqual('rw', rule['access_level']) self.assertEqual('rw', rule['access_level'])
for key in ('deleted', 'deleted_at', 'instance_mappings'): for key in ('deleted', 'deleted_at', 'instance_mappings'):
@ -388,7 +434,8 @@ class ShareCephxRulesForCephFSTest(base.BaseSharesTest):
self.shares_v2_client.wait_for_access_rule_status( self.shares_v2_client.wait_for_access_rule_status(
self.share["id"], rule["id"], "active") self.share["id"], rule["id"], "active")
self.shares_v2_client.delete_access_rule(self.share["id"], rule["id"]) self.shares_v2_client.delete_access_rule(
self.share["id"], rule["id"], version=version)
self.shares_v2_client.wait_for_resource_deletion( self.shares_v2_client.wait_for_resource_deletion(
rule_id=rule["id"], share_id=self.share['id']) rule_id=rule["id"], share_id=self.share['id'])
@ -429,7 +476,7 @@ class ShareRulesTest(base.BaseSharesTest):
cls.share = cls.create_share() cls.share = cls.create_share()
@tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND) @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
@ddt.data('1.0', '2.9', LATEST_MICROVERSION) @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
def test_list_access_rules(self, version): def test_list_access_rules(self, version):
if (utils.is_microversion_lt(version, '2.13') and if (utils.is_microversion_lt(version, '2.13') and
CONF.share.enable_cephx_rules_for_protocols): CONF.share.enable_cephx_rules_for_protocols):
@ -445,6 +492,11 @@ class ShareRulesTest(base.BaseSharesTest):
rule = self.shares_v2_client.create_access_rule( rule = self.shares_v2_client.create_access_rule(
self.share["id"], self.access_type, self.access_to, self.share["id"], self.access_type, self.access_to,
version=version) version=version)
# rules must start out in 'new' until 2.28 & 'queued_to_apply' after
if utils.is_microversion_le(version, "2.27"):
self.assertEqual("new", rule['state'])
else:
self.assertEqual("queued_to_apply", rule['state'])
if utils.is_microversion_eq(version, '1.0'): if utils.is_microversion_eq(version, '1.0'):
self.shares_client.wait_for_access_rule_status( self.shares_client.wait_for_access_rule_status(
@ -499,7 +551,7 @@ class ShareRulesTest(base.BaseSharesTest):
rule_id=rule["id"], share_id=self.share['id'], version=version) rule_id=rule["id"], share_id=self.share['id'], version=version)
@tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND) @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
@ddt.data('1.0', '2.9', LATEST_MICROVERSION) @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
def test_access_rules_deleted_if_share_deleted(self, version): def test_access_rules_deleted_if_share_deleted(self, version):
if (utils.is_microversion_lt(version, '2.13') and if (utils.is_microversion_lt(version, '2.13') and
CONF.share.enable_cephx_rules_for_protocols): CONF.share.enable_cephx_rules_for_protocols):
@ -519,6 +571,12 @@ class ShareRulesTest(base.BaseSharesTest):
share["id"], self.access_type, self.access_to, share["id"], self.access_type, self.access_to,
version=version) version=version)
# rules must start out in 'new' until 2.28 & 'queued_to_apply' after
if utils.is_microversion_le(version, "2.27"):
self.assertEqual("new", rule['state'])
else:
self.assertEqual("queued_to_apply", rule['state'])
if utils.is_microversion_eq(version, '1.0'): if utils.is_microversion_eq(version, '1.0'):
self.shares_client.wait_for_access_rule_status( self.shares_client.wait_for_access_rule_status(
share["id"], rule["id"], "active") share["id"], rule["id"], "active")

View File

@ -27,12 +27,13 @@ LATEST_MICROVERSION = CONF.share.max_api_microversion
@ddt.ddt @ddt.ddt
class ShareIpRulesForNFSNegativeTest(base.BaseSharesTest): class ShareIpRulesForNFSNegativeTest(base.BaseSharesMixedTest):
protocol = "nfs" protocol = "nfs"
@classmethod @classmethod
def resource_setup(cls): def resource_setup(cls):
super(ShareIpRulesForNFSNegativeTest, cls).resource_setup() super(ShareIpRulesForNFSNegativeTest, cls).resource_setup()
cls.admin_client = cls.admin_shares_v2_client
if not (cls.protocol in CONF.share.enable_protocols and if not (cls.protocol in CONF.share.enable_protocols and
cls.protocol in CONF.share.enable_ip_rules_for_protocols): cls.protocol in CONF.share.enable_ip_rules_for_protocols):
msg = "IP rule tests for %s protocol are disabled" % cls.protocol msg = "IP rule tests for %s protocol are disabled" % cls.protocol
@ -158,6 +159,23 @@ class ShareIpRulesForNFSNegativeTest(base.BaseSharesTest):
self.shares_v2_client.wait_for_resource_deletion( self.shares_v2_client.wait_for_resource_deletion(
rule_id=rule["id"], share_id=self.share["id"], version=version) rule_id=rule["id"], share_id=self.share["id"], version=version)
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
def test_add_access_rule_on_share_with_no_host(self):
access_type, access_to = self._get_access_rule_data_from_config()
extra_specs = self.add_extra_specs_to_dict(
{"share_backend_name": 'invalid_backend'})
share_type = self.create_share_type('invalid_backend',
extra_specs=extra_specs,
client=self.admin_client)
share_type = share_type['share_type']
share = self.create_share(share_type_id=share_type['id'],
client=self.admin_client,
cleanup_in_class=False,
wait_for_status=False)
self.assertRaises(lib_exc.BadRequest,
self.admin_client.create_access_rule,
share["id"], access_type, access_to)
@ddt.ddt @ddt.ddt
class ShareIpRulesForCIFSNegativeTest(ShareIpRulesForNFSNegativeTest): class ShareIpRulesForCIFSNegativeTest(ShareIpRulesForNFSNegativeTest):