Add Location Import task flow

This change adds location_import task flow which includes
below tasks which are required for new add location api,
1. UpdateLocationTask
2. CalculateHash
3. VerifyValidationData
3. SetHashValues
4. SetImageToActiveTask

Related blueprint new-location-apis

Change-Id: Id5482582a29d947dcb74a506bf715cf6a2d05b3e
This commit is contained in:
Pranali Deore 2023-06-22 12:41:41 +00:00
parent ee7e96f06a
commit a0b7650c4b
9 changed files with 941 additions and 2 deletions

View File

@ -351,7 +351,8 @@ _TASK_SCHEMA = {
"description": _("The type of task represented by this content"),
"enum": [
"import",
"api_image_import"
"api_image_import",
"location_import",
],
"type": "string"
},

View File

@ -0,0 +1,351 @@
# Copyright 2024 RedHat Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import hashlib
import glance_store as store
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import encodeutils
from oslo_utils import secretutils
from taskflow.patterns import linear_flow as lf
from taskflow import retry
from taskflow import task
import glance.async_.flows.api_image_import as image_import
from glance.common import exception
from glance.common import store_utils
from glance.i18n import _, _LW
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class _HashCalculationFailed(exception.GlanceException):
def __init__(self, message):
super(_HashCalculationFailed, self).__init__(message)
class _InvalidLocation(exception.GlanceException):
def __init__(self, message):
super(_InvalidLocation, self).__init__(message)
class _CalculateHash(task.Task):
def __init__(self, task_id, task_type, image_repo, image_id,
hashing_algo, status=None):
self.task_id = task_id
self.task_type = task_type
self.image_repo = image_repo
self.image_id = image_id
self.hashing_algo = hashing_algo
self.image_status = status
super(_CalculateHash, self).__init__(
name='%s-CalculateHash-%s' % (task_type, task_id))
def _calculate_hash(self, image):
current_os_hash_value = hashlib.new(self.hashing_algo)
current_checksum = secretutils.md5(usedforsecurity=False)
for chunk in image.get_data():
if chunk is None:
break
current_checksum.update(chunk)
current_os_hash_value.update(chunk)
image.checksum = current_checksum.hexdigest()
image.os_hash_value = current_os_hash_value.hexdigest()
def _set_checksum_and_hash(self, image):
retries = 0
while retries <= CONF.http_retries and image.os_hash_value is None:
retries += 1
try:
self._calculate_hash(image)
self.image_repo.save(image)
except IOError as e:
LOG.debug('[%i/%i] Hash calculation failed due to %s',
retries, CONF.http_retries,
encodeutils.exception_to_unicode(e))
if retries == CONF.http_retries:
if image.status != 'active':
# NOTE(pdeore): The image location add operation
# should succeed so this exception should be raised
# only when image status is not active.
msg = (_('Hash calculation failed for image %s '
'data') % self.image_id)
raise _HashCalculationFailed(msg)
else:
msg = (_LW("Hash calculation failed for image %s "
"data") % self.image_id)
LOG.warning(msg)
except store.exceptions.NotFound:
# NOTE(pdeore): This can happen if image delete attempted
# when hash calculation is in progress, which deletes the
# image data from backend(specially rbd) but image remains
# in 'active' state.
# see: https://bugs.launchpad.net/glance/+bug/2045769
# Once this ceph side issue is fixed, we'll keep only the
# warning message here and will remove the deletion part
# which is a temporary workaround.
LOG.debug(_('Failed to calculate checksum of %(image_id)s '
'as image data has been deleted from the '
'backend'), {'image_id': self.image_id})
image.delete()
self.image_repo.remove(image)
break
def execute(self):
image = self.image_repo.get(self.image_id)
if image.status == 'queued':
image.status = self.image_status
image.os_hash_algo = self.hashing_algo
self.image_repo.save(image)
self._set_checksum_and_hash(image)
def revert(self, result, **kwargs):
"""Set os_hash_algo to None when hash calculation fails
and remove the location by reverting image to queued
state
"""
try:
image = self.image_repo.get(self.image_id)
if image.status == 'importing':
if not image.locations[0]['url'].startswith("http"):
# NOTE(pdeore): `http` store doesn't allow deletion of
# location:
image.locations.pop()
image.status = 'queued'
image.os_hash_algo = None
self.image_repo.save(image)
except exception.NotFound:
LOG.debug("Image %s might have been deleted from the backend",
self.image_id)
class _VerifyValidationData(task.Task):
def __init__(self, task_id, task_type, image_repo, image_id,
val_data):
self.task_id = task_id
self.task_type = task_type
self.image_repo = image_repo
self.image_id = image_id
self.val_data = val_data
super(_VerifyValidationData, self).__init__(
name='%s-VerifyValidationData-%s' % (task_type, task_id))
def execute(self):
"""Verify the Validation Data with calculated Hash
:param image_id: Glance Image ID
:val_data: Validation Data provider by user
"""
image = self.image_repo.get(self.image_id)
if self.val_data['os_hash_value'] != image.os_hash_value:
msg = (_("os_hash_value: (%s) not matched with actual "
"os_hash_value: (%s)") % (
self.val_data['os_hash_value'],
image.os_hash_value))
raise exception.InvalidParameterValue(msg)
def revert(self, result, **kwargs):
"""Set image status back to queued and
set the hash values to None
"""
try:
image = self.image_repo.get(self.image_id)
if not image.locations[0]['url'].startswith("http"):
# NOTE(pdeore): `http` store doesn't allow deletion of
# location
image.locations.pop()
image.status = 'queued'
image.os_hash_algo = None
image.os_hash_value = None
image.checksum = None
self.image_repo.save(image)
except exception.NotFound:
LOG.debug("Image %s might have been deleted from the backend",
self.image_id)
class _SetHashValues(task.Task):
def __init__(self, task_id, task_type, image_repo, image_id,
val_data):
self.task_id = task_id
self.task_type = task_type
self.image_repo = image_repo
self.image_id = image_id
self.val_data = val_data
super(_SetHashValues, self).__init__(
name='%s-SetHashValues-%s' % (task_type, task_id))
def execute(self):
"""Set user provided hash algo and value hash properties to image
when do_secure_hash is False.
:param image_id: Glance Image ID
:val_data: Validation Data provided by user
"""
image = self.image_repo.get(self.image_id)
for k, v in self.val_data.items():
setattr(image, k, v)
self.image_repo.save(image)
class _UpdateLocationTask(task.Task):
def __init__(self, task_id, task_type, image_repo, image_id, url,
context):
self.task_id = task_id
self.task_type = task_type
self.image_repo = image_repo
self.image_id = image_id
self.url = url
self.context = context
super(_UpdateLocationTask, self).__init__(
name='%s-UpdateLocationTask-%s' % (task_type, task_id))
def execute(self):
"""Update the image location
:param image_id: Glance Image ID
:param url: Location URL
"""
image = self.image_repo.get(self.image_id)
try:
# (NOTE(pdeore): Add metadata key to add the store identifier
# as location metadata
updated_location = {
'url': self.url,
'metadata': {},
}
if CONF.enabled_backends:
updated_location = store_utils.get_updated_store_location(
[updated_location], context=self.context)[0]
image.locations.append(updated_location)
self.image_repo.save(image)
except (exception.Invalid, exception.BadStoreUri) as e:
raise _InvalidLocation(e.msg)
class _SetImageToActiveTask(task.Task):
def __init__(self, task_id, task_type, image_repo, image_id):
self.task_id = task_id
self.task_type = task_type
self.image_repo = image_repo
self.image_id = image_id
super(_SetImageToActiveTask, self).__init__(
name='%s-SetImageToActiveTask-%s' % (task_type, task_id))
def execute(self):
"""Set Image status to Active
:param image_id: Glance Image ID
"""
image = self.image_repo.get(self.image_id)
image.status = 'active'
self.image_repo.save(image)
def revert(self, result, **kwargs):
"""Set image status back to queued and
remove the location if it's added.
"""
try:
image = self.image_repo.get(self.image_id)
if image.status != 'active':
if not image.locations[0]['url'].startswith("http"):
# NOTE(pdeore): `http` store doesn't allow deletion of
# location
image.locations.pop()
if image.status == 'importing':
image.status = 'queued'
self.image_repo.save(image)
except exception.NotFound:
LOG.debug("Image %s might have been deleted from the backend",
self.image_id)
def get_flow(**kwargs):
"""Return task flow
:param task_id: Task ID
:param task_type: Type of the task
:param task_repo: Task repo
:param image_repo: Image repository used
:param image_id: ID of the Image to be processed
"""
task_id = kwargs.get('task_id')
task_type = kwargs.get('task_type')
task_repo = kwargs.get('task_repo')
image_repo = kwargs.get('image_repo')
admin_repo = kwargs.get('admin_repo')
image_id = kwargs.get('image_id')
val_data = kwargs.get('val_data', {})
loc_url = kwargs.get('loc_url')
context = kwargs.get('context')
hashing_algo = val_data.get("os_hash_algo",
CONF['hashing_algorithm'])
# Instantiate an action wrapper with the admin repo if we got one,
# otherwise with the regular repo.
action_wrapper = image_import.ImportActionWrapper(
admin_repo or image_repo, image_id, task_id)
kwargs['action_wrapper'] = action_wrapper
flow = lf.Flow(task_type, retry=retry.AlwaysRevert())
flow.add(image_import._ImageLock(task_id, task_type, action_wrapper))
flow.add(
_UpdateLocationTask(task_id, task_type, image_repo, image_id,
loc_url, context))
if CONF.do_secure_hash:
if val_data:
flow.add(
_CalculateHash(task_id, task_type, image_repo, image_id,
hashing_algo, status='importing'))
flow.add(
_VerifyValidationData(task_id, task_type, image_repo,
image_id, val_data))
flow.add(
_SetImageToActiveTask(task_id, task_type, image_repo,
image_id))
else:
flow.add(
_SetImageToActiveTask(
task_id, task_type, image_repo, image_id))
flow.add(
_CalculateHash(task_id, task_type, image_repo, image_id,
hashing_algo))
elif val_data:
flow.add(
_SetHashValues(task_id, task_type, image_repo, image_id,
val_data))
flow.add(
_SetImageToActiveTask(task_id, task_type, image_repo, image_id))
else:
flow.add(
_SetImageToActiveTask(task_id, task_type, image_repo, image_id))
flow.add(
image_import._CompleteTask(task_id, task_type, task_repo,
action_wrapper))
return flow

