Remove redundant DB API arguments

The main reason is to be able to create root /executions and /tasks
endpoints, the one you should be able to get without knowing their
'workbook_name'.

Change-Id: I94b64f2b0e761259fd09c828bb63075ef898e4e3
Implements: blueprint mistral-ui
This commit is contained in:
Kirill Izotov 2014-06-19 16:07:53 +07:00
parent a2531e3595
commit 6a47676300
15 changed files with 193 additions and 217 deletions

View File

@ -91,7 +91,7 @@ class ExecutionsController(rest.RestController):
"""Return the specified Execution."""
LOG.debug("Fetch execution [workbook_name=%s, id=%s]" %
(workbook_name, id))
values = db_api.execution_get(workbook_name, id)
values = db_api.execution_get(id)
return Execution.from_dict(values)
@rest_utils.wrap_wsme_controller_exception
@ -101,8 +101,7 @@ class ExecutionsController(rest.RestController):
LOG.debug("Update execution [workbook_name=%s, id=%s, execution=%s]" %
(workbook_name, id, execution))
values = db_api.execution_update(workbook_name,
id,
values = db_api.execution_update(id,
execution.to_dict())
return Execution.from_dict(values)
@ -134,7 +133,7 @@ class ExecutionsController(rest.RestController):
"""Delete the specified Execution."""
LOG.debug("Delete execution [workbook_name=%s, id=%s]" %
(workbook_name, id))
db_api.execution_delete(workbook_name, id)
db_api.execution_delete(id)
@wsme_pecan.wsexpose(Executions, wtypes.text)
def get_all(self, workbook_name):
@ -142,7 +141,9 @@ class ExecutionsController(rest.RestController):
LOG.debug("Fetch executions [workbook_name=%s]" % workbook_name)
if db_api.workbook_get(workbook_name):
executions = [Execution.from_dict(values)
for values in db_api.executions_get(workbook_name)]
executions = [
Execution.from_dict(values) for values
in db_api.executions_get(workbook_name=workbook_name)
]
return Executions(executions=executions)

View File

@ -92,7 +92,7 @@ class TasksController(rest.RestController):
LOG.debug("Fetch task [workbook_name=%s, execution_id=%s, id=%s]" %
(workbook_name, execution_id, id))
values = db_api.task_get(workbook_name, execution_id, id)
values = db_api.task_get(id)
return Task.from_dict(values)
@rest_utils.wrap_wsme_controller_exception
@ -104,7 +104,7 @@ class TasksController(rest.RestController):
"[workbook_name=%s, execution_id=%s, id=%s, task=%s]" %
(workbook_name, execution_id, id, task))
if db_api.task_get(workbook_name, execution_id, id):
if db_api.task_get(id):
# TODO(rakhmerov): pass task result once it's implemented
engine = pecan.request.context['engine']
values = engine.convey_task_result(workbook_name,
@ -118,12 +118,13 @@ class TasksController(rest.RestController):
@wsme_pecan.wsexpose(Tasks, wtypes.text, wtypes.text)
def get_all(self, workbook_name, execution_id):
"""Return all tasks within the execution."""
db_api.ensure_execution_exists(workbook_name, execution_id)
db_api.ensure_execution_exists(execution_id)
LOG.debug("Fetch tasks [workbook_name=%s, execution_id=%s]" %
(workbook_name, execution_id))
tasks = [Task.from_dict(values)
for values in db_api.tasks_get(workbook_name, execution_id)]
for values in db_api.tasks_get(workbook_name=workbook_name,
execution_id=execution_id)]
return Tasks(tasks=tasks)

View File

