Merge "Add a workflow execution report endpoint"

This commit is contained in:
Zuul 2019-02-11 11:44:49 +00:00 committed by Gerrit Code Review
commit 58d6634702
5 changed files with 736 additions and 2 deletions

View File

@ -23,6 +23,7 @@ from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan import wsmeext.pecan as wsme_pecan
from mistral.api import access_control as acl from mistral.api import access_control as acl
from mistral.api.controllers.v2 import execution_report
from mistral.api.controllers.v2 import resources from mistral.api.controllers.v2 import resources
from mistral.api.controllers.v2 import task from mistral.api.controllers.v2 import task
from mistral.api.controllers.v2 import types from mistral.api.controllers.v2 import types
@ -82,6 +83,7 @@ def _get_workflow_execution(id, must_exist=True):
class ExecutionsController(rest.RestController): class ExecutionsController(rest.RestController):
tasks = task.ExecutionTasksController() tasks = task.ExecutionTasksController()
report = execution_report.ExecutionReportController()
@rest_utils.wrap_wsme_controller_exception @rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(resources.Execution, wtypes.text) @wsme_pecan.wsexpose(resources.Execution, wtypes.text)

View File

@ -0,0 +1,164 @@
# Copyright 2019 - Nokia Networks.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from pecan import rest
import wsmeext.pecan as wsme_pecan
from mistral.api.controllers.v2 import resources
from mistral.api.controllers.v2 import types
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models as db_models
from mistral.utils import rest_utils
from mistral.workflow import states
LOG = logging.getLogger(__name__)
def create_workflow_execution_entry(wf_ex):
return resources.WorkflowExecutionReportEntry.from_db_model(wf_ex)
def create_task_execution_entry(task_ex):
return resources.TaskExecutionReportEntry.from_db_model(task_ex)
def create_action_execution_entry(action_ex):
return resources.ActionExecutionReportEntry.from_db_model(action_ex)
def update_statistics_with_task(stat, task_ex):
if task_ex.state == states.RUNNING:
stat.increment_running()
elif task_ex.state == states.SUCCESS:
stat.increment_success()
elif task_ex.state == states.ERROR:
stat.increment_error()
elif task_ex.state == states.IDLE:
stat.increment_idle()
elif task_ex.state == states.PAUSED:
stat.increment_paused()
def analyse_task_execution(task_ex_id, stat, filters, cur_depth):
with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex_id)
if filters['errors_only'] and task_ex.state != states.ERROR:
return None
update_statistics_with_task(stat, task_ex)
entry = create_task_execution_entry(task_ex)
child_executions = task_ex.executions
entry.action_executions = []
entry.workflow_executions = []
for c_ex in child_executions:
if isinstance(c_ex, db_models.ActionExecution):
entry.action_executions.append(
create_action_execution_entry(c_ex)
)
else:
entry.workflow_executions.append(
analyse_workflow_execution(c_ex.id, stat, filters, cur_depth)
)
return entry
def analyse_workflow_execution(wf_ex_id, stat, filters, cur_depth):
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex_id)
entry = create_workflow_execution_entry(wf_ex)
max_depth = filters['max_depth']
# Don't get deeper into the workflow task executions if
# maximum depth is defined and the current depth exceeds it.
if 0 <= max_depth < cur_depth:
return entry
task_execs = wf_ex.task_executions
entry.task_executions = []
for t_ex in task_execs:
task_exec_entry = analyse_task_execution(
t_ex.id,
stat,
filters,
cur_depth + 1
)
if task_exec_entry:
entry.task_executions.append(task_exec_entry)
return entry
def build_report(wf_ex_id, filters):
report = resources.ExecutionReport()
stat = resources.ExecutionReportStatistics()
report.statistics = stat
report.root_workflow_execution = analyse_workflow_execution(
wf_ex_id,
stat,
filters,
0
)
return report
class ExecutionReportController(rest.RestController):
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(resources.ExecutionReport, types.uuid, bool, int)
def get(self, workflow_execution_id, errors_only=False, max_depth=-1):
"""Return workflow execution report.
:param workflow_execution_id: The ID of the workflow execution to
generate a report for.
:param errors_only: Optional. If True, only error paths of the
execution tree are included into the report. The root execution
(with the specified id) is always included, but its tasks may
or may not be included depending on this flag's value.
:param max_depth: Optional. Limits the depth of recursion while
obtaining the execution tree. That is, subworkflows of what
maximum depth will be included into the report. If a value of the
flag is a negative number then no limit is set.
The root execution has depth 0 so if the flag is 0 then only
the root execution, its tasks and their actions will be included.
If some of the tasks in turn run workflows then these subworkflows
will be also included but without their tasks. The algorithm will
fully analyse their tasks only if max_depth is greater than zero.
"""
LOG.info(
"Fetch execution report [workflow_execution_id=%s]",
workflow_execution_id
)
filters = {
'errors_only': errors_only,
'max_depth': max_depth
}
return build_report(workflow_execution_id, filters)

