[Tempest] Add scenario test creating share from snapshot

Add tempest scenario test where we create share from snapshot and
test its relations to source share. Design for this test is available
at 'Add spec for Scenario tests' spec [1].

[1] I224a52521033b47574ff5fd5a94b096c91593aa7

Change-Id: I9863ea70977453b3e7492164002b983f3d9944ab
This commit is contained in:
Valeriy Ponomaryov 2016-11-22 15:27:34 +02:00
parent d2bdd58feb
commit 9f4be67777
2 changed files with 118 additions and 11 deletions
manila_tempest_tests/tests/scenario

@ -97,6 +97,15 @@ class ShareScenarioTest(manager.NetworkScenarioTest):
client.wait_for_share_status(share['id'], 'available')
return share
def _create_snapshot(self, share_id, client=None, **kwargs):
client = client or self.shares_v2_client
snapshot = client.create_snapshot(share_id, **kwargs)
self.addCleanup(
client.wait_for_resource_deletion, snapshot_id=snapshot['id'])
self.addCleanup(client.delete_snapshot, snapshot['id'])
client.wait_for_snapshot_status(snapshot["id"], "available")
return snapshot
def _wait_for_share_server_deletion(self, sn_id, client=None):
"""Wait for a share server to be deleted

@ -112,11 +112,12 @@ class ShareBasicOpsBase(manager.ShareScenarioTest):
ssh_client.exec_command("ping -c 1 %s" % server_ip)
return ssh_client
def mount_share(self, location, ssh_client):
def mount_share(self, location, ssh_client, target_dir=None):
raise NotImplementedError
def umount_share(self, ssh_client):
ssh_client.exec_command("sudo umount /mnt")
def umount_share(self, ssh_client, target_dir=None):
target_dir = target_dir or "/mnt"
ssh_client.exec_command("sudo umount %s" % target_dir)
def write_data(self, data, ssh_client):
ssh_client.exec_command("echo \"%s\" | sudo tee /mnt/t1 && sudo sync" %
@ -154,14 +155,16 @@ class ShareBasicOpsBase(manager.ShareScenarioTest):
'driver_handles_share_servers': CONF.share.multitenancy_enabled
},)['share_type']
def create_share(self):
kwargs = {
def create_share(self, **kwargs):
kwargs.update({
'share_protocol': self.protocol,
'share_type_id': self._get_share_type()['id'],
}
})
if not ('share_type_id' in kwargs or 'snapshot_id' in kwargs):
kwargs.update({'share_type_id': self._get_share_type()['id']})
if CONF.share.multitenancy_enabled:
kwargs.update({'share_network_id': self.share_net['id']})
self.share = self._create_share(**kwargs)
return self.share
def allow_access_ip(self, share_id, ip=None, instance=None, cleanup=True):
if instance and not ip:
@ -343,21 +346,116 @@ class ShareBasicOpsBase(manager.ShareScenarioTest):
self.assertIn('1m4.bin', output)
self.assertIn('1m5.bin', output)
def _get_user_export_location(self, share):
if utils.is_microversion_lt(CONF.share.max_api_microversion, "2.9"):
user_export_location = share['export_locations'][0]
else:
exports = self.shares_v2_client.list_share_export_locations(
share['id'])
locations = [x['path'] for x in exports]
user_export_location = locations[0]
return user_export_location
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
@testtools.skipUnless(
CONF.share.run_snapshot_tests, "Snapshot tests are disabled.")
def test_write_data_to_share_created_from_snapshot(self):
if self.protocol.upper() == 'CIFS':
msg = "Skipped for CIFS protocol because of bug/1649573"
raise self.skipException(msg)
# 1 - Create UVM, ok, created
instance = self.boot_instance(wait_until="BUILD")
# 2 - Create share S1, ok, created
parent_share = self.create_share()
instance = self.wait_for_active_instance(instance["id"])
self.addCleanup(self.servers_client.delete_server, instance['id'])
# 3 - Provide RW access to S1, ok, provided
self.allow_access_ip(
parent_share['id'], instance=instance, cleanup=False)
# 4 - SSH to UVM, ok, connected
ssh_client = self.init_ssh(instance)
# 5 - Try mount S1 to UVM, ok, mounted
user_export_location = self._get_user_export_location(parent_share)
parent_share_dir = "/mnt/parent"
ssh_client.exec_command("sudo mkdir -p %s" % parent_share_dir)
self.mount_share(user_export_location, ssh_client, parent_share_dir)
self.addCleanup(self.umount_share, ssh_client, parent_share_dir)
# 6 - Create "file1", ok, created
ssh_client.exec_command("sudo touch %s/file1" % parent_share_dir)
# 7 - Create snapshot SS1 from S1, ok, created
snapshot = self._create_snapshot(parent_share['id'])
# 8 - Create "file2" in share S1 - ok, created. We expect that
# snapshot will not contain any data created after snapshot creation.
ssh_client.exec_command("sudo touch %s/file2" % parent_share_dir)
# 9 - Create share S2 from SS1, ok, created
child_share = self.create_share(snapshot_id=snapshot["id"])
# 10 - Try mount S2 - fail, access denied. We test that child share
# did not get access rules from parent share.
user_export_location = self._get_user_export_location(child_share)
child_share_dir = "/mnt/child"
ssh_client.exec_command("sudo mkdir -p %s" % child_share_dir)
self.assertRaises(
exceptions.SSHExecCommandFailed,
self.mount_share,
user_export_location, ssh_client, child_share_dir,
)
# 11 - Provide RW access to S2, ok, provided
self.allow_access_ip(
child_share['id'], instance=instance, cleanup=False)
# 12 - Try mount S2, ok, mounted
self.mount_share(user_export_location, ssh_client, child_share_dir)
self.addCleanup(self.umount_share, ssh_client, child_share_dir)
# 13 - List files on S2, only "file1" exists
output = ssh_client.exec_command("sudo ls -lRA %s" % child_share_dir)
self.assertIn('file1', output)
self.assertNotIn('file2', output)
# 14 - Create file3 on S2, ok, file created
ssh_client.exec_command("sudo touch %s/file3" % child_share_dir)
# 15 - List files on S1, two files exist - "file1" and "file2"
output = ssh_client.exec_command("sudo ls -lRA %s" % parent_share_dir)
self.assertIn('file1', output)
self.assertIn('file2', output)
self.assertNotIn('file3', output)
# 16 - List files on S2, two files exist - "file1" and "file3"
output = ssh_client.exec_command("sudo ls -lRA %s" % child_share_dir)
self.assertIn('file1', output)
self.assertNotIn('file2', output)
self.assertIn('file3', output)
class TestShareBasicOpsNFS(ShareBasicOpsBase):
protocol = "NFS"
def mount_share(self, location, ssh_client):
ssh_client.exec_command("sudo mount -vt nfs \"%s\" /mnt" % location)
def mount_share(self, location, ssh_client, target_dir=None):
target_dir = target_dir or "/mnt"
ssh_client.exec_command(
"sudo mount -vt nfs \"%s\" %s" % (location, target_dir))
class TestShareBasicOpsCIFS(ShareBasicOpsBase):
protocol = "CIFS"
def mount_share(self, location, ssh_client):
def mount_share(self, location, ssh_client, target_dir=None):
location = location.replace("\\", "/")
target_dir = target_dir or "/mnt"
ssh_client.exec_command(
"sudo mount.cifs \"%s\" /mnt -o guest" % location
"sudo mount.cifs \"%s\" %s -o guest" % (location, target_dir)
)