@ -93,51 +93,50 @@ def workbook_definition_put(workbook_name, text):
# Executions
def execution_get(workbook_name, id):
return IMPL.execution_get(workbook_name, id)
def execution_get(id):
return IMPL.execution_get(id)
def ensure_execution_exists(workbook_name, execution_id):
return IMPL.ensure_execution_exists(workbook_name, execution_id)
def ensure_execution_exists(execution_id):
return IMPL.ensure_execution_exists(execution_id)
def execution_create(workbook_name, values):
return IMPL.execution_create(workbook_name, values)
def execution_update(workbook_name, id, values):
return IMPL.execution_update(workbook_name, id, values)
def execution_update(id, values):
return IMPL.execution_update(id, values)
def execution_delete(workbook_name, id):
return IMPL.execution_delete(workbook_name, id)
def execution_delete(id):
return IMPL.execution_delete(id)
def executions_get(workbook_name):
return IMPL.executions_get_all(workbook_name=workbook_name)
def executions_get(**kwargs):
return IMPL.executions_get(**kwargs)
# Tasks
def task_get(workbook_name, execution_id, id):
return IMPL.task_get(workbook_name, execution_id, id)
def task_get(id):
return IMPL.task_get(id)
def task_create(workbook_name, execution_id, values):
return IMPL.task_create(workbook_name, execution_id, values)
def task_create(execution_id, values):
return IMPL.task_create(execution_id, values)
def task_update(workbook_name, execution_id, id, values):
return IMPL.task_update(workbook_name, execution_id, id, values)
def task_update(id, values):
return IMPL.task_update(id, values)
def task_delete(workbook_name, execution_id, id):
return IMPL.task_delete(workbook_name, execution_id, id)
def task_delete(id):
return IMPL.task_delete(id)
def tasks_get(workbook_name, execution_id):
return IMPL.tasks_get_all(workbook_name=workbook_name,
execution_id=execution_id)
def tasks_get(**kwargs):
return IMPL.tasks_get(**kwargs)
# Listeners

View File

@ -207,9 +207,6 @@ def model_query(model, session=None):
"""Query helper.
:param model: base model to query
:param context: context to query under
:param project_only: if present and context is user-type, then restrict
query to match the context's tenant_id.
"""
return session.query(model)
@ -373,56 +370,53 @@ def execution_create(workbook_name, values, session=None):
@to_dict
@session_aware()
def execution_update(workbook_name, execution_id, values, session=None):
execution = _execution_get(workbook_name, execution_id)
def execution_update(execution_id, values, session=None):
execution = _execution_get(execution_id)
if not execution:
raise exc.NotFoundException(
"Execution not found [workbook_name=%s, execution_id=%s]" %
(workbook_name, execution_id))
"Execution not found [execution_id=%s]" % execution_id)
execution.update(values.copy())
return execution
@session_aware()
def execution_delete(workbook_name, execution_id, session=None):
execution = _execution_get(workbook_name, execution_id)
def execution_delete(execution_id, session=None):
execution = _execution_get(execution_id)
if not execution:
raise exc.NotFoundException(
"Execution not found [workbook_name=%s, execution_id=%s]" %
(workbook_name, execution_id))
"Execution not found [execution_id=%s]" % execution_id)
session.delete(execution)
@to_dict
def execution_get(workbook_name, execution_id):
execution = _execution_get(workbook_name, execution_id)
def execution_get(execution_id):
execution = _execution_get(execution_id)
if not execution:
raise exc.NotFoundException(
"Execution not found [workbook_name=%s, execution_id=%s]" %
(workbook_name, execution_id))
"Execution not found [execution_id=%s]" % execution_id)
return execution
def ensure_execution_exists(workbook_name, execution_id):
execution_get(workbook_name, execution_id)
def ensure_execution_exists(execution_id):
execution_get(execution_id)
@to_dict
def executions_get_all(**kwargs):
return _executions_get_all(**kwargs)
def executions_get(**kwargs):
return _executions_get(**kwargs)
def _executions_get_all(**kwargs):
def _executions_get(**kwargs):
query = model_query(m.WorkflowExecution)
return query.filter_by(**kwargs).all()
def _execution_get(workbook_name, execution_id):
def _execution_get(execution_id):
query = model_query(m.WorkflowExecution)
return query.filter_by(id=execution_id,
workbook_name=workbook_name).first()
return query.filter_by(id=execution_id).first()
# Workflow tasks.
@ -430,13 +424,10 @@ def _execution_get(workbook_name, execution_id):
@to_dict
@session_aware()
def task_create(workbook_name, execution_id, values, session=None):
def task_create(execution_id, values, session=None):
task = m.Task()
task.update(values)
task.update({
'workbook_name': workbook_name,
'execution_id': execution_id
})
task.update({'execution_id': execution_id})
try:
task.save(session=session)
@ -449,12 +440,11 @@ def task_create(workbook_name, execution_id, values, session=None):
@to_dict
@session_aware()
def task_update(workbook_name, execution_id, task_id, values, session=None):
task = _task_get(workbook_name, execution_id, task_id)
def task_update(task_id, values, session=None):
task = _task_get(task_id)
if not task:
raise exc.NotFoundException(
"Task not found [workbook_name=%s, execution_id=%s, task_id=%s]" %
(workbook_name, execution_id, task_id))
"Task not found [task_id=%s]" % task_id)
task.update(values.copy())
@ -462,39 +452,37 @@ def task_update(workbook_name, execution_id, task_id, values, session=None):
@session_aware()
def task_delete(workbook_name, execution_id, task_id, session=None):
task = _task_get(workbook_name, execution_id, task_id)
def task_delete(task_id, session=None):
task = _task_get(task_id)
if not task:
raise exc.NotFoundException(
"Task not found [workbook_name=%s, execution_id=%s, task_id=%s]" %
(workbook_name, execution_id, task_id))
"Task not found [task_id=%s]" % task_id)
session.delete(task)
@to_dict
def task_get(workbook_name, execution_id, task_id):
task = _task_get(workbook_name, execution_id, task_id)
def task_get(task_id):
task = _task_get(task_id)
if not task:
raise exc.NotFoundException(
"Task not found [workbook_name=%s, execution_id=%s, task_id=%s]" %
(workbook_name, execution_id, task_id))
"Task not found [task_id=%s]" % task_id)
return task
def _task_get(workbook_name, execution_id, task_id):
def _task_get(task_id):
query = model_query(m.Task)
return query.filter_by(id=task_id,
workbook_name=workbook_name,
execution_id=execution_id).first()
return query.filter_by(id=task_id).first()
@to_dict
def tasks_get_all(**kwargs):
return _tasks_get_all(**kwargs)
def tasks_get(**kwargs):
return _tasks_get(**kwargs)
def _tasks_get_all(**kwargs):
def _tasks_get(**kwargs):
query = model_query(m.Task)
return query.filter_by(**kwargs).all()
result = query.filter_by(**kwargs).all()
return result