View File

@ -721,3 +721,158 @@ class EventTriggers(resource.ResourceList):
"marker=123e4567-e89b-12d3-a456-426655440000") "marker=123e4567-e89b-12d3-a456-426655440000")
return triggers_sample return triggers_sample
class BaseExecutionReportEntry(resource.Resource):
"""Execution report entry resource."""
id = wtypes.text
name = wtypes.text
created_at = wtypes.text
updated_at = wtypes.text
state = wtypes.text
state_info = wtypes.text
@classmethod
def sample(cls):
# TODO(rakhmerov): complete
return cls(
id='123e4567-e89b-12d3-a456-426655441414',
created_at='2019-01-30T00:00:00.000000',
updated_at='2019-01-30T00:00:00.000000',
state=states.SUCCESS
)
class ActionExecutionReportEntry(BaseExecutionReportEntry):
"""Action execution report entry resource."""
accepted = bool
last_heartbeat = wtypes.text
@classmethod
def sample(cls):
sample = super(ActionExecutionReportEntry, cls).sample()
sample.accepted = True
sample.last_heartbeat = '2019-01-30T00:00:00.000000'
return sample
class WorkflowExecutionReportEntry(BaseExecutionReportEntry):
"""Workflow execution report entry resource."""
# NOTE(rakhmerov): task_executions has to be declared below
# after we declare a class for task execution entry resource.
@classmethod
def sample(cls):
sample = super(WorkflowExecutionReportEntry, cls).sample()
# We can't define a non-empty list task executions here because
# the needed class is not defined yet. Since this is just a sample
# we can sacrifice it.
sample.task_executions = []
return sample
class TaskExecutionReportEntry(BaseExecutionReportEntry):
"""Task execution report entity resource."""
action_executions = [ActionExecutionReportEntry]
workflow_executions = [WorkflowExecutionReportEntry]
@classmethod
def sample(cls):
sample = super(TaskExecutionReportEntry, cls).sample()
sample.action_executions = [ActionExecutionReportEntry.sample()]
sample.workflow_executions = []
return sample
# We have to declare this field later because of the dynamic binding.
# It can't be within WorkflowExecutionReportEntry before
# TaskExecutionReportEntry is declared.
WorkflowExecutionReportEntry.task_executions = [TaskExecutionReportEntry]
wtypes.registry.reregister(WorkflowExecutionReportEntry)
class ExecutionReportStatistics(resource.Resource):
"""Execution report statistics.
TODO(rakhmerov): There's much more we can add here. For example,
information about action, average (and also min and max) task execution
run time etc.
"""
total_tasks_count = wtypes.IntegerType(minimum=0)
running_tasks_count = wtypes.IntegerType(minimum=0)
success_tasks_count = wtypes.IntegerType(minimum=0)
error_tasks_count = wtypes.IntegerType(minimum=0)
idle_tasks_count = wtypes.IntegerType(minimum=0)
paused_tasks_count = wtypes.IntegerType(minimum=0)
def __init__(self, **kw):
self.total_tasks_count = 0
self.running_tasks_count = 0
self.success_tasks_count = 0
self.error_tasks_count = 0
self.idle_tasks_count = 0
self.paused_tasks_count = 0
super(ExecutionReportStatistics, self).__init__(**kw)
def increment_running(self):
self.running_tasks_count += 1
self.total_tasks_count += 1
def increment_success(self):
self.success_tasks_count += 1
self.total_tasks_count += 1
def increment_error(self):
self.error_tasks_count += 1
self.total_tasks_count += 1
def increment_idle(self):
self.idle_tasks_count += 1
self.total_tasks_count += 1
def increment_paused(self):
self.paused_tasks_count += 1
self.total_tasks_count += 1
@classmethod
def sample(cls):
return cls(
total_tasks_count=10,
running_tasks_count=3,
success_tasks_count=5,
error_tasks_count=2,
idle_tasks_count=0,
paused_tasks_count=0
)
class ExecutionReport(resource.Resource):
"""Execution report resource."""
statistics = ExecutionReportStatistics
"""General statistics about the workflow execution hierarchy."""
root_workflow_execution = WorkflowExecutionReportEntry
"""Root entry of the report associated with a workflow execution."""
@classmethod
def sample(cls):
sample = cls()
sample.statistics = ExecutionReportStatistics.sample()
sample.root_workflow_execution = WorkflowExecutionReportEntry.sample()
return sample

