From a0b7650c4b526ee10312499c5b4ce5e890ec5e81 Mon Sep 17 00:00:00 2001 From: Pranali Deore Date: Thu, 22 Jun 2023 12:41:41 +0000 Subject: [PATCH] Add Location Import task flow This change adds location_import task flow which includes below tasks which are required for new add location api, 1. UpdateLocationTask 2. CalculateHash 3. VerifyValidationData 3. SetHashValues 4. SetImageToActiveTask Related blueprint new-location-apis Change-Id: Id5482582a29d947dcb74a506bf715cf6a2d05b3e --- glance/api/v2/tasks.py | 3 +- glance/async_/flows/location_import.py | 351 ++++++++++++ glance/async_/taskflow_executor.py | 4 + glance/common/config.py | 16 + glance/common/scripts/utils.py | 9 + glance/domain/__init__.py | 2 +- .../unit/async_/flows/test_location_import.py | 530 ++++++++++++++++++ .../unit/common/scripts/test_scripts_utils.py | 27 + setup.cfg | 1 + 9 files changed, 941 insertions(+), 2 deletions(-) create mode 100644 glance/async_/flows/location_import.py create mode 100644 glance/tests/unit/async_/flows/test_location_import.py diff --git a/glance/api/v2/tasks.py b/glance/api/v2/tasks.py index a02b892221..ad125861ce 100644 --- a/glance/api/v2/tasks.py +++ b/glance/api/v2/tasks.py @@ -351,7 +351,8 @@ _TASK_SCHEMA = { "description": _("The type of task represented by this content"), "enum": [ "import", - "api_image_import" + "api_image_import", + "location_import", ], "type": "string" }, diff --git a/glance/async_/flows/location_import.py b/glance/async_/flows/location_import.py new file mode 100644 index 0000000000..9936727bac --- /dev/null +++ b/glance/async_/flows/location_import.py @@ -0,0 +1,351 @@ +# Copyright 2024 RedHat Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import hashlib + +import glance_store as store +from oslo_config import cfg +from oslo_log import log as logging +from oslo_utils import encodeutils +from oslo_utils import secretutils +from taskflow.patterns import linear_flow as lf +from taskflow import retry +from taskflow import task + +import glance.async_.flows.api_image_import as image_import +from glance.common import exception +from glance.common import store_utils +from glance.i18n import _, _LW + + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + + +class _HashCalculationFailed(exception.GlanceException): + + def __init__(self, message): + super(_HashCalculationFailed, self).__init__(message) + + +class _InvalidLocation(exception.GlanceException): + + def __init__(self, message): + super(_InvalidLocation, self).__init__(message) + + +class _CalculateHash(task.Task): + + def __init__(self, task_id, task_type, image_repo, image_id, + hashing_algo, status=None): + self.task_id = task_id + self.task_type = task_type + self.image_repo = image_repo + self.image_id = image_id + self.hashing_algo = hashing_algo + self.image_status = status + super(_CalculateHash, self).__init__( + name='%s-CalculateHash-%s' % (task_type, task_id)) + + def _calculate_hash(self, image): + current_os_hash_value = hashlib.new(self.hashing_algo) + current_checksum = secretutils.md5(usedforsecurity=False) + for chunk in image.get_data(): + if chunk is None: + break + current_checksum.update(chunk) + current_os_hash_value.update(chunk) + image.checksum = current_checksum.hexdigest() + image.os_hash_value = current_os_hash_value.hexdigest() + + def _set_checksum_and_hash(self, image): + retries = 0 + while retries <= CONF.http_retries and image.os_hash_value is None: + retries += 1 + try: + self._calculate_hash(image) + self.image_repo.save(image) + except IOError as e: + LOG.debug('[%i/%i] Hash calculation failed due to %s', + retries, CONF.http_retries, + encodeutils.exception_to_unicode(e)) + if retries == CONF.http_retries: + if image.status != 'active': + # NOTE(pdeore): The image location add operation + # should succeed so this exception should be raised + # only when image status is not active. + msg = (_('Hash calculation failed for image %s ' + 'data') % self.image_id) + raise _HashCalculationFailed(msg) + else: + msg = (_LW("Hash calculation failed for image %s " + "data") % self.image_id) + LOG.warning(msg) + except store.exceptions.NotFound: + # NOTE(pdeore): This can happen if image delete attempted + # when hash calculation is in progress, which deletes the + # image data from backend(specially rbd) but image remains + # in 'active' state. + # see: https://bugs.launchpad.net/glance/+bug/2045769 + # Once this ceph side issue is fixed, we'll keep only the + # warning message here and will remove the deletion part + # which is a temporary workaround. + LOG.debug(_('Failed to calculate checksum of %(image_id)s ' + 'as image data has been deleted from the ' + 'backend'), {'image_id': self.image_id}) + image.delete() + self.image_repo.remove(image) + break + + def execute(self): + image = self.image_repo.get(self.image_id) + if image.status == 'queued': + image.status = self.image_status + image.os_hash_algo = self.hashing_algo + self.image_repo.save(image) + self._set_checksum_and_hash(image) + + def revert(self, result, **kwargs): + """Set os_hash_algo to None when hash calculation fails + and remove the location by reverting image to queued + state + """ + try: + image = self.image_repo.get(self.image_id) + if image.status == 'importing': + if not image.locations[0]['url'].startswith("http"): + # NOTE(pdeore): `http` store doesn't allow deletion of + # location: + image.locations.pop() + image.status = 'queued' + image.os_hash_algo = None + self.image_repo.save(image) + except exception.NotFound: + LOG.debug("Image %s might have been deleted from the backend", + self.image_id) + + +class _VerifyValidationData(task.Task): + + def __init__(self, task_id, task_type, image_repo, image_id, + val_data): + self.task_id = task_id + self.task_type = task_type + self.image_repo = image_repo + self.image_id = image_id + self.val_data = val_data + super(_VerifyValidationData, self).__init__( + name='%s-VerifyValidationData-%s' % (task_type, task_id)) + + def execute(self): + """Verify the Validation Data with calculated Hash + + :param image_id: Glance Image ID + :val_data: Validation Data provider by user + """ + image = self.image_repo.get(self.image_id) + + if self.val_data['os_hash_value'] != image.os_hash_value: + msg = (_("os_hash_value: (%s) not matched with actual " + "os_hash_value: (%s)") % ( + self.val_data['os_hash_value'], + image.os_hash_value)) + raise exception.InvalidParameterValue(msg) + + def revert(self, result, **kwargs): + """Set image status back to queued and + set the hash values to None + """ + try: + image = self.image_repo.get(self.image_id) + if not image.locations[0]['url'].startswith("http"): + # NOTE(pdeore): `http` store doesn't allow deletion of + # location + image.locations.pop() + image.status = 'queued' + image.os_hash_algo = None + image.os_hash_value = None + image.checksum = None + self.image_repo.save(image) + except exception.NotFound: + LOG.debug("Image %s might have been deleted from the backend", + self.image_id) + + +class _SetHashValues(task.Task): + + def __init__(self, task_id, task_type, image_repo, image_id, + val_data): + self.task_id = task_id + self.task_type = task_type + self.image_repo = image_repo + self.image_id = image_id + self.val_data = val_data + super(_SetHashValues, self).__init__( + name='%s-SetHashValues-%s' % (task_type, task_id)) + + def execute(self): + """Set user provided hash algo and value hash properties to image + when do_secure_hash is False. + + :param image_id: Glance Image ID + :val_data: Validation Data provided by user + """ + image = self.image_repo.get(self.image_id) + for k, v in self.val_data.items(): + setattr(image, k, v) + self.image_repo.save(image) + + +class _UpdateLocationTask(task.Task): + + def __init__(self, task_id, task_type, image_repo, image_id, url, + context): + self.task_id = task_id + self.task_type = task_type + self.image_repo = image_repo + self.image_id = image_id + self.url = url + self.context = context + super(_UpdateLocationTask, self).__init__( + name='%s-UpdateLocationTask-%s' % (task_type, task_id)) + + def execute(self): + """Update the image location + + :param image_id: Glance Image ID + :param url: Location URL + """ + image = self.image_repo.get(self.image_id) + try: + # (NOTE(pdeore): Add metadata key to add the store identifier + # as location metadata + updated_location = { + 'url': self.url, + 'metadata': {}, + } + if CONF.enabled_backends: + updated_location = store_utils.get_updated_store_location( + [updated_location], context=self.context)[0] + + image.locations.append(updated_location) + self.image_repo.save(image) + except (exception.Invalid, exception.BadStoreUri) as e: + raise _InvalidLocation(e.msg) + + +class _SetImageToActiveTask(task.Task): + + def __init__(self, task_id, task_type, image_repo, image_id): + self.task_id = task_id + self.task_type = task_type + self.image_repo = image_repo + self.image_id = image_id + super(_SetImageToActiveTask, self).__init__( + name='%s-SetImageToActiveTask-%s' % (task_type, task_id)) + + def execute(self): + """Set Image status to Active + + :param image_id: Glance Image ID + """ + image = self.image_repo.get(self.image_id) + image.status = 'active' + self.image_repo.save(image) + + def revert(self, result, **kwargs): + """Set image status back to queued and + remove the location if it's added. + """ + try: + image = self.image_repo.get(self.image_id) + if image.status != 'active': + if not image.locations[0]['url'].startswith("http"): + # NOTE(pdeore): `http` store doesn't allow deletion of + # location + image.locations.pop() + if image.status == 'importing': + image.status = 'queued' + self.image_repo.save(image) + except exception.NotFound: + LOG.debug("Image %s might have been deleted from the backend", + self.image_id) + + +def get_flow(**kwargs): + """Return task flow + + :param task_id: Task ID + :param task_type: Type of the task + :param task_repo: Task repo + :param image_repo: Image repository used + :param image_id: ID of the Image to be processed + """ + task_id = kwargs.get('task_id') + task_type = kwargs.get('task_type') + task_repo = kwargs.get('task_repo') + image_repo = kwargs.get('image_repo') + admin_repo = kwargs.get('admin_repo') + image_id = kwargs.get('image_id') + val_data = kwargs.get('val_data', {}) + loc_url = kwargs.get('loc_url') + context = kwargs.get('context') + + hashing_algo = val_data.get("os_hash_algo", + CONF['hashing_algorithm']) + + # Instantiate an action wrapper with the admin repo if we got one, + # otherwise with the regular repo. + action_wrapper = image_import.ImportActionWrapper( + admin_repo or image_repo, image_id, task_id) + kwargs['action_wrapper'] = action_wrapper + + flow = lf.Flow(task_type, retry=retry.AlwaysRevert()) + flow.add(image_import._ImageLock(task_id, task_type, action_wrapper)) + flow.add( + _UpdateLocationTask(task_id, task_type, image_repo, image_id, + loc_url, context)) + if CONF.do_secure_hash: + if val_data: + flow.add( + _CalculateHash(task_id, task_type, image_repo, image_id, + hashing_algo, status='importing')) + flow.add( + _VerifyValidationData(task_id, task_type, image_repo, + image_id, val_data)) + flow.add( + _SetImageToActiveTask(task_id, task_type, image_repo, + image_id)) + else: + flow.add( + _SetImageToActiveTask( + task_id, task_type, image_repo, image_id)) + flow.add( + _CalculateHash(task_id, task_type, image_repo, image_id, + hashing_algo)) + elif val_data: + flow.add( + _SetHashValues(task_id, task_type, image_repo, image_id, + val_data)) + flow.add( + _SetImageToActiveTask(task_id, task_type, image_repo, image_id)) + else: + flow.add( + _SetImageToActiveTask(task_id, task_type, image_repo, image_id)) + + flow.add( + image_import._CompleteTask(task_id, task_type, task_repo, + action_wrapper)) + + return flow diff --git a/glance/async_/taskflow_executor.py b/glance/async_/taskflow_executor.py index ef90d60460..b648639a6a 100644 --- a/glance/async_/taskflow_executor.py +++ b/glance/async_/taskflow_executor.py @@ -131,6 +131,10 @@ class TaskExecutor(glance.async_.TaskExecutor): if task.type == 'api_image_import': kwds['image_id'] = task_input['image_id'] kwds['import_req'] = task_input['import_req'] + if task.type == 'location_import': + kwds['image_id'] = task_input['image_id'] + kwds['loc_url'] = task_input.get('loc_url') + kwds['val_data'] = task_input.get('validation_data', {}) return driver.DriverManager('glance.flows', task.type, invoke_on_load=True, invoke_kwds=kwds).driver diff --git a/glance/common/config.py b/glance/common/config.py index 1d749e4fe2..edf537c3b0 100644 --- a/glance/common/config.py +++ b/glance/common/config.py @@ -377,6 +377,22 @@ Related options: * show_image_direct_url * location_strategy +""")), + cfg.BoolOpt('do_secure_hash', default=True, + help=_(""" +Calculate hash and checksum for the image. + +This configuration option indicates that /v2/images/{image_id}/locations +POST API will calculate hash and checksum of the image on the fly. +If False it will silently ignore the hash and checksum calculation. + +Possible values: + * True + * False +""")), + cfg.IntOpt('http_retries', default=3, + help=_(""" +The number of times to retry when any operation fails. """)), cfg.IntOpt('image_size_cap', default=1099511627776, min=1, max=9223372036854775808, diff --git a/glance/common/scripts/utils.py b/glance/common/scripts/utils.py index 4a09d6fd43..f7c6a9f6a4 100644 --- a/glance/common/scripts/utils.py +++ b/glance/common/scripts/utils.py @@ -63,6 +63,15 @@ def unpack_task_input(task): if 'image_id' not in task_input: msg = _("Missing required 'image_id' field") raise exception.Invalid(msg) + elif task_type == 'location_import': + if not task_input: + msg = _("Input to location_import task is empty.") + raise exception.Invalid(msg) + for key in ['image_id', 'loc_url', 'validation_data']: + if key not in task_input: + msg = (_("Input does not contain '%(key)s' field") % + {"key": key}) + raise exception.Invalid(msg) else: for key in ["import_from", "import_from_format", "image_properties"]: if key not in task_input: diff --git a/glance/domain/__init__.py b/glance/domain/__init__.py index fe5986544f..1de161b75b 100644 --- a/glance/domain/__init__.py +++ b/glance/domain/__init__.py @@ -351,7 +351,7 @@ class ImageMemberFactory(object): class Task(object): - _supported_task_type = ('import', 'api_image_import') + _supported_task_type = ('import', 'api_image_import', 'location_import') _supported_task_status = ('pending', 'processing', 'success', 'failure') diff --git a/glance/tests/unit/async_/flows/test_location_import.py b/glance/tests/unit/async_/flows/test_location_import.py new file mode 100644 index 0000000000..c68f0c29ff --- /dev/null +++ b/glance/tests/unit/async_/flows/test_location_import.py @@ -0,0 +1,530 @@ +# Copyright 2024 RedHat Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import hashlib +import io +from unittest import mock + +import glance_store as store +from oslo_config import cfg +from oslo_utils import units + +import glance.async_.flows.location_import as import_flow +from glance.common import exception +from glance import context +import glance.tests.unit.utils as unit_test_utils +import glance.tests.utils as test_utils + + +CONF = cfg.CONF + +BASE_URI = unit_test_utils.BASE_URI + +TASK_TYPE = 'location_import' +TASK_ID1 = 'dbbe7231-020f-4311-87e1-5aaa6da56c02' +IMAGE_ID1 = '41f5b3b0-f54c-4cef-bd45-ce3e376a142f' +UUID1 = 'c80a1a6c-bd1f-41c5-90ee-81afedb1d58d' +TENANT1 = '6838eb7b-6ded-434a-882c-b344c77fe8df' + + +class TestCalculateHashTask(test_utils.BaseTestCase): + def setUp(self): + super(TestCalculateHashTask, self).setUp() + self.task_repo = mock.MagicMock() + self.task = mock.MagicMock() + self.hash_task_input = { + 'image_id': IMAGE_ID1, + } + self.image_repo = mock.MagicMock() + self.image = self.image_repo.get.return_value + self.image.image_id = IMAGE_ID1 + self.image.disk_format = 'raw' + self.image.container_format = 'bare' + self.config(do_secure_hash=True) + self.config(http_retries='3') + self.context = context.RequestContext(user_id=TENANT1, + project_id=TENANT1, + overwrite=False) + + def test_execute_calculate_hash(self): + self.loc_url = '%s/fake_location_1' % (BASE_URI) + self.image.status = 'queued' + hashing_algo = CONF.hashing_algorithm + + location_update = import_flow._UpdateLocationTask( + TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1, self.loc_url, + self.context) + + location_update.execute() + self.assertEqual(1, self.image.locations.append.call_count) + + set_image_active = import_flow._SetImageToActiveTask( + TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1) + set_image_active.execute() + self.assertEqual('active', self.image.status) + + hash_calculation = import_flow._CalculateHash(TASK_ID1, TASK_TYPE, + self.image_repo, + IMAGE_ID1, + hashing_algo) + hash_calculation.execute() + self.assertIsNotNone(self.image.checksum) + self.assertIsNotNone(self.image.os_hash_algo) + self.assertIsNotNone(self.image.os_hash_value) + self.assertEqual('active', self.image.status) + + def test_hash_calculation_retry_count(self): + hashing_algo = CONF.hashing_algorithm + self.image.checksum = None + self.image.os_hash_value = None + hash_calculation = import_flow._CalculateHash(TASK_ID1, TASK_TYPE, + self.image_repo, + IMAGE_ID1, + hashing_algo, + status='importing') + + self.image.get_data.side_effect = IOError + self.config(http_retries='10') + expected_msg = ("Hash calculation failed for image .* data") + self.assertRaisesRegex(import_flow._HashCalculationFailed, + expected_msg, + hash_calculation.execute) + self.assertEqual(CONF.http_retries, self.image.get_data.call_count) + self.assertEqual(CONF.hashing_algorithm, self.image.os_hash_algo) + self.assertIsNone(self.image.checksum) + self.assertIsNone(self.image.os_hash_value) + + hash_calculation.revert(None) + self.assertIsNone(self.image.os_hash_algo) + + def test_execute_hash_calculation_fails_without_validation_data(self): + self.loc_url = '%s/fake_location_1' % (BASE_URI) + self.image.status = 'queued' + self.hash_task_input.update(loc_url=self.loc_url) + self.image.checksum = None + self.image.os_hash_value = None + + location_update = import_flow._UpdateLocationTask( + TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1, self.loc_url, + self.context) + + location_update.execute() + self.assertEqual(1, self.image.locations.append.call_count) + + # Since Image is mocked here, self.image.locations will not be + # set hence setting it here to check that it's not popped out + # even after CalculateHash failure + self.image.locations = ['%s/fake_location_1' % (BASE_URI)] + set_image_active = import_flow._SetImageToActiveTask( + TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1) + set_image_active.execute() + self.assertEqual('active', self.image.status) + + hashing_algo = CONF.hashing_algorithm + hash_calculation = import_flow._CalculateHash(TASK_ID1, TASK_TYPE, + self.image_repo, + IMAGE_ID1, + hashing_algo) + + self.image.get_data.side_effect = IOError + with mock.patch.object(import_flow.LOG, 'debug') as mock_debug: + hash_calculation.execute() + debug_logs = mock_debug.call_args_list + self.assertIn(("[%i/%i] Hash calculation failed due to %s", + 1, 3, ''), debug_logs[0]) + self.assertEqual(CONF.hashing_algorithm, self.image.os_hash_algo) + self.assertIsNone(self.image.checksum) + self.assertIsNone(self.image.os_hash_value) + self.assertEqual('active', self.image.status) + self.assertEqual(1, len(self.image.locations)) + + hash_calculation.revert(None) + self.assertIsNone(self.image.os_hash_algo) + self.assertEqual('active', self.image.status) + self.assertEqual(1, len(self.image.locations)) + + # Hash Calculation failed when image is 'active'. + # exception will not be raised instead there will be warning log + self.image.get_data.side_effect = IOError + with mock.patch.object(import_flow.LOG, 'warning') as mock_warn: + hash_calculation.execute() + msg = ("Hash calculation failed for image %s data" % IMAGE_ID1) + mock_warn.assert_called_once_with(msg) + self.assertEqual(CONF.hashing_algorithm, self.image.os_hash_algo) + self.assertIsNone(self.image.checksum) + self.assertIsNone(self.image.os_hash_value) + self.assertEqual('active', self.image.status) + self.assertEqual(1, len(self.image.locations)) + + hash_calculation.revert(None) + self.assertIsNone(self.image.os_hash_algo) + self.assertEqual('active', self.image.status) + self.assertEqual(1, len(self.image.locations)) + + def test_execute_hash_calculation_fails_for_store_other_that_http(self): + self.loc_url = "cinder://image/fake_location" + self.hash_task_input.update(loc_url=self.loc_url) + self.image.status = 'queued' + self.image.checksum = None + self.image.os_hash_value = None + + location_update = import_flow._UpdateLocationTask( + TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1, self.loc_url, + self.context) + + location_update.execute() + self.assertEqual(1, self.image.locations.append.call_count) + + # Since Image is mocked here, self.image.locations will not be + # set hence setting it here to check that it's not popped out + # even after CalculateHash failure + self.image.locations = [{'url': 'cinder://image/fake_location'}] + + hashing_algo = CONF.hashing_algorithm + hash_calculation = import_flow._CalculateHash(TASK_ID1, TASK_TYPE, + self.image_repo, + IMAGE_ID1, + hashing_algo, + status='importing') + + self.image.get_data.side_effect = IOError + expected_msg = ("Hash calculation failed for image .* data") + self.assertRaisesRegex(import_flow._HashCalculationFailed, + expected_msg, + hash_calculation.execute) + self.assertEqual(CONF.hashing_algorithm, self.image.os_hash_algo) + self.assertIsNone(self.image.checksum) + self.assertIsNone(self.image.os_hash_value) + self.assertEqual('importing', self.image.status) + self.assertEqual(1, len(self.image.locations)) + + hash_calculation.revert(None) + self.assertIsNone(self.image.os_hash_algo) + self.assertEqual('queued', self.image.status) + self.assertEqual(0, len(self.image.locations)) + + def test_execute_hash_calculation_fails_if_image_data_deleted(self): + self.loc_url = '%s/fake_location_1' % (BASE_URI) + self.image.status = 'queued' + self.hash_task_input.update(loc_url=self.loc_url) + self.image.checksum = None + self.image.os_hash_value = None + + location_update = import_flow._UpdateLocationTask( + TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1, self.loc_url, + self.context) + + location_update.execute() + self.assertEqual(1, self.image.locations.append.call_count) + + set_image_active = import_flow._SetImageToActiveTask( + TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1) + set_image_active.execute() + self.assertEqual('active', self.image.status) + + hashing_algo = CONF.hashing_algorithm + hash_calculation = import_flow._CalculateHash(TASK_ID1, TASK_TYPE, + self.image_repo, + IMAGE_ID1, + hashing_algo) + self.image.get_data.side_effect = store.exceptions.NotFound + hash_calculation.execute() + # Check if Image delete and image_repo.delete has been called + # if exception raised + self.image.delete.assert_called_once() + self.image_repo.remove.assert_called_once_with(self.image) + + +class TestVerifyValidationDataTask(test_utils.BaseTestCase): + def setUp(self): + super(TestVerifyValidationDataTask, self).setUp() + self.task_repo = mock.MagicMock() + self.task = mock.MagicMock() + self.val_data_task_input = { + 'image_id': IMAGE_ID1, + } + self.image_repo = mock.MagicMock() + self.image = self.image_repo.get.return_value + self.image.image_id = IMAGE_ID1 + self.image.disk_format = 'raw' + self.image.container_format = 'bare' + self.config(do_secure_hash=True) + + def test_execute_with_valid_validation_data(self): + url = '%s/fake_location_1' % BASE_URI + self.image.status = 'queued' + self.image.locations = {"url": url, "metadata": {"store": "foo"}} + expected_size = 4 * units.Ki + expected_data = b"*" * expected_size + self.image.get_data.return_value = io.BytesIO(expected_data) + hash_value = hashlib.sha512(expected_data).hexdigest() + hashing_algo = CONF.hashing_algorithm + self.image.checksum = None + self.image.os_hash_value = None + val_data = { + 'os_hash_algo': hashing_algo, + 'os_hash_value': hash_value + } + self.val_data_task_input.update(val_data=val_data) + + hash_calculation = import_flow._CalculateHash(TASK_ID1, TASK_TYPE, + self.image_repo, + IMAGE_ID1, + hashing_algo, + status='importing') + hash_calculation.execute() + + self.image.os_hash_algo = val_data.get("os_hash_algo", + hashing_algo) + + verify_validation_data = import_flow._VerifyValidationData( + TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1, val_data) + + verify_validation_data.execute() + self.assertEqual('sha512', self.image.os_hash_algo) + self.assertEqual(hash_value, self.image.os_hash_value) + self.assertEqual('importing', self.image.status) + + set_image_active = import_flow._SetImageToActiveTask( + TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1) + set_image_active.execute() + self.assertEqual('active', self.image.status) + + def test_execute_with_os_hash_value_other_than_512(self): + url = '%s/fake_location_1' % BASE_URI + self.image.status = 'queued' + self.image.locations = {"url": url, "metadata": {"store": "foo"}} + expected_size = 4 * units.Ki + expected_data = b"*" * expected_size + self.image.get_data.return_value = io.BytesIO(expected_data) + hash_value = hashlib.sha256(expected_data).hexdigest() + hashing_algo = 'sha256' + self.image.checksum = None + self.image.os_hash_value = None + val_data = { + 'os_hash_algo': 'sha256', + 'os_hash_value': hash_value + } + + hash_calculation = import_flow._CalculateHash(TASK_ID1, TASK_TYPE, + self.image_repo, + IMAGE_ID1, + hashing_algo, + status='importing') + hash_calculation.execute() + + self.val_data_task_input.update(val_data=val_data) + + verify_validation_data = import_flow._VerifyValidationData( + TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1, val_data) + + verify_validation_data.execute() + self.assertEqual('sha256', self.image.os_hash_algo) + self.assertEqual(hash_value, self.image.os_hash_value) + self.assertEqual('importing', self.image.status) + + set_image_active = import_flow._SetImageToActiveTask( + TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1) + set_image_active.execute() + self.assertEqual('active', self.image.status) + + def test_execute_with_invalid_validation_data(self): + url = '%s/fake_location_1' % BASE_URI + self.image.status = 'queued' + self.image.locations = [{"url": url, "metadata": {"store": "foo"}}] + expected_size = 4 * units.Ki + expected_data = b"*" * expected_size + self.image.get_data.return_value = io.BytesIO(expected_data) + hashing_algo = CONF.hashing_algorithm + val_data = { + 'os_hash_algo': hashing_algo, + 'os_hash_value': hashlib.sha512(b'image_service').hexdigest() + } + hash_calculation = import_flow._CalculateHash(TASK_ID1, TASK_TYPE, + self.image_repo, + IMAGE_ID1, + hashing_algo, + status='importing') + hash_calculation.execute() + + self.assertEqual('importing', self.image.status) + self.assertEqual(1, len(self.image.locations)) + verify_validation_data = import_flow._VerifyValidationData( + TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1, + val_data) + expected_msg = ("os_hash_value: .* not matched with actual " + "os_hash_value: .*") + self.assertRaisesRegex(exception.InvalidParameterValue, + expected_msg, + verify_validation_data.execute) + verify_validation_data.revert(None) + self.assertIsNone(self.image.os_hash_algo) + self.assertIsNone(self.image.os_hash_value) + self.assertIsNone(self.image.checksum) + self.assertEqual('queued', self.image.status) + + +class TestSetHashValuesTask(test_utils.BaseTestCase): + def setUp(self): + super(TestSetHashValuesTask, self).setUp() + self.task_repo = mock.MagicMock() + self.task = mock.MagicMock() + self.hash_task_input = { + 'image_id': IMAGE_ID1, + } + self.image_repo = mock.MagicMock() + self.image = self.image_repo.get.return_value + self.image.image_id = IMAGE_ID1 + self.image.disk_format = 'raw' + self.image.container_format = 'bare' + + def test_execute_with_valid_validation_data(self): + url = '%s/fake_location_1' % BASE_URI + self.image.status = 'queued' + self.image.locations = {"url": url, "metadata": {"store": "foo"}} + expected_size = 4 * units.Ki + expected_data = b"*" * expected_size + self.image.get_data.return_value = io.BytesIO(expected_data) + hash_value = hashlib.sha512(expected_data).hexdigest() + val_data = { + 'os_hash_algo': 'sha512', + 'os_hash_value': hash_value + } + self.hash_task_input.update(val_data=val_data) + + set_hash_data = import_flow._SetHashValues( + TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1, val_data) + + set_hash_data.execute() + self.assertEqual('sha512', self.image.os_hash_algo) + self.assertEqual(hash_value, self.image.os_hash_value) + self.assertEqual('queued', self.image.status) + + set_image_active = import_flow._SetImageToActiveTask( + TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1) + set_image_active.execute() + self.assertEqual('active', self.image.status) + + +class TestUpdateLocationTask(test_utils.BaseTestCase): + def setUp(self): + super(TestUpdateLocationTask, self).setUp() + self.task_repo = mock.MagicMock() + self.task = mock.MagicMock() + self.location_task_input = { + 'image_id': IMAGE_ID1, + } + self.image_repo = mock.MagicMock() + self.image = self.image_repo.get.return_value + self.image.image_id = IMAGE_ID1 + self.image.disk_format = 'raw' + self.image.container_format = 'bare' + self.context = context.RequestContext(user_id=TENANT1, + project_id=TENANT1, + overwrite=False) + + def test_execute_with_valid_location(self): + self.loc_url = '%s/fake_location_1' % (BASE_URI) + self.image.status = 'queued' + self.location_task_input.update(loc_url=self.loc_url) + + location_update = import_flow._UpdateLocationTask( + TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1, self.loc_url, + self.context) + + location_update.execute() + self.assertEqual(1, self.image.locations.append.call_count) + + set_image_active = import_flow._SetImageToActiveTask( + TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1) + set_image_active.execute() + self.assertEqual('active', self.image.status) + + def test_execute_with_invalid_location(self): + self.image.locations.append.side_effect = exception.BadStoreUri + loc_url = 'bogus_url' + self.image.status = 'queued' + self.location_task_input.update(loc_url=loc_url) + + location_update = import_flow._UpdateLocationTask( + TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1, loc_url, + self.context) + self.assertRaises(import_flow._InvalidLocation, + location_update.execute) + self.assertEqual('queued', self.image.status) + + +class TestSetImageToActiveTask(test_utils.BaseTestCase): + def setUp(self): + super(TestSetImageToActiveTask, self).setUp() + self.task_repo = mock.MagicMock() + self.task = mock.MagicMock() + self.set_status_task_input = { + 'image_id': IMAGE_ID1, + } + self.image_repo = mock.MagicMock() + self.image = self.image_repo.get.return_value + self.image.image_id = IMAGE_ID1 + self.image.disk_format = 'raw' + self.image.container_format = 'bare' + self.context = context.RequestContext(user_id=TENANT1, + project_id=TENANT1, + overwrite=False) + + def test_execute_set_image_to_active_state(self): + self.loc_url = '%s/fake_location_1' % (BASE_URI) + self.image.status = 'queued' + self.set_status_task_input.update(loc_url=self.loc_url) + + location_update = import_flow._UpdateLocationTask( + TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1, self.loc_url, + self.context) + + location_update.execute() + self.assertEqual(1, self.image.locations.append.call_count) + self.assertEqual('queued', self.image.status) + + set_image_active = import_flow._SetImageToActiveTask( + TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1) + set_image_active.execute() + self.assertEqual('active', self.image.status) + + def test_execute_set_image_to_active_state_failure(self): + self.loc_url = '%s/fake_location_1' % (BASE_URI) + self.image.status = 'queued' + self.set_status_task_input.update(loc_url=self.loc_url) + + location_update = import_flow._UpdateLocationTask( + TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1, self.loc_url, + self.context) + + location_update.execute() + self.assertEqual(1, self.image.locations.append.call_count) + self.assertEqual('queued', self.image.status) + + # Test if image failed while saving to active state + self.image_repo.save.side_effect = ValueError + set_image_active = import_flow._SetImageToActiveTask( + TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1) + self.assertRaises(ValueError, set_image_active.execute) + + # Test revert where location added in previous task is popped + # out incase of this task failure which didn't set image status + # 'active'. + self.image_repo.save.side_effect = None + self.image.status = 'queued' + set_image_active.revert(None) + self.assertEqual(0, self.image.locations.pop.call_count) + self.assertEqual('queued', self.image.status) diff --git a/glance/tests/unit/common/scripts/test_scripts_utils.py b/glance/tests/unit/common/scripts/test_scripts_utils.py index a7d8155801..1c904da66d 100644 --- a/glance/tests/unit/common/scripts/test_scripts_utils.py +++ b/glance/tests/unit/common/scripts/test_scripts_utils.py @@ -39,6 +39,33 @@ class TestScriptsUtils(test_utils.BaseTestCase): self.assertEqual(task_input, script_utils.unpack_task_input(task)) + def test_unpack_task_type_location_import(self): + task_type = 'location_import' + task_input = {'image_id': mock.ANY, + 'loc_url': mock.ANY, + 'validation_data': {}} + task = mock.Mock(type=task_type, task_input=task_input) + self.assertEqual(task_input, + script_utils.unpack_task_input(task)) + + def test_unpack_task_type_location_import_error(self): + task_type = 'location_import' + task_input1 = {'image_id': mock.ANY, + 'validation_data': {}} + task_input2 = {'loc_url': mock.ANY, + 'validation_data': {}} + task_input3 = {'image_id': mock.ANY, + 'loc_url': mock.ANY} + task1 = mock.Mock(type=task_type, task_input=task_input1) + task2 = mock.Mock(type=task_type, task_input=task_input2) + task3 = mock.Mock(type=task_type, task_input=task_input3) + self.assertRaises(exception.Invalid, + script_utils.unpack_task_input, task1) + self.assertRaises(exception.Invalid, + script_utils.unpack_task_input, task2) + self.assertRaises(exception.Invalid, + script_utils.unpack_task_input, task3) + def test_unpack_task_input_error(self): task_input1 = {"import_from_format": "bar", "image_properties": "baz"} task_input2 = {"import_from": "foo", "image_properties": "baz"} diff --git a/setup.cfg b/setup.cfg index 6c926c1d35..adbd9b2b56 100644 --- a/setup.cfg +++ b/setup.cfg @@ -69,6 +69,7 @@ oslo.policy.policies = glance.flows = api_image_import = glance.async_.flows.api_image_import:get_flow import = glance.async_.flows.base_import:get_flow + location_import = glance.async_.flows.location_import:get_flow glance.flows.import = convert = glance.async_.flows.convert:get_flow