View File

@ -130,10 +130,9 @@ class Engine(object):
:type kwargs: dict
:return: Workflow execution.
"""
workbook_name = kwargs.get('workbook_name')
execution_id = kwargs.get('execution_id')
return db_api.execution_update(workbook_name, execution_id,
return db_api.execution_update(execution_id,
{"state": states.STOPPED})
def convey_task_result(self, cntx, **kwargs):
@ -165,7 +164,7 @@ class Engine(object):
try:
workbook = self._get_workbook(workbook_name)
# TODO(rakhmerov): validate state transition
task = db_api.task_get(workbook_name, execution_id, task_id)
task = db_api.task_get(task_id)
wf_trace_msg = "Task '%s' [%s -> %s" % \
(task['name'], task['state'], state)
@ -180,12 +179,13 @@ class Engine(object):
task, outbound_context = self._update_task(workbook, task, state,
task_output)
execution = db_api.execution_get(workbook_name, execution_id)
execution = db_api.execution_get(execution_id)
self._create_next_tasks(task, workbook)
# Determine what tasks need to be started.
tasks = db_api.tasks_get(workbook_name, execution_id)
tasks = db_api.tasks_get(workbook_name=workbook_name,
execution_id=execution_id)
new_exec_state = self._determine_execution_state(execution, tasks)
@ -196,7 +196,7 @@ class Engine(object):
WORKFLOW_TRACE.info(wf_trace_msg)
execution = \
db_api.execution_update(workbook_name, execution_id, {
db_api.execution_update(execution_id, {
"state": new_exec_state
})
@ -240,7 +240,7 @@ class Engine(object):
workbook_name = kwargs.get('workbook_name')
execution_id = kwargs.get('execution_id')
execution = db_api.execution_get(workbook_name, execution_id)
execution = db_api.execution_get(execution_id)
if not execution:
raise exc.EngineException("Workflow execution not found "
@ -258,11 +258,9 @@ class Engine(object):
:type kwargs: dict
:return: Current task state.
"""
workbook_name = kwargs.get('workbook_name')
execution_id = kwargs.get('execution_id')
task_id = kwargs.get('task_id')
task = db_api.task_get(workbook_name, execution_id, task_id)
task = db_api.task_get(task_id)
if not task:
raise exc.EngineException("Task not found.")
@ -301,7 +299,7 @@ class Engine(object):
state, task_runtime_context = retry.get_task_runtime(task)
action_spec = workbook.get_action(task.get_full_action_name())
db_task = db_api.task_create(workbook_name, execution_id, {
db_task = db_api.task_create(execution_id, {
"name": task.name,
"requires": task.requires,
"task_spec": task.to_dict(),
@ -309,7 +307,8 @@ class Engine(object):
else action_spec.to_dict(),
"state": state,
"tags": task.get_property("tags", None),
"task_runtime_context": task_runtime_context
"task_runtime_context": task_runtime_context,
"workbook_name": workbook_name
})
tasks.append(db_task)
@ -338,8 +337,6 @@ class Engine(object):
:return: task, outbound_context. task is the updated task and
computed outbound context.
"""
workbook_name = task['workbook_name']
execution_id = task['execution_id']
task_spec = workbook.tasks.get(task["name"])
task_runtime_context = task["task_runtime_context"]
@ -352,8 +349,7 @@ class Engine(object):
update_values = {"state": state,
"output": task_output,
"task_runtime_context": task_runtime_context}
task = db_api.task_update(workbook_name, execution_id, task["id"],
update_values)
task = db_api.task_update(task["id"], update_values)
return task, outbound_context
@ -369,9 +365,8 @@ class Engine(object):
"""
db_api.start_tx()
try:
workbook_name = task['workbook_name']
execution_id = task['execution_id']
execution = db_api.execution_get(workbook_name, execution_id)
execution = db_api.execution_get(execution_id)
# Change state from DELAYED to IDLE to unblock processing.
@ -379,9 +374,7 @@ class Engine(object):
% (task['name'],
task['state'], states.IDLE))
db_task = db_api.task_update(workbook_name,
execution_id,
task['id'],
db_task = db_api.task_update(task['id'],
{"state": states.IDLE})
task_to_start = [db_task]
data_flow.prepare_tasks(task_to_start, outbound_context)

View File

@ -39,9 +39,7 @@ def prepare_tasks(tasks, context):
task['in_context'] = context
task['parameters'] = evaluate_task_parameters(task, context)
db_api.task_update(task['workbook_name'],
task['execution_id'],
task['id'],
db_api.task_update(task['id'],
{'in_context': task['in_context'],
'parameters': task['parameters']})

View File

@ -73,12 +73,9 @@ class DefaultExecutor(executor.Executor):
try:
db_api.start_tx()
try:
db_api.execution_update(task['workbook_name'],
task['execution_id'],
db_api.execution_update(task['execution_id'],
{'state': states.ERROR})
db_api.task_update(task['workbook_name'],
task['execution_id'],
task['id'],
db_api.task_update(task['id'],
{'state': states.ERROR})
db_api.commit_tx()
finally:
@ -101,11 +98,8 @@ class DefaultExecutor(executor.Executor):
LOG.info("Received a task: %s" % task)
db_task = db_api.task_get(task['workbook_name'],
task['execution_id'],
task['id'])
db_exec = db_api.execution_get(task['workbook_name'],
task['execution_id'])
db_task = db_api.task_get(task['id'])
db_exec = db_api.execution_get(task['execution_id'])
if not db_exec or not db_task:
return
@ -122,9 +116,7 @@ class DefaultExecutor(executor.Executor):
db_task['state'],
states.RUNNING))
db_api.task_update(task['workbook_name'],
task['execution_id'],
task['id'],
db_api.task_update(task['id'],
{'state': states.RUNNING})
self._do_task_action(db_task)

View File

@ -174,8 +174,7 @@ class ExecutionTest(test_base.DbTestCase):
EXECUTIONS[0])
self.assertIsInstance(created, dict)
fetched = db_api.execution_get(EXECUTIONS[0]['workbook_name'],
created['id'])
fetched = db_api.execution_get(created['id'])
self.assertIsInstance(fetched, dict)
self.assertDictEqual(created, fetched)
@ -184,14 +183,12 @@ class ExecutionTest(test_base.DbTestCase):
EXECUTIONS[0])
self.assertIsInstance(created, dict)
updated = db_api.execution_update(EXECUTIONS[0]['workbook_name'],
created['id'],
updated = db_api.execution_update(created['id'],
{'task': 'task10'})
self.assertIsInstance(updated, dict)
self.assertEqual('task10', updated['task'])
fetched = db_api.execution_get(EXECUTIONS[0]['workbook_name'],
created['id'])
fetched = db_api.execution_get(created['id'])
self.assertDictEqual(updated, fetched)
def test_execution_list(self):
@ -200,7 +197,7 @@ class ExecutionTest(test_base.DbTestCase):
created1 = db_api.execution_create(EXECUTIONS[1]['workbook_name'],
EXECUTIONS[1])
fetched = db_api.executions_get_all(
fetched = db_api.executions_get(
workbook_name=EXECUTIONS[0]['workbook_name'])
self.assertEqual(2, len(fetched))
@ -212,16 +209,13 @@ class ExecutionTest(test_base.DbTestCase):
EXECUTIONS[0])
self.assertIsInstance(created, dict)
fetched = db_api.execution_get(EXECUTIONS[0]['workbook_name'],
created['id'])
fetched = db_api.execution_get(created['id'])
self.assertIsInstance(fetched, dict)
self.assertDictEqual(created, fetched)
db_api.execution_delete(EXECUTIONS[0]['workbook_name'],
created['id'])
db_api.execution_delete(created['id'])
self.assertRaises(exc.NotFoundException,
db_api.execution_get,
EXECUTIONS[0]['workbook_name'],
created['id'])
@ -267,44 +261,34 @@ TASKS = [
class TaskTest(test_base.DbTestCase):
def test_task_create_and_get(self):
created = db_api.task_create(TASKS[0]['workbook_name'],
TASKS[0]['execution_id'],
created = db_api.task_create(TASKS[0]['execution_id'],
TASKS[0])
self.assertIsInstance(created, dict)
fetched = db_api.task_get(TASKS[0]['workbook_name'],
TASKS[0]['execution_id'],
created['id'])
fetched = db_api.task_get(created['id'])
self.assertIsInstance(fetched, dict)
self.assertDictEqual(created, fetched)
def test_task_update(self):
created = db_api.task_create(TASKS[0]['workbook_name'],
TASKS[0]['execution_id'],
created = db_api.task_create(TASKS[0]['execution_id'],
TASKS[0])
self.assertIsInstance(created, dict)
updated = db_api.task_update(TASKS[0]['workbook_name'],
TASKS[0]['execution_id'],
created['id'],
updated = db_api.task_update(created['id'],
{'description': 'my new desc'})
self.assertIsInstance(updated, dict)
self.assertEqual('my new desc', updated['description'])
fetched = db_api.task_get(TASKS[0]['workbook_name'],
TASKS[0]['execution_id'],
created['id'])
fetched = db_api.task_get(created['id'])
self.assertDictEqual(updated, fetched)
def test_task_list(self):
created0 = db_api.task_create(TASKS[0]['workbook_name'],
TASKS[0]['execution_id'],
created0 = db_api.task_create(TASKS[0]['execution_id'],
TASKS[0])
created1 = db_api.task_create(TASKS[1]['workbook_name'],
TASKS[1]['execution_id'],
created1 = db_api.task_create(TASKS[1]['execution_id'],
TASKS[1])
fetched = db_api.tasks_get_all(
fetched = db_api.tasks_get(
workbook_name=TASKS[0]['workbook_name'])
self.assertEqual(2, len(fetched))
@ -312,22 +296,16 @@ class TaskTest(test_base.DbTestCase):
self.assertDictEqual(created1, fetched[1])
def test_task_delete(self):
created = db_api.task_create(TASKS[0]['workbook_name'],
TASKS[0]['execution_id'],
created = db_api.task_create(TASKS[0]['execution_id'],
TASKS[0])
self.assertIsInstance(created, dict)
fetched = db_api.task_get(TASKS[0]['workbook_name'],
TASKS[0]['execution_id'],
created['id'])
fetched = db_api.task_get(created['id'])
self.assertIsInstance(fetched, dict)
self.assertDictEqual(created, fetched)
db_api.task_delete(TASKS[0]['workbook_name'],
TASKS[0]['execution_id'],
created['id'])
db_api.task_delete(created['id'])
self.assertRaises(exc.NotFoundException, db_api.task_get,
TASKS[0]['workbook_name'], TASKS[0]['execution_id'],
created['id'])

View File

@ -59,13 +59,15 @@ class TestEngine(base.EngineTestCase):
execution = self.engine.start_workflow_execution(WB_NAME, "create-vms",
CONTEXT)
task = db_api.tasks_get(WB_NAME, execution['id'])[0]
task = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])[0]
self.engine.convey_task_result(WB_NAME, execution['id'], task['id'],
states.SUCCESS, None)
task = db_api.tasks_get(WB_NAME, execution['id'])[0]
execution = db_api.execution_get(WB_NAME, execution['id'])
task = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])[0]
execution = db_api.execution_get(execution['id'])
self.assertEqual(execution['state'], states.SUCCESS)
self.assertEqual(task['state'], states.SUCCESS)
@ -85,13 +87,15 @@ class TestEngine(base.EngineTestCase):
execution = self.engine.start_workflow_execution(WB_NAME, "backup-vms",
CONTEXT)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])
self.engine.convey_task_result(WB_NAME, execution['id'],
tasks[0]['id'],
states.SUCCESS, None)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])
self.assertIsNotNone(tasks)
self.assertEqual(2, len(tasks))
@ -108,8 +112,9 @@ class TestEngine(base.EngineTestCase):
tasks[1]['id'],
states.SUCCESS, None)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
execution = db_api.execution_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])
execution = db_api.execution_get(execution['id'])
self.assertEqual(execution['state'], states.SUCCESS)
self.assertEqual(tasks[0]['state'], states.SUCCESS)
@ -132,8 +137,9 @@ class TestEngine(base.EngineTestCase):
"create-vm-nova",
CONTEXT)
task = db_api.tasks_get(WB_NAME, execution['id'])[0]
execution = db_api.execution_get(WB_NAME, execution['id'])
task = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])[0]
execution = db_api.execution_get(execution['id'])
self.assertEqual(execution['state'], states.SUCCESS)
self.assertEqual(task['state'], states.SUCCESS)
@ -153,7 +159,8 @@ class TestEngine(base.EngineTestCase):
"start-task",
CONTEXT)
# Only the first task is RUNNING
tasks = db_api.tasks_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])
self.assertEqual(len(tasks), 1)
task = self._assert_single_item(tasks,
name='start-task',
@ -164,7 +171,8 @@ class TestEngine(base.EngineTestCase):
task['id'],
states.SUCCESS, None)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])
self.assertEqual(len(tasks), 3)
self._assert_single_item(tasks,
name='start-task',
@ -181,7 +189,8 @@ class TestEngine(base.EngineTestCase):
task1['id'],
states.SUCCESS, None)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])
tasks_2 = self._assert_multiple_items(tasks, 2,
name='task-two',
@ -195,8 +204,9 @@ class TestEngine(base.EngineTestCase):
tasks_2[1]['id'],
states.SUCCESS, None)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
execution = db_api.execution_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])
execution = db_api.execution_get(execution['id'])
self._assert_multiple_items(tasks, 4, state=states.SUCCESS)
self.assertEqual(execution['state'], states.SUCCESS)
@ -215,7 +225,8 @@ class TestEngine(base.EngineTestCase):
execution = self.engine.start_workflow_execution(WB_NAME,
"start-task",
CONTEXT)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])
self.assertEqual(execution['state'], states.RUNNING)
start_task = self._assert_single_item(tasks,
@ -226,7 +237,8 @@ class TestEngine(base.EngineTestCase):
self.engine.convey_task_result(WB_NAME, execution['id'],
start_task['id'],
states.ERROR, CONTEXT)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])
self.assertEqual(len(tasks), 4)
task3 = self._assert_single_item(tasks,
@ -250,8 +262,9 @@ class TestEngine(base.EngineTestCase):
task4['id'],
states.SUCCESS, None)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
execution = db_api.execution_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])
execution = db_api.execution_get(execution['id'])
self._assert_multiple_items(tasks, 3, state=states.SUCCESS)
self._assert_single_item(tasks, state=states.ERROR)
@ -270,8 +283,9 @@ class TestEngine(base.EngineTestCase):
def test_engine_with_no_namespaces(self):
execution = self.engine.start_workflow_execution(WB_NAME, "task1", {})
tasks = db_api.tasks_get(WB_NAME, execution['id'])
execution = db_api.execution_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])
execution = db_api.execution_get(execution['id'])
self.assertIsNotNone(tasks)
self.assertEqual(1, len(tasks))
@ -289,8 +303,9 @@ class TestEngine(base.EngineTestCase):
execution = self.engine.start_workflow_execution(WB_NAME,
"std_http_task", {})
tasks = db_api.tasks_get(WB_NAME, execution['id'])
execution = db_api.execution_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])
execution = db_api.execution_get(execution['id'])
self.assertEqual(1, len(tasks))
self.assertEqual(states.SUCCESS, tasks[0]['state'])

