Reduce the number of "on-xxx" evaluations

* Mistral used to evaluate expressions under "on-xxx" clauses more
  than once while processing a workflow. This is now fixed by storing
  the key results of those expressions in the DB as soon as they are
  first obtained.
* Added two boolean fields, "has_next_tasks" and "error_handled", to
  the TaskExecution class, along with the required migration. These
  fields make it unnecessary to evaluate expressions under "on-xxx"
  clauses repeatedly, which reduces execution time for heavy
  expressions and/or large data contexts.
* Minor style changes.

Closes-Bug: #1824121
Change-Id: Ib236ba7a72d8e578f9c52460d2a7d8d4540f9c37
Renat Akhmerov 2019-04-15 13:52:26 +07:00
parent eb59216281
commit 83c541acbf
11 changed files with 156 additions and 90 deletions

View File

@ -0,0 +1,40 @@
# Copyright 2018 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Add has_next_tasks and error_handled to task execution.
Revision ID: 032
Revises: 031
Create Date: 2019-04-16 13:42:12.123412
"""
# revision identifiers, used by Alembic.
revision = '032'
down_revision = '031'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.add_column(
'task_executions_v2',
sa.Column('has_next_tasks', sa.Boolean(), nullable=True)
)
op.add_column(
'task_executions_v2',
sa.Column('error_handled', sa.Boolean(), nullable=True)
)
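
The columns are added as nullable, so task execution rows created before the migration simply carry NULL for both flags. Only the upgrade path is part of this change; a hypothetical downgrade counterpart (not included in the commit) would just drop the columns again:

def downgrade():
    # Hypothetical reverse migration, not shipped with this change:
    # remove the two flag columns from the task execution table.
    op.drop_column('task_executions_v2', 'error_handled')
    op.drop_column('task_executions_v2', 'has_next_tasks')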

View File

@ -316,6 +316,10 @@ def get_task_executions(limit=None, marker=None, sort_keys=None,
)
def get_task_executions_count(**kwargs):
return IMPL.get_task_executions_count(**kwargs)
def get_completed_task_executions(**kwargs):
return IMPL.get_completed_task_executions(**kwargs)
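
A rough usage sketch of the new count helper, assuming the keyword arguments are plain equality filters on TaskExecution columns (this mirrors the call the direct workflow controller makes further down in this commit); wf_ex_id and the states import are assumed to be available:

# Sketch: count the ERROR tasks of a workflow execution whose error was
# not handled by an 'on-error' clause.
unhandled_cnt = db_api.get_task_executions_count(
    workflow_execution_id=wf_ex_id,
    state=states.ERROR,
    error_handled=False
)
all_errors_handled = (unhandled_cnt == 0)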

View File

@ -922,6 +922,15 @@ def get_task_executions(session=None, **kwargs):
return _get_collection(models.TaskExecution, **kwargs)
@b.session_aware()
def get_task_executions_count(session=None, **kwargs):
query = b.model_query(models.TaskExecution)
query = query.filter_by(**kwargs)
return query.count()
def _get_completed_task_executions_query(kwargs):
query = b.model_query(models.TaskExecution)

View File

@ -263,6 +263,19 @@ class TaskExecution(Execution):
# significantly.
processed = sa.Column(sa.BOOLEAN, default=False)
# Set to True if the completion of the task led to starting new
# tasks.
# The value of this property should be ignored if the task
# is not completed.
has_next_tasks = sa.Column(sa.Boolean, default=False)
# Set to True if the task finished with an error and the error
# is handled (e.g. with 'on-error' clause for direct workflows)
# so that the error shouldn't bubble up to the workflow level.
# The value of this property should be ignored if the task
# is not completed.
error_handled = sa.Column(sa.Boolean, default=False)
# Data Flow properties.
in_context = sa.Column(st.JsonLongDictType())
published = sa.Column(st.JsonLongDictType())
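
A condensed sketch of how the engine can now rely on these stored flags instead of re-evaluating "on-xxx" expressions; the helper names below are illustrative only, the real logic lives in the direct workflow controller changes further down:

# Both flags are only meaningful for completed tasks, as noted above.
def branch_cannot_continue(task_ex):
    # No next tasks were scheduled when this task completed, so this
    # branch of the workflow cannot produce more work.
    return not task_ex.has_next_tasks

def error_should_fail_workflow(task_ex):
    # The task failed and no 'on-error' clause produced any commands,
    # so the error must bubble up to the workflow level.
    return task_ex.state == states.ERROR and not task_ex.error_handled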

View File

@ -35,6 +35,7 @@ from mistral.notifiers import notification_events as events
from mistral import utils
from mistral.utils import wf_trace
from mistral.workflow import base as wf_base
from mistral.workflow import commands
from mistral.workflow import data_flow
from mistral.workflow import states
@ -272,6 +273,14 @@ class Task(object):
# Calculate commands to process next.
cmds = wf_ctrl.continue_workflow(task_ex=self.task_ex)
# Check whether the task generated any next tasks.
if any(not commands.is_engine_command(c) for c in cmds):
self.task_ex.has_next_tasks = True
# Check whether the error is handled.
if self.task_ex.state == states.ERROR:
self.task_ex.error_handled = any([c.handles_error for c in cmds])
# Mark task as processed after all decisions have been made
# upon its completion.
self.task_ex.processed = True

View File

@ -620,7 +620,7 @@ def _build_fail_info_message(wf_ctrl, wf_ex):
# Try to find where error is exactly.
failed_tasks = sorted(
filter(
lambda t: not wf_ctrl.is_error_handled_for(t),
lambda t_ex: not wf_ctrl.is_error_handled_for(t_ex),
lookup_utils.find_error_task_executions(wf_ex.id)
),
key=lambda t: t.name

View File

@ -36,7 +36,7 @@ cfg.CONF.set_default('auth_enable', False, group='pecan')
class DataFlowEngineTest(engine_test_base.EngineTestCase):
def test_linear_dataflow(self):
linear_wf = """---
wf_text = """---
version: '2.0'
wf:
@ -62,7 +62,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
result: "<% $.hi %>, <% $.to %>! Your <% env().from %>."
"""
wf_service.create_workflows(linear_wf)
wf_service.create_workflows(wf_text)
# Start workflow.
wf_ex = self.engine.start_workflow('wf', env={'from': 'Neo'})
@ -94,7 +94,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
self.assertNotIn('__execution', task1.in_context)
def test_linear_with_branches_dataflow(self):
linear_with_branches_wf = """---
wf_text = """---
version: '2.0'
wf:
@ -132,7 +132,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
progress: <% task(notify).result %>
"""
wf_service.create_workflows(linear_with_branches_wf)
wf_service.create_workflows(wf_text)
# Start workflow.
wf_ex = self.engine.start_workflow('wf', env={'from': 'Neo'})
@ -189,7 +189,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
)
def test_parallel_tasks(self):
parallel_tasks_wf = """---
wf_text = """---
version: '2.0'
wf:
@ -207,7 +207,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
var2: <% task(task2).result %>
"""
wf_service.create_workflows(parallel_tasks_wf)
wf_service.create_workflows(wf_text)
# Start workflow.
wf_ex = self.engine.start_workflow('wf',)
@ -238,7 +238,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
self.assertEqual(2, wf_output['var2'])
def test_parallel_tasks_complex(self):
parallel_tasks_complex_wf = """---
wf_text = """---
version: '2.0'
wf:
@ -285,7 +285,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
var21: 21
"""
wf_service.create_workflows(parallel_tasks_complex_wf)
wf_service.create_workflows(wf_text)
# Start workflow.
wf_ex = self.engine.start_workflow('wf')
@ -331,7 +331,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
self.assertEqual(21, wf_output['var21'])
def test_sequential_tasks_publishing_same_var(self):
var_overwrite_wf = """---
wf_text = """---
version: '2.0'
wf:
@ -364,7 +364,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
result: "<% $.greeting %>, <% $.to %>! <% env().from %>."
"""
wf_service.create_workflows(var_overwrite_wf)
wf_service.create_workflows(wf_text)
# Start workflow.
wf_ex = self.engine.start_workflow('wf', env={'from': 'Neo'})
@ -394,7 +394,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
)
def test_sequential_tasks_publishing_same_structured(self):
var_overwrite_wf = """---
wf_text = """---
version: '2.0'
wf:
@ -418,7 +418,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
result: <% $.greeting %>
"""
wf_service.create_workflows(var_overwrite_wf)
wf_service.create_workflows(wf_text)
# Start workflow.
wf_ex = self.engine.start_workflow('wf', env={'from': 'Neo'})
@ -443,7 +443,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
self.assertDictEqual({'result': {}}, task3.published)
def test_linear_dataflow_implicit_publish(self):
linear_wf = """---
wf_text = """---
version: '2.0'
wf:
@ -473,7 +473,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
<% task(task1).result %>, <% task(task21).result %>!
Your <% task(task22).result %>.
"""
wf_service.create_workflows(linear_wf)
wf_service.create_workflows(wf_text)
# Start workflow.
wf_ex = self.engine.start_workflow('wf')
@ -494,7 +494,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
)
def test_destroy_result(self):
linear_wf = """---
wf_text = """---
version: '2.0'
wf:
@ -508,7 +508,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
keep-result: false
"""
wf_service.create_workflows(linear_wf)
wf_service.create_workflows(wf_text)
# Start workflow.
wf_ex = self.engine.start_workflow('wf')
@ -535,7 +535,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
self.assertIsNone(result)
def test_empty_with_items(self):
wf = """---
wf_text = """---
version: "2.0"
wf1_with_items:
@ -548,7 +548,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
publish:
result: <% task(task1).result %>
"""
wf_service.create_workflows(wf)
wf_service.create_workflows(wf_text)
# Start workflow.
wf_ex = self.engine.start_workflow('wf1_with_items')
@ -569,7 +569,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
self.assertListEqual([], result)
def test_publish_on_error(self):
wf_def = """---
wf_text = """---
version: '2.0'
wf:
@ -586,7 +586,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
err: <% task(task1).result %>
"""
wf_service.create_workflows(wf_def)
wf_service.create_workflows(wf_text)
# Start workflow.
wf_ex = self.engine.start_workflow('wf')
@ -617,7 +617,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
)
def test_output_on_error_wb_yaql_failed(self):
wb_def = """---
wb_text = """---
version: '2.0'
name: wb
@ -646,7 +646,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
message: <% task(task1).result %>
"""
wb_service.create_workbook_v2(wb_def)
wb_service.create_workbook_v2(wb_text)
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1')

View File

@ -48,7 +48,8 @@ class InspectUtilsTest(base.BaseTest):
parameters_str = i_u.get_arg_list_as_str(clazz.__init__)
self.assertEqual(
'wf_ex, wf_spec, task_spec, ctx, triggered_by=null',
'wf_ex, wf_spec, task_spec, ctx, triggered_by=null,'
' handles_error=false',
parameters_str
)

View File

@ -22,17 +22,19 @@ class WorkflowCommand(object):
"""Workflow command.
A set of workflow commands form a communication protocol between workflow
controller and its clients. When workflow controller makes a decision about
how to continue a workflow it returns a set of commands so that a caller
knows what to do next.
controller and its clients. When a workflow controller makes a decision
about how to continue a workflow it returns a set of commands so that
a caller knows what to do next.
"""
def __init__(self, wf_ex, wf_spec, task_spec, ctx, triggered_by=None):
def __init__(self, wf_ex, wf_spec, task_spec, ctx, triggered_by=None,
handles_error=False):
self.wf_ex = wf_ex
self.wf_spec = wf_spec
self.task_spec = task_spec
self.ctx = ctx or {}
self.triggered_by = triggered_by
self.handles_error = handles_error
def to_dict(self):
return {
@ -59,13 +61,15 @@ class Noop(WorkflowCommand):
class RunTask(WorkflowCommand):
"""Instruction to run a workflow task."""
def __init__(self, wf_ex, wf_spec, task_spec, ctx, triggered_by=None):
def __init__(self, wf_ex, wf_spec, task_spec, ctx, triggered_by=None,
handles_error=False):
super(RunTask, self).__init__(
wf_ex,
wf_spec,
task_spec,
ctx,
triggered_by=triggered_by
triggered_by=triggered_by,
handles_error=handles_error
)
self.wait = False
@ -102,13 +106,14 @@ class RunExistingTask(WorkflowCommand):
"""Command to run an existing workflow task."""
def __init__(self, wf_ex, wf_spec, task_ex, reset=True, triggered_by=None,
rerun=False):
handles_error=False, rerun=False):
super(RunExistingTask, self).__init__(
wf_ex,
wf_spec,
spec_parser.get_task_spec(task_ex.spec),
task_ex.in_context,
triggered_by=triggered_by
triggered_by=triggered_by,
handles_error=handles_error
)
self.task_ex = task_ex
@ -131,13 +136,14 @@ class SetWorkflowState(WorkflowCommand):
"""Instruction to change a workflow state."""
def __init__(self, wf_ex, wf_spec, task_spec, ctx, new_state, msg=None,
triggered_by=None):
triggered_by=None, handles_error=False):
super(SetWorkflowState, self).__init__(
wf_ex,
wf_spec,
task_spec,
ctx,
triggered_by=triggered_by
triggered_by=triggered_by,
handles_error=handles_error
)
self.new_state = new_state
@ -156,7 +162,7 @@ class FailWorkflow(SetWorkflowState):
"""Instruction to fail a workflow."""
def __init__(self, wf_ex, wf_spec, task_spec, ctx, msg=None,
triggered_by=None):
triggered_by=None, handles_error=False):
super(FailWorkflow, self).__init__(
wf_ex,
wf_spec,
@ -164,7 +170,8 @@ class FailWorkflow(SetWorkflowState):
ctx,
states.ERROR,
msg=msg,
triggered_by=triggered_by
triggered_by=triggered_by,
handles_error=handles_error
)
def __repr__(self):
@ -182,7 +189,7 @@ class SucceedWorkflow(SetWorkflowState):
"""Instruction to succeed a workflow."""
def __init__(self, wf_ex, wf_spec, task_spec, ctx, msg=None,
triggered_by=None):
triggered_by=None, handles_error=False):
super(SucceedWorkflow, self).__init__(
wf_ex,
wf_spec,
@ -190,7 +197,8 @@ class SucceedWorkflow(SetWorkflowState):
ctx,
states.SUCCESS,
msg=msg,
triggered_by=triggered_by
triggered_by=triggered_by,
handles_error=handles_error
)
def __repr__(self):
@ -208,7 +216,7 @@ class PauseWorkflow(SetWorkflowState):
"""Instruction to pause a workflow."""
def __init__(self, wf_ex, wf_spec, task_spec, ctx, msg=None,
triggered_by=None):
triggered_by=None, handles_error=False):
super(PauseWorkflow, self).__init__(
wf_ex,
wf_spec,
@ -216,7 +224,8 @@ class PauseWorkflow(SetWorkflowState):
ctx,
states.PAUSED,
msg=msg,
triggered_by=triggered_by
triggered_by=triggered_by,
handles_error=handles_error
)
def __repr__(self):
@ -238,6 +247,10 @@ ENGINE_CMD_CLS = {
}
def is_engine_command(cmd):
return cmd is not None and isinstance(cmd, (SetWorkflowState, Noop))
def get_command_class(cmd_name):
return ENGINE_CMD_CLS[cmd_name] if cmd_name in ENGINE_CMD_CLS else None
@ -250,7 +263,7 @@ def get_command_class(cmd_name):
# is not processed with this method at all. Might be a 'bad smell'.
# This all makes me think that we need to do some refactoring here.
def create_command(cmd_name, wf_ex, wf_spec, task_spec, ctx,
params=None, triggered_by=None):
params=None, triggered_by=None, handles_error=False):
cmd_cls = get_command_class(cmd_name) or RunTask
if issubclass(cmd_cls, SetWorkflowState):
@ -260,7 +273,8 @@ def create_command(cmd_name, wf_ex, wf_spec, task_spec, ctx,
task_spec,
ctx,
msg=params.get('msg'),
triggered_by=triggered_by
triggered_by=triggered_by,
handles_error=handles_error
)
else:
return cmd_cls(
@ -268,7 +282,8 @@ def create_command(cmd_name, wf_ex, wf_spec, task_spec, ctx,
wf_spec,
task_spec,
ctx,
triggered_by=triggered_by
triggered_by=triggered_by,
handles_error=handles_error
)
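
For reference, a hedged example of building a command with the new flag through create_command(); the flag value mirrors the direct workflow controller change below, where it is derived from the clause name that triggered the task:

# Sketch: 'task2' is a hypothetical next task triggered by an
# 'on-error' clause of the task that just completed.
cmd = commands.create_command(
    'task2',
    wf_ex,
    wf_spec,
    task_spec,
    ctx,
    triggered_by=triggered_by,
    handles_error=(event_name == 'on-error')
)
# 'task2' is not an engine command name, so a RunTask command is built
# and is_engine_command(cmd) returns False.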

View File

@ -139,7 +139,8 @@ class DirectWorkflowController(base.WorkflowController):
t_s,
ctx,
params=params,
triggered_by=triggered_by
triggered_by=triggered_by,
handles_error=(event_name == 'on-error')
)
self._configure_if_join(cmd)
@ -169,7 +170,6 @@ class DirectWorkflowController(base.WorkflowController):
ctx = {}
for batch in self._find_end_task_executions_as_batches():
for t_ex in batch:
ctx = utils.merge_dicts(
ctx,
@ -193,62 +193,35 @@ class DirectWorkflowController(base.WorkflowController):
return self._find_indirectly_affected_created_joins(task_name)
def is_error_handled_for(self, task_ex):
# TODO(rakhmerov): The method works in a different way than
# all_errors_handled(). It doesn't evaluate expressions under
# "on-error" clause.
return bool(self.wf_spec.get_on_error_clause(task_ex.name))
def all_errors_handled(self):
for t_ex in lookup_utils.find_error_task_executions(self.wf_ex.id):
ctx_view = data_flow.ContextView(
data_flow.get_current_task_dict(t_ex),
data_flow.evaluate_task_outbound_context(t_ex),
data_flow.get_workflow_environment_dict(self.wf_ex),
self.wf_ex.context,
self.wf_ex.input
)
cnt = lookup_utils.find_task_executions_count(
workflow_execution_id=self.wf_ex.id,
state=states.ERROR,
error_handled=False
)
tasks_on_error = self._find_next_tasks_for_clause(
self.wf_spec.get_on_error_clause(t_ex.name),
ctx_view
)
if not tasks_on_error:
return False
return True
return cnt == 0
def _find_end_task_executions_as_batches(self):
def is_end_task(t_ex):
try:
return not self._has_outbound_tasks(t_ex)
except exc.MistralException:
# If some error happened during the evaluation of outbound
# tasks we consider that the given task is an end task.
# Due to this output-on-error could reach the outbound context
# of given task also.
return True
batches = lookup_utils.find_completed_task_executions_as_batches(
self.wf_ex.id
workflow_execution_id=self.wf_ex.id,
has_next_tasks=False
)
for batch in batches:
yield list(filter(is_end_task, batch))
yield batch
def may_complete_workflow(self, task_ex):
res = super(DirectWorkflowController, self).may_complete_workflow(
task_ex
)
return res and not self._has_outbound_tasks(task_ex)
def _has_outbound_tasks(self, task_ex):
# In order to determine if there are outbound tasks we just need
# to calculate next task names (based on task outbound context)
# and remove all engine commands. To do the latter it's enough to
# check if there's a corresponding task specification for a task name.
return bool([
t_name for t_name in self._find_next_task_names(task_ex)
if self.wf_spec.get_tasks()[t_name]
])
return res and not task_ex.has_next_tasks
def _find_next_task_names(self, task_ex):
return [t[0] for t in self._find_next_tasks(task_ex)]
@ -297,7 +270,7 @@ class DirectWorkflowController(base.WorkflowController):
def _find_next_tasks_for_clause(clause, ctx):
"""Finds next tasks names.
This method finds next task(command) base on given {name: condition}
This method finds next tasks (commands) based on the given {name: condition}
dictionary.
:param clause: Tuple (task_name, condition, parameters) taken from

View File

@ -162,10 +162,12 @@ def find_completed_task_executions(wf_ex_id):
return db_api.get_completed_task_executions(workflow_execution_id=wf_ex_id)
def find_completed_task_executions_as_batches(wf_ex_id):
return db_api.get_completed_task_executions_as_batches(
workflow_execution_id=wf_ex_id
)
def find_completed_task_executions_as_batches(**kwargs):
return db_api.get_completed_task_executions_as_batches(**kwargs)
def find_task_executions_count(**kwargs):
return db_api.get_task_executions_count(**kwargs)
def get_task_execution_cache_size():