View File

@ -131,6 +131,10 @@ class TaskExecutor(glance.async_.TaskExecutor):
if task.type == 'api_image_import':
kwds['image_id'] = task_input['image_id']
kwds['import_req'] = task_input['import_req']
if task.type == 'location_import':
kwds['image_id'] = task_input['image_id']
kwds['loc_url'] = task_input.get('loc_url')
kwds['val_data'] = task_input.get('validation_data', {})
return driver.DriverManager('glance.flows', task.type,
invoke_on_load=True,
invoke_kwds=kwds).driver

View File

@ -377,6 +377,22 @@ Related options:
* show_image_direct_url
* location_strategy
""")),
cfg.BoolOpt('do_secure_hash', default=True,
help=_("""
Calculate hash and checksum for the image.
This configuration option indicates that /v2/images/{image_id}/locations
POST API will calculate hash and checksum of the image on the fly.
If False it will silently ignore the hash and checksum calculation.
Possible values:
* True
* False
""")),
cfg.IntOpt('http_retries', default=3,
help=_("""
The number of times to retry when any operation fails.
""")),
cfg.IntOpt('image_size_cap', default=1099511627776, min=1,
max=9223372036854775808,

View File

@ -63,6 +63,15 @@ def unpack_task_input(task):
if 'image_id' not in task_input:
msg = _("Missing required 'image_id' field")
raise exception.Invalid(msg)
elif task_type == 'location_import':
if not task_input:
msg = _("Input to location_import task is empty.")
raise exception.Invalid(msg)
for key in ['image_id', 'loc_url', 'validation_data']:
if key not in task_input:
msg = (_("Input does not contain '%(key)s' field") %
{"key": key})
raise exception.Invalid(msg)
else:
for key in ["import_from", "import_from_format", "image_properties"]:
if key not in task_input:

View File

@ -351,7 +351,7 @@ class ImageMemberFactory(object):
class Task(object):
_supported_task_type = ('import', 'api_image_import')
_supported_task_type = ('import', 'api_image_import', 'location_import')
_supported_task_status = ('pending', 'processing', 'success', 'failure')

View File

@ -0,0 +1,530 @@
# Copyright 2024 RedHat Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import hashlib
import io
from unittest import mock
import glance_store as store
from oslo_config import cfg
from oslo_utils import units
import glance.async_.flows.location_import as import_flow
from glance.common import exception
from glance import context
import glance.tests.unit.utils as unit_test_utils
import glance.tests.utils as test_utils
CONF = cfg.CONF
BASE_URI = unit_test_utils.BASE_URI
TASK_TYPE = 'location_import'
TASK_ID1 = 'dbbe7231-020f-4311-87e1-5aaa6da56c02'
IMAGE_ID1 = '41f5b3b0-f54c-4cef-bd45-ce3e376a142f'
UUID1 = 'c80a1a6c-bd1f-41c5-90ee-81afedb1d58d'
TENANT1 = '6838eb7b-6ded-434a-882c-b344c77fe8df'
class TestCalculateHashTask(test_utils.BaseTestCase):
def setUp(self):
super(TestCalculateHashTask, self).setUp()
self.task_repo = mock.MagicMock()
self.task = mock.MagicMock()
self.hash_task_input = {
'image_id': IMAGE_ID1,
}
self.image_repo = mock.MagicMock()
self.image = self.image_repo.get.return_value
self.image.image_id = IMAGE_ID1
self.image.disk_format = 'raw'
self.image.container_format = 'bare'
self.config(do_secure_hash=True)
self.config(http_retries='3')
self.context = context.RequestContext(user_id=TENANT1,
project_id=TENANT1,
overwrite=False)
def test_execute_calculate_hash(self):
self.loc_url = '%s/fake_location_1' % (BASE_URI)
self.image.status = 'queued'
hashing_algo = CONF.hashing_algorithm
location_update = import_flow._UpdateLocationTask(
TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1, self.loc_url,
self.context)
location_update.execute()
self.assertEqual(1, self.image.locations.append.call_count)
set_image_active = import_flow._SetImageToActiveTask(
TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1)
set_image_active.execute()
self.assertEqual('active', self.image.status)
hash_calculation = import_flow._CalculateHash(TASK_ID1, TASK_TYPE,
self.image_repo,
IMAGE_ID1,
hashing_algo)
hash_calculation.execute()
self.assertIsNotNone(self.image.checksum)
self.assertIsNotNone(self.image.os_hash_algo)
self.assertIsNotNone(self.image.os_hash_value)
self.assertEqual('active', self.image.status)
def test_hash_calculation_retry_count(self):
hashing_algo = CONF.hashing_algorithm
self.image.checksum = None
self.image.os_hash_value = None
hash_calculation = import_flow._CalculateHash(TASK_ID1, TASK_TYPE,
self.image_repo,
IMAGE_ID1,
hashing_algo,
status='importing')
self.image.get_data.side_effect = IOError
self.config(http_retries='10')
expected_msg = ("Hash calculation failed for image .* data")
self.assertRaisesRegex(import_flow._HashCalculationFailed,
expected_msg,
hash_calculation.execute)
self.assertEqual(CONF.http_retries, self.image.get_data.call_count)
self.assertEqual(CONF.hashing_algorithm, self.image.os_hash_algo)
self.assertIsNone(self.image.checksum)
self.assertIsNone(self.image.os_hash_value)
hash_calculation.revert(None)
self.assertIsNone(self.image.os_hash_algo)
def test_execute_hash_calculation_fails_without_validation_data(self):
self.loc_url = '%s/fake_location_1' % (BASE_URI)
self.image.status = 'queued'
self.hash_task_input.update(loc_url=self.loc_url)
self.image.checksum = None
self.image.os_hash_value = None
location_update = import_flow._UpdateLocationTask(
TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1, self.loc_url,
self.context)
location_update.execute()
self.assertEqual(1, self.image.locations.append.call_count)
# Since Image is mocked here, self.image.locations will not be
# set hence setting it here to check that it's not popped out
# even after CalculateHash failure
self.image.locations = ['%s/fake_location_1' % (BASE_URI)]
set_image_active = import_flow._SetImageToActiveTask(
TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1)
set_image_active.execute()
self.assertEqual('active', self.image.status)
hashing_algo = CONF.hashing_algorithm
hash_calculation = import_flow._CalculateHash(TASK_ID1, TASK_TYPE,
self.image_repo,
IMAGE_ID1,
hashing_algo)
self.image.get_data.side_effect = IOError
with mock.patch.object(import_flow.LOG, 'debug') as mock_debug:
hash_calculation.execute()
debug_logs = mock_debug.call_args_list
self.assertIn(("[%i/%i] Hash calculation failed due to %s",
1, 3, ''), debug_logs[0])
self.assertEqual(CONF.hashing_algorithm, self.image.os_hash_algo)
self.assertIsNone(self.image.checksum)
self.assertIsNone(self.image.os_hash_value)
self.assertEqual('active', self.image.status)
self.assertEqual(1, len(self.image.locations))
hash_calculation.revert(None)
self.assertIsNone(self.image.os_hash_algo)
self.assertEqual('active', self.image.status)
self.assertEqual(1, len(self.image.locations))
# Hash Calculation failed when image is 'active'.
# exception will not be raised instead there will be warning log
self.image.get_data.side_effect = IOError
with mock.patch.object(import_flow.LOG, 'warning') as mock_warn:
hash_calculation.execute()
msg = ("Hash calculation failed for image %s data" % IMAGE_ID1)
mock_warn.assert_called_once_with(msg)
self.assertEqual(CONF.hashing_algorithm, self.image.os_hash_algo)
self.assertIsNone(self.image.checksum)
self.assertIsNone(self.image.os_hash_value)
self.assertEqual('active', self.image.status)
self.assertEqual(1, len(self.image.locations))
hash_calculation.revert(None)
self.assertIsNone(self.image.os_hash_algo)
self.assertEqual('active', self.image.status)
self.assertEqual(1, len(self.image.locations))
def test_execute_hash_calculation_fails_for_store_other_that_http(self):
self.loc_url = "cinder://image/fake_location"
self.hash_task_input.update(loc_url=self.loc_url)
self.image.status = 'queued'
self.image.checksum = None
self.image.os_hash_value = None
location_update = import_flow._UpdateLocationTask(
TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1, self.loc_url,
self.context)
location_update.execute()
self.assertEqual(1, self.image.locations.append.call_count)
# Since Image is mocked here, self.image.locations will not be
# set hence setting it here to check that it's not popped out
# even after CalculateHash failure
self.image.locations = [{'url': 'cinder://image/fake_location'}]
hashing_algo = CONF.hashing_algorithm
hash_calculation = import_flow._CalculateHash(TASK_ID1, TASK_TYPE,
self.image_repo,
IMAGE_ID1,
hashing_algo,
status='importing')
self.image.get_data.side_effect = IOError
expected_msg = ("Hash calculation failed for image .* data")
self.assertRaisesRegex(import_flow._HashCalculationFailed,
expected_msg,
hash_calculation.execute)
self.assertEqual(CONF.hashing_algorithm, self.image.os_hash_algo)
self.assertIsNone(self.image.checksum)
self.assertIsNone(self.image.os_hash_value)
self.assertEqual('importing', self.image.status)
self.assertEqual(1, len(self.image.locations))
hash_calculation.revert(None)
self.assertIsNone(self.image.os_hash_algo)
self.assertEqual('queued', self.image.status)
self.assertEqual(0, len(self.image.locations))
def test_execute_hash_calculation_fails_if_image_data_deleted(self):
self.loc_url = '%s/fake_location_1' % (BASE_URI)
self.image.status = 'queued'
self.hash_task_input.update(loc_url=self.loc_url)
self.image.checksum = None
self.image.os_hash_value = None
location_update = import_flow._UpdateLocationTask(
TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1, self.loc_url,
self.context)
location_update.execute()
self.assertEqual(1, self.image.locations.append.call_count)
set_image_active = import_flow._SetImageToActiveTask(
TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1)
set_image_active.execute()
self.assertEqual('active', self.image.status)
hashing_algo = CONF.hashing_algorithm
hash_calculation = import_flow._CalculateHash(TASK_ID1, TASK_TYPE,
self.image_repo,
IMAGE_ID1,
hashing_algo)
self.image.get_data.side_effect = store.exceptions.NotFound
hash_calculation.execute()
# Check if Image delete and image_repo.delete has been called
# if exception raised
self.image.delete.assert_called_once()
self.image_repo.remove.assert_called_once_with(self.image)
class TestVerifyValidationDataTask(test_utils.BaseTestCase):
def setUp(self):
super(TestVerifyValidationDataTask, self).setUp()
self.task_repo = mock.MagicMock()
self.task = mock.MagicMock()
self.val_data_task_input = {
'image_id': IMAGE_ID1,
}
self.image_repo = mock.MagicMock()
self.image = self.image_repo.get.return_value
self.image.image_id = IMAGE_ID1
self.image.disk_format = 'raw'
self.image.container_format = 'bare'
self.config(do_secure_hash=True)
def test_execute_with_valid_validation_data(self):
url = '%s/fake_location_1' % BASE_URI
self.image.status = 'queued'
self.image.locations = {"url": url, "metadata": {"store": "foo"}}
expected_size = 4 * units.Ki
expected_data = b"*" * expected_size
self.image.get_data.return_value = io.BytesIO(expected_data)
hash_value = hashlib.sha512(expected_data).hexdigest()
hashing_algo = CONF.hashing_algorithm
self.image.checksum = None
self.image.os_hash_value = None
val_data = {
'os_hash_algo': hashing_algo,
'os_hash_value': hash_value
}
self.val_data_task_input.update(val_data=val_data)
hash_calculation = import_flow._CalculateHash(TASK_ID1, TASK_TYPE,
self.image_repo,
IMAGE_ID1,
hashing_algo,
status='importing')
hash_calculation.execute()
self.image.os_hash_algo = val_data.get("os_hash_algo",
hashing_algo)
verify_validation_data = import_flow._VerifyValidationData(
TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1, val_data)
verify_validation_data.execute()
self.assertEqual('sha512', self.image.os_hash_algo)
self.assertEqual(hash_value, self.image.os_hash_value)
self.assertEqual('importing', self.image.status)
set_image_active = import_flow._SetImageToActiveTask(
TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1)
set_image_active.execute()
self.assertEqual('active', self.image.status)
def test_execute_with_os_hash_value_other_than_512(self):
url = '%s/fake_location_1' % BASE_URI
self.image.status = 'queued'
self.image.locations = {"url": url, "metadata": {"store": "foo"}}
expected_size = 4 * units.Ki
expected_data = b"*" * expected_size
self.image.get_data.return_value = io.BytesIO(expected_data)
hash_value = hashlib.sha256(expected_data).hexdigest()
hashing_algo = 'sha256'
self.image.checksum = None
self.image.os_hash_value = None
val_data = {
'os_hash_algo': 'sha256',
'os_hash_value': hash_value
}
hash_calculation = import_flow._CalculateHash(TASK_ID1, TASK_TYPE,
self.image_repo,
IMAGE_ID1,
hashing_algo,
status='importing')
hash_calculation.execute()
self.val_data_task_input.update(val_data=val_data)
verify_validation_data = import_flow._VerifyValidationData(
TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1, val_data)
verify_validation_data.execute()
self.assertEqual('sha256', self.image.os_hash_algo)
self.assertEqual(hash_value, self.image.os_hash_value)
self.assertEqual('importing', self.image.status)
set_image_active = import_flow._SetImageToActiveTask(
TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1)
set_image_active.execute()
self.assertEqual('active', self.image.status)
def test_execute_with_invalid_validation_data(self):
url = '%s/fake_location_1' % BASE_URI
self.image.status = 'queued'
self.image.locations = [{"url": url, "metadata": {"store": "foo"}}]
expected_size = 4 * units.Ki
expected_data = b"*" * expected_size
self.image.get_data.return_value = io.BytesIO(expected_data)
hashing_algo = CONF.hashing_algorithm
val_data = {
'os_hash_algo': hashing_algo,
'os_hash_value': hashlib.sha512(b'image_service').hexdigest()
}
hash_calculation = import_flow._CalculateHash(TASK_ID1, TASK_TYPE,
self.image_repo,
IMAGE_ID1,
hashing_algo,
status='importing')
hash_calculation.execute()
self.assertEqual('importing', self.image.status)
self.assertEqual(1, len(self.image.locations))
verify_validation_data = import_flow._VerifyValidationData(
TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1,
val_data)
expected_msg = ("os_hash_value: .* not matched with actual "
"os_hash_value: .*")
self.assertRaisesRegex(exception.InvalidParameterValue,
expected_msg,
verify_validation_data.execute)
verify_validation_data.revert(None)
self.assertIsNone(self.image.os_hash_algo)
self.assertIsNone(self.image.os_hash_value)
self.assertIsNone(self.image.checksum)
self.assertEqual('queued', self.image.status)
class TestSetHashValuesTask(test_utils.BaseTestCase):
def setUp(self):
super(TestSetHashValuesTask, self).setUp()
self.task_repo = mock.MagicMock()
self.task = mock.MagicMock()
self.hash_task_input = {
'image_id': IMAGE_ID1,
}
self.image_repo = mock.MagicMock()
self.image = self.image_repo.get.return_value
self.image.image_id = IMAGE_ID1
self.image.disk_format = 'raw'
self.image.container_format = 'bare'
def test_execute_with_valid_validation_data(self):
url = '%s/fake_location_1' % BASE_URI
self.image.status = 'queued'
self.image.locations = {"url": url, "metadata": {"store": "foo"}}
expected_size = 4 * units.Ki
expected_data = b"*" * expected_size
self.image.get_data.return_value = io.BytesIO(expected_data)
hash_value = hashlib.sha512(expected_data).hexdigest()
val_data = {
'os_hash_algo': 'sha512',
'os_hash_value': hash_value
}
self.hash_task_input.update(val_data=val_data)
set_hash_data = import_flow._SetHashValues(
TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1, val_data)
set_hash_data.execute()
self.assertEqual('sha512', self.image.os_hash_algo)
self.assertEqual(hash_value, self.image.os_hash_value)
self.assertEqual('queued', self.image.status)
set_image_active = import_flow._SetImageToActiveTask(
TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1)
set_image_active.execute()
self.assertEqual('active', self.image.status)
class TestUpdateLocationTask(test_utils.BaseTestCase):
def setUp(self):
super(TestUpdateLocationTask, self).setUp()
self.task_repo = mock.MagicMock()
self.task = mock.MagicMock()
self.location_task_input = {
'image_id': IMAGE_ID1,
}
self.image_repo = mock.MagicMock()
self.image = self.image_repo.get.return_value
self.image.image_id = IMAGE_ID1
self.image.disk_format = 'raw'
self.image.container_format = 'bare'
self.context = context.RequestContext(user_id=TENANT1,
project_id=TENANT1,
overwrite=False)
def test_execute_with_valid_location(self):
self.loc_url = '%s/fake_location_1' % (BASE_URI)
self.image.status = 'queued'
self.location_task_input.update(loc_url=self.loc_url)
location_update = import_flow._UpdateLocationTask(
TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1, self.loc_url,
self.context)
location_update.execute()
self.assertEqual(1, self.image.locations.append.call_count)
set_image_active = import_flow._SetImageToActiveTask(
TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1)
set_image_active.execute()
self.assertEqual('active', self.image.status)
def test_execute_with_invalid_location(self):
self.image.locations.append.side_effect = exception.BadStoreUri
loc_url = 'bogus_url'
self.image.status = 'queued'
self.location_task_input.update(loc_url=loc_url)
location_update = import_flow._UpdateLocationTask(
TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1, loc_url,
self.context)
self.assertRaises(import_flow._InvalidLocation,
location_update.execute)
self.assertEqual('queued', self.image.status)
class TestSetImageToActiveTask(test_utils.BaseTestCase):
def setUp(self):
super(TestSetImageToActiveTask, self).setUp()
self.task_repo = mock.MagicMock()
self.task = mock.MagicMock()
self.set_status_task_input = {
'image_id': IMAGE_ID1,
}
self.image_repo = mock.MagicMock()
self.image = self.image_repo.get.return_value
self.image.image_id = IMAGE_ID1
self.image.disk_format = 'raw'
self.image.container_format = 'bare'
self.context = context.RequestContext(user_id=TENANT1,
project_id=TENANT1,
overwrite=False)
def test_execute_set_image_to_active_state(self):
self.loc_url = '%s/fake_location_1' % (BASE_URI)
self.image.status = 'queued'
self.set_status_task_input.update(loc_url=self.loc_url)
location_update = import_flow._UpdateLocationTask(
TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1, self.loc_url,
self.context)
location_update.execute()
self.assertEqual(1, self.image.locations.append.call_count)
self.assertEqual('queued', self.image.status)
set_image_active = import_flow._SetImageToActiveTask(
TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1)
set_image_active.execute()
self.assertEqual('active', self.image.status)
def test_execute_set_image_to_active_state_failure(self):
self.loc_url = '%s/fake_location_1' % (BASE_URI)
self.image.status = 'queued'
self.set_status_task_input.update(loc_url=self.loc_url)
location_update = import_flow._UpdateLocationTask(
TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1, self.loc_url,
self.context)
location_update.execute()
self.assertEqual(1, self.image.locations.append.call_count)
self.assertEqual('queued', self.image.status)
# Test if image failed while saving to active state
self.image_repo.save.side_effect = ValueError
set_image_active = import_flow._SetImageToActiveTask(
TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1)
self.assertRaises(ValueError, set_image_active.execute)
# Test revert where location added in previous task is popped
# out incase of this task failure which didn't set image status
# 'active'.
self.image_repo.save.side_effect = None
self.image.status = 'queued'
set_image_active.revert(None)
self.assertEqual(0, self.image.locations.pop.call_count)
self.assertEqual('queued', self.image.status)

View File

@ -39,6 +39,33 @@ class TestScriptsUtils(test_utils.BaseTestCase):
self.assertEqual(task_input,
script_utils.unpack_task_input(task))
def test_unpack_task_type_location_import(self):
task_type = 'location_import'
task_input = {'image_id': mock.ANY,
'loc_url': mock.ANY,
'validation_data': {}}
task = mock.Mock(type=task_type, task_input=task_input)
self.assertEqual(task_input,
script_utils.unpack_task_input(task))
def test_unpack_task_type_location_import_error(self):
task_type = 'location_import'
task_input1 = {'image_id': mock.ANY,
'validation_data': {}}
task_input2 = {'loc_url': mock.ANY,
'validation_data': {}}
task_input3 = {'image_id': mock.ANY,
'loc_url': mock.ANY}
task1 = mock.Mock(type=task_type, task_input=task_input1)
task2 = mock.Mock(type=task_type, task_input=task_input2)
task3 = mock.Mock(type=task_type, task_input=task_input3)
self.assertRaises(exception.Invalid,
script_utils.unpack_task_input, task1)
self.assertRaises(exception.Invalid,
script_utils.unpack_task_input, task2)
self.assertRaises(exception.Invalid,
script_utils.unpack_task_input, task3)
def test_unpack_task_input_error(self):
task_input1 = {"import_from_format": "bar", "image_properties": "baz"}
task_input2 = {"import_from": "foo", "image_properties": "baz"}

View File

@ -69,6 +69,7 @@ oslo.policy.policies =
glance.flows =
api_image_import = glance.async_.flows.api_image_import:get_flow
import = glance.async_.flows.base_import:get_flow
location_import = glance.async_.flows.location_import:get_flow
glance.flows.import =
convert = glance.async_.flows.convert:get_flow