View File

@ -113,8 +113,7 @@ class TestExecutor(base.DbTestCase):
# Create a new task.
SAMPLE_TASK['execution_id'] = execution['id']
task = db_api.task_create(SAMPLE_TASK['workbook_name'],
SAMPLE_TASK['execution_id'],
task = db_api.task_create(SAMPLE_TASK['execution_id'],
SAMPLE_TASK)
self.assertIsInstance(task, dict)
self.assertIn('id', task)
@ -124,7 +123,5 @@ class TestExecutor(base.DbTestCase):
ex_client.handle_task(SAMPLE_CONTEXT, task=task)
# Check task execution state.
db_task = db_api.task_get(task['workbook_name'],
task['execution_id'],
task['id'])
db_task = db_api.task_get(task['id'])
self.assertEqual(db_task['state'], states.SUCCESS)

View File

@ -90,13 +90,13 @@ class DataFlowTest(base.EngineTestCase):
CTX)
# We have to reread execution to get its latest version.
execution = db_api.execution_get(execution['workbook_name'],
execution['id'])
execution = db_api.execution_get(execution['id'])
self.assertEqual(states.SUCCESS, execution['state'])
self.assertDictEqual(CTX, execution['context'])
tasks = db_api.tasks_get(wb['name'], execution['id'])
tasks = db_api.tasks_get(workbook_name=wb['name'],
execution_id=execution['id'])
self.assertEqual(2, len(tasks))
@ -158,13 +158,13 @@ class DataFlowTest(base.EngineTestCase):
CTX)
# We have to reread execution to get its latest version.
execution = db_api.execution_get(execution['workbook_name'],
execution['id'])
execution = db_api.execution_get(execution['id'])
self.assertEqual(states.SUCCESS, execution['state'])
self.assertDictEqual(CTX, execution['context'])
tasks = db_api.tasks_get(wb['name'], execution['id'])
tasks = db_api.tasks_get(workbook_name=wb['name'],
execution_id=execution['id'])
self.assertEqual(3, len(tasks))
@ -260,13 +260,13 @@ class DataFlowTest(base.EngineTestCase):
CTX)
# We have to reread execution to get its latest version.
execution = db_api.execution_get(execution['workbook_name'],
execution['id'])
execution = db_api.execution_get(execution['id'])
self.assertEqual(states.SUCCESS, execution['state'])
self.assertDictEqual(CTX, execution['context'])
tasks = db_api.tasks_get(wb['name'], execution['id'])
tasks = db_api.tasks_get(workbook_name=wb['name'],
execution_id=execution['id'])
self.assertEqual(2, len(tasks))
@ -333,13 +333,13 @@ class DataFlowTest(base.EngineTestCase):
CTX)
# We have to reread execution to get its latest version.
execution = db_api.execution_get(execution['workbook_name'],
execution['id'])
execution = db_api.execution_get(execution['id'])
self.assertEqual(states.SUCCESS, execution['state'])
self.assertDictEqual(CTX, execution['context'])
tasks = db_api.tasks_get(wb['name'], execution['id'])
tasks = db_api.tasks_get(workbook_name=wb['name'],
execution_id=execution['id'])
self.assertEqual(3, len(tasks))
@ -442,7 +442,8 @@ class DataFlowTest(base.EngineTestCase):
execution = self.engine.start_workflow_execution(workbook['name'],
task_name, {})
tasks = db_api.tasks_get(workbook['name'], execution['id'])
tasks = db_api.tasks_get(workbook_name=workbook['name'],
execution_id=execution['id'])
task = self._assert_single_item(tasks, name=task_name)
@ -455,7 +456,7 @@ class DataFlowTest(base.EngineTestCase):
self.engine.convey_task_result(workbook['name'], execution['id'],
task['id'], states.SUCCESS, {})
execution = db_api.execution_get(workbook['name'], execution['id'])
execution = db_api.execution_get(execution['id'])
self.assertEqual(states.SUCCESS, execution['state'])
finally:

