Refactor TaskFactory and Executor to take an admin ImageRepo

This refactors the task creation path to allow optional passing
of an admin-enabled ImageRepo to tasks. This will be used for
allowing import_image(method='copy-image') to update non-owned
image metadata during a copy.

Change-Id: Ief3440759fda4bd90979caa79641770cb022b7da
This commit is contained in:
Dan Smith 2020-06-25 12:54:19 -07:00
parent 019afb7894
commit 937a70ab83
8 changed files with 148 additions and 8 deletions

View File

@ -46,13 +46,18 @@ class TaskExecutor(object):
glance.domain.Image object into ORM semantics
image_factory: glance.domain.ImageFactory object to be used for
creating new images for certain types of tasks viz. import, cloning
admin_repo: glance.db.ImageRepo object which acts as a translator for
glance.domain.Image object into ORM semantics, but with an admin
context (optional)
"""
def __init__(self, context, task_repo, image_repo, image_factory):
def __init__(self, context, task_repo, image_repo, image_factory,
admin_repo=None):
self.context = context
self.task_repo = task_repo
self.image_repo = image_repo
self.image_factory = image_factory
self.admin_repo = admin_repo
def begin_processing(self, task_id):
task = self.task_repo.get(task_id)

View File

@ -87,13 +87,16 @@ CONF.register_opts(taskflow_executor_opts, group='taskflow_executor')
class TaskExecutor(glance.async_.TaskExecutor):
def __init__(self, context, task_repo, image_repo, image_factory):
def __init__(self, context, task_repo, image_repo, image_factory,
admin_repo=None):
self.context = context
self.task_repo = task_repo
self.image_repo = image_repo
self.image_factory = image_factory
self.admin_repo = admin_repo
super(TaskExecutor, self).__init__(context, task_repo, image_repo,
image_factory)
image_factory,
admin_repo=admin_repo)
@staticmethod
def _fetch_an_executor():
@ -123,6 +126,9 @@ class TaskExecutor(glance.async_.TaskExecutor):
'backend': task_input.get('backend')
}
if self.admin_repo:
kwds['admin_repo'] = self.admin_repo
if task.type == "import":
uri = script_utils.validate_location_uri(
task_input.get('import_from'))

View File

@ -497,10 +497,11 @@ class TaskFactory(object):
class TaskExecutorFactory(object):
eventlet_deprecation_warned = False
def __init__(self, task_repo, image_repo, image_factory):
def __init__(self, task_repo, image_repo, image_factory, admin_repo=None):
self.task_repo = task_repo
self.image_repo = image_repo
self.image_factory = image_factory
self.admin_repo = admin_repo
def new_task_executor(self, context):
try:
@ -526,7 +527,8 @@ class TaskExecutorFactory(object):
return executor(context,
self.task_repo,
self.image_repo,
self.image_factory)
self.image_factory,
admin_repo=self.admin_repo)
except ImportError:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Failed to load the %s executor provided "

View File

@ -133,13 +133,18 @@ class Gateway(object):
notifier_task_stub_repo, context)
return authorized_task_stub_repo
def get_task_executor_factory(self, context):
def get_task_executor_factory(self, context, admin_context=None):
task_repo = self.get_task_repo(context)
image_repo = self.get_repo(context)
image_factory = self.get_image_factory(context)
if admin_context:
admin_repo = self.get_repo(admin_context)
else:
admin_repo = None
return glance.domain.TaskExecutorFactory(task_repo,
image_repo,
image_factory)
image_factory,
admin_repo=admin_repo)
def get_metadef_namespace_factory(self, context):
ns_factory = glance.domain.MetadefNamespaceFactory()

View File

@ -55,6 +55,15 @@ class TestTaskExecutor(test_utils.BaseTestCase):
# assert the call
mock_run.assert_called_once_with(task_id, task_type)
def test_with_admin_repo(self):
admin_repo = mock.MagicMock()
executor = glance.async_.TaskExecutor(self.context,
self.task_repo,
self.image_repo,
self.image_factory,
admin_repo=admin_repo)
self.assertEqual(admin_repo, executor.admin_repo)
class TestImportTaskFlow(test_utils.BaseTestCase):

View File

@ -100,3 +100,26 @@ class TestTaskExecutor(test_utils.BaseTestCase):
self.assertEqual('failure', self.task.status)
self.task_repo.save.assert_called_with(self.task)
self.assertEqual(1, import_mock.call_count)
@mock.patch('stevedore.driver.DriverManager')
def test_get_flow_with_admin_repo(self, mock_driver):
admin_repo = mock.MagicMock()
executor = taskflow_executor.TaskExecutor(self.context,
self.task_repo,
self.image_repo,
self.image_factory,
admin_repo=admin_repo)
self.assertEqual(mock_driver.return_value.driver,
executor._get_flow(self.task))
mock_driver.assert_called_once_with(
'glance.flows', self.task.type,
invoke_on_load=True,
invoke_kwds={'task_id': self.task.task_id,
'task_type': self.task.type,
'context': self.context,
'task_repo': self.task_repo,
'image_repo': self.image_repo,
'image_factory': self.image_factory,
'backend': None,
'admin_repo': admin_repo,
'uri': 'http://cloud.foo/image.qcow2'})

View File

@ -546,7 +546,28 @@ class TestTaskExecutorFactory(test_utils.BaseTestCase):
mock_executor.assert_called_once_with(context,
self.task_repo,
self.image_repo,
self.image_factory)
self.image_factory,
admin_repo=None)
def test_new_task_executor_with_admin(self):
admin_repo = mock.MagicMock()
task_executor_factory = domain.TaskExecutorFactory(
self.task_repo,
self.image_repo,
self.image_factory,
admin_repo=admin_repo)
context = mock.Mock()
with mock.patch.object(oslo_utils.importutils,
'import_class') as mock_import_class:
mock_executor = mock.Mock()
mock_import_class.return_value = mock_executor
task_executor_factory.new_task_executor(context)
mock_executor.assert_called_once_with(context,
self.task_repo,
self.image_repo,
self.image_factory,
admin_repo=admin_repo)
def test_new_task_executor_error(self):
task_executor_factory = domain.TaskExecutorFactory(self.task_repo,

View File

@ -0,0 +1,69 @@
# Copyright 2020 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from glance import gateway
import glance.tests.utils as test_utils
class TestGateway(test_utils.BaseTestCase):
def setUp(self):
super(TestGateway, self).setUp()
self.gateway = gateway.Gateway()
self.context = mock.sentinel.context
@mock.patch('glance.domain.TaskExecutorFactory')
def test_get_task_executor_factory(self, mock_factory):
@mock.patch.object(self.gateway, 'get_task_repo')
@mock.patch.object(self.gateway, 'get_repo')
@mock.patch.object(self.gateway, 'get_image_factory')
def _test(mock_gif, mock_gr, mock_gtr):
self.gateway.get_task_executor_factory(self.context)
mock_gtr.assert_called_once_with(self.context)
mock_gr.assert_called_once_with(self.context)
mock_gif.assert_called_once_with(self.context)
mock_factory.assert_called_once_with(
mock_gtr.return_value,
mock_gr.return_value,
mock_gif.return_value,
admin_repo=None)
_test()
@mock.patch('glance.domain.TaskExecutorFactory')
def test_get_task_executor_factory_with_admin(self, mock_factory):
@mock.patch.object(self.gateway, 'get_task_repo')
@mock.patch.object(self.gateway, 'get_repo')
@mock.patch.object(self.gateway, 'get_image_factory')
def _test(mock_gif, mock_gr, mock_gtr):
mock_gr.side_effect = [mock.sentinel.image_repo,
mock.sentinel.admin_repo]
self.gateway.get_task_executor_factory(
self.context,
admin_context=mock.sentinel.admin_context)
mock_gtr.assert_called_once_with(self.context)
mock_gr.assert_has_calls([
mock.call(self.context),
mock.call(mock.sentinel.admin_context),
])
mock_gif.assert_called_once_with(self.context)
mock_factory.assert_called_once_with(
mock_gtr.return_value,
mock.sentinel.image_repo,
mock_gif.return_value,
admin_repo=mock.sentinel.admin_repo)
_test()