Add support for PartialTask list

- partial-task: Task that includes all the values except the 'Text'
columns(input, result & message).
- This patch is to have the database to return a partial-task list on users
invoking 'GET /v2/tasks'.
- Returning list of partial tasks will help in reducing the response time when the
user queries for the list of tasks.
- This patch also includes changes for making implementation and return values of
display task detail (GET /v2/tasks/{task_id}) and list tasks (GET /v2/tasks)
more explicit.

partially implements bp async-glance-workers

Change-Id: I4fbadc9a97e3147128c7c733384c7bb50918806f
This commit is contained in:
Venkatesh Sampath 2014-01-08 12:20:21 +05:30
parent 70ff603e39
commit 010c0801bd
17 changed files with 452 additions and 238 deletions

View File

@ -72,6 +72,13 @@ def proxy_task(context, task):
return ImmutableTaskProxy(task)
def proxy_task_details(context, task, task_details):
if is_task_mutable(context, task):
return task_details
else:
return ImmutableTaskDetailsProxy(task_details)
class ImageRepoProxy(glance.domain.proxy.Repo):
def __init__(self, image_repo, context):
@ -325,9 +332,7 @@ class ImmutableTaskProxy(object):
task_id = _immutable_attr('base', 'task_id')
type = _immutable_attr('base', 'type')
status = _immutable_attr('base', 'status')
input = _immutable_attr('base', 'input')
owner = _immutable_attr('base', 'owner')
message = _immutable_attr('base', 'message')
expires_at = _immutable_attr('base', 'expires_at')
created_at = _immutable_attr('base', 'created_at')
updated_at = _immutable_attr('base', 'updated_at')
@ -348,6 +353,15 @@ class ImmutableTaskProxy(object):
raise exception.Forbidden(message)
class ImmutableTaskDetailsProxy(object):
def __init__(self, base):
self.base = base
input = _immutable_attr('base', 'input')
message = _immutable_attr('base', 'message')
result = _immutable_attr('base', 'result')
class ImageProxy(glance.domain.proxy.Image):
def __init__(self, image, context):
@ -371,6 +385,13 @@ class TaskProxy(glance.domain.proxy.Task):
super(TaskProxy, self).__init__(task)
class TaskDetailsProxy(glance.domain.proxy.TaskDetails):
def __init__(self, task_details):
self.task_details = task_details
super(TaskDetailsProxy, self).__init__(task_details)
class TaskFactoryProxy(glance.domain.proxy.TaskFactory):
def __init__(self, task_factory, context):
@ -378,9 +399,8 @@ class TaskFactoryProxy(glance.domain.proxy.TaskFactory):
self.context = context
super(TaskFactoryProxy, self).__init__(
task_factory,
proxy_class=TaskProxy,
proxy_kwargs=None
)
task_proxy_class=TaskProxy,
task_details_proxy_class=TaskDetailsProxy)
def new_task(self, **kwargs):
owner = kwargs.get('owner', self.context.owner)
@ -396,17 +416,19 @@ class TaskFactoryProxy(glance.domain.proxy.TaskFactory):
raise exception.Forbidden(message % owner)
class TaskRepoProxy(glance.domain.proxy.Repo):
class TaskRepoProxy(glance.domain.proxy.TaskRepo):
def __init__(self, task_repo, context):
self.task_repo = task_repo
self.context = context
super(TaskRepoProxy, self).__init__(task_repo)
def get(self, task_id):
task = self.task_repo.get(task_id)
return proxy_task(self.context, task)
def get_task_and_details(self, task_id):
task, task_details = self.task_repo.get_task_and_details(task_id)
return proxy_task(self.context, task), proxy_task_details(self.context,
task,
task_details)
def list(self, *args, **kwargs):
tasks = self.task_repo.list(*args, **kwargs)
def list_tasks(self, *args, **kwargs):
tasks = self.task_repo.list_tasks(*args, **kwargs)
return [proxy_task(self.context, t) for t in tasks]

View File

@ -357,34 +357,44 @@ class TaskProxy(glance.domain.proxy.Task):
self.base.run(executor)
class TaskRepoProxy(glance.domain.proxy.Repo):
class TaskDetailsProxy(glance.domain.proxy.TaskDetails):
def __init__(self, task_repo, context, policy):
def __init__(self, task_details, context, policy):
self.task_details = task_details
self.context = context
self.policy = policy
super(TaskDetailsProxy, self).__init__(task_details)
class TaskRepoProxy(glance.domain.proxy.TaskRepo):
def __init__(self, task_repo, context, task_policy):
self.context = context
self.policy = task_policy
self.task_repo = task_repo
proxy_kwargs = {'context': self.context, 'policy': self.policy}
super(TaskRepoProxy, self).__init__(
task_repo,
item_proxy_class=TaskProxy,
item_proxy_kwargs=proxy_kwargs
)
super(TaskRepoProxy,
self).__init__(task_repo,
task_proxy_class=TaskProxy,
task_proxy_kwargs=proxy_kwargs,
task_details_proxy_class=TaskDetailsProxy,
task_details_proxy_kwargs=proxy_kwargs)
def get(self, task_id):
def get_task_and_details(self, task_id):
self.policy.enforce(self.context, 'get_task', {})
return super(TaskRepoProxy, self).get(task_id)
return super(TaskRepoProxy, self).get_task_and_details(task_id)
def list(self, *args, **kwargs):
def list_tasks(self, *args, **kwargs):
self.policy.enforce(self.context, 'get_tasks', {})
return super(TaskRepoProxy, self).list(*args, **kwargs)
return super(TaskRepoProxy, self).list_tasks(*args, **kwargs)
def add(self, task):
def add(self, task, task_details=None):
self.policy.enforce(self.context, 'add_task', {})
return super(TaskRepoProxy, self).add(task)
super(TaskRepoProxy, self).add(task, task_details)
def save(self, task):
def save(self, task, task_details=None):
self.policy.enforce(self.context, 'modify_task', {})
return super(TaskRepoProxy, self).save(task)
super(TaskRepoProxy, self).save(task, task_details)
class TaskFactoryProxy(glance.domain.proxy.TaskFactory):
@ -396,6 +406,7 @@ class TaskFactoryProxy(glance.domain.proxy.TaskFactory):
proxy_kwargs = {'context': self.context, 'policy': self.policy}
super(TaskFactoryProxy, self).__init__(
task_factory,
proxy_class=TaskProxy,
proxy_kwargs=proxy_kwargs
)
task_proxy_class=TaskProxy,
task_proxy_kwargs=proxy_kwargs,
task_details_proxy_class=TaskDetailsProxy,
task_details_proxy_kwargs=proxy_kwargs)