View File

@ -36,8 +36,15 @@ from mistral.workflow import states
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
STATE_TYPES = wtypes.Enum(str, states.IDLE, states.RUNNING, states.SUCCESS,
states.ERROR, states.RUNNING_DELAYED) STATE_TYPES = wtypes.Enum(
str,
states.IDLE,
states.RUNNING,
states.SUCCESS,
states.ERROR,
states.RUNNING_DELAYED
)
def _get_task_resource_with_result(task_ex): def _get_task_resource_with_result(task_ex):

View File

@ -0,0 +1,406 @@
# Copyright 2019 - Nokia Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.services import workbooks as wb_service
from mistral.services import workflows as wf_service
from mistral.tests.unit.api import base
from mistral.tests.unit.engine import base as engine_base
from mistral.workflow import states
class TestExecutionReportController(base.APITest, engine_base.EngineTestCase):
def test_simple_sequence_wf(self):
wf_text = """---
version: '2.0'
wf:
tasks:
task1:
action: std.noop
on-success: task2
task2:
action: std.fail
"""
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf')
self.await_workflow_error(wf_ex.id)
resp = self.app.get('/v2/executions/%s/report' % wf_ex.id)
self.assertEqual(200, resp.status_int)
# Now let's verify the response structure
self.assertIn('root_workflow_execution', resp.json)
root_wf_ex = resp.json['root_workflow_execution']
self.assertIsInstance(root_wf_ex, dict)
self.assertEqual(wf_ex.id, root_wf_ex['id'])
self.assertEqual(wf_ex.name, root_wf_ex['name'])
self.assertEqual(states.ERROR, root_wf_ex['state'])
self.assertGreater(len(root_wf_ex['state_info']), 0)
tasks = root_wf_ex['task_executions']
self.assertIsInstance(tasks, list)
self.assertEqual(2, len(tasks))
# Verify task1 info.
task1 = self._assert_single_item(
tasks,
name='task1',
state=states.SUCCESS
)
self.assertEqual(0, len(task1['workflow_executions']))
self.assertEqual(1, len(task1['action_executions']))
task1_action = task1['action_executions'][0]
self.assertEqual(states.SUCCESS, task1_action['state'])
self.assertEqual('std.noop', task1_action['name'])
# Verify task2 info.
task2 = self._assert_single_item(
tasks,
name='task2',
state=states.ERROR
)
self.assertEqual(1, len(task2['action_executions']))
task2_action = task2['action_executions'][0]
self.assertEqual(0, len(task2['workflow_executions']))
self.assertEqual(states.ERROR, task2_action['state'])
# Verify statistics.
stat = resp.json['statistics']
self.assertEqual(1, stat['error_tasks_count'])
self.assertEqual(0, stat['idle_tasks_count'])
self.assertEqual(0, stat['paused_tasks_count'])
self.assertEqual(0, stat['running_tasks_count'])
self.assertEqual(1, stat['success_tasks_count'])
self.assertEqual(2, stat['total_tasks_count'])
def test_nested_wf(self):
wb_text = """---
version: '2.0'
name: wb
workflows:
parent_wf:
tasks:
task1:
action: std.noop
on-success: task2
task2:
workflow: sub_wf
on-success: task3
task3:
action: std.fail
sub_wf:
tasks:
task1:
action: std.noop
on-success: task2
task2:
action: std.fail
"""
wb_service.create_workbook_v2(wb_text)
wf_ex = self.engine.start_workflow('wb.parent_wf')
self.await_workflow_error(wf_ex.id)
resp = self.app.get('/v2/executions/%s/report' % wf_ex.id)
self.assertEqual(200, resp.status_int)
# Now let's verify the response structure
self.assertIn('root_workflow_execution', resp.json)
root_wf_ex = resp.json['root_workflow_execution']
self.assertIsInstance(root_wf_ex, dict)
self.assertEqual('wb.parent_wf', root_wf_ex['name'])
self.assertEqual(states.ERROR, root_wf_ex['state'])
self.assertGreater(len(root_wf_ex['state_info']), 0)
tasks = root_wf_ex['task_executions']
self.assertIsInstance(tasks, list)
self.assertEqual(2, len(tasks))
# Verify task1 info.
task1 = self._assert_single_item(tasks, name='task1')
self.assertEqual(states.SUCCESS, task1['state'])
self.assertEqual(0, len(task1['workflow_executions']))
self.assertEqual(1, len(task1['action_executions']))
task1_action = task1['action_executions'][0]
self.assertEqual(states.SUCCESS, task1_action['state'])
self.assertEqual('std.noop', task1_action['name'])
# Verify task2 info.
task2 = self._assert_single_item(tasks, name='task2')
self.assertEqual(states.ERROR, task2['state'])
self.assertEqual(0, len(task2['action_executions']))
self.assertEqual(1, len(task2['workflow_executions']))
sub_wf_entry = task2['workflow_executions'][0]
self.assertEqual(states.ERROR, sub_wf_entry['state'])
sub_wf_tasks = sub_wf_entry['task_executions']
self.assertEqual(2, len(sub_wf_tasks))
sub_wf_task1 = self._assert_single_item(
sub_wf_tasks,
name='task1',
state=states.SUCCESS
)
sub_wf_task2 = self._assert_single_item(
sub_wf_tasks,
name='task2',
state=states.ERROR
)
self.assertEqual(1, len(sub_wf_task1['action_executions']))
self.assertEqual(
states.SUCCESS,
sub_wf_task1['action_executions'][0]['state']
)
self.assertEqual(1, len(sub_wf_task2['action_executions']))
self.assertEqual(
states.ERROR,
sub_wf_task2['action_executions'][0]['state']
)
# Verify statistics.
stat = resp.json['statistics']
self.assertEqual(2, stat['error_tasks_count'])
self.assertEqual(0, stat['idle_tasks_count'])
self.assertEqual(0, stat['paused_tasks_count'])
self.assertEqual(0, stat['running_tasks_count'])
self.assertEqual(2, stat['success_tasks_count'])
self.assertEqual(4, stat['total_tasks_count'])
def test_nested_wf_errors_only(self):
wb_text = """---
version: '2.0'
name: wb
workflows:
parent_wf:
tasks:
task1:
action: std.noop
on-success: task2
task2:
workflow: sub_wf
on-success: task3
task3:
action: std.fail
sub_wf:
tasks:
task1:
action: std.noop
on-success: task2
task2:
action: std.fail
"""
wb_service.create_workbook_v2(wb_text)
wf_ex = self.engine.start_workflow('wb.parent_wf')
self.await_workflow_error(wf_ex.id)
resp = self.app.get(
'/v2/executions/%s/report?errors_only=true' % wf_ex.id
)
self.assertEqual(200, resp.status_int)
# Now let's verify the response structure
self.assertIn('root_workflow_execution', resp.json)
root_wf_ex = resp.json['root_workflow_execution']
self.assertIsInstance(root_wf_ex, dict)
self.assertEqual('wb.parent_wf', root_wf_ex['name'])
self.assertEqual(states.ERROR, root_wf_ex['state'])
self.assertGreater(len(root_wf_ex['state_info']), 0)
tasks = root_wf_ex['task_executions']
self.assertIsInstance(tasks, list)
self.assertEqual(1, len(tasks))
# There must be only task2 in the response.
# Verify task2 info.
task2 = self._assert_single_item(tasks, name='task2')
self.assertEqual(states.ERROR, task2['state'])
self.assertEqual(0, len(task2['action_executions']))
self.assertEqual(1, len(task2['workflow_executions']))
sub_wf_entry = task2['workflow_executions'][0]
self.assertEqual(states.ERROR, sub_wf_entry['state'])
sub_wf_tasks = sub_wf_entry['task_executions']
self.assertEqual(1, len(sub_wf_tasks))
sub_wf_task2 = self._assert_single_item(
sub_wf_tasks,
name='task2',
state=states.ERROR
)
self.assertEqual(1, len(sub_wf_task2['action_executions']))
self.assertEqual(
states.ERROR,
sub_wf_task2['action_executions'][0]['state']
)
# Verify statistics.
stat = resp.json['statistics']
self.assertEqual(2, stat['error_tasks_count'])
self.assertEqual(0, stat['idle_tasks_count'])
self.assertEqual(0, stat['paused_tasks_count'])
self.assertEqual(0, stat['running_tasks_count'])
self.assertEqual(0, stat['success_tasks_count'])
self.assertEqual(2, stat['total_tasks_count'])
def test_nested_wf_max_depth(self):
wb_text = """---
version: '2.0'
name: wb
workflows:
parent_wf:
tasks:
task1:
action: std.noop
on-success: task2
task2:
workflow: sub_wf
on-success: task3
task3:
action: std.fail
sub_wf:
tasks:
task1:
action: std.noop
on-success: task2
task2:
action: std.fail
"""
wb_service.create_workbook_v2(wb_text)
wf_ex = self.engine.start_workflow('wb.parent_wf')
self.await_workflow_error(wf_ex.id)
resp = self.app.get('/v2/executions/%s/report?max_depth=0' % wf_ex.id)
self.assertEqual(200, resp.status_int)
# Now let's verify the response structure
self.assertIn('root_workflow_execution', resp.json)
root_wf_ex = resp.json['root_workflow_execution']
self.assertIsInstance(root_wf_ex, dict)
self.assertEqual('wb.parent_wf', root_wf_ex['name'])
self.assertEqual(states.ERROR, root_wf_ex['state'])
self.assertGreater(len(root_wf_ex['state_info']), 0)
tasks = root_wf_ex['task_executions']
self.assertIsInstance(tasks, list)
self.assertEqual(2, len(tasks))
# Verify task1 info.
task1 = self._assert_single_item(tasks, name='task1')
self.assertEqual(states.SUCCESS, task1['state'])
self.assertEqual(0, len(task1['workflow_executions']))
self.assertEqual(1, len(task1['action_executions']))
task1_action = task1['action_executions'][0]
self.assertEqual(states.SUCCESS, task1_action['state'])
self.assertEqual('std.noop', task1_action['name'])
# Verify task2 info.
task2 = self._assert_single_item(tasks, name='task2')
self.assertEqual(states.ERROR, task2['state'])
self.assertEqual(0, len(task2['action_executions']))
self.assertEqual(1, len(task2['workflow_executions']))
sub_wf_entry = task2['workflow_executions'][0]
self.assertEqual(states.ERROR, sub_wf_entry['state'])
# We still must have an entry for the subworkflow itself
# but it must not have info about task executions because
# we've now limited max depth.
self.assertNotIn('task_executions', sub_wf_entry)
# Verify statistics.
stat = resp.json['statistics']
self.assertEqual(1, stat['error_tasks_count'])
self.assertEqual(0, stat['idle_tasks_count'])
self.assertEqual(0, stat['paused_tasks_count'])
self.assertEqual(0, stat['running_tasks_count'])
self.assertEqual(1, stat['success_tasks_count'])
self.assertEqual(2, stat['total_tasks_count'])