Add '__task_execution' structure to task execution context on the fly
* Previously we stored the data structure describing the current task execution (id and name) in the inbound task execution context directly so that it'd be saved to DB. This was needed to evaluate YAQL/Jinja function task() without parameters properly. However, it's not needed, we can just build a context view on the fly just before evaluating an expression. Change-Id: If523039446ab3e2ccc9542617de2a170168f6e20 Closes-Bug: #1764704
This commit is contained in:
parent
4936d778e5
commit
6b7b58ed6c
@ -338,6 +338,7 @@ class RetryPolicy(base.TaskPolicy):
|
||||
wf_ex = task_ex.workflow_execution
|
||||
|
||||
ctx_view = data_flow.ContextView(
|
||||
data_flow.get_current_task_dict(task_ex),
|
||||
data_flow.evaluate_task_outbound_context(task_ex),
|
||||
wf_ex.context,
|
||||
wf_ex.input
|
||||
|
@ -311,8 +311,6 @@ class Task(object):
|
||||
task_name = self.task_spec.get_name()
|
||||
task_type = self.task_spec.get_type()
|
||||
|
||||
data_flow.add_current_task_to_context(self.ctx, task_id, task_name)
|
||||
|
||||
values = {
|
||||
'id': task_id,
|
||||
'name': task_name,
|
||||
@ -430,16 +428,13 @@ class RegularTask(Task):
|
||||
self._schedule_actions()
|
||||
|
||||
def _update_inbound_context(self):
|
||||
task_ex = self.task_ex
|
||||
assert task_ex
|
||||
assert self.task_ex
|
||||
|
||||
wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec)
|
||||
|
||||
self.ctx = wf_ctrl.get_task_inbound_context(self.task_spec)
|
||||
data_flow.add_current_task_to_context(self.ctx, task_ex.id,
|
||||
task_ex.name)
|
||||
|
||||
utils.update_dict(task_ex.in_context, self.ctx)
|
||||
utils.update_dict(self.task_ex.in_context, self.ctx)
|
||||
|
||||
def _update_triggered_by(self):
|
||||
assert self.task_ex
|
||||
@ -515,17 +510,17 @@ class RegularTask(Task):
|
||||
)
|
||||
|
||||
def _evaluate_expression(self, expression, ctx=None):
|
||||
ctx = ctx or self.ctx
|
||||
ctx_view = data_flow.ContextView(
|
||||
ctx,
|
||||
data_flow.get_current_task_dict(self.task_ex),
|
||||
ctx or self.ctx,
|
||||
self.wf_ex.context,
|
||||
self.wf_ex.input
|
||||
)
|
||||
input_dict = expr.evaluate_recursively(
|
||||
|
||||
return expr.evaluate_recursively(
|
||||
expression,
|
||||
ctx_view
|
||||
)
|
||||
return input_dict
|
||||
|
||||
def _build_action(self):
|
||||
action_name = self.task_spec.get_action_name()
|
||||
|
@ -846,14 +846,6 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
|
||||
task2_1_ex = self._assert_single_item(tasks_execs, name='task2_1')
|
||||
task2_2_ex = self._assert_single_item(tasks_execs, name='task2_2')
|
||||
|
||||
# TODO(rakhmerov): Find out why '__task_execution' is still
|
||||
# in the inbound context
|
||||
del task0_ex.in_context['__task_execution']
|
||||
del task1_1_ex.in_context['__task_execution']
|
||||
del task1_2_ex.in_context['__task_execution']
|
||||
del task2_1_ex.in_context['__task_execution']
|
||||
del task2_2_ex.in_context['__task_execution']
|
||||
|
||||
self.assertDictEqual({}, task0_ex.in_context)
|
||||
self.assertDictEqual({'var0': 'val0'}, task1_1_ex.in_context)
|
||||
self.assertDictEqual(
|
||||
|
@ -206,10 +206,12 @@ class YAQLFunctionsEngineTest(engine_test_base.EngineTestCase):
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
task1_ex = self._assert_single_item(
|
||||
wf_ex.task_executions, name='task1'
|
||||
wf_ex.task_executions,
|
||||
name='task1'
|
||||
)
|
||||
task2_ex = self._assert_single_item(
|
||||
wf_ex.task_executions, name='task2'
|
||||
wf_ex.task_executions,
|
||||
name='task2'
|
||||
)
|
||||
|
||||
self.assertDictEqual(
|
||||
@ -229,6 +231,11 @@ class YAQLFunctionsEngineTest(engine_test_base.EngineTestCase):
|
||||
task2_ex.published
|
||||
)
|
||||
|
||||
# The internal data needed for evaluation of the task() function
|
||||
# should not be persisted to DB.
|
||||
self.assertNotIn('__task_execution', task1_ex.in_context)
|
||||
self.assertNotIn('__task_execution', task2_ex.in_context)
|
||||
|
||||
def test_task_function_no_name_on_complete_case(self):
|
||||
wf_text = """---
|
||||
version: '2.0'
|
||||
|
@ -190,7 +190,12 @@ def publish_variables(task_ex, task_spec):
|
||||
|
||||
wf_ex = task_ex.workflow_execution
|
||||
|
||||
expr_ctx = ContextView(task_ex.in_context, wf_ex.context, wf_ex.input)
|
||||
expr_ctx = ContextView(
|
||||
get_current_task_dict(task_ex),
|
||||
task_ex.in_context,
|
||||
wf_ex.context,
|
||||
wf_ex.input
|
||||
)
|
||||
|
||||
if task_ex.name in expr_ctx:
|
||||
LOG.warning(
|
||||
@ -268,19 +273,14 @@ def evaluate_workflow_output(wf_ex, wf_output, ctx):
|
||||
return output or ctx
|
||||
|
||||
|
||||
def add_current_task_to_context(ctx, task_id, task_name):
|
||||
ctx['__task_execution'] = {
|
||||
'id': task_id,
|
||||
'name': task_name
|
||||
def get_current_task_dict(task_ex):
|
||||
return {
|
||||
'__task_execution': {
|
||||
'id': task_ex.id,
|
||||
'name': task_ex.name
|
||||
}
|
||||
}
|
||||
|
||||
return ctx
|
||||
|
||||
|
||||
def remove_internal_data_from_context(ctx):
|
||||
if '__task_execution' in ctx:
|
||||
del ctx['__task_execution']
|
||||
|
||||
|
||||
def add_openstack_data_to_context(wf_ex):
|
||||
wf_ex.context = wf_ex.context or {}
|
||||
|
@ -125,8 +125,6 @@ class DirectWorkflowController(base.WorkflowController):
|
||||
elif not t_s:
|
||||
t_s = self.wf_spec.get_tasks()[task_ex.name]
|
||||
|
||||
data_flow.remove_internal_data_from_context(ctx)
|
||||
|
||||
triggered_by = [
|
||||
{
|
||||
'task_id': task_ex.id,
|
||||
@ -176,8 +174,6 @@ class DirectWorkflowController(base.WorkflowController):
|
||||
data_flow.evaluate_task_outbound_context(t_ex)
|
||||
)
|
||||
|
||||
data_flow.remove_internal_data_from_context(ctx)
|
||||
|
||||
return ctx
|
||||
|
||||
def get_logical_task_state(self, task_ex):
|
||||
@ -248,6 +244,7 @@ class DirectWorkflowController(base.WorkflowController):
|
||||
t_name = task_ex.name
|
||||
|
||||
ctx_view = data_flow.ContextView(
|
||||
data_flow.get_current_task_dict(task_ex),
|
||||
ctx or data_flow.evaluate_task_outbound_context(task_ex),
|
||||
self.wf_ex.context,
|
||||
self.wf_ex.input
|
||||
|
Loading…
Reference in New Issue
Block a user