View File

@ -57,17 +57,19 @@ class TasksController(object):
live_time = CONF.task.task_time_to_live
try:
new_task = task_factory.new_task(task_type=task['type'],
task_input=task['input'],
owner=req.context.owner,
task_time_to_live=live_time)
task_repo.add(new_task)
new_task_details = task_factory.new_task_details(new_task.task_id,
task['input'])
task_repo.add(new_task, new_task_details)
except exception.Forbidden as e:
msg = (_("Forbidden to create task. Reason: %(reason)s")
% {'reason': unicode(e)})
LOG.info(msg)
raise webob.exc.HTTPForbidden(explanation=unicode(e))
return new_task
result = {'task': new_task, 'task_details': new_task_details}
return result
def index(self, req, marker=None, limit=None, sort_key='created_at',
sort_dir='desc', filters=None):
@ -82,7 +84,11 @@ class TasksController(object):
task_repo = self.gateway.get_task_repo(req.context)
try:
tasks = task_repo.list(marker, limit, sort_key, sort_dir, filters)
tasks = task_repo.list_tasks(marker,
limit,
sort_key,
sort_dir,
filters)
if len(tasks) != 0 and len(tasks) == limit:
result['next_marker'] = tasks[-1].task_id
except (exception.NotFound, exception.InvalidSortKey,
@ -98,7 +104,7 @@ class TasksController(object):
def get(self, req, task_id):
try:
task_repo = self.gateway.get_task_repo(req.context)
task = task_repo.get(task_id)
task, task_details = task_repo.get_task_and_details(task_id)
except exception.NotFound as e:
msg = (_("Failed to find task %(task_id)s. Reason: %(reason)s") %
{'task_id': task_id, 'reason': unicode(e)})
@ -109,7 +115,8 @@ class TasksController(object):
{'task_id': task_id, 'reason': unicode(e)})
LOG.info(msg)
raise webob.exc.HTTPForbidden(explanation=unicode(e))
return task
result = {'task': task, 'task_details': task_details}
return result
class RequestDeserializer(wsgi.JSONRequestDeserializer):
@ -226,11 +233,15 @@ class ResponseSerializer(wsgi.JSONResponseSerializer):
self.partial_task_schema = partial_task_schema \
or _get_partial_task_schema()
def _format_task(self, task, schema):
def _format_task(self, schema, task, task_details=None):
task_view = {}
attributes = ['type', 'status', 'input', 'result', 'owner', 'message']
for key in attributes:
task_attributes = ['type', 'status', 'owner']
task_details_attributes = ['input', 'result', 'message']
for key in task_attributes:
task_view[key] = getattr(task, key)
if task_details:
for key in task_details_attributes:
task_view[key] = getattr(task_details, key)
task_view['id'] = task.task_id
if task.expires_at:
task_view['expires_at'] = timeutils.isotime(task.expires_at)
@ -241,12 +252,19 @@ class ResponseSerializer(wsgi.JSONResponseSerializer):
task_view = schema.filter(task_view) # domain
return task_view
def create(self, response, task):
def create(self, response, result):
response.status_int = 201
self.get(response, task)
task = result['task']
task_details = result['task_details']
self._get(response, task, task_details)
def get(self, response, task):
task_view = self._format_task(task, self.task_schema)
def get(self, response, result):
task = result['task']
task_details = result['task_details']
self._get(response, task, task_details)
def _get(self, response, task, task_details):
task_view = self._format_task(self.task_schema, task, task_details)
body = json.dumps(task_view, ensure_ascii=False)
response.unicode_body = unicode(body)
response.content_type = 'application/json'
@ -256,8 +274,8 @@ class ResponseSerializer(wsgi.JSONResponseSerializer):
params.pop('marker', None)
query = urllib.urlencode(params)
body = {
'tasks': [self._format_task(i, self.partial_task_schema)
for i in result['tasks']],
'tasks': [self._format_task(self.partial_task_schema, task)
for task in result['tasks']],
'first': '/v2/tasks',
'schema': '/v2/schemas/tasks',
}

View File