View File

@ -70,12 +70,12 @@ class DataFlowModuleTest(base.DbTestCase):
self.assertEqual('val32', parameters['p2'])
def test_prepare_tasks(self):
task = db_api.task_create(WB_NAME, EXEC_ID, TASK.copy())
task = db_api.task_create(EXEC_ID, TASK.copy())
tasks = [task]
data_flow.prepare_tasks(tasks, CONTEXT)
db_task = db_api.task_get(WB_NAME, EXEC_ID, tasks[0]['id'])
db_task = db_api.task_get(tasks[0]['id'])
self.assertDictEqual(CONTEXT, db_task['in_context'])
self.assertDictEqual({'p1': 'My string',

View File

@ -77,7 +77,8 @@ class TaskRetryTest(base.EngineTestCase):
def test_no_retry(self):
execution = self.engine.start_workflow_execution(WB_NAME,
'retry_task', None)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])
self.engine.convey_task_result(WB_NAME, execution['id'],
tasks[0]['id'], states.SUCCESS,
@ -96,7 +97,8 @@ class TaskRetryTest(base.EngineTestCase):
execution = self.engine.start_workflow_execution(WB_NAME,
'retry_task', None)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])
task_spec = workbook.tasks.get(tasks[0]['name'])
retry_count, _, __ = task_spec.get_retry_parameters()
@ -106,7 +108,8 @@ class TaskRetryTest(base.EngineTestCase):
{'output': 'result'})
# TODO(rakhmerov): It's not stable, need to avoid race condition.
tasks = db_api.tasks_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])
self._assert_single_item(tasks, name='retry_task')
self._assert_single_item(tasks, task_runtime_context={
@ -122,7 +125,8 @@ class TaskRetryTest(base.EngineTestCase):
execution = self.engine.start_workflow_execution(WB_NAME,
'retry_task', None)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])
task_spec = workbook.tasks.get(tasks[0]['name'])
retry_count, _, __ = task_spec.get_retry_parameters()
@ -136,7 +140,8 @@ class TaskRetryTest(base.EngineTestCase):
{'output': 'result'})
# TODO(rakhmerov): It's not stable, need to avoid race condition.
tasks = db_api.tasks_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])
self._assert_single_item(tasks, name='retry_task')
self._assert_single_item(tasks, task_runtime_context={
@ -153,7 +158,8 @@ class TaskRetryTest(base.EngineTestCase):
execution = self.engine.start_workflow_execution(WB_NAME,
task_name, None)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])
task_spec = workbook.tasks.get(tasks[0]['name'])
retry_count, _, delay = task_spec.get_retry_parameters()
@ -162,7 +168,8 @@ class TaskRetryTest(base.EngineTestCase):
tasks[0]['id'], states.ERROR,
{'output': 'result'})
tasks = db_api.tasks_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])
# TODO(rakhmerov): It's not stable, need to avoid race condition.
self._assert_single_item(tasks, name=task_name)
@ -176,7 +183,8 @@ class TaskRetryTest(base.EngineTestCase):
{'output': 'result'})
# TODO(rakhmerov): It's not stable, need to avoid race condition.
tasks = db_api.tasks_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])
self._assert_single_item(tasks, name=task_name)
self._assert_single_item(tasks, task_runtime_context={
@ -195,7 +203,8 @@ class TaskRetryTest(base.EngineTestCase):
execution = self.engine.start_workflow_execution(WB_NAME,
task_name_1, None)
tasks = db_api.tasks_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])
self._assert_single_item(tasks, name=task_name_1)
@ -203,7 +212,8 @@ class TaskRetryTest(base.EngineTestCase):
tasks[0]['id'], states.SUCCESS,
{'output': 'result'})
tasks = db_api.tasks_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])
self._assert_single_item(tasks, name=task_name_2)
@ -215,7 +225,8 @@ class TaskRetryTest(base.EngineTestCase):
tasks[1]['id'], states.ERROR,
{'output': 'result'})
tasks = db_api.tasks_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])
# TODO(rakhmerov): It's not stable, need to avoid race condition.
self._assert_single_item(tasks, name=task_name_1)
@ -229,7 +240,8 @@ class TaskRetryTest(base.EngineTestCase):
{'output': 'result'})
# TODO(rakhmerov): It's not stable, need to avoid race condition.
tasks = db_api.tasks_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])
self._assert_single_item(tasks, name=task_name_2)
self._assert_single_item(tasks, task_runtime_context={
@ -252,7 +264,8 @@ class TaskRetryTest(base.EngineTestCase):
start_task, None)
# TODO(rakhmerov): It's not stable, need to avoid race condition.
tasks = db_api.tasks_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])
self._assert_single_item(tasks, name=start_task)
self._assert_single_item(tasks, task_runtime_context={
@ -280,7 +293,8 @@ class TaskRetryTest(base.EngineTestCase):
None)
# TODO(rakhmerov): It's not stable, need to avoid race condition.
tasks = db_api.tasks_get(WB_NAME, execution['id'])
tasks = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])
self._assert_single_item(tasks, name=start_task)
self._assert_single_item(tasks, task_runtime_context={

View File

@ -70,7 +70,8 @@ class TestTransport(base.EngineTestCase):
execution = self.engine.start_workflow_execution(
WB_NAME, 'create-vms', CONTEXT)
task = db_api.tasks_get(WB_NAME, execution['id'])[0]
task = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])[0]
# Check task execution state. There is no timeout mechanism in
# unittest. There is an example to add a custom timeout decorator that
@ -78,9 +79,7 @@ class TestTransport(base.EngineTestCase):
# process time. However, it seems more straightforward to keep the
# loop finite.
for i in range(0, 50):
db_task = db_api.task_get(task['workbook_name'],
task['execution_id'],
task['id'])
db_task = db_api.task_get(task['id'])
# Ensure the request reached the executor and the action has ran.
if db_task['state'] != states.IDLE:
# We have to wait sometime due to time interval between set

View File

@ -41,4 +41,4 @@ commands = bash tools/lintstack.sh
show-source = true
ignore = H803,H305,H405,H904
builtins = _
exclude=.venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,tools
exclude=.venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,tools,scripts