Add ability to create image from volume

OpenStack has the ability to create an image from a volume - expose it.

It should be noted that literally nothing about this API is documented,
although it is exposed in python-cinderclient and
python-openstackclient.

Change-Id: Icb06d43a63d0b120a17ce6c19807abcb3de71bcb
This commit is contained in:
Monty Taylor 2017-01-31 12:19:07 -06:00
parent 16a058f16e
commit 504cb05658
4 changed files with 145 additions and 12 deletions

View File

@ -0,0 +1,3 @@
---
features:
- Added ability to create an image from a volume.

View File

@ -3267,7 +3267,7 @@ class OpenStackCloud(_normalize.Normalizer):
disk_format=None, container_format=None,
disable_vendor_agent=True,
wait=False, timeout=3600,
allow_duplicates=False, meta=None, **kwargs):
allow_duplicates=False, meta=None, volume=None, **kwargs):
"""Upload an image to Glance.
:param str name: Name of the image to create. If it is a pathname
@ -3303,6 +3303,9 @@ class OpenStackCloud(_normalize.Normalizer):
image name. (optional, defaults to False)
:param meta: A dict of key/value pairs to use for metadata that
bypasses automatic type conversion.
:param volume: Name or ID or volume object of a volume to create an
image from. Mutually exclusive with (optional, defaults
to None)
Additional kwargs will be passed to the image creation as additional
metadata for the image and will have all values converted to string
@ -3326,10 +3329,6 @@ class OpenStackCloud(_normalize.Normalizer):
if not meta:
meta = {}
# If there is no filename, see if name is actually the filename
if not filename:
name, filename = self._get_name_and_filename(name)
if not disk_format:
disk_format = self.cloud_config.config['image_format']
if not container_format:
@ -3337,6 +3336,26 @@ class OpenStackCloud(_normalize.Normalizer):
container_format = 'ovf'
else:
container_format = 'bare'
if volume:
if 'id' in volume:
volume_id = volume['id']
else:
volume_obj = self.get_volume(volume)
if not volume_obj:
raise OpenStackCloudException(
"Volume {volume} given to create_image could"
" not be foud".format(volume=volume))
volume_id = volume_obj['id']
return self._upload_image_from_volume(
name=name, volume_id=volume_id,
allow_duplicates=allow_duplicates,
container_format=container_format, disk_format=disk_format,
wait=wait, timeout=timeout)
# If there is no filename, see if name is actually the filename
if not filename:
name, filename = self._get_name_and_filename(name)
if not (md5 or sha256):
(md5, sha256) = self._get_file_hashes(filename)
if allow_duplicates:
@ -3419,6 +3438,32 @@ class OpenStackCloud(_normalize.Normalizer):
ret.update(meta)
return ret
def _upload_image_from_volume(
self, name, volume_id, allow_duplicates,
container_format, disk_format, wait, timeout):
response = self._volume_client.post(
'/volumes/{id}/action'.format(id=volume_id),
json={
'os-volume_upload_image': {
'force': allow_duplicates,
'image_name': name,
'container_format': container_format,
'disk_format': disk_format}})
if not wait:
return self.get_image(response['image_id'])
try:
for count in _utils._iterate_timeout(
timeout,
"Timeout waiting for the image to finish."):
image_obj = self.get_image(response['image_id'])
if image_obj and image_obj.status not in ('queued', 'saving'):
return image_obj
except OpenStackCloudTimeout:
self.log.debug(
"Timeout waiting for image to become ready. Deleting.")
self.delete_image(response['image_id'], wait=True)
raise
def _upload_image_put_v2(self, name, image_data, meta, **image_kwargs):
properties = image_kwargs.pop('properties', {})

View File

@ -34,7 +34,7 @@ class TestVolume(base.BaseFunctionalTestCase):
volume_name = self.getUniqueString()
snapshot_name = self.getUniqueString()
self.addDetail('volume', content.text_content(volume_name))
self.addCleanup(self.cleanup, volume_name, snapshot_name)
self.addCleanup(self.cleanup, volume_name, snapshot_name=snapshot_name)
volume = self.demo_cloud.create_volume(
display_name=volume_name, size=1)
snapshot = self.demo_cloud.create_volume_snapshot(
@ -56,11 +56,38 @@ class TestVolume(base.BaseFunctionalTestCase):
self.demo_cloud.delete_volume_snapshot(snapshot_name, wait=True)
self.demo_cloud.delete_volume(volume_name, wait=True)
def cleanup(self, volume_name, snapshot_name):
volume = self.demo_cloud.get_volume(volume_name)
snapshot = self.demo_cloud.get_volume_snapshot(snapshot_name)
def test_volume_to_image(self):
'''Test volume export to image functionality'''
volume_name = self.getUniqueString()
image_name = self.getUniqueString()
self.addDetail('volume', content.text_content(volume_name))
self.addCleanup(self.cleanup, volume_name, image_name=image_name)
volume = self.demo_cloud.create_volume(
display_name=volume_name, size=1)
image = self.demo_cloud.create_image(
image_name, volume=volume, wait=True)
volume_ids = [v['id'] for v in self.demo_cloud.list_volumes()]
self.assertIn(volume['id'], volume_ids)
image_list = self.demo_cloud.list_images()
image_ids = [s['id'] for s in image_list]
self.assertIn(image['id'], image_ids)
self.demo_cloud.delete_image(image_name, wait=True)
self.demo_cloud.delete_volume(volume_name, wait=True)
def cleanup(self, volume_name, snapshot_name=None, image_name=None):
# Need to delete snapshots before volumes
if snapshot_name:
snapshot = self.demo_cloud.get_volume_snapshot(snapshot_name)
if snapshot:
self.demo_cloud.delete_volume_snapshot(snapshot_name)
self.demo_cloud.delete_volume_snapshot(
snapshot_name, wait=True)
if image_name:
image = self.demo_cloud.get_image(image_name)
if image:
self.demo_cloud.delete_image(image_name, wait=True)
volume = self.demo_cloud.get_volume(volume_name)
if volume:
self.demo_cloud.delete_volume(volume_name)
self.demo_cloud.delete_volume(volume_name, wait=True)

View File

@ -29,6 +29,7 @@ from shade.tests.unit import base
NO_MD5 = '93b885adfe0da089cdf634904fd59f71'
NO_SHA256 = '6e340b9cffb37a989ca544e6bb780a2c78901d3fb33738768511a30617afa01d'
CINDER_URL = 'https://volume.example.com/v2/1c36b64c840a42cd9e9b931a369337f0'
class BaseTestImage(base.RequestsMockTestCase):
@ -726,3 +727,60 @@ class TestImageVersionDiscovery(BaseTestImage):
self.cloud._image_client.endpoint_override,
'https://image.example.com/v2/override')
self.assert_calls()
class TestImageVolume(BaseTestImage):
def test_create_image_volume(self):
volume_id = 'some-volume'
self.register_uri(
'POST', '{endpoint}/volumes/{id}/action'.format(
endpoint=CINDER_URL, id=volume_id),
json={'os-volume_upload_image': {'image_id': self.image_id}},
validate=dict(json={
u'os-volume_upload_image': {
u'container_format': u'bare',
u'disk_format': u'qcow2',
u'force': False,
u'image_name': u'fake_image'}}))
self.use_glance()
self.register_uri(
'GET', 'https://image.example.com/v2/images',
json=self.fake_search_return)
self.cloud.create_image(
'fake_image', self.imagefile.name, wait=True, timeout=1,
volume={'id': volume_id})
self.assert_calls()
def test_create_image_volume_duplicate(self):
volume_id = 'some-volume'
self.register_uri(
'POST', '{endpoint}/volumes/{id}/action'.format(
endpoint=CINDER_URL, id=volume_id),
json={'os-volume_upload_image': {'image_id': self.image_id}},
validate=dict(json={
u'os-volume_upload_image': {
u'container_format': u'bare',
u'disk_format': u'qcow2',
u'force': True,
u'image_name': u'fake_image'}}))
self.use_glance()
self.register_uri(
'GET', 'https://image.example.com/v2/images',
json=self.fake_search_return)
self.cloud.create_image(
'fake_image', self.imagefile.name, wait=True, timeout=1,
volume={'id': volume_id}, allow_duplicates=True)
self.assert_calls()