@ -284,47 +284,62 @@ class TaskRepo(object):
def _format_task_from_db(self, db_task):
return glance.domain.Task(
task_id=db_task['id'],
type=db_task['type'],
task_type=db_task['type'],
status=db_task['status'],
input=db_task['input'],
result=db_task['result'],
owner=db_task['owner'],
message=db_task['message'],
expires_at=db_task['expires_at'],
created_at=db_task['created_at'],
updated_at=db_task['updated_at'],
)
def _format_task_to_db(self, task):
return {'id': task.task_id,
def _format_task_details_from_db(self, db_task):
return glance.domain.TaskDetails(
task_id=db_task['id'],
task_input=db_task['input'],
result=db_task['result'],
message=db_task['message'],
)
def _format_task_to_db(self, task, task_details=None):
task = {'id': task.task_id,
'type': task.type,
'status': task.status,
'input': task.input,
'result': task.result,
'input': None,
'result': None,
'owner': task.owner,
'message': task.message,
'message': None,
'expires_at': task.expires_at,
'created_at': task.created_at,
'updated_at': task.updated_at}
if task_details is not None:
task.update({
'input': task_details.input,
'result': task_details.result,
'message': task_details.message,
})
return task
def __init__(self, context, db_api):
self.context = context
self.db_api = db_api
def get(self, task_id):
def get_task_and_details(self, task_id):
try:
db_api_task = self.db_api.task_get(self.context, task_id)
except (exception.NotFound, exception.Forbidden):
msg = _('Could not find task %s') % task_id
raise exception.NotFound(msg)
return self._format_task_from_db(db_api_task)
return (self._format_task_from_db(db_api_task),
self._format_task_details_from_db(db_api_task))
def list(self,
marker=None,
limit=None,
sort_key='created_at',
sort_dir='desc',
filters=None):
def list_tasks(self,
marker=None,
limit=None,
sort_key='created_at',
sort_dir='desc',
filters=None):
db_api_tasks = self.db_api.task_get_all(self.context,
filters=filters,
marker=marker,
@ -333,8 +348,8 @@ class TaskRepo(object):
sort_dir=sort_dir)
return [self._format_task_from_db(task) for task in db_api_tasks]
def save(self, task):
task_values = self._format_task_to_db(task)
def save(self, task, task_details=None):
task_values = self._format_task_to_db(task, task_details)
try:
updated_values = self.db_api.task_update(self.context,
task.task_id,
@ -344,8 +359,8 @@ class TaskRepo(object):
raise exception.NotFound(msg)
task.updated_at = updated_values['updated_at']
def add(self, task):
task_values = self._format_task_to_db(task)
def add(self, task, task_details=None):
task_values = self._format_task_to_db(task, task_details)
updated_values = self.db_api.task_create(self.context, task_values)
task.created_at = updated_values['created_at']
task.updated_at = updated_values['updated_at']

View File

@ -813,8 +813,7 @@ def task_get_all(context, filters=None, marker=None, limit=None,
filtered_tasks = []
for task in tasks:
task_info = DATA['task_info'][task['id']]
filtered_tasks.append(_format_task_from_db(task, task_info))
filtered_tasks.append(_format_task_from_db(task, task_info_ref=None))
return filtered_tasks

View File

@ -1153,8 +1153,7 @@ def task_get_all(context, filters=None, marker=None, limit=None,
filters = filters or {}
session = _get_session()
query = session.query(models.Task)\
.options(sa_orm.joinedload(models.Task.info))
query = session.query(models.Task)
if not (context.is_admin or admin_as_user == True) and \
context.owner is not None:
@ -1191,11 +1190,7 @@ def task_get_all(context, filters=None, marker=None, limit=None,
tasks = []
for task_ref in task_refs:
# NOTE(venkatesh): call to task_ref.info does not make any
# separate query call to fetch task info as it has been
# eagerly loaded using joinedload(models.Task.info) method above.
task_info_ref = task_ref.info
tasks.append(_task_format(task_ref, task_info_ref))
tasks.append(_task_format(task_ref, task_info_ref=None))
return tasks

View File

@ -307,22 +307,19 @@ class Task(object):
_supported_task_status = ('pending', 'processing', 'success', 'failure')
def __init__(self, task_id, type, status, input, result, owner, message,
def __init__(self, task_id, task_type, status, owner,
expires_at, created_at, updated_at, task_time_to_live=48):
if type not in self._supported_task_type:
raise exception.InvalidTaskType(type)
if task_type not in self._supported_task_type:
raise exception.InvalidTaskType(task_type)
if status not in self._supported_task_status:
raise exception.InvalidTaskStatus(status)
self.task_id = task_id
self._status = status
self.type = type
self.input = input
self.result = result
self.type = task_type
self.owner = owner
self.message = message
self.expires_at = expires_at
# NOTE(nikhil): We use '_time_to_live' to determine how long a
# task should live from the time it succeeds or fails.
@ -384,13 +381,23 @@ class Task(object):
self.expires_at = timeutils.utcnow() + self._time_to_live
class TaskDetails(object):
def __init__(self, task_id, task_input, message, result):
if task_id is None:
raise exception.TaskException(_('task_id is required to create '
'a new TaskDetails object'))
self.task_id = task_id
self.input = task_input
self.message = message
self.result = result
class TaskFactory(object):
def new_task(self, task_type, task_input, owner, task_time_to_live=48):
def new_task(self, task_type, owner, task_time_to_live=48):
task_id = str(uuid.uuid4())
status = 'pending'
result = None
message = None
# Note(nikhil): expires_at would be set on the task, only when it
# succeeds or fails.
expires_at = None
@ -400,12 +407,12 @@ class TaskFactory(object):
task_id,
task_type,
status,
task_input,
result,
owner,
message,
expires_at,
created_at,
updated_at,
task_time_to_live
)
def new_task_details(self, task_id, task_input, message=None, result=None):
return TaskDetails(task_id, task_input, message, result)

View File

@ -39,11 +39,44 @@ class Helper(object):
return self.proxy_class(obj, **self.proxy_kwargs)
def unproxy(self, obj):
if self.proxy_class is None:
if obj is None or self.proxy_class is None:
return obj
return obj.base
class TaskRepo(object):
def __init__(self,
base,
task_proxy_class=None, task_proxy_kwargs=None,
task_details_proxy_class=None,
task_details_proxy_kwargs=None):
self.base = base
self.task_proxy_helper = Helper(task_proxy_class, task_proxy_kwargs)
self.task_details_proxy_helper = Helper(task_details_proxy_class,
task_details_proxy_kwargs)
def get_task_and_details(self, task_id):
task, task_details = self.base.get_task_and_details(task_id)
return (self.task_proxy_helper.proxy(task),
self.task_details_proxy_helper.proxy(task_details))
def list_tasks(self, *args, **kwargs):
tasks = self.base.list_tasks(*args, **kwargs)
return [self.task_proxy_helper.proxy(task) for task in tasks]
def add(self, task, task_details=None):
self.base.add(self.task_proxy_helper.unproxy(task),
self.task_details_proxy_helper.unproxy(task_details))
def save(self, task, task_details=None):
self.base.save(self.task_proxy_helper.unproxy(task),
self.task_details_proxy_helper.unproxy(task_details))
def remove(self, task):
base_task = self.task_proxy_helper.unproxy(task)
self.base.remove(base_task)
class Repo(object):
def __init__(self, base, item_proxy_class=None, item_proxy_kwargs=None):
self.base = base
@ -140,10 +173,7 @@ class Task(object):
task_id = _proxy('base', 'task_id')
type = _proxy('base', 'type')
status = _proxy('base', 'status')
input = _proxy('base', 'input')
result = _proxy('base', 'result')
owner = _proxy('base', 'owner')
message = _proxy('base', 'message')
expires_at = _proxy('base', 'expires_at')
created_at = _proxy('base', 'created_at')
updated_at = _proxy('base', 'updated_at')
@ -161,11 +191,32 @@ class Task(object):
self.base.fail(message)
class TaskDetails(object):
def __init__(self, base):
self.base = base
task_id = _proxy('base', 'task_id')
input = _proxy('base', 'input')
result = _proxy('base', 'result')
message = _proxy('base', 'message')
class TaskFactory(object):
def __init__(self, base, proxy_class=None, proxy_kwargs=None):
self.helper = Helper(proxy_class, proxy_kwargs)
def __init__(self,
base,
task_proxy_class=None,
task_proxy_kwargs=None,
task_details_proxy_class=None,
task_details_proxy_kwargs=None):
self.task_helper = Helper(task_proxy_class, task_proxy_kwargs)
self.task_details_helper = Helper(task_details_proxy_class,
task_details_proxy_kwargs)
self.base = base
def new_task(self, **kwargs):
t = self.base.new_task(**kwargs)
return self.helper.proxy(t)
return self.task_helper.proxy(t)
def new_task_details(self, task_id, task_input, message=None, result=None):
td = self.base.new_task_details(task_id, task_input, message, result)
return self.task_details_helper.proxy(td)

View File

@ -289,36 +289,42 @@ class ImageProxy(glance.domain.proxy.Image):
self.notifier.info('image.activate', payload)
class TaskRepoProxy(glance.domain.proxy.Repo):
class TaskRepoProxy(glance.domain.proxy.TaskRepo):
def __init__(self, task_repo, context, notifier):
self.task_repo = task_repo
self.context = context
self.notifier = notifier
proxy_kwargs = {'context': self.context, 'notifier': self.notifier}
super(TaskRepoProxy, self).__init__(task_repo,
item_proxy_class=TaskProxy,
item_proxy_kwargs=proxy_kwargs)
super(TaskRepoProxy, self) \
.__init__(task_repo,
task_proxy_class=TaskProxy,
task_proxy_kwargs=proxy_kwargs,
task_details_proxy_class=TaskDetailsProxy,
task_details_proxy_kwargs=proxy_kwargs)
def add(self, task):
def add(self, task, task_details=None):
self.notifier.info('task.create',
format_task_notification(task))
return super(TaskRepoProxy, self).add(task)
super(TaskRepoProxy, self).add(task, task_details)
def remove(self, task):
payload = format_task_notification(task)
payload['deleted'] = True
payload['deleted_at'] = timeutils.isotime()
self.notifier.info('task.delete', payload)
return super(TaskRepoProxy, self).add(task)
super(TaskRepoProxy, self).remove(task)
class TaskFactoryProxy(glance.domain.proxy.TaskFactory):
def __init__(self, factory, context, notifier):
def __init__(self, task_factory, context, notifier):
kwargs = {'context': context, 'notifier': notifier}
super(TaskFactoryProxy, self).__init__(factory,
proxy_class=TaskProxy,
proxy_kwargs=kwargs)
super(TaskFactoryProxy, self).__init__(
task_factory,
task_proxy_class=TaskProxy,
task_proxy_kwargs=kwargs,
task_details_proxy_class=TaskDetailsProxy,
task_details_proxy_kwargs=kwargs)
class TaskProxy(glance.domain.proxy.Task):
@ -350,3 +356,12 @@ class TaskProxy(glance.domain.proxy.Task):
self.notifier.info('task.failure',
format_task_notification(self.task))
return super(TaskProxy, self).fail(message)
class TaskDetailsProxy(glance.domain.proxy.TaskDetails):
def __init__(self, task_details, context, notifier):
self.task_details = task_details
self.context = context
self.notifier = notifier
super(TaskDetailsProxy, self).__init__(task_details)

View File

@ -1463,9 +1463,9 @@ class TaskTests(test_utils.BaseTestCase):
self.assertIsNone(task['deleted_at'])
self.assertEqual(task['created_at'], fixture['created_at'])
self.assertEqual(task['updated_at'], fixture['updated_at'])
self.assertEqual(task['input'], fixture['input'])
self.assertEqual(task['result'], fixture['result'])
self.assertEqual(task['message'], fixture['message'])
task_details_keys = ['input', 'message', 'result']
for key in task_details_keys:
self.assertFalse(key in task)
def test_task_create(self):
task_id = str(uuid.uuid4())

View File

@ -864,9 +864,8 @@ class TestImmutableTask(utils.BaseTestCase):
task_factory = glance.domain.TaskFactory()
self.context = glance.context.RequestContext(tenant=TENANT2)
task_type = 'import'
task_input = '{"loc": "fake"}'
owner = TENANT2
task = task_factory.new_task(task_type, task_input, owner)
task = task_factory.new_task(task_type, owner)
self.task = authorization.ImmutableTaskProxy(task)
def _test_change(self, attr, value):
@ -893,15 +892,9 @@ class TestImmutableTask(utils.BaseTestCase):
def test_change_status(self):
self._test_change('status', 'success')
def test_change_input(self):
self._test_change('input', {'foo': 'bar'})
def test_change_owner(self):
self._test_change('owner', 'fake')
def test_change_message(self):
self._test_change('message', 'fake')
def test_change_expires_at(self):
self._test_change('expires_at', 'fake')
@ -952,7 +945,6 @@ class TestTaskFactoryProxy(utils.BaseTestCase):
def test_task_create_default_owner(self):
owner = self.request1.context.owner
task = self.task_factory.new_task(task_type=self.task_type,
task_input=self.task_input,
owner=owner)
self.assertEqual(task.owner, TENANT1)
@ -985,26 +977,25 @@ class TestTaskRepoProxy(utils.BaseTestCase):
def __init__(self, fixtures):
self.fixtures = fixtures
def get(self, task_id):
def get_task_and_details(self, task_id):
for f in self.fixtures:
if f.task_id == task_id:
return f
return f, None
else:
raise ValueError(task_id)
def list(self, *args, **kwargs):
def list_tasks(self, *args, **kwargs):
return self.fixtures
def setUp(self):
super(TestTaskRepoProxy, self).setUp()
task_factory = glance.domain.TaskFactory()
task_type = 'import'
task_input = '{"loc": "fake"}'
owner = None
self.fixtures = [
task_factory.new_task(task_type, task_input, owner),
task_factory.new_task(task_type, task_input, owner),
task_factory.new_task(task_type, task_input, owner),
task_factory.new_task(task_type, owner),
task_factory.new_task(task_type, owner),
task_factory.new_task(task_type, owner),
]
self.context = glance.context.RequestContext(tenant=TENANT1)
task_repo = self.TaskRepoStub(self.fixtures)
@ -1014,33 +1005,28 @@ class TestTaskRepoProxy(utils.BaseTestCase):
)
def test_get_mutable_task(self):
task = self.task_repo.get(self.fixtures[0].task_id)
task, _ = self.task_repo.get_task_and_details(self.fixtures[0].task_id)
self.assertEqual(task.task_id, self.fixtures[0].task_id)
def test_get_immutable_task(self):
task = self.task_repo.get(self.fixtures[1].task_id)
self.assertRaises(
exception.Forbidden,
setattr,
task,
'input',
'foo'
)
task_id = self.fixtures[1].task_id
task, task_details = self.task_repo.get_task_and_details(task_id)
self.assertRaises(exception.Forbidden,
setattr,
task_details,
'input',
'foo')
def test_list(self):
tasks = self.task_repo.list()
tasks = self.task_repo.list_tasks()
self.assertEqual(tasks[0].task_id, self.fixtures[0].task_id)
self.assertRaises(
exception.Forbidden,
setattr,
tasks[1],
'input',
'foo'
)
self.assertRaises(
exception.Forbidden,
setattr,
tasks[2],
'input',
'foo'
)
self.assertRaises(exception.Forbidden,
setattr,
tasks[1],
'owner',
'foo')
self.assertRaises(exception.Forbidden,
setattr,
tasks[2],
'owner',
'foo')

View File

@ -562,87 +562,97 @@ class TestTaskRepo(test_utils.BaseTestCase):
[self.db.task_create(None, task) for task in self.tasks]
def test_get(self):
task = self.task_repo.get(UUID1)
task, task_details = self.task_repo.get_task_and_details(UUID1)
self.assertEqual(task.task_id, UUID1)
self.assertEqual(task.type, 'import')
self.assertEqual(task.status, 'pending')
self.assertEqual(task.input, self.fake_task_input)
self.assertEqual(task.result, '')
self.assertEqual(task.task_id, task_details.task_id)
self.assertEqual(task_details.input, self.fake_task_input)
self.assertEqual(task_details.result, '')
self.assertEqual(task.owner, TENANT1)
def test_get_not_found(self):
self.assertRaises(exception.NotFound, self.task_repo.get,
self.assertRaises(exception.NotFound,
self.task_repo.get_task_and_details,
str(uuid.uuid4()))
def test_get_forbidden(self):
self.assertRaises(exception.NotFound, self.task_repo.get, UUID4)
self.assertRaises(exception.NotFound,
self.task_repo.get_task_and_details,
UUID4)
def test_list(self):
tasks = self.task_repo.list()
tasks = self.task_repo.list_tasks()
task_ids = set([i.task_id for i in tasks])
self.assertEqual(set([UUID1, UUID2, UUID3]), task_ids)
def test_list_with_type(self):
filters = {'type': 'import'}
tasks = self.task_repo.list(filters=filters)
tasks = self.task_repo.list_tasks(filters=filters)
task_ids = set([i.task_id for i in tasks])
self.assertEqual(set([UUID1, UUID2, UUID3]), task_ids)
def test_list_with_status(self):
filters = {'status': 'failure'}
tasks = self.task_repo.list(filters=filters)
tasks = self.task_repo.list_tasks(filters=filters)
task_ids = set([i.task_id for i in tasks])
self.assertEqual(set([UUID3]), task_ids)
def test_list_with_marker(self):
full_tasks = self.task_repo.list()
full_tasks = self.task_repo.list_tasks()
full_ids = [i.task_id for i in full_tasks]
marked_tasks = self.task_repo.list(marker=full_ids[0])
marked_tasks = self.task_repo.list_tasks(marker=full_ids[0])
actual_ids = [i.task_id for i in marked_tasks]
self.assertEqual(actual_ids, full_ids[1:])
def test_list_with_last_marker(self):
tasks = self.task_repo.list()
marked_tasks = self.task_repo.list(marker=tasks[-1].task_id)
tasks = self.task_repo.list_tasks()
marked_tasks = self.task_repo.list_tasks(marker=tasks[-1].task_id)
self.assertEqual(len(marked_tasks), 0)
def test_limited_list(self):
limited_tasks = self.task_repo.list(limit=2)
limited_tasks = self.task_repo.list_tasks(limit=2)
self.assertEqual(len(limited_tasks), 2)
def test_list_with_marker_and_limit(self):
full_tasks = self.task_repo.list()
full_tasks = self.task_repo.list_tasks()
full_ids = [i.task_id for i in full_tasks]
marked_tasks = self.task_repo.list(marker=full_ids[0], limit=1)
marked_tasks = self.task_repo.list_tasks(marker=full_ids[0], limit=1)
actual_ids = [i.task_id for i in marked_tasks]
self.assertEqual(actual_ids, full_ids[1:2])
def test_sorted_list(self):
tasks = self.task_repo.list(sort_key='status', sort_dir='desc')
tasks = self.task_repo.list_tasks(sort_key='status', sort_dir='desc')
task_ids = [i.task_id for i in tasks]
self.assertEqual([UUID2, UUID1, UUID3], task_ids)
def test_add_task(self):
task_type = 'import'
task = self.task_factory.new_task(task_type, self.fake_task_input,
None)
task = self.task_factory.new_task(task_type, None)
self.assertEqual(task.updated_at, task.created_at)
self.task_repo.add(task)
retrieved_task = self.task_repo.get(task.task_id)
task_details = self.task_factory.new_task_details(task.task_id,
self.fake_task_input)
self.task_repo.add(task, task_details)
retrieved_task, retrieved_task_details = \
self.task_repo.get_task_and_details(task.task_id)
self.assertEqual(retrieved_task.updated_at, task.updated_at)
self.assertEqual(retrieved_task_details.task_id,
retrieved_task.task_id)
self.assertEqual(retrieved_task_details.input, task_details.input)
def test_save_task(self):
task = self.task_repo.get(UUID1)
task, task_details = self.task_repo.get_task_and_details(UUID1)
original_update_time = task.updated_at
self.task_repo.save(task)
current_update_time = task.updated_at
self.assertTrue(current_update_time > original_update_time)
task = self.task_repo.get(UUID1)
task, task_details = self.task_repo.get_task_and_details(UUID1)
self.assertEqual(task.updated_at, current_update_time)
def test_remove_task(self):
task = self.task_repo.get(UUID1)
task, task_details = self.task_repo.get_task_and_details(UUID1)
self.task_repo.remove(task)
self.assertRaises(exception.NotFound,
self.task_repo.get,
self.task_repo.get_task_and_details,
task.task_id)

View File

@ -305,28 +305,38 @@ class TestTaskFactory(test_utils.BaseTestCase):
def test_new_task(self):
task_type = 'import'
task_input = '{"import_from": "fake"}'
owner = TENANT1
task = self.task_factory.new_task(task_type, task_input, owner)
task = self.task_factory.new_task(task_type, owner)
self.assertTrue(task.task_id is not None)
self.assertTrue(task.created_at is not None)
self.assertEqual(task.created_at, task.updated_at)
self.assertEqual(task.status, 'pending')
self.assertEqual(task.owner, TENANT1)
self.assertEqual(task.input, '{"import_from": "fake"}')
def test_new_task_invalid_type(self):
task_type = 'blah'
task_input = '{"import_from": "fake"}'
owner = TENANT1
self.assertRaises(
exception.InvalidTaskType,
self.task_factory.new_task,
task_type,
task_input,
owner,
)
def test_new_task_details(self):
task_id = 'fake_task_id'
task_input = '{"import_from": "fake"}'
result = '{"result": "success"}'
message = 'fake message'
task_details = self.task_factory.new_task_details(task_id,
task_input,
message,
result)
self.assertEqual(task_details.task_id, task_id)
self.assertEqual(task_details.input, task_input)
self.assertEqual(task_details.result, result)
self.assertEqual(task_details.message, message)
class TestTask(test_utils.BaseTestCase):
@ -334,13 +344,10 @@ class TestTask(test_utils.BaseTestCase):
super(TestTask, self).setUp()
self.task_factory = domain.TaskFactory()
task_type = 'import'
task_input = ('{"import_from": "file:///home/a.img",'
' "import_from_format": "qcow2"}')
owner = TENANT1
task_ttl = CONF.task.task_time_to_live
self.gateway = unittest_utils.FakeGateway()
self.task = self.task_factory.new_task(task_type,
task_input,
owner,
task_time_to_live=task_ttl)
@ -351,12 +358,9 @@ class TestTask(test_utils.BaseTestCase):
exception.InvalidTaskStatus,
domain.Task,
task_id,
type='import',
task_type='import',
status=status,
input=None,
result=None,
owner=None,
message=None,
expires_at=None,
created_at=timeutils.utcnow(),
updated_at=timeutils.utcnow()
@ -443,3 +447,28 @@ class TestTask(test_utils.BaseTestCase):
expected
)
timeutils.clear_time_override()
class TestTaskDetails(test_utils.BaseTestCase):
def setUp(self):
super(TestTaskDetails, self).setUp()
self.task_input = ('{"import_from": "file:///home/a.img",'
' "import_from_format": "qcow2"}')
def test_task_details_init(self):
task_details_values = ['task_id_1',
self.task_input,
'result',
'None']
task_details = domain.TaskDetails(*task_details_values)
self.assertIsNotNone(task_details)
def test_task_details_with_no_task_id(self):
task_id = None
task_details_values = [task_id,
self.task_input,
'result',
'None']
self.assertRaises(exception.TaskException,
domain.TaskDetails,
*task_details_values)

View File

@ -290,7 +290,6 @@ class TestTaskFactory(test_utils.BaseTestCase):
super(TestTaskFactory, self).setUp()
self.factory = mock.Mock()
self.fake_type = 'import'
self.fake_input = "fake input"
self.fake_owner = "owner"
def test_proxy_plain(self):
@ -298,34 +297,52 @@ class TestTaskFactory(test_utils.BaseTestCase):
proxy_factory.new_task(
type=self.fake_type,
input=self.fake_input,
owner=self.fake_owner
)
self.factory.new_task.assert_called_once_with(
type=self.fake_type,
input=self.fake_input,
owner=self.fake_owner
)
proxy_factory.new_task_details("task_01", "input")
self.factory.new_task_details.assert_called_once_with(
"task_01",
"input",
None, None
)
def test_proxy_wrapping(self):
proxy_factory = proxy.TaskFactory(
self.factory,
proxy_class=FakeProxy,
proxy_kwargs={'dog': 'bark'}
)
task_proxy_class=FakeProxy,
task_proxy_kwargs={'dog': 'bark'},
task_details_proxy_class=FakeProxy,
task_details_proxy_kwargs={'dog': 'bark'})
self.factory.new_task.return_value = 'fake_task'
self.factory.new_task_details.return_value = 'fake_task_detail'
task = proxy_factory.new_task(
type=self.fake_type,
input=self.fake_input,
owner=self.fake_owner
)
self.factory.new_task.assert_called_once_with(
type=self.fake_type,
input=self.fake_input,
owner=self.fake_owner
)
self.assertIsInstance(task, FakeProxy)
self.assertEqual(task.base, 'fake_task')
task_details = proxy_factory.new_task_details('task_01', "input")
self.factory.new_task_details.assert_called_once_with(
'task_01',
"input",
None, None
)
self.assertIsInstance(task_details, FakeProxy)
self.assertEqual(task_details.base, 'fake_task_detail')

View File

@ -20,6 +20,7 @@ import webob
from glance.common import exception
import glance.context
from glance import domain
from glance import notifier
from glance.openstack.common import timeutils
import glance.tests.unit.utils as unit_test_utils
@ -388,16 +389,17 @@ class TestTaskNotifications(utils.BaseTestCase):
super(TestTaskNotifications, self).setUp()
self.task = TaskStub(
task_id='aaa',
type='import',
task_type='import',
status='pending',
input={"loc": "fake"},
result='',
owner=TENANT2,
message='',
expires_at=None,
created_at=DATETIME,
updated_at=DATETIME
)
self.task_details = domain.TaskDetails(task_id=self.task.task_id,
task_input={"loc": "fake"},
result='',
message='')
self.context = glance.context.RequestContext(
tenant=TENANT2,
user=USER1
@ -414,6 +416,9 @@ class TestTaskNotifications(utils.BaseTestCase):
self.context,
self.notifier
)
self.task_details_proxy = notifier.TaskDetailsProxy(self.task_details,
self.context,
self.notifier)
timeutils.set_time_override()
def tearDown(self):
@ -421,7 +426,7 @@ class TestTaskNotifications(utils.BaseTestCase):
timeutils.clear_time_override()
def test_task_create_notification(self):
self.task_repo_proxy.add(self.task_proxy)
self.task_repo_proxy.add(self.task_proxy, self.task_details_proxy)
output_logs = self.notifier.get_logs()
self.assertEqual(len(output_logs), 1)
output_log = output_logs[0]

View File

@ -85,13 +85,13 @@ class ImageMembershipStub(object):
class TaskRepoStub(object):
def get(self, *args, **kwargs):
return 'task_from_get'
def get_task_and_details(self, *args, **kwargs):
return 'task_from_get', 'task_details_from_get'
def add(self, *args, **kwargs):
return 'task_from_add'
def list(self, *args, **kwargs):
def list_tasks(self, *args, **kwargs):
return ['task_from_list_0', 'task_from_list_1']
@ -385,7 +385,9 @@ class TestTaskPolicy(test_utils.BaseTestCase):
{},
self.policy
)
self.assertRaises(exception.Forbidden, task_repo.get, UUID1)
self.assertRaises(exception.Forbidden,
task_repo.get_task_and_details,
UUID1)
def test_get_task_allowed(self):
rules = {"get_task": True}
@ -395,9 +397,9 @@ class TestTaskPolicy(test_utils.BaseTestCase):
{},
self.policy
)
output = task_repo.get(UUID1)
self.assertIsInstance(output, glance.api.policy.TaskProxy)
self.assertEqual(output.task, 'task_from_get')
task, task_details = task_repo.get_task_and_details(UUID1)
self.assertIsInstance(task, glance.api.policy.TaskProxy)
self.assertEqual(task.task, 'task_from_get')
def test_get_tasks_not_allowed(self):
rules = {"get_tasks": False}
@ -407,7 +409,7 @@ class TestTaskPolicy(test_utils.BaseTestCase):
{},
self.policy
)
self.assertRaises(exception.Forbidden, task_repo.list)
self.assertRaises(exception.Forbidden, task_repo.list_tasks)
def test_get_tasks_allowed(self):
rules = {"get_task": True}
@ -417,7 +419,7 @@ class TestTaskPolicy(test_utils.BaseTestCase):
{},
self.policy
)
tasks = task_repo.list()
tasks = task_repo.list_tasks()
for i, task in enumerate(tasks):
self.assertIsInstance(task, glance.api.policy.TaskProxy)
self.assertEqual(task.task, 'task_from_list_%d' % i)

