Merge "volume functest: ensure snapshots deleted when volume delete"

This commit is contained in:
Jenkins 2017-06-06 04:48:46 +00:00 committed by Gerrit Code Review
commit ee59989103
10 changed files with 130 additions and 129 deletions

View File

@ -125,12 +125,12 @@ class ComputeTestCase(base.TestCase):
name name
)) ))
status = cmd_output['status'] status = cmd_output['status']
print('Waiting for {}, current status: {}'.format(
expected_status,
status,
))
if status == expected_status: if status == expected_status:
print('Server {} now has status {}'.format(
name, status))
break break
print('Server {}: Waiting for {}, current status: {}'.format(
name, expected_status, status))
self.assertNotIn(status, failures) self.assertNotIn(status, failures)
time.sleep(interval) time.sleep(interval)
total_sleep += interval total_sleep += interval

View File

@ -17,7 +17,7 @@ from tempest.lib import exceptions
from openstackclient.tests.functional import base from openstackclient.tests.functional import base
from openstackclient.tests.functional.compute.v2 import common from openstackclient.tests.functional.compute.v2 import common
from openstackclient.tests.functional.volume.v2 import test_volume from openstackclient.tests.functional.volume.v2 import common as volume_common
class ServerTests(common.ComputeTestCase): class ServerTests(common.ComputeTestCase):
@ -282,9 +282,7 @@ class ServerTests(common.ComputeTestCase):
def test_server_boot_from_volume(self): def test_server_boot_from_volume(self):
"""Test server create from volume, server delete""" """Test server create from volume, server delete"""
# get volume status wait function # get volume status wait function
volume_wait_for = test_volume.VolumeTests( volume_wait_for = volume_common.BaseVolumeTests.wait_for_status
methodName='wait_for',
).wait_for
# get image size # get image size
cmd_output = json.loads(self.openstack( cmd_output = json.loads(self.openstack(
@ -391,9 +389,8 @@ class ServerTests(common.ComputeTestCase):
def test_server_boot_with_bdm_snapshot(self): def test_server_boot_with_bdm_snapshot(self):
"""Test server create from image with bdm snapshot, server delete""" """Test server create from image with bdm snapshot, server delete"""
# get volume status wait function # get volume status wait function
volume_wait_for = test_volume.VolumeTests( volume_wait_for = volume_common.BaseVolumeTests.wait_for_status
methodName='wait_for', volume_wait_for_delete = volume_common.BaseVolumeTests.wait_for_delete
).wait_for
# create source empty volume # create source empty volume
empty_volume_name = uuid.uuid4().hex empty_volume_name = uuid.uuid4().hex
@ -419,6 +416,13 @@ class ServerTests(common.ComputeTestCase):
empty_snapshot_name empty_snapshot_name
)) ))
self.assertIsNotNone(cmd_output["id"]) self.assertIsNotNone(cmd_output["id"])
# Deleting volume snapshot take time, so we need to wait until the
# snapshot goes. Entries registered by self.addCleanup will be called
# in the reverse order, so we need to register wait_for_delete first.
self.addCleanup(volume_wait_for_delete,
'volume snapshot', empty_snapshot_name)
self.addCleanup(self.openstack,
'volume snapshot delete ' + empty_snapshot_name)
self.assertEqual( self.assertEqual(
empty_snapshot_name, empty_snapshot_name,
cmd_output['name'], cmd_output['name'],
@ -489,12 +493,6 @@ class ServerTests(common.ComputeTestCase):
# the attached volume had been deleted # the attached volume had been deleted
pass pass
# clean up volume snapshot manually, make sure the snapshot and volume
# can be deleted sequentially, self.addCleanup so fast, that cause
# volume service API 400 error and the volume is left over at the end.
self.openstack('volume snapshot delete ' + empty_snapshot_name)
volume_wait_for('volume snapshot', empty_snapshot_name, 'disappear')
def test_server_create_with_none_network(self): def test_server_create_with_none_network(self):
"""Test server create with none network option.""" """Test server create with none network option."""
server_name = uuid.uuid4().hex server_name = uuid.uuid4().hex

View File

@ -0,0 +1,64 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import time
from openstackclient.tests.functional import base
class BaseVolumeTests(base.TestCase):
"""Base class for Volume functional tests. """
@classmethod
def wait_for_status(cls, check_type, check_name, desired_status,
wait=120, interval=5, failures=None):
current_status = "notset"
if failures is None:
failures = ['error']
total_sleep = 0
while total_sleep < wait:
output = json.loads(cls.openstack(
check_type + ' show -f json ' + check_name))
current_status = output['status']
if (current_status == desired_status):
print('{} {} now has status {}'
.format(check_type, check_name, current_status))
return
print('Checking {} {} Waiting for {} current status: {}'
.format(check_type, check_name,
desired_status, current_status))
if current_status in failures:
raise Exception(
'Current status {} of {} {} is one of failures {}'
.format(current_status, check_type, check_name, failures))
time.sleep(interval)
total_sleep += interval
cls.assertOutput(desired_status, current_status)
@classmethod
def wait_for_delete(cls, check_type, check_name, wait=120, interval=5,
name_field=None):
total_sleep = 0
name_field = name_field or 'Name'
while total_sleep < wait:
result = json.loads(cls.openstack(check_type + ' list -f json'))
names = [x[name_field] for x in result]
if check_name not in names:
print('{} {} is now deleted'.format(check_type, check_name))
return
print('Checking {} {} Waiting for deleted'
.format(check_type, check_name))
time.sleep(interval)
total_sleep += interval
raise Exception('Timeout: {} {} was not deleted in {} seconds'
.format(check_type, check_name, wait))

View File

@ -12,10 +12,10 @@
import os import os
from openstackclient.tests.functional import base from openstackclient.tests.functional.volume import base
class BaseVolumeTests(base.TestCase): class BaseVolumeTests(base.BaseVolumeTests):
"""Base class for Volume functional tests. """ """Base class for Volume functional tests. """
@classmethod @classmethod

View File

@ -11,7 +11,6 @@
# under the License. # under the License.
import json import json
import time
import uuid import uuid
from openstackclient.tests.functional.volume.v1 import common from openstackclient.tests.functional.volume.v1 import common
@ -22,16 +21,6 @@ class VolumeSnapshotTests(common.BaseVolumeTests):
VOLLY = uuid.uuid4().hex VOLLY = uuid.uuid4().hex
@classmethod
def wait_for_status(cls, command, status, tries):
opts = cls.get_opts(['status'])
for attempt in range(tries):
time.sleep(1)
raw_output = cls.openstack(command + opts)
if (raw_output.rstrip() == status):
return
cls.assertOutput(status, raw_output)
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
super(VolumeSnapshotTests, cls).setUpClass() super(VolumeSnapshotTests, cls).setUpClass()
@ -41,12 +30,12 @@ class VolumeSnapshotTests(common.BaseVolumeTests):
'--size 1 ' + '--size 1 ' +
cls.VOLLY cls.VOLLY
)) ))
cls.wait_for_status('volume show ' + cls.VOLLY, 'available', 6) cls.wait_for_status('volume', cls.VOLLY, 'available')
cls.VOLUME_ID = cmd_output['id'] cls.VOLUME_ID = cmd_output['id']
@classmethod @classmethod
def tearDownClass(cls): def tearDownClass(cls):
cls.wait_for_status('volume show ' + cls.VOLLY, 'available', 6) cls.wait_for_status('volume', cls.VOLLY, 'available')
raw_output = cls.openstack('volume delete --force ' + cls.VOLLY) raw_output = cls.openstack('volume delete --force ' + cls.VOLLY)
cls.assertOutput('', raw_output) cls.assertOutput('', raw_output)
@ -74,14 +63,14 @@ class VolumeSnapshotTests(common.BaseVolumeTests):
cmd_output["display_name"], cmd_output["display_name"],
) )
self.wait_for_status( self.wait_for_status('volume snapshot', name1, 'available')
'volume snapshot show ' + name1, 'available', 6) self.wait_for_status('volume snapshot', name2, 'available')
self.wait_for_status(
'volume snapshot show ' + name2, 'available', 6)
del_output = self.openstack( del_output = self.openstack(
'volume snapshot delete ' + name1 + ' ' + name2) 'volume snapshot delete ' + name1 + ' ' + name2)
self.assertOutput('', del_output) self.assertOutput('', del_output)
self.wait_for_delete('volume snapshot', name1)
self.wait_for_delete('volume snapshot', name2)
def test_volume_snapshot_list(self): def test_volume_snapshot_list(self):
"""Test create, list filter""" """Test create, list filter"""
@ -91,6 +80,7 @@ class VolumeSnapshotTests(common.BaseVolumeTests):
name1 + name1 +
' --volume ' + self.VOLLY ' --volume ' + self.VOLLY
)) ))
self.addCleanup(self.wait_for_delete, 'volume snapshot', name1)
self.addCleanup(self.openstack, 'volume snapshot delete ' + name1) self.addCleanup(self.openstack, 'volume snapshot delete ' + name1)
self.assertEqual( self.assertEqual(
name1, name1,
@ -104,8 +94,7 @@ class VolumeSnapshotTests(common.BaseVolumeTests):
1, 1,
cmd_output["size"], cmd_output["size"],
) )
self.wait_for_status( self.wait_for_status('volume snapshot', name1, 'available')
'volume snapshot show ' + name1, 'available', 6)
name2 = uuid.uuid4().hex name2 = uuid.uuid4().hex
cmd_output = json.loads(self.openstack( cmd_output = json.loads(self.openstack(
@ -113,6 +102,7 @@ class VolumeSnapshotTests(common.BaseVolumeTests):
name2 + name2 +
' --volume ' + self.VOLLY ' --volume ' + self.VOLLY
)) ))
self.addCleanup(self.wait_for_delete, 'volume snapshot', name2)
self.addCleanup(self.openstack, 'volume snapshot delete ' + name2) self.addCleanup(self.openstack, 'volume snapshot delete ' + name2)
self.assertEqual( self.assertEqual(
name2, name2,
@ -126,8 +116,7 @@ class VolumeSnapshotTests(common.BaseVolumeTests):
1, 1,
cmd_output["size"], cmd_output["size"],
) )
self.wait_for_status( self.wait_for_status('volume snapshot', name2, 'available')
'volume snapshot show ' + name2, 'available', 6)
# Test list --long, --status # Test list --long, --status
cmd_output = json.loads(self.openstack( cmd_output = json.loads(self.openstack(
@ -167,6 +156,7 @@ class VolumeSnapshotTests(common.BaseVolumeTests):
' --description aaaa ' + ' --description aaaa ' +
name name
)) ))
self.addCleanup(self.wait_for_delete, 'volume snapshot', new_name)
self.addCleanup(self.openstack, 'volume snapshot delete ' + new_name) self.addCleanup(self.openstack, 'volume snapshot delete ' + new_name)
self.assertEqual( self.assertEqual(
name, name,
@ -180,8 +170,7 @@ class VolumeSnapshotTests(common.BaseVolumeTests):
'aaaa', 'aaaa',
cmd_output["display_description"], cmd_output["display_description"],
) )
self.wait_for_status( self.wait_for_status('volume snapshot', name, 'available')
'volume snapshot show ' + name, 'available', 6)
# Test volume snapshot set # Test volume snapshot set
raw_output = self.openstack( raw_output = self.openstack(

View File

@ -11,7 +11,6 @@
# under the License. # under the License.
import json import json
import time
import uuid import uuid
from openstackclient.tests.functional.volume.v1 import common from openstackclient.tests.functional.volume.v1 import common
@ -44,8 +43,8 @@ class VolumeTests(common.BaseVolumeTests):
cmd_output["size"], cmd_output["size"],
) )
self.wait_for("volume", name1, "available") self.wait_for_status("volume", name1, "available")
self.wait_for("volume", name2, "available") self.wait_for_status("volume", name2, "available")
del_output = self.openstack('volume delete ' + name1 + ' ' + name2) del_output = self.openstack('volume delete ' + name1 + ' ' + name2)
self.assertOutput('', del_output) self.assertOutput('', del_output)
@ -62,7 +61,7 @@ class VolumeTests(common.BaseVolumeTests):
1, 1,
cmd_output["size"], cmd_output["size"],
) )
self.wait_for("volume", name1, "available") self.wait_for_status("volume", name1, "available")
name2 = uuid.uuid4().hex name2 = uuid.uuid4().hex
cmd_output = json.loads(self.openstack( cmd_output = json.loads(self.openstack(
@ -75,7 +74,7 @@ class VolumeTests(common.BaseVolumeTests):
2, 2,
cmd_output["size"], cmd_output["size"],
) )
self.wait_for("volume", name2, "available") self.wait_for_status("volume", name2, "available")
# Test list # Test list
cmd_output = json.loads(self.openstack( cmd_output = json.loads(self.openstack(
@ -131,7 +130,7 @@ class VolumeTests(common.BaseVolumeTests):
'false', 'false',
cmd_output["bootable"], cmd_output["bootable"],
) )
self.wait_for("volume", name, "available") self.wait_for_status("volume", name, "available")
# Test volume set # Test volume set
new_name = uuid.uuid4().hex new_name = uuid.uuid4().hex
@ -208,7 +207,7 @@ class VolumeTests(common.BaseVolumeTests):
self.assertNotIn('name', json_output) self.assertNotIn('name', json_output)
self.addCleanup(self.openstack, 'volume delete ' + volume_id) self.addCleanup(self.openstack, 'volume delete ' + volume_id)
self.wait_for("volume", name1, "available") self.wait_for_status("volume", name1, "available")
json_output = json.loads(self.openstack( json_output = json.loads(self.openstack(
'volume list -f json ' + 'volume list -f json ' +
@ -233,20 +232,3 @@ class VolumeTests(common.BaseVolumeTests):
self.assertEqual(name1, json_output['display_name']) self.assertEqual(name1, json_output['display_name'])
self.assertIn('id', json_output) self.assertIn('id', json_output)
self.assertNotIn('name', json_output) self.assertNotIn('name', json_output)
def wait_for(self, check_type, check_name, desired_status, wait=120,
interval=5, failures=['ERROR']):
status = "notset"
total_sleep = 0
opts = self.get_opts(['status'])
while total_sleep < wait:
status = self.openstack(check_type + ' show ' + check_name + opts)
status = status.rstrip()
print('Checking {} {} Waiting for {} current status: {}'
.format(check_type, check_name, desired_status, status))
if status == desired_status:
break
self.assertNotIn(status, failures)
time.sleep(interval)
total_sleep += interval
self.assertEqual(desired_status, status)

View File

@ -12,10 +12,10 @@
import os import os
from openstackclient.tests.functional import base from openstackclient.tests.functional.volume import base
class BaseVolumeTests(base.TestCase): class BaseVolumeTests(base.BaseVolumeTests):
"""Base class for Volume functional tests. """ """Base class for Volume functional tests. """
@classmethod @classmethod

View File

@ -11,7 +11,6 @@
# under the License. # under the License.
import json import json
import time
import uuid import uuid
from openstackclient.tests.functional.volume.v2 import common from openstackclient.tests.functional.volume.v2 import common
@ -22,16 +21,6 @@ class VolumeSnapshotTests(common.BaseVolumeTests):
VOLLY = uuid.uuid4().hex VOLLY = uuid.uuid4().hex
@classmethod
def wait_for_status(cls, command, status, tries):
opts = cls.get_opts(['status'])
for attempt in range(tries):
time.sleep(1)
raw_output = cls.openstack(command + opts)
if (raw_output.rstrip() == status):
return
cls.assertOutput(status, raw_output)
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
super(VolumeSnapshotTests, cls).setUpClass() super(VolumeSnapshotTests, cls).setUpClass()
@ -41,13 +30,14 @@ class VolumeSnapshotTests(common.BaseVolumeTests):
'--size 1 ' + '--size 1 ' +
cls.VOLLY cls.VOLLY
)) ))
cls.wait_for_status('volume show ' + cls.VOLLY, 'available', 6) cls.wait_for_status('volume', cls.VOLLY, 'available')
cls.VOLUME_ID = cmd_output['id'] cls.VOLUME_ID = cmd_output['id']
@classmethod @classmethod
def tearDownClass(cls): def tearDownClass(cls):
cls.wait_for_status('volume show ' + cls.VOLLY, 'available', 6) cls.wait_for_status('volume', cls.VOLLY, 'available')
raw_output = cls.openstack('volume delete --force ' + cls.VOLLY) raw_output = cls.openstack(
'volume delete --force ' + cls.VOLLY)
cls.assertOutput('', raw_output) cls.assertOutput('', raw_output)
def test_volume_snapshot__delete(self): def test_volume_snapshot__delete(self):
@ -74,14 +64,14 @@ class VolumeSnapshotTests(common.BaseVolumeTests):
cmd_output["name"], cmd_output["name"],
) )
self.wait_for_status( self.wait_for_status('volume snapshot', name1, 'available')
'volume snapshot show ' + name1, 'available', 6) self.wait_for_status('volume snapshot', name2, 'available')
self.wait_for_status(
'volume snapshot show ' + name2, 'available', 6)
del_output = self.openstack( del_output = self.openstack(
'volume snapshot delete ' + name1 + ' ' + name2) 'volume snapshot delete ' + name1 + ' ' + name2)
self.assertOutput('', del_output) self.assertOutput('', del_output)
self.wait_for_delete('volume snapshot', name1)
self.wait_for_delete('volume snapshot', name2)
def test_volume_snapshot_list(self): def test_volume_snapshot_list(self):
"""Test create, list filter""" """Test create, list filter"""
@ -91,6 +81,7 @@ class VolumeSnapshotTests(common.BaseVolumeTests):
name1 + name1 +
' --volume ' + self.VOLLY ' --volume ' + self.VOLLY
)) ))
self.addCleanup(self.wait_for_delete, 'volume snapshot', name1)
self.addCleanup(self.openstack, 'volume snapshot delete ' + name1) self.addCleanup(self.openstack, 'volume snapshot delete ' + name1)
self.assertEqual( self.assertEqual(
name1, name1,
@ -104,8 +95,7 @@ class VolumeSnapshotTests(common.BaseVolumeTests):
1, 1,
cmd_output["size"], cmd_output["size"],
) )
self.wait_for_status( self.wait_for_status('volume snapshot', name1, 'available')
'volume snapshot show ' + name1, 'available', 6)
name2 = uuid.uuid4().hex name2 = uuid.uuid4().hex
cmd_output = json.loads(self.openstack( cmd_output = json.loads(self.openstack(
@ -113,6 +103,7 @@ class VolumeSnapshotTests(common.BaseVolumeTests):
name2 + name2 +
' --volume ' + self.VOLLY ' --volume ' + self.VOLLY
)) ))
self.addCleanup(self.wait_for_delete, 'volume snapshot', name2)
self.addCleanup(self.openstack, 'volume snapshot delete ' + name2) self.addCleanup(self.openstack, 'volume snapshot delete ' + name2)
self.assertEqual( self.assertEqual(
name2, name2,
@ -126,8 +117,7 @@ class VolumeSnapshotTests(common.BaseVolumeTests):
1, 1,
cmd_output["size"], cmd_output["size"],
) )
self.wait_for_status( self.wait_for_status('volume snapshot', name2, 'available')
'volume snapshot show ' + name2, 'available', 6)
raw_output = self.openstack( raw_output = self.openstack(
'volume snapshot set ' + 'volume snapshot set ' +
'--state error ' + '--state error ' +
@ -174,6 +164,7 @@ class VolumeSnapshotTests(common.BaseVolumeTests):
'--property Alpha=a ' + '--property Alpha=a ' +
name name
)) ))
self.addCleanup(self.wait_for_delete, 'volume snapshot', new_name)
self.addCleanup(self.openstack, 'volume snapshot delete ' + new_name) self.addCleanup(self.openstack, 'volume snapshot delete ' + new_name)
self.assertEqual( self.assertEqual(
name, name,
@ -191,8 +182,7 @@ class VolumeSnapshotTests(common.BaseVolumeTests):
"Alpha='a'", "Alpha='a'",
cmd_output["properties"], cmd_output["properties"],
) )
self.wait_for_status( self.wait_for_status('volume snapshot', name, 'available')
'volume snapshot show ' + name, 'available', 6)
# Test volume snapshot set # Test volume snapshot set
raw_output = self.openstack( raw_output = self.openstack(

View File

@ -11,11 +11,8 @@
# under the License. # under the License.
import json import json
import time
import uuid import uuid
from tempest.lib import exceptions
from openstackclient.tests.functional.volume.v2 import common from openstackclient.tests.functional.volume.v2 import common
@ -46,8 +43,8 @@ class VolumeTests(common.BaseVolumeTests):
cmd_output["size"], cmd_output["size"],
) )
self.wait_for("volume", name1, "available") self.wait_for_status("volume", name1, "available")
self.wait_for("volume", name2, "available") self.wait_for_status("volume", name2, "available")
del_output = self.openstack('volume delete ' + name1 + ' ' + name2) del_output = self.openstack('volume delete ' + name1 + ' ' + name2)
self.assertOutput('', del_output) self.assertOutput('', del_output)
@ -64,7 +61,7 @@ class VolumeTests(common.BaseVolumeTests):
1, 1,
cmd_output["size"], cmd_output["size"],
) )
self.wait_for("volume", name1, "available") self.wait_for_status("volume", name1, "available")
name2 = uuid.uuid4().hex name2 = uuid.uuid4().hex
cmd_output = json.loads(self.openstack( cmd_output = json.loads(self.openstack(
@ -77,7 +74,7 @@ class VolumeTests(common.BaseVolumeTests):
2, 2,
cmd_output["size"], cmd_output["size"],
) )
self.wait_for("volume", name2, "available") self.wait_for_status("volume", name2, "available")
raw_output = self.openstack( raw_output = self.openstack(
'volume set ' + 'volume set ' +
'--state error ' + '--state error ' +
@ -138,7 +135,7 @@ class VolumeTests(common.BaseVolumeTests):
'false', 'false',
cmd_output["bootable"], cmd_output["bootable"],
) )
self.wait_for("volume", name, "available") self.wait_for_status("volume", name, "available")
# Test volume set # Test volume set
raw_output = self.openstack( raw_output = self.openstack(
@ -218,7 +215,7 @@ class VolumeTests(common.BaseVolumeTests):
'--size 1 ' + '--size 1 ' +
volume_name volume_name
)) ))
self.wait_for("volume", volume_name, "available") self.wait_for_status("volume", volume_name, "available")
self.assertEqual( self.assertEqual(
volume_name, volume_name,
cmd_output["name"], cmd_output["name"],
@ -228,9 +225,10 @@ class VolumeTests(common.BaseVolumeTests):
snapshot_name + snapshot_name +
' --volume ' + volume_name ' --volume ' + volume_name
)) ))
self.wait_for("volume snapshot", snapshot_name, "available") self.wait_for_status("volume snapshot", snapshot_name, "available")
name = uuid.uuid4().hex name = uuid.uuid4().hex
# Create volume from snapshot
cmd_output = json.loads(self.openstack( cmd_output = json.loads(self.openstack(
'volume create -f json ' + 'volume create -f json ' +
'--snapshot ' + snapshot_name + '--snapshot ' + snapshot_name +
@ -242,12 +240,15 @@ class VolumeTests(common.BaseVolumeTests):
name, name,
cmd_output["name"], cmd_output["name"],
) )
self.wait_for("volume", name, "available") self.wait_for_status("volume", name, "available")
# Delete snapshot # Delete snapshot
raw_output = self.openstack( raw_output = self.openstack(
'volume snapshot delete ' + snapshot_name) 'volume snapshot delete ' + snapshot_name)
self.assertOutput('', raw_output) self.assertOutput('', raw_output)
# Deleting snapshot may take time. If volume snapshot still exists when
# a parent volume delete is requested, the volume deletion will fail.
self.wait_for_delete('volume snapshot', snapshot_name)
def test_volume_list_backward_compatibility(self): def test_volume_list_backward_compatibility(self):
"""Test backward compatibility of list command""" """Test backward compatibility of list command"""
@ -262,7 +263,7 @@ class VolumeTests(common.BaseVolumeTests):
1, 1,
cmd_output["size"], cmd_output["size"],
) )
self.wait_for("volume", name1, "available") self.wait_for_status("volume", name1, "available")
# Test list -c "Display Name" # Test list -c "Display Name"
cmd_output = json.loads(self.openstack( cmd_output = json.loads(self.openstack(
@ -279,26 +280,3 @@ class VolumeTests(common.BaseVolumeTests):
)) ))
for each_volume in cmd_output: for each_volume in cmd_output:
self.assertIn('Name', each_volume) self.assertIn('Name', each_volume)
def wait_for(self, check_type, check_name, desired_status, wait=120,
interval=5, failures=['ERROR']):
status = "notset"
total_sleep = 0
opts = self.get_opts(['status'])
while total_sleep < wait:
try:
status = self.openstack(
check_type + ' show ' + check_name + opts
)
except exceptions.CommandFailed:
# Show command raise Exception when object had been deleted
status = 'disappear'
status = status.rstrip()
print('Checking {} {} Waiting for {} current status: {}'
.format(check_type, check_name, desired_status, status))
if status == desired_status:
break
self.assertNotIn(status, failures)
time.sleep(interval)
total_sleep += interval
self.assertEqual(desired_status, status)

View File

@ -12,10 +12,10 @@
import os import os
from openstackclient.tests.functional import base from openstackclient.tests.functional.volume import base
class BaseVolumeTests(base.TestCase): class BaseVolumeTests(base.BaseVolumeTests):
"""Base class for Volume functional tests. """ """Base class for Volume functional tests. """
@classmethod @classmethod