View File

@ -41,10 +41,10 @@ DATETIME = datetime.datetime(2013, 9, 28, 15, 27, 36, 325355)
ISOTIME = '2013-09-28T15:27:36Z'
def _db_fixture(id, **kwargs):
def _db_fixture(task_id, **kwargs):
default_datetime = timeutils.utcnow()
obj = {
'id': id,
'id': task_id,
'status': 'pending',
'type': 'import',
'input': {},
@ -61,22 +61,23 @@ def _db_fixture(id, **kwargs):
return obj
def _domain_fixture(id, **kwargs):
def _domain_fixture(task_id, **kwargs):
default_datetime = timeutils.utcnow()
properties = {
'task_id': id,
'status': 'pending',
'type': 'import',
'input': {},
'result': None,
'owner': None,
'message': None,
'expires_at': None,
'created_at': default_datetime,
'updated_at': default_datetime,
task_properties = {
'task_id': task_id,
'status': kwargs.get('status', 'pending'),
'task_type': kwargs.get('type', 'import'),
'owner': kwargs.get('owner', None),
'expires_at': kwargs.get('expires_at', None),
'created_at': kwargs.get('created_at', default_datetime),
'updated_at': kwargs.get('updated_at', default_datetime),
}
properties.update(kwargs)
return glance.domain.Task(**properties)
task = glance.domain.Task(**task_properties)
task_details = glance.domain.TaskDetails(task_id,
kwargs.get('input', {}),
kwargs.get('message', None),
kwargs.get('result', None))
return {'task': task, 'task_details': task_details}
class TestTasksController(test_utils.BaseTestCase):
@ -267,8 +268,11 @@ class TestTasksController(test_utils.BaseTestCase):
def test_get(self):
request = unit_test_utils.get_fake_request()
output = self.controller.get(request, task_id=UUID1)
self.assertEqual(UUID1, output.task_id)
self.assertEqual('import', output.type)
task = output['task']
task_details = output['task_details']
self.assertEqual(UUID1, task.task_id)
self.assertEqual(UUID1, task_details.task_id)
self.assertEqual('import', task.type)
def test_get_non_existent(self):
request = unit_test_utils.get_fake_request()
@ -289,10 +293,12 @@ class TestTasksController(test_utils.BaseTestCase):
"image_from_format": "qcow2"}
}
output = self.controller.create(request, task=task)
self.assertEqual('import', output.type)
task = output['task']
task_details = output['task_details']
self.assertEqual('import', task.type)
self.assertEqual({
"import_from": "swift://cloud.foo/myaccount/mycontainer/path",
"image_from_format": "qcow2"}, output.input)
"image_from_format": "qcow2"}, task_details.input)
output_logs = [nlog for nlog in self.notifier.get_logs()
if nlog['event_type'] == 'task.create']
self.assertEqual(len(output_logs), 1)
@ -550,7 +556,8 @@ class TestTasksSerializer(test_utils.BaseTestCase):
}
request = webob.Request.blank('/v2/tasks')
response = webob.Response(request=request)
result = {'tasks': self.fixtures}
task_fixtures = [f['task'] for f in self.fixtures]
result = {'tasks': task_fixtures}
self.serializer.index(response, result)
actual = jsonutils.loads(response.body)
self.assertEqual(expected, actual)
@ -559,7 +566,8 @@ class TestTasksSerializer(test_utils.BaseTestCase):
def test_index_next_marker(self):
request = webob.Request.blank('/v2/tasks')
response = webob.Response(request=request)
result = {'tasks': self.fixtures, 'next_marker': UUID2}
task_fixtures = [f['task'] for f in self.fixtures]
result = {'tasks': task_fixtures, 'next_marker': UUID2}
self.serializer.index(response, result)
output = jsonutils.loads(response.body)
self.assertEqual('/v2/tasks?marker=%s' % UUID2, output['next'])
@ -568,7 +576,8 @@ class TestTasksSerializer(test_utils.BaseTestCase):
url = '/v2/tasks?limit=10&sort_key=id&sort_dir=asc'
request = webob.Request.blank(url)
response = webob.Response(request=request)
result = {'tasks': self.fixtures, 'next_marker': UUID2}
task_fixtures = [f['task'] for f in self.fixtures]
result = {'tasks': task_fixtures, 'next_marker': UUID2}
self.serializer.index(response, result)
output = jsonutils.loads(response.body)
self.assertEqual('/v2/tasks?sort_key=id&sort_dir=asc&limit=10',
@ -631,33 +640,56 @@ class TestTasksSerializer(test_utils.BaseTestCase):
'schema': '/v2/schemas/task',
}
response = webob.Response()
self.serializer.get(response, self.fixtures[1])
actual = jsonutils.loads(response.body)
self.assertEqual(expected, actual)
self.assertEqual('application/json', response.content_type)
def test_create(self):
response = webob.Response()
self.serializer.create(response, self.fixtures[3])
serialized_task = jsonutils.loads(response.body)
self.assertEqual(response.status_int, 201)
self.assertEqual(self.fixtures[3].task_id,
jsonutils.loads(response.body)['id'])
self.assertTrue('expires_at' in jsonutils.loads(response.body))
self.assertEqual(self.fixtures[3]['task'].task_id,
serialized_task['id'])
self.assertEqual(self.fixtures[3]['task_details'].task_id,
serialized_task['id'])
self.assertEqual(self.fixtures[3]['task_details'].input,
serialized_task['input'])
self.assertTrue('expires_at' in serialized_task)
self.assertEqual('application/json', response.content_type)
def test_create_ensure_expires_at_is_not_returned(self):
response = webob.Response()
self.serializer.create(response, self.fixtures[0])
serialized_task = jsonutils.loads(response.body)
self.assertEqual(response.status_int, 201)
self.assertEqual(self.fixtures[0].task_id,
jsonutils.loads(response.body)['id'])
self.assertFalse('expires_at' in jsonutils.loads(response.body))
self.assertEqual(self.fixtures[0]['task'].task_id,
serialized_task['id'])
self.assertEqual(self.fixtures[0]['task_details'].task_id,
serialized_task['id'])
self.assertEqual(self.fixtures[0]['task_details'].input,
serialized_task['input'])
self.assertFalse('expires_at' in serialized_task)
self.assertEqual('application/json', response.content_type)
response = webob.Response()
self.serializer.create(response, self.fixtures[1])
serialized_task = jsonutils.loads(response.body)
self.assertEqual(response.status_int, 201)
self.assertEqual(self.fixtures[1].task_id,
jsonutils.loads(response.body)['id'])
self.assertFalse('expires_at' in jsonutils.loads(response.body))
self.assertEqual(self.fixtures[1]['task'].task_id,
serialized_task['id'])
self.assertEqual(self.fixtures[1]['task_details'].task_id,
serialized_task['id'])
self.assertEqual(self.fixtures[1]['task_details'].input,
serialized_task['input'])
self.assertFalse('expires_at' in serialized_task)
self.assertEqual('application/json', response.content_type)